# Preprocessing

In [1]:
%matplotlib inline
%config InlineBackend.figure_format = 'retina'
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
import sys
import datetime
from pyspark.sql import SparkSession
from pyspark import SparkFiles
import pyspark.sql.functions as F
import pyspark.sql.types as T 
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

In [2]:
# Create a Spark session now to access functionalities of Spark
spark = (SparkSession.builder.appName('Preprocessing')
.master("spark://10.131.7.106:7077")
.config("spark.driver.memory", "20G")
.config("spark.executor.cores", "2")
.config("spark.driver.cores", "2")
.config("spark.executor.pyspark.memory", "30G")
.config("spark.dynamicAllocation.enabled", "true")
.getOrCreate() 
)

spark

In [3]:
pwd

'/mnt/group-10-pvc'

### Read data

In [4]:
file_names_range = list(range(2009, 2018))
file_paths = [f'/mnt/group-10-pvc/data/{file}.csv' for file in file_names_range]
print(file_paths)

['/mnt/group-10-pvc/data/2009.csv', '/mnt/group-10-pvc/data/2010.csv', '/mnt/group-10-pvc/data/2011.csv', '/mnt/group-10-pvc/data/2012.csv', '/mnt/group-10-pvc/data/2013.csv', '/mnt/group-10-pvc/data/2014.csv', '/mnt/group-10-pvc/data/2015.csv', '/mnt/group-10-pvc/data/2016.csv', '/mnt/group-10-pvc/data/2017.csv']


In [5]:
schema = T.StructType([
    T.StructField("FL_DATE", T.TimestampType(), nullable=True),
    T.StructField("OP_CARRIER", T.StringType(), nullable=True),
    T.StructField("OP_CARRIER_FL_NUM", T.IntegerType(), nullable=True),
    T.StructField("ORIGIN", T.StringType(), nullable=True),
    T.StructField("DEST", T.StringType(), nullable=True),
    T.StructField("CRS_DEP_TIME", T.StringType(), nullable=True),
    T.StructField("DEP_TIME", T.StringType(), nullable=True),
    T.StructField("DEP_DELAY", T.DoubleType(), nullable=True),
    T.StructField("TAXI_OUT", T.DoubleType(), nullable=True),
    T.StructField("WHEELS_OFF", T.StringType(), nullable=True),
    T.StructField("WHEELS_ON", T.StringType(), nullable=True),
    T.StructField("TAXI_IN", T.DoubleType(), nullable=True),
    T.StructField("CRS_ARR_TIME", T.StringType(), nullable=True),
    T.StructField("ARR_TIME",T.StringType(), nullable=True),
    T.StructField("ARR_DELAY", T.DoubleType(), nullable=True),
    T.StructField("CANCELLED", T.DoubleType(), nullable=True),
    T.StructField("CANCELLATION_CODE", T.StringType(), nullable=True),
    T.StructField("DIVERTED", T.DoubleType(), nullable=True),
    T.StructField("CRS_ELAPSED_TIME", T.DoubleType(), nullable=True),
    T.StructField("ACTUAL_ELAPSED_TIME", T.DoubleType(), nullable=True),
    T.StructField("AIR_TIME", T.DoubleType(), nullable=True),
    T.StructField("DISTANCE", T.DoubleType(), nullable=True),
    T.StructField("CARRIER_DELAY", T.DoubleType(), nullable=True),
    T.StructField("WEATHER_DELAY", T.DoubleType(), nullable=True),
    T.StructField("NAS_DELAY", T.DoubleType(), nullable=True),
    T.StructField("SECURITY_DELAY", T.DoubleType(), nullable=True),
    T.StructField("LATE_AIRCRAFT_DELAY", T.DoubleType(), nullable=True),
    T.StructField("Unnamed: 27", T.StringType(), nullable=True)
])

In [6]:
df = spark.read.schema(schema).format("csv").option("header", "true").load(file_paths)

