The dataset for this work is Flight Status Prediction, obtained from Kaggle (https://www.kaggle.com/datasets/robikscube/flight-delay-dataset-20182022). It is a collection of flight records; the most important field here is whether the flight was cancelled. The goal of this activity is to build a model with PySpark that predicts whether a flight will be cancelled.

In [None]:
# Import libraries and dependencies

# Packages needed to work with Spark
!sudo apt update;
!apt-get install openjdk-8-jdk-headless -qq > /dev/null;
!wget -q https://downloads.apache.org/spark/spark-3.2.2/spark-3.2.2-bin-hadoop3.2.tgz;
!tar xf spark-3.2.2-bin-hadoop3.2.tgz;
# Set up Spark for use from Python
!pip install -q findspark;
!pip install pyspark;

# Set the environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.2-bin-hadoop3.2"

# Locate and initialize the Spark installation
import findspark
findspark.init();
findspark.find();

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.mllib.tree import DecisionTree
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col
from pyspark import SparkConf

In [None]:
# Mount Google Drive to locate the data
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
%cd "/content/drive/MyDrive/Big-Data-Portafolio/"
!ls

/content/drive/MyDrive/Big-Data-Portafolio
Combined_Flights_2018.csv  spark-3.2.2-bin-hadoop3.2.tgz
portafolio.ipynb	   spark-3.2.2-bin-hadoop3.2.tgz.1
spark-3.2.2-bin-hadoop3.2


In [None]:
# Create the Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('OperacionesFiltrado').config("spark.driver.memory", "12g").getOrCreate()
spark

In [None]:
# Read the data

combined_flights = spark.read.csv('/content/drive/MyDrive/Big-Data-Portafolio/Combined_Flights_2018.csv', header=True, inferSchema=True)
combined_flights.show()

+----------+-----------------+------+----+---------+--------+----------+-------+---------------+--------+-------+---------------+-------+--------------+-----------------+--------+----+-------+-----+----------+---------+-------------------------+---------------------------------------+------------------------+---------------------------+-------------------------------+-----------------+------------------------+---------------------------+-----------+-------------------------------+---------------+------------------+------------------+--------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+------------+---------+-------------+-------------+-------+--------+--------------------+----------+-------+---------+--------+------+----------+--------+--------+------------------+----------+-------------+------------------+
|FlightDate|          Airline|Origin|Dest|Cancelled|Diverted|CRSDepTime|DepTime|DepDelayMinutes|DepDelay|ArrTime|ArrD

In [None]:
# Show the rows where the flight was cancelled
combined_flights.filter("Cancelled==true").show()

+----------+-----------------+------+----+---------+--------+----------+-------+---------------+--------+-------+---------------+-------+--------------+-----------------+--------+----+-------+-----+----------+---------+-------------------------+---------------------------------------+------------------------+---------------------------+-------------------------------+-----------------+------------------------+---------------------------+-----------+-------------------------------+---------------+------------------+------------------+--------------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+--------------------+---------+-------------+--------------+-------+--------+--------------------+----------+-------+---------+--------+------+----------+--------+--------+------------------+----------+-------------+------------------+
|FlightDate|          Airline|Origin|Dest|Cancelled|Diverted|CRSDepTime|DepTime|DepDelayMinutes|DepDel

In [None]:
# Show the rows where the flight was not cancelled
combined_flights.filter("Cancelled==false").show()

+----------+-----------------+------+----+---------+--------+----------+-------+---------------+--------+-------+---------------+-------+--------------+-----------------+--------+----+-------+-----+----------+---------+-------------------------+---------------------------------------+------------------------+---------------------------+-------------------------------+-----------------+------------------------+---------------------------+-----------+-------------------------------+---------------+------------------+------------------+--------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+------------+---------+-------------+-------------+-------+--------+--------------------+----------+-------+---------+--------+------+----------+--------+--------+------------------+----------+-------------+------------------+
|FlightDate|          Airline|Origin|Dest|Cancelled|Diverted|CRSDepTime|DepTime|DepDelayMinutes|DepDelay|ArrTime|ArrD

In [None]:
# Drop the columns that are always null when the flight was cancelled, so the model cannot make a trivial prediction from them
columns_objects_input = ["DepTime", "DepDelayMinutes", "DepDelay", "ArrTime", "ArrDelayMinutes", "AirTime", "ActualElapsedTime", "DepDel15", "DepartureDelayGroups", "TaxiOut", "WheelsOff", "WheelsOn", "TaxiIn", "ArrDelay", "ArrDel15", "ArrivalDelayGroups"]
combined_flights = combined_flights.drop(*columns_objects_input)

In [None]:
combined_flights.printSchema()

root
 |-- FlightDate: string (nullable = true)
 |-- Airline: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Cancelled: boolean (nullable = true)
 |-- Diverted: boolean (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- CRSElapsedTime: double (nullable = true)
 |-- Distance: double (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Quarter: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- Marketing_Airline_Network: string (nullable = true)
 |-- Operated_or_Branded_Code_Share_Partners: string (nullable = true)
 |-- DOT_ID_Marketing_Airline: integer (nullable = true)
 |-- IATA_Code_Marketing_Airline: string (nullable = true)
 |-- Flight_Number_Marketing_Airline: integer (nullable = true)
 |-- Operating_Airline: string (nullable = true)
 |-- DOT_ID_Operating_Airline: integer (nullable = true)
 |-- IATA_Code

In [None]:
# Drop every row that contains a null
combined_flights = combined_flights.na.drop(how="any")
combined_flights.show()

+----------+-----------------+------+----+---------+--------+----------+--------------+--------+----+-------+-----+----------+---------+-------------------------+---------------------------------------+------------------------+---------------------------+-------------------------------+-----------------+------------------------+---------------------------+-----------+-------------------------------+---------------+------------------+------------------+--------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+------------+---------+-------------+-------------+-------+----------+----------+----------+-------------+------------------+
|FlightDate|          Airline|Origin|Dest|Cancelled|Diverted|CRSDepTime|CRSElapsedTime|Distance|Year|Quarter|Month|DayofMonth|DayOfWeek|Marketing_Airline_Network|Operated_or_Branded_Code_Share_Partners|DOT_ID_Marketing_Airline|IATA_Code_Marketing_Airline|Flight_Number_Marketing_Airline|Operating_Airl

In [None]:
# Label encoding: index every string column and drop the original string columns
columns_objects_input = [x[0] for x in combined_flights.dtypes if x[1].startswith('string')]
indexers = [StringIndexer(inputCol=column, outputCol=column+"_encoded").setHandleInvalid("keep") for column in columns_objects_input]
pipeline = Pipeline(stages=indexers)
combined_flights = pipeline.fit(combined_flights).transform(combined_flights)
combined_flights = combined_flights.drop(*columns_objects_input)
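
To make the encoding step concrete, here is a minimal pure-Python sketch of what `StringIndexer` does with its default `frequencyDesc` ordering: the most frequent string gets index 0, ties are broken alphabetically, and with `handleInvalid="keep"` an unseen value maps to one extra index equal to the number of known labels. The helpers `fit_string_index` and `encode` and the sample airline codes are hypothetical; they only emulate the behaviour and are not part of PySpark.

```python
from collections import Counter

def fit_string_index(values):
    """Emulate StringIndexer.fit: order labels by descending frequency, then alphabetically."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {label: float(i) for i, label in enumerate(ordered)}

def encode(value, mapping):
    """Emulate handleInvalid='keep': unseen labels share the index len(mapping)."""
    return mapping.get(value, float(len(mapping)))

# Hypothetical sample column, not taken from the dataset
airlines = ["AA", "DL", "AA", "UA", "DL", "AA"]
mapping = fit_string_index(airlines)
encoded = [encode(a, mapping) for a in airlines]
unseen = encode("ZZ", mapping)

print(mapping)
print(encoded, unseen)
```

The resulting indices are arbitrary codes, so a tree that treats them as continuous values splits on an ordering that reflects label frequency rather than any property of the airline or airport.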

In [None]:
# Cast the dependent variable (Cancelled) to integer
combined_flights = combined_flights.withColumn("Cancelled", col("Cancelled").cast(IntegerType()))
combined_flights.show()

+---------+--------+----------+--------------+--------+----+-------+-----+----------+---------+------------------------+-------------------------------+------------------------+-------------------------------+---------------+------------------+------------------+---------------+---------+-------------+----------------+----------------+-------------+-------+----------+-------------+------------------+------------------+---------------+--------------+------------+---------------------------------+-----------------------------------------------+-----------------------------------+-------------------------+-----------------------------------+-------------------+----------------------+-------------------+-----------------------+--------------------+-----------------+---------------------+------------------+------------------+
|Cancelled|Diverted|CRSDepTime|CRSElapsedTime|Distance|Year|Quarter|Month|DayofMonth|DayOfWeek|DOT_ID_Marketing_Airline|Flight_Number_Marketing_Airline|DOT_ID_Operatin

In [None]:
# Build a LabeledPoint per row: the label is column 0 (Cancelled) and the features start at column 2, so column 1 (Diverted) is skipped
combined_flights_rdd = combined_flights.rdd.map(lambda x: LabeledPoint(x[0], x[2:]))
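
Because the features are taken from `x[2:]`, feature `k` in the trained model corresponds to DataFrame column `k + 2`. A minimal sketch of that mapping follows. The helper `feature_name` is hypothetical, the row values are made up, and the column list is only the prefix visible in the truncated output above, not the full schema.

```python
# First columns as they appear in the truncated output above; not the full schema.
columns = ["Cancelled", "Diverted", "CRSDepTime", "CRSElapsedTime", "Distance",
           "Year", "Quarter", "Month", "DayofMonth", "DayOfWeek"]

# features = row[2:], so column 0 (the label) and column 1 (Diverted) are skipped
FEATURE_OFFSET = 2

def feature_name(k, columns, offset=FEATURE_OFFSET):
    """Return the DataFrame column behind feature index k of the LabeledPoint."""
    return columns[k + offset]

# Hypothetical row laid out in the same column order
row = (0, False, 1030, 95.0, 528.0, 2018, 1, 1, 23, 2)
label, features = row[0], row[2:]

print(label, features[0], feature_name(0, columns))
```

In the notebook, passing `combined_flights.columns` as the second argument would resolve any feature index that appears in the printed tree.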

In [None]:
# Split into training and validation sets
(trainingData, testData) = combined_flights_rdd.randomSplit([0.7, 0.3])

In [None]:
# Train a decision tree
numClasses = 2
categoricalFeaturesInfo = {}
model = DecisionTree.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo)

In [None]:
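# Generate predictions on the test set, then index predictions and true labels by position
# (despite its name, labelsAndPredictions holds only the true labels)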
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label)
predictions_zipped = predictions.zipWithIndex()
labelsAndPredictions_zipped = labelsAndPredictions.zipWithIndex()

In [None]:
# Join on the position index to store each prediction next to its true label as (prediction, label)
data_rdd = predictions_zipped.map(lambda x: (x[1], x[0])).join(labelsAndPredictions_zipped.map(lambda x: (x[1], x[0]))).map(lambda x: (x[1][0], x[1][1]))
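
The index-and-join step above pairs each prediction with the label at the same position. A minimal pure-Python sketch of the same logic follows, with hypothetical values; it emulates `zipWithIndex` with `enumerate` and the join with a dictionary lookup, and is not Spark code.

```python
# Hypothetical predictions and true labels, in the same row order
predictions = [0.0, 1.0, 0.0, 0.0]
labels = [0.0, 1.0, 1.0, 0.0]

# Emulate zipWithIndex followed by the (index, value) swap
pred_by_index = {i: p for i, p in enumerate(predictions)}
label_by_index = {i: l for i, l in enumerate(labels)}

# Emulate the join on the index, keeping (prediction, label) pairs.
# Sorting is only for readable output; a Spark join does not guarantee order.
shared = sorted(pred_by_index.keys() & label_by_index.keys())
pairs = [(pred_by_index[i], label_by_index[i]) for i in shared]

# Count the pairs where prediction and label disagree
errors = sum(1 for p, l in pairs if p != l)
print(pairs, errors)
```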

In [None]:
# Count the rows whose prediction differs from the true label (i.e. the misclassified rows); in this run there are 20367
# Then print the structure of the tree
testErr = data_rdd.filter(lambda lp: lp[0] != lp[1]).count()
testData_count = float(testData.count())
print(testErr, testData_count)
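# Error rate (misclassified / total); the print below reports the raw count testErr, not this ratio
ratio_err = testErr / testData_count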
print('Test Error = ' + str(testErr))
print('Learned classification tree model:')
print(model.toDebugString())

20367 1702823.0
Test Error = 20367
Learned classification tree model:
DecisionTreeModel classifier of depth 5 with 11 nodes
  If (feature 10 <= 20283.5)
   If (feature 29 <= 0.5)
    If (feature 24 <= 0.5)
     Predict: 0.0
    Else (feature 24 > 0.5)
     If (feature 28 <= 122.5)
      Predict: 0.0
     Else (feature 28 > 122.5)
      If (feature 25 <= 38.5)
       Predict: 1.0
      Else (feature 25 > 38.5)
       Predict: 0.0
   Else (feature 29 > 0.5)
    Predict: 0.0
  Else (feature 10 > 20283.5)
   Predict: 0.0
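
The output above reports the number of misclassified rows rather than a rate. As a small worked example, the rate can be derived from the two printed numbers (20367 errors out of 1702823 test rows); the variable names here are illustrative.

```python
# Values copied from the printed output above
test_err_count = 20367
test_count = 1702823.0

# Fraction of test rows that were misclassified, and its complement
error_rate = test_err_count / test_count
accuracy = 1.0 - error_rate

print(f"Test error rate = {error_rate:.4%}")
print(f"Accuracy = {accuracy:.4%}")
```

This gives an error rate of roughly 1.2%. Since almost every leaf of the tree predicts 0.0, that figure is best read alongside the share of cancelled flights in the test set, which the notebook does not report.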

