<a href="https://colab.research.google.com/github/Himagination/PySpark_Projects/blob/main/Machine_Learning_PySpark_Flights.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Machine Learning with PySpark

## Setting PySpark Environment

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
!tar xf spark-3.0.0-bin-hadoop3.2.tgz
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"
!pip install -q findspark
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

## Objective

1. Build a Machine Learning Model on a flight dataset to determine whether a flight will be delayed or not.

## The Data

*  **Download Data From Source**

*  ***Flights Data Set***

Notes on CSV format:

*  Fields are separated by a comma (this is the default separator) and
missing data are denoted by the string 'NA'.
*  Data dictionary:

1.  mon — month (integer between 1 and 12)
2.  dom — day of month (integer between 1 and 31)
3.  dow — day of week (integer; 1 = Monday and 7 = Sunday)
4.  org — origin airport (IATA code)
5.  mile — distance (miles)
6.  carrier — carrier (IATA code)
7.  depart — departure time (decimal hour)
8.  duration — expected duration (minutes)
9.  delay — delay (minutes)

In [2]:
!wget https://assets.datacamp.com/production/repositories/3918/datasets/e1c1a03124fb2199743429e9b7927df18da3eacf/flights-larger.csv

--2021-10-21 02:10:39--  https://assets.datacamp.com/production/repositories/3918/datasets/e1c1a03124fb2199743429e9b7927df18da3eacf/flights-larger.csv
Resolving assets.datacamp.com (assets.datacamp.com)... 104.18.17.147, 104.18.16.147
Connecting to assets.datacamp.com (assets.datacamp.com)|104.18.17.147|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 9660229 (9.2M) [text/csv]
Saving to: ‘flights-larger.csv’


2021-10-21 02:10:40 (30.8 MB/s) - ‘flights-larger.csv’ saved [9660229/9660229]



* Read Data as CSV

In [4]:
# Read data from CSV file
flights = spark.read.csv('flights-larger.csv',
                         sep=',',
                         header=True,
                         inferSchema=True,
                         nullValue='NA')
# Get number of records
print("The data contain %d records." % flights.count())
# View the first five records
flights.show(5)
# Check column data types
flights.printSchema()

The data contain 275000 records.
+---+---+---+-------+------+---+----+------+--------+-----+
|mon|dom|dow|carrier|flight|org|mile|depart|duration|delay|
+---+---+---+-------+------+---+----+------+--------+-----+
| 10| 10|  1|     OO|  5836|ORD| 157|  8.18|      51|   27|
|  1|  4|  1|     OO|  5866|ORD| 466|  15.5|     102| null|
| 11| 22|  1|     OO|  6016|ORD| 738|  7.17|     127|  -19|
|  2| 14|  5|     B6|   199|JFK|2248| 21.17|     365|   60|
|  5| 25|  3|     WN|  1675|SJC| 386| 12.92|      85|   22|
+---+---+---+-------+------+---+----+------+--------+-----+
only showing top 5 rows

root
 |-- mon: integer (nullable = true)
 |-- dom: integer (nullable = true)
 |-- dow: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- org: string (nullable = true)
 |-- mile: integer (nullable = true)
 |-- depart: double (nullable = true)
 |-- duration: integer (nullable = true)
 |-- delay: integer (nullable = true)



## Data Preparation
1. Remove Columns and Rows
- Removing an uninformative column and
- Removing rows which do not have information about whether or not a flight was delayed.

2.  Column Manipulation.
-  Convert the units of distance, replacing the mile column with a kmcolumn; and
-  Create a Boolean column indicating whether or not a flight was delayed.(The Federal Aviation Administration (FAA) considers a flight to be "delayed" when it arrives 15 minutes or more after its scheduled time.)

3.  Categorical Columns
-  In the flights data there are two columns, carrier and org, which hold categorical data. Transform those columns into indexed numerical values.

4.  Assembling Columns
-  Consolidate all of the predictor columns into a single column.

