In [64]:
import findspark
findspark.init()
findspark.find()

from pyspark import SparkContext

from pyspark.sql import SparkSession

from pyspark import SparkConf

import pyspark.sql as pysql

from pyspark import StorageLevel


In [65]:
import pyspark.ml as pyml
import pyspark.sql.functions as pysqlfun


In [66]:
import os
import sys

#os.environ['PYSPARK_PYTHON'] = sys.executable
#os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [67]:
conf = SparkConf().setAppName("Tarea") \
    .setMaster("local[8]") \
    .set("spark.executor.cores","2") \
    .set("spark.executor.memory", "4g") \
    .set("spark.driver.memory","4g")

In [68]:
spark  = SparkSession.builder.config(conf=conf).getOrCreate()

In [6]:
columnas = [
    'Airline', # Aerolinea
    'CRSDepTime', #Hora de salida programada
    'DepTime', #Hora de salida actual
    'DepDelay', #Diferencia en minutos entre CRSDepTime y DepTime, tiempos negativos significa salidas tempranas
    'CRSArrTime', #Hora de llegada programada
    'ArrTime', #Hora de llegada
    'ArrDelay', #Diferencia de minutos entre la llegada programada y la hora de llegada, tiempos negativos significa llegadas tempranas
    'ActualElapsedTime', #Tiempo de vuelo verdadero
    'CRSElapsedTime', #Tiempo de vuelo programado
    'Distance', #Distancia entre aeropuertos en millas
    'Year', #Año del vuelo
    'Month', #Mes del vuelo
    'DayofMonth', #Dia del mes del vuelo
    'DayOfWeek', #Dia de la semana del vuelo
    'Tail_Number', #Numero de la cola, código para identificar la aeronave
    'Flight_Number_Operating_Airline', #Numero del vuelo
    
    'Origin', #Aeropuerto de origen
    'OriginAirportID', 'OriginAirportSeqID', #codigos del aeropuerto de origen
    'OriginCityName', #Nombre de la ciudad del aeropuerto de origen
    'OriginStateName', #Nombre de estado del aeropuerto de origen
    
    'Dest', #Aeropuerto de destino
    'DestAirportID', 'DestAirportSeqID', #codigos del aeropuerto de destino
    'DestCityName', #Nombre de la ciudad del aeropuerto de destino
    'DestStateName', #Nombre de estado del aeropuerto de destino
    
    'Cancelled', #El vuelo fue cancelado, 1 = Sí
    
    'Diverted', #Si el vuelo fue desviado, 1 = Sí
]

In [7]:
import zipfile
import pathlib

direccion = pathlib.Path.cwd()

path_datos_zip = pathlib.Path.joinpath(direccion.parent, 'Datos', 'data.zip')

path_guardar = pathlib.Path.joinpath(direccion.parent, 'Datos')

archivos_datos = [
    pathlib.Path.joinpath(direccion.parent, 'Datos', 'Combined_Flights_2022.csv').as_posix(),
    pathlib.Path.joinpath(direccion.parent, 'Datos', 'Combined_Flights_2021.csv').as_posix(),
    pathlib.Path.joinpath(direccion.parent, 'Datos', 'Combined_Flights_2020.csv').as_posix(),
    pathlib.Path.joinpath(direccion.parent, 'Datos', 'Combined_Flights_2019.csv').as_posix(),
    pathlib.Path.joinpath(direccion.parent, 'Datos', 'Combined_Flights_2018.csv').as_posix()
]

archivos_datos_en_zip  = [
    'Combined_Flights_2022.csv',
    'Combined_Flights_2021.csv',
    'Combined_Flights_2020.csv',
    'Combined_Flights_2019.csv',
    'Combined_Flights_2018.csv'
]

In [8]:
with zipfile.ZipFile(path_datos_zip, mode="r") as archivo_zip:
    for archivo in archivos_datos_en_zip:
        
        if os.path.isfile(path=pathlib.Path.joinpath(path_guardar,archivo).as_posix()):
            continue
        
        archivo_zip.extract(archivo, path_guardar)

