#Initial Setups

In [1]:
!pip install -q pydantic evaluate transformers datasets accelerate bitsandbytes peft trl wandb
!pip install -q torch torchinfo sentence-transformers faiss-cpu chromadb whoosh
!pip install -q scikit-learn matplotlib seaborn pandas numpy
!pip install transformers

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m84.1/84.1 kB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m61.3/61.3 MB[0m [31m8.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m564.7/564.7 kB[0m [31m16.5 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m67.3/67.3 kB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m31.4/31.4 MB[0m [31m17.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m19.8/19.8 MB[0m [31m30.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m468.8/468.8 kB[0m [31m14.7 MB/s[0m eta [36

In [22]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import List, Dict, Tuple, Optional, Any, Union
import matplotlib.pyplot as plt
import seaborn as sns
from collections import defaultdict, Counter
import time
import math

# Pydantic for data validation
from pydantic import BaseModel, Field, field_validator, ConfigDict

from google.colab import userdata
HF_TOKEN= userdata.get('test_hf_1') # HF token


# HuggingFace ecosystem
from transformers import (
    AutoTokenizer,
    AutoModel,
    AutoModelForCausalLM,
    BitsAndBytesConfig,
    TrainingArguments,
    Trainer
)
from datasets import Dataset as HFDataset
import evaluate  # HF evaluate library for metrics

# PEFT for parameter-efficient fine-tuning
from peft import LoraConfig, get_peft_model, TaskType, prepare_model_for_kbit_training

# TRL for RLHF and instruction tuning
from trl import SFTTrainer, DPOTrainer

# Accelerate for distributed training
from accelerate import Accelerator

# 8-bit optimization
import bitsandbytes as bnb

# IR libraries
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from sentence_transformers import SentenceTransformer
import faiss  # Facebook AI Similarity Search for efficient nearest neighbor

# Experiment tracking
import wandb

# Set device and seeds
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
torch.manual_seed(42)
np.random.seed(42)
print(f"Using device: {device}")
if torch.cuda.is_available():
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    print(f"Memory: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.1f} GB")

Using device: cpu


#Data Structures and Pydantic Validation

In [3]:
class Document(BaseModel):
  """Document with validation using pydantic v2"""

  model_config = ConfigDict(
      extra='forbid',  # no extra fields allowed
      validate_assignment=True # validate on reassingment
  )


  doc_id: str = Field(..., min_length = 1, description="Document identifier")
  title: str = Field(..., min_length = 1, description="Document title")
  content: str = Field(..., min_length = 1, description="Document text content")
  metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional metadata")

  @field_validator('content')
  @classmethod
  def validate_content(cls, v:str) -> str:
    """Ensure content is non-empty after stripping"""

    if not v.strip():
      raise ValueError("Content cannot be empty")
    return v.strip()


  def get_tokens(self) -> List[str]:
    """Simple whitespace tokenization"""

    return self.content.lower().split()


class Query(BaseModel):
  """Query with validation"""

  query_id: str = Field(..., min_length=1)
  text: str = Field(..., min_length=1)
  metadata: Dict[str, Any] = Field(default_factory=dict )

  def get_tokens(self) -> List[str]:
    """Tokenize query text"""

    return self.text.lower().split()


class DocumentCollection(BaseModel):
  """Collection of documents with indexing capabilities"""

  documents: List[Document] = Field(defualt_factory=list)

  def add_document(self, doc: Document):
    """Add validated document to collection"""

    self.documents.append(doc)


  def get_vocabulary(self) -> List[str]:
    """Extract unique vocabulary from all the documents"""

    vocab = set()
    for doc in self.documents:
      vocab.update(doc.get_tokens()) # Adding all the tokens to the set

    return sorted(list(vocab))



##Simple test with Shakespeare corpus from the book

In [4]:
shakespeare_docs = [
  Document(
    doc_id="as_you_like_it",
    title="As You Like It",
    content="battle good fool wit love forest magic",
    metadata={"genre": "comedy", "year": 1599}
  ),
  Document(
    doc_id="twelfth_night",
    title="Twelfth Night",
    content="good fool wit love comedy mistaken identity",
    metadata={"genre": "comedy", "year": 1602}
  ),
  Document(
    doc_id="julius_caesar",
    title="Julius Caesar",
    content="battle battle battle good fool war rome politics",
    metadata={"genre": "tragedy", "year": 1599}
  ),
  Document(
    doc_id="henry_v",
    title="Henry V",
    content="battle battle battle battle good wit war king england",
    metadata={"genre": "history", "year": 1599}
  )
]

collection = DocumentCollection(documents=shakespeare_docs)
print(f"Created collection with {len(collection.documents)} documents")
print(f"Vocabulary size: {len(collection.get_vocabulary())} unique terms")
print(f"Sample vocabulary: {collection.get_vocabulary()[:5]}")

Created collection with 4 documents
Vocabulary size: 15 unique terms
Sample vocabulary: ['battle', 'comedy', 'england', 'fool', 'forest']


# TF-IDF with scikit-learn

In [5]:
class TFIDFRetriever:
  """TF-idf retriever with pydantic validation and sklearn"""

  def __init__(self, collection: DocumentCollection):
    self.collection = collection
    self.vectorizer = TfidfVectorizer(
      lowercase=True,  #convert to lowercase
      max_features=1000, # max vocab size
      ngram_range=(1,2), # use unigrams and bigrams
      sublinear_tf=True, # log(tf) instead of just raw tf
      smooth_idf=True, # Laplace smoothing
      norm='l2' # normalize with ridge for cos sim
    )

    self.tfidf_matrix = None
    self.doc_ids = []


  def fit(self):
    """Build TF-IDF matrix from doc collection"""

    texts = []

    for doc in self.collection.documents:
      texts.append(doc.content)
      self.doc_ids.append(doc.doc_id) # storing doc ids


    print("Documents from test")
    print("Row 0: battle good fool wit love forest magic")
    print("Row 1: good fool wit love comedy mistaken identity")
    print("Row 2: battle battle battle good fool war rome politics")
    print("Row 3: battle battle battle battle good wit war king england")


    # fit and transform docs to tf idf matrix
    self.tfidf_matrix = self.vectorizer.fit_transform(texts) # [n_docs, n_features]  n_docs x vocab per doc
    print(f"\nself.tfidf_matrix.shape: {self.tfidf_matrix.shape}\n")
    print(f"\nself.tfidf_matrix:\n {self.tfidf_matrix}\n")


  def search(self,query: Query, top_k: int=3) -> List[Tuple[Document,float]]:
    """Search doc using tf-idf cos similarity"""

    # Transform qry to tf-idf vector
    query_vector = self.vectorizer.transform([query.text]) # [1, n_features]  single vector with all the features/words

    # Compute cos similarity
    similarities = cosine_similarity(query_vector, self.tfidf_matrix).flatten()

    print(f"\nQuery: '{query.text}'")
    print(f"Query vector shape: {query_vector.shape}")
    print(f"Similarities: {similarities}")

    # Top-k doc indices
    top_indices = np.argsort(similarities)[-top_k:][::-1] # sort desc

    results = []

    for idx in top_indices:
      doc = self.collection.documents[idx]
      score = similarities[idx]
      results.append((doc, score))
      print(f"  Rank {len(results)}: '{doc.title}' (score={score:.4f})")

    return results


# Simple test for the TF-IDF Retriever

In [6]:
retriever = TFIDFRetriever(collection)
retriever.fit()


# Get the feature names (what each column represents)
print("\nUnigrams and bigrams:\n")
feature_names = retriever.vectorizer.get_feature_names_out()
for idx, feature in enumerate(feature_names):
    print(f"Column {idx}: '{feature}'")

test_query = Query(query_id="q1", text="battle war")
results = retriever.search(test_query, top_k=3)

Documents from test
Row 0: battle good fool wit love forest magic
Row 1: good fool wit love comedy mistaken identity
Row 2: battle battle battle good fool war rome politics
Row 3: battle battle battle battle good wit war king england

self.tfidf_matrix.shape: (4, 32)


self.tfidf_matrix:
 <Compressed Sparse Row sparse matrix of dtype 'float64'
	with 50 stored elements and shape (4, 32)>
  Coords	Values
  (0, 0)	0.22325169970289915
  (0, 11)	0.18252289313304706
  (0, 6)	0.22325169970289915
  (0, 29)	0.22325169970289915
  (0, 17)	0.2757602638693091
  (0, 9)	0.34976692846571494
  (0, 20)	0.34976692846571494
  (0, 2)	0.22325169970289915
  (0, 12)	0.22325169970289915
  (0, 8)	0.2757602638693091
  (0, 30)	0.2757602638693091
  (0, 19)	0.34976692846571494
  (0, 10)	0.34976692846571494
  (1, 11)	0.17057535199597096
  (1, 6)	0.20863814180702328
  (1, 29)	0.20863814180702328
  (1, 17)	0.25770961257841624
  (1, 12)	0.20863814180702328
  (1, 8)	0.25770961257841624
  (1, 30)	0.25770961257841624
  (1

### Display TF-IDF matrix

In [7]:
# make df
tfidf_df = pd.DataFrame(
  retriever.tfidf_matrix.toarray(),
  columns=feature_names,
  index=[f"Doc{i}" for i in range(len(shakespeare_docs))]
)
tfidf_df

Unnamed: 0,battle,battle battle,battle good,comedy,comedy mistaken,england,fool,fool war,fool wit,forest,...,mistaken identity,politics,rome,rome politics,war,war king,war rome,wit,wit love,wit war
Doc0,0.223252,0.0,0.223252,0.0,0.0,0.0,0.223252,0.0,0.27576,0.349767,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.223252,0.27576,0.0
Doc1,0.0,0.0,0.0,0.326872,0.326872,0.0,0.208638,0.0,0.25771,0.0,...,0.326872,0.0,0.0,0.0,0.0,0.0,0.0,0.208638,0.25771,0.0
Doc2,0.409648,0.408235,0.195199,0.0,0.0,0.0,0.195199,0.305818,0.0,0.0,...,0.0,0.305818,0.305818,0.305818,0.24111,0.0,0.305818,0.0,0.0,0.0
Doc3,0.426292,0.463077,0.178642,0.0,0.0,0.279877,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.220658,0.279877,0.0,0.178642,0.0,0.279877


# BM25 Scorer

In [8]:
class BM25Scorer:
  """BM25 scoring with parameter tuning"""

  def __init__(self, collection: DocumentCollection, k1: float=1.2, b:float=0.75):
    """
    k1: controls term frequency saturation (typically 1.2-2.0). Enough to give score to a repeated word but not making a word repeated 100 times 100 times more important.
    b: controls length normalization (0=no normalization, 1=full normalization). Regularization over too large docs.
    """

    self.collection = collection
    self.k1 = k1 # Term frequency saturation param
    self.b = b   # Length normalization param
    self.doc_lengths = {} # Store doc lengths
    self.avg_doc_length = 0 # Avg doc length
    self.doc_freqs = defaultdict(int) # doc freqs per term
    self.N = len(collection.documents) # total num of docs
    self.term_freqs = {} # Term freqs per doc

    self._compute_statistics()


  def _compute_statistics(self):
    """Copmute doc statistics for BM25"""

    total_length = 0

    for doc in self.collection.documents:
      tokens = doc.get_tokens()
      doc_length = len(tokens)
      self.doc_lengths[doc.doc_id] = doc_length
      total_length += doc_length

      # Count term freq for this doc
      term_freq = defaultdict(int)
      unique_terms = set()
      for token in tokens:
        term_freq[token] += 1 # add 1 on occurence
        unique_terms.add(token)

      self.term_freqs[doc.doc_id] = term_freq

      # Update doc frequencies
      for term in unique_terms:
        self.doc_freqs[term] += 1 # Increment doc freq per term


    # Compute avg length
    self.avg_doc_length = total_length / self.N


    print(f"BM25 Statistics:")
    print(f"  Documents: {self.N}")
    print(f"  Avg doc length: {self.avg_doc_length:.2f}")
    print(f"  Unique terms: {len(self.doc_freqs)}")


  def score(self, query: Query, doc: Document) -> float:
    """
    Compute BM25 score for query-document pair.

    BM25(q,d) = Σ IDF(t) * (tf(t,d) * (k1 + 1)) / (tf(t,d) + k1 * (1 - b + b * |d|/avgdl))

    Where for each term t in query q:
      - IDF(t) = log((N - df(t) + 0.5) / (df(t) + 0.5)) — inverse document frequency
      - tf(t,d) = frequency of term t in document d
      - |d| = length of document d (number of terms)
      - avgdl = average document length in collection
      - k1 = term frequency saturation parameter (default 1.2). Keeps repeated more important but not in a linear scale per occureance.
      - b = length normalization parameter (0=none, 1=full, default 0.75). Penalizes too long docs.

    The formula balances term importance (IDF) with normalized term frequency,
    preventing bias toward longer documents while rewarding term repetition up to a limit.
    """

    score = 0.0
    query_terms = query.get_tokens() # query terms
    doc_length = self.doc_lengths[doc.doc_id]

    for term in query_terms:
      if term not in self.doc_freqs:
        continue # skip terms not in the collection


      # Compute IDF: log((N-df + 0.5)/ (df + 0.5) )

      df = self.doc_freqs[term] # Doc freq

      idf = math.log((self.N - df + 0.5) / (df + 0.5)) # Inv Doc Freq with smoothing
      print("\n\n\nidf = math.log((self.N - df + 0.5) / (df + 0.5))\n")
      print(f"{idf:.3f} = math.log(({self.N} - {df} + 0.5) / ({df} + 0.5))\n")


      # Get term freq in doc
      tf = self.term_freqs[doc.doc_id].get(term, 0)
      print(f"tf = {tf}\n\n")

      #Compute normalized term freq
      # tf_normalized (tf * (k1 + 1)) / (tf + k1 * (1 - b + b * |d| / avgdl ))

      norm_factor = 1 - self.b + self.b * (doc_length / self.avg_doc_length)
      tf_component = (tf * (self.k1 + 1 )) / (tf + self.k1 * norm_factor) # Normalized TF


      # Add to score
      term_score = idf * tf_component # idf * normalized tf
      score += term_score

      if tf > 0:  # Only print for terms that appear in document
        print(f"    Term '{term}': tf={tf}, df={df}, idf={idf:.3f}, contribution={term_score:.3f}")
        print("\n", "-"*50,"\n")

    return score


  def search(self, query: Query, top_k: int = 3) -> List[Tuple[Document, float]]:
    """Search documents using BM25 scoring"""

    scores = []

    print(f"\nBM25 Search for: '{query.text}'")

    for doc in self.collection.documents:
      score = self.score(query, doc) # compute bm25 score
      scores.append((doc, score))
      print(f"  '{doc.title}': {score:.4f}")

    # Sort desc by score
    scores.sort(key=lambda x: x[1], reverse=True)

    # Return top k
    return scores[:top_k]


In [9]:
# Test BM25 scorer
bm25 = BM25Scorer(collection, k1=1.2, b=0.75)
query = Query(query_id="q2", text="battle fool")
bm25_results = bm25.search(query, top_k=3)



print("\n\n")
print("\nThe negative IDF -0.847 here means these terms are so common they're actually penalizing the scores rather than helping.")

print("\nHigh IDF = term is RARE, appears in few documents -> very distinctive/important")
print("\nLow/Negative IDF = term is COMMON, appears in many documents -> not distinctive\n")


print("\nTop BM25 Results:")
for rank, (doc, score) in enumerate(bm25_results, 1):
    print(f"  {rank}. {doc.title}: {score:.4f}")

BM25 Statistics:
  Documents: 4
  Avg doc length: 7.75
  Unique terms: 15

BM25 Search for: 'battle fool'



idf = math.log((self.N - df + 0.5) / (df + 0.5))

-0.847 = math.log((4 - 3 + 0.5) / (3 + 0.5))

tf = 1


    Term 'battle': tf=1, df=3, idf=-0.847, contribution=-0.882

 -------------------------------------------------- 




idf = math.log((self.N - df + 0.5) / (df + 0.5))

-0.847 = math.log((4 - 3 + 0.5) / (3 + 0.5))

tf = 1


    Term 'fool': tf=1, df=3, idf=-0.847, contribution=-0.882

 -------------------------------------------------- 

  'As You Like It': -1.7644



idf = math.log((self.N - df + 0.5) / (df + 0.5))

-0.847 = math.log((4 - 3 + 0.5) / (3 + 0.5))

tf = 0





idf = math.log((self.N - df + 0.5) / (df + 0.5))

-0.847 = math.log((4 - 3 + 0.5) / (3 + 0.5))

tf = 1


    Term 'fool': tf=1, df=3, idf=-0.847, contribution=-0.882

 -------------------------------------------------- 

  'Twelfth Night': -0.8822



idf = math.log((self.N - df + 0.5) / (df + 0.5))

-0.8

# Inverted Index Implementation

In [10]:
class InvertedIndex:
  """Efficient inverted index"""

  def __init__(self):
    self.index = defaultdict(list) # term -> list of (doc_id, positions)
    self.doc_lengths = {} #  doc_id -> document lenght
    self.doc_norms = {} # dog_id -> L2 norm for cos sim

  def add_document(self, doc: Document):
    """Add doc to inverted index"""

    tokens = doc.get_tokens()
    self.doc_lengths[doc.doc_id] = len(tokens)

    # track term freq for L2 norm
    term_freqs = defaultdict(int)

    # build postings listwith positions
    for position, token in enumerate(tokens):
      self.index[token].append((doc.doc_id, position))
      term_freqs[token] += 1

    # compute l2 norm for doc vector
    norm = math.sqrt(sum(tf**2 for tf in term_freqs.values()))
    self.doc_norms[doc.doc_id] = norm

    print(f"\nIndexed '{doc.title}': {len(tokens)} tokens, {len(term_freqs)} unique terms")


  def search_term(self, term: str) -> List[Tuple[str, List[int]]]:
    """Search for docs containing a term"""

    # if not contained term in docs retrieve empty list
    term = term.lower()
    if term not in self.index:
      return []

    # group positions by doc
    doc_positions = defaultdict(list)
    for doc_id, position in self.index[term]:
      doc_positions[doc_id].append(position) # group positions by doc

    return list(doc_positions.items()) # return (doc_id, positions) pairs


  def boolean_and(self,terms: List[str]) -> List[str]:
    """Find docs containing ALL terms (AND operation)"""
    if not terms:
      return []


    # get doc sets for each term
    doc_sets = []
    for term in terms:
      docs = set(doc_id for doc_id, _ in self.index[term.lower()])
      doc_sets.append(docs)


    # intersect all docs sets
    result = doc_sets[0] # start with tfirst set
    for doc_set in doc_sets[1:]:
      result = result.intersection(doc_set) # intersect subsequents

    return list(result)


  def boolean_or(self, terms: List[str]) -> List[str]:
    """Find documents containing ANY term (OR operation)"""

    result = set()
    for term in terms:
      docs = set( doc_id for doc_id, _ in self.index[term.lower()] )
      result = result.union(docs)

    return list(result)


  def print_index(self):
      """Print inverted index structure"""
      print("\nInverted Index:")
      for term in sorted(self.index.keys()):
          postings = self.index[term]
          doc_freq = len(set(doc_id for doc_id, _ in postings))  # Count unique documents
          print(f"\n  '{term}' -> df={doc_freq}, postings={postings}")  # Show first 5 postings



## Test Inverted Index

In [11]:
inv_index = InvertedIndex()
for doc in collection.documents:
  inv_index.add_document(doc)


inv_index.print_index()

# Test boolean search
print("\nBoolean Search Tests:")
print(f"  Documents with 'battle': {inv_index.search_term('battle')}")
print(f"  Documents with 'battle' AND 'fool': {inv_index.boolean_and(['battle', 'fool'])}")
print(f"  Documents with 'love' OR 'war': {inv_index.boolean_or(['love', 'war'])}")


Indexed 'As You Like It': 7 tokens, 7 unique terms

Indexed 'Twelfth Night': 7 tokens, 7 unique terms

Indexed 'Julius Caesar': 8 tokens, 6 unique terms

Indexed 'Henry V': 9 tokens, 6 unique terms

Inverted Index:

  'battle' -> df=3, postings=[('as_you_like_it', 0), ('julius_caesar', 0), ('julius_caesar', 1), ('julius_caesar', 2), ('henry_v', 0), ('henry_v', 1), ('henry_v', 2), ('henry_v', 3)]

  'comedy' -> df=1, postings=[('twelfth_night', 4)]

  'england' -> df=1, postings=[('henry_v', 8)]

  'fool' -> df=3, postings=[('as_you_like_it', 2), ('twelfth_night', 1), ('julius_caesar', 4)]

  'forest' -> df=1, postings=[('as_you_like_it', 5)]

  'good' -> df=4, postings=[('as_you_like_it', 1), ('twelfth_night', 0), ('julius_caesar', 3), ('henry_v', 4)]

  'identity' -> df=1, postings=[('twelfth_night', 6)]

  'king' -> df=1, postings=[('henry_v', 7)]

  'love' -> df=2, postings=[('as_you_like_it', 4), ('twelfth_night', 3)]

  'magic' -> df=1, postings=[('as_you_like_it', 6)]

  'mistak

# Examples of Different types of Embeddings

**Query:**  
"Where is the Louvre?"  

**Document:**  
"The Louvre Museum is located in Paris, France"  

---

## 1. Cross-Encoder (Full BERT)

**Input:**  
`[CLS] Where is the Louvre? [SEP] The Louvre Museum is located in Paris, France [SEP]`

**Process:**  
- Single BERT encoder processes query and document together.  
- Produces a single scalar score.  

**Output:**  
- Shape: `[1]`  
- Storage: Cannot pre-store (must re-encode for each query-document pair).  

---

## 2. Bi-Encoder

**Input:**  
- Query Encoder (BERTQ): `[CLS] Where is the Louvre?`  
- Document Encoder (BERTD): `[CLS] The Louvre Museum is located...`

**Process:**  
- Encode separately into vectors.  

**Output:**  
- Query vector: `[768 dims]`  
- Document vector: `[768 dims]`  
- Example:  
  - Query: `[-0.23, 0.41, ..., 0.87]`  
  - Document: `[0.15, -0.62, ..., 0.34]`  
- Score: `dot_product(query_vec, doc_vec) -> scalar`  
- Shape: `Query=[768], Doc=[768]`  
- Storage: Pre-store all document vectors.  

---

## 3. ColBERT

**Input Tokens:**  
- Query: `[Q] Where is the Louvre [PAD] ...`  
- Document: `[D] The Louvre Museum is located in Paris France`

**Process:**  
- Each token encoded separately with BERT + linear projection.  

**Output:**  
- Query matrix: `[32 × 128]`  
- Document matrix: `[9 × 128]`  
- Example (partial):  
  - Query: `[q_Q], [q_Where], [q_is], [q_the], [q_Louvre], [q_PAD] ...`  
  - Document: `[d_The], [d_Louvre], [d_Museum], [d_is], [d_located], [d_in], [d_Paris], [d_France]`  

**Scoring (MaxSim):**  
- For each query token, take maximum similarity with any document token.  
  - `q_Where -> max(d_located) = 0.82`  
  - `q_is -> max(d_is) = 0.95`  
  - `q_Louvre -> max(d_Louvre) = 0.99`  
- Ignore padding tokens.  
- Final score = sum of max similarities.  

**Storage:**  
- Pre-store all document token matrices.  

---

## Storage Comparison (1M Documents)

- Cross-Encoder: `0 vectors (cannot pre-store)`  
- Bi-Encoder: `1M × 768 floats ≈ 3 GB`  
- ColBERT: `1M × ~60 tokens × 128 floats ≈ 30 GB`  
<br>



# Dense Retrieval with BERT and PEFT

In [12]:
class DenseBERTRetriever:
  """Dense retrieval using BERT embeddings with PEFT for efficiency"""

  def __init__(self, model_name:str = 'bert-base-uncased', use_peft: bool = True):
    self.device = device
    self.tokenizer = AutoTokenizer.from_pretrained(model_name)

    # Load model with optional 8-bit quantization
    if use_peft and torch.cuda.is_available():
      bnb_config = BitsAndBytesConfig(
          load_in_8bit = True,
          bnb_8bit_compute_dtype=torch.float16,
          bnb_8bit_quant_type="nf8", # Quantization type selected: NF8 (Normal Float 8-bit)
          bnb_8bit_use_double_quant=True # # Double quantization for more memory savings
          )

      self.model = AutoModel.from_pretrained(
          model_name,
          quantization_config = bnb_config,
          device_map="auto" # It intelligently distributes the model's parameters across your system's resources, prioritizing faster devices like GPUs.
      )

      # prepare for k-bit training
      self.model = prepare_model_for_kbit_training(self.model)

      # Add LoRA adapters for fine-tuning
      peft_config = LoraConfig(
          r=8,
          lora_alpha=32, # scaling param
          target_modules=["query", "value"],  # Attention Layers
          lora_dropout=0.1,
          bias = "none",
          task_type = TaskType.FEATURE_EXTRACTION, # task type for embeddings
      )

      self.model = get_peft_model(self.model, peft_config)  # Returns a Peft model object from a model and a config

      self.model.print_trainable_parameters() # show parameter efficiency

    else:
      self.model = AutoModel.from_pretrained(model_name).to(self.device)


    self.doc_embeddings = {} # Cache for doc embeddings



  def encode_text(self, text:str) -> torch.Tensor:
    """Encode text to dense vector using BERT [CLS] special token"""

    # Tokenize text
    inputs = self.tokenizer(
        text,
        padding=True, # pad to max length in batch
        truncation=True,  # truncate to max length
        max_length=512, # max length
        return_tensors="pt"
    ).to(self.device)


    # get BERT outputs
    with torch.no_grad(): # we will do inference to only extract the embeddings for the indexing/search
      outputs = self.model(**inputs) # forward pass [1, sequence_length, 768]


    # using [CLS] token embedding as doc representation
    cls_embedding = outputs.last_hidden_state[:,0,:] # [batch_size, seq_length, hidden_size] -> [batch_size, hidden_size] so it is [1, 768]

    print(f"Encoded '{text[:30]}...': shape={cls_embedding.shape}")
    return cls_embedding.cpu()  # Move to CPU for storage


  def index_documents(self, documents: List[Document]):
    """Pre-compute and cache doc embeddings"""

    print(f"\nIndexing {len(documents)} documents with BERT...")

    for doc in documents:
      embedding = self.encode_text(doc.content)
      self.doc_embeddings[doc.doc_id] = embedding # cache embedding


    # stack all the embeddings for efficient similarity computation
    self.doc_matrix = torch.stack(list(self.doc_embeddings.values())).squeeze(1) # squeeze(1) removes dimension at index 1 so if it has size 1: [n_docs, 1, 768] -> [n_docs, 768]
    print(f"Document matrix shape: {self.doc_matrix.shape}")
    print(f"Document matrix:\n {self.doc_matrix[:,:20]}")


  def search(self, query: Query, top_k:int = 3) -> List[Tuple[str,float]]:
    """Search using the dense vector similarity"""

    # Encode query
    query_embedding = self.encode_text(query.text).squeeze(0) # output shape [hidden_size]

    # compute cos sim
    # norm vectors
    query_norm = query_embedding / query_embedding.norm() # L2 normalization
    doc_norms = self.doc_matrix / self.doc_matrix.norm(dim=1, keepdim=True) # normalize each doc

    # compute dot product
    similarities = torch.matmul(doc_norms, query_norm) # [n_docs]

    print(f"\nDense search for: '{query.text}'")
    print(f"Query embedding shape: {query_embedding.shape}")
    print(f"Similarities: {similarities}")

    # get top-k indices
    top_scores, top_indices = torch.topk(similarities, min(top_k, len(similarities)))

    results = []
    doc_ids = list(self.doc_embeddings.keys())
    for idx, score in zip(top_indices, top_scores):
      doc_id = doc_ids[idx]
      results.append((doc_id, score.item()))
      print(f"  {doc_id}: {score.item():.4f}")


## Testing Dense Retrieval


In [13]:
dense_retriever = DenseBERTRetriever(use_peft=torch.cuda.is_available())
dense_retriever.index_documents(collection.documents)

query = Query(query_id="q3", text="war battle")
dense_results = dense_retriever.search(query, top_k=3)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/440M [00:00<?, ?B/s]


Indexing 4 documents with BERT...
Encoded 'battle good fool wit love fore...': shape=torch.Size([1, 768])
Encoded 'good fool wit love comedy mist...': shape=torch.Size([1, 768])
Encoded 'battle battle battle good fool...': shape=torch.Size([1, 768])
Encoded 'battle battle battle battle go...': shape=torch.Size([1, 768])
Document matrix shape: torch.Size([4, 768])
Document matrix:
 tensor([[-0.6397,  0.1917, -0.1856, -0.0429, -0.6057, -0.2334,  0.6254,  0.0160,
         -0.2607, -0.2735,  0.1154,  0.1545,  0.3871,  0.2930,  0.1245,  0.1331,
         -0.1775,  0.2469,  0.3730, -0.2732],
        [-0.5598,  0.0561, -0.1338,  0.0238, -0.0677,  0.0127,  0.7196,  0.4464,
         -0.3710,  0.0573,  0.2302,  0.0439,  0.2988,  0.4506, -0.0326,  0.1620,
         -0.2610,  0.3622,  0.5300, -0.2685],
        [-0.5756, -0.0466, -0.2201,  0.1625, -0.3577, -0.1111,  0.0881,  0.5446,
         -0.3810, -0.3346,  0.0512,  0.1658,  0.0133,  0.5183,  0.1651,  0.1395,
         -0.3084,  0.3998,  0.4595, -

#ColBERT Implementation

In [29]:
class ColBERTRetriever:
  """ColBERT: Efficient retireval via late interation over BERT with multi-vector embeddings of 128 dim per each token"""

  def __init__(self, model_name: str = "bert-base-uncased", dim:int= 128):
    self.device = device
    self.dim = dim
    self.tokenizer = AutoTokenizer.from_pretrained(model_name)
    self.bert = AutoModel.from_pretrained(model_name).to(self.device)

    # Linear layer from BERT outputs to lower dim for the embeddings (128)
    self.linear = nn.Linear(self.bert.config.hidden_size, dim).to(self.device)
    self.doc_embeddings = {} # Cache embeddings

  def encode_document(self, text:str, doc_id: str) -> torch.Tensor:
    """Encode document to token-level embeddings"""

    # Adding [D] special token for docs
    text = "[D] " + text # doc marker

    inputs = self.tokenizer(
      text,
      padding='max_length',
      truncation=True,
      max_length=128,
      return_tensors='pt'
    ).to(self.device)

    with torch.no_grad():
      outputs = self.bert(**inputs) # get bert outputs
      token_embeddings = outputs.last_hidden_state # [1,seq len, hiddn size]

      # Project to selected dimensionality (128)
      token_embeddings = self.linear(token_embeddings) # [1,seq_len, 128]

      # L2 Normalize each token embedding
      token_embeddings = F.normalize(token_embeddings, dim=-1, p=2)

    print(f"Encoded document '{doc_id}': shape={token_embeddings.shape}")

    return token_embeddings.cpu() # moving to cou for storage


  def encode_query(self, text:str) -> torch.Tensor:
    """Encode query to token-level embeddings"""

    # Adding [Q] special token for qrys
    text = "[Q] " + text # qry marker


    inputs = self.tokenizer(
        text,
        padding='max_length',
        truncation=True,
        max_length=32,
        return_tensors='pt'
    ).to(device)

    # get attention mask to identify real vs padding tokens
    attention_mask = inputs['attention_mask'] # [1, seq_len]

    with torch.no_grad():
      outputs = self.bert(**inputs) # get bert outputs
      token_embeddings = outputs.last_hidden_state # [1,seq_len, hiddn_size]

      # project to dim 128
      token_embeddings = self.linear(token_embeddings) # [1, seq_len, 128]

      # L2 norm
      token_embeddings = F.normalize(token_embeddings, p=2, dim=-1)

      # mask padding tokens
      token_embeddings = token_embeddings * attention_mask.unsqueeze(-1) # zero out padding


    return token_embeddings # keeping on device for cos sim


  def index_documents(self,documents:List[Document]):
    """Index all documents"""

    print(f"\nIndexing {len(documents)} documents with ColBERT...")

    for doc in documents:
      embeddings = self.encode_document(doc.content, doc.doc_id)
      self.doc_embeddings[doc.doc_id] = embeddings # cache embeddings

  def compute_maxsim(self, query_embeddings: torch.Tensor, doc_embeddings:torch.Tensor) -> float:
    """
    Compute MaxSim score between query and document
    Score(q,d) = Σ_i max_j (q_i · d_j) for all query tokens i and doc tokens j
    """

    query_embeddings = query_embeddings.to(self.device)
    doc_embeddings = doc_embeddings.to(self.device)


    # Compute all pairwise dot prod
    # [1, q_len, dim] @ [1, d_len, dim]^T = [1, q_len, d_len]
    similarity_matrix = torch.bmm(
        query_embeddings,              # [1, q_len, dim]  = [1, 32, 128]
        doc_embeddings.transpose(1,2)  # [1, dim, d_len]  = [1, 128, 128]

    )                          # Result: [1, q_len, d_len] = [1, 32, 128]

    # find max sim wit hany doc token for each query token
    max_similarities, _ =  similarity_matrix.max(dim=2) # [1, q_len] = [1, 32]

    # sum over all query tokens
    score = max_similarities.sum().item() # scalar

    return score

  def search(self, query: Query, top_k: int=3) -> List[Tuple[str, float]]:
    """Search using ColBERT MaxSim"""

    # encode qry
    query_embeddings = self.encode_query(query.text) # [1, q_len, dim]
    print(f"\nColBERT search for: '{query.text}'")
    print(f"Query embedding shape: {query_embeddings.shape}")


    scores = []

    for doc_id, doc_embeddings in self.doc_embeddings.items():
      score = self.compute_maxsim(query_embeddings, doc_embeddings)
      scores.append((doc_id, score))
      print(f"  {doc_id}: MaxSim={score:.4f}")


    # sort by score desc
    scores.sort(key=lambda x: x[1], reverse=True) # sort by score desc

    return scores[:top_k] # return only top k results



## Test ColBERT


In [31]:
colbert = ColBERTRetriever(dim=128)
colbert.index_documents(collection.documents)

query = Query(query_id="q4", text="fool love")
colbert_results = colbert.search(query, top_k=3)

print("The ranking (julius_caesar > twelfth_night > as_you_like_it) appears essentially random given the score similarities. For this to work properly, it is needed to either use a pre-trained ColBERT model or fine-tune the encoders and linear projection on retrieval data with proper contrastive loss.")



Indexing 4 documents with ColBERT...
Encoded document 'as_you_like_it': shape=torch.Size([1, 128, 128])
Encoded document 'twelfth_night': shape=torch.Size([1, 128, 128])
Encoded document 'julius_caesar': shape=torch.Size([1, 128, 128])
Encoded document 'henry_v': shape=torch.Size([1, 128, 128])

ColBERT search for: 'fool love'
Query embedding shape: torch.Size([1, 32, 128])
  as_you_like_it: MaxSim=5.3212
  twelfth_night: MaxSim=5.4441
  julius_caesar: MaxSim=5.5698
  henry_v: MaxSim=5.4214
The ranking (julius_caesar > twelfth_night > as_you_like_it) appears essentially random given the score similarities. For this to work properly, it is needed to either use a pre-trained ColBERT model or fine-tune the encoders and linear projection on retrieval data with proper contrastive loss.
