The Data
For this exercise, students will use data published by the US Department of Transportation. This
data can be downloaded from the following URL:
https://dataverse.harvard.edu/dataset.xhtml?persistentId=doi:10.7910/DVN/HG7NV7
The dataset is split into several independent files to make downloading easier. You do not need
to download and use the entire dataset. A small piece that fits in your development environment
and does not take too long to process should be sufficient. The Spark application you
develop, however, should work with any subset of this dataset rather than being limited to a
specific piece.
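
One way to meet this requirement is to take the input files as parameters instead of hard-coding a single path. The sketch below is only illustrative: `resolve_input_paths`, `load_flights`, and the default `resources/*.csv` pattern are hypothetical names and not part of the assignment. It assumes an existing SparkSession and a schema are passed in by the caller.

```python
from typing import List, Sequence


def resolve_input_paths(argv: Sequence[str], default: str = "resources/*.csv") -> List[str]:
    """Return the CSV paths given as arguments, or a default glob pattern.

    `default` is an assumed location; Spark expands glob patterns itself.
    """
    paths = [arg for arg in argv if arg.strip()]
    return paths if paths else [default]


def load_flights(spark, schema, paths: List[str]):
    """Read one or more yearly CSV files into a single DataFrame.

    `spark` is an existing SparkSession and `schema` a StructType;
    DataFrameReader.csv accepts a list of paths.
    """
    return spark.read.options(header=True).schema(schema).csv(paths)


# Hypothetical usage inside a script:
#   import sys
#   df = load_flights(spark, schema, resolve_input_paths(sys.argv[1:]))
```

Because the paths are resolved before Spark is involved, the same application can be pointed at one year, several years, or the whole dataset without code changes.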

In [7]:
# Import the required libraries
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
from pyspark.sql.functions import when, lit, col, length, concat_ws, substring, avg, desc, date_format
from pyspark.ml.feature import Imputer, StringIndexer, OneHotEncoder, VectorAssembler, Normalizer, UnivariateFeatureSelector
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression, GeneralizedLinearRegression, DecisionTreeRegressor, RandomForestRegressor, GBTRegressor
from pyspark.ml.linalg import SparseVector

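# Create the Spark context. The application name is set on the configuration
# before the context is created; setting it afterwards has no effect.
conf = SparkConf().setAppName("Spark Practical Work")
sc = SparkContext.getOrCreate(conf)
sc.setLogLevel("ERROR")

# Initialize the Spark session (reuses the context created above)
spark = SparkSession.builder.appName("Spark Practical Work").getOrCreate()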

# After inspecting every field in the CSV, assign the appropriate types
schema = StructType() \
    .add("Year", StringType(), True) \
    .add("Month", StringType(), True) \
    .add("DayofMonth", StringType(), True) \
    .add("DayOfWeek", StringType(), True) \
    .add("DepTime", StringType(), True) \
    .add("CRSDepTime", StringType(), True) \
    .add("ArrTime", StringType(), True) \
    .add("CRSArrTime", StringType(), True) \
    .add("UniqueCarrier", StringType(), True) \
    .add("FlightNum", StringType(), True) \
    .add("TailNum", StringType(), True) \
    .add("ActualElapsedTime", IntegerType(), True) \
    .add("CRSElapsedTime", IntegerType(), True) \
    .add("AirTime", IntegerType(), True) \
    .add("ArrDelay", IntegerType(), True) \
    .add("DepDelay", IntegerType(), True) \
    .add("Origin", StringType(), True) \
    .add("Dest", StringType(), True) \
    .add("Distance", IntegerType(), True) \
    .add("TaxiIn", IntegerType(), True) \
    .add("TaxiOut", IntegerType(), True) \
    .add("Cancelled", IntegerType(), True) \
    .add("CancellationCode", StringType(), True) \
    .add("Diverted", IntegerType(), True) \
    .add("CarrierDelay", IntegerType(), True) \
    .add("WeatherDelay", IntegerType(), True) \
    .add("NASDelay", IntegerType(), True) \
    .add("SecurityDelay", IntegerType(), True) \
    .add("LateAircraftDelay", IntegerType(), True)

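# Load the CSV file(s) into a DataFrame.
# Note: nanValue only affects floating-point columns, so string columns keep the
# literal "NA"; use nullValue="NA" instead if those should be read as nulls.
df = spark.read.options(header=True, nanValue="NA", emptyValue="") \
    .schema(schema) \
    .csv("resources/1987.csv")  # Example: pass "src/main/resources/2007.csv" and "src/main/resources/2008.csv" as input parameters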

print("<<<<<<<<<<<<<<<<<<<<<<<<<<<<Dataframe>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
df.show(10, truncate=False)

<<<<<<<<<<<<<<<<<<<<<<<<<<<<Dataframe>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|1987|10   |14        

Forbidden variables
The dataset consists of a single table with 29 columns. Some of these columns must not be
used and therefore need to be removed at the beginning of the analysis. These are:
● ArrTime
● ActualElapsedTime
● AirTime
● TaxiIn
● Diverted
● CarrierDelay
● WeatherDelay
● NASDelay
● SecurityDelay
● LateAircraftDelay
These variables contain information that is unknown at the time the plane takes off and,
therefore, cannot be used in the prediction model.
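
Keeping the forbidden names in a single constant makes the removal easy to apply and to verify. The following is a minimal sketch in plain Python; `FORBIDDEN_COLUMNS` and `allowed_columns` are hypothetical names, and the column list is copied from the bullets above.

```python
from typing import Iterable, List

# Columns unknown at take-off, copied from the list above.
FORBIDDEN_COLUMNS = (
    "ArrTime", "ActualElapsedTime", "AirTime", "TaxiIn", "Diverted",
    "CarrierDelay", "WeatherDelay", "NASDelay", "SecurityDelay",
    "LateAircraftDelay",
)


def allowed_columns(columns: Iterable[str]) -> List[str]:
    """Return the input columns, in order, without the forbidden ones."""
    forbidden = set(FORBIDDEN_COLUMNS)
    return [c for c in columns if c not in forbidden]


# With a Spark DataFrame `df` (assumed to exist), this could be applied as:
#   df = df.select(*allowed_columns(df.columns))
#   assert not set(FORBIDDEN_COLUMNS) & set(df.columns)
```

Selecting the allowed columns rather than dropping by hand also works unchanged if a given file lacks some of the forbidden columns.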


In [10]:
print("<<<<<<<<<<<<<<<<<<<<<<<<<<<<Dataframe with forbidden variables removed>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
df = df.drop(
    "ArrTime", "ActualElapsedTime", "AirTime", "TaxiIn", "Diverted",
    "CarrierDelay", "WeatherDelay", "NASDelay", "SecurityDelay", "LateAircraftDelay",
)
df.show(10, truncate=False)

<<<<<<<<<<<<<<<<<<<<<<<<<<<<Dataframe with forbidden variables removed>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
+----+-----+----------+---------+-------+----------+----------+-------------+---------+-------+--------------+--------+--------+------+----+--------+-------+---------+----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|CRSElapsedTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiOut|Cancelled|CancellationCode|
+----+-----+----------+---------+-------+----------+----------+-------------+---------+-------+--------------+--------+--------+------+----+--------+-------+---------+----------------+
|1987|10   |14        |3        |741    |730       |849       |PS           |1451     |NA     |79            |23      |11      |SAN   |SFO |447     |NULL   |0        |NA              |
|1987|10   |15        |4        |729    |730       |849       |PS           |1451     |NA     |79            |14      |-1      |SAN   |SFO |447     |NULL  