# Weather Data - Pre-Processing

Objective: Pre-process the weather data so it can be joined with the flight data.  

Steps:  
A) Select Columns  
Keep only useful columns
columns_kept = [DATE, LATITUDE, LONGITUDE, ELEVATION, REPORT_TYPE, CALL_SIGN, WND, CIG, VIS, TMP, DEW, SLP]

B) Filter Weather Data  
Restrict the weather data to the 368 distinct airports in the cleaned flight data by inner-joining the table of distinct airport ICAO codes with the weather data's CALL_SIGN.

C) Parse certain columns  
Specifically, split the comma-delimited raw fields ['WND', 'CIG', 'VIS', 'TMP', 'DEW', 'SLP'] into separate columns to fix the format of the raw data.

In [0]:
from pyspark.sql import functions as f
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, NullType, ShortType, DateType, BooleanType, BinaryType
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf, date_trunc, col
from datetime import datetime, timedelta
from pyspark.sql.types import TimestampType

# Legacy SQLContext handle wrapping the notebook's SparkContext `sc`.
# NOTE(review): SQLContext has been superseded by SparkSession (`spark`) since Spark 2.0,
# and the visible cells never reference `sqlContext`; confirm no other notebook
# depends on it before removing this line.
sqlContext = SQLContext(sparkContext=sc)
# display(dbutils.fs.ls("dbfs:/mnt/mids-w261/datasets_final_project/"))

In [0]:
# Raw weather observations from the shared course dataset.
# Parquet files embed their own schema, so the CSV-only "header" option is not needed.
weather = spark.read.parquet("dbfs:/mnt/mids-w261/datasets_final_project/weather_data/*.parquet")
# weather_single = spark.read.parquet("dbfs:/mnt/mids-w261/datasets_final_project/weather_data_single/*.parquet")

# Shape check: (row count, column count). count() scans the full dataset.
print(f"Weather Shape       : {weather.count()}, {len(weather.columns)}")
# print(f"Weather_Single Shape: {weather_single.count()}, {len(weather_single.columns)}")

# A) Select Columns 
Keep only the columns used for modeling or later pre-processing  
(an hour-truncated DATE_HR column is also added for joining with the flight data):  
columns_kept = [DATE, LATITUDE, LONGITUDE, ELEVATION, REPORT_TYPE, CALL_SIGN, WND, CIG, VIS, TMP, DEW, SLP]

In [0]:
# SELECTING FEATURES FOR MODELING

# Add DATE_HR: DATE truncated to the hour, used to join with the flight data.
weather_hr = weather.withColumn("DATE_HR", date_trunc(timestamp='DATE', format='hour'))
# createOrReplaceTempView replaces the deprecated registerTempTable (same effect).
weather_hr.createOrReplaceTempView('weather_sql')

# Keep only the features used for modeling. trim() strips leading/trailing spaces
# from the string fields so later joins and splits see clean values.
weather_filter = spark.sql("""
    SELECT DATE, DATE_HR, LATITUDE, LONGITUDE, ELEVATION,
           trim(REPORT_TYPE) AS REPORT_TYPE,
           trim(CALL_SIGN)   AS CALL_SIGN,
           trim(WND)         AS WND,
           trim(CIG)         AS CIG,
           trim(VIS)         AS VIS,
           trim(TMP)         AS TMP,
           trim(DEW)         AS DEW,
           trim(SLP)         AS SLP
    FROM weather_sql
""")

# Save to cache data in the shared team folder, replacing any previous copy.
file_to_store = weather_filter                   # CHANGE THIS: name of Spark DataFrame (to save in database)
filename = "weather_filter"                      # CHANGE THIS: new file name in database
output_path = "dbfs:/mnt/mids-w261/team_25/weather_processing_folder/" + filename
dbutils.fs.rm(output_path, True)                 # recursively deletes any existing output -- be careful with this!!!
file_to_store.write.parquet(output_path)

