# 02 - RAW to CLEAN (DuckDB) · OpenBDAP Saldi storici

Questo notebook:
- legge il CSV RAW salvato in `data/raw/<PROJECT>/<RUN_ID>/..._raw.csv`
- crea una tabella `clean` con rename a `snake_case` e cast coerenti
- applica una policy esplicita per null e parsing numerico
- esegue validazioni minime sul dataset CLEAN
- esporta parquet e metadata in `data/clean/<PROJECT>/<RUN_ID>/`

Policy di parsing:
- `""`, `" "`, `"n.d."`, `"nd"`, `"N.D."`, `"null"`, `"NULL"` -> `NULL` (confronto dopo `TRIM`, sensibile a maiuscole/minuscole)
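- `-` è mantenuto come segno meno valido, non come null
- `.` è trattato come separatore decimale canonico
- `,` viene convertito a `.` solo se presente; i separatori delle migliaia non sono gestiti, quindi un valore come `1.234,56` diventa `1.234.56`, non è interpretabile e risulta `NULL`
- i valori che terminano con `%` vengono convertiti dividendo per 100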
- nessun ricalcolo o modifica semantica dei valori economici


In [None]:
from __future__ import annotations

from pathlib import Path
from datetime import datetime, timezone
import json
import hashlib
import os
import re
import unicodedata

import duckdb

# --- run configuration ---
# Project folder name under data/raw/ and data/clean/.
PROJECT = "openbdap_rendiconto_saldi_storico"
# The expected RAW file is <DATASET_SLUG>_raw.csv inside the RAW run dir.
DATASET_SLUG = "rendiconto_pubblicato_serie_storica_saldi"
# None -> use the RAW run dir whose name sorts last; set a run dir name to pin one.
RAW_RUN_ID = None
# None -> reuse the RAW run id as the CLEAN output dir name.
CLEAN_RUN_ID = None
# CSV options passed to DuckDB read_csv and recorded in the manifest.
DELIM = ";"
ENCODING = "utf-8"
# With fewer rows than this the validation emits a warning (not an error).
MIN_WARN_ROWS = 10

# Tokens mapped to NULL. They are compared after TRIM (so " " behaves like ""),
# and the SQL IN comparison is case-sensitive (hence both "null" and "NULL").
SPECIAL_NULLS = {"", " ", "n.d.", "nd", "N.D.", "null", "NULL"}

# RAW header -> CLEAN column name. Headers not listed fall back to to_snake().
# Every key except ANNO is parsed as a number (parse_num) in the CLEAN table.
SEMANTIC_MAP = {
    "ANNO": "esercizio_finanziario",
    "RISPARMIO_PUBBLICO": "risparmio_pubblico",
    "SALDO_NETTO": "saldo_netto_da_finanziare",
    "INDEBITAMENTO_NETTO": "indebitamento_netto",
    "RICORSO_MERCATO": "ricorso_al_mercato",
    "AVANZO_PRIMARIO": "avanzo_primario",
    "SPESE_CORRENTI": "spese_correnti",
    "SPESE_INTERESSI": "spese_per_interessi",
    "SPESE_CONTO_CAPITALE": "spese_in_conto_capitale",
    "SPESE_ACQ_ATT_FINE": "spese_acquisizione_attivita_finanziarie",
    "SPESE_RIMBORSO_PRESTITI": "spese_per_rimborso_prestiti",
    "SPESE_COMPLESSIVE": "spese_complessive",
    "SPESE_FINALI": "spese_finali",
    "SPESE_FIN_NETTO_ATT_FIN": "spese_finali_netto_att_fin",
    "ENTRATE_TRIBUTARIE": "entrate_tributarie",
    "ENTRATE_EXTRA_TRIBUTARIE": "entrate_extra_tributarie",
    "ENTR_ALIEN_PATR_RISCOS": "entrate_alienazioni_patrimoniali_e_riscossioni",
    "RISCOSSIONE_CREDITI": "riscossione_crediti",
    "ENTR_ACCENSIONE_PRESTITI": "entrate_accensione_prestiti",
    "ENTRATE_FINALI": "entrate_finali",
    "ENTR_FIN_NETTO_RISCO_CRED": "entrate_fin_netto_riscossione_crediti",
    "ENTRATE_CORRENTI": "entrate_correnti",
}

# CLEAN column -> description used in data_dictionary.json; columns not listed
# here get a generic description.
DESCRIPTIONS = {
    "esercizio_finanziario": "Anno di esercizio finanziario.",
    "risparmio_pubblico": "Saldo di risparmio pubblico.",
    "saldo_netto_da_finanziare": "Saldo netto da finanziare.",
    "indebitamento_netto": "Indebitamento netto.",
    "ricorso_al_mercato": "Ricorso al mercato.",
    "avanzo_primario": "Avanzo primario.",
}

def maybe_mount_drive() -> None:
    """Mount Google Drive at /content/drive when running inside Google Colab.

    Outside Colab this is a no-op. That covers local Jupyter and also plain
    ``python`` / pytest, where IPython's ``get_ipython`` is not defined.
    """
    try:
        shell = get_ipython()  # noqa: F821 - injected into the namespace by IPython
    except NameError:
        # Not running under IPython at all, so it cannot be Colab.
        return
    if "google.colab" not in str(shell.__class__):
        return
    from google.colab import drive
    drive.mount('/content/drive')

def discover_root() -> Path:
    """Locate the project root directory.

    Resolution order:
    1. the ``DCL_ROOT`` environment variable, when set and non-empty;
    2. the nearest directory (cwd or one of its ancestors) that contains both
       ``data`` and ``notebooks``;
    3. the DataCivicLab folder on a mounted Google Drive, if it exists;
    4. the current working directory as a last resort.
    """
    override = os.getenv("DCL_ROOT")
    if override:
        return Path(override)

    here = Path.cwd().resolve()
    project_dir = next(
        (
            folder
            for folder in (here, *here.parents)
            if (folder / 'data').exists() and (folder / 'notebooks').exists()
        ),
        None,
    )
    if project_dir is not None:
        return project_dir

    drive_root = Path('/content/drive/MyDrive/DataCivicLab')
    return drive_root if drive_root.exists() else here

def latest_run_dir(root: Path) -> Path:
    """Return the sub-directory of *root* whose name sorts last.

    Plain files inside *root* are ignored. Raises FileNotFoundError when *root*
    has no sub-directories.
    """
    subdirs = [entry for entry in root.iterdir() if entry.is_dir()]
    if not subdirs:
        raise FileNotFoundError(f'No run dirs in: {root}')
    return max(subdirs, key=lambda entry: entry.name)

def sha256_file(path: Path) -> str:
    """Return the hex SHA-256 digest of the file at *path*, read in 1 MiB chunks."""
    digest = hashlib.sha256()
    block_size = 1024 * 1024
    with open(path, 'rb') as handle:
        while block := handle.read(block_size):
            digest.update(block)
    return digest.hexdigest()

def to_snake(value: str) -> str:
    """Convert a RAW header into an ASCII snake_case column name.

    Accents are stripped (NFKD + ASCII), the text is lower-cased, and each run
    of non-word characters becomes a single underscore. Leading/trailing
    underscores are removed. An empty result becomes ``'col'``, and names that
    start with a digit get a ``'c_'`` prefix.
    """
    ascii_text = unicodedata.normalize('NFKD', value).encode('ascii', 'ignore').decode('ascii')
    underscored = re.sub(r'[^\w]+', '_', ascii_text.strip().lower())
    name = re.sub(r'_+', '_', underscored).strip('_') or 'col'
    return f'c_{name}' if name[0].isdigit() else name

maybe_mount_drive()

ROOT = discover_root()
RAW_ROOT = ROOT / 'data' / 'raw' / PROJECT
CLEAN_ROOT = ROOT / 'data' / 'clean' / PROJECT

# Resolve the RAW run dir: the one whose name sorts last, unless RAW_RUN_ID pins it.
# Fail early with explicit paths instead of an opaque iterdir()/glob() failure.
if not RAW_ROOT.is_dir():
    raise FileNotFoundError(f'RAW project dir not found: {RAW_ROOT} (ROOT={ROOT}; set DCL_ROOT to override)')
raw_run_dir = latest_run_dir(RAW_ROOT) if RAW_RUN_ID is None else (RAW_ROOT / RAW_RUN_ID)
if not raw_run_dir.is_dir():
    raise FileNotFoundError(f'RAW run dir not found: {raw_run_dir}')
RAW_RUN_ID = raw_run_dir.name

# Prefer the expected file name; otherwise fall back to any *_raw.csv in the run dir.
raw_csv = raw_run_dir / f'{DATASET_SLUG}_raw.csv'
if not raw_csv.exists():
    cands = sorted(raw_run_dir.glob('*_raw.csv'))
    if not cands:
        raise FileNotFoundError(f'No *_raw.csv in {raw_run_dir}')
    if len(cands) > 1:
        # The fallback choice would otherwise be silent; surface it so a wrong file is noticed.
        print('WARNING: multiple *_raw.csv found, using the first:', [p.name for p in cands])
    raw_csv = cands[0]

# The CLEAN run mirrors the RAW run id unless CLEAN_RUN_ID is set explicitly.
CLEAN_RUN_ID = RAW_RUN_ID if CLEAN_RUN_ID is None else CLEAN_RUN_ID
CLEAN_DIR = CLEAN_ROOT / CLEAN_RUN_ID
CLEAN_DIR.mkdir(parents=True, exist_ok=True)

OUT_PARQUET = CLEAN_DIR / 'saldi_storico.parquet'
OUT_MAPPING = CLEAN_DIR / 'columns_mapping_raw_to_clean.json'
OUT_PROFILE = CLEAN_DIR / 'profile_clean.json'
OUT_VALIDATE = CLEAN_DIR / 'validate_clean.json'
OUT_DICT = CLEAN_DIR / 'data_dictionary.json'
OUT_MANIFEST = CLEAN_DIR / 'clean_manifest.json'

print('ROOT:', ROOT)
print('RAW_RUN_ID:', RAW_RUN_ID)
print('raw_csv:', raw_csv)
print('CLEAN_DIR:', CLEAN_DIR)


In [None]:
# --- DUCKDB: load raw + parse policy ---
# Load the RAW CSV with every field as VARCHAR, then build the `clean` table.
# Columns are renamed (SEMANTIC_MAP, else to_snake) and cast according to the
# parsing policy described at the top of the notebook. The result is also
# written to parquet.


def _sql_literal(text: str) -> str:
    """Quote *text* as a SQL string literal, doubling embedded single quotes."""
    return "'" + text.replace("'", "''") + "'"


def _sql_ident(name: str) -> str:
    """Quote *name* as a SQL identifier, doubling embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def _header_key(name: str) -> str:
    """Normalize a RAW header for lookups: ignore surrounding spaces and case."""
    return name.strip().upper()


con = duckdb.connect()

# Paths/options go through _sql_literal so that, e.g., an apostrophe in the
# working-directory or Drive path cannot break (or alter) the statement.
con.execute(f"""
CREATE OR REPLACE TABLE raw AS
SELECT * FROM read_csv(
  {_sql_literal(raw_csv.as_posix())},
  delim={_sql_literal(DELIM)},
  header=true,
  all_varchar=true,
  encoding={_sql_literal(ENCODING)}
);
""")

null_tokens = ', '.join(_sql_literal(token) for token in sorted(SPECIAL_NULLS))

# clean_token: TRIM the value and map the SPECIAL_NULLS tokens to NULL.
con.execute(f"""
CREATE OR REPLACE MACRO clean_token(x) AS (
  CASE
    WHEN x IS NULL THEN NULL
    WHEN TRIM(CAST(x AS VARCHAR)) IN ({null_tokens}) THEN NULL
    ELSE TRIM(CAST(x AS VARCHAR))
  END
);
""")

# parse_num: clean_token, drop a trailing '%', turn ',' into '.', TRY_CAST to
# DOUBLE (unparseable -> NULL), and divide percentages by 100.
con.execute(r"""
CREATE OR REPLACE MACRO parse_num(x) AS (
  TRY_CAST(
    CASE
      WHEN clean_token(x) IS NULL THEN NULL
      WHEN RIGHT(clean_token(x), 1) = '%' THEN REPLACE(LEFT(clean_token(x), LENGTH(clean_token(x)) - 1), ',', '.')
      ELSE REPLACE(clean_token(x), ',', '.')
    END
  AS DOUBLE
  ) / CASE WHEN clean_token(x) IS NOT NULL AND RIGHT(clean_token(x), 1) = '%' THEN 100 ELSE 1 END
);
""")

raw_cols = [r[1] for r in con.execute("PRAGMA table_info('raw')").fetchall()]
YEAR_COL = next(
    (c for c in raw_cols if _header_key(c) in {'ANNO', 'ESERCIZIO', 'ESERCIZIO_FINANZIARIO'}),
    None,
)
# Header lookups are case/space-insensitive, consistently with YEAR_COL detection.
# Otherwise a header such as "Saldo_Netto" matches neither SEMANTIC_MAP nor
# NUMERIC_COLS and is silently left as VARCHAR.
SEMANTIC_MAP_BY_KEY = {_header_key(k): v for k, v in SEMANTIC_MAP.items()}
NUMERIC_COLS = set(SEMANTIC_MAP_BY_KEY) - {'ANNO'}

used = set()
final_map = {}
select_exprs = []

for c in raw_cols:
    key = _header_key(c)
    is_year = YEAR_COL is not None and c == YEAR_COL
    if is_year:
        # Whichever accepted header holds the year (ANNO / ESERCIZIO / ...), give it
        # the canonical key name that the validation step checks for.
        new = 'esercizio_finanziario'
    else:
        new = SEMANTIC_MAP_BY_KEY.get(key) or to_snake(c)
    if new in used:
        # Disambiguate name collisions with a numeric suffix: name_2, name_3, ...
        i = 2
        while f'{new}_{i}' in used:
            i += 1
        new = f'{new}_{i}'
    used.add(new)
    final_map[c] = new

    # RAW headers are arbitrary text, so quote them as identifiers.
    src, dst = _sql_ident(c), _sql_ident(new)
    if is_year:
        expr = f'TRY_CAST(clean_token({src}) AS INTEGER) AS {dst}'
    elif key in NUMERIC_COLS:
        expr = f'parse_num({src}) AS {dst}'
    else:
        expr = f'clean_token({src}) AS {dst}'
    select_exprs.append(expr)

clean_sql = 'CREATE OR REPLACE TABLE clean AS\nSELECT\n  ' + ',\n  '.join(select_exprs) + '\nFROM raw;'
con.execute(clean_sql)
con.execute(f"COPY clean TO {_sql_literal(OUT_PARQUET.as_posix())} (FORMAT PARQUET);")
con.execute("SELECT * FROM clean ORDER BY 1 LIMIT 5").df()


In [None]:
# --- validations + profile + metadata ---
# Validate the `clean` table. Then write the column mapping, profile, validation
# report and data dictionary, plus a manifest with the SHA-256 of inputs and
# outputs, into CLEAN_DIR.

KEY_COL = 'esercizio_finanziario'
# Balance columns expected in this dataset. A missing one is reported as a
# warning. One that is entirely NULL after parsing is an error, because it
# points to a number-format problem (e.g. thousands separators).
CORE_SALDI_COLS = ['saldo_netto_da_finanziare', 'indebitamento_netto', 'avanzo_primario']

n_rows = con.execute("SELECT COUNT(*) FROM clean").fetchone()[0]
schema_rows = con.execute("PRAGMA table_info('clean')").fetchall()
cols = [r[1] for r in schema_rows]
types = {r[1]: r[2] for r in schema_rows}

# COUNT(*) - COUNT(col) is 0 on an empty table. SUM(CASE ...) would return NULL
# there, and int(None) would crash before the "empty dataset" error is recorded.
null_exprs = ', '.join(f'COUNT(*) - COUNT("{c}") AS "{c}"' for c in cols)
nulls_row = con.execute(f'SELECT {null_exprs} FROM clean').fetchone()
nulls = dict(zip(cols, map(int, nulls_row)))

required_cols = [KEY_COL]
missing_required = [c for c in required_cols if c not in cols]
# The core saldi columns used to be appended to required_cols only when already
# present, so their absence could never be detected. Check them explicitly.
missing_core = [c for c in CORE_SALDI_COLS if c not in cols]
all_null_core = [c for c in CORE_SALDI_COLS if c in cols and n_rows > 0 and nulls[c] == n_rows]

duplicate_key_rows = None
year_min = None
year_max = None
if KEY_COL in cols:
    # NULL keys are reported by their own check below, not as duplicates.
    duplicate_key_rows = con.execute("""
        SELECT COALESCE(SUM(cnt - 1), 0)
        FROM (
          SELECT esercizio_finanziario, COUNT(*) AS cnt
          FROM clean
          WHERE esercizio_finanziario IS NOT NULL
          GROUP BY 1
          HAVING COUNT(*) > 1
        ) t
    """).fetchone()[0]
    year_min, year_max = con.execute(
        'SELECT MIN(esercizio_finanziario), MAX(esercizio_finanziario) FROM clean WHERE esercizio_finanziario IS NOT NULL'
    ).fetchone()

errors = []
warnings = []
checks = []

checks.append({'name': 'row_count_ge_1', 'ok': n_rows >= 1, 'value': int(n_rows)})
if n_rows < 1:
    errors.append('Dataset clean vuoto.')
if n_rows < MIN_WARN_ROWS:
    warnings.append(f'Row count basso: {n_rows} righe, attese preferibilmente >= {MIN_WARN_ROWS}.')

checks.append({'name': 'required_columns_present', 'ok': len(missing_required) == 0, 'missing': missing_required})
if missing_required:
    errors.append('Colonne obbligatorie mancanti: ' + ', '.join(missing_required))

checks.append({'name': 'core_saldi_columns_present', 'ok': not missing_core, 'missing': missing_core})
if missing_core:
    warnings.append('Colonne di saldo attese mancanti: ' + ', '.join(missing_core))

checks.append({'name': 'core_saldi_parsed', 'ok': not all_null_core, 'all_null': all_null_core})
if all_null_core:
    errors.append(
        'Colonne di saldo interamente nulle dopo il parsing (verificare formato numerico/separatori): '
        + ', '.join(all_null_core)
    )

if duplicate_key_rows is not None:
    checks.append({'name': 'unique_esercizio_finanziario', 'ok': duplicate_key_rows == 0, 'duplicate_key_rows': int(duplicate_key_rows)})
    if duplicate_key_rows != 0:
        errors.append(f'Duplicati sulla chiave esercizio_finanziario: {duplicate_key_rows}.')
    # A NULL year means the RAW value was missing or not an integer (TRY_CAST failed).
    key_nulls = nulls[KEY_COL]
    checks.append({'name': 'esercizio_finanziario_not_null', 'ok': key_nulls == 0, 'null_rows': key_nulls})
    if key_nulls:
        errors.append(f'Valori nulli o non interpretabili in esercizio_finanziario: {key_nulls} righe.')
    checks.append({'name': 'year_bounds_present', 'ok': year_min is not None and year_max is not None, 'year_min': year_min, 'year_max': year_max})
else:
    checks.append({'name': 'unique_esercizio_finanziario', 'ok': False, 'duplicate_key_rows': None})
    warnings.append('Colonna esercizio_finanziario assente: impossibile validare unicita e range anni.')

validate_clean = {
    'ok': len(errors) == 0,
    'checks': checks,
    'errors': errors,
    'warnings': warnings,
}

sample_rows = con.execute('SELECT * FROM clean LIMIT 10').fetchall()
sample_rows = [dict(zip(cols, row)) for row in sample_rows]

profile_clean = {
    'project': PROJECT,
    'raw_run_id': RAW_RUN_ID,
    'clean_run_id': CLEAN_RUN_ID,
    'raw_csv': str(raw_csv),
    'clean_parquet': str(OUT_PARQUET),
    'n_rows': int(n_rows),
    'n_cols': int(len(cols)),
    'columns': cols,
    'types': types,
    'nulls': nulls,
    'sample_rows': sample_rows,
}

data_dictionary = []
for col in cols:
    # First non-NULL value among the sample rows, if any.
    sample_value = next((row.get(col) for row in sample_rows if row.get(col) is not None), None)
    data_dictionary.append({
        'column': col,
        'type': types.get(col),
        'description': DESCRIPTIONS.get(col, 'Campo derivato dal RAW senza reinterpretazione.'),
        'example_value': sample_value,
        'parsing_policy': 'trim + null policy + numeric cast dove applicabile',
    })

OUT_MAPPING.write_text(json.dumps(final_map, ensure_ascii=False, indent=2), encoding='utf-8')
OUT_PROFILE.write_text(json.dumps(profile_clean, ensure_ascii=False, indent=2), encoding='utf-8')
OUT_VALIDATE.write_text(json.dumps(validate_clean, ensure_ascii=False, indent=2), encoding='utf-8')
OUT_DICT.write_text(json.dumps(data_dictionary, ensure_ascii=False, indent=2), encoding='utf-8')

# The manifest is written last so it can hash every other output file.
manifest = {
    'project': PROJECT,
    'raw_run_id': RAW_RUN_ID,
    'clean_run_id': CLEAN_RUN_ID,
    'created_utc': datetime.now(timezone.utc).isoformat(),
    'inputs': {
        'raw_csv': {'path': str(raw_csv), 'sha256': sha256_file(raw_csv)}
    },
    'outputs': {
        'clean_parquet': {'path': str(OUT_PARQUET), 'sha256': sha256_file(OUT_PARQUET)},
        'columns_mapping_raw_to_clean': {'path': str(OUT_MAPPING), 'sha256': sha256_file(OUT_MAPPING)},
        'profile_clean': {'path': str(OUT_PROFILE), 'sha256': sha256_file(OUT_PROFILE)},
        'validate_clean': {'path': str(OUT_VALIDATE), 'sha256': sha256_file(OUT_VALIDATE)},
        'data_dictionary': {'path': str(OUT_DICT), 'sha256': sha256_file(OUT_DICT)}
    },
    'config': {
        'delimiter': DELIM,
        'encoding': ENCODING,
        'null_policy': sorted(SPECIAL_NULLS),
        'percent_policy': 'divide_by_100',
        'decimal_policy': 'keep_dot_decimal_convert_comma_to_dot'
    }
}
OUT_MANIFEST.write_text(json.dumps(manifest, ensure_ascii=False, indent=2), encoding='utf-8')

validate_clean


Output generati nel run CLEAN (tutti nella cartella `data/clean/<PROJECT>/<RUN_ID>/`):
- `saldi_storico.parquet`
- `columns_mapping_raw_to_clean.json`
- `profile_clean.json`
- `validate_clean.json`
- `data_dictionary.json`
- `clean_manifest.json`
