# Experiments: data_ingestion

**Original File:** `src/document_ingestion/data_ingestion.py`

## Purpose
This module implements document ingestion for the Document Portal: file upload, text extraction, chunking, FAISS index management, and session-based document organization.

## Key Components

### 1. FaissManager
- Manages FAISS vector index creation and loading
- Supports incremental document addition with deduplication
- Persists metadata for tracking ingested documents

### 2. ChatIngestor
- Main ingestion pipeline for RAG/chat applications
- Handles file upload, document loading, and chunking
- Builds retrievers for question answering

### 3. DocHandler
- PDF save and read operations for document analysis
- Session-based file organization

### 4. DocumentComparator
- Save, read, and combine PDFs for comparison
- Session-based versioning and cleanup

## Prerequisites
- `langchain`, `langchain-community`, `langchain-text-splitters`, `faiss-cpu` installed
- `PyMuPDF` (imported as `fitz`) for PDF processing
- Environment variables configured (API keys)
- Project modules: `utils.file_io`, `utils.document_ops`, `utils.model_loader`, `logger`, `exception.custom_exception`

## Instructions & Setup Guide

### Execution Order
1. Run the imports cell
2. Explore FaissManager for vector index operations
3. Use ChatIngestor to build a RAG retriever
4. Use DocHandler for document analysis workflows
5. Use DocumentComparator for document comparison workflows

### Dependencies
```bash
pip install langchain langchain-community langchain-text-splitters faiss-cpu PyMuPDF python-dotenv
```

### Configuration
- Ensure `.env` file contains `GROQ_API_KEY` and/or `GOOGLE_API_KEY`
- Default directories: `data/`, `faiss_index/`
- Run from project root directory for proper imports
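
The API-key requirement above can be checked before any model is loaded. The following is a minimal sketch and not part of the original module: the helper name `available_api_keys` is hypothetical, and it only inspects a mapping such as `os.environ`. Loading the `.env` file itself (for example with `python-dotenv`) is assumed to have happened already.

```python
import os
from typing import List, Mapping

# Key names taken from the Configuration list above.
API_KEYS = ("GROQ_API_KEY", "GOOGLE_API_KEY")


def available_api_keys(env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the configured API key names; raise if none is set (hypothetical helper)."""
    found = [name for name in API_KEYS if env.get(name, "").strip()]
    if not found:
        raise RuntimeError(f"Set at least one of: {', '.join(API_KEYS)}")
    return found


# Usage with an explicit mapping, so the example does not depend on the real environment
print(available_api_keys({"GROQ_API_KEY": "dummy-value"}))  # ['GROQ_API_KEY']
```

Calling `available_api_keys()` with no argument checks the live process environment instead.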

## 1. Imports and Dependencies

Import all required modules for document ingestion.

In [None]:
from __future__ import annotations
import os
import sys
import json
import uuid
import hashlib
import shutil
from pathlib import Path
from typing import Iterable, List, Optional, Dict, Any

# PDF processing
import fitz  # PyMuPDF

# LangChain imports
from langchain_core.documents import Document  # canonical location; older LangChain releases re-export it as langchain.schema.Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS

# Project imports
from utils.model_loader import ModelLoader
from logger import GLOBAL_LOGGER as log
from exception.custom_exception import DocumentPortalException
from utils.file_io import generate_session_id, save_uploaded_files
from utils.document_ops import load_documents, concat_for_analysis, concat_for_comparison

# Supported file types
SUPPORTED_EXTENSIONS = {".pdf", ".docx", ".txt"}

print("All imports successful!")
print(f"Supported file extensions: {SUPPORTED_EXTENSIONS}")

---
## Part 1: FaissManager Class

The `FaissManager` class handles FAISS vector index operations:
- Create new indexes from text
- Load existing indexes
- Add documents incrementally with deduplication
- Persist metadata for tracking
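
The deduplication step can be illustrated in isolation. This sketch copies the fingerprint rule from `FaissManager._fingerprint` and applies it to plain `(text, metadata)` tuples; the `dedupe` helper and the sample data are illustrative only and do not exist in the module. It also shows a consequence of the rule: chunks that share a `source` and have no `row_id` map to the same key, so only the first one is treated as new.

```python
import hashlib
from typing import Any, Dict, List, Tuple


def fingerprint(text: str, md: Dict[str, Any]) -> str:
    """Same rule as FaissManager._fingerprint: source (+ row_id) if present, else a content hash."""
    src = md.get("source") or md.get("file_path")
    rid = md.get("row_id")
    if src is not None:
        return f"{src}::{'' if rid is None else rid}"
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def dedupe(chunks: List[Tuple[str, Dict[str, Any]]], seen: Dict[str, bool]) -> List[str]:
    """Return the texts whose fingerprint is not yet in `seen`, recording them as seen."""
    fresh = []
    for text, md in chunks:
        key = fingerprint(text, md)
        if key in seen:
            continue
        seen[key] = True
        fresh.append(text)
    return fresh


seen: Dict[str, bool] = {}
chunks = [
    ("first chunk", {"source": "a.txt"}),
    ("second chunk", {"source": "a.txt"}),              # same source, no row_id: same key
    ("third chunk", {"source": "a.txt", "row_id": 2}),  # row_id makes the key distinct
    ("no metadata", {}),                                # falls back to the content hash
]
print(dedupe(chunks, seen))  # ['first chunk', 'third chunk', 'no metadata']
print(dedupe(chunks, seen))  # []
```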

In [None]:
class FaissManager:
    """
    FAISS vector index manager with load-or-create pattern.
    
    Features:
    - Automatic index creation/loading
    - Incremental document addition
    - Deduplication via fingerprinting
    - Metadata persistence
    """
    
    def __init__(self, index_dir: Path, model_loader: Optional[ModelLoader] = None):
        """
        Initialize FaissManager.
        
        Args:
            index_dir: Directory to store FAISS index files
            model_loader: Optional ModelLoader instance (creates new if not provided)
        """
        self.index_dir = Path(index_dir)
        self.index_dir.mkdir(parents=True, exist_ok=True)
        
        # Metadata file tracks ingested documents
        self.meta_path = self.index_dir / "ingested_meta.json"
        self._meta: Dict[str, Any] = {"rows": {}}  # Dict of fingerprint -> True
        
        # Load existing metadata if available
        if self.meta_path.exists():
            try:
                self._meta = json.loads(self.meta_path.read_text(encoding="utf-8")) or {"rows": {}}
            except Exception:
                self._meta = {"rows": {}}
        
        # Initialize embeddings model
        self.model_loader = model_loader or ModelLoader()
        self.emb = self.model_loader.load_embeddings()
        self.vs: Optional[FAISS] = None
        
    def _exists(self) -> bool:
        """Check if FAISS index files exist."""
        return (self.index_dir / "index.faiss").exists() and (self.index_dir / "index.pkl").exists()
    
    @staticmethod
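    def _fingerprint(text: str, md: Dict[str, Any]) -> str:
        """
        Generate a fingerprint used to deduplicate document chunks.
        Uses source file path + row_id if a source is available, else a content hash.
        Note: chunks that share a source and carry no row_id produce the same fingerprint.
        """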
        src = md.get("source") or md.get("file_path")
        rid = md.get("row_id")
        if src is not None:
            return f"{src}::{'' if rid is None else rid}"
        return hashlib.sha256(text.encode("utf-8")).hexdigest()
    
    def _save_meta(self):
        """Persist metadata to disk."""
        self.meta_path.write_text(
            json.dumps(self._meta, ensure_ascii=False, indent=2), 
            encoding="utf-8"
        )

print("FaissManager class defined (Part 1)!")

In [None]:
# Add methods to FaissManager

def add_documents(self, docs: List[Document]):
    """
    Add documents to the index with deduplication.
    
    Args:
        docs: List of LangChain Document objects
    
    Returns:
        int: Number of new documents added
    """
    if self.vs is None:
        raise RuntimeError("Call load_or_create() before add_documents().")
    
    new_docs: List[Document] = []
    
    for d in docs:
        # Generate fingerprint for deduplication
        key = self._fingerprint(d.page_content, d.metadata or {})
        if key in self._meta["rows"]:
            continue  # Skip already ingested document
        self._meta["rows"][key] = True
        new_docs.append(d)
        
    if new_docs:
        self.vs.add_documents(new_docs)
        self.vs.save_local(str(self.index_dir))
        self._save_meta()
        
    return len(new_docs)

FaissManager.add_documents = add_documents


def load_or_create(self, texts: Optional[List[str]] = None, 
                   metadatas: Optional[List[dict]] = None):
    """
    Load existing FAISS index or create new one from texts.
    
    Args:
        texts: List of text strings (required for new index)
        metadatas: Optional metadata for each text
    
    Returns:
        FAISS: The loaded or created vectorstore
    """
    # Try to load existing index
    if self._exists():
        self.vs = FAISS.load_local(
            str(self.index_dir),
            embeddings=self.emb,
            allow_dangerous_deserialization=True,
        )
        return self.vs
    
    # Create new index from texts
    if not texts:
        raise DocumentPortalException(
            "No existing FAISS index and no data to create one", sys
        )
    
    self.vs = FAISS.from_texts(
        texts=texts, 
        embedding=self.emb, 
        metadatas=metadatas or []
    )
    self.vs.save_local(str(self.index_dir))
    return self.vs

FaissManager.load_or_create = load_or_create

print("FaissManager methods added!")
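
The load-or-create pattern behind `load_or_create` is easier to see without FAISS. The sketch below is a stand-in that persists a JSON list instead of a vector index; the function signature and file name are illustrative assumptions. It shows that once a store exists on disk, the data passed in is ignored, which is why new content has to go through `add_documents`.

```python
import json
import tempfile
from pathlib import Path
from typing import List, Optional


def load_or_create(path: Path, items: Optional[List[str]] = None) -> List[str]:
    """Load the store at `path` if it exists; otherwise create it from `items`."""
    if path.exists():
        # An existing store wins: `items` is ignored on this branch.
        return json.loads(path.read_text(encoding="utf-8"))
    if not items:
        raise ValueError("No existing store and no data to create one")
    path.write_text(json.dumps(items), encoding="utf-8")
    return list(items)


with tempfile.TemporaryDirectory() as tmp:
    store = Path(tmp) / "store.json"
    first = load_or_create(store, ["a", "b"])   # creates the store
    second = load_or_create(store, ["c"])       # loads it; "c" is not added
    print(first, second)  # ['a', 'b'] ['a', 'b']
```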

### FaissManager Usage Example

In [None]:
# Example: Create a new FAISS index
from pathlib import Path

# Initialize FaissManager
index_dir = Path("faiss_index/demo_index")
fm = FaissManager(index_dir)
print(f"FaissManager initialized at: {index_dir}")
print(f"Index exists: {fm._exists()}")

In [None]:
# Create index with sample texts
sample_texts = [
    "Machine learning is a subset of artificial intelligence.",
    "Deep learning uses neural networks with many layers.",
    "Natural language processing deals with text and speech.",
    "Computer vision enables machines to interpret images.",
]

sample_metas = [
    {"source": "intro.txt", "topic": "ML"},
    {"source": "deep_learning.txt", "topic": "DL"},
    {"source": "nlp.txt", "topic": "NLP"},
    {"source": "cv.txt", "topic": "CV"},
]

vs = fm.load_or_create(texts=sample_texts, metadatas=sample_metas)
print(f"Vectorstore ready ({len(sample_texts)} sample texts supplied; an existing index is loaded as-is)")

In [None]:
# Test similarity search
query = "What is deep learning?"
results = vs.similarity_search(query, k=2)

print(f"Query: {query}\n")
for i, doc in enumerate(results):
    print(f"Result {i+1}: {doc.page_content}")
    print(f"Metadata: {doc.metadata}\n")

---
## Part 2: ChatIngestor Class

The `ChatIngestor` class provides the main ingestion pipeline for RAG applications:
- File upload handling
- Document loading and chunking
- Retriever building
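
To make the chunk size and overlap parameters concrete, here is a simplified stand-in for the chunking step. It is not how `RecursiveCharacterTextSplitter` works internally: the real splitter prefers to break on separators such as paragraphs, lines, and spaces, so its boundaries will differ. The function name `sliding_chunks` is hypothetical.

```python
from typing import List


def sliding_chunks(text: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> List[str]:
    """Cut `text` into windows of `chunk_size` characters that overlap by `chunk_overlap`."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    if not text:
        return []
    step = chunk_size - chunk_overlap
    # Stop once the remaining tail is already covered by the previous window
    last_start = max(len(text) - chunk_overlap, 1)
    return [text[i:i + chunk_size] for i in range(0, last_start, step)]


chunks = sliding_chunks("abcdefghij", chunk_size=4, chunk_overlap=2)
print(chunks)  # ['abcd', 'cdef', 'efgh', 'ghij']
```

Each chunk repeats the last `chunk_overlap` characters of the previous one, which helps keep sentences that straddle a boundary retrievable from either side.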

In [None]:
class ChatIngestor:
    """
    Main ingestion pipeline for RAG/chat applications.
    
    Features:
    - Handles file uploads (PDF, DOCX, TXT)
    - Document chunking with configurable size/overlap
    - Session-based directory organization
    - Builds FAISS retrievers for question answering
    """
    
    def __init__(
        self,
        temp_base: str = "data",
        faiss_base: str = "faiss_index",
        use_session_dirs: bool = True,
        session_id: Optional[str] = None,
    ):
        """
        Initialize ChatIngestor.
        
        Args:
            temp_base: Base directory for temporary file storage
            faiss_base: Base directory for FAISS indexes
            use_session_dirs: Whether to create session subdirectories
            session_id: Optional session ID (auto-generated if not provided)
        """
        try:
            self.model_loader = ModelLoader()
            
            self.use_session = use_session_dirs
            self.session_id = session_id or generate_session_id()
            
            # Set up directories
            self.temp_base = Path(temp_base)
            self.temp_base.mkdir(parents=True, exist_ok=True)
            self.faiss_base = Path(faiss_base)
            self.faiss_base.mkdir(parents=True, exist_ok=True)
            
            self.temp_dir = self._resolve_dir(self.temp_base)
            self.faiss_dir = self._resolve_dir(self.faiss_base)

            log.info("ChatIngestor initialized",
                     session_id=self.session_id,
                     temp_dir=str(self.temp_dir),
                     faiss_dir=str(self.faiss_dir),
                     sessionized=self.use_session)
        except Exception as e:
            log.error("Failed to initialize ChatIngestor", error=str(e))
            raise DocumentPortalException("Initialization error in ChatIngestor", e) from e
            
    def _resolve_dir(self, base: Path):
        """Resolve directory path with optional session subdirectory."""
        if self.use_session:
            d = base / self.session_id
            d.mkdir(parents=True, exist_ok=True)
            return d
        return base
        
    def _split(self, docs: List[Document], chunk_size=1000, chunk_overlap=200) -> List[Document]:
        """Split documents into chunks."""
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size, 
            chunk_overlap=chunk_overlap
        )
        chunks = splitter.split_documents(docs)
        log.info("Documents split", chunks=len(chunks), chunk_size=chunk_size, overlap=chunk_overlap)
        return chunks

print("ChatIngestor class defined (Part 1)!")

In [None]:
# Add the retriever-building method to ChatIngestor (named built_retriver in this notebook)

def built_retriver(
    self,
    uploaded_files: Iterable,
    *,
    chunk_size: int = 1000,
    chunk_overlap: int = 200,
    k: int = 5,
):
    """
    Build a retriever from uploaded files.
    
    Args:
        uploaded_files: Iterable of file objects (e.g., from Streamlit/FastAPI)
        chunk_size: Size of text chunks (default: 1000)
        chunk_overlap: Overlap between chunks (default: 200)
        k: Number of documents to retrieve (default: 5)
    
    Returns:
        Retriever: Configured FAISS retriever
    """
    try:
        # Save uploaded files to temp directory
        paths = save_uploaded_files(uploaded_files, self.temp_dir)
        
        # Load documents from saved files
        docs = load_documents(paths)
        if not docs:
            raise ValueError("No valid documents loaded")
        
        # Split documents into chunks
        chunks = self._split(docs, chunk_size=chunk_size, chunk_overlap=chunk_overlap)
        
        # Initialize FAISS manager
        fm = FaissManager(self.faiss_dir, self.model_loader)
        
        # Extract texts and metadata from chunks
        texts = [c.page_content for c in chunks]
        metas = [c.metadata for c in chunks]
        
        # Create or load vectorstore
        vs = fm.load_or_create(texts=texts, metadatas=metas)
        
        # Add documents (skips chunks whose fingerprint is already recorded in ingested_meta.json)
        added = fm.add_documents(chunks)
        log.info("FAISS index updated", added=added, index=str(self.faiss_dir))
        
        # Return retriever
        return vs.as_retriever(search_type="similarity", search_kwargs={"k": k})
        
    except Exception as e:
        log.error("Failed to build retriever", error=str(e))
        raise DocumentPortalException("Failed to build retriever", e) from e

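ChatIngestor.built_retriver = built_retriver
# Alias with the intended spelling; the original name is kept for existing callers
ChatIngestor.build_retriever = built_retriver
print("built_retriver method added (alias: build_retriever)!")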

### ChatIngestor Usage Example

In [None]:
# Initialize ChatIngestor
ingestor = ChatIngestor(
    temp_base="data/multi_doc_chat",
    faiss_base="faiss_index",
    session_id="notebook_demo"
)

print(f"Session ID: {ingestor.session_id}")
print(f"Temp directory: {ingestor.temp_dir}")
print(f"FAISS directory: {ingestor.faiss_dir}")

---
## Part 3: DocHandler Class

The `DocHandler` class provides PDF operations for document analysis workflows.

In [None]:
class DocHandler:
    """
    PDF save + read (page-wise) for analysis workflows.
    
    Features:
    - Save uploaded PDFs to session directory
    - Read PDF content with page separation
    - Session-based file organization
    """
    
    def __init__(self, data_dir: Optional[str] = None, session_id: Optional[str] = None):
        """
        Initialize DocHandler.
        
        Args:
            data_dir: Base directory for document storage
            session_id: Optional session ID (auto-generated if not provided)
        """
        self.data_dir = data_dir or os.getenv(
            "DATA_STORAGE_PATH", 
            os.path.join(os.getcwd(), "data", "document_analysis")
        )
        self.session_id = session_id or generate_session_id("session")
        self.session_path = os.path.join(self.data_dir, self.session_id)
        os.makedirs(self.session_path, exist_ok=True)
        log.info("DocHandler initialized", 
                 session_id=self.session_id, 
                 session_path=self.session_path)

    def save_pdf(self, uploaded_file) -> str:
        """
        Save uploaded PDF to session directory.
        
        Args:
            uploaded_file: File object with name and content
        
        Returns:
            str: Path to saved file
        """
        try:
            filename = os.path.basename(uploaded_file.name)
            if not filename.lower().endswith(".pdf"):
                raise ValueError("Invalid file type. Only PDFs are allowed.")
            save_path = os.path.join(self.session_path, filename)
            with open(save_path, "wb") as f:
                if hasattr(uploaded_file, "read"):
                    f.write(uploaded_file.read())
                else:
                    f.write(uploaded_file.getbuffer())
            log.info("PDF saved successfully", 
                     file=filename, 
                     save_path=save_path, 
                     session_id=self.session_id)
            return save_path
        except Exception as e:
            log.error("Failed to save PDF", error=str(e), session_id=self.session_id)
            raise DocumentPortalException(f"Failed to save PDF: {str(e)}", e) from e

    def read_pdf(self, pdf_path: str) -> str:
        """
        Read PDF content with page separation.
        
        Args:
            pdf_path: Path to PDF file
        
        Returns:
            str: Full text content with page markers
        """
        try:
            text_chunks = []
            with fitz.open(pdf_path) as doc:
                for page_num in range(doc.page_count):
                    page = doc.load_page(page_num)
                    text_chunks.append(
                        f"\n--- Page {page_num + 1} ---\n{page.get_text()}"
                    )
            text = "\n".join(text_chunks)
            log.info("PDF read successfully", 
                     pdf_path=pdf_path, 
                     session_id=self.session_id, 
                     pages=len(text_chunks))
            return text
        except Exception as e:
            log.error("Failed to read PDF", 
                      error=str(e), 
                      pdf_path=pdf_path, 
                      session_id=self.session_id)
            raise DocumentPortalException(f"Could not process PDF: {pdf_path}", e) from e

print("DocHandler class defined!")
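
Because `DocHandler.read_pdf` writes a `--- Page N ---` marker on its own line before each page, the combined string can be split back into pages. The helper below is a hypothetical sketch, not part of the module, and it assumes the exact marker format used by `DocHandler`. `DocumentComparator.read_pdf` pads its markers with spaces, so they would need a different pattern.

```python
import re
from typing import Dict

# Matches the marker emitted by DocHandler.read_pdf: "--- Page N ---" on its own line
PAGE_MARKER = re.compile(r"^--- Page (\d+) ---$", flags=re.MULTILINE)


def split_pages(text: str) -> Dict[int, str]:
    """Map page number -> page text for a string that uses '--- Page N ---' markers."""
    parts = PAGE_MARKER.split(text)
    # With one capture group, parts = [preamble, "1", text1, "2", text2, ...]
    return {int(num): body.strip() for num, body in zip(parts[1::2], parts[2::2])}


sample = "\n--- Page 1 ---\nHello\n\n--- Page 2 ---\nWorld"
print(split_pages(sample))  # {1: 'Hello', 2: 'World'}
```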

### DocHandler Usage Example

In [None]:
# Initialize DocHandler
doc_handler = DocHandler(session_id="analysis_notebook_demo")

print(f"Session ID: {doc_handler.session_id}")
print(f"Session path: {doc_handler.session_path}")
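
A notebook has no upload widget, so `save_pdf` needs something that behaves like an uploaded file. According to the method body, an object with a `.name` attribute and a `.read()` method is enough. The `LocalUpload` class below is a hypothetical adapter written for this notebook, and the demo uses placeholder bytes rather than a real PDF.

```python
import io
import tempfile
from pathlib import Path


class LocalUpload:
    """Hypothetical stand-in for an uploaded file: exposes `.name` and `.read()`."""

    def __init__(self, path: str):
        self.name = path
        self._buffer = io.BytesIO(Path(path).read_bytes())

    def read(self) -> bytes:
        return self._buffer.read()


# Self-contained demo with a throwaway file
with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "sample.pdf"
    sample.write_bytes(b"%PDF-1.4 placeholder bytes")
    upload = LocalUpload(str(sample))
    data = upload.read()
    print(Path(upload.name).name, len(data))

# With a real PDF on disk, the calls would look like this:
# saved_path = doc_handler.save_pdf(LocalUpload("path/to/file.pdf"))
# text = doc_handler.read_pdf(saved_path)
```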

In [None]:
# Example: Read an existing PDF or text file
# Look for any PDF or TXT file in the data directory
data_dir = "data/multi_doc_chat"
if os.path.exists(data_dir):
    candidates = sorted(f for f in os.listdir(data_dir) if f.lower().endswith((".txt", ".pdf")))
    if candidates:
        sample_path = os.path.join(data_dir, candidates[0])
        print(f"Found file: {sample_path}")

        if sample_path.lower().endswith(".pdf"):
            # PDFs go through DocHandler so the page markers are included
            content = doc_handler.read_pdf(sample_path)
        else:
            with open(sample_path, "r", encoding="utf-8") as f:
                content = f.read()
        print(f"\nFirst 500 characters:\n{content[:500]}...")
    else:
        print("No sample files found.")
else:
    print(f"Directory not found: {data_dir}")

---
## Part 4: DocumentComparator Class

The `DocumentComparator` class handles file operations for document comparison workflows.

In [None]:
class DocumentComparator:
    """
    Save, read & combine PDFs for comparison with session-based versioning.
    
    Features:
    - Save reference and actual documents to session
    - Read individual PDFs with page markers
    - Combine all PDFs in session for comparison
    - Clean up old sessions
    """
    
    def __init__(self, base_dir: str = "data/document_compare", 
                 session_id: Optional[str] = None):
        """
        Initialize DocumentComparator.
        
        Args:
            base_dir: Base directory for comparison sessions
            session_id: Optional session ID (auto-generated if not provided)
        """
        self.base_dir = Path(base_dir)
        self.session_id = session_id or generate_session_id()
        self.session_path = self.base_dir / self.session_id
        self.session_path.mkdir(parents=True, exist_ok=True)
        log.info("DocumentComparator initialized", session_path=str(self.session_path))

    def save_uploaded_files(self, reference_file, actual_file):
        """
        Save reference and actual PDF files.
        
        Args:
            reference_file: Reference document file object
            actual_file: Actual document file object
        
        Returns:
            tuple: Paths to saved reference and actual files
        """
        try:
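            # Keep only the base name so a client-supplied path cannot escape the session directory
            ref_path = self.session_path / Path(reference_file.name).name
            act_path = self.session_path / Path(actual_file.name).name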
            
            for fobj, out in ((reference_file, ref_path), (actual_file, act_path)):
                if not fobj.name.lower().endswith(".pdf"):
                    raise ValueError("Only PDF files are allowed.")
                with open(out, "wb") as f:
                    if hasattr(fobj, "read"):
                        f.write(fobj.read())
                    else:
                        f.write(fobj.getbuffer())
                        
            log.info("Files saved", 
                     reference=str(ref_path), 
                     actual=str(act_path), 
                     session=self.session_id)
            return ref_path, act_path
        except Exception as e:
            log.error("Error saving PDF files", error=str(e), session=self.session_id)
            raise DocumentPortalException("Error saving files", e) from e

    def read_pdf(self, pdf_path: Path) -> str:
        """
        Read PDF content with page markers.
        
        Args:
            pdf_path: Path to PDF file
        
        Returns:
            str: Text content with page separators
        """
        try:
            with fitz.open(pdf_path) as doc:
                if doc.is_encrypted:
                    raise ValueError(f"PDF is encrypted: {Path(pdf_path).name}")
                parts = []
                for page_num in range(doc.page_count):
                    page = doc.load_page(page_num)
                    text = page.get_text()
                    if text.strip():
                        parts.append(f"\n --- Page {page_num + 1} --- \n{text}")
            log.info("PDF read successfully", file=str(pdf_path), pages=len(parts))
            return "\n".join(parts)
        except Exception as e:
            log.error("Error reading PDF", file=str(pdf_path), error=str(e))
            raise DocumentPortalException("Error reading PDF", e) from e

    def combine_documents(self) -> str:
        """
        Combine all PDFs in session directory.
        
        Returns:
            str: Combined text from all documents with labels
        """
        try:
            doc_parts = []
            for file in sorted(self.session_path.iterdir()):
                if file.is_file() and file.suffix.lower() == ".pdf":
                    content = self.read_pdf(file)
                    doc_parts.append(f"Document: {file.name}\n{content}")
            combined_text = "\n\n".join(doc_parts)
            log.info("Documents combined", count=len(doc_parts), session=self.session_id)
            return combined_text
        except Exception as e:
            log.error("Error combining documents", error=str(e), session=self.session_id)
            raise DocumentPortalException("Error combining documents", e) from e

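    def clean_old_sessions(self, keep_latest: int = 3):
        """
        Clean up old session directories.
        
        Sessions are ordered by directory name (descending), so "most recent"
        only holds when session IDs sort chronologically, e.g. timestamp-based IDs.
        
        Args:
            keep_latest: Number of sessions to keep, counted from the top of that ordering
        """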
        try:
            sessions = sorted(
                [f for f in self.base_dir.iterdir() if f.is_dir()], 
                reverse=True
            )
            for folder in sessions[keep_latest:]:
                shutil.rmtree(folder, ignore_errors=True)
                log.info("Old session folder deleted", path=str(folder))
        except Exception as e:
            log.error("Error cleaning old sessions", error=str(e))
            raise DocumentPortalException("Error cleaning old sessions", e) from e

print("DocumentComparator class defined!")
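
`clean_old_sessions` orders sessions by directory name, which matches recency only when the session IDs sort chronologically. For hand-picked IDs such as `compare_notebook_demo`, ordering by modification time may be closer to the intent. The function below is an alternative sketch under that assumption, not the module's behavior; `prune_sessions` is a hypothetical name.

```python
import os
import shutil
import tempfile
from pathlib import Path
from typing import List


def prune_sessions(base_dir: Path, keep_latest: int = 3) -> List[str]:
    """Delete all but the `keep_latest` most recently modified subdirectories; return removed names."""
    sessions = sorted(
        (d for d in base_dir.iterdir() if d.is_dir()),
        key=lambda d: d.stat().st_mtime,
        reverse=True,
    )
    removed = []
    for folder in sessions[keep_latest:]:
        shutil.rmtree(folder, ignore_errors=True)
        removed.append(folder.name)
    return removed


with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp)
    for age, name in enumerate(["zeta", "alpha", "mid"]):
        d = base / name
        d.mkdir()
        # Set explicit timestamps so the demo does not depend on filesystem timing
        os.utime(d, (1_000_000 + age, 1_000_000 + age))
    removed = prune_sessions(base, keep_latest=2)
    remaining = sorted(p.name for p in base.iterdir())
    print(removed, remaining)  # ['zeta'] ['alpha', 'mid']
```

In this demo a name-based descending sort would have kept `zeta` and `mid` and deleted `alpha`, even though `zeta` is the oldest directory.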

### DocumentComparator Usage Example

In [None]:
# Initialize DocumentComparator
comparator = DocumentComparator(session_id="compare_notebook_demo")

print(f"Session ID: {comparator.session_id}")
print(f"Session path: {comparator.session_path}")

In [None]:
# List existing sessions
if comparator.base_dir.exists():
    sessions = [d.name for d in comparator.base_dir.iterdir() if d.is_dir()]
    print(f"Existing sessions: {sessions}")
else:
    print("No comparison sessions exist yet.")

---
## Summary & Next Steps

### Key Takeaways

1. **FaissManager** manages the FAISS vector index:
   - Load-or-create pattern for index handling
   - Incremental document addition with deduplication
   - Metadata persistence for tracking

2. **ChatIngestor** is the main RAG ingestion pipeline:
   - Handles file upload and document loading
   - Configurable chunking (size, overlap)
   - Session-based organization

3. **DocHandler** handles PDF operations for analysis:
   - Save uploaded PDFs
   - Read with page separation

4. **DocumentComparator** manages document comparison workflows:
   - Save reference and actual documents
   - Combine documents for LLM comparison
   - Session cleanup utilities

### Possible Extensions
- Add support for more file formats (PPTX, XLSX)
- Implement parallel document processing
- Add OCR support for scanned PDFs
- Implement index versioning and rollback
- Add document metadata extraction during ingestion
- Support for cloud storage backends (S3, GCS)