# Exploratory Data Analysis (EDA) Distribuido para ML en Spark/Cluster
Este notebook realiza un EDA eficiente y escalable con PySpark, ideal para grandes volúmenes de datos en clústeres Spark/Hadoop. Los cálculos pesados se ejecutan en Spark y solo los resultados agregados (o muestras) se exportan a pandas para visualizarlos.

In [1]:
# Start (or reuse, via getOrCreate) the SparkSession used by the whole notebook.
import os

import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("US_Accidents_EDA")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/01 17:01:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
# Load the CSV sample into a Spark DataFrame (header row, inferred column types)
# and cache it, since many cells below scan it again.
csv_reader = spark.read.option("header", True).option("inferSchema", True)
df = csv_reader.csv("/kaggle/input/muestra-accidents/muestra.csv")
df.cache()
df.show(5)

In [None]:
# Column types and dataset dimensions.
df.printSchema()
n_rows, n_cols = df.count(), len(df.columns)
print(f"Filas: {n_rows}, Columnas: {n_cols}")

In [None]:
# Spark's default describe() summary for the DataFrame.
summary_df = df.describe()
summary_df.show()

## Estadísticas descriptivas y distribución de la variable objetivo

In [None]:
# Count accidents per Severity level (target-variable distribution) and plot it.
severity_counts = df.groupBy('Severity').count()
severity_pd = severity_counts.orderBy('Severity').toPandas()
sns.barplot(data=severity_pd, x='Severity', y='count')
plt.title('Distribución de Severidad de Accidentes')
plt.show()

In [None]:
# Descriptive statistics of the numeric columns.
from pyspark.sql.types import NumericType

# Match every numeric Spark type (byte, short, integer, long, float, double,
# decimal). Comparing typeName() with ['integer', 'double'] silently skipped
# columns that inferSchema typed as 'long' (or decimal).
num_features = [field.name for field in df.schema.fields if isinstance(field.dataType, NumericType)]
df.select(num_features).describe().show()

# Limpieza de valores nulos y preparación de datos


In [None]:
from pyspark.sql.functions import col, count, when, lit, to_timestamp

# Drop the columns considered to have too many nulls to be useful.
cols_to_drop = [
    'End_Lat',
    'End_Lng',
    'Precipitation(in)',
    'Wind_Chill(F)',
    'Airport_Code',
]
df = df.drop(*cols_to_drop)

# Impute numeric columns with their approximate median.
def impute_median(df, cols):
    """Replace nulls in numeric columns with each column's approximate median.

    The median comes from ``DataFrame.approxQuantile`` (relative error 0.01),
    which ignores nulls. Columns missing from ``df`` and columns that contain
    only nulls are left unchanged.

    Args:
        df: Spark DataFrame to impute.
        cols: names of the numeric columns to process.

    Returns:
        A new DataFrame with the nulls of the listed columns filled.
    """
    for col_name in cols:
        if col_name not in df.columns:
            continue
        # approxQuantile returns [] for an all-null column; indexing [0]
        # unconditionally would raise IndexError.
        quantiles = df.approxQuantile(col_name, [0.5], 0.01)
        if not quantiles:
            continue
        median_val = quantiles[0]
        df = df.withColumn(col_name, when(col(col_name).isNull(), lit(median_val)).otherwise(col(col_name)))
    return df


num_cols = [
    'Wind_Speed(mph)', 'Visibility(mi)', 'Humidity(%)', 'Temperature(F)', 'Pressure(in)'
]
df = impute_median(df, num_cols)

# Impute categorical columns with their mode (most frequent non-null value).
from pyspark.sql import functions as F


def impute_mode(df, cols):
    """Replace nulls in categorical columns with each column's most frequent value.

    Nulls are excluded when looking for the mode. If a column has no non-null
    value at all, nulls are replaced with the literal ``'Unknown'``. Ties
    between equally frequent values are not broken deterministically. Columns
    missing from ``df`` are skipped.

    Args:
        df: Spark DataFrame to impute.
        cols: names of the categorical columns to process.

    Returns:
        A new DataFrame with the nulls of the listed columns filled.
    """
    for col_name in cols:
        if col_name not in df.columns:
            continue
        # Filter nulls before counting: otherwise null itself can be the most
        # frequent "value", and filling nulls with null is a no-op.
        mode_row = (
            df.filter(col(col_name).isNotNull())
            .groupBy(col_name)
            .count()
            .orderBy(F.desc('count'))
            .first()
        )
        mode_val = mode_row[0] if mode_row else 'Unknown'
        df = df.withColumn(col_name, when(col(col_name).isNull(), lit(mode_val)).otherwise(col(col_name)))
    return df


