# TPST – Notebook local (PySpark) para ML en siniestros (Perú)

Este cuaderno hace **todo** localmente en tu Mac (on‑prem) usando **PySpark** en `local[*]`:

1) Arranque de Spark (local).  
2) Carga del **Parquet largo** (`year, region, metric, dim_name, dim_value, value`).  
3) **Limpieza**: elimina meses y filas con `region = TOTAL`.  
4) Persistencia **Silver** (limpio).  
5) Construcción **Gold (wide)**: tablón pivoteado `year, region` + columnas por métrica/categoría.  
6) **Features**: proporciones, índices (noche/fin de semana) y **lags** por región.  
7) **Entrenamiento** (baseline): GLM **Poisson** con Spark ML + métricas (RMSE, MAE).  
8) Escritura de salidas `Delta/Parquet` locales.

> Requisitos previos (una sola vez):
> ```bash
> python -m venv .venv && source .venv/bin/activate
> python -m pip install -U pip
> python -m pip install pyspark==3.5.1 pyarrow==15.0.2 pandas==2.2.2
> ```

Configura la ruta al Parquet **de entrada** en la celda siguiente.


In [12]:
import os, subprocess, sys

# Forzar JAVA_HOME=17 en este proceso
java17 = subprocess.check_output(["/usr/libexec/java_home", "-v", "17"]).decode().strip()
os.environ["JAVA_HOME"] = java17
os.environ["PATH"] = f'{java17}/bin:' + os.environ["PATH"]

import pyspark
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("check-17")
         .master("local[*]")
         .config("spark.sql.shuffle.partitions","8")
         .getOrCreate())

print("Spark:", spark.version)
print("Java :", spark.sparkContext._jvm.java.lang.System.getProperty("java.version"))
spark.stop()


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/05 11:47:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark: 4.0.1
Java : 17.0.16


In [13]:
import sys, pyspark, pandas as pd, pyarrow as pa
print(sys.executable)
print("Spark:", pyspark.__version__)
print("pandas:", pd.__version__, "pyarrow:", pa.__version__)

/Users/enmanuelcuadros/Downloads/tesis-prevencion-siniestros-transito/.venv/bin/python
Spark: 4.0.1
pandas: 2.3.3 pyarrow: 21.0.0


In [14]:
import os; print(os.environ.get("JAVA_HOME"))

/opt/homebrew/Cellar/openjdk@17/17.0.16/libexec/openjdk.jdk/Contents/Home


In [21]:
# === Configuración de rutas ===
from pathlib import Path

# Ruta al Parquet consolidado (tabla larga) generado por tu pipeline
PARQUET_IN = Path("/Users/enmanuelcuadros/Downloads/tesis-prevencion-siniestros-transito/data/processed/siniestros_normalizado.parquet")

# Carpetas de salida (se crean si no existen)
DIR_BRONZE = Path("bronze_local")
DIR_SILVER = Path("silver_local")
DIR_GOLD   = Path("gold_local")

for d in (DIR_BRONZE, DIR_SILVER, DIR_GOLD):
    d.mkdir(parents=True, exist_ok=True)

PARQUET_IN.resolve()

PosixPath('/Users/enmanuelcuadros/Downloads/tesis-prevencion-siniestros-transito/data/processed/siniestros_normalizado.parquet')

## 1) Inicializar Spark (local)

In [22]:
from pyspark.sql import SparkSession, functions as F, Window

spark = (SparkSession.builder
         .appName("TPST-local")
         .master("local[*]")
         .config("spark.sql.shuffle.partitions", "8")
         .config("spark.driver.memory", "4g")
         .getOrCreate())

spark.version

'4.0.1'

## 2) Cargar Parquet largo → Bronze

In [23]:
df_bronze = (spark.read.parquet(str(PARQUET_IN))
              .select("year","region","metric","dim_name","dim_value","value"))
df_bronze.printSchema()
df_bronze.show(5, truncate=False)

# Persistir una copia Parquet en bronze_local (opcional)
(df_bronze
 .repartition(1)
 .write.mode("overwrite")
 .parquet(str(DIR_BRONZE / "siniestros_long_raw.parquet")))

df_bronze.count()

                                                                                

root
 |-- year: long (nullable = true)
 |-- region: string (nullable = true)
 |-- metric: string (nullable = true)
 |-- dim_name: string (nullable = true)
 |-- dim_value: string (nullable = true)
 |-- value: double (nullable = true)

+----+--------+----------------+--------+---------+------+
|year|region  |metric          |dim_name|dim_value|value |
+----+--------+----------------+--------+---------+------+
|2008|AMAZONAS|siniestros_total|NULL    |NULL     |271.0 |
|2008|ANCASH  |siniestros_total|NULL    |NULL     |1616.0|
|2008|APURIMAC|siniestros_total|NULL    |NULL     |428.0 |
|2008|AREQUIPA|siniestros_total|NULL    |NULL     |5594.0|
|2008|AYACUCHO|siniestros_total|NULL    |NULL     |752.0 |
+----+--------+----------------+--------+---------+------+
only showing top 5 rows


18363

## 3) Limpieza → Silver (quitar meses/TOTAL y tipificar)

In [24]:
MESES = ["ENERO","FEBRERO","MARZO","ABRIL","MAYO","JUNIO","JULIO","AGOSTO",
         "SEPTIEMBRE","OCTUBRE","NOVIEMBRE","DICIEMBRE"]

df_silver = (df_bronze
    .withColumn("region_norm", F.upper(F.trim(F.col("region"))))
    .filter(~F.col("region_norm").isin(MESES))
    .filter(~F.col("region_norm").rlike(r'^TOTAL(\s+NACIONAL|\s*GENERAL)?$'))
    .withColumn("year", F.col("year").cast("int"))
    .withColumn("value", F.col("value").cast("double"))
    .drop("region")
    .withColumnRenamed("region_norm","region")
)

df_silver.show(5, truncate=False)
df_silver.count()

(df_silver
 .repartition(1)
 .write.mode("overwrite")
 .parquet(str(DIR_SILVER / "siniestros_long_clean.parquet")))

+----+----------------+--------+---------+------+--------+
|year|metric          |dim_name|dim_value|value |region  |
+----+----------------+--------+---------+------+--------+
|2008|siniestros_total|NULL    |NULL     |271.0 |AMAZONAS|
|2008|siniestros_total|NULL    |NULL     |1616.0|ANCASH  |
|2008|siniestros_total|NULL    |NULL     |428.0 |APURIMAC|
|2008|siniestros_total|NULL    |NULL     |5594.0|AREQUIPA|
|2008|siniestros_total|NULL    |NULL     |752.0 |AYACUCHO|
+----+----------------+--------+---------+------+--------+
only showing top 5 rows


## 4) Gold (Wide): pivot `year, region` con columnas `metric__categoria`

In [25]:
import unicodedata, re
from pyspark.sql.types import StringType

@F.udf(StringType())
def slug(s):
    if s is None: 
        return "total"
    s = unicodedata.normalize("NFKD", s).encode("ascii","ignore").decode("ascii")
    s = re.sub(r"[^A-Za-z0-9]+","_", s.strip().lower()).strip("_")
    return s or "total"

df_aug = (df_silver
  .withColumn("dim_value_slug", slug(F.col("dim_value")))
  .withColumn("metric_slug", slug(F.col("metric")))
  .withColumn("colname", F.concat_ws("__", F.col("metric_slug"), F.col("dim_value_slug")))
)

df_wide = (df_aug
  .groupBy("year","region")
  .pivot("colname")
  .agg(F.sum("value"))
  .fillna(0.0)
)

df_wide.printSchema()
df_wide.show(5, truncate=False)
df_wide.count()

(df_wide
 .repartition(1)
 .write.mode("overwrite")
 .parquet(str(DIR_GOLD / "siniestros_wide.parquet")))

                                                                                

root
 |-- year: integer (nullable = true)
 |-- region: string (nullable = true)
 |-- siniestros_por_causa__desacato_de_senal_de_transito: double (nullable = false)
 |-- siniestros_por_causa__desacato_de_senal_de_transito_de_parte_del_conductor: double (nullable = false)
 |-- siniestros_por_causa__desacato_de_senal_de_transito_de_parte_del_peaton: double (nullable = false)
 |-- siniestros_por_causa__ebriedad_del_condutor: double (nullable = false)
 |-- siniestros_por_causa__ebriedad_del_peaton: double (nullable = false)
 |-- siniestros_por_causa__exceso_de_carga: double (nullable = false)
 |-- siniestros_por_causa__exceso_de_velocidad: double (nullable = false)
 |-- siniestros_por_causa__factor_ambiental: double (nullable = false)
 |-- siniestros_por_causa__falla_de_luces: double (nullable = false)
 |-- siniestros_por_causa__falla_mecanica: double (nullable = false)
 |-- siniestros_por_causa__falta_de_luces: double (nullable = false)
 |-- siniestros_por_causa__imprudencia_del_conductor:

