<a href="https://colab.research.google.com/github/hamzafarooq/multi-agent-course/blob/main/Module_3/Semantic_Cache/Semantic_cache_from_scratch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**If you use our code, please cite:**

@misc{2024<br>
  title = {Semantic Cache from Scratch},<br>
  author = {Hamza Farooq, Darshil Modi, Kanwal Mehreen, Nazila Shafiei},<br>
  keywords = {Semantic Cache},<br>
  year = {2024},<br>
  copyright = {APACHE 2.0 license}<br>
}

## Semantic Cache

Semantic caching accelerates retrieval-augmented workflows by storing and reusing previous embedding-based lookups instead of issuing fresh queries every time. In this notebook, we’ll build a lightweight semantic cache from scratch using:

- **Nomic text embeddings** (`nomic-ai/nomic-embed-text-v1.5`) to convert documents and queries into dense vectors  
- **FAISS** (Facebook AI Similarity Search) to index and quickly search those vectors  
- A **ground-truth evaluation** dataset to measure cache hit/miss accuracy  
- **Traversaal Ares API** to fetch live data when cache misses require real-time information  

Rather than re-computing embeddings and retrieval for every query, our cache lets us:

1. **Embed** a corpus once and index it for fast L2 nearest-neighbor lookup  
2. **Embed** each new query and check if it’s already “covered” by a cached result  
3. **Fall back** to a full retrieval (and store the new result) only when necessary  
4. **Invoke** the Traversaal Ares API for live internet search when needed  
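
The flow above can be sketched without any external dependencies. This is only an illustration: `toy_embed`, `ToyCache`, and the tiny vocabulary are hypothetical stand-ins (not part of this notebook's implementation), and a brute-force scan replaces the FAISS index.

```python
import math

def toy_embed(text, vocab=("capital", "france", "india", "ceo", "apple")):
    """Hypothetical stand-in for a real embedding model: unit-normalized word counts."""
    words = text.lower().replace("?", "").split()
    vec = [float(words.count(w)) for w in vocab]
    norm = math.sqrt(sum(v * v for v in vec)) or 1.0
    return [v / norm for v in vec]

def squared_l2(a, b):
    """Squared Euclidean distance between two equal-length vectors."""
    return sum((x - y) ** 2 for x, y in zip(a, b))

class ToyCache:
    def __init__(self, fetch, threshold=0.2):
        self.fetch = fetch          # fallback called on a cache miss
        self.threshold = threshold  # max squared L2 distance that counts as a hit
        self.vectors = []
        self.answers = []

    def ask(self, question):
        query = toy_embed(question)
        # Brute-force nearest neighbor over the cached vectors
        best = min(range(len(self.vectors)),
                   key=lambda i: squared_l2(query, self.vectors[i]),
                   default=None)
        if best is not None and squared_l2(query, self.vectors[best]) <= self.threshold:
            return self.answers[best], True   # cache hit
        answer = self.fetch(question)         # cache miss: fall back and store
        self.vectors.append(query)
        self.answers.append(answer)
        return answer, False

toy_cache = ToyCache(lambda q: f"answer to: {q}")
print(toy_cache.ask("What is the capital of France?"))  # first time seen: miss
print(toy_cache.ask("Tell me the capital of France"))   # same toy vector: hit
print(toy_cache.ask("Who is the CEO of Apple?"))        # unrelated: miss
```

Each call returns the answer together with a boolean hit flag, which makes it easy to count how often the fallback was avoided.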

This approach reduces redundant compute, lowers end-to-end latency, and makes RAG pipelines more efficient—especially when query patterns exhibit repetition, temporal locality, or high similarity. We’ll walk through:

1. Loading the Nomic embed model with `trust_remote_code=True`  
2. Encoding a document set and building a FAISS index  
3. Loading a real-world ground-truth CSV for evaluation  
4. Implementing the core cache hit/miss logic  
5. Falling back to Traversaal Ares API for live data on cache misses  
6. Measuring performance gains against a “no-cache” baseline  
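
For the evaluation step, one possible way to score hit/miss decisions against ground truth is sketched below. The column layout of the ground-truth CSV is not shown in this section, so the sketch assumes the labels have already been loaded as two lists of booleans; `hit_miss_report` and its field names are hypothetical.

```python
def hit_miss_report(expected_hits, observed_hits):
    """Compare expected vs. observed cache decisions (True = hit, False = miss)."""
    if len(expected_hits) != len(observed_hits):
        raise ValueError("Both lists must have the same length")
    pairs = list(zip(expected_hits, observed_hits))
    return {
        # Fraction of queries where the cache made the expected decision
        "accuracy": sum(e == o for e, o in pairs) / len(pairs) if pairs else 0.0,
        # Expected hit, observed hit
        "true_hits": sum(1 for e, o in pairs if e and o),
        # Expected miss, observed hit: a cached answer was served for the wrong question
        "false_hits": sum(1 for e, o in pairs if not e and o),
        # Expected hit, observed miss: an avoidable fallback call was made
        "missed_hits": sum(1 for e, o in pairs if e and not o),
    }

expected = [False, True, False, True]   # made-up labels for illustration
observed = [False, True, True, False]   # made-up cache decisions
print(hit_miss_report(expected, observed))
```

False hits are usually the more costly error, since they return an answer to a different question; a stricter distance threshold trades them for more missed hits.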

By the end, you’ll have a reusable semantic cache scaffold that you can plug into any RAG or search-over-embeddings pipeline. Let’s get started!  


## Setup and Dependencies

In [None]:
# Install the necessary libraries
!pip install -U faiss-cpu sentence_transformers transformers

In [None]:
# Import the necessary libraries

# FAISS for efficient similarity search over vector embeddings
import faiss  # Builds and queries nearest-neighbor indices (IndexFlatL2, used below, is an exact search)

# Lightweight SQL database for caching metadata, query logs, or evaluation results
import sqlite3  # Persistence layer for storing cache entries or metrics

# SentenceTransformers wrapper around transformer models for text embeddings
from sentence_transformers import SentenceTransformer  # Loads Nomic/embed or other SBERT-style models

# PyTorch backend required by SentenceTransformer and optional model fine-tuning
import torch  # Tensor operations, GPU acceleration, and model inference support

# Transformers library components for causal LLM-based answer generation
from transformers import AutoModelForCausalLM, AutoTokenizer
#   - AutoModelForCausalLM: Load pretrained language models (e.g., GPT variants)
#   - AutoTokenizer: Tokenize text input/output for the LLM

# Core numerical library for array and matrix operations on embeddings
import numpy as np  # Handles vector math, concatenation, and statistical computations

# Pretty-printing complex Python objects during development/debugging
from pprint import pprint  # Nicely formats nested dicts or lists when exploring outputs


## Define Prediction Function (Using Traversaal Ares API)

Instead of calling an LLM endpoint directly, we use the Ares API for both retrieval and generation. You can replace it with your own RAG function inside the `generate_answer` method.

Traversaal Ares API is a cutting-edge solution designed to provide real-time search results generated from user queries. Leveraging advanced Large Language Models (LLMs), Ares connects to the internet to deliver accurate and factual information, including relevant URLs for reference. This API is tailored for speed and efficiency, providing lightning-fast search results within 3-4 seconds. Currently available for free during the beta phase, with priced solutions coming soon.

## Key Features:
- **Real-time Search Results:** Ares API offers unparalleled speed in generating search results.
- **Internet Connectivity:** Connects to the internet to fetch the latest and most accurate information.
- **Lightning-Fast Response:** Delivers search results with URLs in 3-4 seconds.
- **Free Beta Access:** Available for free for the first 100 calls.
- **Factual and Accurate:** Ensures the information provided is accurate and supported by relevant references. [Can make mistakes though]

## Getting Started:
To access the Ares API, sign up at [api.traversaal.ai](https://api.traversaal.ai) and refer to the usage documentation at [docs.traversaal.ai](https://docs.traversaal.ai/docs/intro).

Experience the future of AI-driven search with Traversaal Ares API!


In [None]:
from google.colab import userdata # Colab utility for securely storing/retrieving credentials
import requests # HTTP client for REST API calls

def make_prediction(data):
    """
    Send a query to the Traversaal Ares live-predict endpoint and return parsed JSON.

    Args:
        data (list[str]): The user query, wrapped in a list as in this notebook's
            calls (e.g. ["Events happening in London this week."]).

    Returns:
        dict or None: Parsed JSON response from Ares API if successful, else None.

    Raises:
        RuntimeError: If ARES_API_KEY is missing.
    """
    url = "https://api-ares.traversaal.ai/live/predict"
    # Retrieve API key from Colab secure storage; fail fast if missing
    api_key = userdata.get("ARES_API_KEY")
    if not api_key:
        raise RuntimeError("Missing ARES_API_KEY in Colab userdata")
    headers = {
        "x-api-key": api_key,
        "Content-Type": "application/json"
    }

    # The query is sent as the JSON body {"query": data}
    payload = {"query": data}

    try:
        # A timeout keeps the notebook from hanging if the API is unreachable
        response = requests.post(url, json=payload, headers=headers, timeout=30)
        # Check for HTTP 200 OK
        if response.status_code == 200:
            print("Request was successful.")
            try:
                # Parse the JSON body of the response
                return response.json()
            except ValueError:
                # Response text wasn’t valid JSON
                print("No JSON data in the response.")
                return None
        else:
            # Non-200 status; surface the code for debugging
            print(f"Request failed with status code {response.status_code}.")
            return None

    except requests.exceptions.RequestException as e:
        print(f"Error during request: {e}")
        return None


In [None]:
# Test the make_prediction function
response = make_prediction(['Events happening in London this week.'])
response
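
Because `make_prediction` returns `None` on failure, it can help to read the answer text defensively. The sketch below assumes the response nests the answer under `data` → `response_text`, which is the path the `SemanticCaching` class in this notebook reads; `extract_response_text` is a hypothetical helper, and the sample dict is made up for illustration rather than a real API response.

```python
def extract_response_text(result):
    """Return result['data']['response_text'] if present, else None."""
    if not isinstance(result, dict):
        return None  # e.g. make_prediction returned None after a failed request
    data = result.get("data")
    if not isinstance(data, dict):
        return None
    return data.get("response_text")

# Made-up payload that only mimics the assumed shape
sample = {"data": {"response_text": "Example answer text."}}
print(extract_response_text(sample))
print(extract_response_text(None))
```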

### Define SemanticCaching Class

In this cell we define `SemanticCaching`—a lightweight cache that:

1. Uses **FAISS** to index and look up question embeddings (Euclidean distance).  
2. Leverages **Nomic Embed** (`nomic-ai/nomic-embed-text-v1.5`) to encode questions into vectors.  
3. Persists cache entries (questions, embeddings, answers) in a JSON file.  
4. Falls back to `make_prediction()` (via Traversaal Ares API) when no suitable cache hit is found.  
5. Measures and logs query latency for both hits and misses.  
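
A note on interpreting the distance threshold: `faiss.IndexFlatL2` reports squared Euclidean distances, and the class below encodes questions with `normalize_embeddings=True`. For unit vectors, the squared distance and the cosine similarity are linked by d² = 2 - 2·cos, so a threshold on one maps directly to the other. The sketch below checks that identity in plain Python; the helper names and sample vectors are illustrative only.

```python
import math

def normalize(v):
    """Scale a vector to unit length."""
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v]

def squared_l2(a, b):
    return sum((x - y) ** 2 for x, y in zip(a, b))

def cosine_unit(a, b):
    """Cosine similarity; valid as a plain dot product only for unit vectors."""
    return sum(x * y for x, y in zip(a, b))

def threshold_to_min_cosine(threshold):
    """Smallest cosine similarity that still passes a squared-L2 threshold."""
    return 1 - threshold / 2

a = normalize([1.0, 2.0, 3.0])
b = normalize([1.0, 2.5, 2.5])
# The two printed values should agree up to floating-point rounding
print(squared_l2(a, b), 2 - 2 * cosine_unit(a, b))
print(threshold_to_min_cosine(0.2))
```

Under these assumptions, a squared-distance threshold of 0.2 corresponds to requiring a cosine similarity of roughly 0.9 or higher.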


In [None]:
import faiss            # Efficient similarity search over vector embeddings
import json             # Read/write cache from a JSON file
import numpy as np      # Numerical operations on embeddings
from sentence_transformers import SentenceTransformer  # Load Nomic embed model
from transformers import AutoTokenizer, AutoModelForCausalLM  # (Optional) LLM for answer gen
import time             # Measure latency

class SemanticCaching:

    def __init__(self, json_file='cache.json', clear_on_init=False):
        # Initialize Faiss index with Euclidean distance
        self.index = faiss.IndexFlatL2(768)  # Use IndexFlatL2 with Euclidean distance
        if self.index.is_trained:
            print('Index trained')

        # Initialize Sentence Transformer model
        self.encoder = SentenceTransformer('nomic-ai/nomic-embed-text-v1.5', trust_remote_code=True)


        # Uncomment the following lines to use DialoGPT for answer generation
        # self.tokenizer = AutoTokenizer.from_pretrained("microsoft/DialoGPT-large")
        # self.model = AutoModelForCausalLM.from_pretrained("microsoft/DialoGPT-large")

        # Distance threshold for cache hits (lower = more similar).
        # Note: faiss.IndexFlatL2 reports *squared* L2 distances.
        self.euclidean_threshold = 0.2

        # JSON file to persist cache entries
        self.json_file = json_file

        # Start from an empty cache, or load the persisted one from disk
        if clear_on_init:
            self.clear_cache()
        else:
            self.load_cache()

    def clear_cache(self):
        """
        Clears in-memory cache, resets FAISS index, and overwrites cache.json with an empty structure.
        """
        self.cache = {
            'questions': [],
            'embeddings': [],
            'answers': [],
            'response_text': []
        }
        self.index = faiss.IndexFlatL2(768)  # Reinitialize FAISS index
        self.save_cache()
        print("Semantic cache cleared.")

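    def load_cache(self):
        """Load existing cache (and rebuild the FAISS index) or initialize an empty structure."""
        try:
            with open(self.json_file, 'r') as file:
                self.cache = json.load(file)
        except FileNotFoundError:
            # Structure: lists of questions, embeddings, answers, and full response text
            self.cache = {'questions': [], 'embeddings': [], 'answers': [], 'response_text': []}
        # Re-add persisted embeddings so FAISS row ids line up with the cached lists
        if self.cache['embeddings']:
            self.index.add(np.array(self.cache['embeddings'], dtype='float32'))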

    def save_cache(self):
        """Persist cache back to disk."""
        with open(self.json_file, 'w') as file:
            json.dump(self.cache, file)

    def ask(self, question: str) -> str:
        """
        Returns a cached answer if within threshold, otherwise generates,
        caches, and returns a new answer.
        """
        start_time = time.time()
        try:
            # Encode the incoming question (unit-normalized embedding)
            embedding = self.encoder.encode([question], normalize_embeddings=True)

            # Search for the nearest neighbor in the index
            D, I = self.index.search(embedding, 1)

            # Cache hit: a neighbor exists (FAISS returns -1 for an empty index)
            # and its distance is within the threshold
            if I[0][0] != -1 and D[0][0] <= self.euclidean_threshold:
                row_id = int(I[0][0])
                # D holds squared L2 distances; for unit vectors, cosine = 1 - D / 2
                print(f'Cache hit at row: {row_id} with score {1 - D[0][0] / 2}')
                print(f"Time taken: {time.time() - start_time:.3f}s")
                return self.cache['response_text'][row_id]

            # Cache miss: no neighbor within the threshold, so generate a fresh answer
            answer, response_text = self.generate_answer(question)

            # Append new entry to cache
            self.cache['questions'].append(question)
            self.cache['embeddings'].append(embedding[0].tolist())
            self.cache['answers'].append(answer)
            self.cache['response_text'].append(response_text)
            self.index.add(embedding)
            self.save_cache()
            print(f"Time taken: {time.time() - start_time:.3f}s")

            return response_text

        except Exception as e:
            # Chain the original exception so the full traceback is preserved
            raise RuntimeError(f"Error during 'ask' method: {e}") from e

    def generate_answer(self, question: str) -> tuple:
        """
        Always use the Traversaal Ares API for new answers.
        Returns (full API result dict, extracted response_text).
        """
        try:
            result = make_prediction([question])
            # make_prediction returns None on HTTP or parsing failures
            if result is None:
                raise ValueError("Ares API returned no result")
            response_text = result['data']['response_text']

            return result, response_text
        except Exception as e:
            raise RuntimeError(f"Error during 'generate_answer' method: {e}") from e


In [None]:
# Instantiate the semantic cache: builds/loads FAISS index, encoder, and JSON cache
cache = SemanticCaching()

# Uncomment to re-instantiate the semantic cache and clear existing cache entries
# cache = SemanticCaching(clear_on_init=True)

### Testing the Semantic Cache

In this section, we validate the behavior of our `SemanticCaching` class with a small set of example questions. We start with four queries, then probe the cache with paraphrases and near-duplicates:


In [None]:
# First test question
question1 = "What is the capital of France?"
answer1 = cache.ask(question1)  # Cache miss: generates new answer via API and stores it
print(answer1)

# Second test question
question2 = "Who is the CEO of Apple?"
answer2 = cache.ask(question2)  # Cache miss: generates answer, stores embedding + response
print(answer2)

# Third test question
question3 = "Who is the CEO of Facebook?"
answer3 = cache.ask(question3)  # Cache hit or miss depends on similarity threshold
print(answer3)

# Fourth test question
question4 = "What is the capital of India?"
answer4 = cache.ask(question4)
print(answer4)

# Note:
# If question3 is found similar enough (within threshold) to question2, it returns cached answer2.
# Otherwise, it generates a fresh answer and adds it to the cache.

In [None]:
print(cache.ask("Can you tell me what is the Capital of India"))

In [None]:
print(cache.ask('Who is the CEO of Facebook?'))

In [None]:
print(cache.ask('Who is the current CEO of Google?'))

In [None]:
print(cache.ask('Is Sundar Pichai the CEO of Google?'))

In [None]:
print(cache.ask('Best local food spots in Edinburgh for a couple?'))

In [None]:
print(cache.ask('Best local food spots in Edinburgh?'))

In [None]:
print(cache.ask('Best local food spots in London?'))
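
After running queries like these, it can be useful to sanity-check the persisted cache file. The sketch below relies only on the four list keys this notebook stores (`questions`, `embeddings`, `answers`, `response_text`); `summarize_cache` is a hypothetical helper, and the demo writes a small made-up file to a temporary directory rather than touching the real `cache.json`.

```python
import json
import os
import tempfile

CACHE_KEYS = ("questions", "embeddings", "answers", "response_text")

def summarize_cache(path="cache.json"):
    """Load the JSON cache and report its size and whether its lists line up."""
    with open(path, "r") as f:
        cache = json.load(f)
    lengths = {key: len(cache.get(key, [])) for key in CACHE_KEYS}
    return {
        "entries": lengths["questions"],
        # All four lists must have equal length for row ids to be meaningful
        "aligned": len(set(lengths.values())) == 1,
        "questions": list(cache.get("questions", [])),
    }

# Demo on a made-up cache file written to a temporary directory
demo = {
    "questions": ["What is the capital of France?"],
    "embeddings": [[0.1, 0.2, 0.3]],
    "answers": [{"data": {"response_text": "Paris"}}],
    "response_text": ["Paris"],
}
with tempfile.TemporaryDirectory() as tmp:
    demo_path = os.path.join(tmp, "cache.json")
    with open(demo_path, "w") as f:
        json.dump(demo, f)
    print(summarize_cache(demo_path))
```

If `aligned` comes back `False`, the row ids returned by the index can no longer be trusted to point at the matching answer, and clearing the cache is the safest option.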