# DocInsight

- The first goal of this project is to build a working IR (Information Retrieval) system that can work with some of the most common file formats (csv, txt, docx, pptx, xlsx, pdf).
- The second goal will be to incorporate elements of the IR system to build a RAG (Retrieval-Augmented Generation) system with acceptable results. 


# File Ingestion 

Below are the functions that will be used to ingest (read and store the data from the files).

Each file will return the data in the form of a dict with params:
- source_path
- file_type
- blocks      (structured content units)
- text        (flattened debug view)
- metadata


In [1]:
import os 
from pathlib import Path
core_dir = Path.cwd() / 'core' 
core_dir.mkdir(exist_ok=True)
core_dir.is_dir()

True

Uncomment the line below to create the file

In [None]:
#%%writefile core/ingestion.py

from abc import ABC, abstractmethod
from pathlib import Path
from typing import Dict, List
from pypdf import PdfReader
from docx import Document
from pptx import Presentation
from pptx.enum.shapes import PP_PLACEHOLDER
import pandas as pd

class BaseIngestor(ABC):
    def __init__(self, file_path: Path):
        self.file_path = file_path

    @abstractmethod
    def extract(self) -> Dict:
        """
        Returns a dictionary with:
        - source_path
        - file_type
        - blocks      (structured content units)
        - text        (flattened debug view)
        - metadata
        """
        pass

class PDFIngestor(BaseIngestor):
    def extract(self) -> Dict:
        reader = PdfReader(self.file_path)
        blocks: List[Dict] = []

        for page_idx, page in enumerate(reader.pages):
            text = page.extract_text() or ""
            if text.strip():
                blocks.append({
                    "type": "page",
                    "page_index": page_idx,
                    "text": text
                })

        flattened_text = "\n\n".join(b["text"] for b in blocks)

        return {
            "source_path": str(self.file_path),
            "file_type": "pdf",
            "blocks": blocks,
            "text": flattened_text,
            "metadata": {
                "num_pages": len(reader.pages)
            }
        }


class DOCXIngestor(BaseIngestor):
    def extract(self) -> Dict:
        doc = Document(self.file_path)
        blocks: List[Dict] = []

        for p in doc.paragraphs:
            text = p.text.strip()
            if not text:
                continue

            style_name = p.style.name if p.style else "Normal"

            if style_name.startswith("Heading"):
                try:
                    level = int(style_name.split()[-1])
                except ValueError:
                    level = None

                blocks.append({
                    "type": "heading",
                    "level": level,
                    "text": text
                })
            else:
                blocks.append({
                    "type": "paragraph",
                    "text": text
                })

        flattened_text = "\n".join(b["text"] for b in blocks)

        return {
            "source_path": str(self.file_path),
            "file_type": "docx",
            "blocks": blocks,
            "text": flattened_text,
            "metadata": {
                "num_blocks": len(blocks)
            }
        }


class PPTXIngestor(BaseIngestor):
    def extract(self) -> Dict:
        prs = Presentation(self.file_path)
        blocks: List[Dict] = []

        for slide_idx, slide in enumerate(prs.slides):
            slide_elements = []

            for shape in slide.shapes:
                if not hasattr(shape, "text"):
                    continue

                text = shape.text.strip()
                if not text:
                    continue

                role = "body"

                if shape.is_placeholder:
                    ph_type = shape.placeholder_format.type
                    if ph_type == PP_PLACEHOLDER.TITLE:
                        role = "title"

                slide_elements.append({
                    "role": role,
                    "text": text
                })

            if slide_elements:
                blocks.append({
                    "type": "slide",
                    "slide_index": slide_idx,
                    "elements": slide_elements
                })

        flattened_text = "\n\n".join(
            el["text"]
            for slide in blocks
            for el in slide["elements"]
        )

        return {
            "source_path": str(self.file_path),
            "file_type": "pptx",
            "blocks": blocks,
            "text": flattened_text,
            "metadata": {
                "num_slides": len(prs.slides)
            }
        }

class CSVIngestor(BaseIngestor):
    def extract(self) -> Dict:
        df = pd.read_csv(self.file_path)

        schema = {}
        for col in df.columns:
            series = df[col]

            inferred_type = str(series.dtype)

            # crude but effective datetime detection
            if inferred_type == "object":
                sample = series.dropna().iloc[:5]

                if not sample.empty:
                    try:
                        pd.to_datetime(sample, errors="raise")
                        inferred_type = "datetime"
                    except Exception:
                        pass



            schema[col] = inferred_type

        blocks = [{
            "type": "table",
            "rows": df.to_dict(orient="records")
        }]

        flattened_text = df.to_csv(index=False)

        return {
            "source_path": str(self.file_path),
            "file_type": "csv",
            "blocks": blocks,
            "text": flattened_text,
            "metadata": {
                "num_rows": df.shape[0],
                "num_columns": df.shape[1],
                "schema": schema
            }
        }
    

class TXTIngestor(BaseIngestor):
    def extract(self) -> Dict:
        content = self.file_path.read_text(encoding="utf-8")

        blocks = [{
            "type": "text",
            "text": content
        }]

        return {
            "source_path": str(self.file_path),
            "file_type": "txt",
            "blocks": blocks,
            "text": content,
            "metadata": {
                "num_characters": len(content)
            }
        }


class XLSXIngestor(BaseIngestor):
    def extract(self) -> dict:
        xls = pd.ExcelFile(self.file_path)
        blocks = []

        for sheet_name in xls.sheet_names:
            df = xls.parse(sheet_name)

            blocks.append({
                "type": "sheet",
                "sheet_name": sheet_name,
                "dataframe": df
            })

        combined_text = "\n\n".join(
            f"Sheet: {b['sheet_name']}\n{b['dataframe'].to_csv(index=False)}"
            for b in blocks
        )

        return {
            "source_path": str(self.file_path),
            "file_type": "xlsx",
            "blocks": blocks,
            "text": combined_text,
            "metadata": {
                "num_sheets": len(blocks),
                "sheet_names": xls.sheet_names
            }
        }
    
def get_ingestor(file_path: Path):
    suffix = file_path.suffix.lower()

    if suffix == ".pdf":
        return PDFIngestor(file_path)
    elif suffix == ".docx":
        return DOCXIngestor(file_path)
    elif suffix == ".pptx":
        return PPTXIngestor(file_path)
    elif suffix == ".csv":
        return CSVIngestor(file_path)
    elif suffix == ".txt":
        return TXTIngestor(file_path)
    elif suffix =='.xlsx':
        return XLSXIngestor(file_path)
    else:
        raise ValueError(f"Unsupported file type: {suffix}")
    

def ingest_file(path_str: str):
    path = Path(path_str)

    if not path.exists():
        raise FileNotFoundError(path)

    ingestor = get_ingestor(path)
    document = ingestor.extract()

    return document



Overwriting core/ingestion.py


Let's test the ingestion functions.

In [3]:
sample_files = [
        "data/raw/sample.pdf",
        "data/raw/sample.docx",
        "data/raw/sample.pptx",
        "data/raw/sample.csv",
        
    ]

for file in sample_files:
        try:
            doc = ingest_file(file)
            print("=" * 80)
            print(f"Ingested: {doc['source_path']}")
            print(f"Type: {doc['file_type']}")
            print(f"Metadata: {doc['metadata']}")
            print(f"Preview:\n{doc['text'][:500]}")
        except Exception as e:
            print(f"Failed on {file}: {e}")

Ingested: data\raw\sample.pdf
Type: pdf
Metadata: {'num_pages': 3}
Preview:
Annual Sales Performance Review — 2023 
This document provides a detailed review of sales performance for the fiscal year 2023. 
The objective is to analyze revenue trends, regional performance, and category-level 
breakdowns. 
Key goals of this report include: 
• Identifying revenue growth patterns over time 
• Comparing regional performance 
• Highlighting underperforming product categories 
  

