#Initialization

In [1]:
!apt-get -y install openjdk-8-jre-headless
!pip install pyspark -q

Reading package lists... Done
Building dependency tree       
Reading state information... Done
openjdk-8-jre-headless is already the newest version (8u312-b07-0ubuntu1~18.04).
The following package was automatically installed and is no longer required:
  libnvidia-common-460
Use 'apt autoremove' to remove it.
0 upgraded, 0 newly installed, 0 to remove and 42 not upgraded.


In [2]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
# Import SparkSession
from pyspark.sql import SparkSession
# Create SparkSession
spark=SparkSession.builder.config('spark.executor.memory','8g').config('spark.driver.memory', '20g') .getOrCreate()

#Data

In [4]:
import matplotlib.pyplot as plt

In [5]:
import pyspark.sql.functions as F
from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel
from pyspark.ml.stat import Correlation

In [6]:
df03=spark.read.csv('drive/MyDrive/HW4/2003.csv',header=True,inferSchema=True)
df04=spark.read.csv('drive/MyDrive/HW4/2004.csv',header=True,inferSchema=True)
df05=spark.read.csv('drive/MyDrive/HW4/2005.csv',header=True,inferSchema=True)

In [7]:
total_data=df03.union(df04).union(df05)

In [None]:
total_data.count()

20758406

In [None]:
total_data.filter(total_data.Cancelled==1).show()

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2003|    1|        13|        1|     NA|      1400|     NA|      1539|           UA|     1021

In [None]:
total_data.filter(total_data.Diverted==1).count()

39193

In [8]:
#刪除取消沒有資訊的班機、刪除Delay詳細資料
total_data=total_data.filter(total_data.Cancelled==0).drop('DepTime','ArrTime','Cancelled','CancellationCode',\
                              'CarrierDelay','WeatherDelay','NASDelay','SecurityDelay','LateAircraftDelay')

In [9]:
total_data=total_data.withColumn('delay',F.when((total_data.ArrDelay>0)|(total_data.DepDelay>0),1).otherwise(0)).\
            withColumn('CRSDepH',(total_data.CRSDepTime/100).cast('int')).\
            withColumn('CRSArrH',(total_data.CRSArrTime/100).cast('int')).drop('ArrDelay','DepDelay','CRSArrTime','CRSDepTime')

In [10]:
total_data=total_data.withColumn('ActualElapsedTime', F.regexp_replace('ActualElapsedTime', 'NA', '0')\
                                 .cast(IntegerType()))\
.withColumn('CRSElapsedTime', F.regexp_replace('CRSElapsedTime', 'NA', '0').cast(IntegerType()))\
.withColumn('AirTime', F.regexp_replace('AirTime', 'NA', '0').cast(IntegerType()))

In [11]:
total_data=total_data.na.fill({'TailNum': 'unknown'})

