In [1]:
# importamos librerias de pyspark para realizar el preprocesado de los datos
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, count, isnan, isnull, mean, round
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.classification import LinearSVC
from pyspark.ml.classification import OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidatorModel
from pyspark.ml.feature import IndexToString
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.regression import LinearRegression, RandomForestRegressor, GBTRegressor

# importamos funciones auxiliares
# from filter_datasets import *
# from process_2008_data import *
from models import *


In [2]:
# se crea la sesion de spark
spark = SparkSession.builder.appName("proyecto").getOrCreate()


In [3]:
# file_configs = [
#     {"input": "airports.csv", "output": "filtered_airports.csv", "columns": ["iata"]},
#     {"input": "carriers.csv", "output": "filtered_carriers.csv", "columns": ["Code"]},
#     {
#         "input": "plane-data.csv",
#         "output": "filtered_plane_data.csv",
#         "columns": ["tailnum"],
#     },
# ]

# # Process each file
# for config in file_configs:
#     filter_columns(config["input"], config["output"], config["columns"])

In [4]:
# Input and output file paths
# input_2008_file = "2008.csv"
# input_plane_file = "plane-data.csv"
# output_file = "processed_2008.csv"
# #original_col = [Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay]
# # Run the function
# process_2008_data(input_2008_file, input_plane_file, output_file)

In [5]:
# EDA
# se carga el dataset
df = spark.read.csv("processed_2008.csv", header=True, inferSchema=True)
# Mostrar esquema de las columnas
df.printSchema()

# Mostrar los primeros registros
df.show(5)

# 1. Resumen estadístico de las columnas numéricas
numerical_cols = [
    "Month",
    "DayofMonth",
    "DayOfWeek",
    "DepTime",
    "CRSDepTime",
    "CRSArrTime",
    "CRSElapsedTime",
    "ArrDelay",
    "DepDelay",
]
df.select(numerical_cols).describe().show()

# 2. Inspección de valores nulos o faltantes
missing_data = df.select(
    [count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]
)
print("Cantidad de valores nulos por columna:")
missing_data.show()

# 3. Inspección de columnas categóricas
categorical_cols = ["UniqueCarrier", "TailNum", "Origin", "Dest"]
for col_name in categorical_cols:
    print(f"Distribución de valores únicos para la columna {col_name}:")
    df.groupBy(col_name).count().orderBy("count", ascending=False).show(5)

# 4. Inspección específica de la variable objetivo (ArrDelay)
print("Estadísticas descriptivas de la variable objetivo (ArrDelay):")
df.select("ArrDelay").describe().show()

# 5. Identificar correlaciones básicas (opcional, solo entre columnas numéricas)
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=numerical_cols, outputCol="features")
vector_df = assembler.transform(df).select("features")
correlation_matrix = Correlation.corr(vector_df, "features").head()[0]
print("Matriz de correlación:")
print(correlation_matrix)

