In [1]:
import argparse
import os
import re
import uuid
from collections import Counter, defaultdict
from pathlib import Path


import faiss
import numpy as np
import pandas as pd
from sentence_transformers import SentenceTransformer
from tqdm import tqdm
from pdfminer.high_level import extract_text

  from .autonotebook import tqdm as notebook_tqdm


In [14]:
def extract_pages_text(pdf_path):
    """Return list of page texts (1-indexed pages -> index 0 = page 1)"""
    # pdfminer.extract_text supports page_numbers argument
    from pdfminer.pdfpage import PDFPage
    pages = []
    # Get total pages by iterating once
    with open(pdf_path, 'rb') as f:
        for _ in PDFPage.get_pages(f):
            pages.append("")  # initialize with empty string (never None)
    total = len(pages)
    for i in range(total):
        try:
            text = extract_text(pdf_path, page_numbers=[i])
            pages[i] = text or ""  # ensure we store a string, not None
        except Exception:
            # If extraction fails for a page (e.g. images-only), keep empty string
            pages[i] = ""
    return pages

In [15]:
def detect_repeated_header_footer(page_texts, head_lines=3, tail_lines=3, sample_pages=10):
    """Heuristic: look for lines that repeat across many pages in the first/last N lines.
    Returns (header_pattern, footer_pattern) regex strings (may be None).
    """
    sample = page_texts
    if len(page_texts) > sample_pages:
        # sample evenly
        idxs = np.linspace(0, len(page_texts) - 1, sample_pages, dtype=int)
        sample = [page_texts[i] for i in idxs]

    headers = []
    footers = []
    for p in sample:
        if not p:
            continue  # skip empty / None pages
        lines = [l.strip() for l in p.splitlines() if l.strip()]
        if not lines:
            continue
        headers.append("\n".join(lines[:head_lines]))
        footers.append("\n".join(lines[-tail_lines:]))

    def common_pattern(strings, threshold_ratio=0.4):
        if not strings:
            return None
        cnt = Counter(strings)
        common, freq = cnt.most_common(1)[0]
        if freq / len(strings) >= threshold_ratio:
            # escape regex special chars, but allow digits (page numbers) variability
            # replace runs of digits with \d+
            esc = re.escape(common)
            esc = re.sub(r'\\\d\+', r'\\d\+', esc)  # no-op if none
            esc = re.sub(r'\\\d{1,}', r'\\d+', esc)
            # also collapse variable whitespace
            esc = re.sub(r'\\\s\+', r'\\s+', esc)
            return esc
        return None

    header_pat = common_pattern(headers)
    footer_pat = common_pattern(footers)
    return header_pat, footer_pat


In [16]:
def clean_page_text(text, header_pat=None, footer_pat=None):
    if not text:
        return ""
    s = text
    # remove header
    if header_pat:
        try:
            s = re.sub(r'(?m)^' + header_pat + r'\s*\n?', '', s)
        except re.error:
            pass
    if footer_pat:
        try:
            s = re.sub(r'(?m)\n?\s*' + footer_pat + r'$','', s)
        except re.error:
            pass
    # remove multiple blank lines, normalize spaces
    s = re.sub(r'\r', '\n', s)
    s = re.sub(r'\n{3,}', '\n\n', s)
    s = re.sub(r'[ \t]{2,}', ' ', s)
    # trim lines
    s = '\n'.join([ln.strip() for ln in s.splitlines() if ln.strip()])
    return s


In [17]:
def simple_sentence_split(text):
    # naive sentence splitter
    if not text:
        return []
    # protect abbreviations (very naive)
    sentences = re.split(r'(?<=[.!?])\s+(?=[A-Z0-9])', text)
    # fallback to line-based
    if len(sentences) == 1:
        sentences = [ln for ln in text.splitlines() if ln.strip()]
    return [s.strip() for s in sentences if s.strip()]


