# 1 Setup & Library Imports

First, we import the necessary libraries and
set up the directory where our ingested and processed data will live.





In [11]:
import os
import json
import pandas as pd
import pdfplumber  # For extracting text from PDF documents
from datetime import datetime
from typing import List, Dict

# --- Paths ---------------------------------------------------------------
# Folder that holds the raw input files (PDFs, CSVs, ...).
DATA_DIR = "./data"

# Make sure the folder exists; exist_ok=True keeps this cell safe to re-run.
os.makedirs(DATA_DIR, exist_ok=True)

# Processed text chunks are written here in JSON Lines (JSONL) format:
# one self-contained JSON object per line, so large files can be streamed
# line by line instead of being loaded all at once.
OUTPUT_FILE = os.path.join(DATA_DIR, "chunks.jsonl")

print(f"Data directory ready at: {DATA_DIR}")

Data directory ready at: ./data


In [12]:
# Sanity check: list what is currently inside the configured data folder.
# Uses DATA_DIR (not a second hard-coded path) so this cell can never drift
# away from the folder the loaders below actually read.
data_path = os.path.abspath(DATA_DIR)
print(f"Looking for files in: {data_path}")
# Sorted so the listing is stable across runs and file systems.
files = sorted(os.listdir(DATA_DIR))
print("\nFiles found:")
for file in files:
    print(f"- {file}")

Looking for files in: /home/diego/Projects/advanced-multisource-rag-project/data

Files found:
- MOCK_DATA.csv:Zone.Identifier
- test.pdf
- .ipynb_checkpoints
- The Journey Towards 6G - A Digital and Societal Revolution in the Making - IEEE Resource - copia.pdf:Zone.Identifier
- chunks.jsonl
- MOCK_DATA.csv


# 2 Load and Extract Text from PDFs

This function will scan the data folder for any PDF files
and extract their text content using the pdfplumber library.

In [13]:
import os
import pdfplumber
from typing import List, Dict
import signal

class TimeoutError(Exception):
    pass

def timeout_handler(signum, frame):
    """Handler for timeout signal."""
    raise TimeoutError("PDF extraction timed out")

def load_pdfs_column_aware(pdf_paths: List[str] | str, n_columns: int = 2, timeout_sec: int = 60) -> List[Dict]:
    """
    Extracts text from multi-column PDFs, processing each column separately.
    Includes a timeout mechanism to prevent hanging on large/problematic PDFs.

    Args:
        pdf_paths (List[str] | str): List of PDF file paths, a single PDF path,
            or a directory path containing PDF files.
        n_columns (int): Number of columns per page (default=2).
        timeout_sec (int): Timeout in seconds per PDF (default=60).

    Returns:
        List[Dict]: List of dictionaries with "id", "source", and "text".
    """
    # Normalize input: accept a single path string, a list of paths, or a directory
    if isinstance(pdf_paths, str):
        if os.path.isdir(pdf_paths):
            # Directory: collect .pdf files inside
            pdf_list = [os.path.join(pdf_paths, f) for f in os.listdir(pdf_paths) if f.lower().endswith('.pdf')]
        else:
            pdf_list = [pdf_paths]
    else:
        # assume iterable of paths
        pdf_list = list(pdf_paths)

    docs = []

    for path in pdf_list:
        if not os.path.exists(path):
            print(f"Skipping missing path: {path}")
            continue
        if os.path.isdir(path):
            # Skip directories (already handled), but be defensive
            print(f"Skipping directory path: {path}")
            continue

        print(f"Processing {path} (timeout={timeout_sec}s)...")
        all_text = []
        
        try:
            with pdfplumber.open(path) as pdf:
                num_pages = len(pdf.pages)
                print(f"   Found {num_pages} pages. Extracting text...")
                
                for page_idx, page in enumerate(pdf.pages):
                    # Print progress every 10 pages
                    if (page_idx + 1) % 10 == 0:
                        print(f"   Progress: {page_idx + 1}/{num_pages} pages...")
                    
                    width = page.width
                    height = page.height

                    # Split the page width into N equal column boxes
                    column_width = width / n_columns
                    for i in range(n_columns):
                        left = i * column_width
                        right = (i + 1) * column_width
                        bbox = (left, 0, right, height)
                        column = page.within_bbox(bbox)
                        try:
                            text = column.extract_text(x_tolerance=2, y_tolerance=2)
                        except Exception as e:
                            print(f"Column {i} extraction failed on page {page_idx + 1}: {e}")
                            text = None
                        if text:
                            all_text.append(text.strip())
        except Exception as e:
            print(f"Error reading {path}: {e}")
            continue

        if all_text:
            combined_text = "\n".join(all_text)
            docs.append({
                "id": os.path.basename(path).replace(".pdf", ""),
                "source": path,
                "text": combined_text
            })
            print(f"Successfully extracted text from {path}")
        else:
            print(f"No text extracted from {path}")

    print(f"\nLoaded {len(docs)} PDFs with column-aware extraction.")
    return docs

# Run the extractor over the whole data folder.
# (The same function also accepts a single PDF path or a list of paths.)
print("Starting PDF extraction (this may take a moment for large files)...\n")
pdf_docs = load_pdfs_column_aware(pdf_paths=DATA_DIR)
print(f"\nExtracted {len(pdf_docs)} PDF documents.")

Starting PDF extraction (this may take a moment for large files)...

Processing ./data/test.pdf (timeout=60s)...
   Found 9 pages. Extracting text...
Successfully extracted text from ./data/test.pdf

Loaded 1 PDFs with column-aware extraction.

Extracted 1 PDF documents.


# 3 Load and Extract Text from CSV Files
Some data comes in structured, tabular form (CSV).
We read these files with pandas and convert each row into a short text representation.

In [14]:
def load_csvs(data_dir: str) -> List[Dict]:
    """
    Load and flatten CSV files into text chunks.

    Every ``*.csv`` file directly inside ``data_dir`` is read with pandas and
    each row becomes one record whose text is ``"col: value"`` pairs joined by
    commas.

    Args:
        data_dir (str): Directory to scan for CSV files (not recursive).

    Returns:
        List[Dict]: One dictionary per row with
            - "id": "<file name without extension>_row<row number>", unique per
              row so downstream chunk ids do not collide,
            - "source": the CSV file name,
            - "text": the flattened row,
            - "type": always "csv".
        Files that pandas cannot read are reported and skipped.
    """
    csv_data = []

    # Sorted so the record order is reproducible across runs.
    for file in sorted(os.listdir(data_dir)):
        if not file.lower().endswith(".csv"):
            continue

        file_path = os.path.join(data_dir, file)
        print(f"Reading CSV: {file}")

        try:
            df = pd.read_csv(file_path)
        except Exception as e:
            print(f"Error reading {file}: {e}")
            continue

        stem = os.path.splitext(file)[0]
        columns = list(df.columns)

        # itertuples keeps each column's own dtype. iterrows() would build one
        # Series per row and upcast integers to floats in all-numeric frames
        # (turning 1 into "1.0").
        for row_idx, values in enumerate(df.itertuples(index=False, name=None)):
            row_text = ", ".join(
                f"{col}: {str(value)}" for col, value in zip(columns, values)
            )
            csv_data.append({
                "id": f"{stem}_row{row_idx}",
                "source": file,
                "text": row_text,
                "type": "csv"
            })

    return csv_data


# Flatten every CSV in the data folder into one text record per row.
csv_docs = load_csvs(data_dir=DATA_DIR)
print(f"Loaded {len(csv_docs)} CSV row(s).")

Reading CSV: MOCK_DATA.csv
Loaded 1000 CSV row(s).


# 4 Combine and Clean Documents

Merge all documents (PDFs + CSVs) into one list.
We also do some light cleaning: documents whose text is empty or very short (50 characters or fewer after stripping whitespace) are dropped.

In [15]:
# Merge both sources and keep only documents whose stripped text is longer
# than 50 characters (this drops empty and very short entries).
all_docs = [
    entry
    for entry in (*pdf_docs, *csv_docs)
    if len(entry["text"].strip()) > 50
]

print(f"Combined {len(all_docs)} documents after cleaning.")



Combined 1001 documents after cleaning.


# 4.1 Cleaning and Normalization of Extracted Text
Later steps revealed issues with how pdfplumber extracts text: it preserves layout artifacts from the original document, such as line breaks and end-of-line hyphens.
The next cell normalizes the text before chunking to improve readability and retrieval quality.

In [16]:
import re

def clean_text_advanced(text: str) -> str:
    """
    Cleans and normalizes extracted text from PDFs/CSVs, with better spacing control.

    Steps, in order:
    1. Re-join words that were hyphenated across a line break
       ("inter-\\nnational" -> "international"). Hyphens that are not at a line
       break ("cost - benefit", "pre- and post-") are left untouched.
    2. Collapse every run of whitespace (newlines, tabs, spaces) into one space.
    3. Separate a digit+capital token from a lowercase word glued to it
       ("5Gnetwork" -> "5G network"). Short lowercase tails are left alone so
       unit-like strings such as "10Gbps" are not split.
    4. Separate a lowercase word from a capitalized token glued to it
       ("inB5G" -> "in B5G"). At least two lowercase letters are required
       before the capital so mixed-case acronyms ("IoT", "eMBB") stay intact.
    5. Trim leading and trailing whitespace.

    Tokens such as "5G", "6G", and "B5G" are never split internally.

    Args:
        text (str): Raw extracted text.

    Returns:
        str: The normalized, single-line text.
    """
    # Must run first: once line breaks are flattened into spaces, a line-break
    # hyphen can no longer be told apart from an ordinary "word - word" dash.
    text = re.sub(r'(?<=\w)-[ \t]*[\r\n]+[ \t]*(?=\w)', '', text)

    # Replace newlines/tabs and collapse multiple spaces in one pass
    text = re.sub(r'\s+', ' ', text)

    # "5Gnetwork" -> "5G network" (needs 4+ lowercase letters after the token)
    text = re.sub(r'(?<=\d[A-Z])(?=[a-z]{4,})', ' ', text)

    # "inB5G" -> "in B5G" (needs 2+ lowercase letters before the capital)
    text = re.sub(r'(?<=[a-z]{2})(?=[A-Z])', ' ', text)

    # Trim spaces at start and end
    return text.strip()

# Apply improved cleaning to all documents.
# Cleaned copies are built instead of editing the loaded records in place, so
# pdf_docs / csv_docs keep their raw text and re-running the earlier
# "combine" cell always starts again from unmodified input.
all_docs = [
    {**doc, "text": clean_text_advanced(doc["text"])}
    for doc in all_docs
]

print(f"Applied advanced cleaning to {len(all_docs)} documents.")

Applied advanced cleaning to 1001 documents.


# 5 Chunk the Text

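We split large documents into smaller pieces ("chunks").
This is critical for RAG systems since embeddings work best
on 200–1000 token segments.
Note that the splitter below measures `chunk_size` in characters, not tokens, so 800 characters is a considerably smaller chunk than 800 tokens.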

In [17]:
# Notebook 01 — Improved Chunking (Recommended for RAG)
# The splitter is imported from the standalone `langchain_text_splitters`
# package (the newer langchain import path, v0.1.0+).
from langchain_text_splitters import RecursiveCharacterTextSplitter
import nltk
# NOTE(review): none of the cells visible in this notebook call an nltk
# function after this download; the splitter below is configured with plain
# string separators only. Confirm whether the "punkt" download is still needed.
nltk.download("punkt")

def smart_chunk_text(text: str, chunk_size: int = 800, chunk_overlap: int = 150) -> List[str]:
    """
    Split ``text`` into overlapping chunks with a recursive character splitter.

    The splitter tries the separators below in order, from coarse to fine:
    paragraph breaks, line breaks, sentence-ending punctuation, clause
    punctuation, spaces, and finally single characters. It falls back to the
    next separator only when a piece is still longer than ``chunk_size``.
    Chunks therefore tend to end at sentence boundaries, but this is a
    punctuation heuristic, not linguistic sentence tokenization.

    Args:
        text (str): Text to split.
        chunk_size (int): Maximum chunk length in characters, not tokens
            (default=800).
        chunk_overlap (int): Number of characters shared between consecutive
            chunks (default=150).

    Returns:
        List[str]: The chunks, in document order.
    """
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        separators=[
            "\n\n",
            "\n",
            ". ",
            "? ",
            "! ",
            "; ",
            ", ",
            " ",
            ""
        ]
    )

    return text_splitter.split_text(text)

# Split every document into chunks and give each chunk a unique id.
chunked_docs = []
for doc_idx, doc in enumerate(all_docs):
    # Documents without an "id" fall back to their position in all_docs.
    # The previous fallback was the chunk index, which gave every id-less
    # document the same "0_chunk0", "1_chunk1", ... ids.
    doc_id = doc.get("id", f"doc{doc_idx}")
    for chunk_idx, chunk in enumerate(smart_chunk_text(doc["text"])):
        chunked_docs.append({
            "id": f"{doc_id}_chunk{chunk_idx}",
            "source": doc["source"],
            "chunk": chunk
        })

print(f"Created {len(chunked_docs)} sentence-aware chunks from {len(all_docs)} documents.")


Created 1073 sentence-aware chunks from 1001 documents.


[nltk_data] Downloading package punkt to /home/diego/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


# 6 Save Chunks to JSONL File
We save the processed chunks so that future steps (embedding generation,
indexing, retrieval) can load them efficiently without re-processing.

In [18]:
# Write one JSON object per line (JSONL). ensure_ascii=False keeps non-ASCII
# characters as-is instead of \uXXXX escapes.
with open(OUTPUT_FILE, "w", encoding="utf-8") as sink:
    sink.writelines(
        json.dumps(record, ensure_ascii=False) + "\n"
        for record in chunked_docs
    )

print(f"Saved {len(chunked_docs)} chunks to {OUTPUT_FILE}")

Saved 1073 chunks to ./data/chunks.jsonl