25/10/05 11:50:10 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+----+----------+---------------------------------------------------+--------------------------------------------------------------------------+-----------------------------------------------------------------------+-------------------------------------------+-----------------------------------------+-------------------------------------+-----------------------------------------+--------------------------------------+------------------------------------+------------------------------------+------------------------------------+-----------------------------------------------+----------------------------------------------+--------------------------------------------+----------------------------------------+------------------------------------------------+--------------------------------------------+---------------------------+-----------------------------------------+---------------------------------------------+----------------------------------------------+---------------------------+--

## 5) Features: proporciones, índices e **lags**

In [33]:
from functools import reduce
from pyspark.sql import functions as F
from pyspark.sql import Window

w = df_wide

TOTAL_COL = "siniestros_total__total"
has_total = TOTAL_COL in w.columns

# ---------- helper para sumar columnas ----------
def sum_cols(df, cols):
    """Suma segura: coalesce(col,0) + ...  -> Column expr"""
    if not cols:
        return F.lit(None).cast("double")
    expr = F.coalesce(F.col(cols[0]), F.lit(0.0))
    for c in cols[1:]:
        expr = expr + F.coalesce(F.col(c), F.lit(0.0))
    return expr

# Si no tenemos TOTAL, lo aproximamos sumando franjas (si existen)
if not has_total:
    franjas_raw = [c for c in w.columns if c.startswith("siniestros_por_franja_horaria__")]
    if franjas_raw:
        w = w.withColumn(TOTAL_COL, sum_cols(w, franjas_raw))
        has_total = True

# ----------------- PROPORCIONES (safe) -----------------
if has_total:
    # usa try_divide (Spark 4): devuelve NULL si divisor=0
    franjas = [c for c in w.columns if c.startswith("siniestros_por_franja_horaria__")]
    dias    = [c for c in w.columns if c.startswith("siniestros_por_dia__")]

    for c in franjas:
        w = w.withColumn(f"prop__{c}", F.try_divide(F.col(c), F.col(TOTAL_COL)))
    for c in dias:
        w = w.withColumn(f"prop__{c}", F.try_divide(F.col(c), F.col(TOTAL_COL)))

    # idx_finde = sábado + domingo (si existen)
    finde_cols = [f"prop__{c}" for c in dias if c.endswith("__sabado") or c.endswith("__domingo")]
    if finde_cols:
        w = w.withColumn("idx_finde", sum_cols(w, finde_cols))

    # idx_noche: detectar columnas reales de franja y sumarlas
    import re
    franja_prop_cols = [c for c in w.columns if c.startswith("prop__siniestros_por_franja_horaria__")]
    noche_exact = [c for c in franja_prop_cols if c in {
        "prop__siniestros_por_franja_horaria__18_00_23_59",
        "prop__siniestros_por_franja_horaria__18_01_a_23_59",
        "prop__siniestros_por_franja_horaria__20_00_a_24_00",
        "prop__siniestros_por_franja_horaria__20_01_a_24_00",
    }]

    if noche_exact:
        w = w.withColumn("idx_noche", F.col(noche_exact[0]))
    else:
        patrones = [r"__1[89]_..", r"__2[0-3]_..", r"__20_01_a_02_00", r"__22_00_a_24_00"]
        noche_cols = []
        for c in franja_prop_cols:
            if any(re.search(p, c) for p in patrones):
                noche_cols.append(c)
        # incluir franja amplia si existe (contiene 18-20):
        if "prop__siniestros_por_franja_horaria__14_00_a_20_00" in franja_prop_cols:
            noche_cols.append("prop__siniestros_por_franja_horaria__14_00_a_20_00")

        if noche_cols:
            w = w.withColumn("idx_noche", sum_cols(w, sorted(set(noche_cols))))
        else:
            w = w.withColumn("idx_noche", F.lit(None).cast("double"))

# ----------------- LAGS y growth (safe) -----------------
win = Window.partitionBy("region").orderBy("year")
if has_total:
    w = (w
         .withColumn("y_lag1", F.lag(F.col(TOTAL_COL), 1).over(win))
         .withColumn("y_lag2", F.lag(F.col(TOTAL_COL), 2).over(win))
         # try_divide evita división por cero o NULL en el denominador
         .withColumn("growth_y", F.try_divide(F.col(TOTAL_COL) - F.col("y_lag1"), F.col("y_lag1")))
    )

w.printSchema()
w.show(5, truncate=False)

(w
 .repartition(1)
 .write.mode("overwrite")
 .parquet(str(DIR_GOLD / "siniestros_wide_features.parquet")))


root
 |-- year: integer (nullable = true)
 |-- region: string (nullable = true)
 |-- siniestros_por_causa__desacato_de_senal_de_transito: double (nullable = false)
 |-- siniestros_por_causa__desacato_de_senal_de_transito_de_parte_del_conductor: double (nullable = false)
 |-- siniestros_por_causa__desacato_de_senal_de_transito_de_parte_del_peaton: double (nullable = false)
 |-- siniestros_por_causa__ebriedad_del_condutor: double (nullable = false)
 |-- siniestros_por_causa__ebriedad_del_peaton: double (nullable = false)
 |-- siniestros_por_causa__exceso_de_carga: double (nullable = false)
 |-- siniestros_por_causa__exceso_de_velocidad: double (nullable = false)
 |-- siniestros_por_causa__factor_ambiental: double (nullable = false)
 |-- siniestros_por_causa__falla_de_luces: double (nullable = false)
 |-- siniestros_por_causa__falla_mecanica: double (nullable = false)
 |-- siniestros_por_causa__falta_de_luces: double (nullable = false)
 |-- siniestros_por_causa__imprudencia_del_conductor:

In [34]:
# ============================================
# Validador de calidad de datos (PySpark)
# ============================================
import json, re
from pathlib import Path
from pyspark.sql import functions as F, types as T, Window

# ---------- helpers ----------
def _sample_df(df, n=10):
    cols = df.columns[:]
    out = [dict(zip(cols, row)) for row in df.limit(n).toLocalIterator()]
    return out

def _add(res, name, ok, extra=None, critical=False):
    res["checks"].append({
        "name": name,
        "status": "PASS" if ok else "FAIL",
        "critical": bool(critical),
        "details": extra or {}
    })
    if not ok and critical:
        res["has_critical_fail"] = True

def _expect_no_nulls(df, cols):
    bad = df
    for c in cols:
        bad = bad.filter(F.col(c).isNull())
    return bad

def _expect_unique_key(df, key_cols):
    dup = (df.groupBy(*key_cols).count().filter("count > 1"))
    return dup

def _sum_cols(df, cols):
    if not cols: return F.lit(None).cast("double")
    expr = F.coalesce(F.col(cols[0]), F.lit(0.0))
    for c in cols[1:]:
        expr = expr + F.coalesce(F.col(c), F.lit(0.0))
    return expr

# ---------- configuración / inputs ----------
REPORT_DIR = Path("reports")
REPORT_DIR.mkdir(parents=True, exist_ok=True)
REPORT_PATH = REPORT_DIR / "data_quality_report.json"

MESES = ["ENERO","FEBRERO","MARZO","ABRIL","MAYO","JUNIO","JULIO","AGOSTO",
         "SEPTIEMBRE","OCTUBRE","NOVIEMBRE","DICIEMBRE"]

# Tablas / DataFrames a validar
df_long   = spark.read.parquet("silver_local/siniestros_long_clean.parquet")          # Silver (long)
df_wide   = spark.read.parquet("gold_local/siniestros_wide.parquet")                  # Gold wide
df_feat   = spark.read.parquet("gold_local/siniestros_wide_features.parquet")         # Features (si existe)

res = {"checks": [], "has_critical_fail": False}

# ============================================
# 1) VALIDACIONES – SILVER (long)
# ============================================
# Esquema esperado mínimo
expected_cols_long = {"year":"int", "region":"string", "metric":"string",
                      "dim_name":"string", "dim_value":"string", "value":"double"}

# 1.1 columnas presentes
missing = [c for c in expected_cols_long if c not in df_long.columns]
_add(res, "long.columns_present", len(missing)==0,
     {"missing": missing}, critical=True)

# 1.2 tipos básicos (permitimos cast si vienen como variantes)
type_ok = True
type_issues = {}
for c, t in expected_cols_long.items():
    if c not in df_long.columns: 
        type_ok = False; type_issues[c] = "missing"
        continue
    # map simple de dtypes spark
    actual = dict(df_long.dtypes).get(c)
    if t == "int" and actual not in ("int","bigint"):
        type_ok = False; type_issues[c] = actual
    if t == "double" and actual not in ("double","float","decimal(38,18)","decimal(10,0)","decimal(20,0)"):
        # lo dejamos pasar si es decimal/numeric; Spark castea al volar
        pass
