<table border="1" width="99%">
  <tr>
    <td bgcolor="#48a259">
      <h1 style="color: #FFFFFF; text-align: center;">Modelamiento de Accidentes Viales</h1>
    </td>
  </tr>
</table>

In [1]:
import pandas as pd
import geopandas as gpd

In [2]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.window import Window

In [3]:
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType
)

In [4]:
from pyspark.sql import functions as F

In [5]:
from pyspark.sql.functions import (
    col, lit, when, row_number, lpad, concat, upper, trim, regexp_replace,
    translate, ltrim, isnull, to_date, year, month, dayofmonth, date_format,
    coalesce, to_timestamp, hour, minute, unix_timestamp, regexp_extract, split, monotonically_increasing_id, create_map
)
import csv

In [6]:
spark = SparkSession.builder \
    .appName("Accidentes Peru") \
    .master("local[2]") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.memory.fraction", "0.6") \
    .config("spark.sql.shuffle.partitions", "8") \
    .getOrCreate()


<table width="99%">
  <tr>
    <td bgcolor="#FFBA39">
      <h2 style="color: #000000; text-align: left;">Modelamiento Dimensional</h2>
    </td>
  </tr>
</table>

### Siniestro Viales

In [7]:
df_siniestros = spark.read.csv(
    "Accidentes/BBDD ONSV - SINIESTROS 2021-2023.csv",
    header=True,
    inferSchema=True,
    sep=","
)

In [8]:
df_personas = spark.read.csv(
    "Accidentes/BBDD ONSV - PERSONAS 2021-2023.csv",
    header=True,
    inferSchema=True,
    sep=","
)

In [9]:
df_vehiculos = spark.read.csv(
    "Accidentes/BBDD ONSV - VEHICULOS 2021-2023.csv",
    header=True,
    inferSchema=True,
    sep=","
)

### Dimension Ubigeo

In [10]:
# --- 1. Crear dimensi√≥n de clasificaci√≥n de municipalidades (solo CSV de referencia) ---
clasificacion_map = {
    "A": "Municipalidades provinciales pertenecientes a ciudades principales",
    "B": "Municipalidades provinciales NO pertenecientes a ciudades principales",
    "C": "Municipalidades distritales de Lima Metropolitana",
    "D": "Municipalidades distritales pertenecientes a otras ciudades principales",
    "E": "Municipalidades distritales NO pertenecientes a ciudades principales, con m√°s de 70% de poblaci√≥n urbana",
    "F": "Municipalidades distritales NO pertenecientes a ciudades principales, con poblaci√≥n urbana entre 35% y 70%",
    "G": "Municipalidades distritales NO pertenecientes a ciudades principales, con menos de 35% de poblaci√≥n urbana"
}

# Guardar CSV de dimensi√≥n de clasificaci√≥n
dim_clas_muni = pd.DataFrame([
    {"clasificacion_municipalidad": k, "clasificacion_muni_desc": v}
    for k, v in clasificacion_map.items()
])
dim_clas_muni.to_csv(
    "Dimensiones/dim_clasificacion_muni.csv",
    index=False,
    quoting=csv.QUOTE_ALL,
    encoding="utf-8"
)

print("CSV de dimensi√≥n de clasificaci√≥n generado exitosamente")

CSV de dimensi√≥n de clasificaci√≥n generado exitosamente


In [11]:
# --- 2. Leer RENAMU ---
renamu_df = spark.read.csv(
    "Renamu/Base_RENAMU_2022_f.csv",
    header=True,
    inferSchema=True,
    sep=";"
)

# Seleccionar columnas principales
dim_ubigeo = renamu_df.select(
    lpad(col("Ubigeo").cast("string"), 6, "0").alias("ubigeo"),
    col("Departamento").alias("departamento"),
    col("ccdd").alias("codigo_departamento"),    
    col("Provincia").alias("provincia"),
    col("ccpp").alias("codigo_provincia"),
    col("Distrito").alias("distrito"),
    col("ccdi").alias("codigo_distrito"),
    col("TIPOMUNI").cast("int").alias("tipomuni")
)

# --- 3. Leer regi√≥n natural desde Excel ---
excel_df = pd.read_excel("Ubigeo/UBIGEO 2022_1891 distritos.xlsx", dtype=str)
excel_df = excel_df.rename(columns={"IDDIST": "ubigeo", "REGION NATURAL": "region_natural"})
excel_df["ubigeo"] = excel_df["ubigeo"].str.zfill(6)
region_df = spark.createDataFrame(excel_df[["ubigeo", "region_natural"]])

# --- 4. Leer clasificaci√≥n municipalidad A-G ---
clasif_df = spark.read.csv(
    "Ubigeo/clasificacion_municipalidades_abcg.csv",
    header=True,
    inferSchema=True,
    sep=","
).withColumn("ubigeo", lpad(col("ubigeo").cast("string"), 6, "0")) \
 .withColumnRenamed("clasificacion_muni", "clasificacion_municipalidad")

# --- 5. Unir UBIGEO con regi√≥n y clasificaci√≥n ---
dim_ubigeo = dim_ubigeo \
    .join(region_df, on="ubigeo", how="left") \
    .join(clasif_df, on="ubigeo", how="left")

# --- 6. Descripci√≥n de TIPOMUNI ---
dim_ubigeo = dim_ubigeo.withColumn(
    "tipo_muni_desc",
    when(col("tipomuni") == 1, "Municipalidad Provincial")
    .when(col("tipomuni") == 2, "Municipalidad Distrital")
    .when(col("tipomuni") == 3, "Municipalidad de Centro Poblado")
    .otherwise("No especificado")
)

# --- 7. ID autoincremental ---
window = Window.orderBy("ubigeo")
dim_ubigeo = dim_ubigeo.withColumn("id_ubigeo", row_number().over(window))

In [12]:
# --- 8. Cargar poblaci√≥n 2022 ---
df_pob = pd.read_excel(
    "./Ubigeo/Proyecciones_poblacion_2018_2022.xlsx",
    skiprows=1,
    dtype={'UBIGEO': str}
)
df_pob.columns = df_pob.columns.str.strip()
df_pob["UBIGEO"] = df_pob["UBIGEO"].fillna("").str.strip().str.zfill(6)
df_pob = df_pob[df_pob["UBIGEO"].str.match(r"^\d{6}$", na=False)]
df_pob_2022 = df_pob[["UBIGEO", "2022"]].rename(columns={"2022": "habitantes"})
df_pob_2022["habitantes"] = pd.to_numeric(df_pob_2022["habitantes"], errors="coerce").fillna(0).astype(int)
poblacion_spark = spark.createDataFrame(df_pob_2022).withColumnRenamed("UBIGEO", "ubigeo")

# --- 9. Unir poblaci√≥n ---
dim_ubigeo = dim_ubigeo.join(poblacion_spark, on="ubigeo", how="left")

# --- 10. Reordenar columnas final ---
cols = dim_ubigeo.columns
ordered_cols = ["id_ubigeo", "ubigeo"] + [c for c in cols if c not in ("id_ubigeo", "ubigeo")]
dim_ubigeo = dim_ubigeo.select(ordered_cols)


In [13]:
# --- 11. Convertir a Pandas y guardar CSV ---
df_pd = dim_ubigeo.toPandas()
df_pd["ubigeo"] = df_pd["ubigeo"].astype(str).str.zfill(6)
df_pd.to_csv("Dimensiones/dim_ubigeo.csv", index=False)

print("CSV de dimensi√≥n UBIGEO generado exitosamente")

CSV de dimensi√≥n UBIGEO generado exitosamente


### Dimension Tiempo