# Weather_Timestamp is not listed: it is a datetime column that the date
# imputation cell fills with the median (together with Start_Time/End_Time).
# Mode-imputing it here first would leave nothing for that step to fill.
cat_cols = [
    'Weather_Condition', 'Wind_Direction',
    'Nautical_Twilight', 'Civil_Twilight', 'Sunrise_Sunset', 'Astronomical_Twilight',
    'Street', 'Timezone', 'Zipcode', 'City'
]
df = impute_mode(df, cat_cols)

# Show how many nulls remain in every column.
null_count_exprs = [count(when(col(c).isNull(), c)).alias(c) for c in df.columns]
df.select(null_count_exprs).show()

In [None]:
# Imputación de nulos en columnas de fecha con la mediana o valor fijo si la columna es nula

from pyspark.sql.functions import col, when, lit, to_timestamp, unix_timestamp

def impute_datetime_median_spark(df, cols):
    """Impute nulls in datetime columns with the column's approximate median.

    Each listed column present in ``df`` is parsed with ``to_timestamp`` (no
    explicit format) and converted to epoch seconds in a temporary
    ``<name>_long`` column. The approximate median of that column (relative
    error 0.01) replaces the nulls, and the result is cast back to timestamp.
    If a column has no non-null values, its nulls are replaced with the
    timestamp literal ``'1970-01-01 00:00:00'`` instead. Columns missing from
    ``df`` are skipped.

    Args:
        df: Spark DataFrame.
        cols: names of the datetime columns to impute.

    Returns:
        A new DataFrame where the listed columns are timestamps with nulls filled.
    """
    for col_name in cols:
        if col_name not in df.columns:
            continue
        aux_col = col_name + '_long'
        # Parse to timestamp (no-op if it already is one), then to epoch seconds.
        df = df.withColumn(col_name, to_timestamp(col(col_name)))
        df = df.withColumn(aux_col, unix_timestamp(col(col_name)))
        # approxQuantile ignores nulls and returns [] when there is no non-null
        # value, which replaces the separate count() job previously run here.
        quantiles = df.approxQuantile(aux_col, [0.5], 0.01)
        if quantiles:
            df = df.withColumn(
                aux_col,
                when(col(aux_col).isNull(), lit(quantiles[0])).otherwise(col(aux_col))
            )
            # Back to timestamp (numeric cast interprets the value as epoch seconds).
            df = df.withColumn(col_name, col(aux_col).cast('timestamp'))
        else:
            # Parse the fallback literal so the column stays a timestamp; a bare
            # string literal in when/otherwise can coerce the column to string.
            df = df.withColumn(
                col_name,
                when(col(col_name).isNull(), to_timestamp(lit('1970-01-01 00:00:00'))).otherwise(col(col_name))
            )
        # Drop the auxiliary epoch-seconds column.
        df = df.drop(aux_col)
    return df

fecha_cols = ['Start_Time', 'End_Time', 'Weather_Timestamp']
df = impute_datetime_median_spark(df, fecha_cols)

# Show the remaining null count of each date column (expected to be zero).
date_null_exprs = [F.count(F.when(col(c).isNull(), c)).alias(c) for c in fecha_cols]
df.select(date_null_exprs).show()

# Visualización de EDA distribuido: Graficar resultados agregados de Spark
Para visualizar los resultados de Spark, exportamos muestras o agregados a pandas y graficamos con matplotlib/seaborn. Esto permite análisis visual incluso en entornos distribuidos.

In [None]:
# Bar chart of the 10 cities with the most accidents.
top_cities = df.groupBy('City').count().orderBy('count', ascending=False).limit(10)
city_pd = top_cities.toPandas()

plt.figure(figsize=(10, 5))
sns.barplot(data=city_pd, x='City', y='count')
plt.title('Top 10 ciudades con más accidentes')
plt.ylabel('Número de accidentes')
plt.xticks(rotation=45)
plt.show()

In [None]:
# Bar chart of the 15 states with the most accidents.
top_states = df.groupBy('State').count().orderBy('count', ascending=False).limit(15)
state_pd = top_states.toPandas()

plt.figure(figsize=(10, 5))
sns.barplot(data=state_pd, x='State', y='count')
plt.title('Accidentes por estado (Top 15)')
plt.ylabel('Número de accidentes')
plt.show()

