In [1]:
"""
This document shows the process of transforming the on time performance dataset into the dataset that can be used for 
machine learning. Below the final write are some early experiments in processing the data. 

"""

In [None]:
import findspark
findspark.init('/opt/spark')

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkFiles
spark = SparkSession.builder.appName('OnTimePerformance_byriel').getOrCreate()

In [3]:
data = spark.read.csv('On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2020_2.csv', header=True, inferSchema=True)

In [4]:
#Change year and month to match that of the dataset
year = 2020
month = 2

In [5]:
data = spark.read.csv('json_2020_5.csv/part-00000-94140472-3dda-4b78-8d0f-d4f4e43cd44c-c000.csv', header=True, inferSchema=True)

In [5]:
trim = data.select(['Year',
 'Month',
 'DayofMonth',
 'Dest',
 'ArrTime',
 'ArrDelayMinutes',
 'Cancelled',
 'CancellationCode',
 'Diverted',
 'WeatherDelay'])

In [8]:
data.columns

['Year',
 'Month',
 'DayofMonth',
 'Dest',
 'ArrTime',
 'ArrDelayMinutes',
 'Diverted',
 'Cancelled',
 'WeatherDelay']

In [6]:
trim = trim.na.fill(0)
trim = trim.filter((trim['CancellationCode'] == 'B') | (trim['Cancelled'] == 0))

In [7]:
trim = trim.select(['Year',
 'Month',
 'DayofMonth',
 'Dest',
 'ArrTime',
 'ArrDelayMinutes',
 'Diverted',
 'Cancelled',
 'WeatherDelay'])
#trim.show()

In [9]:
writePath = 'trimmed_'+str(year)+ '_'+str(month)+'.csv'
trim.write.csv(writePath, mode='overwrite', header=True, encoding='utf-8')

In [None]:
#Reload kernel and start from here after previous write
import findspark
findspark.init('/opt/spark')

from pyspark.sql import SparkSession
from pyspark import SparkFiles
spark = SparkSession.builder.appName('OnTimePerformance_byriel').getOrCreate()

#Change year and month to match that of the dataset
year = 2020
month = 2

trim = spark.read.csv('trimmed_2020_2.csv/part-00000-43263bac-9303-4d61-a41d-f95d3fb0cf4a-c000.csv', header=True, inferSchema=True)

In [8]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType, TimestampType, StringType
import datetime

In [9]:
def makeTimestamp(year, month, day, time):
    #Takes the time and date given by the dataset and converts it to UTC timestamp (denoted as 'Z' in aviation)
    div = time//100
    mod = time % 100
    if div > 23 or div < 0 or mod > 59 or mod < 0:
        return None
    return  datetime.datetime.strptime('{}-{}-{} {}:{}'.format(year, month, day, div, mod), '%Y-%m-%d %H:%M')

In [10]:
makeTimestampUDF = udf(lambda year, month, day, time: makeTimestamp(year, month, day, time), TimestampType())

In [11]:
df = trim.withColumn('timestamp', makeTimestampUDF(trim['Year'], trim['Month'], trim['DayOfMonth'], trim['ArrTime']))

In [12]:
from timezonefinder import TimezoneFinder
from dateutil import tz
from datetime import timedelta

In [13]:
from importlib import reload
import iem_wx_scraper
reload(iem_wx_scraper)
from iem_wx_scraper import getWX, getMonthlyWX, getFullWX

In [14]:
tf = TimezoneFinder()

In [15]:
from pyspark.sql import functions as F

In [17]:
airports = [x['Dest'] for x in df.groupBy('Dest').count().collect()]

In [None]:
import json

In [21]:
allWeather = {}
try:
    with open('wx{}_{}.json'.format(year, month), 'r') as fp:
        allWeather = json.load(fp)
except FileNotFoundError:
    pass

In [19]:
#Populates the weather dictionary

