# Accidentes de tráfico en Reino Unido entre 2010 y 2014

### Disponible en Kaggle en:
https://www.kaggle.com/stefanoleone992/adm-project-road-accidents-in-uk

### Variables y significado

* Accident_Index: Accident index
* Latitude: Accident latitude
* Longitude: Accident longitude
* Region: Accident region
* Urban_or_Rural_Area: Accident area (rural or urban)
* X1st_Road_Class: Accident road class
* Driver_IMD_Decile: Road IMD Decile
* Speed_limit: Road speed limit
* Road_Type: Road type
* Road_Surface_Conditions: Road surface condition
* Weather: Weather
* High_Wind: High wind
* Lights: Road lights
* Datetime: Accident datetime
* Year: Accident year
* Season: Accident season
* Month_of_Year: Accident month
* Day_of_Month: Accident day of month
* Day_of_Week: Accident day of week
* Hour_of_Day: Accident hour of day
* Number_of_Vehicles: Accident number of vehicles
* Age_of_Driver: Driver age
* Age_of_Vehicle: Vehicle age
* Junction_Detail: Accident junction detail
* Junction_Location: Accident junction location
* X1st_Point_of_Impact: Vehicle first point of impact
* Driver_Journey_Purpose: Driver journey purpose
* Engine_CC: Vehicle engine power (in CC)
* Propulsion_Code: Vehicle propulsion code
* Vehicle_Make: Vehicle brand
* Vehicle_Category: Vehicle brand category
* Vehicle_Manoeuvre: Vehicle manoeuvre when accident happened
* Accident_Severity: Accident severity

In [None]:
# Importo el módulo de funciones de PySpark SQL, que contiene transformaciones como when, col, hour, count, etc.
# Uso el alias 'F' para referirme a esas funciones de forma abreviada (por ejemplo: F.col("columna")).
from pyspark.sql import functions as F

# Configuración para acceder a Azure Data Lake Storage (ADLS Gen2)
spark.conf.set(
    "fs.azure.account.key.masterjmr.dfs.core.windows.net",
    "<REDACTED_ACCESS_KEY>"
)

# Ruta del archivo CSV en ADLS
ruta_csv = "abfss://datos@masterjmr.dfs.core.windows.net/accidents_uk.csv"

# Lectura del CSV desde ADLS, con inferencia de tipos
accidentesDF = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(ruta_csv)

    # Creo una nueva columna llamada 'Age_Category' clasificando la edad del conductor en 4 grupos.
    .withColumn(
        "Age_Category",
        F.when(F.col("Age_of_Driver").isin(1, 2), "Adolescente")
         .when(F.col("Age_of_Driver").isin(3, 4), "Joven")
         .when(F.col("Age_of_Driver").isin(5, 6), "Adulto")
         .when(F.col("Age_of_Driver").isin(7, 8), "Anciano")
    )

    # Extraigo la hora del accidente, desde la columna 'Datetime'.
    .withColumn("hora", F.hour("Datetime"))

    # Cacheo el DataFrame, dado que se reutilizará en ejercicios posteriores.
    .cache()
)

In [None]:
from pyspark.sql.types import DoubleType
assert(accidentesDF.schema[1].dataType == DoubleType())
assert(accidentesDF.count() == 251832)

assert(dict(accidentesDF.dtypes)["Age_Category"] == "string")
collectedDF = accidentesDF.groupBy("Age_Category").count().orderBy("count").collect()
assert((collectedDF[0]["count"] == 22533) & (collectedDF[0]["Age_Category"] == "Anciano"))
assert((collectedDF[1]["count"] == 57174) & (collectedDF[1]["Age_Category"] == "Adolescente"))
assert((collectedDF[2]["count"] == 67138) & (collectedDF[2]["Age_Category"] == "Adulto"))
assert((collectedDF[3]["count"] == 104987) & (collectedDF[3]["Age_Category"] == "Joven"))

