<a href="https://colab.research.google.com/github/AjanEshwara/AirTrafficSim/blob/main/Group_50_CRWK_CN7030.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Machine Learning on Big Data (CN7030) CRWK 24-25 Term B [60% weighting]**
# **Group ID: 50**
1.   Student 1: Walgama Eshwarage Don Ajan Sajuka Eshwara - U2782761
2.   Student 2: Safiat Alli-adeniji - U2803147
---

If you want to add comments on your group work, please write it here for us:


# **Initiate and Configure Spark**

In [1]:
# add your code here

!pip3 install pyspark




In [2]:
#Linking with pyspark
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Group_50_CRWK") \
    .config("spark.executor.memory","4g")\
    .config("spark.driver.memory", "2g")\
    .config("spark.executor.cores", "2") \
    .config("spark.sql.inMemoryColumnarStorage.compressed", "true") \
    .getOrCreate()
spark

In [3]:
#connect with google drive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


---
# **Task 1 - Data Loading and Preprocessing (15 marks)**
---

In [4]:
##1st student name:
# Load the CSV file into a Spark DataFrame, inferring column types from the data
df = spark.read.format('csv').load('/content/drive/MyDrive/full_data_flightdelay.csv', inferSchema = True, header = True)

#Show table
df.show(truncate=False)

#Show schema
df.printSchema()

#Data count and Number of columns
print(df.count())
print(len(df.columns))

+-----+-----------+---------+------------+--------------+--------------+------------------+---------------+----------------------+---------------------+---------------------+-----------------------------+------------------------+------------------------+-----------------------+---------------------+---------+----------------------+--------+---------+----------------+----+----+----+----+----+
|MONTH|DAY_OF_WEEK|DEP_DEL15|DEP_TIME_BLK|DISTANCE_GROUP|SEGMENT_NUMBER|CONCURRENT_FLIGHTS|NUMBER_OF_SEATS|CARRIER_NAME          |AIRPORT_FLIGHTS_MONTH|AIRLINE_FLIGHTS_MONTH|AIRLINE_AIRPORT_FLIGHTS_MONTH|AVG_MONTHLY_PASS_AIRPORT|AVG_MONTHLY_PASS_AIRLINE|FLT_ATTENDANTS_PER_PASS|GROUND_SERV_PER_PASS |PLANE_AGE|DEPARTING_AIRPORT     |LATITUDE|LONGITUDE|PREVIOUS_AIRPORT|PRCP|SNOW|SNWD|TMAX|AWND|
+-----+-----------+---------+------------+--------------+--------------+------------------+---------------+----------------------+---------------------+---------------------+-----------------------------+------

In [5]:
df.rdd.getNumPartitions()

2

In [6]:
df = df.repartition(12)
df.rdd.getNumPartitions()

12

In [7]:
from pyspark.sql.functions import col, isnan, when, count

# Check for null or NaN values in each column
df.select([count(when(col(c).isNull() | isnan(col(c)), c)).alias(c) for c in df.columns]).show()


+-----+-----------+---------+------------+--------------+--------------+------------------+---------------+------------+---------------------+---------------------+-----------------------------+------------------------+------------------------+-----------------------+--------------------+---------+-----------------+--------+---------+----------------+----+----+----+----+----+
|MONTH|DAY_OF_WEEK|DEP_DEL15|DEP_TIME_BLK|DISTANCE_GROUP|SEGMENT_NUMBER|CONCURRENT_FLIGHTS|NUMBER_OF_SEATS|CARRIER_NAME|AIRPORT_FLIGHTS_MONTH|AIRLINE_FLIGHTS_MONTH|AIRLINE_AIRPORT_FLIGHTS_MONTH|AVG_MONTHLY_PASS_AIRPORT|AVG_MONTHLY_PASS_AIRLINE|FLT_ATTENDANTS_PER_PASS|GROUND_SERV_PER_PASS|PLANE_AGE|DEPARTING_AIRPORT|LATITUDE|LONGITUDE|PREVIOUS_AIRPORT|PRCP|SNOW|SNWD|TMAX|AWND|
+-----+-----------+---------+------------+--------------+--------------+------------------+---------------+------------+---------------------+---------------------+-----------------------------+------------------------+-----------------------

In [8]:
# Missing values are negligible, so we drop the rows that contain them.
df = df.na.drop()


In [9]:
# Check that the rows with null values have been dropped
df.select([count(when(col(c).isNull() | isnan(col(c)), c)).alias(c) for c in df.columns]).show()

+-----+-----------+---------+------------+--------------+--------------+------------------+---------------+------------+---------------------+---------------------+-----------------------------+------------------------+------------------------+-----------------------+--------------------+---------+-----------------+--------+---------+----------------+----+----+----+----+----+
|MONTH|DAY_OF_WEEK|DEP_DEL15|DEP_TIME_BLK|DISTANCE_GROUP|SEGMENT_NUMBER|CONCURRENT_FLIGHTS|NUMBER_OF_SEATS|CARRIER_NAME|AIRPORT_FLIGHTS_MONTH|AIRLINE_FLIGHTS_MONTH|AIRLINE_AIRPORT_FLIGHTS_MONTH|AVG_MONTHLY_PASS_AIRPORT|AVG_MONTHLY_PASS_AIRLINE|FLT_ATTENDANTS_PER_PASS|GROUND_SERV_PER_PASS|PLANE_AGE|DEPARTING_AIRPORT|LATITUDE|LONGITUDE|PREVIOUS_AIRPORT|PRCP|SNOW|SNWD|TMAX|AWND|
+-----+-----------+---------+------------+--------------+--------------+------------------+---------------+------------+---------------------+---------------------+-----------------------------+------------------------+-----------------------

In [10]:
df.show(truncate=False)

+-----+-----------+---------+------------+--------------+--------------+------------------+---------------+----------------------------+---------------------+---------------------+-----------------------------+------------------------+------------------------+-----------------------+---------------------+---------+---------------------------------------+--------+---------+------------------------------------+----+----+----+----+-----+
|MONTH|DAY_OF_WEEK|DEP_DEL15|DEP_TIME_BLK|DISTANCE_GROUP|SEGMENT_NUMBER|CONCURRENT_FLIGHTS|NUMBER_OF_SEATS|CARRIER_NAME                |AIRPORT_FLIGHTS_MONTH|AIRLINE_FLIGHTS_MONTH|AIRLINE_AIRPORT_FLIGHTS_MONTH|AVG_MONTHLY_PASS_AIRPORT|AVG_MONTHLY_PASS_AIRLINE|FLT_ATTENDANTS_PER_PASS|GROUND_SERV_PER_PASS |PLANE_AGE|DEPARTING_AIRPORT                      |LATITUDE|LONGITUDE|PREVIOUS_AIRPORT                    |PRCP|SNOW|SNWD|TMAX|AWND |
+-----+-----------+---------+------------+--------------+--------------+------------------+---------------+---------------

We have confirmed that the rows with missing values have been dropped.


In [11]:
df.groupBy("DEP_DEL15").count().show()

+---------+-----+
|DEP_DEL15|count|
+---------+-----+
|        1|11112|
|        0|48473|
+---------+-----+



In [12]:
# Check the data types of the columns
df.dtypes

[('MONTH', 'int'),
 ('DAY_OF_WEEK', 'int'),
 ('DEP_DEL15', 'int'),
 ('DEP_TIME_BLK', 'string'),
 ('DISTANCE_GROUP', 'int'),
 ('SEGMENT_NUMBER', 'int'),
 ('CONCURRENT_FLIGHTS', 'int'),
 ('NUMBER_OF_SEATS', 'int'),
 ('CARRIER_NAME', 'string'),
 ('AIRPORT_FLIGHTS_MONTH', 'int'),
 ('AIRLINE_FLIGHTS_MONTH', 'int'),
 ('AIRLINE_AIRPORT_FLIGHTS_MONTH', 'int'),
 ('AVG_MONTHLY_PASS_AIRPORT', 'int'),
 ('AVG_MONTHLY_PASS_AIRLINE', 'int'),
 ('FLT_ATTENDANTS_PER_PASS', 'double'),
 ('GROUND_SERV_PER_PASS', 'double'),
 ('PLANE_AGE', 'int'),
 ('DEPARTING_AIRPORT', 'string'),
 ('LATITUDE', 'double'),
 ('LONGITUDE', 'double'),
 ('PREVIOUS_AIRPORT', 'string'),
 ('PRCP', 'double'),
 ('SNOW', 'double'),
 ('SNWD', 'double'),
 ('TMAX', 'double'),
 ('AWND', 'double')]

In [13]:
#Create a list of the columns that are string typed
categoricalColumns = [item[0] for item in df.dtypes if item[1].startswith('string')]
categoricalColumns

['DEP_TIME_BLK', 'CARRIER_NAME', 'DEPARTING_AIRPORT', 'PREVIOUS_AIRPORT']

In [14]:
#String Indexer
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCols=categoricalColumns,
                        outputCols=[x + "_Index" for x in categoricalColumns])
df = indexer.fit(df).transform(df)
df.show()

+-----+-----------+---------+------------+--------------+--------------+------------------+---------------+--------------------+---------------------+---------------------+-----------------------------+------------------------+------------------------+-----------------------+--------------------+---------+--------------------+--------+---------+--------------------+----+----+----+----+-----+------------------+------------------+-----------------------+----------------------+
|MONTH|DAY_OF_WEEK|DEP_DEL15|DEP_TIME_BLK|DISTANCE_GROUP|SEGMENT_NUMBER|CONCURRENT_FLIGHTS|NUMBER_OF_SEATS|        CARRIER_NAME|AIRPORT_FLIGHTS_MONTH|AIRLINE_FLIGHTS_MONTH|AIRLINE_AIRPORT_FLIGHTS_MONTH|AVG_MONTHLY_PASS_AIRPORT|AVG_MONTHLY_PASS_AIRLINE|FLT_ATTENDANTS_PER_PASS|GROUND_SERV_PER_PASS|PLANE_AGE|   DEPARTING_AIRPORT|LATITUDE|LONGITUDE|    PREVIOUS_AIRPORT|PRCP|SNOW|SNWD|TMAX| AWND|DEP_TIME_BLK_Index|CARRIER_NAME_Index|DEPARTING_AIRPORT_Index|PREVIOUS_AIRPORT_Index|
+-----+-----------+---------+-----------
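The indexing step above can be illustrated without Spark. By default `StringIndexer` orders labels by descending frequency, breaking ties alphabetically, so the most common string receives index 0.0. The following is a minimal pure-Python sketch of that ordering rule, not Spark's implementation; the function name `frequency_index` and the carrier values are hypothetical.

```python
from collections import Counter


def frequency_index(values):
    """Map each distinct string to an index, most frequent first.

    Illustrates the documented default ordering of StringIndexer
    (frequencyDesc, ties broken alphabetically). Hypothetical helper,
    not part of PySpark.
    """
    counts = Counter(values)
    # Sort by descending count, then alphabetically for equal counts.
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


# Hypothetical carrier names, used only for illustration.
carriers = ["Delta", "United", "Delta", "Alaska", "United", "Delta"]
mapping = frequency_index(carriers)
print(mapping)
```

Because the resulting indices are plain numbers, a linear model will read them as ordered quantities; one-hot encoding the indexed columns is a common follow-up when that ordering is not meaningful.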

Vector Assembler

In [15]:
#import vector assembler
import pyspark.ml.feature
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=["MONTH","DAY_OF_WEEK","DISTANCE_GROUP","SEGMENT_NUMBER","CONCURRENT_FLIGHTS","NUMBER_OF_SEATS",
                                       "AIRPORT_FLIGHTS_MONTH","AIRLINE_FLIGHTS_MONTH","AIRLINE_AIRPORT_FLIGHTS_MONTH","AVG_MONTHLY_PASS_AIRPORT",
                                       "AVG_MONTHLY_PASS_AIRLINE","FLT_ATTENDANTS_PER_PASS","GROUND_SERV_PER_PASS","PLANE_AGE",
                                       "LATITUDE","LONGITUDE","PRCP","SNOW","SNWD","TMAX","AWND","DEP_TIME_BLK_Index","CARRIER_NAME_Index",
                                       "DEPARTING_AIRPORT_Index","PREVIOUS_AIRPORT_Index"], outputCol="features",handleInvalid='error')
data = assembler.transform(df)
data = data.select("features", "DEP_DEL15")
data.show(5, truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+
|features                                                                                                                                                                         |DEP_DEL15|
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+
|[1.0,5.0,5.0,1.0,9.0,143.0,1294.0,107363.0,430.0,161957.0,1.3382999E7,6.178236301460919E-5,9.889412309998219E-5,18.0,41.728,-71.426,0.0,0.0,0.0,31.0,15.21,8.0,0.0,71.0,0.0]     |0        |
|[1.0,4.0,1.0,2.0,15.0,143.0,8569.0,107363.0,2292.0,1391212.0,1.3382999E7,6.178236301460919E-5,9.889412309998219E-5,12.0,26.074,-80.152,0.17,0.0,0.0,82.0,11.63,5.0,0.0,17.0,19.0]|0        |
|[1.0,5.0,2.0,5.0,44.0,70.0,23400.0,23760.0,4691.0

Standard Scaler

In [16]:
#Import standard scaler
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
scalerModel = scaler.fit(data)
data = scalerModel.transform(data)
data = data.select("scaledFeatures", "DEP_DEL15")
data.show(5, truncate=False)



+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+
|scaledFeatures                                                                                                                                                                                                                                                                                                                                                                                                            |DEP_DEL15|
+---------------------------------------------------------------------------------------------------------------------------------------------------------
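With its default settings (`withStd=True`, `withMean=False`), `StandardScaler` divides each feature by its sample standard deviation and does not centre it. A minimal pure-Python sketch of that arithmetic for one column follows; the helper name `scale_to_unit_std` and the sample values are assumptions made for illustration.

```python
from statistics import stdev


def scale_to_unit_std(column):
    """Divide each value by the column's sample standard deviation (n - 1).

    Hypothetical helper mirroring the default StandardScaler behaviour:
    values are rescaled but not mean-centred.
    """
    sd = stdev(column)
    # A constant column has zero spread; emit 0.0 instead of dividing by zero.
    return [value / sd if sd else 0.0 for value in column]


# Hypothetical PLANE_AGE values, used only for illustration.
plane_age = [2.0, 4.0, 6.0]
scaled = scale_to_unit_std(plane_age)
print(scaled)
```

Note that in this notebook the scaler is fitted on the full dataset before the train/test split, so the test rows contribute to the scaling statistics; fitting on the training split only is the usual way to avoid that.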

In [17]:
#split data
train_data, test_data = data.randomSplit([0.7, 0.3],seed = 42)
train_data.show(5, truncate=False)
test_data.show(5, truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+
|scaledFeatures                                                                                                                                                                                                                                                                                                                                                                                                |DEP_DEL15|
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
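`randomSplit` assigns rows to splits by random sampling, so the 70/30 weights are targets rather than exact sizes. As a quick check, the sketch below compares the target share with the realised one, using only counts printed elsewhere in this notebook (the class counts after `na.drop()` and the test-set confusion matrix); the variable names are illustrative.

```python
# Row counts taken from the notebook's own outputs.
total_rows = 48473 + 11112             # class counts after na.drop()
test_rows = 3187 + 14238 + 147 + 91    # cells of the test-set confusion matrix
train_rows = total_rows - test_rows

# Realised share of rows that landed in the test split.
test_share = test_rows / total_rows

print("train rows:", train_rows)
print("test rows: ", test_rows)
print("test share:", round(test_share, 4))
```

The realised share is close to, but not exactly, 0.3, which is the expected behaviour for a sampled split.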

---
# **Task 2 - Model Selection and Implementation (25 marks)**
---


In [18]:
##1st student name: Walgama Eshwarage Don Eshwara - 2782761
# Using logistic regression
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

lr = LogisticRegression(featuresCol = 'scaledFeatures', labelCol = 'DEP_DEL15', regParam=0.01,threshold=0.5)

lr_Model = lr.fit(train_data)

lr_prediction_train = lr_Model.transform(train_data)
lr_prediction_test = lr_Model.transform(test_data)

In [19]:
lr_prediction_test.select("DEP_DEL15","prediction").show(20)

+---------+----------+
|DEP_DEL15|prediction|
+---------+----------+
|        0|       0.0|
|        0|       0.0|
|        0|       0.0|
|        0|       0.0|
|        0|       0.0|
|        0|       0.0|
|        0|       0.0|
|        0|       0.0|
|        0|       0.0|
|        0|       0.0|
|        0|       0.0|
|        0|       0.0|
|        0|       0.0|
|        0|       0.0|
|        0|       0.0|
|        1|       0.0|
|        0|       0.0|
|        1|       0.0|
|        0|       0.0|
|        0|       0.0|
+---------+----------+
only showing top 20 rows
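Almost every row above is predicted as 0.0, which is what a 0.5 threshold tends to produce when the delayed class is rare. The sketch below shows, under stated assumptions, how the `threshold` parameter turns a predicted probability of delay into a label; the probabilities and the helper `predict_label` are hypothetical and are not taken from the fitted model.

```python
def predict_label(probability_of_delay, threshold=0.5):
    """Return 1.0 when the predicted probability exceeds the threshold.

    Hypothetical helper illustrating binary thresholding.
    """
    return 1.0 if probability_of_delay > threshold else 0.0


# Hypothetical predicted probabilities for four flights.
probabilities = [0.12, 0.35, 0.48, 0.71]

default_preds = [predict_label(p) for p in probabilities]
lower_preds = [predict_label(p, threshold=0.3) for p in probabilities]

print("threshold 0.5:", default_preds)
print("threshold 0.3:", lower_preds)
```

Lowering the threshold flags more flights as delayed, which usually raises recall at the cost of precision.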



In [None]:
##2nd student name:
# add the code here


---
# **Task 3 - Model Parameter Tuning (20 marks)**
---


In [20]:
##1st student name:
from pyspark.sql import functions as F

# Check class distribution
class_dist = df.groupBy('DEP_DEL15').agg(F.count('*')).collect()
print(class_dist)


[Row(DEP_DEL15=1, count(1)=11112), Row(DEP_DEL15=0, count(1)=48473)]


In [21]:
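# Configure logistic regression to read per-row weights from the "weight" column
lr = LogisticRegression(weightCol="weight")
# Weight delayed flights (DEP_DEL15 == 1) more heavily to counter the class imbalance
balanced_df = df.withColumn("weight", F.when(F.col("DEP_DEL15") == 1, 10.0).otherwise(1.0))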
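The weight of 10.0 used above is a manual choice. For comparison, a common alternative is to derive weights from the class counts so that both classes carry the same total weight. The sketch below works through that arithmetic with the counts printed by the class-distribution cell; the variable names are illustrative, and this is one possible weighting scheme rather than the one the notebook applies.

```python
# Class counts as printed by the class-distribution cell.
n_delayed = 11112   # DEP_DEL15 == 1
n_on_time = 48473   # DEP_DEL15 == 0
n_total = n_delayed + n_on_time

# How many on-time flights there are per delayed flight.
imbalance_ratio = n_on_time / n_delayed

# Inverse-frequency weights: each class ends up with half of the total weight.
weight_delayed = n_total / (2 * n_delayed)
weight_on_time = n_total / (2 * n_on_time)

print("imbalance ratio:", round(imbalance_ratio, 2))
print("weight for delayed:", round(weight_delayed, 4))
print("weight for on-time:", round(weight_on_time, 4))
```

The ratio of roughly 4.4 suggests that a weight of 10.0 over-corrects relative to the observed imbalance, although the best value is ultimately an empirical question. The weights only take effect once a model with `weightCol="weight"` is fitted on a DataFrame that contains both the weight column and the feature vector.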

In [22]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# The label column is 'DEP_DEL15'; train_data already holds the
# scaled feature vector in 'scaledFeatures'.

# 1. Prepare the features and label columns
# (train_data has one non-label column, so this re-wraps 'scaledFeatures' as 'features')
assembler = VectorAssembler(
    inputCols=[c for c in train_data.columns if c != "DEP_DEL15"],
    outputCol="features"
)

# 2. Create the Logistic Regression model
lr = LogisticRegression(featuresCol="features", labelCol="DEP_DEL15")

# 3. Create a pipeline
pipeline = Pipeline(stages=[assembler, lr])

# 4. Create parameter grid
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.1, 1.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [10, 50, 100])
             .build())

# 5. Create evaluator (using areaUnderPR for imbalanced data)
evaluator = BinaryClassificationEvaluator(
    labelCol="DEP_DEL15",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderPR"
)

# 6. Create cross-validator
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    seed=42
)

# 7. Run cross-validation
cvModel = cv.fit(train_data)

# 8. Get best model
best_model = cvModel.bestModel.stages[-1]  # Get the LR model from pipeline

# 9. View best parameters
print("Best parameters:")
print(f"regParam: {best_model.getRegParam()}")
print(f"elasticNetParam: {best_model.getElasticNetParam()}")
print(f"maxIter: {best_model.getMaxIter()}")

Best parameters:
regParam: 0.01
elasticNetParam: 0.0
maxIter: 10


In [23]:
# Add more parameters to grid
paramGrid = (ParamGridBuilder()
            .addGrid(lr.regParam, [0.001, 0.01, 0.1, 1.0])
            .addGrid(lr.elasticNetParam, [0.0, 0.25, 0.5, 0.75, 1.0])
            .addGrid(lr.maxIter, [50, 100, 200])
            .addGrid(lr.fitIntercept, [True, False])
            .addGrid(lr.threshold, [0.3, 0.4, 0.5, 0.6])  # Adjust decision threshold
            .build())
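The cost of a grid search grows multiplicatively with each added parameter. The sketch below counts the parameter combinations and cross-validation fits for the two grids defined in this section, assuming the same 5 folds are used for both; the helper `count_cv_fits` and the dictionary layout are illustrative and not part of PySpark.

```python
from math import prod


def count_cv_fits(grid, num_folds):
    """Return (number of combinations, number of model fits during CV).

    Hypothetical helper: every combination is fitted once per fold.
    The final refit of the best model on the full training set is not counted.
    """
    combinations = prod(len(values) for values in grid.values())
    return combinations, combinations * num_folds


# First grid, as defined in cell 22.
first_grid = {
    "regParam": [0.01, 0.1, 1.0],
    "elasticNetParam": [0.0, 0.5, 1.0],
    "maxIter": [10, 50, 100],
}

# Extended grid, as defined in cell 23.
extended_grid = {
    "regParam": [0.001, 0.01, 0.1, 1.0],
    "elasticNetParam": [0.0, 0.25, 0.5, 0.75, 1.0],
    "maxIter": [50, 100, 200],
    "fitIntercept": [True, False],
    "threshold": [0.3, 0.4, 0.5, 0.6],
}

first_counts = count_cv_fits(first_grid, num_folds=5)
extended_counts = count_cv_fits(extended_grid, num_folds=5)
print("first grid:", first_counts)
print("extended grid:", extended_counts)
```

The extended grid is defined but not yet passed to a `CrossValidator` in this notebook. Since `BinaryClassificationEvaluator` scores the `rawPrediction` column rather than the thresholded `prediction`, varying `threshold` would likely leave `areaUnderPR` unchanged, so dropping it from the grid may cut the work by a factor of four.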

In [None]:
##2nd student name:
# add the code here


In [None]:
##3rd student name:
# add the code here


---
# **Task 4 - Model Evaluation and Accuracy Calculation (20 marks)**
---

In [24]:
##1st student name: Walgama Eshwarage Don Eshwara - 2782761

#confusion matrix

cm = lr_prediction_test.groupBy('DEP_DEL15', 'prediction').count()
cm.show(cm.count(), truncate=False)

+---------+----------+-----+
|DEP_DEL15|prediction|count|
+---------+----------+-----+
|1        |0.0       |3187 |
|0        |0.0       |14238|
|1        |1.0       |147  |
|0        |1.0       |91   |
+---------+----------+-----+



In [25]:
import pandas as pd
from pyspark.sql.functions import col, when

cm_pandas = cm.toPandas()
cm_pandas.pivot(index = 'DEP_DEL15', columns = 'prediction', values = 'count')




prediction,0.0,1.0
DEP_DEL15,Unnamed: 1_level_1,Unnamed: 2_level_1
0,14238,91
1,3187,147


In [26]:
# Count true/false positives and negatives on the test set
tp = lr_prediction_test[(lr_prediction_test.DEP_DEL15 == 1) & (lr_prediction_test.prediction == 1)].count()
fp = lr_prediction_test[(lr_prediction_test.DEP_DEL15 == 0) & (lr_prediction_test.prediction == 1)].count()
fn = lr_prediction_test[(lr_prediction_test.DEP_DEL15 == 1) & (lr_prediction_test.prediction == 0)].count()
tn = lr_prediction_test[(lr_prediction_test.DEP_DEL15 == 0) & (lr_prediction_test.prediction == 0)].count()

In [None]:

accuracy = (tp + tn) / (tp + fp + fn + tn)
precision = tp / (tp + fp)
recall = tp / (tp + fn)
f1 = 2 * (precision * recall) / (precision + recall)

print('accuracy: ', round(accuracy * 100, 2))
print('precision: ', round(precision * 100, 2))
print('recall: ', round(recall * 100, 2))
print('f1 score: ', round(f1 * 100, 2))



accuracy:  81.44
precision:  61.76
recall:  4.41
f1 score:  8.23
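To put the accuracy figure in context, it helps to compare it with a baseline that always predicts "not delayed". The sketch below recomputes the metrics from the confusion-matrix counts reported above and adds that baseline; the helper `classification_metrics` is hypothetical and guards against division by zero, which the formulas above would hit if the model predicted no positives.

```python
def classification_metrics(tp, fp, fn, tn):
    """Compute accuracy, precision, recall and F1 from confusion-matrix counts.

    Hypothetical helper; returns 0.0 for any ratio whose denominator is zero.
    """
    total = tp + fp + fn + tn
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    f1 = (2 * precision * recall / (precision + recall)
          if (precision + recall) else 0.0)
    return {
        "accuracy": (tp + tn) / total if total else 0.0,
        "precision": precision,
        "recall": recall,
        "f1": f1,
    }


# Counts from the test-set confusion matrix in this notebook.
metrics = classification_metrics(tp=147, fp=91, fn=3187, tn=14238)

# Accuracy of a model that labels every flight as not delayed.
baseline_accuracy = (91 + 14238) / (147 + 91 + 3187 + 14238)

for name, value in metrics.items():
    print(f"{name}: {value * 100:.2f}")
print(f"majority-class baseline accuracy: {baseline_accuracy * 100:.2f}")
```

The model's accuracy is only marginally above the baseline, and recall on delayed flights is very low, so accuracy alone overstates how useful this classifier is on an imbalanced dataset.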


In [None]:
##2nd student name:
# add the code here


In [None]:
##3rd student name:
# add the code here


---
# **Task 5 - Results Visualization or Printing (5 marks)**
---

In [None]:
##1st student name:
# add the code here


In [None]:
##2nd student name:
# add the code here


In [None]:
##3rd student name:
# add the code here


---
# **Task 6 - LSEP Considerations (5 marks)**
---

# Student 1: **Type the chosen issue**

add contribution here ...

# Student 2: **Type the chosen issue**

add contribution here ...

# Student 3: **Type the chosen issue**

add contribution here ...

---

# **Task 7 - Convert ipynb to HTML for Turnitin submission [5 marks]**

---



In [28]:
# install nbconvert (if facing the conversion error)
!pip3 install nbconvert



In [31]:
# convert ipynb to html and submit this HTML file
!jupyter nbconvert --to html Group_50_CRWK_CN7030.ipynb

This application is used to convert notebook files (*.ipynb)
        to various other formats.


Options
The options below are convenience aliases to configurable class-options,
as listed in the "Equivalent to" description-line of the aliases.
To see all configurable class-options for some <cmd>, use:
    <cmd> --help-all

--debug
    set log level to logging.DEBUG (maximize logging output)
    Equivalent to: [--Application.log_level=10]
--show-config
    Show the application's configuration (human-readable format)
    Equivalent to: [--Application.show_config=True]
--show-config-json
    Show the application's configuration (json format)
    Equivalent to: [--Application.show_config_json=True]
--generate-config
    generate default config file
    Equivalent to: [--JupyterApp.generate_config=True]
-y
    Answer yes to any questions instead of prompting.
    Equivalent to: [--JupyterApp.answer_yes=True]
--execute
    Execute the notebook prior to export.
    Equivalent to: [--ExecutePr