In [None]:
import os
import pandas as pd
import numpy as np


spark_version = 'spark-3.0.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz
!tar xf spark-3.0.3-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Ign:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Hit:3 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Get:6 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Hit:7 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:8 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:9 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:10 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:11 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Get:12 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Hit:13 http://ppa.launchpad.net/graph

In [None]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2022-01-26 04:32:21--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar.5’


2022-01-26 04:32:21 (6.51 MB/s) - ‘postgresql-42.2.16.jar.5’ saved [1002883/1002883]



In [None]:
from pyspark.sql import SparkSession
# Build (or reuse) the session. The Postgres JDBC jar is put on the driver classpath
# so the later DataFrame.write.jdbc calls can load org.postgresql.Driver.
spark = (
    SparkSession.builder
    .appName("Final-Project")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.16.jar")
    .getOrCreate()
)

In [None]:
from pyspark.sql import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import lpad
from pyspark.sql.functions import concat
from pyspark.sql.functions import lit
from pyspark.sql.functions import substring

In [None]:
from pyspark import SparkFiles
# Register the 2018 flight CSV from S3 with Spark, then read the local copy with its header row.
url = "https://finalprojectstorage10.s3.us-east-2.amazonaws.com/2018.csv"
spark.sparkContext.addFile(url)
flights_csv_path = SparkFiles.get("2018.csv")
df = spark.read.csv(flights_csv_path, header=True, encoding="UTF-8")
df.show()

+----------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+----------------+-------------------+--------+--------+-------------+-------------+---------+--------------+-------------------+-----------+
|   FL_DATE|OP_CARRIER|OP_CARRIER_FL_NUM|ORIGIN|DEST|CRS_DEP_TIME|DEP_TIME|DEP_DELAY|TAXI_OUT|WHEELS_OFF|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|ARR_DELAY|CANCELLED|CANCELLATION_CODE|DIVERTED|CRS_ELAPSED_TIME|ACTUAL_ELAPSED_TIME|AIR_TIME|DISTANCE|CARRIER_DELAY|WEATHER_DELAY|NAS_DELAY|SECURITY_DELAY|LATE_AIRCRAFT_DELAY|Unnamed: 27|
+----------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+----------------+-------------------+--------+--------+-------------+-------------+---------+--------------+-------------------+-----------+
|

In [None]:
from pyspark.sql.functions import *

def padTimeStamp(x, y):
    """Combine a flight date and an hhmm clock value into a "YYYY-MM-DD HH:MM" string.

    x: flight date string, e.g. "2018-01-01" (assumed ISO format, since it is later
       cast to `date`; TODO confirm for all rows).
    y: clock time as read from the CSV, e.g. "1405.0", "5.0", "2400.0", or None.

    Returns the combined string, or None when either input is missing or y is not
    numeric, so the later `cast(... as timestamp)` yields NULL instead of a
    fabricated midnight (previously a missing time became "HH:MM" = "00:00").
    A value of 2400 (midnight at the end of the day) is rolled over to 00:00 on the
    next day, because hour 24 is rejected by Spark's timestamp cast.
    """
    import datetime  # local import: this function is shipped to Spark workers via a UDF

    if x is None or y is None:
        return None

    try:
        hhmm = int(float(y))  # "1405.0" -> 1405; also accepts "1405"
    except (ValueError, OverflowError):
        return None

    if hhmm == 2400:
        next_day = datetime.datetime.strptime(x, "%Y-%m-%d") + datetime.timedelta(days=1)
        return next_day.strftime("%Y-%m-%d") + " 00:00"

    hhmm_text = str(hhmm).zfill(4)
    return f"{x} {hhmm_text[:2]}:{hhmm_text[2:4]}"

# Wrap padTimeStamp as a Spark UDF, because plain Python functions cannot be applied to
# DataFrame columns directly. The result type "string" is stated explicitly (it
# matches udf's default); the timestamp cast happens later in selectExpr.
padTimeStampUDF = udf(padTimeStamp, "string")