In [14]:
df_tiempo = df_siniestros \
    .withColumn("fecha", to_date(col("FECHA SINIESTRO"), "dd/MM/yyyy")) \
    .withColumn("hora_raw", trim(col("HORA SINIESTRO"))) \
    .withColumn("hora_only", regexp_extract(col("hora_raw"), "([0-9]{1,2}:[0-9]{2}(?::[0-9]{2})?)", 1)) \
    .withColumn("hora_norm",
        when(col("hora_only") != "", col("hora_only")).otherwise(col("hora_raw"))
    ) \
    .withColumn("hora_norm",
        when(col("hora_norm").rlike("^[0-9]{1,2}:[0-9]{2}$"),
             concat(
                 lpad(split(col("hora_norm"), ":").getItem(0), 2, "0"),
                 lit(":"),
                 split(col("hora_norm"), ":").getItem(1),
                 lit(":00")
             )
        ).otherwise(col("hora_norm"))
    ) \
    .withColumn("timestamp_completo",
        to_timestamp(concat(date_format(col("fecha"), "yyyy-MM-dd"), lit(" "), col("hora_norm")),
                     "yyyy-MM-dd HH:mm:ss")
    ) \
    .withColumn("hora_num", hour(col("timestamp_completo"))) \
    .withColumn("minuto_num", minute(col("timestamp_completo"))) \
    .withColumn("fecha_hora_unix", unix_timestamp(col("timestamp_completo"))) \
    .withColumn("anio", year(col("fecha"))) \
    .withColumn("mes", month(col("fecha"))) \
    .withColumn("dia", dayofmonth(col("fecha"))) \
    .withColumn("dia_semana", date_format(col("fecha"), "EEEE")) \
    .withColumn("mes_str", lpad(col("mes").cast("string"), 2, "0")) \
    .withColumn("pk_tiempo",
        when(col("timestamp_completo").isNotNull(),
             date_format(col("timestamp_completo"), "yyyyMMddHHmmss")
        ).otherwise(
             concat(
                 col("anio").cast("string"),
                 col("mes_str"),
                 lpad(col("dia").cast("string"), 2, "0"),
                 lit("000000")
             )
        )
    ) \
    .withColumn("trimestre",
        when(col("mes").between(1,3), 1)
        .when(col("mes").between(4,6), 2)
        .when(col("mes").between(7,9), 3)
        .otherwise(4)
    ) \
    .withColumn("semestre", when(col("mes") <= 6, 1).otherwise(2)) \
    .withColumn("nombre_mes", date_format(col("fecha"), "MMMM"))

dim_tiempo = df_tiempo.select(
    "pk_tiempo",
    "fecha",
    "anio",
    "mes",
    "dia",
    "dia_semana",
    "fecha_hora_unix",
    "mes_str",
    "trimestre",
    "semestre",
    "nombre_mes"
).distinct().orderBy("fecha", "fecha_hora_unix")

print(f"\nTotal registros en dim_tiempo: {dim_tiempo.count()}")

dim_tiempo_pd = dim_tiempo.toPandas()
dim_tiempo_pd.to_csv("Dimensiones/dim_tiempo.csv", index=False)


Total registros en dim_tiempo: 6548


### Dim Red Vial

In [15]:
# --- 0 Leer CSV DIRECTAMENTE 

df_red_vial = spark.read.csv(
    "Red_Vial/RedVial_2024_clean.csv",
    header=True,
    inferSchema=True,
    sep=",",
    encoding="utf-8"
)


# ================================
# 1. Selecci√≥n de columnas √∫tiles
# ================================
dim_red_vial = df_red_vial.select(
    F.col("Id").alias("cod_red_vial"),
    F.col("cCodRuta").alias("codigo_ruta"),
    F.col("cNomRuta").alias("nombre_ruta"),
    F.col("cIdTramo").alias("id_tramo"),
    F.col("dkmInicio").alias("km_inicio"),
    F.col("dkmFinal").alias("km_final"),
    F.col("dLongitud").alias("longitud_km"),
    F.col("dNroCarril").alias("nro_carriles"),
    F.col("dAncCalzad").alias("ancho_calzada"),
    F.col("cTipRed").alias("tipo_red_raw"),
    F.col("cClasifica").alias("clasificacion_raw"),
    F.col("cAutopista").alias("es_autopista_raw"),
    F.col("cSentido").alias("sentido_raw"),
    F.col("cCodDepart").alias("cod_departamento"),
    F.col("cDepartame").alias("departamento"),
    F.col("cCodProvin").alias("cod_provincia"),
    F.col("cRegion").alias("region"),
    F.col("cTopografi").alias("topografia_raw"),
    F.col("cEstado").alias("estado_via_raw"),
    F.col("dSuperfici").alias("superficie_km2"),
    F.col("dLogistico").alias("logistico"),
    F.col("cOperacion").alias("operacion"),
    F.col("dIMD").alias("imd"),
    F.col("cPeajes").alias("peajes")
)

# ================================
# 2. Normalizaciones
# ================================
# Tipo de red
dim_red_vial = dim_red_vial.withColumn(
    "tipo_red",
    F.when(F.lower("tipo_red_raw") == "rn", "Red Nacional")
     .when(F.lower("tipo_red_raw") == "rv", "Red Vial")
     .otherwise("Otro")
)

# Clasificaci√≥n
dim_red_vial = dim_red_vial.withColumn(
    "clasificacion",
    F.when(F.lower("clasificacion_raw").like("%nacional%"), "Nacional")
     .when(F.lower("clasificacion_raw").like("%ramal%"), "Ramal")
     .when(F.lower("clasificacion_raw").like("%transversal%"), "Transversal")
     .otherwise("Otro")
)

# Autopista
dim_red_vial = dim_red_vial.withColumn(
    "es_autopista",
    F.when(F.col("es_autopista_raw").isin(["1", 1, "Si", "SI"]), "Si")
     .otherwise("No")
)

# Sentido
dim_red_vial = dim_red_vial.withColumn(
    "sentido",
    F.when(F.lower("sentido_raw") == "uc", "Unidireccional")
     .when(F.lower("sentido_raw") == "db", "Bidireccional")
     .otherwise("Desconocido")
)

# Topograf√≠a
dim_red_vial = dim_red_vial.withColumn(
    "topografia",
    F.when(F.lower("topografia_raw") == "sierra", "Sierra")
     .when(F.lower("topografia_raw") == "costa", "Costa")
     .when(F.lower("topografia_raw") == "selva", "Selva")
     .otherwise("Otro")
)

# Estado de la v√≠a
dim_red_vial = dim_red_vial.withColumn(
    "estado_via",
    F.when(F.lower("estado_via_raw") == "monta√±oso", "Monta√±oso")
     .when(F.lower("estado_via_raw") == "plano", "Plano")
     .otherwise("Otro")
)

# ================================
# 3. ID Autogenerado
# ================================
window = Window.orderBy(F.monotonically_increasing_id())
dim_red_vial = dim_red_vial.withColumn(
    "id_red_vial",
    F.row_number().over(window)
)

# ================================
# 4. Selecci√≥n final limpia
# ================================
dim_red_vial = dim_red_vial.select(
    "id_red_vial",
    "cod_red_vial",
    "codigo_ruta",
    "nombre_ruta",
    "id_tramo",
    "km_inicio",
    "km_final",
    "longitud_km",
    "nro_carriles",
    "ancho_calzada",
    "tipo_red",
    "clasificacion",
    "es_autopista",
    "sentido",
    "cod_departamento",
    "departamento",
    "cod_provincia",
    "region",
    "topografia",
    "estado_via",
    "superficie_km2",
    "logistico",
    "operacion",
    "imd",
    "peajes"
)

# ================================
# 5. Guardado
# ================================
dim_red_vial.toPandas().to_csv(
    "Dimensiones/dim_red_vial.csv",
    index=False
)

### Dimension Personas

In [16]:

# ================================
# 2. Selecci√≥n de columnas de dimensi√≥n
# ================================

dim_personas = df_personas.select(
    F.col("C√ìDIGO SINIESTRO").alias("cod_siniestro"),
    F.col("C√ìDIGO PERSONA").alias("cod_persona"),
    F.col("SEXO").alias("sexo_raw"),
    F.col("EDAD").cast("int").alias("edad"),
    F.col("TIPO PERSONA").alias("tipo_persona_raw"),
    F.col("GRAVEDAD").alias("gravedad_raw"),
    F.col("SITUACI√ìN DE PERSONA").alias("situacion_persona_raw"),
    F.col("PA√çS DE NACIONALIDAD").alias("nacionalidad"),
    F.col("OTRO PA√çS DE NACIONALIDAD").alias("nacionalidad_otro"),
    F.col("POSEE LICENCIA").alias("posee_licencia_raw"),
    F.col("ESTADO LICENCIA").alias("estado_licencia"),
    F.col("CLASE_LICENCIA").alias("clase_licencia"),
    F.col("¬øSE SOMETI√ì A DOSAJE ET√çLICO CUALITATIVO?").alias("dosaje_cualit_raw"),
    F.col("RESULTADO DEL DOSAJE ET√çLICO CUALITATIVO").alias("dosaje_cualit_res"),
    F.col("¬øSE SOMETI√ì A DOSAJE ET√çLICO CUANTITATIVO?").alias("dosaje_cuantit_raw")
)

# ================================
# 3. Normalizaci√≥n
# ================================

# Sexo
dim_personas = dim_personas.withColumn(
    "sexo",
    F.when(F.lower("sexo_raw").like("%masc%"), "M")
     .when(F.lower("sexo_raw").like("%fem%"), "F")
     .otherwise("O")
)

# Tipo persona
dim_personas = dim_personas.withColumn(
    "tipo_persona",
    F.when(F.lower("tipo_persona_raw").like("%conductor%"), "Conductor")
     .when(F.lower("tipo_persona_raw").like("%pasaj%"), "Pasajero")
     .when(F.lower("tipo_persona_raw").like("%peat%"), "Peat√≥n")
     .otherwise("Otro")
)

