In [11]:
!pip install PyMuPDF tiktoken langchain-text-splitters



In [12]:
import fitz # PyMuPDF
import tiktoken
import json
import math
import os
import re
from collections import Counter
from typing import List, Dict, Tuple, Optional, Set, Any


In [13]:
# Import RecursiveCharacterTextSplitter
from langchain_text_splitters import RecursiveCharacterTextSplitter

# --- Configuration ---
# The target chunk size in tokens. A common value is 512.
CHUNK_SIZE_TOKENS = 512
# The overlap percentage between consecutive chunks (e.g., 0.15 for 15% overlap).
OVERLAP_PERCENTAGE = 0.15
# Encoding for tokenization (e.g., 'cl100k_base' for OpenAI models like GPT-4, GPT-3.5)
ENCODING_NAME = "cl100k_base"

# Heuristic for identifying common headers/footers
# Max number of lines from top/bottom of a page to consider as potential header/footer
MAX_LINES_TO_CHECK = 5
# Percentage of pages a line must appear on (excluding page 1) to be considered a common header/footer
REPETITION_THRESHOLD_PERCENT = 70

# Initialize tiktoken encoder globally for consistent token counting
ENCODER = tiktoken.get_encoding(ENCODING_NAME)


In [14]:
def count_tokens(text: str) -> int:
    """Counts tokens using the global tiktoken encoder."""
    return len(ENCODER.encode(text))

def extract_text_from_pdf(pdf_path: str) -> list[dict]:
    """
    Extracts text content page by page from a PDF document.

    Args:
        pdf_path (str): The file path to the PDF document.

    Returns:
        list[dict]: A list of dictionaries, each containing 'page_num' and 'text'
                    for a page. Returns an empty list if the file cannot be opened.
    """
    pages_content = []
    try:
        document = fitz.open(pdf_path)
        for page_num in range(len(document)):
            page = document.load_page(page_num)
            text = page.get_text("text")
            pages_content.append({"page_num": page_num + 1, "text": text})
        document.close()
    except Exception as e:
        print(f"Error reading PDF {pdf_path}: {e}")
    return pages_content

In [15]:
def identify_common_page_elements(all_pages_content: dict[str, list[dict]],
                                   max_lines: int = MAX_LINES_TO_CHECK,
                                   repetition_threshold_percent: int = REPETITION_THRESHOLD_PERCENT) -> tuple[set, set]:
    """
    Analyzes text from multiple pages (excluding first pages) to identify common
    header and footer lines based on repetition.

    Args:
        all_pages_content (dict[str, list[dict]]): Dictionary where keys are doc_ids
                                                    and values are lists of page_data.
        max_lines (int): Max number of lines from top/bottom to consider.
        repetition_threshold_percent (int): Percentage of non-first pages a line must
                                            appear on to be considered common.

    Returns:
        tuple[set, set]: Two sets: (common_header_lines, common_footer_lines).
    """
    header_candidates = Counter()
    footer_candidates = Counter()
    total_non_first_pages = 0

    for doc_id, pages_data in all_pages_content.items():
        for page_data in pages_data:
            page_num = page_data['page_num']
            page_text = page_data['text']

            # Skip the first page of each document for common element identification
            if page_num == 1:
                continue

            total_non_first_pages += 1
            lines = [line.strip() for line in page_text.split('\n') if line.strip()]

            # Collect header candidates
            for i in range(min(max_lines, len(lines))):
                header_candidates[lines[i]] += 1

            # Collect footer candidates (from the end of the page)
            for i in range(max(0, len(lines) - max_lines), len(lines)):
                footer_candidates[lines[i]] += 1

    common_header_lines = set()
    common_footer_lines = set()

    if total_non_first_pages == 0:
        print("No non-first pages found to identify common elements.")
        return common_header_lines, common_footer_lines

    threshold_count = math.ceil(total_non_first_pages * (repetition_threshold_percent / 100))
    print(f"Identifying common elements: Total non-first pages: {total_non_first_pages}, Threshold count: {threshold_count}")

    for line, count in header_candidates.items():
        if count >= threshold_count:
            common_header_lines.add(line)
            print(f"  Identified common header: '{line}' (appears {count} times)")

    for line, count in footer_candidates.items():
        # A common heuristic for page numbers: remove if it's just a number or "Page X"
        if count >= threshold_count and (re.fullmatch(r'\s*\d+\s*', line) or re.fullmatch(r'Page\s+\d+\s*(of\s+\d+)?', line, re.IGNORECASE)):
            common_footer_lines.add(line)
            print(f"  Identified common footer: '{line}' (appears {count} times)")
        # You can extend this logic to include other non-numeric common footers if needed

    return common_header_lines, common_footer_lines

In [16]:
def remove_identified_elements(page_text: str, page_num: int,
                               common_header_lines: set, common_footer_lines: set) -> str:
    """
    Removes identified common header and footer lines from a page's text.
    It skips removal for the first page.

    Args:
        page_text (str): The text content of a single page.
        page_num (int): The current page number (1-indexed).
        common_header_lines (set): Set of lines identified as common headers.
        common_footer_lines (set): Set of lines identified as common footers.

    Returns:
        str: The page text with identified common elements removed.
    """
    # Skip removal for the first page, as it contains unique, important metadata.
    if page_num == 1:
        return page_text

    lines = [line.strip() for line in page_text.split('\n')]
    cleaned_lines = []

    # Flags to stop removal once non-header/non-footer content is found
    header_removal_done = False
    footer_removal_done = False

    # Process lines from top for header removal
    for i, line in enumerate(lines):
        if not header_removal_done and line in common_header_lines:
            # This line is a common header, skip it
            continue
        else:
            # Found non-header content or no more headers, stop checking
            header_removal_done = True
            cleaned_lines.append(line) # Add this line and subsequent lines

    # Now process the cleaned lines from the bottom for footer removal
    # This is a bit tricky with `cleaned_lines` already built.
    # A simpler approach for this heuristic is to rebuild `cleaned_lines` from scratch
    # by iterating through original lines and marking for inclusion/exclusion.

    # Re-process original lines for both header and footer removal in one pass
    final_lines = []

    # Determine which lines to keep from the top (non-headers)
    temp_lines = []
    for i, line in enumerate(lines):
        if line in common_header_lines and i < MAX_LINES_TO_CHECK: # Only consider top lines for header
            continue # Skip this header line
        else:
            temp_lines.append(line)

    # Determine which lines to keep from the bottom (non-footers)
    # Iterate from the end of temp_lines
    footer_check_start_index = max(0, len(temp_lines) - MAX_LINES_TO_CHECK)
    for i, line in enumerate(temp_lines):
        if line in common_footer_lines and i >= footer_check_start_index: # Only consider bottom lines for footer
            continue # Skip this footer line
        else:
            final_lines.append(line)

    # Filter out any completely empty lines that result from removal
    return "\n".join(line for line in final_lines if line.strip() != "")

