Installing mysql and setup

In [1]:
! apt-get update
! apt-get install mysql-server

Get:1 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Hit:2 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:3 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Get:4 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [127 kB]
Hit:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Get:6 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:7 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:11 http://archive.ubuntu.com/ubuntu jammy-updates/restricted amd64 Packages [3,962 kB]
Get:12 http://archive.ubuntu.com/ubuntu jammy-updates/universe amd64 Packages [1,535 kB]
Get:13 http://archive.ubuntu.com/ubuntu jammy-updates/main amd

In [2]:
!mysql --version # Making sure everything has been installed correctly by checking the version
!service mysql start # Starting our server

mysql  Ver 8.0.41-0ubuntu0.22.04.1 for Linux on x86_64 ((Ubuntu))
 * Starting MySQL database server mysqld
   ...done.


In [3]:
!mysql -e "SET GLOBAL local_infile = 1;"

In [4]:
from google.colab import drive
from google.colab import files

drive.mount('/content/drive')

Mounted at /content/drive


Using python to preprocess the file before applying any sql statements. <br>
This removes any unnecessary columns that will not be used in our queries.

In [6]:
import pandas as pd
import numpy as np

df2018 = pd.read_csv("/content/drive/MyDrive/Colab Notebooks/Combined_Flights_2018.csv")
df2022 = pd.read_csv("/content/drive/MyDrive/Colab Notebooks/Combined_Flights_2022.csv")

columns_to_remove = ["CRSDepTime", "DepDelayMinutes", "ArrDelayMinutes", "CRSElapsedTime", "ActualElapsedTime", "DayofMonth", "DayOfWeek", "Marketing_Airline_Network",
                     "Operated_or_Branded_Code_Share_Partners", "DOT_ID_Marketing_Airline", "IATA_Code_Marketing_Airline", "Flight_Number_Marketing_Airline",	"Operating_Airline",
                     "DOT_ID_Operating_Airline", "IATA_Code_Operating_Airline", "Tail_Number", "Flight_Number_Operating_Airline", "OriginAirportID", "OriginAirportSeqID", "OriginCityMarketID",
                     "OriginState", "OriginStateFips", "OriginWac", "DestAirportID", "DestAirportSeqID", "DestCityMarketID", "DestState", "DestStateFips", "DestWac", "DepDel15", "DepartureDelayGroups",
                     "DepTimeBlk", "CRSArrTime", "ArrDel15", "ArrivalDelayGroups", "ArrTimeBlk", "DistanceGroup"]

In [7]:
# Remove columns specified above from dfs
df2018 = df2018.drop(columns=columns_to_remove)
df2022 = df2022.drop(columns=columns_to_remove)

print(df2018.head())
print(df2022.head())

print("Before 2018df length: %d" % len(df2018))
print("Before 2022df length: %d" % len(df2022))

   FlightDate            Airline Origin Dest  Cancelled  Diverted  DepTime  \
0  2018-01-23  Endeavor Air Inc.    ABY  ATL      False     False   1157.0   
1  2018-01-24  Endeavor Air Inc.    ABY  ATL      False     False   1157.0   
2  2018-01-25  Endeavor Air Inc.    ABY  ATL      False     False   1153.0   
3  2018-01-26  Endeavor Air Inc.    ABY  ATL      False     False   1150.0   
4  2018-01-27  Endeavor Air Inc.    ABY  ATL      False     False   1355.0   

   DepDelay  ArrTime  AirTime  ...  OriginCityName  OriginStateName  \
0      -5.0   1256.0     38.0  ...      Albany, GA          Georgia   
1      -5.0   1258.0     36.0  ...      Albany, GA          Georgia   
2      -9.0   1302.0     40.0  ...      Albany, GA          Georgia   
3     -12.0   1253.0     35.0  ...      Albany, GA          Georgia   
4      -5.0   1459.0     36.0  ...      Albany, GA          Georgia   

   DestCityName  DestStateName TaxiOut WheelsOff WheelsOn TaxiIn  ArrDelay  \
0   Atlanta, GA        Geo

In [8]:
df2018["FlightDate"] = df2018["FlightDate"].replace('0', pd.NaT)
df2022["FlightDate"] = df2022["FlightDate"].replace('0', pd.NaT)