In [None]:
# Accidents by hour of day and severity; derives an 'hour' column from Start_Time.
from pyspark.sql.functions import hour, col
if 'Start_Time' in df.columns:
    df = df.withColumn('hour', hour(col('Start_Time')))
    hour_counts = df.groupBy('hour', 'Severity').count()
    hour_pd = hour_counts.orderBy('hour', 'Severity').toPandas()

    plt.figure(figsize=(12, 6))
    sns.barplot(data=hour_pd, x='hour', y='count', hue='Severity')
    plt.title('Accidentes por hora y severidad')
    plt.xlabel('Hora')
    plt.ylabel('Número de accidentes')
    plt.show()

In [None]:
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns

# Example: bar chart of the Severity distribution.
severity_pd = (
    df.groupBy('Severity')
    .count()
    .orderBy('Severity')
    .toPandas()
)
sns.barplot(data=severity_pd, x='Severity', y='count')
plt.title('Distribución de Severidad de Accidentes')
plt.show()

In [None]:
# Bar chart of the median Distance(mi) per Severity level. The mean is also
# aggregated into boxplot_pd but is not plotted.
boxplot_pd = (
    df.groupBy('Severity')
    .agg(
        F.expr('percentile_approx(`Distance(mi)`, 0.5)').alias('median'),
        F.avg('Distance(mi)').alias('mean'),
    )
    .orderBy('Severity')
    .toPandas()
)

plt.figure(figsize=(8, 4))
sns.barplot(data=boxplot_pd, x='Severity', y='median')
plt.title('Mediana de Distance(mi) por Severity')
plt.ylabel('Distance(mi) (mediana)')
plt.show()

In [None]:
# Heatmap of accident counts for the 10 most frequent Weather_Condition values
# (by total count) against Severity.
weather_pd = df.groupBy('Weather_Condition', 'Severity').count().toPandas()
totals_by_weather = weather_pd.groupby('Weather_Condition')['count'].sum()
top_weather = totals_by_weather.nlargest(10).index
weather_pd = weather_pd.loc[weather_pd['Weather_Condition'].isin(top_weather)]

# Missing (condition, severity) pairs become 0 in the heatmap.
weather_pivot = weather_pd.pivot(index='Weather_Condition', columns='Severity', values='count')
weather_pivot = weather_pivot.fillna(0)

plt.figure(figsize=(10, 6))
sns.heatmap(weather_pivot, annot=True, fmt='.0f', cmap='Blues')
plt.title('Weather Condition vs Severity (Top 10)')
plt.ylabel('Weather_Condition')
plt.xlabel('Severity')
plt.show()

In [None]:
# Accidents per hour and severity (only when the 'hour' column has been derived).
if 'hour' in df.columns:
    hourly_counts = df.groupBy('hour', 'Severity').count()
    hour_pd = hourly_counts.orderBy('hour', 'Severity').toPandas()

    plt.figure(figsize=(12, 6))
    sns.barplot(data=hour_pd, x='hour', y='count', hue='Severity')
    plt.title('Accidentes por hora y severidad')
    plt.xlabel('Hora')
    plt.ylabel('Número de accidentes')
    plt.show()

In [None]:
# Bar charts of the median of other numeric variables per Severity level
# (in addition to Distance(mi)).
# The loop variable is `num_col`, not `col`: a notebook-level `for col in ...`
# rebinds the pyspark `col` function imported earlier to a plain string, which
# breaks any later (or re-run) cell that calls col(...).
for num_col in ['Temperature(F)', 'Humidity(%)', 'Pressure(in)', 'Visibility(mi)', 'Wind_Speed(mph)']:
    if num_col not in df.columns:
        continue
    boxplot_pd = (
        df.groupBy('Severity')
        .agg(F.expr(f'percentile_approx(`{num_col}`, 0.5)').alias('median'))
        .orderBy('Severity')
        .toPandas()
    )
    plt.figure(figsize=(8, 4))
    sns.barplot(x='Severity', y='median', data=boxplot_pd)
    plt.title(f'Mediana de {num_col} por Severity')
    plt.ylabel(f'{num_col} (mediana)')
    plt.show()


# Eliminación de filas con outliers extremos en columnas numéricas
Filtramos las filas que tienen valores mayores al percentil 99.5 en 'Distance(mi)', 'Wind_Speed(mph)' y 'Visibility(mi)' para mejorar la calidad del análisis y los modelos de ML.

