# Análisis de accidentes con Spark

En este laboratorio lo que se busca es poder encotrar información interesante y relevante con respecto a los accidentes que han ocurrido de los años de 2013 a 2023 en Guatemala. Principalmente tenemos 3 tipos de archivos: Fallecidos y Lesionados, Hechos de tránsito y Vehículos involucrados. Estos 3 archivos contienen información con respecto a accidentes automovílisticos. La idea es poder usar spark para poder analizar los eventos y las características relacionadas a este. Lo primero es juntar todos los archivos. Obtuvimos los datos de la página del Instituto Nacional de Estadística (INE): https://www.ine.gob.gt/bases-de-datos/accidentes-de-transito/. Descargando todos los archivos desde 2013 a 2023. El procesamiento se hizo en distintas etapas:

1. Descargar todos los datos en un sav y si no esta disponible en ese formato, en excel.
2. Juntar todos los dataframes en los 3 tipos de archivos descritos (fallecidos y lesionados, hechos de tránsito y vehículos involucrados)
3. Analizar los datos y ver columnas con muchos valores vacíos, columnas repetidas entre otras situaciones que generen data innecesaria o redundante.


# Merge detodos los datos

In [15]:
import pandas as pd
import numpy as np
from pathlib import Path
import unicodedata
import re

def estandarizar_nombre_columna(col):
    """Estandariza un nombre de columna: minusculas, sin tildes, sin espacios"""
    col = str(col).lower()
    col = unicodedata.normalize('NFKD', col).encode('ASCII', 'ignore').decode('ASCII')
    col = col.replace(' ', '_').replace('-', '_').replace('.', '_')
    return col

def procesar_archivo_individual(archivo, tipo_dataset):
    """Procesa un archivo individual y estandariza sus columnas"""
    print(f"Procesando: {archivo.name}")
    
    # Leer el archivo según su formato
    if archivo.suffix == '.sav':
        df, meta = pyreadstat.read_sav(archivo)
    elif archivo.suffix == '.xlsx':
        df = pd.read_excel(archivo)
    else:
        print(f"Formato no soportado: {archivo.name}")
        return None
    
    # Estandarizar nombres de columnas
    df.columns = [estandarizar_nombre_columna(col) for col in df.columns]
    
    # Extraer año del nombre del archivo
    año = re.search(r'(\d{4})', archivo.stem).group(1) if re.search(r'(\d{4})', archivo.stem) else "Desconocido"
    
    # Agregar metadatos
    df['archivo_origen'] = archivo.name
    df['tipo_dataset'] = tipo_dataset
    df['año'] = año
    
    return df

def combinar_columnas_en_dataframe(df):
    """Combina columnas similares dentro de un DataFrame"""
    # Mapeo de columnas a consolidar
    mapeo_combinacion = {
        'num_hecho': ['num_hecho', 'num_correlativo', 'num_corre', 'núm_corre'],
        'dia_ocu': ['dia_ocu', 'día_ocu'],
        'mes_ocu': ['mes_ocu'],
        'hora_ocu': ['hora_ocu'],
        'dia_sem_ocu': ['dia_sem_ocu', 'día_sem_ocu'],
        'depto_ocu': ['depto_ocu'],
        'mupio_ocu': ['mupio_ocu'],
        'areag_ocu': ['areag_ocu', 'area_geo_ocu', 'área_geo_ocu'],
        'zona_ocu': ['zona_ocu', 'zona_ciudad'],
        'sexo_pil': ['sexo_pil', 'sexo_con', 'sexo_per', 'sexo_víc'],
        'edad_pil': ['edad_pil', 'edad_con', 'edad_per', 'edad_víc'],
        'tipo_veh': ['tipo_veh'],
        'color_veh': ['color_veh'],
        'modelo_veh': ['modelo_veh'],
        'causa_acc': ['causa_acc'],
        'marca_veh': ['marca_veh'],
        'estado_pil': ['estado_pil', 'estado_con'],
        'fallecidos_lesionados': ['fallecidos_lesionados', 'fall_les'],
        'tipo_eve': ['tipo_eve']
    }
    
    for col_principal, columnas_similares in mapeo_combinacion.items():
        # Buscar qué columnas similares existen en este DataFrame
        columnas_existentes = [col for col in columnas_similares if col in df.columns]
        
        if len(columnas_existentes) > 0:
            # Si la columna principal no existe, crear una nueva
            if col_principal not in df.columns:
                # Usar la primera columna existente como base
                df[col_principal] = df[columnas_existentes[0]]
                # Si hay más columnas, combinar
                for col in columnas_existentes[1:]:
                    mask = df[col_principal].isna() & df[col].notna()
                    df.loc[mask, col_principal] = df.loc[mask, col]
            else:
                # Si la columna principal existe, combinar con las demás
                for col in columnas_existentes:
                    if col != col_principal:
                        mask = df[col_principal].isna() & df[col].notna()
                        df.loc[mask, col_principal] = df.loc[mask, col]
    
    return df