# Gravedad
dim_personas = dim_personas.withColumn(
    "gravedad",
    F.when(F.lower("gravedad_raw").like("%falle%"), "Fallecido")
     .when(F.lower("gravedad_raw").like("%lesion%"), "Herido")
     .otherwise("Ileso")
)

# Situaci√≥n
dim_personas = dim_personas.withColumn(
    "situacion_persona",
    F.when(F.lower("situacion_persona_raw").like("%ident%"), "Identificado")
     .otherwise("No identificado")
)

# Posee licencia
dim_personas = dim_personas.withColumn(
    "tiene_licencia",
    F.when(F.lower("posee_licencia_raw") == "si", "Si")
     .when(F.lower("posee_licencia_raw") == "no", "No")
     .otherwise("Desconocido")
)

# Dosaje cualitativo
dim_personas = dim_personas.withColumn(
    "dosaje_cualit",
    F.when(F.lower("dosaje_cualit_raw") == "si", "Si")
     .when(F.lower("dosaje_cualit_raw") == "no", "No")
     .otherwise("Desconocido")
)

# Dosaje cuantitativo
dim_personas = dim_personas.withColumn(
    "dosaje_cuantit",
    F.when(F.lower("dosaje_cuantit_raw") == "si", "Si")
     .when(F.lower("dosaje_cuantit_raw") == "no", "No")
     .otherwise("Desconocido")
)

# Grupo de edad
dim_personas = dim_personas.withColumn(
    "grupo_edad",
    F.when(F.col("edad") < 12, "Ni√±o")
     .when((F.col("edad") >= 12) & (F.col("edad") <= 17), "Adolescente")
     .when((F.col("edad") >= 18) & (F.col("edad") <= 59), "Adulto")
     .when(F.col("edad") >= 60, "Adulto mayor")
     .otherwise("No especificado")
)

dim_personas = dim_personas.withColumn("edad", F.col("edad").cast("int"))

# ================================
# 4. ID Autogenerado
# ================================

window = Window.orderBy(F.monotonically_increasing_id())

dim_personas = dim_personas.withColumn(
    "id_persona",
    F.row_number().over(window)
)

# ================================
# 5. Selecci√≥n final
# ================================

dim_personas = dim_personas.select(
    "id_persona",
    "cod_siniestro",  # ‚Üê CLAVE: Ahora est√° incluido
    "cod_persona",
    "sexo",
    "edad",
    "grupo_edad",
    "tipo_persona",
    "gravedad",
    "situacion_persona",
    "nacionalidad",
    "nacionalidad_otro",
    "tiene_licencia",
    "estado_licencia",
    "clase_licencia",
    "dosaje_cualit",
    "dosaje_cualit_res",
    "dosaje_cuantit"
)

# ================================
# 6. Guardar resultado
# ================================
pdf = dim_personas.toPandas()
pdf["edad"] = pdf["edad"].astype("Int64")  # entero con null permitido
pdf.to_csv("Dimensiones/dim_personas.csv", index=False)


### Dimension Vehiculo

In [17]:
# ================================
# 2. Selecci√≥n de columnas √∫tiles
# ================================
dim_vehiculo = df_vehiculos.select(
    F.col("C√ìDIGO SINIESTRO").alias("cod_siniestro"),
    F.col("C√ìDIGO VEHICULO").alias("cod_vehiculo"),
    F.col("SITUACI√ìN VEH√çCULO").alias("situacion_raw"),
    F.col("ESTADO MODALIDAD").alias("estado_modalidad"),
    F.col("MODALIDAD DE TRANSPORTE").alias("modalidad_raw"),
    F.col("ELEMENTO TRANSPORTADO").alias("elemento_transportado"),
    F.col("AMBITO SERVICIO").alias("ambito_servicio"),
    F.col("POSEE SEGURO").alias("posee_seguro_raw"),
    F.col("ESTADO SOAT").alias("estado_soat"),
    F.col("TIPO SEGURO").alias("tipo_seguro"),
    F.col("COMPA√ëIA SEGURO").alias("compania_seguro"),
    F.col("POSEE CITV").alias("posee_citv_raw"),
    F.col("ESTADO CITV").alias("estado_citv"),
    F.col("LUGAR IMPACTO VEH√çCULO").alias("lugar_impacto"),
    F.col("VEH√çCULO").alias("tipo_vehiculo_raw"),
    F.col("TIPO SINIESTRO").alias("tipo_siniestro_raw"),
    F.col("TIPO DE V√çA").alias("tipo_via_raw")
)

# ================================
# 3. Normalizaciones
# ================================

# Situaci√≥n del veh√≠culo
dim_vehiculo = dim_vehiculo.withColumn(
    "situacion",
    F.when(F.lower("situacion_raw").like("%ident%"), "Identificado")
     .otherwise("No identificado")
)

# Modalidad
dim_vehiculo = dim_vehiculo.withColumn(
    "modalidad",
    F.when(F.lower("modalidad_raw").like("%particular%"), "Particular")
     .when(F.lower("modalidad_raw").like("%carga%"), "Carga")
     .when(F.lower("modalidad_raw").like("%pasaj%"), "Pasajeros")
     .otherwise("Otro")
)

# Posee seguro
dim_vehiculo = dim_vehiculo.withColumn(
    "posee_seguro",
    F.when(F.lower("posee_seguro_raw") == "si", "Si")
     .when(F.lower("posee_seguro_raw") == "no", "No")
     .otherwise("Desconocido")
)

# CITV
dim_vehiculo = dim_vehiculo.withColumn(
    "posee_citv",
    F.when(F.lower("posee_citv_raw") == "si", "Si")
     .when(F.lower("posee_citv_raw") == "no", "No")
     .otherwise("Desconocido")
)

# Tipo de veh√≠culo
dim_vehiculo = dim_vehiculo.withColumn(
    "tipo_vehiculo",
    F.when(F.lower("tipo_vehiculo_raw").like("%moto%"), "Motocicleta")
     .when(F.lower("tipo_vehiculo_raw").like("%camion%"), "Cami√≥n")
     .when(F.lower("tipo_vehiculo_raw").like("%auto%"), "Autom√≥vil")
     .when(F.lower("tipo_vehiculo_raw").like("%pickup%"), "Pickup")
     .otherwise("Otro")
)

# Normalizaci√≥n tipo de v√≠a
dim_vehiculo = dim_vehiculo.withColumn(
    "tipo_via",
    F.when(F.lower("tipo_via_raw").like("%carretera%"), "Carretera")
     .when(F.lower("tipo_via_raw").like("%avenida%"), "Avenida")
     .when(F.lower("tipo_via_raw").like("%calle%"), "Calle")
     .otherwise("Otro")
)

# ================================
# 4. ID Autogenerado
# ================================
window = Window.orderBy(F.monotonically_increasing_id())

dim_vehiculo = dim_vehiculo.withColumn(
    "id_vehiculo",
    F.row_number().over(window)
)

# ================================
# 5. Selecci√≥n final limpia
# ================================
dim_vehiculo = dim_vehiculo.select(
    "id_vehiculo",
    "cod_vehiculo",
    "cod_siniestro",
    "situacion",
    "estado_modalidad",
    "modalidad",
    "elemento_transportado",
    "ambito_servicio",
    "posee_seguro",
    "estado_soat",
    "tipo_seguro",
    "compania_seguro",
    "posee_citv",
    "estado_citv",
    "lugar_impacto",
    "tipo_vehiculo",
    "tipo_via"
)

# ================================
# 6. Guardado final
# ================================
dim_vehiculo.toPandas().to_csv(
    "Dimensiones/dim_vehiculo.csv",
    index=False
)


### Dimension tipo Siniestro

In [18]:
# ================================
# 1. Selecci√≥n de columnas fuente
# ================================
dim_tipo_siniestro = df_siniestros.select(
    F.col("CLASE SINIESTRO").alias("clase_siniestro_raw")
).dropDuplicates()


# ================================
# 2. Normalizaci√≥n de CLASE DE SINIESTRO
# ================================
dim_tipo_siniestro = dim_tipo_siniestro.withColumn(
    "clase_siniestro",
    F.when(F.lower("clase_siniestro_raw").like("%despis%"), "Despiste")
     .when(F.lower("clase_siniestro_raw").like("%atrop%") & F.lower("clase_siniestro_raw").like("%fuga%"), "Atropello (Fuga)")
     .when(F.lower("clase_siniestro_raw").like("%atrop%"), "Atropello")
     .when(F.lower("clase_siniestro_raw").like("%choque%") & F.lower("clase_siniestro_raw").like("%fuga%"), "Choque (Fuga)")
     .when(F.lower("clase_siniestro_raw").like("%choque%") & F.lower("clase_siniestro_raw").like("%objeto%"), "Choque con objeto fijo")
     .when(F.lower("clase_siniestro_raw").like("%choque%"), "Choque")
     .when(F.lower("clase_siniestro_raw").like("%caida%"), "Ca√≠da de pasajero")
     .when(F.lower("clase_siniestro_raw").like("%volc%"), "Volcadura")
     .when(F.lower("clase_siniestro_raw").like("%ferro%"), "Ferroviario")
     .when(F.lower("clase_siniestro_raw").like("%incend%"), "Incendio")
     .when(F.lower("clase_siniestro_raw").like("%especial%"), "Especial")
     .otherwise(F.initcap("clase_siniestro_raw"))
)