root
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: double (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: double (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- IssueDate: string (nullable = true)
 |-- CRSElapsedTime: double (nullable = true)
 |-- ArrDelay: double (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Cancelled: integer (nullable = true)

+-----+----------+---------+-------+----------+-------+----------+-------------+-------+----------+--------------+--------+--------+------+----+---------+
|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|TailNum| IssueDate|CRSElapsedTime|ArrDelay|DepDelay|Origin|Dest|Cancelled|
+-----+----------+---------+--

In [6]:
# elimeinamos los nulos en la columna ArrDelay
df = df.filter(df["ArrDelay"].isNotNull())

In [7]:
missing_data = df.select(
    [count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]
)
print("Cantidad de valores nulos por columna:")
missing_data.show()


Cantidad de valores nulos por columna:
+-----+----------+---------+-------+----------+-------+----------+-------------+-------+---------+--------------+--------+--------+------+----+---------+
|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|TailNum|IssueDate|CRSElapsedTime|ArrDelay|DepDelay|Origin|Dest|Cancelled|
+-----+----------+---------+-------+----------+-------+----------+-------------+-------+---------+--------------+--------+--------+------+----+---------+
|    0|         0|        0|      0|         0|      0|         0|            0|      0|   192053|             0|       0|       0|     0|   0|        0|
+-----+----------+---------+-------+----------+-------+----------+-------------+-------+---------+--------------+--------+--------+------+----+---------+



In [8]:
from pyspark.sql.functions import col, year, avg, when, lit, coalesce
from pyspark.ml.feature import StringIndexer, VectorAssembler, MinMaxScaler

current_year = 2008  # Ajustar según el año actual
data = df.withColumn(
    "PlaneAge",
    when(col("IssueDate").isNotNull(), current_year - year(col("IssueDate"))).otherwise(
        None
    ),
)
# Rellenar nulos en PlaneAge con la media
avg_age = data.select(avg("PlaneAge")).first()[0]
data = data.withColumn(
    "PlaneAge", when(col("PlaneAge").isNull(), avg_age).otherwise(col("PlaneAge"))
)

# Convertir columnas categóricas a índices numéricos
categorical_columns = ["UniqueCarrier", "Origin", "Dest"]
indexers = [
    StringIndexer(inputCol=col, outputCol=f"{col}_Index") for col in categorical_columns
]
for indexer in indexers:
    data = indexer.fit(data).transform(data)

# Eliminar columnas originales categóricas
data = data.drop(*categorical_columns)

# Normalizar los valores
feature_columns = [
    "Month",
    "DayofMonth",
    "DayOfWeek",
    "DepTime",
    "CRSDepTime",
    "CRSArrTime",
    "CRSElapsedTime",
    "DepDelay",
    "Cancelled",
    "PlaneAge",
    "UniqueCarrier_Index",
    "Origin_Index",
    "Dest_Index",
]

# Impute nulls with 0 before assembling the features
# This will prevent the MinMaxScaler from failing
for col_name in feature_columns:
    data = data.withColumn(col_name, coalesce(col(col_name), lit(0)))

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features_assembled")
data = assembler.transform(data)

scaler = MinMaxScaler(inputCol="features_assembled", outputCol="features")
scaler_model = scaler.fit(data)
data = scaler_model.transform(data)

# Seleccionar columnas finales (incluye la normalizada y la variable objetivo)
df = data.select("features", "ArrDelay")

# Mostrar algunas filas del conjunto preprocesado
df.show(truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+
|features                                                                                                                                                                            |ArrDelay|
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+
|[0.0,0.06666666666666667,0.5,0.5593997498957899,0.5616786774056803,0.6081424936386768,0.13362701908957414,0.04298554122704182,0.5,0.5,0.0,0.13448275862068965,0.27241379310344827]  |16.0    |
|[0.0,0.06666666666666667,0.5,0.468528553563985,0.47477744807121663,0.5275657336726038,0.15565345080763582,0.03790543180930051,0.5,0.5,0.0,0.13448275862068965,0.4103448275862069]   |2.0     |
|[0.0,0.06666666666666667,0.5,0.83701542

In [9]:
# Se crea un vector con las columnas que se van a utilizar
features = df.columns
features.remove("ArrDelay")
from pyspark.sql.functions import col
# Se convierte la variable target a numérica
# df = df.withColumn("ArrDelay", df["ArrDelay"].cast(IntegerType()))
# df = df.withColumn("ArrDelay", when(col("ArrDelay") > 15, 1).otherwise(0)) # 1 si el vuelo se retrasó más de 15 minutos, 0 si no
# df = df.withColumn("ArrDelay", df["ArrDelay"].cast(IntegerType()))


In [10]:
print("Primeras filas del dataframe:")
df.show(5)

Primeras filas del dataframe:
+--------------------+--------+
|            features|ArrDelay|
+--------------------+--------+
|[0.0,0.0666666666...|    16.0|
|[0.0,0.0666666666...|     2.0|
|[0.0,0.0666666666...|    -4.0|
|[0.0,0.0666666666...|    -2.0|
|[0.0,0.0666666666...|    16.0|
+--------------------+--------+
only showing top 5 rows



In [11]:
# # eliminamos las filas que hemos indexado
# df = df.drop(*["UniqueCarrier", "TailNum", "Origin", "Dest"])

In [12]:
# Dividir los datos en entrenamiento y prueba
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)



In [13]:
# Modelo 1: Regresión Lineal
lr = LinearRegression(featuresCol="features", labelCol="ArrDelay")
param_grid_lr = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.01, 0.1, 0.5])
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
    .build()
)

best_lr_model, rmse_lr = train_and_evaluate(lr, param_grid_lr, train_data, test_data)




Resultados del modelo LinearRegression:
+--------+------------------+
|ArrDelay|prediction        |
+--------+------------------+
|60.0    |62.26202041735425 |
|163.0   |168.64584399515618|
|162.0   |170.47209558614256|
|172.0   |171.2217331357149 |
|140.0   |144.29143715341334|
|26.0    |0.7960002960486889|
|11.0    |16.539858124774085|
|20.0    |13.642598816385672|
|-23.0   |-9.32566518111362 |
|172.0   |172.26266830300213|
+--------+------------------+
only showing top 10 rows



In [14]:
# Modelo 2: Bosques Aleatorios
rf = RandomForestRegressor(featuresCol="features", labelCol="ArrDelay")
param_grid_rf = (
    ParamGridBuilder()
    .addGrid(rf.numTrees, [50, 100])
    .addGrid(rf.maxDepth, [5, 10])
    .build()
)

best_rf_model, rmse_rf = train_and_evaluate(rf, param_grid_rf, train_data, test_data)



KeyboardInterrupt: 

In [None]:
# Modelo 3: Gradient Boosted Trees
gbt = GBTRegressor(featuresCol="features", labelCol="ArrDelay")
param_grid_gbt = (
    ParamGridBuilder()
    .addGrid(gbt.maxIter, [10, 50])
    .addGrid(gbt.maxDepth, [5, 10])
    .build()
)

best_gbt_model, rmse_gbt = train_and_evaluate(
    gbt, param_grid_gbt, train_data, test_data
)



In [None]:
# Comparar modelos
results = [
    ("Linear Regression", rmse_lr),
    ("Random Forest", rmse_rf),
    ("Gradient Boosted Trees", rmse_gbt),
]
results_sorted = sorted(results, key=lambda x: x[1])  # Ordenar por menor RMSE

print("Model Comparison (RMSE):")
for model_name, rmse in results_sorted:
    print(f"{model_name}: {rmse:.3f}")

# Elegir el mejor modelo
best_model_name, best_rmse = results_sorted[0]
print(f"\nBest Model: {best_model_name} with RMSE = {best_rmse:.3f}")

In [15]:
# linear regression
# Uso de funciones del archivo models.py
from models import estimate_lr, estimate_kmeans
input_columns_lr = [col for col in df.columns if (col != "ArrDelay" and col != "IssueDate")]
print(input_columns_lr)
model = estimate_lr(df, input_columns_lr, target)

['features']


NameError: name 'target' is not defined

In [None]:
# kmeans
input_columns_kmeans = [col for col in df.columns if (col != "ArrDelay" and col != "IssueDate")]
k = 3
kmeans_model = estimate_kmeans(df, input_columns_kmeans, k)