# Creating Data Directories

In [1]:
# =============================================================================
# CELL 1: Basic Setup, Installation, and Folder Creation
# RAG Chatbot - JioPay Customer Support Chunking Implementation
# =============================================================================

# # Install required packages
# !pip install -q transformers torch sentence-transformers
# !pip install -q tiktoken google-generativeai
# !pip install -q beautifulsoup4 requests
# !pip install -q nltk spacy
# !pip install -q scikit-learn numpy pandas
# !pip install -q langchain langchain-google-genai

# # Download required models/data
# import nltk
# nltk.download('punkt', quiet=True)
# nltk.download('stopwords', quiet=True)

# # Download spacy model for text processing
# !python -m spacy download en_core_web_sm --quiet

# Create directory structure
import os
from pathlib import Path

# Create main directories
directories = [
    'data',
    'data/raw',           # Original scraped data
    'data/processed',     # Cleaned data
    'data/chunks',        # All chunking results
    'data/chunks/fixed',  # Fixed chunking results
    'data/chunks/semantic',   # Semantic chunking results
    'data/chunks/structural', # Structural chunking results
    'data/chunks/recursive',  # Recursive chunking results
    'data/chunks/llm_based',  # LLM-based chunking results
    'results',
    'results/ablations',  # Ablation study results
    'results/metrics',    # Evaluation metrics
    'models',            # Downloaded/cached models
    'config'             # Configuration files
]

# exist_ok=True keeps this loop idempotent, so re-running the cell is safe.
for directory in directories:
    os.makedirs(directory, exist_ok=True)

print("📁 Directory structure created:")
for directory in directories:
    print(f"   {directory}/")

# Verify setup.
# The install/download commands at the top of this cell are commented out, so
# this cell installs nothing. Success is therefore reported only after the
# imports below have actually succeeded, never unconditionally.
# NOTE: only importability is checked; the NLTK 'punkt' data and the spaCy
# 'en_core_web_sm' model are not verified here.
print("\n🔍 Verifying installation...")
try:
    import transformers
    import tiktoken
    import nltk
    import spacy
    import pandas as pd
    import numpy as np
    import google.generativeai as genai
except ImportError as e:
    print(f"❌ Import error: {e}")
    print("\n⚠️ Setup incomplete: install the missing package and re-run this cell.")
else:
    print("✅ All core libraries imported successfully!")
    print("\n🚀 Setup complete! Ready for chunking implementation.")

✅ All packages installed successfully!
✅ Required models downloaded!
📁 Directory structure created:
   data/
   data/raw/
   data/processed/
   data/chunks/
   data/chunks/fixed/
   data/chunks/semantic/
   data/chunks/structural/
   data/chunks/recursive/
   data/chunks/llm_based/
   results/
   results/ablations/
   results/metrics/
   models/
   config/

🔍 Verifying installation...


  from .autonotebook import tqdm as notebook_tqdm


✅ All core libraries imported successfully!

🚀 Setup complete! Ready for chunking implementation.


# Fixed Chunking

In [2]:
# =============================================================================
# CELL 2: Fixed Chunking Implementation
# Strategy 1: Fixed-size chunks with specified token sizes and overlaps
# =============================================================================

import json
import tiktoken
import pandas as pd
from typing import List, Dict, Tuple
from dataclasses import dataclass
import time

# Initialize tokenizer for accurate token counting
tokenizer = tiktoken.get_encoding("cl100k_base")

@dataclass
class ChunkMetadata:
    """Typed metadata record attached to a single chunk.

    FixedChunker.process_document builds one instance per chunk and stores it
    under the chunk dictionary's 'metadata' key; the fields mirror the flat
    keys of that same dictionary.
    """
    # Chunk identifier; FixedChunker formats it as "<doc url>#chunk_<index>".
    chunk_id: str
    # URL of the document the chunk was cut from.
    source_url: str
    # Title of the source document ('' when the document has no title).
    source_title: str
    # Position of the chunk within its source document (0-based enumerate index).
    chunk_index: int
    # Number of tokens in the chunk text according to the chunker's tokenizer.
    token_count: int
    # Number of characters in the chunk text.
    char_count: int
    # Name of the chunking strategy that produced the chunk (e.g. 'fixed').
    strategy: str
    # Parameters the strategy was run with (e.g. chunk_size and overlap).
    strategy_params: Dict

class FixedChunker:
    """Fixed-size, token-based chunking with optional overlap between chunks.

    Text is encoded with `self.tokenizer`, sliced into windows of `chunk_size`
    tokens, and each window is decoded back to text. Consecutive windows share
    `overlap` tokens.
    """

    def __init__(self):
        # Uses the module-level `tokenizer` created earlier in this cell.
        self.tokenizer = tokenizer

    def count_tokens(self, text: str) -> int:
        """Return the number of tokens `self.tokenizer` produces for `text`."""
        return len(self.tokenizer.encode(text))

    def chunk_by_tokens(self, text: str, chunk_size: int, overlap: int) -> List[str]:
        """
        Split text into fixed-size chunks based on token count.

        Args:
            text: Input text to chunk
            chunk_size: Target size of each chunk in tokens (must be > 0)
            overlap: Number of tokens shared by consecutive chunks (must be >= 0).
                If `overlap >= chunk_size` the window could never advance, so the
                chunker falls back to stepping without overlap.

        Returns:
            List of text chunks. Text that fits in a single chunk is returned
            unchanged as a one-element list; otherwise every chunk is the
            decoded, whitespace-stripped token window.

        Raises:
            ValueError: If `chunk_size` is not positive or `overlap` is negative.
        """
        # Both conditions previously led to an endless loop or silently skipped
        # tokens, so they are rejected up front.
        if chunk_size <= 0:
            raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")
        if overlap < 0:
            raise ValueError(f"overlap must be non-negative, got {overlap}")

        # Encode text to tokens
        tokens = self.tokenizer.encode(text)

        if len(tokens) <= chunk_size:
            return [text]

        chunks = []
        start_idx = 0

        while start_idx < len(tokens):
            # Slice the current window and decode it back to text
            end_idx = min(start_idx + chunk_size, len(tokens))
            chunks.append(self.tokenizer.decode(tokens[start_idx:end_idx]).strip())

            if end_idx >= len(tokens):
                break

            # Step back by `overlap` tokens for the next window. The window must
            # always move forward relative to the *current* start (not merely be
            # positive), otherwise overlap >= chunk_size would loop forever.
            next_start = end_idx - overlap
            if next_start <= start_idx:
                next_start = end_idx
            start_idx = next_start

        return chunks

    def process_document(self, doc: Dict, chunk_size: int, overlap: int) -> List[Dict]:
        """
        Process a single document with fixed chunking.

        Args:
            doc: Document dictionary with 'url' (required), 'title' and 'content'
            chunk_size: Target chunk size in tokens
            overlap: Overlap size in tokens

        Returns:
            List of chunk dictionaries. Each carries the chunk text under
            'content', flat metadata keys, and the same metadata as a
            ChunkMetadata instance under 'metadata'. Empty or whitespace-only
            documents yield an empty list.

        Raises:
            KeyError: If `doc` has no 'url' key.
            ValueError: Propagated from `chunk_by_tokens` for invalid sizes.
        """
        content = doc.get('content', '')
        if not content.strip():
            return []

        chunks = self.chunk_by_tokens(content, chunk_size, overlap)

        chunk_objects = []
        for i, chunk_text in enumerate(chunks):
            chunk_content = chunk_text.strip()
            if not chunk_content:  # Skip empty chunks
                continue

            # Compute each derived value once, and on the stripped text that is
            # actually stored, so the counts always describe 'content'.
            chunk_id = f"{doc['url']}#chunk_{i}"
            source_title = doc.get('title', '')
            token_count = self.count_tokens(chunk_content)
            char_count = len(chunk_content)

            chunk_objects.append({
                'chunk_id': chunk_id,
                'source_url': doc['url'],
                'source_title': source_title,
                'content': chunk_content,
                'chunk_index': i,
                'token_count': token_count,
                'char_count': char_count,
                'strategy': 'fixed',
                'strategy_params': {
                    'chunk_size': chunk_size,
                    'overlap': overlap
                },
                # NOTE(review): this is a dataclass instance, not a dict; the
                # notebook's json.dump(..., default=str) writes it as its repr.
                'metadata': ChunkMetadata(
                    chunk_id=chunk_id,
                    source_url=doc['url'],
                    source_title=source_title,
                    chunk_index=i,
                    token_count=token_count,
                    char_count=char_count,
                    strategy='fixed',
                    strategy_params={'chunk_size': chunk_size, 'overlap': overlap}
                )
            })

        return chunk_objects

def load_scraped_data(path: str = 'final.json') -> List[Dict]:
    """Load scraped JioPay documents from a JSON file.

    The file is expected to hold a JSON array of objects. Each object is
    normalised to carry 'url', 'title' and 'content':

    * objects with a 'content' key keep all their keys; missing 'url'/'title'
      are filled in ('url' -> "unknown", 'title' -> the url);
    * objects with only a 'text' key are converted to url/title/content;
    * anything else (no text field, or not a JSON object) is skipped.

    Args:
        path: Location of the JSON file. Defaults to 'final.json' in the
            current working directory.

    Returns:
        List of normalised document dictionaries, or an empty list when the
        file is missing, is not valid JSON, or cannot be read. Problems are
        reported with a printed message rather than raised.
    """
    try:
        with open(path, 'r', encoding='utf-8') as f:
            scraped_data = json.load(f)

        formatted_data = []
        for item in scraped_data:
            # `'content' in item` on a str would be a substring test, so only
            # JSON objects are considered documents.
            if not isinstance(item, dict):
                continue

            url = item.get("url", "unknown")
            title = item.get("title", url)

            if 'content' in item:
                # Already in the expected shape; only fill in missing url/title
                # because downstream code indexes doc['url'] / doc['title'].
                formatted_item = {**item, "url": url, "title": title}
            elif 'text' in item:
                # Convert 'text' field to 'content' field
                formatted_item = {
                    "url": url,
                    "title": title,
                    "content": item["text"]
                }
            else:
                # Skip items without text content
                continue

            formatted_data.append(formatted_item)

        print(f"✅ Successfully loaded {len(formatted_data)} documents from {path}")
        return formatted_data

    except FileNotFoundError:
        print(f"❌ {path} not found in current directory")
        print(f"📁 Please ensure {path} is in the same directory as this notebook")
        return []
    except json.JSONDecodeError as e:
        print(f"❌ Error parsing {path}: {e}")
        return []
    except Exception as e:
        # Deliberate best-effort: the notebook continues with an empty list.
        print(f"❌ Error loading {path}: {e}")
        return []

# Initialize the fixed chunker
fixed_chunker = FixedChunker()

# Load scraped data from final.json (returns [] and prints a message on failure)
documents = load_scraped_data()

if not documents:
    print("❌ No documents loaded. Please check your final.json file.")
else:
    print(f"📊 Total documents loaded: {len(documents)}")
    print(f"📄 First document: {documents[0]['title']}")
    print(f"📏 Content length: {len(documents[0]['content'])} characters")
    
    # Show total content stats
    total_chars = sum(len(doc['content']) for doc in documents)
    # NOTE: `total_tokens` is reassigned per strategy inside the loop below.
    total_tokens = sum(fixed_chunker.count_tokens(doc['content']) for doc in documents)
    print(f"📈 Total content: {total_chars:,} characters, ~{total_tokens:,} tokens")

# Define the three fixed chunking strategies as specified
# (chunk_size and overlap are both measured in tokens).
strategies = [
    {"chunk_size": 256, "overlap": 0, "name": "Fixed_256_0"},
    {"chunk_size": 512, "overlap": 64, "name": "Fixed_512_64"}, 
    {"chunk_size": 1024, "overlap": 128, "name": "Fixed_1024_128"}
]

# Process documents with each strategy.
# `results` maps strategy name -> chunks plus summary statistics.
# NOTE: everything from here on runs even when `documents` is empty; in that
# case each strategy produces zero chunks and an empty JSON array is written.
results = {}

print("\n🔄 Processing documents with fixed chunking strategies...")
print("=" * 60)

for strategy in strategies:
    strategy_name = strategy["name"]
    chunk_size = strategy["chunk_size"]
    overlap = strategy["overlap"]
    
    print(f"\n📝 Processing: {strategy_name}")
    print(f"   Chunk size: {chunk_size} tokens")
    print(f"   Overlap: {overlap} tokens")
    
    # Wall-clock timing of the chunking loop only (file writing is excluded).
    start_time = time.time()
    all_chunks = []
    
    for doc_idx, doc in enumerate(documents):
        chunks = fixed_chunker.process_document(doc, chunk_size, overlap)
        all_chunks.extend(chunks)
        print(f"   Document {doc_idx + 1}: Generated {len(chunks)} chunks")
    
    processing_time = time.time() - start_time
    
    # Calculate statistics
    total_chunks = len(all_chunks)
    # Sum of per-chunk token counts; overlapping tokens are counted once per
    # chunk they appear in.
    total_tokens = sum(chunk['token_count'] for chunk in all_chunks)
    # Guard against division by zero when no chunks were produced.
    avg_tokens_per_chunk = total_tokens / total_chunks if total_chunks > 0 else 0
    
    results[strategy_name] = {
        'chunks': all_chunks,
        'total_chunks': total_chunks,
        'total_tokens': total_tokens,
        'avg_tokens_per_chunk': avg_tokens_per_chunk,
        'processing_time': processing_time,
        'strategy_params': strategy
    }
    
    print(f"   ✅ Generated {total_chunks} chunks")
    print(f"   ⏱️  Processing time: {processing_time:.2f}s")
    print(f"   📊 Average tokens per chunk: {avg_tokens_per_chunk:.1f}")
    
    # Save chunks to file.
    # default=str is what lets the ChunkMetadata instance under each chunk's
    # 'metadata' key be written: it is serialised as its string representation.
    output_file = f"data/chunks/fixed/{strategy_name}.json"
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(all_chunks, f, indent=2, ensure_ascii=False, default=str)
    print(f"   💾 Saved to: {output_file}")

print("\n" + "=" * 60)
print("✅ Fixed chunking completed for all strategies!")

# Create summary comparison (one row per strategy).
# The average and timing columns are pre-formatted strings, not numbers.
print("\n📈 FIXED CHUNKING SUMMARY:")
print("=" * 80)
summary_df = pd.DataFrame([
    {
        'Strategy': name,
        'Chunk Size': results[name]['strategy_params']['chunk_size'],
        'Overlap': results[name]['strategy_params']['overlap'],
        'Total Chunks': results[name]['total_chunks'],
        'Avg Tokens/Chunk': f"{results[name]['avg_tokens_per_chunk']:.1f}",
        'Processing Time (s)': f"{results[name]['processing_time']:.2f}"
    }
    for name in results.keys()
])

print(summary_df.to_string(index=False))

# Save summary to CSV
summary_df.to_csv('results/ablations/fixed_chunking_summary.csv', index=False)
print(f"\n💾 Summary saved to: results/ablations/fixed_chunking_summary.csv")

# Show sample chunks from each strategy (strategies without chunks are skipped)
print("\n🔍 SAMPLE CHUNKS (first chunk from each strategy):")
print("=" * 80)
for strategy_name, result in results.items():
    if result['chunks']:
        sample_chunk = result['chunks'][0]
        print(f"\n📄 {strategy_name}:")
        print(f"   Tokens: {sample_chunk['token_count']}")
        # Preview is limited to the first 200 characters of the chunk.
        print(f"   Content preview: {sample_chunk['content'][:200]}...")
        print(f"   Source: {sample_chunk['source_title']}")

✅ Successfully loaded 139 documents from final.json
📊 Total documents loaded: 139
📄 First document: annexure_form_A.pdf
📏 Content length: 4283 characters
📈 Total content: 637,794 characters, ~141,210 tokens

🔄 Processing documents with fixed chunking strategies...

📝 Processing: Fixed_256_0
   Chunk size: 256 tokens
   Overlap: 0 tokens
   Document 1: Generated 4 chunks
   Document 2: Generated 2 chunks
   Document 3: Generated 8 chunks
   Document 4: Generated 20 chunks
   Document 5: Generated 20 chunks
   Document 6: Generated 25 chunks
   Document 7: Generated 22 chunks
   Document 8: Generated 3 chunks
   Document 9: Generated 90 chunks
   Document 10: Generated 5 chunks
   Document 11: Generated 6 chunks
   Document 12: Generated 6 chunks
   Document 13: Generated 31 chunks
   Document 14: Generated 4 chunks
   Document 15: Generated 3 chunks
   Document 16: Generated 2 chunks
   Document 17: Generated 93 chunks
   Document 18: Generated 3 chunks
   Document 19: Generated 4 chunk

# Semantic Chunking

In [3]:
# =============================================================================
# CELL 3: Semantic Chunking Implementation
# Strategy 2: Semantic-based chunking using sentence embeddings and similarity
# =============================================================================

import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import nltk
from nltk.tokenize import sent_tokenize
import json
import time
from typing import List, Dict, Tuple
import pandas as pd

# Ensure NLTK data is available
try:
    nltk.data.find('tokenizers/punkt')
except LookupError:
    nltk.download('punkt', quiet=True)

class SemanticChunker:
    """Semantic chunking driven by sentence-embedding similarity.

    Pipeline (see `semantic_chunk_text`): split into sentences, embed each
    sentence, compute pairwise cosine similarity, start a new chunk wherever
    two adjacent sentences fall below a similarity threshold, then adjust chunk
    sizes (merge small ones, split oversized ones).
    """

    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        """
        Initialize semantic chunker with embedding model

        Args:
            model_name: Sentence transformer model name
        """
        print(f"🔄 Loading embedding model: {model_name}")
        self.embedding_model = SentenceTransformer(model_name)
        self.model_name = model_name
        # Relies on the module-level `tokenizer` defined in the previous cell.
        self.tokenizer = tokenizer  # From previous cell
        print(f"✅ Embedding model loaded successfully")

    def split_into_sentences(self, text: str) -> List[str]:
        """Split text into sentences using NLTK.

        Sentences are whitespace-stripped; any sentence of 10 characters or
        fewer is dropped, so very short sentences do not appear in the output.
        """
        stripped = (sent.strip() for sent in sent_tokenize(text))
        return [sent for sent in stripped if len(sent) > 10]

    def compute_sentence_embeddings(self, sentences: List[str]) -> np.ndarray:
        """Compute embeddings for list of sentences (empty array for no input)"""
        if not sentences:
            return np.array([])
        return self.embedding_model.encode(sentences)

    def compute_similarity_matrix(self, embeddings: np.ndarray) -> np.ndarray:
        """Compute the full pairwise cosine similarity matrix of the embeddings"""
        if len(embeddings) == 0:
            return np.array([])
        return cosine_similarity(embeddings)

    def find_semantic_boundaries(self, similarities: np.ndarray, threshold: float = 0.7) -> List[int]:
        """
        Find semantic boundaries based on similarity drops

        Args:
            similarities: Square pairwise similarity matrix, one row/column per
                sentence. Only the entries [i][i + 1] (each sentence against
                the next one) are read.
            threshold: Similarity threshold below which to create boundaries

        Returns:
            Sentence indices delimiting chunks: starts with 0, ends with the
            number of sentences, with one entry in between for every sentence
            that starts a new chunk. Empty list for fewer than two sentences.
        """
        if len(similarities) <= 1:
            return []

        boundaries = [0]  # Always start with first sentence

        # Look at consecutive sentence similarities
        for i in range(len(similarities) - 1):
            # If similarity to the next sentence drops below threshold, the
            # next sentence opens a new chunk.
            if similarities[i][i + 1] < threshold:
                boundaries.append(i + 1)

        # Close the last chunk at the end of the sentence list
        if boundaries[-1] != len(similarities):
            boundaries.append(len(similarities))

        return boundaries

    def merge_small_chunks(self, chunks: List[str], min_size: int = 100, max_size: int = 1500) -> List[str]:
        """
        Merge chunks that are too small and split chunks that are too large

        A chunk below `min_size` tokens is merged with its immediate successor
        when the merged text stays within `max_size`; otherwise it is kept as
        is. The merge happens at most once per chunk, so the result is not
        re-checked against `min_size`, and a small final chunk is kept
        unchanged. A chunk above `max_size` is split by token count.

        Args:
            chunks: List of text chunks
            min_size: Minimum chunk size in tokens
            max_size: Maximum chunk size in tokens

        Returns:
            List of adjusted chunks
        """
        if not chunks:
            return []

        adjusted_chunks = []
        i = 0

        while i < len(chunks):
            current_chunk = chunks[i]
            current_len = len(self.tokenizer.encode(current_chunk))

            # If chunk is too small, try to merge with next chunk
            if current_len < min_size and i + 1 < len(chunks):
                merged = current_chunk + " " + chunks[i + 1]

                if len(self.tokenizer.encode(merged)) <= max_size:
                    adjusted_chunks.append(merged)
                    i += 2  # Skip next chunk as it's been merged
                else:
                    # Keep original chunk even if small
                    adjusted_chunks.append(current_chunk)
                    i += 1

            # If chunk is too large, split it using fixed chunking
            elif current_len > max_size:
                adjusted_chunks.extend(self.split_large_chunk(current_chunk, max_size))
                i += 1

            else:
                # Chunk size is acceptable
                adjusted_chunks.append(current_chunk)
                i += 1

        return adjusted_chunks

    def split_large_chunk(self, text: str, max_tokens: int) -> List[str]:
        """Split a large chunk into consecutive pieces of at most `max_tokens` tokens.

        Text already within the limit is returned unchanged as a one-element
        list; otherwise each piece is decoded from its token window and
        whitespace-stripped (no overlap between pieces).

        Raises:
            ValueError: If `max_tokens` is not positive (the window could
                never advance).
        """
        if max_tokens <= 0:
            raise ValueError(f"max_tokens must be a positive integer, got {max_tokens}")

        tokens = self.tokenizer.encode(text)
        if len(tokens) <= max_tokens:
            return [text]

        # Use simple token-based splitting for large chunks
        return [
            self.tokenizer.decode(tokens[start:start + max_tokens]).strip()
            for start in range(0, len(tokens), max_tokens)
        ]

    def semantic_chunk_text(self, text: str, similarity_threshold: float = 0.7,
                           min_chunk_size: int = 100, max_chunk_size: int = 1500) -> List[str]:
        """
        Perform semantic chunking on text

        Args:
            text: Input text to chunk
            similarity_threshold: Threshold for semantic similarity
            min_chunk_size: Minimum chunk size in tokens
            max_chunk_size: Maximum chunk size in tokens

        Returns:
            List of semantically coherent chunks; empty list for blank text.
        """
        if not text.strip():
            return []

        # Step 1: Split into sentences
        sentences = self.split_into_sentences(text)
        if len(sentences) <= 1:
            # No usable sentence structure (e.g. tables or one long run of
            # text). The whole text is kept, but it must still respect
            # max_chunk_size like every other chunk, so it is split by tokens
            # when it is too long instead of being returned as one oversized
            # chunk.
            return self.split_large_chunk(text, max_chunk_size)

        # Step 2: Compute sentence embeddings
        embeddings = self.compute_sentence_embeddings(sentences)

        # Step 3: Compute similarity matrix
        similarity_matrix = self.compute_similarity_matrix(embeddings)

        # Step 4: Find semantic boundaries
        boundaries = self.find_semantic_boundaries(similarity_matrix, similarity_threshold)

        # Step 5: Create chunks from each pair of consecutive boundaries
        chunks = []
        for start_idx, end_idx in zip(boundaries, boundaries[1:]):
            chunk_text = " ".join(sentences[start_idx:end_idx]).strip()
            if chunk_text:
                chunks.append(chunk_text)

        # Step 6: Adjust chunk sizes
        return self.merge_small_chunks(chunks, min_chunk_size, max_chunk_size)

    def process_document(self, doc: Dict, similarity_threshold: float = 0.7,
                        min_chunk_size: int = 100, max_chunk_size: int = 1500) -> List[Dict]:
        """
        Process a single document with semantic chunking

        Args:
            doc: Document dictionary with 'url' (required), 'title' and 'content'
            similarity_threshold: Similarity threshold for chunking
            min_chunk_size: Minimum chunk size in tokens
            max_chunk_size: Maximum chunk size in tokens

        Returns:
            List of chunk dictionaries with metadata; empty list when the
            document has no non-blank content.

        Raises:
            KeyError: If `doc` has no 'url' key.
        """
        content = doc.get('content', '')
        if not content.strip():
            return []

        chunks = self.semantic_chunk_text(content, similarity_threshold, min_chunk_size, max_chunk_size)

        chunk_objects = []
        for i, chunk_text in enumerate(chunks):
            chunk_content = chunk_text.strip()
            if not chunk_content:
                continue

            # Counts are taken from the stripped text that is actually stored,
            # so they always describe 'content'.
            chunk_objects.append({
                'chunk_id': f"{doc['url']}#semantic_chunk_{i}",
                'source_url': doc['url'],
                'source_title': doc.get('title', ''),
                'content': chunk_content,
                'chunk_index': i,
                'token_count': len(self.tokenizer.encode(chunk_content)),
                'char_count': len(chunk_content),
                'strategy': 'semantic',
                'strategy_params': {
                    'similarity_threshold': similarity_threshold,
                    'min_chunk_size': min_chunk_size,
                    'max_chunk_size': max_chunk_size,
                    'embedding_model': self.model_name
                }
            })

        return chunk_objects

