In [None]:
# ─── Standard library modules (regex parsing, filesystem paths) plus NumPy for numerical utilities ───
import re
from pathlib import Path
import numpy as np

# ─── Third-party libraries for PDF parsing and tabular data handling ───
import pdfplumber
import pandas as pd

In [None]:
# Column order of the long-format frame returned by extract_all_tables_long.
_LONG_COLUMNS = [
    "table_name",
    "row_label",
    "column_header",
    "value",
    "currency",
    "page_number",
]


def _pad_rows(raw, width):
    """Return a copy of `raw` where every row has exactly `width` cells (None-padded)."""
    padded = []
    for row in raw:
        cells = list(row or [])
        padded.append(cells + [None] * (width - len(cells)))
    return padded


def _find_data_start(table, min_header_rows):
    """Return the index of the first row at/after `min_header_rows` whose first cell is non-blank."""
    for i in range(min_header_rows, len(table)):
        if (table[i][0] or "").strip():
            return i
    return None


def _combine_header_rows(header_rows, width):
    """Collapse N stacked header rows into one label per column (non-blank parts joined by a space)."""
    headers = []
    for col in range(width):
        parts = [(hr[col] or "").replace("\n", " ").strip() for hr in header_rows]
        headers.append(" ".join(p for p in parts if p))
    return headers


def _parse_numeric(raw_vals, is_pct):
    """Convert raw cell text to float64; unparseable cells become NaN."""
    text = raw_vals.astype(str).str.strip()

    # Values wrapped in parentheses are negatives, e.g. "(500)" -> -500.
    is_neg = text.str.startswith("(") & text.str.endswith(")")

    # Drop "$", whitespace, parentheses and "%".
    stripped = text.str.replace(r"[\$\s\(\)%]", "", regex=True)

    # Commas: decimal separator in percentage columns, thousands separator otherwise.
    as_decimal = stripped.str.replace(",", ".", regex=False)
    as_thousands = stripped.str.replace(",", "", regex=False)
    cleaned = as_decimal.where(is_pct, as_thousands)

    # Plain object strings in, float64 out: keeps the dtype identical across tables.
    nums = pd.to_numeric(cleaned, errors="coerce").astype("float64")
    return nums.where(~is_neg, -nums)


def _table_to_long(raw, table_name, page_number, min_header_rows, default_currency):
    """Turn one extracted table into long format; return None when it has nothing usable."""
    width = max(len(row or []) for row in raw)
    if width == 0:
        return None
    table = _pad_rows(raw, width)

    # 1) Locate the first data row; everything above it is header.
    start = _find_data_start(table, min_header_rows)
    if start is None:
        return None

    # 2) Merge the stacked header rows into one label per column.
    headers = _combine_header_rows(table[:start], width)

    # 3) Optional currency column: first header containing "Currency" (case-insensitive).
    cur_idx = next(
        (i for i, h in enumerate(headers) if re.search(r'Currency', h, re.IGNORECASE)),
        None,
    )

    # 4) Columns up to and including the currency column (or just the first
    #    column when there is none) are identifiers; the rest hold values.
    last_id = 0 if cur_idx is None else cur_idx
    id_positions = list(range(last_id + 1))
    value_positions = list(range(last_id + 1, width))
    if not value_positions:
        return None

    # 5) Melt by column POSITION rather than by header text, so repeated or
    #    blank headers (merged PDF cells) cannot collide with each other or
    #    with the output column names.
    frame = pd.DataFrame(table[start:], columns=list(range(width)))
    long = frame.melt(
        id_vars=id_positions,
        value_vars=value_positions,
        var_name="col_pos",
        value_name="v_raw",
    )
    column_header = long["col_pos"].map(dict(enumerate(headers)))

    # 6) A column is a percentage column when its header contains "%".
    is_pct = column_header.str.contains("%", regex=False)

    return pd.DataFrame(
        {
            "table_name": table_name,
            "row_label": long[0],
            "column_header": column_header,
            "value": _parse_numeric(long["v_raw"], is_pct),
            "currency": default_currency if cur_idx is None else long[cur_idx],
            "page_number": page_number,
        },
        columns=_LONG_COLUMNS,
    )


def extract_all_tables_long(
    pdf_path: Path,
    min_header_rows: int = 2,
    default_currency: str = "USD",
) -> pd.DataFrame:
    """Extract every table of a PDF into a single long-format DataFrame.

    Each table found by pdfplumber is split into header rows and data rows.
    The data section starts at the first row, at index ``min_header_rows`` or
    later, whose first cell is non-blank. Header rows are merged per column,
    the value columns are melted to one row per (row_label, column_header),
    and the cell text is parsed to numbers:

    * ``(x)`` is read as ``-x``;
    * ``$``, whitespace, parentheses and ``%`` are removed;
    * in columns whose header contains ``%`` a comma is a decimal separator,
      elsewhere it is a thousands separator and is dropped;
    * anything that still does not parse becomes NaN.

    Parameters
    ----------
    pdf_path : Path
        PDF file to read.
    min_header_rows : int, default 2
        Minimum number of leading rows always treated as header.
    default_currency : str, default "USD"
        Currency assigned to tables that have no "Currency" column.

    Returns
    -------
    pd.DataFrame
        Columns ``table_name``, ``row_label``, ``column_header``, ``value``,
        ``currency``, ``page_number``. Empty (but with these columns) when the
        PDF contains no usable table.
    """
    frames = []
    tbl_counter = 0

    with pdfplumber.open(pdf_path) as pdf:
        for page in pdf.pages:
            for raw in page.extract_tables():
                # Counted before any skip so table names stay stable
                # regardless of which tables are discarded.
                tbl_counter += 1
                if not raw or len(raw) < 2:
                    continue

                long = _table_to_long(
                    raw,
                    f"tbl{tbl_counter}_page{page.page_number}",
                    page.page_number,
                    min_header_rows,
                    default_currency,
                )
                if long is not None:
                    frames.append(long)

    # pd.concat raises ValueError on an empty list; return a typed-empty frame instead.
    if not frames:
        return pd.DataFrame(columns=_LONG_COLUMNS)
    return pd.concat(frames, ignore_index=True)

In [None]:
# Driver: read the first Q1-2025 PDF from the bronze layer and persist the
# long-format tables as a parquet file in the silver layer.
# NOTE(review): absolute, machine-specific path — consider deriving it from a
# configurable project root so the notebook runs on other machines.
bronze_q1   = Path(r"D:\DataEngineer_TechnicalTest\Julian_Vallejo_DataEngTest\task1\bronze\2025_Q1")

# Sort so the chosen file is deterministic, and fail with a clear message
# instead of an IndexError when the folder holds no PDF.
pdf_candidates = sorted(bronze_q1.glob("*.pdf"))
if not pdf_candidates:
    raise FileNotFoundError(f"No PDF files found in {bronze_q1}")
PDF_PATH    = pdf_candidates[0]
OUTPUT_PARQ = Path("silver/q1_2025_tables.parquet")

df_all = extract_all_tables_long(PDF_PATH)

# to_parquet does not create missing directories.
OUTPUT_PARQ.parent.mkdir(parents=True, exist_ok=True)
df_all.to_parquet(OUTPUT_PARQ, index=False)