**Proyecto:** INE Incidentes de Transito.

**Objetivo:** Ingesta de Hechos de Vehiculos Involucrados.

**Dependencias / Notebooks:** ingest_fallecidos_lesionados

**Ambiente de Ejecución:** Databricks, Azure, Desarrollo

---
| Fecha | Desarrollador | Descripcion |
| :--- | :--- | :--- |
| 06/NOV/2025 | Ruben Coloma | Emision Inicial |

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

**LEER LOS DATOS DEL CONTENEDOR**

In [0]:
# Constantes
str_container = "bronze"
str_storage_name = "stinetraficodeveastus2"
str_storage_path = f"abfss://{str_container}@{str_storage_name}.dfs.core.windows.net/"
str_file_vehiculos = "09BaseDatosVehiculosInvolucradosPNC2023.csv"
str_ruta_vehiculos = str_storage_path + str_file_vehiculos

In [0]:
# Cargar cargar datos
df_vehiculos_raw = spark.read.option('header', True)\
                        .option('inferSchema', True)\
                        .csv(str_ruta_vehiculos)

In [0]:
# Quitar caracteres especiales nombres de columnas
df_vehiculos_raw = df_vehiculos_raw.withColumnRenamed("núm_corre", "num_corre")
df_vehiculos_raw = df_vehiculos_raw.withColumnRenamed("año_ocu", "anio_ocu")
df_vehiculos_raw = df_vehiculos_raw.withColumnRenamed("día_ocu", "dia_ocu")
df_vehiculos_raw = df_vehiculos_raw.withColumnRenamed("día_sem_ocu", "dia_sem_ocu")
df_vehiculos_raw = df_vehiculos_raw.withColumnRenamed("g_edad_80ymás", "g_edad_80ymas")
df_vehiculos_raw = df_vehiculos_raw.withColumnRenamed("g_edad_60ymás", "g_edad_60ymas")

**CATALOGOS CON ORDEN ALFANUMERICO**

In [0]:
# Estado del Conductor
df_estado_con = df_vehiculos_raw.select('estado_con').distinct().orderBy('estado_con')
df_estado_con = df_estado_con.withColumn("id_estado_con", monotonically_increasing_id() + 1)
df_estado_con = df_estado_con.select("id_estado_con", "estado_con")

**GUARDAR EN TABLAS LOS CATALOGOS**

In [0]:
df_estado_con.write.mode("overwrite").saveAsTable("cat_ine_trafico_dev.silver_ine_trafico_dev.dim_ine_estado_con")

**JOINS PARA CONVERTIR LOS CATALOGOS EN VALORES NUMERICOS**

In [0]:
# Cargar las tablas dimensionales
df_dim_hora = spark.table("cat_ine_trafico_dev.silver_ine_trafico_dev.dim_ine_hora")
df_dim_g_hora_5 = spark.table("cat_ine_trafico_dev.silver_ine_trafico_dev.dim_ine_g_hora_5")
df_dim_mes = spark.table("cat_ine_trafico_dev.silver_ine_trafico_dev.dim_ine_mes_ocu")
df_dim_dia_sem_ocu = spark.table("cat_ine_trafico_dev.silver_ine_trafico_dev.dim_ine_dia_sem_ocu")
df_dim_depto = spark.table("cat_ine_trafico_dev.silver_ine_trafico_dev.dim_ine_depto_ocu")
df_dim_mupio = spark.table("cat_ine_trafico_dev.silver_ine_trafico_dev.dim_ine_mupio_ocu")
#SEXO
df_dim_ine_sexo_per = spark.table("cat_ine_trafico_dev.silver_ine_trafico_dev.dim_ine_sexo_per")
#EDAD80
df_dim_ine_g_edad_80ymas = spark.table("cat_ine_trafico_dev.silver_ine_trafico_dev.dim_ine_g_edad_80ymas")
#EDAD60
df_dim_ine_g_edad_60ymas = spark.table("cat_ine_trafico_dev.silver_ine_trafico_dev.dim_ine_g_edad_60ymas")
#EDAD_QUINQUENAL
df_dim_ine_edad_quinquenales = spark.table("cat_ine_trafico_dev.silver_ine_trafico_dev.dim_ine_edad_quinquenales")
#MAYOR MENOR
df_dim_ine_mayor_menor = spark.table("cat_ine_trafico_dev.silver_ine_trafico_dev.dim_ine_mayor_menor")

df_dim_tipo_veh = spark.table("cat_ine_trafico_dev.silver_ine_trafico_dev.dim_ine_tipo_veh")
df_dim_marca_veh = spark.table("cat_ine_trafico_dev.silver_ine_trafico_dev.dim_ine_marca_veh")
df_dim_color_veh = spark.table("cat_ine_trafico_dev.silver_ine_trafico_dev.dim_ine_color_veh")
df_dim_modelo_veh = spark.table("cat_ine_trafico_dev.silver_ine_trafico_dev.dim_ine_modelo_veh")
df_dim_g_modelo_veh = spark.table("cat_ine_trafico_dev.silver_ine_trafico_dev.dim_ine_g_modelo_veh")
df_dim_tipo_eve = spark.table("cat_ine_trafico_dev.silver_ine_trafico_dev.dim_ine_tipo_eve")