for airport in airports:

    if airport not in allWeather:
        wx = getMonthlyWX(airport, datetime.datetime(year, month, 1))
        reports = []
        #Create a list of dictionaries
        for i in wx[1:]:
            reports.append({})
            for j in range(len(wx[0])):
                reports[-1][wx[0][j]] = i[j]

        #convert the time to datetime objects
        for i in reports:
            i['valid'] = datetime.datetime.strptime(i['valid'], '%Y-%m-%d %H:%M')

        #Get timezone of airport
        tf = TimezoneFinder()
        if reports == []:
            continue
        timezone = tf.timezone_at(lng=float(reports[0]['lon']), lat=float(reports[0]['lat']))   

        #convert weather rport (UTC) to local time
        from_zone = tz.gettz('UTC')
        to_zone = tz.gettz(timezone)

        for i in reports:
            #Give timezone info
            i['valid'] = i['valid'].replace(tzinfo=from_zone)
            #Convert time based on timezone
            i['valid'] = i['valid'].astimezone(to_zone)
            #Remove timezone info
            i['valid'] = i['valid'].replace(tzinfo=None)
        for i in reports:
            for entry in i:
                if i[entry] == 'M':
                    i[entry] = 0
        for i in range(len(reports)):
            reports[i] = {
                'temp': float(reports[i]['tmpf']),
                'dewpoint':float(reports[i]['dwpf']),
                'wind':float(reports[i]['sknt']),
                'precip':float(reports[i]['p01i']),
                'alti':float(reports[i]['alti']),
                'vis':float(reports[i]['vsby']),
                'cloudCoverage':reports[i]['skyc1'],
                'cloudAlt':float(reports[i]['skyl1']),
                'weather':reports[i]['wxcodes'],
                'ice':float(reports[i]['ice_accretion_1hr']),
                'time':reports[i]['valid'].strftime("%Y-%m-%d %H:%M:%S")

            }
        allWeather[airport] = reports 
        #outDict = {airport: reports}

Downloading: PSE Downloading: MSY Downloading: DRT Downloading: GEG Downloading: BUR Downloading: GRB Downloading: GRR Downloading: JLN Downloading: PVD Downloading: GSO Downloading: MYR Downloading: OAK Downloading: FAR Downloading: MQT Downloading: MSN Downloading: FSM Downloading: DCA Downloading: CID Downloading: MLU Downloading: SWO Downloading: LEX Downloading: ORF Downloading: SCE Downloading: EVV Downloading: CWA Downloading: CRW Downloading: SAV Downloading: GCK Downloading: TRI Downloading: CMH Downloading: TYR Downloading: CAK Downloading: CHO Downloading: MOB Downloading: PNS Downloading: IAH Downloading: SHV Downloading: CVG Downloading: TOL Downloading: SJC Downloading: LGA Downloading: TLH Downloading: BUF Downloading: ACT Downloading: HPN Downloading: MLI Downloading: AUS Downloading: SJU Downloading: ATW Downloading: LYH Downloading: GJT Downloading: AVL Downloading: LGB Downloading: BFL Downloading: SRQ Downloading: RNO Downloading: EYW Downloading: SBN Downloading: R

In [20]:
#Output the weather dictionary as a .json file
import json
with open('wx{}_{}.json'.format(year, month), 'w') as fp:
    for airport in allWeather:
        for report in allWeather[airport]:
            if type(report['time']) != str:
                report['time'] = report['time'].strftime("%Y-%m-%d %H:%M:%S")
    json.dump(allWeather, fp)

In [21]:
def getClosestWXCached(airport, time):
    if airport not in allWeather:
        return None
    
    reports = allWeather[airport]
    
    if reports == []:
        return None
    
    #Convert any string times to timestamp objects
    for i in reports:
        if type(i['time']) == str:
            i['time'] = datetime.datetime.strptime(i['time'], '%Y-%m-%d %H:%M:%S')
    
    #Get the closest report that happened before the timestamp
    reports = [x for x in reports if x['time'] < time]
    current = max(reports, key = lambda wx: wx['time'])
    
    #Fill null values given as 'M' with 0
    for label in current:
        if current[label] == 'M':
            current[label] = 0
    #print(current)
    
    outDict = {}
    #Remove the timestmap object before returning it to spark
    for label in current:
        if label != 'time':
            outDict[label] = current[label]
    return json.dumps(outDict)
    
    #return (colTuple)