# Convert date to correct format so sql can store properly
df2018["FlightDate"] = pd.to_datetime(df2018["FlightDate"], errors='coerce')
df2022["FlightDate"] = pd.to_datetime(df2022["FlightDate"], errors='coerce')

# Drop any empty rows
df2018 = df2018.dropna()
df2022 = df2022.dropna()

# Drop any empty rows
df2018 = df2018.dropna(subset=["FlightDate"])
df2022 = df2022.dropna(subset=["FlightDate"])
print("After 2018df length: %d" % len(df2018))
print("After 2022df length: %d" % len(df2022))

# Change true/false to 1/0 to be able to store in db
df2018['Cancelled'] = df2018['Cancelled'].map({True: 1, False: 0})
df2018['Diverted'] = df2018['Diverted'].map({True: 1, False: 0})
df2022['Cancelled'] = df2022['Cancelled'].map({True: 1, False: 0})
df2022['Diverted'] = df2022['Diverted'].map({True: 1, False: 0})

print(df2018.head())
print(df2022.head())

# Check column length
print("2018df columns: %d" % len(df2018.columns))
print("2022df columns: %d" % len(df2022.columns))

After 2018df length: 5578618
After 2022df length: 3944916
  FlightDate            Airline Origin Dest  Cancelled  Diverted  DepTime  \
0 2018-01-23  Endeavor Air Inc.    ABY  ATL          0         0   1157.0   
1 2018-01-24  Endeavor Air Inc.    ABY  ATL          0         0   1157.0   
2 2018-01-25  Endeavor Air Inc.    ABY  ATL          0         0   1153.0   
3 2018-01-26  Endeavor Air Inc.    ABY  ATL          0         0   1150.0   
4 2018-01-27  Endeavor Air Inc.    ABY  ATL          0         0   1355.0   

   DepDelay  ArrTime  AirTime  ...  OriginCityName  OriginStateName  \
0      -5.0   1256.0     38.0  ...      Albany, GA          Georgia   
1      -5.0   1258.0     36.0  ...      Albany, GA          Georgia   
2      -9.0   1302.0     40.0  ...      Albany, GA          Georgia   
3     -12.0   1253.0     35.0  ...      Albany, GA          Georgia   
4      -5.0   1459.0     36.0  ...      Albany, GA          Georgia   

   DestCityName  DestStateName TaxiOut WheelsOff Whe

In [9]:
# Save df to csv file
df2018.to_csv("2018filtered_flights.csv", index=False)
df2022.to_csv("2022filtered_flights.csv", index=False)

# Append 2022 df to same csv
#with open('filtered_flights.csv', 'a') as f:
#    df2022.to_csv(f, header=False)

print("Saved as 'filtered_flights.csv'.")

Saved as 'filtered_flights.csv'.


In [None]:
# Move file to correct folder so sql can access
!mv '2018filtered_flights.csv' /var/lib/mysql-files/
!mv '2022filtered_flights.csv' /var/lib/mysql-files/

In [None]:
!cat '/var/lib/mysql-files/filtered_flights.csv'

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
2018-01-05,Southwest Airlines Co.,ONT,PHX,,,1653.0,13.0,1904.0,53.0,325.0,2018,1,1,"Ontario, CA",California,"Phoenix, AZ",Arizona,9.0,1702.0,1855.0,9.0,9.0,0.0
2018-01-05,Southwest Airlines Co.,ONT,PHX,,,722.0,-3.0,917.0,39.0,325.0,2018,1,1,"Ontario, CA",California,"Phoenix, AZ",Arizona,7.0,729.0,908.0,9.0,-18.0,0.0
2018-01-05,Southwest Airlines Co.,ONT,PHX,,,1102.0,-3.0,1302.0,49.0,325.0,2018,1,1,"Ontario, CA",California,"Phoenix, AZ",Arizona,6.0,1108.0,1257.0,5.0,-8.0,0.0
2018-01-05,Southwest Airlines Co.,ONT,PHX,,,2035.0,5.0,2252.0,52.0,325.0,2018,1,1,"Ontario, CA",California,"Phoenix, AZ",Arizona,5.0,2040.0,2232.0,20.0,12.0,0.0
2018-01-05,Southwest Airlines Co.,ONT,SJC,,,2127.0,-3.0,2247.0,51.0,333.0,2018,1,1,"Ontario, CA",California,"San Jose, CA",California,6.0,2133.0,2224.0,23.0,-3.0,0.0
2018-01-05,Southwest Airlines Co.,ONT,SJC,,,543.0,-2.0,657.0,57.0,333.0,2018,1,1,"Ontario, CA",California,"San Jose, CA",Californ

