# Data Loading

In [27]:
df = spark.read.format("parquet")\
    .option("header","true")\
    .option("inferschema","true")\
    .load("s3://vitaproject23/cleandata/")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Drop Columns

Our aim is to predict delay at the time of Ticket booking and hence the following columns wont help us in prediction because customer will be unaware of the following data :- 'TAXI_OUT', 'TAXI_IN', 'WHEELS_OFF', 'WHEELS_ON', 'ARR_DELAY', 'DEP_DELAY', 'ACTUAL_ELAPSED_TIME', 'DEP_TIME', 'ARR_TIME'

In [28]:
df = df.drop('FL_DATE','TAXI_OUT','WHEELS_OFF','WHEELS_ON','TAXI_IN','ARR_DELAY','DEP_DELAY','ACTUAL_ELAPSED_TIME','DEP_TIME','ARR_TIME')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
df.columns
print("Number of Columns: ", len(df.columns))
print("Number of Rows:", df.count())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Number of Columns:  12
Number of Rows: 60431020

# Data Preprocessing

In [23]:
from pyspark.ml.feature import StringIndexer,VectorAssembler,VectorIndexer,StandardScaler

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# StringIndexer

- Converting Categorical Columns such as 'OP_CARRIER', 'ORIGIN', 'DEST' are converted into Indexed Columns 'OP_CARRIER_I', 'ORIGIN_I', 'DEST_I' using StringIndexer.
- StringIndexer is used to convert categorical columns to numeric.

In [6]:
indexer1 = StringIndexer(inputCol='OP_CARRIER',outputCol='OP_CARRIER_I')
#strindexedDF1 = indexer1.fit(df).transform(df)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
indexer2 = StringIndexer(inputCol='ORIGIN',outputCol='ORIGIN_I')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
indexer3 = StringIndexer(inputCol='DEST',outputCol='DEST_I')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# VectorAssembler

- Machine Learning models in Spark expects all features in single column. Therefore VectorAssembler combines all features and gives us vector which can be stored in single column. 
- VectorAssembler expects only Numerical Features, hence we do not take into account 'OP_CARRRIER', 'ORIGIN' and 'DEST'.

In [11]:
assembler = VectorAssembler(inputCols= ['CRS_DEP_TIME', 'CRS_ARR_TIME', 'CRS_ELAPSED_TIME', 'AIR_TIME', 'DISTANCE', 
                'MONTH', 'WEEKDAY', 'OP_CARRIER_I', 'ORIGIN_I', 'DEST_I'], outputCol= "features")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# VectorIndexer

- After VectorAssembler next step is VectorIndexer. 
- Vector Indexer Automatically identifies categorical features from the feature vector (Output Column of VectorAssembler) and then indexes those categorical features inside vector. 
- VectorIndexer let usskip OneHotEncoding stage for encoding categorical features.

In [12]:
vecindexer = VectorIndexer(inputCol= "features", outputCol= "indexed_features")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# StandardScaler

- StandardScaler scales each value in the feature vector such that the mean is 0 and the standard deviation is 1
- It takes parameters:
    - withStd: True by default. Scales the data to unit standard deviation
    - withMean: False by default. Centers the data with mean before scaling

In [13]:
stdscaler = StandardScaler(inputCol= "indexed_features", outputCol= "scaledfeatures")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Pipeline Without Model

In [29]:
# import Pipeline from pyspark.ml package
from pyspark.ml import Pipeline

# Build the pipeline object by providing stages(transformers + Estimator) 
# that you need the dataframe to pass through
# Transfoermers - binarizer, bucketizer, indexers, encoder, assembler
# Estimator - lr
mlppipeline = Pipeline(stages=[indexer1,indexer2,indexer3,assembler,vecindexer,stdscaler])

# fit the pipeline for the trainind data


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [30]:
mlppipelinemodel = mlppipeline.fit(df)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [31]:
mlppipelinepredicted = mlppipelinemodel.transform(df)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Train Test Split

In [32]:
from pyspark.sql.functions import col
trainDF = mlppipelinepredicted.where(col("YEAR")!=2018)
testDF = mlppipelinepredicted.where(col("YEAR")==2018)

# print the count of observations in each set
print("Observations in training set = ", trainDF.count())
print("Observations in testing set = ", testDF.count())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Observations in training set =  53359203
Observations in testing set =  7071817

# Model Building

