###  Top summary / Resumen 
##### collection_wrangling_gei — Purpose & Outputs / Propósito y Resultados
GHG Emissions / Emisiones de GEI

| Section / Sección             | Content (EN)                                                                     | Contenido (ES)                                                                   |
|-------------------------------|----------------------------------------------------------------------------------|----------------------------------------------------------------------------------|
| **General Purpose**           | Produce canonical, cleaned datasets for analysis and dashboard artifacts         | Generar conjuntos de datos limpios y canónicos para análisis y visualización     |
| **Raw Inputs**                | `data/raw/*.csv` or CKAN package IDs:<br>• `atm_inventario_gei.csv`              | `data/raw/*.csv` o IDs de CKAN:<br>• `atm_inventario_gei.csv`                    |
| **Processed Outputs**         | • `data/processed/df_gei.csv`<br>• `data/ingest_audit/*.json`                    | • `data/processed/df_gei.csv`<br>• `data/ingest_audit/*.json`                    |
| **Expected Schema**           | 5885 rows × 6 columns at ingestion<br>Key columns: year, CRF activity, pollutant, CO2-eq (Gg in the source, tonnes after cleaning) | 5885 filas × 6 columnas en la ingesta<br>Columnas clave: año, actividad CRF, contaminante, CO2-eq (Gg en origen, toneladas tras la limpieza) |
| **Reproducibility Checklist** | • `config.yml` present at repo root<br>• Run cells top → bottom<br>• Outputs saved to the correct folders | • `config.yml` presente en la raíz del repositorio<br>• Ejecutar celdas de arriba hacia abajo<br>• Guardar salidas en las carpetas correspondientes |


In [1]:
# Cell 1: Parameters

import pandas as pd, numpy as np, geopandas as gpd
import os, sys, re, glob, yaml
from pathlib import Path

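# Walk up from the working directory until config.yml or .git marks the repo root
# Subir desde el directorio de trabajo hasta encontrar config.yml o .git
ROOT = Path.cwd()
while ROOT != ROOT.parent and not (ROOT/"config.yml").exists() and not (ROOT/".git").exists():
    ROOT = ROOT.parent
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))
# An empty config.yml makes safe_load return None, so fall back to {} / config.yml vacío → {}
cfg_path = ROOT / "config.yml"
cfg = (yaml.safe_load(cfg_path.read_text(encoding="utf-8")) or {}) if cfg_path.exists() else {}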
RAW_DIR   = ROOT / cfg.get("data", {}).get("raw_dir", "data/raw")
PROC_DIR  = ROOT / cfg.get("data", {}).get("processed_dir", "data/processed")
AUDIT_DIR = ROOT / cfg.get("data", {}).get("audit_dir", "data/ingest_audit")
ADOPTION_DEFAULT = cfg.get("defaults", {}).get("adoption_rate_default", 0.30)
PRIORITY         = cfg.get("defaults", {}).get("priority_districts", [10,11,12,13,15])
madrid_codes_official = set(cfg.get("defaults", {}).get("madrid_postal_codes_official", []))
print(f"RAW={RAW_DIR}\nPROC={PROC_DIR}\nAUDIT={AUDIT_DIR}")

RAW=c:\_Workspace\2_Work\1_Projects_Active\Datos_Abiertos_Madrid\Low-Carbon-Heating-Roadmap-for-Madrid\data\raw
PROC=c:\_Workspace\2_Work\1_Projects_Active\Datos_Abiertos_Madrid\Low-Carbon-Heating-Roadmap-for-Madrid\data\processed
AUDIT=c:\_Workspace\2_Work\1_Projects_Active\Datos_Abiertos_Madrid\Low-Carbon-Heating-Roadmap-for-Madrid\data\ingest_audit


In [2]:
# Cell 2: Ingestion / Ingesta
from src.loader import load_gei
from src.cleaning import inspect_dataframe
gei = load_gei(save=False)
inspect_dataframe(gei, name="gei")


 gei.shape → 5885 rows × 6 columns

 head ()


Unnamed: 0,inventario_gei_año,inventario_gei_sector_crf,inventario_gei_categoria_crf,inventario_gei_actividad_crf,inventario_gei_contaminante,inventario_gei_gg_co2_eq
0,1990,CRF 1 Energía,1.A. Actividades de combustión,1A1a Producción de energía y calor en plantas ...,N2O,15104735
1,1990,CRF 1 Energía,1.A. Actividades de combustión,1A1a Producción de energía y calor en plantas ...,CH4,1595958
2,1990,CRF 1 Energía,1.A. Actividades de combustión,1A1c Fabricación de combustibles sólidos y ot...,CH4,1038352
3,1990,CRF 1 Energía,1.A. Actividades de combustión,1A1c Fabricación de combustibles sólidos y ot...,CO2,114493
4,1990,CRF 1 Energía,1.A. Actividades de combustión,1A1c Fabricación de combustibles sólidos y ot...,N2O,5255745



 tail ()


Unnamed: 0,inventario_gei_año,inventario_gei_sector_crf,inventario_gei_categoria_crf,inventario_gei_actividad_crf,inventario_gei_contaminante,inventario_gei_gg_co2_eq
5880,2022,CRF 5 Residuos,5.C. Incineración y quema en espacio abierto d...,5C21 Quema abierta de residuos Biogénicos,N2O,87479309
5881,2022,CRF 5 Residuos,5.D. Tratamiento y eliminación de aguas residu...,5D1 Aguas residuales domésticas,CH4,52131028852
5882,2022,CRF 5 Residuos,5.D. Tratamiento y eliminación de aguas residu...,5D1 Aguas residuales domésticas,N2O,1154508699
5883,2022,CRF 5 Residuos,5.D. Tratamiento y eliminación de aguas residu...,5D2 Aguas residuales industriales,CH4,176293177564
5884,2022,CRF 5 Residuos,5.E. Otros,5E1 Esparcimiento de lodos,CH4,75236532



 info ():
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5885 entries, 0 to 5884
Data columns (total 6 columns):
 #   Column                        Non-Null Count  Dtype 
---  ------                        --------------  ----- 
 0   inventario_gei_año            5885 non-null   int64 
 1   inventario_gei_sector_crf     5885 non-null   object
 2   inventario_gei_categoria_crf  5885 non-null   object
 3   inventario_gei_actividad_crf  5885 non-null   object
 4   inventario_gei_contaminante   5885 non-null   object
 5   inventario_gei_gg_co2_eq      5885 non-null   object
dtypes: int64(1), object(5)
memory usage: 276.0+ KB

 describe all


Unnamed: 0,inventario_gei_año,inventario_gei_sector_crf,inventario_gei_categoria_crf,inventario_gei_actividad_crf,inventario_gei_contaminante,inventario_gei_gg_co2_eq
count,5885.0,5885,5885,5885,5885,5885.0
unique,,5,27,129,6,5500.0
top,,CRF 1 Energía,1.A. Actividades de combustión,2F1c Refrigeración industrial,CO2,0.0
freq,,1774,1660,203,2063,309.0
mean,2006.516907,,,,,
std,9.424857,,,,,
min,1990.0,,,,,
25%,1999.0,,,,,
50%,2007.0,,,,,
75%,2015.0,,,,,


In [3]:
# Cell 3: Schema validation / Validación de esquema
# Validate required columns using src.cleaning.validate_schema / Validar columnas requeridas con el helper
from src.cleaning import validate_schema
req_gei_cols = ["inventario_gei_año", "inventario_gei_actividad_crf", "inventario_gei_contaminante", "inventario_gei_gg_co2_eq"]
try:
    validate_schema(gei, req_gei_cols)
    print("gei schema OK")
except AssertionError as e:
    raise AssertionError(f"Schema error gei: {e}") from e
# Drop columns that are not required / Eliminar columnas innecesarias
# .copy() avoids SettingWithCopyWarning when later cells add columns / .copy() evita SettingWithCopyWarning
gei = gei[req_gei_cols].copy()

gei schema OK
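
The notebook does not show the body of `src.cleaning.validate_schema`. As a rough illustration only, a helper with the same call shape could look like the sketch below. The name `validate_schema_sketch` and its error message are assumptions, not the project's implementation.

```python
import pandas as pd


def validate_schema_sketch(df: pd.DataFrame, required_cols: list) -> None:
    """Hypothetical stand-in: raise AssertionError naming any missing columns."""
    missing = [c for c in required_cols if c not in df.columns]
    if missing:
        raise AssertionError(f"missing columns: {missing}")


# Toy frame with only two of the raw column names used in this notebook
demo = pd.DataFrame({"inventario_gei_año": [1990],
                     "inventario_gei_contaminante": ["CO2"]})

validate_schema_sketch(demo, ["inventario_gei_año"])  # passes silently

try:
    validate_schema_sketch(demo, ["inventario_gei_gg_co2_eq"])
except AssertionError as err:
    schema_error = str(err)
    print("Schema error:", schema_error)
```

Raising explicitly instead of using a bare `assert` keeps the check active even when Python runs with `-O`.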


In [4]:
# Cell 4: Data quality report / Informe de calidad de datos
# Use dq_report to get structured report and store it / usa dq_report y guárdalo
from src.cleaning import dq_report
dq_gei = dq_report(gei)
# print compact summary / imprimir resumen compacto
print("rows_in:", dq_gei["rows_in"], "duplicate_rows:", dq_gei["duplicate_rows"])
for col, meta in list(dq_gei["columns"].items())[:8]:
    print(f"{col}: nulls={meta['null_count']} null_pct={meta['null_pct']:.2f}% uniques={meta['unique_nonnull']}")
# keep report in memory for audit / conservar para auditoría
dq_reports = {"df_gei": dq_gei}

rows_in: 5885 duplicate_rows: 0
inventario_gei_año: nulls=0 null_pct=0.00% uniques=33
inventario_gei_actividad_crf: nulls=0 null_pct=0.00% uniques=129
inventario_gei_contaminante: nulls=0 null_pct=0.00% uniques=6
inventario_gei_gg_co2_eq: nulls=0 null_pct=0.00% uniques=5500
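
The printed summary relies on a report dictionary with the keys `rows_in`, `duplicate_rows`, and per-column `null_count`, `null_pct`, and `unique_nonnull`. The real `dq_report` is not shown here, so the following is a minimal sketch of a function that would produce that structure. The name `dq_report_sketch` and the toy data are assumptions.

```python
import pandas as pd


def dq_report_sketch(df: pd.DataFrame) -> dict:
    """Hypothetical stand-in that mirrors the keys read in Cell 4."""
    n = len(df)
    columns = {}
    for col in df.columns:
        null_count = int(df[col].isna().sum())
        columns[col] = {
            "null_count": null_count,
            "null_pct": 100.0 * null_count / n if n else 0.0,
            "unique_nonnull": int(df[col].nunique(dropna=True)),
        }
    return {
        "rows_in": n,
        "duplicate_rows": int(df.duplicated().sum()),  # full-row duplicates
        "columns": columns,
    }


# Toy data: one exact duplicate row and one missing pollutant
demo = pd.DataFrame({"yr": [1990, 1990, 1991, 1991],
                     "gas": ["CO2", "CO2", None, "CH4"]})
report = dq_report_sketch(demo)
print(report["rows_in"], report["duplicate_rows"], report["columns"]["gas"])
```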


In [5]:
# Cell 5: Cleaning: rename, convert to numeric, drop NaN, report negatives/zeros, log the transformation
# Limpieza: renombrar, convertir a numérico, eliminar NaN, informar negativos/ceros, registrar la transformación

# --- 0. Rename columns to canonical names / Renombrar columnas ---
rename_dict = {
    "inventario_gei_año": "gei_yr",                        # Year / Año
    "inventario_gei_actividad_crf": "gei_crf_act",         # CRF activity / Actividad CRF
    "inventario_gei_contaminante": "gei_pollutant",        # Pollutant / Contaminante
    "inventario_gei_gg_co2_eq": "gei_co2_eq_t"             # Gg in the source, tonnes after step 1 / Gg en origen, toneladas tras el paso 1
}
gei = gei.rename(columns=rename_dict)

critical_cols = ["gei_co2_eq_t"]

sentinels = ['99999999,99','99999999.99','99999999','99999999,00',
             'NaN','nan','NULL','-']

# CRF code = first token of the activity label / Código CRF = primer token de la actividad
gei["gei_crf"] = gei["gei_crf_act"].str.split().str[0]
gei["gei_crf"] = gei["gei_crf"].replace({"4(IV)2": "3.D.2"})
# Source: https://naei.energysecurity.gov.uk/glossary/crf-code-list

# --- 1. Convert to numeric and to tonnes / Convertir a numérico y a toneladas ---
for c in critical_cols:
    if c in gei.columns:
        s = gei[c].astype(str).replace(sentinels, pd.NA)
        # "1.234,5" -> "1234.5": drop thousands dots, then use a decimal point
        s = s.str.replace('.', '', regex=False).str.replace(',', '.', regex=False)
        s = s.str.replace(r'[^\d\.\-]', '', regex=True)
        gei[c] = pd.to_numeric(s, errors='coerce') * 1000   # Gg → tonnes (1 Gg = 1000 t)

# --- 2. Drop rows with NaN in any critical column / Eliminar filas con NaN ---
n_before = len(gei)
gei = gei.dropna(subset=critical_cols)
n_after_nan = len(gei)

# --- 3. Final check: negatives and zeros are reported, not dropped / Negativos y ceros se informan, no se eliminan ---
for col in critical_cols:
    n_null = gei[col].isna().sum()
    n_neg  = (gei[col] < 0).sum()
    n_zero = (gei[col] == 0).sum()
    print(f"{col}: nulls={n_null}, negatives={n_neg}, zeros={n_zero}")

# --- 4. Register transformation / Registrar transformación ---
transforms = [{
    "step": "rename_and_clean_gei",
    "renamed_columns": list(rename_dict.items()),
    "columns_cleaned": critical_cols,
    "sentinels_mapped": sentinels,
    "rows_before": n_before,
    "rows_dropped_nan": n_before - n_after_nan,
    "rows_final": len(gei),
}]

gei_co2_eq_t: nulls=0, negatives=357, zeros=309
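
The numeric conversion in Cell 5 treats values as Spanish-formatted numbers (dot as thousands separator, comma as decimal mark) and then scales gigagrams to tonnes. The sketch below replays that logic on a handful of made-up strings so the effect of each step is visible. The helper name `parse_es_number`, the shortened sentinel list, and the sample values are assumptions for illustration, not rows from the dataset.

```python
import pandas as pd

# Shortened sentinel list for the demo; Cell 5 uses a longer one
SENTINELS = ["99999999,99", "NaN", "NULL", "-"]


def parse_es_number(series: pd.Series) -> pd.Series:
    """Parse strings such as '1.234,5' into floats; sentinels become NaN."""
    s = series.astype(str).str.strip()
    s = s.where(~s.isin(SENTINELS))              # sentinel strings -> NaN
    s = s.str.replace(".", "", regex=False)      # remove thousands separators
    s = s.str.replace(",", ".", regex=False)     # decimal comma -> decimal point
    return pd.to_numeric(s, errors="coerce")


raw = pd.Series(["1.234,5", "-0,25", "0", "99999999,99", "-"])
tonnes = parse_es_number(raw) * 1000             # Gg -> tonnes (1 Gg = 1000 t)

print(tonnes.tolist())
print("negatives:", int((tonnes < 0).sum()), "zeros:", int((tonnes == 0).sum()))
```

Note that a lone `-` is mapped to NaN as a sentinel, while a leading minus sign on a real value is preserved, which is why negatives can survive the cleaning step and show up in the final check.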


In [6]:
# Cell 6: Final Check / Revisión Final
gei.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5885 entries, 0 to 5884
Data columns (total 5 columns):
 #   Column         Non-Null Count  Dtype  
---  ------         --------------  -----  
 0   gei_yr         5885 non-null   int64  
 1   gei_crf_act    5885 non-null   object 
 2   gei_pollutant  5885 non-null   object 
 3   gei_co2_eq_t   5885 non-null   float64
 4   gei_crf        5885 non-null   object 
dtypes: float64(1), int64(1), object(3)
memory usage: 230.0+ KB


In [7]:
# Cell 7: Export and audit / Exportar y auditar
from src.io import save_df, write_audit_log  # persistence helpers / ayudantes de persistencia
# save processed artifact / guardar artefacto procesado
out_path = save_df(gei, str(PROC_DIR.joinpath("df_gei.csv")))
print("Saved:", out_path)
# build audit entry and write / construir objeto de auditoría y guardar
audit = {
    "source": "atm_inventario_gei.csv",
    "rows_in": int(dq_gei["rows_in"]),   # cast to Python int
    "rows_out": int(len(gei)),           # ensure Python int
    "transforms": []
}

# cast counts to plain Python int so the audit is JSON-serializable / convertir a int nativo para el JSON
for t in transforms:
    audit["transforms"].append({
        **t,
        "rows_before": int(t.get("rows_before", 0)),
        "rows_final": int(t.get("rows_final", len(gei))),   # fall back to the current row count
        "rows_dropped_nan": int(t.get("rows_dropped_nan", 0)),
        "rows_dropped_negatives_or_zeros": int(t.get("rows_dropped_negatives_or_zeros", 0))
    })

audit_path = write_audit_log(**audit)
print("Audit saved:", audit_path)

Saved: c:\_Workspace\2_Work\1_Projects_Active\Datos_Abiertos_Madrid\Low-Carbon-Heating-Roadmap-for-Madrid\data\processed\df_gei.csv
Audit saved: C:\_Workspace\2_Work\1_Projects_Active\Datos_Abiertos_Madrid\Low-Carbon-Heating-Roadmap-for-Madrid\data\ingest_audit\audit_atm_inventario_gei_20251029_213554.json
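
The audit file name in the output above follows the pattern `audit_<source stem>_<YYYYMMDD_HHMMSS>.json`. Since `src.io.write_audit_log` is not shown, the following is only a sketch of a writer that would produce a file of that shape. The function name `write_audit_log_sketch` and its explicit `audit_dir` parameter are assumptions. The `int(...)` casts in Cell 7 matter here because `json.dumps` cannot serialize NumPy integer types.

```python
import json
import tempfile
from datetime import datetime
from pathlib import Path


def write_audit_log_sketch(audit_dir, source, rows_in, rows_out, transforms):
    """Hypothetical stand-in: write one timestamped JSON audit file and return its path."""
    audit_dir = Path(audit_dir)
    audit_dir.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    path = audit_dir / f"audit_{Path(source).stem}_{stamp}.json"
    payload = {"source": source, "rows_in": rows_in,
               "rows_out": rows_out, "transforms": transforms}
    # ensure_ascii=False keeps accented column names readable in the file
    path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
    return path


# Demo in a temporary folder so nothing is written into the repository
with tempfile.TemporaryDirectory() as tmp:
    demo_path = write_audit_log_sketch(
        tmp, "atm_inventario_gei.csv", 5885, 5885,
        [{"step": "rename_and_clean_gei", "rows_before": 5885}],
    )
    demo_name = demo_path.name
    demo_payload = json.loads(demo_path.read_text(encoding="utf-8"))

print(demo_name)
```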


In [8]:
# Cell 8: Reproducibility checks / Verificaciones de reproducibilidad

# list artifacts and audits / listar artefactos y auditorías
proc_files = glob.glob(str(PROC_DIR.joinpath("*.csv")))
audit_files = glob.glob(str(AUDIT_DIR.joinpath("*.json")))
print("processed files:", proc_files)
print("audit files:", audit_files)
# basic checks / comprobaciones básicas
assert proc_files, "No processed artifacts found in data/processed/  / No hay artefactos procesados"
assert audit_files, "No audit JSONs found in data/ingest_audit/  / No hay JSONs de auditoría"
# minimum row-count check; adjust min_rows_expected as needed / comprobación de filas mínimas
min_rows_expected = 10
for p in proc_files:
    df = pd.read_csv(p, nrows=min_rows_expected)   # read only as many rows as the check needs
    if df.shape[0] < min_rows_expected:
        raise RuntimeError(f"Artifact {p} has <{min_rows_expected} rows; check processing  / El artefacto tiene pocas filas")
print("Reproducibility smoke tests passed / Pruebas de reproducibilidad OK")

processed files: ['c:\\_Workspace\\2_Work\\1_Projects_Active\\Datos_Abiertos_Madrid\\Low-Carbon-Heating-Roadmap-for-Madrid\\data\\processed\\df_ceee.csv', 'c:\\_Workspace\\2_Work\\1_Projects_Active\\Datos_Abiertos_Madrid\\Low-Carbon-Heating-Roadmap-for-Madrid\\data\\processed\\df_gei.csv', 'c:\\_Workspace\\2_Work\\1_Projects_Active\\Datos_Abiertos_Madrid\\Low-Carbon-Heating-Roadmap-for-Madrid\\data\\processed\\df_pst.csv', 'c:\\_Workspace\\2_Work\\1_Projects_Active\\Datos_Abiertos_Madrid\\Low-Carbon-Heating-Roadmap-for-Madrid\\data\\processed\\sql_buildings_train.csv']
audit files: ['c:\\_Workspace\\2_Work\\1_Projects_Active\\Datos_Abiertos_Madrid\\Low-Carbon-Heating-Roadmap-for-Madrid\\data\\ingest_audit\\audit_atm_inventario_gei_20251029_213059.json', 'c:\\_Workspace\\2_Work\\1_Projects_Active\\Datos_Abiertos_Madrid\\Low-Carbon-Heating-Roadmap-for-Madrid\\data\\ingest_audit\\audit_atm_inventario_gei_20251029_213554.json', 'c:\\_Workspace\\2_Work\\1_Projects_Active\\Datos_Abiertos_Mad