### Get an overview of data

In [7]:
df.sample(withReplacement=False, fraction = 0.00001).show(n=10, vertical = True)

-RECORD 0----------------------------------
 FL_DATE             | 2009-01-07 00:00:00 
 OP_CARRIER          | UA                  
 OP_CARRIER_FL_NUM   | 307                 
 ORIGIN              | BWI                 
 DEST                | LAX                 
 CRS_DEP_TIME        | 1703                
 DEP_TIME            | 1724.0              
 DEP_DELAY           | 21.0                
 TAXI_OUT            | 10.0                
 WHEELS_OFF          | 1734.0              
 WHEELS_ON           | 1944.0              
 TAXI_IN             | 8.0                 
 CRS_ARR_TIME        | 2005                
 ARR_TIME            | 1952.0              
 ARR_DELAY           | -13.0               
 CANCELLED           | 0.0                 
 CANCELLATION_CODE   | null                
 DIVERTED            | 0.0                 
 CRS_ELAPSED_TIME    | 362.0               
 ACTUAL_ELAPSED_TIME | 328.0               
 AIR_TIME            | 310.0               
 DISTANCE            | 2329.0   

In [8]:
df.dtypes

[('FL_DATE', 'timestamp'),
 ('OP_CARRIER', 'string'),
 ('OP_CARRIER_FL_NUM', 'int'),
 ('ORIGIN', 'string'),
 ('DEST', 'string'),
 ('CRS_DEP_TIME', 'string'),
 ('DEP_TIME', 'string'),
 ('DEP_DELAY', 'double'),
 ('TAXI_OUT', 'double'),
 ('WHEELS_OFF', 'string'),
 ('WHEELS_ON', 'string'),
 ('TAXI_IN', 'double'),
 ('CRS_ARR_TIME', 'string'),
 ('ARR_TIME', 'string'),
 ('ARR_DELAY', 'double'),
 ('CANCELLED', 'double'),
 ('CANCELLATION_CODE', 'string'),
 ('DIVERTED', 'double'),
 ('CRS_ELAPSED_TIME', 'double'),
 ('ACTUAL_ELAPSED_TIME', 'double'),
 ('AIR_TIME', 'double'),
 ('DISTANCE', 'double'),
 ('CARRIER_DELAY', 'double'),
 ('WEATHER_DELAY', 'double'),
 ('NAS_DELAY', 'double'),
 ('SECURITY_DELAY', 'double'),
 ('LATE_AIRCRAFT_DELAY', 'double'),
 ('Unnamed: 27', 'string')]

In [9]:
df.columns

['FL_DATE',
 'OP_CARRIER',
 'OP_CARRIER_FL_NUM',
 'ORIGIN',
 'DEST',
 'CRS_DEP_TIME',
 'DEP_TIME',
 'DEP_DELAY',
 'TAXI_OUT',
 'WHEELS_OFF',
 'WHEELS_ON',
 'TAXI_IN',
 'CRS_ARR_TIME',
 'ARR_TIME',
 'ARR_DELAY',
 'CANCELLED',
 'CANCELLATION_CODE',
 'DIVERTED',
 'CRS_ELAPSED_TIME',
 'ACTUAL_ELAPSED_TIME',
 'AIR_TIME',
 'DISTANCE',
 'CARRIER_DELAY',
 'WEATHER_DELAY',
 'NAS_DELAY',
 'SECURITY_DELAY',
 'LATE_AIRCRAFT_DELAY',
 'Unnamed: 27']

In [10]:
df.describe().show(vertical=True)

