In [None]:
"""
key fields we need to look at
@acid (Flight Number)
@airline (Airline)
@arrArpt (Arrival Airport)
@depArpt (Departure Airport)
fdm:trackInformation.nxcm:ncsmTrackData.nxcm:departureFixAndTime.@arrTime (Departure Time)
fdm:trackInformation.nxcm:ncsmTrackData.nxcm:eta.@timeValue (Scheduled Arrival Time)
fdm:trackInformation.nxcm:ncsmTrackData.nxcm:arrivalFixAndTime.@arrTime (Actual Arrival Time)

TODO LIST
[o] do these operations on all the json files, not just flights_000
[o] convert the timestamp (currently stored as a string) to some time value to do arithmetic, and determine the delay for each recorded flight
[] create another table with airport hubs for each airline to compare data?
"""

In [12]:
from datetime import datetime, timezone
from pyspark.sql import SparkSession
from pyspark.sql.functions import UserDefinedFunction, to_timestamp
from pyspark.sql.types import StringType,IntegerType
from pyspark.sql.functions import collect_list,split,regexp_replace,col,round,concat,lit,avg,when,length
spark = SparkSession.builder.master("local[*]").appName('flight-data').getOrCreate()
flight_data = spark.read.format("json").options(inferschema='true',header='true').load('../part1/flights/flights_000.jsonl')
flight_data = flight_data.select('@acid', '@airline', '@arrArpt', '@depArpt',\
                                 'fdm:trackInformation.nxcm:ncsmTrackData.nxcm:departureFixAndTime.@arrTime',\
                                 'fdm:trackInformation.nxcm:ncsmTrackData.nxcm:eta.@timeValue',\
                                 'fdm:trackInformation.nxcm:ncsmTrackData.nxcm:arrivalFixAndTime.@arrTime')
# renames the table columns
new_column_names = ["Flight Number", "Airline", "Arrival Airport", "Departure Airport", "Departure Time", "Scheduled Arrival Time", "Actual Arrival Time"]
flight_data = flight_data.toDF(*new_column_names)
# removing invalid departure and arrival times, null values
flight_data = flight_data.na.drop()
# removes placeholder letter in front of airport code, if it exists
flight_data = flight_data.withColumn('Arrival Airport', when(length(col('Arrival Airport')) == 4, col('Arrival Airport').substr(2,3))\
                         .otherwise(col('Arrival Airport')))\
                         .withColumn('Departure Airport', when(length(col('Departure Airport')) == 4, col('Departure Airport').substr(2,3))\
                         .otherwise(col('Departure Airport')))\
                         .withColumn('Delayed', to_timestamp(flight_data["Scheduled Arrival Time"]).cast('long') < to_timestamp(flight_data["Actual Arrival Time"]).cast('long'))
                         
# flight_data.show(10)

fares = spark.read.parquet('./ticket_fares/output.parquet/')
flight_data = flight_data.join(fares, (flight_data.Airline == fares.Airline) 
                            & (flight_data['Arrival Airport'] == fares.Origin)
                            & (flight_data['Departure Airport'] == fares.Dest))\
                    .drop(fares.Airline).drop('ItinID', 'Origin', 'Dest')\
                    .dropDuplicates()
# flight_data.show(10)
# flight_data.write.csv("./temp.csv", mode='overwrite')

flight_data.write\
        .format('jdbc')\
        .option('url', 'jdbc:postgresql://localhost:5432/cs179g')\
        .option('dbtable', "flight_data")\
        .option("user", "group9") \
        .option("password", "group9") \
        .option("driver", "org.postgresql.Driver") \
        .mode('overwrite')\
        .save()

22/11/27 02:12:15 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


                                                                                

+-------------+-------+---------------+-----------------+--------------------+----------------------+--------------------+-------+----------+--------+--------+
|Flight Number|Airline|Arrival Airport|Departure Airport|      Departure Time|Scheduled Arrival Time| Actual Arrival Time|Delayed|Passengers|Distance|ItinFare|
+-------------+-------+---------------+-----------------+--------------------+----------------------+--------------------+-------+----------+--------+--------+
|      UAL1722|    UAL|            PHL|              SFO|2022-10-22T06:52:26Z|  2022-10-22T11:29:42Z|2022-10-22T11:13:58Z|  false|       1.0|  2521.0|   221.0|
|      DAL1198|    DAL|            ATL|              SAN|2022-10-22T06:04:55Z|  2022-10-22T09:17:59Z|2022-10-22T09:00:43Z|  false|       1.0|  1892.0|   774.0|
|      UAL1126|    UAL|            ORD|              PHX|2022-10-22T07:16:00Z|  2022-10-22T09:50:47Z|2022-10-22T09:32:48Z|  false|       3.0|  1440.0|   438.0|
|      UAL1200|    UAL|            IAD| 

                                                                                