# ================================
# 3. ID Autogenerado
# ================================
window = Window.orderBy(F.monotonically_increasing_id())

dim_tipo_siniestro = dim_tipo_siniestro.withColumn(
    "id_tipo_siniestro",
    F.row_number().over(window)
)

# ================================
# 4. Selecci√≥n final
# ================================
dim_tipo_siniestro = dim_tipo_siniestro.select(
    "id_tipo_siniestro",
    "clase_siniestro"
)

In [19]:
# ================================
# 5. Guardado final
# ================================
dim_tipo_siniestro.toPandas().to_csv(
    "Dimensiones/dim_tipo_siniestro.csv",
    index=False
)

### Dim Causa Siniestro

In [20]:
# ===============================
# 1. Selecci√≥n
# ===============================
dim_causas = df_siniestros.select(
    F.col("CAUSA FACTOR PRINCIPAL").alias("causa_factor_raw"),
    F.col("CAUSA ESPEC√çFICA").alias("causa_especifica_raw")
).dropDuplicates()

# ===============================
# Helper: funci√≥n para detectar vac√≠os
# ===============================
def is_null_or_empty(col):
    return (F.col(col).isNull()) | (F.trim(F.col(col)) == "") | (F.lower(F.col(col)).isin(
        "", "no identifica la causa", "no cuenta con causa especifica",
        "en proceso de investigaci√≥n", "no aplica", "-", "null"
    ))

# ===============================
# 2. Normalizaci√≥n: CAUSA FACTOR PRINCIPAL
# ===============================
dim_causas = dim_causas.withColumn(
    "causa_factor",
    F.when(is_null_or_empty("causa_factor_raw"), "No identificado")
     .when(F.lower("causa_factor_raw").like("%imprudencia del conductor%"), "Conductor ‚Äì Imprudencia")
     .when(F.lower("causa_factor_raw").like("%negligencia del conductor%"), "Conductor ‚Äì Negligencia")
     .when(F.lower("causa_factor_raw").like("%impericia del conductor%"), "Conductor ‚Äì Impericia")
     .when(F.lower("causa_factor_raw").like("%imprudencia del peat%"), "Peat√≥n ‚Äì Imprudencia")
     .when(F.lower("causa_factor_raw").like("%pasajero%"), "Pasajero ‚Äì Imprudencia")
     .when(F.lower("causa_factor_raw").like("%infraestructura%"), "Entorno / Infraestructura")
     .when(F.lower("causa_factor_raw").like("%investigaci%"), "No identificado")
     .otherwise(F.initcap("causa_factor_raw"))
)

# ===============================
# 3. Normalizaci√≥n: CATEGOR√çA CAUSA (AMPLIADA)
# ===============================
dim_causas = dim_causas.withColumn(
    "categoria_causa",
    F.when(is_null_or_empty("causa_especifica_raw"), "No identificado")
     .when(F.lower("causa_especifica_raw").like("%ebriedad%"), "Alcohol / Drogas")
     .when(F.lower("causa_especifica_raw").like("%drogadicci√≥n%"), "Alcohol / Drogas")
     .when(F.lower("causa_especifica_raw").like("%velocidad%"), "Velocidad / Conducci√≥n temeraria")
     .when(F.lower("causa_especifica_raw").like("%sentido contrario%"), "Maniobras indebidas")
     .when(F.lower("causa_especifica_raw").like("%girar%"), "Maniobras indebidas")
     .when(F.lower("causa_especifica_raw").like("%invas%"), "Maniobras indebidas")
     .when(F.lower("causa_especifica_raw").like("%adelant%"), "Maniobras indebidas")
     .when(F.lower("causa_especifica_raw").like("%derecho%"), "Maniobras indebidas")
     .when(F.lower("causa_especifica_raw").like("%preferente%"), "Maniobras indebidas")
     .when(F.lower("causa_especifica_raw").like("%frenar%"), "Maniobras indebidas")
     .when(F.lower("causa_especifica_raw").like("%desacato%"), "Desacato a se√±alizaci√≥n")
     .when(F.lower("causa_especifica_raw").like("%no encender%"), "Desacato a se√±alizaci√≥n")
     .when(F.lower("causa_especifica_raw").like("%dispositiv%"), "Distracci√≥n")
     .when(F.lower("causa_especifica_raw").like("%fatiga%"), "Distracci√≥n / Estado del conductor")
     .when(F.lower("causa_especifica_raw").like("%cansancio%"), "Distracci√≥n / Estado del conductor")
     .when(F.lower("causa_especifica_raw").like("%falla%"), "Fallas mec√°nicas")
     .when(F.lower("causa_especifica_raw").like("%neum√°t%"), "Fallas mec√°nicas")
     .when(F.lower("causa_especifica_raw").like("%carrocer%"), "Fallas mec√°nicas")
     .when(F.lower("causa_especifica_raw").like("%suspensi%"), "Fallas mec√°nicas")
     .when(F.lower("causa_especifica_raw").like("%freno%"), "Fallas mec√°nicas")
     .when(F.lower("causa_especifica_raw").like("%el√©ctric%"), "Fallas mec√°nicas")
     .when(F.lower("causa_especifica_raw").like("%direcci%"), "Fallas mec√°nicas")
     .when(F.lower("causa_especifica_raw").like("%pasajero%"), "Conducta del pasajero")
     .when(F.lower("causa_especifica_raw").like("%subir%"), "Conducta del pasajero")
     .when(F.lower("causa_especifica_raw").like("%bajar%"), "Conducta del pasajero")
     .when(F.lower("causa_especifica_raw").like("%cruce%"), "Conducta del peat√≥n")
     .when(F.lower("causa_especifica_raw").like("%otro.*peat%"), "Conducta del peat√≥n")
     .when(F.lower("causa_especifica_raw").like("%carga%"), "Carga / objetos")
     .when(F.lower("causa_especifica_raw").like("%mercanc%"), "Carga / objetos")
     .when(F.lower("causa_especifica_raw").like("%objeto%"), "Carga / objetos")
     .when(F.lower("causa_especifica_raw").like("%exceso%"), "Carga / objetos")
     .when(F.lower("causa_especifica_raw").like("%superficie%"), "Entorno / V√≠a")
     .when(F.lower("causa_especifica_raw").like("%se√±alizaci√≥n%"), "Entorno / V√≠a")
     .when(F.lower("causa_especifica_raw").like("%estacionado%"), "Entorno / V√≠a")
     .when(F.lower("causa_especifica_raw").like("%obras%"), "Entorno / V√≠a")
     .when(F.lower("causa_especifica_raw").like("%ambiental%"), "Entorno / V√≠a")
     .when(F.lower("causa_especifica_raw").like("%abandono%"), "Entorno / V√≠a")
     .otherwise("Otros")
)

# Guardamos causa espec√≠fica limpia
dim_causas = dim_causas.withColumn(
    "causa_especifica",
    F.when(is_null_or_empty("causa_especifica_raw"), "No identificado")
     .otherwise(F.initcap("causa_especifica_raw"))
)

# ===============================
# 4. ID
# ===============================
window = Window.orderBy(F.monotonically_increasing_id())
dim_causas = dim_causas.withColumn("id_causa", F.row_number().over(window))

# ===============================
# 5. Selecci√≥n final
# ===============================
dim_causas = dim_causas.select(
    "id_causa",
    "causa_factor",
    "categoria_causa",
    "causa_especifica"
)

# ===============================
# 6. Deduplicar filas "No identificado" completas
# ===============================
dim_causas = dim_causas.dropDuplicates(["causa_factor", "categoria_causa", "causa_especifica"])

In [21]:
# ================================
# 7. Guardado final
# ================================
dim_causas.toPandas().to_csv(
    "Dimensiones/dim_causa_siniestro.csv",
    index=False
)

### Dimension Infraestructura

In [22]:
from pyspark.sql.functions import col, upper, trim, regexp_replace, monotonically_increasing_id

def normalizar(col_name):
    return upper(trim(regexp_replace(col_name, r'\s+', ' ')))