def unir_y_combinar_datasets(directorio_data):
    """Une y combina todos los datasets"""
    tipos_datasets = {
        'hechos_transito': [],
        'vehiculos_involucrados': [],
        'fallecidos_lesionados': []
    }
    
    # Identificar archivos por tipo
    for archivo in Path(directorio_data).iterdir():
        if archivo.suffix in ['.sav', '.xlsx']:
            nombre = archivo.stem.lower()
            if 'hechos_transito' in nombre:
                tipos_datasets['hechos_transito'].append(archivo)
            elif 'vehiculos_involucrados' in nombre:
                tipos_datasets['vehiculos_involucrados'].append(archivo)
            elif 'fallecidos_lesionados' in nombre:
                tipos_datasets['fallecidos_lesionados'].append(archivo)
    
    datasets_finales = {}
    
    for tipo, archivos in tipos_datasets.items():
        print(f"\nProcesando {tipo}...")
        dataframes = []
        
        for archivo in sorted(archivos):
            df = procesar_archivo_individual(archivo, tipo)
            if df is not None:
                # Combinar columnas similares en este archivo individual
                df = combinar_columnas_en_dataframe(df)
                dataframes.append(df)
        
        if dataframes:
            # Unir todos los dataframes del mismo tipo
            df_final = pd.concat(dataframes, ignore_index=True, sort=False)
            
            # Seleccionar columnas relevantes según el tipo
            if tipo == 'hechos_transito':
                columnas_finales = [
                    'num_hecho', 'dia_ocu', 'mes_ocu', 'dia_sem_ocu', 'hora_ocu',
                    'depto_ocu', 'mupio_ocu', 'areag_ocu', 'zona_ocu', 
                    'tipo_veh', 'color_veh', 'modelo_veh', 'causa_acc',
                    'sexo_pil', 'edad_pil', 'archivo_origen', 'tipo_dataset', 'año'
                ]
            elif tipo == 'vehiculos_involucrados':
                columnas_finales = [
                    'num_hecho', 'dia_ocu', 'mes_ocu', 'hora_ocu',
                    'depto_ocu', 'mupio_ocu', 'areag_ocu', 'zona_ocu',
                    'tipo_veh', 'color_veh', 'modelo_veh', 'causa_acc',
                    'marca_veh', 'estado_pil', 'sexo_pil', 'edad_pil',
                    'archivo_origen', 'tipo_dataset', 'año'
                ]
            else:  # fallecidos_lesionados
                columnas_finales = [
                    'num_hecho', 'dia_ocu', 'mes_ocu', 'hora_ocu',
                    'depto_ocu', 'mupio_ocu', 'areag_ocu', 'zona_ocu',
                    'tipo_veh', 'causa_acc', 'sexo_pil', 'edad_pil',
                    'fallecidos_lesionados', 'tipo_eve', 'archivo_origen', 'tipo_dataset', 'año'
                ]
            
            # Seleccionar solo las columnas que existen
            columnas_existentes = [col for col in columnas_finales if col in df_final.columns]
            df_final = df_final[columnas_existentes]
            
            # Eliminar duplicados
            df_final = df_final.drop_duplicates().reset_index(drop=True)
            
            datasets_finales[tipo] = df_final
            print(f"{tipo}: {len(df_final)} registros, {len(df_final.columns)} columnas")
    
    return datasets_finales

