#Executive Summary

This Data Preparation notebook forms the first stage of a Retrieval-Augmented Generation (RAG) pipeline designed to process and index content from multiple document types (PDF, Word, PowerPoint, and Excel). The notebook locates files within a designated Google Drive folder, validates whether each has been previously processed, and reads any new documents for ingestion.

Central to this pipeline is chunking: larger sections of text (especially from PDFs, Word headings, Excel rows, and PPT slides) are broken into manageable parts. Each chunk gets a unique document ID and includes important metadata, such as headings, slide numbers, row identifiers, or section titles. By splitting content into smaller pieces, the pipeline improves downstream retrieval accuracy and performance.

Once chunked, the text is passed to the NebiusEmbedding class from LlamaIndex, which internally leverages Nebius-hosted embedding models. These embeddings transform text into high-dimensional vectors, capturing semantic meaning. They are then stored in two primary indexes:

Whoosh Index (BM25-based) - supports text-based retrieval.
Faiss Index (Vector-based) - supports semantic similarity searches.


Overall, the Data Preparation notebook delivers a MVP that ingests documents, splits them into meaningful chunks, and creates indexes accessible to the second component of the pipeline. Logging and version tracking are also built in, ensuring traceability and easy debugging.

OUTPUT FILES
As the final result, this notebook creates files:
FAISS
1) default__vectore_store.faiss
2) docstore.json
3) graph_store.json
4) index_store.json
WHOOSH
5) MAIN (Whoosh index)

These files collectively contain vector data, text content, and metadata, allowing for seamless reloading in subsequent inference steps.

# Backlog

ISSUES TO ADDRESS:

1) think of adding more metadata to pdf documents. Why don't keep and sections (if they exist) and pages for pdf documents?

2) Need to systematize metadata. Now it looks very differently depending on file type:
Entry 1:
{
    "page_label": "1",
    "file_name": "Test_file_pdf_(computer vision).pdf"
}

or

Entry 1:
{
    "chunk_number": 1,
    "total_chunks_in_section": 1,
    "start_token": 0,
    "end_token": 529,
    "chunk_type": "token_chunk",
    "source": "Autonomus vehicles and fusion.docx"
}

3) add a count of tokens being used for embedding (Section 7)


4) if we want to work with complicated excel and pdf files - we need to add LlamaParce functionality

5) version control e.g. GIT isn't implemented

6) regular saving and backup of crusial data, like, for example, faiss.index file

#1 Install required libraries

In [4]:
#1. Install required libraries
!pip install pdfplumber pandas openpyxl bleach faiss-cpu openai
!pip install -U llama-index llama-index-core llama-parse llama-index-readers-file openai
!pip install llama-index-embeddings-nebius
!pip install llama-index-vector-stores-faiss
!pip install python-pptx python-docx
!pip install whoosh



#2 Configuration: Mount Google Drive and Config Environment

Mounts your Google Drive to access the project’s directories, configures environment variables (like the API key), and prepares important paths for reading and saving files. Ensures logging folders, data directories, and other essential locations exist. Also sets up the global Nebius endpoint (Nebius hosts lots of open-sources LLMs and has very reasonable per-token prices) and key for subsequent interactions.

In [None]:
#2. Configuration: Mount Google Drive and config environment

from google.colab import drive
drive.mount('/content/gdrive')

import os
from pathlib import Path
import logging
import json
import openai  # Import openai to set api_base globally

# Logging configuration parameters
logging_level = logging.INFO
logging_format = "%(asctime)s - %(levelname)s - %(message)s"
log_folder=Path("/content/gdrive/MyDrive/RAG_Project_5/data_preparation/")
log_file="data_preparation.log"
max_bytes=5 * 1024 * 1024  # 5 MB
backup_count=3  # Keep up to 3 backup files


# Define directories for file processing and validate the directories exist
data_directory = Path('/content/gdrive/MyDrive/RAG_Project_5/data')
to_process_dir = data_directory / 'to_process'
processed_dir = data_directory / 'processed'

for folder in [to_process_dir, processed_dir]:
    folder.mkdir(parents=True, exist_ok=True)  # Create if not exists

# Define directories for indexes
whoosh_index_path = data_directory / "whoosh_index"
os.makedirs(whoosh_index_path, exist_ok=True)

faiss_index_path = data_directory / "faiss_index"
os.makedirs(faiss_index_path, exist_ok=True)



# Path to the JSON config file
config_path = '/content/gdrive/MyDrive/Colab_Notebooks/config.json'

# Load the API key (Nebius AI)
with open(config_path, encoding='utf-8-sig') as config_file:
    config = json.load(config_file)
    os.environ['API_KEY'] = config['API_KEY']

# Set the API key and API base globally
openai.api_key = os.environ.get("API_KEY")
openai.api_base = "https://api.studio.nebius.ai/v1/"  # Nebius AI endpoint

# Chunk parameters for word documents
max_tokens=1024
overlap_tokens=50

# Define embedding model from Nebius AI Studio
model_name = "BAAI/bge-en-icl" # from Nebius AI Studio

Mounted at /content/gdrive


#3. Logging Configuration
Sets up a rotating file handler to manage logs in both console and log files. Ensures logs don't grow indefinitely, helping maintain clarity during debugging or operational monitoring. This will record all activities in the data preparation process.


In [None]:
#3. Logging Configuration

import logging
from logging.handlers import RotatingFileHandler
from pathlib import Path
import os