def load_scraped_data(path: str = 'final.json') -> List[Dict]:
    """Load scraped data from a JSON file (default: final.json).

    NOTE(review): this redefines the `load_scraped_data` declared in the
    fixed-chunking cell and shadows it once this cell has run; consider
    keeping a single definition.

    The file is expected to hold a JSON array of objects. Objects with a
    'content' key keep all their keys, with missing 'url'/'title' filled in
    ('url' -> "unknown", 'title' -> the url). Objects with only a 'text' key
    are converted to url/title/content. Anything else is skipped.

    Args:
        path: Location of the JSON file.

    Returns:
        List of normalised document dictionaries, or an empty list when the
        file is missing or cannot be loaded (a message is printed instead of
        raising).
    """
    try:
        with open(path, 'r', encoding='utf-8') as f:
            scraped_data = json.load(f)

        formatted_data = []
        for item in scraped_data:
            # `'content' in item` on a str would be a substring test, so only
            # JSON objects are considered documents.
            if not isinstance(item, dict):
                continue

            url = item.get("url", "unknown")
            title = item.get("title", url)

            if 'content' in item:
                # Fill in url/title because the chunkers index doc['url'].
                formatted_item = {**item, "url": url, "title": title}
            elif 'text' in item:
                formatted_item = {
                    "url": url,
                    "title": title,
                    "content": item["text"]
                }
            else:
                continue
            formatted_data.append(formatted_item)

        return formatted_data

    except FileNotFoundError:
        print(f"❌ {path} not found")
        return []
    except Exception as e:
        # Deliberate best-effort: the notebook continues with an empty list.
        print(f"❌ Error loading {path}: {e}")
        return []

# Initialize semantic chunker (constructing it loads the embedding model)
print("🚀 Initializing Semantic Chunker...")
semantic_chunker = SemanticChunker(model_name="all-MiniLM-L6-v2")

# Load data (rebinds the notebook-level `documents` variable)
documents = load_scraped_data()

if not documents:
    print("❌ No documents loaded. Please check your final.json file.")
else:
    print(f"\n✅ Loaded {len(documents)} documents for semantic chunking")
    
    # Define semantic chunking configurations to test.
    # min_size / max_size are token limits passed to process_document as
    # min_chunk_size / max_chunk_size.
    semantic_configs = [
        {
            "name": "Semantic_High_Sim", 
            "similarity_threshold": 0.8, 
            "min_size": 100, 
            "max_size": 1200
        },
        {
            "name": "Semantic_Med_Sim", 
            "similarity_threshold": 0.7, 
            "min_size": 150, 
            "max_size": 1500
        },
        {
            "name": "Semantic_Low_Sim", 
            "similarity_threshold": 0.6, 
            "min_size": 200, 
            "max_size": 1800
        }
    ]
    
    # Process documents with each semantic configuration.
    # `semantic_results` maps configuration name -> chunks plus statistics.
    semantic_results = {}
    
    print("\n🔄 Processing documents with semantic chunking strategies...")
    print("=" * 70)
    
    for config in semantic_configs:
        config_name = config["name"]
        threshold = config["similarity_threshold"]
        min_size = config["min_size"]
        max_size = config["max_size"]
        
        print(f"\n📝 Processing: {config_name}")
        print(f"   Similarity threshold: {threshold}")
        print(f"   Min size: {min_size} tokens")
        print(f"   Max size: {max_size} tokens")
        
        # Wall-clock timing of the chunking loop only (file writing excluded).
        # Every document is chunked again from scratch for each configuration.
        start_time = time.time()
        all_chunks = []
        
        for doc_idx, doc in enumerate(documents):
            chunks = semantic_chunker.process_document(
                doc, threshold, min_size, max_size
            )
            all_chunks.extend(chunks)
            print(f"   Document {doc_idx + 1}: Generated {len(chunks)} chunks")
        
        processing_time = time.time() - start_time
        
        # Calculate statistics
        total_chunks = len(all_chunks)
        total_tokens = sum(chunk['token_count'] for chunk in all_chunks)
        # Guard against division by zero when no chunks were produced.
        avg_tokens_per_chunk = total_tokens / total_chunks if total_chunks > 0 else 0
        
        semantic_results[config_name] = {
            'chunks': all_chunks,
            'total_chunks': total_chunks,
            'total_tokens': total_tokens,
            'avg_tokens_per_chunk': avg_tokens_per_chunk,
            'processing_time': processing_time,
            'config': config
        }
        
        print(f"   ✅ Generated {total_chunks} chunks")
        print(f"   ⏱️  Processing time: {processing_time:.2f}s")
        print(f"   📊 Average tokens per chunk: {avg_tokens_per_chunk:.1f}")
        
        # Save chunks to file (default=str stringifies any value the JSON
        # encoder cannot serialise natively)
        output_file = f"data/chunks/semantic/{config_name}.json"
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(all_chunks, f, indent=2, ensure_ascii=False, default=str)
        print(f"   💾 Saved to: {output_file}")
    
    print("\n" + "=" * 70)
    print("✅ Semantic chunking completed for all configurations!")
    
    # Create summary comparison (one row per configuration).
    # The average and timing columns are pre-formatted strings, not numbers.
    print("\n📈 SEMANTIC CHUNKING SUMMARY:")
    print("=" * 90)
    
    semantic_summary = []
    for name, result in semantic_results.items():
        config = result['config']
        semantic_summary.append({
            'Strategy': name,
            'Similarity Threshold': config['similarity_threshold'],
            'Min Size': config['min_size'],
            'Max Size': config['max_size'],
            'Total Chunks': result['total_chunks'],
            'Avg Tokens/Chunk': f"{result['avg_tokens_per_chunk']:.1f}",
            'Processing Time (s)': f"{result['processing_time']:.2f}"
        })
    
    semantic_df = pd.DataFrame(semantic_summary)
    print(semantic_df.to_string(index=False))
    
    # Save summary
    semantic_df.to_csv('results/ablations/semantic_chunking_summary.csv', index=False)
    print(f"\n💾 Summary saved to: results/ablations/semantic_chunking_summary.csv")
    
    # Show sample chunks (configurations without chunks are skipped)
    print("\n🔍 SAMPLE CHUNKS (first chunk from each strategy):")
    print("=" * 90)
    for strategy_name, result in semantic_results.items():
        if result['chunks']:
            sample_chunk = result['chunks'][0]
            print(f"\n📄 {strategy_name}:")
            print(f"   Tokens: {sample_chunk['token_count']}")
            print(f"   Similarity threshold: {result['config']['similarity_threshold']}")
            # Preview is limited to the first 200 characters of the chunk.
            print(f"   Content preview: {sample_chunk['content'][:200]}...")
            print(f"   Source: {sample_chunk['source_title']}")

🚀 Initializing Semantic Chunker...
🔄 Loading embedding model: all-MiniLM-L6-v2


KeyboardInterrupt: 

# Structural Chunking

In [None]:
# =============================================================================
# CELL 4: Structural Chunking Implementation (FIXED)
# Strategy 3: Structure-based chunking using headings, HTML tags, and document hierarchy
# =============================================================================

import re
import json
import time
import os
from typing import List, Dict, Tuple, Optional
import pandas as pd
from bs4 import BeautifulSoup

# Mock tokenizer for demonstration - replace with your actual tokenizer
class MockTokenizer:
    """Whitespace-based stand-in for a real tokenizer (demonstration only).

    Tokens are the whitespace-separated words of the text, i.e. strings rather
    than integer ids. The round trip is lossy: `decode(encode(text))` collapses
    every run of whitespace, including newlines, into a single space.
    """

    def encode(self, text: str) -> List[str]:
        """Return the whitespace-separated words of `text` as the token list."""
        return text.split()  # Simple word-based tokenization

    def decode(self, tokens: List[str]) -> str:
        """Join `tokens` with single spaces; each item is passed through str()."""
        return " ".join(str(t) for t in tokens)

# Initialize mock tokenizer - replace with your actual tokenizer
tokenizer = MockTokenizer()

class StructuralChunker:
    """Implementation of structural chunking using headings and HTML structure"""
    
    def __init__(self):
        """Store the shared tokenizer and the structural markers, then report readiness.

        Sets three attributes: ``tokenizer`` (the module-level tokenizer),
        ``heading_patterns`` (regexes for heading-like text) and
        ``structural_tags`` (HTML tag names treated as structure).
        """
        # The module-level tokenizer is used for every token count in this class
        self.tokenizer = tokenizer

        # Regexes describing heading-like text, one per supported style
        markdown_heading = r'^#{1,6}\s+(.+)$'        # Markdown headings (# ## ### etc.)
        equals_underline = r'^(.+)\n={3,}$'          # Underlined headings with ===
        dash_underline = r'^(.+)\n-{3,}$'            # Underlined headings with ---
        numbered_section = r'^\d+\.\s+(.+)$'         # Numbered sections (1. 2. 3.)
        all_caps_heading = r'^[A-Z][A-Z\s]{5,}$'     # ALL CAPS headings
        self.heading_patterns = [
            markdown_heading,
            equals_underline,
            dash_underline,
            numbered_section,
            all_caps_heading,
        ]

        # HTML tags that indicate structure: the six heading levels plus block containers
        heading_tags = ['h1', 'h2', 'h3', 'h4', 'h5', 'h6']
        container_tags = ['div', 'section', 'article', 'p']
        self.structural_tags = heading_tags + container_tags

        print("✅ Structural chunker initialized")
    
    def detect_html_content(self, text: str) -> bool:
        """Report whether ``text`` contains at least one ``<...>`` span.

        Any ``<`` followed by one or more non-``>`` characters and a closing
        ``>`` counts, so this is a loose heuristic, not an HTML parser.
        """
        tag_like = re.search(r'<[^>]+>', text)
        return tag_like is not None
    
    def detect_content_patterns(self, text: str) -> List[Dict]:
        """
        Detect structural patterns in plain text content to identify potential section breaks
        and hierarchical structure based on common formatting patterns.

        Every non-blank line is classified by the FIRST rule that matches, in
        this order: numbered section, alphabetic subsection, roman numeral,
        bullet point, question/answer marker, header, indented content.
        Lines matching no rule are ignored.

        Two ordering problems of the earlier implementation are fixed here:
        * Indentation is tested on the raw line. It used to be tested after
          ``strip()``, so the indented-content rule could never match.
        * The question/answer rule runs before the header rule. The header
          regex (capital letter ... colon) also matches ``Q:``, ``Question:``,
          ``A:`` and ``Answer:``, which made the Q/A rule unreachable for
          capitalised markers.

        Args:
            text: Plain text to scan, split on ``\\n``.

        Returns:
            One dict per matching line, in line order, with keys ``type``,
            ``line_num`` (0-based index into ``text.split('\\n')``), ``text``
            (the stripped line), ``pattern`` and ``level``.
        """
        bullet_markers = ('•', '-', '*', '→', '▪', '○')
        patterns = []

        for i, raw_line in enumerate(text.split('\n')):
            line = raw_line.strip()
            if not line:
                continue

            # Pattern 1: Numbered sections (1. 2. 3.)
            if re.match(r'^\d+\.\s+.+', line):
                rule = ('numbered_section', 'numeric', 3)

            # Pattern 2: Alphabetic subsections (a) b) c) or (a) or (a.)
            elif re.match(r'^[a-z]\)\s+.+|^\([a-z]\)\s+.+|^\([a-z]\.\s+.+', line, re.IGNORECASE):
                rule = ('alphabetic_subsection', 'alphabetic', 4)

            # Pattern 3: Roman numerals (i. ii. iii. or I. II. III.)
            elif re.match(r'^[IVXivx]+\.\s+.+', line):
                rule = ('roman_numeral', 'roman', 3)

            # Pattern 4: Bullet points
            elif line.startswith(bullet_markers):
                rule = ('bullet_point', 'bullet', 5)

            # Pattern 5: Question-Answer markers (Q:, Question:, A:, Answer:).
            # Must be tested before the header rule, which would also match them.
            elif re.match(r'^(Q:|Question:|A:|Answer:)\s+.+', line, re.IGNORECASE):
                rule = ('qa_format', 'qa', 4)

            # Pattern 6: Headers (ALL CAPS, or capitalised text followed by a colon)
            elif (line.isupper() and len(line) > 4) or re.match(r'^[A-Z][^.!?]*:', line):
                rule = ('header', 'header', 2)

            # Pattern 7: Indented content - checked on the unstripped line,
            # because stripping removes the very indentation being detected.
            elif raw_line.startswith(('    ', '\t')):
                rule = ('indented_content', 'indent', 6)

            else:
                continue

            pattern_type, pattern_name, level = rule
            patterns.append({
                'type': pattern_type,
                'line_num': i,
                'text': line,
                'pattern': pattern_name,
                'level': level
            })

        return patterns
    
    def detect_topic_keywords(self, text: str) -> str:
        """
        Pick the topic whose keyword list has the most hits in ``text``.

        Matching is a case-insensitive substring test per keyword. When two
        topics tie, the one listed first wins; when nothing matches at all the
        result is ``'general'``.
        """
        lowered = text.lower()

        # Define topic keywords - customize these for your domain
        topic_keywords = {
            'kyc_documents': ['kyc', 'documents', 'verification', 'identity', 'proof', 'aadhaar', 'pan', 'declaration'],
            'merchant_onboarding': ['merchant', 'onboarding', 'registration', 'business', 'proprietor', 'settlement'],
            'payment_processing': ['payment', 'transaction', 'gateway', 'processing', 'refund', 'settlement'],
            'bank_account': ['bank account', 'settlement', 'beneficiary', 'passbook', 'cheque', 'utr'],
            'business_verification': ['business', 'proof', 'verification', 'certificate', 'legal name'],
            'legal_declaration': ['declaration', 'affirm', 'solemnly', 'proprietor', 'legal'],
            'technical_requirements': ['api', 'integration', 'technical', 'sdk', 'endpoint'],
            'general': []
        }

        # Number of distinct keywords present, per candidate topic
        hit_counts = {
            name: sum(1 for term in terms if term in lowered)
            for name, terms in topic_keywords.items()
            if name != 'general'
        }

        # max() returns the first maximal key, which keeps the "earliest topic wins" tie rule
        leader = max(hit_counts, key=hit_counts.get)
        return leader if hit_counts[leader] > 0 else 'general'
    
    def extract_html_structure(self, html_text: str) -> List[Dict]:
        """
        Extract structural elements from HTML content

        Tags listed in ``self.structural_tags`` are collected in document
        order; tags without any text are skipped.

        ``start_pos`` is located with a forward-moving search cursor. The
        earlier code used an unanchored ``find`` with a fallback of 0, which
        gave every repeated tag the offset of its first occurrence and sent
        tags whose re-serialised markup differs from the source to the front
        of the list, so the subsequent sort scrambled the document order.
        When a tag's markup cannot be found, it inherits the previous
        element's offset, which keeps offsets non-decreasing.

        NOTE(review): container tags (div/section/article) contribute their
        full text including nested <p>/<h*> text, and those nested tags are
        then listed again, so text can appear more than once downstream.
        Left unchanged here - confirm whether that is wanted.

        Returns:
            List of dicts with keys ``tag``, ``level``, ``type``, ``text`` and
            ``start_pos``, in document order.
        """
        soup = BeautifulSoup(html_text, 'html.parser')
        heading_tags = ('h1', 'h2', 'h3', 'h4', 'h5', 'h6')
        elements = []

        search_from = 0   # raw-markup offset from which the next tag is searched
        last_pos = 0      # offset assigned to the most recent element

        # find_all walks the tree in document order, so no re-sorting is needed
        for tag in soup.find_all(self.structural_tags):
            # Skip if tag has no text content
            text_content = tag.get_text().strip()
            if not text_content:
                continue

            # Determine hierarchy level
            if tag.name in heading_tags:
                level = int(tag.name[1])  # Extract number from h1, h2, etc.
                element_type = 'heading'
            else:
                level = 10  # Give non-heading elements lower priority
                element_type = 'content'

            # Search only forward; +1 (not the tag's end) so nested children,
            # which start inside their parent, can still be located.
            located = html_text.find(str(tag), search_from)
            if located != -1:
                last_pos = located
                search_from = located + 1

            elements.append({
                'tag': tag.name,
                'level': level,
                'type': element_type,
                'text': text_content,
                'start_pos': last_pos
            })

        return elements
    
    def detect_markdown_headings(self, text: str) -> List[Dict]:
        """
        Find heading-like lines in plain/markdown text.

        Each non-blank line is tested, in order, as: a ``#``-style markdown
        heading, a line underlined by ``===`` (level 1) or ``---`` (level 2),
        a numbered section (level 3), or a short ALL CAPS line (level 2).
        The first test that succeeds decides the entry.

        Returns:
            List of dicts with keys ``line_num``, ``level``, ``type``,
            ``text`` and ``full_line``, in line order.
        """
        rows = text.split('\n')
        found = []

        def record(position, level, kind, title, stripped):
            found.append({
                'line_num': position,
                'level': level,
                'type': kind,
                'text': title,
                'full_line': stripped
            })

        # Underline style on the following line -> heading level
        underline_levels = ((r'^={3,}$', 1), (r'^-{3,}$', 2))

        for position, raw in enumerate(rows):
            stripped = raw.strip()
            if stripped == '':
                continue

            # Markdown headings: the number of '#' characters is the level
            hashes = re.match(r'^(#{1,6})\s+(.+)$', stripped)
            if hashes is not None:
                record(position, len(hashes.group(1)), 'markdown_heading',
                       hashes.group(2).strip(), stripped)
                continue

            # Underlined headings: decided by the line that follows
            if position + 1 < len(rows):
                below = rows[position + 1].strip()
                underline_level = next(
                    (lvl for rule, lvl in underline_levels if re.match(rule, below)),
                    None
                )
                if underline_level is not None:
                    record(position, underline_level, 'underlined_heading', stripped, stripped)
                    continue

            if re.match(r'^\d+\.\s+(.+)$', stripped):
                # Numbered sections (1. 2. 3.)
                record(position, 3, 'numbered_section', stripped, stripped)
            elif re.match(r'^[A-Z][A-Z\s]{4,}$', stripped) and len(stripped.split()) <= 6:
                # ALL CAPS headings: at least 5 characters, at most 6 words
                record(position, 2, 'caps_heading', stripped, stripped)

        return found
    
    def chunk_by_content_patterns(self, text: str) -> List[Dict]:
        """
        Chunk text based on detected content patterns (for documents without HTML/markdown)
        """
        patterns = self.detect_content_patterns(text)
        
        if not patterns:
            # No patterns found - use topic-based chunking
            return self.chunk_by_topic_content(text)
        
        lines = text.split('\n')
        chunks = []
        
        for i, pattern in enumerate(patterns):
            # Determine start and end positions for this chunk
            start_line = pattern['line_num']
            
            # Find end line (next pattern or end of document)
            if i + 1 < len(patterns):
                end_line = patterns[i + 1]['line_num']
            else:
                end_line = len(lines)
            
            # Extract content for this section
            section_lines = lines[start_line:end_line]
            content = '\n'.join(section_lines).strip()
            
            if content:
                topic = self.detect_topic_keywords(content)
                chunks.append({
                    'content': content,
                    'structure_type': pattern['pattern'],
                    'level': pattern['level'],
                    'heading': pattern['text'][:100],  # Limit heading length
                    'topic': topic
                })
        
        return chunks if chunks else self.chunk_by_topic_content(text)
    
    def chunk_by_topic_content(self, text: str) -> List[Dict]:
        """
        Fallback chunking by content topics when no structural patterns exist
        """
        # Split into logical paragraphs first
        paragraphs = re.split(r'\n\s*\n', text)
        
        chunks = []
        current_chunk_paras = []
        current_tokens = 0
        max_tokens_per_chunk = 800  # Reasonable size for topic-based chunks
        
        for para in paragraphs:
            para = para.strip()
            if not para:
                continue
            
            para_tokens = len(self.tokenizer.encode(para))
            
            # If adding this paragraph would exceed limit, finalize current chunk
            if current_tokens + para_tokens > max_tokens_per_chunk and current_chunk_paras:
                chunk_content = '\n\n'.join(current_chunk_paras)
                topic = self.detect_topic_keywords(chunk_content)
                
                chunks.append({
                    'content': chunk_content,
                    'structure_type': 'topic_based',
                    'level': 5,  # Low priority level
                    'heading': f'Topic: {topic.title()}',
                    'topic': topic
                })
                
                current_chunk_paras = [para]
                current_tokens = para_tokens
            else:
                current_chunk_paras.append(para)
                current_tokens += para_tokens
        
        # Add final chunk
        if current_chunk_paras:
            chunk_content = '\n\n'.join(current_chunk_paras)
            topic = self.detect_topic_keywords(chunk_content)
            
            chunks.append({
                'content': chunk_content,
                'structure_type': 'topic_based',
                'level': 5,
                'heading': f'Topic: {topic.title()}',
                'topic': topic
            })
        
        return chunks
    
    def chunk_by_html_structure(self, text: str) -> List[Dict]:
        """Chunk text based on HTML structure"""
        elements = self.extract_html_structure(text)
        
        if not elements:
            return [{'content': text, 'structure_type': 'no_structure', 'level': 0, 'heading': ''}]
        
        chunks = []
        current_chunk = ""
        current_level = 0
        current_heading = ""
        
        for i, element in enumerate(elements):
            if element['type'] == 'heading':
                # Save previous chunk if it has content
                if current_chunk.strip():
                    chunks.append({
                        'content': current_chunk.strip(),
                        'structure_type': 'html_section',
                        'level': current_level,
                        'heading': current_heading
                    })
                
                # Start new chunk with this heading
                current_chunk = element['text'] + "\n"
                current_level = element['level']
                current_heading = element['text']
            else:
                # Add content to current chunk
                current_chunk += element['text'] + "\n"
        
        # Add final chunk
        if current_chunk.strip():
            chunks.append({
                'content': current_chunk.strip(),
                'structure_type': 'html_section',
                'level': current_level,
                'heading': current_heading
            })
        
        return chunks
    
    def chunk_by_markdown_structure(self, text: str) -> List[Dict]:
        """Chunk text based on markdown headings and structure"""
        headings = self.detect_markdown_headings(text)
        
        if not headings:
            return [{'content': text, 'structure_type': 'no_headings', 'level': 0, 'heading': ''}]
        
        lines = text.split('\n')
        chunks = []
        
        for i, heading in enumerate(headings):
            # Determine start and end positions for this chunk
            start_line = heading['line_num']
            
            # Find end line (next heading or end of document)
            if i + 1 < len(headings):
                end_line = headings[i + 1]['line_num']
            else:
                end_line = len(lines)
            
            # Extract content for this section
            section_lines = lines[start_line:end_line]
            content = '\n'.join(section_lines).strip()
            
            if content:
                chunks.append({
                    'content': content,
                    'structure_type': heading['type'],
                    'level': heading['level'],
                    'heading': heading['text']
                })
        
        return chunks
    
    def chunk_by_paragraph_structure(self, text: str) -> List[Dict]:
        """Turn every non-empty paragraph into its own chunk.

        Paragraphs are separated by a blank (or whitespace-only) line. The
        number in the heading is the paragraph's 1-based position in the raw
        split, so empty pieces are skipped but still consume a number.
        """
        raw_pieces = re.split(r'\n\s*\n', text)
        numbered = enumerate((piece.strip() for piece in raw_pieces), start=1)
        return [
            {
                'content': body,
                'structure_type': 'paragraph',
                'level': 5,  # Lower priority level
                'heading': f'Paragraph {position}'
            }
            for position, body in numbered
            if body
        ]
    
    def merge_small_chunks(self, chunks: List[Dict], min_tokens: int = 100, max_tokens: int = 1500) -> List[Dict]:
        """Merge chunks that are too small and split chunks that are too large"""
        if not chunks:
            return []
        
        adjusted_chunks = []
        i = 0
        
        while i < len(chunks):
            current_chunk = chunks[i]
            current_tokens = len(self.tokenizer.encode(current_chunk['content']))
            
            # If chunk is too small, try to merge with next chunk
            if current_tokens < min_tokens and i + 1 < len(chunks):
                next_chunk = chunks[i + 1]
                
                # Only merge if they're at similar structural levels
                if abs(current_chunk['level'] - next_chunk['level']) <= 1:
                    merged_content = current_chunk['content'] + "\n\n" + next_chunk['content']
                    merged_tokens = len(self.tokenizer.encode(merged_content))
                    
                    if merged_tokens <= max_tokens:
                        merged_chunk = {
                            'content': merged_content,
                            'structure_type': f"{current_chunk['structure_type']}_merged",
                            'level': min(current_chunk['level'], next_chunk['level']),
                            'heading': f"{current_chunk['heading']} + {next_chunk['heading']}"
                        }
                        adjusted_chunks.append(merged_chunk)
                        i += 2
                        continue
                
                # Can't merge, keep original
                adjusted_chunks.append(current_chunk)
                i += 1
            
            # If chunk is too large, split it
            elif current_tokens > max_tokens:
                sub_chunks = self.split_large_chunk(current_chunk, max_tokens)
                adjusted_chunks.extend(sub_chunks)
                i += 1
            
            else:
                # Chunk size is acceptable
                adjusted_chunks.append(current_chunk)
                i += 1
        
        return adjusted_chunks
    
    def split_large_chunk(self, chunk: Dict, max_tokens: int) -> List[Dict]:
        """Split a large chunk into smaller pieces"""
        content = chunk['content']
        tokens = self.tokenizer.encode(content)
        
        if len(tokens) <= max_tokens:
            return [chunk]
        
        # Split by sentences first, then by tokens if needed
        sentences = re.split(r'[.!?]+', content)
        sub_chunks = []
        current_content = ""
        part_num = 1
        
        for sentence in sentences:
            sentence = sentence.strip()
            if not sentence:
                continue
                
            # Check if adding this sentence would exceed limit
            test_content = current_content + ". " + sentence if current_content else sentence
            test_tokens = len(self.tokenizer.encode(test_content))
            
            if test_tokens > max_tokens and current_content:
                # Save current chunk and start new one
                sub_chunks.append({
                    'content': current_content.strip(),
                    'structure_type': f"{chunk['structure_type']}_split",
                    'level': chunk['level'],
                    'heading': f"{chunk['heading']} (Part {part_num})"
                })
                current_content = sentence
                part_num += 1
            else:
                current_content = test_content
        
        # Add final chunk
        if current_content.strip():
            sub_chunks.append({
                'content': current_content.strip(),
                'structure_type': f"{chunk['structure_type']}_split",
                'level': chunk['level'],
                'heading': f"{chunk['heading']} (Part {part_num})"
            })
        
        return sub_chunks if sub_chunks else [chunk]
    
    def structural_chunk_text(self, text: str, preserve_hierarchy: bool = True,
                            min_chunk_tokens: int = 100, max_chunk_tokens: int = 1500) -> List[Dict]:
        """
        Perform structural chunking on text (adapted for plain text without HTML/markdown)
        
        Args:
            text: Input text to chunk
            preserve_hierarchy: Whether to maintain structural hierarchy
            min_chunk_tokens: Minimum chunk size in tokens
            max_chunk_tokens: Maximum chunk size in tokens
            
        Returns:
            List of structurally coherent chunks
        """
        if not text.strip():
            return []
        
        # Try different structural approaches in order of preference
        chunks = []
        
        # 1. Try HTML structure first
        if self.detect_html_content(text):
            chunks = self.chunk_by_html_structure(text)
        
        # 2. Try markdown structure
        elif self.detect_markdown_headings(text):
            chunks = self.chunk_by_markdown_structure(text)
        
        # 3. Try content pattern detection (for plain text documents)
        else:
            chunks = self.chunk_by_content_patterns(text)
        
        # 4. If no meaningful chunks, fall back to topic-based chunking
        if not chunks or (len(chunks) == 1 and chunks[0].get('structure_type') == 'topic_based'):
            chunks = self.chunk_by_topic_content(text)
        
        # Adjust chunk sizes if needed
        if preserve_hierarchy:
            chunks = self.merge_small_chunks(chunks, min_chunk_tokens, max_chunk_tokens)
        
        return chunks
    
    def process_document(self, doc: Dict, preserve_hierarchy: bool = True,
                        min_chunk_tokens: int = 100, max_chunk_tokens: int = 1500) -> List[Dict]:
        """
        Process a single document with structural chunking
        
        Args:
            doc: Document dictionary with url, title, content
            preserve_hierarchy: Whether to maintain structural hierarchy
            min_chunk_tokens: Minimum chunk size in tokens
            max_chunk_tokens: Maximum chunk size in tokens
            
        Returns:
            List of chunk dictionaries with metadata
        """
        content = doc.get('content', '')
        if not content.strip():
            return []
        
        structural_chunks = self.structural_chunk_text(
            content, preserve_hierarchy, min_chunk_tokens, max_chunk_tokens
        )
        
        chunk_objects = []
        for i, chunk_data in enumerate(structural_chunks):
            chunk_text = chunk_data['content']
            if chunk_text.strip():
                chunk_obj = {
                    'chunk_id': f"{doc['url']}#structural_chunk_{i}",
                    'source_url': doc['url'],
                    'source_title': doc.get('title', ''),
                    'content': chunk_text.strip(),
                    'chunk_index': i,
                    'token_count': len(self.tokenizer.encode(chunk_text)),
                    'char_count': len(chunk_text),
                    'strategy': 'structural',
                    'strategy_params': {
                        'preserve_hierarchy': preserve_hierarchy,
                        'min_chunk_tokens': min_chunk_tokens,
                        'max_chunk_tokens': max_chunk_tokens
                    },
                    'structural_info': {
                        'structure_type': chunk_data['structure_type'],
                        'level': chunk_data['level'],
                        'heading': chunk_data['heading'],
                        'topic': chunk_data.get('topic', 'general')
                    }
                }
                chunk_objects.append(chunk_obj)
        
        return chunk_objects