#Initial dataframe: build "YYYY-MM-DD HH:MM" strings from FL_DATE plus each hhmm time
#column via padTimeStampUDF, and keep the remaining source columns.
#Columns are picked by name rather than df.columns[i], so a reordered CSV cannot
#silently select the wrong field.
initial_df = df.select(
    "OP_CARRIER",
    "OP_CARRIER_FL_NUM",
    "ORIGIN",
    padTimeStampUDF(df["FL_DATE"], df["CRS_DEP_TIME"]).alias("CRS_DEPARTURE_TIMESTAMP"),
    "FL_DATE",
    "DEST",
    padTimeStampUDF(df["FL_DATE"], df["DEP_TIME"]).alias("ACTUAL_DEPARTURE_TIMESTAMP"),
    "DEP_DELAY",
    "TAXI_OUT",
    padTimeStampUDF(df["FL_DATE"], df["WHEELS_OFF"]).alias("WHEELS_OFF_TIMESTAMP"),
    padTimeStampUDF(df["FL_DATE"], df["WHEELS_ON"]).alias("WHEELS_ON_TIMESTAMP"),
    "TAXI_IN",
    padTimeStampUDF(df["FL_DATE"], df["CRS_ARR_TIME"]).alias("CRS_ARRIVAL_TIMESTAMP"),
    padTimeStampUDF(df["FL_DATE"], df["ARR_TIME"]).alias("ACTUAL_ARRIVAL_TIMESTAMP"),
    "ARR_DELAY",
    "CANCELLED",
    "CANCELLATION_CODE",
    "DIVERTED",
    "CRS_ELAPSED_TIME",
    "ACTUAL_ELAPSED_TIME",
    "AIR_TIME",
    "DISTANCE",
    "CARRIER_DELAY",
    "WEATHER_DELAY",
    "NAS_DELAY",
    "SECURITY_DELAY",
    "LATE_AIRCRAFT_DELAY",
)
#NOTE(review): every clock time is attached to FL_DATE, so wheels-on/arrival times of
#flights that cross midnight land on the flight date rather than the next day. Confirm
#whether downstream consumers care.

#Turn the flag columns into booleans: '1.0' -> True, anything else (incl. null) -> False
initial_df = initial_df.withColumn("DIVERTED", when(col("DIVERTED") == '1.0', True).otherwise(False))
initial_df = initial_df.withColumn("CANCELLED", when(col("CANCELLED") == '1.0', True).otherwise(False))

#Final df: rename the columns and cast them to their target types
final_df = initial_df.selectExpr(
    "OP_CARRIER as AIRLINE_CARRIER_CODE",
    "OP_CARRIER_FL_NUM",
    "ORIGIN as ORIGIN_AIRPORT_CODE",
    "cast(CRS_DEPARTURE_TIMESTAMP as timestamp) CRS_DEPARTURE_TIMESTAMP",
    "cast(FL_DATE as date) FLIGHT_DT",
    "DEST as DEST_AIRPORT_CODE",
    "cast(ACTUAL_DEPARTURE_TIMESTAMP as timestamp) ACTUAL_DEPARTURE_TIMESTAMP",
    "cast(DEP_DELAY as integer) as DEPARTURE_DELAY_MINUTES",
    "cast(TAXI_OUT as integer) TAXI_OUT_MINUTES",
    "cast(WHEELS_OFF_TIMESTAMP as timestamp) WHEELS_OFF_TIMESTAMP",
    "cast(WHEELS_ON_TIMESTAMP as timestamp) WHEELS_ON_TIMESTAMP",
    "cast(TAXI_IN as integer) TAXI_IN_MINUTES",
    "cast(CRS_ARRIVAL_TIMESTAMP as timestamp) CRS_ARRIVAL_TIMESTAMP",
    # Actual arrival comes from the ACTUAL_ARRIVAL column (it was previously copied
    # from the scheduled CRS arrival by mistake).
    "cast(ACTUAL_ARRIVAL_TIMESTAMP as timestamp) ACTUAL_ARRIVAL_TIMESTAMP",
    "cast(ARR_DELAY as integer) ARRIVAL_DELAY_MINUTES",
    "CANCELLED as CANCELLED_IND",
    "CANCELLATION_CODE",
    "DIVERTED as DIVERTED_IND",
    "cast(CRS_ELAPSED_TIME as integer) CRS_ELAPSED_TIME_MINUTES",
    "cast(ACTUAL_ELAPSED_TIME as integer) ACTUAL_ELAPSED_TIME_MINUTES",
    "cast(AIR_TIME as integer) AIR_TIME_MINUTES",
    "cast(DISTANCE as integer) DISTANCE_MILES",
    "cast(CARRIER_DELAY as integer) CARRIER_DELAY_MINUTES",
    "cast(WEATHER_DELAY as integer) WEATHER_DELAY_MINUTES",
    "cast(NAS_DELAY as integer) NAS_DELAY_MINUTES",
    "cast(SECURITY_DELAY as integer) SECURITY_DELAY_MINUTES",
    "cast(LATE_AIRCRAFT_DELAY as integer) LATE_AIRCRAFT_DELAY_MINUTES",
)
#Delay-cause columns are null when no delay was attributed; store those as 0 minutes
final_df = final_df.na.fill(value=0,subset=["CARRIER_DELAY_MINUTES", "WEATHER_DELAY_MINUTES", "NAS_DELAY_MINUTES", "SECURITY_DELAY_MINUTES","LATE_AIRCRAFT_DELAY_MINUTES"])