Quarterly Revenue Overview 
 
Total revenue for 2023 showed a steady upward trend, with notable acc
Ingested: data\raw\sample.docx
Type: docx
Metadata: {'num_blocks': 19}
Preview:
Customer Churn Analysis — Internal Memo
Background
Customer churn has become a growing concern over the past two years.
Understanding churn patterns is critical for improving retention strategies.
Data Sources
The analysis is based on the following sources:
Monthly customer subscription records
Support ticket logs
Customer satisfaction surve

  pd.to_datetime(sample, errors="raise")
  pd.to_datetime(sample, errors="raise")


To see them indiviudally

In [4]:
from pathlib import Path

In [5]:
# takes the first pdf file if there are multiple 
pdf_file = [f for f in sample_files if Path(f).suffix.lower() == '.pdf'][0]
ingest_file(pdf_file)

{'source_path': 'data\\raw\\sample.pdf',
 'file_type': 'pdf',
 'blocks': [{'type': 'page',
   'page_index': 0,
   'text': 'Annual Sales Performance Review — 2023 \nThis document provides a detailed review of sales performance for the fiscal year 2023. \nThe objective is to analyze revenue trends, regional performance, and category-level \nbreakdowns. \nKey goals of this report include: \n• Identifying revenue growth patterns over time \n• Comparing regional performance \n• Highlighting underperforming product categories \n  '},
  {'type': 'page',
   'page_index': 1,
   'text': 'Quarterly Revenue Overview \n \nTotal revenue for 2023 showed a steady upward trend, with notable acceleration in Q3. \nQuarterly revenue figures (in million USD): \n• Q1: 12.4 \n• Q2: 14.1 \n• Q3: 18.7 \n• Q4: 19.3 \nThe growth observed in Q3 coincides with the launch of a new marketing campaign. \n  '},
  {'type': 'page',
   'page_index': 2,
   'text': 'Regional Performance Analysis \nSales performance varied 

In [6]:
# takes the first csv file if there are multiple 
csv_file = [f for f in sample_files if Path(f).suffix.lower() == '.csv'][0]
ingest_file(csv_file)

  pd.to_datetime(sample, errors="raise")
  pd.to_datetime(sample, errors="raise")


{'source_path': 'data\\raw\\sample.csv',
 'file_type': 'csv',
 'blocks': [{'type': 'table',
   'rows': [{'date': '2023-01-01',
     'region': 'North America',
     'product_category': 'Electronics',
     'revenue': 1200000,
     'units_sold': 3400},
    {'date': '2023-02-01',
     'region': 'North America',
     'product_category': 'Electronics',
     'revenue': 1250000,
     'units_sold': 3600},
    {'date': '2023-03-01',
     'region': 'Europe',
     'product_category': 'Home Appliances',
     'revenue': 980000,
     'units_sold': 2100},
    {'date': '2023-04-01',
     'region': 'Europe',
     'product_category': 'Home Appliances',
     'revenue': 1020000,
     'units_sold': 2300},
    {'date': '2023-05-01',
     'region': 'Asia-Pacific',
     'product_category': 'Accessories',
     'revenue': 640000,
     'units_sold': 4100},
    {'date': '2023-06-01',
     'region': 'Asia-Pacific',
     'product_category': 'Accessories',
     'revenue': 690000,
     'units_sold': 4500},
    {'date'

In [7]:
# takes the first pdf file if there are multiple 
docx_file = [f for f in sample_files if Path(f).suffix.lower() == '.docx'][0]
ingest_file(docx_file)

{'source_path': 'data\\raw\\sample.docx',
 'file_type': 'docx',
 'blocks': [{'type': 'heading',
   'level': 1,
   'text': 'Customer Churn Analysis — Internal Memo'},
  {'type': 'heading', 'level': 2, 'text': 'Background'},
  {'type': 'paragraph',
   'text': 'Customer churn has become a growing concern over the past two years.\nUnderstanding churn patterns is critical for improving retention strategies.'},
  {'type': 'heading', 'level': 2, 'text': 'Data Sources'},
  {'type': 'paragraph',
   'text': 'The analysis is based on the following sources:'},
  {'type': 'paragraph', 'text': 'Monthly customer subscription records'},
  {'type': 'paragraph', 'text': 'Support ticket logs'},
  {'type': 'paragraph', 'text': 'Customer satisfaction survey results'},
  {'type': 'paragraph',
   'text': 'Each data source contributes a different perspective on customer behavior.'},
  {'type': 'heading', 'level': 2, 'text': 'Key Findings'},
  {'type': 'paragraph',
   'text': 'The analysis identified several n

## Chunking

uncomment below to create chunking.py in core_dir

In [6]:
%%writefile core/chunking.py



import hashlib


def make_chunk_id(source_path: str, local_id: str) -> str:
    raw = f"{source_path}::{local_id}"
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:16]


def chunk_pdf(doc: dict) -> list:
    chunks = []

    for block in doc["blocks"]:
        page_idx = block["page_index"]
        text = block["text"].strip()

        if not text:
            continue

        chunk_id = make_chunk_id(
            doc["source_path"],
            f"page_{page_idx}"
        )

        chunks.append({
            "id": chunk_id,
            "source": doc["source_path"],
            "file_type": "pdf",
            "text": text,
            "metadata": {
                "page_index": page_idx,
                "block_type": block["type"]
            }
        })

    return chunks

def chunk_docx(doc: dict) -> list:
    chunks = []
    current_heading = None
    buffer = []

    for block in doc["blocks"]:
        if block["type"] == "heading":
            if buffer:
                chunk_id = make_chunk_id(
                    doc["source_path"],
                    current_heading or "intro"
                )

                chunks.append({
                    "id": chunk_id,
                    "source": doc["source_path"],
                    "file_type": "docx",
                    "text": "\n".join(buffer),
                    "metadata": {
                        "section": current_heading
                    }
                })

                buffer = []

            current_heading = block["text"]

        elif block["type"] == "paragraph":
            buffer.append(block["text"])

    # flush last section
    if buffer:
        chunk_id = make_chunk_id(
            doc["source_path"],
            current_heading or "final"
        )

        chunks.append({
            "id": chunk_id,
            "source": doc["source_path"],
            "file_type": "docx",
            "text": "\n".join(buffer),
            "metadata": {
                "section": current_heading
            }
        })

    return chunks

def chunk_pptx(doc: dict) -> list:
    chunks = []

    for slide in doc["blocks"]:
        slide_idx = slide["slide_index"]

        texts = []
        title = None

        for el in slide["elements"]:
            if el["role"] == "title":
                title = el["text"]
            texts.append(el["text"])

        content = "\n".join(texts).strip()
        if not content:
            continue

        chunk_id = make_chunk_id(
            doc["source_path"],
            f"slide_{slide_idx}"
        )

        chunks.append({
            "id": chunk_id,
            "source": doc["source_path"],
            "file_type": "pptx",
            "text": content,
            "metadata": {
                "slide_index": slide_idx,
                "title": title
            }
        })

    return chunks

def chunk_csv(doc: dict) -> list:
    chunk_id = make_chunk_id(
        doc["source_path"],
        "table"
    )

    return [{
        "id": chunk_id,
        "source": doc["source_path"],
        "file_type": "csv",
        "text": doc["text"],
        "metadata": {
            "schema": doc["metadata"]["schema"],
            "num_rows": doc["metadata"]["num_rows"]
        }
    }]


def chunk_txt(doc: dict, max_chars: int = 800, overlap: int = 100) -> list:
    text = doc["text"]
    chunks = []

    start = 0
    idx = 0

    while start < len(text):
        end = start + max_chars
        chunk_text = text[start:end].strip()

        if not chunk_text:
            break

        chunk_id = make_chunk_id(
            doc["source_path"],
            f"chunk_{idx}"
        )

        chunks.append({
            "id": chunk_id,
            "source": doc["source_path"],
            "file_type": "txt",
            "text": chunk_text,
            "metadata": {
                "char_start": start,
                "char_end": min(end, len(text))
            }
        })

        start = end - overlap
        idx += 1

    return chunks


def chunk_xlsx(doc: dict) -> list:
    chunks = []

    for block in doc["blocks"]:
        sheet_name = block["sheet_name"]
        df = block["dataframe"]

        chunk_id = make_chunk_id(
            doc["source_path"],
            f"sheet_{sheet_name}"
        )

        chunks.append({
            "id": chunk_id,
            "source": doc["source_path"],
            "file_type": "xlsx",
            "text": df.to_csv(index=False),
            "metadata": {
                "sheet_name": sheet_name,
                "num_rows": df.shape[0],
                "num_columns": df.shape[1],
                "columns": list(df.columns)
            }
        })

    return chunks


def chunk_document(doc: dict) -> list:
    file_type = doc["file_type"]

    if file_type == "pdf":
        return chunk_pdf(doc)
    elif file_type == "docx":
        return chunk_docx(doc)
    elif file_type == "pptx":
        return chunk_pptx(doc)
    elif file_type == "csv":
        return chunk_csv(doc)
    elif file_type =='txt':
        return chunk_txt(doc)
    elif file_type == 'xlsx':
        return chunk_xlsx(doc)
    else:
        raise ValueError(f"No chunker for {file_type}")



Writing core/chunking.py


Demo of ingestion + chunking

In [9]:
doc = ingest_file("data/raw/sample.docx")
chunks = chunk_document(doc)

for c in chunks:
    print("=" * 60)
    print("Metadata:", c["metadata"])
    print("Text preview:")
    print(c["text"][:100])


Metadata: {'section': 'Background'}
Text preview:
Customer churn has become a growing concern over the past two years.
Understanding churn patterns is
Metadata: {'section': 'Data Sources'}
Text preview:
The analysis is based on the following sources:
Monthly customer subscription records
Support ticket
Metadata: {'section': 'Key Findings'}
Text preview:
The analysis identified several notable patterns:
Customers with unresolved support tickets churn at
Metadata: {'section': 'Recommendations'}
Text preview:
Based on the findings, the following actions are recommended:
Prioritize resolution of long-standing


In [10]:
doc = ingest_file("data/raw/sample.pptx")
chunks = chunk_document(doc)

for c in chunks:
    print("=" * 60)
    print(c["metadata"])
    print("Text preview:")
    print(c["text"][:100])


{'slide_index': 0, 'title': None}
Text preview:
2023 Product Performance Summary
Overview of key performance indicators
Focus on revenue and custome
{'slide_index': 1, 'title': 'Top Performing Products'}
Text preview:
Top Performing Products
Product A showed consistent growth
Product B peaked in mid-year
Product C un
{'slide_index': 2, 'title': 'Revenue by Product Category'}
Text preview:
Revenue by Product Category
{'slide_index': 3, 'title': 'Quarterly Trends'}
Text preview:
Quarterly Trends
Q1 and Q2 growth was moderate
Q3 saw a sharp increase
Q4 stabilized at a higher bas
{'slide_index': 4, 'title': 'Key Takeaways'}
Text preview:
Key Takeaways
Electronics remain the primary revenue driver
Accessories require strategic reevaluati


In [11]:
doc = ingest_file("data/raw/sample.pdf")
chunks = chunk_document(doc)

for c in chunks:
    print("=" * 60)
    print(c["metadata"])
    print(c["text"][:100])


{'page_index': 0, 'block_type': 'page'}
Annual Sales Performance Review — 2023 
This document provides a detailed review of sales performanc
{'page_index': 1, 'block_type': 'page'}
Quarterly Revenue Overview 
 
Total revenue for 2023 showed a steady upward trend, with notable acce
{'page_index': 2, 'block_type': 'page'}
Regional Performance Analysis 
Sales performance varied significantly by region. 
• North America re


## Simple Information Retrieval using lexical socre

In [12]:
from typing import List
import re

def search_chunks(query: str, chunks: List[dict], top_k: int = 5) -> List[dict]:
    """
    Simple lexical search: count query occurrence in each chunk
    Returns top_k chunks sorted by count descending
    """
    query_lower = query.lower()
    scored = []

    for chunk in chunks:
        text_lower = chunk["text"].lower()
        score = text_lower.count(query_lower)

        if score > 0:
            scored.append((score, chunk))

    # Sort descending by score
    scored.sort(key=lambda x: x[0], reverse=True)

    return [c for _, c in scored[:top_k]]

from pathlib import Path

def build_corpus(file_paths: list[str]) -> list[dict]:
    corpus = []

    for path in file_paths:
        doc = ingest_file(path)
        chunks = chunk_document(doc)
        corpus.extend(chunks)

    return corpus

def lexical_score(text: str, query: str) -> int:
    tokens = re.findall(r"\w+", query.lower())
    text_lower = text.lower()

    return sum(text_lower.count(tok) for tok in tokens)

def search_corpus(query: str, corpus: list[dict], top_k: int = 5) -> list[dict]:
    scored = []

    for chunk in corpus:
        score = lexical_score(chunk["text"], query)
        if score > 0:
            scored.append((score, chunk))

    scored.sort(key=lambda x: x[0], reverse=True)
    return [c for _, c in scored[:top_k]]

def display_results(results: list[dict]):
    for rank, chunk in enumerate(results, start=1):
        print("=" * 80)
        print(f"Rank: {rank}")
        print("Source:", chunk["source"])
        print("File type:", chunk["file_type"])
        print("Metadata:", chunk["metadata"])
        print("Text preview:")
        print(chunk["text"][:400])




In [13]:
doc = ingest_file("data/raw/sample.pdf")
chunks = chunk_document(doc)

query = "revenue"
results = search_chunks(query, chunks, top_k=3)

for r in results:
    print("="*60)
    print("Chunk ID:", r["id"])
    print("Metadata:", r["metadata"])
    print("Text snippet:", r["text"][:300])


Chunk ID: a45d9ceede09fd42
Metadata: {'page_index': 1, 'block_type': 'page'}
Text snippet: Quarterly Revenue Overview 
 
Total revenue for 2023 showed a steady upward trend, with notable acceleration in Q3. 
Quarterly revenue figures (in million USD): 
• Q1: 12.4 
• Q2: 14.1 
• Q3: 18.7 
• Q4: 19.3 
The growth observed in Q3 coincides with the launch of a new marketing campaign.
Chunk ID: 6dbe80d88422232e
Metadata: {'page_index': 0, 'block_type': 'page'}
Text snippet: Annual Sales Performance Review — 2023 
This document provides a detailed review of sales performance for the fiscal year 2023. 
The objective is to analyze revenue trends, regional performance, and category-level 
breakdowns. 
Key goals of this report include: 
• Identifying revenue growth patterns
Chunk ID: a95f2e24ce11c218
Metadata: {'page_index': 2, 'block_type': 'page'}
Text snippet: Regional Performance Analysis 
Sales performance varied significantly by region. 
• North America remained the strongest region, contri

In [14]:
files = [
        "data/raw/sample.pdf",
        "data/raw/sample.docx",
        "data/raw/sample.pptx",
        "data/raw/sample.csv",
        
    ]

corpus = build_corpus(files)
print(f"Total chunks indexed: {len(corpus)}")

query = "revenue growth by region"
results = search_corpus(query, corpus, top_k=5)

display_results(results)

Total chunks indexed: 13
Rank: 1
Source: data\raw\sample.pdf
File type: pdf
Metadata: {'page_index': 2, 'block_type': 'page'}
Text preview:
Regional Performance Analysis 
Sales performance varied significantly by region. 
• North America remained the strongest region, contributing over 45% of total 
revenue. 
• Europe experienced moderate growth, particularly in Germany and France. 
• Asia-Pacific showed the highest growth rate but from a smaller base. 
Future analysis should explore correlations between regional marketing spend and r
Rank: 2
Source: data\raw\sample.pdf
File type: pdf
Metadata: {'page_index': 0, 'block_type': 'page'}
Text preview:
Annual Sales Performance Review — 2023 
This document provides a detailed review of sales performance for the fiscal year 2023. 
The objective is to analyze revenue trends, regional performance, and category-level 
breakdowns. 
Key goals of this report include: 
• Identifying revenue growth patterns over time 
• Comparing regional performance

  pd.to_datetime(sample, errors="raise")
  pd.to_datetime(sample, errors="raise")


## Semantic Retrieval

In [15]:
from sentence_transformers import SentenceTransformer
import numpy as np
from numpy.linalg import norm

model = SentenceTransformer("all-MiniLM-L6-v2")

def embed_chunks(chunks: list[dict]) -> np.ndarray:
    texts = [c["text"] for c in chunks]
    embeddings = model.encode(texts, show_progress_bar=True)
    return embeddings



def cosine_similarity(a, b):
    return (a @ b) / (norm(a) * norm(b))

def semantic_search(query: str, chunks: list[dict], embeddings: np.ndarray, top_k: int = 5):
    query_emb = model.encode(query)

    scores = []
    for emb, chunk in zip(embeddings, chunks):
        score = cosine_similarity(query_emb, emb)
        scores.append((score, chunk))

    scores.sort(key=lambda x: x[0], reverse=True)
    return [c for _, c in scores[:top_k]]



In [16]:
corpus = build_corpus(files)
embeddings = embed_chunks(corpus)

query = "Which products underperformed in 2023?"
results = semantic_search(query, corpus, embeddings, top_k=5)

display_results(results)

  pd.to_datetime(sample, errors="raise")
  pd.to_datetime(sample, errors="raise")


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Rank: 1
Source: data\raw\sample.pptx
File type: pptx
Metadata: {'slide_index': 1, 'title': 'Top Performing Products'}
Text preview:
Top Performing Products
Product A showed consistent growth
Product B peaked in mid-year
Product C underperformed expectations
Rank: 2
Source: data\raw\sample.pptx
File type: pptx
Metadata: {'slide_index': 0, 'title': None}
Text preview:
2023 Product Performance Summary
Overview of key performance indicators
Focus on revenue and customer adoption
Comparison across product categories
Rank: 3
Source: data\raw\sample.pdf
File type: pdf
Metadata: {'page_index': 0, 'block_type': 'page'}
Text preview:
Annual Sales Performance Review — 2023 
This document provides a detailed review of sales performance for the fiscal year 2023. 
The objective is to analyze revenue trends, regional performance, and category-level 
breakdowns. 
Key goals of this report include: 
• Identifying revenue growth patterns over time 
• Comparing regional performance 
• Highlighting underpe

## Hybrid Retrieval

Hybrid scoring function
Design choice (important)

- Default weights:

    - α = 0.6 semantic

    - β = 0.4 lexical

Semantic slightly dominates, but lexical can *override* nonsense

In [17]:
def min_max_normalize(scores: list[float]) -> list[float]:
    if not scores:
        return scores

    min_s, max_s = min(scores), max(scores)
    if min_s == max_s:
        return [1.0] * len(scores)

    return [(s - min_s) / (max_s - min_s) for s in scores]


def hybrid_search(
    query: str,
    chunks: list[dict],
    embeddings,
    alpha: float = 0.6,
    beta: float = 0.4,
    top_k: int = 5
):
    # --- Lexical scores ---
    lexical_scores = [
        lexical_score(chunk["text"], query)
        for chunk in chunks
    ]
    lexical_norm = min_max_normalize(lexical_scores)

    # --- Semantic scores ---
    query_emb = model.encode(query)
    semantic_scores = [
        cosine_similarity(query_emb, emb)
        for emb in embeddings
    ]
    semantic_norm = min_max_normalize(semantic_scores)

    # --- Combine ---
    combined = []
    for i, chunk in enumerate(chunks):
        score = alpha * semantic_norm[i] + beta * lexical_norm[i]
        combined.append((score, chunk))

    combined.sort(key=lambda x: x[0], reverse=True)
    return [c for _, c in combined[:top_k]]


In [18]:
files = [
        "data/raw/sample.pdf",
        "data/raw/sample.docx",
        "data/raw/sample.pptx",
        "data/raw/sample.csv",
        
    ]

corpus = build_corpus(files)
embeddings = embed_chunks(corpus)

query = "Which regions showed declining revenue trends?"
results = hybrid_search(query, corpus, embeddings, top_k=5)

display_results(results)

  pd.to_datetime(sample, errors="raise")
  pd.to_datetime(sample, errors="raise")


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Rank: 1
Source: data\raw\sample.pdf
File type: pdf
Metadata: {'page_index': 2, 'block_type': 'page'}
Text preview:
Regional Performance Analysis 
Sales performance varied significantly by region. 
• North America remained the strongest region, contributing over 45% of total 
revenue. 
• Europe experienced moderate growth, particularly in Germany and France. 
• Asia-Pacific showed the highest growth rate but from a smaller base. 
Future analysis should explore correlations between regional marketing spend and r
Rank: 2
Source: data\raw\sample.pdf
File type: pdf
Metadata: {'page_index': 0, 'block_type': 'page'}
Text preview:
Annual Sales Performance Review — 2023 
This document provides a detailed review of sales performance for the fiscal year 2023. 
The objective is to analyze revenue trends, regional performance, and category-level 
breakdowns. 
Key goals of this report include: 
• Identifying revenue growth patterns over time 
• Comparing regional performance 
• Highlighting underper

uncomment below to create retrieval.py in core_dir

In [8]:
%%writefile  core/retrieval.py

from typing import List, Dict
import numpy as np
from numpy.linalg import norm
from sentence_transformers import SentenceTransformer


# -----------------------------
# Models
# -----------------------------

_model = SentenceTransformer("all-MiniLM-L6-v2")


# -----------------------------
# Utilities
# -----------------------------

def cosine_similarity(a, b):
    return (a @ b) / (norm(a) * norm(b))


def lexical_score(text: str, query: str) -> float:
    """
    Simple term overlap score.
    Intentionally naive but deterministic.
    """
    text_tokens = set(text.lower().split())
    query_tokens = set(query.lower().split())
    return len(text_tokens & query_tokens)


def min_max_normalize(scores: List[float]) -> List[float]:
    if not scores:
        return scores

    min_s, max_s = min(scores), max(scores)
    if min_s == max_s:
        return [1.0] * len(scores)

    return [(s - min_s) / (max_s - min_s) for s in scores]


# -----------------------------
# Embeddings
# -----------------------------

def embed_chunks(chunks: List[Dict]) -> np.ndarray:
    texts = [c["text"] for c in chunks]
    return _model.encode(texts, show_progress_bar=True)


# -----------------------------
# Simple Retrieval
# -----------------------------



def search_corpus(query: str, corpus: list[dict], top_k: int = 5) -> list[dict]:
    scored = []

    for chunk in corpus:
        score = lexical_score(chunk["text"], query)
        if score > 0:
            scored.append((score, chunk))

    scored.sort(key=lambda x: x[0], reverse=True)
    return [c for _, c in scored[:top_k]]

# -----------------------------
# Semantic Retrieval
# -----------------------------


def semantic_search(query: str, chunks: list[dict], embeddings: np.ndarray, top_k: int = 5):
    query_emb = model.encode(query)

    scores = []
    for emb, chunk in zip(embeddings, chunks):
        score = cosine_similarity(query_emb, emb)
        scores.append((score, chunk))

    scores.sort(key=lambda x: x[0], reverse=True)
    return [c for _, c in scores[:top_k]]


# -----------------------------
# Hybrid Retrieval
# -----------------------------

def hybrid_search(
    query: str,
    chunks: List[Dict],
    embeddings: np.ndarray,
    alpha: float = 0.6,
    beta: float = 0.4,
    top_k: int = 5
):
    # Lexical
    lexical_scores = [lexical_score(c["text"], query) for c in chunks]
    lexical_norm = min_max_normalize(lexical_scores)

    # Semantic
    query_emb = _model.encode(query)
    semantic_scores = [
        cosine_similarity(query_emb, emb)
        for emb in embeddings
    ]
    semantic_norm = min_max_normalize(semantic_scores)

    # Combine
    combined = []
    for i, chunk in enumerate(chunks):
        score = alpha * semantic_norm[i] + beta * lexical_norm[i]
        combined.append((score, chunk))

    combined.sort(key=lambda x: x[0], reverse=True)
    return [c for _, c in combined[:top_k]]


Writing core/retrieval.py


# Putting it altogether, a simple IR system

### Building a corpus

In [19]:
file_paths = [
    "data/raw/sample.pdf",
    "data/raw/sample.docx",
    "data/raw/sample.pptx",
    "data/raw/sample.csv",
    
]

corpus = build_corpus(file_paths)

print(f"Total chunks in corpus: {len(corpus)}")

corpus[0]


Total chunks in corpus: 13


  pd.to_datetime(sample, errors="raise")
  pd.to_datetime(sample, errors="raise")


{'id': '6dbe80d88422232e',
 'source': 'data\\raw\\sample.pdf',
 'file_type': 'pdf',
 'text': 'Annual Sales Performance Review — 2023 \nThis document provides a detailed review of sales performance for the fiscal year 2023. \nThe objective is to analyze revenue trends, regional performance, and category-level \nbreakdowns. \nKey goals of this report include: \n• Identifying revenue growth patterns over time \n• Comparing regional performance \n• Highlighting underperforming product categories',
 'metadata': {'page_index': 0, 'block_type': 'page'}}

### Lexical Search

In [20]:
query = "revenue growth"

lexical_results = search_corpus(
    query=query,
    corpus=corpus,
    top_k=5
)

print("LEXICAL RESULTS")
display_results(lexical_results)


LEXICAL RESULTS
Rank: 1
Source: data\raw\sample.pdf
File type: pdf
Metadata: {'page_index': 2, 'block_type': 'page'}
Text preview:
Regional Performance Analysis 
Sales performance varied significantly by region. 
• North America remained the strongest region, contributing over 45% of total 
revenue. 
• Europe experienced moderate growth, particularly in Germany and France. 
• Asia-Pacific showed the highest growth rate but from a smaller base. 
Future analysis should explore correlations between regional marketing spend and r
Rank: 2
Source: data\raw\sample.pdf
File type: pdf
Metadata: {'page_index': 1, 'block_type': 'page'}
Text preview:
Quarterly Revenue Overview 
 
Total revenue for 2023 showed a steady upward trend, with notable acceleration in Q3. 
Quarterly revenue figures (in million USD): 
• Q1: 12.4 
• Q2: 14.1 
• Q3: 18.7 
• Q4: 19.3 
The growth observed in Q3 coincides with the launch of a new marketing campaign.
Rank: 3
Source: data\raw\sample.pdf
File type: pdf
Metadata: {

### Semantic Search

Embed corpus

In [21]:
embeddings = embed_chunks(corpus)
print(f"Embedding shape: {embeddings.shape}")


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Embedding shape: (13, 384)


In [22]:
semantic_results = semantic_search(
    query=query,
    chunks=corpus,
    embeddings=embeddings,
    top_k=5
)

print("SEMANTIC RESULTS")
display_results(semantic_results)


SEMANTIC RESULTS
Rank: 1
Source: data\raw\sample.pdf
File type: pdf
Metadata: {'page_index': 1, 'block_type': 'page'}
Text preview:
Quarterly Revenue Overview 
 
Total revenue for 2023 showed a steady upward trend, with notable acceleration in Q3. 
Quarterly revenue figures (in million USD): 
• Q1: 12.4 
• Q2: 14.1 
• Q3: 18.7 
• Q4: 19.3 
The growth observed in Q3 coincides with the launch of a new marketing campaign.
Rank: 2
Source: data\raw\sample.pptx
File type: pptx
Metadata: {'slide_index': 2, 'title': 'Revenue by Product Category'}
Text preview:
Revenue by Product Category
Rank: 3
Source: data\raw\sample.pdf
File type: pdf
Metadata: {'page_index': 0, 'block_type': 'page'}
Text preview:
Annual Sales Performance Review — 2023 
This document provides a detailed review of sales performance for the fiscal year 2023. 
The objective is to analyze revenue trends, regional performance, and category-level 
breakdowns. 
Key goals of this report include: 
• Identifying revenue growth patter

### Hybrid (Lexical + Semantic) Search

In [23]:
hybrid_results = hybrid_search(
    query=query,
    chunks=corpus,
    embeddings=embeddings,
    alpha=0.6,   # semantic weight
    beta=0.4,    # lexical weight
    top_k=5
)

print("HYBRID RESULTS")
display_results(hybrid_results)


HYBRID RESULTS
Rank: 1
Source: data\raw\sample.pdf
File type: pdf
Metadata: {'page_index': 1, 'block_type': 'page'}
Text preview:
Quarterly Revenue Overview 
 
Total revenue for 2023 showed a steady upward trend, with notable acceleration in Q3. 
Quarterly revenue figures (in million USD): 
• Q1: 12.4 
• Q2: 14.1 
• Q3: 18.7 
• Q4: 19.3 
The growth observed in Q3 coincides with the launch of a new marketing campaign.
Rank: 2
Source: data\raw\sample.pdf
File type: pdf
Metadata: {'page_index': 2, 'block_type': 'page'}
Text preview:
Regional Performance Analysis 
Sales performance varied significantly by region. 
• North America remained the strongest region, contributing over 45% of total 
revenue. 
• Europe experienced moderate growth, particularly in Germany and France. 
• Asia-Pacific showed the highest growth rate but from a smaller base. 
Future analysis should explore correlations between regional marketing spend and r
Rank: 3
Source: data\raw\sample.pdf
File type: pdf
Metadata: {'

**What the above demonstrates is:**

* ✅ Multi-format document ingestion (PDF, DOCX, PPTX, CSV, TXT)
* ✅ Structure-aware chunking aligned with document semantics
* ✅ Stable chunk identity across ingestion and retrieval stages
* ✅ Corpus-level retrieval over heterogeneous sources
* ✅ Lexical, semantic, and hybrid ranking strategies operating on the same corpus

Taken together, these components constitute a **real, albeit rudimentary, Information Retrieval (IR) system**.
While it does not yet include persistence, re-ranking, or answer synthesis, it faithfully implements the core retrieval pipeline that underpins modern RAG and search systems.



## Making a simple RAG system

### Testing the models 

Before adding the models to the pipeline lets first see whether the hardware can run the model standalone. If successfull and the pipeline still fails it will be easier to debug as this testing should confrim that the model works as intented and problem should be somewhere else in the pipeline.

In [24]:
from llama_cpp import Llama

llm = Llama(
    model_path="models/Phi-3-mini-4k-instruct-q4.gguf",
    n_gpu_layers=20,
    n_ctx=2048,
    verbose=False,  # optional: hides the huge logs
)

response = llm.create_chat_completion(
    messages=[
        {"role": "system", "content": "You are a concise technical assistant."},
        {"role": "user", "content": "Explain cosine similarity in one sentence."}
    ],
    max_tokens=64,
    temperature=0.2,
)

print(response["choices"][0]["message"]["content"])


llama_context: n_ctx_per_seq (2048) < n_ctx_train (4096) -- the full capacity of the model will not be utilized


 Cosine similarity is a measure of the cosine of the angle between two non-zero vectors in a multi-dimensional space, indicating the degree of similarity between the vectors.


In [25]:
from llama_cpp import Llama

mistral = Llama(
    model_path="models/mistral-7b-instruct-v0.2.Q4_K_M.gguf",
    n_gpu_layers=10,   # start at 10~14 go up/down as required; reduce if OOM
    n_ctx=2048,
    verbose = False
)

response = mistral.create_chat_completion(
    messages=[
        {"role": "system", "content": "You are a concise technical assistant."},
        {"role": "user", "content": "Explain cosine similarity in one sentence."}
    ],
    max_tokens=64,
    temperature=0.2,
)

print(response["choices"][0]["message"]["content"])


llama_context: n_ctx_per_seq (2048) < n_ctx_train (32768) -- the full capacity of the model will not be utilized


 Cosine similarity is a measure of the degree to which two non-zero vectors point in the same direction in a multi-dimensional space, with the result being a value between -1 and 1 that represents the strength and direction of the similarity.


### Combining both models in a function

In [26]:
from typing import Literal
from llama_cpp import Llama

MODEL_CONFIGS = {
    "phi3": {
        "path": "models/Phi-3-mini-4k-instruct-q4.gguf",
        "n_ctx": 4096, # 2048
        "default_gpu_layers": 12
    },
    "mistral7b": {
        "path": "models/mistral-7b-instruct-v0.2.Q4_K_M.gguf",
        "n_ctx": 6000, # 2048
        "default_gpu_layers": 14
    }
}

def run_llm_inference(
    prompt: str,
    model_name: Literal["phi3", "mistral7b"] = "phi3",
    max_tokens: int = 256,
    temperature: float = 0.2,
    system_prompt: str = None,
) -> str:
    """
    Run inference using a selected lightweight LLM.

    Parameters
    ----------
    prompt : str
        The user question or instruction to answer.
    model_name : {'phi3', 'mistral7b'}
        - 'phi3': Phi-3-mini-4k-instruct (lighter, faster)
        - 'mistral7b': Mistral-7B-Instruct-v0.2-Q4_K_M (larger, stronger)
    max_tokens : int
        Upper bound on generation length. Keep moderate to avoid OOM.
    temperature : float
        Sampling temperature; lower is more deterministic.
    system_prompt : str | None
        Instructions that govern assistant behavior.

    Returns
    -------
    str
        The assistant’s generated text.
    """

    if model_name not in MODEL_CONFIGS:
        raise ValueError(
            f"Unsupported model '{model_name}'. "
            f"Choose 'phi3' or 'mistral7b'."
        )

    config = MODEL_CONFIGS[model_name]

    # Default system prompt if not provided
    if system_prompt is None:
        system_prompt = (
            "You are a helpful assistant. "
            "Use context responsibly and answer the query clearly."
        )

    llm = Llama(
        model_path=config["path"],
        n_gpu_layers=config["default_gpu_layers"],
        n_ctx=config["n_ctx"],
        verbose=False
    )

    # Use chat API for both instruct models
    response = llm.create_chat_completion(
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": prompt}
        ],
        max_tokens=max_tokens,
        temperature=temperature,
    
    )

    return response["choices"][0]["message"]["content"]


In [27]:
# Testing both models

print("=== Phi-3-mini ===")
print(run_llm_inference("What is cosine similarity?", model_name="phi3"))

print("=== Mistral-7B-Instruct ===")
print(run_llm_inference("What is cosine similarity?", model_name="mistral7b"))


=== Phi-3-mini ===
 Cosine similarity is a measure used to calculate the similarity between two non-zero vectors of an inner product space. It is most commonly applied in the context of text analysis, where the vectors represent the frequency of words in documents. The cosine similarity is calculated as the cosine of the angle between the two vectors, which is a measure of how closely related the two vectors are in terms of their orientation.

The formula for cosine similarity between two vectors A and B is:

cosine_similarity(A, B) = (A • B) / (||A|| * ||B||)

where:
- A • B is the dot product of vectors A and B,
- ||A|| is the magnitude (or Euclidean norm) of vector A,
- ||B|| is the magnitude of vector B.

The result of cosine similarity is a value between -1 and 1. A cosine similarity of 1 indicates that the vectors are identical, a value of 0 indicates orthogonality (no similarity), and a value of -1 indicates that the vectors are diametrically opposed.

Here's a simple Python fun

llama_context: n_ctx_per_seq (6000) < n_ctx_train (32768) -- the full capacity of the model will not be utilized


 Cosine similarity is a measure of the similarity between two non-zero vectors of an inner product space, such as Euclidean space. It is used to determine the degree of alignment between the vectors, and is often used in text analysis, information retrieval, and machine learning to compare the similarity of documents or data points.

The cosine similarity is calculated by taking the dot product of the two vectors and dividing it by the product of the magnitudes (lengths) of the vectors. The result is a value between -1 and 1, where 1 indicates identical vectors, 0 indicates orthogonal (perpendicular) vectors, and -1 indicates opposite vectors. A value close to 1 indicates high similarity, while a value close to -1 indicates low similarity.

The formula for cosine similarity is:

cosine_similarity = (A · B) / (||A|| * ||B||)

where A and B are the input vectors, "·" represents the dot product, and "||" represents the magnitude (or Euclidean norm) of the vector.


### Context Assembly

In [28]:
def assemble_context(results, max_tokens: int = 3000) -> str:
    """
    Assemble retrieved chunks into a single context string for LLM inference.

    This function concatenates the top-ranked retrieved chunks while enforcing
    an approximate token budget. The default limit (3000 tokens) is chosen to
    accommodate small-to-mid sized instruction-tuned LLMs running on
    consumer-grade GPUs (e.g., RTX 3060 with ~6GB VRAM).

    Why this constraint exists:
    - Larger context windows increase VRAM usage and inference latency.
    - Quantized 7B-class models typically operate safely within a 2k–4k token window.
    - Exceeding this budget may cause OOM errors or severe slowdowns.

    Advanced users with more VRAM (or CPU-based inference) may safely increase
    this value, provided their selected model supports larger context windows.

    Parameters
    ----------
    results : list of dict
        Ranked retrieval results. Each entry must contain:
        - "text": chunk text
        - "source": source document identifier
        - "id": stable chunk identifier

    max_tokens : int, optional (default=3000)
        Approximate token budget for the assembled context.

    Returns
    -------
    str
        A single formatted context string suitable for RAG-style prompting.
    """

    context_blocks = []
    token_count = 0

    for chunk in results:
        text = chunk["text"]
        source = chunk.get("source", "unknown")
        chunk_id = chunk.get("id", "unknown")

        # Approximate token count (1 token ≈ 0.75 words is model-dependent)
        approx_tokens = len(text.split())

        if token_count + approx_tokens > max_tokens:
            break

        block = (
            f"[Source: {source} | Chunk ID: {chunk_id}]\n"
            f"{text}\n"
        )

        context_blocks.append(block)
        token_count += approx_tokens

    return "\n".join(context_blocks)


## LLM Answer Generation

In [29]:
def generate_llm_answer(
    context: str,
    query: str,
    model: str,
    system_prompt: str = (
        "You are a technical assistant.\n"
        "Answer the question using ONLY the provided context.\n"
        "If the answer cannot be found in the context, say so explicitly."
    ),
) -> str:
    """
    Generate an answer using a lightweight, VRAM-constrained LLM for RAG.

    This function performs grounded generation by combining:
    - a system prompt (behavioral constraint),
    - retrieved context (knowledge grounding),
    - and a user query.

    The model must be explicitly selected from the supported options to ensure
    predictable VRAM usage and inference behavior.

    Supported models:
    - "mistral7b": higher reasoning quality, ~4–5GB VRAM (4-bit)
    - "phi3": lower VRAM, faster inference, weaker reasoning
    - defaults to "phi3"

    Parameters
    ----------
    context : str
        Assembled retrieval context produced by `assemble_context`.

    query : str
        User query to be answered.

    model : str
        Model identifier. Must be one of:
        - "mistral7b"
        - "phi3"

    system_prompt : str, optional
        Instruction to guide model behavior. Defaults to a strict
        context-grounded RAG prompt.

    Returns
    -------
    str
        Generated answer text.
    """

    if model not in MODEL_CONFIGS:
        raise ValueError(
            f"Unsupported model '{model}'. "
            f"Choose from: {list(MODEL_CONFIGS.keys())}"
        )


    if not context.strip():
        return (
        "No relevant information was found in the retrieved documents "
        "to answer this question."
    )


    prompt = f"""
{system_prompt}

Context:
---------
{context}

Question:
---------
{query}

Answer:
""".strip()

    # Placeholder for actual inference call
    # Example options:
    # - llama_cpp.Llama(...)
    # - subprocess call to llama.cpp binary
    # - local wrapper around gguf inference

    if len(prompt) > 12000:
        raise ValueError("Prompt too long for configured context window.")

    answer = run_llm_inference(
        prompt=prompt,
        model_name=model,
    )

    return answer


In [30]:
# 1 Build Corpus
file_paths = [
    "data/raw/sample.pdf",
    "data/raw/sample.docx",
    "data/raw/sample.pptx",
    "data/raw/sample.csv",
    
]

corpus = build_corpus(file_paths)

print(f"Total chunks in corpus: {len(corpus)}")

print(corpus[0])

# 2 Embed Chunks
embeddings = embed_chunks(corpus)


Total chunks in corpus: 13
{'id': '6dbe80d88422232e', 'source': 'data\\raw\\sample.pdf', 'file_type': 'pdf', 'text': 'Annual Sales Performance Review — 2023 \nThis document provides a detailed review of sales performance for the fiscal year 2023. \nThe objective is to analyze revenue trends, regional performance, and category-level \nbreakdowns. \nKey goals of this report include: \n• Identifying revenue growth patterns over time \n• Comparing regional performance \n• Highlighting underperforming product categories', 'metadata': {'page_index': 0, 'block_type': 'page'}}


  pd.to_datetime(sample, errors="raise")
  pd.to_datetime(sample, errors="raise")


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

In [31]:
query = "What were the revenue and units sold on 01-01-23"

results = hybrid_search(
    query=query,
    chunks=corpus,
    embeddings=embeddings,
    alpha=0.6,   # semantic weight
    beta=0.4,    # lexical weight
    top_k=5
)


context = assemble_context(results)

answer = generate_llm_answer(
    context=context,
    query=query,
    model="mistral7b",  # "mistral7b" | "phi3"
)
print("\n=== FINAL ANSWER ===")
print(answer)


llama_context: n_ctx_per_seq (6000) < n_ctx_train (32768) -- the full capacity of the model will not be utilized



=== FINAL ANSWER ===
 According to the context provided, the revenue on 01-01-23 for the North America region in the Electronics product category was $1,200,000 and the units sold were 3,400.


In [9]:
%%writefile generation.py

from typing import Literal
from llama_cpp import Llama

MODEL_CONFIGS = {
    "phi3": {
        "path": "models/Phi-3-mini-4k-instruct-q4.gguf",
        "n_ctx": 4096, # 2048
        "default_gpu_layers": 12
    },
    "mistral7b": {
        "path": "models/mistral-7b-instruct-v0.2.Q4_K_M.gguf",
        "n_ctx": 6000, # 2048
        "default_gpu_layers": 14
    }
}

def run_llm_inference(
    prompt: str,
    model_name: Literal["phi3", "mistral7b"] = "phi3",
    max_tokens: int = 256,
    temperature: float = 0.2,
    system_prompt: str = None,
) -> str:
    """
    Run inference using a selected lightweight LLM.

    Parameters
    ----------
    prompt : str
        The user question or instruction to answer.
    model_name : {'phi3', 'mistral7b'}
        - 'phi3': Phi-3-mini-4k-instruct (lighter, faster)
        - 'mistral7b': Mistral-7B-Instruct-v0.2-Q4_K_M (larger, stronger)
    max_tokens : int
        Upper bound on generation length. Keep moderate to avoid OOM.
    temperature : float
        Sampling temperature; lower is more deterministic.
    system_prompt : str | None
        Instructions that govern assistant behavior.

    Returns
    -------
    str
        The assistant’s generated text.
    """

    if model_name not in MODEL_CONFIGS:
        raise ValueError(
            f"Unsupported model '{model_name}'. "
            f"Choose 'phi3' or 'mistral7b'."
        )

    config = MODEL_CONFIGS[model_name]

    # Default system prompt if not provided
    if system_prompt is None:
        system_prompt = (
            "You are a helpful assistant. "
            "Use context responsibly and answer the query clearly."
        )

    llm = Llama(
        model_path=config["path"],
        n_gpu_layers=config["default_gpu_layers"],
        n_ctx=config["n_ctx"],
        verbose=False
    )

    # Use chat API for both instruct models
    response = llm.create_chat_completion(
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": prompt}
        ],
        max_tokens=max_tokens,
        temperature=temperature,
    
    )

    return response["choices"][0]["message"]["content"]


def assemble_context(results, max_tokens: int = 3000) -> str:
    """
    Assemble retrieved chunks into a single context string for LLM inference.

    This function concatenates the top-ranked retrieved chunks while enforcing
    an approximate token budget. The default limit (3000 tokens) is chosen to
    accommodate small-to-mid sized instruction-tuned LLMs running on
    consumer-grade GPUs (e.g., RTX 3060 with ~6GB VRAM).

    Why this constraint exists:
    - Larger context windows increase VRAM usage and inference latency.
    - Quantized 7B-class models typically operate safely within a 2k–4k token window.
    - Exceeding this budget may cause OOM errors or severe slowdowns.

    Advanced users with more VRAM (or CPU-based inference) may safely increase
    this value, provided their selected model supports larger context windows.

    Parameters
    ----------
    results : list of dict
        Ranked retrieval results. Each entry must contain:
        - "text": chunk text
        - "source": source document identifier
        - "id": stable chunk identifier

    max_tokens : int, optional (default=3000)
        Approximate token budget for the assembled context.

    Returns
    -------
    str
        A single formatted context string suitable for RAG-style prompting.
    """

    context_blocks = []
    token_count = 0

    for chunk in results:
        text = chunk["text"]
        source = chunk.get("source", "unknown")
        chunk_id = chunk.get("id", "unknown")

        # Approximate token count (1 token ≈ 0.75 words is model-dependent)
        approx_tokens = len(text.split())

        if token_count + approx_tokens > max_tokens:
            break

        block = (
            f"[Source: {source} | Chunk ID: {chunk_id}]\n"
            f"{text}\n"
        )

        context_blocks.append(block)
        token_count += approx_tokens

    return "\n".join(context_blocks)

def generate_llm_answer(
    context: str,
    query: str,
    model: str,
    system_prompt: str = (
        "You are a technical assistant.\n"
        "Answer the question using ONLY the provided context.\n"
        "If the answer cannot be found in the context, say so explicitly."
    ),
) -> str:
    """
    Generate an answer using a lightweight, VRAM-constrained LLM for RAG.

    This function performs grounded generation by combining:
    - a system prompt (behavioral constraint),
    - retrieved context (knowledge grounding),
    - and a user query.

    The model must be explicitly selected from the supported options to ensure
    predictable VRAM usage and inference behavior.

    Supported models:
    - "mistral7b": higher reasoning quality, ~4–5GB VRAM (4-bit)
    - "phi3": lower VRAM, faster inference, weaker reasoning
    - defaults to "phi3"

    Parameters
    ----------
    context : str
        Assembled retrieval context produced by `assemble_context`.

    query : str
        User query to be answered.

    model : str
        Model identifier. Must be one of:
        - "mistral7b"
        - "phi3"

    system_prompt : str, optional
        Instruction to guide model behavior. Defaults to a strict
        context-grounded RAG prompt.

    Returns
    -------
    str
        Generated answer text.
    """

    if model not in MODEL_CONFIGS:
        raise ValueError(
            f"Unsupported model '{model}'. "
            f"Choose from: {list(MODEL_CONFIGS.keys())}"
        )


    if not context.strip():
        return (
        "No relevant information was found in the retrieved documents "
        "to answer this question."
    )


    prompt = f"""
{system_prompt}

Context:
---------
{context}

Question:
---------
{query}

Answer:
""".strip()

    
    # Inference backend is currently llama.cpp via llama_cpp.
    # This function can be extended to support alternative backends
    # (e.g., vLLM, Triton, remote APIs) without changing call sites.


    if len(prompt) > 12000:
        raise ValueError("Prompt too long for configured context window.")

    answer = run_llm_inference(
        prompt=prompt,
        model_name=model,
    )

    return answer


Writing generation.py


The above is simply taking the functions created for LLM answer generation and saving it into a .py file to modularize the workflow. 

In [2]:
%%writefile state.py

import pickle
from pathlib import Path
from typing import List, Dict, Optional

import numpy as np
from sentence_transformers import SentenceTransformer
from llama_cpp import Llama

from core.ingestion import ingest_file
from core.chunking import chunk_document
from core.retrieval import embed_chunks
from generation import MODEL_CONFIGS


DATA_DIR = Path("data_state")
DATA_DIR.mkdir(exist_ok=True)

CORPUS_PATH = DATA_DIR / "corpus.pkl"
EMBEDDINGS_PATH = DATA_DIR / "embeddings.npy"


_corpus: Optional[List[Dict]] = None
_embeddings: Optional[np.ndarray] = None
_embedding_model: Optional[SentenceTransformer] = None
_llm_cache: Dict[str, Llama] = {}
_last_answer: Optional[str] = None


def load_state():
    global _corpus, _embeddings

    if CORPUS_PATH.exists():
        with open(CORPUS_PATH, "rb") as f:
            _corpus = pickle.load(f)

    if EMBEDDINGS_PATH.exists():
        _embeddings = np.load(EMBEDDINGS_PATH)

    return _corpus, _embeddings


def save_state(corpus, embeddings):
    with open(CORPUS_PATH, "wb") as f:
        pickle.dump(corpus, f)
    np.save(EMBEDDINGS_PATH, embeddings)


def get_embedding_model():
    global _embedding_model
    if _embedding_model is None:
        _embedding_model = SentenceTransformer("all-MiniLM-L6-v2")
    return _embedding_model


def build_or_update_corpus(paths: List[Path], rebuild=False):
    global _corpus, _embeddings

    if rebuild or _corpus is None:
        _corpus = []
        _embeddings = None

    existing_sources = {c["source"] for c in _corpus} if _corpus else set()
    new_chunks = []

    for path in paths:
        if str(path) in existing_sources:
            continue
        doc = ingest_file(str(path))
        new_chunks.extend(chunk_document(doc))

    if not new_chunks:
        return _corpus

    model = get_embedding_model()
    new_embeddings = model.encode([c["text"] for c in new_chunks])

    _embeddings = (
        new_embeddings if _embeddings is None
        else np.vstack([_embeddings, new_embeddings])
    )

    _corpus.extend(new_chunks)
    save_state(_corpus, _embeddings)
    return _corpus


def get_corpus():
    global _corpus
    if _corpus is None:
        load_state()
    return _corpus


def get_embeddings():
    global _embeddings
    if _embeddings is None:
        load_state()
    return _embeddings


def get_llm(model_name: str) -> Llama:
    if model_name not in MODEL_CONFIGS:
        raise ValueError(f"Unsupported model: {model_name}")

    if model_name not in _llm_cache:
        cfg = MODEL_CONFIGS[model_name]
        _llm_cache[model_name] = Llama(
            model_path=cfg["path"],
            n_ctx=cfg["n_ctx"],
            n_gpu_layers=cfg["default_gpu_layers"],
            verbose=False
        )

    return _llm_cache[model_name]


def get_last_answer():
    return _last_answer


def set_last_answer(answer: str):
    global _last_answer
    _last_answer = answer


Overwriting state.py


The state file has some new functions namely to keep track of a few things to improve the efficiency of the workflow. First the corpus nad embeddings are created from the directory (once) and updated only if new files are detected. Another important addition is the tracking of the current answer generated by the LLM and storing in a state variable called "_last_answer" 

In [3]:
%%writefile app.py

from pathlib import Path
from typing import List

from state import (
    build_or_update_corpus,
    get_corpus,
    get_embeddings,
    get_last_answer,
    set_last_answer,
    get_llm
)

from core.retrieval import hybrid_search
from generation import assemble_context, generate_llm_answer


def collect_files(paths: List[str]) -> List[Path]:
    files = []
    for p in paths:
        path = Path(p)
        if path.is_dir():
            files.extend(f for f in path.rglob("*") if f.is_file())
        else:
            files.append(path)
    return files


def estimate_tokens(llm, text: str) -> int:
    return len(llm.tokenize(text.encode("utf-8")))


def answer_query(query: str, model="mistral7b", top_k=3):
    corpus = get_corpus()
    embeddings = get_embeddings()

    results = hybrid_search(
        query=query,
        chunks=corpus,
        embeddings=embeddings,
        top_k=top_k
    )

    context = assemble_context(results)

    prev = get_last_answer()
    if prev:
        context = f"[Previous Answer]\n{prev}\n\n{context}"

    answer = generate_llm_answer(
        context=context,
        query=query,
        model=model
    )

    set_last_answer(answer)

    llm = get_llm(model)
    tokens = {
        "query": estimate_tokens(llm, query),
        "context": estimate_tokens(llm, context),
        "answer": estimate_tokens(llm, answer),
    }
    tokens["total"] = sum(tokens.values())

    return answer, tokens


Writing app.py


A simple app.py for using the model using CLI

In [4]:
%%writefile streamlit_app.py

import streamlit as st
from app import answer_query, collect_files
from state import build_or_update_corpus, get_corpus


st.set_page_config(page_title="Local Hybrid RAG", layout="wide")
st.title("Local Hybrid RAG (Lexical + Semantic)")


st.sidebar.header("Corpus")
data_dir = st.sidebar.text_input("Data directory", "data/raw")
rebuild = st.sidebar.checkbox("Rebuild corpus")

if st.sidebar.button("Ingest"):
    files = collect_files([data_dir])
    build_or_update_corpus(files, rebuild=rebuild)
    st.sidebar.success("Corpus ready")

corpus = get_corpus()
if corpus:
    st.sidebar.write(f"Chunks: {len(corpus)}")


query = st.text_input("Query")

col1, col2 = st.columns(2)
with col1:
    model = st.selectbox("Model", ["mistral7b"])
with col2:
    top_k = st.slider("Top-K", 1, 10, 3)

if st.button("Run") and query:
    with st.spinner("Running hybrid retrieval..."):
        answer, tokens = answer_query(query, model=model, top_k=top_k)

    st.subheader("Answer")
    st.write(answer)

    st.subheader("Token Usage")
    st.json(tokens)


Writing streamlit_app.py


A simple streamlit UI that combines everything done till now.