def load_scraped_data(path: str = 'final.json') -> List[Dict]:
    """Load scraped documents from a JSON file (``final.json`` by default).

    The file is expected to hold a list of objects. Two shapes are accepted:
    * objects with a ``content`` key are kept (all keys preserved), with
      ``url`` defaulting to ``'unknown'`` when absent;
    * objects with a ``text`` key are converted to ``url``/``title``/
      ``content``, where ``title`` falls back to the url.
    Anything else - including entries that are not JSON objects - is skipped.
    (Previously a plain string entry containing the word "content" passed the
    ``in`` test and was returned as a document.)

    Args:
        path: Location of the JSON file. Defaults to the original hard-coded
            ``'final.json'``.

    Returns:
        List of document dicts; an empty list when the file is missing or
        cannot be read/parsed (a message is printed in that case).
    """
    try:
        with open(path, 'r', encoding='utf-8') as f:
            scraped_data = json.load(f)

        formatted_data = []
        for item in scraped_data:
            # Only JSON objects can be documents; 'x in item' on a string
            # would be a substring test, not a key lookup.
            if not isinstance(item, dict):
                continue

            if 'content' in item:
                # Guarantee the 'url' key that downstream chunk ids rely on
                formatted_item = {**item, 'url': item.get('url', 'unknown')}
            elif 'text' in item:
                formatted_item = {
                    "url": item.get("url", "unknown"),
                    "title": item.get("title", item.get("url", "unknown")),
                    "content": item["text"]
                }
            else:
                continue
            formatted_data.append(formatted_item)

        return formatted_data

    except FileNotFoundError:
        print(f"❌ {path} not found")
        return []
    except Exception as e:
        # Deliberately broad: loading is best-effort and the caller treats
        # an empty list as "nothing to process".
        print(f"❌ Error loading {path}: {e}")
        return []


def create_output_directories():
    """Make sure the folders this cell writes to exist.

    Creates ``data/chunks/structural`` and ``results/ablations`` relative to
    the current working directory, including missing parents. Existing
    folders are left as they are.
    """
    for target in ('data/chunks/structural', 'results/ablations'):
        os.makedirs(target, exist_ok=True)


def main():
    """Run the structural chunking experiment end to end.

    Steps, in order:
    1. Create the output folders and build a ``StructuralChunker``.
    2. Load the documents via ``load_scraped_data``; stop with a message when
       none are available.
    3. For each configuration, chunk every document, collect chunk counts,
       token totals, structure types and topics, and write the chunks to
       ``data/chunks/structural/<name>.json``.
    4. Print a comparison table and save it to
       ``results/ablations/structural_chunking_summary.csv``.
    5. Print the first chunk of every configuration as a sample.

    Returns nothing; all results are printed or written to disk.
    """
    # Create output directories
    create_output_directories()
    
    # Initialize structural chunker
    print("🚀 Initializing Structural Chunker...")
    structural_chunker = StructuralChunker()

    # Load data
    documents = load_scraped_data()

    if not documents:
        print("❌ No documents loaded. Please check your final.json file.")
        return
    
    print(f"\n✅ Loaded {len(documents)} documents for structural chunking")
    
    # Define structural chunking configurations to test.
    # Each entry is unpacked below and passed positionally to
    # StructuralChunker.process_document(doc, preserve_hierarchy, min_tokens, max_tokens).
    structural_configs = [
        {
            "name": "Structural_Hierarchical",
            "preserve_hierarchy": True,
            "min_tokens": 100,
            "max_tokens": 1200
        },
        {
            "name": "Structural_Balanced", 
            "preserve_hierarchy": True,
            "min_tokens": 150,
            "max_tokens": 1500
        },
        {
            "name": "Structural_Large",
            # NOTE(review): structural_chunk_text only calls merge_small_chunks when
            # preserve_hierarchy is True, so with False the min/max token values below
            # are recorded in the metadata but not enforced on the chunks.
            "preserve_hierarchy": False,
            "min_tokens": 200,
            "max_tokens": 1800
        }
    ]
    
    # Process documents with each structural configuration
    structural_results = {}  # config name -> chunks, statistics and the config itself
    
    print("\n🔄 Processing documents with structural chunking strategies...")
    print("=" * 75)
    
    for config in structural_configs:
        config_name = config["name"]
        preserve_hierarchy = config["preserve_hierarchy"]
        min_tokens = config["min_tokens"]
        max_tokens = config["max_tokens"]
        
        print(f"\n📝 Processing: {config_name}")
        print(f"   Preserve hierarchy: {preserve_hierarchy}")
        print(f"   Min tokens: {min_tokens}")
        print(f"   Max tokens: {max_tokens}")
        
        # Wall-clock timing covers chunking of all documents, including the per-document prints
        start_time = time.time()
        all_chunks = []
        
        for doc_idx, doc in enumerate(documents):
            chunks = structural_chunker.process_document(
                doc, preserve_hierarchy, min_tokens, max_tokens
            )
            all_chunks.extend(chunks)
            # One progress line per document (document numbers are 1-based)
            print(f"   Document {doc_idx + 1}: Generated {len(chunks)} chunks")
        
        processing_time = time.time() - start_time
        
        # Calculate statistics
        total_chunks = len(all_chunks)
        total_tokens = sum(chunk['token_count'] for chunk in all_chunks)
        # Guard against division by zero when no chunk was produced
        avg_tokens_per_chunk = total_tokens / total_chunks if total_chunks > 0 else 0
        
        # Count different structure types and topics
        structure_types = {}  # structure_type -> number of chunks
        topics = {}           # topic -> number of chunks
        for chunk in all_chunks:
            struct_info = chunk.get('structural_info', {})
            struct_type = struct_info.get('structure_type', 'unknown')
            topic = struct_info.get('topic', 'general')
            
            structure_types[struct_type] = structure_types.get(struct_type, 0) + 1
            topics[topic] = topics.get(topic, 0) + 1
        
        structural_results[config_name] = {
            'chunks': all_chunks,
            'total_chunks': total_chunks,
            'total_tokens': total_tokens,
            'avg_tokens_per_chunk': avg_tokens_per_chunk,
            'processing_time': processing_time,
            'config': config,
            'structure_types': structure_types,
            'topics': topics
        }
        
        print(f"   ✅ Generated {total_chunks} chunks")
        print(f"   ⏱️  Processing time: {processing_time:.2f}s")
        print(f"   📊 Average tokens per chunk: {avg_tokens_per_chunk:.1f}")
        print(f"   🏗️  Structure types: {list(structure_types.keys())}")
        print(f"   📋 Topics found: {list(topics.keys())}")
        
        # Save chunks to file (one JSON file per configuration).
        # ensure_ascii=False keeps non-ASCII text readable; default=str
        # stringifies any value json cannot serialize natively.
        output_file = f"data/chunks/structural/{config_name}.json"
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(all_chunks, f, indent=2, ensure_ascii=False, default=str)
        print(f"   💾 Saved to: {output_file}")
    
    print("\n" + "=" * 75)
    print("✅ Structural chunking completed for all configurations!")
    
    # Create summary comparison
    print("\n📈 STRUCTURAL CHUNKING SUMMARY:")
    print("=" * 100)
    
    # One row per configuration; averages and timings are pre-formatted as strings
    structural_summary = []
    for name, result in structural_results.items():
        config = result['config']
        structural_summary.append({
            'Strategy': name,
            'Preserve Hierarchy': config['preserve_hierarchy'],
            'Min Tokens': config['min_tokens'],
            'Max Tokens': config['max_tokens'],
            'Total Chunks': result['total_chunks'],
            'Avg Tokens/Chunk': f"{result['avg_tokens_per_chunk']:.1f}",
            'Processing Time (s)': f"{result['processing_time']:.2f}",
            'Structure Types': len(result['structure_types'])
        })
    
    structural_df = pd.DataFrame(structural_summary)
    print(structural_df.to_string(index=False))
    
    # Save summary
    structural_df.to_csv('results/ablations/structural_chunking_summary.csv', index=False)
    print(f"\n💾 Summary saved to: results/ablations/structural_chunking_summary.csv")
    
    # Show sample chunks with structural information
    print("\n🔍 SAMPLE CHUNKS WITH STRUCTURE INFO:")
    print("=" * 100)
    for strategy_name, result in structural_results.items():
        # Configurations that produced no chunks are skipped
        if result['chunks']:
            sample_chunk = result['chunks'][0]
            structural_info = sample_chunk.get('structural_info', {})
            print(f"\n📄 {strategy_name}:")
            print(f"   Tokens: {sample_chunk['token_count']}")
            print(f"   Structure Type: {structural_info.get('structure_type', 'N/A')}")
            print(f"   Level: {structural_info.get('level', 'N/A')}")
            print(f"   Heading: {structural_info.get('heading', 'N/A')}")
            # Only the first 200 characters are shown
            print(f"   Content preview: {sample_chunk['content'][:200]}...")
            print(f"   Source: {sample_chunk['source_title']}")


# Run the experiment when this cell/script is executed as the main module
if __name__ == "__main__":
    main()

🚀 Initializing Structural Chunker...
✅ Structural chunker initialized

✅ Loaded 139 documents for structural chunking

🔄 Processing documents with structural chunking strategies...

📝 Processing: Structural_Hierarchical
   Preserve hierarchy: True
   Min tokens: 100
   Max tokens: 1200
   Document 1: Generated 2 chunks
   Document 2: Generated 5 chunks
   Document 3: Generated 14 chunks
   Document 4: Generated 8 chunks
   Document 5: Generated 8 chunks
   Document 6: Generated 5 chunks
   Document 7: Generated 16 chunks
   Document 8: Generated 3 chunks
   Document 9: Generated 33 chunks
   Document 10: Generated 6 chunks
   Document 11: Generated 4 chunks
   Document 12: Generated 4 chunks
   Document 13: Generated 26 chunks
   Document 14: Generated 4 chunks
   Document 15: Generated 23 chunks
   Document 16: Generated 1 chunks
   Document 17: Generated 28 chunks
   Document 18: Generated 1 chunks
   Document 19: Generated 2 chunks
   Document 20: Generated 2 chunks
   Document 21: 

# Recursive Chunking

In [None]:
# =============================================================================
# RECURSIVE CHUNKING IMPLEMENTATION
# Strategy 4: Hierarchical fallback from large structural blocks to smaller chunks
# =============================================================================

import re
import json
import time
import os
from typing import List, Dict, Tuple, Optional
import pandas as pd
from bs4 import BeautifulSoup

# Mock tokenizer for demonstration - replace with your actual tokenizer
class MockTokenizer:
    """Whitespace tokenizer used as a stand-in for a real tokenizer.

    Tokens are the whitespace-separated words of the input, so every
    "token count" computed with this class is really a word count.
    """

    def encode(self, text: str) -> List[str]:
        """Split ``text`` on runs of whitespace and return the words."""
        return text.split()

    def decode(self, tokens: List[str]) -> str:
        """Join ``tokens`` with single spaces (inverse of ``encode`` up to whitespace)."""
        return " ".join(tokens)


# Initialize mock tokenizer - replace with your actual tokenizer
tokenizer = MockTokenizer()