In [7]:
# Remove the 'flight' column
flights_drop_column = flights.drop('flight')
# Number of records with missing 'delay' values
flights_drop_column.filter('delay IS NULL').count()
# Remove records with missing 'delay' values
flights_valid_delay = flights_drop_column.filter('delay IS NOT NULL')
# Remove records with missing values in any column and get the number of remaining rows
flights_none_missing = flights_valid_delay.dropna()
print(flights_none_missing.count())

258289


In [8]:
# Import the required function
from pyspark.sql.functions import round
# Convert 'mile' to 'km' and drop 'mile' column
flights_km = flights_none_missing.withColumn('km', round(flights_none_missing.mile * 1.60934, 0)).drop('mile')                  
# Create 'label' column indicating whether flight delayed (1) or not (0)
flights_km = flights_km.withColumn('label', (flights_km.delay >= 15).cast('integer'))
# Check first five records
flights_km.show(5)

+---+---+---+-------+---+------+--------+-----+------+-----+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|
+---+---+---+-------+---+------+--------+-----+------+-----+
| 10| 10|  1|     OO|ORD|  8.18|      51|   27| 253.0|    1|
| 11| 22|  1|     OO|ORD|  7.17|     127|  -19|1188.0|    0|
|  2| 14|  5|     B6|JFK| 21.17|     365|   60|3618.0|    1|
|  5| 25|  3|     WN|SJC| 12.92|      85|   22| 621.0|    1|
|  3| 28|  1|     B6|LGA| 13.33|     182|   70|1732.0|    1|
+---+---+---+-------+---+------+--------+-----+------+-----+
only showing top 5 rows



In [11]:
from pyspark.ml.feature import StringIndexer

# Create an indexer
indexer = StringIndexer(inputCol='carrier', outputCol='carrier_idx')

# Indexer identifies categories in the data
indexer_model = indexer.fit(flights_km)

# Indexer creates a new column with numeric index values
flights_indexed = indexer_model.transform(flights_km)

# Repeat the process for the other categorical feature
flights_indexed = StringIndexer(inputCol='org', outputCol='org_idx').fit(flights_indexed).transform(flights_indexed)
flights_indexed.show(5)                              

+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|carrier_idx|org_idx|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+
| 10| 10|  1|     OO|ORD|  8.18|      51|   27| 253.0|    1|        2.0|    0.0|
| 11| 22|  1|     OO|ORD|  7.17|     127|  -19|1188.0|    0|        2.0|    0.0|
|  2| 14|  5|     B6|JFK| 21.17|     365|   60|3618.0|    1|        4.0|    2.0|
|  5| 25|  3|     WN|SJC| 12.92|      85|   22| 621.0|    1|        3.0|    5.0|
|  3| 28|  1|     B6|LGA| 13.33|     182|   70|1732.0|    1|        4.0|    3.0|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+
only showing top 5 rows



In [12]:
# Import the necessary class
from pyspark.ml.feature import VectorAssembler

# Create an assembler object
assembler = VectorAssembler(inputCols=[
    'mon', 'dom', 'dow', 'carrier_idx', 'org_idx', 'km', 'depart', 'duration'
], outputCol='features')

# Consolidate predictor columns
flights_assembled = assembler.transform(flights_indexed)

# Check the resulting column
flights_assembled.select('features', 'delay').show(5, truncate=False)

+-----------------------------------------+-----+
|features                                 |delay|
+-----------------------------------------+-----+
|[10.0,10.0,1.0,2.0,0.0,253.0,8.18,51.0]  |27   |
|[11.0,22.0,1.0,2.0,0.0,1188.0,7.17,127.0]|-19  |
|[2.0,14.0,5.0,4.0,2.0,3618.0,21.17,365.0]|60   |
|[5.0,25.0,3.0,3.0,5.0,621.0,12.92,85.0]  |22   |
|[3.0,28.0,1.0,4.0,3.0,1732.0,13.33,182.0]|70   |
+-----------------------------------------+-----+
only showing top 5 rows



## Decision Tree Model
1. Train/Test Split
- training data (used to train the model) and
- testing data (used to test the model).