final_df.show()

+--------------------+-----------------+-------------------+-----------------------+----------+-----------------+--------------------------+-----------------------+----------------+--------------------+-------------------+---------------+---------------------+------------------------+---------------------+-------------+-----------------+------------+------------------------+---------------------------+----------------+--------------+---------------------+---------------------+-----------------+----------------------+---------------------------+
|AIRLINE_CARRIER_CODE|OP_CARRIER_FL_NUM|ORIGIN_AIRPORT_CODE|CRS_DEPARTURE_TIMESTAMP| FLIGHT_DT|DEST_AIRPORT_CODE|ACTUAL_DEPARTURE_TIMESTAMP|DEPARTURE_DELAY_MINUTES|TAXI_OUT_MINUTES|WHEELS_OFF_TIMESTAMP|WHEELS_ON_TIMESTAMP|TAXI_IN_MINUTES|CRS_ARRIVAL_TIMESTAMP|ACTUAL_ARRIVAL_TIMESTAMP|ARRIVAL_DELAY_MINUTES|CANCELLED_IND|CANCELLATION_CODE|DIVERTED_IND|CRS_ELAPSED_TIME_MINUTES|ACTUAL_ELAPSED_TIME_MINUTES|AIR_TIME_MINUTES|DISTANCE_MILES|CARRIER_DELA

In [None]:
# Inspect the column names and types produced by the selectExpr casts above
final_df.printSchema()

root
 |-- AIRLINE_CARRIER_CODE: string (nullable = true)
 |-- OP_CARRIER_FL_NUM: string (nullable = true)
 |-- ORIGIN_AIRPORT_CODE: string (nullable = true)
 |-- CRS_DEPARTURE_TIMESTAMP: timestamp (nullable = true)
 |-- FLIGHT_DT: date (nullable = true)
 |-- DEST_AIRPORT_CODE: string (nullable = true)
 |-- ACTUAL_DEPARTURE_TIMESTAMP: timestamp (nullable = true)
 |-- DEPARTURE_DELAY_MINUTES: integer (nullable = true)
 |-- TAXI_OUT_MINUTES: integer (nullable = true)
 |-- WHEELS_OFF_TIMESTAMP: timestamp (nullable = true)
 |-- WHEELS_ON_TIMESTAMP: timestamp (nullable = true)
 |-- TAXI_IN_MINUTES: integer (nullable = true)
 |-- CRS_ARRIVAL_TIMESTAMP: timestamp (nullable = true)
 |-- ACTUAL_ARRIVAL_TIMESTAMP: timestamp (nullable = true)
 |-- ARRIVAL_DELAY_MINUTES: integer (nullable = true)
 |-- CANCELLED_IND: boolean (nullable = false)
 |-- CANCELLATION_CODE: string (nullable = true)
 |-- DIVERTED_IND: boolean (nullable = false)
 |-- CRS_ELAPSED_TIME_MINUTES: integer (nullable = true)
 |-- A

In [None]:
#final_df.filter((final_df.AIRLINE_CARRIER_CODE == "YX") & (final_df.OP_CARRIER_FL_NUM == "3624") & (final_df.CRS_DEPARTURE_TIMESTAMP == '2018-06-21 20:07:00')).show(100)

## Airport Names and Coordinates