class RecursiveChunker:
    """
    Implementation of recursive chunking with hierarchical fallback:
    1. Try structural chunking first (headings, HTML tags)
    2. If chunks are too large, fall back to semantic chunking
    3. If still too large, fall back to fixed-size chunking
    """
    
    def __init__(self):
        """Initialize recursive chunker"""
        self.tokenizer = tokenizer
        
        # Structural patterns (from structural chunker)
        self.heading_patterns = [
            r'^#{1,6}\s+(.+)$',  # Markdown headings
            r'^(.+)\n={3,}$',    # Underlined headings with ===
            r'^(.+)\n-{3,}$',    # Underlined headings with ---
            r'^\d+\.\s+(.+)$',   # Numbered sections
            r'^[A-Z][A-Z\s]{5,}$',  # ALL CAPS headings
        ]
        
        self.structural_tags = ['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'div', 'section', 'article', 'p']
        
        print("✅ Recursive chunker initialized")
    
    def detect_html_content(self, text: str) -> bool:
        """Check if text contains HTML tags"""
        return bool(re.search(r'<[^>]+>', text))
    
    def get_structural_chunks(self, text: str) -> List[Dict]:
        """
        Extract structural chunks using headings and HTML structure
        Similar to structural chunker but returns raw chunks without size constraints
        """
        if self.detect_html_content(text):
            return self._extract_html_structural_chunks(text)
        else:
            return self._extract_text_structural_chunks(text)
    
    def _extract_html_structural_chunks(self, html_text: str) -> List[Dict]:
        """Extract chunks based on HTML structure.

        The markup is parsed with BeautifulSoup's ``html.parser`` and one chunk
        is produced per ``h1``-``h6`` element: the heading text followed by the
        text of the nodes that come after it at the same nesting depth.

        Args:
            html_text: Raw HTML string.

        Returns:
            List of dicts with keys ``content``, ``level``, ``heading`` and
            ``chunk_type``. When the document has no heading elements a single
            ``html_no_structure`` chunk holding the whole text is returned.
        """
        soup = BeautifulSoup(html_text, 'html.parser')
        chunks = []
        
        # Find all heading elements
        headings = soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6'])
        
        if not headings:
            # No headings found, return whole text as one chunk
            return [{
                'content': soup.get_text().strip(),
                'level': 0,
                'heading': 'No Structure',
                'chunk_type': 'html_no_structure'
            }]
        
        # NOTE(review): chunks are only built from a heading plus what follows
        # it, so text located before the first heading does not appear in any
        # chunk when at least one heading exists - confirm this is intended.
        # Process each heading section
        for i, heading in enumerate(headings):
            level = int(heading.name[1])  # Extract number from h1, h2, etc.
            heading_text = heading.get_text().strip()
            
            # Find content until next heading of same or higher level
            # Only siblings of the heading are walked (next_sibling); nodes
            # outside the heading's parent element are not visited.
            content_elements = []
            current = heading.next_sibling
            
            while current:
                # Plain text nodes have name None, so they never match here.
                if current.name in ['h1', 'h2', 'h3', 'h4', 'h5', 'h6']:
                    current_level = int(current.name[1])
                    if current_level <= level:  # Same or higher level heading
                        break
                    # NOTE(review): a deeper heading does not stop the walk, so
                    # its text is collected here AND it gets its own chunk from
                    # the outer loop - nested sections are therefore duplicated.
                
                # Nodes exposing get_text() contribute their text; the str
                # branch is the fallback for nodes that do not expose it.
                if hasattr(current, 'get_text'):
                    text_content = current.get_text().strip()
                    if text_content:
                        content_elements.append(text_content)
                elif isinstance(current, str) and current.strip():
                    content_elements.append(current.strip())
                
                current = current.next_sibling
            
            # Combine heading and content
            full_content = heading_text
            if content_elements:
                full_content += "\n\n" + "\n".join(content_elements)
            
            # Skip headings that are empty and have no following content.
            if full_content.strip():
                chunks.append({
                    'content': full_content.strip(),
                    'level': level,
                    'heading': heading_text,
                    'chunk_type': 'html_structural'
                })
        
        return chunks
    
    def _extract_text_structural_chunks(self, text: str) -> List[Dict]:
        """Extract chunks based on text structure patterns"""
        lines = text.split('\n')
        chunks = []
        current_chunk_lines = []
        current_heading = ""
        current_level = 10  # Default high level for non-structured content
        
        for line in lines:
            line_stripped = line.strip()
            if not line_stripped:
                if current_chunk_lines:  # Keep empty lines within chunks
                    current_chunk_lines.append(line)
                continue
            
            # Check for structural patterns
            is_heading = False
            heading_level = 10
            
            # Markdown headings (# ## ### etc.)
            markdown_match = re.match(r'^(#{1,6})\s+(.+)$', line_stripped)
            if markdown_match:
                is_heading = True
                heading_level = len(markdown_match.group(1))
                heading_text = markdown_match.group(2)
            
            # Numbered sections (1. 2. 3.)
            elif re.match(r'^\d+\.\s+(.+)$', line_stripped):
                is_heading = True
                heading_level = 3
                heading_text = line_stripped
            
            # ALL CAPS headings
            elif line_stripped.isupper() and len(line_stripped) > 4 and len(line_stripped.split()) <= 6:
                is_heading = True
                heading_level = 2
                heading_text = line_stripped
            
            # Title case with colon
            elif re.match(r'^[A-Z][^.!?]*:$', line_stripped):
                is_heading = True
                heading_level = 3
                heading_text = line_stripped
            
            if is_heading:
                # Save previous chunk if exists
                if current_chunk_lines:
                    chunk_content = '\n'.join(current_chunk_lines).strip()
                    if chunk_content:
                        chunks.append({
                            'content': chunk_content,
                            'level': current_level,
                            'heading': current_heading if current_heading else 'Content Block',
                            'chunk_type': 'text_structural'
                        })
                
                # Start new chunk
                current_chunk_lines = [line]
                current_heading = heading_text
                current_level = heading_level
            else:
                current_chunk_lines.append(line)
        
        # Add final chunk
        if current_chunk_lines:
            chunk_content = '\n'.join(current_chunk_lines).strip()
            if chunk_content:
                chunks.append({
                    'content': chunk_content,
                    'level': current_level,
                    'heading': current_heading if current_heading else 'Content Block',
                    'chunk_type': 'text_structural'
                })
        
        # If no structured chunks found, return whole text
        if not chunks:
            chunks = [{
                'content': text.strip(),
                'level': 10,
                'heading': 'No Structure',
                'chunk_type': 'text_no_structure'
            }]
        
        return chunks
    
    def semantic_split_chunk(self, text: str, max_tokens: int = 512) -> List[Dict]:
        """
        Split text using semantic boundaries (sentences and paragraphs)
        """
        # Split by paragraphs first
        paragraphs = re.split(r'\n\s*\n', text)
        chunks = []
        current_chunk = ""
        
        for para in paragraphs:
            para = para.strip()
            if not para:
                continue
            
            # Check if adding this paragraph would exceed limit
            test_chunk = current_chunk + "\n\n" + para if current_chunk else para
            test_tokens = len(self.tokenizer.encode(test_chunk))
            
            if test_tokens > max_tokens and current_chunk:
                # Current chunk is full, save it
                chunks.append({
                    'content': current_chunk.strip(),
                    'chunk_type': 'semantic_paragraph',
                    'level': 5,
                    'heading': 'Semantic Chunk'
                })
                current_chunk = para
            else:
                current_chunk = test_chunk
        
        # Handle remaining content
        if current_chunk.strip():
            current_tokens = len(self.tokenizer.encode(current_chunk))
            if current_tokens > max_tokens:
                # Still too large, split by sentences
                sentence_chunks = self.sentence_split_chunk(current_chunk, max_tokens)
                chunks.extend(sentence_chunks)
            else:
                chunks.append({
                    'content': current_chunk.strip(),
                    'chunk_type': 'semantic_paragraph',
                    'level': 5,
                    'heading': 'Semantic Chunk'
                })
        
        return chunks
    
    def sentence_split_chunk(self, text: str, max_tokens: int = 512) -> List[Dict]:
        """Split text by sentences when paragraph splitting isn't enough"""
        # Split by sentences
        sentences = re.split(r'[.!?]+', text)
        chunks = []
        current_chunk = ""
        
        for sentence in sentences:
            sentence = sentence.strip()
            if not sentence:
                continue
            
            # Add sentence terminator back
            sentence_with_punct = sentence + "."
            test_chunk = current_chunk + " " + sentence_with_punct if current_chunk else sentence_with_punct
            test_tokens = len(self.tokenizer.encode(test_chunk))
            
            if test_tokens > max_tokens and current_chunk:
                # Current chunk is full, save it
                chunks.append({
                    'content': current_chunk.strip(),
                    'chunk_type': 'semantic_sentence',
                    'level': 6,
                    'heading': 'Sentence Chunk'
                })
                current_chunk = sentence_with_punct
            else:
                current_chunk = test_chunk
        
        # Add final chunk
        if current_chunk.strip():
            chunks.append({
                'content': current_chunk.strip(),
                'chunk_type': 'semantic_sentence',
                'level': 6,
                'heading': 'Sentence Chunk'
            })
        
        return chunks
    
    def fixed_size_split_chunk(self, text: str, chunk_size: int = 512, overlap: int = 64) -> List[Dict]:
        """
        Final fallback: split by fixed token size with overlap
        """
        tokens = self.tokenizer.encode(text)
        chunks = []
        
        start = 0
        chunk_num = 1
        
        while start < len(tokens):
            end = min(start + chunk_size, len(tokens))
            chunk_tokens = tokens[start:end]
            
            # Convert back to text (simplified for mock tokenizer)
            if isinstance(chunk_tokens[0], str):
                chunk_text = " ".join(chunk_tokens)
            else:
                chunk_text = self.tokenizer.decode(chunk_tokens)
            
            chunks.append({
                'content': chunk_text.strip(),
                'chunk_type': 'fixed_size',
                'level': 7,  # Lowest priority
                'heading': f'Fixed Chunk {chunk_num}'
            })
            
            # Move start position with overlap
            start = end - overlap if end < len(tokens) else len(tokens)
            chunk_num += 1
        
        return chunks
    
    def recursive_chunk_text(self, text: str, max_tokens: int = 1024, 
                           semantic_max_tokens: int = 512, 
                           fixed_chunk_size: int = 256,
                           fixed_overlap: int = 64) -> List[Dict]:
        """
        Main recursive chunking logic:
        1. Try structural chunking
        2. If chunks too large, apply semantic chunking
        3. If still too large, apply fixed-size chunking
        
        Args:
            text: Input text to chunk
            max_tokens: Maximum tokens for structural chunks before fallback
            semantic_max_tokens: Maximum tokens for semantic chunks before fallback
            fixed_chunk_size: Size for fixed chunking fallback
            fixed_overlap: Overlap for fixed chunking
            
        Returns:
            List of chunks with metadata
        """
        if not text.strip():
            return []
        
        # Step 1: Try structural chunking
        structural_chunks = self.get_structural_chunks(text)
        
        final_chunks = []
        
        for chunk in structural_chunks:
            chunk_content = chunk['content']
            chunk_tokens = len(self.tokenizer.encode(chunk_content))
            
            if chunk_tokens <= max_tokens:
                # Chunk is acceptable size, keep as is
                final_chunks.append({
                    'content': chunk_content,
                    'chunk_type': chunk['chunk_type'],
                    'level': chunk['level'],
                    'heading': chunk['heading'],
                    'token_count': chunk_tokens,
                    'processing_level': 'structural'
                })
            else:
                # Chunk too large, apply semantic chunking
                semantic_chunks = self.semantic_split_chunk(chunk_content, semantic_max_tokens)
                
                for sem_chunk in semantic_chunks:
                    sem_tokens = len(self.tokenizer.encode(sem_chunk['content']))
                    
                    if sem_tokens <= semantic_max_tokens:
                        # Semantic chunk is acceptable
                        final_chunks.append({
                            'content': sem_chunk['content'],
                            'chunk_type': sem_chunk['chunk_type'],
                            'level': sem_chunk['level'],
                            'heading': f"{chunk['heading']} - {sem_chunk['heading']}",
                            'token_count': sem_tokens,
                            'processing_level': 'semantic',
                            'parent_heading': chunk['heading']
                        })
                    else:
                        # Still too large, apply fixed-size chunking
                        fixed_chunks = self.fixed_size_split_chunk(
                            sem_chunk['content'], fixed_chunk_size, fixed_overlap
                        )
                        
                        for i, fix_chunk in enumerate(fixed_chunks):
                            fix_tokens = len(self.tokenizer.encode(fix_chunk['content']))
                            final_chunks.append({
                                'content': fix_chunk['content'],
                                'chunk_type': fix_chunk['chunk_type'],
                                'level': fix_chunk['level'],
                                'heading': f"{chunk['heading']} - Part {i+1}",
                                'token_count': fix_tokens,
                                'processing_level': 'fixed',
                                'parent_heading': chunk['heading'],
                                'semantic_parent': sem_chunk['heading']
                            })
        
        return final_chunks
    
    def process_document(self, doc: Dict, max_tokens: int = 1024,
                        semantic_max_tokens: int = 512,
                        fixed_chunk_size: int = 256,
                        fixed_overlap: int = 64) -> List[Dict]:
        """
        Process a single document with recursive chunking
        
        Args:
            doc: Document dictionary with url, title, content
            max_tokens: Maximum tokens for structural chunks
            semantic_max_tokens: Maximum tokens for semantic chunks  
            fixed_chunk_size: Size for fixed chunking fallback
            fixed_overlap: Overlap for fixed chunking
            
        Returns:
            List of chunk dictionaries with metadata
        """
        content = doc.get('content', '')
        if not content.strip():
            return []
        
        recursive_chunks = self.recursive_chunk_text(
            content, max_tokens, semantic_max_tokens, fixed_chunk_size, fixed_overlap
        )
        
        chunk_objects = []
        for i, chunk_data in enumerate(recursive_chunks):
            chunk_text = chunk_data['content']
            if chunk_text.strip():
                chunk_obj = {
                    'chunk_id': f"{doc['url']}#recursive_chunk_{i}",
                    'source_url': doc['url'],
                    'source_title': doc.get('title', ''),
                    'content': chunk_text.strip(),
                    'chunk_index': i,
                    'token_count': chunk_data.get('token_count', len(self.tokenizer.encode(chunk_text))),
                    'char_count': len(chunk_text),
                    'strategy': 'recursive',
                    'strategy_params': {
                        'max_tokens': max_tokens,
                        'semantic_max_tokens': semantic_max_tokens,
                        'fixed_chunk_size': fixed_chunk_size,
                        'fixed_overlap': fixed_overlap
                    },
                    'recursive_info': {
                        'chunk_type': chunk_data['chunk_type'],
                        'level': chunk_data['level'],
                        'heading': chunk_data['heading'],
                        'processing_level': chunk_data['processing_level'],
                        'parent_heading': chunk_data.get('parent_heading', ''),
                        'semantic_parent': chunk_data.get('semantic_parent', '')
                    }
                }
                chunk_objects.append(chunk_obj)
        
        return chunk_objects


def load_scraped_data(path: str = 'final.json') -> List[Dict]:
    """Load scraped documents from a JSON file and normalize them.

    Args:
        path: JSON file holding a list of scraped items. Defaults to
            ``'final.json'`` in the working directory, as before.

    Returns:
        List of document dicts that always contain ``url``, ``title`` and
        ``content``. Items with a ``content`` key keep their other keys; items
        with only a ``text`` key are converted to the three standard keys.
        Items with neither key, and items that are not dicts, are skipped.
        An empty list is returned (after printing a message) when the file is
        missing, unreadable, not valid JSON, or not a JSON list.
    """
    try:
        with open(path, 'r', encoding='utf-8') as f:
            scraped_data = json.load(f)
    except FileNotFoundError:
        print(f"❌ {path} not found")
        return []
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"❌ Error loading {path}: {e}")
        return []

    if not isinstance(scraped_data, list):
        print(f"❌ Error loading {path}: expected a JSON list, got {type(scraped_data).__name__}")
        return []

    formatted_data = []
    for item in scraped_data:
        # `'content' in item` on a string would be a substring test, so only
        # dict items are considered.
        if not isinstance(item, dict):
            continue

        url = item.get("url", "unknown")
        title = item.get("title", url)

        if 'content' in item:
            # Keep extra keys, but guarantee url/title for downstream chunkers.
            formatted_data.append({**item, "url": url, "title": title})
        elif 'text' in item:
            formatted_data.append({"url": url, "title": title, "content": item["text"]})

    return formatted_data


def create_output_directories():
    """Ensure the folders that the recursive chunking run writes to exist."""
    for target in ('data/chunks/recursive', 'results/ablations'):
        # exist_ok makes repeated runs of the cell harmless.
        os.makedirs(target, exist_ok=True)


def main():
    """Run the recursive chunking ablation end to end.

    Loads the scraped documents, chunks them once per configuration, writes
    each configuration's chunks to ``data/chunks/recursive/<name>.json``,
    saves a comparison table under ``results/ablations`` and prints one
    sample chunk per configuration.
    """
    create_output_directories()

    print("🚀 Initializing Recursive Chunker...")
    chunker = RecursiveChunker()

    documents = load_scraped_data()
    if not documents:
        print("❌ No documents loaded. Please check your final.json file.")
        return

    print(f"\n✅ Loaded {len(documents)} documents for recursive chunking")

    # Each preset: (name, structural limit, semantic limit, fixed size, fixed overlap)
    option_keys = ("max_tokens", "semantic_max_tokens", "fixed_chunk_size", "fixed_overlap")
    presets = [
        ("Recursive_Balanced", 1024, 512, 256, 64),
        ("Recursive_Large", 1536, 768, 384, 96),
        ("Recursive_Small", 768, 384, 192, 48),
    ]
    recursive_configs = [
        {"name": preset_name, **dict(zip(option_keys, values))}
        for preset_name, *values in presets
    ]

    recursive_results = {}
    rule = "=" * 75
    wide_rule = "=" * 100

    print("\n🔄 Processing documents with recursive chunking strategies...")
    print(rule)

    for config in recursive_configs:
        config_name = config["name"]
        limits = {key: config[key] for key in option_keys}

        print(f"\n📝 Processing: {config_name}")
        print(f"   Max tokens: {limits['max_tokens']}")
        print(f"   Semantic max tokens: {limits['semantic_max_tokens']}")
        print(f"   Fixed chunk size: {limits['fixed_chunk_size']}")
        print(f"   Fixed overlap: {limits['fixed_overlap']}")

        started = time.time()
        all_chunks = []
        for doc_number, doc in enumerate(documents, start=1):
            doc_chunks = chunker.process_document(doc, **limits)
            all_chunks.extend(doc_chunks)
            print(f"   Document {doc_number}: Generated {len(doc_chunks)} chunks")
        processing_time = time.time() - started

        # Aggregate statistics for this configuration.
        total_chunks = len(all_chunks)
        total_tokens = sum(chunk['token_count'] for chunk in all_chunks)
        avg_tokens_per_chunk = total_tokens / total_chunks if total_chunks > 0 else 0

        # Tally how many chunks came from each fallback stage / chunk type.
        processing_levels = {}
        chunk_types = {}
        for chunk in all_chunks:
            info = chunk.get('recursive_info', {})
            stage = info.get('processing_level', 'unknown')
            kind = info.get('chunk_type', 'unknown')
            processing_levels[stage] = processing_levels.get(stage, 0) + 1
            chunk_types[kind] = chunk_types.get(kind, 0) + 1

        recursive_results[config_name] = {
            'chunks': all_chunks,
            'total_chunks': total_chunks,
            'total_tokens': total_tokens,
            'avg_tokens_per_chunk': avg_tokens_per_chunk,
            'processing_time': processing_time,
            'config': config,
            'processing_levels': processing_levels,
            'chunk_types': chunk_types
        }

        print(f"   ✅ Generated {total_chunks} chunks")
        print(f"   ⏱️  Processing time: {processing_time:.2f}s")
        print(f"   📊 Average tokens per chunk: {avg_tokens_per_chunk:.1f}")
        print(f"   🔄 Processing levels: {list(processing_levels.keys())}")
        print(f"   📋 Chunk types: {list(chunk_types.keys())}")

        # Persist this configuration's chunks.
        output_file = f"data/chunks/recursive/{config_name}.json"
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(all_chunks, f, indent=2, ensure_ascii=False, default=str)
        print(f"   💾 Saved to: {output_file}")

    print("\n" + rule)
    print("✅ Recursive chunking completed for all configurations!")

    # Side-by-side comparison of the configurations.
    print("\n📈 RECURSIVE CHUNKING SUMMARY:")
    print(wide_rule)

    recursive_df = pd.DataFrame([
        {
            'Strategy': name,
            'Max Tokens': result['config']['max_tokens'],
            'Semantic Max': result['config']['semantic_max_tokens'],
            'Fixed Size': result['config']['fixed_chunk_size'],
            'Fixed Overlap': result['config']['fixed_overlap'],
            'Total Chunks': result['total_chunks'],
            'Avg Tokens/Chunk': f"{result['avg_tokens_per_chunk']:.1f}",
            'Processing Time (s)': f"{result['processing_time']:.2f}",
            'Processing Levels': len(result['processing_levels'])
        }
        for name, result in recursive_results.items()
    ])
    print(recursive_df.to_string(index=False))

    summary_path = 'results/ablations/recursive_chunking_summary.csv'
    recursive_df.to_csv(summary_path, index=False)
    print(f"\n💾 Summary saved to: {summary_path}")

    # One example chunk per configuration, with its recursive metadata.
    print("\n🔍 SAMPLE CHUNKS WITH RECURSIVE INFO:")
    print(wide_rule)
    for strategy_name, result in recursive_results.items():
        if not result['chunks']:
            continue
        sample_chunk = result['chunks'][0]
        info = sample_chunk.get('recursive_info', {})
        print(f"\n📄 {strategy_name}:")
        print(f"   Tokens: {sample_chunk['token_count']}")
        print(f"   Processing Level: {info.get('processing_level', 'N/A')}")
        print(f"   Chunk Type: {info.get('chunk_type', 'N/A')}")
        print(f"   Level: {info.get('level', 'N/A')}")
        print(f"   Heading: {info.get('heading', 'N/A')}")
        print(f"   Parent Heading: {info.get('parent_heading', 'N/A')}")
        print(f"   Content preview: {sample_chunk['content'][:200]}...")
        print(f"   Source: {sample_chunk['source_title']}")


# Run the recursive chunking pipeline when this code is executed directly
# (a notebook kernel or `python file.py`), but not when it is imported.
if __name__ == "__main__":
    main()

🚀 Initializing Recursive Chunker...
✅ Recursive chunker initialized

✅ Loaded 139 documents for recursive chunking

🔄 Processing documents with recursive chunking strategies...

📝 Processing: Recursive_Balanced
   Max tokens: 1024
   Semantic max tokens: 512
   Fixed chunk size: 256
   Fixed overlap: 64
   Document 1: Generated 5 chunks
   Document 2: Generated 2 chunks
   Document 3: Generated 31 chunks
   Document 4: Generated 35 chunks
   Document 5: Generated 35 chunks
   Document 6: Generated 31 chunks
   Document 7: Generated 50 chunks
   Document 8: Generated 4 chunks
   Document 9: Generated 87 chunks
   Document 10: Generated 12 chunks
   Document 11: Generated 11 chunks
   Document 12: Generated 14 chunks
   Document 13: Generated 49 chunks
   Document 14: Generated 11 chunks
   Document 15: Generated 1 chunks
   Document 16: Generated 1 chunks
   Document 17: Generated 62 chunks
   Document 18: Generated 1 chunks
   Document 19: Generated 1 chunks
   Document 20: Generated 1

# LLM Chunking

In [None]:
!pip install python-dotenv google-generativeai

Collecting python-dotenv
  Using cached python_dotenv-1.1.1-py3-none-any.whl.metadata (24 kB)
Collecting google-ai-generativelanguage==0.6.15 (from google-generativeai)
  Using cached google_ai_generativelanguage-0.6.15-py3-none-any.whl.metadata (5.7 kB)
Using cached python_dotenv-1.1.1-py3-none-any.whl (20 kB)
Using cached google_ai_generativelanguage-0.6.15-py3-none-any.whl (1.3 MB)
Installing collected packages: python-dotenv, google-ai-generativelanguage
  Attempting uninstall: google-ai-generativelanguage
    Found existing installation: google-ai-generativelanguage 0.7.0
    Uninstalling google-ai-generativelanguage-0.7.0:
      Successfully uninstalled google-ai-generativelanguage-0.7.0
Successfully installed google-ai-generativelanguage-0.6.15 python-dotenv-1.1.1


ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
langchain-google-genai 2.1.12 requires google-ai-generativelanguage<1,>=0.7, but you have google-ai-generativelanguage 0.6.15 which is incompatible.

[notice] A new release of pip is available: 24.2 -> 25.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [None]:
# =============================================================================
# LLM-BASED CHUNKING IMPLEMENTATION
# Strategy 5: Instruction-aware segmentation using Gemini 2.0 Flash-Lite
# =============================================================================

import re
import json
import time
import os
from typing import List, Dict, Tuple, Optional
import pandas as pd
import google.generativeai as genai
from google.generativeai.types import HarmCategory, HarmBlockThreshold
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Mock tokenizer for demonstration - replace with your actual tokenizer
class MockTokenizer:
    """Whitespace tokenizer used as a stand-in for a real tokenizer.

    Tokens are the whitespace-separated words of the input, so every
    "token count" computed with this class is really a word count.
    """

    def encode(self, text: str) -> List[str]:
        """Split ``text`` on runs of whitespace and return the words."""
        return text.split()

    def decode(self, tokens: List[str]) -> str:
        """Join ``tokens`` with single spaces (inverse of ``encode`` up to whitespace)."""
        return " ".join(tokens)


# Initialize mock tokenizer - replace with your actual tokenizer
tokenizer = MockTokenizer()