In [33]:
# import the MultilayerPerceptronClassifier function from the pyspark.ml.classification package
from pyspark.ml.classification import MultilayerPerceptronClassifier
mlp = MultilayerPerceptronClassifier(layers = [10,5,5,2],featuresCol='scaledfeatures', labelCol='FLIGHT_STATUS')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [34]:
# fit the MultilayerPerceptronClassifier object on the training data
mlp_model = mlp.fit(trainDF)
# #This MultilayerPerceptronClassifier can be used as a transformer to perform prediction on the testing data
pred_df = mlp_model.transform(testDF)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [35]:
# Build the MulticlassClassificationEvaluator object 'evaluator'
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# 1. Accuracy
evaluator = MulticlassClassificationEvaluator(labelCol = 'FLIGHT_STATUS', predictionCol = 'prediction', metricName = 'accuracy')
# 2. F1 Score (F-measure)
evaluator2 = MulticlassClassificationEvaluator(labelCol = 'FLIGHT_STATUS', predictionCol = 'prediction', metricName = 'f1')
mlpacc = evaluator.evaluate(pred_df)
mlpf1=evaluator2.evaluate(pred_df)
print("accuracy:",mlpacc)
print('f1:', mlpf1)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

accuracy: 0.6906218020064716
f1: 0.6324285283466642

# Pipeline with model

In [34]:
from pyspark.sql.functions import col, column,when
import pyspark.sql.functions as F
from pyspark.sql.functions import month,dayofweek,year
from pyspark.ml.feature import StringIndexer,VectorAssembler,VectorIndexer,StandardScaler
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml import Pipeline


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [39]:
df = spark.read.format("csv")\
    		.option("header","true")\
    		.option("inferschema","true")\
    		.load('s3://vitaproject23/rawdata/')
df=df.drop('Unnamed: 27')
     

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
CombinedDF.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

61556964

In [23]:
def Clean(df):
    df=df.where((col("CANCELLED")==0) & (col("DIVERTED")==0))
    df=df.drop("CANCELLED","CANCELLATION_CODE","DIVERTED","Unnamed: 27","OP_CARRIER_FL_NUM","DEP_TIME", "ARR_TIME")
    df=df.withColumn("CRS_DEP_TIME",col("CRS_DEP_TIME")/600).withColumn("WHEELS_OFF",col("WHEELS_OFF")/600).withColumn("WHEELS_ON",col("WHEELS_ON")/600).withColumn("CRS_ARR_TIME",col("CRS_ARR_TIME")/600)
    df=df.withColumn("CRS_DEP_TIME",F.col("CRS_DEP_TIME").cast("int")).withColumn("WHEELS_OFF",F.col("WHEELS_OFF").cast("int")).withColumn("WHEELS_ON",F.col("WHEELS_ON").cast("int")).withColumn("CRS_ARR_TIME",F.col("CRS_ARR_TIME").cast("int"))
    df = df.withColumn('MONTH',month(df.FL_DATE)).withColumn('WEEKDAY',dayofweek(df.FL_DATE)).withColumn('YEAR',year(df.FL_DATE))
    df = df.na.drop()
    carrier_name = {'UA':'United Airlines',
    'AS':'Alaska Airlines',
    '9E':'Endeavor Air',
    'B6':'JetBlue Airways',
    'EV':'ExpressJet',
    'F9':'Frontier Airlines',
    'HA':'Hawaiian Airlines',
    'MQ':'Envoy Air',
    'NK':'Spirit Airlines',
    'OO':'SkyWest Airlines',
    'VX':'Virgin America',
    'WN':'Southwest Airlines',
    'YV':'Mesa Airline',
    'YX':'Republic Airways',
    'AA':'American Airlines',
    'US':'US Airways',
    'FL':'AirTran Airways Corporation',
    'NW':'Northwest Airlines',
    'CO':'Continental Air Lines',
    'XE':'Expressjet Airlines',
    'DL':'Delta Airlines',
    'OH':'Comair Airlines',
    'G4': 'Allegiant Airlines'}
    df = df.na.replace(carrier_name,1)
    df = df.withColumn("FLIGHT_STATUS", when(df.ARR_DELAY <= 0 ,0).otherwise(1))
    return df

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
CleanedDF = Clean(df)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [27]:
def index(df):
    df = df.drop('FL_DATE','TAXI_OUT','WHEELS_OFF','WHEELS_ON','TAXI_IN','ARR_DELAY','DEP_DELAY','ACTUAL_ELAPSED_TIME','DEP_TIME','ARR_TIME')
    indexer1 = StringIndexer(inputCol='OP_CARRIER',outputCol='OP_CARRIER_I')
    indexer2 = StringIndexer(inputCol='ORIGIN',outputCol='ORIGIN_I')
    indexer3 = StringIndexer(inputCol='DEST',outputCol='DEST_I')
    return indexer3