def setup_logging(
    log_folder: Path,
    log_file: str = "data_preparation.log",
    max_bytes: int = 5 * 1024 * 1024,  # 5 MB
    backup_count: int = 3,
) -> None:
    """
    Configures logging with log rotation to write logs to both a file and the console.

    Args:
        log_folder (Path): The directory where the log file will be stored.
        log_file (str, optional): The name of the log file. Defaults to "data_preparation.log".
        max_bytes (int, optional): Maximum size of the log file in bytes before it is rotated. Defaults to 5 MB.
        backup_count (int, optional): Number of backup files to keep (when log file exides max_bytes size, it becomes a backup file). Defaults to 3.

    Returns:
        None
    """
    os.makedirs(log_folder, exist_ok=True)  # Ensure the folder exists
    log_file_path = log_folder / log_file

    # Create a RotatingFileHandler
    rotating_handler = RotatingFileHandler(
        filename=log_file_path,
        maxBytes=max_bytes,
        backupCount=backup_count,
        encoding="utf-8",
    )

    # Configure logging
    logging.basicConfig(
        level = logging_level,
        format = logging_format,
        handlers=[
            rotating_handler,  # Handles log rotation
            logging.StreamHandler(),  # Logs also appear in Colab's output
        ],
        force=True,  # Force the configuration to apply even if logging was already configured
    )

    logging.info("Logging with rotation has been successfully configured.")

# Initialize logging with log rotation
setup_logging(
    log_folder=log_folder,
    log_file=log_file,
    max_bytes=max_bytes,
    backup_count=backup_count,
)

2025-01-02 10:40:08,659 - INFO - Logging with rotation has been successfully configured.


#4. Load already processed files and validate new files for processing

Loads existing records of processed files (from JSON) to avoid reprocessing duplicates. Scans the “to_process” directory to find any new PDFs, Excel files, PowerPoint decks, or Word documents. Logs and displays new files to process. This step helps keep track of which files have already been ingested into the indexing pipeline.

Key output of the section: four lists of files (one for each file type) which are located in the Data directory, but haven't been processed yet: new_files_pdf, new_files_excel, etc.



In [3]:
# 4. Load already processed files and validate new files for processing

import json
from pathlib import Path
from typing import Set, List

def load_processed_files(record_path: Path) -> Set[str]:
    """
    Loads the set of processed files from a JSON record.

    Args:
        record_path (Path): Path to the JSON file containing processed file names.

    Returns:
        Set[str]: A set of processed file names.
        (In Python, a set is an unordered collection of unique item, no duplicates are allowed;
        operations like adding, removing, and checking membership are efficient with sets)
    """
    if record_path.exists():
        with open(record_path, 'r') as f:
            processed = set(json.load(f)) # converts the list into a set (checks and removes duplicates)
        logging.info(f"Loaded processed files from {record_path}.")
    else:
        processed = set()
        logging.info(f"No processed files record found at {record_path}. Starting fresh.")
    return processed

def log_new_files(file_list: List[Path], file_type: str) -> None:
    """
    Logs and prints a summary of new files to process.

    Args:
        file_list (List[Path]): List of new files to process.
        file_type (str): The type of files (e.g., 'PDF', 'Excel', 'PPT', 'DOC').

    Returns:
        None
    """
    if not file_list:
        logging.info(f"No new {file_type.upper()} files found in {to_process_dir}.")
        print(f"No new {file_type.upper()} files to process.\n")
    else:
        logging.info(f"Found {len(file_list)} new {file_type.upper()} files to process.")
        print(f"New {file_type.upper()} Files Found:")
        for file in file_list:
            print(file)
        print()

# Define paths for records of processed files per file type
processed_files_record_pdf = processed_dir / "processed_files_pdf.json"
processed_files_record_excel = processed_dir / "processed_files_excel.json"
processed_files_record_ppt = processed_dir / "processed_files_ppt.json"
processed_files_record_doc = processed_dir / "processed_files_doc.json"

# Load processed files
processed_files_pdf = load_processed_files(processed_files_record_pdf)
processed_files_excel = load_processed_files(processed_files_record_excel)
processed_files_ppt = load_processed_files(processed_files_record_ppt)
processed_files_doc = load_processed_files(processed_files_record_doc)


# Supported extensions for processing
supported_extensions = [".pdf", ".xls", ".xlsx", ".ppt", ".pptx", ".doc", ".docx"]

# List all relevant files recursively from nested folders
all_files = [f for f in to_process_dir.rglob("*") if f.is_file() and f.suffix.lower() in supported_extensions]

# Categorize files by type
files_pdf = [f for f in all_files if f.suffix.lower() == ".pdf"]
files_excel = [f for f in all_files if f.suffix.lower() in [".xls", ".xlsx"]]
#files_ppt = [f for f in all_files if f.suffix.lower() == ".ppt"]
files_ppt = [f for f in all_files if f.suffix.lower() in [".ppt", ".pptx"]]
files_doc = [f for f in all_files if f.suffix.lower() in [".doc", ".docx"]]

# Identify new files for processing
new_files_pdf = [f for f in files_pdf if f.name not in processed_files_pdf]
new_files_excel = [f for f in files_excel if f.name not in processed_files_excel]
new_files_ppt = [f for f in files_ppt if f.name not in processed_files_ppt]
new_files_doc = [f for f in files_doc if f.name not in processed_files_doc]

# Validate new files
log_new_files(new_files_pdf, "PDF")
log_new_files(new_files_excel, "Excel")
log_new_files(new_files_ppt, "PPT")
log_new_files(new_files_doc, "DOC")

NameError: name 'processed_dir' is not defined

#5 Setup of utility functions for Data Ingestion & Preprocessing Sections

Provides helper functions used throughout the data ingestion and preprocessing pipeline. It includes functions to assign unique IDs to documents and enrich them with metadata like source filename or other relevant fields.