# Selecci√≥n y normalizaci√≥n de columnas
df_siniestros = df_siniestros.withColumnRenamed("EXISTE CICLOV√çA", "EXISTE_CICLOVIA") \
                             .withColumnRenamed("¬øEXISTE SE√ëAL VERTICAL?", "EXISTE_SE√ëAL_VERTICAL") \
                             .withColumnRenamed("¬øEXISTE SE√ëAL HORIZONTAL?", "EXISTE_SE√ëAL_HORIZONTAL") \
                             .withColumnRenamed("SUPERFICIE DE CALZADA", "SUPERFICIE_DE_CALZADA") \
                             .withColumnRenamed("PERFIL LONGITUDINAL V√çA", "PERFIL_LONGITUDINAL_VIA") \
                             .withColumnRenamed("CONDICI√ìN CLIM√ÅTICA", "CONDICION_CLIMATICA") \
                             .withColumnRenamed("CARACTER√çSTICAS DE V√çA", "CARACTERISTICAS_DE_VIA")

# Reemplazar nulos por valores por defecto
df_infra = df_siniestros.select(
    normalizar(col("EXISTE_CICLOVIA")).alias("existe_ciclovia"),
    normalizar(col("EXISTE_SE√ëAL_VERTICAL")).alias("senal_vertical"),
    normalizar(col("EXISTE_SE√ëAL_HORIZONTAL")).alias("senal_horizontal"),
    normalizar(col("SUPERFICIE_DE_CALZADA")).alias("tipo_superficie"),    
    normalizar(col("PERFIL_LONGITUDINAL_VIA")).alias("perfil_via"),
    normalizar(col("CONDICION_CLIMATICA")).alias("condicion_climatica"),
    normalizar(col("CARACTERISTICAS_DE_VIA")).alias("caracteristicas_via")
)

# Crear dimensi√≥n con ID √∫nico
df_dim_infraestructura = df_infra.dropDuplicates().withColumn(
    "id_infraestructura", monotonically_increasing_id()
)

# Reordenar para que id_infraestructura quede primero
columnas_ordenadas = ["id_infraestructura"] + [c for c in df_dim_infraestructura.columns if c != "id_infraestructura"]
df_dim_infraestructura = df_dim_infraestructura.select(columnas_ordenadas)

# Guardar CSV
df_dim_infraestructura.toPandas().to_csv("Dimensiones/dim_infraestructura_vial.csv", header=True, index=False)


### Hecho Siniestros

In [23]:
def normalizar_nombre(col_name):
    """Normaliza nombres geogr√°ficos: may√∫sculas, sin espacios extra"""
    return upper(trim(regexp_replace(col_name, r'\s+', ' ')))

# Normalizar en tabla de siniestros
df_siniestros_norm = df_siniestros.withColumn(
    "depto_norm", normalizar_nombre(col("DEPARTAMENTO"))
).withColumn(
    "prov_norm", normalizar_nombre(col("PROVINCIA"))
).withColumn(
    "dist_norm", normalizar_nombre(col("DISTRITO"))
)

# Normalizar en dimensi√≥n ubigeo
dim_ubigeo_norm = dim_ubigeo.withColumn(
    "depto_norm", normalizar_nombre(col("departamento"))
).withColumn(
    "prov_norm", normalizar_nombre(col("provincia"))
).withColumn(
    "dist_norm", normalizar_nombre(col("distrito"))
)

In [28]:
# ================================
# 0. RENOMBRAR COLUMNAS CON ESPACIOS/TILDES
# ================================
df_siniestros = df_siniestros \
    .withColumnRenamed("RED VIAL", "RED_VIAL") \
    .withColumnRenamed("C√ìDIGO SINIESTRO", "CODIGO_SINIESTRO") \
    .withColumnRenamed("FECHA SINIESTRO", "FECHA_SINIESTRO") \
    .withColumnRenamed("HORA SINIESTRO", "HORA_SINIESTRO") \
    .withColumnRenamed("CLASE SINIESTRO", "CLASE_SINIESTRO") \
    .withColumnRenamed("CANTIDAD DE FALLECIDOS", "CANTIDAD_DE_FALLECIDOS") \
    .withColumnRenamed("CANTIDAD DE LESIONADOS", "CANTIDAD_DE_LESIONADOS") \
    .withColumnRenamed("CANTIDAD DE VEHICULOS DA√ëADOS", "CANTIDAD_DE_VEHICULOS_DANADOS") \
    .withColumnRenamed("TIPO DE V√çA", "TIPO_DE_VIA") \
    .withColumnRenamed("CONDICI√ìN CLIM√ÅTICA", "CONDICION_CLIMATICA") \
    .withColumnRenamed("CAUSA FACTOR PRINCIPAL", "CAUSA_FACTOR_PRINCIPAL") \
    .withColumnRenamed("CAUSA ESPECIFICA", "CAUSA_ESPECIFICA") \
    .withColumnRenamed("ZONIFICACI√ìN", "ZONIFICACION") \
    .withColumnRenamed("EXISTE CICLOV√çA", "EXISTE_CICLOVIA") \
    .withColumnRenamed("¬øEXISTE SE√ëAL VERTICAL?", "EXISTE_SE√ëAL_VERTICAL") \
    .withColumnRenamed("¬øEXISTE SE√ëAL HORIZONTAL?", "EXISTE_SE√ëAL_HORIZONTAL") \
    .withColumnRenamed("SUPERFICIE DE CALZADA", "SUPERFICIE_DE_CALZADA") \
    .withColumnRenamed("PERFIL LONGITUDINAL V√çA", "PERFIL_LONGITUDINAL_VIA") \
    .withColumnRenamed("CARACTER√çSTICAS DE V√çA", "CARACTERISTICAS_DE_VIA")

# ================================
# 0.1 CREAR DIMENSI√ìN INFRAESTRUCTURA
# ================================
def normalizar(col_name):
    """Normaliza texto: may√∫sculas, sin espacios extra"""
    return upper(trim(regexp_replace(col_name, r'\s+', ' ')))

df_dim_infraestructura = df_siniestros.select(
    normalizar(coalesce(col("EXISTE_CICLOVIA"), lit("NO"))).alias("existe_ciclovia"),
    normalizar(coalesce(col("EXISTE_SE√ëAL_VERTICAL"), lit("NO"))).alias("senal_vertical"),
    normalizar(coalesce(col("EXISTE_SE√ëAL_HORIZONTAL"), lit("NO"))).alias("senal_horizontal"),
    normalizar(coalesce(col("SUPERFICIE_DE_CALZADA"), lit("DESCONOCIDO"))).alias("tipo_superficie"),
    normalizar(coalesce(col("PERFIL_LONGITUDINAL_VIA"), lit("DESCONOCIDO"))).alias("perfil_via"),
    normalizar(coalesce(col("CONDICION_CLIMATICA"), lit("DESCONOCIDO"))).alias("condicion_climatica"),
    normalizar(coalesce(col("CARACTERISTICAS_DE_VIA"), lit("DESCONOCIDO"))).alias("caracteristicas_via")
).dropDuplicates()

window_infra = Window.orderBy(F.monotonically_increasing_id())
df_dim_infraestructura = df_dim_infraestructura.withColumn(
    "id_infraestructura", 
    F.row_number().over(window_infra)
)

columnas_ordenadas = ["id_infraestructura"] + [c for c in df_dim_infraestructura.columns if c != "id_infraestructura"]
df_dim_infraestructura = df_dim_infraestructura.select(columnas_ordenadas)
df_dim_infraestructura.toPandas().to_csv("Dimensiones/dim_infraestructura_vial.csv", header=True, index=False)
print(f"\n‚úì Dimensi√≥n infraestructura creada con {df_dim_infraestructura.count()} combinaciones √∫nicas")

# ================================
# 1. CORRECCI√ìN MANUAL PRE-NORMALIZACI√ìN
# ================================
correcciones_distritos = {
    "VEINTISEIS DE OCTUBRE": "VEINTIS√âIS DE OCTUBRE",
    "ANCO_HUALLO": "ANCO-HUALLO",
    "ANDRES AVELINO CACERES DORREGARAY": "ANDR√âS AVELINO C√ÅCERES DORREGARAY",
    "JOSE MARIA ARGUEDAS": "JOS√â MAR√çA ARGUEDAS",
    "MI PERU": "MI PER√ö",
    "QUISQUI (KICHKI)": "QUISQUI"
}

mapeo_clase_siniestro = {
    "ATROPELLO FUGA": "ATROPELLO (FUGA)",
    "CHOQUE FUGA": "CHOQUE (FUGA)"
}

df_siniestros_corr = df_siniestros

distrito_col = col("DISTRITO")
for sucio, limpio in correcciones_distritos.items():
    distrito_col = when(col("DISTRITO") == sucio, lit(limpio)).otherwise(distrito_col)
df_siniestros_corr = df_siniestros_corr.withColumn("DISTRITO", distrito_col)

