In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
  Preparing metadata (setup.py) ... done
Collecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=29c3190b8ba5bba69def63afacc724a83374753847bce001e2e6ff78471b918b
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Success

# **Predicting flight delays using PySpark**

In this notebook we will predict flight delays using PySpark. We will use two basic machine learning models, namely **Decision Tree** and **Logistic Regression**, to achieve this task.

Spark is currently one of the most popular technologies for processing and working with big (i.e. very large) data. Compared with other distributed computing technologies, Spark is also relatively easy to work with, even for beginners. We will interact with Spark through the Python library PySpark.

# Input Data
We are going to use airline flight data stored in a CSV file as input. A short description of the data fields follows:

**Data columns:**

mon — month (int between 1 and 12)

dom — day of month (int between 1 and 31)

dow — day of week (int; 1 = Monday and 7 = Sunday)

org — origin airport (str; IATA code)

mile — distance (int; miles)

carrier — carrier (str; IATA code)

depart — departure time (double; decimal hour)

duration — expected duration (int; minutes)

delay — delay (int; minutes)

(IATA -> International Air Transport Association)
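The `depart` field stores departure times as decimal hours. For example, `15.5` appears in the first rows of the data. The helper below is hypothetical and converts such a value into a clock time. It assumes that the fractional part is a fraction of an hour, as "decimal hour" suggests, and not a count of minutes. Under that assumption, `15.5` means 15:30.

```python
def decimal_hour_to_hhmm(decimal_hour: float) -> str:
    """Hypothetical helper: convert a decimal hour (e.g. 15.5) to 'HH:MM'.

    Assumes the fractional part is a fraction of an hour, not minutes.
    """
    # Work in whole minutes so that rounding can carry into the next hour
    total_minutes = round(decimal_hour * 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours:02d}:{minutes:02d}"


# Departure times taken from the first rows of the dataset
for depart in (8.18, 15.5, 21.17, 12.92):
    print(depart, "->", decimal_hour_to_hhmm(depart))
```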

**Importing the necessary packages and libraries**

In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Note: importing round this way shadows Python's built-in round() in this notebook
from pyspark.sql.functions import round
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator


We will use a local Spark cluster using all available cores, which will be accessible via a SparkSession object.

In [3]:
spark = SparkSession.builder \
                    .master('local[*]') \
                    .appName('ML with PySpark') \
                    .getOrCreate()
# What version of Spark?
print(spark.version)


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/12/25 10:51:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


3.2.0


# Basic data exploration


We start by exploring basic information such as:

- How many records are there?
- Which data types do the columns have?
- Are there missing values?
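The sketch below shows what "missing values" means for this file, without needing Spark. It assumes that the raw CSV spells missing values as `NA`, which is what the `nullValue='NA'` read option implies. The sample rows copy the layout of the records shown by `flights_df.show(5)`. `count_missing` is a hypothetical helper.

```python
import csv
import io

# A few rows in the layout of flights-larger.csv; missing values are assumed
# to be written as "NA" (matching nullValue='NA' when Spark reads the file)
SAMPLE = """mon,dom,dow,carrier,flight,org,mile,depart,duration,delay
10,10,1,OO,5836,ORD,157,8.18,51,27
1,4,1,OO,5866,ORD,466,15.5,102,NA
11,22,1,OO,6016,ORD,738,7.17,127,-19
"""


def count_missing(text: str, null_value: str = "NA") -> dict:
    """Hypothetical helper: count missing values per column of CSV text."""
    reader = csv.DictReader(io.StringIO(text))
    counts = {name: 0 for name in reader.fieldnames}
    for row in reader:
        for name, value in row.items():
            if value == null_value or value == "":
                counts[name] += 1
    return counts


print(count_missing(SAMPLE))
```

In PySpark, the same per-column count is commonly written by combining `pyspark.sql.functions.count`, `when` and `col(...).isNull()` in one `select`.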


In [4]:
flights_df = spark.read.csv('../input/pyspark-datasets/flights-larger.csv',
                         sep=',',
                         header=True,
                         inferSchema=True,
                         nullValue='NA')

                                                                                

In [5]:
# Get number of records
print("The data contain %d records." % flights_df.count())
# View the first five records
flights_df.show(5)

The data contain 275000 records.
+---+---+---+-------+------+---+----+------+--------+-----+
|mon|dom|dow|carrier|flight|org|mile|depart|duration|delay|
+---+---+---+-------+------+---+----+------+--------+-----+
| 10| 10|  1|     OO|  5836|ORD| 157|  8.18|      51|   27|
|  1|  4|  1|     OO|  5866|ORD| 466|  15.5|     102| null|
| 11| 22|  1|     OO|  6016|ORD| 738|  7.17|     127|  -19|
|  2| 14|  5|     B6|   199|JFK|2248| 21.17|     365|   60|
|  5| 25|  3|     WN|  1675|SJC| 386| 12.92|      85|   22|
+---+---+---+-------+------+---+----+------+--------+-----+
only showing top 5 rows



In [6]:
# Check column data types
print(flights_df.dtypes)

[('mon', 'int'), ('dom', 'int'), ('dow', 'int'), ('carrier', 'string'), ('flight', 'int'), ('org', 'string'), ('mile', 'int'), ('depart', 'double'), ('duration', 'int'), ('delay', 'int')]


# **Data preparation for training our ML model**

# Data preparation includes:

**Data Cleaning**
- removing an uninformative column and
- removing rows with missing values

**Column/Data manipulation**
- We will consider a flight to be "delayed" when it arrives 15 minutes or more after its scheduled time (this complies with the FAA's definition of a delayed flight).
- Based on this definition, we will create a new column 'label' (1 = delayed, 0 = not delayed) stating whether a flight was delayed.
- Convert the columns that hold categorical data (carrier and org) into indexed numerical values.

**Assembling columns**
- The final stage consists of consolidating all predictor columns into a single vector column
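This is a plain-Python sketch of the per-row rules used in the next two cells. It applies the 15-minute delay threshold and the mile-to-kilometre conversion with the factor 1.60934. `pyspark.sql.functions.round` rounds with HALF_UP, so the sketch uses `decimal` to match that. Python's built-in `round()` would apply banker's rounding to exact `.5` ties instead. The helper names are hypothetical. The sample `(mile, delay)` pairs come from the first rows of the dataset.

```python
from decimal import Decimal, ROUND_HALF_UP

MILES_TO_KM = 1.60934   # conversion factor used in the notebook
DELAY_THRESHOLD = 15    # minutes; at or above this a flight counts as delayed


def miles_to_km(miles: int) -> float:
    """Hypothetical helper: miles -> km rounded to 0 decimals with HALF_UP."""
    km = Decimal(str(miles * MILES_TO_KM))
    return float(km.quantize(Decimal("1"), rounding=ROUND_HALF_UP))


def delay_label(delay_minutes: int) -> int:
    """Hypothetical helper: 1 if the flight is delayed, else 0."""
    return int(delay_minutes >= DELAY_THRESHOLD)


# (mile, delay) pairs from the first rows of the dataset
for mile, delay in [(157, 27), (738, -19), (2248, 60), (386, 22)]:
    print(mile, "->", miles_to_km(mile), "| delay", delay, "-> label", delay_label(delay))
```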

In [7]:
# Remove the uninformative 'flight' column
flights_df = flights_df.drop('flight')

# Remove records with a missing value in any column (this includes the null 'delay' values)
flights_df = flights_df.dropna()
print(flights_df.count())



258289


                                                                                

In [8]:
# Convert 'mile' to kilometres in a new 'km' column, then drop 'mile'
flights_km = flights_df.withColumn('km', round(flights_df.mile * 1.60934, 0)) \
                    .drop('mile')

# Create 'label' column indicating whether a flight is delayed or not
flights_km = flights_km.withColumn('label', (flights_km.delay >= 15).cast('integer'))

# Check first five records
flights_km.show(5)

+---+---+---+-------+---+------+--------+-----+------+-----+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|
+---+---+---+-------+---+------+--------+-----+------+-----+
| 10| 10|  1|     OO|ORD|  8.18|      51|   27| 253.0|    1|
| 11| 22|  1|     OO|ORD|  7.17|     127|  -19|1188.0|    0|
|  2| 14|  5|     B6|JFK| 21.17|     365|   60|3618.0|    1|
|  5| 25|  3|     WN|SJC| 12.92|      85|   22| 621.0|    1|
|  3| 28|  1|     B6|LGA| 13.33|     182|   70|1732.0|    1|
+---+---+---+-------+---+------+--------+-----+------+-----+
only showing top 5 rows



In [9]:
# Create an indexer, which identifies categories and then creates a new column with numeric index values
flights_indexed = StringIndexer(inputCol='carrier', outputCol='carrier_idx').fit(flights_km).transform(flights_km)

# Repeat the process for org column
flights_indexed = StringIndexer(inputCol='org', outputCol='org_idx').fit(flights_indexed).transform(flights_indexed)
flights_indexed.show(5)

                                                                                

+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|carrier_idx|org_idx|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+
| 10| 10|  1|     OO|ORD|  8.18|      51|   27| 253.0|    1|        2.0|    0.0|
| 11| 22|  1|     OO|ORD|  7.17|     127|  -19|1188.0|    0|        2.0|    0.0|
|  2| 14|  5|     B6|JFK| 21.17|     365|   60|3618.0|    1|        4.0|    2.0|
|  5| 25|  3|     WN|SJC| 12.92|      85|   22| 621.0|    1|        3.0|    5.0|
|  3| 28|  1|     B6|LGA| 13.33|     182|   70|1732.0|    1|        4.0|    3.0|
+---+---+---+-------+---+------+--------+-----+------+-----+-----------+-------+
only showing top 5 rows
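By default, `StringIndexer` uses `stringOrderType='frequencyDesc'`. Under that setting the most frequent string gets index `0.0`, and strings with equal frequency are ordered alphabetically. `ORD` receiving `org_idx` 0.0 above is consistent with it being the most common origin. The plain-Python sketch below mimics that ordering. The helper and the toy list of origins are hypothetical.

```python
from collections import Counter


def frequency_desc_index(values):
    """Hypothetical helper mimicking StringIndexer(stringOrderType='frequencyDesc').

    Most frequent string -> 0.0; ties are broken alphabetically.
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda s: (-counts[s], s))
    return {label: float(i) for i, label in enumerate(ordered)}


# Toy list of origin airports (not taken from the dataset)
origins = ["ORD", "ORD", "JFK", "SJC", "ORD", "JFK", "LGA", "SJC", "JFK", "ORD"]
print(frequency_desc_index(origins))
```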



In [10]:
# Create an assembler object
assembler = VectorAssembler(inputCols=['mon', 'dom', 'dow',
                                       'carrier_idx', 'org_idx', 'km', 'depart', 'duration'],
                            outputCol='features')
# Consolidate predictor columns
flights_assembled = assembler.transform(flights_indexed)
# Check the resulting column
flights_assembled.select('features', 'delay').show(5, truncate=False)

+-----------------------------------------+-----+
|features                                 |delay|
+-----------------------------------------+-----+
|[10.0,10.0,1.0,2.0,0.0,253.0,8.18,51.0]  |27   |
|[11.0,22.0,1.0,2.0,0.0,1188.0,7.17,127.0]|-19  |
|[2.0,14.0,5.0,4.0,2.0,3618.0,21.17,365.0]|60   |
|[5.0,25.0,3.0,3.0,5.0,621.0,12.92,85.0]  |22   |
|[3.0,28.0,1.0,4.0,3.0,1732.0,13.33,182.0]|70   |
+-----------------------------------------+-----+
only showing top 5 rows
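Each `features` vector above contains the values of the `inputCols`, in the listed order, converted to doubles. `delay` and `label` are not among the inputs, so they stay out of the vector. This is a minimal plain-Python sketch of that concatenation for the first row. `assemble` is a hypothetical helper, not Spark's implementation.

```python
FEATURE_COLUMNS = ['mon', 'dom', 'dow', 'carrier_idx', 'org_idx', 'km', 'depart', 'duration']


def assemble(row: dict, columns=FEATURE_COLUMNS) -> list:
    """Hypothetical helper: concatenate numeric columns, in order, into one float vector."""
    return [float(row[c]) for c in columns]


# First row of flights_indexed; 'delay' is present but not a feature
first_row = {'mon': 10, 'dom': 10, 'dow': 1, 'carrier_idx': 2.0, 'org_idx': 0.0,
             'km': 253.0, 'depart': 8.18, 'duration': 51, 'delay': 27}
print(assemble(first_row))
```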



# **Machine Learning Models**



# First classification model:

**Decision trees: offer inherent simplicity and explainability**
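Spark's `DecisionTreeClassifier` uses Gini impurity by default (`impurity='gini'`, with `maxDepth=5`). While growing the tree, it picks splits that most reduce the weighted impurity of the child nodes. The plain-Python sketch below computes that reduction for hypothetical candidate splits of a node. The helper names and class counts are illustrative and not taken from the model.

```python
def gini(counts):
    """Gini impurity 1 - sum(p_k^2) of a node, from its per-class counts."""
    total = sum(counts)
    return 1.0 - sum((c / total) ** 2 for c in counts)


def impurity_decrease(left, right):
    """Hypothetical helper: impurity reduction from splitting a node into left/right.

    `left` and `right` are per-class counts, e.g. [not_delayed, delayed].
    """
    parent = [l + r for l, r in zip(left, right)]
    n = sum(parent)
    children = sum(sum(child) / n * gini(child) for child in (left, right))
    return gini(parent) - children


# Hypothetical splits of a node holding 50 on-time and 50 delayed flights
print(impurity_decrease([40, 10], [10, 40]))  # a useful split
print(impurity_decrease([25, 25], [25, 25]))  # a useless split
```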

In [11]:
# Split into training and testing sets in an 80:20 ratio
flights_train, flights_test = flights_assembled.randomSplit([0.8, 0.2], seed=42)

# Check that training set has around 80% of records
training_ratio = flights_train.count() / flights_assembled.count()
print(training_ratio)

                                                                                

0.7998753334443202
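The ratio is 0.79987 rather than exactly 0.8 because `randomSplit` normalises the weights and assigns each row independently at random. The split proportions are therefore only approximate. The plain-Python sketch below reproduces that behaviour on 100,000 dummy rows. `random_split` is a hypothetical helper and does not reproduce Spark's per-partition sampling.

```python
import random


def random_split(rows, weights, seed):
    """Hypothetical helper: put each row in a split with probability proportional to its weight."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)          # cumulative, normalised weights
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, b in enumerate(bounds):
            if u < b:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guards against floating-point round-off
    return splits


train, test = random_split(range(100_000), [0.8, 0.2], seed=42)
print(len(train) / 100_000)
```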


                                                                                

In [12]:
# Create a DT classifier object and fit to the training data
tree = DecisionTreeClassifier()
tree_model = tree.fit(flights_train)
# Create predictions on test data
prediction = tree_model.transform(flights_test)
prediction.select('label', 'prediction', 'probability').show(5, False)


+-----+----------+----------------------------------------+
|label|prediction|probability                             |
+-----+----------+----------------------------------------+
|0    |1.0       |[0.31856586262750075,0.6814341373724992]|
|1    |0.0       |[0.6183646554566435,0.3816353445433564] |
|1    |0.0       |[0.6183646554566435,0.3816353445433564] |
|1    |1.0       |[0.31856586262750075,0.6814341373724992]|
|1    |1.0       |[0.31856586262750075,0.6814341373724992]|
+-----+----------+----------------------------------------+
only showing top 5 rows



                                                                                

**Evaluate the model**

A confusion matrix gives a useful breakdown of predictions versus known values. It has four cells, which represent the counts of:

True Negatives (TN) — prediction is negative & label is negative

True Positives (TP) — prediction is positive & label is positive

False Negatives (FN) — prediction is negative & label is positive

False Positives (FP) — prediction is positive & label is negative

Using these four measures, we can then calculate the accuracy of the model as follows:

**Accuracy=(TN+TP)/(TN+TP+FN+FP)**
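The plain-Python sketch below computes the four cells and the accuracy formula from `(label, prediction)` pairs, treating 1 (delayed) as the positive class. It uses the five test rows shown in the decision tree output above. The helper names are hypothetical.

```python
def confusion_counts(pairs):
    """Hypothetical helper: count (TN, TP, FN, FP) from (label, prediction) pairs; 1 = delayed."""
    tn = tp = fn = fp = 0
    for label, prediction in pairs:
        if prediction == 1:
            if label == 1:
                tp += 1
            else:
                fp += 1
        else:
            if label == 1:
                fn += 1
            else:
                tn += 1
    return tn, tp, fn, fp


def accuracy(tn, tp, fn, fp):
    return (tn + tp) / (tn + tp + fn + fp)


# (label, prediction) pairs for the five test rows shown above
shown = [(0, 1.0), (1, 0.0), (1, 0.0), (1, 1.0), (1, 1.0)]
counts = confusion_counts(shown)
print(counts, accuracy(*counts))
```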

In [13]:
# Create a confusion matrix
prediction.groupBy('label', 'prediction').count().show()

# Calculate the elements of the confusion matrix
TN = prediction.filter('prediction = 0 AND label = prediction').count()
TP = prediction.filter('prediction = 1 AND label = prediction').count()
FN = prediction.filter('prediction = 0 AND label != prediction').count()
FP = prediction.filter('prediction = 1 AND label != prediction').count()

# Accuracy measures the proportion of correct predictions
accuracy = (TN + TP) / (TN + TP + FN + FP)
print(accuracy)

                                                                                

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0| 9666|
|    0|       0.0|16350|
|    1|       1.0|16349|
|    0|       1.0| 9325|
+-----+----------+-----+





0.6325981814664345


                                                                                

An accuracy of about 0.63 is decent but not good: 18,991 of the 51,690 test flights (9,666 false negatives and 9,325 false positives) are misclassified.
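To put 0.63 in context, the sketch below compares it with a trivial baseline that always predicts "delayed" (label 1). This baseline is our own reference point and was not part of the original analysis. The counts are copied from the decision tree confusion matrix printed above. The test set is almost balanced, so the baseline scores only about 0.50.

```python
# Decision tree confusion matrix printed above
tn, tp, fn, fp = 16350, 16349, 9666, 9325
total = tn + tp + fn + fp

accuracy = (tn + tp) / total
# Always predicting "delayed" is correct exactly for the flights with label 1
baseline = (tp + fn) / total
print(f"accuracy = {accuracy:.4f}, always-delayed baseline = {baseline:.4f}")
```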

# Second classification model:

**Logistic Regression: simple and easy to train**
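Logistic regression passes a weighted sum of the feature vector through the sigmoid function to get the probability that a flight is delayed. It then compares that probability with a threshold, which defaults to 0.5 in Spark's `LogisticRegression`. The plain-Python sketch below shows this prediction step. The weights and intercept are hypothetical, not values learned from the flight data.

```python
import math


def sigmoid(z: float) -> float:
    return 1.0 / (1.0 + math.exp(-z))


def predict(features, weights, intercept, threshold=0.5):
    """Hypothetical helper: return (P(delayed), predicted label) for one feature vector."""
    z = intercept + sum(w * x for w, x in zip(weights, features))
    p = sigmoid(z)
    # Label 1.0 only when the probability exceeds the threshold
    return p, 1.0 if p > threshold else 0.0


# Hypothetical two-feature model, for illustration only
weights, intercept = [0.5, -0.25], 0.0
print(predict([2.0, 4.0], weights, intercept))  # z = 0, right on the threshold
print(predict([4.0, 0.0], weights, intercept))  # z = 2, predicted delayed
```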

In [14]:
# Create a classifier object and train on training data
logistic = LogisticRegression().fit(flights_train)
# Create predictions for the testing data and show confusion matrix
prediction = logistic.transform(flights_test)
prediction.groupBy('label', 'prediction').count().show()



+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0| 9471|
|    0|       0.0|14934|
|    1|       1.0|16544|
|    0|       1.0|10741|
+-----+----------+-----+



                                                                                

In [15]:
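# Calculate precision and recall
# Note: TP, FP and FN still hold the decision tree counts computed in In [13],
# so the precision and recall printed below describe the decision tree model
precision = TP / (TP + FP)
recall = TP / (TP + FN)
print('precision = {:.2f}\nrecall    = {:.2f}'.format(precision, recall))

# Find weighted precision of the logistic regression predictions (not printed)
multi_evaluator = MulticlassClassificationEvaluator()
weighted_precision = multi_evaluator.evaluate(prediction, {multi_evaluator.metricName: "weightedPrecision"})

# Find the area under the ROC curve (AUC) of the logistic regression predictions (not printed)
binary_evaluator = BinaryClassificationEvaluator()
auc = binary_evaluator.evaluate(prediction, {binary_evaluator.metricName: "areaUnderROC"})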

precision = 0.64
recall    = 0.63
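`TP`, `FP` and `FN` were last assigned from the decision tree predictions in In [13`]`, so the 0.64 and 0.63 above are the decision tree's values. The plain-Python sketch below applies the same formulas to both confusion matrices printed in this notebook. It also computes the logistic regression's weighted precision by hand. That calculation assumes Spark's definition, which weights each class's precision by that class's share of the true labels. The helper names are hypothetical.

```python
def precision_recall(tp, fp, fn):
    return tp / (tp + fp), tp / (tp + fn)


def weighted_precision_from_matrix(m):
    """Hypothetical helper: per-class precision weighted by true-label frequency."""
    total = sum(m.values())
    prec_1 = m["TP"] / (m["TP"] + m["FP"])  # precision for label 1 (delayed)
    prec_0 = m["TN"] / (m["TN"] + m["FN"])  # precision for label 0 (on time)
    support_1 = m["TP"] + m["FN"]           # flights whose true label is 1
    support_0 = m["TN"] + m["FP"]           # flights whose true label is 0
    return (support_1 * prec_1 + support_0 * prec_0) / total


# Confusion matrices copied from In [13] and In [14]
dt = {"TN": 16350, "TP": 16349, "FN": 9666, "FP": 9325}
lr = {"TN": 14934, "TP": 16544, "FN": 9471, "FP": 10741}

for name, m in (("decision tree", dt), ("logistic regression", lr)):
    p, r = precision_recall(m["TP"], m["FP"], m["FN"])
    print(f"{name}: precision = {p:.2f}, recall = {r:.2f}")

print(f"logistic regression weighted precision = {weighted_precision_from_matrix(lr):.4f}")
```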


                                                                                

Again, the metrics are decent but not good, so improving the models' predictive performance is a potential direction for future work.


In [16]:
# Close the Spark session
spark.stop()