# Document Ingestion for Baseline RAG

This notebook demonstrates document ingestion for the baseline RAG system using LangChain's document loaders and text splitters.

## Features
- Robust document loading with specialized loaders for different file types
- Smart text splitting that preserves context
- Metadata preservation and enhancement
- Batch processing for large document sets

## Usage
1. Load source material (text files, PDFs, etc.)
2. Process and chunk documents
3. Ingest the processed chunks into the RAG system

In [None]:
import os
import sys
import json
import uuid
from pathlib import Path
from typing import List, Dict, Any, Optional
from tqdm.auto import tqdm
from tqdm.notebook import tqdm as tqdm_notebook

from langchain.document_loaders import (
    PyPDFLoader,
    TextLoader,
    Docx2txtLoader,
    UnstructuredFileLoader
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document

In [None]:
def validate_document(doc: Dict[str, Any]) -> bool:
    """Check that a document dict is fit for ingestion.

    The checks run in order and stop at the first failure, whose reason is
    printed: 'content' must be present and truthy, must not be
    whitespace-only, must be at least 10 characters long, and 'metadata'
    must be a dict.

    Args:
        doc: Mapping carrying 'content' and 'metadata' entries.

    Returns:
        True when every check passes, False otherwise.
    """
    text = doc.get('content')

    # Pick the first applicable rejection reason; fall through to success.
    if not text:
        reason = "Document missing content"
    elif not text.strip():
        reason = "Document content is empty or whitespace"
    elif len(text) < 10:  # Arbitrary minimum length
        reason = f"Document content too short: {len(text)} chars"
    elif not isinstance(doc.get('metadata'), dict):
        reason = "Document missing metadata dictionary"
    else:
        return True

    print(reason)
    return False

In [None]:
def process_documents(
    file_paths: List[str], 
    chunk_size: int = 500, 
    chunk_overlap: int = 50, 
    enable_chunking: bool = True,
    metadata: Optional[Dict] = None
) -> List[Dict[str, Any]]:
    """Load files with LangChain loaders and convert them to RAG-ready dicts.

    Each file is loaded with the loader registered for its (lower-cased)
    extension, falling back to ``UnstructuredFileLoader``. Loaded documents
    are optionally split into chunks, wrapped as
    ``{'id', 'content', 'metadata'}`` dicts and kept only if they pass
    ``validate_document``.

    A file that raises during loading or splitting is reported and counted
    as one failure; processing then continues with the next file.

    Args:
        file_paths: List of paths to documents
        chunk_size: Maximum number of characters per chunk
        chunk_overlap: Number of characters to overlap between chunks
        enable_chunking: Whether to split documents into chunks
        metadata: Optional metadata to add to all documents (copied per
            file, never mutated)

    Returns:
        List of processed documents ready for RAG ingestion
    """
    # Initialize text splitter if chunking is enabled
    text_splitter = None
    if enable_chunking:
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
            separators=["\n\n", "\n", " ", ""]
        )

    # Map file extensions to loaders; '*' is the fallback for anything else
    loaders = {
        '.pdf': PyPDFLoader,
        '.txt': TextLoader,
        '.docx': Docx2txtLoader,
        '*': UnstructuredFileLoader
    }

    processed_docs = []
    success_count = 0
    failure_count = 0

    with tqdm_notebook(total=len(file_paths), desc="Processing files") as pbar:
        for file_path in file_paths:
            file_name = Path(file_path).name
            try:
                # Get appropriate loader and load the file
                ext = Path(file_path).suffix.lower()
                loader_cls = loaders.get(ext, loaders['*'])
                docs = loader_cls(file_path).load()

                # Add file info to metadata (on a copy of the caller's dict)
                file_metadata = metadata.copy() if metadata else {}
                file_metadata.update({
                    'source_file': file_path,
                    'file_type': ext,
                    'file_name': file_name
                })
                for doc in docs:
                    doc.metadata.update(file_metadata)

                # Build (content, metadata) pairs so both modes share the
                # same conversion and validation path below.
                if enable_chunking:
                    chunks = text_splitter.split_documents(docs)
                    total_chunks = len(chunks)
                    records = []
                    for i, chunk in enumerate(chunks):
                        chunk_metadata = chunk.metadata.copy()
                        chunk_metadata.update({
                            'chunk_index': i,
                            'total_chunks': total_chunks
                        })
                        records.append((chunk.page_content, chunk_metadata))
                else:
                    # Keep documents as is
                    records = [(doc.page_content, doc.metadata) for doc in docs]

                # Convert to RAG format, keeping only valid documents
                for content, doc_metadata in records:
                    doc_dict = {
                        'id': str(uuid.uuid4()),
                        'content': content,
                        'metadata': doc_metadata
                    }
                    if validate_document(doc_dict):
                        processed_docs.append(doc_dict)
                        success_count += 1
                    else:
                        failure_count += 1

                status = 'Success'

            except Exception as e:
                # Best-effort batch: report the file and move on. Count the
                # failure before the postfix is refreshed so it is current.
                failure_count += 1
                status = f'Error: {type(e).__name__}'
                print(f"Error processing {file_path}: {str(e)}")

            pbar.set_postfix({
                'Status': status,
                'File': file_name,
                'Success': success_count,
                'Failed': failure_count
            })
            # Advance for failed files too, so the bar reaches its total.
            pbar.update(1)

    print(f"\nProcessing complete:")
    print(f"Successfully processed: {success_count} documents")
    print(f"Failed to process: {failure_count} documents")

    return processed_docs

