In [1]:
import numpy as np
import pandas as pd
import os

## Machine Learning with Spark

This is a tutorial on machine learning with PySpark. I will create a classification model and a regression model using Pipelines.

In [2]:
#!pip install sparkmagic
#!pip install pyspark

### Import Spark SQL and Spark ML Libraries

We'll train a **LogisticRegression** model with a **Pipeline** that prepares the data, a **CrossValidator** to tune the parameters of the model, and a **BinaryClassificationEvaluator** to evaluate our trained model.

In [3]:
from pyspark.sql.types import *
from pyspark.sql.functions import col  # only col is used below; avoids shadowing builtins such as sum and max
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
#from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

spark = SparkSession.builder.master("local[*]").getOrCreate()

### Load Source Data
The data in the flights.csv file includes specific characteristics (or features) for each flight, as well as a column indicating how many minutes late or early the flight arrived.

In [8]:
csv = spark.read.csv(r'D:\Subjects\ARTIFICIAL INTELLIGENCE\SEMESTER - 4\Big Data Analytics\End Sem Project\flights.csv', inferSchema=True, header=True)
csv.show(10)

+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          11433|        13303|      -3|       1|
|        19|        5|     DL|          14869|        12478|       0|      -8|
|        19|        5|     DL|          14057|        14869|      -4|     -15|
|        19|        5|     DL|          15016|        11433|      28|      24|
|        19|        5|     DL|          11193|        12892|      -6|     -11|
|        19|        5|     DL|          10397|        15016|      -1|     -19|
|        19|        5|     DL|          15016|        10397|       0|      -1|
|        19|        5|     DL|          10397|        14869|      15|      24|
|        19|        5|     DL|          10397|        10423|      33|      34|
|        19|        5|     DL|          11278|      

### Prepare the Data for a Classification Model (Logistic Regression)
I select a subset of columns to use as features and create a label field named *label* with values 1 or 0: **1** for a flight that arrived more than 15 minutes late, and **0** for a flight that was early, on time, or no more than 15 minutes late.

In [9]:
data = csv.select("DayofMonth", "DayOfWeek", "Carrier", "OriginAirportID", "DestAirportID", "DepDelay", ((col("ArrDelay") > 15).cast("Int").alias("label")))
data.show(10)

+----------+---------+-------+---------------+-------------+--------+-----+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|label|
+----------+---------+-------+---------------+-------------+--------+-----+
|        19|        5|     DL|          11433|        13303|      -3|    0|
|        19|        5|     DL|          14869|        12478|       0|    0|
|        19|        5|     DL|          14057|        14869|      -4|    0|
|        19|        5|     DL|          15016|        11433|      28|    1|
|        19|        5|     DL|          11193|        12892|      -6|    0|
|        19|        5|     DL|          10397|        15016|      -1|    0|
|        19|        5|     DL|          15016|        10397|       0|    0|
|        19|        5|     DL|          10397|        14869|      15|    1|
|        19|        5|     DL|          10397|        10423|      33|    1|
|        19|        5|     DL|          11278|        10397|     323|    1|
+----------+---------+-------+---------------+-------------+--------+-----+
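
As a quick sanity check, the label rule can be reproduced in plain Python on the nine complete rows printed by `csv.show(10)` earlier. This is only an illustrative sketch: the helper name `late_label` is hypothetical and not part of the notebook.

```python
# ArrDelay values copied from the nine complete rows of csv.show(10) above.
arr_delays = [1, -8, -15, 24, -11, -19, -1, 24, 34]


def late_label(arr_delay, threshold=15):
    """Return 1 if the flight arrived more than `threshold` minutes late, else 0."""
    return int(arr_delay > threshold)


labels = [late_label(d) for d in arr_delays]
print(labels)  # [0, 0, 0, 1, 0, 0, 0, 1, 1]
```

These values agree with the *label* column in the first nine rows of the output above. A flight that arrives exactly 15 minutes late still gets label 0, because the comparison is strict.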

### Split the Data

I will use 70% of the data for training, and reserve 30% for testing. In the testing data, the *label* column is renamed to *trueLabel* so I can use it later to compare predicted labels with known actual values.

In [10]:
splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1].withColumnRenamed("label", "trueLabel")
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)

Training Rows: 1890195  Testing Rows: 812023
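
The row counts above are close to, but not exactly, a 70/30 split (1,890,195 of 2,702,218 rows is about 69.95%). That is expected: conceptually, a weighted random split assigns each row by an independent random draw, so the proportions are only approximate. The plain-Python sketch below illustrates the idea on made-up row IDs. `random_split` is a hypothetical helper, not Spark's implementation. In PySpark, `randomSplit` also accepts an optional `seed` argument if a reproducible split is needed.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row to one bucket with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative upper bounds, e.g. [0.7, 1.0] for weights [0.7, 0.3].
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    buckets = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            # The last bucket catches any floating-point leftovers.
            if u < bound or i == len(bounds) - 1:
                buckets[i].append(row)
                break
    return buckets


rows = list(range(10_000))  # made-up row IDs, not the flight data
train_ids, test_ids = random_split(rows, [0.7, 0.3], seed=42)
print(len(train_ids), len(test_ids))
```

Re-running with the same seed gives the same two buckets, while a different seed gives slightly different sizes.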


### Define the Pipeline

A pipeline consists of a series of transformer and estimator stages that typically prepare a DataFrame for modeling and then train a predictive model. In this case, I will create a pipeline with seven stages:
* A **StringIndexer estimator** that converts string values to indexes for categorical features
* A **VectorAssembler** that combines categorical features into a single vector
* A **VectorIndexer** that creates indexes for a vector of categorical features
* A **VectorAssembler** that creates a vector of continuous numeric features
* A **MinMaxScaler** that normalizes continuous numeric features
* A **VectorAssembler** that creates a vector of categorical and continuous features
* A **LogisticRegression** estimator that trains a classification model.
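
To make two of these stages concrete, here is a plain-Python sketch of what they compute with their default settings, as I understand them: StringIndexer gives index 0 to the most frequent string, and MinMaxScaler rescales each value to the range [0, 1] using the column's minimum and maximum. The helper names and sample values are hypothetical, and this is not Spark's implementation.

```python
from collections import Counter


def string_index(values):
    """Map each distinct string to an index, most frequent first.

    Ties are broken alphabetically here (an assumption for this sketch).
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {v: float(i) for i, v in enumerate(ordered)}


def min_max_scale(values):
    """Rescale values linearly so the minimum maps to 0.0 and the maximum to 1.0."""
    lo, hi = min(values), max(values)
    if hi == lo:
        # A constant column has no spread; use the midpoint of [0, 1].
        return [0.5 for _ in values]
    return [(v - lo) / (hi - lo) for v in values]


carriers = ["DL", "DL", "AA", "DL", "UA", "AA"]  # made-up sample
print(string_index(carriers))  # {'DL': 0.0, 'AA': 1.0, 'UA': 2.0}

dep_delays = [-10, 0, 15, 30]  # made-up sample, in minutes
print(min_max_scale(dep_delays))  # [0.0, 0.25, 0.625, 1.0]
```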

In [11]:
strIdx = StringIndexer(inputCol = "Carrier", outputCol = "CarrierIdx")
catVect = VectorAssembler(inputCols = ["CarrierIdx", "DayofMonth", "DayOfWeek", "OriginAirportID", "DestAirportID"], outputCol="catFeatures")
catIdx = VectorIndexer(inputCol = catVect.getOutputCol(), outputCol = "idxCatFeatures")
numVect = VectorAssembler(inputCols = ["DepDelay"], outputCol="numFeatures")
minMax = MinMaxScaler(inputCol = numVect.getOutputCol(), outputCol="normFeatures")
featVect = VectorAssembler(inputCols=["idxCatFeatures", "normFeatures"], outputCol="features")
lr = LogisticRegression(labelCol="label",featuresCol="features",maxIter=10,regParam=0.3)
#dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")
pipeline = Pipeline(stages=[strIdx, catVect, catIdx, numVect, minMax, featVect, lr])

### Run the Pipeline to train a model
Run the pipeline as an Estimator on the training data to train a model.

In [12]:
pipelineModel = pipeline.fit(train)

### Generate label predictions
Transform the test data with all of the stages and the trained model in the pipeline to generate label predictions.

In [13]:
prediction = pipelineModel.transform(test)
predicted = prediction.select("features", "prediction", "trueLabel")
predicted.show(100, truncate=False)

+---------------------------------------------------+----------+---------+
|features                                           |prediction|trueLabel|
+---------------------------------------------------+----------+---------+
|[10.0,1.0,0.0,10397.0,10693.0,0.03167185877466251] |0.0       |0        |
|[10.0,1.0,0.0,10397.0,12264.0,0.030114226375908618]|0.0       |0        |
|[10.0,1.0,0.0,10397.0,12264.0,0.04205607476635514] |0.0       |0        |
|[10.0,1.0,0.0,10397.0,13851.0,0.03167185877466251] |0.0       |0        |
|[10.0,1.0,0.0,10397.0,13851.0,0.03374870197300104] |0.0       |0        |
|[10.0,1.0,0.0,10529.0,11193.0,0.030114226375908618]|0.0       |0        |
|[10.0,1.0,0.0,10721.0,11193.0,0.030114226375908618]|0.0       |0        |
|[10.0,1.0,0.0,10721.0,13931.0,0.03271028037383177] |0.0       |1        |
|[10.0,1.0,0.0,10792.0,11433.0,0.029595015576323987]|0.0       |0        |
|[10.0,1.0,0.0,10792.0,12478.0,0.055036344755970926]|0.0       |1        |
|[10.0,1.0,0.0,10821.0,11

Looking at the results, some rows with a trueLabel of 1 are predicted as 0. Let's evaluate the model.

## Evaluating a Classification Model
We'll calculate a *Confusion Matrix* and the *Area Under ROC* (Receiver Operating Characteristic) to evaluate the model. 
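
Area under ROC can be read as the probability that a randomly chosen late flight receives a higher score than a randomly chosen on-time flight. The plain-Python sketch below computes it that way on four made-up scores. `area_under_roc` is a hypothetical helper. In PySpark, the `BinaryClassificationEvaluator` imported earlier reports this metric (`metricName="areaUnderROC"`) from the model's raw prediction column, so this is only meant to illustrate the idea.

```python
def area_under_roc(labels, scores):
    """Fraction of (positive, negative) pairs ranked correctly; ties count as half."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


# Made-up labels and scores, not taken from the flight model.
labels = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]
print(area_under_roc(labels, scores))  # 0.75
```

A value of 0.5 corresponds to random ranking and 1.0 to a perfect ranking.
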
### Compute Confusion Matrix
Classifiers are typically evaluated by creating a *confusion matrix*, which indicates the number of:
- True Positives
- True Negatives
- False Positives
- False Negatives

From these core measures, other evaluation metrics such as *precision*, *recall* and *F1* can be calculated.

In [15]:
# Count each cell of the confusion matrix on the test predictions.
tp = float(predicted.filter("prediction == 1.0 AND trueLabel == 1").count())
fp = float(predicted.filter("prediction == 1.0 AND trueLabel == 0").count())
tn = float(predicted.filter("prediction == 0.0 AND trueLabel == 0").count())
fn = float(predicted.filter("prediction == 0.0 AND trueLabel == 1").count())
accuracy = (tp + tn) / (tp + tn + fp + fn)
pr = tp / (tp + fp)  # precision
re = tp / (tp + fn)  # recall (computed here but not included in the table)
metrics = spark.createDataFrame([
    ("TP", tp),
    ("FP", fp),
    ("TN", tn),
    ("FN", fn),
    ("Accuracy", accuracy),
    ("Precision", pr),
], ["metric", "value"])
metrics.show()

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|           18797.0|
|       FP|              77.0|
|       TN|          650073.0|
|       FN|          143076.0|
| Accuracy|0.8237081954575178|
|Precision|0.9959203136590018|
+---------+------------------+
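
Recall is computed in the cell above (as `re`) but is not included in the table. As a rough check, the sketch below recomputes the metrics in plain Python from the four counts shown, and adds recall and F1. The variable names are my own.

```python
# Counts copied from the metrics table above.
tp, fp, tn, fn = 18797, 77, 650073, 143076

accuracy = (tp + tn) / (tp + fp + tn + fn)
precision = tp / (tp + fp)
recall = tp / (tp + fn)
f1 = 2 * precision * recall / (precision + recall)

print(f"accuracy={accuracy:.4f} precision={precision:.4f} "
      f"recall={recall:.4f} f1={f1:.4f}")
# accuracy=0.8237 precision=0.9959 recall=0.1161 f1=0.2080
```

Almost every flight the model flags as late really is late, but with recall near 0.12 it catches only about 12% of the late flights in the test set.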