In [None]:
# 5 Setup of utility functions for Data Ingestion & Preprocessing Sections

from llama_index.core import Document
from pathlib import Path
from typing import List
import uuid

# Functions are used in preprocessing PDF and WORD files
def assign_doc_id(documents: list) -> list:
    """
    Assigns a unique doc_id to each document, adds this doc_id to metadata.

    Args:
        documents (list): List of Document objects.

    Returns:
        list: List of Document objects with assigned doc_id.
    """
    for doc in documents:
        # Assign a unique ID
        doc_id = str(uuid.uuid4())
        doc.metadata["doc_id"] = doc_id
    return documents

def extract_metadata(doc: Document, source_file: str, additional_info: dict) -> Document:
    """
    Updates Document metadata with additional information: source file name and other key-value pairs from additional info.

    Args:
        doc (Document): The document object.
        source_file (str): The source file name.
        additional_info (dict): Additional metadata information.

    Returns:
        Document: Document object with updated metadata.
    """
    doc.metadata["source"] = source_file
    for key, value in additional_info.items():
        doc.metadata[key] = value
    return doc

# 6. Data Ingestion & Preprocessing for PDF

Loads PDF files using LlamaIndex's PDFReader, converting each page into a separate Document instance. Automatically assigns document IDs and attaches metadata (like the source filename). Collects all pages from the provided list of new PDFs, logs any processing errors, and returns a list of documents ready for indexing.

Key output of the section - list of Document objects, each representing a page of a pdf file from new_files_pdf list of files (see Section 4)



In [None]:
# 6. Data Ingestion & Preprocessing for PDF

from llama_index.core import Document
from llama_index.readers.file import PDFReader
from pathlib import Path
from typing import List
# import uuid

def load_pdf_docs(pdf_paths: List[Path]) -> List[Document]:
    """
    Uses LlamaIndex PDFReader to load text content of PDF files
    and organizes them into Document objects.

    Each page of a PDF file (with associated metadata) will be a separate Document.

    Args:
        pdf_paths (List[Path]): List of PDF file paths to process.

    Returns:
        List[Document]: A list of Document objects containing the text and metadata from the PDFs.
    """
    pdf_reader = PDFReader()
    all_docs = []
    for pdf_path in pdf_paths:
        try:
            docs = pdf_reader.load_data(pdf_path)
            docs = assign_doc_id(docs)  # Assign doc_id and add doc_id to metadata
            # Extract additional metadata if available
            for doc in docs:
                additional_info = {
                  #  "page_number": doc.metadata.get("page_number"), # no need for page number here - "page label" will be extracted automatically
                    # Add other metadata fields as necessary
                }
                doc = extract_metadata(doc, source_file=pdf_path.name, additional_info=additional_info)
            all_docs.extend(docs)
            logging.info(f"Processed PDF: {pdf_path.name} with {len(docs)} pages.")
        except Exception as e:
            logging.error(f"Failed to process PDF {pdf_path.name}: {e}")
    return all_docs

# Load pdf docs
if new_files_pdf:
    pdf_docs = load_pdf_docs(new_files_pdf)
    logging.info(f"Total new PDF documents loaded: {len(pdf_docs)}")
else:
    pdf_docs = []
    logging.info("No new PDF files to process.")

2025-01-02 11:34:11,366 - INFO - No new PDF files to process.


# 7. Data Ingestion & Preprocessing for Excel

Parses Excel files sheet by sheet and row by row, converting each row (+ concatenated headers) into a separate Document. Uses Pandas to read each sheet, constructs a text string describing each row's columns and data, and attaches metadata like sheet names and row numbers.

Key output of the section - list of Document objects, each representing a row of an excel file from new_files_excel set of files (see Section 4)

In [None]:
# 7. Data Ingestion & Preprocessing for Excel

from llama_index.core import Document
import pandas as pd
from pathlib import Path
from typing import List

def load_excel_docs(excel_paths: List[Path]) -> List[Document]:
    """
    Uses Pandas to load documents from Excel files and organizes them into Document objects.

    Each row of an Excel sheet is converted into a Document with metadata.

    Args:
        excel_paths (List[Path]): List of Excel file paths to process.

    Returns:
        List[Document]: A list of Document objects containing the text and metadata from the Excel files.
    """
    all_docs = []
    for excel_path in excel_paths:
        try:
            xls = pd.ExcelFile(excel_path)

            for sheet_name in xls.sheet_names:
                df = pd.read_excel(xls, sheet_name=sheet_name)

                for idx, row in df.iterrows():
                    row_dict = ", ".join(f"{col.strip().lower()}: {str(row[col]).strip()}" for col in df.columns)
                    metadata = {
                        "sheet": sheet_name,
                        "row_number": idx + 1,  # 1-based indexing
                    }

                    doc = Document(text=row_dict, metadata=metadata)
                    all_docs.append(doc)

            logging.info(f"Processed Excel: {excel_path.name} with {len(df)} rows.")
        except Exception as e:
            logging.error(f"Failed to process Excel {excel_path.name}: {e}")

    # Assign doc_ids and extract metadata
    all_docs = assign_doc_id(all_docs)
    for doc in all_docs:
        additional_info = {}  # Add any additional metadata if necessary
        doc = extract_metadata(doc, source_file=excel_path.name, additional_info=additional_info)
    return all_docs

if new_files_excel:
    excel_docs = load_excel_docs(new_files_excel)
    logging.info(f"Total new Excel documents loaded: {len(excel_docs)}")
else:
    excel_docs = []
    logging.info("No new Excel files to process.")