In [0]:
# Realizar los joins
df_vehiculos_ingest = df_vehiculos_raw \
    .join(df_dim_hora, df_vehiculos_raw.g_hora == df_dim_hora.g_hora, "left") \
    .join(df_dim_g_hora_5, df_vehiculos_raw.g_hora_5 == df_dim_g_hora_5.g_hora_5, "left") \
    .join(df_dim_mes, df_vehiculos_raw.mes_ocu == df_dim_mes.mes_ocur, "left") \
    .join(df_dim_dia_sem_ocu, df_vehiculos_raw.dia_sem_ocu == df_dim_dia_sem_ocu.mes_ocur, "left") \
    .join(df_dim_depto, df_vehiculos_raw.depto_ocu == df_dim_depto.depto_ocu, "left") \
    .join(df_dim_mupio, df_vehiculos_raw.mupio_ocu == df_dim_mupio.mupio_ocu, "left") \
    .join(df_dim_ine_sexo_per, df_vehiculos_raw.sexo_per == df_dim_ine_sexo_per.sexo_per, "left") \
    .join(df_dim_ine_g_edad_80ymas, df_vehiculos_raw.g_edad_80ymas == df_dim_ine_g_edad_80ymas.g_edad_80ymas, "left") \
    .join(df_dim_ine_g_edad_60ymas, df_vehiculos_raw.g_edad_60ymas == df_dim_ine_g_edad_60ymas.g_edad_60ymas, "left") \
    .join(df_dim_ine_edad_quinquenales, df_vehiculos_raw.edad_quinquenales == df_dim_ine_edad_quinquenales.edad_quinquenales, "left") \
    .join(df_dim_ine_mayor_menor, df_vehiculos_raw.mayor_menor == df_dim_ine_mayor_menor.mayor_menor, "left") \
    .join(df_dim_tipo_veh, df_vehiculos_raw.tipo_veh == df_dim_tipo_veh.tipo_veh, "left") \
    .join(df_dim_marca_veh, df_vehiculos_raw.marca_veh == df_dim_marca_veh.marca_veh, "left") \
    .join(df_dim_color_veh, df_vehiculos_raw.color_veh == df_dim_color_veh.color_veh, "left") \
    .join(df_dim_modelo_veh, df_vehiculos_raw.modelo_veh == df_dim_modelo_veh.modelo_veh, "left") \
    .join(df_dim_g_modelo_veh, df_vehiculos_raw.g_modelo_veh == df_dim_g_modelo_veh.g_modelo_veh, "left") \
    .join(df_dim_tipo_eve, df_vehiculos_raw.tipo_eve == df_dim_tipo_eve.tipo_eve, "left") \
    .join(df_estado_con, df_vehiculos_raw.estado_con == df_estado_con.estado_con, "left") \

# Seleccionar las columnas finales (IDs numéricos en lugar de cadenas)
df_vehiculos_ingest = df_vehiculos_ingest.select(
    "num_corre", 
    "anio_ocu", 
    "dia_ocu", 
    "hora_ocu",
    df_dim_hora.id_g_hora.alias("id_g_hora"),
    df_dim_g_hora_5.id_g_hora_5.alias("id_g_hora_5"),
    df_dim_mes.id_mes_ocurrio.alias("id_mes_ocurrio"),
    df_dim_dia_sem_ocu.id_dia_sem_ocu.alias("id_dia_sem_ocu"),
    df_dim_depto.id_depto_ocu.alias("id_depto_ocu"),
    df_dim_mupio.id_mupio_ocu.alias("id_mupio_ocu"),

    df_dim_ine_sexo_per.id_sexo_per.alias("id_sexo_per"),
    df_dim_ine_g_edad_80ymas.id_g_edad_80ymas.alias("id_g_edad_80ymas"),
    df_dim_ine_g_edad_60ymas.g_edad_60ymas.alias("g_edad_60ymas"),
    df_dim_ine_edad_quinquenales.id_edad_quinquenales.alias("id_edad_quinquenales"),
    df_dim_ine_mayor_menor.id_mayor_menor.alias("id_mayor_menor"),

    df_dim_tipo_veh.id_tipo_veh.alias("id_tipo_veh"),
    df_dim_marca_veh.id_marca_veh.alias("id_marca_veh"),
    df_dim_color_veh.id_color_veh.alias("id_color_veh"),
    df_dim_modelo_veh.id_modelo_veh.alias("id_modelo_veh"),
    df_dim_g_modelo_veh.id_g_modelo_veh.alias("id_g_modelo_veh"),
    df_dim_tipo_eve.id_tipo_eve.alias("id_tipo_eve"),
    df_estado_con.id_estado_con.alias("id_estado_con")
)

**GUARDAR LOS DATOS PARA TRASFORMACIONES**

In [0]:
df_vehiculos_ingest.write.mode("overwrite").saveAsTable("cat_ine_trafico_dev.bronze_ine_trafico_dev.ine_aux_vehiculos")