
## Install & Setup

In [1]:
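!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Old and preview releases are kept on the Apache archive rather than downloads.apache.org
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0-preview2/spark-3.0.0-preview2-bin-hadoop2.7.tgz
# Extract quietly (-xf rather than -xvf) so every file in the archive is not listed
!tar -xf spark-3.0.0-preview2-bin-hadoop2.7.tgz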
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-preview2-bin-hadoop2.7"
import findspark
findspark.init()


## Creating a SparkSession

In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

## Load the dataset

In [0]:
# Read data from a CSV file (the path assumes Google Drive is mounted at /content/drive)
flights = spark.read.csv('/content/drive/My Drive/Colab Notebooks/data/flights-larger.csv',
                         sep=',',
                         header=True,
                         inferSchema=True,
                         nullValue='NA')
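To make the `nullValue='NA'` and `inferSchema=True` options concrete, here is a plain-Python sketch (not Spark's implementation) that parses two rows in the same layout: the literal string `NA` becomes a missing value and numeric-looking fields become `int` or `float`. The sample rows are copied from the `flights.show(10)` output further down; the helper `parse_value` is hypothetical, and unlike Spark it converts each value on its own instead of inferring one type per column.

```python
import csv
import io

# Two rows in the same layout as flights-larger.csv (taken from flights.show(10))
SAMPLE = """mon,dom,dow,carrier,flight,org,mile,depart,duration,delay
10,10,1,OO,5836,ORD,157,8.18,51,27
1,4,1,OO,5866,ORD,466,15.5,102,NA
"""

def parse_value(text, null_value="NA"):
    """Map the null marker to None and numeric strings to int or float."""
    if text == null_value:
        return None
    for convert in (int, float):
        try:
            return convert(text)
        except ValueError:
            pass
    return text  # non-numeric fields such as carrier codes stay strings

rows = [
    {key: parse_value(val) for key, val in record.items()}
    for record in csv.DictReader(io.StringIO(SAMPLE))
]

for row in rows:
    print(row)
```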

Data dictionary:

- mon: month (integer between 0 and 11)
- dom: day of month (integer between 1 and 31)
- dow: day of week (integer; 0 = Monday and 6 = Sunday)
- carrier: carrier (IATA code)
- flight: flight number
- org: origin airport (IATA code)
- mile: distance (miles)
- depart: departure time (decimal hour)
- duration: expected duration (minutes)
- delay: delay (minutes)
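Because `depart` is a decimal hour, its fractional part is a fraction of an hour rather than a count of minutes: 8.18 means roughly 08:11, not 08:18. A small helper (hypothetical, for illustration only) makes the conversion explicit:

```python
def decimal_hour_to_hhmm(depart):
    """Convert a decimal hour such as 8.18 into an 'HH:MM' string."""
    total_minutes = round(depart * 60)          # 8.18 h -> 490.8 min -> 491 min
    hours, minutes = divmod(total_minutes, 60)  # 491 min -> 8 h 11 min
    return f"{hours:02d}:{minutes:02d}"

for value in (8.18, 15.5, 21.17):
    print(value, "->", decimal_hour_to_hhmm(value))
```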

## Simple EDA & Data Preparation

In [4]:
# Get number of records
print("The data contain %d records." % flights.count())

The data contain 275000 records.


In [5]:
# View the first ten records
flights.show(10)

+---+---+---+-------+------+---+----+------+--------+-----+
|mon|dom|dow|carrier|flight|org|mile|depart|duration|delay|
+---+---+---+-------+------+---+----+------+--------+-----+
| 10| 10|  1|     OO|  5836|ORD| 157|  8.18|      51|   27|
|  1|  4|  1|     OO|  5866|ORD| 466|  15.5|     102| null|
| 11| 22|  1|     OO|  6016|ORD| 738|  7.17|     127|  -19|
|  2| 14|  5|     B6|   199|JFK|2248| 21.17|     365|   60|
|  5| 25|  3|     WN|  1675|SJC| 386| 12.92|      85|   22|
|  3| 28|  1|     B6|   377|LGA|1076| 13.33|     182|   70|
|  5| 28|  6|     B6|   904|ORD| 740|  9.58|     130|   47|
|  1| 19|  2|     UA|   820|SFO| 679| 12.75|     123|  135|
|  8|  5|  5|     US|  2175|LGA| 214|  13.0|      71|  -10|
|  5| 27|  5|     AA|  1240|ORD|1197| 14.42|     195|  -11|
+---+---+---+-------+------+---+----+------+--------+-----+
only showing top 10 rows



In [6]:
# Check column data types
flights.dtypes

[('mon', 'int'),
 ('dom', 'int'),
 ('dow', 'int'),
 ('carrier', 'string'),
 ('flight', 'int'),
 ('org', 'string'),
 ('mile', 'int'),
 ('depart', 'double'),
 ('duration', 'int'),
 ('delay', 'int')]

In [7]:
# Check data schema
flights.printSchema()

root
 |-- mon: integer (nullable = true)
 |-- dom: integer (nullable = true)
 |-- dow: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- org: string (nullable = true)
 |-- mile: integer (nullable = true)
 |-- depart: double (nullable = true)
 |-- duration: integer (nullable = true)
 |-- delay: integer (nullable = true)



In [8]:
# Summary statistics (count, mean, stddev, min, max) for each column
flights.describe().show()

+-------+------------------+-----------------+------------------+-------+------------------+------+-----------------+------------------+------------------+-----------------+
|summary|               mon|              dom|               dow|carrier|            flight|   org|             mile|            depart|          duration|            delay|
+-------+------------------+-----------------+------------------+-------+------------------+------+-----------------+------------------+------------------+-----------------+
|  count|            275000|           275000|            275000| 275000|            275000|275000|           275000|            275000|            275000|           258289|
|   mean|           5.24232|15.71406909090909| 2.946090909090909|   null|2063.0542763636363|  null|881.2222872727273|14.124930981817384|151.64103636363637|28.34773064280709|
| stddev|3.4273573316203576|8.805568383848067|1.9635141531217672|   null| 2185.852169684581|  null|700.5178890821038| 4.6831895034

In [9]:
# Months are coded 0-11; shift them to 1-12
flights = flights.withColumn('mon', (flights['mon']+1).cast('integer'))
flights.describe().show()

+-------+------------------+-----------------+------------------+-------+------------------+------+-----------------+------------------+------------------+-----------------+
|summary|               mon|              dom|               dow|carrier|            flight|   org|             mile|            depart|          duration|            delay|
+-------+------------------+-----------------+------------------+-------+------------------+------+-----------------+------------------+------------------+-----------------+
|  count|            275000|           275000|            275000| 275000|            275000|275000|           275000|            275000|            275000|           258289|
|   mean|           6.24232|15.71406909090909| 2.946090909090909|   null|2063.0542763636363|  null|881.2222872727273|14.124930981817384|151.64103636363637|28.34773064280709|
| stddev|3.4273573316203576|8.805568383848067|1.9635141531217672|   null| 2185.852169684581|  null|700.5178890821038| 4.6831895034

In [10]:
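# Count missing values (null or NaN) in each column
# Note: importing `round` from pyspark.sql.functions shadows Python's built-in round()
from pyspark.sql.functions import isnan, when, count, col, avg, round

flights.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in flights.columns]).show()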

+---+---+---+-------+------+---+----+------+--------+-----+
|mon|dom|dow|carrier|flight|org|mile|depart|duration|delay|
+---+---+---+-------+------+---+----+------+--------+-----+
|  0|  0|  0|      0|     0|  0|   0|     0|       0|16711|
+---+---+---+-------+------+---+----+------+--------+-----+



In [11]:
# Check number of flights by month
flights_by_mon = flights.groupBy('mon').count()
flights_by_mon.sort('count', ascending=False).show()

+---+-----+
|mon|count|
+---+-----+
|  6|26615|
|  1|25736|
|  7|25452|
|  2|24582|
|  3|24346|
|  8|24049|
| 12|23513|
|  4|22720|
|  5|22503|
|  9|19432|
| 10|18557|
| 11|17495|
+---+-----+



In [12]:
# Average delays by carrier
flights.groupBy('carrier').agg(round(avg('delay'),2).alias('Average_delay')).show()

+-------+-------------+
|carrier|Average_delay|
+-------+-------------+
|     UA|        31.63|
|     AA|        33.97|
|     B6|        29.73|
|     OO|         28.7|
|     US|        15.62|
|     AQ|         2.92|
|     OH|        32.07|
|     HA|         7.82|
|     WN|        14.89|
+-------+-------------+
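Note that `avg` ignores nulls, so each `Average_delay` above is computed only over flights with a recorded delay. A plain-Python sketch of the same grouped mean, applied to the `(carrier, delay)` pairs from the first six rows of `flights.show(10)`, shows the effect (the second OO row has a null delay and simply drops out):

```python
from collections import defaultdict

# (carrier, delay) pairs from the first six rows shown by flights.show(10)
sample = [("OO", 27), ("OO", None), ("OO", -19), ("B6", 60), ("WN", 22), ("B6", 70)]

def average_by_key(pairs, ndigits=2):
    """Grouped mean that, like Spark's avg(), skips None values.

    Groups whose values are all None are omitted here; Spark would return null.
    """
    totals = defaultdict(float)
    counts = defaultdict(int)
    for key, value in pairs:
        if value is None:
            continue  # nulls contribute to neither the sum nor the count
        totals[key] += value
        counts[key] += 1
    return {key: round(totals[key] / counts[key], ndigits) for key in counts}

print(average_by_key(sample))
```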



### Removing columns and rows

In [13]:
# Remove the 'flight' column
flights_drop_column = flights.drop('flight')

# Number of records with missing 'delay' values
print(flights_drop_column.filter('delay IS NULL').count())

# Remove records with missing 'delay' values
flights_valid_delay = flights_drop_column.filter('delay IS NOT NULL')

# Remove records with missing values in any column and get the number of remaining rows
flights_none_missing = flights_valid_delay.dropna()
print(flights_none_missing.count())

16711
258289


In [14]:
flights_none_missing.show(5)

+---+---+---+-------+---+----+------+--------+-----+
|mon|dom|dow|carrier|org|mile|depart|duration|delay|
+---+---+---+-------+---+----+------+--------+-----+
| 11| 10|  1|     OO|ORD| 157|  8.18|      51|   27|
| 12| 22|  1|     OO|ORD| 738|  7.17|     127|  -19|
|  3| 14|  5|     B6|JFK|2248| 21.17|     365|   60|
|  6| 25|  3|     WN|SJC| 386| 12.92|      85|   22|
|  4| 28|  1|     B6|LGA|1076| 13.33|     182|   70|
+---+---+---+-------+---+----+------+--------+-----+
only showing top 5 rows



In [15]:
# Check for missing values in 'delay' again
flights_none_missing.filter('delay IS NULL').count()

0

### Column manipulation

In [0]:
# Create a 'label' column: 1 if the flight was delayed by 15 minutes or more, 0 otherwise
flights_labeled = flights_none_missing.withColumn('label', (flights_none_missing.delay >= 15).cast('integer'))
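The labeling rule is a single threshold on `delay`. Applied by hand to the delays of the first five rows of `flights_none_missing` (27, -19, 60, 22 and 70), it gives 1, 0, 1, 1, 1, which is consistent with the `label` values in the assembled output further down. A plain-Python sketch of the rule (the names are illustrative):

```python
DELAY_THRESHOLD = 15  # minutes; same cut-off as the withColumn call above

def delay_label(delay):
    """Return 1 if the flight was delayed by at least 15 minutes, else 0."""
    return int(delay >= DELAY_THRESHOLD)

# delay values of the first five rows of flights_none_missing
print([delay_label(d) for d in [27, -19, 60, 22, 70]])
```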

### Bucketing departure time

In [17]:
from pyspark.ml.feature import Bucketizer

# Create buckets at 3-hour intervals through the day
buckets = Bucketizer(splits=[0, 3, 6, 9, 12, 15, 18, 21, 24], inputCol='depart', outputCol='depart_bucket')

# Bucket the departure times
bucketed = buckets.transform(flights_labeled)
bucketed.select('depart', 'depart_bucket').show(5)

+------+-------------+
|depart|depart_bucket|
+------+-------------+
|  8.18|          2.0|
|  7.17|          2.0|
| 21.17|          7.0|
| 12.92|          4.0|
| 13.33|          4.0|
+------+-------------+
only showing top 5 rows
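`Bucketizer` maps each value to the index of the half-open interval `[splits[i], splits[i+1])` containing it, with the last interval also including its upper bound; values outside the splits raise an error by default. This plain-Python sketch (using `bisect`, not Spark's code) reproduces the bucket numbers shown above; Spark stores them as doubles (2.0), the sketch as ints.

```python
from bisect import bisect_right

SPLITS = [0, 3, 6, 9, 12, 15, 18, 21, 24]

def bucketize(value, splits=SPLITS):
    """Return the bucket index of value, mirroring Bucketizer's split semantics."""
    if value == splits[-1]:
        return len(splits) - 2  # the final bucket is closed on the right
    if not splits[0] <= value < splits[-1]:
        raise ValueError(f"{value} is outside [{splits[0]}, {splits[-1]}]")
    return bisect_right(splits, value) - 1

print([bucketize(h) for h in [8.18, 7.17, 21.17, 12.92, 13.33]])
```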



### Transform categorical columns

In [0]:
# Index all categorical columns in the dataset
from pyspark.ml.feature import StringIndexer
indexer1 = StringIndexer(inputCol='org', outputCol='org_index')
indexer2 = StringIndexer(inputCol='carrier', outputCol='carrier_index')
indexer3 = StringIndexer(inputCol='depart_bucket', outputCol='depart_bucket_index')

indexer1 = indexer1.fit(bucketed)
indexer2 = indexer2.fit(bucketed)
indexer3 = indexer3.fit(bucketed)

bucketed = indexer1.transform(bucketed)
bucketed = indexer2.transform(bucketed)
bucketed = indexer3.transform(bucketed)
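`StringIndexer` defaults to `stringOrderType='frequencyDesc'`, so the most frequent value receives index 0. A rough plain-Python equivalent follows; breaking ties alphabetically is an assumption of this sketch. The input is the `org` column of the ten rows printed by `flights.show(10)`, so the resulting indices apply to this sample only, not to the full dataset.

```python
from collections import Counter

def fit_string_index(values):
    """Map each distinct value to an index, most frequent first (ties alphabetical)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: float(i) for i, value in enumerate(ordered)}

# 'org' values of the ten rows shown by flights.show(10)
origins = ["ORD", "ORD", "ORD", "JFK", "SJC", "LGA", "ORD", "SFO", "LGA", "ORD"]
mapping = fit_string_index(origins)
print(mapping)
print([mapping[o] for o in origins])
```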

### One-Hot Encoding

In [19]:
from pyspark.ml.feature import OneHotEncoder

# Create an instance of the one-hot encoder
onehot = OneHotEncoder(inputCols=['org_index', 'carrier_index', 'depart_bucket_index'],
                       outputCols=['org_classVec', 'carrier_classVec', 'depart_bucket_classVec'])
# Fit the encoder and apply it to the flights data
onehot = onehot.fit(bucketed)
flights_onehot = onehot.transform(bucketed)

# Check the results
flights_onehot.columns

['mon',
 'dom',
 'dow',
 'carrier',
 'org',
 'mile',
 'depart',
 'duration',
 'delay',
 'label',
 'depart_bucket',
 'org_index',
 'carrier_index',
 'depart_bucket_index',
 'org_classVec',
 'carrier_classVec',
 'depart_bucket_classVec']
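`OneHotEncoder` keeps its default `dropLast=True` here, so a column with n categories becomes a sparse vector of length n - 1 and the last category is encoded as all zeros. A plain-Python sketch, representing a sparse vector as a `(size, indices, values)` triple in the spirit of Spark's printed form:

```python
def one_hot(index, num_categories, drop_last=True):
    """Encode a category index as a sparse (size, indices, values) triple."""
    index = int(index)  # StringIndexer outputs doubles such as 2.0
    if not 0 <= index < num_categories:
        raise ValueError(f"index {index} out of range for {num_categories} categories")
    size = num_categories - 1 if drop_last else num_categories
    if index == size:
        return (size, [], [])  # the dropped last category becomes an all-zero vector
    return (size, [index], [1.0])

# 8 departure buckets, as produced by the Bucketizer above
for idx in (0.0, 2.0, 7.0):
    print(idx, "->", one_hot(idx, 8))
```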

### Assembling columns

Spark ML models expect all predictors in a single vector column, so the predictor columns are consolidated into one `features` column before any model is trained.

In [20]:
from pyspark.ml.feature import VectorAssembler

# Create an assembler object
assembler = VectorAssembler(inputCols=[
    'mon', 'dom', 'dow', 'carrier_classVec', 'org_classVec', 'mile', 'depart_bucket_classVec', 'duration'
], outputCol='features')

# Consolidate predictor columns
flights_assembled = assembler.transform(flights_onehot)

# Check the resulting column
flights_assembled.select('features', 'label').show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(27,[0,1,2,5,11,1...|    1|
|(27,[0,1,2,5,11,1...|    0|
|(27,[0,1,2,7,13,1...|    1|
|(27,[0,1,2,6,16,1...|    1|
|(27,[0,1,2,7,14,1...|    1|
+--------------------+-----+
only showing top 5 rows
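`VectorAssembler` concatenates its inputs in order: each numeric column contributes one slot and each one-hot column contributes its full vector length. With `dropLast=True`, the 27 slots are therefore 5 numeric columns (mon, dom, dow, mile, duration) plus (carriers - 1) + (origins - 1) + (buckets - 1). Given the 9 carriers in the average-delay table and 8 departure buckets, this implies 8 origin airports, assuming every bucket occurs in the data. A plain-Python sketch of the concatenation and the size arithmetic; the one-hot vectors passed to `assemble` are toy values, not real encodings:

```python
def assemble(*parts):
    """Concatenate scalars and dense lists into one flat feature list."""
    features = []
    for part in parts:
        if isinstance(part, (list, tuple)):
            features.extend(part)
        else:
            features.append(part)
    return features

def assembled_size(n_carriers, n_origins, n_buckets):
    """Length of 'features' when every one-hot column drops its last level."""
    numeric_columns = 5  # mon, dom, dow, mile, duration
    return numeric_columns + (n_carriers - 1) + (n_origins - 1) + (n_buckets - 1)

# mon, dom, dow, carrier vec, org vec, mile, depart bucket vec, duration (toy vectors)
example = assemble(11, 10, 1, [0.0, 1.0], [1.0, 0.0], 157, [0.0, 0.0, 1.0], 51)
print(example)
print(assembled_size(n_carriers=9, n_origins=8, n_buckets=8))
```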



In [21]:
# Check whether the dataset is balanced
flights_assembled.select('features', 'label').groupBy('label').count().show()

+-----+------+
|label| count|
+-----+------+
|    1|130588|
|    0|127701|
+-----+------+
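The classes are close to balanced: label 1 makes up about 50.6% of the records. That share is also the accuracy of a trivial model that always predicts "delayed", which is a useful baseline for the accuracy figures reported for the models below:

```python
counts = {1: 130588, 0: 127701}  # from the groupBy('label').count() output above

total = sum(counts.values())
majority_share = max(counts.values()) / total
print(f"total = {total}, majority-class share = {majority_share:.3f}")
```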



### Train/test split

In [22]:
# Split into training and testing sets in an 80:20 ratio
flights_train, flights_test = flights_assembled.randomSplit([0.8, 0.2], seed=17)

# Check that the training set has around 80% of the records
training_ratio = flights_train.count() / flights_assembled.count()
print(training_ratio)

0.7998404887548444
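`randomSplit` normalizes the weights and assigns rows to splits at random, so the split sizes are only approximately 80% and 20%; that is why the ratio above is 0.7998 rather than exactly 0.8. The sketch below mimics this with one uniform draw per row. It is a simplification, not Spark's actual sampler, and the row count of 100,000 is arbitrary.

```python
import random

def random_split(rows, weights, seed=17):
    """Assign each row to a split by comparing one uniform draw with cumulative weights."""
    rng = random.Random(seed)
    total = sum(weights)
    bounds, running = [], 0.0
    for w in weights:
        running += w / total  # cumulative, normalized upper bound of each split
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, upper in enumerate(bounds):
            if u < upper:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard against round-off in the last bound
    return splits

train, test = random_split(range(100_000), [0.8, 0.2])
print(len(train) / 100_000)
```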


## Building Models

### Build a Decision Tree

In [23]:
# Import the Decision Tree Classifier class
from pyspark.ml.classification import DecisionTreeClassifier

# Create a classifier object and fit to the training data
tree = DecisionTreeClassifier(featuresCol='features', labelCol='label')
tree_model = tree.fit(flights_train)

# Create predictions for the testing data and take a look at the predictions
prediction = tree_model.transform(flights_test)
prediction.select('label', 'prediction', 'probability').show(5, False)

+-----+----------+----------------------------------------+
|label|prediction|probability                             |
+-----+----------+----------------------------------------+
|1    |1.0       |[0.34821276836798276,0.6517872316320172]|
|1    |1.0       |[0.34821276836798276,0.6517872316320172]|
|0    |1.0       |[0.34821276836798276,0.6517872316320172]|
|0    |1.0       |[0.34821276836798276,0.6517872316320172]|
|1    |1.0       |[0.34821276836798276,0.6517872316320172]|
+-----+----------+----------------------------------------+
only showing top 5 rows



### Evaluate the Decision Tree

In [24]:
# Create a confusion matrix
prediction.groupBy('label', 'prediction').count().show()

# Calculate the elements of the confusion matrix
TN = prediction.filter('prediction = 0 AND label = prediction').count()
TP = prediction.filter('prediction = 1 AND label = prediction').count()
FN = prediction.filter('prediction = 0 AND label != prediction').count()
FP = prediction.filter('prediction = 1 AND label != prediction').count()

# Accuracy measures the proportion of correct predictions
accuracy = (TN + TP) / (TN + TP + FN + FP)
print(accuracy)

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0|12979|
|    0|       0.0|18808|
|    1|       1.0|13180|
|    0|       1.0| 6732|
+-----+----------+-----+

0.6187353720574866
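Each `filter(...).count()` above is a separate Spark action, so the uncached `prediction` DataFrame is recomputed four times; the same four numbers can be read off the `groupBy` table. In plain Python, tallying `(label, prediction)` pairs into the confusion matrix and computing accuracy looks like this (the counts plugged in are the decision-tree counts from the table above):

```python
from collections import Counter

def confusion_counts(pairs):
    """Tally (label, prediction) pairs into TN, TP, FN, FP."""
    tally = Counter((int(label), int(pred)) for label, pred in pairs)
    return {
        "TN": tally[(0, 0)],
        "TP": tally[(1, 1)],
        "FN": tally[(1, 0)],
        "FP": tally[(0, 1)],
    }

def accuracy(c):
    """Share of correct predictions."""
    return (c["TN"] + c["TP"]) / sum(c.values())

# Counts copied from the decision-tree confusion matrix above
tree_counts = {"TN": 18808, "TP": 13180, "FN": 12979, "FP": 6732}
print(accuracy(tree_counts))
print(confusion_counts([(1, 1.0), (0, 1.0), (1, 0.0), (0, 0.0), (1, 1.0)]))
```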


In [25]:
# Calculate the precision
precision = TP / (TP + FP)
# Calculate the recall
recall = TP / (TP + FN)
# Print out results
print('precision = {:.2f}\nrecall    = {:.2f}'.format(precision, recall))

precision = 0.66
recall    = 0.50
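Precision and recall pull in different directions, so they are often summarized by their harmonic mean, the F1 score: F1 = 2PR / (P + R), which equals 2TP / (2TP + FP + FN). A plain-Python sketch using the decision-tree counts from the confusion matrix above:

```python
def precision_recall_f1(tp, fp, fn):
    """Return precision, recall and their harmonic mean (F1)."""
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    f1 = 2 * precision * recall / (precision + recall)
    return precision, recall, f1

p, r, f1 = precision_recall_f1(tp=13180, fp=6732, fn=12979)
print(f"precision = {p:.2f}, recall = {r:.2f}, F1 = {f1:.2f}")
```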


In [26]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Check for AUC
binary_evaluator = BinaryClassificationEvaluator()
auc = binary_evaluator.evaluate(prediction, {binary_evaluator.metricName: "areaUnderROC"})
print('Test Area Under ROC: ', auc)

Test Area Under ROC:  0.587543389032608
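Area under the ROC curve equals the probability that a randomly chosen delayed flight gets a higher score than a randomly chosen on-time flight, with ties counting as one half. A single decision tree can only output as many distinct scores as it has leaves (the five predictions shown earlier all share the same probability), so its ranking of flights is coarse; this is one plausible reason its AUC trails the random forest's. A brute-force plain-Python sketch of the pairwise definition, fine for small inputs only:

```python
def pairwise_auc(scores, labels):
    """AUC as P(score of a positive > score of a negative), ties counted as 0.5."""
    positives = [s for s, y in zip(scores, labels) if y == 1]
    negatives = [s for s, y in zip(scores, labels) if y == 0]
    if not positives or not negatives:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))

print(pairwise_auc([0.1, 0.4, 0.35, 0.8], [0, 0, 1, 1]))      # 0.75
print(pairwise_auc([0.65, 0.65, 0.65, 0.65], [1, 1, 0, 0]))  # 0.5, every pair tied
```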


### Build a Random Forest with Grid Search and Cross Validation

In [0]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator


# Create a random forest classifier
forest = RandomForestClassifier()

# Create a parameter grid
params = ParamGridBuilder() \
            .addGrid(forest.featureSubsetStrategy, ['all', 'onethird', 'sqrt', 'log2']) \
            .addGrid(forest.maxDepth, [2, 5, 10]) \
            .build()

# Create a binary classification evaluator
evaluator = BinaryClassificationEvaluator()

# Create a cross-validator
cv = CrossValidator(estimator=forest, estimatorParamMaps=params, evaluator=evaluator, numFolds=5)
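`ParamGridBuilder.build()` returns the Cartesian product of the grids, here 4 × 3 = 12 combinations. With 5 folds, `CrossValidator` fits one forest per combination per fold and then refits the best combination on the full training set, 61 forests in all (each with 20 trees by default), which is why the next cell is slow. Counting the fits in plain Python:

```python
from itertools import product

grid = {
    "featureSubsetStrategy": ["all", "onethird", "sqrt", "log2"],
    "maxDepth": [2, 5, 10],
}
num_folds = 5

combinations = list(product(*grid.values()))
fits = len(combinations) * num_folds + 1  # one fit per (combination, fold) plus the final refit
print(len(combinations), fits)
```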

In [0]:
# Run cross-validation and make predictions on the test set
cvModel = cv.fit(flights_train)
cvPreds = cvModel.transform(flights_test)

In [0]:
# Average AUC for each parameter combination in the grid
avg_auc = cvModel.avgMetrics

# Cross-validated AUC of the best parameter combination
best_model_auc = max(cvModel.avgMetrics)

# Optimal parameter values (explainParam returns a descriptive string)
opt_max_depth = cvModel.bestModel.explainParam('maxDepth')
opt_feat_substrat = cvModel.bestModel.explainParam('featureSubsetStrategy')

# AUC for best model on testing data
best_auc = evaluator.evaluate(cvPreds)

In [30]:
# Print out results
print('Average AUC: ' + str(avg_auc))
print('Average AUC for the best model: ' + str(best_model_auc))
print('AUC for best model on testing data: ' + str(best_auc))

Average AUC: [0.6042699925438404, 0.6686740016978114, 0.731316449101635, 0.6457283982103839, 0.6725170610454173, 0.7202611866846995, 0.6520911650426628, 0.6748977142771689, 0.7128605471389942, 0.6515049378389767, 0.6712178975725571, 0.706585637060634]
Average AUC for the best model: 0.731316449101635
AUC for best model on testing data: 0.7347437189348925
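`avgMetrics` is ordered like the parameter grid. Assuming the grid is enumerated as `itertools.product` over the grids in the order they were added (so `maxDepth` varies fastest), the printed AUCs can be labelled and the best combination read off. Under that assumption the best setting is `featureSubsetStrategy='all'` with `maxDepth=10`, and AUC rises with depth for every strategy, which suggests depths beyond 10 may be worth trying. In a live session, zipping `cvModel.getEstimatorParamMaps()` with `cvModel.avgMetrics` avoids relying on the ordering assumption.

```python
from itertools import product

# avgMetrics printed by the cell above (AUC averaged over the 5 folds)
avg_auc = [0.6042699925438404, 0.6686740016978114, 0.731316449101635,
           0.6457283982103839, 0.6725170610454173, 0.7202611866846995,
           0.6520911650426628, 0.6748977142771689, 0.7128605471389942,
           0.6515049378389767, 0.6712178975725571, 0.706585637060634]

# Assumed order: featureSubsetStrategy is the outer loop, maxDepth the inner loop
combos = list(product(["all", "onethird", "sqrt", "log2"], [2, 5, 10]))
results = dict(zip(combos, avg_auc))

best_combo = max(results, key=results.get)
print(best_combo, results[best_combo])
for (strategy, depth), auc in results.items():
    print(f"{strategy:>8}  maxDepth={depth:>2}  AUC={auc:.4f}")
```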


In [31]:
# Create a confusion matrix for CV results
cvPreds.groupBy('label', 'prediction').count().show()

# Calculate the elements of the confusion matrix for CV results
TN_cv = cvPreds.filter('prediction = 0 AND label = prediction').count()
TP_cv = cvPreds.filter('prediction = 1 AND label = prediction').count()
FN_cv = cvPreds.filter('prediction = 0 AND label != prediction').count()
FP_cv = cvPreds.filter('prediction = 1 AND label != prediction').count()

# Accuracy measures the proportion of correct predictions for CV results
accuracy_cv = (TN_cv + TP_cv) / (TN_cv + TP_cv + FN_cv + FP_cv)
print(accuracy_cv)

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0| 8367|
|    0|       0.0|16921|
|    1|       1.0|17792|
|    0|       1.0| 8619|
+-----+----------+-----+

0.6714443219404631


In [32]:
# Compute precision and recall for the CV model
precision_cv = TP_cv / (TP_cv + FP_cv)
recall_cv = TP_cv / (TP_cv + FN_cv)
print('precision = {:.2f}\nrecall    = {:.2f}'.format(precision_cv, recall_cv))

precision = 0.67
recall    = 0.68