2025-01-02 11:34:15,238 - INFO - Processed Excel: Lessons Learned_2021 BOHAI_ZPEC Campaign_MD_short.xlsx with 49 rows.
2025-01-02 11:34:15,241 - INFO - Total new Excel documents loaded: 49


#8. Data Ingestion & Preprocessing for PowerPoint

Extracts text content slide by slide from PowerPoint presentations. Each slide's text gets packaged into a Document with metadata (slide number, source file). This approach ensures each slide is indexed separately, making it easier to search for content within individual slides later on.

In [None]:
# 8. Data Ingestion & Preprocessing for PowerPoint

from pptx import Presentation
from pathlib import Path
from typing import List
from llama_index.core import Document

def load_ppt_docs(ppt_paths: List[Path]) -> List[Document]:
    """
    Loads text content from PowerPoint files and organizes them into Document objects.

    Each slide is converted into a Document with metadata.

    Args:
        ppt_paths (List[Path]): List of PowerPoint file paths to process.

    Returns:
        List[Document]: A list of Document objects containing the text and metadata from the slides.
    """
    all_docs = []
    for ppt_path in ppt_paths:
        try:
            prs = Presentation(ppt_path)
            for idx, slide in enumerate(prs.slides):
                slide_text = "\n".join(shape.text for shape in slide.shapes if hasattr(shape, "text"))  # Collects only text from the slide
                metadata = {
                    "slide_number": idx + 1,
                }
                doc = Document(text=slide_text, metadata=metadata)
                all_docs.append(doc)
            logging.info(f"Processed PowerPoint: {ppt_path.name} with {len(prs.slides)} slides.")
        except Exception as e:
            logging.error(f"Failed to process PowerPoint {ppt_path.name}: {e}")

    # Assign doc_ids and extract metadata
    all_docs = assign_doc_id(all_docs)
    for doc in all_docs:
        additional_info = {}  # Add any additional metadata if necessary
        doc = extract_metadata(doc, source_file=ppt_path.name, additional_info=additional_info)
    return all_docs

if new_files_ppt:
    ppt_docs = load_ppt_docs(new_files_ppt)
    logging.info(f"Total new PowerPoint documents loaded: {len(ppt_docs)}")
else:
    ppt_docs = []
    logging.info("No new PowerPoint files to process.")

2025-01-02 11:34:19,616 - INFO - No new PowerPoint files to process.


#9. Data Ingestion & Preprocessing for Word

Loads Word documents, splitting them into sections based on headings. Large sections are further subdivided into smaller chunks by token count to ensure more granular indexing. Each chunk gets its own metadata, including heading titles, chunk numbers, and a unique document ID.

In [None]:
# 9. Data Ingestion & Preprocessing for Word

# !!! I plan to rewrite this section to use assing_doc_id and extract_metadata utility functions to make my code more smooth and readable.

from docx import Document as WordDocument
from pathlib import Path
from typing import List
from llama_index.core import Document
from transformers import GPT2TokenizerFast
import uuid
import textwrap

# Initialize tokenizer for token-based chunking
tokenizer = GPT2TokenizerFast.from_pretrained("gpt2")

def split_large_section(
    section: List[str], metadata: dict, max_tokens: int, overlap_tokens: int
) -> List[Document]:
    """
    Splits a large text section into smaller chunks based on token count.

    Args:
        section (List[str]): List of paragraphs in the section.
        metadata (dict): Metadata associated with the section.
        max_tokens (int): Maximum number of tokens per chunk.
        overlap_tokens (int): Number of overlapping tokens between chunks.

    Returns:
        List[Document]: List of smaller Document objects.
    """
    all_docs = []
    text = "\n".join(section)
    tokens = tokenizer.encode(text)
    total_chunks = (len(tokens) + max_tokens - 1) // max_tokens  # Calculate total chunks

    for chunk_number, i in enumerate(range(0, len(tokens), max_tokens - overlap_tokens), start=1):
        chunk_tokens = tokens[i : i + max_tokens]
        chunk_text = tokenizer.decode(chunk_tokens)

        # Add metadata for the current chunk
        chunk_metadata = {
            **(metadata or {}),
            "chunk_number": chunk_number,
            "total_chunks_in_section": total_chunks,
            "start_token": i,
            "end_token": min(i + max_tokens, len(tokens)),
            "chunk_type": "token_chunk",
            "doc_id": str(uuid.uuid4()),  # Assign unique doc_id for each chunk
        }

        doc = Document(text=chunk_text, metadata=chunk_metadata)
        all_docs.append(doc)

    return all_docs

def chunk_by_headings(
    word_doc: WordDocument, max_tokens: int = 1024, overlap_tokens: int = 50
) -> List[Document]:
    """
    Chunks a Word document into sections based on headings and splits large sections by tokens.

    Args:
        word_doc (WordDocument): The Word document object.
        max_tokens (int, optional): Maximum number of tokens per chunk. Defaults to 1024.
        overlap_tokens (int, optional): Number of overlapping tokens between chunks. Defaults to 50.

    Returns:
        List[Document]: List of Document objects containing text and metadata.
    """
    all_docs = []
    current_section = []
    current_metadata = None

    for paragraph in word_doc.paragraphs:
        if paragraph.style.name.startswith("Heading"):  # Detect headings
            # Save the current section as chunks
            if current_section:
                all_docs.extend(
                    split_large_section(
                        current_section, current_metadata, max_tokens, overlap_tokens
                    )
                )
                current_section = []

            # Update metadata for the new section
            current_metadata = {"section": paragraph.text.strip()}

        # Add paragraph text to the current section
        current_section.append(paragraph.text.strip())

    # Save the last section if it contains any content
    if current_section:
        all_docs.extend(
            split_large_section(
                current_section, current_metadata, max_tokens, overlap_tokens
            )
        )

    return all_docs