class LLMChunker:
    """
    LLM-based chunker built on Gemini 2.0 Flash-Lite.

    Each document is sent to Gemini with an instruction-style prompt, the JSON
    array of chunks in the reply is parsed and validated, and a paragraph-based
    fallback is used whenever the call or the parsing fails. Token and request
    counters are accumulated on the instance so cost vs quality can be
    analysed afterwards with ``get_cost_summary``.

    Token counts come from ``self.tokenizer.encode``; they are estimates, not
    the usage figures reported by the API.
    """

    # Single source of truth for the model settings: used both to build the
    # Gemini client and to label the produced chunks, so the recorded metadata
    # cannot drift from the model that actually generated them.
    MODEL_NAME = "gemini-2.0-flash-lite-001"  # consistent with test script
    TEMPERATURE = 0.1  # Low temperature for consistent chunking

    # Gemini pricing (approximate - update with actual pricing), USD per 1K tokens
    INPUT_COST_PER_1K = 0.000125
    OUTPUT_COST_PER_1K = 0.000375

    def __init__(self, api_key: str):
        """
        Initialize LLM chunker with Gemini API key.

        Args:
            api_key: Gemini API key passed to ``genai.configure``.
        """
        self.tokenizer = tokenizer
        self.api_key = api_key

        # Configure Gemini
        genai.configure(api_key=api_key)

        # Initialize Gemini 2.0 Flash-Lite model
        self.model = genai.GenerativeModel(
            self.MODEL_NAME,
            generation_config={
                "temperature": self.TEMPERATURE,
                "top_p": 0.8,
                "top_k": 40,
                "max_output_tokens": 8192,
            },
            safety_settings={
                HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_NONE,
                HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
                HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_NONE,
                HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
            },
        )

        # Cost tracking
        self.total_input_tokens = 0
        self.total_output_tokens = 0
        self.total_requests = 0

        print("✅ LLM chunker initialized with Gemini 2.0 Flash-Lite")

    def create_chunking_prompt(self, document_content: str, document_title: str = "", 
                              max_chunk_tokens: int = 512) -> str:
        """
        Create a comprehensive chunking prompt using the specified format:
        Persona, Instruction, Context, Format, Audience, Tone, Data

        Args:
            document_content: Full text of the document to segment.
            document_title: Title inserted into the DATA section.
            max_chunk_tokens: Approximate per-chunk token budget quoted to the model.

        Returns:
            The complete prompt string.
        """

        prompt = f"""**PERSONA:**
You are an expert document processing specialist with deep expertise in information architecture, content analysis, and knowledge management systems. You have extensive experience in preparing documents for retrieval-augmented generation (RAG) systems, particularly for customer support chatbots in the fintech domain.

**INSTRUCTION:**
Analyze the provided JioPay customer support document and intelligently segment it into coherent, semantically meaningful chunks. Each chunk should:
1. Maintain complete contextual meaning and be self-contained
2. Preserve logical flow and relationships between concepts
3. Respect natural topic boundaries and information hierarchies  
4. Optimize for retrieval relevance in customer support scenarios
5. Stay within approximately {max_chunk_tokens} tokens per chunk
6. Include sufficient context to be understood independently

**CONTEXT:**
This document contains JioPay customer support information including onboarding procedures, payment processes, KYC requirements, API documentation, troubleshooting guides, and policy information. The chunks will be used in a RAG system to answer customer queries about JioPay services. Each chunk must be retrievable and provide complete answers to potential customer questions.

**FORMAT:**
Return your response as a valid JSON array where each chunk object contains:
```json
[
  {{
    "chunk_number": 1,
    "heading": "Descriptive heading that captures the main topic",
    "content": "The actual chunk content with complete sentences and context",
    "topic_category": "Primary category (onboarding/payments/kyc/api/security/troubleshooting/policies)",
    "key_concepts": ["concept1", "concept2", "concept3"],
    "chunk_type": "information_type (procedural/declarative/troubleshooting/reference)",
    "estimated_tokens": 450,
    "relevance_keywords": ["keyword1", "keyword2", "keyword3"]
  }}
]
```

**AUDIENCE:**
The chunks will serve customer support representatives and automated systems answering queries from JioPay business customers who need specific, actionable information about account setup, payment processing, compliance requirements, and technical integration.

**TONE:**
Maintain a professional, precise, and analytical approach. Focus on clarity and accuracy. Preserve the original document's authoritative tone while ensuring each chunk provides complete, actionable information.

**DATA:**
Document Title: "{document_title}"
Document Content:
---
{document_content}
---

Please analyze the above document and create optimal chunks following the specified format. Ensure each chunk is self-contained, semantically coherent, and valuable for customer support retrieval."""

        return prompt

    def estimate_tokens(self, text: str) -> int:
        """Estimate token count for cost calculation (length of ``self.tokenizer.encode(text)``)."""
        return len(self.tokenizer.encode(text))

    def chunk_with_llm(self, document_content: str, document_title: str = "",
                       max_chunk_tokens: int = 512) -> Tuple[List[Dict], Dict]:
        """
        Use Gemini to intelligently chunk the document content

        Args:
            document_content: Full text of the document to segment.
            document_title: Title passed into the prompt for context.
            max_chunk_tokens: Approximate per-chunk token budget quoted in the prompt.

        Returns:
            Tuple of (chunks_list, cost_info). On success ``cost_info`` holds
            ``input_tokens``, ``output_tokens``, ``request_time``,
            ``chunks_generated`` and ``success=True``. On failure ``chunks_list``
            is empty and ``cost_info`` holds ``error`` and ``request_time`` plus
            whatever token estimates were already counted, so that callers
            aggregating per-document usage stay in step with the instance
            counters. No exception is propagated to the caller.
        """
        input_tokens = 0
        request_time = 0
        try:
            # Create the chunking prompt
            prompt = self.create_chunking_prompt(document_content, document_title, max_chunk_tokens)

            # Estimate input cost
            input_tokens = self.estimate_tokens(prompt)
            self.total_input_tokens += input_tokens

            print(f"   Sending request to Gemini (estimated {input_tokens} input tokens)...")

            # Send request to Gemini
            start_time = time.time()
            response = self.model.generate_content(prompt)  # prompt is string
            request_time = time.time() - start_time

            self.total_requests += 1

            if not response.text:
                print("   ❌ Empty response from Gemini")
                return [], {"error": "Empty response", "request_time": request_time,
                            "input_tokens": input_tokens}

            # Estimate output tokens
            output_tokens = self.estimate_tokens(response.text)
            self.total_output_tokens += output_tokens

            # Parse JSON response
            try:
                # Clean response text and extract JSON
                response_text = response.text.strip()

                # Find JSON array in response: everything from the first '[' to
                # the last ']' (this also drops a surrounding markdown fence).
                json_start = response_text.find('[')
                json_end = response_text.rfind(']') + 1

                if json_start == -1 or json_end == 0:
                    print("   ❌ No JSON array found in response")
                    return [], {"error": "No JSON found", "request_time": request_time,
                                "input_tokens": input_tokens, "output_tokens": output_tokens}

                chunks_data = json.loads(response_text[json_start:json_end])

                # Validate and process chunks
                processed_chunks = []
                for i, chunk in enumerate(chunks_data):
                    if not isinstance(chunk, dict):
                        continue

                    # A missing, null or non-string "content" invalidates only
                    # this entry, not the whole response.
                    content = chunk.get('content', '')
                    if not isinstance(content, str) or not content.strip():
                        continue

                    processed_chunks.append({
                        'content': content.strip(),
                        'heading': chunk.get('heading', f'Chunk {i+1}'),
                        'topic_category': chunk.get('topic_category', 'general'),
                        'key_concepts': chunk.get('key_concepts', []),
                        'chunk_type': chunk.get('chunk_type', 'declarative'),
                        'estimated_tokens': chunk.get('estimated_tokens', 0),
                        'relevance_keywords': chunk.get('relevance_keywords', []),
                        'level': 1,  # LLM-determined chunks get high priority
                        'llm_generated': True
                    })

                cost_info = {
                    'input_tokens': input_tokens,
                    'output_tokens': output_tokens,
                    'request_time': request_time,
                    'chunks_generated': len(processed_chunks),
                    'success': True
                }

                print(f"   ✅ Generated {len(processed_chunks)} chunks in {request_time:.2f}s")
                return processed_chunks, cost_info

            except json.JSONDecodeError as e:
                print(f"   ❌ JSON parsing error: {e}")
                print(f"   Response preview: {response.text[:500]}...")
                return [], {"error": f"JSON parsing: {e}", "request_time": request_time,
                            "input_tokens": input_tokens, "output_tokens": output_tokens}

        except Exception as e:
            # Deliberately broad: any failure here (including errors raised
            # while calling the model or reading response.text) must degrade
            # to the fallback chunker instead of aborting the whole run.
            print(f"   ❌ Error calling Gemini: {e}")
            return [], {"error": str(e), "request_time": request_time,
                        "input_tokens": input_tokens}

    def _fallback_chunk_record(self, text: str, chunk_num: int) -> Dict:
        """Build the chunk record for one paragraph-packed fallback chunk."""
        return {
            'content': text.strip(),
            'heading': f'Fallback Chunk {chunk_num}',
            'topic_category': 'general',
            'key_concepts': [],
            'chunk_type': 'fallback',
            'estimated_tokens': len(self.tokenizer.encode(text)),
            'relevance_keywords': [],
            'level': 5,  # Lower priority for fallback
            'llm_generated': False
        }

    def fallback_chunking(self, text: str, max_tokens: int = 512) -> List[Dict]:
        """
        Fallback chunking when LLM fails (simple paragraph-based)

        Paragraphs (separated by blank lines) are packed greedily into a chunk
        until adding the next one would exceed ``max_tokens``. Paragraphs are
        never split, so a single paragraph longer than ``max_tokens`` becomes
        one oversized chunk.

        Args:
            text: Text to chunk.
            max_tokens: Token budget per chunk, measured with ``self.tokenizer``.

        Returns:
            List of chunk records with the same keys as LLM-generated chunks,
            marked ``chunk_type='fallback'`` and ``llm_generated=False``.
        """
        chunks = []
        current_chunk = ""

        for para in re.split(r'\n\s*\n', text):
            para = para.strip()
            if not para:
                continue

            test_chunk = current_chunk + "\n\n" + para if current_chunk else para
            test_tokens = len(self.tokenizer.encode(test_chunk))

            if test_tokens > max_tokens and current_chunk:
                # Budget exceeded: close the current chunk and start a new one
                # with this paragraph.
                chunks.append(self._fallback_chunk_record(current_chunk, len(chunks) + 1))
                current_chunk = para
            else:
                current_chunk = test_chunk

        if current_chunk.strip():
            chunks.append(self._fallback_chunk_record(current_chunk, len(chunks) + 1))

        return chunks

    def llm_chunk_text(self, text: str, title: str = "", max_chunk_tokens: int = 512) -> Tuple[List[Dict], Dict]:
        """
        Main LLM chunking method

        Args:
            text: Input text to chunk
            title: Document title for context
            max_chunk_tokens: Maximum tokens per chunk

        Returns:
            Tuple of (chunks_list, cost_info). When the LLM path yields no
            chunks, the chunks come from ``fallback_chunking`` and
            ``cost_info['fallback_used']`` is set to True.
        """
        if not text.strip():
            return [], {"error": "Empty text", "cost": 0}

        # Try LLM chunking first
        chunks, cost_info = self.chunk_with_llm(text, title, max_chunk_tokens)

        if not chunks or not cost_info.get('success', False):
            print("   🔄 LLM chunking failed, using fallback method...")
            chunks = self.fallback_chunking(text, max_chunk_tokens)
            cost_info['fallback_used'] = True

        return chunks, cost_info

    def process_document(self, doc: Dict, max_chunk_tokens: int = 512) -> Tuple[List[Dict], Dict]:
        """
        Process a single document with LLM-based chunking

        Args:
            doc: Document dictionary with url, title, content. ``url`` and
                ``title`` are optional; a missing url is recorded as 'unknown'.
            max_chunk_tokens: Maximum tokens per chunk

        Returns:
            Tuple of (chunk_objects_list, cost_info)
        """
        content = doc.get('content', '')
        title = doc.get('title', '')

        if not content.strip():
            return [], {"error": "Empty content", "cost": 0}

        # Documents without a URL must not abort the run with a KeyError.
        source_url = doc.get('url', 'unknown')

        llm_chunks, cost_info = self.llm_chunk_text(content, title, max_chunk_tokens)

        chunk_objects = []
        for i, chunk_data in enumerate(llm_chunks):
            chunk_text = chunk_data['content']
            if not chunk_text.strip():
                continue

            chunk_objects.append({
                'chunk_id': f"{source_url}#llm_chunk_{i}",
                'source_url': source_url,
                'source_title': title,
                'content': chunk_text.strip(),
                'chunk_index': i,
                'token_count': len(self.tokenizer.encode(chunk_text)),
                'char_count': len(chunk_text),
                'strategy': 'llm_based',
                'strategy_params': {
                    'max_chunk_tokens': max_chunk_tokens,
                    # Record the model that is actually configured in __init__.
                    'model': self.MODEL_NAME,
                    'temperature': self.TEMPERATURE
                },
                'llm_info': {
                    'heading': chunk_data['heading'],
                    'topic_category': chunk_data['topic_category'],
                    'key_concepts': chunk_data['key_concepts'],
                    'chunk_type': chunk_data['chunk_type'],
                    'estimated_tokens': chunk_data['estimated_tokens'],
                    'relevance_keywords': chunk_data['relevance_keywords'],
                    'level': chunk_data['level'],
                    'llm_generated': chunk_data['llm_generated']
                }
            })

        return chunk_objects, cost_info

    def get_cost_summary(self) -> Dict:
        """
        Get comprehensive cost analysis

        Returns:
            Dict with the request/token counters accumulated on this instance
            and the USD cost derived from them using the class-level
            per-1K-token prices. Per-request averages divide by at least 1 so
            an unused chunker reports zeros instead of raising.
        """
        input_cost = (self.total_input_tokens / 1000) * self.INPUT_COST_PER_1K
        output_cost = (self.total_output_tokens / 1000) * self.OUTPUT_COST_PER_1K
        total_cost = input_cost + output_cost
        total_tokens = self.total_input_tokens + self.total_output_tokens
        request_count = max(1, self.total_requests)

        return {
            'total_requests': self.total_requests,
            'total_input_tokens': self.total_input_tokens,
            'total_output_tokens': self.total_output_tokens,
            'total_tokens': total_tokens,
            'input_cost_usd': input_cost,
            'output_cost_usd': output_cost,
            'total_cost_usd': total_cost,
            'avg_tokens_per_request': total_tokens / request_count,
            'cost_per_request_usd': total_cost / request_count
        }


def load_scraped_data(path: str = 'final.json') -> List[Dict]:
    """
    Load scraped documents from a JSON file (``final.json`` by default).

    The file is expected to contain a JSON array of objects. Each object must
    carry the page body under ``content`` or, failing that, under ``text``;
    objects with neither key, non-object entries and entries whose body is not
    a string are skipped. Every returned document is guaranteed to have
    ``url``, ``title`` and ``content`` keys: a missing url defaults to
    ``"unknown"`` and a missing title defaults to the url.

    Args:
        path: Location of the JSON file to read.

    Returns:
        List of document dicts, or an empty list when the file is missing or
        cannot be read/parsed (the problem is printed, not raised).
    """
    try:
        with open(path, 'r', encoding='utf-8') as f:
            scraped_data = json.load(f)

        formatted_data = []
        for item in scraped_data:
            if not isinstance(item, dict):
                continue

            if 'content' in item:
                body = item['content']
            elif 'text' in item:
                body = item['text']
            else:
                continue

            # Downstream code calls .strip() on the body, so anything that is
            # not a string would crash the whole run later on.
            if not isinstance(body, str):
                continue

            if 'content' in item:
                # Keep every original field; only fill in what is missing.
                formatted_item = dict(item)
            else:
                formatted_item = {"content": body}
                if 'url' in item:
                    formatted_item["url"] = item["url"]
                if 'title' in item:
                    formatted_item["title"] = item["title"]

            formatted_item.setdefault("url", "unknown")
            formatted_item.setdefault("title", formatted_item["url"])
            formatted_data.append(formatted_item)

        return formatted_data

    except FileNotFoundError:
        print(f"❌ {path} not found")
        return []
    except Exception as e:
        # Best-effort loader: report malformed JSON / unexpected structure and
        # let the caller handle the empty result.
        print(f"❌ Error loading {path}: {e}")
        return []


def create_output_directories():
    """Ensure the folders this cell writes to exist (relative to the working directory)."""
    for target in ('data/chunks/llm_based', 'results/ablations'):
        # exist_ok makes the call safe to repeat on every run
        os.makedirs(target, exist_ok=True)


def main():
    """
    Main execution function

    Runs the LLM-based chunking experiment end to end:

    1. Reads ``GEMINI_API_KEY`` from the environment and stops if it is missing.
    2. Creates the output directories and loads the documents via
       ``load_scraped_data``; stops if none were loaded.
    3. For every entry in ``llm_configs``, chunks all documents with
       ``LLMChunker.process_document``, collects statistics and writes the
       chunks to ``data/chunks/llm_based/<config name>.json``.
    4. Prints a comparison table, saves it to
       ``results/ablations/llm_chunking_summary.csv`` and prints one sample
       chunk per configuration.

    Returns:
        None. All results are printed or written to disk.
    """
    # Load API key from .env file
    api_key = os.getenv('GEMINI_API_KEY')
    if not api_key:
        print("❌ GEMINI_API_KEY not found in .env file")
        print("   Please add your API key to .env file: GEMINI_API_KEY=your-api-key-here")
        return

    # Create output directories
    create_output_directories()

    # Initialize LLM chunker
    print("🚀 Initializing LLM Chunker with Gemini 2.0 Flash-Lite...")
    llm_chunker = LLMChunker(api_key)

    # Load data
    documents = load_scraped_data()

    if not documents:
        print("❌ No documents loaded. Please check your final.json file.")
        return

    print(f"\n✅ Loaded {len(documents)} documents for LLM-based chunking")

    # Define LLM chunking configurations to test
    llm_configs = [
        # {
        #     "name": "LLM_Small_Chunks",
        #     "max_chunk_tokens": 256
        # },
        {
            "name": "LLM_Medium_Chunks",
            "max_chunk_tokens": 512
        },
        # {
        #     "name": "LLM_Large_Chunks",
        #     "max_chunk_tokens": 768
        # }
    ]

    # Process documents with each LLM configuration
    llm_results = {}

    print("\n🔄 Processing documents with LLM-based chunking strategies...")
    print("=" * 75)

    for config in llm_configs:
        config_name = config["name"]
        max_chunk_tokens = config["max_chunk_tokens"]

        print(f"\n📝 Processing: {config_name}")
        print(f"   Max chunk tokens: {max_chunk_tokens}")

        # The chunker is shared by all configurations and its counters are
        # cumulative. Reset them here so get_cost_summary() below reports the
        # cost of THIS configuration only instead of the running total of all
        # configurations processed so far.
        llm_chunker.total_input_tokens = 0
        llm_chunker.total_output_tokens = 0
        llm_chunker.total_requests = 0

        start_time = time.time()
        all_chunks = []
        total_cost_info = {
            'total_input_tokens': 0,
            'total_output_tokens': 0,
            'total_requests': 0,
            'fallback_used_count': 0,
            'successful_requests': 0
        }

        for doc_idx, doc in enumerate(documents):
            print(f"   Processing document {doc_idx + 1}/{len(documents)}...")
            chunks, cost_info = llm_chunker.process_document(doc, max_chunk_tokens)
            all_chunks.extend(chunks)

            # Aggregate cost info
            total_cost_info['total_input_tokens'] += cost_info.get('input_tokens', 0)
            total_cost_info['total_output_tokens'] += cost_info.get('output_tokens', 0)
            # Incremented once per document, whether or not an API call was made.
            total_cost_info['total_requests'] += 1
            if cost_info.get('fallback_used'):
                total_cost_info['fallback_used_count'] += 1
            if cost_info.get('success'):
                total_cost_info['successful_requests'] += 1

            print(f"     Generated {len(chunks)} chunks")

        processing_time = time.time() - start_time

        # Calculate statistics
        total_chunks = len(all_chunks)
        total_tokens = sum(chunk['token_count'] for chunk in all_chunks)
        avg_tokens_per_chunk = total_tokens / total_chunks if total_chunks > 0 else 0

        # Count different categories and types
        topic_categories = {}
        chunk_types = {}
        llm_generated_count = 0

        for chunk in all_chunks:
            llm_info = chunk.get('llm_info', {})
            topic_category = llm_info.get('topic_category', 'unknown')
            chunk_type = llm_info.get('chunk_type', 'unknown')

            topic_categories[topic_category] = topic_categories.get(topic_category, 0) + 1
            chunk_types[chunk_type] = chunk_types.get(chunk_type, 0) + 1

            if llm_info.get('llm_generated', False):
                llm_generated_count += 1

        # Chunks not produced by the LLM came from the paragraph fallback.
        fallback_count = total_chunks - llm_generated_count

        # Get cost summary
        cost_summary = llm_chunker.get_cost_summary()

        llm_results[config_name] = {
            'chunks': all_chunks,
            'total_chunks': total_chunks,
            'total_tokens': total_tokens,
            'avg_tokens_per_chunk': avg_tokens_per_chunk,
            'processing_time': processing_time,
            'config': config,
            'topic_categories': topic_categories,
            'chunk_types': chunk_types,
            'llm_generated_count': llm_generated_count,
            'fallback_count': fallback_count,
            'cost_info': total_cost_info,
            'cost_summary': cost_summary
        }

        print(f"   ✅ Generated {total_chunks} chunks ({llm_generated_count} LLM, {fallback_count} fallback)")
        print(f"   ⏱️  Processing time: {processing_time:.2f}s")
        print(f"   📊 Average tokens per chunk: {avg_tokens_per_chunk:.1f}")
        print(f"   💰 Estimated cost: ${cost_summary['total_cost_usd']:.4f}")
        print(f"   📋 Topic categories: {list(topic_categories.keys())}")
        print(f"   🏷️  Chunk types: {list(chunk_types.keys())}")

        # Save chunks to file
        output_file = f"data/chunks/llm_based/{config_name}.json"
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(all_chunks, f, indent=2, ensure_ascii=False, default=str)
        print(f"   💾 Saved to: {output_file}")

    print("\n" + "=" * 75)
    print("✅ LLM-based chunking completed for all configurations!")

    # Create summary comparison
    print("\n📈 LLM CHUNKING SUMMARY:")
    print("=" * 100)

    llm_summary = []
    for name, result in llm_results.items():
        config = result['config']
        cost_summary = result['cost_summary']
        llm_summary.append({
            'Strategy': name,
            'Max Tokens': config['max_chunk_tokens'],
            'Total Chunks': result['total_chunks'],
            'LLM Generated': result['llm_generated_count'],
            'Fallback Used': result['fallback_count'],
            'Avg Tokens/Chunk': f"{result['avg_tokens_per_chunk']:.1f}",
            'Processing Time (s)': f"{result['processing_time']:.2f}",
            'Total Cost ($)': f"{cost_summary['total_cost_usd']:.4f}",
            # max(1, ...) avoids a division by zero when a run produced no chunks
            'Cost per Chunk ($)': f"{cost_summary['total_cost_usd']/max(1, result['total_chunks']):.6f}"
        })

    llm_df = pd.DataFrame(llm_summary)
    print(llm_df.to_string(index=False))

    # Save summary
    llm_df.to_csv('results/ablations/llm_chunking_summary.csv', index=False)
    print(f"\n💾 Summary saved to: results/ablations/llm_chunking_summary.csv")

    # Show sample chunks with LLM information
    print("\n🔍 SAMPLE CHUNKS WITH LLM INFO:")
    print("=" * 100)
    for strategy_name, result in llm_results.items():
        if result['chunks']:
            sample_chunk = result['chunks'][0]
            llm_info = sample_chunk.get('llm_info', {})
            print(f"\n📄 {strategy_name}:")
            print(f"   Tokens: {sample_chunk['token_count']}")
            print(f"   LLM Generated: {llm_info.get('llm_generated', False)}")
            print(f"   Topic Category: {llm_info.get('topic_category', 'N/A')}")
            print(f"   Chunk Type: {llm_info.get('chunk_type', 'N/A')}")
            print(f"   Heading: {llm_info.get('heading', 'N/A')}")
            print(f"   Key Concepts: {llm_info.get('key_concepts', [])}")
            print(f"   Content preview: {sample_chunk['content'][:200]}...")
            print(f"   Source: {sample_chunk['source_title']}")


# Entry point: run the chunking pipeline when this code executes as the main module.
if __name__ == "__main__":
    main()

🚀 Initializing LLM Chunker with Gemini 2.0 Flash-Lite...
✅ LLM chunker initialized with Gemini 2.0 Flash-Lite

✅ Loaded 139 documents for LLM-based chunking

🔄 Processing documents with LLM-based chunking strategies...

📝 Processing: LLM_Medium_Chunks
   Max chunk tokens: 512
   Processing document 1/139...
   Sending request to Gemini (estimated 981 input tokens)...
   ✅ Generated 4 chunks in 7.32s
     Generated 4 chunks
   Processing document 2/139...
   Sending request to Gemini (estimated 547 input tokens)...
   ✅ Generated 4 chunks in 6.26s
     Generated 4 chunks
   Processing document 3/139...
   Sending request to Gemini (estimated 1680 input tokens)...
   ✅ Generated 7 chunks in 9.77s
     Generated 7 chunks
   Processing document 4/139...
   Sending request to Gemini (estimated 3138 input tokens)...
   ✅ Generated 17 chunks in 21.16s
     Generated 17 chunks
   Processing document 5/139...
   Sending request to Gemini (estimated 3137 input tokens)...
   ✅ Generated 17 chunks

# Evals

## Import and Setup

In [2]:
# =============================================================================
# UNIFIED CHUNKING EVALUATION TEST SUITE
# Combines retrieval-focused metrics with domain-specific RAG evaluation
# =============================================================================

import json
import time
import os
import re
import math
from typing import List, Dict, Tuple, Optional, Set, Any
import pandas as pd
import numpy as np
from collections import defaultdict, Counter
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Create output directories
os.makedirs("results/unified_evaluation", exist_ok=True)

## Text Analysis

In [3]:

def count_broken_sentences(text: str) -> int:
    """
    Count the '.'-separated fragments of ``text`` that look broken.

    A non-empty fragment (after trimming whitespace) is counted when it is
    shorter than 10 characters or when its first character is not an
    uppercase letter. Empty fragments are ignored.
    """
    fragments = (piece.strip() for piece in text.split('.'))
    return sum(
        1
        for fragment in fragments
        # very short fragment, or one that does not start with a capital
        if fragment and (len(fragment) < 10 or not fragment[0].isupper())
    )

def measure_topic_consistency(text: str) -> float:
    """
    Measure if text maintains consistent topic/theme

    Each topic scores one point per keyword that occurs (as a
    case-insensitive substring) in ``text``. The result is the share of the
    best-scoring topic in the sum of all topic scores, so it approaches 1.0
    when one topic dominates.

    Args:
        text: Chunk text to analyse.

    Returns:
        A value in (0, 1]; 0.5 when no keyword of any topic is found.
    """
    # JioPay domain keywords
    domain_topics = {
        'payment': ['payment', 'transaction', 'money', 'amount', 'pay', 'refund'],
        'app': ['app', 'application', 'download', 'install', 'mobile', 'android', 'ios'],
        'account': ['account', 'login', 'password', 'signin', 'logout', 'profile'],
        'business': ['business', 'merchant', 'kyc', 'documents', 'verification'],
        'support': ['help', 'support', 'issue', 'problem', 'error', 'troubleshoot'],
        'settlement': ['settlement', 'bank', 'transfer', 'utr', 'payout'],
        'collect_links': ['collect link', 'payment link', 'link validity', 'bulk collect', 'partial payment'],
        'voicebox': ['voicebox', 'voice box', 'audio', 'announcement', 'replay'],
        'transactions': ['transaction', 'payment', 'refund', 'failed'],
        'repeat_payments': ['repeat', 'recurring', 'subscription', 'mandate'],
        'campaigns': ['campaign', 'offer', 'create campaign', 'edit campaign'],
        'user_management': ['sub user', 'user management', 'block user'],
        'dqr': ['DQR', 'dynamic QR', 'store manager'],
        'partner_program': ['partner', 'commission', 'earning'],
        'p2pm_merchants': ['P2PM', 'merchant limit', 'upgrade'],
        'payment_gateway': ['payment', 'gateway', 'transaction', 'processing', 'checkout'],
        'app_usage': ['app', 'download', 'install', 'mobile', 'android', 'ios'],
        'business_setup': ['business', 'setup', 'merchant', 'onboarding', 'registration'],
        'technical_issues': ['error', 'issue', 'problem', 'troubleshoot', 'fix', 'bug'],
        'refunds': ['refund', 'return', 'cancel', 'reverse', 'chargeback'],
        'kyc_documents': ['kyc', 'documents', 'verification', 'identity', 'proof'],
        'fees_pricing': ['fee', 'charge', 'cost', 'price', 'rate', 'commission'],
        'general': []  # fallback category
    }

    text_lower = text.lower()
    topic_scores = {}

    for topic, keywords in domain_topics.items():
        # Keywords are lower-cased before matching: the text is lower-cased,
        # so mixed-case entries such as 'DQR' or 'P2PM' could otherwise never
        # be found.
        score = sum(1 for keyword in keywords if keyword.lower() in text_lower)
        if score > 0:
            topic_scores[topic] = score

    if not topic_scores:
        return 0.5  # Neutral if no topics found

    # Calculate consistency (higher if dominated by one topic). Every stored
    # score is positive, so the total cannot be zero here.
    max_score = max(topic_scores.values())
    total_score = sum(topic_scores.values())

    return max_score / total_score