In [22]:
#Testing timestamp caching

timestamp = makeTimestamp(2020, 5, 1, 1149)
print('hi')
print(getClosestWXCached('DEN', timestamp))
#print(schema)
print(getClosestWXCached('ABE', timestamp))
print(getClosestWXCached('MCO', timestamp))

hi
None
{'temp': 0.0, 'dewpoint': 0.0, 'wind': 4.0, 'precip': 0.0, 'alti': 30.14, 'vis': 10.0, 'cloudCoverage': 'CLR', 'cloudAlt': 0.0, 'weather': 0, 'ice': 0.0, 'time': datetime.datetime(2020, 3, 1, 18, 55)}
{"temp": 0.0, "dewpoint": 0.0, "wind": 4.0, "precip": 0.0, "alti": 30.14, "vis": 10.0, "cloudCoverage": "CLR", "cloudAlt": 0.0, "weather": 0, "ice": 0.0}
{'temp': 0.0, 'dewpoint': 0.0, 'wind': 8.0, 'precip': 0.0, 'alti': 30.27, 'vis': 10.0, 'cloudCoverage': 'CLR', 'cloudAlt': 0.0, 'weather': 0, 'ice': 0.0, 'time': datetime.datetime(2020, 3, 1, 18, 55)}
{"temp": 0.0, "dewpoint": 0.0, "wind": 8.0, "precip": 0.0, "alti": 30.27, "vis": 10.0, "cloudCoverage": "CLR", "cloudAlt": 0.0, "weather": 0, "ice": 0.0}


In [6]:
from pyspark.sql.types import StructField, StructType, IntegerType, StringType, ArrayType, DoubleType
#The schema of the JSON column
schema = StructType([
    StructField('temp',DoubleType()),
    StructField('dewpoint',DoubleType()),
    StructField('wind',DoubleType()),
    StructField('precip',DoubleType()),
    StructField('alti',DoubleType()),
    StructField('vis',DoubleType()),
    StructField('cloudCoverage',StringType()),
    StructField('cloudAlt',DoubleType()),
    StructField('weather',StringType()),
    StructField('ice',DoubleType())
])

In [24]:
wxUDF = udf(lambda airport, timestamp: getClosestWXCached(airport, timestamp), StringType())#StructType(schema))

In [89]:
df.show(500)

+----+-----+----------+----+-------+---------------+--------+---------+------------+-------------------+
|Year|Month|DayofMonth|Dest|ArrTime|ArrDelayMinutes|Diverted|Cancelled|WeatherDelay|          timestamp|
+----+-----+----------+----+-------+---------------+--------+---------+------------+-------------------+
|2020|    5|         1| DEN|   1103|            0.0|     0.0|      0.0|         0.0|2020-05-01 11:03:00|
|2020|    5|         2| DEN|   1108|            0.0|     0.0|      0.0|         0.0|2020-05-02 11:08:00|
|2020|    5|         4| DEN|   1100|            0.0|     0.0|      0.0|         0.0|2020-05-04 11:00:00|
|2020|    5|         6| DEN|   1101|            0.0|     0.0|      0.0|         0.0|2020-05-06 11:01:00|
|2020|    5|         8| DEN|   1103|            0.0|     0.0|      0.0|         0.0|2020-05-08 11:03:00|
|2020|    5|        11| DEN|   1110|            0.0|     0.0|      0.0|         0.0|2020-05-11 11:10:00|
|2020|    5|        13| DEN|   1105|            0.0|   

In [25]:
json_df = df.na.drop().withColumn('wx', wxUDF(df['Dest'], df['timestamp']))

In [44]:
json_df.show(truncate=False)