def load_doc_docs(
    doc_paths: List[Path], max_tokens: int = 1024, overlap_tokens: int = 50
) -> List[Document]:
    """
    Processes Word documents by chunking them into sections based on headings.

    Args:
        doc_paths (List[Path]): List of Word document file paths to process.
        max_tokens (int): Maximum number of tokens per chunk.
        overlap_tokens (int): Number of overlapping tokens between chunks.

    Returns:
        List[Document]: List of Document objects containing the text and metadata from the Word files.
    """
    all_docs = []
    for doc_path in doc_paths:
        try:
            word_doc = WordDocument(doc_path)
            logging.info(f"Processing {doc_path.name} by headings.")

            # Use chunk_by_headings function
            docs = chunk_by_headings(word_doc, max_tokens, overlap_tokens)

            # Ensure all chunks include the source metadata
            for doc in docs:
                if 'source' not in doc.metadata:
                    doc.metadata['source'] = doc_path.name  # Add source metadata if missing

            all_docs.extend(docs)
            logging.info(f"Processed Word Document: {doc_path.name} with {len(docs)} chunks.")
        except Exception as e:
            logging.error(f"Failed to process Word Document {doc_path.name}: {e}")
    return all_docs

if new_files_doc:
    doc_docs = load_doc_docs(new_files_doc, max_tokens=max_tokens, overlap_tokens=overlap_tokens)
    logging.info(f"Total new Word documents loaded: {len(doc_docs)}")
else:
    doc_docs = []
    logging.info("No new Word files to process.")


2025-01-02 11:34:23,612 - INFO - No new Word files to process.


#10 Create / Update Whoosh Index - add new documents to it

Creates or opens an existing Whoosh index to store newly ingested documents. Indexes fields such as text content, source, and any additional metadata like sheet numbers, slide numbers, or section headings. This step updates the search index with new documents so they can be retrieved later using BM25 or other text-based query methods.

In [None]:
# 10. Create / Update Whoosh Index - add new documents to it

from whoosh.index import create_in, open_dir, exists_in
from whoosh.fields import Schema, TEXT, ID, NUMERIC
from whoosh import index
from pathlib import Path
import os
import logging
import uuid

# Define the schema with all necessary metadata fields
schema = Schema(
    content=TEXT(stored=True),                  # Store the text content for search
    source=ID(stored=True),                     # Store the document source as an identifier
    sheet=ID(stored=True),                      # For Excel documents
    row_number=NUMERIC(stored=True, sortable=True),
    slide_number=NUMERIC(stored=True, sortable=True),  # For PPT documents
    section=TEXT(stored=True),                  # For Word documents
    chunk_number=NUMERIC(stored=True, sortable=True),
    total_chunks_in_section=NUMERIC(stored=True, sortable=True),
    page_number=NUMERIC(stored=True, sortable=True),    # For PDF documents
    doc_id=ID(stored=True, unique=True),         # Unique identifier for each document
    # Add other metadata fields as needed
)

def create_or_update_whoosh_index(documents: list, index_dir: Path) -> None:
    """
    Creates or updates a Whoosh index with the given documents, including all metadata fields.

    Args:
        documents (list): List of Document objects to add to the index.
        index_dir (Path): Directory where the Whoosh index will be stored.

    Returns:
        None
    """
    try:
        # Ensure the index directory exists
        os.makedirs(index_dir, exist_ok=True)

        # Check if an existing Whoosh index exists or create a new one
        if not exists_in(index_dir):
            # Create a new Whoosh index
            idx = create_in(index_dir, schema)
            logging.info(f"Created a new Whoosh index at {index_dir}.")
            before_count = 0  # No documents in a newly created index
        else:
            # Open the existing Whoosh index
            idx = open_dir(index_dir)
            with idx.searcher() as searcher:
                before_count = searcher.doc_count()  # Count existing documents
            logging.info(f"Opened existing Whoosh index at {index_dir}. Total docs before: {before_count}.")

        # Add or update documents in the index
        writer = idx.writer()

        for doc in documents:
            try:
                # Extract content and source from the document
                content = doc.text
                source = doc.metadata.get("source", "unknown_source")
                doc_id = doc.metadata.get("doc_id", str(uuid.uuid4()))  # Ensure doc_id exists

                # Prepare metadata fields
                doc_fields = {
                    "content": content,
                    "source": source,
                    "doc_id": doc_id,
                }

                # Conditionally add metadata fields if they exist
                if "sheet" in doc.metadata:
                    doc_fields["sheet"] = doc.metadata["sheet"]
                if "row_number" in doc.metadata:
                    doc_fields["row_number"] = doc.metadata["row_number"]
                if "slide_number" in doc.metadata:
                    doc_fields["slide_number"] = doc.metadata["slide_number"]
                if "section" in doc.metadata:
                    doc_fields["section"] = doc.metadata["section"]
                if "chunk_number" in doc.metadata:
                    doc_fields["chunk_number"] = doc.metadata["chunk_number"]
                if "total_chunks_in_section" in doc.metadata:
                    doc_fields["total_chunks_in_section"] = doc.metadata["total_chunks_in_section"]
                if "page_number" in doc.metadata:
                    doc_fields["page_number"] = doc.metadata["page_number"]

                # Add or update document in the index
                writer.update_document(**doc_fields)
            except Exception as e:
                logging.error(f"Failed to index document {doc.metadata.get('doc_id', 'Unknown ID')}: {e}")

        # Commit changes to the index
        writer.commit()

        # Count documents after updating the index
        with idx.searcher() as searcher:
            after_count = searcher.doc_count()

        # Log the document counts
        newly_added = len(documents)
        logging.info(
            f"Whoosh index updated. Docs before: {before_count}, docs added: {newly_added}, docs after: {after_count}."
        )
    except Exception as e:
        logging.error(f"Failed to create/update Whoosh index at {index_dir}: {e}")
        raise

