In [1]:
import os
import sys
import pandas as pd
import numpy as np
sys.path.append(os.getcwd() + '/Modules/')
from utc_converter import utc_converter
from metar_decoder import metar_decoder
pd.set_option('display.max_columns', None)

#### Data preprocessing
---

In [3]:
# load data frame
fw = pd.read_csv("..//Datasets/NEW/FW_with_airports.csv")

In [4]:
fw.head()

Unnamed: 0,tail_number,date,aircraft,origin_code,origin,destination_code,destination,departure,arrival,duration,distance_mi,Owner,origin_Latitude,origin_Longitude,destination_Latitude,destination_Longitude,airport_origin,city_origin,origin_state,country_origin,airport_destination,city_destination,destination_state,country_destination,departure_shift,arrival_shift
0,N100KB,2021-02-20,BE9L,US-0571,Williston Basin International Airport (KXWA),KBIS,Bismarck Muni (KBIS),04:57PM CST,05:44PM CST,0:46,174.005874,"EXECUTIVE AIR TAXI CORPBISMARCK, ND, US(Corpor...",48.258387,-103.748797,46.772701,-100.746002,Williston Basin International Airport,Williston,ND,US,Bismarck Municipal Airport,Bismarck,ND,US,day,day
1,N100KB,2021-02-20,BE9L,KBIS,Bismarck Muni (KBIS),US-0571,Williston Basin International Airport (KXWA),01:36PM CST,02:27PM CST,0:51,174.005874,"EXECUTIVE AIR TAXI CORPBISMARCK, ND, US(Corpor...",46.772701,-100.746002,48.258387,-103.748797,Bismarck Municipal Airport,Bismarck,ND,US,Williston Basin International Airport,Williston,ND,US,day,day
2,N100KB,2021-02-18,BE9L,KMOT,Minot Intl (KMOT),KBIS,Bismarck Muni (KBIS),08:20AM CST,08:53AM CST,0:32,105.704153,"EXECUTIVE AIR TAXI CORPBISMARCK, ND, US(Corpor...",48.259399,-101.279999,46.772701,-100.746002,Minot International Airport,Minot,ND,US,Bismarck Municipal Airport,Bismarck,ND,US,day,day
3,N100KB,2021-02-15,BE9L,US-0571,Williston Basin International Airport (KXWA),KMOT,Minot Intl (KMOT),02:15PM CST,02:49PM CST,0:33,113.89946,"EXECUTIVE AIR TAXI CORPBISMARCK, ND, US(Corpor...",48.258387,-103.748797,48.259399,-101.279999,Williston Basin International Airport,Williston,ND,US,Minot International Airport,Minot,ND,US,day,day
4,N100KB,2021-02-15,BE9L,KMOT,Minot Intl (KMOT),US-0571,Williston Basin International Airport (KXWA),08:00AM CST,08:32AM CST,0:32,113.89946,"EXECUTIVE AIR TAXI CORPBISMARCK, ND, US(Corpor...",48.259399,-101.279999,48.258387,-103.748797,Minot International Airport,Minot,ND,US,Williston Basin International Airport,Williston,ND,US,day,day


In [5]:
# convert date to datetime
fw.date = pd.to_datetime(fw.date, format='%Y-%m-%d')

In [6]:
# split ICAO_code
fw['ICAO_origin'] = fw.origin.str.rsplit("(", n = 1, expand = True)[1].str.split(")", n = 1, expand = True)[0]
fw['ICAO_destination'] = fw.destination.str.rsplit("(", n = 1, expand = True)[1].str.split(")", n = 1, expand = True)[0]

In [7]:
# Split dual ICAO and take the last one
fw.loc[fw.ICAO_origin.str.contains(' / ', na=False), 'ICAO_origin'] = fw.loc[fw.ICAO_origin.str.contains(' / ', na=False), 'ICAO_origin']\
                                                                        .str.split(" / ", expand=True, n=1)[1]

fw.loc[fw.ICAO_destination.str.contains(' / ', na=False), 'ICAO_destination'] = fw.loc[fw.ICAO_destination.str.contains(' / ', na=False), 'ICAO_destination']\
                                                                                  .str.split(" / ", expand=True, n=1)[1]

In [8]:
# replace null ICAO code by airport code
fw.loc[fw.ICAO_origin.isna(), 'ICAO_origin'] = fw.loc[fw.ICAO_origin.isna(), 'origin_code'] 
fw.loc[fw.ICAO_destination.isna(), 'ICAO_destination'] = fw.loc[fw.ICAO_destination.isna(), 'destination_code']