_add(res, "long.types_basic", type_ok, {"issues": type_issues}, critical=True)

# 1.3 nulos críticos
null_crit = _expect_no_nulls(df_long, ["year","region","metric","value"])
_add(res, "long.no_nulls_crit", null_crit.count()==0,
     {"sample": _sample_df(null_crit)}, critical=True)

# 1.4 rangos de año
agg_year = df_long.agg(F.min("year").alias("min_y"), F.max("year").alias("max_y")).collect()[0]
min_y, max_y = agg_year["min_y"], agg_year["max_y"]
_add(res, "long.year_range_reasonable", min_y>=2000 and max_y<=2030,
     {"observed": {"min": min_y, "max": max_y}}, critical=False)

# 1.5 regiones no deben ser meses ni 'TOTAL'
bad_regions = (df_long
               .withColumn("r", F.upper(F.trim("region")))
               .filter(F.col("r").isin(MESES) | F.col("r").rlike(r'^TOTAL(\s+NACIONAL|\s*GENERAL)?$')))
_add(res, "long.region_not_month_or_total", bad_regions.count()==0,
     {"sample": _sample_df(bad_regions)}, critical=True)

# 1.6 no-negativos
neg = df_long.filter(F.col("value") < 0)
_add(res, "long.value_non_negative", neg.count()==0,
     {"sample": _sample_df(neg)}, critical=True)

# 1.7 clave (year, region, metric, dim_name, dim_value) no necesariamente única en long (puede haber particiones),
#     pero chequeamos duplicados exactos por seguridad:
dup_long = (_expect_unique_key(df_long, ["year","region","metric","dim_name","dim_value"]))
_add(res, "long.duplicate_exact_rows", dup_long.count()==0,
     {"sample": _sample_df(dup_long)}, critical=False)

# ============================================
# 2) VALIDACIONES – GOLD WIDE
# ============================================
# 2.1 columnas clave presentes
missing_wide = [c for c in ["year","region"] if c not in df_wide.columns]
_add(res, "wide.columns_key_present", len(missing_wide)==0,
     {"missing": missing_wide}, critical=True)

# 2.2 clave única (year, region)
dup_key_wide = _expect_unique_key(df_wide, ["year","region"])
_add(res, "wide.key_unique", dup_key_wide.count()==0,
     {"sample": _sample_df(dup_key_wide)}, critical=True)

# 2.3 no-negativos en métricas
metric_cols_wide = [c for c in df_wide.columns if c not in ("year","region")]
neg_wide = df_wide.select([F.sum((F.col(c)<0).cast("int")).alias(c) for c in metric_cols_wide])
neg_any = any(v>0 for v in neg_wide.collect()[0].asDict().values())
_add(res, "wide.metrics_non_negative", not neg_any, {}, critical=True)

# 2.4 si existe total por franja/día, comprobar suma ≈ total (tolerancia 1%)
TOTAL_COL = "siniestros_total__total"
has_total = TOTAL_COL in df_wide.columns
if has_total:
    franjas_raw = [c for c in df_wide.columns if c.startswith("siniestros_por_franja_horaria__")]
    dias_raw    = [c for c in df_wide.columns if c.startswith("siniestros_por_dia__")]

    tol = 0.01
    checks = {}
    if franjas_raw:
        df_chk = (df_wide
                  .withColumn("sum_franjas", _sum_cols(df_wide, franjas_raw))
                  .withColumn("rel_err", F.when(F.col(TOTAL_COL)>0,
                                                F.abs(F.col("sum_franjas")-F.col(TOTAL_COL))/F.col(TOTAL_COL))
                                           .otherwise(F.lit(None)))
                  .select("year","region","sum_franjas", TOTAL_COL, "rel_err"))
        bad = df_chk.filter((F.col("rel_err") > tol) | F.col("rel_err").isNull())
        checks["franjas_bad_rows"] = bad.count()
        checks["franjas_bad_sample"] = _sample_df(bad)
    if dias_raw:
        df_chk2 = (df_wide
                  .withColumn("sum_dias", _sum_cols(df_wide, dias_raw))
                  .withColumn("rel_err", F.when(F.col(TOTAL_COL)>0,
                                                F.abs(F.col("sum_dias")-F.col(TOTAL_COL))/F.col(TOTAL_COL))
                                           .otherwise(F.lit(None)))
                  .select("year","region","sum_dias", TOTAL_COL, "rel_err"))
        bad2 = df_chk2.filter((F.col("rel_err") > tol) | F.col("rel_err").isNull())
        checks["dias_bad_rows"] = bad2.count()
        checks["dias_bad_sample"] = _sample_df(bad2)
    ok = (checks.get("franjas_bad_rows",0)==0) and (checks.get("dias_bad_rows",0)==0)
    _add(res, "wide.total_consistency_franja_dia", ok, checks, critical=False)
else:
    _add(res, "wide.total_consistency_franja_dia", True,
         {"note": "TOTAL_COL no existe; se omite esta verificación"}, critical=False)

# ============================================
# 3) VALIDACIONES – FEATURES
# ============================================
# 3.1 presencia (si no hay features aún, no fallar)
if "year" in df_feat.columns and "region" in df_feat.columns:
    # clave única
    dup_feat = _expect_unique_key(df_feat, ["year","region"])
    _add(res, "feat.key_unique", dup_feat.count()==0,
         {"sample": _sample_df(dup_feat)}, critical=True)

    # proporciones dentro de [0,1]
    prop_cols = [c for c in df_feat.columns if c.startswith("prop__")]
    if prop_cols:
        any_out = False
        sample = {}
        cond = None
        for c in prop_cols:
            c_bad = ((F.col(c) < 0) | (F.col(c) > 1)) & F.col(c).isNotNull()
            cond = c_bad if cond is None else (cond | c_bad)
        badp = df_feat.select("year","region", *prop_cols).filter(cond) if cond is not None else df_feat.limit(0)
        any_out = badp.count() > 0
        sample = {"sample": _sample_df(badp)}
        _add(res, "feat.proportions_in_0_1", not any_out, sample, critical=False)

    # lags: growth_y NO NaN/Inf (permitir NULL si no hay lag)
    if "growth_y" in df_feat.columns:
        badg = df_feat.filter((F.isnan("growth_y")) | (F.col("growth_y") == float("inf")) | (F.col("growth_y") == float("-inf")))
        _add(res, "feat.growth_finite", badg.count()==0,
             {"sample": _sample_df(badg)}, critical=False)

else:
    _add(res, "feat.present", False, {"reason":"features no tienen year/region"}, critical=False)

# ============================================
# 4) Resultado y fail-fast crítico
# ============================================
# Guardar JSON
with open(REPORT_PATH, "w", encoding="utf-8") as f:
    json.dump(res, f, ensure_ascii=False, indent=2)

print(f"[DATA-QUALITY] Reporte escrito en: {REPORT_PATH}")
display(spark.createDataFrame([(c["name"], c["status"], c["critical"]) for c in res["checks"]],
                              ["check","status","critical"]))

if res["has_critical_fail"]:
    raise Exception("❌ Validaciones CRÍTICAS fallaron. Revisa reports/data_quality_report.json")
else:
    print("✅ Validaciones críticas: OK")


[DATA-QUALITY] Reporte escrito en: reports/data_quality_report.json


DataFrame[check: string, status: string, critical: boolean]

✅ Validaciones críticas: OK


## 6) Utilidades

In [35]:
from pyspark.sql import functions as F, Window
from functools import reduce

MESES = ["ENERO","FEBRERO","MARZO","ABRIL","MAYO","JUNIO","JULIO","AGOSTO",
         "SETIEMBRE","SEPTIEMBRE","OCTUBRE","NOVIEMBRE","DICIEMBRE"]

TOTAL_COL = "siniestros_total__total"

def sum_cols(df, cols):
    if not cols: return F.lit(None).cast("double")
    expr = F.coalesce(F.col(cols[0]), F.lit(0.0))
    for c in cols[1:]:
        expr = expr + F.coalesce(F.col(c), F.lit(0.0))
    return expr


## 7) Vista de modelado (filtrado “safe”)

In [36]:
feat = spark.read.parquet("gold_local/siniestros_wide_features.parquet")

# a) excluir regiones inválidas (meses, totales, etc.)
feat = (feat
    .withColumn("region_up", F.upper(F.trim("region")))
    .filter(~F.col("region_up").isin(MESES))
    .filter(~F.col("region_up").rlike(r'^TOTAL(\s+ANUAL|\s+NACIONAL|\s*GENERAL|\s+SINIESTROS.*)?$'))
    .drop("region_up")
)

# b) requerir target válido (>0)
feat = feat.filter(F.col(TOTAL_COL) > 0)

