# Elasticsearch Embedding Pipeline Simulation

This notebook simulates the Elasticsearch embedding pipeline, based on the code in `benchmarks/embedding/contrib/pipeline/pipeline.py`.

The pipeline performs the following steps:
1. **Data Loading**: Reads data in JSONL format
2. **Text Processing**: Tokenizes text and truncates it to a token limit
3. **Vector Indexing**: Batch-indexes documents and their embeddings into Elasticsearch
4. **Semantic Retrieval**: Runs semantic search using vector similarity
5. **Optional Reranking**: Reranks the retrieved results for better accuracy

## Pipeline Architecture Overview

```
JSONL Data → Text Processing → Document Creation → Chunk Conversion → 
Elasticsearch Index (Vector Embeddings + Metadata) → Vector Retrieval → 
Optional Reranking → Final Results
```
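
The flow above can be sketched end to end in a few lines. This is a toy illustration only, not the original pipeline: `toy_embed` is a hypothetical stand-in (a hashed bag-of-words vector) for the real embedding model, and a plain in-memory list stands in for the Elasticsearch index.

```python
import json
import math
from typing import Dict, List, Tuple


def toy_embed(text: str, dims: int = 16) -> List[float]:
    """Hypothetical stand-in for an embedding model: hashed bag-of-words."""
    vec = [0.0] * dims
    for word in text.lower().split():
        # Deterministic bucket per word (avoids Python's randomized hash()).
        bucket = sum(ord(ch) for ch in word) % dims
        vec[bucket] += 1.0
    return vec


def cosine(a: List[float], b: List[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def run_pipeline(jsonl_lines: List[str], query: str, top_k: int = 2) -> List[Tuple[str, float]]:
    # 1) load JSONL, 2) embed each text, 3) "index" in memory, 4) retrieve by similarity
    records: List[Dict] = [json.loads(line) for line in jsonl_lines if line.strip()]
    index = [(rec["_id"], toy_embed(rec["text"])) for rec in records]
    query_vec = toy_embed(query)
    scored = [(doc_id, cosine(query_vec, vec)) for doc_id, vec in index]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:top_k]


lines = [
    json.dumps({"_id": "a", "text": "vector search with embeddings"}),
    json.dumps({"_id": "b", "text": "cooking pasta at home"}),
]
results = run_pipeline(lines, "embeddings vector search")
print(results)
```

The sections below replace each toy piece with the real component: a Hugging Face model for embeddings and Elasticsearch (or its in-memory fallback) for storage and retrieval.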

## 1. Import Required Libraries

Import all the libraries needed to simulate the Elasticsearch embedding pipeline.

In [32]:
# Core libraries
import json
import os
import asyncio
from typing import Any, Dict, List, Optional
import logging
from dataclasses import dataclass, asdict
from datetime import datetime

# Data processing
import pandas as pd
import numpy as np
from tqdm import tqdm

# Elasticsearch
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk, parallel_bulk

# Transformers & Tokenization  
from transformers import AutoTokenizer, AutoModel
import torch

# LangChain-style components (Document, Chunk) are simulated with the
# dataclasses defined in the next section; `dataclass` is already imported above.

# Utility libraries
import uuid
import time
import warnings
warnings.filterwarnings('ignore')

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

print("✅ All libraries imported successfully!")
print(f"📅 Timestamp: {datetime.now()}")

✅ All libraries imported successfully!
📅 Timestamp: 2025-07-24 08:29:47.501789


## 2. Define Data Structures and Configuration Classes

Define the data structures used by the embedding pipeline, including the model configuration and document chunks.

In [33]:
@dataclass
class ModelConfig:
    """Model configuration similar to the original pipeline."""
    provider: str = "huggingface"
    provider_model_id: str = "sentence-transformers/all-MiniLM-L6-v2"
    embedding_dimensions: int = 384
    max_tokens: int = 512
    
    def model_dump(self) -> Dict[str, Any]:
        return asdict(self)

@dataclass
class Document:
    """Document class similar to langchain Document."""
    page_content: str
    metadata: Dict[str, Any]

@dataclass 
class Chunk:
    """Chunk class from the original pipeline."""
    content: str
    id: str
    metadata: Dict[str, Any]

@dataclass
class EmbeddingState:
    """State management for pipeline."""
    id: str
    query: str
    dataset: str
    retrieved_chunks: Optional[List[Chunk]] = None
    reranked_chunks: Optional[List[Chunk]] = None
    supporting_facts: Optional[List[str]] = None

class EmbeddingStateKeys:
    """State keys constants."""
    ID = "id"
    QUERY = "query"
    DATASET = "dataset"
    RETRIEVED_CHUNKS = "retrieved_chunks"
    RERANKED_CHUNKS = "reranked_chunks"
    SUPPORTING_FACTS = "supporting_facts"

print("✅ Data structures defined successfully!")

✅ Data structures defined successfully!
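
As a small usage sketch of how dataclasses like these behave, the standalone snippet below builds a chunk and a state object and serializes them with `asdict`. The `ExampleChunk` and `ExampleState` names are illustrative stand-ins, not classes from the original pipeline; the list field uses `field(default_factory=list)` so that each instance gets its own list.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List


@dataclass
class ExampleChunk:  # illustrative stand-in for Chunk
    content: str
    id: str
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class ExampleState:  # illustrative stand-in for EmbeddingState
    id: str
    query: str
    dataset: str
    # default_factory gives every instance its own empty list
    retrieved_chunks: List[ExampleChunk] = field(default_factory=list)


state_a = ExampleState(id="q1", query="what is ml?", dataset="sample_dataset")
state_b = ExampleState(id="q2", query="what is nlp?", dataset="sample_dataset")
state_a.retrieved_chunks.append(
    ExampleChunk(content="ML is ...", id="c1", metadata={"title": "Doc 1"})
)

# asdict recurses into nested dataclasses, so the result is JSON-serializable
state_json = json.dumps(asdict(state_a))
print(state_json)
print(len(state_b.retrieved_chunks))  # state_b is unaffected by state_a's append
```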


## 3. Setup Elasticsearch Connection

Configure the connection to Elasticsearch. For this simulation we use a local or Docker-hosted Elasticsearch instance.

In [34]:
class ElasticsearchVectorStore:
    """Simulated ElasticsearchVectorDataStore class."""
    
    def __init__(self, index_name: str, embedding_config: Dict[str, Any]):
        self.index_name = index_name
        self.embedding_config = embedding_config
        
        # Elasticsearch connection (adjust to your local setup)
        self.client = Elasticsearch([
            {'host': 'localhost', 'port': 9200, 'scheme': 'http'}
        ])
        
        # For the simulation, fall back to in-memory storage if Elasticsearch is unavailable
        self.use_simulation = False
        self.simulated_docs = []
        
        try:
            # Test connection
            info = self.client.info()
            logger.info(f"✅ Connected to Elasticsearch: {info['version']['number']}")
        except Exception as e:
            logger.warning(f"⚠️ Cannot connect to Elasticsearch: {e}")
            logger.info("🔄 Using in-memory simulation mode")
            self.use_simulation = True
            
        self._create_index_if_not_exists()
    
    def _create_index_if_not_exists(self):
        """Create index with proper mapping for vector search."""
        if self.use_simulation:
            return
            
        mapping = {
            "mappings": {
                "properties": {
                    "content": {"type": "text", "analyzer": "standard"},
                    "embedding": {
                        "type": "dense_vector",
                        "dims": self.embedding_config.get("embedding_dimensions", 384),
                        # Set explicitly so the field is searchable with kNN
                        # regardless of the Elasticsearch version's default
                        "index": True,
                        "similarity": "cosine"
                    },
                    "metadata": {"type": "object"},
                    "chunk_id": {"type": "keyword"},
                    "timestamp": {"type": "date"}
                }
            }
        }
        
        try:
            if not self.client.indices.exists(index=self.index_name):
                self.client.indices.create(index=self.index_name, body=mapping)
                logger.info(f"✅ Created index: {self.index_name}")
            else:
                logger.info(f"📋 Index already exists: {self.index_name}")
        except Exception as e:
            logger.error(f"❌ Error creating index: {e}")
    
    def count_documents(self) -> int:
        """Count documents in index."""
        if self.use_simulation:
            return len(self.simulated_docs)
        
        try:
            result = self.client.count(index=self.index_name)
            return result['count']
        except Exception:
            return 0
    
    def add_chunks_batch(self, chunks: List[Chunk], embeddings: List[List[float]]):
        """Add chunks with embeddings in batch."""
        if self.use_simulation:
            for chunk, embedding in zip(chunks, embeddings):
                self.simulated_docs.append({
                    'chunk': chunk,
                    'embedding': embedding
                })
            return
        
        # Prepare documents for bulk indexing
        docs = []
        for chunk, embedding in zip(chunks, embeddings):
            doc = {
                "_index": self.index_name,
                "_id": chunk.id,
                "_source": {
                    "content": chunk.content,
                    "embedding": embedding,
                    "metadata": chunk.metadata,
                    "chunk_id": chunk.id,
                    "timestamp": datetime.now()
                }
            }
            docs.append(doc)
        
        # Bulk index
        try:
            success, failed = bulk(self.client, docs)
            logger.info(f"✅ Indexed {success} documents, {len(failed)} failed")
        except Exception as e:
            logger.error(f"❌ Bulk indexing error: {e}")

# Initialize configuration
model_config = ModelConfig()
pipeline_config = {
    "vector_store_provider": "elasticsearch",
    "chunks_file_name": "corpus.jsonl",
    "retrieval_top_k": 10,
    "truncate_chunk_size": 512,
    "use_reranker": False,
    "batch_size": 32
}

print("✅ Elasticsearch configuration ready!")
print(f"📝 Model: {model_config.provider_model_id}")
print(f"🔍 Embedding dimensions: {model_config.embedding_dimensions}")

✅ Elasticsearch configuration ready!
📝 Model: sentence-transformers/all-MiniLM-L6-v2
🔍 Embedding dimensions: 384
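
`add_chunks_batch` builds a full list of actions before calling `bulk`. Since `elasticsearch.helpers.bulk` accepts any iterable of actions, a generator is one alternative. The sketch below is a hypothetical helper (`build_bulk_actions` and `example_index` are not part of the original pipeline) that yields actions lazily and checks embedding dimensions up front; it runs without a cluster.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Iterable, Iterator, List, Tuple


def build_bulk_actions(
    index_name: str,
    items: Iterable[Tuple[str, str, Dict[str, Any], List[float]]],
    expected_dims: int,
) -> Iterator[Dict[str, Any]]:
    """Yield one bulk action per (chunk_id, content, metadata, embedding) tuple."""
    for chunk_id, content, metadata, embedding in items:
        if len(embedding) != expected_dims:
            # Fail early instead of letting Elasticsearch reject the document.
            raise ValueError(
                f"{chunk_id}: expected {expected_dims} dims, got {len(embedding)}"
            )
        yield {
            "_index": index_name,
            "_id": chunk_id,
            "_source": {
                "content": content,
                "embedding": embedding,
                "metadata": metadata,
                "chunk_id": chunk_id,
                "timestamp": datetime.now(timezone.utc).isoformat(),
            },
        }


actions = list(build_bulk_actions(
    "example_index",  # hypothetical index name
    [("c1", "hello world", {"title": "Doc 1"}, [0.1, 0.2, 0.3])],
    expected_dims=3,
))
print(actions[0]["_id"])
# With a live cluster, the generator could be passed straight to the helper:
# from elasticsearch.helpers import bulk
# bulk(client, build_bulk_actions(index_name, items, expected_dims))
```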


## 4. Create Mock Data Structure

Generate sample JSONL data in the same format as `corpus.jsonl` to test the pipeline. The data simulates the documents that will be indexed.

In [35]:
# Sample data that mimics the corpus.jsonl format
sample_texts = [
    "Machine learning is a subset of artificial intelligence that enables computers to learn without being explicitly programmed.",
    "Natural language processing involves the interaction between computers and human language, enabling machines to understand text.",
    "Deep learning uses neural networks with multiple layers to solve complex problems and recognize patterns in data.",
    "Computer vision allows machines to interpret and understand visual information from the world around them.",
    "Elasticsearch is a distributed search and analytics engine built on Apache Lucene for full-text search capabilities.",
    "Vector databases store and search high-dimensional vectors efficiently, enabling semantic search and similarity matching.",
    "Transformer models have revolutionized natural language processing with their attention mechanism and parallel processing.",
    "Embedding models convert text into numerical representations that capture semantic meaning and context.",
    "Retrieval-augmented generation combines information retrieval with language generation for improved AI responses.",
    "Semantic search goes beyond keyword matching to understand the meaning and intent behind search queries."
]

def create_mock_jsonl_data(texts: List[str], dataset_name: str = "sample_dataset") -> List[Dict[str, Any]]:
    """Create mock JSONL data similar to corpus format."""
    mock_data = []
    
    for i, text in enumerate(texts):
        doc = {
            "_id": f"{dataset_name}_chunk_{i:04d}",
            "text": text,
            "title": f"Document {i+1}",
            "source": f"sample_source_{i+1}",
            "category": "technology",
            "chunk_index": i,
            "word_count": len(text.split()),
            "char_count": len(text)
        }
        mock_data.append(doc)
    
    return mock_data

# Generate mock data
mock_jsonl_data = create_mock_jsonl_data(sample_texts)

print(f"✅ Generated {len(mock_jsonl_data)} mock documents")
print("\n📄 Sample document:")
print(json.dumps(mock_jsonl_data[0], indent=2))

# Save to temporary file for processing simulation
temp_jsonl_file = "/tmp/sample_corpus.jsonl"
with open(temp_jsonl_file, 'w') as f:
    for doc in mock_jsonl_data:
        f.write(json.dumps(doc) + '\n')

print(f"\n💾 Saved mock data to: {temp_jsonl_file}")

✅ Generated 10 mock documents

📄 Sample document:
{
  "_id": "sample_dataset_chunk_0000",
  "text": "Machine learning is a subset of artificial intelligence that enables computers to learn without being explicitly programmed.",
  "title": "Document 1",
  "source": "sample_source_1",
  "category": "technology",
  "chunk_index": 0,
  "word_count": 17,
  "char_count": 124
}

💾 Saved mock data to: /tmp/sample_corpus.jsonl
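
To check that a file written this way can be read back, a reader along the following lines could be used. This is a minimal sketch: `read_jsonl` and its `required` parameter are hypothetical, and the example writes to a path chosen by `tempfile` rather than the hard-coded `/tmp` path above.

```python
import json
import os
import tempfile
from typing import Any, Dict, Iterator


def read_jsonl(path: str, required=("_id", "text")) -> Iterator[Dict[str, Any]]:
    """Yield one dict per non-empty line, checking that required keys exist."""
    with open(path, "r", encoding="utf-8") as f:
        for line_num, line in enumerate(f, 1):
            line = line.strip()
            if not line:
                continue  # tolerate blank lines
            record = json.loads(line)
            missing = [key for key in required if key not in record]
            if missing:
                raise KeyError(f"line {line_num}: missing keys {missing}")
            yield record


# Round trip through a temporary file
with tempfile.NamedTemporaryFile(
    "w", suffix=".jsonl", delete=False, encoding="utf-8"
) as tmp:
    tmp.write(json.dumps({"_id": "doc_0000", "text": "first"}) + "\n")
    tmp.write("\n")
    tmp.write(json.dumps({"_id": "doc_0001", "text": "second"}) + "\n")
    tmp_path = tmp.name

loaded = list(read_jsonl(tmp_path))
os.remove(tmp_path)
print([rec["_id"] for rec in loaded])
```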


## 5. Initialize Tokenizer and Embedding Model

Set up the tokenizer and embedding model used for text processing and embedding generation, as in the original pipeline.

In [36]:
class EmbeddingPipelineSimulator:
    """Simulated embedding pipeline based on the original code."""
    
    def __init__(self, model_config: ModelConfig, pipeline_config: Dict[str, Any]):
        self.model_config = model_config
        self.pipeline_config = pipeline_config
        self.tokenizer = None
        self.embedding_model = None
        self.vector_stores = {}
        
    def initialize_tokenizer_and_model(self):
        """Initialize tokenizer and embedding model."""
        try:
            # Load tokenizer
            self.tokenizer = AutoTokenizer.from_pretrained(
                self.model_config.provider_model_id, 
                trust_remote_code=True
            )
            
            # Load embedding model
            self.embedding_model = AutoModel.from_pretrained(
                self.model_config.provider_model_id
            )
            
            logger.info(f"✅ Loaded model: {self.model_config.provider_model_id}")
            logger.info(f"📊 Vocab size: {len(self.tokenizer)}")
            
        except Exception as e:
            logger.error(f"❌ Error loading model: {e}")
            raise
    
    @staticmethod
    def truncate_to_token_limit(text: str, tokenizer, max_tokens: int = 512) -> str:
        """Truncate text to token limit - same as original pipeline."""
        tokenized = tokenizer(
            text,
            truncation=True,
            max_length=max_tokens,
            return_tensors=None,
            return_attention_mask=False,
            return_token_type_ids=False,
        )
        input_ids = tokenized["input_ids"]
        return tokenizer.decode(input_ids, skip_special_tokens=True)
    
    def get_embeddings(self, texts: List[str]) -> List[List[float]]:
        """Generate embeddings for texts."""
        embeddings = []
        
        with torch.no_grad():
            for text in texts:
                # Tokenize
                inputs = self.tokenizer(
                    text, 
                    return_tensors='pt', 
                    truncation=True, 
                    max_length=self.model_config.max_tokens,
                    padding=True
                )
                
                # Get embeddings
                outputs = self.embedding_model(**inputs)
                
                # Mean pooling over tokens (one text per call, so there are no padding tokens to mask out)
                embedding = outputs.last_hidden_state.mean(dim=1).squeeze().tolist()
                embeddings.append(embedding)
        
        return embeddings

# Initialize pipeline simulator
pipeline_simulator = EmbeddingPipelineSimulator(model_config, pipeline_config)
pipeline_simulator.initialize_tokenizer_and_model()

print("✅ Tokenizer and embedding model initialized!")
print(f"🔤 Max tokens: {model_config.max_tokens}")

# Test embedding generation
test_text = "This is a sample text for embedding generation."
test_embedding = pipeline_simulator.get_embeddings([test_text])
print(f"🧮 Embedding dimension: {len(test_embedding[0])}")
print(f"📈 Sample embedding values: {test_embedding[0][:5]}...")

INFO:__main__:✅ Loaded model: sentence-transformers/all-MiniLM-L6-v2
INFO:__main__:📊 Vocab size: 30522


✅ Tokenizer and embedding model initialized!
🔤 Max tokens: 512
🧮 Embedding dimension: 384
📈 Sample embedding values: [-0.16524578630924225, -0.030131183564662933, 0.13367828726768494, 0.19361034035682678, 0.12457739561796188]...
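
`get_embeddings` encodes one text per call, so a plain mean over the token axis is safe there. If several texts were padded into one batch, the padded positions would need to be excluded from the average. The sketch below shows that idea with plain Python lists; `masked_mean_pool` is a hypothetical helper, and with tensors the same computation is usually written using the tokenizer's attention mask.

```python
from typing import List


def masked_mean_pool(
    token_vectors: List[List[float]], attention_mask: List[int]
) -> List[float]:
    """Average token vectors, counting only positions where the mask is 1."""
    dims = len(token_vectors[0])
    totals = [0.0] * dims
    kept = 0
    for vector, keep in zip(token_vectors, attention_mask):
        if keep:
            kept += 1
            for d in range(dims):
                totals[d] += vector[d]
    if kept == 0:
        return totals  # all-padding input: return zeros rather than divide by zero
    return [value / kept for value in totals]


# Two real tokens followed by one padding position
tokens = [[1.0, 3.0], [3.0, 5.0], [0.0, 0.0]]
mask = [1, 1, 0]
pooled = masked_mean_pool(tokens, mask)
naive = [sum(col) / len(tokens) for col in zip(*tokens)]
print(pooled)  # → [2.0, 4.0]
print(naive)   # the unmasked mean is pulled toward zero by the padding row
```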


## 6. Process and Prepare Documents

Load and process documents from the mock JSONL data, apply text truncation, and prepare metadata for indexing, as in the original pipeline.

In [37]:
def process_jsonl_documents(file_path: str, pipeline_simulator: EmbeddingPipelineSimulator) -> List[Document]:
    """Process JSONL documents - same logic as original pipeline."""
    documents = []
    
    logger.info(f"📄 Processing documents from: {file_path}")
    
    with open(file_path, "r") as f:
        for line_num, line in enumerate(f, 1):
            try:
                data = json.loads(line.strip())
                
                # Extract metadata (exclude 'text' field)
                metadata = {}
                for k, v in data.items():
                    if k == "text":
                        continue
                    if k == "_id":
                        metadata["chunk_id"] = v
                    else:
                        metadata[k] = v
                
                # Get text content
                text = data.get("text", "")
                
                # Apply truncation if configured
                truncate_chunk_size = pipeline_simulator.pipeline_config.get("truncate_chunk_size")
                if truncate_chunk_size is not None:
                    original_length = len(text.split())
                    text = EmbeddingPipelineSimulator.truncate_to_token_limit(
                        text=text, 
                        tokenizer=pipeline_simulator.tokenizer, 
                        max_tokens=truncate_chunk_size
                    )
                    new_length = len(text.split())
                    if original_length != new_length:
                        logger.debug(f"🔄 Truncated doc {line_num}: {original_length} → {new_length} words")
                
                # Apply text size truncation if configured
                truncate_text_size = pipeline_simulator.pipeline_config.get("truncate_text_size")
                if truncate_text_size is not None:
                    text = text[:truncate_text_size]
                
                # Create document
                document = Document(
                    page_content=text,
                    metadata=metadata
                )
                documents.append(document)
                
            except json.JSONDecodeError as e:
                logger.error(f"❌ JSON decode error at line {line_num}: {e}")
            except Exception as e:
                logger.error(f"❌ Error processing line {line_num}: {e}")
    
    logger.info(f"✅ Processed {len(documents)} documents")
    return documents

def filter_complex_metadata(documents: List[Document]) -> List[Document]:
    """Filter complex metadata - simplified version."""
    filtered_docs = []
    for doc in documents:
        # Simple filtering - remove any non-serializable metadata
        filtered_metadata = {}
        for k, v in doc.metadata.items():
            if isinstance(v, (str, int, float, bool, type(None))):
                filtered_metadata[k] = v
        
        filtered_doc = Document(
            page_content=doc.page_content,
            metadata=filtered_metadata
        )
        filtered_docs.append(filtered_doc)
    
    return filtered_docs

def documents_to_chunks(documents: List[Document]) -> List[Chunk]:
    """Convert documents to chunks format."""
    chunks = []
    for doc in documents:
        chunk = Chunk(
            content=doc.page_content,
            id=doc.metadata["chunk_id"],
            metadata=doc.metadata
        )
        chunks.append(chunk)
    
    return chunks

# Process the mock JSONL data
documents = process_jsonl_documents(temp_jsonl_file, pipeline_simulator)
documents = filter_complex_metadata(documents)
chunks = documents_to_chunks(documents)

print(f"✅ Created {len(chunks)} chunks ready for indexing")
print(f"\n📋 Sample chunk:")
print(f"ID: {chunks[0].id}")
print(f"Content: {chunks[0].content[:100]}...")
print(f"Metadata: {chunks[0].metadata}")

INFO:__main__:📄 Processing documents from: /tmp/sample_corpus.jsonl
INFO:__main__:✅ Processed 10 documents


✅ Created 10 chunks ready for indexing

📋 Sample chunk:
ID: sample_dataset_chunk_0000
Content: machine learning is a subset of artificial intelligence that enables computers to learn without bein...
Metadata: {'chunk_id': 'sample_dataset_chunk_0000', 'title': 'Document 1', 'source': 'sample_source_1', 'category': 'technology', 'chunk_index': 0, 'word_count': 17, 'char_count': 124}
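
The sample chunk above comes back lower-cased ("machine learning ..." rather than "Machine learning ..."), because every text is encoded and decoded by the tokenizer even when it is well under the limit. One way to avoid that, sketched below under stated assumptions, is to keep the original string unless truncation is actually needed. `truncate_if_needed`, `toy_encode`, and `toy_decode` are hypothetical; the toy functions only imitate a lower-casing tokenizer.

```python
from typing import Callable, List


def truncate_if_needed(
    text: str,
    encode: Callable[[str], List[str]],
    decode: Callable[[List[str]], str],
    max_tokens: int,
) -> str:
    """Return the original text unchanged unless it exceeds max_tokens."""
    tokens = encode(text)
    if len(tokens) <= max_tokens:
        return text  # no round trip, so casing and spacing are preserved
    return decode(tokens[:max_tokens])


# Toy stand-ins for a real tokenizer (assumption: lower-casing, whitespace split)
def toy_encode(text: str) -> List[str]:
    return text.lower().split()


def toy_decode(tokens: List[str]) -> str:
    return " ".join(tokens)


short = truncate_if_needed("Machine Learning", toy_encode, toy_decode, max_tokens=5)
long = truncate_if_needed("Machine Learning Is Fun", toy_encode, toy_decode, max_tokens=2)
print(short)  # → Machine Learning
print(long)   # → machine learning
```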


## 7. Generate Document Embeddings

Generate embeddings for all document chunks using the configured embedding model.

In [38]:
# Generate embeddings for all chunks
logger.info("🧮 Generating embeddings for all chunks...")
start_time = time.time()

# Extract text content from chunks
chunk_texts = [chunk.content for chunk in chunks]

# Process chunks in small groups so tqdm can report progress
# (get_embeddings still encodes one text at a time within each group)
batch_size = 4  # Smaller batch size for local processing
all_embeddings = []

for i in tqdm(range(0, len(chunk_texts), batch_size), desc="Generating embeddings"):
    batch_texts = chunk_texts[i:i + batch_size]
    batch_embeddings = pipeline_simulator.get_embeddings(batch_texts)
    all_embeddings.extend(batch_embeddings)

embedding_time = time.time() - start_time

print(f"✅ Generated {len(all_embeddings)} embeddings")
print(f"⏱️ Embedding generation took: {embedding_time:.2f} seconds")
print(f"🎯 Average time per document: {embedding_time/len(chunks):.3f} seconds")

# Validate embeddings
if all_embeddings:
    embedding_dim = len(all_embeddings[0])
    print(f"📏 Embedding dimensions: {embedding_dim}")
    print(f"🧮 Sample embedding (first 5 values): {all_embeddings[0][:5]}")
    
    # Check for consistent dimensions
    dims_consistent = all(len(emb) == embedding_dim for emb in all_embeddings)
    print(f"✅ All embeddings have consistent dimensions: {dims_consistent}")
else:
    print("❌ No embeddings generated!")

INFO:__main__:🧮 Generating embeddings for all chunks...
Generating embeddings: 100%|██████████| 3/3 [00:00<00:00, 10.79it/s]

✅ Generated 10 embeddings
⏱️ Embedding generation took: 0.28 seconds
🎯 Average time per document: 0.028 seconds
📏 Embedding dimensions: 384
🧮 Sample embedding (first 5 values): [-0.04950002580881119, 0.07518677413463593, 0.13615085184574127, 0.203369140625, 0.05386560410261154]
✅ All embeddings have consistent dimensions: True
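
A common follow-up check is to L2-normalize the embeddings: for unit-length vectors, cosine similarity reduces to a plain dot product. The snippet below is a minimal pure-Python sketch of that step; `l2_normalize` and `dot` are hypothetical helpers, not part of the original pipeline.

```python
import math
from typing import List


def l2_normalize(vector: List[float]) -> List[float]:
    """Scale a vector to unit length; zero vectors are returned unchanged."""
    norm = math.sqrt(sum(x * x for x in vector))
    if norm == 0.0:
        return list(vector)
    return [x / norm for x in vector]


def dot(a: List[float], b: List[float]) -> float:
    return sum(x * y for x, y in zip(a, b))


a = l2_normalize([3.0, 4.0])
b = l2_normalize([4.0, 3.0])
print(a)  # → [0.6, 0.8]
# For unit vectors, this dot product equals the cosine similarity of the originals
print(dot(a, b))
```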





## 8. Index Documents to Elasticsearch

Batch-index the processed documents and their embeddings into Elasticsearch, handling existing indices and errors as in the original pipeline.

In [39]:
# Create vector store and index documents
dataset_name = "sample_dataset"
model_for_index_name = model_config.provider_model_id.replace("/", "-")
index_name = f"embedding_{dataset_name.lower()}_{model_for_index_name.lower()}"

logger.info(f"📋 Creating vector store with index: {index_name}")

# Initialize vector store
vector_store = ElasticsearchVectorStore(
    index_name=index_name,
    embedding_config=model_config.model_dump()
)

# Check if index already has data (like in original pipeline)
existing_doc_count = vector_store.count_documents()
logger.info(f"📊 Existing documents in index: {existing_doc_count}")

if existing_doc_count >= len(chunks):
    logger.info(f"⏭️ Index already has {existing_doc_count} documents. Skipping indexing.")
else:
    logger.info(f"🚀 Starting batch indexing of {len(chunks)} chunks...")
    
    # Batch indexing (same as original pipeline)
    batch_size = pipeline_config.get("batch_size", 32)
    start_time = time.time()
    
    for i in tqdm(range(0, len(chunks), batch_size), desc="Indexing batches"):
        batch_chunks = chunks[i:i + batch_size]
        batch_embeddings = all_embeddings[i:i + batch_size]
        
        try:
            # Add batch to vector store
            vector_store.add_chunks_batch(batch_chunks, batch_embeddings)
            logger.debug(f"✅ Indexed batch {i//batch_size + 1}: {len(batch_chunks)} chunks")
            
        except Exception as e:
            logger.error(f"❌ Error indexing batch {i//batch_size + 1}: {e}")
            # In real pipeline, this would raise the exception
            # raise e
    
    indexing_time = time.time() - start_time
    
    # Verify indexing
    final_doc_count = vector_store.count_documents()
    logger.info(f"✅ Indexing completed!")
    logger.info(f"📊 Final document count: {final_doc_count}")
    logger.info(f"⏱️ Indexing took: {indexing_time:.2f} seconds")
    logger.info(f"🎯 Average indexing time per document: {indexing_time/len(chunks):.3f} seconds")

# Store vector store for later use
pipeline_simulator.vector_stores[dataset_name] = vector_store

print(f"\n📈 Pipeline Statistics:")
print(f"├── Documents processed: {len(chunks)}")
print(f"├── Embeddings generated: {len(all_embeddings)}")  
print(f"├── Index name: {index_name}")
print(f"├── Final document count: {vector_store.count_documents()}")
print(f"└── Vector store mode: {'Simulation' if vector_store.use_simulation else 'Elasticsearch'}")

INFO:__main__:📋 Creating vector store with index: embedding_sample_dataset_sentence-transformers-all-minilm-l6-v2
INFO:elastic_transport.transport:GET http://localhost:9200/ [status:N/A duration:0.050s]
Traceback (most recent call last):
  File "/Users/azmyaryarizaldi/Desktop/GDP/ElasticSearch/Handson/env/lib/python3.13/site-packages/urllib3/connection.py", line 198, in _new_conn
    sock = connection.create_connection(
        (self._dns_host, self.port),
    ...<2 lines>...
        socket_options=self.socket_options,
    )
  File "/Users/azmyaryarizaldi/Desktop/GDP/ElasticSearch/Handson/env/lib/python3.13/site-packages/urllib3/util/connection.py", line 85, in create_connection
    raise err
  File "/Users/azmyaryarizaldi/Desktop/GDP/ElasticSearch/Handson/env/lib/python3.13/site-packages/urllib3/util/connection.py", line 73, in create_connection
    sock.connect(sa)
    ~~~~~~~~~~~~^^^^
ConnectionRefusedError: [Errno 61] Connection refused

The above exception was the direct cause of 


📈 Pipeline Statistics:
├── Documents processed: 10
├── Embeddings generated: 10
├── Index name: embedding_sample_dataset_sentence-transformers-all-minilm-l6-v2
├── Final document count: 10
└── Vector store mode: Simulation
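
The index name is derived by lower-casing and replacing `/` in the model ID. Elasticsearch index names have further restrictions (lowercase only, no characters such as `\`, `/`, `*`, `?`, `"`, `<`, `>`, `|`, space, `,`, `#`, or `:`, no leading `-`, `_`, or `+`, and at most 255 bytes). A slightly more defensive builder might look like the sketch below; `make_index_name` is a hypothetical helper, not taken from the original pipeline.

```python
import re

# Characters that are not allowed in Elasticsearch index names
_INVALID_CHARS = re.compile(r'[\\/*?"<>|\s,#:]')


def make_index_name(dataset: str, model_id: str, prefix: str = "embedding") -> str:
    """Build a lowercase index name with disallowed characters replaced by '-'."""
    raw = f"{prefix}_{dataset}_{model_id}".lower()
    name = _INVALID_CHARS.sub("-", raw)
    name = name.lstrip("-_+")  # names must not start with these characters
    if not name or name in (".", ".."):
        raise ValueError(f"cannot build a valid index name from {raw!r}")
    if len(name.encode("utf-8")) > 255:
        raise ValueError("index name exceeds 255 bytes")
    return name


name = make_index_name("Sample_Dataset", "sentence-transformers/all-MiniLM-L6-v2")
print(name)  # → embedding_sample_dataset_sentence-transformers-all-minilm-l6-v2
```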


## 9. Implement Semantic Search

Create search functionality that embeds the query text and runs a vector similarity search against the indexed documents.

In [40]:
class SemanticRetriever:
    """Semantic retrieval class simulating BasicVectorRetriever."""
    
    def __init__(self, vector_store: ElasticsearchVectorStore, top_k: int = 10):
        self.vector_store = vector_store
        self.top_k = top_k
    
    def cosine_similarity(self, vec1: List[float], vec2: List[float]) -> float:
        """Calculate cosine similarity between two vectors."""
        dot_product = sum(a * b for a, b in zip(vec1, vec2))
        magnitude1 = sum(a * a for a in vec1) ** 0.5
        magnitude2 = sum(b * b for b in vec2) ** 0.5
        
        if magnitude1 == 0 or magnitude2 == 0:
            return 0.0
        
        return dot_product / (magnitude1 * magnitude2)
    
    def search(self, query_embedding: List[float]) -> List[Dict[str, Any]]:
        """Search for similar documents using vector similarity."""
        if self.vector_store.use_simulation:
            # Simulation mode: compute similarities in memory
            similarities = []
            
            for doc in self.vector_store.simulated_docs:
                similarity = self.cosine_similarity(query_embedding, doc['embedding'])
                similarities.append({
                    'chunk': doc['chunk'],
                    'score': similarity
                })
            
            # Sort by similarity score (descending)
            similarities.sort(key=lambda x: x['score'], reverse=True)
            
            # Return top-k results
            return similarities[:self.top_k]
        
        else:
            # Real Elasticsearch mode (would use kNN search)
            try:
                search_body = {
                    "knn": {
                        "field": "embedding",
                        "query_vector": query_embedding,
                        "k": self.top_k,
                        # num_candidates must be at least k
                        "num_candidates": max(
                            self.top_k,
                            min(100, self.vector_store.count_documents())
                        )
                    },
                    "_source": ["content", "metadata", "chunk_id"]
                }
                
                response = self.vector_store.client.search(
                    index=self.vector_store.index_name,
                    body=search_body
                )
                
                results = []
                for hit in response['hits']['hits']:
                    chunk = Chunk(
                        content=hit['_source']['content'],
                        id=hit['_source']['chunk_id'],
                        metadata=hit['_source']['metadata']
                    )
                    results.append({
                        'chunk': chunk,
                        'score': hit['_score']
                    })
                
                return results
                
            except Exception as e:
                logger.error(f"❌ Elasticsearch search error: {e}")
                return []

# Initialize semantic retriever
retriever = SemanticRetriever(vector_store, top_k=pipeline_config.get("retrieval_top_k", 10))

# Test queries
test_queries = [
    "What is machine learning and artificial intelligence?",
    "How does natural language processing work?",
    "Tell me about deep learning and neural networks",
    "What is vector search and embeddings?"
]

print("🔍 Testing Semantic Search:")
print("=" * 50)

for i, query in enumerate(test_queries, 1):
    print(f"\n🔎 Query {i}: {query}")
    
    # Generate query embedding
    query_embedding = pipeline_simulator.get_embeddings([query])[0]
    
    # Perform search
    search_start = time.time()
    results = retriever.search(query_embedding)
    search_time = time.time() - search_start
    
    print(f"⏱️ Search time: {search_time:.3f} seconds")
    print(f"📊 Found {len(results)} results")
    
    # Display top 3 results
    for j, result in enumerate(results[:3], 1):
        chunk = result['chunk']
        score = result['score']
        print(f"\n   {j}. Score: {score:.4f}")
        print(f"      Content: {chunk.content[:100]}...")
        print(f"      Chunk ID: {chunk.id}")

print("\n✅ Semantic search testing completed!")

🔍 Testing Semantic Search:

🔎 Query 1: What is machine learning and artificial intelligence?
⏱️ Search time: 0.001 seconds
📊 Found 10 results

   1. Score: 0.7793
      Content: machine learning is a subset of artificial intelligence that enables computers to learn without bein...
      Chunk ID: sample_dataset_chunk_0000

   2. Score: 0.4836
      Content: deep learning uses neural networks with multiple layers to solve complex problems and recognize patt...
      Chunk ID: sample_dataset_chunk_0002

   3. Score: 0.4691
      Content: computer vision allows machines to interpret and understand visual information from the world around...
      Chunk ID: sample_dataset_chunk_0003

🔎 Query 2: How does natural language processing work?
⏱️ Search time: 0.000 seconds
📊 Found 10 results

   1. Score: 0.7726
      Content: natural language processing involves the interaction between computers and human language, enabling ...
      Chunk ID: sample_dataset_chunk_0001

   2. Score: 0.5174
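
In simulation mode the retriever scores every stored document and sorts the full list before slicing the top results. For larger in-memory collections, `heapq.nlargest` can select the top `k` without a full sort. The following is a minimal standalone sketch of that variation; `top_k_by_cosine` and the toy vectors are hypothetical.

```python
import heapq
import math
from typing import List, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def top_k_by_cosine(
    query: Sequence[float],
    docs: Sequence[Tuple[str, Sequence[float]]],
    k: int,
) -> List[Tuple[str, float]]:
    """Return the k (doc_id, score) pairs with the highest cosine similarity."""
    scored = ((doc_id, cosine_similarity(query, vec)) for doc_id, vec in docs)
    # nlargest returns results ordered from highest to lowest score
    return heapq.nlargest(k, scored, key=lambda pair: pair[1])


docs = [("a", [1.0, 0.0]), ("b", [0.0, 1.0]), ("c", [1.0, 1.0])]
hits = top_k_by_cosine([1.0, 0.1], docs, k=2)
print([doc_id for doc_id, _ in hits])  # → ['a', 'c']
```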
     

## 10. Test Retrieval and Reranking

Implement and test document retrieval with optional reranking, measuring retrieval accuracy and performance.
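
Retrieval accuracy needs relevance labels to be measured. Assuming such labels were available (the chunk IDs below are hypothetical), two common metrics could be computed as in this minimal sketch: recall@k and reciprocal rank. Neither function is part of the original pipeline.

```python
from typing import List, Set


def recall_at_k(retrieved_ids: List[str], relevant_ids: Set[str], k: int) -> float:
    """Fraction of relevant documents that appear in the top-k results."""
    if not relevant_ids:
        return 0.0
    hits = len(set(retrieved_ids[:k]) & relevant_ids)
    return hits / len(relevant_ids)


def reciprocal_rank(retrieved_ids: List[str], relevant_ids: Set[str]) -> float:
    """1 / rank of the first relevant result, or 0.0 if none is retrieved."""
    for rank, doc_id in enumerate(retrieved_ids, 1):
        if doc_id in relevant_ids:
            return 1.0 / rank
    return 0.0


retrieved = ["chunk_0002", "chunk_0000", "chunk_0003"]
relevant = {"chunk_0000", "chunk_0005"}  # hypothetical relevance labels
print(recall_at_k(retrieved, relevant, k=3))  # → 0.5
print(reciprocal_rank(retrieved, relevant))   # → 0.5
```

Averaging the reciprocal rank over a set of queries gives the mean reciprocal rank (MRR), which makes it possible to compare results before and after reranking.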

In [41]:
class SimpleReranker:
    """Simple reranker that simulates reranking functionality."""
    
    def __init__(self):
        pass
    
    def rerank(self, query: str, chunks: List[Chunk], query_embedding: List[float]) -> List[Dict[str, Any]]:
        """Simple reranking based on text similarity and length preference."""
        results = []
        
        for chunk in chunks:
            # Simple text-based score: keyword overlap + length penalty
            query_words = set(query.lower().split())
            chunk_words = set(chunk.content.lower().split())
            
            # Keyword overlap score
            overlap_score = len(query_words.intersection(chunk_words)) / len(query_words) if query_words else 0
            
            # Length preference (prefer moderate length chunks)
            length_score = 1.0 / (1.0 + abs(len(chunk.content.split()) - 50) * 0.01)
            
            # Combined score
            rerank_score = overlap_score * 0.7 + length_score * 0.3
            
            results.append({
                'chunk': chunk,
                'score': rerank_score,
                'overlap_score': overlap_score,
                'length_score': length_score
            })
        
        # Sort by rerank score
        results.sort(key=lambda x: x['score'], reverse=True)
        return results

def comprehensive_retrieval_test():
    """Comprehensive test of the entire retrieval pipeline."""
    
    print("🔬 Comprehensive Retrieval Pipeline Test")
    print("=" * 60)
    
    # Test configuration
    test_queries = [
        "machine learning artificial intelligence",
        "natural language processing text analysis", 
        "deep learning neural networks patterns",
        "vector database similarity search",
        "elasticsearch distributed search analytics"
    ]
    
    # Initialize reranker
    reranker = SimpleReranker()
    
    # Performance metrics
    total_search_time = 0
    total_rerank_time = 0
    
    for i, query in enumerate(test_queries, 1):
        print(f"\n🔎 Test Query {i}: '{query}'")
        print("-" * 40)
        
        # Step 1: Generate query embedding
        embed_start = time.time()
        query_embedding = pipeline_simulator.get_embeddings([query])[0]
        embed_time = time.time() - embed_start
        
        # Step 2: Initial retrieval
        search_start = time.time()
        initial_results = retriever.search(query_embedding)
        search_time = time.time() - search_start
        total_search_time += search_time
        
        print(f"📊 Initial retrieval: {len(initial_results)} results in {search_time:.3f}s")
        
        # Step 3: Reranking (if enabled in config)
        if pipeline_config.get("use_reranker", False):
            rerank_start = time.time()
            
            # Extract chunks for reranking
            initial_chunks = [result['chunk'] for result in initial_results]
            reranked_results = reranker.rerank(query, initial_chunks, query_embedding)
            
            rerank_time = time.time() - rerank_start
            total_rerank_time += rerank_time
            
            print(f"🔄 Reranking completed in {rerank_time:.3f}s")
            final_results = reranked_results
        else:
            final_results = initial_results
        
        # Step 4: Display results
        print(f"\n🎯 Top 3 Final Results:")
        for j, result in enumerate(final_results[:3], 1):
            chunk = result['chunk']
            score = result['score']
            
            print(f"   {j}. Score: {score:.4f}")
            print(f"      Content: {chunk.content[:80]}...")
            print(f"      Metadata: {chunk.metadata.get('title', 'N/A')}")
            
            # Show reranking details if available
            if 'overlap_score' in result:
                print(f"      Overlap: {result['overlap_score']:.3f}, Length: {result['length_score']:.3f}")
    
    # Performance summary
    print(f"\n📈 Performance Summary:")
    print(f"├── Total search time: {total_search_time:.3f}s")
    print(f"├── Average search time: {total_search_time/len(test_queries):.3f}s")
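    # Check the config flag rather than the timer: a very fast rerank can measure as 0.0s
    if pipeline_config.get("use_reranker", False):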
        print(f"├── Total rerank time: {total_rerank_time:.3f}s")
        print(f"├── Average rerank time: {total_rerank_time/len(test_queries):.3f}s")
    print(f"└── Total queries tested: {len(test_queries)}")

# Run comprehensive test
comprehensive_retrieval_test()

# Test with reranking enabled
print("\n" + "=" * 60)
print("🔄 Testing with Reranking Enabled")
pipeline_config["use_reranker"] = True
try:
    comprehensive_retrieval_test()
finally:
    pipeline_config["use_reranker"] = False  # Always reset, even if the test raises

🔬 Comprehensive Retrieval Pipeline Test

🔎 Test Query 1: 'machine learning artificial intelligence'
----------------------------------------
📊 Initial retrieval: 10 results in 0.001s

🎯 Top 3 Final Results:
   1. Score: 0.6472
      Content: machine learning is a subset of artificial intelligence that enables computers t...
      Metadata: Document 1
   2. Score: 0.4255
      Content: deep learning uses neural networks with multiple layers to solve complex problem...
      Metadata: Document 3
   3. Score: 0.4199
      Content: computer vision allows machines to interpret and understand visual information f...
      Metadata: Document 4

🔎 Test Query 2: 'natural language processing text analysis'
----------------------------------------
📊 Initial retrieval: 10 results in 0.000s

🎯 Top 3 Final Results:
   1. Score: 0.6086
      Content: natural language processing involves the interaction between computers and human...
      Metadata: Document 2
   2. Score: 0.4426
      Content: transf

## 11. Cleanup and Performance Monitoring

Monitor index performance and document counts, and implement cleanup procedures that remove test indices, as in the original pipeline.
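
The cell below only prints which index it would delete. As a rough sketch of what removing test indices could look like against a real cluster, the helper here checks whether each index exists and deletes it unless `dry_run` is set. The names `delete_test_indices`, `FakeClient`, and `dry_run` are hypothetical and not part of `pipeline.py`; the helper assumes only that the client exposes `indices.exists(index=...)` and `indices.delete(index=...)`, as the official Python client does. The in-memory `FakeClient` stands in for a real connection so the sketch runs without a cluster.

```python
from typing import Dict, Iterable


class _FakeIndices:
    """In-memory stand-in for `Elasticsearch().indices` (illustration only)."""

    def __init__(self, names: Iterable[str]):
        self._names = set(names)

    def exists(self, index: str) -> bool:
        return index in self._names

    def delete(self, index: str) -> None:
        self._names.discard(index)


class FakeClient:
    """Hypothetical client exposing only the two calls the helper needs."""

    def __init__(self, names: Iterable[str]):
        self.indices = _FakeIndices(names)


def delete_test_indices(client, index_names: Iterable[str], dry_run: bool = True) -> Dict[str, str]:
    """Delete each named index if it exists and report the outcome per index."""
    outcome = {}
    for name in index_names:
        if not client.indices.exists(index=name):
            outcome[name] = "missing"        # nothing to clean up
        elif dry_run:
            outcome[name] = "would delete"   # report only, leave the index in place
        else:
            client.indices.delete(index=name)
            outcome[name] = "deleted"
    return outcome


if __name__ == "__main__":
    client = FakeClient(["embedding_sample_dataset"])
    print(delete_test_indices(client, ["embedding_sample_dataset", "unknown_index"]))
    print(delete_test_indices(client, ["embedding_sample_dataset"], dry_run=False))
```

Defaulting to `dry_run=True` mirrors the cautious behaviour of this notebook's own cleanup function, which leaves the real delete call commented out.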

In [42]:
def performance_monitoring():
    """Monitor performance and index statistics."""
    
    print("📊 Performance Monitoring & Index Statistics")
    print("=" * 60)
    
    # Vector store statistics
    for dataset_name, vs in pipeline_simulator.vector_stores.items():
        print(f"\n📋 Dataset: {dataset_name}")
        print(f"├── Index name: {vs.index_name}")
        print(f"├── Document count: {vs.count_documents()}")
        print(f"├── Storage mode: {'Simulation' if vs.use_simulation else 'Elasticsearch'}")
        print(f"└── Embedding dimensions: {vs.embedding_config.get('embedding_dimensions', 'Unknown')}")
        
        if not vs.use_simulation and hasattr(vs, 'client'):
            try:
                # Get index stats from Elasticsearch
                stats = vs.client.indices.stats(index=vs.index_name)
                index_stats = stats['indices'][vs.index_name]['total']
                
                print(f"├── Index size: {index_stats['store']['size_in_bytes']} bytes")
                print(f"├── Documents indexed: {index_stats['docs']['count']}")
                print(f"└── Search operations: {index_stats['search']['query_total']}")
                
            except Exception as e:
                print(f"├── ⚠️ Cannot retrieve ES stats: {e}")

def cleanup_pipeline():
    """Clean up vector stores and temporary files."""
    
    print("\n🧹 Cleanup Procedures")
    print("=" * 40)
    
    # Clean up temporary files
    try:
        if os.path.exists(temp_jsonl_file):
            os.remove(temp_jsonl_file)
            print(f"✅ Removed temporary file: {temp_jsonl_file}")
    except Exception as e:
        print(f"⚠️ Error removing temp file: {e}")
    
    # Cleanup vector stores (similar to original pipeline)
    if pipeline_simulator.vector_stores:
        print(f"🗑️ Cleaning up {len(pipeline_simulator.vector_stores)} vector stores...")
        
        for dataset_name, vs in pipeline_simulator.vector_stores.items():
            try:
                if not vs.use_simulation:
                    # In real implementation, this would delete the index
                    # vs.client.indices.delete(index=vs.index_name)
                    print(f"🗑️ Would delete index: {vs.index_name}")
                else:
                    vs.simulated_docs.clear()
                    print(f"🗑️ Cleared simulated data for: {dataset_name}")
                    
            except Exception as e:
                print(f"⚠️ Error cleaning up {dataset_name}: {e}")
        
        # Clear the vector stores dictionary
        pipeline_simulator.vector_stores = {}
        print("✅ Vector stores cleanup completed")

def pipeline_summary():
    """Generate final pipeline summary."""
    
    print("\n📈 Pipeline Execution Summary")
    print("=" * 60)
    
    print(f"🏗️ Pipeline Configuration:")
    print(f"├── Model: {model_config.provider_model_id}")
    print(f"├── Embedding dimensions: {model_config.embedding_dimensions}")
    print(f"├── Max tokens: {model_config.max_tokens}")
    print(f"├── Batch size: {pipeline_config.get('batch_size', 32)}")
    print(f"├── Top-K retrieval: {pipeline_config.get('retrieval_top_k', 10)}")
    print(f"└── Reranking enabled: {pipeline_config.get('use_reranker', False)}")
    
    print(f"\n📊 Data Processing:")
    print(f"├── Documents processed: {len(chunks)}")
    print(f"├── Embeddings generated: {len(all_embeddings)}")
    print(f"├── Index name: {index_name}")
    print(f"└── Storage mode: {'Simulation' if vector_store.use_simulation else 'Elasticsearch'}")
    
    print(f"\n✅ Pipeline completed successfully!")
    print(f"🎯 This simulation demonstrates the complete Elasticsearch embedding pipeline")
    print(f"📝 All steps from the original pipeline.py have been replicated")

# Run monitoring and summary
performance_monitoring()

print("\n" + "="*60)
print("🎯 FINAL PIPELINE SUMMARY")
pipeline_summary()

# Optional: Run cleanup (uncomment if you want to clean up)
# cleanup_pipeline()

print(f"\n🏁 Elasticsearch Embedding Pipeline Simulation Complete!")
print(f"📅 Completed at: {datetime.now()}")
print("🔗 This notebook successfully simulates the entire embedding pipeline from pipeline.py")

📊 Performance Monitoring & Index Statistics

📋 Dataset: sample_dataset
├── Index name: embedding_sample_dataset_sentence-transformers-all-minilm-l6-v2
├── Document count: 10
├── Storage mode: Simulation
└── Embedding dimensions: 384

🎯 FINAL PIPELINE SUMMARY

📈 Pipeline Execution Summary
🏗️ Pipeline Configuration:
├── Model: sentence-transformers/all-MiniLM-L6-v2
├── Embedding dimensions: 384
├── Max tokens: 512
├── Batch size: 32
├── Top-K retrieval: 10
└── Reranking enabled: False

📊 Data Processing:
├── Documents processed: 10
├── Embeddings generated: 10
├── Index name: embedding_sample_dataset_sentence-transformers-all-minilm-l6-v2
└── Storage mode: Simulation

✅ Pipeline completed successfully!
🎯 This simulation demonstrates the complete Elasticsearch embedding pipeline
📝 All steps from the original pipeline.py have been replicated

🏁 Elasticsearch Embedding Pipeline Simulation Complete!
📅 Completed at: 2025-07-24 08:29:51.031363
🔗 This notebook successfully simulates the entire 