# We don't use backup_whoosh_index explicitly yet
def backup_whoosh_index(whoosh_index_path: Path, backup_path: Path) -> None:
    """
    Creates a backup of the Whoosh index.

    Args:
        whoosh_index_path (Path): Path to the Whoosh index.
        backup_path (Path): Path to store the backup.

    Returns:
        None
    """
    import shutil  # Imported here as per coding preference

    try:
        if whoosh_index_path.exists():
            shutil.copytree(whoosh_index_path, backup_path, dirs_exist_ok=True)
            logging.info(f"Whoosh index backed up to {backup_path}.")
        else:
            logging.warning(f"Whoosh index path {whoosh_index_path} does not exist. Backup skipped.")
    except Exception as e:
        logging.error(f"Failed to backup Whoosh index: {e}")

# Define all_new_docs by aggregating all processed documents
all_new_docs = pdf_docs + excel_docs + ppt_docs + doc_docs

# Execute indexing
# whoosh_index_path = data_directory / "whoosh_index" - moved to config section
create_or_update_whoosh_index(all_new_docs, whoosh_index_path)

2025-01-02 11:34:29,125 - INFO - Created a new Whoosh index at /content/gdrive/MyDrive/RAG_Project_5/data/whoosh_index.
2025-01-02 11:34:29,483 - INFO - Whoosh index updated. Docs before: 0, docs added: 49, docs after: 49.


# 11. Create / Update Faiss Index - add new documents to it

Builds or updates a Faiss-based vector index for semantic search. Uses the Nebius embedding model to create embeddings for all new documents, then stores these vectors in a Faiss index. If an index doesn't exist, it is created from scratch; otherwise, it is loaded and incrementally updated with any newly ingested documents.

Key output of the section - updated vector database (Index) - vectorstore. Vectorstore contains text embeddings, metadata and text chunks of processed files.

In [None]:
# 11 Create / Update Faiss Index - add new documents to it

import faiss
import httpx
from pathlib import Path
from llama_index.embeddings.nebius import NebiusEmbedding
from llama_index.vector_stores.faiss import FaissVectorStore
from llama_index.core import VectorStoreIndex, StorageContext, load_index_from_storage
from llama_index.core.storage.docstore.simple_docstore import SimpleDocumentStore
from llama_index.core.storage.index_store import SimpleIndexStore
from llama_index.core import Document, Settings

import uuid # do we really need it here?
import logging

# Setup NebiusEmbedding model with custom_http_client
# model_name = "BAAI/bge-en-icl" #- setup in config section
custom_http_client = httpx.Client(timeout=60.0)
embedding_model = NebiusEmbedding(
    api_key=os.environ.get("API_KEY"),
    model_name=model_name,
    http_client=custom_http_client,
    api_base="https://api.studio.nebius.ai/v1/",  # Explicitly specify api_base
    # api_base=api_base,  # Using api_base variable defined in the config section doesn't work - probably, due to some bugs in NebiusEmbedding module
)

# Determine dimension of embedding model - needed for FAISS index
sample_text = "test text to create an embedding"
sample_embedding = embedding_model.get_text_embedding(sample_text)
embedding_dimension = len(sample_embedding)

print(f"\nEmbedding Model {model_name} creates {embedding_dimension}-dimensional vectors.\n")
print(f"Sample embedding for text '{sample_text}':")
print(sample_embedding[:10], "...\n")

# Set the embedding model in LlamaIndex settings
Settings.embed_model = embedding_model

def create_or_update_faiss_index(
    documents: list[Document], index_path: Path, embedding_model: NebiusEmbedding, embedding_dimension: int
) -> None:
    """
    Creates or updates a FAISS-based index with the given documents.

    Args:
        documents (list[Document]): List of Document objects to add to the index.
        index_path (Path): Path to the directory where the index will be stored.
        embedding_model (NebiusEmbedding): The embedding model used for indexing.
        embedding_dimension (int): The dimension of the embeddings.

    Returns:
        None
    """
    if documents:
        newly_added = len(documents)
        if (index_path / "default__vector_store.faiss").exists():
            # Load existing index
            vector_store = FaissVectorStore.from_persist_path(str(index_path / "default__vector_store.faiss"))
            storage_context = StorageContext.from_defaults(vector_store=vector_store, persist_dir=str(index_path))  # Reconstruct storage_context from files
            index = load_index_from_storage(storage_context, embedding=embedding_model)  # Reloading the index

            # Count vectors BEFORE
            before_count = vector_store.client.ntotal

            # Update index with new documents
            for doc in documents:
                index.insert(doc)  # Update index with new documents

            # Count vectors AFTER
            after_count = vector_store.client.ntotal

            #logging.info("Inserted new documents into existing FAISS index.")
            logging.info(f"Inserted {newly_added} new docs. Vectors before: {before_count}, after: {after_count}")
        else:
            # Create a new index
            vector_store = FaissVectorStore(faiss.IndexFlatIP(embedding_dimension))
            storage_context = StorageContext.from_defaults(
                vector_store=vector_store,
                docstore=SimpleDocumentStore(),
                index_store=SimpleIndexStore(),
                persist_dir=str(index_path),
            )
            index = VectorStoreIndex.from_documents(documents, storage_context=storage_context, embedding=embedding_model) # Create index from scratch

            # Count vectors BEFORE
            before_count = 0

            # Count vectors AFTER
            after_count = vector_store.client.ntotal

            #logging.info("Inserted new documents into a freshly created FAISS index.")
            logging.info(
                        f"Created a new FAISS index and inserted {newly_added} docs. "
                        f"Vectors before: 0, after: {after_count}"
                    )
        # Persist the created/updated index
        index.storage_context.persist(
            persist_dir=str(index_path),
            vector_store_fname="vector_store.faiss",  # Important to save with .faiss suffix
        )
        logging.info("FAISS index created/updated and persisted successfully.")

    else:
        logging.info("No documents to index for FAISS.")