# c) descartar filas con proporciones fuera de [0,1] en alguna franja/día
prop_cols = [c for c in feat.columns if c.startswith("prop__")]
if prop_cols:
    cond_bad = None
    for c in prop_cols:
        c_bad = ((F.col(c) < 0) | (F.col(c) > 1)) & F.col(c).isNotNull()
        cond_bad = c_bad if cond_bad is None else (cond_bad | c_bad)
    bad_count = feat.filter(cond_bad).count() if cond_bad is not None else 0
    print(f"[INFO] Filas con proporciones fuera de [0,1]: {bad_count}")
    if bad_count > 0:
        feat = feat.filter(~cond_bad)

# d) (opcional) eliminar outliers burdos por año/región si el total es extremadamente raro
#    Aquí un filtro suave por percentil 99 del total por año:
from pyspark.sql.window import Window
win_year = Window.partitionBy("year")
q99 = feat.approxQuantile(TOTAL_COL, [0.99], 0.01)[0]
print(f"[INFO] Q99 de {TOTAL_COL} global: {q99}")
feat_model = feat.filter(F.col(TOTAL_COL) <= q99)


[INFO] Filas con proporciones fuera de [0,1]: 4
[INFO] Q99 de siniestros_total__total global: 54362.0


## 8) Baseline ML Poisson (train<=2021 / test>=2022)

In [38]:
from pyspark.sql import functions as F

TOTAL_COL = "siniestros_total__total"

# Partimos de feat_model (filtrado “safe” que armamos antes)
df = feat_model

# 1) Dejar fuera filas sin historia para lags (o podrías fillna=0 si prefieres)
if "y_lag1" in df.columns:
    df = df.filter(F.col("y_lag1").isNotNull())

# 2) Imputaciones razonables
prop_cols = [c for c in df.columns if c.startswith("prop__")]
idx_cols  = [c for c in df.columns if c.startswith("idx_")]
count_cols = [c for c in df.columns if c.startswith("siniestros_") and not c.startswith("prop__")]

# Proporciones/índices faltantes -> 0 (no aporta)
if prop_cols:
    df = df.fillna(0.0, subset=prop_cols)
if idx_cols:
    df = df.fillna(0.0, subset=idx_cols)

# growth_y nulo (primer año con lag) -> 0 neutral
if "growth_y" in df.columns:
    df = df.fillna({"growth_y": 0.0})

# Conteos nulos -> 0 (por robustez, aunque en tu silver/wide ya los controlaste)
if count_cols:
    df = df.fillna(0.0, subset=count_cols)

# Target válido
df = df.filter(F.col(TOTAL_COL) >= 0)

# Confirmación rápida de nulos en features
exclude = {"year","region", TOTAL_COL}
feature_cols = [c for c in df.columns if c not in exclude]
nnull = df.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in feature_cols]).collect()[0].asDict()
print("[NULLS en features]", {k:v for k,v in nnull.items() if v>0})


[NULLS en features] {'y_lag2': 25}


In [41]:
from pyspark.sql import functions as F

# Bandera de missing y_lag2
df = df.withColumn("missing_y_lag2", F.col("y_lag2").isNull().cast("int"))

# Opción A: imputar con 0 (neutral)
df = df.fillna({"y_lag2": 0.0})

# --- Alternativa B: imputar con y_lag1 (descomenta si prefieres esta lógica) ---
# df = df.withColumn("y_lag2",
#                    F.when(F.col("y_lag2").isNull(), F.coalesce(F.col("y_lag1"), F.lit(0.0)))
#                     .otherwise(F.col("y_lag2")))

# Verificación post-imputación
nnull = df.select(F.sum(F.col("y_lag2").isNull().cast("int")).alias("y_lag2_nulls")).collect()[0]["y_lag2_nulls"]
print("[POST] y_lag2_nulls:", nnull)


[POST] y_lag2_nulls: 0


In [42]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features", handleInvalid="keep")
ds = assembler.transform(df).select("year","region","features", F.col(TOTAL_COL).alias("label"))

# Split temporal (ajústalo si quieres otra ventana)
train = ds.filter("year <= 2021")
test  = ds.filter("year >= 2022")

print(f"[INFO] train={train.count()}, test={test.count()}, feats={len(feature_cols)}")

# GLM Poisson (pon algo de regularización para estabilidad)
glm = GeneralizedLinearRegression(family="poisson", link="log", maxIter=200, regParam=0.1)
model = glm.fit(train)

pred = model.transform(test)
rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse").evaluate(pred)
mae  = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="mae").evaluate(pred)
print(f"[BASELINE] RMSE={rmse:.2f} | MAE={mae:.2f}")

# Top coeficientes por magnitud (interpretabilidad)
coefs = list(model.coefficients.toArray())
pairs = sorted(zip(feature_cols, coefs), key=lambda x: abs(x[1]), reverse=True)[:25]
print("[TOP COEFS] (feature, coef):")
for name, val in pairs:
    print(f"{name:60s} {val:+.4f}")

# Vista de predicción
pred.select("year","region","label","prediction")\
    .orderBy("year","region").show(30, truncate=False)


[INFO] train=322, test=50, feats=87


25/10/05 12:17:46 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/10/05 12:17:47 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
25/10/05 12:17:47 WARN Instrumentation: [a8b2952b] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
25/10/05 12:17:47 WARN Instrumentation: [a8b2952b] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
25/10/05 12:17:47 WARN Instrumentation: [a8b2952b] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
25/10/05 12:17:47 WARN Instrumentation: [a8b2952b] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
25/10/05 12:17:47 WARN Instrumentation: [a8b2952b] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
25/10/05 12:17:47 WARN Instrumentation: [a8b2952b] Cholesky so

[BASELINE] RMSE=194244.17 | MAE=36219.61
[TOP COEFS] (feature, coef):
growth_y                                                     +0.1628
idx_finde                                                    -0.0703
idx_noche                                                    +0.0689
prop__siniestros_por_franja_horaria__18_01_a_20_00           -0.0667
prop__siniestros_por_franja_horaria__14_00_a_20_00           +0.0575
prop__siniestros_por_franja_horaria__20_00_a_02_00           -0.0558
prop__siniestros_por_franja_horaria__02_00_a_08_00           +0.0470
prop__siniestros_por_franja_horaria__16_01_a_18_00           -0.0429
prop__siniestros_por_franja_horaria__08_01_a_10_00           -0.0425
prop__siniestros_por_dia__jueves                             -0.0376
prop__siniestros_por_dia__viernes                            -0.0355
prop__siniestros_por_franja_horaria__10_01_a_12_00           -0.0332
prop__siniestros_por_franja_horaria__12_01_a_14_00           -0.0331
prop__siniestros_por_franja_horar

## 9) Exportaciones de datasets limpios

In [43]:
# a) Silver long limpio (ya existe): opcional a CSV para auditoría
spark.read.parquet("silver_local/siniestros_long_clean.parquet")\
    .coalesce(1).write.mode("overwrite").option("header",True)\
    .csv("silver_local/siniestros_long_clean_csv")

# b) Gold wide + features (ya existen): exportar “model view”
feat_model.repartition(1).write.mode("overwrite")\
    .parquet("gold_local/siniestros_wide_features_model.parquet")

feat_model.coalesce(1).write.mode("overwrite").option("header",True)\
    .csv("gold_local/siniestros_wide_features_model_csv")


## 10) Estadísticas de verificación 

In [44]:
from pyspark.sql import functions as F

# 4.1 conteos por año y top regiones
feat_model.groupBy("year").agg(
    F.count("*").alias("rows"),
    F.sum(TOTAL_COL).alias("total_siniestros")
).orderBy("year").show(50)

feat_model.groupBy("region").agg(
    F.sum(TOTAL_COL).alias("total_siniestros"),
    F.avg(TOTAL_COL).alias("avg_anual")
).orderBy(F.desc("total_siniestros")).show(20)

# 4.2 distribución del target (global)
feat_model.agg(
    F.count("*").alias("n"),
    F.min(TOTAL_COL).alias("min"),
    F.expr(f"percentile({TOTAL_COL}, 0.5)").alias("p50"),
    F.expr(f"percentile({TOTAL_COL}, 0.9)").alias("p90"),
    F.max(TOTAL_COL).alias("max"),
    F.avg(TOTAL_COL).alias("mean"),
    F.stddev(TOTAL_COL).alias("std")
).show()

# 4.3 chequeo de sumas de proporciones (franjas/días) ~ 1 por fila
prop_franja = [c for c in feat_model.columns if c.startswith("prop__siniestros_por_franja_horaria__")]
prop_dia    = [c for c in feat_model.columns if c.startswith("prop__siniestros_por_dia__")]

def sum_expr(cols):
    if not cols: return F.lit(None).cast("double")
    e = F.coalesce(F.col(cols[0]), F.lit(0.0))
    for c in cols[1:]:
        e = e + F.coalesce(F.col(c), F.lit(0.0))
    return e

