In [124]:
#Import Libraries

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import *
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
import pyspark.ml.evaluation as evalu

In [125]:
#Configuration Jobs

spark = SparkSession \
    .builder \
    .appName("Flights") \
    .getOrCreate()

In [126]:
#Load csv Data to DataFrame

In [127]:
#Flights
flightsDF= spark.read.option("header","true").csv("./Data/flights.csv")
#Airports
airportsDF= spark.read.option("header","true").csv("./Data/airports.csv")
#Airline
airlinesDF= spark.read.option("header","true").csv("./Data/airlines.csv")

In [128]:
#Functions to transform or remove columns
def RemoveColumnsFunction(df: DataFrame, columns_to_drop: ['']) -> DataFrame:
    return df.drop(*columns_to_drop)

In [129]:
#join airlines to flights to avoid code
flightsDF = flightsDF.join(airlinesDF,flightsDF.AIRLINE ==  airlinesDF.IATA_CODE,"inner").drop(flightsDF.AIRLINE).drop(col("IATA_CODE")).withColumnRenamed("AIRLINE","AIRLINE_") 
flightsDF = flightsDF.join(airportsDF,flightsDF.ORIGIN_AIRPORT ==  airportsDF.IATA_CODE,"inner").withColumnRenamed("CITY","CITY_DEPART_") 
columns_to_drop = ['IATA_CODE','AIRPORT','CITY','STATE','COUNTRY','LATITUDE','LONGITUDE']
flightsDF = RemoveColumnsFunction(flightsDF, columns_to_drop)
flightsDF = flightsDF.join(airportsDF,flightsDF.DESTINATION_AIRPORT ==  airportsDF.IATA_CODE,"inner").withColumnRenamed("CITY","CITY_ARRIVAL_") 
flightsDF = RemoveColumnsFunction(flightsDF, columns_to_drop)

In [130]:
# remove unecessary columns
columns_to_drop = [
    "YEAR",
    "DAY",
    "TAXI_OUT",
    "TAXI_IN",
    "WHEELS_ON",
    "WHEELS_OFF",
    "YEAR",
    "DAY",
    "DAY_OF_WEEK",
    "AIR_SYSTEM_DELAY",
    "SECURITY_DELAY",
    "AIRLINE_DELAY",
    "LATE_AIRCRAFT_DELAY",
    "WEATHER_DELAY",
    "DIVERTED",
    "FLIGHT_NUMBER",
    "TAIL_NUMBER",
    "AIR_TIME",
    "ELAPSED_TIME",
    "DISTANCE",
    "SCHEDULED_DEPARTURE",
    "DEPARTURE_TIME",
    "SCHEDULED_TIME",
    "SCHEDULED_ARRIVAL",
    "ARRIVAL_TIME"
]

In [131]:
# Transform CANCELLED column values to boolean

In [154]:
flightsDF = flightsDF.drop(*columns_to_drop)
flightsDF = flightsDF.withColumn("CANCELLED", flightsDF.CANCELLED == 1)

In [155]:
flightsDF.show()

+-----+--------------+-------------------+---------------+-------------+---------+-------------------+--------------------+--------------+-----------------+
|MONTH|ORIGIN_AIRPORT|DESTINATION_AIRPORT|DEPARTURE_DELAY|ARRIVAL_DELAY|CANCELLED|CANCELLATION_REASON|            AIRLINE_|  CITY_DEPART_|    CITY_ARRIVAL_|
+-----+--------------+-------------------+---------------+-------------+---------+-------------------+--------------------+--------------+-----------------+
|    1|           ANC|                SEA|            -11|          -22|    false|               null|Alaska Airlines Inc.|     Anchorage|          Seattle|
|    1|           LAX|                PBI|             -8|           -9|    false|               null|American Airlines...|   Los Angeles|  West Palm Beach|
|    1|           SFO|                CLT|             -2|            5|    false|               null|     US Airways Inc.| San Francisco|        Charlotte|
|    1|           LAX|                MIA|             -5|

