
# 🎓 **Applied Artificial Intelligence**

## 🤖 **Analysis of Large Volumes of Data (Group 10)**

### 🏛️ Tecnológico de Monterrey

#### 👨‍🏫 **Lead professor:** Dr. Iván Olmos Pineda
#### 👩‍🏫 **Teaching assistant:** Verónica Sandra Guzmán de Valle

### 📊 **Project | Big Data Database**

#### 📅 **May 4, 2025**

* 🧑‍💻 **A01795941:** Juan Carlos Pérez Nava




In [34]:
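import os
import sys

# Make the author's helper module `graficas` (in proyectos/librerias) importable;
# it presumably provides calcular_IQR, which is used further below.
module_path = os.path.abspath(os.path.join('proyectos', 'librerias'))
if module_path not in sys.path:
    sys.path.append(module_path)
from graficas import *

from pyspark.sql import SparkSession, DataFrame
# Note: importing `sum` and `round` from pyspark.sql.functions shadows the
# Python built-ins with the same names for the rest of the notebook.
from pyspark.sql.functions import col, sum, avg, lit, count, when, format_number, round, rand
from pyspark.ml.feature import StringIndexer, OneHotEncoder, QuantileDiscretizer, VectorAssembler, Imputer
from pyspark.ml.evaluation import RegressionEvaluator

import matplotlib.pyplot as plt
import seaborn as sns
import kagglehub
import pandas as pd
from functools import reduce

# Quiet Spark's logging. This needs an existing SparkSession, which is created
# in In [3], so the call is skipped on a fresh kernel.
if "spark" in globals():
    spark.sparkContext.setLogLevel("ERROR")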

In [2]:
path = kagglehub.dataset_download("sobhanmoosavi/us-accidents")
print("Path to dataset files:", path)

Path to dataset files: /home/jarcos/.cache/kagglehub/datasets/sobhanmoosavi/us-accidents/versions/13


In [3]:
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("CargarCSV")
    .config("spark.driver.memory", "40g")
    .config("spark.executor.memory", "20g")
    .getOrCreate()
)
df_accident = spark.read.option("header", True).option("inferSchema", True).csv(path)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/23 23:53:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
df_accident.show(5)

25/05/23 23:53:35 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+---+-------+--------+-------------------+-------------------+-----------------+------------------+-------+-------+------------+--------------------+--------------------+------------+----------+-----+----------+-------+----------+------------+-------------------+--------------+-------------+-----------+------------+--------------+--------------+---------------+-----------------+-----------------+-------+-----+--------+--------+--------+-------+-------+----------+-------+-----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
| ID| Source|Severity|         Start_Time|           End_Time|        Start_Lat|         Start_Lng|End_Lat|End_Lng|Distance(mi)|         Description|              Street|        City|    County|State|   Zipcode|Country|  Timezone|Airport_Code|  Weather_Timestamp|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pressure(in)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Precipitation(in)|Weather_Condition|Ameni

**Obtaining descriptive statistics for the categorical features**

# Partitioning

The dataset is partitioned by weather condition (`Weather_Condition`) and accident severity (`Severity`): every combination of these two features becomes its own subset, which is written to disk as a separate Parquet partition.
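As a plain-Python illustration of what the `groupBy("Weather_Condition", "Severity").agg(count("*"))` step below computes, this sketch groups records by the `(Weather_Condition, Severity)` key and reports each combination's frequency and percentage of the total. The function name `partition_stats` and the toy records are hypothetical; Spark performs the same aggregation in a distributed way.

```python
from collections import Counter


def partition_stats(records):
    """Count records per (Weather_Condition, Severity) key and return
    (key, frequency, percentage) tuples sorted by descending frequency."""
    counts = Counter((r["Weather_Condition"], r["Severity"]) for r in records)
    total = len(records)
    return sorted(
        ((key, n, n / total * 100) for key, n in counts.items()),
        key=lambda item: item[1],
        reverse=True,
    )


# Hypothetical toy records, not taken from the US Accidents data
toy = [
    {"Weather_Condition": "Fair", "Severity": 2},
    {"Weather_Condition": "Fair", "Severity": 2},
    {"Weather_Condition": "Fair", "Severity": 3},
    {"Weather_Condition": "Rain", "Severity": 2},
]
for key, freq, pct in partition_stats(toy):
    print(key, freq, f"{pct:.1f}%")
```

The percentages always add up to 100 %, which is the same invariant the `Proporción` column of the Spark output satisfies over all combinations.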

In [5]:
# Columns kept for the analysis
columnas_clave = [
    "ID", "Weather_Condition", "Precipitation(in)", "Severity", "City", "State",
    "Temperature(F)", "Humidity(%)", "Visibility(mi)", "Wind_Direction", "Wind_Speed(mph)",
    "Crossing", "Junction", "Railway", "Roundabout", "Stop", "Sunrise_Sunset",
    "Traffic_Calming", "Traffic_Signal"]

total = df_accident.count()

# Frequency of each (Weather_Condition, Severity) combination and its share
# of all records, expressed as a percentage
combinaciones_top = df_accident.groupBy("Weather_Condition", "Severity") \
    .agg(count("*").alias("Frecuencia")) \
    .withColumn("Proporción", col("Frecuencia") / total * 100) \
    .orderBy(col("Proporción").desc())

# Write the selected columns to Parquet, one directory per
# (Weather_Condition, Severity) combination
df_particionada = df_accident.select(columnas_clave)
df_particionada.write.mode("overwrite").partitionBy("Weather_Condition", "Severity").parquet("us_accidents_partitioned")

combinaciones_top.show(10, truncate=False)





+-----------------+--------+----------+------------------+
|Weather_Condition|Severity|Frecuencia|Proporción        |
+-----------------+--------+----------+------------------+
|Fair             |2       |2226576   |28.810332392473782|
|Mostly Cloudy    |2       |792735    |10.25743511523869 |
|Cloudy           |2       |692929    |8.966015449005317 |
|Partly Cloudy    |2       |548760    |7.1005696655734685|
|Clear            |2       |536971    |6.948028270815386 |
|Light Rain       |2       |270162    |3.495706870017238 |
|Overcast         |2       |248938    |3.22108319011686  |
|Clear            |3       |244956    |3.1695589018882835|
|Fair             |3       |240084    |3.1065186376367455|
|Mostly Cloudy    |3       |189229    |2.4484905919651614|
+-----------------+--------+----------+------------------+
only showing top 10 rows



                                                                                

In [6]:
max_reg = 2000

# Keep only the combinations with more than max_reg records
combinaciones_filtradas = combinaciones_top.filter(col("Frecuencia") > max_reg)

# Collect the (Weather_Condition, Severity) pairs to the driver as a list of Rows
particiones = combinaciones_filtradas.select("Weather_Condition", "Severity").collect()

# Report the result
print(f'✅ Identified \033[32m\033[1m{len(particiones)}\033[0m partitions with more than \033[36m{max_reg}\033[0m records.')



✅ Identified 77 partitions with more than 2000 records.


In [8]:
contador_total = 0
semilla = 42

# One random-sample DataFrame per (Weather_Condition, Severity) combination
lista_muestras = []

for particion in particiones:

    contador_total += 1
    weather = particion["Weather_Condition"]
    severity = particion["Severity"]

    print(f"Extracting partition #\033[32m\033[1m{contador_total:03}\033[0m | 🌦 Weather: \033[1;36m{weather}\033[0m | ⚠ Severity: \033[1;36m{severity}\033[0m")

    # Rows of this combination. When `weather` is None, `== weather` evaluates
    # to NULL and matches no rows, so combinations with a missing
    # Weather_Condition add nothing to the sample (hence 148,000 = 74 x 2,000
    # rows below for 77 combinations); col("Weather_Condition").eqNullSafe(weather)
    # would keep them.
    df_filtrada = df_particionada.filter((col("Weather_Condition") == weather) & (col("Severity") == severity))

    # Random sample of max_reg rows: seeded shuffle, then limit
    df_rand = df_filtrada.orderBy(rand(semilla)).limit(max_reg)

    lista_muestras.append(df_rand)

# Union all the per-combination samples into a single DataFrame
df_muestras = reduce(DataFrame.union, lista_muestras)

# Optional: persist the sample so later actions do not recompute it
#df_muestras = df_muestras.persist().coalesce(8)

# Count the records in the combined sample
contador_total = df_muestras.count()

print(f"Total records in the sample: \033[32m\033[1m{contador_total}\033[0m")

Extracting partition #001 | 🌦 Weather: Fair | ⚠ Severity: 2
Extracting partition #002 | 🌦 Weather: Mostly Cloudy | ⚠ Severity: 2
Extracting partition #003 | 🌦 Weather: Cloudy | ⚠ Severity: 2
Extracting partition #004 | 🌦 Weather: Partly Cloudy | ⚠ Severity: 2
Extracting partition #005 | 🌦 Weather: Clear | ⚠ Severity: 2
Extracting partition #006 | 🌦 Weather: Light Rain | ⚠ Severity: 2
Extracting partition #007 | 🌦 Weather: Overcast | ⚠ Severity: 2
Extracting partition #008 | 🌦 Weather: Clear | ⚠ Severity: 3
Extracting partition #009 | 🌦 Weather: Fair | ⚠ Severity: 3
Extracting partition #010 | 🌦 Weather: Mostly Cloudy | ⚠ Severity: 3
Extracting

Extracting partition #030 | 🌦 Weather: Light Snow | ⚠ Severity: 3
Extracting partition #031 | 🌦 Weather: Rain | ⚠ Severity: 3
Extracting partition #032 | 🌦 Weather: Light Drizzle | ⚠ Severity: 2
Extracting partition #033 | 🌦 Weather: Partly Cloudy | ⚠ Severity: 4
Extracting partition #034 | 🌦 Weather: Thunder in the Vicinity | ⚠ Severity: 2
Extracting partition #035 | 🌦 Weather: Cloudy / Windy | ⚠ Severity: 2
Extracting partition #036 | 🌦 Weather: Haze | ⚠ Severity: 3
Extracting partition #037 | 🌦 Weather: Overcast | ⚠ Severity: 4
Extracting partition #038 | 🌦 Weather: Mostly Cloudy / Windy | ⚠ Severity: 2
Extracting partition #039 | 🌦 Weather: T-Storm

Extracting partition #043 | 🌦 Weather: Mostly Cloudy | ⚠ Severity: 1
Extracting partition #044 | 🌦 Weather: Smoke | ⚠ Severity: 2
Extracting partition #045 | 🌦 Weather: Wintry Mix | ⚠ Severity: 2
Extracting partition #046 | 🌦 Weather: Fog | ⚠ Severity: 3
Extracting partition #047 | 🌦 Weather: Light Rain | ⚠ Severity: 4
Extracting partition #048 | 🌦 Weather: Partly Cloudy / Windy | ⚠ Severity: 2
Extracting partition #049 | 🌦 Weather: Cloudy | ⚠ Severity: 1
Extracting partition #050 | 🌦 Weather: Heavy T-Storm | ⚠ Severity: 2
Extracting partition #051 | 🌦 Weather: None | ⚠ Severity: 4
Extracting partition #052 | 🌦 Weather: Heavy Rain | ⚠ Severity: 3

Extracting partition #074 | 🌦 Weather: Light Freezing Rain | ⚠ Severity: 2
Extracting partition #075 | 🌦 Weather: Thunder in the Vicinity | ⚠ Severity: 3
Extracting partition #076 | 🌦 Weather: Cloudy / Windy | ⚠ Severity: 3
Extracting partition #077 | 🌦 Weather: Rain | ⚠ Severity: 4

Total records in the sample: 148000
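The cell above draws a seeded random sample of `max_reg` rows from every combination that has more than `max_reg` rows (`orderBy(rand(semilla)).limit(max_reg)`). A plain-Python analogue of this capped stratified sampling is sketched below: each stratum is shuffled with a seeded generator and its first `cap` rows are kept. The function name `capped_stratified_sample` and the toy strata are hypothetical, and Python's `random` will not select the same rows as Spark's `rand(seed)`.

```python
import random
from collections import Counter, defaultdict


def capped_stratified_sample(records, key_fn, cap, seed=42):
    """Take exactly `cap` records from every stratum that has MORE than `cap`
    records (smaller strata are skipped, like the Frecuencia > max_reg filter).
    Each stratum is shuffled with a seeded RNG before taking its first rows."""
    strata = defaultdict(list)
    for r in records:
        strata[key_fn(r)].append(r)
    rng = random.Random(seed)
    sample = []
    for key in sorted(strata, key=repr):  # fixed stratum order for reproducibility
        rows = strata[key]
        if len(rows) <= cap:
            continue
        rows = rows[:]  # copy before shuffling in place
        rng.shuffle(rows)
        sample.extend(rows[:cap])
    return sample


# Hypothetical strata: 5 x (Fair, 2), 3 x (Rain, 2), 1 x (Fog, 4)
toy = [("Fair", 2)] * 5 + [("Rain", 2)] * 3 + [("Fog", 4)]
sample = capped_stratified_sample(toy, key_fn=lambda r: r, cap=2)
print(Counter(sample))
```

With `cap=2`, the sample holds two rows each from `(Fair, 2)` and `(Rain, 2)`, while `(Fog, 4)` is skipped because it has too few rows. Capping every stratum at the same size is what flattens the heavy imbalance seen in the frequency table, although severities that appear with more weather conditions still end up with more rows.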


                                                                                

# Imputing missing values

In [9]:
def obten_nulos(particion):
    """Print the number of rows and columns of `particion` and a table with the
    null count and null percentage of every column that contains nulls."""
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

    total_rows = particion.count()

    print(f"📊 Total rows in the partition: {total_rows}")
    print(f"🗂️ Number of columns in the partition: {len(particion.columns)}")

    # Count the nulls of every column in a single pass (one Spark job)
    fila_nulos = particion.select(
        [sum(col(c).isNull().cast("int")).alias(c) for c in particion.columns]
    ).collect()[0]
    info_nulos = fila_nulos.asDict()

    # Keep only the columns that actually contain nulls
    cols_nulos = {c: {"count": v, "percent": (v / total_rows) * 100} for c, v in info_nulos.items() if v > 0}

    # Stop early when there are no nulls
    if not cols_nulos:
        print("✅ There are no null values in the partition.")
        return

    listado = [(key, value['count'], value['percent']) for key, value in cols_nulos.items()]

    # Schema of the summary DataFrame
    schema = StructType([
        StructField("Columna", StringType(), True),
        StructField("Total de nulos", IntegerType(), True),
        StructField("Porcentaje", DoubleType(), True)
    ])

    df_resumen_nulos = spark.createDataFrame(listado, schema=schema)

    # Round the percentage column to two decimals
    df_resumen_nulos = df_resumen_nulos.withColumn("Porcentaje", round(col("Porcentaje"), 2))

    df_resumen_nulos.orderBy(col("Total de nulos").desc()).show(truncate=False)


In [10]:
def imputacion_valores(particion):
    """Impute nulls: numeric columns with their mean (Imputer) and categorical
    columns with their mode; then re-check nulls and round the double columns."""
    print("✅ Imputation is performed with the following values:\n")

    # Modes (most frequent values) of the categorical columns.
    # Note: groupBy also counts the NULL group, so if NULL were the most
    # frequent "value" of a column, its mode would come out as None.
    def moda(columna):
        return particion.groupBy(columna).count().orderBy(col("count").desc()).first()[columna]

    moda_Weather = moda("Weather_Condition")
    moda_City = moda("City")
    moda_Sunset = moda("Sunrise_Sunset")
    moda_wind_dir = moda("Wind_Direction")

    # Means of the numeric columns, rounded to 2 decimals for display only,
    # computed in a single Spark job
    numericas = ["Temperature(F)", "Humidity(%)", "Visibility(mi)", "Precipitation(in)", "Wind_Speed(mph)"]
    medias = particion.select([round(avg(col(c)), 2).alias(c) for c in numericas]).collect()[0]

    print(f"🌡️ Average temperature: {medias['Temperature(F)']}")
    print(f"💧 Average humidity: {medias['Humidity(%)']}")
    print(f"👀 Average visibility: {medias['Visibility(mi)']}")
    print(f"🌧️ Average precipitation: {medias['Precipitation(in)']}")
    print(f"🌬️ Average wind speed: {medias['Wind_Speed(mph)']}")

    print(f"☁️ Most frequent weather condition: {moda_Weather}")
    print(f"🏙️ Most frequent city: {moda_City}")
    print(f"🌅 Most frequent period of day (Sunrise_Sunset): {moda_Sunset}")
    print(f"🌬️ Most frequent wind direction: {moda_wind_dir}")

    # Mean imputation of the numeric columns, in place (outputCols = inputCols)
    imputer_num = Imputer(inputCols=numericas, outputCols=numericas).setStrategy("mean")
    particion = imputer_num.fit(particion).transform(particion)

    # Mode imputation of the categorical columns with na.fill()
    particion = particion.na.fill({
        "Weather_Condition": moda_Weather,
        "City": moda_City,
        "Sunrise_Sunset": moda_Sunset,
        "Wind_Direction": moda_wind_dir
    })

    print("\n🔍 Checking the null values again to confirm the imputation.\n")

    obten_nulos(particion)

    # Round every double column to two decimals
    for col_name in [c for c, t in particion.dtypes if t == "double"]:
        particion = particion.withColumn(col_name, round(particion[col_name], 2))

    return particion

In [11]:
obten_nulos(df_muestras)

📊 Total rows in the partition: 148000
🗂️ Number of columns in the partition: 19
+-----------------+--------------+----------+
|Columna          |Total de nulos|Porcentaje|
+-----------------+--------------+----------+
|Precipitation(in)|33333         |22.52     |
|Wind_Speed(mph)  |8766          |5.92      |
|Humidity(%)      |1055          |0.71      |
|Wind_Direction   |760           |0.51      |
|Temperature(F)   |713           |0.48      |
|Sunrise_Sunset   |509           |0.34      |
|Visibility(mi)   |425           |0.29      |
|City             |7             |0.0       |
+-----------------+--------------+----------+
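The percentages in this table are simply `null_count / total_rows * 100`, rounded to two decimals. For example, Precipitation(in) has 33,333 nulls out of 148,000 rows, which gives 22.52 %. The plain-Python sketch below reproduces what `obten_nulos` reports. The function name `null_summary` and the toy rows are hypothetical, and `None` stands in for a Spark NULL.

```python
def null_summary(rows, columns):
    """Return (column, null_count, percent) for every column that has nulls,
    sorted by descending null count; percent is rounded to 2 decimals."""
    total = len(rows)
    summary = []
    for c in columns:
        n = sum(1 for r in rows if r.get(c) is None)
        if n > 0:
            summary.append((c, n, round(n / total * 100, 2)))
    return sorted(summary, key=lambda item: item[1], reverse=True)


# Hypothetical rows; None plays the role of a Spark NULL
rows = [
    {"City": "Houston", "Precipitation(in)": None},
    {"City": None, "Precipitation(in)": None},
    {"City": "Miami", "Precipitation(in)": 0.1},
    {"City": "Dallas", "Precipitation(in)": 0.0},
]
for line in null_summary(rows, ["City", "Precipitation(in)"]):
    print(line)

# Same arithmetic as the Precipitation(in) row of the table above
print(round(33333 / 148000 * 100, 2))  # 22.52
```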



In [12]:
muestra_imp = imputacion_valores(df_muestras)

✅ Imputation is performed with the following values:

🌡️ Average temperature: 57.58
💧 Average humidity: 75.18
👀 Average visibility: 7.01
🌧️ Average precipitation: 0.04
🌬️ Average wind speed: 10.17
☁️ Most frequent weather condition: Fair
🏙️ Most frequent city: Houston
🌅 Most frequent period of day (Sunrise_Sunset): Day
🌬️ Most frequent wind direction: CALM

🔍 Checking the null values again to confirm the imputation.

📊 Total rows in the partition: 148000
🗂️ Number of columns in the partition: 19
✅ There are no null values in the partition.
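The imputation strategy used above fills numeric columns with their mean (`Imputer` with strategy `"mean"`, which ignores missing values when computing the statistic) and categorical columns with their mode. The sketch below shows the same idea on plain Python dictionaries. The function name `impute_mean_mode` and the rows are hypothetical. One deliberate difference: here the mode is computed over non-null values only, whereas the notebook's `groupBy` also counts the NULL group (harmless in this run, since Fair, Houston, Day and CALM are all non-null).

```python
from collections import Counter
from statistics import mean


def impute_mean_mode(rows, numeric_cols, categorical_cols):
    """Return new rows where None is replaced by the column mean (numeric
    columns) or the most frequent value (categorical columns). Both statistics
    are computed over the non-null values only."""
    fills = {}
    for c in numeric_cols:
        fills[c] = mean(r[c] for r in rows if r[c] is not None)
    for c in categorical_cols:
        fills[c] = Counter(r[c] for r in rows if r[c] is not None).most_common(1)[0][0]
    return [
        {k: fills[k] if (v is None and k in fills) else v for k, v in r.items()}
        for r in rows
    ]


# Hypothetical rows with missing values
rows = [
    {"Temperature(F)": 50.0, "Wind_Direction": "CALM"},
    {"Temperature(F)": None, "Wind_Direction": "CALM"},
    {"Temperature(F)": 70.0, "Wind_Direction": None},
    {"Temperature(F)": 60.0, "Wind_Direction": "S"},
]
filled = impute_mean_mode(rows, ["Temperature(F)"], ["Wind_Direction"])
print(filled)
```

The missing temperature becomes 60.0 (the mean of 50, 70 and 60) and the missing wind direction becomes CALM; the input rows are left unchanged.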


# Preparing the data

In [13]:
muestra_imp.groupBy("Severity").count().show()

+--------+-----+
|Severity|count|
+--------+-----+
|       2|74000|
|       3|42000|
|       4|22000|
|       1|10000|
+--------+-----+



In [14]:
muestra_imp.show()

+---------+-----------------+-----------------+--------+-------------+-----+--------------+-----------+--------------+--------------+---------------+--------+--------+-------+----------+-----+--------------+---------------+--------------+
|       ID|Weather_Condition|Precipitation(in)|Severity|         City|State|Temperature(F)|Humidity(%)|Visibility(mi)|Wind_Direction|Wind_Speed(mph)|Crossing|Junction|Railway|Roundabout| Stop|Sunrise_Sunset|Traffic_Calming|Traffic_Signal|
+---------+-----------------+-----------------+--------+-------------+-----+--------------+-----------+--------------+--------------+---------------+--------+--------+-------+----------+-----+--------------+---------------+--------------+
|A-5653328|             Fair|              0.0|       2|     Richmond|   CA|          52.0|       71.0|          10.0|             S|           15.0|    true|   false|  false|     false|false|         Night|          false|          true|
|A-4972285|             Fair|              0

In [15]:
categoricas = ["Weather_Condition", "City", "State", "Sunrise_Sunset","Wind_Direction"]
binarias = ["Crossing", "Junction", "Railway", "Roundabout", "Stop", "Traffic_Calming", "Traffic_Signal"]

In [35]:
# ✅ Work on a separately named DataFrame. Spark DataFrames are immutable, so the
# transformations below never modify `muestra_imp`; alias() only assigns a name.
Transf_muestra = muestra_imp.alias("copia_muestra")

# Convert the boolean columns to 0/1 integers
for columna in binarias:
    Transf_muestra = Transf_muestra.withColumn(columna + "_num", col(columna).cast("int"))

# Apply StringIndexer to the categorical columns
indexers = [StringIndexer(inputCol=c, outputCol=c + "_Index").fit(Transf_muestra) for c in categoricas]
for indexer in indexers:
    Transf_muestra = indexer.transform(Transf_muestra)

# One-Hot Encoding of the categorical columns (disabled)
#codificadores = [OneHotEncoder(inputCol=c + "_Index", outputCol=c + "_OHE").fit(Transf_muestra) for c in categoricas]
#for codificador in codificadores:
#    Transf_muestra = codificador.transform(Transf_muestra)

# 🔥 Drop the original columns that the model will no longer use
Transf_muestra = Transf_muestra.drop(*categoricas).drop(*binarias)

Transf_muestra.show()




+---------+-----------------+--------+--------------+-----------+--------------+---------------+------------+------------+-----------+--------------+--------+-------------------+------------------+-----------------------+----------+-----------+--------------------+--------------------+
|       ID|Precipitation(in)|Severity|Temperature(F)|Humidity(%)|Visibility(mi)|Wind_Speed(mph)|Crossing_num|Junction_num|Railway_num|Roundabout_num|Stop_num|Traffic_Calming_num|Traffic_Signal_num|Weather_Condition_Index|City_Index|State_Index|Sunrise_Sunset_Index|Wind_Direction_Index|
+---------+-----------------+--------+--------------+-----------+--------------+---------------+------------+------------+-----------+--------------+--------+-------------------+------------------+-----------------------+----------+-----------+--------------------+--------------------+
|A-5653328|              0.0|       2|          52.0|       71.0|          10.0|           15.0|           1|           0|          0|     
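By default, `StringIndexer` orders labels by descending frequency (`stringOrderType="frequencyDesc"`), so the most frequent value receives index 0.0; recent Spark documentation states that ties are then broken alphabetically. The plain-Python sketch below mimics that mapping. The function name `frequency_desc_index` and the values are hypothetical. Keep in mind that these indices are nominal codes: used directly as numeric features, they impose an order the categories do not have, which is what the disabled `OneHotEncoder` step would avoid.

```python
from collections import Counter


def frequency_desc_index(values):
    """Map each distinct label to a float index, most frequent label first
    (0.0); labels with equal frequency are ordered alphabetically."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


# Hypothetical Sunrise_Sunset values
mapping = frequency_desc_index(["Day", "Night", "Day", "Day", "Night"])
print(mapping)  # {'Day': 0.0, 'Night': 1.0}
```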

In [36]:
from pyspark.sql.functions import when, col, mean

calcular_IQR(Transf_muestra, ['Temperature(F)', 'Humidity(%)', 'Visibility(mi)', 'Wind_Speed(mph)'])

# Column means used as replacement values for the outliers
mean_values = Transf_muestra.select(
    mean("Temperature(F)").alias("mean_temp"),
    mean("Humidity(%)").alias("mean_humidity"),
    mean("Wind_Speed(mph)").alias("mean_wind_speed")
).collect()[0]

# Replace values outside fixed limits with the column mean.
# Note: these limits are hard-coded and differ from the IQR limits that
# calcular_IQR reports for this sample (e.g. -5.6 / 121.6 for Temperature(F));
# Transf_muestra.approxQuantile(column, [0.25, 0.75], 0.0) returns the
# quartiles needed to derive them from the data instead.
Transf_muestra_iqr = (
    Transf_muestra
    .withColumn("Temperature(F)",
                when((col("Temperature(F)") < 14.5) | (col("Temperature(F)") > 114.5), mean_values["mean_temp"])
                .otherwise(col("Temperature(F)")))
    .withColumn("Humidity(%)",
                when((col("Humidity(%)") < -6.0) | (col("Humidity(%)") > 138.0), mean_values["mean_humidity"])
                .otherwise(col("Humidity(%)")))
    .withColumn("Wind_Speed(mph)",
                when((col("Wind_Speed(mph)") < -2.5) | (col("Wind_Speed(mph)") > 17.5), mean_values["mean_wind_speed"])
                .otherwise(col("Wind_Speed(mph)")))
)

In [37]:
calcular_IQR(Transf_muestra,['Temperature(F)','Humidity(%)','Visibility(mi)','Wind_Speed(mph)'])


| Column          | IQR  | Lower limit | Upper limit |
|-----------------|------|-------------|-------------|
| Temperature(F)  | 31.8 | -5.6        | 121.6       |
| Humidity(%)     | 30.0 | 17.0        | 137.0       |
| Visibility(mi)  | 7.0  | -7.5        | 20.5        |
| Wind_Speed(mph) | 8.0  | -7.0        | 25.0        |


In [38]:
calcular_IQR(Transf_muestra_iqr,['Temperature(F)','Humidity(%)','Visibility(mi)','Wind_Speed(mph)'])

| Column          | IQR  | Lower limit | Upper limit |
|-----------------|------|-------------|-------------|
| Temperature(F)  | 29.9 | -0.85       | 118.75      |
| Humidity(%)     | 30.0 | 17.0        | 137.0       |
| Visibility(mi)  | 7.0  | -7.5        | 20.5        |
| Wind_Speed(mph) | 5.17 | -2.755      | 17.925      |
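The limits in both tables are consistent with the usual Tukey fences, Q1 - 1.5·IQR and Q3 + 1.5·IQR: in every row, upper limit minus lower limit equals 4 × IQR (for Humidity(%), 137.0 - 17.0 = 120 = 4 × 30.0, which implies Q1 = 62 and Q3 = 92). The source of `calcular_IQR` is not shown, so the sketch below is only an approximation of it. It uses `statistics.quantiles` with the inclusive method, and that quartile method is an assumption that may differ from the helper's. It derives the fences from the data and replaces out-of-range values with the column mean, which is the treatment In [36] applies with hard-coded limits. The function names and the toy column are hypothetical.

```python
from statistics import quantiles


def iqr_fences(values, k=1.5):
    """Return (IQR, lower fence, upper fence) with fences Q1 - k*IQR and Q3 + k*IQR."""
    q1, _, q3 = quantiles(values, n=4, method="inclusive")
    iqr = q3 - q1
    return iqr, q1 - k * iqr, q3 + k * iqr


def replace_outliers_with_mean(values, k=1.5):
    """Replace values outside the fences with the mean of the original values."""
    _, low, high = iqr_fences(values, k)
    m = sum(values) / len(values)
    return [m if (v < low or v > high) else v for v in values]


# Hypothetical column with one extreme value
vals = [1, 2, 3, 4, 5, 6, 7, 8, 100]
print(iqr_fences(vals))  # (4.0, -3.0, 13.0)
print(replace_outliers_with_mean(vals)[-1])
```

Because the mean is computed before replacement, it is still pulled upward by the outlier it replaces; the column median is a common, more robust alternative.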


In [39]:
atributos = [ 'Precipitation(in)', 'Temperature(F)', 'Humidity(%)', 'Visibility(mi)',
             'Crossing_num', 'Junction_num', 'Railway_num','Roundabout_num','Stop_num',
              'Traffic_Calming_num', 'Traffic_Signal_num','Weather_Condition_Index',
              'Sunrise_Sunset_Index','Wind_Direction_Index']

In [40]:
assembler = VectorAssembler(inputCols=atributos, outputCol = 'Caracteristicas')
df_vec = assembler.transform(Transf_muestra_iqr)
df_vec.select('Caracteristicas','Severity').show(5,truncate = False)

+---------------------------------------------------------------+--------+
|Caracteristicas                                                |Severity|
+---------------------------------------------------------------+--------+
|(14,[1,2,3,4,10,11,12,13],[52.0,71.0,10.0,1.0,1.0,1.0,1.0,1.0])|2       |
|(14,[1,2,3,11,13],[64.0,42.0,10.0,1.0,11.0])                   |2       |
|(14,[1,2,3,11,13],[68.0,40.0,10.0,1.0,6.0])                    |2       |
|(14,[0,1,2,3,4,11,13],[0.04,81.0,42.0,10.0,1.0,1.0,10.0])      |2       |
|(14,[1,2,3,11,13],[68.0,21.0,10.0,1.0,9.0])                    |2       |
+---------------------------------------------------------------+--------+
only showing top 5 rows
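`show()` prints sparse vectors as `(size,[indices],[values])`: only the non-zero positions are listed, and every other feature is 0. The sketch below expands the first row above into named features using the `atributos` list. The helper name `sparse_to_dense` is hypothetical. Note also that `Wind_Speed(mph)` is not in `atributos`, so the outlier treatment applied to wind speed does not reach the model.

```python
def sparse_to_dense(size, indices, values):
    """Expand the (size, [indices], [values]) notation that show() prints for
    sparse vectors; positions that are not listed hold 0.0."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense


atributos = ['Precipitation(in)', 'Temperature(F)', 'Humidity(%)', 'Visibility(mi)',
             'Crossing_num', 'Junction_num', 'Railway_num', 'Roundabout_num', 'Stop_num',
             'Traffic_Calming_num', 'Traffic_Signal_num', 'Weather_Condition_Index',
             'Sunrise_Sunset_Index', 'Wind_Direction_Index']

# First row of the output above
row = sparse_to_dense(14, [1, 2, 3, 4, 10, 11, 12, 13],
                      [52.0, 71.0, 10.0, 1.0, 1.0, 1.0, 1.0, 1.0])
for name, value in zip(atributos, row):
    print(f"{name:>24}: {value}")
```

The row decodes to Temperature(F) = 52, Humidity(%) = 71, Visibility(mi) = 10, Crossing_num = Traffic_Signal_num = 1, and index 1.0 for the three categorical columns; all other features, including Precipitation(in), are 0.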



# Creating the training and test sets

In [41]:
#spark.conf.set("spark.sql.shuffle.partitions", "200")
# 80/20 random split of the vectorized sample (seeded for reproducibility)
train, test = df_vec.randomSplit([0.8, 0.2], seed=10)

train_size = train.count()
test_size = test.count()
total_size = train_size + test_size

train_pct = (train_size / total_size) * 100
test_pct = (test_size / total_size) * 100

print(f"""There are {train_size} instances in the train set ({train_pct:.2f}%),
and {test_size} in the test set ({test_pct:.2f}%).""")

There are 118393 instances in the train set (80.00%),
and 29607 in the test set (20.00%).
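`randomSplit` does not cut the data at an exact 80/20 boundary. Roughly speaking, it assigns each row independently at random using the seed, so the split sizes are only approximately proportional to the weights: here 118,393 / 148,000 ≈ 79.995 %, which prints as 80.00 %. The sketch below illustrates that Bernoulli-style assignment in plain Python. The function name `bernoulli_split` is hypothetical, and it does not reproduce Spark's exact row assignment.

```python
import random


def bernoulli_split(rows, train_fraction=0.8, seed=10):
    """Send each row to train with probability `train_fraction`, otherwise to
    test, so the split sizes are only approximately proportional."""
    rng = random.Random(seed)
    train, test = [], []
    for r in rows:
        (train if rng.random() < train_fraction else test).append(r)
    return train, test


train_rows, test_rows = bernoulli_split(range(148000))
print(len(train_rows), len(test_rows))
print(f"{118393 / 148000 * 100:.3f}%")  # the notebook's train share: 79.995%
```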


In [42]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol='Caracteristicas', labelCol='Severity', maxIter=10, regParam=0.03, elasticNetParam=0.08)
lr_model = lr.fit(train)
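According to the Spark MLlib documentation, `LinearRegression` with elastic-net regularization minimizes the objective below. Here $\lambda$ is `regParam` (0.03) and $\alpha$ is `elasticNetParam` (0.08), so the penalty is mostly L2 with a small L1 share. By default, Spark standardizes the features before applying the penalty and does not penalize the intercept $b$. The L1 term can drive coefficients exactly to zero, which is consistent with the 0.0 coefficients printed further below at positions 6 (`Railway_num`) and 9 (`Traffic_Calming_num`).

$$
\min_{w,\,b}\ \frac{1}{2n}\sum_{i=1}^{n}\left(w^{\top}x_i + b - y_i\right)^2
+ \lambda\left(\alpha\,\lVert w\rVert_1 + \frac{1-\alpha}{2}\,\lVert w\rVert_2^2\right)
$$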

In [43]:
y_pred = lr_model.transform(test)
y_pred.select('Caracteristicas','Severity','prediction').show(truncate = False)

+---------------------------------------------------------+--------+------------------+
|Caracteristicas                                          |Severity|prediction        |
+---------------------------------------------------------+--------+------------------+
|(14,[1,2,3,11,13],[62.0,34.0,10.0,1.0,14.0])             |2       |2.819870107813826 |
|(14,[1,2,3,11,12,13],[75.0,90.0,10.0,1.0,1.0,10.0])      |2       |2.8676742699880804|
|(14,[1,2,3,11,13],[89.0,59.0,10.0,1.0,1.0])              |2       |2.658066945416904 |
|(14,[1,2,3,11,13],[91.0,34.0,10.0,1.0,10.0])             |2       |2.7201346238856607|
|(14,[1,2,3,11,13],[85.0,34.0,10.0,1.0,9.0])              |2       |2.7278410864907965|
|(14,[1,2,3,11,13],[83.0,31.0,10.0,1.0,5.0])              |2       |2.7048710175259   |
|(14,[1,2,3,11],[67.0,90.0,10.0,1.0])                     |2       |2.7010615185452793|
|(14,[1,2,3,11,13],[88.0,8.0,10.0,1.0,17.0])              |2       |2.780504394903811 |
|(14,[1,2,3,10,11,12,13],[47.0,6

In [44]:
print ("The coefficient of the model is : ", lr_model.coefficients)
print ("The Intercept of the model is : ", lr_model.intercept)

The coefficient of the model is :  [0.1853748773460173,-0.0024634214028058142,-0.0001331171587630118,-0.0008318336420232537,-0.2204361732925904,0.06846844196034885,0.0,0.40097286456564646,-0.13201340819038784,0.0,-0.29161771987657376,-0.022379348229455857,0.11557946454825536,0.0070740658116992365]
The Intercept of the model is :  2.9087889814716283
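A fitted linear model predicts $\hat{y} = b + \sum_j w_j x_j$, and with a sparse vector only the listed (non-zero) entries contribute. The sketch below checks that the coefficients and intercept printed above reproduce the first prediction of the In [43] output. The helper name `predict_sparse` is hypothetical. Since the target `Severity` is an ordinal class label (1 to 4) modelled by regression, the predictions are continuous values such as 2.82 rather than classes.

```python
coefficients = [0.1853748773460173, -0.0024634214028058142, -0.0001331171587630118,
                -0.0008318336420232537, -0.2204361732925904, 0.06846844196034885, 0.0,
                0.40097286456564646, -0.13201340819038784, 0.0, -0.29161771987657376,
                -0.022379348229455857, 0.11557946454825536, 0.0070740658116992365]
intercept = 2.9087889814716283


def predict_sparse(size, indices, values, coef, b):
    """Linear-model prediction b + w . x for a sparse feature vector."""
    if size != len(coef):
        raise ValueError("vector size does not match the number of coefficients")
    return b + sum(coef[i] * v for i, v in zip(indices, values))


# First test row of In [43]: (14,[1,2,3,11,13],[62.0,34.0,10.0,1.0,14.0])
pred = predict_sparse(14, [1, 2, 3, 11, 13], [62.0, 34.0, 10.0, 1.0, 14.0],
                      coefficients, intercept)
print(pred)  # matches the 2.819870107813826 reported by lr_model.transform
```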


In [45]:
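# NOTE: labelCol is "Stop_num", not the model's target "Severity", so the
# metrics below score the Severity predictions against the 0/1 Stop flag;
# that mismatch explains the large errors and the negative r2.
# labelCol="Severity" evaluates the regression itself.

# Root Mean Square Error
eval_lr = RegressionEvaluator(labelCol="Stop_num", predictionCol="prediction", metricName="rmse")
rmse_lr = eval_lr.evaluate(y_pred)
print("RMSE: %.3f" % rmse_lr)

# Mean Squared Error
mse = eval_lr.evaluate(y_pred, {eval_lr.metricName: "mse"})
print("MSE: %.3f" % mse)

# Mean Absolute Error
mae = eval_lr.evaluate(y_pred, {eval_lr.metricName: "mae"})
print("MAE: %.3f" % mae)

# r2: coefficient of determination
r2 = eval_lr.evaluate(y_pred, {eval_lr.metricName: "r2"})
print("r2: %.3f" % r2)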

RMSE: 2.515
MSE: 6.324
MAE: 2.493
r2: -299.396
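The four metrics follow directly from the residuals $e_i = \hat{y}_i - y_i$: MSE is their mean square, RMSE its square root, MAE their mean absolute value, and $R^2 = 1 - SS_{res}/SS_{tot}$ compares the model with always predicting the label mean. When predictions lie far from a label with small variance they were not fitted to, such as the 0/1 `Stop_num` column used above, $SS_{res}$ dwarfs $SS_{tot}$ and $R^2$ falls far below zero. The function name `regression_metrics` and the toy values are hypothetical.

```python
import math


def regression_metrics(y_true, y_pred):
    """RMSE, MSE, MAE and R^2 = 1 - SS_res / SS_tot for paired lists."""
    n = len(y_true)
    errors = [p - t for t, p in zip(y_true, y_pred)]
    ss_res = sum(e * e for e in errors)
    mean_y = sum(y_true) / n
    ss_tot = sum((t - mean_y) ** 2 for t in y_true)
    mse = ss_res / n
    return {
        "rmse": math.sqrt(mse),
        "mse": mse,
        "mae": sum(abs(e) for e in errors) / n,
        "r2": 1 - ss_res / ss_tot,
    }


# Hypothetical predictions around 2.7 scored against a 0/1 label
m = regression_metrics([0, 0, 1, 0], [2.7, 2.8, 2.7, 2.6])
print(m)
```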


In [46]:
from pyspark.ml.clustering import KMeans, GaussianMixture
from pyspark.ml.evaluation import ClusteringEvaluator

gmm = GaussianMixture(k=4, seed=1, featuresCol="Caracteristicas", predictionCol="Prediccion")
model = gmm.fit(df_vec)



In [47]:
print("Cluster weights:", model.weights)
print("Cluster means:", [g.mean for g in model.gaussians])
print("Cluster covariances:", [g.cov for g in model.gaussians])



Cluster weights: [0.8610860884135962, 0.02827606923030971, 0.10446987656397766, 0.006167965792116337]
Cluster means: [DenseVector([0.0219, 58.4098, 74.7803, 7.0779, 0.0941, 0.0001, 0.0002, 0.0, 0.0001, 0.0, 0.1442, 12.6512, 0.2936, 8.9464]), DenseVector([0.0306, 59.9939, 72.7909, 7.4928, 0.3916, 0.0039, 0.2445, 0.0003, 0.7561, 0.0003, 0.1331, 12.3939, 0.2853, 8.8585]), DenseVector([0.131, 59.2213, 78.9289, 6.1767, 0.0009, 0.7682, 0.0015, 0.0001, 0.0006, 0.0001, 0.0002, 13.1248, 0.3117, 9.391]), DenseVector([0.4484, 64.4302, 78.3936, 9.2562, 0.4383, 0.3645, 0.0255, 0.0014, 0.103, 0.1125, 0.5326, 16.351, 0.242, 9.7284])]
Cluster covariances: [DenseMatrix(14, 14, [0.0026, 0.0026, 0.1537, -0.0294, 0.0, 0.0, 0.0, 0.0, ..., 0.0001, -0.0, -0.0, 0.0, 0.064, -1.6459, -0.214, 41.9193], 0), DenseMatrix(14, 14, [0.0415, 0.0652, 0.2951, -0.0469, 0.0014, 0.0, -0.0012, 0.0, ..., 0.0416, -0.0006, -0.043, 0.0006, 0.1427, -0.1209, -0.3413, 43.2689], 0), DenseMatrix(14, 14, [0.0582
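A Gaussian mixture assigns each point to the component with the highest posterior probability, $p(k \mid x) \propto \pi_k\,\mathcal{N}(x;\mu_k,\Sigma_k)$, where the $\pi_k$ are the weights printed above. Those four weights sum to 1 (0.861 + 0.028 + 0.104 + 0.006), and the first component holds about 86 % of the mixture. Spark fits full 14 × 14 covariance matrices, and `transform` stores the posteriors in the `probability` column and the argmax in `Prediccion`. The one-dimensional sketch below shows the same computation in miniature. The function names and the two-component mixture are hypothetical.

```python
import math


def normal_pdf(x, mu, var):
    """Density of a 1-D normal distribution N(mu, var) at x."""
    return math.exp(-((x - mu) ** 2) / (2 * var)) / math.sqrt(2 * math.pi * var)


def gmm_assign(x, weights, means, variances):
    """Posterior p(k | x), proportional to weight_k * N(x; mean_k, var_k),
    and the index of the most probable component."""
    joint = [w * normal_pdf(x, m, v) for w, m, v in zip(weights, means, variances)]
    total = sum(joint)
    posterior = [j / total for j in joint]
    best = max(range(len(posterior)), key=posterior.__getitem__)
    return best, posterior


# Hypothetical two-component mixture
best, post = gmm_assign(1.0, weights=[0.5, 0.5], means=[0.0, 10.0], variances=[1.0, 1.0])
print(best, [round(p, 4) for p in post])
```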

In [48]:
df_result = model.transform(df_vec)
df_result.select('Caracteristicas','Severity','Prediccion').show(truncate = False)

+----------------------------------------------------------------+--------+----------+
|Caracteristicas                                                 |Severity|Prediccion|
+----------------------------------------------------------------+--------+----------+
|(14,[1,2,3,4,10,11,12,13],[52.0,71.0,10.0,1.0,1.0,1.0,1.0,1.0]) |2       |0         |
|(14,[1,2,3,11,13],[64.0,42.0,10.0,1.0,11.0])                    |2       |0         |
|(14,[1,2,3,11,13],[68.0,40.0,10.0,1.0,6.0])                     |2       |0         |
|(14,[0,1,2,3,4,11,13],[0.04,81.0,42.0,10.0,1.0,1.0,10.0])       |2       |0         |
|(14,[1,2,3,11,13],[68.0,21.0,10.0,1.0,9.0])                     |2       |0         |
|(14,[1,2,3,11],[96.0,15.0,10.0,1.0])                            |2       |0         |
|(14,[1,2,3,4,10,11,12,13],[63.0,58.0,10.0,1.0,1.0,1.0,1.0,15.0])|2       |0         |
|(14,[1,2,3,11,13],[74.0,82.0,10.0,1.0,14.0])                    |2       |0         |
|(14,[1,2,3,11,13],[79.0,94.0,10.0,1.0,13.0
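GMM cluster ids (0 to 3) are arbitrary labels, while `Severity` takes the values 1 to 4, so the direct `Severity == Prediccion` comparison in the next cell counts coincidences between two unrelated numberings. A common alternative, often called cluster purity, first maps each cluster to its most frequent severity and then measures how many rows agree with that mapping. The sketch below is a plain-Python version of that idea. The function name `majority_label_mapping` and the toy data are hypothetical.

```python
from collections import Counter, defaultdict


def majority_label_mapping(clusters, labels):
    """Map every cluster id to its most frequent true label and return the
    mapping plus the fraction of rows whose label equals their cluster's label."""
    by_cluster = defaultdict(Counter)
    for c, y in zip(clusters, labels):
        by_cluster[c][y] += 1
    mapping = {c: counts.most_common(1)[0][0] for c, counts in by_cluster.items()}
    agreement = sum(by_cluster[c][mapping[c]] for c in by_cluster) / len(labels)
    return mapping, agreement


# Hypothetical cluster ids and severities
mapping, agreement = majority_label_mapping([0, 0, 0, 1, 1, 2], [2, 2, 3, 4, 4, 1])
print(mapping, round(agreement, 3))  # {0: 2, 1: 4, 2: 1} 0.833
```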

In [49]:
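# Flag the rows whose cluster id equals their Severity value.
# Caution: GMM cluster ids (0-3) are arbitrary labels, while Severity takes the
# values 1-4, so this equality does not measure how well clusters match severity.
df_igual = df_result.withColumn("es_igual", when(col("Severity") == col("Prediccion"), 1).otherwise(0))
conteo_iguales = df_igual.select(count(when(col("es_igual") == 1, True)).alias("total_iguales"))

# Show the results
df_igual.show()
conteo_iguales.show()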


+---------+-----------------+--------+-----------------+-----------+--------------+---------------+------------+------------+-----------+--------------+--------+-------------------+------------------+-----------------------+----------+-----------+--------------------+--------------------+--------------------+--------------------+----------+--------+
|       ID|Precipitation(in)|Severity|   Temperature(F)|Humidity(%)|Visibility(mi)|Wind_Speed(mph)|Crossing_num|Junction_num|Railway_num|Roundabout_num|Stop_num|Traffic_Calming_num|Traffic_Signal_num|Weather_Condition_Index|City_Index|State_Index|Sunrise_Sunset_Index|Wind_Direction_Index|     Caracteristicas|         probability|Prediccion|es_igual|
+---------+-----------------+--------+-----------------+-----------+--------------+---------------+------------+------------+-----------+--------------+--------+-------------------+------------------+-----------------------+----------+-----------+--------------------+--------------------+-------