In [None]:
def process_directory(
    dir_path: str, 
    chunk_size: int = 500,
    chunk_overlap: int = 50,
    enable_chunking: bool = True,
    metadata: Optional[Dict] = None, 
    recursive: bool = True
) -> List[Dict[str, Any]]:
    """Process all supported documents in a directory

    Collects ``.txt``, ``.pdf`` and ``.docx`` files (extension matched
    case-insensitively) and hands them to ``process_documents``. Paths are
    sorted so repeated runs process files in the same order.

    Args:
        dir_path: Path to directory
        chunk_size: Maximum number of characters per chunk
        chunk_overlap: Number of characters to overlap between chunks
        enable_chunking: Whether to split documents into chunks
        metadata: Optional metadata to add to all documents
        recursive: Whether to process subdirectories

    Returns:
        List of processed documents ready for RAG ingestion

    Raises:
        OSError: If ``recursive`` is False and ``dir_path`` cannot be listed.
            With ``recursive=True`` an unreadable directory simply yields no
            files, because ``os.walk`` ignores listing errors by default.
    """
    supported_exts = ('.txt', '.pdf', '.docx')  # Add more extensions as needed

    def is_supported(name: str) -> bool:
        # Lower-case first so 'REPORT.PDF' is picked up, consistent with the
        # lower-cased suffix used to choose a loader in process_documents.
        return name.lower().endswith(supported_exts)

    file_paths = []
    if recursive:
        for root, _, files in os.walk(dir_path):
            for name in files:
                if is_supported(name):
                    file_paths.append(os.path.join(root, name))
    else:
        for name in os.listdir(dir_path):
            candidate = os.path.join(dir_path, name)
            # os.listdir also returns sub-directories; skip any whose name
            # happens to end in a supported extension.
            if is_supported(name) and os.path.isfile(candidate):
                file_paths.append(candidate)

    # Filesystem listing order is arbitrary; sort for reproducible output.
    file_paths.sort()

    return process_documents(
        file_paths, 
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        enable_chunking=enable_chunking,
        metadata=metadata
    )

In [None]:
def ingest_documents(
    source_path: str, 
    rag_system: Any, 
    chunk_size: int = 500,
    chunk_overlap: int = 50,
    enable_chunking: bool = True,
    metadata: Optional[Dict] = None, 
    batch_size: int = 100,
    max_retries: int = 3
):
    """Ingest documents from a file or directory into RAG system

    The source is processed with ``process_documents`` (single file) or
    ``process_directory`` (directory), then the resulting documents are
    stored in batches. A batch that raises is retried immediately, with no
    delay, up to the attempt limit; after the last failed attempt the whole
    batch is counted as failed and ingestion moves on to the next batch.

    Args:
        source_path: Path to file or directory
        rag_system: Object exposing a
            ``_store_documents(batch, batch_size=...)`` method; that is the
            only method called on it here
        chunk_size: Maximum number of characters per chunk
        chunk_overlap: Number of characters to overlap between chunks
        enable_chunking: Whether to split documents into chunks
        metadata: Optional metadata to add to all documents
        batch_size: Number of documents to process in each batch (>= 1)
        max_retries: Maximum number of attempts per batch; values below 1
            are treated as a single attempt

    Raises:
        ValueError: If ``batch_size`` is less than 1, or ``source_path`` is
            neither a file nor a directory.
    """
    # Fail fast: a zero batch size would otherwise only blow up (division by
    # zero) after all documents were processed, and a negative one would
    # silently ingest nothing.
    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size}")

    # Always make at least one attempt so every batch ends up counted as
    # either success or failure.
    attempts = max(1, max_retries)

    # Process documents
    if os.path.isfile(source_path):
        documents = process_documents(
            [source_path], 
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            enable_chunking=enable_chunking,
            metadata=metadata
        )
    elif os.path.isdir(source_path):
        documents = process_directory(
            source_path, 
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            enable_chunking=enable_chunking,
            metadata=metadata
        )
    else:
        raise ValueError(f"Invalid source path: {source_path}")

    if not documents:
        print("No valid documents to ingest")
        return

    # Track success/failure counts
    success_count = 0
    failure_count = 0

    # Ingest in batches with retry logic
    total_batches = (len(documents) + batch_size - 1) // batch_size
    with tqdm_notebook(total=total_batches, desc="Ingesting documents") as pbar:
        for batch_num, start in enumerate(range(0, len(documents), batch_size), start=1):
            batch = documents[start:start + batch_size]
            batch_label = f"{batch_num}/{total_batches}"

            for attempt in range(attempts):
                try:
                    rag_system._store_documents(batch, batch_size=batch_size)
                except Exception as e:
                    if attempt == attempts - 1:
                        # Out of attempts: give up on this batch only.
                        failure_count += len(batch)
                        status = f'Error: {type(e).__name__}'
                        print(f"Error ingesting batch {batch_num} after {attempts} attempts: {str(e)}")
                    else:
                        status = f'Retry {attempt+1}/{attempts}'
                    pbar.set_postfix({
                        'Status': status,
                        'Batch': batch_label,
                        'Success': success_count,
                        'Failed': failure_count
                    })
                else:
                    success_count += len(batch)
                    pbar.set_postfix({
                        'Status': 'Success',
                        'Batch': batch_label,
                        'Success': success_count,
                        'Failed': failure_count
                    })
                    break

            pbar.update(1)

    print(f"\nIngestion complete:")
    print(f"Successfully ingested: {success_count} documents")
    print(f"Failed to ingest: {failure_count} documents")