In [None]:
cols_outliers = ['Distance(mi)', 'Wind_Speed(mph)', 'Visibility(mi)']
percentile = 0.995  # Upper cut-off; adjustable
from pyspark.sql import functions as F

# Dedicated loop names: a notebook-level `for col in ...` would rebind the
# pyspark `col` function imported earlier to a plain string.
present_cols = [name for name in cols_outliers if name in df.columns]
if present_cols:
    # One approxQuantile call computes every threshold in a single pass over the
    # unfiltered data, instead of one full job per column.
    upper_bounds = df.approxQuantile(present_cols, [percentile], 0.01)
    for outlier_col, quantiles in zip(present_cols, upper_bounds):
        # An all-null column yields [] (no threshold); leave it unfiltered
        # instead of raising IndexError.
        if quantiles:
            # Rows where the column is null also fail `<=` and are dropped.
            df = df.filter(F.col(outlier_col) <= quantiles[0])

print(f"Filas restantes tras eliminar outliers en {cols_outliers}: {df.count()}")

# Feature Engineering sobre la columna 'Description'
La columna 'Description' contiene información textual sobre el accidente. Vamos a explorarla y extraer features útiles para modelos de ML.

In [None]:
# Most frequent descriptions (top 10).
if 'Description' in df.columns:
    # Apply the limit in Spark before toPandas(). Without it, every distinct
    # description (potentially close to one per row) is collected on the driver
    # just to print ten of them.
    desc_pd = df.groupBy('Description').count().orderBy('count', ascending=False).limit(10).toPandas()
    print(desc_pd.head(10))

In [None]:
# Basic text cleaning: remove every character that is not an ASCII letter,
# digit or space, then lowercase.
from pyspark.sql.functions import lower, regexp_replace, length
if 'Description' in df.columns:
    stripped_desc = regexp_replace('Description', '[^a-zA-Z0-9 ]', '')
    df = df.withColumn('Description_clean', lower(stripped_desc))

In [None]:
# Tokenize the cleaned descriptions and inspect the most frequent words.
from pyspark.ml.feature import Tokenizer, RegexTokenizer
if 'Description_clean' in df.columns:
    # RegexTokenizer splits on runs of whitespace and drops tokens shorter than
    # minTokenLength=1, i.e. empty strings. The plain Tokenizer splits on each
    # single whitespace character, so consecutive or leading spaces (which the
    # punctuation-stripping step can leave behind) yield "" tokens. Those tokens
    # inflate the word counts below and the TF vocabulary built later.
    tokenizer = RegexTokenizer(
        inputCol="Description_clean",
        outputCol="Description_tokens",
        pattern="\\s+",
        gaps=True,
        minTokenLength=1,
    )
    df = tokenizer.transform(df)
    # Word frequencies are computed on the driver over at most 10,000 rows.
    tokens_pd = df.select('Description_tokens').limit(10000).toPandas()
    from collections import Counter
    import itertools
    all_tokens = list(itertools.chain.from_iterable(tokens_pd['Description_tokens']))
    word_freq = Counter(all_tokens)
    print(word_freq.most_common(20))
    from wordcloud import WordCloud
    wc = WordCloud(width=800, height=400).generate_from_frequencies(word_freq)
    plt.figure(figsize=(12,6))
    plt.imshow(wc, interpolation='bilinear')
    plt.axis('off')
    plt.show()

In [None]:
# Simple text features: character length, word count, and whether the token
# 'accident' appears.
from pyspark.sql.functions import size
if 'Description_clean' in df.columns and 'Description_tokens' in df.columns:
    from pyspark.sql.functions import array_contains, lit
    df = (
        df.withColumn('desc_length', length('Description_clean'))
        .withColumn('desc_num_words', size('Description_tokens'))
        .withColumn('desc_has_accident', array_contains('Description_tokens', lit('accident')))
    )

In [None]:
# Basic vectorization: term frequencies (vocabulary capped at 1000 terms that
# appear in at least 5 documents), then IDF weighting on top of them.
from pyspark.ml.feature import CountVectorizer, IDF
if 'Description_tokens' in df.columns:
    cv = CountVectorizer(
        inputCol="Description_tokens",
        outputCol="desc_tf",
        vocabSize=1000,
        minDF=5,
    )
    cv_model = cv.fit(df)
    idf = IDF(inputCol="desc_tf", outputCol="desc_tfidf")
    df = cv_model.transform(df)
    idf_model = idf.fit(df)
    df = idf_model.transform(df)


In [None]:
# Inspect the columns currently in df (the notebook displays the returned list).
df.columns