In [17]:
def chunk_text_with_metadata(text: str, chunk_size_tokens: int, overlap_percentage: float,
                             doc_id: str, page: int, base_clause_id_prefix: str) -> List[Dict]:
    """
    Splits a given text into chunks using RecursiveCharacterTextSplitter,
    prioritizing natural language boundaries. Each chunk is tagged with metadata.

    Args:
        text (str): The text content to be chunked.
        chunk_size_tokens (int): The target maximum number of tokens per chunk.
                                 This is converted to character length for the splitter.
        overlap_percentage (float): The percentage of overlap between consecutive chunks.
        doc_id (str): The ID of the document (e.g., "policy_123").
        page (int): The page number the content is notionally from.
        base_clause_id_prefix (str): A prefix for generating clause IDs (e.g., "1.1").

    Returns:
        list[dict]: A list of dictionaries, where each dictionary represents a chunk
                    and contains its content and metadata.
    """
    # Estimate character length based on average tokens per character
    avg_chars_per_token = 4 # Common average for English text
    chunk_size_chars = chunk_size_tokens * avg_chars_per_token
    overlap_chars = math.floor(chunk_size_chars * overlap_percentage)

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size_chars,
        chunk_overlap=overlap_chars,
        length_function=len, # Use character length for splitting
        separators=["\n\n", "\n", " ", ""] # Prioritize paragraphs, then lines, then words, then characters
    )

    raw_chunks = text_splitter.split_text(text)

    processed_chunks = []
    for i, chunk_content in enumerate(raw_chunks):
        token_length = count_tokens(chunk_content)
        clause_id = f"{base_clause_id_prefix}-{doc_id}-p{page}-c{i + 1}"

        metadata = {
            "doc_id": doc_id,
            "page": page,
            "clause_id": clause_id,
            "chunk_length_tokens": token_length,
            "chunk_length_chars": len(chunk_content)
        }

        processed_chunks.append({
            "content": chunk_content,
            "metadata": metadata
        })

    return processed_chunks

In [18]:
def process_pdfs_for_chunking(pdf_paths: list[str]):
    """
    Processes a list of PDF file paths, extracts text, dynamically identifies
    and removes common headers/footers, and chunks the cleaned text.

    Args:
        pdf_paths (list[str]): A list of file paths to the PDF documents.

    Returns:
        list[dict]: A flattened list of all processed chunks from all PDFs.
    """
    all_docs_pages_content = {}
    # First pass: Extract all text and store it for common element identification
    for pdf_path in pdf_paths:
        doc_id = os.path.splitext(os.path.basename(pdf_path))[0]
        pages_content = extract_text_from_pdf(pdf_path)
        if pages_content:
            all_docs_pages_content[doc_id] = pages_content
        else:
            print(f"Skipping {pdf_path} due to extraction errors.")

    # Identify common headers and footers across all documents (excluding first pages)
    common_header_lines, common_footer_lines = identify_common_page_elements(all_docs_pages_content)

    all_processed_chunks = []
    # Second pass: Process each page, remove identified common elements, and chunk
    for doc_id, pages_data in all_docs_pages_content.items():
        print(f"\n--- Chunking Document: {doc_id} ---")
        for page_data in pages_data:
            page_num = page_data['page_num']
            raw_page_text = page_data['text']

            # --- Dynamically remove identified common headers/footers ---
            cleaned_page_text = remove_identified_elements(
                raw_page_text, page_num, common_header_lines, common_footer_lines
            )

            print(f"  Processing Page {page_num} (raw length: {len(raw_page_text)} chars, cleaned length: {len(cleaned_page_text)} chars)")

            if not cleaned_page_text.strip():
                print(f"    Page {page_num} became empty after cleaning. Skipping chunking for this page.")
                continue

            page_chunks = chunk_text_with_metadata(
                text=cleaned_page_text,
                chunk_size_tokens=CHUNK_SIZE_TOKENS,
                overlap_percentage=OVERLAP_PERCENTAGE,
                doc_id=doc_id,
                page=page_num,
                base_clause_id_prefix="Clause"
            )
            all_processed_chunks.extend(page_chunks)
            print(f"    Generated {len(page_chunks)} chunks for page {page_num}.")

    return all_processed_chunks

In [19]:
# --- Example Usage with Placeholder PDF Paths ---
# IMPORTANT: Replace these with the actual paths to your uploaded PDF files in Colab
pdf_file_paths = [
    "/content/BAJHLIP23020V012223.pdf",
    "/content/CHOTGDP23004V012223.pdf",
    "/content/EDLHLGA23009V012223.pdf",
    "/content/HDFHLIP23024V072223.pdf",
    "/content/ICIHLIP22012V012223.pdf"
]

# Check if placeholder files exist (they won't unless you upload them)
# This is just for demonstration; in a real scenario, you'd ensure files are there.
existing_pdf_paths = [p for p in pdf_file_paths if os.path.exists(p)]


In [20]:
if not existing_pdf_paths:
    print("WARNING: No PDF files found at the specified paths. Please upload your PDFs to Colab")
    print("and update the 'pdf_file_paths' list with their correct locations.")
    print("Proceeding with a dummy text for demonstration purposes as no PDFs were found.")
    dummy_text_page1 = """
    Policy Title: Comprehensive Health Plan 2025
    Version 1.0 - Effective Date: Jan 1, 2025

    This is the unique content for page 1.
    """
    dummy_text_page2 = """
    Common Header Text
    Section 1: Eligibility

    1.1 Age Requirements:
    Applicants must be between 18 and 65 years old. Dependents up to 26 are covered if full-time students.

    Common Footer Text - Page 2
    """
    dummy_text_page3 = """
    Common Header Text
    1.2 Geographic Coverage:
    Coverage is valid in all 50 US states. Travel abroad is limited to 90 days.

    Table 1: Deductibles
    | Plan Type | Deductible | Co-pay |
    |---|---|---|
    | Basic | $1000 | $50 |
    | Premium | $500 | $25 |

    Common Footer Text - Page 3
    """

    # Simulate multiple pages for a single dummy document to test common element identification
    simulated_pages_content = {
        "dummy_doc": [
            {"page_num": 1, "text": dummy_text_page1},
            {"page_num": 2, "text": dummy_text_page2},
            {"page_num": 3, "text": dummy_text_page3},
        ]
    }

    # Manually call the two-pass process for the dummy data
    print("\n--- Generating chunks from dummy text for demonstration ---")
    common_header_lines_dummy, common_footer_lines_dummy = identify_common_page_elements(simulated_pages_content)

    processed_chunks = []
    for doc_id, pages_data in simulated_pages_content.items():
        for page_data in pages_data:
            cleaned_page_text = remove_identified_elements(
                page_data['text'], page_data['page_num'], common_header_lines_dummy, common_footer_lines_dummy
            )
            if cleaned_page_text.strip():
                page_chunks = chunk_text_with_metadata(
                    text=cleaned_page_text,
                    chunk_size_tokens=CHUNK_SIZE_TOKENS,
                    overlap_percentage=OVERLAP_PERCENTAGE,
                    doc_id=doc_id,
                    page=page_data['page_num'],
                    base_clause_id_prefix="DummyClause"
                )
                processed_chunks.extend(page_chunks)

else:
    print(f"--- Starting PDF Processing and Chunking for {len(existing_pdf_paths)} files ---")
    processed_chunks = process_pdfs_for_chunking(existing_pdf_paths)
    print(f"\n--- Total Generated Chunks: {len(processed_chunks)} ---")

# Display a few sample chunks
if processed_chunks:
    print("\n--- Sample of Processed Chunks (first 5) ---")
    for i, chunk in enumerate(processed_chunks[:5]):
        print(f"\nChunk {i+1}:")
        print(f"  Metadata: {json.dumps(chunk['metadata'], indent=2)}")
        print(f"  Content (first 200 chars): \"{chunk['content'][:200]}...\"")
        print(f"  Content length (chars): {len(chunk['content'])}")
        print(f"  Content length (tokens): {chunk['metadata']['chunk_length_tokens']}")
else:
    print("No chunks were generated. Please ensure your PDFs are uploaded and paths are correct.")

print(f"\n--- Document Processing & Chunking Complete ---")


--- Starting PDF Processing and Chunking for 5 files ---
Identifying common elements: Total non-first pages: 217, Threshold count: 152