In [0]:
# READING PARQUET File from Shared Directory
# Read the whole output directory instead of a "part-00*.parquet" glob. Spark names
# part files part-00000, part-00001, ..., so that glob silently skips part-01000 and
# above once a write produces 1000+ files. Spark ignores the "_"-prefixed marker
# files (e.g. _SUCCESS) in the directory. The CSV-only "header" option is a no-op
# for parquet and is dropped.
filename = "weather_filter"                      # CHANGE THIS: file name in database (to open)
weather_filter = spark.read.parquet("dbfs:/mnt/mids-w261/team_25/weather_processing_folder/" + filename)
# weather_filter.display()

# B) Filter Weather Data
Keep only airports that appear in the pre-processed airline data.  
Restrict the weather data to the 368 distinct airports (origin or destination) in the cleaned flight data by inner-joining the table of distinct airport ICAO codes with the weather data's CALL_SIGN.

In [0]:
# READING PARQUET File from Shared Directory
# Read the whole output directory rather than a "part-00*.parquet" glob, which would
# silently skip part files numbered 1000 and above. The CSV-only "header" option is
# a no-op for parquet and is dropped.
filename = "airlines_preprocessed"               # CHANGE THIS: file name in database (to open)
ICAO = spark.read.parquet("dbfs:/mnt/mids-w261/team_25/data_processing_folder/" + filename)

# Unique airport ICAO codes appearing as either origin or destination.
# union() matches columns by position, so the combined column keeps the first
# frame's name, "origin_icao_code", even though it also holds destination codes.
# The airport join SQL refers to that name.
ICAO_origin = ICAO.select("origin_icao_code")
ICAO_dest = ICAO.select("dest_icao_code")
ICAO_unique = ICAO_origin.union(ICAO_dest).distinct()
print("ICAO UNIQUE COUNT:", ICAO_unique.count())
ICAO_unique.createOrReplaceTempView("ICAO_unique_sql")

# Prepare weather data for joining: expose it as view "weather_filter_sql" with an
# extra KEY column (trimmed CALL_SIGN) appended last. This is built in one step
# instead of registering the view and then redefining it from itself.
weather_filter.withColumn("KEY", f.trim(col("CALL_SIGN"))).createOrReplaceTempView("weather_filter_sql")

In [0]:
# Join weather with airport locations: an inner join keeps only observations whose
# KEY (trimmed CALL_SIGN) equals one of the distinct ICAO codes from the flight data.
weather_keyed = spark.table("weather_filter_sql")
airport_codes = spark.table("ICAO_unique_sql")
weather_joined = (
    weather_keyed
    .join(airport_codes, weather_keyed["KEY"] == airport_codes["origin_icao_code"], "inner")
    .drop('KEY', 'origin_icao_code')   # join helpers only; CALL_SIGN already holds the code
)

# Save to cache data in the shared team folder, replacing any previous copy.
filename = "weather_joined"                      # CHANGE THIS: new file name in database
file_to_store = weather_joined                   # CHANGE THIS: name of Spark DataFrame (to save in database)
target_dir = "dbfs:/mnt/mids-w261/team_25/weather_processing_folder/" + filename
dbutils.fs.rm(target_dir, True)                  # recursively deletes any existing output -- be careful with this!!!
file_to_store.write.parquet(target_dir)

In [0]:
# READING PARQUET File from Shared Directory
# Read the whole output directory rather than a "part-00*.parquet" glob, which would
# silently skip part files numbered 1000 and above. The CSV-only "header" option is
# a no-op for parquet and is dropped.
filename = "weather_joined"                      # CHANGE THIS: file name in database (to open)
weather_joined = spark.read.parquet("dbfs:/mnt/mids-w261/team_25/weather_processing_folder/" + filename)
# weather_filter.display()
# weather_joined.display()

# Sanity check: the ICAO codes are distinct, so each weather row matches at most one
# code and JOINED COUNT should never exceed WEATHER COUNT.
print("WEATHER COUNT:", weather_filter.count())
print("JOINED COUNT:", weather_joined.count())