def measure_thought_completeness(text: str) -> float:
    """
    Heuristic score for whether ``text`` holds a complete thought/idea.

    Scoring, first match wins:
      * question marker ("Q:"/"q:" or "?") and answer marker ("A:"/"a:" or
        more than 15 words) both present -> 1.0; only one of them -> 0.7
      * procedural cue word present -> 0.9 if longer than 20 words, else 0.6
      * otherwise by the number of '.'-separated pieces longer than 10
        characters: two or more -> 0.8, exactly one -> 0.6, none -> 0.3
    """
    words = text.split()

    # Q&A detection
    question_present = re.search(r'[Qq]:', text) is not None or '?' in text
    answer_present = re.search(r'[Aa]:', text) is not None or len(words) > 15

    if question_present and answer_present:
        return 1.0
    if question_present or answer_present:
        return 0.7

    # Procedure / instruction detection
    lowered = text.lower()
    procedure_cues = ('step', 'follow', 'click', 'enter', 'select', 'go to')
    if any(cue in lowered for cue in procedure_cues):
        # NOTE(review): the 0.9 outcome looks unreachable - any text with more
        # than 15 words already returned above through answer_present. Kept
        # as-is to preserve behaviour; confirm the intended ordering.
        return 0.9 if len(words) > 20 else 0.6

    # Default based on length and structure
    substantial_pieces = sum(1 for piece in text.split('.') if len(piece.strip()) > 10)
    if substantial_pieces >= 2:
        return 0.8
    if substantial_pieces == 1:
        return 0.6
    return 0.3

## Domain-Specific Metrics

In [4]:
def count_jiopay_keywords(text: str) -> int:
    """
    Return how many distinct JioPay-specific terms occur in ``text``.

    Matching is a case-insensitive substring test; each term counts at most
    once no matter how often it appears.
    """
    vocabulary = (
        'jiopay', 'jio pay', 'jiomoney', 'jio money',
        'payment gateway', 'upi', 'wallet', 'digital payment',
        'merchant', 'business account', 'kyc', 'settlement',
        'transaction', 'refund', 'chargeback', 'dispute',
        'collect link', 'payment link', 'qr code', 'dynamic qr',
        'voicebox', 'voice box', 'announcement', 'replay',
        'api', 'integration', 'webhook', 'callback',
    )

    haystack = text.lower()
    hits = 0
    for term in vocabulary:
        if term in haystack:
            hits += 1
    return hits

def count_procedure_words(text: str) -> int:
    """
    Return how many distinct procedural/instructional cues occur in ``text``.

    Matching is a case-insensitive substring test; each cue counts at most
    once no matter how often it appears.
    """
    cues = (
        'click', 'tap', 'select', 'choose', 'enter', 'input',
        'go to', 'navigate', 'open', 'close', 'save', 'submit',
        'step', 'follow', 'complete', 'finish', 'start', 'begin',
    )

    lowered = text.lower()
    return len([cue for cue in cues if cue in lowered])

# Mock embedding function
def get_embeddings(texts: List[str]) -> np.ndarray:
    """
    Mock embedding function: dense TF-IDF vectors for ``texts``.

    A new vectorizer (English stop words removed, at most 384 features) is
    fitted on every call, so vectors returned by different calls do not share
    a vocabulary. An empty input yields an empty array.
    """
    if len(texts) == 0:
        return np.array([])

    tfidf = TfidfVectorizer(max_features=384, stop_words='english')
    return tfidf.fit_transform(texts).toarray()


## Test Data Setup

In [5]:
def create_test_queries() -> List[Dict]:
    """Build the fixed set of 15 JioPay evaluation queries.

    Every entry is a dict with the keys ``id``, ``query``, ``category``,
    ``expected_topics`` (list of keyword strings) and ``complexity``.
    A fresh list of fresh dicts is built on each call.
    """
    field_names = ("id", "query", "category", "expected_topics", "complexity")

    rows = [
        # Onboarding queries
        ("q1", "How do I register for a JioPay business account?", "onboarding",
         ["registration", "business", "account", "setup"], "simple"),
        ("q2", "What documents are required for KYC verification during business onboarding?", "onboarding",
         ["kyc", "documents", "verification", "business"], "medium"),

        # Payment processing queries
        ("q3", "How does payment processing work for merchants?", "payments",
         ["payment", "processing", "merchant"], "medium"),
        ("q4", "What are the settlement timelines and how do refunds work?", "payments",
         ["settlement", "refund", "timeline"], "complex"),

        # KYC and compliance
        ("q5", "What identity documents are accepted for KYC?", "kyc",
         ["kyc", "identity", "documents"], "simple"),
        ("q6", "How long does the KYC verification process take?", "kyc",
         ["kyc", "verification", "timeline"], "simple"),

        # API and integration
        ("q7", "How do I integrate JioPay payment gateway API?", "api",
         ["api", "integration", "gateway"], "complex"),
        ("q8", "What are the API endpoints for payment processing?", "api",
         ["api", "endpoints", "payment"], "medium"),

        # Security and troubleshooting
        ("q9", "What security measures are in place for transactions?", "security",
         ["security", "transaction", "protection"], "medium"),
        ("q10", "How do I resolve failed payment transactions?", "troubleshooting",
         ["troubleshooting", "failed", "payment"], "complex"),

        # Pricing and fees
        ("q11", "What are the transaction fees for different payment methods?", "pricing",
         ["fees", "pricing", "transaction"], "medium"),
        ("q12", "Are there any setup or monthly maintenance charges?", "pricing",
         ["charges", "setup", "maintenance"], "simple"),

        # JioPay specific features
        ("q13", "How do I use collect links for payment collection?", "features",
         ["collect", "link", "payment"], "medium"),
        ("q14", "What is voicebox and how do I configure announcements?", "features",
         ["voicebox", "announcement", "audio"], "medium"),
        ("q15", "How do I create and manage dynamic QR codes?", "features",
         ["qr", "dynamic", "code"], "medium"),
    ]

    # zip preserves the field order, so each dict has the same key order.
    return [dict(zip(field_names, row)) for row in rows]

def create_ground_truth(test_queries) -> Dict:
    """Map each query's ``id`` to that query's ``expected_topics`` list.

    The topic lists are referenced, not copied. If two queries share an id,
    the later one wins.
    """
    topics_by_id = {}
    for entry in test_queries:
        topics_by_id[entry["id"]] = entry["expected_topics"]
    return topics_by_id

# Initialize test data.
# `test_queries` is the list of query dicts and `ground_truth` maps each query
# id to its expected topics; both are module-level names reused by later cells.
test_queries = create_test_queries()
ground_truth = create_ground_truth(test_queries)

# Sanity check: both counts should match, since there is one mapping per query id.
print(f"Initialized {len(test_queries)} test queries")
print(f"Ground truth mappings: {len(ground_truth)}")

Initialized 15 test queries
Ground truth mappings: 15


## Core Evaluator Class - Part 1 (Initialization & Data Loading)