In [42]:
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException
from pyspark.sql import Row

spark = SparkSession.builder.master("local[1]").appName("Test").getOrCreate()
sc = spark.sparkContext

def has_column(df, col):
    try:
        df[col]
        return True
    except AnalysisException:
        return False

flight_data = spark.read.format("json").options(inferschema='true',header='true').load('../part1/flights/flights_000.jsonl')


In [44]:
from pyspark.sql.functions import col,lit, when
from pyspark.sql.types import BooleanType
from pyspark.sql.utils import AnalysisException
from pyspark.sql import Row


def has_column(df, col):
    try:
        df[col]
        return True
    except AnalysisException:
        return False

# has_column(flight_data, "fdm:trackInformation.nxcm:airlineData")

if has_column(flight_data, "*.nxcm:qualifiedAircraftId.nxce:igtd"):
# if has_column(flight_data, "fdm:trackInformation.nxcm:airlineData.nxcm:flightTimeData.@originalArrival"):
    df_basicInfo = flight_data.withColumn("ResponseType", col("*.nxcm:qualifiedAircraftId.nxce:igtd"))
    # df_basicInfo = flight_data.withColumn("ResponseType", col("fdm:trackInformation.nxcm:airlineData.nxcm:flightTimeData.@originalArrival"))
else:
    # Adjust types according to your needs
    df_basicInfo = flight_data.withColumn("ResponseType", lit(None).cast("string"))
df_basicInfo.filter(col("ResponseType").isNotNull()).show()


+-----+--------+--------+--------+--------+----------+----------+--------+------------+---------------+----------------+----------------------+--------------------------+------------------------+----------------------------------+--------------------------+-------------------------+--------------------+--------------------+-------------------+------------------------------+---------------------+-------------------+-----------------+--------------------+------------+
|@acid|@airline|@arrArpt|@cdmPart|@depArpt|@fdTrigger|@flightRef|@msgType|@sensitivity|@sourceFacility|@sourceTimeStamp|fdm:arrivalInformation|fdm:boundaryCrossingUpdate|fdm:departureInformation|fdm:flightPlanAmendmentInformation|fdm:flightPlanCancellation|fdm:flightPlanInformation|fdm:ncsmFlightCreate|fdm:ncsmFlightModify|fdm:ncsmFlightRoute|fdm:ncsmFlightScheduleActivate|fdm:ncsmFlightSectors|fdm:ncsmFlightTimes|fdm:oceanicReport|fdm:trackInformation|ResponseType|
+-----+--------+--------+--------+--------+----------+----

In [11]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[1]").appName("PSQL Write Test")\
    .config("spark.driver.extraClassPath", "/home/ubuntu/postgresql-42.5.0.jar")\
    .getOrCreate()

data_path = "./temp.csv"
data = spark.read.csv(data_path, header=False, inferSchema=True).limit(5)
new_column_names = ["Flight Number", "Airline", "Arrival Airport", "Departure Airport", "Departure Time", "Estimated Arrival Time", "Actual Arrival Time", "Delayed", "Passengers", "Distance", "ItinFare"]
data = data.toDF(*new_column_names)
print(data.count())
data.show(10)

data.write\
	.format('jdbc')\
	.option('url', 'jdbc:postgresql://localhost:5432/cs179g')\
	.option('dbtable', "students")\
	.option("user", "group9") \
	.option("password", "group9") \
	.option("driver", "org.postgresql.Driver") \
	.mode('overwrite')\
	.save()

# df = spark.read \
#     .format("jdbc") \
#     .option("url", url) \
#     .option("dbtable", "students") \
#     .option("user", "group9") \
#     .option("password", "group9") \
#     .option("driver", "org.postgresql.Driver") \
#     .load()
# df.show()


5
+-------------+-------+---------------+-----------------+-------------------+----------------------+-------------------+-------+----------+--------+--------+
|Flight Number|Airline|Arrival Airport|Departure Airport|     Departure Time|Estimated Arrival Time|Actual Arrival Time|Delayed|Passengers|Distance|ItinFare|
+-------------+-------+---------------+-----------------+-------------------+----------------------+-------------------+-------+----------+--------+--------+
|      UAL1722|    UAL|            PHL|              SFO|2022-10-22 06:52:26|   2022-10-22 11:29:42|2022-10-22 11:13:58|  false|       1.0|  2521.0|   221.0|
|      DAL1198|    DAL|            ATL|              SAN|2022-10-22 06:04:55|   2022-10-22 09:17:59|2022-10-22 09:00:43|  false|       1.0|  1892.0|   774.0|
|      UAL1126|    UAL|            ORD|              PHX|2022-10-22 07:16:00|   2022-10-22 09:50:47|2022-10-22 09:32:48|  false|       3.0|  1440.0|   438.0|
|      UAL1200|    UAL|            IAD|           