clase_col = upper(col("CLASE_SINIESTRO"))
for sucio, limpio in mapeo_clase_siniestro.items():
    clase_col = when(upper(col("CLASE_SINIESTRO")) == sucio, lit(limpio)).otherwise(clase_col)
df_siniestros_corr = df_siniestros_corr.withColumn("CLASE_SINIESTRO_NORM", clase_col)

# ================================
# 2. NORMALIZACI√ìN GEOGR√ÅFICA
# ================================
def normalizar_nombre(col_name):
    return upper(trim(regexp_replace(col_name, r'\s+', ' ')))

df_siniestros_norm = df_siniestros_corr.withColumn(
    "depto_norm", normalizar_nombre(col("DEPARTAMENTO"))
).withColumn(
    "prov_norm", normalizar_nombre(col("PROVINCIA"))
).withColumn(
    "dist_norm", normalizar_nombre(col("DISTRITO"))
)

# ================================
# 3. NORMALIZAR CAUSAS
# ================================
def is_null_or_empty(col_name):
    return (F.col(col_name).isNull()) | (F.trim(F.col(col_name)) == "") | (F.lower(F.col(col_name)).isin(
        "", "no identifica la causa", "no cuenta con causa especifica",
        "en proceso de investigaci√≥n", "no aplica", "-", "null"
    ))

def normalizar_causa_factor(col_name):
    return (
        F.when(is_null_or_empty(col_name), "No identificado")
         .when(F.lower(F.col(col_name)) == "imprudencia del conductor", "Conductor ‚Äì Imprudencia")
         .when(F.lower(F.col(col_name)) == "negligencia del conductor", "Conductor ‚Äì Negligencia")
         .when(F.lower(F.col(col_name)) == "impericia del conductor", "Conductor ‚Äì Impericia")
         .when(F.lower(F.col(col_name)).like("%imprudencia del peat%"), "Peat√≥n ‚Äì Imprudencia")
         .when(F.lower(F.col(col_name)).like("%pasajero%"), "Pasajero ‚Äì Imprudencia")
         .when(F.lower(F.col(col_name)).like("%infraestructura%"), "Entorno / Infraestructura")
         .when(F.lower(F.col(col_name)).like("%investigaci%"), "No identificado")
         .otherwise(F.initcap(F.col(col_name)))
    )

df_siniestros_norm = df_siniestros_norm.withColumn(
    "causa_factor_norm",
    normalizar_causa_factor("CAUSA_FACTOR_PRINCIPAL")
).withColumn(
    "cod_carretera_norm", upper(trim(col("COD CARRETERA")))
)

# ================================
# 4. NORMALIZAR FECHA Y HORA
# ================================
df_siniestros_norm = df_siniestros_norm \
    .withColumn("fecha_sin", to_date(col("FECHA_SINIESTRO"), "dd/MM/yyyy")) \
    .withColumn("hora_raw", trim(col("HORA_SINIESTRO"))) \
    .withColumn("hora_only",
        regexp_extract(col("hora_raw"), "([0-9]{1,2}:[0-9]{2}(?::[0-9]{2})?)", 1)
    ) \
    .withColumn("hora_norm",
        when(col("hora_only") != "", col("hora_only")).otherwise(col("hora_raw"))
    ) \
    .withColumn("hora_norm",
        when(col("hora_norm").rlike("^[0-9]{1,2}:[0-9]{2}$"),
             concat(
                 lpad(split(col("hora_norm"), ":").getItem(0), 2, "0"),
                 lit(":"),
                 split(col("hora_norm"), ":").getItem(1),
                 lit(":00")
             )
        ).otherwise(col("hora_norm"))
    ) \
    .withColumn("timestamp_sin",
        to_timestamp(
            concat(
                date_format(col("fecha_sin"), "yyyy-MM-dd"), 
                lit(" "), 
                col("hora_norm")
            ),
            "yyyy-MM-dd HH:mm:ss"
        )
    ) \
    .withColumn("pk_tiempo_sin",
        when(col("timestamp_sin").isNotNull(),
            date_format(col("timestamp_sin"), "yyyyMMddHHmmss")
        ).otherwise(
            concat(
                date_format(col("fecha_sin"), "yyyy"),
                date_format(col("fecha_sin"), "MM"),
                date_format(col("fecha_sin"), "dd"),
                lit("000000")
            )
        )
    )

# ================================
# 5. VALIDACI√ìN INICIAL
# ================================
print("\n" + "="*60)
print("VALIDACI√ìN INICIAL DE DATOS")
print("="*60 + "\n")

total_siniestros_origen = df_siniestros_norm.count()
siniestros_unicos = df_siniestros_norm.select("CODIGO_SINIESTRO").distinct().count()

print(f"Total registros en origen: {total_siniestros_origen:,}")
print(f"C√≥digos de siniestro √∫nicos: {siniestros_unicos:,}")

if total_siniestros_origen != siniestros_unicos:
    print(f"‚ö†Ô∏è ADVERTENCIA: Hay {total_siniestros_origen - siniestros_unicos:,} duplicados en origen")
else:
    print("‚úÖ No hay duplicados en origen")

# Validar fallecidos esperados
fallecidos_origen = df_siniestros_norm.agg(F.sum("CANTIDAD_DE_FALLECIDOS")).collect()[0][0]
print(f"\nTotal fallecidos en origen: {fallecidos_origen:,}")
print("  (Esperado: ~10,000-20,000 para 2021-2023)\n")

# ================================
# 6. PREPARAR DIM RED VIAL SIN DUPLICADOS
# ================================
print("="*60)
print("PREPARANDO DIMENSI√ìN RED VIAL")
print("="*60 + "\n")

# An√°lisis de duplicados
dim_red_vial_temp = dim_red_vial.withColumn(
    "cod_ruta_norm", upper(trim(col("codigo_ruta")))
)

total_red_vial = dim_red_vial_temp.count()
codigos_unicos_red = dim_red_vial_temp.select("cod_ruta_norm").distinct().count()

print(f"Total registros en red vial: {total_red_vial:,}")
print(f"C√≥digos de ruta √∫nicos: {codigos_unicos_red:,}")
print(f"Tramos por c√≥digo (promedio): {total_red_vial / codigos_unicos_red:.2f}")

# Mostrar c√≥digos con m√°s tramos
print("\nüìä C√≥digos con m√°s tramos (TOP 10):")
dim_red_vial_temp.groupBy("cod_ruta_norm") \
    .count() \
    .orderBy(F.desc("count")) \
    .show(10, truncate=False)

# Crear versi√≥n √∫nica (tomar el primer registro por c√≥digo)
dim_red_vial_unique = dim_red_vial_temp \
    .withColumn("row_num", F.row_number().over(
        Window.partitionBy("cod_ruta_norm").orderBy("id_red_vial")
    )) \
    .filter(col("row_num") == 1) \
    .drop("row_num")

print(f"\n‚úì Dimensi√≥n red vial √∫nica creada: {dim_red_vial_unique.count():,} registros")

# ================================
# 7. FACT TABLE - JOINS
# ================================
print("\n" + "="*60)
print("CONSTRUYENDO FACT TABLE")
print("="*60 + "\n")

# Join tiempo
print("1Ô∏è‚É£ Join con dim_tiempo...")
df_siniestros_fact = df_siniestros_norm.alias("s").join(
    dim_tiempo.alias("t"),
    col("s.pk_tiempo_sin") == col("t.pk_tiempo"),
    how="left"
)
print(f"   Registros: {df_siniestros_fact.count():,}")

# Join ubigeo
print("2Ô∏è‚É£ Join con dim_ubigeo...")
df_siniestros_fact = df_siniestros_fact.join(
    dim_ubigeo_norm.alias("u"),
    (col("s.depto_norm") == col("u.depto_norm")) &
    (col("s.prov_norm") == col("u.prov_norm")) &
    (col("s.dist_norm") == col("u.dist_norm")),
    how="left"
)
registros_post_ubigeo = df_siniestros_fact.count()
print(f"   Registros: {registros_post_ubigeo:,}")

if registros_post_ubigeo != total_siniestros_origen:
    print(f"   ‚ö†Ô∏è Diferencia: {registros_post_ubigeo - total_siniestros_origen:,}")

# Join tipo siniestro
print("3Ô∏è‚É£ Join con dim_tipo_siniestro...")
df_siniestros_fact = df_siniestros_fact.join(
    dim_tipo_siniestro.alias("ts"),
    upper(col("s.CLASE_SINIESTRO_NORM")) == upper(col("ts.clase_siniestro")),
    how="left"
)
print(f"   Registros: {df_siniestros_fact.count():,}")

# Join causa
print("4Ô∏è‚É£ Join con dim_causas...")
# Join con ambas columnas para match exacto
df_siniestros_fact = df_siniestros_fact.join(
    dim_causas.alias("c"),
    (col("s.causa_factor_norm") == col("c.causa_factor")) &
    (normalizar(col("s.CAUSA ESPEC√çFICA")) == normalizar(col("c.causa_especifica"))),
    how="left"
)
print(f"   Registros: {df_siniestros_fact.count():,}")