In [None]:
# Importo la clase Window de PySpark, que permite definir particiones (grupos)
# sobre los cuales, aplicar funciones de ventana como count, avg o stddev sin agrupar el DataFrame.
from pyspark.sql.window import Window

# Creo dos ventanas, para aplicar agregaciones sin perder el detalle por fila:
# 1. Agrupación por año y categoría del vehículo.
ventana_vehiculo_anio = Window.partitionBy("Year", "Vehicle_Category")

# 2. Agrupación por año, categoría del vehículo y ubicación del accidente (Junction_Location).
ventana_vehiculo_causa_anio = Window.partitionBy("Year", "Vehicle_Category", "Junction_Location")

# Enriquezco el DataFrame original, con estadísticas agregadas usando las ventanas definidas.
accidentes_info_agregadaDF = (
    accidentesDF
    # Total de accidentes por año y categoría de vehículo
    .withColumn("total_vehiculo_anio", F.count("*").over(ventana_vehiculo_anio))

    # Total de accidentes por año, categoría de vehículo y ubicación del accidente.
    .withColumn("total_vehiculo_causa_anio", F.count("*").over(ventana_vehiculo_causa_anio))

    # Proporción (en tanto por uno) entre los accidentes con esa ubicación y el total del grupo.
    .withColumn("porc_vehiculo_causa_anio", F.col("total_vehiculo_causa_anio") / F.col("total_vehiculo_anio"))

    # Edad promedio del conductor por año y tipo de vehículo.
    .withColumn("media_edad_vehiculo_anio", F.avg("Age_of_Driver").over(ventana_vehiculo_anio))

    # Edad promedio del conductor por año, tipo de vehículo y ubicación del accidente.
    .withColumn("media_edad_vehiculo_causa_anio", F.avg("Age_of_Driver").over(ventana_vehiculo_causa_anio))

    # Desviación estándar de la edad del conductor, en ese mismo grupo detallado.
    .withColumn("stddev_edad_vehiculo_causa_anio", F.stddev("Age_of_Driver").over(ventana_vehiculo_causa_anio))
)

# COMENTARIO GENERAL DEL CÓDIGO:
# Este bloque enriquece el DataFrame original ('accidentesDF') con nuevas columnas estadísticas,
# calculadas mediante funciones de ventana. Estas funciones permiten obtener, para cada accidente:
# - El número total de accidentes por año y tipo de vehículo,
# - El número total de accidentes por año, tipo de vehículo y ubicación del accidente (Junction_Location),
# - La proporción entre ambos (porcentaje en tanto por uno),
# - La edad media y desviación estándar, del conductor dentro de esos grupos.
# De este modo, se añade contexto agregado a cada fila, sin necesidad de agrupar ni perder detalle.


In [None]:
from pyspark.sql import functions as F
r = accidentes_info_agregadaDF.select(F.mean("total_vehiculo_anio").alias("total_vehiculo_anio"),
                                  F.mean("total_vehiculo_causa_anio").alias("total_vehiculo_causa_anio"),
                                  F.mean("porc_vehiculo_causa_anio").alias("porc_vehiculo_causa_anio"),
                                  F.mean("media_edad_vehiculo_causa_anio").alias("media_edad_vehiculo_causa_anio"),
                                 ).first()
assert(round(r.total_vehiculo_anio, 2) == 33843.98)
assert(round(r.total_vehiculo_causa_anio, 2) == 8185.52)
assert(round(r.porc_vehiculo_causa_anio, 2) == 0.25)
assert(round(r.media_edad_vehiculo_causa_anio, 2) == 3.90)

In [None]:
# Construyo un nuevo DataFrame, que resume el número de accidentes por hora del día y tipo de vehículo.
# La columna 'hora' actuará como índice, y cada tipo de vehículo será una columna separada.