-RECORD 0-----------------------------------
 summary             | count                
 OP_CARRIER          | 54343518             
 OP_CARRIER_FL_NUM   | 54343518             
 ORIGIN              | 54343518             
 DEST                | 54343518             
 CRS_DEP_TIME        | 54343517             
 DEP_TIME            | 53520112             
 DEP_DELAY           | 53520077             
 TAXI_OUT            | 53495447             
 WHEELS_OFF          | 53495451             
 WHEELS_ON           | 53465748             
 TAXI_IN             | 53465749             
 CRS_ARR_TIME        | 54343516             
 ARR_TIME            | 53465748             
 ARR_DELAY           | 53359207             
 CANCELLED           | 54343518             
 CANCELLATION_CODE   | 856625               
 DIVERTED            | 54343518             
 CRS_ELAPSED_TIME    | 54343468             
 ACTUAL_ELAPSED_TIME | 53359206             
 AIR_TIME            | 53359207             
 DISTANCE 

### Clean data

In [12]:
for sample_size, name in zip([0.01, 0.1, 1], ["clean_0.01","clean_0.1","clean_all"])
    df = spark.read.schema(schema).format("csv").option("header", "true").load(file_paths)
    if sampple_size < 1:
        df = df.sample(0.1)
    #drop useless columns
    df = df.drop("Unnamed: 27")

    def string_to_timestamp(flight_timestamp, timestamp, departure_timestamp = None):
        if timestamp is None:
            return None
        converted_hours_minutes = timestamp
        converted_hours_minutes = converted_hours_minutes.replace(".0", "").zfill(4)
        hour = int(converted_hours_minutes[:2]) % 24
        minute = int(converted_hours_minutes[2:]) % 60
        converted_timestamp = flight_timestamp
        converted_timestamp = converted_timestamp.replace(hour=hour, minute=minute)

        #attempt to fix flights that go past midnight, will produce wrong day on very rare occasions
        if departure_timestamp:
            if hour < departure_timestamp.hour:
                converted_timestamp = converted_timestamp + datetime.timedelta(days=1)
        return converted_timestamp

    def explain_cancelation_code(code):
        if code is None:
            return None
        return {"A" : 'By carrier', "B" : 'Due to weather', "C" : 'By national air system', "D" : 'For security'}[code]


    string_to_timestamp_udf = F.udf(string_to_timestamp, T.TimestampType())  
    explain_cancelation_code_udf = F.udf(explain_cancelation_code, T.StringType())



    #convert time columns to actual timestamps
    timestamp_columns_list = [
     'WHEELS_OFF',
     'WHEELS_ON',
     'CRS_ARR_TIME',
     'ARR_TIME',
    ]
    column_name = "DEP_TIME"
    df = df.withColumn(column_name, string_to_timestamp_udf(F.col("FL_DATE"), F.col(column_name)))
    column_name = "CRS_DEP_TIME"
    df = df.withColumn(column_name, string_to_timestamp_udf(F.col("FL_DATE"), F.col(column_name)))
    for column_name in timestamp_columns_list:
        df = df.withColumn(column_name, string_to_timestamp_udf(F.col("FL_DATE"), F.col(column_name), F.col("DEP_TIME")))


    #add column with error code details
    df = df.withColumn("CANCELLATION_CODE_EXPLAINED", explain_cancelation_code_udf(F.col("CANCELLATION_CODE")))

    df.show(n=5, vertical = True)

    df.count()

    df.show()
    df.write.mode("overwrite").format("parquet").save(f"data/Clean_Data/{name}")


SyntaxError: invalid syntax (<ipython-input-12-6b8bed8aa63d>, line 1)

In [None]:
# Drop the cols which indirectly indicate if a flight is cancelled or not (apart from the column CANCELLED)
# Most of those cols contain null values, if the flight is cancelled

classify_df = df.drop("CARRIER_DELAY", 
                        "WEATHER_DELAY",
                        "NAS_DELAY",
                        "SECURITY_DELAY",
                        "LATE_AIRCRAFT_DELAY",
                        "CANCELLATION_CODE",
                        "DEP_TIME",
                        "DEP_DELAY",
                        "TAXI_OUT",
                        "WHEELS_OFF",
                        "WHEELS_ON",
                        "TAXI_IN",
                        "ARR_TIME",
                        "ARR_DELAY",
                        "ACTUAL_ELAPSED_TIME", 
                        "AIR_TIME")