In [9]:
# Transform aita code to icao
fw.loc[~fw.ICAO_origin.str.startswith('K', na=False), 'ICAO_origin'] = 'K' + fw.loc[~fw.ICAO_origin.str.startswith('K', na=False), 'ICAO_origin']
fw.loc[~fw.ICAO_destination.str.startswith('K', na=False), 'ICAO_destination'] = 'K' + fw.loc[~fw.ICAO_destination.str.startswith('K', na=False), 'ICAO_destination']

In [10]:
# add time with UTC format 
fw = utc_converter(fw)

In [11]:
# extract hour from dep_UTC_time and assign a new variable
fw['dep_UTC_hour'] = pd.to_datetime(fw.dep_UTC_time.astype('str'), format='%H:%M:%S', errors='coerce').dt.hour.astype('str')
fw['arr_UTC_hour'] = pd.to_datetime(fw.arr_UTC_time.astype('str'), format='%H:%M:%S', errors='coerce').dt.hour.astype('str')

In [12]:
# sort by 'tail_number', 'date', 'dep_UTC_hour'
fw = fw.sort_values(['tail_number', 'date', 'dep_UTC_hour']).reset_index(drop=True)

In [18]:
# inspect data
fw[['ICAO_origin', 'dep_UTC_hour', 'ICAO_destination', 'arr_UTC_hour']]

Unnamed: 0,ICAO_origin,dep_UTC_hour,ICAO_destination,arr_UTC_hour
0,KSTP,1.0,KMOT,3.0
1,KMOT,3.0,KFSD,5.0
2,KRST,0.0,KMOT,3.0
3,KMOT,11.0,KDVL,12.0
4,KDVL,12.0,KMOT,13.0
...,...,...,...,...
21954,KFSD,20.0,KPIR,20.0
21955,KSTP,6.0,KPIR,7.0
21956,KCDW,18.0,K12N,19.0
21957,KFWN,19.0,KCDW,20.0


In [19]:
fw[['ICAO_origin', 'dep_UTC_hour', 'ICAO_destination', 'arr_UTC_hour']].isna().sum()

ICAO_origin         15
dep_UTC_hour         0
ICAO_destination    33
arr_UTC_hour         0
dtype: int64

In [21]:
# Initialize SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [36]:
# convert date to string
fw.date = fw.date.astype('str')

In [37]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.createDataFrame(fw)

  Unsupported type in conversion from Arrow: time64[us]
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.


TypeError: field aircraft: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>

In [28]:
# Loading data from Parquet
#air.write.parquet("metar_parquet")
metar_parquet = spark.read.parquet("..//Datasets/NEW/METAR/OLD/metar_parquet")
metar_parquet.createOrReplaceTempView("metar_parquet")

fw.write.parquet("fw_parquet")
fw_parquet = spark.read.parquet("..//Datasets/NEW/METAR/OLD/fw_parquet")
fw_parquet.createOrReplaceTempView("fw_parquet")

AttributeError: 'DataFrame' object has no attribute 'write'

In [42]:
%%time
# Select Count of data
count = spark.sql("SELECT * FROM metar_parquet")
count.show()

+-------+----------------+--------+--------+-----+-----+------+------+-----+----+-----+----+-----+----+-----+-----+-----+-----+-------+-------+-----+-----+-------+-----------------+-----------------+-----------------+--------------+--------------+--------------+-----+--------------------+----------+--------+
|station|           valid|     lon|     lat| tmpf| dwpf|  relh|  drct| sknt|p01i| alti|mslp| vsby|gust|skyc1|skyc2|skyc3|skyc4|  skyl1|  skyl2|skyl3|skyl4|wxcodes|ice_accretion_1hr|ice_accretion_3hr|ice_accretion_6hr|peak_wind_gust|peak_wind_drct|peak_wind_time| feel|               metar|      date|    time|
+-------+----------------+--------+--------+-----+-----+------+------+-----+----+-----+----+-----+----+-----+-----+-----+-----+-------+-------+-----+-----+-------+-----------------+-----------------+-----------------+--------------+--------------+--------------+-----+--------------------+----------+--------+
|    GHB|2020-12-18 00:00|  -91.99|   27.84|53.60|42.80| 66.70| 40.00|

In [27]:
%%time
# Select Count of data
count = spark.sql("SELECT count(*) FROM fw_parquet")
count.show()

+--------+
|count(1)|
+--------+
|   11012|
+--------+

CPU times: user 1.03 ms, sys: 972 µs, total: 2 ms
Wall time: 132 ms
