<a href="https://colab.research.google.com/github/Abhay-404/Create-Book-using-Gemini_Ai/blob/main/Eternal_Contextual_RAG.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# ============================================================================
# CONTEXTUAL RAG WITH ELASTICSEARCH & WEB GROUNDING
# ============================================================================
# Advanced RAG pipeline with:
# ✅ Contextual Retrieval (better accuracy)
# ✅ Elasticsearch (Hybrid: Vector + BM25)
# ✅ Cohere Reranking
# ✅ Web Search Grounding (automatic knowledge expansion)
# ✅ Production-ready architecture
# ============================================================================

## 🛠️ Step 1: Environment & Dependency Management

In [None]:
"""
Install all necessary libraries for the contextual retrieval pipeline.
"""

!pip install -q google-generativeai
!pip install -q google-genai  # newer SDK, needed for `from google import genai` in later cells
#!pip install -q chromadb  # any other vector DB would also work
#!pip install -q rank-bm25
!pip install -q sentence-transformers
!pip install -q tiktoken
!pip install -q PyPDF2
!pip install -q elasticsearch==8.12.0  # Chroma or another vector DB can be used instead
!pip install -q cohere  # for reranking

print("✅ All dependencies installed successfully!")

In [3]:
# ============================================================================
#  Import Libraries
# ============================================================================
"""
Import required libraries and configure settings.
"""

import os
import re
import json
import time
from typing import List, Dict, Tuple, Any, Optional
from collections import defaultdict
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')

# Core libraries
import numpy as np
import tiktoken
from tqdm.auto import tqdm

# Google Gemini
import google.generativeai as genai

# Vector database
#import chromadb
#from chromadb.config import Settings

# BM25 for keyword search
#from rank_bm25 import BM25Okapi

# Elasticsearch
from elasticsearch import Elasticsearch, helpers
from elasticsearch.exceptions import ConnectionError, NotFoundError

# Cohere (for reranking)
import cohere

# PDF processing
import PyPDF2

print("✅ Libraries imported successfully!")

✅ Libraries imported successfully!


## ⚙️ Step 2: System Configuration
Configure your **API keys** and define the indexing parameters.


> Note: Ensure you have added your keys to the Google Colab "Secrets" (🔑 icon) on the left sidebar.

In [4]:
# ============================================================================
# API Configuration
# ============================================================================
"""
Configure API keys for all services.

Required APIs:
1. Google Gemini: https://makersuite.google.com/app/apikey (free tier available)
2. Elasticsearch: https://cloud.elastic.co/registration (14-day free trial)
3. Cohere: https://dashboard.cohere.com/api-keys (free tier available)
"""

from google.colab import userdata

# API Keys (stored in Colab Secrets)
GEMINI_API_KEY = userdata.get('GOOGLE_API_KEY')
ELASTIC_API_KEY = userdata.get('ELASTIC_API_KEY')
COHERE_API_KEY = userdata.get('Cohre_API')

# Elasticsearch Configuration
ELASTIC_URL = "https://contextual-rag-fb24fb.es.us-central1.gcp.elastic.cloud:443"

# Pipeline Settings
INDEX_NAME = "contextual_rag_index"
EMBEDDING_DIMENSION = 768  # Gemini embedding dimension; change it if you use a different embedding model
CHUNK_SIZE = 800  # Target tokens per chunk
CHUNK_OVERLAP = 100  # Overlap between chunks

print("✅ Configuration loaded!")
print(f"📊 Settings: Index={INDEX_NAME}, ChunkSize={CHUNK_SIZE}, Overlap={CHUNK_OVERLAP}")

# Testing Gemini API

"""
Configure and test Gemini API connection.
"""

try:
    genai.configure(api_key=GEMINI_API_KEY)

    # Test connection
    test_result = genai.embed_content(
        model="models/text-embedding-004",
        content="test",
        task_type="retrieval_document"
    )

    print("✅ Gemini API connected successfully!")
    print(f"📐 Embedding dimension: {len(test_result['embedding'])}")

except Exception as e:
    print(f"❌ Error connecting to Gemini: {e}")
    print("Please check your GOOGLE_API_KEY in Colab Secrets")

✅ Configuration loaded!
📊 Settings: Index=contextual_rag_index, ChunkSize=800, Overlap=100
✅ Gemini API connected successfully!
📐 Embedding dimension: 768
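
The configuration cell reads its keys through `google.colab.userdata`, which only exists inside Colab. A minimal sketch of a fallback that reads the same key names from environment variables when the notebook runs elsewhere; the helper name `get_secret` is hypothetical and not part of the original notebook.

```python
import os
from typing import Optional


def get_secret(name: str) -> Optional[str]:
    """Return a secret from Colab Secrets, else from an environment variable.

    `get_secret` is a hypothetical helper, not part of the notebook.
    """
    try:
        # Only importable inside Google Colab.
        from google.colab import userdata
        return userdata.get(name)
    except Exception:
        # Not in Colab, or the secret is unavailable: fall back to the environment.
        return os.environ.get(name)


# Usage: the key name matches the one used in the configuration cell.
GEMINI_API_KEY = get_secret("GOOGLE_API_KEY")
print("Gemini key found:", GEMINI_API_KEY is not None)
```

The broad `except` is deliberate here: both a missing module and a missing secret should lead to the same fallback.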


In [5]:
# ============================================================================
#  Initialize Elasticsearch
# ============================================================================

"""
Connect to Elasticsearch Cloud.
"""

try:
    es = Elasticsearch(
        hosts=[ELASTIC_URL],
        api_key=ELASTIC_API_KEY,
        request_timeout=30
    )

    # Test connection
    if es.ping():
        info = es.info()
        print("✅ Elasticsearch connected successfully!")
        print(f"📦 Cluster: {info['cluster_name']}")
        print(f"📊 Version: {info['version']['number']}")
    else:
        raise ConnectionError("Cannot ping Elasticsearch")

except Exception as e:
    print(f"❌ Error connecting to Elasticsearch: {e}")
    print("Please check your ELASTIC_URL and ELASTIC_API_KEY")

✅ Elasticsearch connected successfully!
📦 Cluster: fb24fb3fff0e46899eb6a54b975077b9
📊 Version: 8.11.0


In [6]:
# ============================================================================
# Initialize Cohere API
# ============================================================================
"""
Configure Cohere for reranking.
"""

try:
    co = cohere.ClientV2(COHERE_API_KEY)

    # Test connection with simple rerank
    test_docs = ["test document"]
    test_response = co.rerank(
        model="rerank-v4.0-fast",
        query="test",
        documents=test_docs,
        top_n=1
    )

    print("✅ Cohere API connected successfully!")
    print("🔄 Reranking ready!")

except Exception as e:
    print(f"❌ Error connecting to Cohere: {e}")
    print("Please check your Cohere API key")

✅ Cohere API connected successfully!
🔄 Reranking ready!


## 📂 Step 3: Document Processing Utilities
Before we can search, we must process raw files into searchable chunks. This section handles PDF/Text loading and **Smart Chunking**, which respects sentence boundaries to prevent loss of meaning.

In [57]:
# ============================================================================
# Token Counting Utility (optional helper)
# ============================================================================
"""
Utility to count tokens in text for chunk size management.
"""

def count_tokens(text: str, encoding_name: str = "cl100k_base") -> int:
    """
    Count the number of tokens in a text string.

    Args:
        text: Input text to count tokens
        encoding_name: Tokenizer encoding (default: cl100k_base)

    Returns:
        Number of tokens in the text
    """
    try:
        encoding = tiktoken.get_encoding(encoding_name)
        tokens = encoding.encode(text)
        return len(tokens)
    except Exception as e:
        # Fallback: approximate tokens by word count
        return int(len(text.split()) * 1.3)


# Test the function
test_text = "This is a sample sentence to count tokens."
print(f"✅ Token counting utility ready!")
print(f"Sample: '{test_text}' = {count_tokens(test_text)} tokens")

# ============================================================================
# Document Loading Functions
# ============================================================================
"""
Load documents from various sources: TXT, PDF, raw text.
"""

def load_text_file(file_path: str) -> str:
    """Load content from a text file."""
    with open(file_path, 'r', encoding='utf-8') as f:
        return f.read()


def load_pdf_file(file_path: str) -> str:
    """Extract text content from a PDF file."""
    text = ""
    with open(file_path, 'rb') as f:
        pdf_reader = PyPDF2.PdfReader(f)
        for page in pdf_reader.pages:
            # Guard against pages with no extractable text
            text += (page.extract_text() or "") + "\n"
    return text


def load_document(source: str, source_type: str = "text") -> Dict[str, str]:
    """
    Load document from various sources.

    Args:
        source: File path or raw text
        source_type: Type of source ('text', 'pdf', 'raw')

    Returns:
        Dictionary with document content and metadata
    """
    if source_type == "pdf":
        content = load_pdf_file(source)
        doc_name = os.path.basename(source)
    elif source_type == "text":
        content = load_text_file(source)
        doc_name = os.path.basename(source)
    else:  # raw text
        content = source
        doc_name = "raw_document"

    return {
        "content": content,
        "name": doc_name,
        "token_count": count_tokens(content)
    }


print("✅ Document loading functions ready!")
print("Supported formats: TXT, PDF, Raw Text")



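def load_pdf_pages(file_path: str) -> List[Dict[str, Any]]:
    """Extract text page by page; returns dicts with a 1-based page_number and text."""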
    pages = []
    with open(file_path, "rb") as f:
        reader = PyPDF2.PdfReader(f)
        for i, page in enumerate(reader.pages):
            pages.append({
                "page_number": i + 1,
                "text": page.extract_text() or ""
            })
    return pages

# ============================================================================
#  Smart Chunking with Sentence Boundaries
# ============================================================================
"""
Intelligent chunking that respects sentence boundaries.
This ensures chunks don't split mid-sentence for better semantic coherence.
"""

def chunk_document(
    document: str,
    target_chunk_size: int = CHUNK_SIZE,
    chunk_overlap: int = CHUNK_OVERLAP,
    min_chunk_size: int = 200
) -> List[str]:
    """
    Split document into chunks with smart boundary detection.

    This function ensures chunks:
    - End at sentence boundaries (not mid-sentence)
    - Have target size around 800 tokens
    - Have overlap for context continuity
    - Maintain minimum viable size

    Args:
        document: Full document text
        target_chunk_size: Target tokens per chunk (~800)
        chunk_overlap: Overlap tokens between chunks (~100)
        min_chunk_size: Minimum tokens for a valid chunk

    Returns:
        List of text chunks
    """

    # Split into sentences using multiple delimiters
    sentence_endings = r'(?<=[.!?])\s+'
    sentences = re.split(sentence_endings, document)

    chunks = []
    current_chunk = []
    current_size = 0

    for sentence in sentences:
        sentence = sentence.strip()
        if not sentence:
            continue

        sentence_tokens = count_tokens(sentence)

        # If single sentence exceeds target, add it as its own chunk
        if sentence_tokens > target_chunk_size:
            if current_chunk:
                chunks.append(" ".join(current_chunk))
                current_chunk = []
                current_size = 0
            chunks.append(sentence)
            continue

        # If adding this sentence exceeds target, start new chunk
        if current_size + sentence_tokens > target_chunk_size:
            if current_chunk:
                chunks.append(" ".join(current_chunk))

                # Add overlap: keep last few sentences
                overlap_chunk = []
                overlap_size = 0
                for s in reversed(current_chunk):
                    s_tokens = count_tokens(s)
                    if overlap_size + s_tokens <= chunk_overlap:
                        overlap_chunk.insert(0, s)
                        overlap_size += s_tokens
                    else:
                        break

                current_chunk = overlap_chunk
                current_size = overlap_size

        current_chunk.append(sentence)
        current_size += sentence_tokens

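    # Add the remaining chunk. A short tail is still kept when it would
    # otherwise leave the document with no chunks at all.
    if current_chunk and (current_size >= min_chunk_size or not chunks):
        chunks.append(" ".join(current_chunk))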

    return chunks


print("✅ Smart chunking function ready!")
print("Features: Sentence-aware, ~800 tokens, 100-token overlap")

✅ Token counting utility ready!
Sample: 'This is a sample sentence to count tokens.' = 9 tokens
✅ Document loading functions ready!
Supported formats: TXT, PDF, Raw Text
✅ Smart chunking function ready!
Features: Sentence-aware, ~800 tokens, 100-token overlap
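
To see the sentence-boundary and overlap behavior without `tiktoken`, here is a simplified sketch that measures size in whitespace-separated words instead of tokens. `toy_chunk` and its word-based sizes are illustrative assumptions; it mirrors the logic of `chunk_document` but is not a replacement for it.

```python
import re
from typing import List


def toy_chunk(text: str, target: int = 12, overlap: int = 5) -> List[str]:
    """Sentence-aware chunking that measures size in words (an approximation)."""
    sentences = [s.strip() for s in re.split(r'(?<=[.!?])\s+', text) if s.strip()]
    chunks: List[str] = []
    current: List[str] = []
    size = 0
    for sentence in sentences:
        n = len(sentence.split())
        if current and size + n > target:
            chunks.append(" ".join(current))
            # Carry trailing sentences forward while they fit in the overlap budget.
            kept: List[str] = []
            kept_size = 0
            for s in reversed(current):
                s_len = len(s.split())
                if kept_size + s_len > overlap:
                    break
                kept.insert(0, s)
                kept_size += s_len
            current, size = kept, kept_size
        current.append(sentence)
        size += n
    if current:
        chunks.append(" ".join(current))
    return chunks


sample = (
    "Retrieval finds relevant text. Chunking splits documents into pieces. "
    "Overlap repeats trailing sentences. Embeddings map text to vectors. "
    "Rerankers reorder the candidates."
)
for chunk in toy_chunk(sample):
    print(repr(chunk))
```

Each printed chunk ends on a full sentence, and each chunk after the first begins with the last sentence of the previous one, which is the overlap at work.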


## ✨ Step 4: The Contextual Retrieval
This is the core step. Instead of embedding a chunk in isolation, we use a Gemini Flash model (`gemini-2.5-flash-lite` in the code below) to write a brief context explaining where the chunk fits in the original document. This allows the retriever to find the chunk even when it lacks keywords that appear only elsewhere in the document.

In [10]:
# ============================================================================
# Context Generation with Gemini (any other low-cost model also works)
# ============================================================================
"""
Generate contextual descriptions for each chunk using Gemini (or any other model).
This is the core idea: prepending context to chunks can substantially
improve retrieval accuracy.
"""
# Note: this rebinds `genai` to the newer google-genai SDK (client-based API).
from google import genai
client = genai.Client(api_key=GEMINI_API_KEY)

def generate_chunk_context(
    chunk: str,
    full_document: str,
    document_name: str = "document"
) -> str:
    """
    Generate contextual description for a chunk using Gemini.

    This function uses an LLM to create a brief explanation of what
    the chunk is about in the context of the full document, which
    is then prepended to the chunk before embedding.

    Args:
        chunk: The text chunk to contextualize
        full_document: The complete document for context
        document_name: Name/identifier of the document

    Returns:
        Generated context string (50-100 tokens)
    """

    prompt = f"""<document>
{full_document}
</document>

Here is the chunk we want to situate within the whole document:
<chunk>
{chunk}
</chunk>

Please give a short succinct context (2-3 sentences) to situate this chunk within the overall document for the purposes of improving search retrieval of the chunk.

The context should:
- Explain what this chunk is about
- Mention the document it's from ({document_name})
- Help someone searching for this information find it

Answer only with the succinct context and nothing else.
"""

    try:
        response = client.models.generate_content(
            model="gemini-2.5-flash-lite",
            contents=prompt
        )
        return response.text.strip()
    except Exception as e:
        print(f"⚠️ Error generating context: {e}")
        return f"This chunk is from {document_name}."



def contextualize_chunks(
    chunks: List[str],
    full_document: str,
    document_name: str = "document",
    show_progress: bool = True
) -> List[Dict[str, str]]:
    """
    Generate contexts for all chunks in a document.

    Args:
        chunks: List of text chunks
        full_document: Complete document text
        document_name: Document identifier
        show_progress: Show progress bar

    Returns:
        List of dictionaries containing original chunk, context, and combined text
    """

    contextualized_chunks = []

    iterator = tqdm(chunks, desc="Generating contexts") if show_progress else chunks

    for i, chunk in enumerate(iterator):
        context = generate_chunk_context(chunk, full_document, document_name)

        # Combine context with original chunk
        contextualized_text = f"{context}\n\n{chunk}"

        contextualized_chunks.append({
            "chunk_id": f"{document_name}_chunk_{i}",
            "original_chunk": chunk,
            "context": context,
            "contextualized_chunk": contextualized_text,
            "document_name": document_name
        })

    return contextualized_chunks


print("✅ Context generation functions ready!")

✅ Context generation functions ready!
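
The sketch below shows how a generated context is combined with its chunk before embedding, using a stand-in function instead of a Gemini call so that it runs offline. `fake_context` and `build_records` are hypothetical names, and the sample document name is made up; the record layout follows `contextualize_chunks`.

```python
from typing import Callable, Dict, List


def fake_context(chunk: str, document_name: str) -> str:
    """Stand-in for the LLM call; a real context would describe the chunk."""
    return f"This chunk is from {document_name}."


def build_records(
    chunks: List[str],
    document_name: str,
    context_fn: Callable[[str, str], str] = fake_context,
) -> List[Dict[str, str]]:
    """Prepend a context to each chunk, mirroring contextualize_chunks."""
    records = []
    for i, chunk in enumerate(chunks):
        context = context_fn(chunk, document_name)
        records.append({
            "chunk_id": f"{document_name}_chunk_{i}",
            "original_chunk": chunk,
            "context": context,
            # This combined text is what gets embedded and searched with BM25.
            "contextualized_chunk": f"{context}\n\n{chunk}",
            "document_name": document_name,
        })
    return records


records = build_records(["Revenue grew 3% over the previous quarter."], "ACME_Q2_report")
print(records[0]["contextualized_chunk"])
```

On its own, the sample chunk never says which company or quarter it refers to; the prepended context is what carries that information into the index.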


In [11]:
generate_chunk_context("a","b")

"This chunk discusses the letter 'a' within the provided document. The document itself contains the letter 'b'. This information can help users searching for specific characters within the document."

In [12]:
# ============================================================================
# Embedding Generation with Gemini
# ============================================================================
"""
Generate vector embeddings for contextualized chunks.
I am using Gemini's text-embedding-004 model for high-quality semantic representations.
"""



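def generate_embedding(text: str, max_retries: int = 3) -> List[float]:
    """
    Embed text with Gemini, retrying with exponential backoff.

    Returns a zero vector if every attempt fails. Elasticsearch does not
    support zero-magnitude vectors in a cosine-similarity field, so such
    chunks are likely to be reported as indexing errors.
    """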
    for attempt in range(max_retries):
        try:
            response = client.models.embed_content(
                model="text-embedding-004",
                contents=text
            )

            embedding = response.embeddings[0].values

            if len(embedding) != EMBEDDING_DIMENSION:
                raise ValueError("Embedding dimension mismatch")

            return embedding

        except Exception as e:
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)
            else:
                print(f"⚠️ Embedding generation failed: {e}")
                return [0.0] * EMBEDDING_DIMENSION



def create_embeddings_batch(
    chunks: List[Dict[str, str]],
    show_progress: bool = True
) -> List[Dict[str, Any]]:
    """
    Generate embeddings for all chunks.

    Args:
        chunks: List of chunk dictionaries
        show_progress: Show progress

    Returns:
        Chunks with embeddings added
    """
    iterator = tqdm(chunks, desc="Creating embeddings") if show_progress else chunks

    for chunk_data in iterator:
        embedding = generate_embedding(chunk_data["contextualized_chunk"])
        chunk_data["embedding"] = embedding
        chunk_data["token_count"] = count_tokens(chunk_data["original_chunk"])

    return chunks


print("✅ Embedding generation functions ready!")


✅ Embedding generation functions ready!
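
The retry loop in `generate_embedding` waits `2 ** attempt` seconds between attempts (1 s, then 2 s with `max_retries=3`). The same pattern can be sketched as a reusable helper with an injectable `sleep`, so it can be exercised without actually waiting. `with_backoff` is a hypothetical name, and unlike `generate_embedding` it re-raises the last error instead of returning a zero vector.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


def with_backoff(
    fn: Callable[[], T],
    max_retries: int = 3,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying on any exception with delays of 1, 2, 4, ... seconds."""
    for attempt in range(max_retries):
        try:
            return fn()
        except Exception:
            if attempt == max_retries - 1:
                raise  # out of retries: surface the last error
            sleep(2 ** attempt)
    raise ValueError("max_retries must be at least 1")


calls = {"n": 0}


def flaky() -> str:
    """Fails twice, then succeeds (simulates a temporary API error)."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("temporary failure")
    return "ok"


waits: List[float] = []
# Record the requested delays instead of sleeping.
print(with_backoff(flaky, sleep=waits.append), waits)
```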


In [None]:
generate_embedding("Hello, how are you?")

## 🔍 Step 5: Hybrid Search & Reranking Architecture
We will use a multi-pass retrieval system:

- Hybrid Search: Elasticsearch runs BM25 and vector (kNN) search in the same request.
- Cohere Rerank: Re-scores the **top 20** results and keeps the **best 5** for the final answer (both numbers can be **changed** to suit the data).
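
The two passes can be sketched offline with toy scoring functions: a cheap first pass keeps a wide candidate set, and a more careful second pass reorders only those candidates. The scorers here (`cheap_score`, `careful_score`) are illustrative assumptions standing in for Elasticsearch and Cohere, not how either service scores text.

```python
from typing import Callable, List


def cheap_score(query: str, doc: str) -> float:
    """First pass: number of query words that appear in the document."""
    doc_words = set(doc.lower().split())
    return float(sum(word in doc_words for word in query.lower().split()))


def careful_score(query: str, doc: str) -> float:
    """Second pass: word overlap plus a bonus when the exact phrase appears."""
    phrase_bonus = 1.0 if query.lower() in doc.lower() else 0.0
    return cheap_score(query, doc) + phrase_bonus


def two_pass(
    query: str,
    docs: List[str],
    first_k: int = 20,
    final_n: int = 5,
    first: Callable[[str, str], float] = cheap_score,
    second: Callable[[str, str], float] = careful_score,
) -> List[str]:
    """Keep first_k candidates by `first`, then the final_n best by `second`."""
    candidates = sorted(docs, key=lambda d: first(query, d), reverse=True)[:first_k]
    return sorted(candidates, key=lambda d: second(query, d), reverse=True)[:final_n]


docs = [
    "vision and computer science are broad fields",
    "computer vision lets machines interpret images",
    "computer hardware",
]
print(two_pass("computer vision", docs, first_k=2, final_n=1))
```

The first two documents tie in the first pass because both contain both query words; only the second pass, which looks at the exact phrase, separates them.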

In [14]:
# ╔══════════════════════════════════════════════════════════════════════════╗
# ║                     ELASTICSEARCH HYBRID SEARCH                          ║
# ╚══════════════════════════════════════════════════════════════════════════╝

# ============================================================================
#  Create Elasticsearch Index
# ============================================================================
"""
Create an Elasticsearch index supporting:
- Dense vector search (kNN) for semantic similarity
- BM25 keyword search for exact matches
- Hybrid search combining both
"""

def create_index(es: Elasticsearch, index_name: str, delete_if_exists: bool = True):
    """
    Create Elasticsearch index with hybrid search capabilities.

    Args:
        es: Elasticsearch client
        index_name: Name of the index
        delete_if_exists: Delete existing index if it exists
    """

    # Delete existing index if requested
    if delete_if_exists and es.indices.exists(index=index_name):
        es.indices.delete(index=index_name)
        print(f"🗑️  Deleted existing index: {index_name}")

    # Index mapping with both dense vectors and text fields
    mapping = {
        "mappings": {
            "properties": {
                "chunk_id": {"type": "keyword"},
                "document_name": {"type": "keyword"},
                "original_chunk": {"type": "text"},
                "context": {"type": "text"},
                "contextualized_chunk": {"type": "text"},
                "embedding": {
                    "type": "dense_vector",
                    "dims": EMBEDDING_DIMENSION,
                    "index": True,
                    "similarity": "cosine"
                },
                "timestamp": {"type": "date"},
                "token_count": {"type": "integer"}
            }
        }
    }

    # Create index
    es.indices.create(index=index_name, body=mapping)
    print(f"✅ Created index: {index_name}")
    print(f"📊 Configured for:")
    print(f"   - Dense vector search (cosine similarity)")
    print(f"   - BM25 keyword search")
    print(f"   - Hybrid search")


# Create the index
es_client = es
create_index(es_client, INDEX_NAME, delete_if_exists=True)

# ============================================================================
# Index Documents in Elasticsearch
# ============================================================================
"""
Store contextualized chunks with embeddings in Elasticsearch.
"""

def index_documents(
    es: Elasticsearch,
    index_name: str,
    chunks: List[Dict[str, Any]],
    batch_size: int = 100
) -> Dict[str, int]:
    """
    Index documents in Elasticsearch using bulk API.

    Args:
        es: Elasticsearch client
        index_name: Target index
        chunks: Chunks with embeddings
        batch_size: Batch size for bulk indexing

    Returns:
        Statistics dictionary
    """

    print(f"📥 Indexing {len(chunks)} documents...")

    # Prepare bulk actions
    actions = []
    for chunk in chunks:
        action = {
            "_index": index_name,
            "_id": chunk["chunk_id"],
            "_source": {
                "chunk_id": chunk["chunk_id"],
                "document_name": chunk["document_name"],
                "original_chunk": chunk["original_chunk"],
                "context": chunk["context"],
                "contextualized_chunk": chunk["contextualized_chunk"],
                "embedding": chunk["embedding"],
                "token_count": chunk.get("token_count", 0),
                "timestamp": datetime.now().isoformat()
            }
        }
        actions.append(action)

    # Bulk index with progress
    success_count = 0
    error_count = 0

    with tqdm(total=len(actions), desc="Indexing") as pbar:
        for i in range(0, len(actions), batch_size):
            batch = actions[i:i + batch_size]
            try:
                success, errors = helpers.bulk(
                    es,
                    batch,
                    raise_on_error=False
                )
                success_count += success
                error_count += len(errors)
            except Exception as e:
                print(f"⚠️ Batch indexing error: {e}")
                error_count += len(batch)

            pbar.update(len(batch))

    # Refresh index
    es.indices.refresh(index=index_name)

    stats = {
        "total": len(chunks),
        "success": success_count,
        "errors": error_count
    }

    print(f"✅ Indexing complete!")
    print(f"   Success: {success_count}/{len(chunks)}")
    if error_count > 0:
        print(f"   Errors: {error_count}")

    return stats


print("✅ Indexing functions ready!")


🗑️  Deleted existing index: contextual_rag_index
✅ Created index: contextual_rag_index
📊 Configured for:
   - Dense vector search (cosine similarity)
   - BM25 keyword search
   - Hybrid search
✅ Indexing functions ready!


In [15]:
# ============================================================================
# 4.3 Hybrid Search (Vector + BM25)
# ============================================================================
"""
Combine kNN vector search and BM25 keyword search for best results.
"""

def hybrid_search(
    es: Elasticsearch,
    index_name: str,
    query: str,
    top_k: int = 20,
    knn_weight: float = 0.6, # can experiment with this
    bm25_weight: float = 0.4
) -> List[Dict[str, Any]]:
    """
    Perform hybrid search using kNN + BM25.

    Args:
        es: Elasticsearch client
        index_name: Index to search
        query: Search query
        top_k: Number of results
        knn_weight: Weight for kNN scores
        bm25_weight: Weight for BM25 scores

    Returns:
        Ranked search results
    """

    # Generate query embedding
    query_embedding = generate_embedding(query)

    # Build hybrid search query
    search_query = {
        "size": top_k,
        "query": {
            "bool": {
                "should": [
                    # BM25 text search on contextualized chunks
                    {
                        "multi_match": {
                            "query": query,
                            "fields": [
                                "contextualized_chunk^2",
                                "original_chunk",
                                "context"
                            ],
                            "type": "best_fields",
                            "boost": bm25_weight
                        }
                    }
                ]
            }
        },
        "knn": {
            "field": "embedding",
            "query_vector": query_embedding,
            "k": top_k,
            "num_candidates": top_k * 10,
            "boost": knn_weight
        },
        "_source": [
            "chunk_id",
            "document_name",
            "original_chunk",
            "context",
            "contextualized_chunk",
            "token_count"
        ]
    }

    try:
        response = es.search(index=index_name, body=search_query)

        results = []
        for hit in response['hits']['hits']:
            results.append({
                "chunk_id": hit['_source']['chunk_id'],
                "document_name": hit['_source']['document_name'],
                "original_chunk": hit['_source']['original_chunk'],
                "context": hit['_source']['context'],
                "contextualized_chunk": hit['_source']['contextualized_chunk'],
                "score": hit['_score'],
                "token_count": hit['_source'].get('token_count', 0)
            })

        return results

    except Exception as e:
        print(f"❌ Search error: {e}")
        return []


print("✅ Hybrid search function ready!")
print("🔍 Combines: kNN (semantic) + BM25 (keyword)")

✅ Hybrid search function ready!
🔍 Combines: kNN (semantic) + BM25 (keyword)


In [28]:
#Testing
results = hybrid_search(
    es=es,
    index_name=INDEX_NAME,
    query="What is an AI?",
    top_k=5
)

print("Total results:", len(results))
for r in results:
    print(r["chunk_id"], r["score"])
    # print(r["original_chunk"])
    # print("-" * 60)
# Final score = (BM25 score × bm25_weight) + (vector similarity × knn_weight)


bm25_only = hybrid_search(
    es=es,
    index_name=INDEX_NAME,
    query="What is computer vision",
    knn_weight=0.0,
    bm25_weight=1.0
)

print("\nBM25-only results:")
for r in bm25_only:
    print(r["chunk_id"], r["score"])


knn_only = hybrid_search(
    es=es,
    index_name=INDEX_NAME,
    query="What is computer vision",
    knn_weight=1.0,
    bm25_weight=0.0
)

print("\nkNN-only results:")
for r in knn_only:
    print(r["chunk_id"], r["score"])


Total results: 5
AI_Overview_chunk_4 2.144116
AI_Overview_chunk_7 0.675843
AI_Overview_chunk_8 0.6723432
AI_Overview_chunk_0 0.6504564
AI_Overview_chunk_3 0.6409435

BM25-only results:
AI_Overview_chunk_6 0.95237696
AI_Overview_chunk_1 0.8805245
AI_Overview_chunk_3 0.8642241
AI_Overview_chunk_7 0.8642241
AI_Overview_chunk_0 0.8485161
AI_Overview_chunk_4 0.79100776
AI_Overview_chunk_2 0.47863305
AI_Overview_chunk_5 0.47570294
AI_Overview_chunk_8 0.44215035

kNN-only results:
AI_Overview_chunk_7 0.83939147
AI_Overview_chunk_4 0.8130689
AI_Overview_chunk_0 0.80753374
AI_Overview_chunk_3 0.8018433
AI_Overview_chunk_6 0.8014387
AI_Overview_chunk_1 0.7960742
AI_Overview_chunk_5 0.78831875
AI_Overview_chunk_2 0.7849475
AI_Overview_chunk_8 0.78359735


In [17]:
# ╔══════════════════════════════════════════════════════════════════════════╗
# ║                       5. COHERE RERANKING                                ║
# ╚══════════════════════════════════════════════════════════════════════════╝

# ============================================================================
# 5.1 Rerank Results with Cohere
# ============================================================================
"""
Rerank retrieved chunks using Cohere's reranking model.
This further improves accuracy by re-scoring each candidate against the query.
"""

def rerank_results(
    query: str,
    results: List[Dict[str, Any]],
    top_n: int = 10,
    model: str = "rerank-v4.0-fast"
) -> List[Dict[str, Any]]:
    """
    Rerank search results using Cohere.

    Args:
        query: Search query
        results: Retrieved chunks from hybrid search
        top_n: Number of top results to return after reranking
        model: Cohere reranking model

    Returns:
        Reranked results
    """

    if not results:
        return []

    try:
        # Prepare documents for reranking
        documents = [r["contextualized_chunk"] for r in results]

        # Rerank with Cohere
        rerank_response = co.rerank(
            model=model,
            query=query,
            documents=documents,
            top_n=top_n
        )

        # Map reranked results back to original data
        reranked = []
        for item in rerank_response.results:
            original_result = results[item.index].copy()
            original_result["rerank_score"] = item.relevance_score
            original_result["original_score"] = original_result["score"]
            original_result["score"] = item.relevance_score  # Use rerank score
            reranked.append(original_result)

        return reranked

    except Exception as e:
        print(f"⚠️ Reranking error: {e}")
        # Fallback: return original results
        return results[:top_n]


print("✅ Reranking function ready!")


✅ Reranking function ready!


In [36]:
#test
# Step 1: Get hybrid search results
query = "What is an AI ?"

hybrid_results = hybrid_search(
    es=es,
    index_name=INDEX_NAME,
    query=query,
    top_k=10
)

print("Before reranking:")
for i, r in enumerate(hybrid_results):
    print(f"{i+1}. {r['chunk_id']} | score={r['score']:.4f}")

# Step 2: Rerank using Cohere
reranked_results = rerank_results(
    query=query,
    results=hybrid_results,
    top_n=5
)

print("\nAfter reranking:")
for i, r in enumerate(reranked_results):
    print(
        f"{i+1}. {r['chunk_id']} | "
        f"rerank_score={r['rerank_score']:.4f} | "
        f"original_score={r['original_score']:.4f}"
    )


Before reranking:
1. AI_Overview_chunk_4 | score=2.1376
2. AI_Overview_chunk_7 | score=0.6694
3. AI_Overview_chunk_5 | score=0.6687
4. AI_Overview_chunk_8 | score=0.6675
5. AI_Overview_chunk_2 | score=0.6495
6. AI_Overview_chunk_0 | score=0.6445
7. AI_Overview_chunk_3 | score=0.6351
8. AI_Overview_chunk_1 | score=0.6338
9. AI_Overview_chunk_6 | score=0.6315

After reranking:
1. AI_Overview_chunk_0 | rerank_score=0.7071 | original_score=0.6445
2. AI_Overview_chunk_3 | rerank_score=0.6771 | original_score=0.6351
3. AI_Overview_chunk_5 | rerank_score=0.6737 | original_score=0.6687
4. AI_Overview_chunk_8 | rerank_score=0.6651 | original_score=0.6675
5. AI_Overview_chunk_7 | rerank_score=0.6492 | original_score=0.6694
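
The reranker returns positions into the list it was given rather than the documents themselves, so each result has to be mapped back by index. A minimal offline sketch using stand-in objects: `FakeItem` mimics only the `index` and `relevance_score` fields that `rerank_results` reads, `apply_rerank` is a hypothetical name, and the scores are invented.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class FakeItem:
    """Stand-in for one reranker result (only the two fields used here)."""
    index: int
    relevance_score: float


def apply_rerank(results: List[Dict[str, Any]], items: List[FakeItem]) -> List[Dict[str, Any]]:
    """Reorder `results` by the reranker output, keeping the first-pass score."""
    reranked = []
    for item in items:
        row = dict(results[item.index])  # copy so the input list is untouched
        row["original_score"] = row["score"]
        row["rerank_score"] = item.relevance_score
        row["score"] = item.relevance_score
        reranked.append(row)
    return reranked


hits = [
    {"chunk_id": "doc_chunk_0", "score": 2.1},
    {"chunk_id": "doc_chunk_1", "score": 0.7},
]
# Invented reranker output: the second hit is judged more relevant.
order = [FakeItem(index=1, relevance_score=0.9), FakeItem(index=0, relevance_score=0.4)]
for row in apply_rerank(hits, order):
    print(row["chunk_id"], row["rerank_score"], row["original_score"])
```

Keeping `original_score` next to `rerank_score` makes it easy to see where the two passes disagree, as in the before/after listing above.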


## 🌐 Step 6: Web Search Fallback (Grounding)
A robust RAG system shouldn't just say "I don't know." If our internal database fails to provide a high-confidence answer, we can use Gemini Grounding to search the web, ingest that new data, and provide an answer.
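
One way to decide when the internal results are not good enough is a threshold on the rerank scores. The sketch below is an assumption about how such a gate could look: the name `needs_web_fallback` and the `0.5` threshold are hypothetical and would need tuning on real queries.

```python
from typing import Any, Dict, List


def needs_web_fallback(
    reranked: List[Dict[str, Any]],
    min_score: float = 0.5,  # hypothetical threshold, tune on real data
    min_hits: int = 1,
) -> bool:
    """Return True when too few results clear the confidence threshold."""
    confident = [r for r in reranked if r.get("rerank_score", 0.0) >= min_score]
    return len(confident) < min_hits


strong = [{"chunk_id": "a", "rerank_score": 0.71}]
weak = [{"chunk_id": "b", "rerank_score": 0.12}]
print(needs_web_fallback(strong), needs_web_fallback(weak), needs_web_fallback([]))
```

Results without a `rerank_score` (for example, when reranking failed and the raw hybrid hits were returned) are treated as not confident, so they trigger the fallback as well.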

In [19]:

# ============================================================================
#  Web Search with Gemini Grounding
# ============================================================================
"""
If context doesn't answer the query, search the web for information,
generate context for it, and save it to the database.
"""
from datetime import datetime, timezone
from google import genai
from google.genai.types import Tool, GenerateContentConfig, GoogleSearch

client = genai.Client(api_key=GEMINI_API_KEY)

def search_web_and_expand_knowledge(
    query: str,
    document_name: Optional[str] = None
) -> List[Dict[str, Any]]:
    """
    Search the web, create contextualized chunks, and index them.

    Args:
        query: Search query
        document_name: Name for the web search document
            (defaults to "web::" plus the first 30 characters of the query)
    Returns:
        List of newly created chunks (empty if the search fails)
    """

    print(f"🌐 Searching web for: {query}")
    document_name = document_name or f"web::{query[:30]}"
    try:
        # Use Gemini with the Google Search grounding tool enabled
        search_tool = Tool(google_search=GoogleSearch())

        response = client.models.generate_content(
            model="gemini-3-flash-preview",  # models with web search grounding support
            contents=query,
            config=GenerateContentConfig(tools=[search_tool])
        )


        web_content = response.text
        print(f"✅ Retrieved web information ({count_tokens(web_content)} tokens)")
        print(f"🌐 Retrieved web content:\n{web_content}")
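        # Print the search queries Gemini issued (optional).
        # In the google-genai SDK, grounding metadata lives on the candidate.
        candidates = getattr(response, "candidates", None) or []
        grounding = getattr(candidates[0], "grounding_metadata", None) if candidates else None
        if grounding is not None:
            print("🔎 Search queries used:", grounding.web_search_queries)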

        # Chunk the web content
        chunks = chunk_document(web_content)
        print(f"📄 Created {len(chunks)} chunks from web search")

        # Contextualize chunks
        contextualized = contextualize_chunks(
            chunks,
            web_content,
            f"{document_name}_{datetime.now().isoformat()}",
            show_progress=False
        )

        for c in contextualized:
            c["document_name"] = document_name
        # Create embeddings
        with_embeddings = create_embeddings_batch(contextualized, show_progress=False)

        # Index in Elasticsearch
        stats = index_documents(es_client, INDEX_NAME, with_embeddings, batch_size=50)
        print(f"✅ Indexed {stats['success']} new chunks from web search")

        return with_embeddings

    except Exception as e:
        print(f"❌ Web search error: {e}")
        return []


print("✅ Web search grounding function ready!")
print("🌐 Automatically expands knowledge base when needed")


✅ Web search grounding function ready!
🌐 Automatically expands knowledge base when needed
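
The function above names web documents from the first 30 characters of the query, so two long queries that share a prefix end up with the same name. One possible way around this, shown here as a sketch only, is to append a short hash of the full query. `web_document_name` is a hypothetical helper, not something the notebook defines.

```python
import hashlib
import re

def web_document_name(query: str, max_slug_len: int = 30) -> str:
    """Build a 'web::' document name that stays unique per query."""
    # Readable part: lowercase, non-alphanumerics collapsed to '-', truncated.
    slug = re.sub(r"[^a-z0-9]+", "-", query.lower()).strip("-")[:max_slug_len]
    # Unique part: first 8 hex characters of a hash over the full query.
    digest = hashlib.sha1(query.encode("utf-8")).hexdigest()[:8]
    return f"web::{slug}::{digest}"

a = web_document_name("What is the price of Tesla stock today in USD?")
b = web_document_name("What is the price of Tesla stock today in EUR?")
print(a)
print(b)
```

Both names share the same readable slug but differ in the hash suffix, so the chunks stay distinguishable in the index.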


In [None]:
search_web_and_expand_knowledge("What did the US do in Venezuela?")

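## 🏗️ Step 7: Pipeline Assembly & Execution
This section connects all the modules: `generate_answer` produces the final response, `build_contextual_rag_pipeline` indexes documents, and `query_pipeline` runs a query end to end.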

In [20]:
# ╔══════════════════════════════════════════════════════════════════════════╗
# ║                      ANSWER GENERATION & PIPELINE                        ║
# ╚══════════════════════════════════════════════════════════════════════════╝

# ============================================================================
# Answer Generation
# ============================================================================
"""
Generate final answer using retrieved context and Gemini.
"""

def generate_answer(
    query: str,
    retrieved_chunks: List[Dict[str, Any]],
    top_k_for_generation: int = 5
) -> Dict[str, Any]:
    """
    Generate answer using retrieved context.

    Args:
        query: User question
        retrieved_chunks: Retrieved chunks
        top_k_for_generation: Number of chunks to use

    Returns:
        Answer dictionary with metadata
    """

    if not retrieved_chunks:
        return {
            "query": query,
            "answer": "No relevant information found.",
            "sources": [],
            "num_chunks_used": 0,
            "used_web_search": False
        }

    # Select top chunks
    context_chunks = retrieved_chunks[:top_k_for_generation]

    # Build context
    context_text = "\n\n---\n\n".join([
        f"[Source: {chunk['document_name']}]\n{chunk['original_chunk']}"
        for chunk in context_chunks
    ])

    # Generate answer
    prompt = f"""You are a helpful AI assistant. Answer the question based ONLY on the provided context.

Context:
{context_text}

Question: {query}

Instructions:
- Use only information from the context above
- Be specific and cite relevant details
- If the context doesn't contain enough information, say so
- Keep your answer clear and concise

Answer:"""

    try:
        response = client.models.generate_content(
            model="gemini-3-flash-preview",
            contents=prompt
        )
   #     model = genai.GenerativeModel('gemini-1.5-flash')
   #     response = model.generate_content(prompt)
        answer = response.text.strip()
    except Exception as e:
        answer = f"Error generating answer: {e}"

    return {
        "query": query,
        "answer": answer,
        "sources": [
            {
                "document": chunk['document_name'],
                "score": chunk['score'],
                "preview": chunk['original_chunk'][:200] + "..."
            }
            for chunk in context_chunks
        ],
        "num_chunks_used": len(context_chunks),
        # Web-sourced chunks are stored under document names starting with "web::"
        "used_web_search": any(chunk['document_name'].startswith("web::") for chunk in context_chunks)
    }


print("✅ Answer generation function ready!")


✅ Answer generation function ready!
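
To see what the model actually receives, the context-building step can be run on its own. The sketch below isolates that step under the assumption that each chunk carries `document_name` and `original_chunk` keys, as in `generate_answer`. `build_context` is a hypothetical name and the sample chunks are made up for illustration.

```python
from typing import Any, Dict, List

def build_context(chunks: List[Dict[str, Any]], top_k: int = 5) -> str:
    """Join the top_k chunks into one context string with source labels."""
    return "\n\n---\n\n".join(
        f"[Source: {c['document_name']}]\n{c['original_chunk']}"
        for c in chunks[:top_k]
    )

# Illustrative chunks only; real ones come from hybrid search and reranking.
sample_chunks = [
    {"document_name": "AI_Overview", "original_chunk": "Machine learning is a subset of AI."},
    {"document_name": "web::tesla", "original_chunk": "Illustrative web text."},
    {"document_name": "AI_Overview", "original_chunk": "This chunk is cut off by top_k."},
]
context = build_context(sample_chunks, top_k=2)
print(context)
```

Only the first `top_k` chunks reach the prompt, so the order produced by the reranker directly determines what the model can cite.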


In [None]:
# Quick smoke test: with no retrieved chunks, the function returns its fallback message
generate_answer("What did the US do in Venezuela?", [])

In [67]:

def build_contextual_rag_pipeline(
    es: Elasticsearch,
    index_name: str,
    documents: List[Dict[str, str]]
) -> Dict[str, Any]:
    """
    Build complete contextual RAG pipeline.

    Args:
        es: Elasticsearch client
        index_name: Index name
        documents: List of document dicts with 'content' and 'name'

    Returns:
        Pipeline statistics
    """

    print("\n" + "="*70)
    print("🚀 BUILDING CONTEXTUAL RAG PIPELINE")
    print("="*70)

    all_chunks_with_embeddings = []
    total_chunks = 0

    for doc in documents:
        print(f"\n📄 Processing: {doc['name']}")
        print(f"   Tokens: {count_tokens(doc['content']):,}")

        # Step 1: Chunk
        print("   ├─ Chunking...")
        chunks = chunk_document(doc['content'])
        print(f"   ├─ Created {len(chunks)} chunks")

        # Step 2: Contextualize
        print("   ├─ Generating contexts...")
        contextualized = contextualize_chunks(
            chunks,
            doc['content'],
            doc['name'],
            show_progress=True
        )

        # Step 3: Embed
        print("   ├─ Creating embeddings...")
        with_embeddings = create_embeddings_batch(contextualized, show_progress=True)

        all_chunks_with_embeddings.extend(with_embeddings)
        total_chunks += len(chunks)
        print(f"   └─ ✅ Done")

    print(f"\n📊 Total chunks: {total_chunks}")

    # Step 4: Index in Elasticsearch
    print(f"\n🔍 Indexing in Elasticsearch...")
    stats = index_documents(es, index_name, all_chunks_with_embeddings)

    print("\n" + "="*70)
    print("✅ PIPELINE BUILD COMPLETE!")
    print("="*70)

    return {
        "total_documents": len(documents),
        "total_chunks": total_chunks,
        "indexing_stats": stats,
        "index_name": index_name
    }


In [74]:
def query_pipeline(
    es: Elasticsearch,
    index_name: str,
    query: str,
    top_k: int = 20,
    top_k_for_generation: int = 5,
    use_reranking: bool = True,
    use_web_search_fallback: bool = True,
    min_score_threshold: float = 0.65  # reranker score
) -> Dict[str, Any]:
    """
    Query the RAG pipeline with all features.

    Args:
        es: Elasticsearch client
        index_name: Index name
        query: User question
        top_k: Chunks to retrieve
        top_k_for_generation: Chunks to use for answer
        use_reranking: Enable Cohere reranking
        use_web_search_fallback: Search web if no good results
        min_score_threshold: Minimum top score (the reranker score when
            reranking is on) below which the web fallback triggers

    Returns:
        Answer dictionary
    """

    print(f"\n🔍 Query: {query}")
    print("-" * 60)

    # Step 1: Retrieve with hybrid search
    print("📥 Retrieving relevant chunks...")
    results = hybrid_search(es, index_name, query, top_k=top_k)
    print(f"✅ Retrieved {len(results)} chunks")

    # Step 2: Rerank if enabled
    if use_reranking and results:
        print("🔄 Reranking results with Cohere...")
        results = rerank_results(query, results, top_n=top_k_for_generation * 2)
        print(f"✅ Reranked to top {len(results)} chunks")

    # Confidence is the top reranker score; it falls back to the raw
    # Elasticsearch score when reranking is disabled (the two scores are
    # on different scales, so adjust min_score_threshold accordingly)
    confidence_score = (
        results[0].get("rerank_score", results[0]["score"])
        if results else 0.0
    )

    # Step 3: Check if results are good enough
    web_search_used = False
    if use_web_search_fallback:
        if not results or confidence_score < min_score_threshold:
            print(f"⚠️ Low confidence (score: {confidence_score:.2f})")
            print("🌐 Searching web for additional context...")

            # Search web and expand knowledge base
            web_chunks = search_web_and_expand_knowledge(query)

            if web_chunks:
                # Re-search with new knowledge
                print("🔄 Re-searching with expanded knowledge base...")
                results = hybrid_search(es, index_name, query, top_k=top_k)

                if use_reranking and results:
                    results = rerank_results(
                        query, results, top_n=top_k_for_generation * 2
                    )

                web_search_used = True
                print("✅ Retrieved results with web-enhanced knowledge")

    # Step 4: Generate answer
    print("🤖 Generating answer...")
    answer = generate_answer(query, results, top_k_for_generation)
    answer["used_web_search"] = web_search_used

    print("-" * 60)
    print(f"💡 Answer:\n{answer['answer']}")
    print("-" * 60)
    print(f"📚 Sources used: {answer['num_chunks_used']}")
    if web_search_used:
        print("🌐 Web search was used to enhance knowledge")

    return answer
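
The control flow of `query_pipeline` can be exercised without Elasticsearch, Cohere, or Gemini by passing the stages in as plain functions. The sketch below is an assumption-laden stand-in: `run_flow` and the `fake_*` helpers are hypothetical, and the word-overlap score only imitates a reranker score for the sake of the demonstration.

```python
from typing import Any, Callable, Dict, List

Chunk = Dict[str, Any]

def run_flow(
    query: str,
    search: Callable[[str], List[Chunk]],
    rerank: Callable[[str, List[Chunk]], List[Chunk]],
    expand_from_web: Callable[[str], List[Chunk]],
    min_score: float = 0.65,
) -> Dict[str, Any]:
    """Retrieve, rerank, and fall back to the web once if confidence is low."""
    results = rerank(query, search(query))
    top_score = results[0]["rerank_score"] if results else 0.0
    used_web = False
    if top_score < min_score and expand_from_web(query):
        # New chunks were indexed, so retrieve and rerank again.
        results = rerank(query, search(query))
        used_web = True
    return {"results": results, "used_web_search": used_web}

# In-memory stand-ins for the index, the reranker, and web grounding.
index: List[Chunk] = [{"text": "transformers were introduced in 2017"}]

def fake_search(query: str) -> List[Chunk]:
    return [dict(c) for c in index]

def fake_rerank(query: str, chunks: List[Chunk]) -> List[Chunk]:
    words = set(query.lower().split())
    for c in chunks:
        overlap = words & set(c["text"].lower().split())
        c["rerank_score"] = len(overlap) / len(words)
    return sorted(chunks, key=lambda c: c["rerank_score"], reverse=True)

def fake_web(query: str) -> List[Chunk]:
    new = [{"text": query.lower()}]
    index.extend(new)  # mimics indexing the web content
    return new

hit = run_flow("transformers introduced 2017", fake_search, fake_rerank, fake_web)
miss = run_flow("tesla stock price", fake_search, fake_rerank, fake_web)
print(hit["used_web_search"], miss["used_web_search"])
```

The first query is answered from the existing index, while the second triggers the fallback and is then served from the newly added chunk.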


## 🧪 Step 8: Demonstration & Testing
Run the cells below to see the pipeline in action with a sample dataset.

In [65]:
# ╔══════════════════════════════════════════════════════════════════════════╗
# ║                         EXAMPLE USAGE & TESTING                          ║
# ╚══════════════════════════════════════════════════════════════════════════╝

# ============================================================================
# Sample Document
# ============================================================================
"""
Build and query the pipeline with a sample document.
"""

# Sample document (replace with your own)
sample_documents = [
    {
        "name": "AI_Overview",
        "content": """
        Artificial Intelligence (AI) is transforming technology across industries.
        Machine learning, a subset of AI, enables systems to learn from data without
        explicit programming. Deep learning uses neural networks with multiple layers
        to process complex patterns in images, text, and audio.

        Natural language processing (NLP) helps computers understand and generate
        human language. Modern NLP systems use transformer architectures, which were
        introduced in 2017 in the paper "Attention is All You Need". Transformers
        revolutionized the field by enabling parallel processing and capturing
        long-range dependencies.

        Large language models like GPT and BERT have achieved remarkable results
        in tasks like translation, summarization, and question answering. These
        models are trained on massive datasets containing billions of tokens.

        Computer vision allows machines to interpret visual information. Applications
        include facial recognition, autonomous vehicles, and medical image analysis.
        Convolutional neural networks (CNNs) are commonly used for image processing.

        AI raises important ethical considerations. Bias in training data can lead
        to unfair outcomes. Privacy concerns arise from extensive data collection.
        Job displacement due to automation is a societal challenge that requires
        careful policy responses. Responsible AI development emphasizes transparency,
        fairness, and accountability.

        The future of AI includes developments in areas like reinforcement learning,
        few-shot learning, and multimodal AI systems that can process multiple types
        of data simultaneously. Quantum computing may also accelerate AI capabilities.
        """*20
    }
]

print("\n" + "="*70)
print("📚 BUILDING PIPELINE WITH SAMPLE DOCUMENT")
print("="*70)

# Build pipeline
pipeline_stats = build_contextual_rag_pipeline(
    es_client,
    INDEX_NAME,
    sample_documents
)

print(f"\n📊 Pipeline Statistics:")
print(f"   Documents: {pipeline_stats['total_documents']}")
print(f"   Chunks: {pipeline_stats['total_chunks']}")
print(f"   Indexed: {pipeline_stats['indexing_stats']['success']}")


📚 BUILDING PIPELINE WITH SAMPLE DOCUMENT

🚀 BUILDING CONTEXTUAL RAG PIPELINE

📄 Processing: AI_Overview
   Tokens: 6,261
   ├─ Chunking...
   ├─ Created 9 chunks
   ├─ Generating contexts...


Generating contexts:   0%|          | 0/9 [00:00<?, ?it/s]

   ├─ Creating embeddings...


Creating embeddings:   0%|          | 0/9 [00:00<?, ?it/s]

   └─ ✅ Done

📊 Total chunks: 9

🔍 Indexing in Elasticsearch...
📥 Indexing 9 documents...


Indexing:   0%|          | 0/9 [00:00<?, ?it/s]

✅ Indexing complete!
   Success: 9/9

✅ PIPELINE BUILD COMPLETE!

📊 Pipeline Statistics:
   Documents: 1
   Chunks: 9
   Indexed: 9


In [75]:

# ============================================================================
# Test Queries
# ============================================================================
"""
Test the pipeline with various queries.
"""

print("\n" + "="*70)
print("🧪 TESTING QUERIES")
print("="*70)

# Test queries
test_queries = [
    "What is machine learning?",
    "When were transformers introduced in NLP?",
    "What are the ethical concerns with AI?",
]

for query in test_queries:
    result = query_pipeline(
        es_client,
        INDEX_NAME,
        query,
        top_k=20,
        top_k_for_generation=5,
        use_reranking=True,
        use_web_search_fallback=False  # Set to True to enable web search
    )
    print("\n" + "="*70 + "\n")



🧪 TESTING QUERIES

🔍 Query: What is machine learning?
------------------------------------------------------------
📥 Retrieving relevant chunks...
✅ Retrieved 20 chunks
🔄 Reranking results with Cohere...
✅ Reranked to top 10 chunks
🤖 Generating answer...
------------------------------------------------------------
💡 Answer:
Machine learning, a subset of Artificial Intelligence (AI), enables systems to learn from data without explicit programming.
------------------------------------------------------------
📚 Sources used: 5



🔍 Query: When were transformers introduced in NLP?
------------------------------------------------------------
📥 Retrieving relevant chunks...
✅ Retrieved 20 chunks
🔄 Reranking results with Cohere...
✅ Reranked to top 10 chunks
🤖 Generating answer...
------------------------------------------------------------
💡 Answer:
Transformers were introduced in 2017 in the paper "Attention is All You Need".
------------------------------------------------------------
📚 S

In [76]:
# ============================================================================
# 8.3 Test with Web Search Fallback
# ============================================================================
"""
Test a query that's not in the knowledge base to trigger web search.
"""

print("\n" + "="*70)
print("🌐 TESTING WEB SEARCH FALLBACK")
print("="*70)

# Query about something NOT in our documents
web_query = "What is the price of tesla today?"

result = query_pipeline(
    es_client,
    INDEX_NAME,
    web_query,
    top_k=20,
    top_k_for_generation=5,
    use_reranking=True,
    use_web_search_fallback=True  # Enable web search
)


🌐 TESTING WEB SEARCH FALLBACK

🔍 Query: What is the price of tesla today?
------------------------------------------------------------
📥 Retrieving relevant chunks...
✅ Retrieved 20 chunks
🔄 Reranking results with Cohere...
✅ Reranked to top 10 chunks
🤖 Generating answer...
------------------------------------------------------------
💡 Answer:
As of mid-day on January 6, 2026, Tesla's (TSLA) stock price is trading at approximately $434.04. It has fluctuated between $430.01 and $454.70 throughout the session.
------------------------------------------------------------
📚 Sources used: 5


In [None]:
result

## 🚀 Final Step: Loading the Data

📚 In this step, we load the data into the pipeline.
I am using NCERT textbooks as the primary data source, but you can use any type of document (PDFs, text files, notes, reports, etc.) depending on your use case.
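
When loading a mixed folder of files, it can help to pick the loader mode from the file extension. The sketch below is only an illustration: `guess_source_type` and the extension mapping are hypothetical, and the labels `"text"`, `"pdf"`, and `"raw"` are taken from the `source_type` values shown in this section's loading example.

```python
from pathlib import Path

# Hypothetical mapping from file extension to a source_type label.
EXTENSION_TO_SOURCE_TYPE = {".txt": "text", ".md": "text", ".pdf": "pdf"}

def guess_source_type(path_or_text: str) -> str:
    """Return 'pdf' or 'text' for known extensions, else treat input as raw text."""
    suffix = Path(path_or_text).suffix.lower()
    return EXTENSION_TO_SOURCE_TYPE.get(suffix, "raw")

for item in ["/content/notes.txt", "/content/book.PDF", "Some pasted paragraph"]:
    print(item, "->", guess_source_type(item))
```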

In [None]:
# ============================================================================
# Load Custom Documents
# ============================================================================
"""
Load and process your own documents.
Uncomment and modify this section to use your documents.
"""

# Example: Load from files
"""
# Upload your files to Colab first, then:
my_documents = []

# Load a text file
doc1 = load_document("/content/your_document.txt", source_type="text")
my_documents.append(doc1)

# Load a PDF file
doc2 = load_document("/content/your_document.pdf", source_type="pdf")
my_documents.append(doc2)

# Or use raw text
doc3 = load_document("Your raw text content here...", source_type="raw")
my_documents.append(doc3)

# Build pipeline with your documents
my_pipeline = build_contextual_rag_pipeline(
    es_client,
    "my_custom_index",
    my_documents
)

# Query your pipeline
my_query = "Your question here?"
result = query_pipeline(
    es_client,
    "my_custom_index",
    my_query,
    use_reranking=True,
    use_web_search_fallback=True
)
"""

print("\n✅ Ready to load your documents!")
print("Modify the code above to load your own files")

In [68]:
pdf_text = load_pdf_pages("/content/NCERT-Class-11-Political-Science-Part-1.pdf")  # Replace with your own documents or PDFs
toc_ps1 = [
    ("Constitution: Why and How?", 3),
    ("Rights in the Indian Constitution", 28),
    ("Election and Representation", 53),
    ("Executive", 80),
    ("Legislature", 102),
    ("Judiciary", 126),
    ("Federalism", 152),
    ("Local Governments", 178),
    ("Constitution as a Living Document", 198),
    ("The Philosophy of the Constitution", 222),
]

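def split_pages_by_toc(pages, toc):
    """Group page dicts ("page_number", "text") into chapters using (title, start_page) TOC entries."""
    chapters = []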

    for i, (title, start_page) in enumerate(toc):
        end_page = toc[i + 1][1] - 1 if i + 1 < len(toc) else pages[-1]["page_number"]

        chapter_text = []
        for p in pages:
            if start_page <= p["page_number"] <= end_page:
                chapter_text.append(p["text"])

        chapters.append({
            "name": title,
            "start_page": start_page,
            "end_page": end_page,
            "content": "\n".join(chapter_text)
        })

    return chapters
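
The page-range arithmetic in `split_pages_by_toc` is easiest to check on a toy table of contents. The sketch below reproduces only the range calculation. `chapter_page_ranges` is a hypothetical helper and the toy TOC is made up.

```python
from typing import List, Tuple

def chapter_page_ranges(
    toc: List[Tuple[str, int]], last_page: int
) -> List[Tuple[str, int, int]]:
    """Return (title, start_page, end_page) with inclusive page bounds."""
    ranges = []
    for i, (title, start) in enumerate(toc):
        # A chapter ends one page before the next one starts;
        # the final chapter runs to the last page of the document.
        end = toc[i + 1][1] - 1 if i + 1 < len(toc) else last_page
        ranges.append((title, start, end))
    return ranges

toy_toc = [("Intro", 1), ("Methods", 4), ("Results", 9)]
ranges = chapter_page_ranges(toy_toc, last_page=12)
print(ranges)
```

Pages before the first TOC entry are not assigned to any chapter. With `toc_ps1`, whose first chapter starts on page 3, pages 1 and 2 are therefore left out.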


In [70]:
chapters = split_pages_by_toc(pdf_text, toc_ps1)

# Build pipeline with your documents
my_pipeline = build_contextual_rag_pipeline(
    es_client,
    "contextual_rag_index",
    chapters
)


🚀 BUILDING CONTEXTUAL RAG PIPELINE

📄 Processing: Constitution: Why and How?
   Tokens: 9,997
   ├─ Chunking...
   ├─ Created 15 chunks
   ├─ Generating contexts...


Generating contexts:   0%|          | 0/15 [00:00<?, ?it/s]

   ├─ Creating embeddings...


Creating embeddings:   0%|          | 0/15 [00:00<?, ?it/s]

   └─ ✅ Done

📄 Processing: Rights in the Indian Constitution
   Tokens: 9,527
   ├─ Chunking...
   ├─ Created 14 chunks
   ├─ Generating contexts...


Generating contexts:   0%|          | 0/14 [00:00<?, ?it/s]

   ├─ Creating embeddings...


Creating embeddings:   0%|          | 0/14 [00:00<?, ?it/s]

   └─ ✅ Done

📄 Processing: Election and Representation
   Tokens: 10,274
   ├─ Chunking...
   ├─ Created 15 chunks
   ├─ Generating contexts...


Generating contexts:   0%|          | 0/15 [00:00<?, ?it/s]

   ├─ Creating embeddings...


Creating embeddings:   0%|          | 0/15 [00:00<?, ?it/s]

   └─ ✅ Done

📄 Processing: Executive
   Tokens: 8,348
   ├─ Chunking...
   ├─ Created 12 chunks
   ├─ Generating contexts...


Generating contexts:   0%|          | 0/12 [00:00<?, ?it/s]

   ├─ Creating embeddings...


Creating embeddings:   0%|          | 0/12 [00:00<?, ?it/s]

   └─ ✅ Done

📄 Processing: Legislature
   Tokens: 8,486
   ├─ Chunking...
   ├─ Created 12 chunks
   ├─ Generating contexts...


Generating contexts:   0%|          | 0/12 [00:00<?, ?it/s]

   ├─ Creating embeddings...


Creating embeddings:   0%|          | 0/12 [00:00<?, ?it/s]

   └─ ✅ Done

📄 Processing: Judiciary
   Tokens: 9,171
   ├─ Chunking...
   ├─ Created 13 chunks
   ├─ Generating contexts...


Generating contexts:   0%|          | 0/13 [00:00<?, ?it/s]

   ├─ Creating embeddings...


Creating embeddings:   0%|          | 0/13 [00:00<?, ?it/s]

   └─ ✅ Done

📄 Processing: Federalism
   Tokens: 9,284
   ├─ Chunking...
   ├─ Created 13 chunks
   ├─ Generating contexts...


Generating contexts:   0%|          | 0/13 [00:00<?, ?it/s]

   ├─ Creating embeddings...


Creating embeddings:   0%|          | 0/13 [00:00<?, ?it/s]

   └─ ✅ Done

📄 Processing: Local Governments
   Tokens: 7,568
   ├─ Chunking...
   ├─ Created 11 chunks
   ├─ Generating contexts...


Generating contexts:   0%|          | 0/11 [00:00<?, ?it/s]

   ├─ Creating embeddings...


Creating embeddings:   0%|          | 0/11 [00:00<?, ?it/s]

   └─ ✅ Done

📄 Processing: Constitution as a Living Document
   Tokens: 9,206
   ├─ Chunking...
   ├─ Created 13 chunks
   ├─ Generating contexts...


Generating contexts:   0%|          | 0/13 [00:00<?, ?it/s]

   ├─ Creating embeddings...


Creating embeddings:   0%|          | 0/13 [00:00<?, ?it/s]

   └─ ✅ Done

📄 Processing: The Philosophy of the Constitution
   Tokens: 9,334
   ├─ Chunking...
   ├─ Created 13 chunks
   ├─ Generating contexts...


Generating contexts:   0%|          | 0/13 [00:00<?, ?it/s]

   ├─ Creating embeddings...


Creating embeddings:   0%|          | 0/13 [00:00<?, ?it/s]

   └─ ✅ Done

📊 Total chunks: 131

🔍 Indexing in Elasticsearch...
📥 Indexing 131 documents...


Indexing:   0%|          | 0/131 [00:00<?, ?it/s]

✅ Indexing complete!
   Success: 131/131

✅ PIPELINE BUILD COMPLETE!


In [79]:
query = "What is the role of local government?"

result = query_pipeline(
    es_client,
    INDEX_NAME,
    query,
    top_k=20,
    top_k_for_generation=5,
    use_reranking=True,
    use_web_search_fallback=True  # Enable web search
)


🔍 Query: What is the role of local government?
------------------------------------------------------------
📥 Retrieving relevant chunks...
✅ Retrieved 20 chunks
🔄 Reranking results with Cohere...
✅ Reranked to top 10 chunks
🤖 Generating answer...
------------------------------------------------------------
💡 Answer:
The role of local government is to function at the village and district level, as the government closest to the common people, involving their day-to-day life and problems. It is necessary to look after local affairs and ensure democratic decision-making by utilizing local knowledge and interest, leading to efficient and people-friendly administration.

Specifically, local governments are important for:
*   Protecting the local interests of the people.
*   Ensuring active participation and purposeful accountability in democracy.
*   Involving common citizens in decision-making concerning their lives, needs, and development.
*   Performing tasks that can be managed locally

In [80]:
result

{'query': 'What is the role of local government?',
 'answer': 'The role of local government is to function at the village and district level, as the government closest to the common people, involving their day-to-day life and problems. It is necessary to look after local affairs and ensure democratic decision-making by utilizing local knowledge and interest, leading to efficient and people-friendly administration.\n\nSpecifically, local governments are important for:\n*   Protecting the local interests of the people.\n*   Ensuring active participation and purposeful accountability in democracy.\n*   Involving common citizens in decision-making concerning their lives, needs, and development.\n*   Performing tasks that can be managed locally, thereby strengthening democratic processes.\n*   Resolving issues at the village level.\n\nExamples of their responsibilities include renovating water tanks, building school buildings, constructing village roads, fighting against domestic violence a
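
The raw result dictionary is long, so a compact summary can be easier to scan. The sketch below assumes the keys returned by `generate_answer` (`query`, `answer`, `sources`, `used_web_search`). `summarize_answer` is a hypothetical helper and the example values are made up.

```python
from collections import Counter
from typing import Any, Dict

def summarize_answer(result: Dict[str, Any], preview_chars: int = 80) -> str:
    """Return a short, printable summary of a pipeline result dict."""
    # Count how many of the used chunks came from each source document.
    per_doc = Counter(s["document"] for s in result.get("sources", []))
    lines = [
        f"Q: {result['query']}",
        f"A: {result['answer'][:preview_chars]}",
        f"Web search used: {result.get('used_web_search', False)}",
    ]
    lines += [f"  {doc}: {n} chunk(s)" for doc, n in per_doc.most_common()]
    return "\n".join(lines)

# Illustrative result; the values are made up, the keys match generate_answer.
example = {
    "query": "What is the role of local government?",
    "answer": "Local governments look after local affairs.",
    "sources": [
        {"document": "Local Governments", "score": 0.9, "preview": "..."},
        {"document": "Local Governments", "score": 0.8, "preview": "..."},
        {"document": "Federalism", "score": 0.7, "preview": "..."},
    ],
    "num_chunks_used": 3,
    "used_web_search": False,
}
summary = summarize_answer(example)
print(summary)
```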

## Trying Multimodal Contextual RAG