2. Build a Decision Tree Model
3. Evaluate the Decision Tree
- A confusion matrix gives a useful breakdown of predictions versus known values. It has four cells which represent the counts of:

- True Negatives (TN) — model predicts negative outcome & known outcome is negative
- True Positives (TP) — model predicts positive outcome & known outcome is positive
- False Negatives (FN) — model predicts negative outcome but known outcome is positive
- False Positives (FP) — model predicts positive outcome but known outcome is negative.


In [14]:
# Split into training and testing sets in a 80:20 ratio
flights_train, flights_test = flights_assembled.randomSplit([0.8, 0.2], seed=17)

# Check that training set has around 80% of records
training_ratio = flights_train.count() / flights_assembled.count()
print(training_ratio)

0.8000998881098305


In [15]:
# Import the Decision Tree Classifier class
from pyspark.ml.classification import DecisionTreeClassifier

# Create a classifier object and fit to the training data
tree = DecisionTreeClassifier()
tree_model = tree.fit(flights_train)

# Create predictions for the testing data and take a look at the predictions
prediction = tree_model.transform(flights_test)
prediction.select('label', 'prediction', 'probability').show(5, False)

+-----+----------+---------------------------------------+
|label|prediction|probability                            |
+-----+----------+---------------------------------------+
|1    |0.0       |[0.624002075107358,0.375997924892642]  |
|0    |0.0       |[0.624002075107358,0.375997924892642]  |
|1    |1.0       |[0.3208750137407937,0.6791249862592064]|
|0    |1.0       |[0.4277341460555621,0.572265853944438] |
|1    |1.0       |[0.4277341460555621,0.572265853944438] |
+-----+----------+---------------------------------------+
only showing top 5 rows



In [16]:
# Create a confusion matrix
prediction.groupBy('label', 'prediction').count().show()

# Calculate the elements of the confusion matrix
TN = prediction.filter('prediction = 0 AND label = prediction').count()
TP = prediction.filter('prediction = 1 AND label = prediction').count()
FN = prediction.filter('prediction = 0 AND label != prediction').count()
FP = prediction.filter('prediction = 1 AND label != prediction').count()

# Accuracy measures the proportion of correct predictions
accuracy = (TN + TP) / (TN + TP + FN + FP)
print(accuracy)

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0| 9683|
|    0|       0.0|16329|
|    1|       1.0|16604|
|    0|       1.0| 9016|
+-----+----------+-----+

0.6378408738766657


## Logistic Regression Model

In [17]:
# Import the logistic regression class
from pyspark.ml.classification import LogisticRegression

# Create a classifier object and train on training data
logistic = LogisticRegression().fit(flights_train)

# Create predictions for the testing data and show confusion matrix
prediction = logistic.transform(flights_test)
prediction.groupBy('label', 'prediction').count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0| 9498|
|    0|       0.0|14744|
|    1|       1.0|16789|
|    0|       1.0|10601|
+-----+----------+-----+



In [18]:
# Calculate the elements of the confusion matrix
TN = prediction.filter('prediction = 0 AND label = prediction').count()
TP = prediction.filter('prediction = 1 AND label = prediction').count()
FN = prediction.filter('prediction = 0 AND label != prediction').count()
FP = prediction.filter('prediction = 1 AND label != prediction').count()

# Accuracy measures the proportion of correct predictions
accuracy = (TN + TP) / (TN + TP + FN + FP)
print(accuracy)

0.6107259064146265


In [19]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

# Calculate precision and recall
precision = TP / (TP + FP)
recall = TP / (TP + FN)
print('precision = {:.2f}\nrecall    = {:.2f}'.format(precision, recall))

# Find weighted precision
multi_evaluator = MulticlassClassificationEvaluator()
weighted_precision = multi_evaluator.evaluate(prediction, {multi_evaluator.metricName: "weightedPrecision"})

# Find AUC
binary_evaluator = BinaryClassificationEvaluator()
auc = binary_evaluator.evaluate(prediction, {multi_evaluator.metricName: "areaUnderROC"})

precision = 0.61
recall    = 0.64
