## Import modules and create a spark session

In [2]:
#import modules
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler

#create Spark session
appName = "Classification in Spark"
spark = SparkSession \
    .builder \
    .appName(appName) \
    .config("spark.some.config.option","some-value") \
    .getOrCreate()

## Read data file into Spark dataFrame

In [3]:
#define our schema
flightSchema = StructType([
    StructField("DayOfMonth", IntegerType(), False),
    StructField("DayOfWeek", IntegerType(), False),
    StructField("Carrier", StringType(), False),
    StructField("OriginAirportID", IntegerType(), False),
    StructField("DestAirportID", IntegerType(), False),
    StructField("DepDelay", IntegerType(), False),
    StructField("ArrDelay", IntegerType(), False),
])

#read csv data with our defined schema
flightDataFrame = spark.read.csv('dataset/flights.csv', schema=flightSchema, header=True)
flightDataFrame.show(3)

+----------+---------+-------+---------------+-------------+--------+--------+
|DayOfMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          11433|        13303|      -3|       1|
|        19|        5|     DL|          14869|        12478|       0|      -8|
|        19|        5|     DL|          14057|        14869|      -4|     -15|
+----------+---------+-------+---------------+-------------+--------+--------+
only showing top 3 rows



## Select an important data for classification features and change arrival delay into binary class "late" vs "not late"

In [4]:
#select columns
data = flightDataFrame.select("DayOfMonth", "DayOfWeek",
                              "OriginAirportID", "DestAirportID",
                              "DepDelay", ((col("ArrDelay")>15).cast("Int").alias("Late")))
data.show(3)

+----------+---------+---------------+-------------+--------+----+
|DayOfMonth|DayOfWeek|OriginAirportID|DestAirportID|DepDelay|Late|
+----------+---------+---------------+-------------+--------+----+
|        19|        5|          11433|        13303|      -3|   0|
|        19|        5|          14869|        12478|       0|   0|
|        19|        5|          14057|        14869|      -4|   0|
+----------+---------+---------------+-------------+--------+----+
only showing top 3 rows



## Divide data into training and testing data

In [5]:
#divide data into 70% for training and 30% for testing
dividedData = data.randomSplit([0.7,0.3])
trainingData = dividedData[0] #index-0 from dividedData
testingData = dividedData[1] #index-1 from dividedData
train_rows = trainingData.count()
test_rows = testingData.count()

print("Training data rows:", train_rows, "; Testing data rows:", test_rows)

Training data rows: 1892919 ; Testing data rows: 809299


## Prepare the training data

In [6]:
#define an assambler
assambler = VectorAssembler(inputCols = [
    "DayOfMonth", "DayOfWeek", "OriginAirportID", "DestAirportID", "DepDelay"
], outputCol="features")

#change our features into one column using our defined assambler
trainingDataFinal = assambler.transform(trainingData).select(col("features"),(col("Late").alias("label")))
trainingDataFinal.show(truncate=False, n=4)

+------------------------------+-----+
|features                      |label|
+------------------------------+-----+
|[1.0,1.0,10140.0,10397.0,-2.0]|0    |
|[1.0,1.0,10140.0,10821.0,8.0] |0    |
|[1.0,1.0,10140.0,11259.0,-2.0]|0    |
|[1.0,1.0,10140.0,11259.0,-1.0]|0    |
+------------------------------+-----+
only showing top 4 rows



## Train our classifier model using training data

In [7]:
#call Spark Logistic Regression we import before
classifier = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10, regParam=0.3)
#train the model
model = classifier.fit(trainingDataFinal)
print("Regression model is trained!")

Regression model is trained!


## Prepare the testing data

In [8]:
#change our feature data into one column using our defined assambler
#just like what we did before in training data
testingDataFinal = assambler.transform(testingData).select(col("features"),(col("Late").alias("trueLabel")))
testingDataFinal.show(truncate=False, n=4)

+------------------------------+---------+
|features                      |trueLabel|
+------------------------------+---------+
|[1.0,1.0,10140.0,10397.0,-4.0]|0        |
|[1.0,1.0,10140.0,11259.0,-3.0]|0        |
|[1.0,1.0,10140.0,11259.0,21.0]|1        |
|[1.0,1.0,10140.0,11259.0,35.0]|1        |
+------------------------------+---------+
only showing top 4 rows



## Predict the testing data using our trained model

In [11]:
#predict testing data using our model
prediction = model.transform(testingDataFinal)
#choose the columns
finalPrediction = prediction.select("features","prediction","probability","trueLabel")
#show some prediction results
finalPrediction.show(10)

+--------------------+----------+--------------------+---------+
|            features|prediction|         probability|trueLabel|
+--------------------+----------+--------------------+---------+
|[1.0,1.0,10140.0,...|       0.0|[0.83108029435097...|        0|
|[1.0,1.0,10140.0,...|       0.0|[0.82946711327995...|        0|
|[1.0,1.0,10140.0,...|       0.0|[0.77677984881158...|        1|
|[1.0,1.0,10140.0,...|       0.0|[0.74109455662351...|        1|
|[1.0,1.0,10140.0,...|       0.0|[0.81097836180410...|        0|
|[1.0,1.0,10140.0,...|       0.0|[0.84285621961931...|        0|
|[1.0,1.0,10140.0,...|       0.0|[0.83532206891714...|        0|
|[1.0,1.0,10140.0,...|       0.0|[0.82549990866712...|        0|
|[1.0,1.0,10140.0,...|       0.0|[0.82348093630085...|        0|
|[1.0,1.0,10140.0,...|       0.0|[0.81980305184469...|        0|
+--------------------+----------+--------------------+---------+
only showing top 10 rows



## Calculate how accurate our model performance

In [12]:
correctPrediction = finalPrediction.filter(finalPrediction['prediction'] == finalPrediction['trueLabel']).count()
totalData = finalPrediction.count()
print("correct prediction:", correctPrediction, "total data:", totalData, ", accuracy:", correctPrediction/totalData)

correct prediction: 667374 total data: 809299 , accuracy: 0.8246321816782178