In [6]:
class UnifiedChunkingEvaluator:
    """
    Comprehensive evaluation framework combining:
    1. Retrieval-focused metrics (precision, recall, latency)
    2. Domain-specific RAG metrics (semantic coherence, content quality)

    Attributes:
        test_queries: List of query dicts used for retrieval evaluation.
        ground_truth: Mapping of query id -> expected topics.
        evaluation_results: Per-strategy result dicts, filled in by the
            comparison methods that are attached to this class later.
    """

    def __init__(self, test_queries=None, ground_truth=None):
        """Initialize the unified evaluator.

        Args:
            test_queries: Query dicts to evaluate with. A falsy value
                (``None`` or an empty list) selects the built-in query set.
            ground_truth: Query id -> expected topics mapping. A falsy value
                derives the mapping from ``test_queries``.
        """
        self.test_queries = test_queries or create_test_queries()
        self.ground_truth = ground_truth or create_ground_truth(self.test_queries)
        self.evaluation_results = {}

        print("Unified Chunking Evaluator initialized")
        print(f"Test queries: {len(self.test_queries)}")
        print(f"Ground truth mappings: {len(self.ground_truth)}")

    def load_chunked_data(self, strategy_name: str, file_path: str) -> List[Dict]:
        """Load chunked data from a JSON file and normalize every chunk.

        Accepted layouts are a top-level list of chunk dicts or a dict with a
        ``'chunks'`` list. The chunk body is read from ``'content'`` and falls
        back to ``'text'`` when ``'content'`` is missing or empty. Entries that
        are not dicts, or whose body is empty or not a string, are skipped
        individually instead of failing the whole file.

        Args:
            strategy_name: Label used only in the progress message.
            file_path: Path of the JSON file to read (UTF-8).

        Returns:
            List of normalized chunk dicts with the keys ``content``, ``text``,
            ``token_count``, ``char_count``, ``type``, ``source`` and
            ``chunk_id``. An empty list is returned, after printing a message,
            when the file is missing, unreadable, or has an unexpected layout.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

            # Handle different data formats
            if isinstance(data, dict) and 'chunks' in data:
                chunks = data['chunks']
            elif isinstance(data, list):
                chunks = data
            else:
                print(f"Warning: Unexpected data format in {file_path}")
                return []

            # Normalize chunk format
            normalized_chunks = []
            for chunk in chunks:
                if not isinstance(chunk, dict):
                    continue

                # Prefer 'content'; an empty or missing value must not hide a
                # usable 'text' field.
                content = chunk.get('content') or chunk.get('text') or ''
                if not isinstance(content, str) or not content:
                    continue

                # A null count in the file is treated like a missing one so
                # downstream statistics always receive numbers.
                token_count = chunk.get('token_count')
                if token_count is None:
                    token_count = len(content.split())
                char_count = chunk.get('char_count')
                if char_count is None:
                    char_count = len(content)

                normalized_chunks.append({
                    'content': content,
                    'text': content,  # For compatibility
                    'token_count': token_count,
                    'char_count': char_count,
                    'type': chunk.get('type', 'unknown'),
                    'source': chunk.get('source', 'unknown'),
                    # Fall back to the position among the kept chunks.
                    'chunk_id': chunk.get('chunk_id', chunk.get('id', len(normalized_chunks)))
                })

            print(f"Loaded {len(normalized_chunks)} chunks for {strategy_name}")
            return normalized_chunks

        except FileNotFoundError:
            print(f"File not found: {file_path}")
            return []
        except Exception as e:
            # Best-effort loader: report the problem and let the caller go on
            # with the remaining strategies.
            print(f"Error loading {file_path}: {e}")
            return []

# Initialize evaluator with the module-level test queries and ground-truth
# mapping; the constructor prints a short summary of what it received.
evaluator = UnifiedChunkingEvaluator(test_queries, ground_truth)

Unified Chunking Evaluator initialized
Test queries: 15
Ground truth mappings: 15


## Core Evaluator Class - Part 2 (RAG Quality Metrics)

In [7]:
def evaluate_semantic_coherence(self, chunks: List[Dict]) -> float:
    """Average per-chunk coherence score in the range [0, 1].

    For each chunk with non-empty text the score is the mean of its topic
    consistency and thought completeness, minus 0.1 per broken sentence,
    clamped to [0, 1]. Chunks without text are ignored; 0.0 is returned when
    nothing could be scored.
    """
    if not chunks:
        return 0.0

    def _chunk_score(text):
        penalty = count_broken_sentences(text) * 0.1
        consistency = measure_topic_consistency(text)
        completeness = measure_thought_completeness(text)
        raw = (consistency + completeness) / 2 - penalty
        return max(0, min(1, raw))

    bodies = (chunk.get('content', chunk.get('text', '')) for chunk in chunks)
    per_chunk = [_chunk_score(body) for body in bodies if body]

    if not per_chunk:
        return 0.0
    return np.mean(per_chunk)

def evaluate_context_completeness(self, chunks: List[Dict]) -> float:
    """Average heuristic score for how self-contained each chunk is.

    The chunk's ``type`` (case-insensitive substring match) selects the rule:

    * ``faq``: 1.0 when the text shows both a question (``Q:`` or ``?``) and
      an answer (``A:`` or more than 10 words), otherwise 0.5.
    * ``pdf``: 0.3 below 30 words, 1.0 above 200 words, 0.7 in between.
    * anything else: 0.4 below 20 words, 0.9 above 100 words, 0.7 in between.

    Returns 0.0 for an empty chunk list.
    """
    if not chunks:
        return 0.0

    def _length_band(n_words, low, high, low_score, high_score):
        # Shared three-band rule; the middle band always scores 0.7.
        if n_words < low:
            return low_score
        if n_words > high:
            return high_score
        return 0.7

    scores = []
    for chunk in chunks:
        body = chunk.get('content', chunk.get('text', ''))
        kind = chunk.get('type', 'unknown').lower()

        if 'faq' in kind:
            question_present = 'Q:' in body or '?' in body
            answer_present = 'A:' in body or len(body.split()) > 10
            scores.append(1.0 if question_present and answer_present else 0.5)
        elif 'pdf' in kind:
            scores.append(_length_band(len(body.split()), 30, 200, 0.3, 1.0))
        else:
            scores.append(_length_band(len(body.split()), 20, 100, 0.4, 0.9))

    return np.mean(scores) if scores else 0.0

def evaluate_information_density(self, chunks: List[Dict]) -> float:
    """Average information density per chunk, scaled into [0, 1].

    For every chunk the number of informative elements (domain keywords
    found, digit runs, procedure words found) is divided by the chunk's
    token count (``token_count`` if present, else the whitespace word
    count, with a floor of 1), multiplied by 10 and capped at 1.0.
    Returns 0.0 for an empty chunk list.
    """
    if not chunks:
        return 0.0

    scaled = []
    for chunk in chunks:
        body = chunk.get('content', chunk.get('text', ''))
        n_tokens = chunk.get('token_count', len(body.split()))

        signal_count = (
            count_jiopay_keywords(body)
            + len(re.findall(r'\d+', body))
            + count_procedure_words(body)
        )

        # Ten signals per hundred tokens already saturates the score.
        scaled.append(min(1.0, signal_count / max(n_tokens, 1) * 10))

    return np.mean(scaled) if scaled else 0.0

# Add these methods to the evaluator class.
# The functions above take `self` as their first parameter, so assigning them
# as class attributes makes them callable as bound methods on every instance,
# including the `evaluator` object that was created before this cell ran.
UnifiedChunkingEvaluator.evaluate_semantic_coherence = evaluate_semantic_coherence
UnifiedChunkingEvaluator.evaluate_context_completeness = evaluate_context_completeness
UnifiedChunkingEvaluator.evaluate_information_density = evaluate_information_density

## Core Evaluator Class - Part 3 (Domain Coverage Metrics)

In [8]:
def evaluate_topic_coverage(self, chunks: List[Dict]) -> float:
    """Average coverage of five JioPay topic categories across the chunks.

    A chunk covers a category when its lower-cased text contains any of the
    category's keywords as a substring. A category is fully covered (1.0)
    once the number of covering chunks reaches 10% of all chunks, with a
    minimum target of one chunk; below that the ratio is proportional.
    Returns 0.0 for an empty chunk list.
    """
    if not chunks:
        return 0.0

    # JioPay topic categories
    topic_keywords = {
        'payments': ['payment', 'transaction', 'money', 'pay', 'refund'],
        'app_usage': ['app', 'download', 'login', 'mobile'],
        'business': ['merchant', 'business', 'kyc', 'verification'],
        'support': ['help', 'support', 'issue', 'problem'],
        'features': ['collect link', 'qr code', 'voicebox', 'settlement']
    }

    lowered_texts = [
        chunk.get('content', chunk.get('text', '')).lower() for chunk in chunks
    ]
    target = max(len(chunks) * 0.1, 1)

    ratios = []
    for keywords in topic_keywords.values():
        covering = sum(
            1 for body in lowered_texts
            if any(keyword in body for keyword in keywords)
        )
        ratios.append(min(1.0, covering / target))

    return np.mean(ratios)

def evaluate_faq_grouping(self, chunks: List[Dict]) -> float:
    """Average topical-focus score over the FAQ chunks.

    Only chunks whose ``type`` contains ``faq`` (case-insensitive) are
    scored. For each one, the best-matching term group decides the score:
    3+ distinct terms -> 1.0, 2 -> 0.8, 1 -> 0.6, none -> 0.3.
    Returns 1.0 when there are no FAQ chunks to evaluate.
    """
    faq_only = [chunk for chunk in chunks if 'faq' in chunk.get('type', '').lower()]
    if not faq_only:
        return 1.0  # No FAQs to evaluate

    term_groups = (
        ['payment', 'transaction', 'refund', 'settlement', 'money'],   # payments
        ['app', 'login', 'password', 'download', 'mobile', 'install'], # app usage
        ['merchant', 'business', 'kyc', 'documents', 'verification'],  # business
        ['help', 'support', 'issue', 'problem', 'error'],              # support
        ['collect link', 'qr code', 'voicebox', 'voice box'],          # features
    )

    def _grade(best_hits):
        if best_hits >= 3:
            return 1.0  # Excellent grouping
        if best_hits >= 2:
            return 0.8  # Good grouping
        if best_hits == 1:
            return 0.6  # Partial grouping
        return 0.3      # Poor grouping

    scores = []
    for chunk in faq_only:
        lowered = chunk.get('content', chunk.get('text', '')).lower()
        best_hits = max(
            sum(1 for term in group if term in lowered) for group in term_groups
        )
        scores.append(_grade(best_hits))

    return np.mean(scores) if scores else 1.0

# Add these methods to the evaluator class.
# Assigning the module-level functions as class attributes turns them into
# methods; their first parameter `self` receives the instance on each call.
UnifiedChunkingEvaluator.evaluate_topic_coverage = evaluate_topic_coverage
UnifiedChunkingEvaluator.evaluate_faq_grouping = evaluate_faq_grouping

## Core Evaluator Class - Part 4 (Retrieval Metrics)

In [9]:
def simulate_retrieval(self, query: str, chunks: List[Dict], top_k: int = 5) -> List[Tuple[Dict, float]]:
    """Simulate retrieval using TF-IDF cosine similarity.

    A TF-IDF model (English stop words removed) is fitted on the query plus
    all chunk texts on every call, and the chunks are ranked by cosine
    similarity to the query.

    Args:
        query: Query text.
        chunks: Chunk dicts; the text is read from ``'content'`` and falls
            back to ``'text'``.
        top_k: Maximum number of results. Values <= 0 yield no results.

    Returns:
        Up to ``top_k`` ``(chunk, similarity)`` pairs, best first. Chunks with
        zero similarity are left out. An empty list is returned for empty
        input or when vectorization fails (the error is printed).
    """
    # Without this guard `[-top_k:]` with top_k == 0 is `[-0:]`, which selects
    # every chunk instead of none; negative values would slice from the wrong end.
    if not chunks or top_k <= 0:
        return []

    try:
        # Extract content from chunks
        chunk_contents = [chunk.get('content', chunk.get('text', '')) for chunk in chunks]
        all_texts = [query] + chunk_contents

        # Calculate TF-IDF vectors
        vectorizer = TfidfVectorizer(stop_words='english')
        tfidf_matrix = vectorizer.fit_transform(all_texts)

        # Row 0 is the query, the remaining rows are the chunks.
        query_vector = tfidf_matrix[0]
        chunk_vectors = tfidf_matrix[1:]
        similarities = cosine_similarity(query_vector, chunk_vectors).flatten()

        # argsort is ascending: take the last top_k entries and reverse them.
        top_indices = np.argsort(similarities)[-top_k:][::-1]
        results = [(chunks[i], similarities[i]) for i in top_indices if similarities[i] > 0]

        return results

    except Exception as e:
        # Best-effort: e.g. a vocabulary made only of stop words makes the
        # vectorizer raise; report it and return no results.
        print(f"Error in retrieval simulation: {e}")
        return []

def evaluate_retrieval_metrics(self, query_info: Dict, chunks: List[Dict], top_k: int = 5) -> Dict:
    """Evaluate retrieval metrics (P@1, P@k, Recall@k, MRR) for one query.

    A retrieved chunk counts as relevant when more than 30% of the query's
    ``expected_topics`` occur as substrings of its lower-cased text. Topics
    are compared as given, so they should be lower-case.

    Args:
        query_info: Dict with at least ``'query'`` and ``'expected_topics'``.
        chunks: Chunk dicts to search.
        top_k: Maximum number of chunks requested from ``simulate_retrieval``.

    Returns:
        Dict with float ``precision_at_1``, ``precision_at_k``,
        ``recall_at_k`` and ``mrr`` plus int ``retrieved_count``.
        ``recall_at_k`` is a proxy: the number of relevant retrieved chunks
        divided by the number of expected topics, capped at 1.0. All metrics
        are 0.0 when nothing was retrieved.
    """
    query = query_info['query']
    expected_topics = set(query_info['expected_topics'])

    # Simulate retrieval
    retrieved = self.simulate_retrieval(query, chunks, top_k)

    if not retrieved:
        return {
            'precision_at_1': 0.0,
            'precision_at_k': 0.0,
            'recall_at_k': 0.0,
            'mrr': 0.0,
            'retrieved_count': 0
        }

    # Relevance flag per retrieved chunk, in rank order.
    is_relevant = []
    for chunk, _similarity in retrieved:
        content = chunk.get('content', chunk.get('text', '')).lower()

        # Simple relevance calculation based on topic keyword presence
        matches = sum(1 for topic in expected_topics if topic in content)
        relevance = matches / len(expected_topics) if expected_topics else 0
        is_relevant.append(relevance > 0.3)  # Threshold for relevance

    relevant_count = sum(is_relevant)

    # Cast to float so the value has the same type as in the empty-result
    # branch instead of leaking a bool into the result dict.
    precision_at_1 = float(is_relevant[0])
    precision_at_k = relevant_count / len(is_relevant)

    # Recall proxy: the true number of relevant chunks is unknown, so the
    # number of expected topics stands in as the denominator.
    recall_at_k = min(1.0, relevant_count / max(1, len(expected_topics)))

    # Reciprocal rank of the first relevant result (0.0 if there is none).
    mrr = next(
        (1.0 / rank for rank, hit in enumerate(is_relevant, start=1) if hit),
        0.0,
    )

    return {
        'precision_at_1': precision_at_1,
        'precision_at_k': precision_at_k,
        'recall_at_k': recall_at_k,
        'mrr': mrr,
        'retrieved_count': len(retrieved)
    }

def benchmark_performance(self, chunks: List[Dict], num_queries: int = 50) -> Dict:
    """Benchmark retrieval performance (latency, throughput).

    Runs ``simulate_retrieval`` (``top_k=5``) once for each of the first
    ``num_queries`` entries of ``self.test_queries`` and times every call.

    Args:
        chunks: Chunk dicts to search.
        num_queries: Upper bound on the number of test queries to run; at
            most ``len(self.test_queries)`` are available. Values <= 0 select
            no queries.

    Returns:
        Dict with ``mean_latency_ms``, ``median_latency_ms``,
        ``p95_latency_ms``, ``p99_latency_ms`` and ``queries_per_second``, or
        ``{'error': ...}`` when there are no chunks or no queries to run.
    """
    if not chunks:
        return {'error': 'No chunks provided'}

    # Slicing already clamps to the list length; max() keeps a negative
    # num_queries from slicing from the end of the list.
    sample_queries = [q['query'] for q in self.test_queries[:max(num_queries, 0)]]
    if not sample_queries:
        # Statistics over an empty sample would be NaN or raise.
        return {'error': 'No queries available'}

    latencies = []
    for query in sample_queries:
        # perf_counter is monotonic and high-resolution, unlike wall-clock time.
        start_time = time.perf_counter()
        self.simulate_retrieval(query, chunks, top_k=5)
        latencies.append((time.perf_counter() - start_time) * 1000)  # Convert to ms

    mean_latency = np.mean(latencies)

    return {
        'mean_latency_ms': mean_latency,
        'median_latency_ms': np.median(latencies),
        'p95_latency_ms': np.percentile(latencies, 95),
        'p99_latency_ms': np.percentile(latencies, 99),
        # Guard against a zero mean on very fast runs.
        'queries_per_second': 1000 / mean_latency if mean_latency > 0 else float('inf')
    }

# Add these methods to the evaluator class.
# `evaluate_retrieval_metrics` and `benchmark_performance` call
# `self.simulate_retrieval`, so all three must be attached before either is used.
UnifiedChunkingEvaluator.simulate_retrieval = simulate_retrieval
UnifiedChunkingEvaluator.evaluate_retrieval_metrics = evaluate_retrieval_metrics
UnifiedChunkingEvaluator.benchmark_performance = benchmark_performance

## Core Evaluator Class - Part 5 (Main Evaluation Method)

In [10]:
def evaluate_strategy(self, strategy_name: str, chunks: List[Dict]) -> Dict[str, Any]:
    """
    Run every metric family on one strategy's chunks and combine the scores.

    The returned dict holds, in this order: ``strategy_name``, ``timestamp``,
    ``basic_stats``, ``rag_quality``, ``retrieval_metrics`` (one entry per
    test query), ``retrieval_performance``, ``size_optimization``,
    ``performance_benchmark``, ``performance_score``, ``final_score`` and
    ``detailed_metrics``. ``final_score`` is the weighted sum
    0.45 * rag_quality + 0.40 * retrieval_performance
    + 0.08 * size_optimization + 0.07 * performance_score.

    An empty chunk list returns ``{"error": "No chunks to evaluate"}``.
    """
    print(f"Evaluating {strategy_name}...")

    if not chunks:
        return {"error": "No chunks to evaluate"}

    def _text_of(chunk):
        return chunk.get('content', chunk.get('text', ''))

    def _tokens_of(chunk):
        return chunk.get('token_count', len(_text_of(chunk).split()))

    def _size_score(tokens):
        # 80-400 tokens is the ideal band, 50-600 still acceptable; outside
        # of that the score decays but never below 0.6 (short) / 0.4 (long).
        if 80 <= tokens <= 400:
            return 1.0
        if 50 <= tokens <= 600:
            return 0.8
        if tokens < 50:
            return max(0.6, tokens / 50)
        return max(0.4, 600 / tokens)

    results = {
        'strategy_name': strategy_name,
        'timestamp': time.time()
    }

    # ---- Basic statistics -------------------------------------------------
    token_counts = [_tokens_of(chunk) for chunk in chunks]
    char_counts = [chunk.get('char_count', len(_text_of(chunk))) for chunk in chunks]

    def _token_stat(reducer):
        return reducer(token_counts) if token_counts else 0

    results['basic_stats'] = {
        'total_chunks': len(chunks),
        'avg_tokens': _token_stat(np.mean),
        'median_tokens': _token_stat(np.median),
        'std_tokens': _token_stat(np.std),
        'min_tokens': _token_stat(np.min),
        'max_tokens': _token_stat(np.max),
        'total_tokens': sum(token_counts),
        'total_chars': sum(char_counts)
    }

    # ---- 1. Domain-specific RAG metrics (weight 0.45) -----------------------
    print("  Evaluating domain-specific RAG metrics...")
    rag_metrics = {
        'semantic_coherence': self.evaluate_semantic_coherence(chunks),
        'context_completeness': self.evaluate_context_completeness(chunks),
        'information_density': self.evaluate_information_density(chunks),
        'topic_coverage': self.evaluate_topic_coverage(chunks),
        'faq_grouping': self.evaluate_faq_grouping(chunks)
    }

    # faq_grouping is reported in the details but is not part of rag_quality.
    quality_parts = ('semantic_coherence', 'context_completeness',
                     'information_density', 'topic_coverage')
    results['rag_quality'] = np.mean([rag_metrics[name] for name in quality_parts])

    # ---- 2. Retrieval metrics (weight 0.40) ---------------------------------
    print("  Evaluating retrieval performance...")
    per_query = []
    for query_info in self.test_queries:
        entry = self.evaluate_retrieval_metrics(query_info, chunks)
        entry.update(query_id=query_info['id'], category=query_info['category'])
        per_query.append(entry)

    results['retrieval_metrics'] = per_query

    # Average each retrieval metric over all queries.
    avg_precision_at_1, avg_precision_at_k, avg_recall_at_k, avg_mrr = (
        np.mean([entry[key] for entry in per_query])
        for key in ('precision_at_1', 'precision_at_k', 'recall_at_k', 'mrr')
    )

    results['retrieval_performance'] = (avg_precision_at_1 + avg_precision_at_k + avg_recall_at_k + avg_mrr) / 4

    # ---- 3. Size optimization (weight 0.08) ---------------------------------
    print("  Evaluating size optimization...")
    size_scores = [_size_score(_tokens_of(chunk)) for chunk in chunks]
    results['size_optimization'] = np.mean(size_scores) if size_scores else 0.0

    # ---- 4. Runtime performance (weight 0.07) -------------------------------
    print("  Benchmarking performance...")
    performance = self.benchmark_performance(chunks)
    results['performance_benchmark'] = performance

    # Throughput saturates at 100 QPS; latency below 1 ms is not rewarded further.
    qps = performance.get('queries_per_second', 0)
    latency = performance.get('mean_latency_ms', 1000)
    results['performance_score'] = min(1.0, qps / 100) * (1000 / max(latency, 1)) * 0.001

    # ---- Weighted final score -----------------------------------------------
    results['final_score'] = (
        results['rag_quality'] * 0.45 +
        results['retrieval_performance'] * 0.40 +
        results['size_optimization'] * 0.08 +
        results['performance_score'] * 0.07
    )

    # ---- Detailed metrics ---------------------------------------------------
    results['detailed_metrics'] = dict(
        rag_metrics,
        avg_precision_at_1=avg_precision_at_1,
        avg_precision_at_k=avg_precision_at_k,
        avg_recall_at_k=avg_recall_at_k,
        avg_mrr=avg_mrr,
    )

    return results

def compare_strategies(self, strategies_data: Dict[str, List[Dict]]) -> pd.DataFrame:
    """Evaluate each strategy and tabulate the headline metrics.

    Every strategy is passed through ``evaluate_strategy``. Strategies whose
    result contains ``'error'`` are reported and skipped; all others are
    stored in ``self.evaluation_results`` and contribute one row. Scores in
    the table are pre-formatted strings; ``Total_Chunks`` stays numeric.
    """
    print("\nRunning Unified Chunking Strategy Comparison...")
    print("=" * 70)

    def _fmt(mapping, key, spec):
        # Missing metrics are shown as zero.
        return format(mapping.get(key, 0), spec)

    rows = []
    for strategy_name, chunks in strategies_data.items():
        print(f"\nProcessing {strategy_name}...")
        outcome = self.evaluate_strategy(strategy_name, chunks)

        if 'error' in outcome:
            print(f"Skipping {strategy_name} due to error: {outcome['error']}")
            continue

        # Keep the full result for the plotting / analysis methods.
        self.evaluation_results[strategy_name] = outcome

        stats = outcome.get('basic_stats', {})
        detail = outcome.get('detailed_metrics', {})
        bench = outcome.get('performance_benchmark', {})

        rows.append({
            'Strategy': strategy_name,
            'Final_Score': _fmt(outcome, 'final_score', '.3f'),
            'RAG_Quality': _fmt(outcome, 'rag_quality', '.3f'),
            'Retrieval_Perf': _fmt(outcome, 'retrieval_performance', '.3f'),
            'Size_Opt': _fmt(outcome, 'size_optimization', '.3f'),
            'Performance': _fmt(outcome, 'performance_score', '.3f'),
            'Total_Chunks': stats.get('total_chunks', 0),
            'Avg_Tokens': _fmt(stats, 'avg_tokens', '.1f'),
            'Semantic_Coh': _fmt(detail, 'semantic_coherence', '.3f'),
            'Context_Comp': _fmt(detail, 'context_completeness', '.3f'),
            'Info_Density': _fmt(detail, 'information_density', '.3f'),
            'Topic_Cov': _fmt(detail, 'topic_coverage', '.3f'),
            'Precision@1': _fmt(detail, 'avg_precision_at_1', '.3f'),
            'Recall@5': _fmt(detail, 'avg_recall_at_k', '.3f'),
            'MRR': _fmt(detail, 'avg_mrr', '.3f'),
            'Latency_ms': _fmt(bench, 'mean_latency_ms', '.2f'),
            'QPS': _fmt(bench, 'queries_per_second', '.1f'),
        })

    return pd.DataFrame(rows)

def analyze_chunk_sizes(self, strategies_data: Dict[str, List[Dict]]) -> pd.DataFrame:
    """Summarize the token-count distribution of every non-empty strategy.

    A chunk's size is its ``token_count`` if present, otherwise the
    whitespace word count of its ``content`` (or ``text``). Strategies
    without chunks produce no row.
    """
    def _token_count(chunk):
        body = chunk.get('content', chunk.get('text', ''))
        return chunk.get('token_count', len(body.split()))

    def _summary(name, chunks):
        sizes = [_token_count(chunk) for chunk in chunks]
        return {
            'Strategy': name,
            'Total_Chunks': len(chunks),
            'Min_Tokens': np.min(sizes),
            'Max_Tokens': np.max(sizes),
            'Avg_Tokens': np.mean(sizes),
            'Median_Tokens': np.median(sizes),
            'Std_Tokens': np.std(sizes)
        }

    return pd.DataFrame([
        _summary(name, chunks)
        for name, chunks in strategies_data.items()
        if chunks
    ])

# Add these methods to the evaluator class.
# `compare_strategies` calls `self.evaluate_strategy` and records each result
# in `self.evaluation_results`.
UnifiedChunkingEvaluator.evaluate_strategy = evaluate_strategy
UnifiedChunkingEvaluator.compare_strategies = compare_strategies
UnifiedChunkingEvaluator.analyze_chunk_sizes = analyze_chunk_sizes

## Visualization Functions - Part 1

In [11]:
def generate_unified_plots(self, output_dir: str):
    """Render the 3x3 metric dashboard and the radar chart for all strategies.

    Reads ``self.evaluation_results`` and writes
    ``unified_performance_dashboard.png`` into ``output_dir`` (which must
    already exist), then delegates to ``create_radar_chart``. Metrics that
    are missing from a result are plotted as 0.
    """
    strategies = list(self.evaluation_results.keys())

    # Create comprehensive metrics dashboard
    fig, axes = plt.subplots(3, 3, figsize=(20, 15))
    fig.suptitle('Unified Chunking Strategy Analysis Dashboard', fontsize=16, fontweight='bold')

    # (panel title, sub-dict holding the value or None for top level, key).
    # The order is the row-major panel order of the 3x3 grid.
    panel_specs = [
        ('Final Score', None, 'final_score'),
        ('RAG Quality', None, 'rag_quality'),
        ('Retrieval Perf', None, 'retrieval_performance'),
        ('Semantic Coherence', 'detailed_metrics', 'semantic_coherence'),
        ('Context Completeness', 'detailed_metrics', 'context_completeness'),
        ('Information Density', 'detailed_metrics', 'information_density'),
        ('Topic Coverage', 'detailed_metrics', 'topic_coverage'),
        ('Precision@1', 'detailed_metrics', 'avg_precision_at_1'),
        ('Size Optimization', None, 'size_optimization'),
    ]

    # Extract all metrics
    metrics_data = {}
    for strategy in strategies:
        outcome = self.evaluation_results[strategy]
        metrics_data[strategy] = {
            title: (outcome.get(section, {}) if section else outcome).get(key, 0)
            for title, section, key in panel_specs
        }

    # One bar chart per metric
    for ax, (title, _section, _key) in zip(axes.flat, panel_specs):
        heights = [metrics_data[name][title] for name in strategies]

        bars = ax.bar(strategies, heights, color=plt.cm.viridis(np.linspace(0, 1, len(strategies))))
        ax.set_title(title, fontweight='bold')
        ax.set_ylabel('Score')
        ax.tick_params(axis='x', rotation=45)

        # Print the numeric value just above each bar
        for bar, height in zip(bars, heights):
            ax.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01,
                   f'{height:.3f}', ha='center', va='bottom', fontsize=8)

    plt.tight_layout()
    plt.savefig(f"{output_dir}/unified_performance_dashboard.png", dpi=300, bbox_inches='tight')
    plt.close()

    # Create radar chart comparison
    self.create_radar_chart(output_dir, metrics_data)

def create_radar_chart(self, output_dir: str, metrics_data: Dict):
    """Draw one closed radar polygon per strategy over six key metrics.

    ``metrics_data`` maps strategy name -> {metric title -> value} and must
    contain every title listed in ``radar_metrics``. The figure is saved as
    ``strategy_radar_chart.png`` in ``output_dir``.
    """
    # Select key metrics for radar chart
    radar_metrics = ['RAG Quality', 'Retrieval Perf', 'Semantic Coherence',
                    'Context Completeness', 'Information Density', 'Size Optimization']

    fig, ax = plt.subplots(figsize=(12, 10), subplot_kw=dict(projection='polar'))

    spoke_angles = np.linspace(0, 2 * np.pi, len(radar_metrics), endpoint=False)
    # Repeat the first angle so every polygon closes on itself.
    closed_angles = np.concatenate((spoke_angles, [spoke_angles[0]]))

    palette = plt.cm.Set3(np.linspace(0, 1, len(metrics_data)))

    for (strategy, data), shade in zip(metrics_data.items(), palette):
        polygon = [data[metric] for metric in radar_metrics]
        polygon.append(polygon[0])  # Complete the circle

        ax.plot(closed_angles, polygon, 'o-', linewidth=2, label=strategy, color=shade)
        ax.fill(closed_angles, polygon, alpha=0.1, color=shade)

    ax.set_xticks(closed_angles[:-1])
    ax.set_xticklabels(radar_metrics)
    ax.set_ylim(0, 1)
    ax.set_title('Strategy Comparison Radar Chart', fontweight='bold', pad=20)
    ax.legend(loc='upper right', bbox_to_anchor=(1.3, 1.0))
    ax.grid(True)

    plt.tight_layout()
    plt.savefig(f"{output_dir}/strategy_radar_chart.png", dpi=300, bbox_inches='tight')
    plt.close()

# Add these methods to the evaluator class.
# `generate_unified_plots` calls `self.create_radar_chart`, so both are attached together.
UnifiedChunkingEvaluator.generate_unified_plots = generate_unified_plots
UnifiedChunkingEvaluator.create_radar_chart = create_radar_chart

## Visualization Functions - Part 2

In [12]:
def generate_domain_analysis(self, output_dir: str):
    """Generate domain-specific analysis (CSV table plus heatmap).

    Collects the five domain metrics of every evaluated strategy from
    ``self.evaluation_results`` and writes ``domain_specific_analysis.csv``
    and ``domain_performance_heatmap.png`` into ``output_dir`` (which must
    already exist). Missing metrics are reported as 0.

    When no strategy has been evaluated yet, nothing is written: an empty
    table cannot be rendered as a heatmap.
    """
    if not self.evaluation_results:
        print("No evaluation results available; skipping domain analysis")
        return

    # Analyze domain coverage by strategy
    domain_analysis = defaultdict(dict)

    for strategy_name, results in self.evaluation_results.items():
        detailed = results.get('detailed_metrics', {})
        domain_analysis[strategy_name] = {
            'Semantic Coherence': detailed.get('semantic_coherence', 0),
            'Context Completeness': detailed.get('context_completeness', 0),
            'Information Density': detailed.get('information_density', 0),
            'Topic Coverage': detailed.get('topic_coverage', 0),
            'FAQ Grouping': detailed.get('faq_grouping', 0)
        }

    # Transpose so that strategies become rows and metrics become columns.
    domain_df = pd.DataFrame(domain_analysis).T
    domain_df.to_csv(f"{output_dir}/domain_specific_analysis.csv")

    # Create domain performance heatmap
    plt.figure(figsize=(12, 8))
    sns.heatmap(domain_df, annot=True, fmt='.3f', cmap='RdYlGn',
               cbar_kws={'label': 'Score'})
    plt.title('Domain-Specific Performance Analysis', fontweight='bold')
    plt.ylabel('Strategy')
    plt.xlabel('Domain Metric')
    plt.tight_layout()
    plt.savefig(f"{output_dir}/domain_performance_heatmap.png", dpi=300, bbox_inches='tight')
    plt.close()

def generate_retrieval_analysis(self, output_dir: str):
    """Summarise retrieval metrics per query category as a CSV and a bar-chart figure.

    Every entry of each strategy's ``'retrieval_metrics'`` list is grouped by
    its ``'category'`` value; the per-query values are pooled across all
    strategies before averaging.

    Args:
        output_dir: Existing directory that receives
            ``retrieval_by_category.csv`` and ``retrieval_by_category.png``.

    When no retrieval metrics are present a header-only CSV is written and
    the plot is skipped instead of failing on the missing ``Category`` column.
    """
    summary_columns = [
        'Category', 'Avg_Precision@1', 'Avg_Recall@5', 'Avg_MRR',
        'Std_Precision@1', 'Query_Count',
    ]
    metric_keys = ('precision_at_1', 'recall_at_k', 'mrr')

    # category -> metric key -> list of per-query values (all strategies pooled).
    category_analysis = defaultdict(lambda: defaultdict(list))
    for results in self.evaluation_results.values():
        for metric in results.get('retrieval_metrics', []):
            bucket = category_analysis[metric['category']]
            for key in metric_keys:
                bucket[key].append(metric[key])

    # NOTE(review): the column is labelled Recall@5 but is fed from 'recall_at_k';
    # confirm that k is 5 wherever these metrics are produced.
    category_summary = [
        {
            'Category': category,
            'Avg_Precision@1': np.mean(metrics['precision_at_1']),
            'Avg_Recall@5': np.mean(metrics['recall_at_k']),
            'Avg_MRR': np.mean(metrics['mrr']),
            'Std_Precision@1': np.std(metrics['precision_at_1']),
            'Query_Count': len(metrics['precision_at_1']),
        }
        for category, metrics in category_analysis.items()
    ]

    # Explicit columns keep the CSV schema stable even when there are no rows.
    category_df = pd.DataFrame(category_summary, columns=summary_columns)
    category_df.to_csv(f"{output_dir}/retrieval_by_category.csv", index=False)

    if category_df.empty:
        print("No retrieval metrics available; skipping retrieval-by-category plot")
        return

    categories = category_df['Category']
    metrics_to_plot = ['Avg_Precision@1', 'Avg_Recall@5', 'Avg_MRR']

    fig, axes = plt.subplots(1, 3, figsize=(15, 5))
    try:
        for ax, metric in zip(axes, metrics_to_plot):
            ax.bar(categories, category_df[metric])
            ax.set_title(metric)
            ax.set_ylabel('Score')
            ax.tick_params(axis='x', rotation=45)

        fig.suptitle('Retrieval Performance by Query Category', fontweight='bold')
        fig.tight_layout()
        fig.savefig(f"{output_dir}/retrieval_by_category.png", dpi=300, bbox_inches='tight')
    finally:
        # Always release the figure, even if drawing or saving fails.
        plt.close(fig)

# Bind the two analysis functions defined in this cell as methods on
# UnifiedChunkingEvaluator so they can be called through an evaluator instance.
UnifiedChunkingEvaluator.generate_domain_analysis = generate_domain_analysis
UnifiedChunkingEvaluator.generate_retrieval_analysis = generate_retrieval_analysis

## Report Generation Functions

In [13]:
def generate_unified_summary_report(self, output_dir: str):
    """Write a markdown summary of the evaluation to ``unified_evaluation_summary.md``.

    Sections, in order: header, executive summary of the top-scoring strategy,
    weighting scheme, rankings by ``final_score`` (descending), a per-strategy
    breakdown, and recommendations.

    Args:
        output_dir: Existing directory in which the report file is created.

    Reads ``self.evaluation_results`` (strategy name -> result dict) and
    ``self.test_queries`` (only its length is used). With no evaluation
    results the report is still written, minus the best-strategy sections.
    """
    def _score(item):
        # item is a (strategy_name, results) pair.
        return item[1].get('final_score', 0)

    # default=None keeps an empty result set from raising ValueError in max().
    best_strategy = max(self.evaluation_results.items(), key=_score, default=None)
    rankings = sorted(self.evaluation_results.items(), key=_score, reverse=True)

    report_lines = [
        "# UNIFIED CHUNKING EVALUATION REPORT",
        "=" * 60,
        f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        f"Strategies Evaluated: {len(self.evaluation_results)}",
        f"Test Queries: {len(self.test_queries)}",
        "",
    ]

    # Executive Summary
    report_lines.extend(["## EXECUTIVE SUMMARY", "-" * 30])
    if best_strategy is not None:
        name, results = best_strategy
        report_lines.extend([
            f"**Best Overall Strategy:** {name}",
            f"- Final Score: {results.get('final_score', 0):.3f}",
            f"- RAG Quality: {results.get('rag_quality', 0):.3f}",
            f"- Retrieval Performance: {results.get('retrieval_performance', 0):.3f}",
            "",
        ])
    else:
        report_lines.extend(["No strategies were evaluated.", ""])

    # Evaluation Methodology
    report_lines.extend([
        "## EVALUATION METHODOLOGY",
        "-" * 30,
        "This evaluation combines two complementary approaches:",
        "",
        "### Weighting Scheme:",
        "- **RAG Quality (40%)**: Domain-specific content quality metrics",
        "  - Semantic coherence, context completeness, information density, topic coverage",
        "- **Retrieval Performance (35%)**: Actual retrieval effectiveness",
        "  - Precision@1, Precision@K, Recall@K, Mean Reciprocal Rank",
        "- **Size Optimization (15%)**: Chunk size appropriateness",
        "  - Optimal size distribution for embeddings (150-600 tokens)",
        "- **Performance (10%)**: Processing efficiency",
        "  - Query latency and throughput",
        "",
    ])

    # Strategy Rankings
    report_lines.extend(["## STRATEGY RANKINGS", "-" * 30])
    for position, (strategy, results) in enumerate(rankings, 1):
        report_lines.append(f"{position}. **{strategy}**: {results.get('final_score', 0):.3f}")
    report_lines.append("")

    # Detailed Analysis
    report_lines.extend(["## DETAILED ANALYSIS", "-" * 30])
    for strategy_name, results in rankings:
        basic_stats = results.get('basic_stats', {})
        detailed = results.get('detailed_metrics', {})
        performance = results.get('performance_benchmark', {})

        report_lines.extend([
            f"### {strategy_name}",
            "**Core Metrics:**",
            f"- Final Score: {results.get('final_score', 0):.3f}",
            f"- RAG Quality: {results.get('rag_quality', 0):.3f}",
            f"- Retrieval Performance: {results.get('retrieval_performance', 0):.3f}",
            f"- Size Optimization: {results.get('size_optimization', 0):.3f}",
            "",
            "**Chunk Statistics:**",
            f"- Total Chunks: {basic_stats.get('total_chunks', 0)}",
            f"- Avg Tokens: {basic_stats.get('avg_tokens', 0):.1f}",
            f"- Token Std Dev: {basic_stats.get('std_tokens', 0):.1f}",
            "",
            "**Domain-Specific Metrics:**",
            f"- Semantic Coherence: {detailed.get('semantic_coherence', 0):.3f}",
            f"- Context Completeness: {detailed.get('context_completeness', 0):.3f}",
            f"- Information Density: {detailed.get('information_density', 0):.3f}",
            f"- Topic Coverage: {detailed.get('topic_coverage', 0):.3f}",
            "",
            "**Retrieval Metrics:**",
            f"- Precision@1: {detailed.get('avg_precision_at_1', 0):.3f}",
            # NOTE(review): labelled Recall@5 but read from 'avg_recall_at_k'; confirm k is 5.
            f"- Recall@5: {detailed.get('avg_recall_at_k', 0):.3f}",
            f"- MRR: {detailed.get('avg_mrr', 0):.3f}",
            "",
            "**Performance:**",
            f"- Avg Latency: {performance.get('mean_latency_ms', 0):.2f} ms",
            f"- Queries/Second: {performance.get('queries_per_second', 0):.1f}",
            "",
        ])

    # Recommendations
    report_lines.extend(["## RECOMMENDATIONS", "-" * 30])
    if best_strategy is not None:
        name, results = best_strategy
        report_lines.extend([
            f"### Primary Recommendation: {name}",
            f"Use **{name}** as your primary chunking strategy for the JioPay RAG chatbot.",
            "",
        ])

        # A strength is listed only when its metric clears a fixed threshold.
        detailed = results.get('detailed_metrics', {})
        strength_rules = [
            ('semantic_coherence', 0.8, "Excellent semantic coherence"),
            ('avg_precision_at_1', 0.7, "High precision retrieval"),
            ('topic_coverage', 0.8, "Comprehensive topic coverage"),
        ]
        strengths = [
            label for key, threshold, label in strength_rules
            if detailed.get(key, 0) > threshold
        ]
        if strengths:
            report_lines.append("**Key Strengths:**")
            report_lines.extend(f"- {strength}" for strength in strengths)
            report_lines.append("")

    report_lines.extend([
        "### Implementation Guidelines:",
        "- Monitor retrieval performance in production",
        "- Consider A/B testing with top 2-3 strategies",
        "- Regularly evaluate with domain-specific queries",
        "- Adjust chunk sizes based on embedding model performance",
        "",
    ])

    # Save report
    with open(f"{output_dir}/unified_evaluation_summary.md", 'w', encoding='utf-8') as f:
        f.write('\n'.join(report_lines))

def prepare_production_ready_chunks(self, output_dir: str):
    """Write deployment metadata for the top-scoring strategy to a JSON file.

    The strategy with the highest ``final_score`` in
    ``self.evaluation_results`` is selected and its scores, chunk statistics
    and detailed metrics are saved as
    ``production_deployment_metadata.json`` inside ``output_dir``. Nothing is
    written when there are no evaluation results.

    Args:
        output_dir: Existing directory that receives the metadata file.
    """
    all_results = self.evaluation_results
    if not all_results:
        return

    # Highest final_score wins; a missing score counts as 0.
    strategy_name, results = max(
        all_results.items(),
        key=lambda entry: entry[1].get('final_score', 0),
    )
    benchmark = results.get('performance_benchmark', {})
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    deployment_info = {
        "best_strategy": strategy_name,
        "evaluation_score": results.get('final_score', 0),
        "evaluation_timestamp": timestamp,
        "recommended_for_production": True,
    }
    performance_metrics = {
        "rag_quality_score": results.get('rag_quality', 0),
        "retrieval_performance": results.get('retrieval_performance', 0),
        "size_optimization": results.get('size_optimization', 0),
        "avg_latency_ms": benchmark.get('mean_latency_ms', 0),
        "queries_per_second": benchmark.get('queries_per_second', 0),
    }
    deployment_notes = [
        f"Evaluated against {len(self.test_queries)} test queries",
        "Optimized for JioPay domain-specific content",
        "Balanced approach: 40% RAG quality, 35% retrieval, 15% size, 10% performance",
        "Ready for vector database ingestion and RAG deployment",
    ]
    production_metadata = {
        "deployment_info": deployment_info,
        "performance_metrics": performance_metrics,
        "chunk_statistics": results.get('basic_stats', {}),
        "detailed_metrics": results.get('detailed_metrics', {}),
        "deployment_notes": deployment_notes,
    }

    # default=str serialises any value json cannot encode natively.
    metadata_path = f"{output_dir}/production_deployment_metadata.json"
    with open(metadata_path, 'w') as f:
        json.dump(production_metadata, f, indent=2, default=str)

    print(f"Production metadata prepared for: {strategy_name}")
    print(f"   Score: {results.get('final_score', 0):.3f}")
    print(f"   File: {metadata_path}")

def generate_comprehensive_report(self, output_dir: str = "results/unified_evaluation"):
    """Produce every report artefact for the current evaluation results.

    Creates ``output_dir`` if needed, dumps ``self.evaluation_results`` to
    ``unified_evaluation_results.json`` and then runs the plotting, domain
    analysis, retrieval analysis, summary report and production metadata
    steps in that order. Only a notice is printed when there are no results.

    Args:
        output_dir: Directory that receives all generated files.
    """
    os.makedirs(output_dir, exist_ok=True)

    if not self.evaluation_results:
        print("No evaluation results to report")
        return

    print(f"\nGenerating comprehensive reports in {output_dir}...")

    # 1. Raw results as JSON (default=str covers values json cannot encode).
    with open(f"{output_dir}/unified_evaluation_results.json", 'w') as results_file:
        json.dump(self.evaluation_results, results_file, indent=2, default=str)

    # 2-6. Remaining stages, each looked up on self right before it runs:
    #      plots, domain analysis, retrieval analysis, summary, production metadata.
    report_stages = (
        'generate_unified_plots',
        'generate_domain_analysis',
        'generate_retrieval_analysis',
        'generate_unified_summary_report',
        'prepare_production_ready_chunks',
    )
    for stage_name in report_stages:
        getattr(self, stage_name)(output_dir)

    print("Comprehensive reports generated successfully!")

# Bind the report-generation functions defined in this cell as methods on
# UnifiedChunkingEvaluator so they can be called through an evaluator instance.
UnifiedChunkingEvaluator.generate_unified_summary_report = generate_unified_summary_report
UnifiedChunkingEvaluator.prepare_production_ready_chunks = prepare_production_ready_chunks
UnifiedChunkingEvaluator.generate_comprehensive_report = generate_comprehensive_report

## Data Loading and Strategy Configuration

In [14]:
# Define chunking strategies and their file paths.
# Paths are assembled with pathlib so they resolve on Windows and POSIX alike
# (a backslash-separated literal is a single file name on POSIX). The root is the
# lower-case 'data' directory that the setup cell creates.
CHUNKS_DIR = Path('data') / 'chunks'

# strategy name -> (sub-directory under CHUNKS_DIR, file name)
strategy_files = {
    'Fixed_256_0': ('fixed', 'Fixed_256_0.json'),
    'Fixed_512_64': ('fixed', 'Fixed_512_64.json'),
    'Fixed_1024_128': ('fixed', 'Fixed_1024_128.json'),
    'Semantic_High_Sim': ('semantic', 'Semantic_High_Sim.json'),
    'Semantic_Low_Sim': ('semantic', 'Semantic_Low_Sim.json'),
    'Semantic_Med_Sim': ('semantic', 'Semantic_Med_Sim.json'),
    'Structural_Balanced': ('structural', 'Structural_Balanced.json'),
    'Structural_Hierarchical': ('structural', 'Structural_Hierarchical.json'),
    'Structural_Large': ('structural', 'Structural_Large.json'),
    'Recursive_Balanced': ('recursive', 'Recursive_Balanced.json'),
    'Recursive_Large': ('recursive', 'Recursive_Large.json'),
    'Recursive_Small': ('recursive', 'Recursive_Small.json'),
    'LLM_smallChunks': ('llm_based', 'LLM_Small_Chunks.json'),
}

# Kept as plain strings so load_chunked_data receives the same type as before.
strategies = {
    strategy_name: str(CHUNKS_DIR / subdir / filename)
    for strategy_name, (subdir, filename) in strategy_files.items()
}

# Load all chunking strategies data; strategies that yield no chunks are
# reported and left out of strategies_data.
strategies_data = {}
for strategy_name, file_path in strategies.items():
    chunks = evaluator.load_chunked_data(strategy_name, file_path)
    if chunks:
        strategies_data[strategy_name] = chunks
    else:
        print(f"Warning: No data loaded for {strategy_name}")

print(f"Loaded {len(strategies_data)} strategies successfully")

Loaded 642 chunks for Fixed_256_0
Loaded 415 chunks for Fixed_512_64
Loaded 262 chunks for Fixed_1024_128
Loaded 1486 chunks for Semantic_High_Sim
Loaded 1065 chunks for Semantic_Low_Sim
Loaded 1320 chunks for Semantic_Med_Sim
Loaded 368 chunks for Structural_Balanced
Loaded 380 chunks for Structural_Hierarchical
Loaded 489 chunks for Structural_Large
Loaded 619 chunks for Recursive_Balanced
Loaded 600 chunks for Recursive_Large
Loaded 654 chunks for Recursive_Small
Loaded 584 chunks for LLM_smallChunks
Loaded 13 strategies successfully


## Run Full Evaluation

In [15]:
# Run unified comprehensive comparison
if strategies_data:
    print("Starting comprehensive evaluation...")
    comparison_df = evaluator.compare_strategies(strategies_data)

    # Display results
    print("\n" + "=" * 80)
    print("UNIFIED CHUNKING STRATEGY EVALUATION RESULTS")
    print("=" * 80)
    print(comparison_df.to_string(index=False))

    # Save comparison results. DataFrame.to_csv does not create missing parent
    # directories, so make sure the target folder exists first.
    comparison_csv = 'results/unified_evaluation/unified_strategy_comparison.csv'
    os.makedirs(os.path.dirname(comparison_csv), exist_ok=True)
    comparison_df.to_csv(comparison_csv, index=False)
    print(f"\nResults saved to: {comparison_csv}")

else:
    print("No chunking data loaded. Please ensure chunk files exist.")

Starting comprehensive evaluation...

Running Unified Chunking Strategy Comparison...

Processing Fixed_256_0...
Evaluating Fixed_256_0...
  Evaluating domain-specific RAG metrics...
  Evaluating retrieval performance...
  Evaluating size optimization...
  Benchmarking performance...

Processing Fixed_512_64...
Evaluating Fixed_512_64...
  Evaluating domain-specific RAG metrics...
  Evaluating retrieval performance...
  Evaluating size optimization...
  Benchmarking performance...

Processing Fixed_1024_128...
Evaluating Fixed_1024_128...
  Evaluating domain-specific RAG metrics...
  Evaluating retrieval performance...
  Evaluating size optimization...
  Benchmarking performance...

Processing Semantic_High_Sim...
Evaluating Semantic_High_Sim...
  Evaluating domain-specific RAG metrics...
  Evaluating retrieval performance...
  Evaluating size optimization...
  Benchmarking performance...

Processing Semantic_Low_Sim...
Evaluating Semantic_Low_Sim...
  Evaluating domain-specific RAG me

## Generate Reports and Select Best Strategy

In [16]:
# Generate comprehensive reports, then print the top-scoring strategy
if not evaluator.evaluation_results:
    print("No evaluation results available for report generation.")
else:
    evaluator.generate_comprehensive_report()

    # Highest final_score wins; a missing score counts as 0.
    best_strategy = max(
        evaluator.evaluation_results.items(),
        key=lambda entry: entry[1].get('final_score', 0),
    )

    if best_strategy:
        name, results = best_strategy
        print("\n".join([
            f"\nBEST STRATEGY: {name}",
            f"Final Score: {results.get('final_score', 0):.3f}",
            f"RAG Quality: {results.get('rag_quality', 0):.3f}",
            f"Retrieval Performance: {results.get('retrieval_performance', 0):.3f}",
        ]))

    print("\nReports generated in: results/unified_evaluation/")
    print("Evaluation completed successfully!")


Generating comprehensive reports in results/unified_evaluation...
Production metadata prepared for: LLM_smallChunks
   Score: 0.741
   File: results/unified_evaluation/production_deployment_metadata.json
Comprehensive reports generated successfully!

BEST STRATEGY: LLM_smallChunks
Final Score: 0.741
RAG Quality: 0.680
Retrieval Performance: 0.925

Reports generated in: results/unified_evaluation/
Evaluation completed successfully!


## Domain and Retrieval Analysis Functions (Re-definition)

In [17]:
def generate_domain_analysis(self, output_dir: str):
    """Export per-strategy domain metrics as a CSV table and a heatmap image.

    For every entry in ``self.evaluation_results`` the five domain metrics are
    read from its ``'detailed_metrics'`` dict (missing values default to 0).

    Args:
        output_dir: Existing directory that receives
            ``domain_specific_analysis.csv`` and
            ``domain_performance_heatmap.png``.

    When there are no evaluation results only a header-only CSV is written and
    the heatmap is skipped, because a heatmap cannot be drawn from an empty
    table.
    """
    # NOTE(review): a function with this name is also defined in an earlier cell;
    # once this cell runs, this definition is the one bound to the class.
    # Display label -> key inside results['detailed_metrics'].
    metric_columns = {
        'Semantic Coherence': 'semantic_coherence',
        'Context Completeness': 'context_completeness',
        'Information Density': 'information_density',
        'Topic Coverage': 'topic_coverage',
        'FAQ Grouping': 'faq_grouping',
    }

    domain_analysis = {}
    for strategy_name, results in self.evaluation_results.items():
        detailed = results.get('detailed_metrics', {})
        domain_analysis[strategy_name] = {
            label: detailed.get(key, 0) for label, key in metric_columns.items()
        }

    # Rows = strategies, columns = metrics.
    if domain_analysis:
        domain_df = pd.DataFrame(domain_analysis).T
    else:
        domain_df = pd.DataFrame(columns=list(metric_columns))
    domain_df.to_csv(f"{output_dir}/domain_specific_analysis.csv")

    if not domain_analysis:
        print("No evaluation results available; skipping domain performance heatmap")
        return

    fig, ax = plt.subplots(figsize=(12, 8))
    try:
        sns.heatmap(domain_df, annot=True, fmt='.3f', cmap='RdYlGn',
                    cbar_kws={'label': 'Score'}, ax=ax)
        ax.set_title('Domain-Specific Performance Analysis', fontweight='bold')
        ax.set_ylabel('Strategy')
        ax.set_xlabel('Domain Metric')
        fig.tight_layout()
        fig.savefig(f"{output_dir}/domain_performance_heatmap.png", dpi=300, bbox_inches='tight')
    finally:
        # Always release the figure, even if drawing or saving fails.
        plt.close(fig)

def generate_retrieval_analysis(self, output_dir: str):
    """Summarise retrieval metrics per query category as a CSV and a bar-chart figure.

    Every entry of each strategy's ``'retrieval_metrics'`` list is grouped by
    its ``'category'`` value; the per-query values are pooled across all
    strategies before averaging.

    Args:
        output_dir: Existing directory that receives
            ``retrieval_by_category.csv`` and ``retrieval_by_category.png``.

    When no retrieval metrics are present a header-only CSV is written and
    the plot is skipped instead of failing on the missing ``Category`` column.
    """
    # NOTE(review): a function with this name is also defined in an earlier cell;
    # once this cell runs, this definition is the one bound to the class.
    summary_columns = [
        'Category', 'Avg_Precision@1', 'Avg_Recall@5', 'Avg_MRR',
        'Std_Precision@1', 'Query_Count',
    ]
    metric_keys = ('precision_at_1', 'recall_at_k', 'mrr')

    # category -> metric key -> list of per-query values (all strategies pooled).
    category_analysis = defaultdict(lambda: defaultdict(list))
    for results in self.evaluation_results.values():
        for metric in results.get('retrieval_metrics', []):
            bucket = category_analysis[metric['category']]
            for key in metric_keys:
                bucket[key].append(metric[key])

    # NOTE(review): the column is labelled Recall@5 but is fed from 'recall_at_k';
    # confirm that k is 5 wherever these metrics are produced.
    category_summary = [
        {
            'Category': category,
            'Avg_Precision@1': np.mean(metrics['precision_at_1']),
            'Avg_Recall@5': np.mean(metrics['recall_at_k']),
            'Avg_MRR': np.mean(metrics['mrr']),
            'Std_Precision@1': np.std(metrics['precision_at_1']),
            'Query_Count': len(metrics['precision_at_1']),
        }
        for category, metrics in category_analysis.items()
    ]

    # Explicit columns keep the CSV schema stable even when there are no rows.
    category_df = pd.DataFrame(category_summary, columns=summary_columns)
    category_df.to_csv(f"{output_dir}/retrieval_by_category.csv", index=False)

    if category_df.empty:
        print("No retrieval metrics available; skipping retrieval-by-category plot")
        return

    categories = category_df['Category']
    metrics_to_plot = ['Avg_Precision@1', 'Avg_Recall@5', 'Avg_MRR']

    fig, axes = plt.subplots(1, 3, figsize=(15, 5))
    try:
        for ax, metric in zip(axes, metrics_to_plot):
            ax.bar(categories, category_df[metric])
            ax.set_title(metric)
            ax.set_ylabel('Score')
            ax.tick_params(axis='x', rotation=45)

        fig.suptitle('Retrieval Performance by Query Category', fontweight='bold')
        fig.tight_layout()
        fig.savefig(f"{output_dir}/retrieval_by_category.png", dpi=300, bbox_inches='tight')
    finally:
        # Always release the figure, even if drawing or saving fails.
        plt.close(fig)

# Re-bind the two analysis functions defined in this cell onto
# UnifiedChunkingEvaluator, replacing any attributes of the same name set earlier.
UnifiedChunkingEvaluator.generate_domain_analysis = generate_domain_analysis
UnifiedChunkingEvaluator.generate_retrieval_analysis = generate_retrieval_analysis

## Performance Monitoring

In [18]:
def monitor_strategy_performance():
    """Print a text dashboard: rankings, threshold alerts and a recommendation.

    Reads the module-level ``evaluator.evaluation_results``. Strategies are
    ranked by ``final_score``; alerts are raised per strategy for chunk size,
    semantic coherence, precision@1 and latency; the top-ranked strategy is
    printed as the recommendation.
    """
    all_results = evaluator.evaluation_results
    if not all_results:
        print("No evaluation results available")
        return

    print("=== PERFORMANCE MONITORING DASHBOARD ===")

    # Overall rankings, best first (a missing score counts as 0).
    ranked = sorted(
        all_results.items(),
        key=lambda entry: entry[1].get('final_score', 0),
        reverse=True,
    )

    print("\nStrategy Rankings:")
    for position, (strategy, results) in enumerate(ranked, start=1):
        score = results.get('final_score', 0)
        if score > 0.7:
            status = "🟢"
        elif score > 0.5:
            status = "🟡"
        else:
            status = "🔴"
        print(f"{position:2d}. {status} {strategy:<20} {score:.3f}")

    # Performance alerts
    print("\n=== PERFORMANCE ALERTS ===")
    alerts = []

    for strategy, results in all_results.items():
        avg_tokens = results.get('basic_stats', {}).get('avg_tokens', 0)
        detailed = results.get('detailed_metrics', {})
        coherence = detailed.get('semantic_coherence', 0)
        precision = detailed.get('avg_precision_at_1', 0)
        latency = results.get('performance_benchmark', {}).get('mean_latency_ms', 0)

        # Each check is independent; messages are appended in this fixed order.
        if avg_tokens < 50:
            alerts.append(f"⚠️  {strategy}: Chunks too small (avg {avg_tokens:.0f} tokens)")

        if avg_tokens > 800:
            alerts.append(f"⚠️  {strategy}: Chunks too large (avg {avg_tokens:.0f} tokens)")

        if coherence < 0.5:
            alerts.append(f"⚠️  {strategy}: Low semantic coherence ({coherence:.3f})")

        if precision < 0.3:
            alerts.append(f"⚠️  {strategy}: Poor retrieval precision ({precision:.3f})")

        if latency > 100:
            alerts.append(f"⚠️  {strategy}: High latency ({latency:.1f}ms)")

    if not alerts:
        print("✅ No performance issues detected")
    else:
        for alert in alerts:
            print(alert)

    # Recommendations
    print("\n=== RECOMMENDATIONS ===")
    if ranked:
        name, results = ranked[0]
        top_score = results.get('final_score', 0)
        print(f"✅ Recommended Strategy: {name}")
        print(f"   Final Score: {top_score:.3f}")
        print("   Ready for production deployment")

        # Flag a weak winner.
        if top_score < 0.6:
            print("⚠️  Note: Best strategy score is below 0.6 - consider strategy refinement")

def export_results_summary():
    """Export a concise per-strategy summary table for stakeholders.

    Builds one row per strategy from the module-level
    ``evaluator.evaluation_results``, sorts by ``Final_Score`` (descending)
    and writes ``results/unified_evaluation/executive_summary.csv``.

    Returns:
        The sorted summary DataFrame, or None when there are no results.
    """
    all_results = evaluator.evaluation_results
    if not all_results:
        print("No results to export")
        return

    # Computed once up front rather than once per row.
    top_score = max(r.get('final_score', 0) for r in all_results.values())

    summary_data = []
    for strategy, results in all_results.items():
        basic_stats = results.get('basic_stats', {})
        final_score = results.get('final_score', 0)

        summary_data.append({
            'Strategy': strategy,
            'Final_Score': round(final_score, 3),
            'RAG_Quality': round(results.get('rag_quality', 0), 3),
            'Retrieval_Performance': round(results.get('retrieval_performance', 0), 3),
            'Total_Chunks': basic_stats.get('total_chunks', 0),
            'Avg_Tokens': round(basic_stats.get('avg_tokens', 0), 1),
            # Every strategy tied for the top score is marked as recommended.
            'Recommendation': 'Recommended' if final_score == top_score else 'Not Recommended'
        })

    summary_df = pd.DataFrame(summary_data)
    summary_df = summary_df.sort_values('Final_Score', ascending=False)

    # Export to CSV. DataFrame.to_csv does not create missing parent directories.
    summary_csv = 'results/unified_evaluation/executive_summary.csv'
    os.makedirs(os.path.dirname(summary_csv), exist_ok=True)
    summary_df.to_csv(summary_csv, index=False)
    print(f"Executive summary exported to: {summary_csv}")

    return summary_df

# Confirm that this cell ran and remind the reader how to call the two helpers defined in it.
print("Performance monitoring functions ready!")
print("Use monitor_strategy_performance() to view dashboard")
print("Use export_results_summary() to create executive summary")

Performance monitoring functions ready!
Use monitor_strategy_performance() to view dashboard
Use export_results_summary() to create executive summary


## Final Execution Summary

In [19]:
def run_complete_evaluation():
    """Run the complete evaluation pipeline end to end.

    Uses the module-level ``strategies_data`` and ``evaluator``. Steps:
    compare strategies, analyse chunk sizes, generate reports, print the
    monitoring dashboard, export the executive summary, then print the three
    highest-scoring strategies. Returns None.
    """
    print("Starting complete evaluation pipeline...")

    # Step 1: Load data
    print("Step 1: Loading strategy data...")
    if not strategies_data:
        # NOTE(review): the cell number in this message is hard-coded; confirm it
        # still points at the data-loading cell.
        print("❌ No data loaded. Please run Cell 13 first.")
        return

    # Step 2: Run evaluation (results are kept on the evaluator itself).
    print("Step 2: Running comprehensive evaluation...")
    evaluator.compare_strategies(strategies_data)

    # Step 3: Analyze results
    print("Step 3: Analyzing chunk sizes...")
    evaluator.analyze_chunk_sizes(strategies_data)

    # Step 4: Generate reports
    print("Step 4: Generating comprehensive reports...")
    evaluator.generate_comprehensive_report()

    # Step 5: Performance monitoring
    print("Step 5: Performance monitoring...")
    monitor_strategy_performance()

    # Step 6: Export summary
    print("Step 6: Exporting executive summary...")
    summary_df = export_results_summary()
    if summary_df is None:
        # export_results_summary returns None when there is nothing to export.
        print("No evaluation results were produced; nothing to summarise.")
        return

    print("\n" + "="*60)
    print("EVALUATION PIPELINE COMPLETE")
    print("="*60)

    # Display key results. summary_df is sorted by score, so its index labels are
    # the pre-sort positions; number the rows by rank instead of by label.
    print("\nTOP 3 STRATEGIES:")
    top_strategies = summary_df.head(3)
    for rank, (_, row) in enumerate(top_strategies.iterrows(), start=1):
        print(f"{rank}. {row['Strategy']}: {row['Final_Score']}")

    print("\nAll results saved in: results/unified_evaluation/")
    print("✅ Pipeline completed successfully!")

# Optional: Run everything at once
# run_complete_evaluation()

# Usage hints printed when this cell runs.
# NOTE(review): the cell numbers below are hard-coded. The "In [N]" labels visible in
# this notebook put data loading at In [14] and the evaluation run at In [15], so the
# list appears to be off by one — confirm and update.
print("Complete evaluation pipeline ready!")
print("Run run_complete_evaluation() to execute the entire pipeline")
print("\nOr execute individual cells for step-by-step analysis:")
print("- Cell 13: Load data")
print("- Cell 14: Run evaluation") 
print("- Cell 15: Analyze chunk sizes")
print("- Cell 16: Generate reports")
print("- Cell 17-19: Interactive analysis")

Complete evaluation pipeline ready!
Run run_complete_evaluation() to execute the entire pipeline

Or execute individual cells for step-by-step analysis:
- Cell 13: Load data
- Cell 14: Run evaluation
- Cell 15: Analyze chunk sizes
- Cell 16: Generate reports
- Cell 17-19: Interactive analysis