# Ejecutar la unión y combinación
print("Iniciando proceso de unión y combinación...")
directorio_data = "./data"
datasets_finales = unir_y_combinar_datasets(directorio_data)

# Guardar los datasets resultantes
directorio_salida = "./datasets_corregidos"
Path(directorio_salida).mkdir(exist_ok=True)

for tipo, df in datasets_finales.items():
    archivo_salida = f"{directorio_salida}/{tipo}_corregido.csv"
    df.to_csv(archivo_salida, index=False, encoding='utf-8-sig')
    print(f"Guardado: {archivo_salida}")



Iniciando proceso de unión y combinación...

Procesando hechos_transito...
Procesando: hechos_transito2013.sav
Procesando: hechos_transito2014.sav
Procesando: hechos_transito2015.xlsx
Procesando: hechos_transito2016.sav
Procesando: hechos_transito2017.sav
Procesando: hechos_transito2018.sav
Procesando: hechos_transito2019.sav
Procesando: hechos_transito2020.sav
Procesando: hechos_transito2021.sav
Procesando: hechos_transito2022.sav
Procesando: hechos_transito2023.sav
hechos_transito: 76759 registros, 18 columnas

Procesando vehiculos_involucrados...
Procesando: vehiculos_involucrados2013.sav
Procesando: vehiculos_involucrados2014.sav
Procesando: vehiculos_involucrados2015.xlsx
Procesando: vehiculos_involucrados2016.sav
Procesando: vehiculos_involucrados2017.sav
Procesando: vehiculos_involucrados2018.sav
Procesando: vehiculos_involucrados2019.sav
Procesando: vehiculos_involucrados2020.sav
Procesando: vehiculos_involucrados2021.sav
Procesando: vehiculos_involucrados2022.sav
Procesando: v

se puede ver que se han logrado juntar la información de todos los años, incluso teniendo archivos de distinto tipo. Ahora es encesario evaluar las columnas y ver cuales podrían no ser necesaríass o incluso problemáticas.

In [16]:
# Análisis de calidad completo para todos los datasets
print("\n" + "="*80)
print("ANÁLISIS DE CALIDAD COMPLETO - DATASETS CORREGIDOS")
print("="*80)

def analizar_calidad_completo(df, nombre_dataset):
    """Analiza la calidad de todas las columnas importantes del dataset"""
    print(f"\n{nombre_dataset.upper()}:")
    print(f"   Total registros: {len(df):,}")
    print(f"   Años disponibles: {sorted(df['año'].unique())}")
    print(f"   Total columnas: {len(df.columns)}")
    
    # Columnas a analizar (excluyendo metadatos)
    columnas_analizar = [col for col in df.columns if col not in ['archivo_origen', 'tipo_dataset', 'año']]
    
    print(f"\n   ANÁLISIS POR COLUMNA:")
    for col in columnas_analizar:
        if col in df.columns:
            total_registros = len(df)
            nulos = df[col].isna().sum()
            porcentaje_nulos = (nulos / total_registros) * 100
            
            # Información adicional según el tipo de columna
            if df[col].dtype in ['object', 'string']:
                # Para columnas categóricas
                valores_unicos = df[col].nunique()
                valor_mas_comun = df[col].mode().iloc[0] if not df[col].mode().empty else "N/A"
                print(f"     {col}:")
                print(f"      - Nulos: {nulos:,} ({porcentaje_nulos:.1f}%)")
                print(f"      - Valores únicos: {valores_unicos}")
                print(f"      - Valor más común: {valor_mas_comun}")
                
            else:
                # Para columnas numéricas
                no_nulos = total_registros - nulos
                if no_nulos > 0:
                    stats = df[col].describe()
                    print(f"     {col}:")
                    print(f"      - Nulos: {nulos:,} ({porcentaje_nulos:.1f}%)")
                    print(f"      - No nulos: {no_nulos:,}")
                    print(f"      - Rango: [{stats['min']:.1f} - {stats['max']:.1f}]")
                    print(f"      - Promedio: {stats['mean']:.2f}")
                else:
                    print(f"     {col}: TODOS LOS VALORES SON NULOS")