if prop_franja:
    feat_model.select(
        F.avg(F.abs(sum_expr(prop_franja) - F.lit(1.0))).alias("avg_abs_err_franjas"),
        F.expr("percentile(abs(({})) , 0.95)".format(" + ".join([f"coalesce({c},0)" for c in prop_franja]) + " - 1")).alias("p95_abs_err_franjas")
    ).show()

if prop_dia:
    feat_model.select(
        F.avg(F.abs(sum_expr(prop_dia) - F.lit(1.0))).alias("avg_abs_err_dias"),
        F.expr("percentile(abs(({})) , 0.95)".format(" + ".join([f"coalesce({c},0)" for c in prop_dia]) + " - 1")).alias("p95_abs_err_dias")
    ).show()

# 4.4 regresión base: pred vs real (sanity)
pred.orderBy(F.desc("prediction")).select("year","region","label","prediction").show(15, truncate=False)


+----+----+----------------+
|year|rows|total_siniestros|
+----+----+----------------+
|2008|  24|         85094.0|
|2009|  25|         86026.0|
|2010|  25|         83653.0|
|2011|  25|         84871.0|
|2012|  25|         94923.0|
|2013|  25|        101762.0|
|2014|  25|        101104.0|
|2015|  25|         95532.0|
|2016|  25|         89304.0|
|2017|  25|         88168.0|
|2018|  24|         86880.0|
|2019|  25|         95800.0|
|2020|  25|         57396.0|
|2021|  23|         71609.0|
|2022|  25|         83897.0|
|2023|  25|         87083.0|
+----+----+----------------+

+-----------+----------------+------------------+
|     region|total_siniestros|         avg_anual|
+-----------+----------------+------------------+
|       LIMA|        719652.0|           47976.8|
|     CALLAO|         93343.0| 6222.866666666667|
|   AREQUIPA|         82919.0|         5182.4375|
|LA LIBERTAD|         68427.0|         4276.6875|
|      PIURA|         47724.0|           2982.75|
|      CUSCO|      

In [47]:
from pyspark.sql import functions as F

# Esperado: 25 regiones por año
expect = (feat_model.select("year","region").groupBy("year")
          .agg(F.countDistinct("region").alias("n_regions"))
          .orderBy("year"))
expect.show(50)

faltantes = (feat_model.groupBy("year")
  .agg(
      F.count("*").alias("rows"),
      F.countDistinct("region").alias("n_regions"),
      F.sum((F.col("siniestros_total__total")<=0).cast("int")).alias("total_le_0")
  )
  .orderBy("year"))
faltantes.show(50)

# Lista explícita de (year, region) faltantes respecto a catálogo de regiones presentes globalmente:
regiones_all = [r.region for r in feat_model.select("region").distinct().collect()]
years_all = [r.year for r in feat_model.select("year").distinct().orderBy("year").collect()]
from itertools import product
full = spark.createDataFrame([(y, r) for y, r in product(years_all, regiones_all)], ["year","region"])
missing_rows = (full.join(feat_model.select("year","region"), ["year","region"], "left_anti")
                     .orderBy("year","region"))
missing_rows.show(100, truncate=False)


+----+---------+
|year|n_regions|
+----+---------+
|2008|       24|
|2009|       25|
|2010|       25|
|2011|       25|
|2012|       25|
|2013|       25|
|2014|       25|
|2015|       25|
|2016|       25|
|2017|       25|
|2018|       24|
|2019|       25|
|2020|       25|
|2021|       23|
|2022|       25|
|2023|       25|
+----+---------+

+----+----+---------+----------+
|year|rows|n_regions|total_le_0|
+----+----+---------+----------+
|2008|  24|       24|         0|
|2009|  25|       25|         0|
|2010|  25|       25|         0|
|2011|  25|       25|         0|
|2012|  25|       25|         0|
|2013|  25|       25|         0|
|2014|  25|       25|         0|
|2015|  25|       25|         0|
|2016|  25|       25|         0|
|2017|  25|       25|         0|
|2018|  24|       24|         0|
|2019|  25|       25|         0|
|2020|  25|       25|         0|
|2021|  23|       23|         0|
|2022|  25|       25|         0|
|2023|  25|       25|         0|
+----+----+---------+----------+

In [48]:
from pyspark.sql import functions as F

TOTAL_COL = "siniestros_total__total"
MESES = ["ENERO","FEBRERO","MARZO","ABRIL","MAYO","JUNIO","JULIO","AGOSTO",
         "SETIEMBRE","SEPTIEMBRE","OCTUBRE","NOVIEMBRE","DICIEMBRE"]

# 1) Cargar "feat" crudo (antes del filtrado de modelado)
try:
    feat_full = spark.read.parquet("gold_local/siniestros_wide_features.parquet")
except Exception as e:
    # Si no existe, usa feat_model como fallback (no ideal para diagnóstico de causas)
    print("[WARN] No encontré gold_local/siniestros_wide_features.parquet; uso feat_model.")
    feat_full = feat_model

# 2) Reglas de exclusión usadas en tu pipeline "safe"
prop_cols_franja = [c for c in feat_full.columns if c.startswith("prop__siniestros_por_franja_horaria__")]
prop_cols_dia    = [c for c in feat_full.columns if c.startswith("prop__siniestros_por_dia__")]

def sum_expr(cols):
    e = F.lit(0.0)
    for c in cols:
        e = e + F.coalesce(F.col(c), F.lit(0.0))
    return e

feat_diag = (feat_full
    .withColumn("region_up", F.upper(F.trim("region")))
    .withColumn("is_month", F.col("region_up").isin(MESES))
    .withColumn("is_total_like", F.col("region_up").rlike(r'^TOTAL(\s+ANUAL|\s+NACIONAL|\s*GENERAL|\s+SINIESTROS.*)?$'))
    .withColumn("total_le_0", (F.col(TOTAL_COL) <= 0).cast("int"))
    .withColumn("sum_props_franja", sum_expr(prop_cols_franja) if prop_cols_franja else F.lit(None).cast("double"))
    .withColumn("sum_props_dia",    sum_expr(prop_cols_dia)    if prop_cols_dia    else F.lit(None).cast("double"))
    .withColumn("prop_out_of_range",
                F.when(
                   F.greatest(
                     *[F.when((F.col(c) < 0) | (F.col(c) > 1), F.lit(1)).otherwise(F.lit(0)) for c in (prop_cols_franja + prop_cols_dia)]
                   ) == 1, 1
                ).otherwise(0) if (prop_cols_franja or prop_cols_dia) else F.lit(0))
    .select("year","region", TOTAL_COL, "is_month","is_total_like","total_le_0","prop_out_of_range",
            "sum_props_franja","sum_props_dia")
)

# 3) “Razón” compacta
feat_diag = feat_diag.withColumn(
    "reason",
    F.when(F.col("is_month"), F.lit("exclude_month"))
     .when(F.col("is_total_like"), F.lit("exclude_total"))
     .when(F.col("total_le_0") == 1, F.lit("total_le_0"))
     .when(F.col("prop_out_of_range") == 1, F.lit("prop_out_of_range"))
     .otherwise(F.lit("kept"))
)

# 4) Ver qué pasó con los (year,region) que tú listaste como faltantes
missing_list = [(2008,"TACNA"), (2018,"LIMA"), (2021,"CALLAO"), (2021,"HUANCAVELICA")]
missing_df = spark.createDataFrame(missing_list, ["year","region"])

print("=== Diagnóstico de faltantes (razón) ===")
(missing_df.join(feat_diag, ["year","region"], "left")
  .orderBy("year","region")
  .show(truncate=False))


=== Diagnóstico de faltantes (razón) ===
+----+------------+-----------------------+--------+-------------+----------+-----------------+------------------+------------------+-----------------+
|year|region      |siniestros_total__total|is_month|is_total_like|total_le_0|prop_out_of_range|sum_props_franja  |sum_props_dia     |reason           |
+----+------------+-----------------------+--------+-------------+----------+-----------------+------------------+------------------+-----------------+
|2008|TACNA       |243.0                  |false   |false        |0         |1                |4.267489711934156 |1.0               |prop_out_of_range|
|2018|LIMA        |3176.0                 |false   |false        |0         |1                |15.534005037783375|15.534005037783375|prop_out_of_range|
|2021|CALLAO      |2773.0                 |false   |false        |0         |1                |12.927515326361341|1.0               |prop_out_of_range|
|2021|HUANCAVELICA|242.0                  |fals

In [None]:
# Columnas de conteos (no-prop) por franja/día para reconstruir TOTAL si hace falta
cnt_franja = [c for c in feat_full.columns
              if c.startswith("siniestros_por_franja_horaria__") and not c.startswith("prop__")]
cnt_dia    = [c for c in feat_full.columns
              if c.startswith("siniestros_por_dia__") and not c.startswith("prop__")]

def sum_safe(cols):
    e = F.lit(0.0)
    for c in cols:
        e = e + F.coalesce(F.col(c), F.lit(0.0))
    return e

