In [1]:
!pip install pyspark



In [2]:
import os
# Find the latest version of spark 3.x  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.4.0'
spark_version = 'spark-3.4.0'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
0% [Waiting for headers] [1 InRelease 0 B/110 kB 0%] [Connected to cloud.r-proj                                                                               Hit:2 http://archive.ubuntu.com/ubuntu jammy InRelease
0% [1 InRelease 43.1 kB/110 kB 39%] [Connected to cloud.r-project.org (108.157.                                                                               Get:3 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
Hit:4 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Get:6 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [109 kB]
Hit:7 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-d

In [3]:
# Import packages
import os
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col
from pyspark.sql import DataFrame
import time
from pyspark.sql.functions import col, count
from pyspark.sql.functions import col, sum
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import when, col
from pyspark.sql.functions import col, lit

from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline





In [4]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [5]:

# Create a SparkSession
spark = SparkSession.builder.appName("LocalCSVToDataFrame").getOrCreate()

# Define the path to your local CSV file
drive_csv_path = "drive/MyDrive/Data/Combined_Flights_2022.csv"

# Read the local CSV file into a DataFrame
df = spark.read.csv(drive_csv_path, sep=",", header=True)

# Show the DataFrame
df.show()

+----------+--------------------+------+----+---------+--------+----------+-------+---------------+--------+-------+---------------+-------+--------------+-----------------+--------+----+-------+-----+----------+---------+-------------------------+---------------------------------------+------------------------+---------------------------+-------------------------------+-----------------+------------------------+---------------------------+-----------+-------------------------------+---------------+------------------+------------------+--------------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+-------------------+---------+-------------+-------------+-------+--------+--------------------+----------+-------+---------+--------+------+----------+--------+--------+------------------+----------+-------------+------------------+
|FlightDate|             Airline|Origin|Dest|Cancelled|Diverted|CRSDepTime|DepTime|DepDelayMinutes|De

In [6]:
# Create a temporary view of the DataFrame.
df.createOrReplaceTempView('flights')

In [7]:
 # Get total number of records in dataset
print("The data contain %d records." % df.count())

# View the dataframe
df

The data contain 4078318 records.


DataFrame[FlightDate: string, Airline: string, Origin: string, Dest: string, Cancelled: string, Diverted: string, CRSDepTime: string, DepTime: string, DepDelayMinutes: string, DepDelay: string, ArrTime: string, ArrDelayMinutes: string, AirTime: string, CRSElapsedTime: string, ActualElapsedTime: string, Distance: string, Year: string, Quarter: string, Month: string, DayofMonth: string, DayOfWeek: string, Marketing_Airline_Network: string, Operated_or_Branded_Code_Share_Partners: string, DOT_ID_Marketing_Airline: string, IATA_Code_Marketing_Airline: string, Flight_Number_Marketing_Airline: string, Operating_Airline: string, DOT_ID_Operating_Airline: string, IATA_Code_Operating_Airline: string, Tail_Number: string, Flight_Number_Operating_Airline: string, OriginAirportID: string, OriginAirportSeqID: string, OriginCityMarketID: string, OriginCityName: string, OriginState: string, OriginStateFips: string, OriginStateName: string, OriginWac: string, DestAirportID: string, DestAirportSeqID: s

In [8]:
#sort values by flight
sorted_df = df.orderBy("FlightDate")


In [9]:
# How many null values in each column?
# df.isnull().sum()
null_counts = df.select([sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in df.columns])
null_counts.show()


+----------+-------+------+----+---------+--------+----------+-------+---------------+--------+-------+---------------+-------+--------------+-----------------+--------+----+-------+-----+----------+---------+-------------------------+---------------------------------------+------------------------+---------------------------+-------------------------------+-----------------+------------------------+---------------------------+-----------+-------------------------------+---------------+------------------+------------------+--------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+------------+---------+-------------+-------------+-------+--------+--------------------+----------+-------+---------+--------+------+----------+--------+--------+------------------+----------+-------------+------------------+
|FlightDate|Airline|Origin|Dest|Cancelled|Diverted|CRSDepTime|DepTime|DepDelayMinutes|DepDelay|ArrTime|ArrDelayMinutes|AirTime|

In [10]:
# Only proceed with the following columns
essential_column_list = [
    'Flight_Number_Operating_Airline',
    'Month',
    'DayofMonth',
    'DayOfWeek',
    'OriginAirportID',
    'CRSDepTime',
    'DepTime',
    'DestAirportID',
    'CRSArrTime',
    'ArrTime',
    'ArrDelayMinutes',
    'DepDelayMinutes',
    'Cancelled',
    ]
essential_df = df[essential_column_list]
essential_df.head(10)

[Row(Flight_Number_Operating_Airline='4301', Month='4', DayofMonth='4', DayOfWeek='1', OriginAirportID='11921', CRSDepTime='1133', DepTime='1123.0', DestAirportID='11292', CRSArrTime='1245', ArrTime='1228.0', ArrDelayMinutes='0.0', DepDelayMinutes='0.0', Cancelled='False'),
 Row(Flight_Number_Operating_Airline='4299', Month='4', DayofMonth='4', DayOfWeek='1', OriginAirportID='12206', CRSDepTime='732', DepTime='728.0', DestAirportID='12266', CRSArrTime='849', ArrTime='848.0', ArrDelayMinutes='0.0', DepDelayMinutes='0.0', Cancelled='False'),
 Row(Flight_Number_Operating_Airline='4298', Month='4', DayofMonth='4', DayOfWeek='1', OriginAirportID='11413', CRSDepTime='1529', DepTime='1514.0', DestAirportID='11292', CRSArrTime='1639', ArrTime='1636.0', ArrDelayMinutes='0.0', DepDelayMinutes='0.0', Cancelled='False'),
 Row(Flight_Number_Operating_Airline='4296', Month='4', DayofMonth='4', DayOfWeek='1', OriginAirportID='12266', CRSDepTime='1435', DepTime='1430.0', DestAirportID='11973', CRSArrT

In [11]:
# Count the number of cancelled flights
cancelled_count = df.filter(col("Cancelled") == 1).agg(count("*").alias("CancelledFlights"))
cancelled_count.show()

+----------------+
|CancelledFlights|
+----------------+
|               0|
+----------------+



In [12]:
# Define conditions and corresponding values
cancelled_condition = col("Cancelled") == "True"
delayed_condition = col("DepDelayMinutes") > 0
ontime_condition = col("DepDelayMinutes") == 0

# Create a new 'Status' column based on conditions
df = essential_df.withColumn("Status",
                   when(cancelled_condition, lit(2))
                   .when(delayed_condition, lit(1))
                   .when(ontime_condition, lit(0))
                   .otherwise("Unknown"))
df.show()

+-------------------------------+-----+----------+---------+---------------+----------+-------+-------------+----------+-------+---------------+---------------+---------+------+
|Flight_Number_Operating_Airline|Month|DayofMonth|DayOfWeek|OriginAirportID|CRSDepTime|DepTime|DestAirportID|CRSArrTime|ArrTime|ArrDelayMinutes|DepDelayMinutes|Cancelled|Status|
+-------------------------------+-----+----------+---------+---------------+----------+-------+-------------+----------+-------+---------------+---------------+---------+------+
|                           4301|    4|         4|        1|          11921|      1133| 1123.0|        11292|      1245| 1228.0|            0.0|            0.0|    False|     0|
|                           4299|    4|         4|        1|          12206|       732|  728.0|        12266|       849|  848.0|            0.0|            0.0|    False|     0|
|                           4298|    4|         4|        1|          11413|      1529| 1514.0|        11292| 

In [13]:
# Remove the specified columns
# columns_to_drop = ['Cancelled', 'ArrDelayMinutes', 'DepDelayMinutes', 'ArrTime', 'DepTime']
columns_to_drop = ['Cancelled','ArrTime', 'DepTime']
df_clean = df.drop(*columns_to_drop)

In [14]:
# Convert the 'Status' column to float
df_clean = df_clean.withColumn("Status", col("Status").cast("float"))

# Display data types for each column
df_clean.printSchema()

root
 |-- Flight_Number_Operating_Airline: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- DayofMonth: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- OriginAirportID: string (nullable = true)
 |-- CRSDepTime: string (nullable = true)
 |-- DestAirportID: string (nullable = true)
 |-- CRSArrTime: string (nullable = true)
 |-- ArrDelayMinutes: string (nullable = true)
 |-- DepDelayMinutes: string (nullable = true)
 |-- Status: float (nullable = true)



In [15]:
# Group by 'Status' column and count occurrences
status_counts = df_clean.groupBy("Status").count()
status_counts.show()

+------+-------+
|Status|  count|
+------+-------+
|   2.0| 123192|
|   1.0|1557028|
|   0.0|2398098|
+------+-------+



In [16]:
# Look at the value counts for binning
# Group by 'Flight_Number_Operating_Airline' column and count occurrences
flight_no_airline_count = df_clean.groupBy("Flight_Number_Operating_Airline").agg(count("*").alias("Count"))
flight_no_airline_count.show()

+-------------------------------+-----+
|Flight_Number_Operating_Airline|Count|
+-------------------------------+-----+
|                           3959|  367|
|                           4821|  529|
|                            675| 1326|
|                           1090|  587|
|                            467| 1166|
|                           1572|  792|
|                            296| 1055|
|                           4032|  238|
|                            829|  672|
|                           2136| 1347|
|                           2088| 1078|
|                           3606|  779|
|                           3414|  451|
|                           2162|  736|
|                           2904|  686|
|                           2294| 1049|
|                           3210|  411|
|                           1436|  945|
|                           2069| 1322|
|                           1159| 1004|
+-------------------------------+-----+
only showing top 20 rows



In [17]:

# Filter for flight numbers with value counts > 1000
cutoff_value = 1000
flight_no_over_1 = flight_no_airline_count.filter(col("Count") > cutoff_value)
flight_no_over_1.show()

+-------------------------------+-----+
|Flight_Number_Operating_Airline|Count|
+-------------------------------+-----+
|                            675| 1326|
|                            467| 1166|
|                            296| 1055|
|                           2136| 1347|
|                           2088| 1078|
|                           2294| 1049|
|                           2069| 1322|
|                           1159| 1004|
|                            691| 1010|
|                           2110| 1136|
|                            451| 1018|
|                           2275| 1325|
|                           1280| 1041|
|                           2530| 1059|
|                             51| 1084|
|                            591| 1071|
|                            475| 1351|
|                           1445| 1059|
|                           1500| 1445|
|                            574| 1222|
+-------------------------------+-----+
only showing top 20 rows



In [18]:
# Define the cutoff value
cutoff_value = 600

# Calculate value counts
flight_number_counts = df_clean.groupBy("Flight_Number_Operating_Airline").count()

# Replace flight numbers with value counts over the cutoff with "0"
df_clean = df_clean.join(
    flight_number_counts,
    "Flight_Number_Operating_Airline",
    "left"
).withColumn(
    "Flight_Number_Operating_Airline",
    when(col("Count") > cutoff_value, "0").otherwise(col("Flight_Number_Operating_Airline"))
).drop("Count")

# Show the updated DataFrame
df_clean.show()

+-------------------------------+-----+----------+---------+---------------+----------+-------------+----------+---------------+---------------+------+
|Flight_Number_Operating_Airline|Month|DayofMonth|DayOfWeek|OriginAirportID|CRSDepTime|DestAirportID|CRSArrTime|ArrDelayMinutes|DepDelayMinutes|Status|
+-------------------------------+-----+----------+---------+---------------+----------+-------------+----------+---------------+---------------+------+
|                           4271|    4|         4|        1|          11292|      1330|        11413|      1444|            0.0|            0.0|   0.0|
|                           4266|    4|         4|        1|          12266|      1810|        13256|      1932|            0.0|            0.0|   0.0|
|                              0|    4|         4|        1|          12266|      1940|        13422|      2106|            0.0|           14.0|   1.0|
|                           4291|    4|         4|        1|          12266|      1424| 

In [19]:
# Replace non-finite values with a placeholder (e.g., 0)
df_clean = df_clean.fillna(0)


In [20]:
# Convert columns to int
for col_name in df_clean.columns:
    df_clean = df_clean.withColumn(col_name, col(col_name).cast("int"))

# Show the updated DataFrame
df_clean.show()

+-------------------------------+-----+----------+---------+---------------+----------+-------------+----------+---------------+---------------+------+
|Flight_Number_Operating_Airline|Month|DayofMonth|DayOfWeek|OriginAirportID|CRSDepTime|DestAirportID|CRSArrTime|ArrDelayMinutes|DepDelayMinutes|Status|
+-------------------------------+-----+----------+---------+---------------+----------+-------------+----------+---------------+---------------+------+
|                           4271|    4|         4|        1|          11292|      1330|        11413|      1444|              0|              0|     0|
|                           4266|    4|         4|        1|          12266|      1810|        13256|      1932|              0|              0|     0|
|                              0|    4|         4|        1|          12266|      1940|        13422|      2106|              0|             14|     1|
|                           4291|    4|         4|        1|          12266|      1424| 

In [21]:
# Check the data types
df_clean.dtypes

[('Flight_Number_Operating_Airline', 'int'),
 ('Month', 'int'),
 ('DayofMonth', 'int'),
 ('DayOfWeek', 'int'),
 ('OriginAirportID', 'int'),
 ('CRSDepTime', 'int'),
 ('DestAirportID', 'int'),
 ('CRSArrTime', 'int'),
 ('ArrDelayMinutes', 'int'),
 ('DepDelayMinutes', 'int'),
 ('Status', 'int')]

In [23]:
from pyspark.ml import Pipeline


# Initialize Spark session
spark = SparkSession.builder \
    .appName("MultiClassClassification") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "2g") \
    .getOrCreate()

# Define features and label columns
feature_columns = df_clean.columns
feature_columns.remove("Status")

# Create a VectorAssembler to combine feature columns into a single feature vector
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features", handleInvalid="skip")

# Ensure that the input layer size matches the number of features
input_layer_size = len(feature_columns)

# Specify the layers for the MultilayerPerceptronClassifier
#layers = [input_layer_size, 48, 24, 12, 3]
layers = [input_layer_size, 60, 20, 10, 4]

# Create the MultilayerPerceptronClassifier model
mlp = MultilayerPerceptronClassifier(
    featuresCol="features",
    labelCol="Status",
    layers=layers,
    maxIter=5,
    seed=42
)

# Create a pipeline that combines the VectorAssembler and MultilayerPerceptronClassifier
pipeline = Pipeline(stages=[assembler, mlp])

# Split the data into training and testing sets
train_data, test_data = df_clean.randomSplit([0.8, 0.2], seed=42)

# Fit the model on the training data
model = pipeline.fit(train_data)


In [24]:
# Make predictions on the test data
predictions = model.transform(test_data)

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol="Status", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)

Accuracy: 0.6068069112589088