--- Chunking Document: BAJHLIP23020V012223 ---
  Processing Page 1 (raw length: 4505 chars, cleaned length: 4505 chars)
    Generated 3 chunks for page 1.
  Processing Page 2 (raw length: 3985 chars, cleaned length: 3815 chars)
    Generated 3 chunks for page 2.
  Processing Page 3 (raw length: 4461 chars, cleaned length: 4303 chars)
    Generated 3 chunks for page 3.
  Processing Page 4 (raw length: 3890 chars, cleaned length: 3721 chars)
    Generated 2 chunks for page 4.
  Processing Page 5 (raw length: 4371 chars, cleaned length: 4206 chars)
    Generated 3 chunks for page 5.
  Processing Page 6 (raw length: 4644 chars, cleaned length: 4484 chars)
    Generated 3 chunks for page 6.
  Processing Page 7 (raw length: 4517 chars, cleaned length: 4353 chars)
    Generated 3 chunks for page 7.
  Processing Page 8 (raw length: 3900 chars, cleaned length: 

In [21]:
!pip install sentence-transformers qdrant-client

Collecting qdrant-client
  Downloading qdrant_client-1.15.1-py3-none-any.whl.metadata (11 kB)
Collecting portalocker<4.0,>=2.7.0 (from qdrant-client)
  Downloading portalocker-3.2.0-py3-none-any.whl.metadata (8.7 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=1.11.0->sentence-transformers)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch>=1.11.0->sentence-transformers)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch>=1.11.0->sentence-transformers)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch>=1.11.0->sentence-transformers)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from

In [22]:
import json
from sentence_transformers import SentenceTransformer
from qdrant_client.http.exceptions import UnexpectedResponse
from qdrant_client import QdrantClient, models
import numpy as np
import uuid
import math

In [23]:
# --- Configuration for Embeddings ---
EMBEDDING_MODEL_NAME = 'BAAI/bge-large-en-v1.5'
COLLECTION_NAME = "policy_clauses" # Name for your Qdrant collection
UPSERT_BATCH_SIZE = 256

# --- Load the Embedding Model ---
print(f"Loading embedding model: {EMBEDDING_MODEL_NAME}...")
embedding_model = SentenceTransformer(EMBEDDING_MODEL_NAME)
print("Embedding model loaded.")

Loading embedding model: BAAI/bge-large-en-v1.5...


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/124 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/52.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/779 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/1.34G [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/366 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/125 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/191 [00:00<?, ?B/s]

Embedding model loaded.


In [24]:
def generate_embeddings(chunks: list[dict], model: SentenceTransformer) -> list[np.ndarray]:
    """
    Generates embeddings for the content of each chunk.

    Args:
        chunks (list[dict]): A list of dictionaries, each representing a chunk
                             with a 'content' key.
        model (SentenceTransformer): The loaded sentence-transformer model.

    Returns:
        list[np.ndarray]: A list of numpy arrays, where each array is the embedding
                          for a corresponding chunk.
    """
    texts = [chunk['content'] for chunk in chunks]
    print(f"Generating embeddings for {len(texts)} chunks...")
    embeddings = model.encode(texts, convert_to_numpy=True)
    print("Embeddings generated.")
    return embeddings

In [25]:
def initialize_qdrant_client(location=":memory:"):
    """
    Initializes a Qdrant client.
    For Colab, ':memory:' creates an in-memory instance (data is lost on restart).
    For persistence in Colab, you could use path="path/to/db" or run Qdrant in Docker.
    For production, you'd connect to a Qdrant server (e.g., QdrantClient(host="localhost", port=6333)).
    """
    print(f"Initializing Qdrant client at location: {location}...")
    client = QdrantClient(location=location)
    print("Qdrant client initialized.")
    return client

In [26]:
def create_qdrant_collection(client: QdrantClient, collection_name: str, embedding_dim: int):
    """
    Creates a Qdrant collection if it doesn't already exist, using recommended methods.

    Args:
        client (QdrantClient): The Qdrant client instance.
        collection_name (str): The name of the collection to create.
        embedding_dim (int): The dimensionality of the vectors to be stored.
    """
    print(f"Checking for collection '{collection_name}'...")
    # --- Deprecation Fix: Use collection_exists and create_collection ---
    if not client.collection_exists(collection_name=collection_name):
        print(f"Collection '{collection_name}' not found. Creating new collection...")
        client.create_collection(
            collection_name=collection_name,
            vectors_config=models.VectorParams(size=embedding_dim, distance=models.Distance.COSINE),
            # You can add other configurations like quantization, sharding for production
            # hnsw_config=models.HnswConfigDiff(m=16, ef_construct=100) # HNSW parameters
        )
        print(f"Collection '{collection_name}' created.")
    else:
        print(f"Collection '{collection_name}' already exists.")
    # --- End Deprecation Fix ---

In [27]:
def upsert_chunks_to_qdrant(client: QdrantClient, collection_name: str,
                             chunks: list[dict], embeddings: np.ndarray, # Changed to np.ndarray
                             batch_size: int = UPSERT_BATCH_SIZE):
    """
    Upserts (inserts/updates) chunks and their embeddings into the Qdrant collection
    in batches to prevent timeouts and manage memory.

    Args:
        client (QdrantClient): The Qdrant client instance.
        collection_name (str): The name of the collection.
        chunks (list[dict]): The list of original chunk dictionaries.
        embeddings (np.ndarray): The 2D numpy array of embeddings corresponding to the chunks.
        batch_size (int): The number of points to upsert in each batch.
    """
    all_points = []
    for i, chunk in enumerate(chunks):
        # Qdrant requires a unique ID for each point. Using UUID4.
        # Store all original metadata and content in the 'payload'

        # --- Robustness check for embedding ---
        # Ensure the embedding for this specific chunk is a 1D numpy array before proceeding
        # Accessing embeddings[i] directly from the 2D array
        if not isinstance(embeddings[i], np.ndarray) or embeddings[i].ndim != 1:
            print(f"Warning: Embedding at index {i} is not a 1D numpy array or is malformed. Skipping point.")
            continue
        # --- End robustness check ---

        point_id = str(uuid.uuid4())
        payload = {
            "content": chunk['content'],
            **chunk['metadata'] # Unpack all metadata fields
        }
        all_points.append(
            models.PointStruct(
                id=point_id,
                vector=embeddings[i].tolist(), # Convert numpy array to list for JSON serialization
                payload=payload
            )
        )

    total_points = len(all_points)
    if total_points == 0:
        print("No valid points to upsert after filtering. Skipping upsert operation.")
        return

    num_batches = math.ceil(total_points / batch_size)
    print(f"Preparing to upsert {total_points} points in {num_batches} batches of size {batch_size}...")

    for i in range(num_batches):
        start_idx = i * batch_size
        end_idx = min((i + 1) * batch_size, total_points)
        batch_points = all_points[start_idx:end_idx]

        print(f"Upserting batch {i+1}/{num_batches} ({len(batch_points)} points)...")
        client.upsert(
            collection_name=collection_name,
            wait=True, # Wait for the operation to complete for each batch
            points=batch_points
        )
        print(f"Batch {i+1} upserted.")

    print("All points upserted successfully.")

In [28]:
# --- Main Execution ---
# Ensure processed_chunks is available from the previous step.
# If you are running this cell separately, you need to define processed_chunks.
# For demonstration, we'll use a placeholder if processed_chunks is not defined.
try:
    if 'processed_chunks' not in locals() or not processed_chunks:
        raise NameError("processed_chunks not found or empty. Using dummy data for demonstration.")
    print("Using processed_chunks from previous step.")
except NameError:
    print("processed_chunks not found. Running a minimal chunking process with dummy data for demonstration.")
    # This is a fallback for independent execution of this cell.
    from langchain_text_splitters import RecursiveCharacterTextSplitter
    import tiktoken

    ENCODER = tiktoken.get_encoding("cl100k_base")
    def count_tokens(text: str) -> int: return len(ENCODER.encode(text))

    dummy_text_for_embedding = """
    Policy for Health Coverage. Applicants must be between 18 and 65 years old.
    Dependents up to 26 are covered if full-time students.
    Coverage is valid in all 50 US states. Travel abroad is limited to 90 days.
    This policy covers up to 90% of eligible hospitalization costs after a $1,000 deductible.
    Outpatient visits are covered at 80% after a $50 co-pay.
    Pre-existing conditions are covered after a 12-month waiting period.
    """
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=100, chunk_overlap=20, length_function=len, separators=["\n\n", "\n", " ", ""]
    )
    dummy_raw_chunks = text_splitter.split_text(dummy_text_for_embedding)
    processed_chunks = []
    for i, content in enumerate(dummy_raw_chunks):
        processed_chunks.append({
            "content": content,
            "metadata": {
                "doc_id": "dummy_emb_doc",
                "page": 1,
                "clause_id": f"dummy_c{i+1}",
                "chunk_length_tokens": count_tokens(content)
            }
        })
    print(f"Generated {len(processed_chunks)} dummy chunks for embedding demonstration.")