feat_repair = (feat_full
  .withColumn("sum_cnt_franja", sum_safe(cnt_franja) if cnt_franja else F.lit(None).cast("double"))
  .withColumn("sum_cnt_dia",    sum_safe(cnt_dia)    if cnt_dia    else F.lit(None).cast("double"))
  .join(feat_diag.select("year","region","prop_out_of_range","sum_props_franja","sum_props_dia"),
        ["year","region"], "left")
)

# regla: si hay prop_out_of_range y 0.8<= sum_props_* <=1.2 => recalcular TOTAL a partir del conjunto “más completo”
feat_repair = feat_repair.withColumn(
    "recalc_total",
    F.when( (F.col("prop_out_of_range")==1) &
            ( (F.col("sum_props_franja").between(0.8,1.2)) | (F.col("sum_props_dia").between(0.8,1.2)) ),
        F.when( F.col("sum_props_franja").isNotNull(), F.col("sum_cnt_franja") )
         .otherwise( F.col("sum_cnt_dia") )
    )
)

# Aplicar reparación si produce un total > 0
feat_repair = feat_repair.withColumn(
    TOTAL_COL,
    F.when(F.col("recalc_total").isNotNull() & (F.col("recalc_total")>0), F.col("recalc_total"))
     .otherwise(F.col(TOTAL_COL))
).drop("recalc_total","sum_cnt_franja","sum_cnt_dia")

# Recalcular proporciones después del cambio de TOTAL
def recompute_props(df, props, base):
    out = df
    for c in props:
        raw = c.replace("prop__", "")
        out = out.withColumn(c, F.when(F.col(base)>0, F.col(raw)/F.col(base)).otherwise(F.lit(None)))
    return out

if prop_cols_franja or prop_cols_dia:
    feat_repair = recompute_props(feat_repair, prop_cols_franja + prop_cols_dia, TOTAL_COL)

# Ahora re-etiquetar y repetir conteo n_regions por año
re_diag = (feat_repair
  .withColumn("region_up", F.upper(F.trim("region")))
  .withColumn("is_month", F.col("region_up").isin(MESES))
  .withColumn("is_total_like", F.col("region_up").rlike(r'^TOTAL(\s+ANUAL|\s+NACIONAL|\s*GENERAL|\s+SINIESTROS.*)?$'))
  .withColumn("total_le_0", (F.col(TOTAL_COL) <= 0).cast("int"))
  .withColumn("prop_out_of_range",
      F.when(
        F.greatest(*[F.when((F.col(c) < 0) | (F.col(c) > 1), F.lit(1)).otherwise(F.lit(0))
                     for c in (prop_cols_franja + prop_cols_dia)]) == 1, 1
      ).otherwise(0) if (prop_cols_franja or prop_cols_dia) else F.lit(0))
)

print("=== n_regions por año tras intento de reparación ===")
(re_diag
 .filter(~F.col("is_month") & ~F.col("is_total_like") & (F.col("total_le_0")==0) & (F.col("prop_out_of_range")==0))
 .groupBy("year").agg(F.countDistinct("region").alias("n_regions")).orderBy("year")
 .show(50))


In [None]:
CANDIDATE = (re_diag
  .filter(~F.col("is_month") & ~F.col("is_total_like") & (F.col("total_le_0")==0) & (F.col("prop_out_of_range")==0))
  .select("year","region", TOTAL_COL)
)

# Assert: 25 regiones por año
completitud = CANDIDATE.groupBy("year").agg(F.countDistinct("region").alias("n_regions")).orderBy("year")
completitud.show(50)

# Si algún año < 25, listar faltantes remanentes:
reg_all = [r.region for r in CANDIDATE.select("region").distinct().collect()]
yrs_all = [r.year for r in CANDIDATE.select("year").distinct().orderBy("year").collect()]
from itertools import product
full2 = spark.createDataFrame([(y, r) for y, r in product(yrs_all, reg_all)], ["year","region"])
missing_after = (full2.join(CANDIDATE.select("year","region"), ["year","region"], "left_anti")
                       .orderBy("year","region"))
missing_after.show(100, truncate=False)


## 10) CORRECCION DE PERDIDA DE DATOS

In [49]:
from pyspark.sql import functions as F

TOTAL_COL = "siniestros_total__total"

# 1) Identificar columnas de conteo y proporciones
cnt_franja_all = [c for c in feat_full.columns
                  if c.startswith("siniestros_por_franja_horaria__") and not c.startswith("prop__")]
cnt_dia_all    = [c for c in feat_full.columns
                  if c.startswith("siniestros_por_dia__") and not c.startswith("prop__")]

prop_franja_all = [c for c in feat_full.columns if c.startswith("prop__siniestros_por_franja_horaria__")]
prop_dia_all    = [c for c in feat_full.columns if c.startswith("prop__siniestros_por_dia__")]

# 2) Separar franjas en "sub-bandas" (finas) vs "bandas amplias"
# Heurística: sub-bandas suelen tener el patrón "__.._01_a_.._00" (intervalos de 2 horas típicamente)
import re
is_fine = lambda name: bool(re.search(r"__\d{2}_\d{2}_a_\d{2}_\d{2}$", name)) and ("_01_a_" in name)
cnt_franja_fine   = [c for c in cnt_franja_all if is_fine(c)]
cnt_franja_broad  = [c for c in cnt_franja_all if c not in cnt_franja_fine]

# 3) Elegimos **solo sub-bandas** y descartamos bandas amplias para evitar doble conteo
cnt_franja = cnt_franja_fine if cnt_franja_fine else cnt_franja_all

# 4) Sumas seguras (coalesce)
def sum_safe(cols):
    e = F.lit(0.0)
    for c in cols:
        e = e + F.coalesce(F.col(c), F.lit(0.0))
    return e

feat_fix = (feat_full
    .withColumn("sum_cnt_dia",    sum_safe(cnt_dia_all)    if cnt_dia_all    else F.lit(None).cast("double"))
    .withColumn("sum_cnt_franja", sum_safe(cnt_franja)     if cnt_franja     else F.lit(None).cast("double"))
)

# 5) Recalcular TOTAL si detectamos desalineo grave en proporciones (p.ej. suma > 1.2)
# Preferimos reconstruir desde días (más estable). Si no hay dí­as, caemos a franjas-finas.
feat_fix = feat_fix.withColumn(
    "re_total",
    F.when( (F.col("sum_cnt_dia").isNotNull()) & (F.col("sum_cnt_dia") > 0), F.col("sum_cnt_dia"))
     .when( (F.col("sum_cnt_franja").isNotNull()) & (F.col("sum_cnt_franja") > 0), F.col("sum_cnt_franja"))
)
# Regla de aplicación: solo cuando las proporciones estaban "malas" (sumas >>1 o <<1) o TOTAL<=0
# Para eso primero necesitamos estimar sumas de proporciones existentes:
def sum_props(df, props):
    if not props: return df.withColumn("sum_props_tmp", F.lit(None).cast("double"))
    e = F.coalesce(F.col(props[0]), F.lit(0.0))
    for c in props[1:]:
        e = e + F.coalesce(F.col(c), F.lit(0.0))
    return df.withColumn("sum_props_tmp", e)

df_tmp = sum_props(feat_full, prop_dia_all)
df_tmp = sum_props(df_tmp.drop("sum_props_tmp"), prop_franja_all)  # no necesitamos ambas simultáneamente aquí

# Para simplificar, aplicamos reparación si TOTAL<=0 o si hay evidencia de sumas >> 1
feat_fix = (feat_fix
    .withColumn(
        TOTAL_COL,
        F.when( (F.col(TOTAL_COL) <= 0) |
                (F.col("sum_cnt_dia").isNotNull() & (F.col("sum_cnt_dia") > 0) &
                 ( (F.col("sum_cnt_dia")/F.col(TOTAL_COL) > 1.2) | (F.col(TOTAL_COL)/F.col("sum_cnt_dia") > 1.2) )
                ) |
                (F.col("sum_cnt_franja").isNotNull() & (F.col("sum_cnt_franja") > 0) &
                 ( (F.col("sum_cnt_franja")/F.col(TOTAL_COL) > 1.2) | (F.col(TOTAL_COL)/F.col("sum_cnt_franja") > 1.2) )
                ),
            F.col("re_total")
        ).otherwise(F.col(TOTAL_COL))
    )
    .drop("re_total")
)

# 6) Recalcular proporciones desde conteos / nuevo TOTAL
def recompute_props(df, cnt_cols, prefix="prop__"):
    out = df
    for c in cnt_cols:
        prop_col = prefix + c  # ej: prop__siniestros_por_franja_horaria__...
        out = out.withColumn(prop_col, F.when(F.col(TOTAL_COL) > 0, F.col(c)/F.col(TOTAL_COL)).otherwise(F.lit(None)))
    return out