In [None]:
# Register the airport reference CSV from S3 with Spark, then read the local copy with its header row.
url2 = "https://finalprojectstorage10.s3.us-east-2.amazonaws.com/airport.csv"
spark.sparkContext.addFile(url2)
airport_csv_path = SparkFiles.get("airport.csv")
airport_initial_df = spark.read.csv(airport_csv_path, header=True, encoding="UTF-8")
airport_initial_df.show()

+------+-----+-------------+--------------------+------------+-------------+------------+---------+-----------+----------+------------+-----------------+--------+---------+----------+---------+--------------+--------+
|    id|ident|         type|                name|latitude_deg|longitude_deg|elevation_ft|continent|iso_country|iso_region|municipality|scheduled_service|gps_code|iata_code|local_code|home_link|wikipedia_link|keywords|
+------+-----+-------------+--------------------+------------+-------------+------------+---------+-----------+----------+------------+-----------------+--------+---------+----------+---------+--------------+--------+
|  6523|  00A|     heliport|   Total Rf Heliport| 40.07080078| -74.93360138|          11|       NA|         US|     US-PA|    Bensalem|               no|     00A|     null|       00A|     null|          null|    null|
|323361| 00AA|small_airport|Aero B Ranch Airport|   38.704022|  -101.473911|        3435|       NA|         US|     US-KS|      

In [None]:
#Keep only the IATA lookup key plus the fields that get joined onto the flights later
airport_columns = ["iata_code", "name", "latitude_deg", "longitude_deg"]
airport_df = airport_initial_df.select(*airport_columns)
airport_df.show()
airport_df.count()

+---------+--------------------+------------+-------------+
|iata_code|                name|latitude_deg|longitude_deg|
+---------+--------------------+------------+-------------+
|     null|   Total Rf Heliport| 40.07080078| -74.93360138|
|     null|Aero B Ranch Airport|   38.704022|  -101.473911|
|     null|        Lowell Field|   59.947733|  -151.692524|
|     null|        Epps Airpark|  34.8647995| -86.77030182|
|     null|Newport Hospital ...|     35.6087|   -91.254898|
|     null|      Fulton Airport|  34.9428028|  -97.8180194|
|     null|      Cordes Airport| 34.30559921| -112.1650009|
|     null|Goldstone (GTS) A...|    35.35474|  -116.885329|
|     null| Williams Ag Airport|   39.427188|  -121.763427|
|     null|Kitchen Creek Hel...|  32.7273736| -116.4597417|
|     null|          Cass Field|   40.622202|  -104.344002|
|     null| Grass Patch Airport| 28.64550018| -82.21900177|
|     null|  Ringhaver Heliport| 28.84659958| -82.34539795|
|     null|   River Oak Airport| 27.2308

70174

In [None]:
#Drop rows with a null in any of the four columns, then keep a single row per IATA code
#NOTE(review): dropDuplicates keeps an arbitrary row for a repeated iata_code, so the
#name/coordinates joined for such codes are not guaranteed to be stable across runs.
airport_df = airport_df.dropna().dropDuplicates(["iata_code"])
airport_df.count()

9131

In [None]:
from pyspark.sql.types import *

test_df = final_df


def _attach_airport(flights, code_col, prefix):
    """Left-join airport name and coordinates onto `flights` using the IATA code in `code_col`.

    Appends <prefix>_AIRPORT_NAME, <prefix>_LATITUDE_DEG and <prefix>_LONGITUDE_DEG
    (coordinates cast to FloatType). Flights whose code has no match get nulls.
    """
    lookup = airport_df.select(
        col("iata_code").alias("_lookup_iata"),
        col("name").alias(prefix + "_AIRPORT_NAME"),
        col("latitude_deg").cast(FloatType()).alias(prefix + "_LATITUDE_DEG"),
        col("longitude_deg").cast(FloatType()).alias(prefix + "_LONGITUDE_DEG"),
    )
    matched = flights.join(lookup, flights[code_col] == lookup["_lookup_iata"], "left")
    # The join key is only needed for matching; drop it so it does not clash on the next join
    return matched.drop("_lookup_iata")


# Origin airport first, then destination airport, so the ORIGIN_* columns precede DEST_*.
# NOTE(review): FloatType is single precision (~7 significant digits), so coordinates
# such as -74.93360138 are rounded; kept as-is to preserve the written table's schema.
joined_df = _attach_airport(test_df, "ORIGIN_AIRPORT_CODE", "ORIGIN")
joined_df = _attach_airport(joined_df, "DEST_AIRPORT_CODE", "DEST")