# C) Parse Certain Columns

Split the comma-delimited raw fields ('WND', 'CIG', 'VIS', 'TMP', 'DEW', 'SLP') into separate columns to fix the format of the raw data.

In [0]:
# SPLIT CERTAIN COLUMNS B/C OF THE CONCATENATED RAW FORMAT
# Each raw field is a comma-delimited string. Its components are:
#   WND -> direction, direction quality, observation type, speed (x10), speed quality
#   CIG -> ceiling height dimension, ceiling quality code, ceiling determination code, CAVOK code
#   VIS -> distance, distance quality, variability, quality variability
#   TMP -> temp (x10), quality
#   DEW -> temp (x10), quality
#   SLP -> sea level pressure, sea level pressure quality
# Only the pieces listed below are kept. Each entry is
#   (raw column, [(piece index, new column name, cast to int?), ...])
# and new columns are appended in exactly this order.
raw_field_pieces = [
    ('WND', [(3, 'WND_SPEED', True), (4, 'WND_SPEED_QUAL', False)]),
    ('CIG', [(0, 'CIG_HEIGHT', True), (1, 'CIG_QUAL', False)]),
    ('VIS', [(0, 'VIS_DIST', True), (1, 'VIS_DIST_QUAL', False),
             (2, 'VIS_VAR', False), (3, 'VIS_VAR_QUAL', False)]),
    ('TMP', [(0, 'TEMP', True), (1, 'TEMP_QUAL', False)]),
    ('DEW', [(0, 'DEW_TEMP', True), (1, 'DEW_TEMP_QUAL', False)]),
    ('SLP', [(0, 'SLPRESS', True), (1, 'SLPRESS_QUAL', False)]),
]
# NOTE(review): numeric pieces are cast as-is; any missing-value sentinel codes in the
# raw data are NOT converted to null here -- confirm downstream handling.

weather_split = weather_joined
for raw_name, pieces in raw_field_pieces:
    split_col = f.split(weather_split[raw_name], ',')
    for position, new_name, to_int in pieces:
        part = split_col.getItem(position)
        if to_int:
            part = part.cast(IntegerType())
        weather_split = weather_split.withColumn(new_name, part)

# The raw concatenated columns are no longer needed once split.
weather_joined_split = weather_split.drop(*(src for src, _ in raw_field_pieces))

# Save to cache data in the shared team folder, replacing any previous copy.
filename = "weather_joined_split"                # CHANGE THIS: new file name in database
file_to_store = weather_joined_split             # CHANGE THIS: name of Spark DataFrame (to save in database)
split_dir = "dbfs:/mnt/mids-w261/team_25/weather_processing_folder/" + filename
dbutils.fs.rm(split_dir, True)                   # recursively deletes any existing output -- be careful with this!!!
file_to_store.write.parquet(split_dir)

In [0]:
# READING PARQUET File from Shared Directory
# Read the whole output directory rather than a "part-00*.parquet" glob, which would
# silently skip part files numbered 1000 and above. The CSV-only "header" option is
# a no-op for parquet and is dropped.
filename = "weather_joined_split"                # CHANGE THIS: file name in database (to open)
weather_joined_split = spark.read.parquet("dbfs:/mnt/mids-w261/team_25/weather_processing_folder/" + filename)
# weather_joined_split.display()

# Error Checking: Row and Column Counts of Each Dataset

In [0]:
# COUNT (to check size of each dataset and choose which dataset to keep)
# Prints one row per pipeline stage: name, row count (full scan), column count.
print("{:60s}\t| {:10s}\t| {}".format("Data", "Rows", "Columns"))
stages = [
    ("weather", weather),
    ("weather_filter", weather_filter),
    ("weather_joined", weather_joined),
    ("weather_joined_split", weather_joined_split),
]
for stage_name, stage_df in stages:
    print("{:60s}\t| {:10d}\t| {}".format(stage_name, stage_df.count(), len(stage_df.columns)))