feat_fix = recompute_props(feat_fix, cnt_dia_all)
feat_fix = recompute_props(feat_fix, cnt_franja)   # solo finas

# 7) (Opcional) dropear proporciones que provienen de bandas amplias para evitar confusión
prop_franja_broad = [ "prop__" + c for c in cnt_franja_broad ]
feat_fix = feat_fix.drop(*prop_franja_broad)

# 8) Re-diagnóstico rápido de sumas de proporciones
prop_franja_new = [c for c in feat_fix.columns if c.startswith("prop__siniestros_por_franja_horaria__")]
prop_dia_new    = [c for c in feat_fix.columns if c.startswith("prop__siniestros_por_dia__")]

def sum_expr(cols):
    e = F.lit(0.0)
    for c in cols:
        e = e + F.coalesce(F.col(c), F.lit(0.0))
    return e

diag = feat_fix.select(
    "year","region",
    sum_expr(prop_franja_new).alias("sum_props_franja"),
    sum_expr(prop_dia_new).alias("sum_props_dia")
)
diag.orderBy("year","region").show(10, truncate=False)

# 9) Chequeo de completitud (25 regiones/año), excluyendo meses/totales si aún existiesen
MESES = ["ENERO","FEBRERO","MARZO","ABRIL","MAYO","JUNIO","JULIO","AGOSTO","SETIEMBRE","SEPTIEMBRE","OCTUBRE","NOVIEMBRE","DICIEMBRE"]
feat_ready = (feat_fix
  .withColumn("region_up", F.upper(F.trim("region")))
  .filter(~F.col("region_up").isin(MESES))
  .drop("region_up")
)
feat_ready.groupBy("year").agg(F.countDistinct("region").alias("n_regions")).orderBy("year").show(50)


+----+------------+----------------+------------------+
|year|region      |sum_props_franja|sum_props_dia     |
+----+------------+----------------+------------------+
|2007|SETIEMBRE   |0.0             |0.0               |
|2008|AMAZONAS    |0.0             |1.0               |
|2008|ANCASH      |0.0             |1.0               |
|2008|APURIMAC    |0.0             |0.9999999999999999|
|2008|AREQUIPA    |0.0             |1.0               |
|2008|AYACUCHO    |0.0             |1.0               |
|2008|CAJAMARCA   |0.0             |0.9999999999999999|
|2008|CALLAO      |0.0             |1.0               |
|2008|CUSCO       |0.0             |0.9999999999999999|
|2008|HUANCAVELICA|0.0             |1.0               |
+----+------------+----------------+------------------+
only showing top 10 rows
+----+---------+
|year|n_regions|
+----+---------+
|2008|       27|
|2009|       27|
|2010|       27|
|2011|       27|
|2012|       27|
|2013|       27|
|2014|       27|
|2015|       27|
|201

In [50]:
from pyspark.sql import functions as F

MESES = ["ENERO","FEBRERO","MARZO","ABRIL","MAYO","JUNIO",
         "JULIO","AGOSTO","SETIEMBRE","SEPTIEMBRE","OCTUBRE",
         "NOVIEMBRE","DICIEMBRE"]

feat_ready = (feat_fix
    .withColumn("region_up", F.upper(F.trim("region")))
    .filter(~F.col("region_up").isin(MESES))                                # quita meses
    .filter(~F.col("region_up").rlike(r'^TOTAL(\s|$)|^TOTAL.*SINIESTROS'))  # quita totales
    .drop("region_up")
)
feat_ready.groupBy("year").agg(F.countDistinct("region").alias("n_regions")).orderBy("year").show(50)


+----+---------+
|year|n_regions|
+----+---------+
|2008|       25|
|2009|       25|
|2010|       25|
|2011|       25|
|2012|       25|
|2013|       25|
|2014|       25|
|2015|       25|
|2016|       25|
|2017|       25|
|2018|       25|
|2019|       25|
|2020|       25|
|2021|       25|
|2022|       25|
|2023|       25|
+----+---------+



In [51]:
mapeo = {
  "LIMA METROPOLITANA": "LIMA",
  "PROV. CONST. DEL CALLAO": "CALLAO",
  # agrega variantes que veas…
}
from pyspark.sql.types import MapType, StringType
m = F.create_map([F.lit(k) for kv in mapeo.items() for k in kv])

feat_ready = (feat_ready
    .withColumn("region_norm_up", F.upper(F.trim("region")))
    .withColumn("region_norm_up", F.coalesce(m[F.col("region_norm_up")], F.col("region_norm_up")))
    .drop("region")
    .withColumnRenamed("region_norm_up", "region")
)
feat_ready.groupBy("year").agg(F.countDistinct("region").alias("n_regions")).orderBy("year").show(50)


+----+---------+
|year|n_regions|
+----+---------+
|2008|       25|
|2009|       25|
|2010|       25|
|2011|       25|
|2012|       25|
|2013|       25|
|2014|       25|
|2015|       25|
|2016|       25|
|2017|       25|
|2018|       25|
|2019|       25|
|2020|       25|
|2021|       25|
|2022|       25|
|2023|       25|
+----+---------+



In [None]:
from pyspark.sql import functions as F

# 1) Mira los tipos actuales
print("dtypes:", feat_model.select("year","region").dtypes)
feat_model.select("year","region").show(5, truncate=False)

# 2) Normaliza feat_model (mismos tipos y limpieza)
feat_norm = (feat_model
    .withColumn("year_i",  F.col("year").cast("int"))
    .withColumn("region_u",
        F.upper(F.trim(F.regexp_replace(F.col("region"), r"\s+", " ")))
    )
)

# 3) Pide las 4 parejas con los mismos tipos/normalización
pairs = [(2008,"TACNA"), (2018,"LIMA"), (2021,"CALLAO"), (2021,"HUANCAVELICA")]
pairs_df = spark.createDataFrame(pairs, ["year_i","region_u"])

probe_exact = (feat_norm
    .join(pairs_df, ["year_i","region_u"], "inner")
    .select(F.col("year_i").alias("year"),
            F.col("region_u").alias("region"),
            F.col("siniestros_total__total"))
)

print("=== Coincidencias exactas tras normalizar ===")
probe_exact.orderBy("year","region").show(50, truncate=False)

# 4) Si alguna pareja sigue sin aparecer, busca "casi coincidencias"
print("=== Diagnóstico de casi-coincidencias por año ===")
for y, r in pairs:
    print(f">>> Buscando en {y} algo parecido a '{r}'")
    (feat_norm
        .filter(F.col("year_i")==y)
        .select("region", "region_u")
        .distinct()
        .filter(F.col("region_u").rlike(r.replace(" ", r"\\s+")))  # patrón flexible en espacios
        .show(20, truncate=False)
    )

# 5) (Opcional) lista qué regiones hay por cada año de interés
print("=== Regiones disponibles por año (muestra) ===")
(feat_norm
 .filter(F.col("year_i").isin([2008, 2018, 2021]))
 .groupBy("year_i")
 .agg(F.collect_set("region_u").alias("regiones"))
 .orderBy("year_i")
 .show(truncate=False))


dtypes: [('year', 'int'), ('region', 'string')]
+----+--------+
|year|region  |
+----+--------+
|2008|AMAZONAS|
|2009|AMAZONAS|
|2010|AMAZONAS|
|2011|AMAZONAS|
|2012|AMAZONAS|
+----+--------+
only showing top 5 rows
=== Coincidencias exactas tras normalizar ===
+----+------+-----------------------+
|year|region|siniestros_total__total|
+----+------+-----------------------+
+----+------+-----------------------+

=== Diagnóstico de casi-coincidencias por año ===
>>> Buscando en 2008 algo parecido a 'TACNA'
+------+--------+
|region|region_u|
+------+--------+
+------+--------+

>>> Buscando en 2018 algo parecido a 'LIMA'
+------+--------+
|region|region_u|
+------+--------+
+------+--------+

>>> Buscando en 2021 algo parecido a 'CALLAO'
+------+--------+
|region|region_u|
+------+--------+
+------+--------+

>>> Buscando en 2021 algo parecido a 'HUANCAVELICA'
+------+--------+
|region|region_u|
+------+--------+
+------+--------+

=== Regiones disponibles por año (muestra) ===
+------+-

25/10/05 15:15:44 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 933995 ms exceeds timeout 120000 ms
25/10/05 15:15:44 WARN SparkContext: Killing executors is not supported by current scheduler.
25/10/05 15:15:46 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:53)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:342)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:132)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$

## 11) VALIDACION FINAL Y EXPORTACION DE DATOS FINALES PARA PROCESO DE ML Y PREDICCION

In [53]:
from pyspark.sql import functions as F
from pyspark.sql import Window

# =========================================
# 0) Config & helpers
# =========================================
try:
    TOTAL_COL  # si ya existe, úsalo
except NameError:
    TOTAL_COL = "siniestros_total__total"  # cambia si tu target tiene otro nombre