In [9]:
df = spark.read.option("header","true").csv(archivos_datos)

                                                                                

In [10]:
df = df.select(columnas)

In [11]:
df = df.persist(StorageLevel.DISK_ONLY)

24/11/20 21:16:35 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [12]:
df = df.withColumn("CRSDepTime", df["CRSDepTime"].cast(pysql.types.FloatType()))
df = df.withColumn("DepTime", df["DepTime"].cast(pysql.types.FloatType()))
df = df.withColumn("DepDelay", df["DepDelay"].cast(pysql.types.FloatType()))
df = df.withColumn("ArrTime", df["ArrTime"].cast(pysql.types.FloatType()))
df = df.withColumn("ArrDelay", df["ArrDelay"].cast(pysql.types.FloatType()))
df = df.withColumn("ActualElapsedTime", df["ActualElapsedTime"].cast(pysql.types.FloatType()))
df = df.withColumn("CRSElapsedTime", df["CRSElapsedTime"].cast(pysql.types.FloatType()))
df = df.withColumn("Distance", df["Distance"].cast(pysql.types.FloatType()))
df = df.withColumn("Year", df["Year"].cast(pysql.types.IntegerType()))
df = df.withColumn("Month", df["Month"].cast(pysql.types.IntegerType()))
df = df.withColumn("DayofMonth", df["DayofMonth"].cast(pysql.types.IntegerType()))
df = df.withColumn("DayOfWeek", df["DayOfWeek"].cast(pysql.types.IntegerType()))

df = df.withColumn("Cancelled", pysql.functions.when(df["Cancelled"] == "False",0).otherwise(1))
df = df.withColumn("Diverted", pysql.functions.when(df["Diverted"] == "False",0).otherwise(1))

In [13]:
#cambiar None a Otros

for columna in ['Airline','Tail_Number','Flight_Number_Operating_Airline',
                'Origin','OriginAirportID', 'OriginAirportSeqID',
                'OriginCityName','OriginStateName','Dest','DestAirportID',
                'DestAirportSeqID','DestCityName','DestStateName']:

    df = df.withColumn(columna,pysqlfun.when(
        df[columna].isNull() | (df[columna] == "None"),"Otros").otherwise(df[columna]))



In [14]:
#df.select(['Airline']).distinct().toPandas().to_csv("airlines.csv", header=True, index=False)
#
#df.select([
#    'Origin', #Aeropuerto de origen
#    'OriginAirportID', 'OriginAirportSeqID', #codigos del aeropuerto de origen
#    'OriginCityName', #Nombre de la ciudad del aeropuerto de origen
#    'OriginStateName', #Nombre de estado del aeropuerto de origen
#]).distinct().toPandas().to_csv("airports_origin.csv", header=True, index=False)

#df.select([
#    'Dest', #Aeropuerto de destino
#    'DestAirportID', 'DestAirportSeqID', #codigos del aeropuerto de destino
#    'DestCityName', #Nombre de la ciudad del aeropuerto de destino
#    'DestStateName', #Nombre de estado del aeropuerto de destino
#]).distinct().toPandas().to_csv("airports_dest.csv", header=True, index=False)


## String a categorico

Tail_Number

In [15]:
df_tail_number = df.groupBy("Tail_Number").count() \
    .sort("count", ascending = False).persist(StorageLevel.DISK_ONLY)

24/11/20 21:16:36 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [16]:
df_tail_number = df_tail_number.withColumn(
    "Tail_Number_Grupo", pysqlfun.when((df_tail_number["Tail_Number"] == "Otros"), "0")
    .when(df_tail_number["count"] < 100, "1")
    .when((df_tail_number["count"] >= 100) & (df_tail_number["count"] < 1000), "2")
    .when((df_tail_number["count"] >= 1000) & (df_tail_number["count"] < 10000), "3")
    .when((df_tail_number["count"] >= 10000) & (df_tail_number["count"] < 100000), "4")
    .otherwise("5")
)