accidentes_hora_vehiculo = (
    accidentesDF

    # Agrupo los datos por la columna 'hora' (hora del día en que ocurrió el accidente).
    .groupBy("hora")

    # Aplico un pivot sobre la columna 'Vehicle_Category' para que cada tipo de vehículo se convierta en una columna.
    # Esto permite ver, en cada hora, cuántos accidentes hubo, de cada tipo de vehículo.
    .pivot("Vehicle_Category")
    # Cuento cuántos accidentes hay en cada combinación, hora-tipo de vehículo.
    .count()
    # Ordeno las filas por la hora de forma ascendente (de 0 a 23).
    .orderBy("hora")
)
# COMENTARIO GENERAL DEL CÓDIGO:
# Este bloque, analiza la distribución de accidentes según la hora del día y el tipo de vehículo.
# Crea una tabla con una fila por hora (0 a 23) y una columna por categoría de vehículo,
# donde cada celda contiene el número de accidentes ocurridos a esa hora, con ese tipo de vehículo.
# El resultado permite comparar visualmente en qué franjas horarias cada tipo de vehículo sufre más accidentes.


In [None]:
acc = accidentes_hora_vehiculo.collect()
assert(acc[0].hora == 0 and acc[0].Taxi == 324)
assert(acc[10].hora == 10 and acc[10].Other == 41)
assert(acc[15].hora == 15 and acc[15].Motorcycle == 1860)
assert(acc[19].hora == 19 and acc[19].Car == 10886)

In [None]:
# Creo un nuevo DataFrame, que tendrá una sola fila y 6 columnas (una por tipo de vehículo).
# En cada columna, guardo una estructura (struct) con dos elementos:
#  - El número máximo de accidentes registrados, para ese tipo de vehículo.
#  - La hora del día (0-23), en que ocurrió ese máximo.

hora_max_accidentes_vehiculo_df = accidentes_hora_vehiculo.select(

    # Para "Bus/minibus", obtenego la fila con más accidentes y guardo el par (accidentes, hora).
    F.max(F.struct("Bus/minibus", "hora")).alias("Bus/minibus"),

     # Para "Car" hago lo mismo: elijo el struct, con más accidentes y la hora correspondiente.
    F.max(F.struct("Car", "hora")).alias("Car"),

     # Para "Motorcycle" hago lo mismo.
    F.max(F.struct("Motorcycle", "hora")).alias("Motorcycle"),

    # Para "Other" hago lo mismo.
    F.max(F.struct("Other", "hora")).alias("Other"),

    # Para "Taxi" hago lo mismo.
    F.max(F.struct("Taxi", "hora")).alias("Taxi"),

    # Para "Van" hago lo mismo.
    F.max(F.struct("Van", "hora")).alias("Van")
)
# COMENTARIO GENERAL DEL CÓDIGO:
# Este bloque calcula, para cada categoría de vehículo, la hora del día en la que se produce
# el mayor número de accidentes. El resultado es un DataFrame de una sola fila, donde cada columna
# contiene un par (número de accidentes, hora) correspondiente al pico máximo de siniestros diarios.


In [None]:
assert(len(hora_max_accidentes_vehiculo_df.columns) == 6)
assert(sum([1 for c in ["Bus/minibus", "Car", "Motorcycle", "Other", "Taxi", "Van"]
          if c in hora_max_accidentes_vehiculo_df.columns]) == 6)
r2 = hora_max_accidentes_vehiculo_df.first()
assert(r2["Bus/minibus"][0] == 56 and r2["Bus/minibus"][1] == 15)
assert(r2["Car"][0] == 19961 and r2["Car"][1] == 17)
assert(r2["Motorcycle"][0] == 2751 and r2["Motorcycle"][1] == 17)
assert(r2["Other"][0] == 64 and r2["Other"][1] == 13)
assert(r2["Taxi"][0] == 389 and r2["Taxi"][1] == 16)
assert(r2["Van"][0] == 1233 and r2["Van"][1] == 16)

In [None]:
# Importo las clases específicas, de la librería pyspark.ml.feature que permiten transformar columnas,
# en formatos que los modelos de Machine Learning pueden procesar.
from pyspark.ml.feature import StringIndexer, Binarizer, VectorAssembler
from pyspark.ml import Pipeline