MONTHS_ES = {
    "ENERO","FEBRERO","MARZO","ABRIL","MAYO","JUNIO",
    "JULIO","AGOSTO","SETIEMBRE","SEPTIEMBRE","OCTUBRE",
    "NOVIEMBRE","DICIEMBRE"
}
EPS = 1e-12

def array_sum(cols):
    """Suma segura de columnas numéricas con coalesce(.,0.0)."""
    return F.aggregate(
        F.array(*[F.coalesce(F.col(c), F.lit(0.0)) for c in cols]),
        F.lit(0.0),
        lambda acc, x: acc + x
    )

# =========================================
# 1) Filtrado básico de filas “no-región”
#    (meses / totales) en un solo paso simple
# =========================================
clean = (
    feat_model
    .withColumn("region_u", F.upper(F.trim(F.col("region"))))
    .filter(~F.col("region_u").isin(list(MONTHS_ES)))
    .filter(~F.col("region_u").rlike(r"^TOTAL(\s|$)"))
    .drop("region_u")
)

# =========================================
# 2) Listas de columnas de proporciones
# =========================================
prop_dia = [c for c in clean.columns if c.startswith("prop__siniestros_por_dia__")]
prop_fra = [c for c in clean.columns if c.startswith("prop__siniestros_por_franja_horaria__")]

# =========================================
# 3) Construcción de sumas/flags en columnas intermedias
#    (sin anidar CASE WHEN)
# =========================================
clean = (
    clean
    .withColumn("sum_props_dia",    array_sum(prop_dia)    if prop_dia else F.lit(0.0))
    .withColumn("sum_props_franja", array_sum(prop_fra)    if prop_fra else F.lit(0.0))
    .withColumn("has_day_props",    (F.col("sum_props_dia")    > 0).cast("boolean"))
    .withColumn("has_fra_props",    (F.col("sum_props_franja") > 0).cast("boolean"))
    .withColumn("total_pos",        (F.col(TOTAL_COL) > 0).cast("boolean"))
)

# =========================================
# 4) MATERIALIZAR para cortar el plan
# =========================================
clean = clean.persist()
_ = clean.count()  # fuerza evaluación

# =========================================
# 5) Normalización simple y segura de proporciones
#    - Clip a [0,1] primero (para quitar outliers)
#    - Si la suma > 0, renormaliza cada grupo a que sumen 1
# =========================================
# 5.1 Clip a [0, 1]
for c in prop_dia:
    clean = clean.withColumn(c, F.when(F.col(c).isNull(), None)
                                 .otherwise(F.when(F.col(c) < 0, 0.0)
                                                .when(F.col(c) > 1, 1.0)
                                                .otherwise(F.col(c))))
for c in prop_fra:
    clean = clean.withColumn(c, F.when(F.col(c).isNull(), None)
                                 .otherwise(F.when(F.col(c) < 0, 0.0)
                                                .when(F.col(c) > 1, 1.0)
                                                .otherwise(F.col(c))))

# Recalcular sumas tras el clip
clean = (
    clean
    .withColumn("sum_props_dia",    array_sum(prop_dia)    if prop_dia else F.lit(0.0))
    .withColumn("sum_props_franja", array_sum(prop_fra)    if prop_fra else F.lit(0.0))
    .withColumn("has_day_props",    (F.col("sum_props_dia")    > 0).cast("boolean"))
    .withColumn("has_fra_props",    (F.col("sum_props_franja") > 0).cast("boolean"))
)

# 5.2 Renormalizar (dividir por la suma del grupo, si > 0)
if prop_dia:
    for c in prop_dia:
        clean = clean.withColumn(
            c,
            F.when(F.col("sum_props_dia") > EPS, F.col(c) / F.col("sum_props_dia")).otherwise(F.col(c))
        )

if prop_fra:
    for c in prop_fra:
        clean = clean.withColumn(
            c,
            F.when(F.col("sum_props_franja") > EPS, F.col(c) / F.col("sum_props_franja")).otherwise(F.col(c))
        )

# =========================================
# 6) DIAGNÓSTICO: errores de suma y rango tras normalización
# =========================================
if prop_dia:
    diag_dia = (
        clean.select(
            F.avg(F.abs(array_sum(prop_dia) - F.lit(1.0))).alias("avg_abs_err_dias"),
            F.expr("percentile(abs(({})) , 0.95)".format(" + ".join([f"coalesce({c},0)" for c in prop_dia]) + " - 1")).alias("p95_abs_err_dias")
        )
    )
    diag_dia.show(truncate=False)

if prop_fra:
    diag_fra = (
        clean.select(
            F.avg(F.abs(array_sum(prop_fra) - F.lit(1.0))).alias("avg_abs_err_franjas"),
            F.expr("percentile(abs(({})) , 0.95)".format(" + ".join([f"coalesce({c},0)" for c in prop_fra]) + " - 1")).alias("p95_abs_err_franjas")
        )
    )
    diag_fra.show(truncate=False)

# =========================================
# 7) Validaciones clave (sin anidar CASE):
#    - 25 regiones por año
#    - No totales <= 0
#    - Listado de (year, region) faltantes si existieran
# =========================================
expect = (clean.select("year","region").groupBy("year")
          .agg(F.countDistinct("region").alias("n_regions"))
          .orderBy("year"))
expect.show(50)

faltantes = (clean.groupBy("year")
  .agg(
      F.count("*").alias("rows"),
      F.countDistinct("region").alias("n_regions"),
      F.sum((F.col(TOTAL_COL) <= 0).cast("int")).alias("total_le_0")
  )
  .orderBy("year"))
faltantes.show(50)

# Catálogo completo para chequear faltantes explícitos
regiones_all = [r.region for r in clean.select("region").distinct().collect()]
years_all = [r.year for r in clean.select("year").distinct().orderBy("year").collect()]

from itertools import product
full = spark.createDataFrame([(y, r) for y, r in product(years_all, regiones_all)], ["year","region"])
missing_rows = (full.join(clean.select("year","region"), ["year","region"], "left_anti")
                     .orderBy("year","region"))
print("=== Missing (year, region) vs. catálogo global ===")
missing_rows.show(100, truncate=False)

# =========================================
# 8) (Opcional) Exportar data limpia
#    - Ajusta OUTPUT_PATH a tu ruta destino
#    - En Parquet (particionado por year/region si quieres)
# =========================================
OUTPUT_PATH = "dbfs:/mnt/tu_contenedor/clean/feat_model_clean_v2"  # <-- cambia a tu ruta
# OUTPUT_PATH = "/tmp/feat_model_clean_v2"  # local driver (si no usas DBFS)

# Elimina columnas auxiliares si no las quieres en el dataset final
aux_cols = ["sum_props_dia", "sum_props_franja", "has_day_props", "has_fra_props", "total_pos"]
final_cols = [c for c in clean.columns if c not in aux_cols]

clean.select(final_cols).write.mode("overwrite").parquet(OUTPUT_PATH)
print(f"[OK] Data limpia exportada a: {OUTPUT_PATH}")


+----------------------+---------------------+
|avg_abs_err_dias      |p95_abs_err_dias     |
+----------------------+---------------------+
|2.7194856916323278E-17|2.220446049250313E-16|
+----------------------+---------------------+

+---------------------+---------------------+
|avg_abs_err_franjas  |p95_abs_err_franjas  |
+---------------------+---------------------+
|3.616635610521343E-17|2.220446049250313E-16|
+---------------------+---------------------+

+----+---------+
|year|n_regions|
+----+---------+
|2008|       24|
|2009|       25|
|2010|       25|
|2011|       25|
|2012|       25|
|2013|       25|
|2014|       25|
|2015|       25|
|2016|       25|
|2017|       25|
|2018|       24|
|2019|       25|
|2020|       25|
|2021|       23|
|2022|       25|
|2023|       25|
+----+---------+

+----+----+---------+----------+
|year|rows|n_regions|total_le_0|
+----+----+---------+----------+
|2008|  24|       24|         0|
|2009|  25|       25|         0|
|2010|  25|       25|      

Py4JJavaError: An error occurred while calling o9494.parquet.
: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "dbfs"
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3581)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3612)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:172)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3716)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3667)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:557)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:366)
	at org.apache.spark.sql.execution.datasources.DataSource.makeQualified(DataSource.scala:125)
	at org.apache.spark.sql.execution.datasources.DataSource.planForWritingFileFormat(DataSource.scala:468)
	at org.apache.spark.sql.execution.datasources.DataSource.planForWriting(DataSource.scala:554)
	at org.apache.spark.sql.classic.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
	at org.apache.spark.sql.classic.DataFrameWriter.saveInternal(DataFrameWriter.scala:241)
	at org.apache.spark.sql.classic.DataFrameWriter.save(DataFrameWriter.scala:118)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:369)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
	at java.base/java.lang.Thread.run(Thread.java:840)
