
# Text Preprocessing

> **Author:** Jason Cruz  
  **Last updated:** 11/16/2025  
  **Python version:** 3.12  
  **Project:** Fiscal Tone

---

## 📌 Summary
Welcome to the **Fiscal Tone** Text Preprocessing notebook! This notebook guides you **step by step** through the text preprocessing pipeline.


### What will this notebook help you achieve?
1. **Downloading PDFs** from the BCRP Weekly Reports (WR).
2. **Generating PDF inputs** by shortening them to focus on key pages containing GDP growth rate tables.
3. **Cleaning up extracted data** to ensure it is usable and building the RTD.
4. **Concatenating RTD** from different years and frequencies (monthly, quarterly, annual).
5. **Updating metadata** to store base-year changes and other revision-related information.
6. **Converting RTD** to a releases dataset for econometric analysis.

🌐 **Main Data Source:** [BCRP Weekly Report](https://www.bcrp.gob.pe/publicaciones/nota-semanal.html) (📰 WR, from here on)  
For any questions or issues, feel free to reach out via email: [Jason 📨](mailto:jj.cruza@up.edu.pe)

---

### ⚙️ Initial Set-up

Before preprocessing the new GDP releases data, we need to perform some initial set-up steps:

1. 🧰 **Import helper functions** from `text_preprocessing_pipeline.py` that are required for this notebook.
2. 🛢️ **Connect to the PostgreSQL database** that will contain GDP revisions datasets. _(This step is pending: direct access will be provided via ODBC or other methods, allowing users to connect from any software or programming language.)_
3. 📂 **Create necessary folders** to store inputs, outputs, logs, and screenshots.

In [None]:
# from cf_mef_functions import *  # Disabled; note that a module is imported without the ".py" suffix

> 🚧 Although the second step (database connection) is pending, the notebook currently works with **flat files (CSV)**. These CSV files are listed in `.gitignore`, so they are **not saved in GitHub** and no data is stored publicly. The notebook **generates the CSV files automatically**, so users get the dataset directly on their own systems and can save it locally for further use.

### 🧰 Import helper functions

This notebook relies on a set of helper functions defined in the script `text_preprocessing_pipeline.py`. They are used throughout the notebook, so make sure they are loaded by running the line of code below.

In [None]:
from text_preprocessing_pipeline import *

> 🛠️ **Libraries:** Before you begin, make sure the required libraries are installed and imported. The libraries needed for each section are listed in `text_preprocessing_pipeline.py`.

In [None]:
#!pip install requests beautifulsoup4 pandas  # Uncomment to install any third-party library you are missing; standard-library modules such as os need no installation.

**Check out Python information**

In [1]:
import sys
import platform

print("🐍 Python Information")
print(f"  Version  : {sys.version.split()[0]}")
print(f"  Compiler : {platform.python_compiler()}")
print(f"  Build    : {platform.python_build()}")
print(f"  OS       : {platform.system()} {platform.release()}")

🐍 Python Information
  Version  : 3.12.11
  Compiler : MSC v.1929 64 bit (AMD64)
  Build    : ('main', 'Jun  5 2025 12:58:53')
  OS       : Windows 10


### 📂 Create necessary folders

We will start by creating the necessary folders to store the data at various stages of processing. The following code ensures all required directories exist, and if not, it creates them.

In [3]:
import os

In [4]:
from pathlib import Path  # Importing Path module from pathlib to handle file and directory paths in a cross-platform way.

# Get current working directory
PROJECT_ROOT = Path.cwd()  # Get the current working directory where the notebook is being executed.

# User input for folder location
user_input = input("Enter relative path (default='.'): ").strip() or "."  # Prompt user to input the folder path or use the default value "."
target_path = (PROJECT_ROOT / user_input).resolve()  # Combine the project root directory with user input to get the full target path.

# Create the necessary directories if they don't already exist
target_path.mkdir(parents=True, exist_ok=True)  # Creates the target folder and any necessary parent directories.
print(f"Using path: {target_path}")  # Print out the path being used for confirmation.

# Define paths for saving data and PDFs
data_folder = 'data'  # This folder will store the new Weekly Reports (post-2013), which are in PDF format.
raw_data_subfolder = os.path.join(data_folder, 'raw')  # Subfolder for saving the raw PDFs exactly as downloaded from the BCRP website.
input_data_subfolder = os.path.join(data_folder, 'input')  # Subfolder for saving reduced PDFs that contain only the selected pages with GDP growth tables.
output_data_subfolder = os.path.join(data_folder, 'output')

# Additional folders for metadata, records, and alert tracking
metadata_folder = 'metadata'  # Folder for storing metadata files like wr_metadata.csv.
record_folder = 'record'  # Folder for storing .txt files that track the files already processed to avoid reprocessing them.

# Create additional required folders
for folder in [data_folder, raw_data_subfolder, input_data_subfolder, output_data_subfolder, metadata_folder, record_folder]:
    os.makedirs(folder, exist_ok=True)  # Create the additional folders if they don't exist.
    print(f"📂 {folder} created")  # Print confirmation for each of these additional folders.


Enter relative path (default='.'):  .


Using path: C:\Users\Jason Cruz\OneDrive\Documentos\RA\CIUP\Policy Brief\GitHub\peruvian_policy_brief
📂 data created
📂 data\raw created
📂 data\input created
📂 data\output created
📂 metadata created
📂 record created
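
The folder set-up above can also be written with `pathlib` alone. The sketch below is a minimal illustration, not part of the notebook's pipeline: `ensure_folders` is a hypothetical helper name, and the demo runs in a temporary directory so nothing is written to the project. Unlike the cell above, which creates the folders relative to the current working directory, this sketch anchors them under the root that is passed in.

```python
from pathlib import Path
import tempfile

def ensure_folders(root, relative_folders):
    """Create each folder under `root` (hypothetical helper) and return the paths."""
    created = []
    for rel in relative_folders:
        folder = Path(root) / rel
        folder.mkdir(parents=True, exist_ok=True)  # No error if the folder already exists
        created.append(folder)
    return created

# Demo in a throwaway directory so the project tree is left untouched
with tempfile.TemporaryDirectory() as tmp:
    folders = ensure_folders(
        tmp, ["data/raw", "data/input", "data/output", "metadata", "record"]
    )
    all_exist = all(f.is_dir() for f in folders)
    print(all_exist)
```

Passing `target_path` as the root would place every folder under the location the user entered.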


---

# 1. BOT

---

In [13]:
import os
import requests
import pandas as pd
from bs4 import BeautifulSoup
import time
from urllib.parse import urlparse, parse_qs, unquote
import re
from time import time as timer


# === 🧠 UTILITIES ===

def clean_filename(s):
    """
    Sanitize a string to make it safe for filenames:
    - Removes non-alphanumeric symbols (except underscores and hyphens)
    - Trims and replaces spaces with underscores

    Parameters:
        s (str): Original string.

    Returns:
        str: Safe filename string.
    """
    s = re.sub(r'[^\w\s-]', '', s)
    s = re.sub(r'\s+', '_', s.strip())
    return s[:100]  # Truncate to 100 characters max


# === 🌐 SCRAPE CF WEBSITE ===

def scrape_cf(url):
    """
    Helper function to scrape document metadata from the Consejo Fiscal (CF) website.

    Parameters:
        url (str): URL of the CF page containing document links.

    Returns:
        list: List of dictionaries containing metadata of documents.
    """
    t0 = timer()

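    response = requests.get(url, timeout=15)  # Avoid hanging indefinitely on a slow server
    response.raise_for_status()  # Fail early on HTTP errors instead of parsing an error page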
    soup = BeautifulSoup(response.text, 'html.parser')

    rows = soup.select('table.table tbody tr')
    data = []

    for row in rows:
        try:
            date = row.select_one('td.size100 p').text.strip()
            link_tag = row.select_one('td a')
            title = link_tag.text.strip()
            doc_url = link_tag['href']

            short_name = clean_filename(title) + ".pdf"

            data.append({
                'date': date,
                'title': title,
                'pdf_filename': short_name,
                'url': doc_url
            })
        except Exception as e:
            print("❌ Error processing row:", e)

    t1 = timer()
    print(f"⌛ scrape_cf executed in {t1 - t0:.2f} seconds.")

    return data


# === 📥 DOWNLOAD CF PDFs (MAIN SCRIPT) ===

def download_cf_pdfs(cf_urls, raw_pdf_folder, download_record_folder, download_record_txt):
    """
    Downloads PDF documents from the Consejo Fiscal website using metadata from scraping.
    Downloads both 'informes' and 'comunicados' at the same time and stores URLs in a unified CSV.

    Args:
        cf_urls (list): List of URLs for the reports and announcements sections.
        raw_pdf_folder (str): Folder to store downloaded PDFs.
        download_record_folder (str): Folder to store the record text file.
        download_record_txt (str): Text file to track the downloaded PDFs.
    """
    t0 = timer()

    # Ensure folders exist
    os.makedirs(raw_pdf_folder, exist_ok=True)
    os.makedirs(download_record_folder, exist_ok=True)

    # Initialize headers for requests
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)',
        'Accept': 'application/pdf',
        'Connection': 'keep-alive'
    }

    # Load the already downloaded records from the text file
    record_path = os.path.join(download_record_folder, download_record_txt)
    downloaded_files = set()
    if os.path.exists(record_path):
        with open(record_path, 'r', encoding='utf-8') as f:
            downloaded_files = set(f.read().splitlines())

    # Collect all document metadata from both 'informes' and 'comunicados'
    all_data = []
    for url in cf_urls:
        print(f"🌐 Scraping documents from {url}")
        data = scrape_cf(url)
        all_data.extend(data)

    # Reverse the order to start from the oldest first (we assume the list is from newest to oldest)
    all_data.reverse()

    # Convert scraped data to DataFrame
    df = pd.DataFrame(all_data)
    df['final_pdf_url'] = None  # Pre-create the column so it exists even if no PDF URL is resolved

    # Process each document for downloading
    errors = []
    skipped_files = []  # List of skipped files
    new_counter = 0  # Count of new files downloaded
    for i, row in df.iterrows():
        # Check if file has already been downloaded
        if row['pdf_filename'] in downloaded_files:
            skipped_files.append(row['pdf_filename'])
            continue

        print(f"\n[{i + 1}/{len(df)}] Processing: {row['title']}")
        try:
            # Try to fetch the document
            response = requests.get(row['url'], headers=headers, timeout=15)
            soup = BeautifulSoup(response.text, 'html.parser')

            pdf_url = None

            # 1. Try <a> with .pdf
            pdf_link_tag = soup.find('a', href=lambda x: x and '.pdf' in x.lower())
            if pdf_link_tag:
                temp_pdf_url = pdf_link_tag['href']
                try:
                    head_response = requests.head(temp_pdf_url, headers=headers, allow_redirects=True, timeout=10)
                    if head_response.status_code == 200:
                        pdf_url = temp_pdf_url
                        print("🔗 Found working <a> link.")
                    else:
                        print(f"⚠️ <a> link returned {head_response.status_code}. Trying <iframe>.")
                except Exception as e:
                    print(f"⚠️ Error validating <a> link: {e}. Trying <iframe>.")

            # 2. If <a> fails, try <iframe>
            if not pdf_url:
                iframe_tag = soup.find('iframe', src=lambda x: x and ('.pdf' in x.lower() or 'docs.google.com/viewer' in x.lower()))
                if iframe_tag:
                    iframe_src = iframe_tag['src']
                    if iframe_src.startswith('//'):
                        iframe_src = 'https:' + iframe_src

                    if 'docs.google.com/viewer' in iframe_src:
                        parsed = urlparse(iframe_src)
                        query = parse_qs(parsed.query)
                        if 'url' in query:
                            pdf_url = unquote(query['url'][0])
                            print("🔗 Extracted PDF URL from Google Docs Viewer.")
                        else:
                            print("⚠️ Google Docs Viewer iframe found but no 'url' parameter.")
                    else:
                        pdf_url = iframe_src
                        print("🔗 Found PDF in <iframe>.")

            # 3. Download the PDF (with retry logic)
            if pdf_url:
                df.at[i, 'final_pdf_url'] = pdf_url
                filename = row['pdf_filename']
                filepath = os.path.join(raw_pdf_folder, filename)

                # Retry download up to 3 times
                for attempt in range(3):  # Retry up to 3 times
                    try:
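                        pdf_response = requests.get(pdf_url, headers=headers, timeout=20)
                        pdf_response.raise_for_status()  # Treat HTTP errors as failed attempts instead of saving an error page
                        with open(filepath, 'wb') as f:
                            f.write(pdf_response.content)
                        print(f"✅ Saved: {filename}")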
                        downloaded_files.add(filename)  # Add to the set of downloaded files

                        # Immediately record the filename in the text file
                        with open(record_path, 'a', encoding='utf-8') as f:
                            f.write(f"{filename}\n")

                        new_counter += 1  # Increment newly downloaded counter
                        break
                    except Exception as e:
                        print(f"⚠️ Attempt {attempt+1}/3 failed to download {pdf_url}: {e}")
                        if attempt < 2:
                            time.sleep(2)
                        else:
                            raise  # Re-raise with the original traceback; handled by the outer except
            else:
                print(f"⚠️ PDF not found on page: {row['url']}")
                errors.append(row['url'])

                debug_path = f'debug_{i+1}.html'
                with open(debug_path, 'w', encoding='utf-8') as f:
                    f.write(soup.prettify())
                print(f"📝 Saved HTML for debugging: {debug_path}")

            time.sleep(1)  # polite pause between downloads

        except Exception as e:
            print(f"❌ Error processing {row['url']}: {e}")
            errors.append(row['url'])

    # === SAVE UPDATED CSV ===
    df.to_csv('cf_documents.csv', index=False)
    print("\n📝 Updated CSV saved with final_pdf_url column: cf_documents.csv")

    # === FINAL SUMMARY ===
    elapsed_time = round(timer() - t0)  # Seconds elapsed, measured with the same clock that set t0
    total_links = len(df)  # Total links processed

    print("\n📊 Summary:")
    print(f"\n🔗 Total links processed: {total_links}")
    if skipped_files:
        print(f"🗂️ {len(skipped_files)} already downloaded PDFs were skipped.")
    print(f"➕ Newly downloaded: {new_counter}")
    print(f"⏱️ {elapsed_time} seconds")
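
The `<iframe>` fallback in `download_cf_pdfs` recovers the real PDF address when a page embeds the document through Google Docs Viewer. The sketch below isolates that step so it can be tried without any network access. `pdf_url_from_iframe` is a hypothetical helper name and the sample URL is made up for illustration. Because `parse_qs` already percent-decodes query values, this sketch does not call `unquote` a second time.

```python
from urllib.parse import urlparse, parse_qs

def pdf_url_from_iframe(src):
    """Return the PDF URL referenced by an <iframe> src (hypothetical helper)."""
    if src.startswith("//"):  # Protocol-relative URL: assume HTTPS
        src = "https:" + src
    if "docs.google.com/viewer" in src:
        query = parse_qs(urlparse(src).query)  # Values come back percent-decoded
        urls = query.get("url")
        return urls[0] if urls else None
    return src  # The iframe points at the PDF directly

# Made-up viewer URL, used only to illustrate the extraction
viewer_src = (
    "//docs.google.com/viewer"
    "?url=https%3A%2F%2Fexample.org%2Fdocs%2Freport.pdf&embedded=true"
)
extracted = pdf_url_from_iframe(viewer_src)
print(extracted)  # https://example.org/docs/report.pdf
```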

In [15]:
# URLs for both informes and comunicados
cf_urls = ['https://cf.gob.pe/p/informes/', 'https://cf.gob.pe/p/comunicados/']

# Specify folder paths
raw_pdf_folder = raw_data_subfolder  # Replace with actual folder
download_record_folder = record_folder  # Replace with actual folder
download_record_txt = 'downloaded_pdfs.txt'

# Run the PDF downloader
download_cf_pdfs(
    cf_urls=cf_urls,
    raw_pdf_folder=raw_pdf_folder,
    download_record_folder=download_record_folder,
    download_record_txt=download_record_txt
)

🌐 Scraping documents from https://cf.gob.pe/p/informes/
⌛ scrape_cf executed in 0.76 seconds.
🌐 Scraping documents from https://cf.gob.pe/p/comunicados/
⌛ scrape_cf executed in 0.79 seconds.

[28/79] Processing: Comunicado N° 06-2025-CF – El Consejo Fiscal alerta sobre los crecientes riesgos por la proliferación de leyes con impacto fiscal adverso e insta a revertir con urgencia esta situación
🔗 Found working <a> link.
✅ Saved: Comunicado_N_06-2025-CF_El_Consejo_Fiscal_alerta_sobre_los_crecientes_riesgos_por_la_proliferación_d.pdf

[71/79] Processing: Informe N° 01-2024-CF. Opinión del Consejo Fiscal sobre el Informe de Actualización de Proyecciones Macroeconómicas 2024-2027
🔗 Found working <a> link.
✅ Saved: Informe_N_01-2024-CF_Opinión_del_Consejo_Fiscal_sobre_el_Informe_de_Actualización_de_Proyecciones_Ma.pdf

[72/79] Processing: Informe N° 02-2024-CF – Opinión del Consejo Fiscal acerca de la evolución de las finanzas públicas y del  cumplimient
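
The run above prints only some of the 79 links, presumably because files already listed in the record file are skipped. The sketch below illustrates that record-keeping pattern on its own: a plain text file with one filename per line, read into a set at start-up and appended to after each successful download. `load_record` and `append_record` are hypothetical helper names, the filenames are made up, and the demo writes to a temporary directory.

```python
import os
import tempfile

def load_record(record_path):
    """Return the set of filenames already listed in the record file."""
    if not os.path.exists(record_path):
        return set()
    with open(record_path, "r", encoding="utf-8") as f:
        return set(f.read().splitlines())

def append_record(record_path, filename):
    """Append one filename right after a successful download."""
    with open(record_path, "a", encoding="utf-8") as f:
        f.write(f"{filename}\n")

with tempfile.TemporaryDirectory() as tmp:
    record_path = os.path.join(tmp, "downloaded_pdfs.txt")
    first_run = load_record(record_path)       # Nothing recorded yet
    append_record(record_path, "Informe_A.pdf")
    second_run = load_record(record_path)      # Now contains Informe_A.pdf
    candidates = ["Informe_A.pdf", "Informe_B.pdf"]
    pending = [name for name in candidates if name not in second_run]
    print(pending)  # ['Informe_B.pdf']
```

Appending after every file, rather than rewriting the record at the end, means an interrupted run still remembers what it already saved.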

# Delete A/R

In [None]:
# Path to the reports folder
reports_folder = "pdf_reports"

# List of files to delete
pdfs_to_remove = [
    "Informe_N_001-2019-CF.pdf",
    "Informe_N_002-2018.pdf"
]


In [None]:
def remove_unwanted_pdfs(folder_path, filenames_to_remove):
    """
    Deletes specific unwanted PDF files from a given folder.

    Parameters:
    - folder_path: str, the directory containing the PDFs
    - filenames_to_remove: list of str, filenames to delete
    """
    t0 = timer()
    removed_count = 0

    print(f"🗑️ Starting cleanup in: {folder_path}")
    for filename in filenames_to_remove:
        full_path = os.path.join(folder_path, filename)
        if os.path.exists(full_path):
            os.remove(full_path)
            print(f"✅ Deleted: {filename}")
            removed_count += 1
        else:
            print(f"⚠️ File not found: {filename}")

    t1 = timer()
    print(f"\n🧹 Cleanup complete. Total files removed: {removed_count}")
    print(f"⏱️ Time taken: {t1 - t0:.2f} seconds")



In [None]:
remove_unwanted_pdfs(reports_folder, pdfs_to_remove)

# Classifying scanned and editable PDFs

Please check that the folder "" contains the scanned PDFs and that the folder "" contains the editable ones. If any PDFs were not classified correctly, add them manually. This is vital for the code that follows.

In [None]:
import os
import fitz  # PyMuPDF, used to extract text from PDFs
import shutil
from timeit import default_timer as timer

def is_editable_pdf(file_path, min_text_length=20):
    """
    Checks whether a PDF contains extractable text (i.e., is editable).

    Parameters:
    - file_path: str, path to the PDF file
    - min_text_length: int, minimum number of characters required to be considered editable

    Returns:
    - True if editable, False if likely scanned
    """
    try:
        with fitz.open(file_path) as doc:
            total_text = "".join(page.get_text() for page in doc)
        return len(total_text.strip()) >= min_text_length
    except Exception as e:
        print(f"❌ Error reading {file_path}: {e}")
        return False


def classify_pdfs_by_type(source_dirs, output_dir_scanned="scanned_pdf", output_dir_editable="editable_pdf"):
    """
    Classifies PDF files from given directories into 'editable' and 'scanned' folders.

    Parameters:
    - source_dirs: list of str, folders containing PDF files
    - output_dir_scanned: str, target directory for scanned PDFs
    - output_dir_editable: str, target directory for editable PDFs
    """
    t0 = timer()
    os.makedirs(output_dir_scanned, exist_ok=True)
    os.makedirs(output_dir_editable, exist_ok=True)

    total_files = 0
    scanned_count = 0
    editable_count = 0

    print("🔍 Starting PDF classification...\n")

    for folder in source_dirs:
        for filename in os.listdir(folder):
            if filename.lower().endswith(".pdf"):
                total_files += 1
                pdf_path = os.path.join(folder, filename)

                if is_editable_pdf(pdf_path):
                    shutil.copy(pdf_path, os.path.join(output_dir_editable, filename))
                    editable_count += 1
                    print(f"💻 Editable: {filename}")
                else:
                    shutil.copy(pdf_path, os.path.join(output_dir_scanned, filename))
                    scanned_count += 1
                    print(f"🖨️ Scanned: {filename}")

    t1 = timer()
    print("\n✅ Classification complete!")
    print(f"📄 Total PDFs processed: {total_files}")
    print(f"💻 Editable PDFs: {editable_count}")
    print(f"🖨️ Scanned PDFs: {scanned_count}")
    print(f"📁 Saved editable PDFs in: '{output_dir_editable}'")
    print(f"📁 Saved scanned PDFs in:  '{output_dir_scanned}'")
    print(f"⏱️ Time taken: {t1 - t0:.2f} seconds")

In [None]:
source_folders = ["pdf_announcements", "pdf_reports"]
classify_pdfs_by_type(source_folders)
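
Since the classification should be checked by hand, a quick listing of both output folders can help. The sketch below is one possible check, not part of the pipeline: `summarize_classification` is a hypothetical helper that lists the PDFs in each folder and flags any filename present in both, which can happen because files are copied rather than moved. The demo uses empty placeholder files in a temporary directory. Note that `glob("*.pdf")` can be case-sensitive depending on the platform, whereas the classifier above matches extensions case-insensitively.

```python
from pathlib import Path
import tempfile

def summarize_classification(scanned_dir, editable_dir):
    """List the PDFs in each folder and flag names found in both (hypothetical helper)."""
    scanned = {p.name for p in Path(scanned_dir).glob("*.pdf")}
    editable = {p.name for p in Path(editable_dir).glob("*.pdf")}
    return {
        "scanned": sorted(scanned),
        "editable": sorted(editable),
        "in_both": sorted(scanned & editable),
    }

with tempfile.TemporaryDirectory() as tmp:
    scanned_dir = Path(tmp) / "scanned_pdf"
    editable_dir = Path(tmp) / "editable_pdf"
    scanned_dir.mkdir()
    editable_dir.mkdir()
    # Empty placeholder files stand in for real PDFs
    (scanned_dir / "a.pdf").touch()
    (editable_dir / "a.pdf").touch()
    (editable_dir / "b.pdf").touch()
    summary = summarize_classification(scanned_dir, editable_dir)
    print(summary)
```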

<div id="1-2"; style="background-color: #292929; padding: 10px; line-height: 1.5; font-family: 'PT Serif Pro Book';">
    <h2 style="text-align: left; color: #ff8575;">
        1.2 Metadata
    </h2>
</div>

# Concatenate


In [None]:
def combine_metadata(csv_reports="cf_reports.csv", csv_announcements="cf_announcements.csv", output_csv="cf_pdfs_metadata.csv"):
    """
    Combines metadata from two separate CSV files (reports and announcements) into one.

    Parameters:
    - csv_reports: str, path to the reports metadata CSV
    - csv_announcements: str, path to the announcements metadata CSV
    - output_csv: str, path for the combined output CSV

    Saves:
    - A single CSV file containing all metadata under the name specified in output_csv
    """
    t0 = timer()
    print("📥 Loading metadata...")

    df_reports = pd.read_csv(csv_reports)
    df_announcements = pd.read_csv(csv_announcements)

    df_combined = pd.concat([df_reports, df_announcements], ignore_index=True)
    df_combined.to_csv(output_csv, index=False)

    t1 = timer()
    print(f"✅ Combined metadata saved as '{output_csv}'")
    print(f"🧾 Total records: {len(df_combined)}")
    print(f"⏱️ Time taken: {t1 - t0:.2f} seconds")


In [None]:
combine_metadata()

<div id="1-3"; style="background-color: #292929; padding: 10px; line-height: 1.5; font-family: 'PT Serif Pro Book';">
    <h2 style="text-align: left; color: #ff8575;">
        1.3 Split into paragraphs
    </h2>
</div>

# Editable PDFs

In [None]:
import pdfplumber  # Reads words together with their font size and position

def process_editable_pdfs(folder_path, metadata_csv_path="cf_pdfs_metadata.csv", output_csv="raw_editable_pdfs_dataset.csv"):
    """
    Processes editable PDF documents, extracting structured paragraphs and saving to CSV.

    Parameters:
    - folder_path: str, path to the folder containing editable PDFs
    - metadata_csv_path: str, path to the CSV file with document metadata
    - output_csv: str, output CSV filename to save extracted paragraphs

    Returns:
    - DataFrame with extracted structured text and metadata
    """
    t0 = timer()
    print("📥 Loading metadata...")
    metadata_df = pd.read_csv(metadata_csv_path)

    # --- Extract doc_type, doc_id, year from title ---
    def extract_doc_info(row):
        title = row["title"]
        match = re.search(r"\b(Informe|Comunicado)\b(?:\s+CF)?(?:\s+(?:N[°ºo]|No))?\s*(\d{2,4})", title, re.IGNORECASE)
        doc_type = match.group(1).capitalize() if match else None
        doc_id = match.group(2) if match and match.lastindex >= 2 else None
        year_match = re.search(r"\b(\d{4})\b", str(row.get("date", "")))
        year = year_match.group(1) if year_match else None
        return pd.Series({"doc_type": doc_type, "doc_id": doc_id, "year": year})

    metadata_df[["doc_type", "doc_id", "year"]] = metadata_df.apply(extract_doc_info, axis=1)

    print("🧠 Metadata enriched. Starting paragraph extraction...\n")
    all_records = []
    filenames = [f for f in os.listdir(folder_path) if f.lower().endswith(".pdf")]
    total_files = len(filenames)

    for idx, filename in enumerate(sorted(filenames), start=1):
        file_path = os.path.join(folder_path, filename)

        try:
            print(f"📄 Processing: {idx}. {filename}")
            with pdfplumber.open(file_path) as pdf:
                paragraph_counter = 1
                anexo_found = False

                for page_num, page in enumerate(pdf.pages, start=1):
                    if anexo_found:
                        break

                    # Extract words with their size and vertical position
                    words = page.extract_words(extra_attrs=["size", "top"])
                    FONT_MIN = 11.0
                    FONT_MAX = 11.9
                    clean_words = [w for w in words if FONT_MIN <= w["size"] <= FONT_MAX]
                    if not clean_words:
                        continue

                    # Group words by their vertical position to form lines
                    lines_dict = {}
                    for word in clean_words:
                        line_top = round(word["top"], 1)
                        lines_dict.setdefault(line_top, []).append(word["text"])

                    lines = [
                        " ".join(words).strip()
                        for _, words in sorted(lines_dict.items())
                        if words
                    ]

                    if not lines:
                        continue

                    page_text = "\n".join(lines)

                    # 🚫 Stop extraction at "Anexo"
                    match = re.search(r"(?mi)^ *Anexos?\b[\s\w]*:?", page_text)
                    if match:
                        page_text = page_text[:match.start()].strip()
                        anexo_found = True
                        print(f"🛑 'Anexo' detected on page {page_num}. Truncating content.")

                    # Paragraph segmentation
                    lines = page_text.strip().split("\n")
                    lines = [line.strip() for line in lines if line.strip()]
                    paragraph_lines = []

                    for i, line in enumerate(lines):
                        is_new_paragraph = (
                            line.startswith("•")
                            or line.startswith("➢")
                            or (i > 0 and lines[i - 1].strip().endswith("."))
                            or (i > 0 and len(lines[i - 1].split()) <= 3)
                        )

                        if is_new_paragraph:
                            if paragraph_lines:
                                all_records.append({
                                    "filename": filename,
                                    "page": page_num,
                                    "paragraph_id": paragraph_counter,
                                    "text": " ".join(paragraph_lines).strip()
                                })
                                paragraph_counter += 1
                            paragraph_lines = [line]
                        else:
                            paragraph_lines.append(line)

                    if paragraph_lines:
                        all_records.append({
                            "filename": filename,
                            "page": page_num,
                            "paragraph_id": paragraph_counter,
                            "text": " ".join(paragraph_lines).strip()
                        })
                        paragraph_counter += 1

        except Exception as e:
            print(f"❌ Error processing {filename}: {e}")

    df = pd.DataFrame(all_records)
    if df.empty:
        print("⚠️ No text extracted.")
        return df

    # Merge with metadata
    df = df.merge(
        metadata_df[["pdf_filename", "title", "doc_type", "doc_id", "year", "date"]],
        left_on="filename", right_on="pdf_filename", how="left"
    )

    df = df[["title", "doc_type", "doc_id", "year", "date", "page", "paragraph_id", "text"]]
    df.to_csv(output_csv, index=False)

    t1 = timer()
    print(f"\n✅ Extraction complete. Total paragraphs: {len(df)}")
    print(f"💾 Saved to: {output_csv}")
    print(f"⏱️ Time taken: {t1 - t0:.2f} seconds")
    return df
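
The paragraph segmentation inside `process_editable_pdfs` can be tried on plain strings, without any PDF. The sketch below applies the same four start-of-paragraph rules: a bullet `•`, an arrow `➢`, a previous line ending with a period, or a previous line of three words or fewer (treated as a heading). `split_into_paragraphs` is a hypothetical helper name and the sample lines are made up.

```python
def split_into_paragraphs(lines):
    """Group text lines into paragraphs using the start-of-paragraph rules."""
    paragraphs, current = [], []
    for i, line in enumerate(lines):
        starts_new = (
            line.startswith("•")
            or line.startswith("➢")
            or (i > 0 and lines[i - 1].strip().endswith("."))
            or (i > 0 and len(lines[i - 1].split()) <= 3)
        )
        if starts_new and current:
            paragraphs.append(" ".join(current))  # Close the previous paragraph
            current = []
        current.append(line)
    if current:
        paragraphs.append(" ".join(current))  # Flush the last paragraph
    return paragraphs

# Made-up lines, as they might look after grouping words by vertical position
sample_lines = [
    "The fiscal deficit is expected to stay",
    "above the legal limit.",
    "• First risk flagged",
    "in the report.",
    "Conclusions",
    "Further measures are required",
    "this year.",
]
paragraphs = split_into_paragraphs(sample_lines)
for p in paragraphs:
    print(p)
```

The short-line rule is a heuristic: any line of three words or fewer, not only a heading, causes the next line to open a new paragraph.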

In [None]:
df_editable = process_editable_pdfs("editable_pdf")

In [None]:
df_editable
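
The metadata enrichment step derives `doc_type` and `doc_id` from each title with a regular expression. The sketch below runs that pattern on a few titles so its behaviour is easy to inspect. It assumes the character class is meant to cover the degree sign `°`, the ordinal indicator `º`, and a plain `o`. `parse_title` is a hypothetical helper name, and the last title is made up to show the no-match case.

```python
import re

DOC_PATTERN = re.compile(
    r"\b(Informe|Comunicado)\b(?:\s+CF)?(?:\s+(?:N[°ºo]|No))?\s*(\d{2,4})",
    re.IGNORECASE,
)

def parse_title(title):
    """Return (doc_type, doc_id) from a document title, or (None, None)."""
    match = DOC_PATTERN.search(title)
    if not match:
        return None, None
    return match.group(1).capitalize(), match.group(2)

informe = parse_title("Informe N° 01-2024-CF. Opinión del Consejo Fiscal")
comunicado = parse_title("Comunicado N° 06-2025-CF")
unmatched = parse_title("Nota de prensa")  # Made-up title without a document number

print(informe)     # ('Informe', '01')
print(comunicado)  # ('Comunicado', '06')
print(unmatched)   # (None, None)
```

Only the leading number is captured as `doc_id`; the year comes from the `date` column, not from the title.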

# Scanned PDFs

"""
Processes scanned PDFs with OCR to extract paragraphs per page along with metadata.

Args:
    folder_path (str): Path to the folder containing scanned PDF files.
    dpi (int): Resolution used when converting PDF pages to images.
    lang (str): OCR language ('spa' for Spanish, 'eng' for English, etc.).

Returns:
    pd.DataFrame: DataFrame with columns:
        ['filename', 'year', 'date', 'announcement', 'page', 'paragraph_id', 'text']
"""    

This step may take a little longer. Footers are excluded by visually detecting the lines that mark where they begin. Annex pages are excluded as well.

In [None]:
# === OCR Utilities ===
import cv2  # OpenCV, used for thresholding and line detection
import numpy as np

def detect_cut_line_y(image, min_length_ratio=0.2, y_range=(0.5, 0.85), debug_path=None):
    """
    Detects a horizontal line that likely marks the beginning of the footer in a scanned PDF page.
    
    Parameters:
        image (PIL.Image): Page image to analyze.
        min_length_ratio (float): Minimum line length relative to image width.
        y_range (tuple): Vertical range (as a proportion of image height) where footer lines are expected.
        debug_path (str): Optional path to save debug image with the detected line.
    
    Returns:
        int: Y-coordinate to crop the image above the footer line, or image height if no line is found.
    """
    gray = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2GRAY)
    _, thresh = cv2.threshold(gray, 180, 255, cv2.THRESH_BINARY_INV)

    lines = cv2.HoughLinesP(
        thresh, 1, np.pi / 180, threshold=100,
        minLineLength=int(image.width * min_length_ratio), maxLineGap=5
    )

    if lines is not None:
        height = image.height
        min_y, max_y = int(height * y_range[0]), int(height * y_range[1])

        horizontal_lines = [
            (x1, y1, x2, y2) for x1, y1, x2, y2 in lines[:, 0]
            if abs(y1 - y2) <= 3 and min_y <= y1 <= max_y
        ]

        if horizontal_lines:
            best_line = min(horizontal_lines, key=lambda l: l[1])
            if debug_path:
                img_dbg = image.copy()
                dbg_np = np.array(img_dbg)
                cv2.line(dbg_np, (best_line[0], best_line[1]), (best_line[2], best_line[3]), (0, 0, 255), 2)
                cv2.imwrite(debug_path, cv2.cvtColor(dbg_np, cv2.COLOR_RGB2BGR))
            return best_line[1]

    return image.height  # Default: no footer line detected
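
The selection step of `detect_cut_line_y` can be examined without OpenCV by feeding it line segments directly. The sketch below reproduces only that step: keep near-horizontal segments whose height falls inside the search band, then take the topmost one as the crop position. `pick_footer_cut` is a hypothetical helper name, and the segments and page height (3508 px, roughly an A4 page at 300 dpi) are made-up values.

```python
def pick_footer_cut(segments, image_height, y_range=(0.5, 0.85), max_tilt=3):
    """Return the y of the topmost near-horizontal segment in the band, else image_height."""
    min_y = int(image_height * y_range[0])
    max_y = int(image_height * y_range[1])
    candidates = [
        (x1, y1, x2, y2)
        for x1, y1, x2, y2 in segments
        if abs(y1 - y2) <= max_tilt and min_y <= y1 <= max_y
    ]
    if not candidates:
        return image_height  # No footer line: keep the whole page
    return min(candidates, key=lambda seg: seg[1])[1]

# Made-up segments in (x1, y1, x2, y2) form
segments = [
    (100, 200, 900, 201),    # Horizontal, but above the search band
    (100, 2600, 700, 2601),  # Horizontal, inside the band
    (100, 2800, 700, 2802),  # Horizontal, inside the band but lower
    (500, 2500, 520, 2900),  # Too steep to count as horizontal
]
cut_y = pick_footer_cut(segments, image_height=3508)
no_line = pick_footer_cut([], image_height=3508)
print(cut_y, no_line)  # 2600 3508
```

Taking the topmost candidate is the conservative choice: it may crop some body text if a table rule sits inside the band, but it will not leave footer text below the cut.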


# OCR

# Major function

In [None]:
# === OCR Utilities ===

def detect_cut_line_y(image, min_length_ratio=0.17, y_range=(0.55, 0.90), debug_path=None):
    """
    Detects a horizontal line likely indicating the beginning of the footer in scanned PDFs.

    Args:
        image (PIL.Image): Page image.
        min_length_ratio (float): Minimum length of line relative to image width.
        y_range (tuple): Vertical range to search (proportional to height).
        debug_path (str): Optional file path to save a debug image with detected line.

    Returns:
        int: Y-coordinate of the detected line, or image height if none found.
    """
    gray = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2GRAY)
    _, thresh = cv2.threshold(gray, 160, 255, cv2.THRESH_BINARY_INV)

    lines = cv2.HoughLinesP(
        thresh, 1, np.pi / 180, threshold=80,
        minLineLength=int(image.width * min_length_ratio), maxLineGap=5
    )

    if lines is not None:
        height = image.height
        min_y, max_y = int(height * y_range[0]), int(height * y_range[1])

        horizontal_lines = [
            (x1, y1, x2, y2) for x1, y1, x2, y2 in lines[:, 0]
            if abs(y1 - y2) <= 5 and min_y <= y1 <= max_y
        ]

        if horizontal_lines:
            best_line = min(horizontal_lines, key=lambda l: l[1])
            if debug_path:
                img_dbg = image.copy()
                dbg_np = np.array(img_dbg)
                cv2.line(dbg_np, (best_line[0], best_line[1]), (best_line[2], best_line[3]), (0, 0, 255), 2)
                cv2.imwrite(debug_path, cv2.cvtColor(dbg_np, cv2.COLOR_RGB2BGR))
            return best_line[1]

    return image.height  # No line detected → return full height


In [None]:
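# === OCR Processing for Scanned PDFs ===
from pdf2image import convert_from_path  # Renders PDF pages as PIL images
import pytesseract  # Python wrapper around the Tesseract OCR engine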

def process_scanned_pdfs(folder_path, metadata_csv_path="cf_pdfs_metadata.csv", dpi=300, lang='spa', debug=True):
    """
    Extracts paragraphs from scanned PDFs using OCR, excluding footers and annexes.

    Args:
        folder_path (str): Folder with scanned PDFs.
        metadata_csv_path (str): Path to metadata CSV file.
        dpi (int): Resolution used to convert PDFs to images.
        lang (str): OCR language code.
        debug (bool): Whether to save debug images with detected lines.

    Returns:
        pd.DataFrame: Paragraph-level extracted data.
    """
    print("📥 Loading metadata...")
    metadata_df = pd.read_csv(metadata_csv_path)

    def extract_doc_info(row):
        title = row.get("title", "")
        match = re.search(r"\b(Informe|Comunicado)\b(?:\s+CF)?(?:\s+(?:N[°ºo]|No))?\s*(\d{2,4})", title, re.IGNORECASE)
        doc_type = match.group(1).capitalize() if match else None
        doc_id = match.group(2) if match and match.lastindex >= 2 else None
        year_match = re.search(r"\b(\d{4})\b", str(row.get("date", "")))
        year = year_match.group(1) if year_match else None
        return pd.Series({"doc_type": doc_type, "doc_id": doc_id, "year": year})

    metadata_df[["doc_type", "doc_id", "year"]] = metadata_df.apply(extract_doc_info, axis=1)
    print("🧠 Metadata enriched. Starting OCR paragraph extraction...\n")

    all_records = []
    filenames = [f for f in os.listdir(folder_path) if f.lower().endswith(".pdf")]
    total_files = len(filenames)

    for idx, filename in enumerate(sorted(filenames), start=1):
        file_path = os.path.join(folder_path, filename)
        print(f"🖨️ OCR Processing ({idx}/{total_files}): {filename}")
        try:
            images = convert_from_path(file_path, dpi=dpi)
            paragraph_counter = 1
            anexo_found = False

            for page_num, image in enumerate(images, start=1):
                if anexo_found:
                    break

                debug_path = None
                if debug:
                    os.makedirs("debug_lines", exist_ok=True)
                    debug_path = f"debug_lines/{filename}_page_{page_num}.png"

                cut_y = detect_cut_line_y(image, debug_path=debug_path)
                cropped_img = image.crop((0, 0, image.width, cut_y))

                page_text = pytesseract.image_to_string(cropped_img, lang=lang)

                if not page_text.strip():
                    continue

                # Stop at 'Anexo'
                match = re.search(r"(?mi)^ *Anexos?\b[\s\w]*:?", page_text)
                if match:
                    page_text = page_text[:match.start()].strip()
                    anexo_found = True
                    print(f"🛑 'Anexo' detected on page {page_num}. Truncating content.")

                lines = [line.strip() for line in page_text.split("\n") if line.strip()]
                paragraph_lines = []

                for i, line in enumerate(lines):
                    is_new_paragraph = (
                        line.startswith("•") or line.startswith("➢") or
                        (i > 0 and lines[i - 1].strip().endswith(".")) or
                        (i > 0 and len(lines[i - 1].split()) <= 3)
                    )
                    if is_new_paragraph:
                        if paragraph_lines:
                            all_records.append({
                                "filename": filename,
                                "page": page_num,
                                "paragraph_id": paragraph_counter,
                                "text": " ".join(paragraph_lines).strip()
                            })
                            paragraph_counter += 1
                        paragraph_lines = [line]
                    else:
                        paragraph_lines.append(line)

                if paragraph_lines:
                    all_records.append({
                        "filename": filename,
                        "page": page_num,
                        "paragraph_id": paragraph_counter,
                        "text": " ".join(paragraph_lines).strip()
                    })
                    paragraph_counter += 1

        except Exception as e:
            print(f"❌ Error processing {filename}: {e}")

    df = pd.DataFrame(all_records)
    if df.empty:
        print("⚠️ No text extracted.")
        return df

    df = df.merge(
        metadata_df[["pdf_filename", "title", "doc_type", "doc_id", "year", "date"]],
        left_on="filename", right_on="pdf_filename", how="left"
    )
    df = df[["title", "doc_type", "doc_id", "year", "date", "page", "paragraph_id", "text"]]
    print(f"\n✅ OCR extraction complete. Total paragraphs: {len(df)}")
    return df
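
The paragraph-splitting heuristic used inside `process_scanned_pdfs` can be isolated as a small pure-Python function, which makes it easier to test on hand-written lines. `split_paragraphs` and the sample lines are hypothetical; the three rules (bullet at line start, previous line ends with a period, previous line has at most three words) mirror the loop above.

```python
def split_paragraphs(lines):
    """Group OCR lines into paragraphs using the same three rules as the OCR loop."""
    paragraphs, current = [], []
    for i, line in enumerate(lines):
        starts_new = (
            line.startswith(("•", "➢"))
            or (i > 0 and lines[i - 1].endswith("."))
            or (i > 0 and len(lines[i - 1].split()) <= 3)
        )
        if starts_new and current:
            # Close the paragraph accumulated so far.
            paragraphs.append(" ".join(current))
            current = []
        current.append(line)
    if current:
        paragraphs.append(" ".join(current))
    return paragraphs


sample_lines = [
    "El Consejo Fiscal considera que el déficit",
    "se mantendrá elevado durante el año.",
    "• Los ingresos fiscales cayeron",
    "respecto al año previo.",
]
paragraphs = split_paragraphs(sample_lines)
print(len(paragraphs))  # 2
```

Because the "short previous line" rule fires on any line of three words or fewer, headings and stray OCR fragments tend to open new paragraphs; the later merge step in `clean_noise` is what stitches many of them back together.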

In [None]:
# === Run and Save ===
df_scanned = process_scanned_pdfs(
    folder_path="scanned_pdf",
    metadata_csv_path="cf_pdfs_metadata.csv",
    dpi=400,
    lang='spa',
    debug=True
)
df_scanned.to_csv("raw_scanned_pdfs_dataset.csv", index=False)
print("💾 Saved as 'raw_scanned_pdfs_dataset.csv'")

In [None]:
df_scanned

# New utility

In [None]:
import numpy as np
import cv2
import pandas as pd
from PIL import Image
from sklearn.cluster import DBSCAN

def filter_text_body_dynamic(
    image,
    size_threshold_percentile=40,
    merge_dist=20,
    expand_margin=10,
    debug_path=None,
    audit_csv_path=None
):
    """
    Filters out text regions that lie outside the document's main body, using dynamic detection.

    Args:
        image (PIL.Image): Input image.
        size_threshold_percentile (int): Minimum height percentile for a box to be considered valid.
        merge_dist (int): Maximum distance between box centers for them to be clustered together.
        expand_margin (int): Margin, in pixels, by which the main-body frame is expanded.
        debug_path (str): Path where a debug image is saved.
        audit_csv_path (str): Path where a CSV with box information is saved.

    Returns:
        PIL.Image: Filtered image.
    """
    img_rgb = np.array(image.convert("RGB"))
    gray = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2GRAY)
    _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)

    contours, _ = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    boxes = [cv2.boundingRect(c) for c in contours]
    if not boxes:
        return image

    heights = [h for (_, _, _, h) in boxes]
    height_threshold = np.percentile(heights, size_threshold_percentile)

    filtered_boxes = [(x, y, w, h) for (x, y, w, h) in boxes if h >= height_threshold]
    if not filtered_boxes:
        return image

    centers = np.array([[x + w//2, y + h//2] for (x, y, w, h) in filtered_boxes])
    clustering = DBSCAN(eps=merge_dist, min_samples=3).fit(centers)
    labels = clustering.labels_

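    # DBSCAN labels outliers as -1; ignore them when picking the main cluster.
    cluster_labels = labels[labels != -1]
    if cluster_labels.size == 0:
        return image
    largest_label = pd.Series(cluster_labels).value_counts().idxmax()
    main_group_boxes = [box for i, box in enumerate(filtered_boxes) if labels[i] == largest_label]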

    x_min = max(0, min(x for x, _, _, _ in main_group_boxes) - expand_margin)
    x_max = max(x + w for x, _, w, _ in main_group_boxes) + expand_margin
    y_min = max(0, min(y for _, y, _, _ in main_group_boxes) - expand_margin)
    y_max = max(y + h for _, y, _, h in main_group_boxes) + expand_margin

    mask = np.zeros_like(gray)
    accepted, rejected = [], []

    for x, y, w, h in boxes:
        cx, cy = x + w // 2, y + h // 2
        if x_min <= cx <= x_max and y_min <= cy <= y_max:
            accepted.append((x, y, w, h))
            cv2.rectangle(mask, (x, y), (x + w, y + h), 255, -1)
        else:
            rejected.append((x, y, w, h))

    result = cv2.bitwise_and(img_rgb, img_rgb, mask=mask)
    filtered_image = Image.fromarray(result)

    if debug_path:
        debug_img = img_rgb.copy()
        for x, y, w, h in accepted:
            cv2.rectangle(debug_img, (x, y), (x + w, y + h), (0, 255, 0), 2)
        for x, y, w, h in rejected:
            cv2.rectangle(debug_img, (x, y), (x + w, y + h), (0, 0, 255), 1)
        cv2.rectangle(debug_img, (x_min, y_min), (x_max, y_max), (255, 0, 255), 2)
        cv2.imwrite(debug_path, cv2.cvtColor(debug_img, cv2.COLOR_RGB2BGR))

    if audit_csv_path:
        records = []
        for x, y, w, h in accepted:
            records.append({"x": x, "y": y, "w": w, "h": h, "status": "accepted"})
        for x, y, w, h in rejected:
            records.append({"x": x, "y": y, "w": w, "h": h, "status": "rejected"})
        pd.DataFrame(records).to_csv(audit_csv_path, index=False)

    return filtered_image
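
The final accept/reject step of `filter_text_body_dynamic` depends only on whether a box's center falls inside the expanded main-body frame. A minimal sketch of that test, with hypothetical names (`split_boxes_by_frame`) and made-up coordinates, and without the OpenCV or DBSCAN stages:

```python
def split_boxes_by_frame(boxes, frame):
    """Split (x, y, w, h) boxes by whether their center lies inside `frame`."""
    x_min, y_min, x_max, y_max = frame
    accepted, rejected = [], []
    for x, y, w, h in boxes:
        cx, cy = x + w // 2, y + h // 2
        if x_min <= cx <= x_max and y_min <= cy <= y_max:
            accepted.append((x, y, w, h))
        else:
            rejected.append((x, y, w, h))
    return accepted, rejected


frame = (100, 100, 900, 1200)  # assumed main-body frame: x_min, y_min, x_max, y_max
boxes = [
    (120, 150, 40, 20),    # well inside the frame
    (890, 1180, 18, 30),   # overlaps the edge, center still inside
    (10, 20, 30, 10),      # header-like box, outside
    (880, 1190, 60, 30),   # center just past the bottom-right corner
]
accepted, rejected = split_boxes_by_frame(boxes, frame)
print(len(accepted), len(rejected))  # 2 2
```

Testing the center rather than the full rectangle keeps boxes that straddle the frame edge, at the cost of occasionally letting part of a marginal note through.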


In [None]:
import os
import re
import pytesseract
import pandas as pd
from pdf2image import convert_from_path
from PIL import Image

def process_scanned_pdfs(folder_path, metadata_csv_path="cf_pdfs_metadata.csv", dpi=300, lang='spa', debug=True):
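    """
    Variant of the OCR pipeline that masks everything outside the main text body
    (via filter_text_body_dynamic) instead of cropping at a detected footer line.
    Arguments and return value match the earlier process_scanned_pdfs definition,
    which this one overrides.
    """
    print("📥 Loading metadata...")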
    metadata_df = pd.read_csv(metadata_csv_path)

    def extract_doc_info(row):
        title = row.get("title", "")
        match = re.search(r"\b(Informe|Comunicado)\b(?:\s+CF)?(?:\s+(?:N[°ºo]|No))?\s*(\d{2,4})", title, re.IGNORECASE)
        doc_type = match.group(1).capitalize() if match else None
        doc_id = match.group(2) if match and match.lastindex >= 2 else None
        year_match = re.search(r"\b(\d{4})\b", str(row.get("date", "")))
        year = year_match.group(1) if year_match else None
        return pd.Series({"doc_type": doc_type, "doc_id": doc_id, "year": year})

    metadata_df[["doc_type", "doc_id", "year"]] = metadata_df.apply(extract_doc_info, axis=1)
    print("🧠 Metadata enriched. Starting OCR paragraph extraction...\n")

    all_records = []
    filenames = [f for f in os.listdir(folder_path) if f.lower().endswith(".pdf")]
    total_files = len(filenames)

    for idx, filename in enumerate(sorted(filenames), start=1):
        file_path = os.path.join(folder_path, filename)
        print(f"🖨️ OCR Processing ({idx}/{total_files}): {filename}")
        try:
            images = convert_from_path(file_path, dpi=dpi)
            paragraph_counter = 1
            anexo_found = False

            for page_num, image in enumerate(images, start=1):
                if anexo_found:
                    break

                os.makedirs("debug_lines", exist_ok=True)
                debug_path = f"debug_lines/{filename}_page_{page_num}.png" if debug else None
                audit_csv_path = f"debug_lines/{filename}_page_{page_num}_boxes.csv" if debug else None

                # 🧠 New dynamic filtering function
                filtered_img = filter_text_body_dynamic(
                    image,
                    size_threshold_percentile=40,
                    merge_dist=20,
                    expand_margin=10,
                    debug_path=debug_path,
                    audit_csv_path=audit_csv_path
                )

                page_text = pytesseract.image_to_string(filtered_img, lang=lang)
                if not page_text.strip():
                    continue

                # 🛑 Stop at Anexos
                match = re.search(r"(?mi)^ *Anexos?\b[\s\w]*:?", page_text)
                if match:
                    page_text = page_text[:match.start()].strip()
                    anexo_found = True
                    print(f"🛑 'Anexo' detected on page {page_num}. Truncating content.")

                lines = [line.strip() for line in page_text.split("\n") if line.strip()]
                paragraph_lines = []

                for i, line in enumerate(lines):
                    is_new_paragraph = (
                        line.startswith("•") or line.startswith("➢") or
                        (i > 0 and lines[i - 1].strip().endswith(".")) or
                        (i > 0 and len(lines[i - 1].split()) <= 3)
                    )
                    if is_new_paragraph:
                        if paragraph_lines:
                            all_records.append({
                                "filename": filename,
                                "page": page_num,
                                "paragraph_id": paragraph_counter,
                                "text": " ".join(paragraph_lines).strip()
                            })
                            paragraph_counter += 1
                        paragraph_lines = [line]
                    else:
                        paragraph_lines.append(line)

                if paragraph_lines:
                    all_records.append({
                        "filename": filename,
                        "page": page_num,
                        "paragraph_id": paragraph_counter,
                        "text": " ".join(paragraph_lines).strip()
                    })
                    paragraph_counter += 1

        except Exception as e:
            print(f"❌ Error processing {filename}: {e}")

    df = pd.DataFrame(all_records)
    if df.empty:
        print("⚠️ No text extracted.")
        return df

    df = df.merge(
        metadata_df[["pdf_filename", "title", "doc_type", "doc_id", "year", "date"]],
        left_on="filename", right_on="pdf_filename", how="left"
    )
    df = df[["title", "doc_type", "doc_id", "year", "date", "page", "paragraph_id", "text"]]
    print(f"\n✅ OCR extraction complete. Total paragraphs: {len(df)}")
    return df
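
Both regular expressions used in `process_scanned_pdfs` are easy to check in isolation. The sketch below copies the title pattern and the annex pattern from the function; the helper names (`parse_title`, `truncate_at_annex`) and the sample titles are hypothetical and only serve to show what each pattern captures.

```python
import re

DOC_RE = re.compile(
    r"\b(Informe|Comunicado)\b(?:\s+CF)?(?:\s+(?:N[°ºo]|No))?\s*(\d{2,4})",
    re.IGNORECASE,
)
ANNEX_RE = re.compile(r"(?mi)^ *Anexos?\b[\s\w]*:?")


def parse_title(title):
    """Return (doc_type, doc_id) extracted from a title, or (None, None)."""
    m = DOC_RE.search(title)
    if not m:
        return None, None
    return m.group(1).capitalize(), m.group(2)


def truncate_at_annex(page_text):
    """Cut the page at the first line that starts with 'Anexo'/'Anexos'."""
    m = ANNEX_RE.search(page_text)
    if m:
        return page_text[:m.start()].strip(), True
    return page_text, False


print(parse_title("Informe CF N° 003-2019"))  # ('Informe', '003')
print(parse_title("comunicado No 12"))        # ('Comunicado', '12')
print(parse_title("Nota de prensa"))          # (None, None)

page = "Conclusiones finales.\nAnexo 1: Cuadros estadísticos\nCuadro 1"
print(truncate_at_annex(page))                # ('Conclusiones finales.', True)
```

The annex pattern is case-insensitive and anchored only to the start of a line, so a wrapped body line that happens to begin with "anexo" would also trigger the cut; the debug output is worth checking for that case.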


In [None]:
df_scanned = process_scanned_pdfs(
    folder_path="sc",  # Change this to your PDF folder
    metadata_csv_path="cf_pdfs_metadata.csv",
    dpi=400,
    lang='spa',
    debug=True
)
df_scanned.to_csv("raw_scanned_pdfs_dataset.csv", index=False)
print("💾 Saved as 'raw_scanned_pdfs_dataset.csv'")


<div style="font-family: PT Serif Pro Book; text-align: left; color: dark; font-size: 16px;">
    <span style="font-size: 30px; color: #033280; font-weight: bold;">
        <a href="#outilne" style="color: #033280; text-decoration: none;">&#11180;</a>
    </span> 
    <a href="#outilne" style="color: #cd301b; text-decoration: none;">Back to the outline.</a>
</div>

# Concatenate

In [None]:
import pandas as pd

# Concatenate the two DataFrames
text_df = pd.concat([df_editable, df_scanned], ignore_index=True)

# Sort by the 'date' column
text_df = text_df.sort_values(by='date')

# Reset the index after concatenating and sorting
text_df = text_df.reset_index(drop=True)

In [None]:
# Export to CSV
text_df.to_csv("raw_text_dataset.csv", index=False, encoding="utf-8")

In [None]:
text_df

In [None]:
text_df.count()

In [None]:
# Select the rows whose "text" starts with a lowercase letter
text_df[text_df["text"].str.match(r"^[a-z]")]

<div id="1-4" style="background-color: #292929; padding: 10px; line-height: 1.5; font-family: 'PT Serif Pro Book';">
    <h2 style="text-align: left; color: #ff8575;">
        1.4 Noise reduction
    </h2>
</div>

# BOTH


# Testing on scanned 

In [None]:
df_copy_editable_1 = df_editable.copy()

In [None]:
df_copy_editable_1

In [None]:
df_copy_scanned_1 = df_scanned.copy()

In [None]:
df_copy_scanned_1

In [None]:
import re
import unicodedata

In [None]:
import pandas as pd
import re

def clean_noise(df, filter_keywords=True):
    df_clean = df.copy()

    # === COMPILE ADDRESS PATTERNS TO REMOVE ===
    pattern_direccion_completa = re.compile(
        r"(?:Av\.|Jr\.|Calle)\s+[\w\sÁÉÍÓÚáéíóú]+\d{1,5}\s+—\s+Oficina\s+\d{1,4}\s+—\s+[\w\sÁÉÍÓÚáéíóú]+.*",
        flags=re.IGNORECASE
    )

    # === SCANNED OR CORRUPTED ADDRESS PATTERN (standalone or embedded) ===
    pattern_direccion_ruido = re.compile(
        r"""
        (?:
            \b(?:Av|Jr|Calle|Psj|Prolongación)\.?\s+            # Street type
            [\wÁÉÍÓÚáéíóúÑñ\s\.\-]{3,30}                        # Street name
            \s+\d{3,6}                                          # Street number
            (?:\s+[^\s]{3,15})?                                 # Corrupted code: "OfIKi981", etc.
            \s*[-–—]?\s*
            (?:San\s+Isidro|Miraflores|Magdalena|Surco|Jesus\s+María|Lince|Barranco)?
            (?:\s+e\s+)?                                        # Possible stray "e"
            (?:\.pe|cf\.gob\.pe)?                               # Domain
        )
        """,
        flags=re.IGNORECASE | re.VERBOSE
    )

    # === SAFE UNICODE CONVERSION ===
    def to_unicode(x):
        try:
            if isinstance(x, bytes):
                return x.decode("utf-8", errors="ignore").strip()
            return str(x).strip()
        except Exception:
            return ""

    df_clean["text"] = df_clean["text"].apply(to_unicode)
    
    # === DROP ROWS THAT ARE REPEATED ADDRESS LINES (BEFORE OTHER CLEANING) ===
    df_clean = df_clean[~df_clean["text"].str.contains(pattern_direccion_completa)].reset_index(drop=True)

    # === CLEAN OR DROP SCANNED ADDRESSES ===
    def limpiar_o_eliminar_direccion(text):
        if re.fullmatch(pattern_direccion_ruido, text.strip()):
            return None  # Drop the whole paragraph
        return re.sub(pattern_direccion_ruido, "", text)  # Strip the address when it is embedded

    df_clean["text"] = df_clean["text"].apply(limpiar_o_eliminar_direccion)
    df_clean = df_clean[df_clean["text"].notnull() & df_clean["text"].str.strip().astype(bool)].reset_index(drop=True)

    # === BASIC VECTORIZED CLEANING ===
    df_clean["text"] = df_clean["text"].str.replace(r"\b\d{1,3}/\d{1,3}\b", "", regex=True)
    df_clean["text"] = df_clean["text"].str.replace(r"\s+([,;:.!?])", r"\1", regex=True)

    # === URL CLEANING (well-formed and corrupted) ===
    df_clean["text"] = df_clean["text"].str.replace(
        r"(https?://|htps://|htttp://|htpss://|www\.|wwww\.)[\w\-\.]*\.[a-z]{2,6}(\/[\w\-\.%]*)*",
        "", regex=True
    )
    df_clean["text"] = df_clean["text"].str.replace(
        r"\b(htps?|htttp|htpss):\/\/[^\s]*", "", regex=True
    )

    # \uf02a is a private-use glyph, most likely residue from a PDF symbol font
    df_clean["text"] = df_clean["text"].str.replace(r"[•➢Ø*\uf02a°¡!?¿\"]", "", regex=True)
    df_clean["text"] = df_clean["text"].str.replace(r":\s*$", ".", regex=True)
    df_clean["text"] = df_clean["text"].str.replace(
        r"Lima[,]?\s+\d{1,2}\s+de\s+[a-záéíóú]+\s+de\s+\d{4}", "", regex=True, flags=re.IGNORECASE)
    # Literal replacement: with regex matching, "p.p." would also match strings such as "paper"
    df_clean["text"] = df_clean["text"].str.replace("p.p.", "puntos porcentuales", regex=False)
    df_clean["text"] = df_clean["text"].str.replace(
        r'^\s*((?:[ivxlcdm]+|[a-zA-Z]|\d+)[\.\)]\s*)+', '', regex=True, flags=re.IGNORECASE)
    df_clean["text"] = df_clean["text"].str.replace(
        r"\b(?:a|al|de|del|con|por|para|y|o|en|sin|sobre|ante|tras|entre|hacia|hasta|durante|mediante|excepto|salvo|según)\.$",
        lambda m: m.group(0)[:-1], regex=True)
    
    # === REPLACE CORRUPTED EXPRESSIONS ===
    pattern_minist = re.compile(r"MINIST\s+ERIO[\w%-º|\"”]+|8N%IC%A\s+Y\s+FIN\s+em.*?Usuaro", flags=re.IGNORECASE)
    pattern_garbage_symbols = re.compile(r"[^\w\sÁÉÍÓÚáéíóúÑñ]{5,}")

    # Pattern for heavily corrupted blocks (at the start or in the middle of the text)
    pattern_noise_block = re.compile(
        r"(?:[—\[\]_=\]\|%#@*<>~«»]{2,}|[\w]*—[\w]*|—\s*[A-Z]{1}\s*—)+",
        flags=re.UNICODE
    )
    
    df_clean["text"] = df_clean["text"].str.replace(pattern_minist, "", regex=True)
    df_clean["text"] = df_clean["text"].str.replace(pattern_garbage_symbols, "", regex=True)
    
    # Remove those garbage blocks
    df_clean["text"] = df_clean["text"].str.replace(pattern_noise_block, "", regex=True)
    
    # Pattern for courtesy and closing phrases
    pattern_cortesia = re.compile(
        r"(aprovecho\s+la\s+oportunidad\s+para\s+expresar(le)?\s*(a usted)?\s*las?\s*muestras?\s+de\s+mi\s+especial\s+consideración( y estima)?[\.]?)|"  # Direct variant
        r"(le\s+reitero\s+(las?\s+)?muestras?\s+de\s+(mi\s+)?especial\s+consideración[\.]?)|"  # Another frequent variant
        r"(hago\s+propicia\s+la\s+ocasión\s+para\s+saludar(lo|la)?\s*(muy)?\s*atentamente[\.]?)|"  # "hago propicia..." variant
        r"(sin\s+otro\s+particular.*?me\s+despido[^\n\.]*[\.]?)|"  # Classic closing phrase
        r"(con\s+(mi\s+)?especial\s+consideración[\.]?)",  # Typical closing formula
        flags=re.IGNORECASE
    )

    # Apply the cleaning to the text
    df_clean["text"] = df_clean["text"].str.replace(pattern_cortesia, "", regex=True)

    # === VALID-TEXT FILTERS ===
    def is_valid(text):
        txt = text.strip()
        letters = [c for c in txt if c.isalpha()]
        if letters and sum(c.isupper() for c in letters) / len(letters) > 0.7:
            return False
        if txt.lower().startswith("fuente:") or re.search(r"\b(Fuente:|Elaboración:)", txt, flags=re.IGNORECASE):
            return False
        if re.search(r"(?:\b\w\s){3,}\w", txt):
            return False
        patterns = [
            r'PCA\s*(Inicial|1er\s*Trim\.|2do\s*Trim\.|3er\s*Trim\.)',
            r'entre\s+\d{4}\sy\s+\d{4}\s*,?\s*\d+\s*de\s+cada\s+100\s*(leyes?|insistencia|implicancia\s*fiscal)',
            r'“[^”]*\(pág\.\s*\d+\)[^”]*”|\(pág\.\s*\d+\)',
            r"Véase informes\s+(N°\s*\d{2}-\d{4}-CF[, y]*){2,}",  # optional; keep it if you like
            r"\b(Véase|Ver|Consultar|Remítase|Revísese)\s+(el\s+)?(informe|artículo|documento|reporte|análisis|dictamen|comunicado\s+oficial|nota)\b.*?(N[°º]?\s*\d{1,3})?.*?\b(del\s+)?\d{4}",
            r"¢\s*US?|US?\s*¢", r"¢"
        ]
        if any(re.search(p, txt, flags=re.IGNORECASE) for p in patterns):
            return False
        if len(txt.split()) < 6:
            return False
        return True

    df_clean = df_clean[df_clean["text"].apply(is_valid)].reset_index(drop=True)

    # === MERGE CONSECUTIVE PARAGRAPH FRAGMENTS ===
    rows = []
    i = 0
    while i < len(df_clean):
        current_row = df_clean.iloc[i].copy()
        current_text = current_row["text"]
        current_title = current_row["title"]

        while i + 1 < len(df_clean):
            next_row = df_clean.iloc[i + 1]
            next_text = next_row["text"]
            next_title = next_row["title"]

            if next_title != current_title:
                break

            if not current_text.endswith(".") and next_text and next_text[0].isalpha() and next_text.endswith("."):
                current_text += " " + next_text
                i += 1
                continue

            if not current_text.endswith(".") and re.match(r"^\d{4}", next_text):
                current_text += " " + next_text
                i += 1
                continue

            if re.match(r"^[\(,;:\s]*[a-z\(]", next_text):
                current_text += " " + next_text
                i += 1
                continue

            if not current_text.endswith(".") and re.match(r"^\d{1,3}(?:,\d+)?", next_text):
                current_text += " " + next_text
                i += 1
                continue

            break

        current_row["text"] = current_text
        rows.append(current_row)
        i += 1

    df_result = pd.DataFrame(rows).reset_index(drop=True)
    df_result["paragraph_id"] = df_result.groupby("title").cumcount() + 1

    # === ADDITIONAL CLEANING ===
    cleaned_rows = []
    for idx, row in df_result.iterrows():
        text = row["text"].strip()
        starts_with_special = re.match(r'^[*%\"\'“\\\[\]\{\}\(\)\^\-+=<>~#@|]', text)
        starts_with_number = re.match(r"^\d+\s+", text)
        prev_text = df_result.iloc[idx - 1]["text"].strip() if idx > 0 else ""
        prev_ends_with_dot = prev_text.endswith(".")

        if starts_with_special:
            continue
        if starts_with_number and prev_ends_with_dot:
            continue

        cleaned_rows.append(row)

    df_result = pd.DataFrame(cleaned_rows).reset_index(drop=True)

    # Strip leading numbers, even when they are glued to the following word
    df_result["text"] = df_result["text"].str.replace(r"^\d+\s*", "", regex=True)
    df_result["text"] = df_result["text"].str.replace(r"^\d+(?=\w)", "", regex=True)

    # Drop rows whose text is shorter than 75 characters
    df_result = df_result[df_result["text"].str.len() >= 75].reset_index(drop=True)

    # === OPTIONAL FILTER: FISCAL-ALERT KEYWORDS ===
    if filter_keywords:
        keywords = [
            "Incumplimiento de reglas fiscales", "Preocupación", "Advertencia", "Alerta",
            "Riesgos fiscales", "Desvío del déficit fiscal", "No cumplimiento", "Desviaciones significativas",
            "Margen significativo", "Problema de credibilidad fiscal", "Credibilidad fiscal",
            "Sostenibilidad fiscal", "Consolidación fiscal", "Medidas correctivas", "Recomendación",
            "Necesidad de tomar medidas", "Control del gasto público", "Presiones fiscales",
            "Exceso de optimismo en proyecciones", "Exceso de gasto", "Aumento de gasto",
            "Reducción de déficit fiscal", "Incremento de ingresos permanentes",
            "Falta de compromiso con la responsabilidad fiscal", "Medidas de consolidación",
            "Deficiencia en la ejecución del gasto", "Aumento de la deuda pública",
            "Iniciativas legislativas que afectan las finanzas públicas", "Incremento del gasto público",
            "Beneficios tributarios sin justificación", "Tratamientos tributarios preferenciales",
            "Erosión de la base tributaria", "Elusión y evasión tributaria",
            "Aumento de gastos no previstos", "Aumento de gastos extraordinarios",
            "Aumento de gastos en remuneraciones", "Crecimiento del gasto no financiero",
            "Problema de sostenibilidad", "Riesgos de sostenibilidad fiscal", "Aumento de deuda neta",
            "Desajuste fiscal", "Falta de transparencia en el gasto", "Riesgos de sobreendeudamiento",
            "Excepciones a las reglas fiscales", "Riesgo de incumplimiento de metas fiscales",
            "Aumento de los compromisos de deuda", "Riesgo de insolvencia",
            "Falta de flexibilidad fiscal", "Desajuste entre el presupuesto y el MMM",
            "Riesgo de incumplimiento debido a presiones de gasto", "Erosión de la capacidad recaudatoria",
            "Incremento de la deuda pública", "Falta de control de gastos extraordinarios",
            "Necesidad de ajustar el gasto", "Inestabilidad macroeconómica",
            "Problemas fiscales derivados de iniciativas legislativas",
            "Riesgo de desajustes fiscales por reformas",
            "Falta de capacidad de generar ingresos fiscales", "Riesgo de gasto excesivo",
            "Incremento del gasto público no controlado", "Medidas de ajuste fiscal",
            "Inestabilidad presupuestaria", "Riesgo de inestabilidad fiscal",
            "Falta de sostenibilidad de la deuda", "Compromiso con la disciplina fiscal",
            "Necesidad de mejorar la disciplina fiscal", "Riesgos derivados de la crisis financiera",
            "Emergencia fiscal", "No cumplimiento de los límites de deuda",
            "Riesgo de presión sobre las finanzas públicas", "Riesgos de sostenibilidad a largo plazo",
            "Inconsistencia en las proyecciones fiscales", "Proyecciones fiscales no realistas",
            "Implicaciones fiscales de la situación de Petroperú",
            "Desajuste en las proyecciones fiscales", "Necesidad de consolidación fiscal",
            "Riesgos de desequilibrio fiscal", "Amenaza a la estabilidad fiscal",
            "Inseguridad fiscal", "Inconsistencias fiscales", "Falta de previsión en el gasto",
            "Riesgo de pérdida de control fiscal", "Impacto fiscal no anticipado",
            "Presión de gastos adicionales", "Aumento en la presión fiscal",
            "Erosión de las finanzas públicas", "Riesgo de déficit fiscal no controlado",
            "Aumento de la carga fiscal", "Riesgo de crisis fiscal",
            "Propuestas legislativas que generan gasto",
            "Propuestas que limitan la recaudación fiscal",
            "Iniciativas fiscales con implicaciones negativas",
            "Aumento de los gastos sociales no previstos",
            "Riesgo de incumplimiento de los límites fiscales",
            "Propuestas legislativas que no cumplen con las reglas fiscales",
            "Desviaciones fiscales no justificadas",
            "Proyecciones de déficit fiscal no alcanzables",
            "Riesgos derivados de iniciativas legislativas excesivas",
            "Crecimiento de la deuda pública sin control",
            "Necesidad de políticas fiscales más estrictas"
        ]
        keywords_lower = [k.lower() for k in keywords]
        df_result = df_result[
            df_result["text"].str.lower().apply(lambda txt: any(k in txt for k in keywords_lower))
        ].reset_index(drop=True)

    return df_result
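
The fragment-merging loop in `clean_noise` is the least obvious part of the function, so here is a reduced sketch that applies the same four rules to plain `(title, text)` tuples instead of DataFrame rows. `should_merge`, `merge_fragments`, and the sample rows are hypothetical names and data used only for illustration.

```python
import re


def should_merge(current, nxt):
    """Decide whether `nxt` continues the paragraph accumulated in `current`."""
    open_ended = not current.endswith(".")
    # Rule 1: open sentence followed by a fragment that closes with a period.
    if open_ended and nxt and nxt[0].isalpha() and nxt.endswith("."):
        return True
    # Rules 2 and 4: open sentence followed by a year or another number.
    if open_ended and (re.match(r"^\d{4}", nxt) or re.match(r"^\d{1,3}(?:,\d+)?", nxt)):
        return True
    # Rule 3: the next fragment starts in lowercase (or with an opening parenthesis).
    if re.match(r"^[\(,;:\s]*[a-z\(]", nxt):
        return True
    return False


def merge_fragments(rows):
    """Merge consecutive fragments that belong to the same title."""
    merged = []
    for title, text in rows:
        if merged and merged[-1][0] == title and should_merge(merged[-1][1], text):
            merged[-1] = (title, merged[-1][1] + " " + text)
        else:
            merged.append((title, text))
    return merged


rows = [
    ("Informe 01", "El déficit fiscal aumentó en"),
    ("Informe 01", "2023 debido al menor crecimiento."),
    ("Informe 01", "Se recomienda cautela."),
    ("Informe 02", "por otra parte, el gasto creció."),
]
merged_rows = merge_fragments(rows)
print(len(merged_rows))  # 3
```

The last row starts in lowercase but is not merged because its title differs, which is the same guard the notebook applies with `next_title != current_title`.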


In [None]:
#df_alertas = clean_noise(df, filter_keywords=True)   # Fiscal alerts only
df_editable_cleaned = clean_noise(df_copy_editable_1, filter_keywords=False) # All useful text

In [None]:
df_editable_cleaned

In [None]:
df_copy_ed_2 = df_editable_cleaned[df_editable_cleaned["text"].str.len() <= 75].reset_index(drop=True)
df_copy_ed_2

# Scanned

### Same as above


In [None]:
df_scanned_cleaned = clean_noise(df_copy_scanned_1, filter_keywords=False)

In [None]:
df_scanned_cleaned

In [None]:
df_copy_sc_2 = df_scanned_cleaned[df_scanned_cleaned["text"].str.len() <= 75].reset_index(drop=True)
df_copy_sc_2

# Global cleaning

In [None]:
raw_text_dataset = pd.read_csv("raw_text_dataset.csv")

In [None]:
raw_text_dataset

In [None]:
input_text_dataset = clean_noise(raw_text_dataset, filter_keywords=True)
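
With `filter_keywords=True`, a paragraph survives only if it contains at least one keyword as a case-insensitive substring. The sketch below reproduces that test on plain strings. The Unicode normalization step is an addition that the notebook does not perform; it is included on the assumption that OCR output may contain decomposed accents, which would otherwise never match keywords such as "Preocupación". `matches_any_keyword` and the sample sentences are hypothetical.

```python
import unicodedata


def matches_any_keyword(text, keywords):
    """Case-insensitive substring match after NFC normalization (an added safeguard)."""
    normalized = unicodedata.normalize("NFC", text).lower()
    return any(unicodedata.normalize("NFC", k).lower() in normalized for k in keywords)


keywords = ["Preocupación", "Riesgos fiscales", "Alerta"]

alert_text = "El Consejo Fiscal expresa su preocupación por el incremento del gasto público."
neutral_text = "El PBI creció 3,2 % en el primer trimestre."
decomposed_text = "Existe preocupacio\u0301n por la deuda."  # 'ó' written as o + combining accent

print(matches_any_keyword(alert_text, keywords))       # True
print(matches_any_keyword(neutral_text, keywords))     # False
print(matches_any_keyword(decomposed_text, keywords))  # True
```

Substring matching is deliberately loose: "Alerta" also matches "alertas", and a long keyword matches only when the exact phrase appears, so the keyword list effectively mixes broad single-word triggers with narrow multi-word ones.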

In [None]:
input_text_dataset.count()

In [None]:
# Export to CSV
input_text_dataset.to_csv("input_text_dataset_true.csv", index=False, encoding="utf-8")

# Cleaning for statistics

In [None]:
# === Load the DataFrame (if not already loaded) ===
input_text_dataset = pd.read_csv("input_text_dataset_true.csv")

## Order by year and date

In [None]:
filtered_df = input_text_dataset[input_text_dataset["doc_type"].isna()]
filtered_df

In [None]:
def sort_by_year_and_date(df):
    """
    Converts the 'date' column to datetime (if needed) and sorts the DataFrame by 'year' and 'date',
    then by 'page' and 'paragraph_id'.
    
    Parameters:
        df (pd.DataFrame): DataFrame containing the 'year', 'date', 'page' and 'paragraph_id' columns.
    
    Returns:
        pd.DataFrame: The DataFrame sorted by 'year', 'date', 'page' and 'paragraph_id'.
    """
    required_cols = {'year', 'date', 'page', 'paragraph_id'}
    if not required_cols.issubset(df.columns):
        raise ValueError("The DataFrame must contain the 'year', 'date', 'page' and 'paragraph_id' columns.")

    # Convert 'date' to datetime if it is not already
    if not pd.api.types.is_datetime64_any_dtype(df["date"]):
        df["date"] = pd.to_datetime(df["date"], dayfirst=True, errors='coerce')

    return df.sort_values(by=["year", "date", "page", "paragraph_id"]).reset_index(drop=True)


In [None]:
input_text_dataset = sort_by_year_and_date(input_text_dataset)
input_text_dataset

In [None]:
import pandas as pd

def fill_doc_type_from_title(df):
    """
    Fills empty values in the 'doc_type' column from the contents of the 'title' column.
    Only extracts 'Comunicado' or 'Informe' when they appear at the start of the title.
    
    Parameters:
        df (pd.DataFrame): DataFrame with the 'title' and 'doc_type' columns.
    
    Returns:
        pd.DataFrame: The DataFrame with 'doc_type' filled in where it was null.
    """
    # Check that the required columns exist
    required_cols = {'title', 'doc_type'}
    if not required_cols.issubset(df.columns):
        raise ValueError("The DataFrame must contain the 'title' and 'doc_type' columns.")

    # Mask of rows where doc_type is empty
    mask = df["doc_type"].isna()

    # Regular expression that captures only "Comunicado" or "Informe" at the start of the title
    regex = r"^(Comunicado|Informe)\b"

    # Extract and fill only the empty rows
    df.loc[mask, "doc_type"] = df.loc[mask, "title"].str.extract(regex)[0]

    return df


In [None]:
input_text_dataset = fill_doc_type_from_title(input_text_dataset)
input_text_dataset

In [None]:
filtered_df = input_text_dataset[input_text_dataset["doc_id"].isna()]
filtered_df

In [None]:
def fill_missing_doc_id(input_text_dataset):
    """
    Fills empty 'doc_id' values with sequential float numbers per year,
    based on the order of 'date', starting at 1.0.

    Parameters:
        input_text_dataset (pd.DataFrame): DataFrame that must contain the 'doc_id', 'year' and 'date' columns.

    Returns:
        pd.DataFrame: The DataFrame with 'doc_id' filled in where it was null.
    """
    # Check that the required columns exist
    required_cols = {'doc_id', 'year', 'date'}
    if not required_cols.issubset(input_text_dataset.columns):
        raise ValueError("The DataFrame must contain the 'doc_id', 'year' and 'date' columns.")

    # Sort by year and date
    df = input_text_dataset.sort_values(by=["year", "date"]).copy()

    # Within each year, number the unique dates of the rows that lack a doc_id
    def assign_ids(group):
        # Only rows with a null doc_id
        missing = group["doc_id"].isna()
        # Sequential counter per date, in order of first appearance
        group.loc[missing, "doc_id"] = (
            group.loc[missing]
                 .groupby("date", sort=False)
                 .ngroup() + 1
        ).astype(float)
        return group

    df = df.groupby("year", group_keys=False).apply(assign_ids)

    return df


In [None]:
input_text_dataset = fill_missing_doc_id(input_text_dataset)
input_text_dataset
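
The numbering rule can be checked on a small example. This is a sketch under stated assumptions: the rows are invented, every `doc_id` starts out empty, and `pd.factorize` is used here as one way to number distinct dates in order of appearance within each year.

```python
import pandas as pd

# Hypothetical rows: two paragraphs from the same date, then two other documents
toy = pd.DataFrame({
    "year": [2020, 2020, 2020, 2021],
    "date": pd.to_datetime(["2020-01-10", "2020-01-10", "2020-03-05", "2021-02-01"]),
    "doc_id": [float("nan")] * 4,
})

toy = toy.sort_values(["year", "date"])
missing = toy["doc_id"].isna()

# One id per distinct date, restarting at 1.0 in every year
toy.loc[missing, "doc_id"] = (
    toy.loc[missing]
       .groupby("year")["date"]
       .transform(lambda dates: pd.factorize(dates)[0] + 1)
       .astype(float)
)
print(toy)
```

Rows that share a date receive the same id, and the counter restarts in 2021.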

# Export to CSV

In [None]:
# Export to CSV with the same name
input_text_dataset.to_csv("llm_input_text_dataset_true.csv", index=False, encoding="utf-8")

## Paragraph statistics

In [None]:
import pandas as pd

# === Load the DataFrame (if not already loaded) ===
# input_text_dataset = pd.read_csv("input_text_dataset.csv")

# === Total unique documents, and count by type ===
total_docs = input_text_dataset[['title', 'doc_id', 'date']].apply(tuple, axis=1).nunique()
docs_por_tipo = input_text_dataset.drop_duplicates('title')['doc_type'].value_counts()

In [None]:
total_docs

In [None]:
docs_por_tipo
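
As a sanity check on the counting logic, the sketch below runs the same two expressions on invented rows and compares the document count with `drop_duplicates` on the key columns. The comparison assumes there are no missing values in `title`, `doc_id` or `date`. The toy data and the name `n_docs_alt` are hypothetical.

```python
import pandas as pd

# Hypothetical paragraphs: the first document contributes two rows
toy = pd.DataFrame({
    "title": ["Informe A", "Informe A", "Comunicado B", "Comunicado C"],
    "doc_id": [1.0, 1.0, 2.0, 3.0],
    "date": pd.to_datetime(["2020-01-10", "2020-01-10", "2020-03-05", "2020-03-05"]),
    "doc_type": ["Informe", "Informe", "Comunicado", "Comunicado"],
})

key_cols = ["title", "doc_id", "date"]

# Same expression as in the notebook: one tuple per row, then count distinct tuples
total_docs = toy[key_cols].apply(tuple, axis=1).nunique()

# Alternative on complete data: count distinct key rows directly
n_docs_alt = len(toy[key_cols].drop_duplicates())

# One row per title, then count document types
docs_por_tipo = toy.drop_duplicates("title")["doc_type"].value_counts()

print(total_docs, n_docs_alt)
print(docs_por_tipo)
```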

In [None]:
# === Years with the most and fewest opinions ===
opiniones_por_año = input_text_dataset.groupby('year')['doc_id'].nunique()
año_mas_opiniones = opiniones_por_año.idxmax()
año_menos_opiniones = opiniones_por_año.idxmin()

In [None]:
opiniones_por_año

In [None]:
año_mas_opiniones

In [None]:
año_menos_opiniones

In [None]:
# === Document with the most/fewest paragraphs ===
parrafos_por_doc = input_text_dataset.groupby('title')['paragraph_id'].nunique()
doc_mas_parrafos = parrafos_por_doc.idxmax()
doc_menos_parrafos = parrafos_por_doc.idxmin()

In [None]:
doc_mas_parrafos

In [None]:
doc_menos_parrafos

In [None]:
parrafo_mas_pequeno = input_text_dataset.loc[input_text_dataset["text_length"].idxmin()]
parrafo_mas_grande = input_text_dataset.loc[input_text_dataset["text_length"].idxmax()]
tamano_promedio_parrafo = input_text_dataset["text_length"].mean()

In [None]:
parrafo_mas_pequeno

In [None]:
parrafo_mas_grande

In [None]:
tamano_promedio_parrafo

# Words


In [None]:
# === Word count per paragraph ===
input_text_dataset["word_count"] = input_text_dataset["text"].str.split().str.len()

# Paragraph with the fewest words
parrafo_mas_pequeno = input_text_dataset.loc[input_text_dataset["word_count"].idxmin()]

# Paragraph with the most words
parrafo_mas_grande = input_text_dataset.loc[input_text_dataset["word_count"].idxmax()]

# Average paragraph size in number of words
tamano_promedio_parrafo = input_text_dataset["word_count"].mean()


In [None]:
parrafo_mas_pequeno

In [None]:
parrafo_mas_grande

In [None]:
tamano_promedio_parrafo
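
The word count above splits on whitespace. The sketch below shows how that behaves on edge cases, using invented strings: repeated spaces, an empty string and a missing value.

```python
import pandas as pd

# Hypothetical paragraphs chosen to exercise edge cases
toy = pd.DataFrame({
    "text": ["El deficit fiscal aumento.", "  Dos   palabras ", "", None]
})

# str.split() with no argument splits on runs of whitespace;
# str.len() then counts the resulting tokens
toy["word_count"] = toy["text"].str.split().str.len()
print(toy)
```

An empty string counts as zero words, so it would be picked by `idxmin`, whereas a missing text yields `NaN`, which `idxmin`, `idxmax` and `mean` skip by default.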

<div style="font-family: PT Serif Pro Book; text-align: left; color: dark; font-size: 16px;">
    <span style="font-size: 30px; color: #033280; font-weight: bold;">
        <a href="#outilne" style="color: #033280; text-decoration: none;">&#11180;</a>
    </span> 
    <a href="#outilne" style="color: #cd301b; text-decoration: none;">Back to the outline.</a>
</div>