# Creo un StringIndexer, para transformar la columna "Driver_Journey_Purpose" (texto),
# en una columna numérica llamada "purpose_indexed".
# Uso handleInvalid="keep" para evitar errores, si aparece una categoría no vista al entrenar.
journey_purpose_indexer = StringIndexer(
    inputCol="Driver_Journey_Purpose", # Columna de entrada (string).
    outputCol="purpose_indexed",       # Columna de salida (númerica).
    handleInvalid="keep"               # Si hay valores nuevos en datos futuros, no da error.
)

# Creo un Binarizer, para convertir el número de vehículos en una variable binaria.
# Si el número de vehículos es mayor a 2.5, se pone 1.0, si no, se pone 0.0.
cars_involved_binarizer = Binarizer(
    inputCol="Number_of_Vehicles",           # Entrada: número de vehículos.
    outputCol="number_vehicles_binarized",   # Salida: 0.0 o 1.0.
    threshold=2.5                            # Umbral: 3 o más vehículos → 1.0.
)

# Creo un VectorAssembler, el cual combina 3 columnas en una sola columna de vectores numéricos.
# Esta columna se llama "features" y se usará para alimentar algoritmos de ML.
vector_assembler = VectorAssembler(
    inputCols=["purpose_indexed", "number_vehicles_binarized", "Speed_limit"],  # Entradas.
    outputCol="features"                                                        # Salida.
)

# Hago la unión de los tres pasos anteriores, en un único pipeline.
# El pipeline ejecutará esas transformaciones en secuencia.
pipeline = Pipeline(stages=[
    journey_purpose_indexer,     # 1. Converte texto a número.
    cars_involved_binarizer,     # 2. Converte número de vehículos a binario.
    vector_assembler             # 3. Junta las columnas en un solo vector.
])

# "Entreno" el pipeline con el DataFrame original.
# No entrena un modelo, solo ajusta los transformadores, para que estén listos para aplicar.
pipeline_model = pipeline.fit(accidentesDF)

# COMENTARIO GENERAL DEL CÓDIGO:
# Este bloque construye un pipeline de preprocesamiento sobre el DataFrame 'accidentesDF',
# preparando las variables para su uso en modelos predictivos. Convierte una variable categórica
# en índice numérico, transforma una variable continua en binaria, y combina todas en un vector
# de entrada. Estas etapas se encapsulan en un Pipeline, que puede aplicarse fácilmente a nuevos datos.


In [None]:
from pyspark.ml.feature import StringIndexer, Binarizer, VectorAssembler
from pyspark.ml import Pipeline, PipelineModel

assert(isinstance(journey_purpose_indexer, StringIndexer))
assert(journey_purpose_indexer.getInputCol() == "Driver_Journey_Purpose" and
       journey_purpose_indexer.getOutputCol() == "purpose_indexed" and
       journey_purpose_indexer.getHandleInvalid() == "keep")

assert(isinstance(cars_involved_binarizer, Binarizer))
assert(cars_involved_binarizer.getInputCol() == "Number_of_Vehicles" and
       cars_involved_binarizer.getOutputCol() == "number_vehicles_binarized" and
       cars_involved_binarizer.getThreshold() == 2.5)

assert(isinstance(pipeline, Pipeline))
assert(len(pipeline.getStages()) == 3)             # el pipeline debe tener solamente tres etapas
assert(journey_purpose_indexer in pipeline.getStages()
       and cars_involved_binarizer in pipeline.getStages()
       and vector_assembler in pipeline.getStages())

assert(isinstance(pipeline_model, PipelineModel))