stringindexed = index(CleanedDF)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [28]:
def assemble():
    assembler = VectorAssembler(inputCols= ['CRS_DEP_TIME', 'CRS_ARR_TIME', 'CRS_ELAPSED_TIME', 'AIR_TIME', 'DISTANCE', 
                'MONTH', 'WEEKDAY', 'OP_CARRIER_I', 'ORIGIN_I', 'DEST_I'], outputCol= "features")
    
    return assembler
assembleDF = assemble()
    

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [29]:
def vect():
    vecindexer = VectorIndexer(inputCol= "features", outputCol= "indexed_features")
    return vecindexer
vectindexed = vect()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [31]:
def std():
    stdscaler = StandardScaler(inputCol= "indexed_features", outputCol= "scaledfeatures")
    return stdscaler
stdDF = std()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [32]:
def model():
    mlp = MultilayerPerceptronClassifier(layers = [10,5,5,2],featuresCol='scaledfeatures', labelCol='FLIGHT_STATUS')
    return mlp
mlpmodel = model()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [47]:
pipeline = Pipeline(stages=[CleanedDF])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [48]:
pipelinedata = pipeline.fit(df)
pipeline_data = pipelinedata.transform(df)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:
Cannot recognize a pipeline stage of type <class 'pyspark.sql.dataframe.DataFrame'>.
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/ml/base.py", line 132, in fit
    return self._fit(dataset)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/ml/pipeline.py", line 97, in _fit
    "Cannot recognize a pipeline stage of type %s." % type(stage))
TypeError: Cannot recognize a pipeline stage of type <class 'pyspark.sql.dataframe.DataFrame'>.



In [40]:
trainDF1 = df.where(col("YEAR")!=2018)
testDF1 = df.where(col("YEAR")==2018)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:
"cannot resolve '`YEAR`' given input columns: [FL_DATE, TAXI_OUT, ARR_TIME, ARR_DELAY, DEP_DELAY, DEST, DIVERTED, NAS_DELAY, CANCELLED, LATE_AIRCRAFT_DELAY, WHEELS_ON, OP_CARRIER_FL_NUM, SECURITY_DELAY, CRS_DEP_TIME, CRS_ARR_TIME, DEP_TIME, AIR_TIME, WHEELS_OFF, WEATHER_DELAY, CRS_ELAPSED_TIME, ACTUAL_ELAPSED_TIME, CARRIER_DELAY, DISTANCE, ORIGIN, OP_CARRIER, TAXI_IN, CANCELLATION_CODE];;\n'Filter NOT ('YEAR = 2018)\n+- Project [FL_DATE#1276, OP_CARRIER#1277, OP_CARRIER_FL_NUM#1278, ORIGIN#1279, DEST#1280, CRS_DEP_TIME#1281, DEP_TIME#1282, DEP_DELAY#1283, TAXI_OUT#1284, WHEELS_OFF#1285, WHEELS_ON#1286, TAXI_IN#1287, CRS_ARR_TIME#1288, ARR_TIME#1289, ARR_DELAY#1290, CANCELLED#1291, CANCELLATION_CODE#1292, DIVERTED#1293, CRS_ELAPSED_TIME#1294, ACTUAL_ELAPSED_TIME#1295, AIR_TIME#1296, DISTANCE#1297, CARRIER_DELAY#1298, WEATHER_DELAY#1299, ... 3 more fields]\n   +- Relation[FL_DATE#1276,OP_CARRIER#1277,OP_CARRIER_FL_NUM#1278,ORIGIN#1279,DEST#1280,CRS_DEP_TIME#1281

In [43]:
pipelinemodel = pipeline.fit()
pipeline_pred = pipelinemodel.transform(testDF1)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [44]:
pipelinemodel.save("s3://minmaxscale/model/")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Build the BinaryClassificationEvaluator object 'evaluator'
evaluator = BinaryClassificationEvaluator()

# Calculate the accracy and print its value
accuracy = mlppipelinepredicted.filter(mlppipelinepredicted.FLIGHT_STATUS == mlppipelinepredicted.prediction).count()/float(mlppipelinepredicted.count())
print("Accuracy = ", accuracy)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:
An error occurred while calling o523.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 28 in stage 271.0 failed 4 times, most recent failure: Lost task 28.3 in stage 271.0 (TID 4387, ip-172-31-29-118.ec2.internal, executor 10): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$9: (string) => double)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:585)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(B