# Execute FAISS indexing
create_or_update_faiss_index(all_new_docs, faiss_index_path, embedding_model, embedding_dimension)

2025-01-02 11:34:36,380 - INFO - HTTP Request: POST https://api.studio.nebius.ai/v1/embeddings "HTTP/1.1 200 OK"



Embedding Model BAAI/bge-en-icl creates 4096-dimensional vectors.

Sample embedding for text 'test text to create an embedding':
[0.004482269287109375, -0.00551605224609375, 0.008056640625, 0.0103302001953125, -0.025604248046875, -0.004878997802734375, -0.01409912109375, 0.0168914794921875, 0.0027904510498046875, -0.00604248046875] ...



2025-01-02 11:34:36,727 - INFO - HTTP Request: POST https://api.studio.nebius.ai/v1/embeddings "HTTP/1.1 200 OK"
2025-01-02 11:34:37,283 - INFO - HTTP Request: POST https://api.studio.nebius.ai/v1/embeddings "HTTP/1.1 200 OK"
2025-01-02 11:34:37,599 - INFO - HTTP Request: POST https://api.studio.nebius.ai/v1/embeddings "HTTP/1.1 200 OK"
2025-01-02 11:34:37,812 - INFO - HTTP Request: POST https://api.studio.nebius.ai/v1/embeddings "HTTP/1.1 200 OK"
2025-01-02 11:34:38,016 - INFO - HTTP Request: POST https://api.studio.nebius.ai/v1/embeddings "HTTP/1.1 200 OK"
2025-01-02 11:34:38,064 - INFO - Created a new FAISS index and inserted 49 docs. Vectors before: 0, after: 49
2025-01-02 11:34:38,113 - INFO - FAISS index created/updated and persisted successfully.


# 12. Move processed files to a designated folder and update the records of processed files

After successful ingestion and indexing, moves newly processed files from “to_process” into the “processed” folder. Also updates the JSON records of processed files to include these newly completed items, preventing redundant reprocessing in the future.

In [None]:
# 12. Move processed files to a designated folder and update the records of processed files to keep track of which files have already been processed

def move_files_to_processed(files: List[Path], destination_dir: Path) -> None:
    """
    Moves processed files to the designated 'processed' directory.

    Args:
        files (List[Path]): List of file paths to move.
        destination_dir (Path): Destination directory where files will be moved.

    Returns:
        None
    """
    for file in files:
        try:
            target_path = destination_dir / file.relative_to(to_process_dir)  # Maintain folder structure
            target_path.parent.mkdir(parents=True, exist_ok=True)  # Create parent folders if needed
            file.rename(target_path)
            logging.info(f"Moved {file.name} to {target_path}")
        except Exception as e:
            logging.error(f"Failed to move file {file.name} to {target_path}: {e}")

def update_processed_files(record_path: Path, file_list: List[Path]) -> None:
    """
    Updates the record of processed files (JSON files that keep track of files being processed).

    Adds the names of the files that have just been processed to a JSON record.

    Args:
        record_path (Path): Path to the JSON record file.
        file_list (List[Path]): List of file paths that have been processed.

    Returns:
        None
    """
    try:
        processed = set()
        if record_path.exists():
            with open(record_path, 'r') as f:
                processed = set(json.load(f))
        processed.update([file.name for file in file_list])
        with open(record_path, 'w') as f:
            json.dump(list(processed), f)
        logging.info(f"Updated processed files record at {record_path}.")
    except Exception as e:
        logging.error(f"Error updating processed files record at {record_path}: {e}")

# Move each type of processed files to /processed/
move_files_to_processed(new_files_pdf, processed_dir)
move_files_to_processed(new_files_excel, processed_dir)
move_files_to_processed(new_files_ppt, processed_dir)
move_files_to_processed(new_files_doc, processed_dir)

# Update the processed files record for each file type
update_processed_files(processed_files_record_pdf, new_files_pdf)
update_processed_files(processed_files_record_excel, new_files_excel)
update_processed_files(processed_files_record_ppt, new_files_ppt)
update_processed_files(processed_files_record_doc, new_files_doc)


2025-01-02 11:34:43,557 - INFO - Moved Lessons Learned_2021 BOHAI_ZPEC Campaign_MD_short.xlsx to /content/gdrive/MyDrive/RAG_Project_5/data/processed/Lessons Learned_2021 BOHAI_ZPEC Campaign_MD_short.xlsx
2025-01-02 11:34:43,568 - INFO - Updated processed files record at /content/gdrive/MyDrive/RAG_Project_5/data/processed/processed_files_pdf.json.
2025-01-02 11:34:43,578 - INFO - Updated processed files record at /content/gdrive/MyDrive/RAG_Project_5/data/processed/processed_files_excel.json.
2025-01-02 11:34:43,588 - INFO - Updated processed files record at /content/gdrive/MyDrive/RAG_Project_5/data/processed/processed_files_ppt.json.
2025-01-02 11:34:43,599 - INFO - Updated processed files record at /content/gdrive/MyDrive/RAG_Project_5/data/processed/processed_files_doc.json.