In [17]:
#df_tail_number.toPandas().to_csv("df_tail_number.csv", header=True, index=False)

In [18]:
df = df.join(other=df_tail_number,on="Tail_Number",how="left")

Aeropuerto de origen

In [19]:
df_origin = df.groupBy("Origin").count() \
    .sort("count", ascending = False).persist(StorageLevel.DISK_ONLY)

In [20]:
df_origin = df_origin.withColumn(
    "Origin_Grupo", pysqlfun.when((df_origin["Origin"] == "Otros"), "0")
    .when((df_origin["count"] < 1000), "1")
    .when((df_origin["count"] >= 1000) & (df_origin["count"] < 5000), "2")
    .when((df_origin["count"] >= 5000) & (df_origin["count"] < 10000), "3")
    .when((df_origin["count"] >= 10000) & (df_origin["count"] < 50000), "4")
    .when((df_origin["count"] >= 50000) & (df_origin["count"] < 100000), "5")
    .when((df_origin["count"] >= 100000) & (df_origin["count"] < 500000), "6")
    .when((df_origin["count"] >= 500000) & (df_origin["count"] < 1000000), "7")
    .when((df_origin["count"] >= 1000000) & (df_origin["count"] < 5000000), "8")
    .otherwise("9")
)

In [21]:
#df_origin.toPandas().to_csv("df_origin.csv", header=True, index=False)

In [22]:
df = df.join(other=df_origin,on="Origin",how="left")

Aeropuerto de destino

In [23]:
df_dest = df.groupBy("Dest").count() \
    .sort("count", ascending = False).persist(StorageLevel.DISK_ONLY)

In [24]:
df_dest = df_dest.withColumn(
    "Dest_Grupo", pysqlfun.when((df_dest["Dest"] == "Otros"), "0")
    .when((df_dest["count"] < 1000), "1")
    .when((df_dest["count"] >= 1000) & (df_dest["count"] < 5000), "2")
    .when((df_dest["count"] >= 5000) & (df_dest["count"] < 10000), "3")
    .when((df_dest["count"] >= 10000) & (df_dest["count"] < 50000), "4")
    .when((df_dest["count"] >= 50000) & (df_dest["count"] < 100000), "5")
    .when((df_dest["count"] >= 100000) & (df_dest["count"] < 500000), "6")
    .when((df_dest["count"] >= 500000) & (df_dest["count"] < 1000000), "7")
    .when((df_dest["count"] >= 1000000) & (df_dest["count"] < 5000000), "8")
    .otherwise("9")
)

In [25]:
#df_dest.toPandas().to_csv("df_dest.csv", header=True, index=False)

In [26]:
df = df.join(other=df_dest,on="Dest",how="left")

Estado de origen

In [27]:
df_origin_state = df.groupBy("OriginStateName").agg({"Cancelled" : "avg"}) \
    .sort("avg(Cancelled)", ascending = False).persist(StorageLevel.DISK_ONLY)

In [28]:
df_origin_state = df_origin_state.withColumn(
    "OriginStateName_Grupo", pysqlfun.when((df_origin_state["OriginStateName"] == "Otros"), "0")
    .when((df_origin_state["avg(Cancelled)"] < 0.01), "1")
    .when((df_origin_state["avg(Cancelled)"] >= 0.010) & (df_origin_state["avg(Cancelled)"] < 0.015), "2")
    .when((df_origin_state["avg(Cancelled)"] >= 0.015) & (df_origin_state["avg(Cancelled)"] < 0.020), "3")
    .when((df_origin_state["avg(Cancelled)"] >= 0.020) & (df_origin_state["avg(Cancelled)"] < 0.025), "4")
    .when((df_origin_state["avg(Cancelled)"] >= 0.025) & (df_origin_state["avg(Cancelled)"] < 0.030), "5")
    .when((df_origin_state["avg(Cancelled)"] >= 0.030) & (df_origin_state["avg(Cancelled)"] < 0.035), "6")
    .when((df_origin_state["avg(Cancelled)"] >= 0.035) & (df_origin_state["avg(Cancelled)"] < 0.040), "7")
    .otherwise("8")
)