Using processed_chunks from previous step.


In [29]:
# 1. Generate Embeddings
chunk_embeddings = generate_embeddings(processed_chunks, embedding_model)

# --- FIX for ValueError: The truth value of an array with more than one element is ambiguous ---
embedding_dimension = 0
# Check if chunk_embeddings is a numpy array and not empty
if isinstance(chunk_embeddings, np.ndarray) and chunk_embeddings.shape[0] > 0:
    # If it's a 2D array (common output from model.encode), get the second dimension
    embedding_dimension = chunk_embeddings.shape[1]
else:
    print("Warning: chunk_embeddings is not a valid numpy array or is empty. Setting embedding_dimension to 0.")
# --- End FIX ---


# 2. Initialize Qdrant Client (in-memory for Colab)
qdrant_client = initialize_qdrant_client(location=":memory:") # Use ":memory:" for in-memory, or path="your_local_path" for persistent local

# 3. Create Qdrant Collection
if embedding_dimension > 0:
    create_qdrant_collection(qdrant_client, COLLECTION_NAME, embedding_dimension)
else:
    print("Cannot create Qdrant collection: Embedding dimension is 0. Check chunk processing or embedding generation.")

# 4. Upsert Chunks to Qdrant
# Ensure chunk_embeddings is a numpy array before passing to upsert_chunks_to_qdrant
if processed_chunks and isinstance(chunk_embeddings, np.ndarray) and chunk_embeddings.shape[0] > 0 and embedding_dimension > 0:
    upsert_chunks_to_qdrant(qdrant_client, COLLECTION_NAME, processed_chunks, chunk_embeddings)
else:
    print("Skipping upsert: No valid chunks, embeddings, or embedding dimension is 0.")

# --- Verification (Optional - for testing the index) ---
if qdrant_client and qdrant_client.collection_exists(collection_name=COLLECTION_NAME):
    print("\n--- Testing Qdrant Search ---")
    test_query = "What is the age limit for policy holders?"
    print(f"Test Query: '{test_query}'")
    query_embedding = embedding_model.encode(test_query, convert_to_numpy=True)
    k = 1 # Define the number of results to retrieve

    # --- Use query_points with the correct 'query' argument ---
    search_results = qdrant_client.query_points(
        collection_name=COLLECTION_NAME,
        query=query_embedding.tolist(), # The query vector
        limit=k,
        with_payload=True, # Retrieve the full payload (content and metadata)
        with_vectors=False # Don't retrieve the vectors themselves, just payload
    )
    # --- End change ---

    print(f"\nTop {k} most similar chunks from Qdrant:")
    for i, hit in enumerate(search_results.points):
        print(f"\nRank {i+1} (Score: {hit.score:.4f}):")
        print(f"  ID: {hit.id}")
        print(f"  Payload: {json.dumps(hit.payload, indent=2)}")
        print(f"  Content: \"{hit.payload['content'][:200]}...\"") # Show first 200 chars

    print("\n--- Qdrant Indexing and Test Complete ---")
else:
    print("\nQdrant client or collection not ready. Check for errors.")

Generating embeddings for 505 chunks...


  return forward_call(*args, **kwargs)


Embeddings generated.
Initializing Qdrant client at location: :memory:...
Qdrant client initialized.
Checking for collection 'policy_clauses'...
Collection 'policy_clauses' not found. Creating new collection...
Collection 'policy_clauses' created.
Preparing to upsert 505 points in 2 batches of size 256...
Upserting batch 1/2 (256 points)...
Batch 1 upserted.
Upserting batch 2/2 (249 points)...
Batch 2 upserted.
All points upserted successfully.

--- Testing Qdrant Search ---
Test Query: 'What is the age limit for policy holders?'

Top 1 most similar chunks from Qdrant:

Rank 1 (Score: 0.7145):
  ID: 7e6f38f8-d582-40fa-8cc4-991cfecbbb35
  Payload: {
  "content": "appropriate premium paid.\n\uf0b7\nEntry age for the member should be between 03 months to 90 years (completed age).\n3. COVERAGE - BASE COVERS:\nThe Policy provides the following Base Covers. It is mandatory for the proposer/Insured to avail the Base Cover to\nbe eligible for taking this Policy from Cholamandalam MS General In

In [30]:
# --- Standard RAG Pipeline (Vector Search Only) ---
def perform_standard_rag_search(query: str, k: int = 2) -> List[Dict]:
    """
    Performs a standard RAG search using only vector similarity.
    This serves as a baseline for comparison.
    """
    print("Performing standard RAG search (vector similarity only)...")

    # Generate the embedding for the user's query
    query_vector = embedding_model.encode(query).tolist()

    # Use Qdrant's search method to find the top-k most similar vectors
    search_result = qdrant_client.query_points(
        collection_name=COLLECTION_NAME,
        query=query_vector, # Corrected argument name from query_vector to query
        limit=k,
        with_payload=True, # Ensure payload is returned
        with_vectors=False # No need to return vectors
    )

    # Extract the payloads (the original documents) from the search results
    retrieved_docs = [point.payload for point in search_result.points]
    return retrieved_docs

In [31]:
from typing import List
# --- Hybrid RAG Pipeline (Vector + Keyword Search) ---
def perform_hybrid_rag_search(query: str, k: int = 2) -> List[dict]:
    """
    Performs a hybrid RAG search by combining semantic and keyword search.

    This function first performs a semantic search (vector similarity) and a keyword search
    separately. It then merges and reranks the results to prioritize documents that
    are relevant in both contexts, providing a more robust retrieval.
    """
    print("Performing hybrid RAG search (vector + keyword)...")

    # 1. Perform a semantic (vector) search
    # Generate the embedding for the query
    query_vector = embedding_model.encode(query).tolist()
    semantic_results = qdrant_client.query_points(
        collection_name=COLLECTION_NAME,
        query=query_vector,
        limit=k*2  # Retrieve more candidates for better fusion
    )

    # 2. Perform a keyword search using Qdrant's 'text_match' filter
    keyword_results = qdrant_client.query_points(
        collection_name=COLLECTION_NAME,
        query_filter=models.Filter(
            should=[
                models.FieldCondition(
                    key="text",
                    match=models.MatchText(text=query)
                )
            ]
        ),
        limit=k*2 # Also retrieve more candidates
    )

    # 3. Merge and re-rank the results
    # A dictionary to store unique document IDs and their combined scores
    combined_scores = {}

    # Combine semantic results
    for rank, point in enumerate(semantic_results.points):
        if point.id not in combined_scores:
            combined_scores[point.id] = {"score": 0, "payload": point.payload}
        # Give higher scores to top-ranked semantic results.
        # We can use a simple inverse rank or a more complex fusion algorithm.
        combined_scores[point.id]["score"] += (k * 2) - rank

    # Combine keyword results
    for rank, point in enumerate(keyword_results.points):
        if point.id not in combined_scores:
            combined_scores[point.id] = {"score": 0, "payload": point.payload}
        # Give a bonus for appearing in both search types
        if combined_scores[point.id]["score"] > 0:
             combined_scores[point.id]["score"] += (k * 2) - rank + 5
        else:
             combined_scores[point.id]["score"] += (k * 2) - rank


    # Sort the combined results by their new fused score
    reranked_results = sorted(combined_scores.values(), key=lambda x: x["score"], reverse=True)

    # Return the top-k final results
    retrieved_docs = [item["payload"] for item in reranked_results][:k]
    return retrieved_docs

In [32]:
# --- Example Usage of Hybrid RAG Search ---
test_query = "What is the age limit for policy holders?"
print(f"--- Performing Hybrid RAG Search for: '{test_query}' ---")
hybrid_results = perform_hybrid_rag_search(test_query, k=2)

print(f"\n--- Top 2 Hybrid Search Results ---")
for i, result in enumerate(hybrid_results):
    print(f"\nResult {i+1}:")
    print(f"  Doc ID: {result['doc_id']}")
    print(f"  Page: {result['page']}")
    print(f"  Content: \"{result['content'][:500]}...\"")

--- Performing Hybrid RAG Search for: 'What is the age limit for policy holders?' ---
Performing hybrid RAG search (vector + keyword)...

--- Top 2 Hybrid Search Results ---

Result 1:
  Doc ID: CHOTGDP23004V012223
  Page: 8
  Content: "appropriate premium paid.

Entry age for the member should be between 03 months to 90 years (completed age).
3. COVERAGE - BASE COVERS:
The Policy provides the following Base Covers. It is mandatory for the proposer/Insured to avail the Base Cover to
be eligible for taking this Policy from Cholamandalam MS General Insurance Company Limited. Various Base and
Optional Covers applicable for the Insured under this policy is as shown in the Policy Schedule/Certificate...."

Result 2:
  Doc ID: HDFHLIP23024V072223
  Page: 8
  Content: "have his / her independent sources of income. Children Aged between 1 to 90 Days can be
covered if Newborn Baby Benefit is added by payment of additional premium subject to policy
terms and conditions.
Def. 9.
Family Floater m

In [33]:
from sentence_transformers.cross_encoder import CrossEncoder
# Get or create a CrossEncoder model for re-ranking
def get_reranking_model():
    """
    Loads a Cross-Encoder model specifically for re-ranking documents.
    """
    try:
        reranking_model = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')
        print("Re-ranking model loaded successfully.")
        return reranking_model
    except Exception as e:
        print(f"Error loading CrossEncoder model: {e}")
        return None

In [34]:
# --- Re-ranking RAG Pipeline ---
def perform_rerank_rag_search(query: str, k: int = 2, rerank_candidates: int = 100) -> List[dict]:
    """
    Performs a RAG search with a dedicated re-ranking model.

    This function first performs a standard semantic search to retrieve a large
    pool of candidate documents. It then uses a Cross-Encoder model to re-rank
    these documents for better relevance before selecting the top-k results.
    """
    print(f"Performing re-ranking RAG search (vector + cross-encoder)...")

    # 1. Initial retrieval using a standard vector search to get many candidates
    query_vector = embedding_model.encode(query).tolist()
    initial_search_result = qdrant_client.query_points(
        collection_name=COLLECTION_NAME,
        query=query_vector,
        limit=rerank_candidates
    )

    # Check if any documents were retrieved
    if not initial_search_result.points:
        print("No documents found for re-ranking.")
        return []

    # 2. Prepare documents for re-ranking
    # The Cross-Encoder model expects a list of tuples: (query, document_text)
    candidate_docs_content = [point.payload["content"] for point in initial_search_result.points]
    sentences_to_rerank = [[query, doc_content] for doc_content in candidate_docs_content]

    # 3. Use the Cross-Encoder model to re-rank the documents
    # This generates a new, more accurate relevance score for each candidate document.
    print(f"Re-ranking {len(candidate_docs_content)} candidates...")
    reranked_scores = reranking_model.predict(sentences_to_rerank)

    # 4. Combine initial documents with their new scores
    reranked_docs_with_scores = []
    for i, score in enumerate(reranked_scores):
        reranked_docs_with_scores.append({
            "score": score,
            "payload": initial_search_result.points[i].payload
        })

    # 5. Sort by the new re-ranking score and get the top-k results
    reranked_docs_with_scores.sort(key=lambda x: x["score"], reverse=True)

    retrieved_docs = [item["payload"] for item in reranked_docs_with_scores][:k]
    return retrieved_docs

In [35]:
reranking_model = get_reranking_model()
user_query = "What are the cancellation policies for my health insurance?"
rerank_results = perform_rerank_rag_search(user_query, k=2)
print("--- Re-ranking RAG Results ---")
for i, result in enumerate(rerank_results):
    print(f"\nResult {i+1}:")
    print(f"  Doc ID: {result['doc_id']}")
    print(f"  Page: {result['page']}")
    print(f"  Content: \"{result['content']}\"")

config.json:   0%|          | 0.00/794 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json: 0.00B [00:00, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/132 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

Re-ranking model loaded successfully.
Performing re-ranking RAG search (vector + cross-encoder)...
Re-ranking 100 candidates...
--- Re-ranking RAG Results ---

Result 1:
  Doc ID: BAJHLIP23020V012223
  Page: 34
  Content: "Policy, under the circumstances described in cancellation clause stated under the Policy
iii. If a claim is made in any year where a cumulative increase has been applied, then the increased Limit of
Indemnity in the Policy Period of the subsequent Global Health Care Policy shall be reduced by 20%, save that
the limit of indemnity applicable shall be preserved.
30. Changing country of residence
It is important to let Us know when You change Your country of residence. This may affect  Your cover, the
availability of the services included in  Your plan or  Your premium, even if You are moving to an area within  Your
Network, as  Your existing plan may not be valid there. Cover in some countries is subject to local health insurance
restrictions, particularly for resident

In [36]:
import requests
from google.colab import userdata
# --- Query Transformation RAG Pipeline (Vector Search Only) ---
def transform_query_with_llm(original_query: str) -> List[str]:
    """
    Transforms a complex user query into one or more sub-queries
    that are better optimized for retrieval, using the Together AI API.

    Args:
        original_query (str): The initial query from the user.

    Returns:
        List[str]: A list of transformed queries.
    """
    """
    Transforms a complex user query into one or more sub-queries
    that are better optimized for retrieval, using the Gemini API.

    Args:
        original_query (str): The initial query from the user.

    Returns:
        List[str]: A list of transformed queries.
    """
    print(f"Transforming query with Gemini LLM: '{original_query}'")

    # IMPORTANT: Replace with your actual Gemini API Key
    GEMINI_API_KEY = userdata.get('GEMINI_API_KEY') # Use userdata.get
    if not GEMINI_API_KEY:
        print("WARNING: Gemini API Key is not set in Colab secrets. Using original query.")
        return [original_query]

    # The API endpoint and model to use
    API_URL = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash-preview-05-20:generateContent?key={GEMINI_API_KEY}"

    # The prompt for the LLM to generate sub-queries
    prompt = f"""You are a helpful assistant for a document retrieval system.
A user has provided a complex query. Your task is to break it down into a list of simple,
atomic sub-queries that are optimized for finding relevant documents in a vector store.

The user's original query is: '{original_query}'

Example:
Original Query: 'How does the cancellation policy for a 1-year plan differ from a 3-year plan?'
Sub-queries:
- What is the cancellation policy for a 1-year health insurance plan?
- What is the cancellation policy for a 3-year health insurance plan?

Please respond with ONLY the list of sub-queries, each on a new line,
with a hyphen prefix, like the example above. Do not include any other text."""

    headers = {
        "Content-Type": "application/json"
    }

    payload = {
        "contents": [{"role": "user", "parts": [{"text": prompt}]}]
    }

    try:
        response = requests.post(API_URL, headers=headers, data=json.dumps(payload))
        response.raise_for_status()  # Raise an exception for bad status codes

        response_data = response.json()
        llm_output = response_data['candidates'][0]['content']['parts'][0]['text']

        # Parse the output to get a list of sub-queries
        sub_queries = [line.strip('- ').strip() for line in llm_output.split('\n') if line.strip()]

        # Fallback if parsing fails or LLM output is not as expected
        if not sub_queries:
            return [original_query]

        return sub_queries

    except requests.exceptions.RequestException as e:
        print(f"Error calling Gemini API: {e}")
        return [original_query]

In [37]:
def perform_query_transformation_rag_search(query: str, k: int = 2) -> List[Dict]:
    """
    Performs a RAG search by first transforming the user's query into
    multiple sub-queries and then retrieving documents for each.
    """
    print(f"Performing query transformation RAG search...")

    # 1. Transform the original query into sub-queries using the LLM
    sub_queries = transform_query_with_llm(query)

    all_retrieved_docs: List[Dict] = []
    seen_doc_ids: Set[Any] = set()

    # 2. Perform a retrieval for each sub-query
    for sub_query in sub_queries:
        print(f"  Retrieving documents for sub-query: '{sub_query}'")
        # Use a standard search for each sub-query for simplicity
        sub_query_results = perform_standard_rag_search(sub_query, k=k)

        # 3. Add unique results to the main list
        for doc in sub_query_results:
            doc_id = doc.get("doc_id")
            if doc_id not in seen_doc_ids:
                all_retrieved_docs.append(doc)
                seen_doc_ids.add(doc_id)

    # Note: You may want to implement a re-ranking step here to sort the final list.
    print(f"Found {len(all_retrieved_docs)} unique documents from transformed queries.")
    return all_retrieved_docs

In [38]:
user_query_transform = "How does the cancellation policy for a 1-year plan differ from a 3-year plan?"
transform_results = perform_query_transformation_rag_search(user_query_transform, k=4)
print("--- Query Transformation RAG Results ---")
for i, result in enumerate(transform_results):
    print(f"\nResult {i+1}:")
    print(f"  Doc ID: {result['doc_id']}")
    print(f"  Page: {result['page']}")
    print(f"  Content: \"{result['content'][:200]}...\"")

Performing query transformation RAG search...
Transforming query with Gemini LLM: 'How does the cancellation policy for a 1-year plan differ from a 3-year plan?'
  Retrieving documents for sub-query: 'What is the cancellation policy for a 1-year health insurance plan?'
Performing standard RAG search (vector similarity only)...
  Retrieving documents for sub-query: 'What is the cancellation policy for a 3-year health insurance plan?'
Performing standard RAG search (vector similarity only)...
Found 4 unique documents from transformed queries.
--- Query Transformation RAG Results ---

Result 1:
  Doc ID: ICIHLIP22012V012223
  Page: 22
  Content: "knowledge of the insurer.
7. Cancellation
a) The policyholder may cancel this Policy by giving
15 days' written notice, and in such an event, the
Company shall refund premium for the unexpired
Policy ..."

Result 2:
  Doc ID: HDFHLIP23024V072223
  Page: 22
  Content: "insured person.
i.
The Company shall endeavor to give notice for Renewal. Howev

In [39]:
# --- Step-Back RAG Pipeline ---
from google.colab import userdata # Added import

def generate_step_back_query_with_llm(original_query: str) -> str:
    """
    Transforms a detailed, specific query into a broader, conceptual "step-back"
    question using the Gemini API.

    Args:
        original_query (str): The initial detailed query from the user.

    Returns:
        str: A single, more general "step-back" question.
    """
    print(f"Generating step-back query with Gemini LLM: '{original_query}'")

    GEMINI_API_KEY = userdata.get('GEMINI_API_KEY') # Use userdata.get
    if not GEMINI_API_KEY:
        print("WARNING: Gemini API Key is not set in Colab secrets. Using original query.")
        return original_query

    API_URL = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash-preview-05-20:generateContent?key={GEMINI_API_KEY}"

    # The prompt for the LLM to generate a step-back query
    prompt = f"""You are a helpful assistant for a document retrieval system.
A user has provided a detailed query. Your task is to generate a single,
more general or conceptual question that a person would need to answer before they could
answer the detailed query. This is a 'step-back' question.

Original Query: '{original_query}'

Example:
Original Query: 'How much does the average family floater plan cost?'
Step-Back Question: 'What is a family floater plan?'

Please respond with ONLY the step-back question, without any other text."""

    headers = {
        "Content-Type": "application/json"
    }

    payload = {
        "contents": [{"role": "user", "parts": [{"text": prompt}]}]
    }

    try:
        response = requests.post(API_URL, headers=headers, data=json.dumps(payload))
        response.raise_for_status()

        response_data = response.json()
        step_back_query = response_data['candidates'][0]['content']['parts'][0]['text']

        return step_back_query.strip()

    except requests.exceptions.RequestException as e:
        print(f"Error calling Gemini API for step-back query: {e}")
        return original_query

In [40]:
def perform_step_back_rag_search(query: str, k: int = 2) -> List[Dict]:
    """
    Performs a RAG search using the Step-Back strategy.
    It retrieves documents for both the original query and a more general,
    LLM-generated 'step-back' question.
    """
    print(f"Performing Step-Back RAG search...")

    # 1. Generate a step-back query using the LLM
    step_back_query = generate_step_back_query_with_llm(query)

    # 2. Retrieve documents for the original query
    print(f"  Retrieving documents for original query: '{query}'")
    original_query_results = perform_standard_rag_search(query, k=k)

    # 3. Retrieve documents for the step-back query
    print(f"  Retrieving documents for step-back query: '{step_back_query}'")
    step_back_query_results = perform_standard_rag_search(step_back_query, k=k)

    all_retrieved_docs: List[Dict] = []
    seen_doc_ids: Set[Any] = set()

    # 4. Merge results, prioritizing the original query's results
    for doc in original_query_results:
        doc_id = doc.get("doc_id")
        if doc_id not in seen_doc_ids:
            all_retrieved_docs.append(doc)
            seen_doc_ids.add(doc_id)

    for doc in step_back_query_results:
        doc_id = doc.get("doc_id")
        if doc_id not in seen_doc_ids:
            all_retrieved_docs.append(doc)
            seen_doc_ids.add(doc_id)

    print(f"Found {len(all_retrieved_docs)} unique documents from original and step-back queries.")
    return all_retrieved_docs

In [41]:
user_query_step_back = "Is there a waiting period before I can claim for cataracts?"
step_back_results = perform_step_back_rag_search(user_query_step_back, k=2)
print("--- Step-Back RAG Results ---")
for i, result in enumerate(step_back_results):
    print(f"\nResult {i+1}:")
    print(f"  Doc ID: {result['doc_id']}")
    print(f"  Page: {result['page']}")
    print(f"  Content: \"{result['content'][:200]}...\"")

Performing Step-Back RAG search...
Generating step-back query with Gemini LLM: 'Is there a waiting period before I can claim for cataracts?'
  Retrieving documents for original query: 'Is there a waiting period before I can claim for cataracts?'
Performing standard RAG search (vector similarity only)...
  Retrieving documents for step-back query: 'Are cataracts covered by insurance?'
Performing standard RAG search (vector similarity only)...
Found 3 unique documents from original and step-back queries.
--- Step-Back RAG Results ---

Result 1:
  Doc ID: BAJHLIP23020V012223
  Page: 37
  Content: "make sure You keep the originals. We have the right to request original supporting documents/receipts for
auditing purposes up to 12 months after settling  Your claim. We may also request proof of pay..."

Result 2:
  Doc ID: ICIHLIP22012V012223
  Page: 29
  Content: "period as specified above, the interest amount
calculated will be on the net sanctioned amount of
respective transaction and not 

In [42]:
# --- Hypothetical Document Embeddings (HyDE) RAG Pipeline ---
from google.colab import userdata # Added import

def generate_hypothetical_document_with_llm(original_query: str) -> str:
    """
    Uses the Gemini API to generate a hypothetical document that would answer
    the user's query. This document's embedding is used for retrieval.

    Args:
        original_query (str): The initial query from the user.

    Returns:
        str: A single, plausible hypothetical document.
    """
    print(f"Generating hypothetical document with Gemini LLM: '{original_query}'")

    GEMINI_API_KEY = userdata.get('GEMINI_API_KEY') # Use userdata.get
    if not GEMINI_API_KEY:
        print("WARNING: Gemini API Key is not set in Colab secrets. Cannot generate hypothetical document. Using original query.")
        return original_query

    API_URL = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash-preview-05-20:generateContent?key={GEMINI_API_KEY}"

    # The prompt for the LLM to generate a hypothetical document
    prompt = f"""You are an expert at writing documents. You will be given a user query, and your task is
to write a short, plausible, and well-structured document that could answer the query.
Do not state that you are a language model or refer to the prompt. Just provide the document text.

User Query: '{original_query}'

Example:
User Query: 'What is the waiting period for cataracts?'
Hypothetical Document: 'The waiting period for cataract surgery under a health insurance policy is typically 24 months. This means you must have held the policy for at least two years before you can make a claim for this specific procedure. However, this period can vary depending on the specific policy, with some plans having a shorter waiting period. It is essential to check your policy documents for the exact terms and conditions related to pre-existing diseases and specific treatments like cataracts.'

Please provide the hypothetical document for the user's query."""

    headers = {
        "Content-Type": "application/json"
    }

    payload = {
        "contents": [{"role": "user", "parts": [{"text": prompt}]}]
    }

    try:
        response = requests.post(API_URL, headers=headers, data=json.dumps(payload))
        response.raise_for_status()

        response_data = response.json()
        hypothetical_document = response_data['candidates'][0]['content']['parts'][0]['text']

        return hypothetical_document.strip()

    except requests.exceptions.RequestException as e:
        print(f"Error calling Gemini API for hypothetical document: {e}")
        return original_query

In [43]:
def perform_hyde_rag_search(query: str, k: int = 2) -> List[Dict]:
    """
    Performs a RAG search using the Hypothetical Document Embeddings (HyDE) strategy.
    It generates a hypothetical document, embeds it, and then uses that embedding
    for a standard vector search.
    """
    print(f"Performing HyDE RAG search...")

    # 1. Generate a hypothetical document using the LLM
    hypothetical_document = generate_hypothetical_document_with_llm(query)

    # 2. Embed the hypothetical document
    print(f"  Embedding hypothetical document...")
    hypothetical_document_vector = embedding_model.encode(hypothetical_document).tolist()

    # 3. Use the hypothetical document's embedding to search for real documents
    search_result = qdrant_client.query_points(
        collection_name=COLLECTION_NAME,
        query=hypothetical_document_vector, # Ensure this is 'query', not 'query_vector'
        limit=k,
        with_payload=True, # Ensure payload is returned
        with_vectors=False # No need to return vectors
    )

    retrieved_docs = [point.payload for point in search_result.points]
    print(f"Found {len(retrieved_docs)} unique documents using HyDE.")
    return retrieved_docs

In [44]:
user_query_hyde = "What is the waiting period for cataracts?"
hyde_results = perform_hyde_rag_search(user_query_hyde, k=2)
print("--- HyDE RAG Results ---")
for i, result in enumerate(hyde_results):
    print(f"\nResult {i+1}:")
    print(f"  Doc ID: {result['doc_id']}")
    print(f"  Page: {result['page']}")
    print(f"  Content: \"{result['content'][:200]}...\"")

Performing HyDE RAG search...
Generating hypothetical document with Gemini LLM: 'What is the waiting period for cataracts?'
  Embedding hypothetical document...
Found 2 unique documents using HyDE.
--- HyDE RAG Results ---

Result 1:
  Doc ID: BAJHLIP23020V012223
  Page: 20
  Content: "excluded until the expiry of 36 months of continuous coverage after the date of inception of the first Global
Health Care Policy with Us.
b. In case of enhancement of Sum Insured the exclusion shall a..."

Result 2:
  Doc ID: BAJHLIP23020V012223
  Page: 24
  Content: "1) Pre-Existing Diseases (Code-Excl01)
a. Expenses related to the treatment of a Pre-Existing Disease (PED) and its direct complications shall be
excluded until the expiry of 36 months of continuous c..."


In [45]:
# --- FLARE (Forward-Looking Active REtrieval) RAG Pipeline ---
from google.colab import userdata # Added import

def generate_flare_subqueries_with_llm(query: str) -> List[str]:
    """
    Generates a list of forward-looking sub-queries from a user's initial query.
    This simulates the dynamic retrieval part of the FLARE strategy.

    Args:
        query (str): The initial query from the user.

    Returns:
        List[str]: A list of sub-queries for retrieval.
    """
    print(f"Generating FLARE sub-queries for: '{query}'")

    GEMINI_API_KEY = userdata.get('GEMINI_API_KEY') # Use userdata.get
    if not GEMINI_API_KEY:
        print("WARNING: Gemini API Key is not set in Colab secrets. Using original query.")
        return [query]

    API_URL = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash-preview-05-20:generateContent?key={GEMINI_API_KEY}"

    # The prompt for the LLM to generate forward-looking sub-queries
    # This prompt simulates the LLM "looking ahead" for information it might need.
    prompt = f"""You are an advanced retrieval assistant. Given a user's complex query, your task is to
anticipate the key facts and concepts needed to answer it thoroughly. Generate a list of
concise retrieval queries that would help a language model build a complete response.

User Query: '{query}'

Example:
User Query: 'How can I renew my policy, and what are the options for increasing my sum insured during renewal?'
Anticipated Retrieval Queries:
- How to renew a policy?
- What are the options for increasing sum insured at renewal?
- What is the process for policy renewal?

Please provide a list of anticipated retrieval queries, with each on a new line and a hyphen prefix.
Do not include any other text."""

    headers = {
        "Content-Type": "application/json"
    }

    payload = {
        "contents": [{"role": "user", "parts": [{"text": prompt}]}]
    }

    try:
        response = requests.post(API_URL, headers=headers, data=json.dumps(payload))
        response.raise_for_status()

        response_data = response.json()
        llm_output = response_data['candidates'][0]['content']['parts'][0]['text']

        sub_queries = [line.strip('- ').strip() for line in llm_output.split('\n') if line.strip()]

        if not sub_queries:
            return [query]

        return sub_queries

    except requests.exceptions.RequestException as e:
        print(f"Error calling Gemini API for FLARE sub-queries: {e}")
        return [query]

In [46]:
def perform_flare_rag_search(query: str, k: int = 2) -> List[Dict]:
    """
    Performs a simulated FLARE RAG search by first generating forward-looking
    sub-queries and then retrieving documents for each.
    """
    print(f"Performing FLARE RAG search...")

    # 1. Generate forward-looking sub-queries using the LLM
    sub_queries = generate_flare_subqueries_with_llm(query)

    all_retrieved_docs: List[Dict] = []
    seen_doc_ids: Set[Any] = set()

    # 2. Perform a retrieval for each sub-query
    for sub_query in sub_queries:
        print(f"  Retrieving documents for FLARE sub-query: '{sub_query}'")
        # Use a standard search for each sub-query
        sub_query_results = perform_standard_rag_search(sub_query, k=k)

        # 3. Add unique results to the main list
        for doc in sub_query_results:
            doc_id = doc.get("doc_id")
            if doc_id not in seen_doc_ids:
                all_retrieved_docs.append(doc)
                seen_doc_ids.add(doc_id)

    print(f"Found {len(all_retrieved_docs)} unique documents from FLARE queries.")
    return all_retrieved_docs

In [47]:
user_query_flare = "How can I renew my policy, and what are the options for increasing my sum insured?"
flare_results = perform_flare_rag_search(user_query_flare, k=2)
print("--- FLARE RAG Results ---")
for i, result in enumerate(flare_results):
    print(f"\nResult {i+1}:")
    print(f"  Doc ID: {result['doc_id']}")
    print(f"  Page: {result['page']}")
    print(f"  Content: \"{result['content'][:200]}...\"")

Performing FLARE RAG search...
Generating FLARE sub-queries for: 'How can I renew my policy, and what are the options for increasing my sum insured?'
  Retrieving documents for FLARE sub-query: 'How to renew an insurance policy?'
Performing standard RAG search (vector similarity only)...
  Retrieving documents for FLARE sub-query: 'Policy renewal process steps.'
Performing standard RAG search (vector similarity only)...
  Retrieving documents for FLARE sub-query: 'Options for increasing sum insured.'
Performing standard RAG search (vector similarity only)...
  Retrieving documents for FLARE sub-query: 'How to increase sum insured on an existing policy?'
Performing standard RAG search (vector similarity only)...
  Retrieving documents for FLARE sub-query: 'Can sum insured be increased during policy renewal?'
Performing standard RAG search (vector similarity only)...
Found 4 unique documents from FLARE queries.
--- FLARE RAG Results ---

Result 1:
  Doc ID: HDFHLIP23024V072223
  Page: 29

In [48]:
# --- Self-RAG Pipeline ---
from google.colab import userdata # Added import

def generate_self_rag_critique_with_llm(query: str, retrieved_docs: List[Dict], generated_text: str) -> Dict[str, Any]:
    """
    Simulates the LLM's self-critique process in Self-RAG. It evaluates the
    relevance of retrieved documents and the factual accuracy of a generated text.

    Args:
        query (str): The original user query.
        retrieved_docs (List[Dict]): The list of retrieved document chunks.
        generated_text (str): The text generated by the LLM so far.

    Returns:
        Dict[str, Any]: A dictionary with critique and a flag for whether to continue.
    """
    print("Generating Self-RAG critique with LLM...")
    GEMINI_API_KEY = userdata.get('GEMINI_API_KEY') # Use userdata.get
    if not GEMINI_API_KEY:
        print("WARNING: Gemini API Key is not set in Colab secrets. Assuming critique is positive.")
        return {"critique": "Documents are relevant and response is factual.", "continue_retrieval": False}

    API_URL = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash-preview-05-20:generateContent?key={GEMINI_API_KEY}"

    retrieved_content = "\n---\n".join([doc['content'] for doc in retrieved_docs])

    prompt = f"""You are a highly analytical and critical assistant for a Self-RAG system.
Your task is to evaluate a set of retrieved documents and a generated response
based on a user's query.

User Query: '{query}'
Retrieved Documents:
{retrieved_content}

Generated Response (so far): '{generated_text}'

Based on the above, provide a critique and a flag for whether more retrieval is needed.
Critique should be a string, and 'continue_retrieval' should be a boolean.
If the documents seem irrelevant or the generated text is not fully supported by the documents,
set 'continue_retrieval' to True. If the documents are sufficient and the generated text is
factual based on the documents, set it to False.

Respond in JSON format only with the following keys:
"critique": A string explaining your judgment.
"continue_retrieval": A boolean (True or False).
"""

    headers = {
        "Content-Type": "application/json"
    }

    payload = {
        "contents": [{"role": "user", "parts": [{"text": prompt}]}]
    }

    try:
        response = requests.post(API_URL, headers=headers, data=json.dumps(payload))
        response.raise_for_status()

        response_data = response.json()
        llm_output_text = response_data['candidates'][0]['content']['parts'][0]['text']

        try:
            # The LLM's response should be a JSON string
            critique_data = json.loads(llm_output_text)
            return critique_data
        except json.JSONDecodeError:
            print(f"Error parsing JSON from LLM: {llm_output_text}. Assuming positive critique.")
            return {"critique": "JSON parsing failed, but assuming critique is positive.", "continue_retrieval": False}

    except requests.exceptions.RequestException as e:
        print(f"Error calling Gemini API for Self-RAG critique: {e}. Assuming positive critique.")
        return {"critique": f"API call failed: {e}", "continue_retrieval": False}

In [49]:
def perform_self_rag_search(query: str, k: int = 2, max_retrieval_steps: int = 3) -> List[Dict]:
    """
    Performs a simulated Self-RAG search, where the LLM dynamically
    decides whether to retrieve more information.
    """
    print("Performing Self-RAG search...")

    retrieved_docs: List[Dict] = []
    generated_text: str = ""
    seen_doc_ids: Set[Any] = set()

    for step in range(max_retrieval_steps):
        # 1. Generate retrieval cues and perform retrieval
        print(f"  Step {step + 1}: Generating retrieval cues and retrieving...")

        # A simplified approach: Use the original query for the first step, then
        # use a more refined query or the generated text as a retrieval cue.
        retrieval_query = query if step == 0 else generated_text

        # Use a hybrid search for more robust retrieval
        newly_retrieved_docs = perform_hybrid_rag_search(retrieval_query, k=k)

        for doc in newly_retrieved_docs:
            doc_id = doc.get("doc_id")
            if doc_id not in seen_doc_ids:
                retrieved_docs.append(doc)
                seen_doc_ids.add(doc_id)

        # 2. Simulate LLM generation with current context
        # For this simulation, we'll just concatenate the content.
        # In a real-world scenario, you'd feed this to the LLM.
        current_context = "\n---\n".join([doc['content'] for doc in retrieved_docs])
        generated_text = f"Based on the provided documents, here is the information:\n{current_context}"

        # 3. Perform Self-Critique
        critique_result = generate_self_rag_critique_with_llm(query, retrieved_docs, generated_text)
        print(f"  Critique: {critique_result['critique']}")

        # 4. Decide whether to continue
        if not critique_result["continue_retrieval"]:
            print("  Critique is positive. Stopping retrieval.")
            break
        else:
            print("  Critique indicates more retrieval is needed. Continuing.")

    return retrieved_docs

In [50]:
user_query_self_rag = "Can you provide details on what constitutes a 'pre-existing condition' and how it affects my claim eligibility?"
self_rag_results = perform_self_rag_search(user_query_self_rag, k=2)
print("--- Self-RAG Results ---")
for i, result in enumerate(self_rag_results):
    print(f"\nResult {i+1}:")
    print(f"  Doc ID: {result['doc_id']}")
    print(f"  Page: {result['page']}")
    print(f"  Content: \"{result['content'][:200]}...\"")

Performing Self-RAG search...
  Step 1: Generating retrieval cues and retrieving...
Performing hybrid RAG search (vector + keyword)...
Generating Self-RAG critique with LLM...
Error parsing JSON from LLM: ```json
{
  "critique": "The retrieved documents include a clear definition of 'Pre-existing Disease' (Def. 36) which directly answers the first part of the user's query. However, the documents do not provide details on 'how it affects my claim eligibility'. The second document snippet is largely irrelevant to pre-existing conditions and claim eligibility related to them, focusing instead on general claim procedures and pre-approval for various treatments. The generated response is a verbatim copy of the retrieved documents, which, while containing the definition, fails to synthesize the information or address the second part of the query. More information is needed to fully answer the 'how it affects my claim eligibility' aspect.",
  "continue_retrieval": true
}
```. Assuming positiv