In [18]:
def chunk_text(text, max_chars=1000, overlap=200):
    sentences = simple_sentence_split(text)
    chunks = []
    cur = ""
    cur_len = 0
    start_idx = 0
    for sent in sentences:
        if cur_len + len(sent) + 1 <= max_chars:
            if cur:
                cur += ' ' + sent
            else:
                cur = sent
            cur_len = len(cur)
        else:
            chunks.append((cur, start_idx, start_idx + cur_len))
            # start new chunk with overlap
            # compute overlap in characters from end of cur
            overlap_text = cur[-overlap:] if overlap and overlap < len(cur) else cur
            cur = overlap_text + ' ' + sent
            start_idx = start_idx + cur_len - len(overlap_text)
            cur_len = len(cur)
    if cur:
        chunks.append((cur, start_idx, start_idx + cur_len))
    # give chunk ids and order
    return chunks

In [19]:
def process_row(row, max_chars=1000, overlap=200):
    pdf_path = Path(row['path'])
    pages = extract_pages_text(str(pdf_path))
    header_pat, footer_pat = detect_repeated_header_footer(pages)

    rows = []
    doc_id = str(uuid.uuid4())
    title_guess = row['nombre'] or pdf_path.stem
    chunk_counter = 0

    for i, p in enumerate(pages, start=1):
        cleaned = clean_page_text(p, header_pat, footer_pat)
        if i == 1 and not row['nombre']:
            first_line = cleaned.splitlines()[0] if cleaned.splitlines() else ''
            if len(first_line) > 10:
                title_guess = first_line[:200]

        chunks = chunk_text(cleaned, max_chars=max_chars, overlap=overlap)
        for text, start_char, end_char in chunks:
            chunk_id = f"{doc_id}_p{i}_c{chunk_counter}"
            rows.append({
                'chunk_id': chunk_id,
                'doc_id': doc_id,
                'title': title_guess,
                'page': i,
                'text': text,
                'start_char': int(start_char),
                'end_char': int(end_char),
                'url': row['url'],
                'fecha': row['fecha'],
                'vigencia': row['vigencia'],
            })
            chunk_counter += 1
    return rows

In [22]:
def process_csv(csv_path, output_dir, model_name='all-MiniLM-L6-v2',
                max_chars=1000, overlap=200, index_type='ip'):
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    df_meta = pd.read_csv(csv_path)
    all_rows = []
    for _, row in df_meta.iterrows():
        rows = process_row(row, max_chars=max_chars, overlap=overlap)
        all_rows.extend(rows)

    df = pd.DataFrame(all_rows)
    if df.empty:
        raise RuntimeError('No text extracted from PDFs')
    
    
    model = SentenceTransformer(model_name)
    texts = df['text'].tolist()
    embeddings = model.encode(texts, convert_to_numpy=True, show_progress_bar=True)

    dim = embeddings.shape[1]
    if index_type.lower() in ('ip', 'indexflatip'):
        index = faiss.IndexFlatIP(dim)
        faiss.normalize_L2(embeddings)
    else:
        index = faiss.IndexFlatL2(dim)
    index.add(embeddings)

    index_path = output_dir / 'index.faiss'
    faiss.write_index(index, str(index_path))

    chunks_path = output_dir / 'chunks.parquet'
    df.to_parquet(chunks_path, index=False)

    mapping = pd.DataFrame({
        'chunk_id': df['chunk_id'].tolist(),
        'position': list(range(len(df)))
    })
    mapping.to_parquet(output_dir / 'mapping.parquet', index=False)

    print(f"Saved FAISS index to: {index_path}")
    print(f"Saved chunks to: {chunks_path}")
    print(f"Saved mapping to: {output_dir / 'mapping.parquet'}")


In [23]:
csv_path = '../../data/sources.csv'
output_dir = '../../data/processed/'


res = process_csv(csv_path, output_dir)
print('Done:', res)

To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development
Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`
Batches: 100%|██████████| 10/10 [00:07<00:00,  1.32it/s]


Saved FAISS index to: ..\..\data\processed\index.faiss
Saved chunks to: ..\..\data\processed\chunks.parquet
Saved mapping to: ..\..\data\processed\mapping.parquet
Done: None