In [None]:
# Agrupo el DataFrame original, por Región y Tipo de Vehículo
# y calculo tres métricas para cada combinación:
# 1. Número total de accidentes.
# 2. Edad promedio del conductor (redondeada a 2 decimales).
# 3. Promedio de vehículos implicados (redondeado a 2 decimales).
# Luego empaqueto estas 3 métricas, en una sola estructura (struct).
grupo = accidentesDF.groupBy("Region", "Vehicle_Category").agg(
    F.struct(
        F.count("*"),                               # Total de accidentes.
        F.round(F.avg("Age_of_Driver"), 2),         # Edad media del conductor.
        F.round(F.avg("Number_of_Vehicles"), 2)     # Promedio de vehículos implicados.
    ).alias("resumen")                              # Le doy el nombre "resumen" a esta estructura.
)

# A partir del resultado anterior, hago la agrupación solo por Región
# y aplico pivot, para convertir los tipos de vehículos en columnas individuales.
# Para cada combinación, selecciono el primer valor del resumen (ya que solo hay uno por grupo).
numero_edad_coches_df = (
    grupo
    .groupBy("Region")           # Agrupo solo por región
    .pivot("Vehicle_Category")   # Cada tipo de vehículo, será una columna.
    .agg(F.first("resumen"))     # Tomo la estructura calculada, para cada tipo
    .orderBy("Region")           # Ordeno alfabéticamente, por Región.
)

# Convierto el DataFrame de Spark, a un DataFrame de Pandas
# para poder visualizarlo más fácilmente, como tabla o exportarlo si fuera necesario.
numero_edad_coches_pd = numero_edad_coches_df.toPandas()

# Mostro por pantalla el resultado final en formato Pandas.
# Cada celda contiene una tupla: (nº de accidentes, edad media, nº vehículos)
print(numero_edad_coches_pd)

# COMENTARIO GENERAL DEL CÓDIGO:
# Este bloque analiza los accidentes por región y tipo de vehículo.
# Para cada combinación se calcula el número de accidentes, la edad media del conductor
# y el promedio de vehículos implicados. Estos datos se empaquetan en estructuras (structs)
# y se pivotan para formar un resumen por región, con columnas separadas por tipo de vehículo.
# Finalmente, se convierte a Pandas para facilitar su visualización.


                      Region                               Bus/minibus  \
0               East England  {'col1': 44, 'col2': 4.34, 'col3': 1.77}   
1              East Midlands  {'col1': 39, 'col2': 4.95, 'col3': 1.85}   
2                     London  {'col1': 28, 'col2': 4.71, 'col3': 1.75}   
3         North East England  {'col1': 52, 'col2': 4.46, 'col3': 1.85}   
4         North West England   {'col1': 66, 'col2': 4.7, 'col3': 1.88}   
5                   Scotland     {'col1': 2, 'col2': 4.0, 'col3': 1.5}   
6         South East England  {'col1': 71, 'col2': 4.96, 'col3': 1.87}   
7         South West England  {'col1': 37, 'col2': 4.84, 'col3': 1.81}   
8                      Wales     {'col1': 1, 'col2': 5.0, 'col3': 2.0}   
9              Wast Midlands   {'col1': 56, 'col2': 4.7, 'col3': 1.91}   
10  Yorkshire and the Humber  {'col1': 75, 'col2': 4.67, 'col3': 1.69}   

                                            Car  \
0    {'col1': 22813, 'col2': 3.95, 'col3': 2.0}   
1   {'col

In [None]:
assert(len(numero_edad_coches_df.columns) == 7)
assert(sum([1 for c in ["Region", "Bus/minibus", "Car", "Motorcycle", "Other", "Taxi", "Van"]
          if c in numero_edad_coches_df.columns]) == 7)
r = numero_edad_coches_df.collect()
assert(r[0].Region == "East England" and r[0].Other == (61, 4.02, 1.87))
assert(len(numero_edad_coches_df.columns) == 7 and len(r) == 11)
assert(r[0].Car == (22813, 3.95, 2.0))
assert(r[7].Motorcycle == (2802, 3.17, 1.88))
assert(r[10].Van == (1412, 3.92, 2.02))