In [None]:
# Convert timestamp column to numerical
classify_df = classify_df.withColumn("FL_DATE", F.unix_timestamp("FL_DATE"))

In [None]:
classify_df.columns

In [None]:
classify_df.show(5)

In [None]:
classify_df.count()

In [None]:
# Take a subset: either balanced (with subsampling) or unbalanced
# we take a subset, because of memory limitations

# select subsample of positive samples - 10%
pos_df = classify_df.filter(F.col('CANCELLED').isin(1)).sample(fraction=0.1)
# select an equal amount of negative samples (number of neg samples == number of pos samples)
neg_df = classify_df.filter(F.col('CANCELLED').isin(0)).orderBy(F.rand()).limit(pos_df.count())


# Combine pos_df and neg_df - 171146 rows
classify_df = pos_df.union(neg_df).sample(fraction=1.0).cache()
classify_df.show(5)

In [None]:
pos_df.count()

In [None]:
neg_df.count()

In [None]:
classify_df.count()

### Preprocess data for training models

In [None]:
# Define StringIndexer: categorical (string) cols -> to column indices, 
# Each category gets a integer based on their frequency (start from 0)

carrier_indexer = StringIndexer(inputCol="OP_CARRIER", outputCol="OP_CARRIER_Index")
origin_indexer = StringIndexer(inputCol="ORIGIN", outputCol="ORIGIN_Index")
dest_indexer = StringIndexer(inputCol="DEST", outputCol="DEST_Index")

In [None]:
# Define onehotencoder for a index columns 
onehotencoder_carrier_vector = OneHotEncoder(inputCol="OP_CARRIER_Index", outputCol="OP_CARRIER_vec")
onehotencoder_origin_vector = OneHotEncoder(inputCol="ORIGIN_Index", outputCol="ORIGIN_vec")
onehotencoder_dest_vector = OneHotEncoder(inputCol="DEST_Index", outputCol="DEST_vec")

In [None]:
# Pipelining the preprocessing stages defined above 
pipeline = Pipeline(stages=[carrier_indexer, origin_indexer, dest_indexer,
                            onehotencoder_carrier_vector, onehotencoder_origin_vector,
                            onehotencoder_dest_vector])

transformed_df = pipeline.fit(classify_df).transform(classify_df)

In [None]:
transformed_df.show(5)

In [None]:
# Select columns that are combined to one feature column
feature_columns = transformed_df.columns

# Remove cols that whould not be in our feature cols (label col, intermediate preprocessing cols)
for item in ["CANCELLED", "ORIGIN", "DEST", "OP_CARRIER", "OP_CARRIER_Index", "ORIGIN_Index", "DEST_Index"]:
    feature_columns.remove(item)


assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Build feature col
assembled_df = assembler.transform(transformed_df)

In [None]:
# Select only feature and label column
final_classify_df = assembled_df.select("features", F.col("CANCELLED").alias("label"))

In [None]:
final_classify_df.printSchema()

In [None]:
train, test = final_classify_df.randomSplit([.7, .3], seed=9) # 70, 30 split on balanced set or on subset of samples

In [None]:
spark.catalog.clearCache()
# caching data into memory - models run quicker
train = train.repartition(32).cache()
test = test.repartition(32).cache()

### Train models

In [None]:
# Define the models
decision_tree = DecisionTreeClassifier(labelCol = 'label', featuresCol = 'features')
rand_forest = RandomForestClassifier(labelCol = 'label', featuresCol = 'features')
gbt = GBTClassifier(labelCol = 'label', featuresCol = 'features')

In [None]:
decision_tree_model = decision_tree.fit(train)

In [None]:
rand_forest_model = rand_forest.fit(train)

In [None]:
gbt_model = gbt.fit(train)