In [None]:
total_data.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: integer (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- AirTime: integer (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiIn: integer (nullable = true)
 |-- TaxiOut: integer (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- delay: integer (nullable = false)
 |-- CRSDepH: integer (nullable = true)
 |-- CRSArrH: integer (nullable = true)
 |-- UniqueCarrierIndex: double (nullable = false)
 |-- OriginIndex: double (nullable = false)
 |-- DestIndex: double (nullable = false)
 |-- TailNumIndex: double (nullable = false)



In [12]:
s1=StringIndexer(inputCol='UniqueCarrier',outputCol='UniqueCarrierIndex')
s2=StringIndexer(inputCol='Origin',outputCol='OriginIndex')
s3=StringIndexer(inputCol='Dest',outputCol='DestIndex')
s4=StringIndexer(inputCol='TailNum',outputCol='TailNumIndex')

In [13]:
total_data=Pipeline(stages=[s1,s2,s3,s4]).fit(total_data).transform(total_data)

In [None]:
total_data.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- ActualElapsedTime: integer (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- AirTime: integer (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiIn: integer (nullable = true)
 |-- TaxiOut: integer (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- delay: integer (nullable = false)
 |-- CRSDepH: integer (nullable = true)
 |-- CRSArrH: integer (nullable = true)
 |-- UniqueCarrierIndex: integer (nullable = true)
 |-- OriginIndex: integer (nullable = true)
 |-- DestIndex: integer (nullable = true)
 |-- TailNumIndex: integer (nullable = true)
 |-- features: vector (nullable = true)
 |-- label: integer (nullable = false)



In [14]:
total_data=total_data.drop('UniqueCarrier','Origin','Dest','TailNum')

In [15]:
#index轉成整數
total_data=total_data.withColumn('UniqueCarrierIndex',total_data.UniqueCarrierIndex.cast(IntegerType()))\
          .withColumn('OriginIndex',total_data.OriginIndex.cast(IntegerType()))\
          .withColumn('DestIndex',total_data.DestIndex.cast(IntegerType()))\
          .withColumn('TailNumIndex',total_data.TailNumIndex.cast(IntegerType()))

In [None]:
#相關係數
train=total_data.filter(total_data.Year<2005)
for i in range(1,len(train.columns)):
  print(train.columns[i],train.corr('delay',train.columns[i]))

In [16]:
#Take features with higher corr
total_data=total_data.withColumn('features',F.array(total_data.TaxiOut, total_data.CRSDepH, total_data.CRSArrH,\
                          total_data.ActualElapsedTime, total_data.AirTime, total_data.OriginIndex))
#Distance,CRSElapsedTime,FlightNum,Month,DayofMonth,DayOfWeek,UniqueCarrierIndex,DestIndex,TailNumIndex
list_to_vector_udf=F.udf(lambda l: Vectors.dense(l), VectorUDT())
total_data=total_data.withColumn('features',list_to_vector_udf(total_data['features']))\
            .withColumn('label',total_data.delay)

In [17]:
train=total_data.select('features','label').filter(total_data.Year<2005)
test=total_data.select('features','label').filter(total_data.Year==2005)

In [None]:
train

DataFrame[features: vector, label: int]

#LogisticRegression

In [18]:
lr=LogisticRegression()
grid=ParamGridBuilder().addGrid(lr.maxIter,[10,50]).addGrid(lr.regParam,[0.1,0.05,0.01]).build()
evaluator=BinaryClassificationEvaluator()
cv=CrossValidator(estimator=lr,estimatorParamMaps=grid,evaluator=evaluator,parallelism=2)
cvModel=cv.fit(train)

In [19]:
cvModel.avgMetrics

[0.6765471908310282,
 0.6817442858706163,
 0.6843694783859787,
 0.6772324040971711,
 0.681708791587081,
 0.6838576850477861]

In [20]:
evaluator.extractParamMap()

{Param(parent='BinaryClassificationEvaluator_43164dc68991', name='labelCol', doc='label column name.'): 'label',
 Param(parent='BinaryClassificationEvaluator_43164dc68991', name='metricName', doc='metric name in evaluation (areaUnderROC|areaUnderPR)'): 'areaUnderROC',
 Param(parent='BinaryClassificationEvaluator_43164dc68991', name='numBins', doc='Number of bins to down-sample the curves (ROC curve, PR curve) in area computation. If 0, no down-sampling will occur. Must be >= 0.'): 1000,
 Param(parent='BinaryClassificationEvaluator_43164dc68991', name='rawPredictionCol', doc='raw prediction (a.k.a. confidence) column name.'): 'rawPrediction'}

In [21]:
cvModel.bestModel.extractParamMap()

{Param(parent='LogisticRegression_e20885421e81', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2,
 Param(parent='LogisticRegression_e20885421e81', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0,
 Param(parent='LogisticRegression_e20885421e81', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial'): 'auto',
 Param(parent='LogisticRegression_e20885421e81', name='featuresCol', doc='features column name.'): 'features',
 Param(parent='LogisticRegression_e20885421e81', name='fitIntercept', doc='whether to fit an intercept term.'): True,
 Param(parent='LogisticRegression_e20885421e81', name='labelCol', doc='label column name.'): 'label',
 Param(parent='LogisticRegression_e20885421e81', name='maxBlockSizeInMB', doc='maximum memory in MB for s

In [29]:
Training=cvModel.transform(train)

In [34]:
Training.count()

13388584

In [30]:
for i in range(2):
  for j in range(2):
    print('label',i,'pred',j,Training.filter((Training.label==i)&(Training.prediction==j)).count())

label 0 pred 0 5323688
label 0 pred 1 1655217
label 1 pred 0 3227405
label 1 pred 1 3182274


In [33]:
evaluator.evaluate(Training)

0.6842773530369519

In [31]:
prediction=cvModel.transform(test)

In [35]:
prediction.count()

7006866

In [25]:
for i in range(2):
  for j in range(2):
    print('label',i,'pred',j,prediction.filter((prediction.label==i)&(prediction.prediction==j)).count())

label 0 pred 0 2615514
label 0 pred 1 796550
label 1 pred 0 1831581
label 1 pred 1 1763221


In [32]:
evaluator.evaluate(prediction)

0.685047516952035