# Aplicar análisis a todos los datasets
for tipo, df in datasets_finales.items():
    analizar_calidad_completo(df, tipo)


ANÁLISIS DE CALIDAD COMPLETO - DATASETS CORREGIDOS

HECHOS_TRANSITO:
   Total registros: 76,759
   Años disponibles: ['2013', '2014', '2015', '2016', '2017', '2018', '2019', '2020', '2021', '2022', '2023']
   Total columnas: 18

   ANÁLISIS POR COLUMNA:
     num_hecho:
      - Nulos: 0 (0.0%)
      - No nulos: 76,759
      - Rango: [1.0 - 8218.0]
      - Promedio: 3547.75
     dia_ocu:
      - Nulos: 0 (0.0%)
      - No nulos: 76,759
      - Rango: [1.0 - 31.0]
      - Promedio: 15.62
     mes_ocu:
      - Nulos: 0 (0.0%)
      - No nulos: 76,759
      - Rango: [1.0 - 12.0]
      - Promedio: 6.58
     dia_sem_ocu:
      - Nulos: 0 (0.0%)
      - No nulos: 76,759
      - Rango: [1.0 - 7.0]
      - Promedio: 4.35
     hora_ocu:
      - Nulos: 0 (0.0%)
      - No nulos: 76,759
      - Rango: [0.0 - 99.0]
      - Promedio: 13.72
     depto_ocu:
      - Nulos: 0 (0.0%)
      - No nulos: 76,759
      - Rango: [1.0 - 22.0]
      - Promedio: 7.24
     mupio_ocu:
      - Nulos: 0 (0.0%)
      

Viendo estos resultados es claro que huberion dificultades para juntar todos los datos al principio. Había más de 40 columnnas en los 3 archivos y varias de esas eran muy similares. Es más hay algunas columnas que son literalmente lo mismo, pero debido a ortografía (espacios, tildes, diferencias en el nombre) hay columnas que se marcan como diferente. Sabiendo esto es necesario limitar la selección de datos y jutnar la información redundante.

Luego de hacer varias pruebas este fue el mejor resultado para el data set. Es importante destacar que no todas las columnas fueron procesadas exitosamene, se a reducido la dimensión de los sets de datos. Esto pasó principalmente porque entre los distintos años habían varias columnas que o bien tenían nombres similares o columnas que solo se encontraban en algunos años en específico. Realmente el set de datos no tiene un id que relacione a las tres aparte del numero de correlativo o numero de hecho. Algunas de las estrategias que hicimos fue juntar los datos de columnas duplicadas y también columnas que significaban prácticamente lo mismo. Aún después de esto hay algunas columnas con valores núlos, pero al menos la mayoría parece estar funcionando y debería de ser suficiente para poder analizar los 3 archivos por separados y en conjunto. Para hacer los joins se espera poder usar cosas como llaves compuetas con el año, fecha, dia,zona y demás como sugiere la guía. la columna que tiene mayor cantidad de vlaores vacíos es la de causa_acc pero esperemos pueda complementarse con tipo_eve. aunque no todas las tablas tienen ambos.