In [156]:
flightsDF.groupBy('CANCELLED').count().orderBy('count', ascending=False)

DataFrame[CANCELLED: boolean, count: bigint]

In [157]:
# Add feature column

In [162]:
flightsDF = flightsDF.withColumn("MONTH", flightsDF.MONTH.cast("integer"))

origin_airport_indexer = StringIndexer(inputCol="ORIGIN_AIRPORT", outputCol="ORIGIN_AIRPORT_index")
origin_airport_encoder = OneHotEncoder(inputCol="ORIGIN_AIRPORT_index", outputCol="ORIGIN_AIRPORT_fact")

dest_airport_indexer = StringIndexer(inputCol="DESTINATION_AIRPORT", outputCol="DESTINATION_AIRPORT_index")
dest_airport_encoder = OneHotEncoder(inputCol="DESTINATION_AIRPORT_index", outputCol="DESTINATION_AIRPORT_fact")

airline_indexer = StringIndexer(inputCol="AIRLINE_", outputCol="AIRLINE_index")
airline_encoder = OneHotEncoder(inputCol="AIRLINE_index", outputCol="AIRLINE_fact")

city_dep_indexer = StringIndexer(inputCol="CITY_DEPART_", outputCol="CITY_DEPART_index")
city_dep_encoder = OneHotEncoder(inputCol="CITY_DEPART_index", outputCol="CITY_DEPART_fact")

city_arrival_indexer = StringIndexer(inputCol="CITY_ARRIVAL_", outputCol="CITY_ARRIVAL_index")
city_arrival_encoder = OneHotEncoder(inputCol="CITY_ARRIVAL_index", outputCol="CITY_ARRIVAL_fact")

In [163]:
cols = [
    "MONTH",
    "ORIGIN_AIRPORT_fact",
    "DESTINATION_AIRPORT_fact",
    "AIRLINE_fact",
    "CITY_DEPART_fact",
    "CITY_ARRIVAL_fact"
]
assembler = VectorAssembler(inputCols=cols, outputCol="features")

In [164]:
# Pipeline

In [165]:
flights_pipe = Pipeline(stages=[
    origin_airport_indexer,
    origin_airport_encoder,
    dest_airport_indexer,
    dest_airport_encoder,
    airline_indexer,
    airline_encoder,
    city_dep_indexer,
    city_dep_encoder,
    city_arrival_indexer,
    city_arrival_encoder,
    assembler
])

piped_data = flights_pipe.fit(flightsDF).transform(flightsDF)

train_data, test_data = piped_data.randomSplit([.7, .3])

print('data points(rows) in train data :',  train_data.count())
print('data points(rows) in test data :',  test_data.count())

                                                                                

data points(rows) in train data : 3730718




data points(rows) in test data : 1602196


                                                                                

In [166]:
# training data

In [169]:
# Create a LogisticRegression Estimator
lr = LogisticRegression()
evaluator = evalu.BinaryClassificationEvaluator(metricName="areaUnderROC")

LogisticRegressionModel = lr.fit(train_data)

test_results = LogisticRegressionModel.transform(test_data)
print(evaluator.evaluate(test_results))

IllegalArgumentException: label does not exist. Available: MONTH, ORIGIN_AIRPORT, DESTINATION_AIRPORT, DEPARTURE_DELAY, ARRIVAL_DELAY, CANCELLED, CANCELLATION_REASON, AIRLINE_, CITY_DEPART_, CITY_ARRIVAL_, ORIGIN_AIRPORT_index, ORIGIN_AIRPORT_fact, DESTINATION_AIRPORT_index, DESTINATION_AIRPORT_fact, AIRLINE_index, AIRLINE_fact, CITY_DEPART_index, CITY_DEPART_fact, CITY_ARRIVAL_index, CITY_ARRIVAL_fact, features