In [None]:
# Preview the first few joined rows
joined_df.show(n=5)

+--------------------+-----------------+-------------------+-----------------------+----------+-----------------+--------------------------+-----------------------+----------------+--------------------+-------------------+---------------+---------------------+------------------------+---------------------+-------------+-----------------+------------+------------------------+---------------------------+----------------+--------------+---------------------+---------------------+-----------------+----------------------+---------------------------+--------------------+-------------------+--------------------+--------------------+-----------------+------------------+
|AIRLINE_CARRIER_CODE|OP_CARRIER_FL_NUM|ORIGIN_AIRPORT_CODE|CRS_DEPARTURE_TIMESTAMP| FLIGHT_DT|DEST_AIRPORT_CODE|ACTUAL_DEPARTURE_TIMESTAMP|DEPARTURE_DELAY_MINUTES|TAXI_OUT_MINUTES|WHEELS_OFF_TIMESTAMP|WHEELS_ON_TIMESTAMP|TAXI_IN_MINUTES|CRS_ARRIVAL_TIMESTAMP|ACTUAL_ARRIVAL_TIMESTAMP|ARRIVAL_DELAY_MINUTES|CANCELLED_IND|CANCELLA

In [None]:
# Inspect the schema after the origin/destination airport columns were added
joined_df.printSchema()

root
 |-- AIRLINE_CARRIER_CODE: string (nullable = true)
 |-- OP_CARRIER_FL_NUM: string (nullable = true)
 |-- ORIGIN_AIRPORT_CODE: string (nullable = true)
 |-- CRS_DEPARTURE_TIMESTAMP: timestamp (nullable = true)
 |-- FLIGHT_DT: date (nullable = true)
 |-- DEST_AIRPORT_CODE: string (nullable = true)
 |-- ACTUAL_DEPARTURE_TIMESTAMP: timestamp (nullable = true)
 |-- DEPARTURE_DELAY_MINUTES: integer (nullable = true)
 |-- TAXI_OUT_MINUTES: integer (nullable = true)
 |-- WHEELS_OFF_TIMESTAMP: timestamp (nullable = true)
 |-- WHEELS_ON_TIMESTAMP: timestamp (nullable = true)
 |-- TAXI_IN_MINUTES: integer (nullable = true)
 |-- CRS_ARRIVAL_TIMESTAMP: timestamp (nullable = true)
 |-- ACTUAL_ARRIVAL_TIMESTAMP: timestamp (nullable = true)
 |-- ARRIVAL_DELAY_MINUTES: integer (nullable = true)
 |-- CANCELLED_IND: boolean (nullable = false)
 |-- CANCELLATION_CODE: string (nullable = true)
 |-- DIVERTED_IND: boolean (nullable = false)
 |-- CRS_ELAPSED_TIME_MINUTES: integer (nullable = true)
 |-- A

## Write to Database

In [None]:
# Configure settings for RDS.
# Credentials come from the environment (or an interactive prompt) rather than being
# hard-coded in the notebook. The password previously written here is exposed in the
# notebook's history and should be rotated.
import getpass

# Spark's "append" save mode adds rows on every run; re-running the notebook will likely
# duplicate rows unless the target tables enforce a key.
mode = "append"
jdbc_url = os.environ.get(
    "AIRLINE_DB_JDBC_URL",
    "jdbc:postgresql://airline-db.cjrwalaqox8l.us-east-2.rds.amazonaws.com/airline",
)
config = {
    "user": os.environ.get("AIRLINE_DB_USER", "postgres"),
    "password": os.environ.get("AIRLINE_DB_PASSWORD") or getpass.getpass("Postgres password: "),
    "driver": "org.postgresql.Driver",
}


In [None]:
# Write the cleaned flight DataFrame to the project.flight_data table in RDS
final_df.write.mode(mode).jdbc(jdbc_url, "project.flight_data", properties=config)

In [None]:
# Write the airport-enriched flight DataFrame to the project.joined_flight_data table in RDS
joined_df.write.mode(mode).jdbc(jdbc_url, "project.joined_flight_data", properties=config)