In [1]:
from pyspark.sql import SparkSession
import numpy as np

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

import pyspark
from pyspark.ml import feature, regression, Pipeline, classification
from pyspark.sql import functions as fn, Row
from pyspark import sql
import re
import time

import matplotlib.pyplot as plt
import pandas as pd

In [2]:
## Convert back to spark for encoding pipeline ##
flights_df_clean = spark.read.csv('flights_clean.csv', header = True).drop('_c0')

In [3]:
## Check to make sure it made it okay ##
flights_df_clean.show(1)

+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+---------------+
|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_DELAY|
+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+---------------+
|    1|  1|          4|     OO|         5460|     N583SW|           RDD|                SFO|            Morning|        Delayed|
+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+---------------+
only showing top 1 row



In [4]:
## THE PIPELINE ##

categorical_cols = ['DAY', 'MONTH', 'DAY_OF_WEEK', 'AIRLINE','FLIGHT_NUMBER','TAIL_NUMBER','ORIGIN_AIRPORT',
                    'DESTINATION_AIRPORT','SCHEDULED_DEPARTURE']

label_indexer = [feature.StringIndexer(inputCol = "DEPARTURE_DELAY",
                                     outputCol = "label", handleInvalid = "skip")]

indexers = [feature.StringIndexer(inputCol = column, 
                                  outputCol = "{0}_index".format(column), handleInvalid = "skip")
            for column in categorical_cols
           ]

encoders = [feature.OneHotEncoder(dropLast=False, inputCol = indexer.getOutputCol(),
                                         outputCol = "{0}_encoded".format(indexer.getOutputCol()))
          for indexer in indexers
           ]

assembler = [feature.VectorAssembler(inputCols = [encoder.getOutputCol() for encoder in encoders],
                                     outputCol = "features")
             ]

# Build it and fit it to the dataset #

flights_pipe = Pipeline(stages=indexers + label_indexer + encoders +assembler)
preprocessing_model=flights_pipe.fit(flights_df_clean)

In [5]:
# Transform the dataset using the pipeline to prepare it for training the model #

flights_transformed = preprocessing_model.transform(flights_df_clean)

In [6]:
# Show the features column to make sure everything encoded #

flights_transformed.select('features').show(5, False)

+----------------------------------------------------------------------------------------+
|features                                                                                |
+----------------------------------------------------------------------------------------+
|(11659,[14,38,44,52,2403,10643,11276,11340,11654],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|(11659,[14,38,44,56,2154,7076,11026,11349,11654],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]) |
|(11659,[14,38,44,53,6373,10636,11070,11376,11654],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|(11659,[14,38,44,49,127,6890,11023,11352,11654],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])  |
|(11659,[14,38,44,55,3335,7662,11108,11337,11654],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]) |
+----------------------------------------------------------------------------------------+
only showing top 5 rows



In [7]:
# Make a training, validation, and test split #
training_df, validation_df, testing_df = flights_transformed.randomSplit([0.6, 0.3, 0.1], seed=0)

# Train the logistic regression model, manually passing it the optimized parameters #
lr = classification.LogisticRegression(regParam = 0.01,
                                      elasticNetParam = 0.5, maxIter = 10, featuresCol='features',labelCol='label')

lr_model = lr.fit(training_df)

In [8]:
# Read in demo .csv, and make sure the values are correct #
demo_df = spark.read.csv('demo.csv', header = True)
demo_df.show(1)

+-----------+-----+---+-------------+-----------+--------------+-------------------+-------------------+-------+
|DAY_OF_WEEK|MONTH|DAY|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|AIRLINE|
+-----------+-----+---+-------------+-----------+--------------+-------------------+-------------------+-------+
|          7|    4| 28|         1315|     N922EV|           LGA|                SFO|            Evening|     AA|
+-----------+-----+---+-------------+-----------+--------------+-------------------+-------------------+-------+



In [9]:
# transform the input data using the preprocessing pipeline and display the features to make sure the LR model will
# accept it #
transformed_demo_df = preprocessing_model.transform(demo_df)

In [10]:
# Make sure it's encoded properly #
transformed_demo_df.select('features').show()

+--------------------+
|            features|
+--------------------+
|(11659,[5,36,47,5...|
+--------------------+



In [11]:
# transform the input data using the LR model and display the probability #

lr_model.transform(transformed_demo_df).select('Probability','Prediction').show(1,False)

+----------------------------------------+----------+
|Probability                             |Prediction|
+----------------------------------------+----------+
|[0.5091047553125613,0.49089524468743884]|0.0       |
+----------------------------------------+----------+