# 13. Display entries from the processed documents for verification

Shows a sample of newly ingested documents (like PDF pages, Excel rows, slides, Word chunks) with their metadata. Useful for quick verification that text chunks and metadata have been correctly split and associated. Only a few entries (but one can manage exact number and location of chunks) are displayed to ensure the pipeline's correctness without overwhelming output.

In [None]:
# 13. Display text chunks and metadata created

import json
import textwrap
from typing import List
from llama_index.core import Document

total_new_docs = len(all_new_docs)
print(f"Total new documents created: {total_new_docs}\n")

def display_entries(entries: List[Document], file_type: str, start: int = 0, num: int = 5, text_width: int = 100, truncate: bool = False, max_length: int = 1500):
    """
    Displays specified entries of metadata for verification.
    The 'text' field (LlamaIndex Document) is displayed in a wrapped and readable format.

    Args:
        entries (List[Document]): List of Document objects.
        file_type (str): The type of documents (e.g., 'PDF', 'Excel', 'PPT', 'DOC').
        start (int, optional): Starting index. Defaults to 0.
        num (int, optional): Number of entries to display. Defaults to 5.
        text_width (int, optional): Width for text wrapping. Defaults to 100.
        truncate (bool, optional): Whether to truncate long texts. Defaults to False.
        max_length (int, optional): Maximum length of text to display if truncating. Defaults to 1500.

    Returns:
        None
    """
    print(f"Displaying {num} Entries starting from index {start} in {file_type} Documents:\n")

    end = start + num
    for i, doc in enumerate(entries[start:end], start=start + 1):
        print(f"Entry {i}:")
        metadata = doc.metadata
        print(json.dumps(metadata, indent=4))

        text = doc.text
        if text:
            print("\nText:")
            if truncate and len(text) > max_length:
                displayed_text = text[:max_length] + "...\n[Text truncated]"
            else:
                displayed_text = text
            wrapped_text = textwrap.fill(displayed_text, width=text_width)
            print(wrapped_text)

        print("\n" + "-" * text_width + "\n")

# Display entries for each document type
display_entries(pdf_docs, 'PDF')
display_entries(excel_docs, 'Excel', start=0, num=5)
display_entries(ppt_docs, 'PPT', start=0, num=5)
display_entries(doc_docs, 'DOC', start=0, num=5)


Total new documents created: 49

Displaying 5 Entries starting from index 0 in PDF Documents:

Displaying 5 Entries starting from index 0 in Excel Documents:

Entry 1:
{
    "sheet": "Lessons Learned and Proposals",
    "row_number": 1,
    "doc_id": "84f2b87d-8701-4227-a6dc-907b13727d8c",
    "source": "Lessons Learned_2021 BOHAI_ZPEC Campaign_MD_short.xlsx"
}

Text:
##: 1.1, type: General, reference: All, subject: KPI Matrix for Contractor's performance,
description: Assessment of the Contractor's performance as per newly developed KPI matrix.
Acceptance of problematic wells with discount as per matrix, which include: Cement quality, Survey &
Logging quality, Trajectory and target accuracy, Programs preparation & reports., lessons learned:
nan, action: Contract's App update required

----------------------------------------------------------------------------------------------------

Entry 2:
{
    "sheet": "Lessons Learned and Proposals",
    "row_number": 2,
    "doc_id": "58b1abf9

APPENDIXES:

Print all the codecells from the pipeline above - to use this code (all the code cells together) later to create a .py file for production

Skip only cells marked "# SKIP ME"











In [2]:
# SKIP ME

# APPENDIXES: Print all the codecells from the pipeline above (skips cells marked "# SKIP ME")

import json

# Fetch the current notebook's metadata and contents
from google.colab import _message

def get_notebook_content():
    try:
        notebook = _message.blocking_request('get_ipynb', timeout_sec=5)
        return notebook['ipynb']
    except Exception as e:
        print(f"Error fetching notebook content: {e}")
        return None

# Get the notebook content
notebook_data = get_notebook_content()

if notebook_data:
    code_cells = [
        "".join(cell["source"])
        for cell in notebook_data.get("cells", [])
        if cell["cell_type"] == "code" and "# SKIP ME" not in "".join(cell["source"])
    ]

    # Join the code cells with three empty lines between them
    readable_code = "\n\n\n".join(code_cells)

#    print("Extracted Code Cells:\n\n\n")
    print(readable_code)
else:
    print("Could not fetch notebook content.")

#1. Install required libraries
!pip install pdfplumber pandas openpyxl bleach faiss-cpu openai
!pip install -U llama-index llama-index-core llama-parse llama-index-readers-file openai
!pip install llama-index-embeddings-nebius
!pip install llama-index-vector-stores-faiss
!pip install python-pptx python-docx
!pip install whoosh


#2. Configuration: Mount Google Drive and config environment

from google.colab import drive
drive.mount('/content/gdrive')

import os
from pathlib import Path
import logging
import json
import openai  # Import openai to set api_base globally

# Logging configuration parameters
logging_level = logging.INFO
logging_format = "%(asctime)s - %(levelname)s - %(message)s"
log_folder=Path("/content/gdrive/MyDrive/RAG_Project_5/data_preparation/")
log_file="data_preparation.log"
max_bytes=5 * 1024 * 1024  # 5 MB
backup_count=3  # Keep up to 3 backup files


# Define directories for file processing and validate the directories exist
data_directory = Path('/content/gdriv