**Start SQL shell and run these commands: <br>**
show databases; <br>
CREATE DATABASE flights; <br>
use flights; <br>

**Creates 'flights' table with relevant column names and datatypes.<br>**
CREATE TABLE flights (
    FlightDate DATE,
    Airline VARCHAR(100),
    Origin VARCHAR(5),
    Dest VARCHAR(5),
    Cancelled INT,
    Diverted INT,
    DepTime INT,
    DepDelay INT,
    ArrTime INT,
    AirTime INT,
    Distance INT,
    Year INT,
    Quarter INT,
    Month INT,
    OriginCityName VARCHAR(100),
    OriginStateName VARCHAR(100),
    DestCityName VARCHAR(100),
    DestStateName VARCHAR(100),
    TaxiOut INT,
    WheelsOff INT,
    WheelsOn INT,
    TaxiIn INT,
    ArrDelay INT,
    DivAirportLandings INT
); <br>

**Loads each file into db <br>**
LOAD DATA INFILE '/var/lib/mysql-files/2018filtered_flights.csv' INTO TABLE flights FIELDS TERMINATED BY ',' ENCLOSED BY '"' LINES TERMINATED BY '\n' IGNORE 1 ROWS; <br>

LOAD DATA INFILE '/var/lib/mysql-files/2022filtered_flights.csv' INTO TABLE flights FIELDS TERMINATED BY ',' ENCLOSED BY '"' LINES TERMINATED BY '\n' IGNORE 1 ROWS;

In [None]:
!mysql

Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 30
Server version: 8.0.41-0ubuntu0.22.04.1 (Ubuntu)

Copyright (c) 2000, 2025, Oracle and/or its affiliates.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> use flights;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql> LOAD DATA INFILE '/var/lib/mysql-files/2018filtered_flights.csv' INTO TABLE flights FIELDS TE RMINATED BY ',' ENCLOSED BY '"' LINES TERMINATED BY '\n' IGNORE 1 ROWS;
Query OK, 5578618 rows affected (2 min 25.42 sec)

mysql> LOAD DATA INFILE '/var/lib/mysql-files/2022filtered_flights.csv' INTO TABLE flights FIELDS TE RMINATED BY ',' ENCLOSED BY '"' LINES TERMINATED BY '\n' IGNORE 1 ROWS;
Query OK, 3944916 ro

### Spark Section
First, we are setting up the spark environment.

In [10]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [11]:
!wget https://archive.apache.org/dist/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
!tar xf spark-3.3.1-bin-hadoop3.tgz
!rm spark-3.3.1-bin-hadoop3.tgz   # Tidying up

--2025-03-23 11:58:19--  https://archive.apache.org/dist/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
Resolving archive.apache.org (archive.apache.org)... 65.108.204.189, 2a01:4f9:1a:a084::2
Connecting to archive.apache.org (archive.apache.org)|65.108.204.189|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 299350810 (285M) [application/x-gzip]
Saving to: ‘spark-3.3.1-bin-hadoop3.tgz’


2025-03-23 11:58:31 (23.6 MB/s) - ‘spark-3.3.1-bin-hadoop3.tgz’ saved [299350810/299350810]



In [12]:
# Setting up our environmental variables:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.1-bin-hadoop3"

In [13]:
!pip install -q findspark
import findspark
findspark.init()

In [14]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) #  This will format our output tables a bit nicer when not using the show() method
spark

In [15]:
import multiprocessing
print(multiprocessing.cpu_count())

2


In [16]:
# Creating our SparkContext:
sc = spark.sparkContext
!find /content/ -name "2018filtered_flights.csv"
# Read the file as an RDD:
rdd = sc.textFile("2018filtered_flights.csv")

/content/2018filtered_flights.csv


In [18]:
import csv
from io import StringIO

def parse_csv(line):
    if not line.strip():  # Skip empty lines
        return []

    try:
        # Read the line as CSV and extract the first row
        parsed_row = next(csv.reader(StringIO(line)))
        return parsed_row
    except StopIteration:
        # If there's no valid row, return an empty list
        return []
    except Exception as e:
        # Catch other potential exceptions and return empty list
        print(f"Error parsing line: {line}. Error: {e}")
        return []