In [29]:
#df_origin_state.toPandas().to_csv("df_origin_state.csv", header=True, index=False)

In [30]:
df = df.join(other=df_origin_state,on="OriginStateName",how="left")

Estado de destino

In [31]:
df_dest_state = df.groupBy("DestStateName").agg({"Cancelled" : "avg"}) \
    .sort("avg(Cancelled)", ascending = False).persist(StorageLevel.DISK_ONLY)

In [32]:
df_dest_state = df_dest_state.withColumn(
    "DestStateName_Grupo", pysqlfun.when((df_dest_state["DestStateName"] == "Otros"), "0")
    .when((df_dest_state["avg(Cancelled)"] < 0.01), "1")
    .when((df_dest_state["avg(Cancelled)"] >= 0.010) & (df_dest_state["avg(Cancelled)"] < 0.015), "2")
    .when((df_dest_state["avg(Cancelled)"] >= 0.015) & (df_dest_state["avg(Cancelled)"] < 0.020), "3")
    .when((df_dest_state["avg(Cancelled)"] >= 0.020) & (df_dest_state["avg(Cancelled)"] < 0.025), "4")
    .when((df_dest_state["avg(Cancelled)"] >= 0.025) & (df_dest_state["avg(Cancelled)"] < 0.030), "5")
    .when((df_dest_state["avg(Cancelled)"] >= 0.030) & (df_dest_state["avg(Cancelled)"] < 0.035), "6")
    .when((df_dest_state["avg(Cancelled)"] >= 0.035) & (df_dest_state["avg(Cancelled)"] < 0.040), "7")
    .otherwise("8")
)

In [33]:
#df_dest_state.toPandas().to_csv("df_dest_state.csv", header=True, index=False)

In [34]:
df = df.join(other=df_dest_state,on="DestStateName",how="left")

In [35]:
df_seleccion = df.select(["Cancelled","Distance","CRSDepTime","CRSArrTime",
                          "Year","Month","DayOfWeek","DayofMonth","Airline",
                          "Tail_Number_Grupo","origin_Grupo","Dest_Grupo",
                          "OriginStateName_Grupo","DestStateName_Grupo"])

In [36]:
df_seleccion = df_seleccion.persist(StorageLevel.DISK_ONLY)

In [37]:
indexer = pyml.feature.StringIndexer(
    inputCols=["Airline","Tail_Number_Grupo","origin_Grupo",
               "Dest_Grupo","OriginStateName_Grupo","DestStateName_Grupo"],
    outputCols=["Airline_indexado","Tail_Number_indexado","Origin_indexado",
                "Dest_indexado","OriginStateName_indexado","DestStateName_indexado"])

In [38]:
df_indexado = indexer.fit(df_seleccion) \
    .transform(df_seleccion).persist(StorageLevel.DISK_ONLY)

                                                                                

In [39]:
assembler = pyml.feature.VectorAssembler(
    inputCols=[
        "Distance",
        #"CRSDepTime",
        #"CRSArrTime",
        "Year",
        "Month",
        "DayofMonth",
        "DayOfWeek",
        "Airline_indexado",
        "Tail_Number_indexado",
        "Origin_indexado",
        "Dest_indexado",
        "OriginStateName_indexado",
        "DestStateName_indexado"
    ],
    outputCol="features"
)

In [40]:
df_with_features = assembler.transform(df_indexado)

In [41]:
train, test = df_with_features.randomSplit([0.8,0.2], seed = 42)

In [42]:
train = train.persist(StorageLevel.DISK_ONLY)

In [43]:
test = test.persist(StorageLevel.DISK_ONLY)

