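# Data Ingestion Pipeline

Ingests uploaded TXT/DOCX/PDF files, cleans and chunks their text, embeds the chunks with `all-MiniLM-L6-v2`, and builds a FAISS index for similarity search.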

In [40]:
!uv pip install python-docx faiss-cpu rank-bm25

[2K[37m‚†ô[0m [2mfaiss-cpu==1.8.0.post1                                                        [0m

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


[2K[2mResolved [1m7 packages[0m [2min 223ms[0m[0m                                         [0m
[2K[2mPrepared [1m1 package[0m [2min 43ms[0m[0m                                               
[2K[2mInstalled [1m1 package[0m [2min 0.92ms[0m[0m                               [0m
 [32m+[39m [1mrank-bm25[0m[2m==0.2.2[0m


In [13]:
# ============================================================
# üì¶ Data Ingestion Pipeline ‚Äî Starter Notebook
# Author: Manodeep Ray
# Date: 2025-10-14
# ============================================================

import os
import shutil
import json
import hashlib
from datetime import datetime
import pandas as pd
from pathlib import Path
from docx import Document
from PyPDF2 import PdfReader

# ------------------------------------------------------------
# 1️⃣ Define folder structure
# ------------------------------------------------------------
BASE_DIR = Path.cwd()
_DATABASE_ROOT = BASE_DIR / "database"

UPLOADED_DIR = BASE_DIR / "uploaded"
DATA_WAREHOUSE_DIR = _DATABASE_ROOT / "data_warehouse"
LOG_DIR = _DATABASE_ROOT / "logs"

# Make sure every pipeline folder exists; folders that already exist are left untouched.
for folder in (UPLOADED_DIR, DATA_WAREHOUSE_DIR, LOG_DIR):
    folder.mkdir(parents=True, exist_ok=True)

# ------------------------------------------------------------
# 2️⃣ Define file-tracking files
# ------------------------------------------------------------
STATUS_FILE = LOG_DIR / "file_status.json"
PROCESSED_CSV = LOG_DIR / "processed_files.csv"
PROCESS_LOG = LOG_DIR / "processing_log.txt"

# First run only: seed an empty JSON status map and a header-only CSV, so existing
# tracking history is never overwritten.
if not STATUS_FILE.exists():
    STATUS_FILE.write_text(json.dumps({}, indent=4))

if not PROCESSED_CSV.exists():
    empty_ledger = pd.DataFrame(columns=["file_name", "status", "timestamp", "hash"])
    empty_ledger.to_csv(PROCESSED_CSV, index=False)

# ------------------------------------------------------------
# 3️⃣ Helper Functions
# ------------------------------------------------------------
def compute_file_hash(file_path):
    """Return the hex-encoded SHA-256 digest of the file at ``file_path``.

    The file is streamed in 4096-byte blocks, so large uploads never have to fit
    in memory at once.
    """
    digest = hashlib.sha256()
    with open(file_path, "rb") as stream:
        while True:
            block = stream.read(4096)
            if not block:
                break
            digest.update(block)
    return digest.hexdigest()

def log_message(message):
    """Append ``message`` to PROCESS_LOG with a UTC timestamp and echo it to stdout.

    Each call writes one line of the form ``[<ISO-8601 UTC timestamp>] <message>``.
    The log file is opened explicitly as UTF-8. Messages carry emoji and arbitrary
    file names, which a platform default encoding (e.g. cp1252) cannot always encode.
    """
    timestamp = datetime.utcnow().isoformat()
    with open(PROCESS_LOG, "a", encoding="utf-8") as log:
        log.write(f"[{timestamp}] {message}\n")
    print(message)

def load_status():
    """Return the ingestion status map stored in STATUS_FILE.

    Keys are file names; each value holds ``status``, ``timestamp`` and ``hash``.
    """
    with open(STATUS_FILE) as handle:
        status = json.loads(handle.read())
    return status

def save_status(status_data):
    """Overwrite STATUS_FILE with ``status_data`` as 4-space-indented JSON.

    The JSON is serialized *before* the file is opened for writing. If
    ``status_data`` is not JSON-serializable, the ``TypeError`` propagates and
    the existing status file is left intact instead of being truncated.
    """
    serialized = json.dumps(status_data, indent=4)
    with open(STATUS_FILE, "w") as f:
        f.write(serialized)

# ------------------------------------------------------------
# 4️⃣ File Reading (Simple Preview Functionality)
# ------------------------------------------------------------
def read_file(file_path):
    """Extract plain text from a .txt, .docx or .pdf file.

    Args:
        file_path: ``pathlib.Path`` of the file. Its suffix (case-insensitive)
            selects the reader.

    Returns:
        The extracted text (possibly empty), or ``None`` when the extension is
        unsupported or reading fails. Failures are logged via ``log_message``.
    """
    ext = file_path.suffix.lower()
    try:
        if ext == ".txt":
            with open(file_path, "r", encoding="utf-8") as f:
                return f.read()
        if ext == ".docx":
            doc = Document(file_path)
            return "\n".join(p.text for p in doc.paragraphs)
        if ext == ".pdf":
            reader = PdfReader(file_path)
            # extract_text() is expensive: call it once per page, then drop empty/None pages.
            page_texts = (page.extract_text() for page in reader.pages)
            return "\n".join(text for text in page_texts if text)
        return None
    except Exception as e:
        log_message(f"‚ùå Error reading file {file_path.name}: {e}")
        return None

# ------------------------------------------------------------
# 5️⃣ Main Ingestion Function
# ------------------------------------------------------------
def ingest_files():
    """Move new or changed uploads from UPLOADED_DIR into DATA_WAREHOUSE_DIR.

    For each regular file in UPLOADED_DIR:
      * skip it when the status map already records the same name with the same
        SHA-256 hash;
      * otherwise read it with ``read_file`` as a parseability check; unreadable
        files are marked "failed";
      * move readable files into the data warehouse and mark them "pending"
        (a failed move is marked "failed").
    Afterwards the status map is saved to STATUS_FILE and mirrored to PROCESSED_CSV.
    """
    status_data = load_status()

    for file_path in UPLOADED_DIR.glob("*.*"):
        # glob("*.*") also matches directories whose names contain a dot; hashing
        # one would raise and abort the whole run.
        if not file_path.is_file():
            continue

        file_name = file_path.name
        file_hash = compute_file_hash(file_path)
        timestamp = datetime.utcnow().isoformat()

        # Skip if already processed and hash unchanged
        previous = status_data.get(file_name)
        if previous is not None and previous.get("hash") == file_hash:
            log_message(f"‚ö†Ô∏è File {file_name} already processed ‚Äî skipping.")
            continue

        # The extracted text is not kept here; reading only proves the file is parseable.
        if read_file(file_path) is None:
            status_data[file_name] = {"status": "failed", "timestamp": timestamp, "hash": file_hash}
            log_message(f"‚ùå Failed to process {file_name}")
            continue

        try:
            dest_path = DATA_WAREHOUSE_DIR / file_name
            shutil.move(str(file_path), str(dest_path))
            status_data[file_name] = {"status": "pending", "timestamp": timestamp, "hash": file_hash}
            log_message(f"‚úÖ Processed and moved {file_name} to data warehouse.")
        except Exception as e:
            status_data[file_name] = {"status": "failed", "timestamp": timestamp, "hash": file_hash}
            log_message(f"‚ùå Error moving file {file_name}: {e}")

    save_status(status_data)

    # Mirror the status map to CSV. Columns are passed explicitly so an empty map
    # still produces a header row (update_chunk_status_and_files reads this file
    # with csv.DictReader and looks up row["file_name"]).
    columns = ["file_name", "status", "timestamp", "hash"]
    rows = [
        {"file_name": name, "status": info["status"], "timestamp": info["timestamp"], "hash": info["hash"]}
        for name, info in status_data.items()
    ]
    pd.DataFrame(rows, columns=columns).to_csv(PROCESSED_CSV, index=False)



[2K[37m‚†ô[0m [2mpython-docx==1.1.2                                                            [0m

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


[2K[2mResolved [1m6 packages[0m [2min 127ms[0m[0m                                         [0m
[2K[2mInstalled [1m1 package[0m [2min 9ms[0m[0mst1                               [0m
 [32m+[39m [1mfaiss-cpu[0m[2m==1.8.0.post1[0m


In [14]:
# ------------------------------------------------------------
# 6️⃣ Run Ingestion
# ------------------------------------------------------------
# Move new/changed uploads into the data warehouse and refresh file_status.json and processed_files.csv.
ingest_files()


‚úÖ Processed and moved NoteGPT_Justice_ What'sTheRightThingToDo_Episode01_THE MORAL SIDE OF MURDER_.txt to data warehouse.
‚úÖ Processed and moved NoteGPT_Last Lecture Series_ How to Live your Life at Full Power ‚Äî Graham Weaver.txt to data warehouse.
‚úÖ Processed and moved ml_intro.txt to data warehouse.
‚úÖ Processed and moved NoteGPT_Justice_ What'sTheRightThingToDo_Episode01_THE MORAL SIDE OF MURDER_ (1).txt to data warehouse.
‚úÖ Processed and moved NoteGPT_Optimal Protocols for Studying & Learning.txt to data warehouse.
‚úÖ Processed and moved Hausman_PhilosophyOfEconomicsAnthology.pdf to data warehouse.
‚úÖ Processed and moved NoteGPT_The Trillion Dollar Equation.txt to data warehouse.
‚úÖ Processed and moved NoteGPT_Take a Seat in the Harvard MBA Case Classroom.txt to data warehouse.
‚úÖ Processed and moved 6372215474167643589th hsitory FILE.pdf to data warehouse.


In [81]:
# ============================================================
# üß© Cleaning + Chunking + Metadata Tracking (RAG-Ready)
# Author: Manodeep Ray
# ============================================================

import json
import re
import hashlib
from datetime import datetime
from pathlib import Path
from PyPDF2 import PdfReader
from docx import Document

# ------------------------------------------------------------
# üß± Directory Setup
# ------------------------------------------------------------
BASE_DIR = Path.cwd()
_db_root = BASE_DIR / "database"

DATA_WAREHOUSE_DIR = _db_root / "data_warehouse"
PROCESSED_DIR = _db_root / "processed"
CLEANED_DIR = PROCESSED_DIR / "cleaned"
CHUNK_DIR = PROCESSED_DIR / "chunks"
LOG_DIR = _db_root / "logs"

for output_dir in (CLEANED_DIR, CHUNK_DIR, LOG_DIR):
    output_dir.mkdir(parents=True, exist_ok=True)

# chunk_status.json maps file name -> chunking metadata; seed it as an empty map on first run.
CHUNK_STATUS_FILE = LOG_DIR / "chunk_status.json"
if not CHUNK_STATUS_FILE.exists():
    CHUNK_STATUS_FILE.write_text(json.dumps({}, indent=4))

# ------------------------------------------------------------
# üßπ Cleaning Function
# ------------------------------------------------------------
def clean_text(text):
    """Clean and standardize extracted text before chunking.

    Steps:
      1. Unicode NFKD normalization, then drop combining marks. Ligatures and
         accented letters keep their ASCII base: the U+FB01 "fi" ligature becomes
         "fi", and an accented "e" becomes "e".
      2. Replace every remaining non-ASCII character with a space, so separators
         such as non-breaking spaces or em dashes do not glue neighbouring words
         together.
      3. Replace URLs with "[URL]" and replace characters outside a small
         punctuation whitelist with spaces. Then collapse whitespace and lowercase,
         which turns the URL marker into "[url]".

    Returns:
        The cleaned string, or "" when ``text`` is not a ``str``.
    """
    import unicodedata  # local import: only this cleaner needs it

    if not isinstance(text, str):
        return ""

    text = unicodedata.normalize("NFKD", text)
    text = "".join(ch for ch in text if not unicodedata.combining(ch))
    text = re.sub(r"[^\x00-\x7F]+", " ", text)                       # Non-ASCII -> space
    text = re.sub(r"http\S+|www\S+", "[URL]", text)                    # Replace URLs (http also covers https)
    text = re.sub(r"[^a-zA-Z0-9.,;:?!()\[\]'\s-]", " ", text)         # Remove unwanted chars
    text = re.sub(r"\s+", " ", text).strip()                           # Normalize whitespace
    return text.lower()                                                # Normalize case

# ------------------------------------------------------------
# üìÑ File Reading with Page Extraction (for PDFs)
# ------------------------------------------------------------
def read_file_content_with_pages(file_path):
    """Extract text as a list of ``(page_number, text)`` tuples.

    PDFs yield one tuple per page that has extractable text, with 1-based page
    numbers. .docx and .txt files yield a single ``(None, text)`` tuple.
    Unsupported extensions and read errors yield an empty list; errors are printed.
    """
    ext = file_path.suffix.lower()
    try:
        if ext == ".pdf":
            reader = PdfReader(file_path)
            pages = []
            for i, page in enumerate(reader.pages):
                # extract_text() is expensive: call it once per page and keep only non-empty text.
                page_text = page.extract_text()
                if page_text:
                    pages.append((i + 1, page_text))
            return pages
        if ext == ".docx":
            doc = Document(file_path)
            return [(None, "\n".join(p.text for p in doc.paragraphs))]
        if ext == ".txt":
            with open(file_path, "r", encoding="utf-8") as f:
                return [(None, f.read())]
        return []
    except Exception as e:
        print(f"‚ö†Ô∏è Error reading {file_path.name}: {e}")
        return []

# ------------------------------------------------------------
# üß© Chunking Function
# ------------------------------------------------------------
def chunk_text(text, chunk_size=800, overlap=100):
    """Split ``text`` into overlapping chunks of whitespace-separated words.

    Consecutive chunks share ``overlap`` words, so each window starts
    ``chunk_size - overlap`` words after the previous one. Splitting stops as soon
    as a window reaches the end of the text, so no trailing chunk is emitted that
    is already fully contained in the previous one.

    Args:
        text: input string (split with ``str.split()``).
        chunk_size: maximum words per chunk; must be positive.
        overlap: words shared by consecutive chunks; must be smaller than ``chunk_size``.

    Returns:
        List of chunk strings (empty for empty/whitespace-only text).

    Raises:
        ValueError: if ``chunk_size <= 0`` or ``overlap >= chunk_size``. Either
            would otherwise make the window never advance.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")
    if overlap >= chunk_size:
        raise ValueError(f"overlap ({overlap}) must be smaller than chunk_size ({chunk_size})")

    words = text.split()
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + chunk_size]))
        if start + chunk_size >= len(words):
            break  # this window already reached the last word
    return chunks

# ------------------------------------------------------------
# üîë Hash & JSON Utilities
# ------------------------------------------------------------
def compute_file_hash(file_path):
    """SHA-256 hex digest of ``file_path``, read incrementally in 4096-byte pieces."""
    hasher = hashlib.sha256()
    with open(file_path, "rb") as fh:
        while piece := fh.read(4096):
            hasher.update(piece)
    return hasher.hexdigest()

def load_chunk_status():
    """Return the chunking status map (file name -> chunk metadata) from CHUNK_STATUS_FILE."""
    return json.loads(CHUNK_STATUS_FILE.read_text())

def save_chunk_status(data):
    """Overwrite CHUNK_STATUS_FILE with ``data`` as 4-space-indented JSON.

    The JSON is serialized before the file is opened. A non-serializable value
    raises ``TypeError`` and leaves the previous chunk status untouched, instead
    of leaving a truncated file behind.
    """
    serialized = json.dumps(data, indent=4)
    with open(CHUNK_STATUS_FILE, "w") as f:
        f.write(serialized)

# ------------------------------------------------------------
# üöÄ Main Process
# ------------------------------------------------------------
def process_data_warehouse_with_metadata(chunk_size=800, overlap=100):
    """Clean and chunk every file in DATA_WAREHOUSE_DIR, recording per-chunk metadata.

    For each regular file:
      * text is extracted per page (PDF) or as one block (.docx/.txt);
      * each page is cleaned with ``clean_text`` and appended to
        ``CLEANED_DIR/<stem>_cleaned.txt``;
      * each cleaned page is split with ``chunk_text``, and every chunk is written to
        ``CHUNK_DIR/<stem>/<stem>_page<N>_chunk_<i>.txt`` (N is 0 for non-PDF files);
      * the file's entry in the chunk status map is replaced with its hash, a
        timestamp, status "pending" and the list of chunk metadata dicts.
    Files without readable text are reported and left out of the status map.
    The map is saved to CHUNK_STATUS_FILE at the end.

    Args:
        chunk_size: words per chunk, forwarded to ``chunk_text``.
        overlap: words shared by consecutive chunks, forwarded to ``chunk_text``.
    """
    chunk_status = load_chunk_status()

    for file_path in DATA_WAREHOUSE_DIR.glob("*.*"):
        # glob("*.*") can also match dotted directories; only regular files are chunked.
        if not file_path.is_file():
            continue

        print(f"\nüìÑ Processing {file_path.name} ...")
        file_hash = compute_file_hash(file_path)
        timestamp = datetime.utcnow().isoformat()

        # Extract text (with page numbers)
        pages = read_file_content_with_pages(file_path)
        if not pages:
            print(f"‚ùå No readable text found in {file_path.name}")
            continue

        all_chunk_metadata = []
        cleaned_file = CLEANED_DIR / f"{file_path.stem}_cleaned.txt"
        # One chunk folder per source file, created once rather than once per page.
        chunk_folder = CHUNK_DIR / file_path.stem
        chunk_folder.mkdir(parents=True, exist_ok=True)

        with open(cleaned_file, "w", encoding="utf-8") as cf:
            for page_number, page_text in pages:
                if not page_text:
                    continue

                cleaned_text = clean_text(page_text)
                cf.write(cleaned_text + "\n\n")

                # Chunk per page so every chunk keeps an exact page reference.
                for i, chunk in enumerate(chunk_text(cleaned_text, chunk_size, overlap), start=1):
                    chunk_file = chunk_folder / f"{file_path.stem}_page{page_number or 0}_chunk_{i}.txt"
                    with open(chunk_file, "w", encoding="utf-8") as f:
                        f.write(chunk)

                    # Metadata for retrieval / vector DB; chunk_id is 1-based and unique per file.
                    all_chunk_metadata.append({
                        "chunk_id": len(all_chunk_metadata) + 1,
                        "file_name": file_path.name,
                        "source": str(file_path.relative_to(BASE_DIR)),
                        "chunk_file": chunk_file.name,
                        "chunk_path": str(chunk_file.relative_to(BASE_DIR)),
                        "page_number": page_number,
                        "chunk_length": len(chunk.split()),
                        "hash": file_hash,
                        "timestamp": timestamp,
                        "processed": False
                    })

        # The literal previously set "status" twice ("chunked", then "pending"); only the
        # last value ever took effect. "pending" is what update_chunk_status_and_files
        # later promotes to "processed" once every chunk is vectorized.
        chunk_status[file_path.name] = {
            "status": "pending",
            "timestamp": timestamp,
            "hash": file_hash,
            "chunks": all_chunk_metadata,
        }

        print(f"‚úÖ {len(all_chunk_metadata)} chunks created with metadata for {file_path.name}")

    save_chunk_status(chunk_status)
    print("\nüìò Updated chunk_status.json")



In [82]:
# ------------------------------------------------------------
# ‚úÖ Run
# ✅ Run
# Clean and chunk every file in the data warehouse: 800-word chunks with a 100-word overlap.
process_data_warehouse_with_metadata(chunk_size=800, overlap=100)



üìÑ Processing NoteGPT_Justice_ What'sTheRightThingToDo_Episode01_THE MORAL SIDE OF MURDER_.txt ...
‚úÖ 11 chunks created with metadata for NoteGPT_Justice_ What'sTheRightThingToDo_Episode01_THE MORAL SIDE OF MURDER_.txt

üìÑ Processing NoteGPT_Last Lecture Series_ How to Live your Life at Full Power ‚Äî Graham Weaver.txt ...
‚úÖ 9 chunks created with metadata for NoteGPT_Last Lecture Series_ How to Live your Life at Full Power ‚Äî Graham Weaver.txt

üìÑ Processing ml_intro.txt ...
‚úÖ 1 chunks created with metadata for ml_intro.txt

üìÑ Processing NoteGPT_Justice_ What'sTheRightThingToDo_Episode01_THE MORAL SIDE OF MURDER_ (1).txt ...
‚úÖ 10 chunks created with metadata for NoteGPT_Justice_ What'sTheRightThingToDo_Episode01_THE MORAL SIDE OF MURDER_ (1).txt

üìÑ Processing NoteGPT_Optimal Protocols for Studying & Learning.txt ...
‚úÖ 27 chunks created with metadata for NoteGPT_Optimal Protocols for Studying & Learning.txt

üìÑ Processing Hausman_PhilosophyOfEconomicsAnthology.p

In [None]:
import json
import os

# Paths
CHUNKS_STATUS_PATH = "database/logs/chunk_status.json"
CHUNK_TRACES_PATH = "database/logs/chunk_traces.json"

# --- 1. Load existing chunk status ---
if not os.path.exists(CHUNKS_STATUS_PATH):
    raise FileNotFoundError("‚ùå chunk_status.json not found. Please run the chunking step first.")

with open(CHUNKS_STATUS_PATH, "r") as f:
    chunks_status = json.load(f)

# --- 2. Load existing chunk traces (if available) ---
if os.path.exists(CHUNK_TRACES_PATH):
    with open(CHUNK_TRACES_PATH, "r") as f:
        chunk_traces = json.load(f)
else:
    chunk_traces = {}

# --- 3. Create or refresh flat trace dictionary ---
# A trace is (re)written when it is new OR when the source file's hash differs from
# the one recorded in the trace. Otherwise an edited, re-chunked file would keep
# vectorized=True on its old trace and its embedding would never be recomputed.
new_traces = 0
refreshed_traces = 0

for file_name, info in chunks_status.items():
    chunks = info.get("chunks")
    if not isinstance(chunks, list):
        continue
    for chunk in chunks:
        trace_id = f"{file_name}_chunk_{chunk['chunk_id']}"
        existing = chunk_traces.get(trace_id)
        if existing is not None and existing.get("hash") == chunk["hash"]:
            continue  # unchanged source: keep the trace and its vectorized flag
        chunk_traces[trace_id] = {
            "file_name": chunk["file_name"],
            "chunk_id": chunk["chunk_id"],
            "source": chunk["source"],
            "chunk_file": chunk["chunk_file"],
            "chunk_path": chunk["chunk_path"],
            "page_number": chunk["page_number"],
            "chunk_length": chunk["chunk_length"],
            "timestamp": chunk["timestamp"],
            "hash": chunk["hash"],
            "vectorized": False,
        }
        if existing is None:
            new_traces += 1
        else:
            refreshed_traces += 1

# --- 4. Save merged trace dictionary ---
os.makedirs(os.path.dirname(CHUNK_TRACES_PATH), exist_ok=True)
with open(CHUNK_TRACES_PATH, "w") as f:
    json.dump(chunk_traces, f, indent=4)

# --- 5. Summary ---
print(f"‚úÖ Chunk trace file updated successfully: {CHUNK_TRACES_PATH}")
print(f"Total chunks tracked: {len(chunk_traces)}")
print(f"üÜï New traces added: {new_traces}")
print(f"Refreshed traces (source file changed): {refreshed_traces}")


‚úÖ Chunk trace file created successfully: database/logs/chunk_traces.json
Total chunks tracked: 735


In [None]:
import os
import json
import hashlib
import numpy as np
from datetime import datetime
from sentence_transformers import SentenceTransformer
from tqdm import tqdm

# Paths
# DATA_WAREHOUSE_DIR is intentionally not (re)defined in this cell. Nothing here uses
# it, and the ingestion/chunking cells define it as a pathlib.Path that ingest_files()
# and process_data_warehouse_with_metadata() join with "/". Rebinding it to a plain
# str would make those functions raise TypeError if re-run after this cell.
VECTOR_DB_DIR = "database/vectordb"

DATA_LOGS_DIR = "database/logs"
VECTOR_LOG_PATH = os.path.join(DATA_LOGS_DIR, "vector_log.json")
CHUNK_TRACKER_PATH = os.path.join(DATA_LOGS_DIR, "chunk_traces.json")

# Create directories if not exist
os.makedirs(VECTOR_DB_DIR, exist_ok=True)

# Load model
embedding_model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')

# Load chunk metadata (trace_id -> trace dict written by the chunk-trace cell)
if not os.path.exists(CHUNK_TRACKER_PATH):
    raise FileNotFoundError("Chunk tracker not found. Please run the chunking step first.")
with open(CHUNK_TRACKER_PATH, "r") as f:
    chunk_tracker = json.load(f)

# Load or initialize vector log (vector_id -> embedding record)
vector_log = {}
if os.path.exists(VECTOR_LOG_PATH):
    with open(VECTOR_LOG_PATH, "r") as f:
        vector_log = json.load(f)

def compute_embedding(text):
    """Return the embedding of ``text`` produced by the module-level ``embedding_model``."""
    vector = embedding_model.encode(text)
    return vector

def generate_vector_id(file_name, chunk_id):
    """Deterministic vector ID: the MD5 hex digest of ``"<file_name>_<chunk_id>"``.

    The same (file_name, chunk_id) pair always maps to the same ID. The chunk's
    content does not influence it.
    """
    key = "{}_{}".format(file_name, chunk_id)
    return hashlib.md5(key.encode()).hexdigest()

def read_chunk_text(chunk_path: str):
    """Return the UTF-8 text stored at ``chunk_path``.

    Any read error is printed and an empty string is returned instead.
    """
    text = ""
    try:
        with open(chunk_path, mode="r", encoding="utf-8") as fh:
            text = fh.read()
    except Exception as exc:
        print(f"‚ö†Ô∏è Error reading {chunk_path}: {exc}")
    return text

# === Embedding Pipeline ===
def _save_vector(vector, vector_id, out_dir=None):
    """Save one embedding as ``<out_dir>/<vector_id>.npy`` and return that path.

    ``out_dir`` defaults to VECTOR_DB_DIR. The file is written with ``np.save``,
    so ``build_faiss_index`` can read it back with ``np.load``.
    """
    out_dir = VECTOR_DB_DIR if out_dir is None else out_dir
    os.makedirs(out_dir, exist_ok=True)
    vec_path = os.path.join(out_dir, f"{vector_id}.npy")
    np.save(vec_path, np.asarray(vector))
    return vec_path


def create_vector_db(chunk_traces, vector_log):
    """Embed every chunk trace not yet marked ``vectorized`` and save the results.

    For each pending trace:
      * derive a vector ID from (file_name, chunk_id);
      * reuse the existing ``vector_log`` entry only if it was computed from the
        same source-file hash (the ID ignores content, so a changed file must be
        re-embedded);
      * otherwise read the chunk text, embed it, save the vector as ``.npy`` and
        record it in ``vector_log``.
    Empty chunks are reported and left un-vectorized. Both dicts are mutated in
    place and written back to VECTOR_LOG_PATH and CHUNK_TRACKER_PATH.

    Args:
        chunk_traces: mapping trace_id -> trace dict (``file_name``, ``chunk_id``,
            ``chunk_path``, ``source``, ``page_number``, ``timestamp``, ``hash``,
            ``vectorized``).
        vector_log: mapping vector_id -> embedding record.
    """
    total_vectors = 0

    for chunk_info in tqdm(chunk_traces.values(), total=len(chunk_traces), desc="Generating Embeddings"):
        if chunk_info.get("vectorized", False):
            continue

        file_name = chunk_info["file_name"]
        chunk_id = chunk_info["chunk_id"]
        chunk_path = chunk_info["chunk_path"]
        vector_id = generate_vector_id(file_name, chunk_id)

        # Skip only if already embedded from the same version of the source file.
        existing = vector_log.get(vector_id)
        if existing is not None and existing.get("metadata", {}).get("hash") == chunk_info["hash"]:
            chunk_info["vectorized"] = True
            continue

        text = read_chunk_text(chunk_path)
        if not text.strip():
            print(f"‚ö†Ô∏è Empty text in {chunk_path}, skipping.")
            continue

        vector = compute_embedding(text)
        vec_path = _save_vector(vector, vector_id)

        vector_log[vector_id] = {
            "file_name": file_name,
            "chunk_id": chunk_id,
            "chunk_path": chunk_path,
            "metadata": {
                "source": chunk_info["source"],
                "page_number": chunk_info["page_number"],
                "timestamp": chunk_info["timestamp"],
                "hash": chunk_info["hash"]
            },
            "embedding_path": vec_path
        }

        chunk_info["vectorized"] = True
        total_vectors += 1

    # Save vector log
    with open(VECTOR_LOG_PATH, "w") as f:
        json.dump(vector_log, f, indent=4)

    print(f"‚úÖ Embedding complete. Total vectors stored: {len(vector_log)}")

    # Save updated chunk traces. CHUNK_TRACKER_PATH is the path this cell loaded them
    # from; CHUNK_TRACES_PATH is only defined in other cells.
    with open(CHUNK_TRACKER_PATH, "w") as f:
        json.dump(chunk_traces, f, indent=4)

    print(f"‚úÖ Chunk trace file updated: {CHUNK_TRACKER_PATH}")
    print(f"Total chunks tracked: {len(chunk_traces)}, Newly vectorized: {total_vectors}")


In [85]:
# Run embedding pipeline
# chunk_tracker and vector_log were loaded from disk in the embedding cell; create_vector_db
# updates both in place and writes them back to their JSON files.
create_vector_db(chunk_tracker, vector_log)

Generating Embeddings: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 735/735 [00:00<00:00, 272911.95it/s]

‚úÖ Embedding complete. Total vectors stored: 735
‚úÖ Chunk trace file updated: database/logs/chunk_traces.json
Total chunks tracked: 735, Newly vectorized: 0





In [92]:
import os
import numpy as np
import faiss
import json

VECTORSTORE_DIR = "data/vectorstores"
os.makedirs(VECTORSTORE_DIR, exist_ok=True)


def build_faiss_index(vector_log, persist_dir=VECTORSTORE_DIR):
    """Build an exact L2 FAISS index over every stored embedding and save it.

    Args:
        vector_log: mapping vector_id -> info, where ``info["embedding_path"]``
            points to a NumPy file readable by ``np.load``.
        persist_dir: directory that receives ``faiss_index.idx`` and ``vector_ids.json``.

    Returns:
        ``(index, ids)``, where ``ids[i]`` is the vector ID stored at FAISS row ``i``.

    Raises:
        ValueError: if ``vector_log`` is empty, since there is no dimension to build
            an index from.
    """
    if not vector_log:
        raise ValueError("vector_log is empty; run the embedding step before building the FAISS index.")

    ids = list(vector_log)
    # vstack yields a clean (n, dim) matrix whether each file holds a (dim,) or a
    # (1, dim) array, and fails loudly on mismatched dimensions.
    vectors = np.vstack([np.load(vector_log[vector_id]["embedding_path"]) for vector_id in ids]).astype("float32")

    # Build FAISS index
    index = faiss.IndexFlatL2(vectors.shape[1])
    index.add(vectors)

    # Save index
    os.makedirs(persist_dir, exist_ok=True)
    index_path = os.path.join(persist_dir, "faiss_index.idx")
    faiss.write_index(index, index_path)

    # Save vector ID mapping (row order of the index)
    ids_path = os.path.join(persist_dir, "vector_ids.json")
    with open(ids_path, "w") as f:
        json.dump(ids, f, indent=4)

    print(f"‚úÖ FAISS index saved to {index_path}")
    print(f"‚úÖ Vector IDs saved to {ids_path}")

    return index, ids
def search_similar_chunks(query, index, ids, vector_log, top_k=5, model=None):
    """Return up to ``top_k`` stored chunks closest (by L2 distance) to ``query``.

    Args:
        query: text to embed and search for.
        index: FAISS index whose row ``i`` corresponds to ``ids[i]``.
        ids: vector IDs in index row order.
        vector_log: mapping vector_id -> {"file_name", "chunk_id", "metadata", ...}.
        top_k: maximum number of results to request from the index.
        model: optional encoder exposing ``encode(text)``. Defaults to the
            module-level ``embedding_model``.

    Returns:
        Result dicts (``vector_id``, ``file_name``, ``chunk_id``, ``metadata``,
        ``distance``) in the order ``index.search`` returns them. Slots FAISS
        fills with ``-1`` (when the index holds fewer than ``top_k`` vectors)
        are omitted.
    """
    encoder = embedding_model if model is None else model
    query_vector = np.asarray(encoder.encode(query), dtype="float32").reshape(1, -1)
    distances, indices = index.search(query_vector, top_k)

    results = []
    for dist, idx in zip(distances[0], indices[0]):
        # FAISS pads missing neighbours with -1; ids[-1] would silently return the last ID.
        if idx < 0:
            continue
        vector_id = ids[idx]
        entry = vector_log[vector_id]
        results.append({
            "vector_id": vector_id,
            "file_name": entry["file_name"],
            "chunk_id": entry["chunk_id"],
            "metadata": entry["metadata"],
            "distance": float(dist)
        })
    return results

# Reload a previously saved index (counterpart of build_faiss_index).
def load_vectorstore(persist_dir=VECTORSTORE_DIR):
    """Read ``faiss_index.idx`` and ``vector_ids.json`` from ``persist_dir``.

    Returns:
        ``(index, ids)``: the FAISS index and the vector IDs in index-row order.

    Raises:
        FileNotFoundError: if either file is missing.
    """
    index_path, ids_path = (
        os.path.join(persist_dir, name) for name in ("faiss_index.idx", "vector_ids.json")
    )

    both_present = os.path.exists(index_path) and os.path.exists(ids_path)
    if not both_present:
        raise FileNotFoundError("FAISS index or vector_ids.json not found. Build the index first.")

    index = faiss.read_index(index_path)
    with open(ids_path, "r") as handle:
        ids = json.load(handle)

    print(f"‚úÖ FAISS index and vector IDs loaded from {persist_dir}")
    return index, ids

# Build the FAISS index from every vector in vector_log and save it to VECTORSTORE_DIR.
index, ids = build_faiss_index(vector_log)

# Example usage: reload the saved index from disk (replaces the objects returned above).
index, ids = load_vectorstore(VECTORSTORE_DIR)

# Example search: search_similar_chunks embeds the query with the module-level
# `embedding_model` (from the embedding cell) and looks hits up in `vector_log`.
# `results` is reused by the retrieval cells below.
query = "indian histiry process"
results = search_similar_chunks(query, index, ids, vector_log)
print(json.dumps(results, indent=4))


‚úÖ FAISS index saved to data/vectorstores/faiss_index.idx
‚úÖ Vector IDs saved to data/vectorstores/vector_ids.json
‚úÖ FAISS index and vector IDs loaded from data/vectorstores
[
    {
        "vector_id": "c9d57951035735e39b1d98b72e3b15e9",
        "file_name": "6372215474167643589th hsitory FILE.pdf",
        "chunk_id": 130,
        "metadata": {
            "source": "database/data_warehouse/6372215474167643589th hsitory FILE.pdf",
            "page_number": 131,
            "timestamp": "2025-10-14T12:32:57.229448",
            "hash": "67f2bad993dfeb712bd7702842da673643046f01299e10d07e1123d135f3ab3f"
        },
        "distance": 1.153246283531189
    },
    {
        "vector_id": "06dd7119c3f1e5cb85bbe9262402c65e",
        "file_name": "6372215474167643589th hsitory FILE.pdf",
        "chunk_id": 131,
        "metadata": {
            "source": "database/data_warehouse/6372215474167643589th hsitory FILE.pdf",
            "page_number": 133,
            "timestamp": "2025-10-14

In [93]:
import json
import os

# === Paths ===
VECTOR_LOG_PATH = "database/logs/vector_log.json"
CHUNK_TRACES_PATH = "database/logs/chunk_traces.json"


def _read_json(path):
    """Parse and return the JSON document stored at ``path``."""
    with open(path, "r") as fh:
        return json.load(fh)


# === Load logs === (vector log first, then chunk traces)
vector_log = _read_json(VECTOR_LOG_PATH)
chunk_traces = _read_json(CHUNK_TRACES_PATH)


def load_chunk_text(chunk_path):
    """Return the UTF-8 text of ``chunk_path``, or ``""`` if it cannot be read.

    Read failures are printed instead of raised, so one bad chunk does not abort
    a whole retrieval pass.
    """
    try:
        with open(chunk_path, "r", encoding="utf-8") as handle:
            content = handle.read()
    except Exception as err:
        print(f"‚ö†Ô∏è Error reading chunk: {chunk_path} ‚Üí {err}")
        return ""
    return content


def retrieve_chunks_from_results(similarity_results, vector_log, chunk_traces):
    """Join similarity-search hits with their stored chunk text and metadata.

    Args:
        similarity_results: dicts with a ``"vector_id"`` key and optionally a
            ``"distance"`` key (the shape produced by ``search_similar_chunks``).
        vector_log: mapping vector_id -> {"chunk_path", "file_name", "chunk_id",
            "metadata", ...}.
        chunk_traces: accepted for interface compatibility; not read here.

    Returns:
        One dict per hit whose vector ID is present in ``vector_log``, in input
        order. Unknown IDs are reported and skipped.
    """
    retrieved_chunks = []

    for hit in similarity_results:
        vector_id = hit["vector_id"]
        if vector_id not in vector_log:
            print(f"‚ö†Ô∏è Missing vector_id in log: {vector_id}")
            continue

        entry = vector_log[vector_id]
        chunk_path, file_name, chunk_id, metadata = (
            entry["chunk_path"], entry["file_name"], entry["chunk_id"], entry["metadata"]
        )
        chunk_body = load_chunk_text(chunk_path)

        retrieved_chunks.append(
            dict(
                vector_id=vector_id,
                file_name=file_name,
                chunk_id=chunk_id,
                chunk_path=chunk_path,
                metadata=metadata,
                similarity_distance=hit.get("distance"),
                chunk_text=chunk_body,
            )
        )

    return retrieved_chunks


In [94]:
retrieved_chunks = retrieve_chunks_from_results(results, vector_log, chunk_traces)

# Print example output: a banner line, then file, path, metadata, distance and a
# 300-character preview of the text for every retrieved chunk.
for r in retrieved_chunks:
    banner = "=" * 80
    print(banner)
    print(f"üìÑ File: {r['file_name']} (Chunk {r['chunk_id']})")
    print(f"üîó Path: {r['chunk_path']}")
    print(f"üß≠ Metadata: {r['metadata']}")
    print(f"üìè Distance: {r['similarity_distance']:.4f}")
    preview = r["chunk_text"][:300]
    print(f"üìù Chunk Text:\n{preview}...")

üìÑ File: 6372215474167643589th hsitory FILE.pdf (Chunk 130)
üîó Path: database/processed/chunks/6372215474167643589th hsitory FILE/6372215474167643589th hsitory FILE_page131_chunk_1.txt
üß≠ Metadata: {'source': 'database/data_warehouse/6372215474167643589th hsitory FILE.pdf', 'page_number': 131, 'timestamp': '2025-10-14T12:32:57.229448', 'hash': '67f2bad993dfeb712bd7702842da673643046f01299e10d07e1123d135f3ab3f'}
üìè Distance: 1.1532
üìù Chunk Text:
the integration of princely states: a case study of jammu and kashmir123...
üìÑ File: 6372215474167643589th hsitory FILE.pdf (Chunk 131)
üîó Path: database/processed/chunks/6372215474167643589th hsitory FILE/6372215474167643589th hsitory FILE_page133_chunk_1.txt
üß≠ Metadata: {'source': 'database/data_warehouse/6372215474167643589th hsitory FILE.pdf', 'page_number': 133, 'timestamp': '2025-10-14T12:32:57.229448', 'hash': '67f2bad993dfeb712bd7702842da673643046f01299e10d07e1123d135f3ab3f'}
üìè Distance: 1.1836
üìù Chunk Text:
india 

In [83]:
import json
import csv
from pathlib import Path

CHUNK_STATUS_PATH = "database/logs/chunk_status.json"
CHUNK_TRACES_PATH = "database/logs/chunk_traces.json"
FILE_STATUS_PATH = "database/logs/file_status.json"
PROCESSED_CSV_PATH = "database/logs/processed_files.csv"

def update_chunk_status_and_files(chunk_traces_path=CHUNK_TRACES_PATH,
                                  chunk_status_path=CHUNK_STATUS_PATH,
                                  file_status_path=FILE_STATUS_PATH,
                                  processed_csv_path=PROCESSED_CSV_PATH):
    """Propagate per-chunk vectorization state up to the chunk, file and CSV logs.

    * A chunk is marked ``processed`` once its trace (key ``<file>_chunk_<id>``)
      in ``chunk_traces_path`` has ``vectorized: true``.
    * Every file entry in ``chunk_status_path`` gets ``num_chunks`` and
      ``num_processed_chunks`` counters.
    * A file with at least one chunk whose chunks are all processed is set to
      "processed" in the chunk status, in ``file_status_path`` and in
      ``processed_csv_path``.
    Statuses are only promoted here, never reset. All three logs are rewritten.
    """
    # Load JSON and CSV files
    with open(chunk_traces_path, "r", encoding="utf-8") as f:
        chunk_traces = json.load(f)
    with open(chunk_status_path, "r", encoding="utf-8") as f:
        chunk_status = json.load(f)
    with open(file_status_path, "r", encoding="utf-8") as f:
        file_status = json.load(f)

    processed_files = []
    if Path(processed_csv_path).exists():
        with open(processed_csv_path, newline="", encoding="utf-8") as csvfile:
            processed_files = list(csv.DictReader(csvfile))

    # Update per-chunk flags and per-file counters; remember files that are fully done.
    fully_processed = set()
    for file_name, file_info in chunk_status.items():
        chunks = file_info.get("chunks", [])
        num_processed = 0
        for chunk in chunks:
            chunk_key = f"{chunk['file_name']}_chunk_{chunk['chunk_id']}"
            if chunk_traces.get(chunk_key, {}).get("vectorized", False):
                chunk["processed"] = True
            # .get: a chunk without a "processed" flag counts as unprocessed instead of raising KeyError.
            if chunk.get("processed", False):
                num_processed += 1

        file_info["num_chunks"] = len(chunks)
        file_info["num_processed_chunks"] = num_processed
        if chunks and num_processed == len(chunks):
            file_info["status"] = "processed"
            fully_processed.add(file_name)

    # Mirror "processed" into file_status.json and processed_files.csv (set lookup, one pass each).
    for file_name in fully_processed:
        if file_name in file_status:
            file_status[file_name]["status"] = "processed"
    for row in processed_files:
        if row["file_name"] in fully_processed:
            row["status"] = "processed"

    # Save updates
    with open(chunk_status_path, "w", encoding="utf-8") as f:
        json.dump(chunk_status, f, indent=4)
    with open(file_status_path, "w", encoding="utf-8") as f:
        json.dump(file_status, f, indent=4)
    with open(processed_csv_path, "w", newline="", encoding="utf-8") as csvfile:
        fieldnames = ["file_name", "status", "timestamp", "hash"]
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(processed_files)

    print("‚úÖ Chunk status, file status, and processed CSV updated successfully.")



In [84]:
# Promote fully vectorized chunks/files to "processed" across all status logs.
update_chunk_status_and_files(
    CHUNK_TRACES_PATH,
    CHUNK_STATUS_PATH,
    FILE_STATUS_PATH,
    PROCESSED_CSV_PATH,
)

‚úÖ Chunk status, file status, and processed CSV updated successfully.