# Feature Engineering y Pipeline de Preparación de Datos para ML en Spark

Implementación del pipeline de preparación de datos para la clasificación de severidad, adaptando el flujo de Kaggle a PySpark e incluyendo un ranking manual de Weather_Condition.

In [None]:
# Select the features used for modelling.
from pyspark.sql.functions import dayofweek, hour

# 'weekday' is listed below and one-hot encoded later, but no earlier cell
# creates it, so the existence filter silently dropped it. Derive it (and
# 'hour', in case the plotting cell that creates it was skipped) from
# Start_Time. dayofweek returns 1 = Sunday ... 7 = Saturday.
if 'Start_Time' in df.columns:
    if 'hour' not in df.columns:
        df = df.withColumn('hour', hour('Start_Time'))
    if 'weekday' not in df.columns:
        df = df.withColumn('weekday', dayofweek('Start_Time'))

# 'Precipitation(in)' is not listed: it was dropped during null cleaning.
features_finales = [
    'Distance(mi)',
    'Weather_Condition', 'State', 'hour', 'weekday',
    'Sunrise_Sunset', 'Traffic_Signal', 'Crossing',
    'desc_length', 'desc_num_words', 'desc_has_accident', 'Severity', 'desc_tf',
    'desc_tfidf',
]
features_finales = [name for name in features_finales if name in df.columns]
df_ml = df.select(*features_finales)

In [None]:
# Manual ranking of Weather_Condition: one unique integer rank per condition,
# ordered from very good to very bad weather (same scheme as the Kaggle flow).
from pyspark.sql.functions import monotonically_increasing_id

# Distinct conditions present in the data, sorted alphabetically. Nulls are
# excluded: sorted() cannot compare None with str, and a null key can never
# match in the join below anyway.
weather_conditions = df_ml.select('Weather_Condition').distinct().rdd.flatMap(lambda x: x).collect()
weather_conditions = sorted(cond for cond in weather_conditions if cond is not None)

very_good = [
    'Clear', 'Fair', 'Fair / Windy', 'Mostly Clear', 'Sunny', 'Partly Cloudy', 'Partly Cloudy / Windy',
    'Mostly Sunny', 'Scattered Clouds'
]
good = [
    'Mostly Cloudy', 'Cloudy', 'Overcast', 'Overcast / Windy', 'Cloudy / Windy',
    'Haze', 'Smoke'
]
moderate = [
    'Light Rain', 'Rain Showers', 'Rain', 'Showers', 'Drizzle', 'Sprinkles',
    'Light Rain / Windy', 'Rain / Windy', 'Light Drizzle',
    'Light Freezing Rain', 'Light Snow Grains', 'Light Freezing Drizzle'
]
bad = [
    'Heavy Rain', 'Heavy Rain / Windy', 'Rain Shower', 'Thunderstorms', 'Thunderstorm',
    'Thunderstorms and Rain', 'Light Thunderstorms and Rain',
    'Fog', 'Fog / Windy', 'Mist', 'Patches of Fog', 'Shallow Fog',
    'Blowing Dust', 'Widespread Dust', 'Volcanic Ash'
]
very_bad = [
    'Snow', 'Light Snow', 'Heavy Snow', 'Snow Showers', 'Blowing Snow',
    'Freezing Rain', 'Freezing Drizzle', 'Ice Pellets', 'Sleet', 'Hail',
    'Snow Grains', 'Small Hail', 'Freezing Fog', 'Heavy Freezing Rain',
    'Heavy Thunderstorms and Rain', 'Unknown', 'Duststorm', 'Sandstorm'
]

# Assign consecutive ranks group by group, in list order. Conditions found in
# the data but not listed in any group get the ranks after all listed ones.
ranking = {}
rank = 0
for group in [very_good, good, moderate, bad, very_bad]:
    for cond in group:
        ranking[cond] = rank
        rank += 1
remaining_conditions = [cond for cond in weather_conditions if cond not in ranking]
for cond in remaining_conditions:
    ranking[cond] = rank
    rank += 1

# Build a (condition -> rank) mapping DataFrame for Spark.
import pandas as pd
ranking_pd = pd.DataFrame(list(ranking.items()), columns=['Weather_Condition', 'Weather_Condition_rank'])
ranking_spark = spark.createDataFrame(ranking_pd)

# Join to assign the rank. The mapping has one row per condition, so it is
# broadcast to avoid shuffling df_ml.
from pyspark.sql.functions import coalesce