+----+-----+----------+----+-------+---------------+--------+---------+------------+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Year|Month|DayofMonth|Dest|ArrTime|ArrDelayMinutes|Diverted|Cancelled|WeatherDelay|timestamp          |wx                                                                                                                                                                  |
+----+-----+----------+----+-------+---------------+--------+---------+------------+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|2020|5    |1         |MCO |1149   |0.0            |0.0     |0.0      |0.0         |2020-05-01 11:49:00|{"temp": 0.0, "dewpoint": 0.0, "wind": 8.0, "precip": 0.0, "alti": 30.08, "vis": 10.0,

In [51]:
json_df.filter(json_df['wx'].isNull()).show()

+----+-----+----------+----+-------+---------------+--------+---------+------------+-------------------+----+
|Year|Month|DayofMonth|Dest|ArrTime|ArrDelayMinutes|Diverted|Cancelled|WeatherDelay|          timestamp|  wx|
+----+-----+----------+----+-------+---------------+--------+---------+------------+-------------------+----+
|2020|    5|         1| SJU|   1244|            0.0|     0.0|      0.0|         0.0|2020-05-01 12:44:00|null|
|2020|    5|         4| SJU|   1408|           62.0|     0.0|      0.0|         0.0|2020-05-04 14:08:00|null|
|2020|    5|         7| SJU|   1257|            0.0|     0.0|      0.0|         0.0|2020-05-07 12:57:00|null|
|2020|    5|         8| SJU|   1249|            0.0|     0.0|      0.0|         0.0|2020-05-08 12:49:00|null|
|2020|    5|        11| SJU|   1252|            0.0|     0.0|      0.0|         0.0|2020-05-11 12:52:00|null|
|2020|    5|        14| SJU|   1249|            0.0|     0.0|      0.0|         0.0|2020-05-14 12:49:00|null|
|2020|    

In [50]:
#This was an attempt that failed

'''from pyspark.sql.functions import flatten
addCols = ['temp', 'dewpoint', 'wind', 'precip', 'alti', 'vis', 'cloudCoverage', 'cloudAlt', 'weather', 'ice']
json_flat = json_df.na.drop()
json_flat.show()
for i in addCols:
    json_flat = json_flat.withColumn(i, json_df['wx'][i])
    json_flat.show()
#json_flat = json_df
'''

+----+-----+----------+----+-------+---------------+--------+---------+------------+-------------------+--------------------+
|Year|Month|DayofMonth|Dest|ArrTime|ArrDelayMinutes|Diverted|Cancelled|WeatherDelay|          timestamp|                  wx|
+----+-----+----------+----+-------+---------------+--------+---------+------------+-------------------+--------------------+
|2020|    5|         1| MCO|   1149|            0.0|     0.0|      0.0|         0.0|2020-05-01 11:49:00|{"temp": 0.0, "de...|
|2020|    5|         4| MCO|   1157|            1.0|     0.0|      0.0|         0.0|2020-05-04 11:57:00|{"temp": 0.0, "de...|
|2020|    5|         6| MCO|   1203|            7.0|     0.0|      0.0|         0.0|2020-05-06 12:03:00|{"temp": 0.0, "de...|
|2020|    5|         8| MCO|   1157|            1.0|     0.0|      0.0|         0.0|2020-05-08 11:57:00|{"temp": 0.0, "de...|
|2020|    5|        11| MCO|   1156|            0.0|     0.0|      0.0|         0.0|2020-05-11 11:56:00|{"temp": 0.0, 

AnalysisException: Can't extract value from wx#658: need struct type but got string;

In [26]:
json_df.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- Dest: string (nullable = true)
 |-- ArrTime: integer (nullable = true)
 |-- ArrDelayMinutes: double (nullable = true)
 |-- Diverted: double (nullable = true)
 |-- Cancelled: double (nullable = true)
 |-- WeatherDelay: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- wx: string (nullable = true)



In [27]:
writePath = 'json_'+str(year)+ '_'+str(month)+'.csv'
json_df.write.csv(writePath, mode='overwrite', header=True, encoding='utf-8')

In [3]:
#Reload kernel and start from here after previous write
import findspark
findspark.init('/opt/spark')

from pyspark.sql import SparkSession
from pyspark import SparkFiles
spark = SparkSession.builder.appName('OnTimePerformance_byriel').getOrCreate()

#Change year and month to match that of the dataset
year = 2020
month = 2

json_df = spark.read.csv('json_2020_2.csv/part-00000-aaebefda-670e-429d-9da8-ad0c8f5de0df-c000.csv', header=True, inferSchema=True)

In [7]:
from pyspark.sql.functions import from_json

#from_json call
json_df = json_df.select('Dest', 'ArrDelayMinutes', 'Diverted', 'Cancelled', 'WeatherDelay', 'timestamp', 
                                     from_json(json_df['wx'], schema).alias('json'))

json_df.show()

+----+---------------+--------+---------+------------+-------------------+--------------------+
|Dest|ArrDelayMinutes|Diverted|Cancelled|WeatherDelay|          timestamp|                json|
+----+---------------+--------+---------+------------+-------------------+--------------------+
| LYH|            0.0|     0.0|      0.0|         0.0|2020-02-01 15:31:00|[0.0, 0.0, 2.0, 0...|
| LYH|            9.0|     0.0|      0.0|         0.0|2020-02-08 15:48:00|[0.0, 0.0, 8.0, 0...|
| SHV|           18.0|     0.0|      0.0|         0.0|2020-02-13 23:56:00|[0.0, 0.0, 5.0, 0...|
| SHV|            0.0|     0.0|      0.0|         0.0|2020-02-14 23:21:00|[0.0, 0.0, 5.0, 0...|
| SHV|           10.0|     0.0|      0.0|         0.0|2020-02-15 23:48:00|[0.0, 0.0, 4.0, 0...|
| SHV|            0.0|     0.0|      0.0|         0.0|2020-02-16 23:37:00|[0.0, 0.0, 5.0, 0...|
| SHV|            0.0|     0.0|      0.0|         0.0|2020-02-17 23:29:00|[0.0, 0.0, 7.0, 0...|
| SHV|            0.0|     0.0|      0.0

In [69]:
json_df.printSchema()

root
 |-- Dest: string (nullable = true)
 |-- ArrDelayMinutes: double (nullable = false)
 |-- Diverted: double (nullable = false)
 |-- Cancelled: double (nullable = false)
 |-- WeatherDelay: double (nullable = false)
 |-- timestamp: timestamp (nullable = true)
 |-- json: struct (nullable = true)
 |    |-- temp: double (nullable = true)
 |    |-- dewpoint: double (nullable = true)
 |    |-- wind: double (nullable = true)
 |    |-- precip: double (nullable = true)
 |    |-- alti: double (nullable = true)
 |    |-- vis: double (nullable = true)
 |    |-- cloudCoverage: string (nullable = true)
 |    |-- cloudAlt: double (nullable = true)
 |    |-- weather: string (nullable = true)
 |    |-- ice: double (nullable = true)



In [67]:
#json_df_expanded.select('json.temp', 'json.dewpoint', 'json.wind', 'json.precip', 'json.alti', 'json.vis', 'json.cloudCoverage', 'json.cloudAlt', 'json.weather', 'json.ice').show()

In [8]:
#Expand json column into individual columns
json_flat = json_df.select('Dest', 'ArrDelayMinutes', 'Diverted', 'Cancelled', 'WeatherDelay', 'json.*')

In [9]:
json_flat.printSchema()

root
 |-- Dest: string (nullable = true)
 |-- ArrDelayMinutes: double (nullable = true)
 |-- Diverted: double (nullable = true)
 |-- Cancelled: double (nullable = true)
 |-- WeatherDelay: double (nullable = true)
 |-- temp: double (nullable = true)
 |-- dewpoint: double (nullable = true)
 |-- wind: double (nullable = true)
 |-- precip: double (nullable = true)
 |-- alti: double (nullable = true)
 |-- vis: double (nullable = true)
 |-- cloudCoverage: string (nullable = true)
 |-- cloudAlt: double (nullable = true)
 |-- weather: string (nullable = true)
 |-- ice: double (nullable = true)



In [14]:
json_flat.show()

+----+---------------+--------+---------+------------+----+--------+----+------+-----+----+-------------+--------+-------+---+
|Dest|ArrDelayMinutes|Diverted|Cancelled|WeatherDelay|temp|dewpoint|wind|precip| alti| vis|cloudCoverage|cloudAlt|weather|ice|
+----+---------------+--------+---------+------------+----+--------+----+------+-----+----+-------------+--------+-------+---+
| MCO|            0.0|     0.0|      0.0|         0.0| 0.0|     0.0| 8.0|   0.0|30.08|10.0|          CLR|     0.0|      0|0.0|
| MCO|            1.0|     0.0|      0.0|         0.0| 0.0|     0.0| 4.0|   0.0|30.04|10.0|          CLR|     0.0|      0|0.0|
| MCO|            7.0|     0.0|      0.0|         0.0| 0.0|     0.0|11.0|   0.0|29.94|10.0|          SCT|  4500.0|      0|0.0|
| MCO|            1.0|     0.0|      0.0|         0.0| 0.0|     0.0|11.0|   0.0|30.09|10.0|          CLR|     0.0|      0|0.0|
| MCO|            0.0|     0.0|      0.0|         0.0| 0.0|     0.0|11.0|   0.0| 30.1|10.0|          CLR|     0

In [10]:
json_flat = json_flat.select(['Year',
 'Month',
 'DayofMonth',
 'Dest',
 'ArrTime',
 'ArrDelayMinutes',
 'Diverted',
 'Cancelled',
 'WeatherDelay',
 'temp',
 'dewpoint',
 'wind',
 'precip',
 'alti',
 'vis',
 'cloudCoverage',
 'cloudAlt',
 'weather',
 'ice']
)


AnalysisException: cannot resolve '`Year`' given input columns: [ArrDelayMinutes, Cancelled, Dest, Diverted, WeatherDelay, alti, cloudAlt, cloudCoverage, dewpoint, ice, precip, temp, vis, weather, wind];;
'Project ['Year, 'Month, 'DayofMonth, Dest#19, 'ArrTime, ArrDelayMinutes#21, Diverted#22, Cancelled#23, WeatherDelay#24, temp#82, dewpoint#83, wind#84, precip#85, alti#86, vis#87, cloudCoverage#88, cloudAlt#89, weather#90, ice#91]
+- Project [Dest#19, ArrDelayMinutes#21, Diverted#22, Cancelled#23, WeatherDelay#24, json#38.temp AS temp#82, json#38.dewpoint AS dewpoint#83, json#38.wind AS wind#84, json#38.precip AS precip#85, json#38.alti AS alti#86, json#38.vis AS vis#87, json#38.cloudCoverage AS cloudCoverage#88, json#38.cloudAlt AS cloudAlt#89, json#38.weather AS weather#90, json#38.ice AS ice#91]
   +- Project [Dest#19, ArrDelayMinutes#21, Diverted#22, Cancelled#23, WeatherDelay#24, timestamp#25, from_json(StructField(temp,DoubleType,true), StructField(dewpoint,DoubleType,true), StructField(wind,DoubleType,true), StructField(precip,DoubleType,true), StructField(alti,DoubleType,true), StructField(vis,DoubleType,true), StructField(cloudCoverage,StringType,true), StructField(cloudAlt,DoubleType,true), StructField(weather,StringType,true), StructField(ice,DoubleType,true), wx#26, Some(America/New_York)) AS json#38]
      +- Relation[Year#16,Month#17,DayofMonth#18,Dest#19,ArrTime#20,ArrDelayMinutes#21,Diverted#22,Cancelled#23,WeatherDelay#24,timestamp#25,wx#26] csv


In [11]:
json_flat = json_flat.na.drop()

In [74]:
json_flat.printSchema()

root
 |-- Dest: string (nullable = true)
 |-- ArrDelayMinutes: double (nullable = false)
 |-- Diverted: double (nullable = false)
 |-- Cancelled: double (nullable = false)
 |-- WeatherDelay: double (nullable = false)
 |-- temp: double (nullable = true)
 |-- dewpoint: double (nullable = true)
 |-- wind: double (nullable = true)
 |-- precip: double (nullable = true)
 |-- alti: double (nullable = true)
 |-- vis: double (nullable = true)
 |-- cloudCoverage: string (nullable = true)
 |-- cloudAlt: double (nullable = true)
 |-- weather: string (nullable = true)
 |-- ice: double (nullable = true)



In [13]:
#Final output

writePath = 'final_'+str(year)+ '_'+str(month)+'.csv'
json_flat.write.csv(writePath, mode='overwrite', header=True)