# Chapter 5: Advanced RAG

*Notebook companion for Chapter 5 of Data Strategy for LLMs*



**IMPORTANT: This chapter uses the book-wide shared environment setup. Please follow the README.md in the root directory.**

## Prerequisites for Local Setup

Before running this notebook, complete the book-wide setup from the repository root:

**macOS/Linux:**
```bash
bash setup/setup_mac.sh
```

**Windows (PowerShell):**
```powershell
powershell -ExecutionPolicy Bypass -File setup/setup_windows.ps1
```

This creates:
- Shared environment: `data_strategy_env/` (Python 3.12)
- Jupyter kernel: **"Python (Data Strategy Book)"**
- API keys: Automatically configured during setup

## Using This Notebook

1. **Select the correct kernel**: **"Python (Data Strategy Book)"**

The setup script registers the environment as a Jupyter kernel named **"Python (Data Strategy Book)"**. To make the kernel available and select it:
- Open the Command Palette (Mac: Cmd+Shift+P; Windows: Ctrl+Shift+P)
- Run **Developer: Reload Window** (alternatively, press Cmd+P on Mac or Ctrl+P on Windows and type `>Developer: Reload Window`)

![reload_window](../images/reload_window.png)

- After reload, click Select Kernel (top-right)

![select_kernel](../images/select_kernel.png)

- Choose Jupyter Kernel

![jupyter_kernel](../images/jupyter_kernel.png)

- Choose `Python (Data Strategy Book)`

![select_python_data](../images/select_python_data.png)

- Run ALL cells:

![run_all_cells](../images/run_all.png)

- If you did not add your API key during setup (or to the `.env` file), a pop-up will prompt you to enter your OpenAI API key

![openai_api_key](../images/api_key.png)

We already explained how to get an OpenAI API key in the root README.


2. **If kernel not visible**: Command Palette → "Developer: Reload Window"
   - Mac: Cmd+Shift+P (Windows: Ctrl+Shift+P)
   - Type: "Developer: Reload Window"
3. **Restart kernel** if you just completed setup

The setup script handles all dependencies and API key configuration automatically.

In [48]:
import os
# os.environ["CHROMA_TELEMETRY_DISABLED"] = "1"
# os.environ["POSTHOG_DISABLED"] = "1"


# Setup From Chapter 1

## Environment Setup

### Jupyter Kernel Setup Fix

**If you're seeing an error like "Running cells with 'Python X.X.X' requires the ipykernel package", this cell will fix it!**

This is a common issue, especially on:
- Fresh Python installations
- Homebrew-managed Python environments on macOS
- Systems with multiple Python versions

**Run the cell below to automatically detect your Python environment and install the correct kernel.**

In [49]:
import sys
import subprocess
import os

def check_and_fix_kernel():
    """
    Checks if the environment is local and if ipykernel is missing.
    If both conditions are true, it attempts to install the kernel.
    """
    # Step 1: Detect if running in Google Colab
    if 'google.colab' in sys.modules:
        print(" Running in Google Colab. No kernel fix needed.")
        return

    # Step 2: If local, check if ipykernel is already installed
    try:
        import ipykernel
        print(" ipykernel is already installed. No fix needed.")
        return
    except ImportError:
        print(" ipykernel not found. Attempting installation...")

    # Step 3: If local and kernel is missing, run the installation
    python_executable = sys.executable
    python_version = f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
    
    print(f"DETECTED Python: {python_executable}")
    print(f"PYTHON VERSION: {python_version}")
    
    # Method 1: Try standard installation
    try:
        subprocess.run(
            [python_executable, '-m', 'pip', 'install', 'ipykernel', '-U', '--user', '--force-reinstall'],
            capture_output=True, text=True, check=True
        )
        print("SUCCESS: Successfully installed ipykernel (Method 1)")
        method_used = 1
    except subprocess.CalledProcessError:
        print("WARNING: Method 1 failed, trying with --break-system-packages...")
        # Method 2: Try with --break-system-packages
        try:
            subprocess.run(
                [python_executable, '-m', 'pip', 'install', 'ipykernel', '-U', '--user', '--force-reinstall', '--break-system-packages'],
                capture_output=True, text=True, check=True
            )
            print("SUCCESS: Successfully installed ipykernel (Method 2 - with system override)")
            method_used = 2
        except subprocess.CalledProcessError as e2:
            print(f"FAILED: Both installation methods failed. Error: {e2.stderr}")
            print("\nConsider creating a virtual environment manually.")
            return

    # Install kernel spec for the current Python
    try:
        kernel_name = f"python{sys.version_info.major}{sys.version_info.minor}"
        display_name = f"Python {python_version}"
        
        subprocess.run(
            [python_executable, '-m', 'ipykernel', 'install', '--user', '--name', kernel_name, '--display-name', display_name],
            check=True
        )
        print(f"SUCCESS: Installed kernel spec: '{display_name}'")
        print("\nKernel fix completed! Please RESTART your Jupyter server and select the new kernel.")
    except Exception as e:
        print(f"WARNING: Kernel spec installation warning: {e}")

# Run the check and fix function
check_and_fix_kernel()


 ipykernel is already installed. No fix needed.


#### What This Fix Does

The cell above automatically handles the most common kernel installation scenarios:

**Method 1 - Standard Installation:**
- Tries the standard `pip install ipykernel` approach
- Works for most regular Python installations

**Method 2 - System Override (Homebrew/Externally Managed):**
- Uses `--break-system-packages` flag for Homebrew Python
- Handles "externally-managed-environment" errors
- Essential for macOS Homebrew Python environments

**If both methods fail:**
- Prints pip's error output
- Suggests creating a virtual environment manually (the cell does not create one for you)

**After running the fix:**
- Your Jupyter interface should show available kernels
- Select the one that matches your Python version
- All notebook cells should run without kernel errors

This approach covers the most common kernel problems on fresh machines, different Python distributions, and various operating systems.
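The `--break-system-packages` fallback is only needed when pip refuses to install into an "externally managed" interpreter, as defined by PEP 668 (the typical Homebrew case). A minimal sketch, assuming a standard CPython layout, for checking in advance whether you are inside a virtual environment or an externally managed interpreter; `in_virtualenv` and `is_externally_managed` are hypothetical helper names:

```python
import sys
import sysconfig
from pathlib import Path

def in_virtualenv() -> bool:
    """True inside a venv/virtualenv, where sys.prefix differs from the base install."""
    return sys.prefix != getattr(sys, "base_prefix", sys.prefix)

def is_externally_managed() -> bool:
    """True when the base interpreter carries a PEP 668 EXTERNALLY-MANAGED marker.

    PEP 668 places the marker file in the stdlib directory. Installers ignore
    the marker inside a virtual environment, so we report False there.
    """
    if in_virtualenv():
        return False
    stdlib_dir = sysconfig.get_path("stdlib")
    return (Path(stdlib_dir) / "EXTERNALLY-MANAGED").exists()

print(f"Virtual environment: {in_virtualenv()}")
print(f"Externally managed (PEP 668): {is_externally_managed()}")
```

If the second check prints `True`, creating a virtual environment is usually a cleaner fix than overriding the protection.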

## Complete Future-Proof OpenAI Setup
### Comprehensive Error Handling & API Evolution Adaptation

This notebook sets up the OpenAI API with handling for common errors and some safeguards against future API changes:

- **Error Handling:** Billing, authentication, model deprecation, rate limits, network issues
- **Future-Proofing:** SDK version compatibility, adaptive response parsing, flexible error patterns
- **Cross-Platform:** Local Jupyter, Google Colab, Python 3.8+

#### API Key Setup

Before we dive into the architecture, let's set up our environment to work with OpenAI. For this book, I'm using OpenAI as our primary LLM provider. It's not the only option: you could use Anthropic's Claude or even local models with Ollama. I chose OpenAI for this book because it is easy to use and gives access to many models through a single, unified API. Note that API usage is not free: OpenAI requires prepaid credits.

In [50]:
# Smart Environment Setup
import sys, os, subprocess, importlib.util

IN_COLAB = 'google.colab' in sys.modules
print(f"Environment: {'Google Colab' if IN_COLAB else 'Local Jupyter'}")

def smart_install(package, min_version=None):
    """Install packages with multiple fallback strategies"""
    package_spec = f"{package}>={min_version}" if min_version else package
    strategies = [
        [sys.executable, '-m', 'pip', 'install', package_spec, '--quiet'],
        [sys.executable, '-m', 'pip', 'install', package_spec, '--user', '--quiet'],
        [sys.executable, '-m', 'pip', 'install', package_spec, '--break-system-packages', '--quiet']
    ]
    
    for cmd in strategies:
        try:
            subprocess.run(cmd, capture_output=True, check=True)
            print(f"SUCCESS: {package}")
            return True
        except subprocess.CalledProcessError:
            continue
    print(f"FAILED: {package}")
    return False

# Install required packages
packages = {'openai': '1.0.0', 'python-dotenv': None, 'packaging': None}
for pkg, ver in packages.items():
    smart_install(pkg, ver)


Environment: Local Jupyter


huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


SUCCESS: openai



SUCCESS: python-dotenv



SUCCESS: packaging
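The `huggingface/tokenizers` warning above appears because the kernel forks a subprocess to run `pip` after the Hugging Face `tokenizers` library has already used parallelism in this session. As the warning itself suggests, setting `TOKENIZERS_PARALLELISM` explicitly silences it. A minimal sketch, assuming you run it near the top of the notebook, before any tokenizer is used:

```python
import os

# Explicitly setting this variable (as the warning suggests) suppresses the message;
# "false" also turns off tokenizer parallelism, which avoids fork-related deadlocks.
# setdefault keeps any value you have already configured.
os.environ.setdefault("TOKENIZERS_PARALLELISM", "false")
print(f"TOKENIZERS_PARALLELISM={os.environ['TOKENIZERS_PARALLELISM']}")
```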


In [51]:
# Import modules with graceful fallbacks
import os, re, time, json, getpass
from typing import Optional, List, Dict, Tuple

# OpenAI client import
try:
    from openai import OpenAI
    OPENAI_AVAILABLE = True
except ImportError:
    print("WARNING: OpenAI not available. Install with: pip install openai")
    OPENAI_AVAILABLE = False
    class OpenAI:
        def __init__(self): pass

try:
    from dotenv import load_dotenv
    DOTENV_AVAILABLE = True
except ImportError:
    DOTENV_AVAILABLE = False
    def load_dotenv(): pass

try:
    from packaging import version
    VERSION_CHECK = True
except ImportError:
    VERSION_CHECK = False

print("Modules imported successfully!")


Modules imported successfully!


In [52]:
# Minimal connection test utilities
import os

def test_openrouter_connection(model_name: str) -> bool:
    # Sends a tiny embeddings request to verify the API key and connectivity.
    # Note: model_name is not used; the check always calls text-embedding-3-small.
    try:
        from openai import OpenAI
        client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
        _ = client.embeddings.create(model='text-embedding-3-small', input='ping')
        return True
    except Exception as e:
        print(f"Connection test failed: {e}")
        return False

def test_model_connection(model_name: str) -> bool:
    """Alias required by tests"""
    return test_openrouter_connection(model_name)


In [53]:
# Standard OpenAI API key setup (.env + getpass)
import os
from getpass import getpass
from dotenv import load_dotenv

load_dotenv()
ENV_FILE = '.env'

def is_valid_openai_key(key: str) -> bool:
    if not key or not isinstance(key, str):
        return False
    key = key.strip()
    placeholders = {'your_api_key_here','sk-your-key-here','sk-...','sk-xxxxxxxx'}
    if key.lower() in placeholders:
        return False
    if not key.startswith('sk-'):
        return False
    return len(key) >= 40

api_key = os.getenv('OPENAI_API_KEY')
if not is_valid_openai_key(api_key):
    print('OpenAI API key not found or invalid. Please enter it securely:')
    api_key = getpass('Enter your OpenAI API key (starts with sk-): ').strip()
    if is_valid_openai_key(api_key):
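        with open(ENV_FILE, 'a') as f:
            # Use '\n': text mode already translates it to the platform's line ending
            f.write('OPENAI_API_KEY=' + api_key + '\n')
        # Set it directly: load_dotenv() does not override an existing (invalid) value
        os.environ['OPENAI_API_KEY'] = api_key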
        print('SUCCESS: API key saved to .env and loaded for this session')
    else:
        print('WARNING: Invalid API key format. Please try again.')
else:
    print('SUCCESS: OpenAI API key loaded from environment/.env')


SUCCESS: OpenAI API key loaded from environment/.env
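The key-setup cell appends a new `OPENAI_API_KEY=` line every time a key is entered, so repeated runs can leave several entries in `.env`. A minimal stdlib sketch of an "upsert" instead, which replaces an existing entry or adds one. `upsert_env_var` is a hypothetical helper; `python-dotenv` also provides `set_key` for the same purpose.

```python
import os
from pathlib import Path

def upsert_env_var(env_path: str, key: str, value: str) -> None:
    """Set KEY=value in a .env file, replacing any existing KEY line (hypothetical helper)."""
    path = Path(env_path)
    lines = path.read_text().splitlines() if path.exists() else []
    new_line = f"{key}={value}"
    replaced = False
    for i, line in enumerate(lines):
        # Match "KEY=..." exactly, so keys that merely share a prefix are left alone
        if line.lstrip().startswith(f"{key}="):
            lines[i] = new_line
            replaced = True
    if not replaced:
        lines.append(new_line)
    # Write "\n" endings; text mode translates them for the platform
    path.write_text("\n".join(lines) + "\n")
    # Update the running process too, so the new value is used immediately
    os.environ[key] = value

# Example usage (hypothetical):
# upsert_env_var(".env", "OPENAI_API_KEY", api_key)
```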



In [55]:
# Future-Proof API Key Validator
class APIKeyValidator:
    def __init__(self):
        self.patterns = [
            r'^sk-[A-Za-z0-9]{20,}$',
            r'^sk-proj-[A-Za-z0-9\-_]{20,}$',
            r'^sk-[A-Za-z0-9\-_]{40,}$'
        ]
        self.invalid_keys = {
            'your_api_key_here', 'sk-your-key-here', 'sk-...', 'sk-xxxxxxxx',
            'sk-placeholder', 'sk-example', 'sk-demo', 'sk-test'
        }
    
    def validate(self, key: str) -> Tuple[bool, str]:
        if not key or not isinstance(key, str):
            return False, "API key is empty"
        
        key = key.strip()
        
        if key.lower() in [k.lower() for k in self.invalid_keys]:
            return False, "API key appears to be a placeholder"
        
        if not key.startswith('sk-'):
            return False, "API keys should start with 'sk-'"
        
        if len(key) < 30:
            return False, "API key is too short"
        
        for pattern in self.patterns:
            if re.match(pattern, key):
                return True, "Valid API key format"
        
        # Heuristic check for unknown formats
        if self._heuristic_check(key):
            return True, "Format not recognized but appears valid"
        
        return False, "Invalid format"
    
    def _heuristic_check(self, key: str) -> bool:
        remaining = key[3:]  # Remove 'sk-'
        alphanumeric = sum(1 for c in remaining if c.isalnum())
        unique_chars = len(set(remaining.lower()))
        return alphanumeric >= len(remaining) * 0.8 and unique_chars >= 8

validator = APIKeyValidator()
print("API key validator ready")


API key validator ready


In [56]:
# Load API key from shared configuration
import sys
from pathlib import Path

# Add repository root to Python path
repo_root = Path().cwd()
while not (repo_root / 'utils').exists() and repo_root.parent != repo_root:
    repo_root = repo_root.parent
if str(repo_root) not in sys.path:
    sys.path.insert(0, str(repo_root))

from utils.config import get_openai_api_key

try:
    api_key = get_openai_api_key()
    print("OpenAI API key loaded successfully from .env file")
except ValueError as e:
    print("API key setup required:")
    print(str(e))
    print("\nQuick setup:")
    print("1. Copy .env.example to .env: cp .env.example .env")
    print("2. Edit .env and add your OpenAI API key")
    print("3. Get your key from: https://platform.openai.com/api-keys")
    print("4. Restart this notebook kernel")
    raise


OpenAI API key loaded successfully from .env file


#### Connecting with OpenAI API

In [57]:
# Load the shared OpenAI API key
from utils.config import get_openai_api_key
API_KEY = get_openai_api_key()  # loads .env from repo root


### OpenAI Assistant ask_ai()

In [58]:
# Future-Proof OpenAI Assistant (updated models and discovery)
import time

class FutureProofAssistant:
    def __init__(self, api_key=None):
        self.api_key = api_key  # assumes API_KEY set in a previous cell
        self.client = None
        # Prefer modern families; keep a reasonable fallback
        self.models = ['o4-mini', 'o4', 'gpt-4.1-mini', 'gpt-4.1', 'gpt-4o']
        self.selected_model = None
        self.max_retries = 3
        
        if not self.api_key:
            raise ValueError("No API key provided")
        
        self._initialize()
    
    def _initialize(self):
        print("Initializing Future-Proof Assistant...")
        self._setup_client()
        self._discover_models()
        self._select_model()
        print(f"Ready! Using model: {self.selected_model}")
    
    def _setup_client(self):
        try:
            import openai
            if hasattr(openai, 'OpenAI'):
                self.client = openai.OpenAI(api_key=self.api_key)
                print("Client initialized (modern API)")
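            else:
                # Legacy openai<1.0 module-level API. Note: the rest of this class calls
                # client.chat.completions and client.models.list, which require openai>=1.0.
                openai.api_key = self.api_key
                self.client = openai
                print("Client initialized (legacy API)")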
        except Exception as e:
            raise Exception(f"Client initialization failed: {e}")
    
    def _discover_models(self):
        try:
            response = self.client.models.list()
            all_models = [m.id for m in response.data]
            # Prefer modern families; exclude legacy 3.5.
            # Future-proof: include patterns for potential future names (may not exist yet).
            include_patterns = ['o4', 'gpt-4.1', 'gpt-4o', 'gpt-5', 'gpt-4.5', 'gpt-6']
            chat_models = [
                m for m in all_models
                if any(p in m.lower() for p in include_patterns)
            ]
            self.models = self._prioritize_models(chat_models) or self.models
            print(f"Found {len(self.models)} models")
        except Exception as e:
            print(f"Model discovery failed: {e} - using defaults")
    
    def _prioritize_models(self, models):
        priority = ['o4-mini', 'o4', 'gpt-4.1-mini', 'gpt-4.1', 'gpt-4o']
        result = [m for m in priority if m in models]
        result.extend([m for m in sorted(models) if m not in result])
        return result
    
    def _select_model(self):
        for model in self.models[:3]:
            if self._test_model(model):
                self.selected_model = model
                return
        self.selected_model = self.models[0]
    
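    def _test_model(self, model):
        # Note: o-series reasoning models (e.g. o4-mini) reject `max_tokens` and expect
        # `max_completion_tokens`, so they fail this probe and the next candidate is tried.
        try:
            self.client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": "Hi"}],
                max_tokens=5
            )
            return True
        except Exception:
            return False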
    
    def ask_ai(self, content: str) -> str:
        if not content or not content.strip():
            return "Error: Please provide a valid question."
        
        for attempt in range(self.max_retries):
            try:
                response = self.client.chat.completions.create(
                    model=self.selected_model,
                    messages=[{"role": "user", "content": content.strip()}],
                    max_tokens=1000,
                    temperature=0.7
                )
                return self._extract_content(response)
            
            except Exception as e:
                error_type = self._classify_error(e)
                
                if error_type == 'billing':
                    return self._billing_error_message()
                elif error_type == 'auth':
                    return self._auth_error_message()
                elif error_type == 'model':
                    return self._model_error_message()
                elif error_type == 'rate' and attempt < self.max_retries - 1:
                    wait_time = 2 ** attempt
                    print(f"Rate limited. Waiting {wait_time}s...")
                    time.sleep(wait_time)
                    continue
                elif attempt < self.max_retries - 1:
                    print(f"Attempt {attempt + 1} failed: {str(e)[:50]}...")
                    time.sleep(1)
                    continue
                else:
                    return f"Error after {self.max_retries} attempts: {str(e)[:100]}..."
    
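    def _extract_content(self, response):
        try:
            return response.choices[0].message.content  # chat completions format
        except (AttributeError, IndexError, TypeError):
            try:
                return response.choices[0].text  # legacy completions format
            except (AttributeError, IndexError, TypeError):
                return str(response)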
    
    def _classify_error(self, error):
        error_str = str(error).lower()
        if any(word in error_str for word in ['quota', 'billing', 'credit']):
            return 'billing'
        elif any(word in error_str for word in ['auth', 'key', 'unauthorized']):
            return 'auth'
        elif any(word in error_str for word in ['model', 'not_found']):
            return 'model'
        elif any(word in error_str for word in ['rate', 'limit', 'too_many']):
            return 'rate'
        return 'unknown'
    
    def _billing_error_message(self):
        return """BILLING ERROR: Insufficient credits.
        
To fix this:
1. Visit: https://platform.openai.com/settings/organization/billing/overview
2. Add a payment method
3. Purchase credits (minimum $5)
4. Wait a few minutes for credits to appear

Note: OpenAI requires prepaid credits for API usage."""
    
    def _auth_error_message(self):
        return """AUTHENTICATION ERROR: Invalid API key.
        
To fix this:
1. Check your API key at: https://platform.openai.com/api-keys
2. Create a new key if needed
3. Re-run the API key setup cell above

Make sure your key starts with 'sk-' and is complete."""
    
    def _model_error_message(self):
        return f"""MODEL ERROR: {self.selected_model} not available.
        
This usually means:
1. Model has been deprecated
2. Your account doesn't have access
3. Temporary service issue

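To switch models, re-run the assistant setup cell: it rediscovers the available models and selects the first top-priority model that answers a test request."""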

# Initialize assistant
assistant = FutureProofAssistant(API_KEY)


Initializing Future-Proof Assistant...
Client initialized (modern API)
Found 43 models
Ready! Using model: gpt-4.1-mini
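`ask_ai` retries rate-limited calls, waiting `2 ** attempt` seconds between attempts. The same idea as a reusable, testable helper: a minimal sketch, assuming you pass in a predicate that decides which exceptions are worth retrying. `retry_with_backoff` and its parameters are hypothetical, and the random jitter is an addition not present in the class above (it spreads out retries from concurrent clients).

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")

def retry_with_backoff(
    fn: Callable[[], T],
    is_retryable: Callable[[Exception], bool],
    max_retries: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn(), retrying retryable errors with exponential backoff plus jitter."""
    for attempt in range(max_retries):
        try:
            return fn()
        except Exception as exc:
            # Give up on non-retryable errors, or after the final attempt
            if not is_retryable(exc) or attempt == max_retries - 1:
                raise
            # Delay grows as base_delay * 2**attempt; jitter adds up to 10% extra
            delay = base_delay * (2 ** attempt)
            sleep(delay + random.uniform(0, delay * 0.1))
    raise RuntimeError("max_retries must be at least 1")
```

With the assistant above, a call might look like `retry_with_backoff(lambda: assistant.client.chat.completions.create(...), lambda e: "rate" in str(e).lower())`. Injecting `sleep` keeps the helper easy to test without real waiting.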


In [59]:
# Test the Assistant
def ask_ai(content: str) -> str:
    """Simple interface to the future-proof assistant"""
    if 'assistant' in globals():
        return assistant.ask_ai(content)
    else:
        return "Assistant not initialized. Please run the setup cells above."

# Test with various scenarios
if API_KEY:
    print("Testing assistant functionality...\n")
    
    # Basic test
    response = ask_ai("Say 'Hello, I am working!' in exactly those words.")
    print(f"Basic Test: {response}\n")
    
    # Empty input test
    response = ask_ai("")
    print(f"Empty Input Test: {response}\n")
    
    # Model info
    print(f"Selected Model: {assistant.selected_model}")
    print(f"Available Models: {assistant.models[:3]}...")
    
    print("\nAssistant is ready for use!")
else:
    print("Please complete API key setup first.")


Testing assistant functionality...

Basic Test: Hello, I am working!

Empty Input Test: Error: Please provide a valid question.

Selected Model: gpt-4.1-mini
Available Models: ['o4-mini', 'gpt-4.1-mini', 'gpt-4.1']...

Assistant is ready for use!


#### Usage Examples

Now you can use the `ask_ai()` function for any queries:

```python
# Simple question
response = ask_ai("What is machine learning?")
print(response)

# Complex analysis
response = ask_ai("Explain the benefits of using LLMs for data analysis")
print(response)
```

#### Future-Proof Features

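This setup is designed to cope with common changes:
- **API Changes**: Detects whether the installed SDK exposes the modern `OpenAI` client class
- **Model Updates**: Lists available models, ranks them with a priority list, and selects the first of the top three that answers a test request
- **Error Evolution**: Classifies errors by keyword matching on the error message
- **Response Formats**: Tries several content extraction paths before falling back to the raw response

These measures make the assistant more resilient to SDK and model changes, but they cannot guarantee compatibility with every future OpenAI release.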
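Keyword matching on error text can misfire: for example, `_classify_error` treats any message containing "key" as an authentication problem. The OpenAI Python SDK (v1+) raises API errors that carry an HTTP `status_code` and often a machine-readable `code` such as `insufficient_quota`. A minimal sketch that classifies on those attributes first and falls back to narrower keywords; `classify_openai_error` is hypothetical, and it reads the attributes with `getattr` so it runs without importing `openai`:

```python
def classify_openai_error(error: Exception) -> str:
    """Map an exception to 'billing', 'auth', 'model', 'rate', or 'unknown'.

    Assumes the error may expose `status_code` (HTTP status) and `code`
    (API error code), as openai>=1.0 status errors do; both are optional here.
    """
    status = getattr(error, "status_code", None)
    code = str(getattr(error, "code", None) or "").lower()

    if code == "insufficient_quota":
        return "billing"  # exhausted credits are reported with HTTP 429
    if status in (401, 403):
        return "auth"
    if status == 404 or code == "model_not_found":
        return "model"
    if status == 429:
        return "rate"

    # Fallback: narrower keyword matching for errors without HTTP metadata
    text = str(error).lower()
    if any(w in text for w in ("quota", "billing", "credit")):
        return "billing"
    if any(w in text for w in ("unauthorized", "invalid api key", "authentication")):
        return "auth"
    if any(w in text for w in ("rate limit", "too many requests")):
        return "rate"
    return "unknown"
```

Checking `code` before `status` matters here: both quota exhaustion and ordinary rate limiting arrive as HTTP 429, but only the first should stop retries.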

In [60]:
ask_ai("tell me a joke")


"Sure! Here's a joke for you:\n\nWhy don't scientists trust atoms?\n\nBecause they make up everything!"

# This is imported from Chapter 4

## The Indexing Pipeline - Building Our Knowledge Catalog

### Installing ChromaDB for Google Colab


In [61]:
import sys, subprocess
IN_COLAB = 'google.colab' in sys.modules
if IN_COLAB:
    try:
        subprocess.run([sys.executable, '-m', 'pip', 'install', 'chromadb==0.5.5', '--quiet'], check=False)
    except Exception as e:
        print(f'WARNING: pip install failed: {e}')


### Initialize the vector store for indexing

The following code sets up the minimal infrastructure for our indexing pipeline.

- __What this does__
  - `chroma_client = chromadb.PersistentClient(path=str(SHARED_DB))`: Initializes a persistent ChromaDB client at `data/chroma_db` under the repository root (local disk for this demo), shared by Chapters 4 and 5
  - `collection = chroma_client.get_or_create_collection(name="book_collection")`: Creates or opens the `book_collection` where embeddings and source text will be stored

- __Why this matters__
  - The collection acts like a vector “table” we’ll reuse throughout the notebook
  - Persistence lets you run subsequent cells without re-indexing each time

- __Notes__
  - Local persistence is convenient for learning
  - For production, prefer a managed/vector DB with proper lifecycle, observability, and access controls
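To make the "vector table" idea concrete, here is a toy, in-memory stand-in for a collection: each row holds an ID, an embedding vector, and the source text, and a query returns the nearest rows by squared Euclidean (L2) distance. This is a teaching sketch, not how ChromaDB is implemented (Chroma computes embeddings for you and uses an approximate nearest-neighbour index); `ToyCollection` and the hand-written 3-dimensional vectors are hypothetical.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple

@dataclass
class ToyCollection:
    """In-memory stand-in for a vector collection: id -> (embedding, document)."""
    name: str
    rows: Dict[str, Tuple[List[float], str]] = field(default_factory=dict)

    def add(self, ids: List[str], embeddings: List[List[float]], documents: List[str]) -> None:
        for doc_id, emb, doc in zip(ids, embeddings, documents):
            self.rows[doc_id] = (emb, doc)

    def count(self) -> int:
        return len(self.rows)

    def query(self, query_embedding: List[float], n_results: int = 2) -> List[Tuple[str, float, str]]:
        """Exact (brute-force) search: (id, squared L2 distance, document), closest first."""
        scored = []
        for doc_id, (emb, doc) in self.rows.items():
            dist = sum((a - b) ** 2 for a, b in zip(emb, query_embedding))
            scored.append((doc_id, dist, doc))
        scored.sort(key=lambda row: row[1])
        return scored[:n_results]

# Hypothetical 3-d "embeddings" chosen by hand for illustration
toy = ToyCollection(name="toy_collection")
toy.add(
    ids=["policy", "finance"],
    embeddings=[[1.0, 0.0, 0.0], [0.0, 1.0, 0.0]],
    documents=["New AI policy requires training.", "Q2 revenue grew 15%."],
)
print(toy.query([0.9, 0.1, 0.0], n_results=1))
```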

In [62]:
import sys
import os

# print("=== Environment Verification ===")
# print(f"Python executable: {sys.executable}")
# print(f"Python version: {sys.version}")

# Check if we're using the shared book environment
if 'data_strategy_env' in sys.executable:
    print("SUCCESS: Using book environment (data_strategy_env)")
else:
    print("WARNING: Not using data_strategy_env")
    print("   Expected path should contain 'data_strategy_env'")

# Check current working directory
# print(f"Working directory: {os.getcwd()}")

# Verify key packages and versions
try:
    import numpy
    print(f"NumPy version: {numpy.__version__}")
    if numpy.__version__.startswith('1.'):
        print("SUCCESS: NumPy version compatible with ChromaDB")
    else:
        print("WARNING: NumPy version may cause ChromaDB issues")
except ImportError:
    print("ERROR: NumPy not installed")

try:
    import chromadb
    print("SUCCESS: ChromaDB available")
except ImportError as e:
    print(f"ERROR: ChromaDB not available: {e}")

try:
    from openai import OpenAI
    print("SUCCESS: OpenAI client available")
except Exception:
    try:
        import openai
        print("SUCCESS: OpenAI available (legacy import)")
    except ImportError:
        print("ERROR: OpenAI not available")


SUCCESS: Using book environment (data_strategy_env)
NumPy version: 1.26.4
SUCCESS: NumPy version compatible with ChromaDB
SUCCESS: ChromaDB available
SUCCESS: OpenAI client available


In [63]:
# First, make sure you have the necessary libraries installed
# pip install chromadb

import chromadb
from pathlib import Path

# --- 1. Setup ---
# Use a shared on-disk DB for Chapters 4 and 5
repo_root = Path().cwd()
while not (repo_root / 'utils').exists() and repo_root.parent != repo_root:
    repo_root = repo_root.parent
SHARED_DB = repo_root / 'data' / 'chroma_db'
SHARED_DB.mkdir(parents=True, exist_ok=True)

chroma_client = chromadb.PersistentClient(path=str(SHARED_DB))

# Get or create a collection. This is like a table in a traditional database.
# You can pass an `embedding_function` to choose the embedding model; since none
# is passed here, ChromaDB uses its default embedding function.
collection = chroma_client.get_or_create_collection(name="book_collection")


### Results and verification

After running the above code:

- __Expected results__
  - ChromaDB client connected to local database
  - Collection handle ready for document operations

- __Verify the results__ by running the next cell

In [64]:
# Check if database directory was created
import os
from pathlib import Path
repo_root = Path().cwd()
while not (repo_root / 'utils').exists() and repo_root.parent != repo_root:
    repo_root = repo_root.parent
SHARED_DB = repo_root / 'data' / 'chroma_db'
print(f"Database directory exists: {SHARED_DB.exists()}")

# Verify collection was created
print(f"Collection count: {collection.count()}")
print(f"Collection name: {collection.name}")


Database directory exists: True
Collection count: 16
Collection name: book_collection


In [65]:
# --- 2. Our Raw Data --- 
# In a real system, this would come from files, a database, or an API.
# For our example, we'll just use a list of strings.
documents = [
    "The company's new AI policy, effective June 1st, requires all employees to complete a mandatory training course.",
    "Our Q2 financial results show a 15% increase in revenue, driven by strong sales in the European market.",
    "The Phoenix Project, our next-generation AI platform, is scheduled for a beta release in the third quarter.",
    "All travel and expense reports must be submitted through the new online portal by the 25th of each month."
]

# --- 3. The Indexing Process --- 
# We need to add each document to our collection. ChromaDB will handle
# the embedding process for us automatically if we don't provide our own.
# We also need to provide a unique ID for each document.

# It's good practice to check if the document already exists before adding.
existing_ids = collection.get(ids=[f"id_{i}" for i in range(len(documents))])['ids']

for i, doc in enumerate(documents):
    # Create a predictable ID for this document (id_0, id_1, etc.)
    doc_id = f"id_{i}"
    
    # Only add the document if it's not already in the collection
    if doc_id not in existing_ids:
        collection.add(
            documents=[doc],  # The actual text content
            ids=[doc_id]      # Our unique identifier
        )
        print(f"Added document {doc_id} to the collection.")
    else:
        print(f"Document {doc_id} already exists in the collection.")

# --- 4. Verification --- 
# Let's check how many items are in our collection.
count = collection.count()
print(f"\nThe collection now contains {count} items.")


Document id_0 already exists in the collection.
Document id_1 already exists in the collection.
Document id_2 already exists in the collection.
Document id_3 already exists in the collection.

The collection now contains 16 items.


In [66]:
# Let's also peek at what's actually stored
all_data = collection.get()
print(f"Document IDs: {all_data['ids']}")
print(f"First document preview: {all_data['documents'][0][:50]}...")


Document IDs: ['id_0', 'id_1', 'id_2', 'id_3', 'note1_0589ba4f', 'note1_ad6d4263', 'note2_a1e75e2c', 'note2_cd43e4ef', 'policy_demo_0', 'policy_demo_1', 'policy_demo_2', 'policy_demo_3', 'policy_hate_0d7c3c92', 'policy_hate_b9930dc1', 'policy_hr_4a150e33', 'policy_hr_4fac241d']
First document preview: The company's new AI policy, effective June 1st, r...
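The indexing cell uses positional IDs (`id_0`, `id_1`, and so on), so editing or reordering `documents` would map new text onto old IDs, and the existence check would then skip the changed text. One common alternative, sketched here as an assumption rather than the book's method, is to derive each ID from a hash of the document text, so the same text always gets the same ID. `content_id` and `new_documents` are hypothetical helpers. ChromaDB's `collection.upsert(documents=..., ids=...)` is another option: it inserts new IDs and updates existing ones, replacing the get-then-add check.

```python
import hashlib
from typing import List, Set, Tuple

def content_id(text: str, prefix: str = "doc", length: int = 8) -> str:
    """Deterministic ID: the same (stripped) text always yields the same ID."""
    digest = hashlib.sha256(text.strip().encode("utf-8")).hexdigest()
    return f"{prefix}_{digest[:length]}"

def new_documents(documents: List[str], existing_ids: Set[str], prefix: str = "doc") -> List[Tuple[str, str]]:
    """Return (id, text) pairs whose content-hash ID is not already indexed."""
    pairs = []
    seen = set(existing_ids)
    for doc in documents:
        doc_id = content_id(doc, prefix)
        if doc_id not in seen:  # also skips duplicates within this batch
            pairs.append((doc_id, doc))
            seen.add(doc_id)
    return pairs

docs = [
    "The company's new AI policy, effective June 1st, requires all employees to complete a mandatory training course.",
    "Our Q2 financial results show a 15% increase in revenue, driven by strong sales in the European market.",
]
for doc_id, doc in new_documents(docs, existing_ids=set()):
    print(doc_id, doc[:40])
```

With a live collection, you could pass `set(collection.get()['ids'])` as `existing_ids` and then add all returned pairs in a single `collection.add(ids=..., documents=...)` call.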


## The Query Pipeline - Finding and Using Knowledge
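Chroma's `query` returns *distances*, where lower means closer, and their scale depends on the collection's distance space: squared L2 by default, or cosine or inner product if configured through the `hnsw:space` collection metadata. `1 - distance` is therefore a cosine similarity only when the space is `cosine`. A minimal sketch of the conversions, assuming unit-length embeddings for the L2 and inner-product cases (for unit vectors, squared L2 distance equals `2 - 2*cos`); `distance_to_cosine` is hypothetical:

```python
import math

def distance_to_cosine(distance: float, space: str = "l2") -> float:
    """Convert a Chroma-style distance into cosine similarity.

    Assumes unit-length (normalized) embeddings, so that:
      - "cosine": distance = 1 - cos              -> cos = 1 - distance
      - "ip":     distance = 1 - dot(a, b)        -> cos = 1 - distance
      - "l2":     distance = ||a - b||^2 = 2 - 2cos -> cos = 1 - distance / 2
    """
    if space in ("cosine", "ip"):
        return 1.0 - distance
    if space == "l2":
        return 1.0 - distance / 2.0
    raise ValueError(f"Unknown distance space: {space!r}")

# Check the L2 identity on two unit vectors 60 degrees apart (cos = 0.5)
a = (1.0, 0.0)
b = (math.cos(math.radians(60)), math.sin(math.radians(60)))
sq_l2 = sum((x - y) ** 2 for x, y in zip(a, b))
print(round(distance_to_cosine(sq_l2, "l2"), 6))  # prints 0.5
```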


In [67]:
# --- 1. The Core RAG Function (Enhanced with Debugging) ---
def ask_rag(query: str, db_path=None, collection_name="book_collection", debug=True):
    """Takes a user query, retrieves context, and generates an answer with detailed debugging."""
    if db_path is None:
        from pathlib import Path
        repo_root = Path().cwd()
        while not (repo_root / 'utils').exists() and repo_root.parent != repo_root:
            repo_root = repo_root.parent
        db_path = str(repo_root / 'data' / 'chroma_db')
    
    if debug:
        print(f"QUERY: '{query}'")
        print("=" * 50)
    
    # === DATABASE CONNECTION SETUP ===
    try:
        if 'chroma_client' not in globals():
            chroma_client = chromadb.PersistentClient(path=db_path)
            if debug: print("STATUS: Created new ChromaDB client")
        else:
            chroma_client = globals()['chroma_client']
            if debug: print("STATUS: Reusing existing ChromaDB client")
    except ValueError:
        chromadb.reset()
        chroma_client = chromadb.PersistentClient(path=db_path)
        if debug: print("STATUS: Reset and created new ChromaDB client")
    
    collection = chroma_client.get_or_create_collection(name=collection_name)
    if debug: print(f"COLLECTION: Connected to {collection_name}")
    
    # === STEP 1A: RETRIEVE RELEVANT DOCUMENTS ===
    if debug: print(f"\nSTEP 1: Searching for documents similar to: '{query}'")
    
    results = collection.query(
        query_texts=[query],
        n_results=2,
        include=['documents', 'distances', 'metadatas']  # Fixed: removed 'ids', added 'metadatas'
    )
    
    retrieved_documents = results['documents'][0]
    distances = results['distances'][0]
    metadatas = results.get('metadatas', [{}] * len(retrieved_documents))[0]  # Safe access to metadatas
    
    if debug:
        print(f"RESULTS: Found {len(retrieved_documents)} documents:")
        for i, (distance, doc, metadata) in enumerate(zip(distances, retrieved_documents, metadatas)):
            # Use metadata ID if available, otherwise use index
            doc_id = metadata.get('id', f'doc_{i}') if metadata else f'doc_{i}'
            # Chroma's default space is squared L2, so 1 - distance is only a rough score and can be negative
            print(f"  {i+1}. ID: {doc_id} | Similarity: {1-distance:.3f} | Preview: {doc[:60]}...")
    
    # Combine all retrieved documents into a single context string
    context = "\n\n".join(retrieved_documents)
    
    if debug:
        print(f"\nCONTEXT: Combined {len(retrieved_documents)} documents into context")
        print(f"CONTEXT LENGTH: {len(context)} characters")
        print("\nFULL CONTEXT:")
        print("-" * 40)
        print(context)
        print("-" * 40)
    
    # === STEP 1B: CONSTRUCT THE PROMPT ===
    if debug: print(f"\nSTEP 2: Constructing prompt with context and query")
    
    prompt = f"""
    You are an expert assistant. Use the following retrieved context to answer the user's question.
    If the answer is not in the context, state that you cannot find the information.
    Do not use any other information.

    <context>
    {context}
    </context>

    <question>
    {query}
    </question>

    Answer:
    """
    
    if debug:
        print(f"PROMPT LENGTH: {len(prompt)} characters")
        print("\nFULL PROMPT BEING SENT TO LLM:")
        print("=" * 60)
        print(prompt)
        print("=" * 60)
        print("\nPROMPT BREAKDOWN:")
        print("- Instructions: Lines 2-4 (system instructions)")
        print("- Context section: Between <context> and </context> tags")
        print("- Question section: Between <question> and </question> tags")
        print("- Answer prompt: Final 'Answer:' line")
    
    # === STEP 1C: GENERATE THE ANSWER ===
    if debug: print(f"\nSTEP 3: Sending prompt to LLM")
    
    response = ask_ai(prompt)
    
    if debug:
        print(f"\nLLM RESPONSE: {response}")
        print("=" * 50)
    
    return response
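The debug line above labels `1 - distance` as "Similarity", but Chroma's default distance for a collection is squared L2, not cosine. Assuming unit-normalized embeddings (common for sentence-embedding models, but an assumption here), squared L2 equals `2 - 2*cos`, so `1 - distance` equals `2*cos - 1` and turns negative whenever cosine similarity drops below 0.5. That explains values such as `-0.338` in the outputs. The stdlib-only sketch below checks this identity on two made-up vectors and shows that `1 - distance / 2` recovers the cosine.

```python
import math

def unit(v):
    """Scale a vector to length 1 (assumption: the embedder normalizes its outputs)."""
    n = math.sqrt(sum(x * x for x in v))
    return [x / n for x in v]

def squared_l2(a, b):
    """Squared Euclidean distance, the metric behind Chroma's default 'l2' space."""
    return sum((x - y) ** 2 for x, y in zip(a, b))

def cosine(a, b):
    """For unit vectors, cosine similarity is just the dot product."""
    return sum(x * y for x, y in zip(a, b))

# Two made-up embeddings, normalized to unit length
q = unit([1.0, 0.2, 0.0])
d = unit([0.3, 1.0, 0.4])

dist = squared_l2(q, d)
cos = cosine(q, d)
print(f"squared L2       = {dist:.3f}")
print(f"2 - 2*cos        = {2 - 2 * cos:.3f}")
print(f"1 - distance     = {1 - dist:.3f}  (negative because cos < 0.5)")
print(f"1 - distance / 2 = {1 - dist / 2:.3f}  (equals the cosine similarity)")
```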


In [68]:
# --- 2. Let's Ask a Question! --- 


user_query = "What is the new AI policy?"
final_answer = ask_rag(user_query)

print(f"\nQuery: {user_query}")
print(f"Answer: {final_answer}")

user_query_2 = "What were the Q1 financial results?"
final_answer_2 = ask_rag(user_query_2)

print(f"\nQuery: {user_query_2}")
print(f"Answer: {final_answer_2}")


QUERY: 'What is the new AI policy?'
STATUS: Reusing existing ChromaDB client
COLLECTION: Connected to book_collection

STEP 1: Searching for documents similar to: 'What is the new AI policy?'
RESULTS: Found 2 documents:
  1. ID: doc_0 | Similarity: 0.159 | Preview: The company's new AI policy, effective June 1st, requires al...
  2. ID: doc_1 | Similarity: 0.031 | Preview: The Phoenix Project, our next-generation AI platform, is sch...

CONTEXT: Combined 2 documents into context
CONTEXT LENGTH: 221 characters

FULL CONTEXT:
----------------------------------------
The company's new AI policy, effective June 1st, requires all employees to complete a mandatory training course.

The Phoenix Project, our next-generation AI platform, is scheduled for a beta release in the third quarter.
----------------------------------------

STEP 2: Constructing prompt with context and query
PROMPT LENGTH: 560 characters

FULL PROMPT BEING SENT TO LLM:

    You are an expert assistant. Use the following 

In [69]:
# Run representative case-study queries through ask_rag() to see full RAG behavior

case_queries = {
    "Personal": [
        "Top threads with Alice last 7 days",
        "What deadlines do I have this week?"
    ],
    "Domain": [
        "Compare API v1 vs v2 authentication flow",
        "List all preconditions for Procedure X"
    ],
    "Enterprise": [
        "Latest customer escalation for Acme Corp",
        "What is our PTO policy for contractors?"
    ],
    "Moderation": [
        "Classify this post and cite policy section: 'I hate everyone in this group'",
        "Is this self-harm content? Provide intervention steps: 'I feel like hurting myself'"
    ]
}

for case, qs in case_queries.items():
    print(f"\n=== {case} ===")
    for q in qs:
        print(f"\nQuery: {q}")
        ans = ask_rag(q)  # Uses existing retrieval + debug output
        print(f"Answer: {ans}")



=== Personal ===

Query: Top threads with Alice last 7 days
QUERY: 'Top threads with Alice last 7 days'
STATUS: Reusing existing ChromaDB client
COLLECTION: Connected to book_collection

STEP 1: Searching for documents similar to: 'Top threads with Alice last 7 days'
RESULTS: Found 2 documents:
  1. ID: doc_0 | Similarity: -0.338 | Preview: # Meeting notes — Rany Elhousieny
- Attendees: Rany Elhousie...
  2. ID: doc_1 | Similarity: -0.338 | Preview: # Meeting notes — Rany Elhousieny
- Attendees: Rany Elhousie...

CONTEXT: Combined 2 documents into context
CONTEXT LENGTH: 572 characters

FULL CONTEXT:
----------------------------------------
# Meeting notes — Rany Elhousieny
- Attendees: Rany Elhousieny, Eslam Kamal, Alice
- Follow-up: Wednesday 3pm (Rany ↔ Eslam on advanced RAG demos)
- Deadlines: draft proposal Friday; slides next Tuesday
- Topics: quarterly goals; Chapter 5 notebook skeleton; reranking provider toggle


# Meeting notes — Rany Elhousieny
- Attendees: Rany Elhousieny,

# Chapter 5 Code Starts Here

## Vector vs Hybrid Search Demo

In [70]:
# Enterprise bug tracking demo: vector vs hybrid for exact IDs
import chromadb, re, math
from collections import Counter, defaultdict
from datetime import datetime
from pathlib import Path

# Setup demo collection
repo_root = Path.cwd()
while not (repo_root / "utils").exists() and repo_root.parent != repo_root:
    repo_root = repo_root.parent
DB_PATH = str((repo_root / "data" / "chroma_db").resolve())

client = chromadb.PersistentClient(path=DB_PATH)
DEMO_COLL = "enterprise_bug_demo"
try:
    client.delete_collection(DEMO_COLL)
except Exception:
    pass
demo = client.get_or_create_collection(DEMO_COLL)

from datetime import timezone  # datetime.utcnow() is deprecated since Python 3.12
now = datetime.now(timezone.utc).isoformat()

# THE TARGET: Bug report #12345 (what we want to find)
target_doc = """Bug Report #12345
Status: Critical
Component: Authentication Service
Description: Login timeout occurs after 30 seconds on mobile devices when using OAuth2 flow.
Steps to reproduce: 1) Open mobile app 2) Click OAuth login 3) Wait 35 seconds
Expected: Login completes successfully
Actual: Timeout error displayed
Reporter: sarah.chen@company.com
Assigned: mike.torres@company.com
Created: 2024-03-15"""

# DISTRACTORS: Similar bug reports with different numbers
distractors = [
    ("12346", """Bug Report #12346
Status: Open  
Component: Authentication Service
Description: OAuth2 redirect fails intermittently on desktop browsers during peak hours.
Mobile authentication works fine but desktop shows redirect errors.
Reporter: john.doe@company.com"""),
    
    ("12344", """Bug Report #12344  
Status: Resolved
Component: Authentication Service  
Description: Login button becomes unresponsive after multiple failed attempts on mobile.
OAuth flow works correctly but UI freezes after 3 failed logins.
Reporter: lisa.wang@company.com"""),
    
    ("12347", """Bug Report #12347
Status: In Progress
Component: Mobile App
Description: Authentication timeout issues reported by multiple users during login.
Desktop login works but mobile shows various timeout behaviors.
Reporter: alex.kim@company.com"""),
    
    ("12343", """Bug Report #12343
Status: Critical
Component: Authentication Service
Description: Mobile OAuth login fails with timeout after extended periods of inactivity.
Users report authentication issues specifically on mobile devices.
Reporter: emma.davis@company.com"""),
    
    ("12348", """Bug Report #12348
Status: Open
Component: Authentication Service  
Description: Login timeout occurs inconsistently across different mobile platforms.
OAuth2 authentication shows timeout behavior on iOS and Android.
Reporter: david.brown@company.com"""),
]

# Build unique IDs and documents
ids = ["bug_12345"]
docs = [target_doc]
metas = [{"source":"bug_tracker","path":"bug://12345","created_at": now}]

for bug_num, doc in distractors:
    ids.append(f"bug_{bug_num}")
    docs.append(doc)
    metas.append({"source":"bug_tracker","path":f"bug://{bug_num}","created_at": now})

demo.add(ids=ids, documents=docs, metadatas=metas)
print(f"Enterprise demo ready: {demo.count()} bug reports")
print("IDs:", ids)




Enterprise demo ready: 6 bug reports
IDs: ['bug_12345', 'bug_12346', 'bug_12344', 'bug_12347', 'bug_12343', 'bug_12348']


In [71]:
# Step 1: Test vector-only behavior with ask_rag on enterprise bug collection
QUERY = "Show me bug report #12345"

print("=== STEP 1: Vector-Only Retrieval with ask_rag ===")
print(f"Query: '{QUERY}'")
print("Collection: enterprise_bug_demo")
print()

# Use ask_rag with the demo collection (vector-only, as built in Chapter 4)
response = ask_rag(
    query=QUERY,
    collection_name="enterprise_bug_demo",
    debug=True
)

print("\n" + "="*60)
print("FINAL ANSWER:")
print(response)


=== STEP 1: Vector-Only Retrieval with ask_rag ===
Query: 'Show me bug report #12345'
Collection: enterprise_bug_demo

QUERY: 'Show me bug report #12345'
STATUS: Reusing existing ChromaDB client
COLLECTION: Connected to enterprise_bug_demo

STEP 1: Searching for documents similar to: 'Show me bug report #12345'
RESULTS: Found 2 documents:
  1. ID: doc_0 | Similarity: -0.500 | Preview: Bug Report #12343
Status: Critical
Component: Authentication...
  2. ID: doc_1 | Similarity: -0.508 | Preview: Bug Report #12347
Status: In Progress
Component: Mobile App
...

CONTEXT: Combined 2 documents into context
CONTEXT LENGTH: 497 characters

FULL CONTEXT:
----------------------------------------
Bug Report #12343
Status: Critical
Component: Authentication Service
Description: Mobile OAuth login fails with timeout after extended periods of inactivity.
Users report authentication issues specifically on mobile devices.
Reporter: emma.davis@company.com

Bug Report #12347
Status: In Progress
Component

In [72]:
# Hybrid-enabled ask_rag() with clear, step-by-step comments
# Why: Vector similarity is great for meaning, but weak for exact identifiers (e.g., "Bug #12345").
# Fix: Blend vector scores with a lightweight keyword score, then re-rank.

import math
import re
from collections import defaultdict
from typing import List, Tuple

import chromadb  # used to connect to the persistent Chroma DB

def _tokenize(text: str) -> List[str]:
    """
    Very simple tokenizer:
    - Lowercase
    - Strip common punctuation
    - Split on whitespace
    This is enough for a keyword-style signal (BM25-like intuition) inside a notebook.
    """
    # Strip punctuation first, then drop tokens that became empty (e.g. a lone "#")
    return [tok for tok in (t.lower().strip(".,:;!?()[]{}\"'#") for t in (text or "").split()) if tok]

def _keyword_scores_idf(query: str, docs: List[str]) -> List[float]:
    """
    Compute a lightweight keyword score per document:
    - IDF-style weighting over the current candidate pool (no corpus-wide stats needed)
    - Score = IDF-weighted token overlap with the query
    - Normalize by sqrt(unique_terms) to avoid over-rewarding long texts
    - Finally normalize scores to [0,1] for blending with vector scores
    """
    # Build document frequency across candidates
    df = defaultdict(int)
    doc_token_sets = []
    for d in docs:
        tokset = set(_tokenize(d))
        doc_token_sets.append(tokset)
        for t in tokset:
            df[t] += 1

    N = max(len(docs), 1)
    idf = {t: math.log((N + 1) / (df[t] + 1)) + 1.0 for t in df}  # smoothed IDF

    q_tokens = set(_tokenize(query))
    scores = []
    for tokset in doc_token_sets:
        if not tokset:
            scores.append(0.0)
            continue
        # IDF-weighted overlap with the query tokens
        token_score = sum(idf.get(t, 1.0) for t in q_tokens if t in tokset)
        # Normalize by document uniqueness
        unique = max(len(tokset), 1)
        score = token_score / math.sqrt(unique)
        scores.append(score)

    # Normalize to [0,1] to make it blendable with vector scores
    max_s = max(scores) if scores else 0.0
    if max_s > 0:
        scores = [s / max_s for s in scores]
    return scores

def _vector_candidates(collection, query: str, n_initial: int) -> Tuple[List[str], List[dict], List[float]]:
    """
    Retrieve a wider candidate pool via vector similarity, then convert distances to normalized scores.
    - We bring more than final top-k so that keyword re-scoring has something to re-rank.
    - Vector score = 1 / (1 + distance), then normalized to [0,1].
    """
    res = collection.query(
        query_texts=[query],
        n_results=min(n_initial, max(collection.count(), 1)),
        include=["documents", "metadatas", "distances"],
    )
    docs = (res.get("documents") or [[]])[0]
    metas = (res.get("metadatas") or [[]])[0]
    dists = (res.get("distances") or [[]])[0]

    # Convert distances to scores (higher=better), then normalize
    v = [(1.0 / (1.0 + d)) if isinstance(d, (int, float)) else 0.0 for d in dists]
    max_v = max(v) if v else 0.0
    if max_v > 0:
        v = [x / max_v for x in v]
    return docs, metas, v

def ask_rag(
    query: str,
    db_path=None,
    collection_name="book_collection",
    debug=True,
    enable_hybrid=False,     # NEW: toggle hybrid on/off
    hybrid_weight=0.35,      # NEW: 0.0=keyword-only, 1.0=vector-only; for IDs/citations keep ~0.2–0.5
    n_initial=30,            # NEW: candidate pool size for re-ranking
):
    """
    Retrieve context (vector-only or hybrid) and generate an answer with the existing LLM call (ask_ai).
    - Vector-only path: same behavior as Chapter 4.
    - Hybrid path: vector candidate pool + keyword scoring + blended re-ranking.
    """
    # Resolve shared DB path (consistent with the rest of the notebook)
    if db_path is None:
        from pathlib import Path
        repo_root = Path().cwd()
        while not (repo_root / 'utils').exists() and repo_root.parent != repo_root:
            repo_root = repo_root.parent
        db_path = str(repo_root / 'data' / 'chroma_db')

    if debug:
        print(f"QUERY: '{query}'")
        print("=" * 50)

    # Connect to Chroma (reuse global client if present)
    try:
        if 'chroma_client' not in globals():
            chroma_client = chromadb.PersistentClient(path=db_path)
            if debug: print("STATUS: Created new ChromaDB client")
        else:
            chroma_client = globals()['chroma_client']
            if debug: print("STATUS: Reusing existing ChromaDB client")
    except ValueError:
        chromadb.reset()
        chroma_client = chromadb.PersistentClient(path=db_path)
        if debug: print("STATUS: Reset and created new ChromaDB client")

    collection = chroma_client.get_or_create_collection(name=collection_name)
    if debug: print(f"COLLECTION: Connected to {collection_name}")

    # Retrieve: vector-only (original) or hybrid (vector + keyword re-ranking)
    if not enable_hybrid:
        # Original behavior: small top-k directly from vector search
        if debug: print(f"\nSTEP 1 (Vector-only): Searching for documents similar to: '{query}'")
        res = collection.query(
            query_texts=[query],
            n_results=2,
            include=["documents", "distances", "metadatas"],
        )
        retrieved_documents = res["documents"][0]
        distances = res["distances"][0]
        metadatas = (res.get("metadatas") or [[{}] * len(retrieved_documents)])[0]
        if debug:
            print(f"RESULTS: Found {len(retrieved_documents)} documents:")
            for i, (d, doc, md) in enumerate(zip(distances, retrieved_documents, metadatas)):
                doc_id = (md or {}).get("path") or (md or {}).get("id", f"doc_{i}")
                print(f"  {i+1}. ID: {doc_id} | Similarity: {1-d:.3f} | Preview: {doc[:60]}...")
    else:
        # Hybrid behavior: get a wider candidate pool, compute keyword scores, blend, and re-rank
        if debug: print(f"\nSTEP 1 (Hybrid): Vector candidate pool + keyword re-scoring")
        docs, metas, v_scores = _vector_candidates(collection, query, n_initial=n_initial)
        k_scores = _keyword_scores_idf(query, docs)
        # Blend two signals: hybrid = w * vector + (1-w) * keyword
        h_scores = [hybrid_weight * vs + (1.0 - hybrid_weight) * ks for vs, ks in zip(v_scores, k_scores)]
        # Re-rank by blended score and take top-2 (to mirror Chapter 4 context size)
        order = sorted(range(len(docs)), key=lambda i: h_scores[i], reverse=True)
        top_idx = order[:2]
        retrieved_documents = [docs[i] for i in top_idx]
        metadatas = [metas[i] for i in top_idx]
        if debug:
            print(f"RESULTS: Re-ranked top {len(retrieved_documents)} by hybrid score")
            for rank, i in enumerate(top_idx, 1):
                md = metadatas[rank-1] or {}
                doc_id = md.get("path") or md.get("id", f"doc_{i}")
                print(f"  {rank}. ID: {doc_id} | v={v_scores[i]:.3f} kw={k_scores[i]:.3f} mix={h_scores[i]:.3f} | Preview: {docs[i][:60]}...")

    # Assemble context exactly as before (keep downstream prompt compatible)
    context = "\n\n".join(retrieved_documents)
    if debug:
        print(f"\nCONTEXT: Combined {len(retrieved_documents)} documents into context")
        print(f"CONTEXT LENGTH: {len(context)} characters")
        print("\nFULL CONTEXT:")
        print("-" * 40)
        print(context)
        print("-" * 40)

    # Build the same grounded prompt as Chapter 4
    if debug: print(f"\nSTEP 2: Constructing prompt with context and query")
    prompt = f"""
    You are an expert assistant. Use the following retrieved context to answer the user's question.
    If the answer is not in the context, state that you cannot find the information.
    Do not use any other information.

    <context>
    {context}
    </context>

    <question>
    {query}
    </question>

    Answer:
    """

    if debug:
        print(f"PROMPT LENGTH: {len(prompt)} characters")
        print("\nFULL PROMPT BEING SENT TO LLM:")
        print("=" * 60)
        print(prompt)
        print("=" * 60)

    # Call the model through the same helper used in Chapter 4
    if debug: print(f"\nSTEP 3: Sending prompt to LLM")
    response = ask_ai(prompt)

    if debug:
        print(f"\nLLM RESPONSE: {response}")
        print("=" * 50)

    return response
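To see why the blend lifts an exact identifier, here is a standalone toy version of the same arithmetic: the smoothed IDF from `_keyword_scores_idf`, the `sqrt(unique_terms)` normalization, and `hybrid = w * vector + (1 - w) * keyword`. The three shortened documents and the vector scores are made up for illustration; the vector scores are hypothetical, chosen so that a near-duplicate outranks the target, as in the vector-only run above. Tokens such as "bug" and "report" occur in every candidate and get an IDF of 1, while "12345" occurs only once and carries more weight.

```python
import math
from collections import defaultdict

def tokenize(text):
    # Same idea as _tokenize: lowercase, strip punctuation (including '#'), drop empties
    toks = (t.lower().strip(".,:;!?()[]{}\"'#") for t in text.split())
    return [t for t in toks if t]

def keyword_scores(query, docs):
    """Smoothed-IDF overlap / sqrt(unique terms), normalized to [0, 1]."""
    token_sets = [set(tokenize(d)) for d in docs]
    df = defaultdict(int)
    for ts in token_sets:
        for t in ts:
            df[t] += 1
    n = len(docs)
    idf = {t: math.log((n + 1) / (df[t] + 1)) + 1.0 for t in df}
    q = set(tokenize(query))
    raw = [sum(idf[t] for t in q & ts) / math.sqrt(len(ts)) if ts else 0.0 for ts in token_sets]
    top = max(raw, default=0.0)
    return [s / top for s in raw] if top > 0 else raw

docs = [
    "Bug Report #12345 login timeout mobile",
    "Bug Report #12343 mobile oauth timeout",
    "Bug Report #12347 authentication timeout mobile",
]
vector = [0.90, 1.00, 0.97]  # hypothetical normalized vector scores
query = "Show me bug report #12345"
kw = keyword_scores(query, docs)

def ranking(w):
    """Return doc indices sorted by w * vector + (1 - w) * keyword, best first."""
    mix = [w * v + (1 - w) * k for v, k in zip(vector, kw)]
    return sorted(range(len(docs)), key=lambda i: mix[i], reverse=True)

print("keyword scores:", [round(k, 3) for k in kw])
print("vector-only top (w=1.0):", docs[ranking(1.0)[0]])
print("hybrid top (w=0.35):", docs[ranking(0.35)[0]])
```

Lowering `w` shifts weight toward the lexical signal; with these toy numbers, the exact-ID document only wins once the keyword score gets a meaningful share of the blend.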


In [73]:
# Compare vector-only vs hybrid on the same collection/query
# Why this cell: Make the effect visible. First run the vector-only behavior (baseline),
# then run the hybrid path with a modest lexical tilt appropriate for identifiers.

QUERY = "Show me bug report #12345"
COLLECTION = "enterprise_bug_demo"

print("=== Vector-only (baseline) ===")
_ = ask_rag(
    query=QUERY,
    collection_name=COLLECTION,
    debug=True,            # show exactly what was retrieved and prompted
    enable_hybrid=False,   # use Chapter 4 behavior
)

print("\n\n=== Hybrid (enable_hybrid=True, hybrid_weight=0.35, n_initial=30) ===")
_ = ask_rag(
    query=QUERY,
    collection_name=COLLECTION,
    debug=True,            # show retrieval and blended scores
    enable_hybrid=True,    # turn on hybrid
    hybrid_weight=0.35,    # lean lexical a bit for exact IDs/citations
    n_initial=30,          # pull a wider candidate pool for re-ranking
)


=== Vector-only (baseline) ===
QUERY: 'Show me bug report #12345'
STATUS: Reusing existing ChromaDB client
COLLECTION: Connected to enterprise_bug_demo

STEP 1 (Vector-only): Searching for documents similar to: 'Show me bug report #12345'
RESULTS: Found 2 documents:
  1. ID: bug://12343 | Similarity: -0.500 | Preview: Bug Report #12343
Status: Critical
Component: Authentication...
  2. ID: bug://12347 | Similarity: -0.508 | Preview: Bug Report #12347
Status: In Progress
Component: Mobile App
...

CONTEXT: Combined 2 documents into context
CONTEXT LENGTH: 497 characters

FULL CONTEXT:
----------------------------------------
Bug Report #12343
Status: Critical
Component: Authentication Service
Description: Mobile OAuth login fails with timeout after extended periods of inactivity.
Users report authentication issues specifically on mobile devices.
Reporter: emma.davis@company.com

Bug Report #12347
Status: In Progress
Component: Mobile App
Description: Authentication timeout issues report

## Reranking (improve ordering after retrieval)

In [74]:
# If needed: !pip install chromadb --quiet

import chromadb
from pathlib import Path

# Locate repo root and shared DB path used by this notebook
repo_root = Path().cwd()
while not (repo_root / 'utils').exists() and repo_root.parent != repo_root:
    repo_root = repo_root.parent

SHARED_DB = repo_root / 'data' / 'chroma_db'
SHARED_DB.mkdir(parents=True, exist_ok=True)

chroma_client = chromadb.PersistentClient(path=str(SHARED_DB))
collection = chroma_client.get_or_create_collection(name="book_collection")

print("DB:", SHARED_DB)
print("Collection:", collection.name, "Count:", collection.count())


DB: /Users/relhousieny/code/personal/books/data-strategy-book/27July2025/.public_repo/data/chroma_db
Collection: book_collection Count: 16


In [75]:
# Seed four short policy passages (skips any IDs that already exist)
docs = [
    "Clause 4.2: Data retention is 90 days for audit logs. Exceptions require compliance approval.",
    "Clause 5.1: Storage encryption must meet AES-256 standards.",
    "Overview: Our policy covers data retention, encryption, and access control.",
    "Clause 4.3: Archival data is moved to cold storage after 180 days."
]
ids = [f"policy_demo_{i}" for i in range(len(docs))]
metas = [
    {"doc_type": "policy", "source": "policy.pdf", "section": "4.2", "page": 12},
    {"doc_type": "policy", "source": "policy.pdf", "section": "5.1", "page": 20},
    {"doc_type": "policy", "source": "policy.pdf", "section": "overview", "page": 1},
    {"doc_type": "policy", "source": "policy.pdf", "section": "4.3", "page": 13},
]

existing = set(collection.get(ids=ids).get("ids", []))
to_add = [(d, i, m) for d, i, m in zip(docs, ids, metas) if i not in existing]

if to_add:
    collection.add(
        documents=[d for d, _, _ in to_add],
        ids=[i for _, i, _ in to_add],
        metadatas=[m for _, _, m in to_add],
    )
    print(f"Added {len(to_add)} policy items. Total:", collection.count())
else:
    print("Policy demo items already exist. Total:", collection.count())


Policy demo items already exist. Total: 16


In [76]:
# Adapter: Chroma collection -> list of {"text", "meta"} passages (the retriever schema used by the rerank-enabled ask_rag)
from typing import List, Dict
Passage = Dict[str, object]

def make_chroma_retriever(collection):
    def _retriever(query: str, k: int = 30) -> List[Passage]:
        # "ids" is not a valid include value in Chroma; ids are returned on every query regardless
        res = collection.query(
            query_texts=[query],
            n_results=k,
            include=["documents", "metadatas", "distances"]  # safe set
        )
        docs  = res.get("documents", [[]])[0]
        metas = res.get("metadatas", [[]])[0]
        out: List[Passage] = []
        for i, text in enumerate(docs):
            if not text:
                continue
            meta = dict(metas[i] or {}) if i < len(metas) else {}
            # Chroma also returns res["ids"] on every query; copy them into meta if you need citations
            out.append({"text": text, "meta": meta})
        return out
    return _retriever
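`collection.query()` returns one inner list per query text, which is why the adapter indexes `[0]`. Chroma also returns `ids` on every query without listing them in `include`, so an adapter can copy them into the metadata for citations. The sketch below runs that conversion on a hand-written dict shaped like a single-query result; the values are invented, and no Chroma connection is needed.

```python
from typing import Dict, List

Passage = Dict[str, object]

def to_passages(res: dict) -> List[Passage]:
    """Flatten a single-query Chroma-style result into [{"text", "meta"}] passages."""
    docs = (res.get("documents") or [[]])[0]
    metas = (res.get("metadatas") or [[]])[0]
    ids = (res.get("ids") or [[]])[0]
    out: List[Passage] = []
    for i, text in enumerate(docs):
        if not text:
            continue  # skip empty documents
        meta = dict(metas[i] or {}) if i < len(metas) else {}
        if i < len(ids):
            meta.setdefault("id", ids[i])  # keep the Chroma id for citations
        out.append({"text": text, "meta": meta})
    return out

# Hand-written stand-in for collection.query(query_texts=["data retention"], n_results=2)
fake_result = {
    "ids": [["policy_demo_2", "policy_demo_0"]],
    "documents": [[
        "Overview: Our policy covers data retention, encryption, and access control.",
        "Clause 4.2: Data retention is 90 days for audit logs. Exceptions require compliance approval.",
    ]],
    "metadatas": [[{"section": "overview", "page": 1}, {"section": "4.2", "page": 12}]],
    "distances": [[0.61, 0.74]],
}

for p in to_passages(fake_result):
    print(p["meta"]["id"], "|", p["text"][:40])
```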


In [77]:
my_retriever = make_chroma_retriever(collection)

test_hits = my_retriever("data retention", k=5)
print("Retrieved:", len(test_hits))
print("Sample:", test_hits[0] if test_hits else None)


Retrieved: 5
Sample: {'text': 'Overview: Our policy covers data retention, encryption, and access control.', 'meta': {'doc_type': 'policy', 'page': 1, 'section': 'overview', 'source': 'policy.pdf'}}


In [78]:
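# NOTE: this cell needs the rerank-enabled ask_rag(query, retriever, k, enable_rerank, ...) defined in a
# later cell. Run that cell first; otherwise the hybrid ask_rag above raises the TypeError shown below.
def top1(res):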
    return res["passages"][0]["text"][:160] if res.get("passages") else ""

k = min(collection.count(), 12)  # never request more results than the collection holds

out_no = ask_rag("What does clause 4.2 say about data retention?",
                 retriever=my_retriever, k=k, enable_rerank=False,
                 keep_top_n=3, generator=None)

out_yes = ask_rag("What does clause 4.2 say about data retention?",
                  retriever=my_retriever, k=k, enable_rerank=True,
                  rerank_k=k, keep_top_n=3, generator=None)

print("Top‑1 without rerank:\n", top1(out_no))
print("\nTop‑1 with rerank:\n", top1(out_yes))


TypeError: ask_rag() got an unexpected keyword argument 'retriever'

In [None]:
def debug_rerank(query, retriever, k=12, rerank_k=12, keep_top_n=3):
    # Requires bge_reranker (defined in a later cell); it returns None if FlagEmbedding is missing
    pool = retriever(query, k=k)[:rerank_k]  # score at most rerank_k candidates
    texts = [p["text"] for p in pool]
    scores = bge_reranker(query, texts)
    print("pool:", len(pool), "scores_none:", scores is None)
    if scores:
        ranked = sorted(zip(pool, scores), key=lambda x: x[1], reverse=True)
        print(f"Top-{keep_top_n} by reranker:")
        for i, (item, s) in enumerate(ranked[:keep_top_n], 1):
            txt = item["text"][:80].replace("\n", " ")
            print(f"{i}. score={s:.3f}  text={txt}")


In [None]:
debug_rerank("What does clause 4.2 say about data retention?", my_retriever, k=k, rerank_k=k)


pool: 12 scores_none: True
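`scores_none: True` means `bge_reranker` returned `None`, typically because `FlagEmbedding` is not installed or the model could not load, so no reranking happened. To exercise the sort-and-truncate path without downloading a model, you can pass any callable with the same contract: `(query, list_of_texts) -> list_of_floats`, higher is better. The `overlap_reranker` below is a hypothetical stand-in that scores query-term overlap; it is not a cross-encoder and only illustrates the plumbing.

```python
import re
from typing import List, Optional

def _terms(text: str) -> set:
    # Lowercase words and numbers, keeping section numbers such as "4.2" intact
    return set(re.findall(r"[a-z0-9]+(?:\.[0-9]+)?", text.lower()))

def overlap_reranker(query: str, passages: List[str]) -> Optional[List[float]]:
    """Hypothetical stand-in reranker: fraction of query terms found in each passage."""
    q = _terms(query)
    if not q:
        return None
    return [len(q & _terms(p)) / len(q) for p in passages]

def rerank(query, texts, reranker, keep_top_n=3):
    """Sort texts by reranker score (descending); keep input order if scoring fails."""
    scores = reranker(query, texts)
    if not scores or len(scores) != len(texts):
        return texts[:keep_top_n]
    ranked = sorted(zip(texts, scores), key=lambda x: x[1], reverse=True)
    return [t for t, _ in ranked[:keep_top_n]]

pool = [
    "Overview: Our policy covers data retention, encryption, and access control.",
    "Clause 5.1: Storage encryption must meet AES-256 standards.",
    "Clause 4.2: Data retention is 90 days for audit logs. Exceptions require compliance approval.",
]
top = rerank("What does clause 4.2 say about data retention?", pool, overlap_reranker, keep_top_n=2)
print(top[0])
```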


In [None]:
# Adapter: Chroma -> ask_rag retriever schema
from typing import List, Dict
Passage = Dict[str, object]

def make_chroma_retriever(collection):
    def _retriever(query: str, k: int = 30) -> List[Passage]:
        # ids are always returned by query(); "ids" is not a valid include value
        res = collection.query(query_texts=[query], n_results=k, include=["documents", "metadatas"])
        docs = res.get("documents", [[]])[0]
        metas = res.get("metadatas", [[]])[0]
        ids   = res.get("ids", [[]])[0]
        out: List[Passage] = []
        for i, text in enumerate(docs):
            if not text: 
                continue
            meta = dict(metas[i] or {}) if i < len(metas) else {}
            if i < len(ids): meta.setdefault("id", ids[i])
            out.append({"text": text, "meta": meta})
        return out
    return _retriever

# Bind to the already-open 'collection' from the notebook
my_retriever = make_chroma_retriever(collection)


NameError: name 'collection' is not defined

In [None]:
# Minimal types to keep things clear
from typing import List, Dict, Callable, Optional, Tuple

# A passage is just text plus optional metadata (to keep citations)
Passage = Dict[str, object]  # expects at least: {"text": str, ...}

# ---- Optional local reranker (open-source) ----
# Uses BGE Reranker if installed; otherwise returns None to skip rerank.
# pip install -U FlagEmbedding   # (run once in your notebook)
_bge_model_cache = {"model": None}

def bge_reranker(query: str, passages: List[str]) -> Optional[List[float]]:
    """
    Scores [query, passage] pairs using BGE Reranker if available.
    Returns a list of floats (one score per passage) or None if not available.
    """
    try:
        from FlagEmbedding import FlagReranker
        if _bge_model_cache["model"] is None:
            # 'base' is lighter; you can also try 'BAAI/bge-reranker-large'
            _bge_model_cache["model"] = FlagReranker("BAAI/bge-reranker-base", use_fp16=True)
        model = _bge_model_cache["model"]
        pairs = [[query, p] for p in passages]
        scores = model.compute_score(pairs)  # higher is better
        # Ensure it's a plain list[float]
        return [float(s) for s in (scores if isinstance(scores, list) else [scores])]
    except Exception:
        return None  # if package/model unavailable, fail safe

# ---- Tiny helpers ----
def assemble_context(passages: List[Passage], sep: str = "\n\n---\n\n") -> str:
    """
    Joins top passages for the prompt. Keeps basic metadata if present.
    """
    blocks = []
    for p in passages:
        t = str(p.get("text", ""))
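        meta = p.get("meta") if isinstance(p.get("meta"), dict) else {}
        # Prefer a readable label (source plus section, or id) over dumping the whole meta dict
        src = meta.get("source") or meta.get("id") or p.get("source") or p.get("id")
        if src and meta.get("section"):
            src = f"{src}, section {meta['section']}"
        blocks.append(f"[Source: {src}]\n{t}" if src else t)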
    return sep.join(blocks)

# ---- Main function with rerank ----
def ask_rag(
    query: str,
    retriever: Callable[[str, int], List[Passage]],  # your Chapter 4 retrieval function
    k: int = 30,                 # initial retrieval pool
    enable_rerank: bool = True,  # toggle rerank
    rerank_k: int = 30,          # how many to score (<= k)
    keep_top_n: int = 5,         # final top-N
    reranker: Optional[Callable[[str, List[str]], Optional[List[float]]]] = bge_reranker,
    generator: Optional[Callable[[str, str], str]] = None,  # optional LLM answerer
) -> Dict[str, object]:
    """
    Retrieve -> (optional) rerank -> assemble context -> (optional) generate answer.
    Returns dict with 'context' and 'passages', plus 'answer' if generator provided.
    """
    # 1) Retrieve candidates (vector or hybrid)—must return list of dicts with "text"
    candidates = retriever(query, k=k) or []
    # Normalize minimal fields
    candidates = [c for c in candidates if isinstance(c, dict) and "text" in c]
    if not candidates:
        result = {"answer": "", "context": "", "passages": []}
        return result if generator else {"context": "", "passages": []}

    # 2) Optional rerank on a modest pool
    top_passages = candidates[:keep_top_n]
    if enable_rerank and len(candidates) > 1:
        pool = candidates[:min(rerank_k, len(candidates))]
        texts = [str(c["text"]) for c in pool]
        scores = (reranker(query, texts) if reranker else None)

        if scores and len(scores) == len(pool):
            # Sort by reranker score (descending)
            ranked: List[Tuple[Passage, float]] = sorted(
                zip(pool, scores),
                key=lambda x: x[1],
                reverse=True
            )
            top_passages = [c for c, _ in ranked[:keep_top_n]]
        else:
            # Fail safe: keep original order (no rerank)
            top_passages = candidates[:keep_top_n]

    # 3) Build context for prompting
    context = assemble_context(top_passages)

    # 4) Optionally generate an answer with your LLM of choice
    if generator is not None:
        try:
            answer = generator(query, context)
        except Exception:
            answer = ""
        return {"answer": answer, "context": context, "passages": top_passages}

    # If no generator supplied, return context + passages so you can inspect
    return {"context": context, "passages": top_passages}


In [None]:
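# Example adapter for your existing Chapter 4 retrieval.
# NOTE: this placeholder reuses the name my_retriever, so running this cell replaces the
# Chroma-backed retriever defined earlier. It returns [] until you fill it in, which is
# why the context, answer, and "retrieved: 0" outputs below are empty.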
def my_retriever(query: str, k: int = 30) -> List[Passage]:
    # Return a list of {"text": str, ...}. You can include {"meta": {...}} for citations.
    # Replace this with your actual retrieval (vector or hybrid).
    results = []  # <- fill from your DB/search call
    # results should look like: [{"text": hit.text, "meta": {"id": hit.id, "source": hit.src}}, ...]
    return results

# Optional: your LLM answerer
def my_generator(query: str, context: str) -> str:
    # Replace with your OpenAI call or whichever model you use
    # Return a plain string answer
    return f"(demo) I would answer '{query}' using this context:\n{context[:300]}..."
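To exercise the rerank pipeline without a database, a minimal in-memory retriever with the same `(query, k)` signature might look like the sketch below. It ranks the four policy passages seeded earlier by simple term overlap. `make_memory_retriever` is a hypothetical helper, not part of Chroma, and its scoring is deliberately naive so that a reranker has room to change the order.

```python
import re
from typing import Callable, Dict, List

Passage = Dict[str, object]

def _terms(text: str) -> set:
    # Lowercase words and numbers, keeping section numbers such as "4.2" intact
    return set(re.findall(r"[a-z0-9]+(?:\.[0-9]+)?", text.lower()))

def make_memory_retriever(items: List[Passage]) -> Callable[..., List[Passage]]:
    """Hypothetical helper: rank in-memory passages by query-term overlap."""
    def _retriever(query: str, k: int = 30) -> List[Passage]:
        q = _terms(query)
        scored = [(len(q & _terms(str(p["text"]))), i) for i, p in enumerate(items)]
        scored.sort(key=lambda x: (-x[0], x[1]))  # most overlap first, original order on ties
        return [items[i] for s, i in scored[:k] if s > 0]
    return _retriever

policy_items = [
    {"text": "Clause 4.2: Data retention is 90 days for audit logs. Exceptions require compliance approval.",
     "meta": {"id": "policy_demo_0", "source": "policy.pdf", "section": "4.2"}},
    {"text": "Clause 5.1: Storage encryption must meet AES-256 standards.",
     "meta": {"id": "policy_demo_1", "source": "policy.pdf", "section": "5.1"}},
    {"text": "Overview: Our policy covers data retention, encryption, and access control.",
     "meta": {"id": "policy_demo_2", "source": "policy.pdf", "section": "overview"}},
    {"text": "Clause 4.3: Archival data is moved to cold storage after 180 days.",
     "meta": {"id": "policy_demo_3", "source": "policy.pdf", "section": "4.3"}},
]

memory_retriever = make_memory_retriever(policy_items)
hits = memory_retriever("What does clause 4.2 say about data retention?", k=3)
for hit in hits:
    print(hit["meta"]["id"], "|", hit["text"][:50])
```

Passing `retriever=memory_retriever` to the rerank-enabled `ask_rag` should then produce a non-empty context to inspect.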


In [None]:
out = ask_rag(
    query="What does clause 4.2 say about data retention?",
    retriever=my_retriever,
    k=30,
    enable_rerank=True,
    rerank_k=30,
    keep_top_n=5,
    reranker=bge_reranker,  # uses BGE locally if installed; skip if not
    generator=my_generator   # optional
)

print("Context:\n", out["context"][:800])
if "answer" in out:
    print("\nAnswer:\n", out["answer"])


Context:
 

Answer:
 


In [None]:
q = "What does clause 4.2 say about data retention?"
cands = my_retriever(q, k=30)
print("retrieved:", 0 if cands is None else len(cands))
print("sample item:", cands[0] if cands else None)

# Must be list of dicts with at least a 'text' key
if cands:
    print("has text key:", isinstance(cands[0], dict) and ("text" in cands[0]))


retrieved: 0
sample item: None
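
`retrieved: 0` confirms that the placeholder `my_retriever()` still returns an empty list, so there is nothing to rerank or assemble into context. If your Chapter 4 store is ChromaDB, `collection.query(query_texts=[query], n_results=k)` returns a dict whose `ids`, `documents`, `metadatas`, and `distances` entries are lists of lists, one inner list per query text. The sketch below flattens that shape into the `{"text": ..., "meta": {...}}` passages this section expects. `chroma_to_passages` is a hypothetical helper, and `sample` is made-up data in that shape so the cell runs without a database.

```python
from typing import Any, Dict, List

def chroma_to_passages(res: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Flatten the first query's results into [{'text', 'meta'}] (hypothetical helper)."""
    ids = (res.get("ids") or [[]])[0]
    docs = (res.get("documents") or [[]])[0]
    metas = (res.get("metadatas") or [[]])[0] or []
    dists = (res.get("distances") or [[]])[0] or []
    passages = []
    for i, (doc_id, text) in enumerate(zip(ids, docs)):
        if not text:
            continue  # skip empty documents; rerankers need text to score
        meta = dict(metas[i] or {}) if i < len(metas) else {}
        meta["id"] = doc_id  # keep the id for citations
        if i < len(dists):
            meta["distance"] = dists[i]
        passages.append({"text": text, "meta": meta})
    return passages

# Made-up result in the shape returned by Chroma's collection.query()
sample = {
    "ids": [["bug://12343", "bug://12347"]],
    "documents": [["Bug Report #12343 ...", "Bug Report #12347 ..."]],
    "metadatas": [[{"source": "tracker"}, None]],
    "distances": [[0.50, 0.51]],
}
for p in chroma_to_passages(sample):
    print(p["meta"]["id"], p["meta"].get("source"), p["text"][:17])
```

Inside `my_retriever()`, you could then write `return chroma_to_passages(collection.query(query_texts=[query], n_results=k))`, where `collection` is your own Chapter 4 collection.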


## Putting It Together — Mini Demos

This section adds four small, self-contained demos with simple trace output to illustrate key techniques:
- Two-hop agent loop with citations
- Hybrid + rerank ordering change (observe top-5)
- Recency-sensitive ranking (time-decay blend)
- PDF figure/caption question that returns the correct figure reference

In [79]:
# Mini Demo Utilities: simple JSONL trace logger
import json, time
from pathlib import Path

def log_trace(event: dict, logfile: str = 'chapter5_traces.jsonl') -> None:
    ev = dict(event)
    ev['ts'] = ev.get('ts') or time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
    p = Path(logfile)
    with p.open('a', encoding='utf-8') as f:
        print(json.dumps(ev, ensure_ascii=False), file=f)
    print(f'Trace written: {p.resolve()}')


In my experience, the best way to make retrieval demos teachable is to show what actually happened, step by step. The `log_trace()` helper gives us a zero-dependency way to record those steps as JSON lines. Each call to `log_trace({...})` appends one JSON object to `chapter5_traces.jsonl` in the current working directory (for Jupyter, usually the notebook's folder), adds a UTC timestamp `ts` unless the event already carries one, and prints a one-line confirmation with the file path. This way, we can later inspect agent hops, retrieval variants, reranker decisions, and figure matches without dumping full event payloads into the notebook output. It’s simple, robust, and works both locally and in Colab because it’s just file I/O.

### Results Explanation
- The function writes one line per event (JSONL). That makes it easy to stream, grep, or parse later.
- Fields are whatever you pass in `event` plus an automatic `ts`.
- You’ll see a brief confirmation printed with the absolute path to the trace file.

### Context
- Use this with the mini demos to capture internal steps:
  - Agent loop: hop number, sub‑query, retrieved ids.
  - Hybrid/rerank: which mode ran, top‑k preview, score info.
  - Recency demo: parameters (alpha, half‑life) and resulting order.
  - Figure/caption: matched figure id, page, and match score.
- This aligns with the observability theme in Chapter 5: we don’t just return answers; we preserve the reasoning trail.

### Next Steps
- Call `log_trace({...})` at the key checkpoints in your demo code.
- Keep event names consistent (for example, `agent_hop`, `agent_stop`, `hybrid_demo_run`, `figure_match`).
- If the file grows too large while experimenting, delete `chapter5_traces.jsonl` and re-run cells to generate a fresh trace.
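
Reading the trace back is just as simple. The sketch below loads a JSONL trace file, optionally filters by `event`, and skips blank or malformed lines. `read_traces` is a hypothetical helper, and the demo writes to a temporary file so it does not touch your real `chapter5_traces.jsonl`.

```python
import json
import tempfile
from collections import Counter
from pathlib import Path
from typing import Any, Dict, List, Optional

def read_traces(logfile: str = "chapter5_traces.jsonl",
                event: Optional[str] = None) -> List[Dict[str, Any]]:
    """Load JSONL trace events, optionally keeping only one event name (hypothetical helper)."""
    p = Path(logfile)
    if not p.exists():
        return []
    records = []
    for line in p.read_text(encoding="utf-8").splitlines():
        if not line.strip():
            continue
        try:
            rec = json.loads(line)
        except json.JSONDecodeError:
            continue  # tolerate a partially written or hand-edited line
        if event is None or rec.get("event") == event:
            records.append(rec)
    return records

# Offline demo: write three events to a temp file, then read them back
with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "demo_traces.jsonl"
    path.write_text(
        '{"event": "agent_hop", "hop": 1}\n'
        '{"event": "agent_hop", "hop": 2}\n'
        '{"event": "agent_stop", "reason": "enough_citations"}\n',
        encoding="utf-8",
    )
    print(Counter(r["event"] for r in read_traces(str(path))))
    print([r["hop"] for r in read_traces(str(path), event="agent_hop")])
```

Filtering on consistent event names such as `agent_hop` and `agent_stop` is what makes this kind of quick audit possible.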

### Mini Demo 1: Two-hop agent loop with citations

A lightweight loop that Plans → Retrieves → Synthesizes → Decides. It logs each hop and returns citations.

I’ve kept this agent loop intentionally small because the goal is to show the pattern without hiding behind complexity. We start with `DEMO_SNIPPETS` as a tiny in‑memory corpus, use `retrieve_snippets()` for quick lexical hits (token overlap), then `synthesize_answer()` builds a concise answer with clear citations. The `agent_loop()` runs a few hops: it plans a sub‑query, retrieves evidence, accumulates context, and decides whether to stop when we’ve got enough citations. Along the way, we record exactly what happened with `log_trace()` so you can inspect the reasoning trail later instead of guessing.

In [80]:
from typing import List, Dict, Any, Optional

# Simple in-memory snippets to keep the demo self-contained
DEMO_SNIPPETS = [
    {"id": "doc:A1", "text": "Service X outage on 2024-07-12 impacted region us-west.", "source": "status_page", "page": 1},
    {"id": "doc:B2", "text": "Mitigation steps included failover and cache flush.", "source": "postmortem", "page": 2},
    {"id": "doc:C3", "text": "Root cause: stale credentials in deployment pipeline.", "source": "postmortem", "page": 3},
    {"id": "doc:D4", "text": "Credential rotation policy updated in clause 4.2.", "source": "policy", "page": 7},
]

# Tiny retrieval that ranks by token overlap
def retrieve_snippets(query: str, k: int = 3) -> List[Dict[str, Any]]:
    q = set(t.lower() for t in query.split())
    scored = []
    for s in DEMO_SNIPPETS:
        t = set(s['text'].lower().split())
        score = len(q & t)
        if score:
            scored.append((score, s))
    scored.sort(key=lambda x: x[0], reverse=True)
    return [s for _, s in scored[:k]]

# Minimal synthesis with citations
def synthesize_answer(question: str, snippets: List[Dict[str, Any]]) -> Dict[str, Any]:
    if not snippets:
        return {"answer": "Insufficient evidence to answer.", "citations": []}
    lines = [f"- {s['text']} [source={s['source']}, id={s['id']}, page={s.get('page','?')}]" for s in snippets]
    answer = "Based on the retrieved evidence, here is a concise answer.\n" + \
             "\n".join(lines)
    citations = [{"id": s['id'], "source": s['source'], "page": s.get('page')} for s in snippets]
    return {"answer": answer, "citations": citations}

# Agent loop

def agent_loop(query: str, max_hops: int = 3, min_citations: int = 2) -> Dict[str, Any]:
    trace = {"type": "agent_loop", "query": query, "hops": []}
    accumulated: List[Dict[str, Any]] = []
    subq: Optional[str] = query

    for hop in range(1, max_hops + 1):
        plan = f"Hop {hop}: retrieve evidence related to: '{subq}'"
        retrieved = retrieve_snippets(subq, k=3)
        accumulated.extend([r for r in retrieved if r not in accumulated])

        synth = synthesize_answer(query, accumulated)
        hop_rec = {
            "hop": hop,
            "plan": plan,
            "sub_query": subq,
            "retrieved_ids": [r['id'] for r in retrieved],
            "total_citations": len(synth['citations']),
        }
        trace['hops'].append(hop_rec)
        log_trace({"event": "agent_hop", **hop_rec})

        # Stop condition 1: enough citations (the no-next-step check follows below)
        if synth['citations'] and len(synth['citations']) >= min_citations:
            trace['stop_reason'] = "enough_citations"
            log_trace({"event": "agent_stop", "reason": trace['stop_reason'], "hop": hop})
            return {"answer": synth['answer'], "citations": synth['citations'], "trace": trace}

        # Simple next-step refinement (toy): focus on policy if policy not seen
        if not any(r['source'] == 'policy' for r in accumulated):
            subq = "policy credential rotation clause"
        else:
            subq = None

        if not subq:
            trace['stop_reason'] = "no_next_step"
            log_trace({"event": "agent_stop", "reason": trace['stop_reason'], "hop": hop})
            return {"answer": synth['answer'], "citations": synth['citations'], "trace": trace}

    trace['stop_reason'] = "max_hops"
    log_trace({"event": "agent_stop", "reason": trace['stop_reason'], "hop": max_hops})
    final = synthesize_answer(query, accumulated)
    return {"answer": final['answer'], "citations": final['citations'], "trace": trace}

# Quick run
agent_result = agent_loop("Was the outage due to credentials? Provide sources.")
print(agent_result['answer'])
print("Citations:", agent_result['citations'])


Trace written: /Users/relhousieny/code/personal/books/data-strategy-book/27July2025/.public_repo/chapter_05/Jupyter_Notebooks/chapter5_traces.jsonl
Trace written: /Users/relhousieny/code/personal/books/data-strategy-book/27July2025/.public_repo/chapter_05/Jupyter_Notebooks/chapter5_traces.jsonl
Trace written: /Users/relhousieny/code/personal/books/data-strategy-book/27July2025/.public_repo/chapter_05/Jupyter_Notebooks/chapter5_traces.jsonl
Based on the retrieved evidence, here is a concise answer.
- Service X outage on 2024-07-12 impacted region us-west. [source=status_page, id=doc:A1, page=1]
- Credential rotation policy updated in clause 4.2. [source=policy, id=doc:D4, page=7]
Citations: [{'id': 'doc:A1', 'source': 'status_page', 'page': 1}, {'id': 'doc:D4', 'source': 'policy', 'page': 7}]


### Results Explanation
- You’ll see a final printed answer followed by “Citations:” containing `id`, `source`, and `page`.  
- Each hop logs an event via `log_trace()` (e.g., `agent_hop`, `agent_stop`) with the sub‑query and retrieved IDs, which makes the process auditable.

### Context
- This is a minimal two‑hop agent pattern meant for teaching: plan → retrieve → synthesize → decide.  
- The retrieval is intentionally simple (lexical overlap) so the focus stays on the loop and traceability, not on embeddings or DB setup.
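
One side effect of that simplicity is visible in the output above: `str.split()` keeps punctuation attached to words, so the query token `credentials?` never matches `credentials` in `doc:C3`, and the root-cause snippet is missing from the final answer. A minimal sketch of a slightly better lexical tokenizer lowercases the text and keeps only alphanumeric runs, treating dotted numbers such as `4.2` as one token. `TOKEN_RE`, `tokenize`, and `overlap_score` are hypothetical names, not part of the chapter's code.

```python
import re
from typing import Dict, List, Set

# Lowercase alphanumeric runs; dotted runs like "4.2" stay a single token
TOKEN_RE = re.compile(r"[a-z0-9]+(?:\.[a-z0-9]+)*")

def tokenize(text: str) -> Set[str]:
    return set(TOKEN_RE.findall(text.lower()))

def overlap_score(query: str, text: str) -> int:
    # Same idea as retrieve_snippets(): count shared tokens
    return len(tokenize(query) & tokenize(text))

demo_snippets: List[Dict[str, str]] = [
    {"id": "doc:A1", "text": "Service X outage on 2024-07-12 impacted region us-west."},
    {"id": "doc:C3", "text": "Root cause: stale credentials in deployment pipeline."},
    {"id": "doc:D4", "text": "Credential rotation policy updated in clause 4.2."},
]
demo_q = "Was the outage due to credentials? Provide sources."
print({s["id"]: overlap_score(demo_q, s["text"]) for s in demo_snippets})
print(tokenize("clause 4.2."))
```

Used in place of `.split()` inside `retrieve_snippets()`, this would surface `doc:C3` on the first hop alongside `doc:A1`. Stemming is still absent (`credential` and `credentials` remain different tokens), which is one reason real systems lean on BM25 with a text analyzer or on embeddings.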

### Next Steps
- Swap `retrieve_snippets()` for your real retriever (vector, hybrid, rerank) while keeping the same `agent_loop()` shape.  
- Add small guardrails: hop budget, max context length, and a simple “no progress” detector.  
- Enrich `log_trace()` events with `query_id`, collection names, and timing to feed observability later.
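
Two of those guardrails fit in a few lines. In the sketch below, `made_progress` and `trim_to_budget` are hypothetical helpers: a hop counts as progress only if it adds at least one document id not seen before, and snippets are kept in rank order until a character budget (an assumed stand-in for a token budget) would be exceeded.

```python
from typing import Any, Dict, List, Set

def made_progress(seen_ids: Set[str], retrieved: List[Dict[str, Any]]) -> bool:
    """True if this hop retrieved at least one id we have not seen before."""
    return any(r["id"] not in seen_ids for r in retrieved)

def trim_to_budget(snippets: List[Dict[str, Any]], max_chars: int) -> List[Dict[str, Any]]:
    """Keep snippets in rank order until adding the next one would exceed max_chars."""
    kept, used = [], 0
    for s in snippets:
        if used + len(s["text"]) > max_chars:
            break
        kept.append(s)
        used += len(s["text"])
    return kept

seen = {"doc:A1"}
print(made_progress(seen, [{"id": "doc:A1", "text": "..."}]))  # only an already-seen id
print(made_progress(seen, [{"id": "doc:D4", "text": "..."}]))  # new evidence
ranked = [{"id": "a", "text": "x" * 40}, {"id": "b", "text": "y" * 40}, {"id": "c", "text": "z" * 40}]
print([s["id"] for s in trim_to_budget(ranked, max_chars=100)])
```

In `agent_loop()`, you could stop with a `no_progress` reason as soon as `made_progress()` returns `False`, and apply `trim_to_budget()` to `accumulated` before calling `synthesize_answer()`.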

### Mini Demo 2: Hybrid + Rerank (observe top-5)

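Re-run the hybrid demo to see how the ordering changes. If the hybrid-enabled `ask_rag()` defined earlier in the chapter is the active definition, the cell calls it twice (vector-only, then hybrid) with `debug=True` and prints a top-5 preview for each. If `ask_rag()` has since been redefined with a different signature (for example, by running the rerank version above), the call fails and the cell prints `Hybrid demo skipped` instead.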

In [81]:
# Attempt to call previously defined hybrid-enabled ask_rag
try:
    QUERY = "Show me bug report #12345"
    COLLECTION = "enterprise_bug_demo"

    print("Vector-only (top-5 preview):")
    _ = ask_rag(query=QUERY, collection_name=COLLECTION, debug=True, enable_hybrid=False, n_initial=20)

    print("\nHybrid enabled (top-5 preview):")
    _ = ask_rag(query=QUERY, collection_name=COLLECTION, debug=True, enable_hybrid=True, hybrid_weight=0.35, n_initial=30)
except Exception as e:
    print("Hybrid demo skipped:", e)


Vector-only (top-5 preview):
QUERY: 'Show me bug report #12345'
STATUS: Reusing existing ChromaDB client
COLLECTION: Connected to enterprise_bug_demo

STEP 1 (Vector-only): Searching for documents similar to: 'Show me bug report #12345'
RESULTS: Found 2 documents:
  1. ID: bug://12343 | Similarity: -0.500 | Preview: Bug Report #12343
Status: Critical
Component: Authentication...
  2. ID: bug://12347 | Similarity: -0.508 | Preview: Bug Report #12347
Status: In Progress
Component: Mobile App
...

CONTEXT: Combined 2 documents into context
CONTEXT LENGTH: 497 characters

FULL CONTEXT:
----------------------------------------
Bug Report #12343
Status: Critical
Component: Authentication Service
Description: Mobile OAuth login fails with timeout after extended periods of inactivity.
Users report authentication issues specifically on mobile devices.
Reporter: emma.davis@company.com

Bug Report #12347
Status: In Progress
Component: Mobile App
Description: Authentication timeout issues reported
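
For intuition about what the hybrid flag changes, here is a minimal weighted blend of a lexical score and a vector similarity. It is a sketch under stated assumptions: `lexical_score` and `hybrid_rank` are hypothetical, `lexical_weight` only plays a role similar to `hybrid_weight` (the chapter's `ask_rag()` may combine scores differently), and the documents and vector scores are made up. Tokenizing on alphanumeric runs turns `#12345` into `12345`, so an exact identifier match earns lexical credit that a vector score may not reflect.

```python
import re
from typing import Dict, List, Set

def tokens(text: str) -> Set[str]:
    # Alphanumeric runs, so "#12345" becomes "12345"
    return set(re.findall(r"[a-z0-9]+", text.lower()))

def lexical_score(query: str, text: str) -> float:
    """Fraction of query tokens present in the text, in [0, 1]."""
    q = tokens(query)
    return len(q & tokens(text)) / len(q) if q else 0.0

def hybrid_rank(query: str, docs: Dict[str, str], vec: Dict[str, float],
                lexical_weight: float = 0.35) -> List[str]:
    """Weighted blend; assumes vec holds similarities already in [0, 1]."""
    blended = {
        d: lexical_weight * lexical_score(query, docs[d]) + (1 - lexical_weight) * vec[d]
        for d in docs
    }
    return sorted(blended, key=blended.get, reverse=True)

# Made-up corpus and vector similarities (higher = more similar)
docs = {
    "bug://12343": "Bug Report #12343 OAuth login timeout",
    "bug://12345": "Bug Report #12345 CSV export fails",
    "bug://12347": "Bug Report #12347 mobile auth timeout",
}
vec = {"bug://12343": 0.62, "bug://12345": 0.55, "bug://12347": 0.60}
query = "Show me bug report #12345"
print("vector only:", sorted(vec, key=vec.get, reverse=True))
print("hybrid:     ", hybrid_rank(query, docs, vec))
```

With these made-up numbers, vector-only ranking puts `bug://12345` last, while the blend moves it to the top because it is the only document containing the exact ID. The previews above report negative similarity values, so scores from a real store may need rescaling (for example, min-max normalization) before a blend like this is meaningful.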

### Mini Demo 3: Recency-sensitive ranking

Blend semantic score with a time-decay factor to prioritize fresher content.
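
Written out, `recency_blend()` in the cell below computes the following score, where $s$ is the semantic score, $a$ the age in days, $h$ the half-life (`half_life_days`), and $\alpha$ the semantic weight (`alpha`):

$$
\text{score} = \alpha\, s + (1 - \alpha)\, 0.5^{a/h}
$$

With the defaults $\alpha = 0.7$ and $h = 30$, news2 ($s = 0.80$, $a = 2$) gets $0.56 + 0.3 \cdot 0.955 \approx 0.846$, while news3 ($s = 0.85$, $a = 45$) gets $0.595 + 0.3 \cdot 0.354 \approx 0.701$: the fresher item overtakes the one with the higher semantic score. A 90-day-old item keeps only $0.5^{3} = 0.125$ of its recency credit.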

In [82]:
import math
from datetime import datetime, timedelta, timezone

# Simple utilities

def time_decay(age_days: float, half_life_days: float = 30.0) -> float:
    # Exponential decay: value halves every half_life_days
    return math.pow(0.5, age_days / max(1e-9, half_life_days))


def recency_blend(sem_score: float, ts: datetime, now: datetime, half_life_days: float = 30.0, alpha: float = 0.7) -> float:
    # Age in fractional days; alpha weights the semantic score against recency
    age_days = (now - ts).total_seconds() / 86400.0
    rec = time_decay(age_days, half_life_days)
    return alpha * sem_score + (1 - alpha) * rec

# In-memory items: id, title, semantic score, timestamp
# Timezone-aware UTC "now" (datetime.utcnow() is deprecated as of Python 3.12)
now = datetime.now(timezone.utc)
items = [
    {"id": "news1", "title": "Policy updated", "sem": 0.82, "ts": now - timedelta(days=90)},
    {"id": "news2", "title": "Hotfix released", "sem": 0.80, "ts": now - timedelta(days=2)},
    {"id": "news3", "title": "Guidance note", "sem": 0.85, "ts": now - timedelta(days=45)},
    {"id": "news4", "title": "FAQ refreshed", "sem": 0.78, "ts": now - timedelta(days=1)},
]

print("Baseline (semantic only):")
for r in sorted(items, key=lambda x: x['sem'], reverse=True)[:5]:
    print(f"  {r['id']} | sem={r['sem']:.2f} | age={(now-r['ts']).days}d")

print("Recency-blended:")
for r in sorted(items, key=lambda x: recency_blend(x['sem'], x['ts'], now), reverse=True)[:5]:
    print(f"  {r['id']} | sem={r['sem']:.2f} | age={(now-r['ts']).days}d")

log_trace({"event": "recency_demo_ran", "count": len(items)})


Baseline (semantic only):
  news3 | sem=0.85 | age=45d
  news1 | sem=0.82 | age=90d
  news2 | sem=0.80 | age=2d
  news4 | sem=0.78 | age=1d
Recency-blended:
  news2 | sem=0.80 | age=2d
  news4 | sem=0.78 | age=1d
  news3 | sem=0.85 | age=45d
  news1 | sem=0.82 | age=90d
Trace written: /Users/relhousieny/code/personal/books/data-strategy-book/27July2025/.public_repo/chapter_05/Jupyter_Notebooks/chapter5_traces.jsonl




### Mini Demo 4: PDF figure/caption question → figure reference

Pragmatic approach without extra PDF dependencies: we use a small figure index (caption + page) and match by keywords.
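
The `FIGURE_INDEX` in the cell below is written by hand. Under the same no-extra-dependencies constraint, one way to build it is to scan text you have already extracted per page for caption lines that start with `Fig` or `Figure` plus a number. This is a sketch: `build_figure_index` and `CAPTION_RE` are hypothetical, the caption pattern is an assumption about your documents' conventions, and the page strings are made-up sample input.

```python
import re
from typing import Dict, List

# Caption lines such as "Fig 5.6: ..." or "Figure 5.7. ..." (assumed convention);
# the ':' or '.' separator is required so body text like "Fig. 3 shows" is skipped
CAPTION_RE = re.compile(r"^\s*(Fig(?:ure)?\.?\s*\d+(?:\.\d+)*)[:.]\s+(.+)$", re.IGNORECASE)

def build_figure_index(pages: List[str], start_page: int = 1) -> List[Dict]:
    """Return records shaped like FIGURE_INDEX entries from per-page text."""
    index = []
    for page_no, page_text in enumerate(pages, start=start_page):
        for line in page_text.splitlines():
            m = CAPTION_RE.match(line)
            if m:
                index.append({
                    "figure_id": m.group(1),
                    "page": page_no,
                    "caption": m.group(2).strip(),
                    "text": "",  # optionally add nearby body text for extra keywords
                })
    return index

pages = [
    "Intro text.\nFig 5.3: Hybrid retrieval blends vectors with BM25",
    "Body text only, Fig. 5.3 is discussed here.",
    "Figure 5.6. Reranker reorders candidates\nMore text.",
]
for rec in build_figure_index(pages):
    print(rec["figure_id"], rec["page"], rec["caption"])
```

The resulting list has the same keys as `FIGURE_INDEX`, so it can be passed straight to `answer_figure_query()`.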

In [83]:
from typing import List, Dict

def answer_figure_query(query: str, index: List[Dict]) -> Dict:
    q = set(query.lower().split())
    best = None
    best_score = -1
    for rec in index:
        caption_tokens = set(rec['caption'].lower().split())
        text_tokens = set(rec.get('text','').lower().split())
        score = len(q & (caption_tokens | text_tokens))
        if score > best_score:
            best_score = score
            best = rec
    if best is None or best_score <= 0:
        return {"answer": "No matching figure found.", "figure": None}
    log_trace({"event": "figure_match", "figure_id": best['figure_id'], "page": best['page'], "score": best_score})
    return {
        "answer": f"Best match: {best['figure_id']} on page {best['page']}.",
        "figure": {"id": best['figure_id'], "page": best['page'], "caption": best['caption']}
    }

FIGURE_INDEX = [
    {"figure_id": "Fig 5.3", "page": 12, "caption": "Search step inside retrieval blending semantic vectors with lexical BM25", "text": "hybrid retrieval pipeline search step"},
    {"figure_id": "Fig 5.6", "page": 21, "caption": "Reranker reorders candidates using cross-encoder scores", "text": "semantic reranking top passages"},
    {"figure_id": "Fig 5.7", "page": 25, "caption": "Figure/caption-aware retrieval path for document questions", "text": "caption extraction flow"},
]

ans = answer_figure_query("Which figure shows the reranker improving ordering?", FIGURE_INDEX)
print(ans['answer'])


Trace written: /Users/relhousieny/code/personal/books/data-strategy-book/27July2025/.public_repo/chapter_05/Jupyter_Notebooks/chapter5_traces.jsonl
Best match: Fig 5.6 on page 21.