# Join infraestructura
print("5Ô∏è‚É£ Join con dim_infraestructura...")
df_siniestros_fact = df_siniestros_fact.join(
    df_dim_infraestructura.alias("i"),
    on=[
        normalizar(coalesce(col("s.EXISTE_CICLOVIA"), lit("NO"))) == col("i.existe_ciclovia"),
        normalizar(coalesce(col("s.EXISTE_SE√ëAL_VERTICAL"), lit("NO"))) == col("i.senal_vertical"),
        normalizar(coalesce(col("s.EXISTE_SE√ëAL_HORIZONTAL"), lit("NO"))) == col("i.senal_horizontal"),
        normalizar(coalesce(col("s.SUPERFICIE_DE_CALZADA"), lit("DESCONOCIDO"))) == col("i.tipo_superficie"),
        normalizar(coalesce(col("s.PERFIL_LONGITUDINAL_VIA"), lit("DESCONOCIDO"))) == col("i.perfil_via"),
        normalizar(coalesce(col("s.CONDICION_CLIMATICA"), lit("DESCONOCIDO"))) == col("i.condicion_climatica"),
        normalizar(coalesce(col("s.CARACTERISTICAS_DE_VIA"), lit("DESCONOCIDO"))) == col("i.caracteristicas_via")
    ],
    how="left"
)
print(f"   Registros: {df_siniestros_fact.count():,}")

# Join red vial (SIN DUPLICACI√ìN)
print("6Ô∏è‚É£ Join con dim_red_vial (versi√≥n √∫nica)...")
registros_antes_red_vial = df_siniestros_fact.count()

df_siniestros_fact = df_siniestros_fact.join(
    dim_red_vial_unique.select("id_red_vial", "cod_ruta_norm"),
    col("s.cod_carretera_norm") == col("cod_ruta_norm"),
    how="left"
)

registros_despues_red_vial = df_siniestros_fact.count()
print(f"   Registros ANTES: {registros_antes_red_vial:,}")
print(f"   Registros DESPU√âS: {registros_despues_red_vial:,}")
print(f"   Diferencia: {registros_despues_red_vial - registros_antes_red_vial:,}")

if registros_despues_red_vial == registros_antes_red_vial:
    print("   ‚úÖ No hay duplicaci√≥n")
else:
    print(f"   ‚ùå ERROR: Se duplicaron {registros_despues_red_vial - registros_antes_red_vial:,} registros")
    print("   ‚ö†Ô∏è DETENER EJECUCI√ìN - Revisar join de red vial")

# ================================
# 8. VALIDACI√ìN POST-JOINS
# ================================
print("\n" + "="*60)
print("VALIDACI√ìN POST-JOINS")
print("="*60 + "\n")

# Validar que no hay duplicaci√≥n
codigos_finales = df_siniestros_fact.select("s.CODIGO_SINIESTRO").distinct().count()
registros_finales = df_siniestros_fact.count()

print(f"C√≥digos √∫nicos: {codigos_finales:,}")
print(f"Registros totales: {registros_finales:,}")
print(f"Factor de duplicaci√≥n: {registros_finales / codigos_finales:.2f}x")

if registros_finales == codigos_finales:
    print("‚úÖ No hay duplicaci√≥n - 1 registro por siniestro")
elif registros_finales < codigos_finales * 1.01:
    print("‚ö†Ô∏è Duplicaci√≥n m√≠nima (<1%) - Aceptable")
else:
    print("‚ùå DUPLICACI√ìN DETECTADA - NO CONTINUAR")
    print(f"   Se perdieron o duplicaron {abs(registros_finales - codigos_finales):,} registros")

# Validar fallecidos
fallecidos_post_join = df_siniestros_fact.agg(F.sum("s.CANTIDAD_DE_FALLECIDOS")).collect()[0][0]
print(f"\nTotal fallecidos despu√©s de joins: {fallecidos_post_join:,}")
print(f"Total fallecidos en origen: {fallecidos_origen:,}")
print(f"Diferencia: {fallecidos_post_join - fallecidos_origen:,}")

if fallecidos_post_join == fallecidos_origen:
    print("‚úÖ Fallecidos coinciden - No hay duplicaci√≥n")
elif abs(fallecidos_post_join - fallecidos_origen) / fallecidos_origen < 0.01:
    print("‚úÖ Diferencia m√≠nima (<1%)")
else:
    print("‚ùå ERROR: Los fallecidos no coinciden - HAY DUPLICACI√ìN")

# ================================
# 9. AN√ÅLISIS DE RED VIAL
# ================================
print("\n" + "="*60)
print("AN√ÅLISIS DE SINIESTROS SIN RED VIAL")
print("="*60 + "\n")

sin_red_vial = df_siniestros_fact.filter(col("id_red_vial").isNull()).count()
con_red_vial = df_siniestros_fact.filter(col("id_red_vial").isNotNull()).count()

print(f"Con red vial: {con_red_vial:,} ({(con_red_vial/registros_finales)*100:.2f}%)")
print(f"Sin red vial: {sin_red_vial:,} ({(sin_red_vial/registros_finales)*100:.2f}%)")

if sin_red_vial > 0:
    print("\nüìä Distribuci√≥n sin red vial por TIPO_DE_VIA:")
    df_siniestros_fact.filter(col("id_red_vial").isNull()) \
        .groupBy("s.TIPO_DE_VIA") \
        .count() \
        .orderBy(F.desc("count")) \
        .show(10, truncate=False)

# ================================
# 10. CREAR FACT TABLE FINAL
# ================================
print("\n" + "="*60)
print("CREANDO FACT TABLE FINAL")
print("="*60 + "\n")

fact_siniestros = df_siniestros_fact.select(
    col("s.CODIGO_SINIESTRO").alias("cod_siniestro"),
    col("t.pk_tiempo").alias("fk_tiempo"),
    col("u.id_ubigeo").alias("fk_ubigeo"),
    col("ts.id_tipo_siniestro").alias("fk_tipo_siniestro"),
    col("c.id_causa").alias("fk_causa_siniestro"),
    col("i.id_infraestructura").alias("fk_infraestructura"),
    col("id_red_vial").cast("int").alias("fk_red_vial"),
    col("s.CANTIDAD_DE_FALLECIDOS").cast("int").alias("num_fallecidos"),
    col("s.CANTIDAD_DE_LESIONADOS").cast("int").alias("num_lesionados"),
    col("s.CANTIDAD_DE_VEHICULOS_DANADOS").cast("int").alias("num_vehiculos"),
    col("s.ZONA").alias("zona"),
    col("s.RED_VIAL").alias("red_vial"),
    col("s.CONDICION_CLIMATICA").alias("condicion_climatica"),
    col("s.ZONIFICACION").alias("zonificacion")
)

# ID autoincremental
window = Window.orderBy(F.monotonically_increasing_id())
fact_siniestros = fact_siniestros.withColumn(
    "id_fact_siniestro",
    F.row_number().over(window)
)

print(f"‚úì Fact table creada con {fact_siniestros.count():,} registros")

# ================================
# 11. VERIFICACI√ìN FINAL DE FKs
# ================================
print("\n" + "="*60)
print("VERIFICACI√ìN FINAL DE FOREIGN KEYS")
print("="*60 + "\n")

fk_tiempo_null = fact_siniestros.filter(col('fk_tiempo').isNull()).count()
fk_ubigeo_null = fact_siniestros.filter(col('fk_ubigeo').isNull()).count()
fk_tipo_null = fact_siniestros.filter(col('fk_tipo_siniestro').isNull()).count()
fk_causa_null = fact_siniestros.filter(col('fk_causa_siniestro').isNull()).count()
fk_infra_null = fact_siniestros.filter(col('fk_infraestructura').isNull()).count()
fk_red_null = fact_siniestros.filter(col('fk_red_vial').isNull()).count()

print(f"FK Tiempo nulas: {fk_tiempo_null:,}")
print(f"FK Ubigeo nulas: {fk_ubigeo_null:,}")
print(f"FK Tipo Siniestro nulas: {fk_tipo_null:,}")
print(f"FK Causa Siniestro nulas: {fk_causa_null:,}")
print(f"FK Infraestructura nulas: {fk_infra_null:,}")
print(f"FK Red Vial nulas: {fk_red_null:,}")

total_fks_null = fk_tiempo_null + fk_ubigeo_null + fk_tipo_null + fk_causa_null + fk_infra_null

if total_fks_null == 0:
    print("\n‚úÖ Todas las FKs obligatorias est√°n completas")
else:
    print(f"\n‚ö†Ô∏è Hay {total_fks_null:,} FKs obligatorias nulas")