df_ml = df_ml.join(F.broadcast(ranking_spark), on='Weather_Condition', how='left')
# Rows without a match (null Weather_Condition) get the 'Unknown' rank instead
# of a null rank.
df_ml = df_ml.withColumn(
    'Weather_Condition_rank',
    coalesce(F.col('Weather_Condition_rank'), F.lit(ranking['Unknown'])),
)
df_ml = df_ml.drop('Weather_Condition')

In [None]:
# Encode the remaining categorical variables (StringIndexer + OneHotEncoder).
from pyspark.sql.functions import col

# Cast boolean flag columns to string so StringIndexer treats them as categories.
column_types = dict(df_ml.dtypes)
for c in ['Traffic_Signal', 'Crossing']:
    if c in df_ml.columns and column_types[c] == 'boolean':
        df_ml = df_ml.withColumn(c, col(c).cast('string'))

from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline

candidate_cats = ['State', 'hour', 'weekday', 'Sunrise_Sunset', 'Traffic_Signal', 'Crossing']
cat_vars = [name for name in candidate_cats if name in df_ml.columns]
# handleInvalid="keep" gives unseen/null labels their own extra index.
indexers = [StringIndexer(inputCol=name, outputCol=f"{name}_idx", handleInvalid="keep") for name in cat_vars]
encoders = [OneHotEncoder(inputCol=f"{name}_idx", outputCol=f"{name}_ohe") for name in cat_vars]
pipeline = Pipeline(stages=[*indexers, *encoders])
# NOTE(review): the encoders are fitted on all of df_ml before the train/test
# split further down; consider fitting on the training split only.
encoding_model = pipeline.fit(df_ml)
df_ml = encoding_model.transform(df_ml)

In [None]:
# Standardize the numeric features: assemble them into one vector, then scale.
from pyspark.ml.feature import StandardScaler, VectorAssembler
# 'Precipitation(in)' is not listed: it was dropped during null cleaning, so the
# existence filter below always discarded it.
num_vars = ['Distance(mi)', 'desc_length', 'desc_num_words']
num_vars = [name for name in num_vars if name in df_ml.columns]
# Nothing to assemble or scale when none of the columns are present.
if num_vars:
    assembler_num = VectorAssembler(inputCols=num_vars, outputCol="num_features")
    df_ml = assembler_num.transform(df_ml)
    # StandardScaler defaults (withStd=True, withMean=False): divide by the
    # standard deviation without centering.
    # NOTE(review): fitted on all of df_ml before the train/test split further
    # down; consider fitting on the training split only.
    scaler = StandardScaler(inputCol="num_features", outputCol="num_features_scaled")
    df_ml = scaler.fit(df_ml).transform(df_ml)

In [None]:
# Add the text TF-IDF features, keeping only the first n_features_tfidf entries.
from pyspark.ml.feature import VectorSlicer
n_features_tfidf = 200
tfidf_col = 'desc_tfidf'
if tfidf_col in df_ml.columns:
    # VectorSlicer rejects indices beyond the vector length, and with minDF=5
    # the TF-IDF vocabulary can contain fewer than 200 terms. Clamp the slice to
    # the actual vector length (read from one row).
    first_row = df_ml.select(tfidf_col).first()
    vec_len = len(first_row[0]) if first_row is not None and first_row[0] is not None else 0
    n_slice = min(n_features_tfidf, vec_len)
    if n_slice > 0:
        slicer = VectorSlicer(inputCol=tfidf_col, outputCol="desc_tfidf_sliced", indices=list(range(n_slice)))
        df_ml = slicer.transform(df_ml)

In [None]:
# Sanity check: evaluates to True when the TF-IDF column is present in df_ml.
tfidf_col in df_ml.columns

In [None]:
# Train/test split (randomSplit does not stratify, but it is the standard approach in Spark).
# Cache df_ml first. Its lazy lineage (CSV scan, imputations, join, indexers,
# scaler, TF-IDF) would otherwise be recomputed for each split's count() below
# and again for each Parquet write.
df_ml = df_ml.cache()
train_df, test_df = df_ml.randomSplit([0.8, 0.2], seed=42)
print('Train count:', train_df.count(), 'Test count:', test_df.count())

In [None]:
# Save the final ML datasets as Parquet, overwriting any previous output.
output_targets = (
    (train_df, '../data/train_ml.parquet'),
    (test_df, '../data/test_ml.parquet'),
)
for split_df, output_path in output_targets:
    split_df.write.mode('overwrite').parquet(output_path)