In [44]:
cantidad_total = train.count()

                                                                                

In [45]:
cantidad_cancelados = train.filter(train["Cancelled"] == 1).count()

                                                                                

In [46]:
print(cantidad_total, cantidad_cancelados)

23351864 621004


In [47]:
weight_cancelled = cantidad_total / (2 * cantidad_cancelados)

weight_no_cancelled = cantidad_total / (2 * (cantidad_total - cantidad_cancelados))

In [48]:
print(weight_cancelled, weight_no_cancelled)

18.801701760375135 0.5136599319163463


In [49]:
train = train.withColumn("weight", pysqlfun.when(
    train["Cancelled"] == 1, weight_cancelled).otherwise(weight_no_cancelled))

In [54]:
ml_class = pyml.classification.RandomForestClassifier(
    featuresCol="features",
    labelCol="Cancelled",
    weightCol="weight", seed= 255
)

In [55]:
model = ml_class.fit(train)

24/11/20 21:37:25 WARN MemoryStore: Not enough space to cache rdd_307_34 in memory! (computed 27.0 MiB so far)
24/11/20 21:37:25 WARN BlockManager: Persisting block rdd_307_34 to disk instead.
24/11/20 21:37:26 WARN MemoryStore: Not enough space to cache rdd_307_33 in memory! (computed 40.8 MiB so far)
24/11/20 21:37:26 WARN BlockManager: Persisting block rdd_307_33 to disk instead.
24/11/20 21:37:26 WARN MemoryStore: Not enough space to cache rdd_307_36 in memory! (computed 3.5 MiB so far)
24/11/20 21:37:26 WARN BlockManager: Persisting block rdd_307_36 to disk instead.
24/11/20 21:37:26 WARN MemoryStore: Not enough space to cache rdd_307_35 in memory! (computed 27.0 MiB so far)
24/11/20 21:37:26 WARN BlockManager: Persisting block rdd_307_35 to disk instead.
24/11/20 21:37:26 WARN MemoryStore: Not enough space to cache rdd_307_38 in memory! (computed 1544.0 KiB so far)
24/11/20 21:37:26 WARN BlockManager: Persisting block rdd_307_38 to disk instead.
24/11/20 21:37:26 WARN MemoryStore

In [None]:
model.save("RandomForest_model")

                                                                                

In [58]:
#evaluar modelo
prediction = model.transform(test)

In [59]:
evaluator = pyml.evaluation.BinaryClassificationEvaluator(
    labelCol="Cancelled",
    rawPredictionCol = "prediction",
    metricName="areaUnderROC"
)

In [61]:
AUC = evaluator.evaluate(prediction)



                                                                                

In [None]:
AUC

0.7282688942849

In [56]:
#true positive, true negative, false positive, false negative

tp = prediction.filter((pysqlfun.col("Cancelled") == 1) & (pysqlfun.col("prediction") == 1)).count()
tn = prediction.filter((pysqlfun.col("Cancelled") == 0) & (pysqlfun.col("prediction") == 0)).count()
fp = prediction.filter((pysqlfun.col("Cancelled") == 0) & (pysqlfun.col("prediction") == 1)).count()
fn = prediction.filter((pysqlfun.col("Cancelled") == 1) & (pysqlfun.col("prediction") == 0)).count()

                                                                                

In [57]:
#accuracy
accuracy = (tp + tn) / (tp + tn + fp + fn)
print(f"Accuracy: {accuracy}")

Accuracy: 0.8699598316854157


In [58]:
#precision
precision = tp / (tp + fp)
print(f"Precision: {precision}")

Precision: 0.11528245853623705


In [59]:
#recall
recall = tp / (tp + fn)
print(f"Recall: {recall}")

Recall: 0.5785694630206767


In [60]:
#F1

f1 = 2 *(precision * recall) / (precision + recall)

print(f"F1: {f1}")

F1: 0.19225690110175203


In [70]:
spark.stop()