In [2]:
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("Airline ML")\
    .config("spark.executor.memory","16g")\
    .config("spark.executor.cores","4")\
    .config("spark.driver.memory","6g")\
    .config("spark.executor.instances","5")\
    .config("spark.hadoop.fs.s3a.aws.credentials.provider","org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")\
    .config("spark.yarn.access.hadoopFileSystems","s3a://ml-field")\
    .config("spark.dynamicAllocation.enabled","false")\    
.getOrCreate()

flight_df=spark.read.parquet(
  "s3a://ml-field/demo/flight-analysis/data/airline_parquet_2/",
)

flight_df = flight_df.na.drop() #.limit(100000)

In [3]:
from IPython.core.display import HTML
import os
HTML('<a href="http://spark-{}.{}">Spark UI</a>'.format(os.getenv("CDSW_ENGINE_ID"),os.getenv("CDSW_DOMAIN")))

In [4]:
#spark.stop()

In [6]:
flight_df.printSchema()

root
 |-- FL_DATE: timestamp (nullable = true)
 |-- OP_CARRIER: string (nullable = true)
 |-- OP_CARRIER_FL_NUM: string (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- CRS_DEP_TIME: string (nullable = true)
 |-- CRS_ARR_TIME: string (nullable = true)
 |-- CANCELLED: double (nullable = true)
 |-- CRS_ELAPSED_TIME: double (nullable = true)
 |-- DISTANCE: double (nullable = true)



In [5]:
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf,substring,weekofyear,concat,col,when,length,lit

#No longer needed
#convert_time_to_hour = udf(lambda x: x if len(x) == 4 else "0{}".format(x),StringType())

flight_df\
    .withColumn(
        'CRS_DEP_HOUR',
        when(
            length(col("CRS_DEP_TIME")) == 4,col("CRS_DEP_TIME")
        )\
        .otherwise(concat(lit("0"),col("CRS_DEP_TIME")))
    )\
    .select("CRS_DEP_HOUR")\
flight_df = flight_df.withColumn('CRS_DEP_HOUR',col('CRS_DEP_HOUR').cast('double'))
flight_df = flight_df.withColumn('WEEK',weekofyear('FL_DATE').cast('double'))
flight_df = flight_df.withColumn("ROUTE", concat(col("ORIGIN"),col("DEST")))


In [6]:
flight_df.printSchema()

root
 |-- FL_DATE: timestamp (nullable = true)
 |-- OP_CARRIER: string (nullable = true)
 |-- OP_CARRIER_FL_NUM: string (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- CRS_DEP_TIME: string (nullable = true)
 |-- CRS_ARR_TIME: string (nullable = true)
 |-- CANCELLED: double (nullable = true)
 |-- CRS_ELAPSED_TIME: double (nullable = true)
 |-- DISTANCE: double (nullable = true)
 |-- CRS_DEP_HOUR: double (nullable = true)
 |-- WEEK: double (nullable = true)
 |-- ROUTE: string (nullable = true)



In [7]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

numeric_cols = ["CRS_ELAPSED_TIME","DISTANCE","WEEK","CRS_DEP_HOUR"]

op_carrier_indexer = StringIndexer(inputCol ='OP_CARRIER', outputCol = 'OP_CARRIER_INDEXED',handleInvalid="keep")
op_carrier_encoder = OneHotEncoder(inputCol ='OP_CARRIER_INDEXED', outputCol='OP_CARRIER_ENCODED')

origin_indexer = StringIndexer(inputCol ='ORIGIN', outputCol = 'ORIGIN_INDEXED',handleInvalid="keep")
origin_encoder = OneHotEncoder(inputCol ='ORIGIN_INDEXED', outputCol='ORIGIN_ENCODED')

dest_indexer = StringIndexer(inputCol ='DEST', outputCol = 'DEST_INDEXED',handleInvalid="keep")
dest_encoder = OneHotEncoder(inputCol ='DEST_INDEXED', outputCol='DEST_ENCODED')

route_indexer = StringIndexer(inputCol ='ROUTE', outputCol = 'ROUTE_INDEXED',handleInvalid="keep")
route_encoder = OneHotEncoder(inputCol ='ROUTE_INDEXED', outputCol='ROUTE_ENCODED')

input_cols=[
    'OP_CARRIER_ENCODED',
    #'ROUTE_INDEXED',
    'ORIGIN_ENCODED',
    'DEST_ENCODED'] + numeric_cols

assembler = VectorAssembler(
    inputCols = input_cols,
    outputCol = 'features')

from pyspark.ml import Pipeline

pipeline_indexed_only = Pipeline(
    stages=[
        op_carrier_indexer,
        op_carrier_encoder,
        origin_indexer,
        origin_encoder,
        dest_indexer,
        dest_encoder,
        #route_indexer,
        assembler]
)


In [8]:
pipelineModel = pipeline_indexed_only.fit(flight_df)
model_df = pipelineModel.transform(flight_df)
selectedCols = ['CANCELLED', 'features']# + cols
model_df = model_df.select(selectedCols)
model_df.printSchema()
(train, test) = model_df.randomSplit([0.7, 0.3])

root
 |-- CANCELLED: double (nullable = true)
 |-- features: vector (nullable = true)



In [9]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol = 'features', labelCol = 'CANCELLED', maxIter=10)

lrModel = lr.fit(train)

In [10]:
print(lrModel.explainParams())

aggregationDepth: suggested depth for treeAggregate (>= 2) (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty (default: 0.0)
family: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial. (default: auto)
featuresCol: features column name (default: features, current: features)
fitIntercept: whether to fit an intercept term (default: True)
labelCol: label column name (default: label, current: CANCELLED)
lowerBoundsOnCoefficients: The lower bounds on coefficients if fitting under bound constrained optimization. (undefined)
lowerBoundsOnIntercepts: The lower bounds on intercepts if fitting under bound constrained optimization. (undefined)
maxIter: maximum number of iterations (>= 0) (default: 100, current: 10)
predictionCol: prediction column name (default: prediction)
probabilityCol: Column name f

In [11]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
predictionslr = lrModel.transform(test)
evaluator = BinaryClassificationEvaluator(labelCol="CANCELLED",metricName="areaUnderROC")
evaluator.evaluate(predictionslr)

0.712283740399597

In [12]:
import matplotlib.pyplot as plt 
trainingSummary = lrModel.summary
roc = trainingSummary.roc.toPandas()
plt.plot(roc['FPR'],roc['TPR'])
plt.ylabel('False Positive Rate')
plt.xlabel('True Positive Rate')
plt.title('ROC Curve')
plt.show()
#print('Training set areaUnderROC: ' + str(trainingSummary.areaUnderROC))

Training set areaUnderROC: 0.7118916290251833


In [13]:
input_cols=[
    'OP_CARRIER_INDEXED',
    #'ROUTE_INDEXED',
    'ORIGIN_INDEXED',
    'DEST_INDEXED'] + numeric_cols

pipelineModel = pipeline_indexed_only.fit(flight_df)
model_df = pipelineModel.transform(flight_df)
selectedCols = ['CANCELLED', 'features']# + cols
model_df = model_df.select(selectedCols)
model_df.printSchema()
(train, test) = model_df.randomSplit([0.7, 0.3])

root
 |-- CANCELLED: double (nullable = true)
 |-- features: vector (nullable = true)



In [14]:
from pyspark.ml.classification import RandomForestClassifier

rfclassifier = RandomForestClassifier(labelCol = 'CANCELLED', featuresCol = 'features', maxBins=390)
rfmodel = rfclassifier.fit(train)

In [15]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
predictionsrf = rfmodel.transform(test)
evaluator = BinaryClassificationEvaluator(labelCol="CANCELLED",metricName="areaUnderROC")
evaluator.evaluate(predictionsrf)

0.5

In [None]:
from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(maxIter=10,featuresCol = 'features', labelCol = 'CANCELLED')

gbtModel = gbt.fit(train)

In [None]:
predictionsgbt = gbtModel.transform(test)
evaluator = BinaryClassificationEvaluator(labelCol="CANCELLED",metricName="areaUnderROC")
evaluator.evaluate(predictionsgbt)

In [None]:
spark.stop()