# Validaci√≥n final de fallecidos
fallecidos_final = fact_siniestros.agg(F.sum("num_fallecidos")).collect()[0][0]
print(f"\nüìä Total fallecidos en fact final: {fallecidos_final:,}")
print(f"üìä Total fallecidos en origen: {fallecidos_origen:,}")

if fallecidos_final == fallecidos_origen:
    print("‚úÖ Los fallecidos coinciden perfectamente")
elif abs(fallecidos_final - fallecidos_origen) < 100:
    print("‚úÖ Diferencia m√≠nima aceptable")
else:
    print("‚ùå ERROR: Diferencia significativa en fallecidos")



‚úì Dimensi√≥n infraestructura creada con 352 combinaciones √∫nicas

VALIDACI√ìN INICIAL DE DATOS

Total registros en origen: 6,721
C√≥digos de siniestro √∫nicos: 6,718
‚ö†Ô∏è ADVERTENCIA: Hay 3 duplicados en origen

Total fallecidos en origen: 8,001
  (Esperado: ~10,000-20,000 para 2021-2023)

PREPARANDO DIMENSI√ìN RED VIAL

Total registros en red vial: 7,340
C√≥digos de ruta √∫nicos: 199
Tramos por c√≥digo (promedio): 36.88

üìä C√≥digos con m√°s tramos (TOP 10):
+-------------+-----+
|cod_ruta_norm|count|
+-------------+-----+
|PE-3N        |743  |
|PE-5N        |419  |
|PE-1S        |323  |
|PE-1N        |295  |
|PE-08B       |241  |
|PE-3S        |237  |
|PE-28B       |211  |
|PE-20A       |194  |
|PE-1NR       |158  |
|PE-3SF       |148  |
+-------------+-----+
only showing top 10 rows


‚úì Dimensi√≥n red vial √∫nica creada: 199 registros

CONSTRUYENDO FACT TABLE

1Ô∏è‚É£ Join con dim_tiempo...
   Registros: 6,721
2Ô∏è‚É£ Join con dim_ubigeo...
   Registros: 6,721
3Ô∏è‚É£ Join

<table width="99%">
  <tr>
    <td bgcolor="#FFBA39">
      <h2 style="color: #000000; text-align: left;">Explorando Resultado</h2>
    </td>
  </tr>
</table>

In [29]:

# ================================
# 12. GUARDADO OPTIMIZADO
# ================================
print("\n" + "="*60)
print("GUARDANDO FACT TABLE")
print("="*60 + "\n")

# Verificar schema antes de guardar
print("Schema final:")
fact_siniestros.printSchema()

print("\nPrimeras 3 filas:")
fact_siniestros.show(3, truncate=False, vertical=True)

# Estad√≠sticas finales
print("\nüìä ESTAD√çSTICAS FINALES:")
print(f"Total registros: {fact_siniestros.count():,}")
print(f"Total fallecidos: {fallecidos_final:,}")
print(f"Total lesionados: {fact_siniestros.agg(F.sum('num_lesionados')).collect()[0][0]:,}")
print(f"Total veh√≠culos: {fact_siniestros.agg(F.sum('num_vehiculos')).collect()[0][0]:,}")

# Guardar como CSV con Spark
print("\nüíæ Guardando como CSV...")
fact_siniestros.coalesce(1) \
    .write \
    .mode("overwrite") \
    .option("header", "true") \
    .option("encoding", "UTF-8") \
    .csv("Fact/fact_siniestros_spark")

print("‚úì CSV guardado en: Fact/fact_siniestros_spark/")
print("  Renombra el archivo part-*.csv a 'fact_siniestros.csv'")

# Guardar como Parquet
print("\nüíæ Guardando como Parquet...")
fact_siniestros.write \
    .mode("overwrite") \
    .parquet("Fact/fact_siniestros.parquet")

print("‚úì Parquet guardado en: Fact/fact_siniestros.parquet")

print("\n" + "="*60)
print("‚úÖ PROCESO COMPLETADO EXITOSAMENTE")
print("="*60)


GUARDANDO FACT TABLE

Schema final:
root
 |-- cod_siniestro: string (nullable = true)
 |-- fk_tiempo: string (nullable = true)
 |-- fk_ubigeo: integer (nullable = true)
 |-- fk_tipo_siniestro: integer (nullable = true)
 |-- fk_causa_siniestro: integer (nullable = true)
 |-- fk_infraestructura: integer (nullable = true)
 |-- fk_red_vial: integer (nullable = true)
 |-- num_fallecidos: integer (nullable = true)
 |-- num_lesionados: integer (nullable = true)
 |-- num_vehiculos: integer (nullable = true)
 |-- zona: string (nullable = true)
 |-- red_vial: string (nullable = true)
 |-- condicion_climatica: string (nullable = true)
 |-- zonificacion: string (nullable = true)
 |-- id_fact_siniestro: integer (nullable = false)


Primeras 3 filas:
-RECORD 0-----------------------------
 cod_siniestro       | A-2021-01-22   
 fk_tiempo           | 20210101174500 
 fk_ubigeo           | 1789           
 fk_tipo_siniestro   | 10             
 fk_causa_siniestro  | 1              
 fk_infraestructur

In [30]:
# ================================
# DIAGN√ìSTICO: ¬øPOR QU√â AUMENTARON LAS FILAS?
# ================================
print("\n" + "="*60)
print("DIAGN√ìSTICO DE DUPLICACI√ìN DE REGISTROS")
print("="*60 + "\n")

# 1. Contar registros ANTES del join con red vial
print("üìä ANTES del join con Red Vial:")
registros_antes = df_siniestros_fact.count()
print(f"Total registros: {registros_antes:,}\n")

# 2. Verificar duplicados en dim_red_vial
print("üîç Verificando duplicados en dim_red_vial:")
print("\nC√≥digos de ruta duplicados en dim_red_vial:")
dim_red_vial.withColumn("cod_ruta_norm", upper(trim(col("codigo_ruta")))) \
    .groupBy("cod_ruta_norm") \
    .count() \
    .filter(col("count") > 1) \
    .orderBy(F.desc("count")) \
    .show(20, truncate=False)

# 3. Ver ejemplos de c√≥digos duplicados
print("\nüìã Ejemplo de registros duplicados para un c√≥digo:")
ejemplo_codigo = dim_red_vial \
    .withColumn("cod_ruta_norm", upper(trim(col("codigo_ruta")))) \
    .groupBy("cod_ruta_norm") \
    .count() \
    .filter(col("count") > 1) \
    .first()

if ejemplo_codigo:
    codigo_dup = ejemplo_codigo["cod_ruta_norm"]
    print(f"\nC√≥digo duplicado: {codigo_dup}")
    dim_red_vial.withColumn("cod_ruta_norm", upper(trim(col("codigo_ruta")))) \
        .filter(col("cod_ruta_norm") == codigo_dup) \
        .show(truncate=False)

# 4. Cu√°ntos c√≥digos √∫nicos vs total de registros
total_red_vial = dim_red_vial.count()
codigos_unicos = dim_red_vial.select("codigo_ruta").distinct().count()
print(f"\nüìä Estad√≠sticas de dim_red_vial:")
print(f"Total registros: {total_red_vial:,}")
print(f"C√≥digos √∫nicos: {codigos_unicos:,}")
print(f"Registros duplicados: {total_red_vial - codigos_unicos:,}")

# 5. Ver distribuci√≥n de siniestros por c√≥digo de carretera
print("\nüìç TOP 20 c√≥digos de carretera en siniestros:")
df_siniestros_norm.groupBy("cod_carretera_norm") \
    .count() \
    .orderBy(F.desc("count")) \
    .show(20, truncate=False)


DIAGN√ìSTICO DE DUPLICACI√ìN DE REGISTROS

üìä ANTES del join con Red Vial:
Total registros: 6,721

üîç Verificando duplicados en dim_red_vial:

C√≥digos de ruta duplicados en dim_red_vial:
+-------------+-----+
|cod_ruta_norm|count|
+-------------+-----+
|PE-3N        |743  |
|PE-5N        |419  |
|PE-1S        |323  |
|PE-1N        |295  |
|PE-08B       |241  |
|PE-3S        |237  |
|PE-28B       |211  |
|PE-20A       |194  |
|PE-1NR       |158  |
|PE-3SF       |148  |
|PE-14A       |141  |
|PE-18        |138  |
|PE-30C       |114  |
|PE-3ND       |103  |
|PE-34H       |92   |
|PE-5NA       |90   |
|PE-24        |88   |
|PE-3SG       |87   |
|PE-34B       |82   |
|PE-40        |77   |
+-------------+-----+
only showing top 20 rows


üìã Ejemplo de registros duplicados para un c√≥digo:

C√≥digo duplicado: PE-3SB
+-----------+------------+-----------+-------------------------------------------------------------------------------------------------------------------------------------