# Global Hourly Climate Data

## Data Preprocessing

Taken From:
* __[GHC 2018 Data](https://www.ncei.noaa.gov/data/global-hourly/archive/csv/2018.tar.gz)__
* __[GHC 2019 Data](https://www.ncei.noaa.gov/data/global-hourly/archive/csv/2019.tar.gz)__
* __[ISD Data Documentation](https://www.ncei.noaa.gov/data/global-hourly/doc/isd-format-document.pdf)__
* https://www.ncei.noaa.gov/data/global-hourly/doc/CSV_HELP.pdf

In [1]:
import warnings
import pandas as pd
import geopandas as gpd
import pyspark.sql.functions as f

from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark import SparkContext

warnings.filterwarnings("ignore")

#sc = SparkContext.getOrCreate(conf=swan_spark_conf) 
spark = SparkSession.builder.appName('GHC').getOrCreate()
spark

In [2]:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

### Station List

In [3]:
stations_pdf = pd.read_csv('../raw_data/GHC/stations.csv')

In [4]:
filtered_station = stations_pdf.loc[(stations_pdf['END_DATE'] > '2019-12-31') & (stations_pdf['BEGIN_DATE'] < '2018-01-01')]

In [5]:
filtered_station

Unnamed: 0,STATION_ID,STATION,BEGIN_DATE,END_DATE,STATE,COUNTRY,LATITUDE,LONGITUDE,ELEVATION
0,99727299999,BERGEN POINT,2008-01-01,2021-08-12,NY,US,40.639,-74.146,10.0
2,72505394728,CENTRAL PARK,2005-01-01,2021-08-13,NY,US,40.77898,-73.96925,42.67
3,74486094789,JOHN F KENNEDY INTERNATIONAL AIRPORT,1973-01-01,2021-08-13,NY,US,40.63915,-73.76401,3.35
4,99728099999,KINGS POINT,2008-01-01,2021-08-12,NY,US,40.8,-73.77,10.0
5,72503014732,LA GUARDIA AIRPORT,1973-01-01,2021-08-13,NY,US,40.77944,-73.88035,3.35
8,72058100178,LINDEN AIRPORT,2017-01-01,2021-08-13,NJ,US,40.617,-74.25,7.01
16,72502014734,NEWARK LIBERTY INTERNATIONAL AP,1973-01-01,2021-08-13,NJ,US,40.6825,-74.1694,2.13
19,72055399999,PORT AUTH DOWNTN MANHATTAN WALL ST HEL,2016-07-20,2021-08-12,NY,US,40.701214,-74.009028,2.13
20,99774399999,ROBBINS REEF,2008-01-01,2021-08-12,NJ,US,40.65,-74.066667,72.0
21,99727199999,THE BATTERY,2008-01-17,2021-08-12,NY,US,40.701,-74.014,10.0


### Load 2018 Global Hourly Climate Data and Select Stations

__Main Observation__: 
1. WND : WIND-OBSERVATION
1. CIG : SKY-CONDITION-OBSERVATION
1. VIS : VISIBILITY-OBSERVATION
1. TMP : AIR-TEMPERATURE-OBSERVATION air temperature
1. DEW : AIR-TEMPERATURE-OBSERVATION dew point
1. SLP : AIR-PRESSURE-OBSERVATION

In [6]:
ghc_2018_sdf = spark.read.csv('../raw_data/GHC/2018', header=True, inferSchema = True)

In [7]:
nyc_ghc_2018_sdf = ghc_2018_sdf.filter((ghc_2018_sdf.STATION.contains('99727299999')) | # BERGEN POINT
                                       (ghc_2018_sdf.STATION.contains('72505394728')) | # CENTRAL PARK
                                       (ghc_2018_sdf.STATION.contains('74486094789')) | # JOHN F KENNEDY INTERNATIONAL AIRPORT
                                       (ghc_2018_sdf.STATION.contains('99728099999')) | # KINGS POINT
                                       (ghc_2018_sdf.STATION.contains('72503014732')) | # LA GUARDIA AIRPORT
                                       (ghc_2018_sdf.STATION.contains('72058100178')) | # LINDEN AIRPORT
                                       (ghc_2018_sdf.STATION.contains('72502014734')) | # NEWARK LIBERTY INTERNATIONAL AP
                                       (ghc_2018_sdf.STATION.contains('72055399999')) | # PORT AUTH DOWNTN MANHATTAN WALL ST HEL
                                       (ghc_2018_sdf.STATION.contains('99774399999')) | # ROBBINS REEF
                                       (ghc_2018_sdf.STATION.contains('99727199999')))  # THE BATTERY

In [8]:
nyc_ghc_2018_sdf = nyc_ghc_2018_sdf.select('DATE', 'WND', 'CIG', 'CIG', 
                                           'VIS', 'TMP', 'DEW', 'SLP')

In [9]:
# WIND OBSERVATION
# Split column value with ',' in it
# Example, "9999, 1, N, 99" string become a list of ["9999", "1", "N", "99"]
split_col = f.split(nyc_ghc_2018_sdf['WND'], ',')
nyc_ghc_2018_sdf = nyc_ghc_2018_sdf.withColumn('wind_angle_degrees', split_col.getItem(0)) \
                                   .withColumn('wind_angle_qcode', split_col.getItem(1)) \
                                   .withColumn('wind_type_code', split_col.getItem(2)) \
                                   .withColumn('wind_speed_mps', split_col.getItem(3)) \
                                   .withColumn('wind_speed_qcode', split_col.getItem(4)) \
                                   .drop('WND')

# SKY CONDITION OBSERVATION
split_col = f.split(nyc_ghc_2018_sdf['CIG'], ',')
nyc_ghc_2018_sdf = nyc_ghc_2018_sdf.withColumn('sky_ceil_height_meters', split_col.getItem(0)) \
                                   .withColumn('sky_ceil_qcode', split_col.getItem(1)) \
                                   .withColumn('sky_ceil_det_code', split_col.getItem(2)) \
                                   .withColumn('sky_cavok', split_col.getItem(3)) \
                                   .drop('CIG')

# VISIBILITY OBSERVATION
split_col = f.split(nyc_ghc_2018_sdf['VIS'], ',')
nyc_ghc_2018_sdf = nyc_ghc_2018_sdf.withColumn('vis_distance_meters', split_col.getItem(0)) \
                                   .withColumn('vis_distance_qcode', split_col.getItem(1)) \
                                   .withColumn('vis_vary_code', split_col.getItem(2)) \
                                   .withColumn('vis_vary_qcode', split_col.getItem(3)) \
                                   .drop('VIS')

# AIR TEMP OBSERVATION air
split_col = f.split(nyc_ghc_2018_sdf['TMP'], ',')
nyc_ghc_2018_sdf = nyc_ghc_2018_sdf.withColumn('air_temp_celsius', split_col.getItem(0)) \
                                   .withColumn('air_temp_qcode', split_col.getItem(1)) \
                                   .drop('TMP')

# AIR TEMP OBSERVATION dew
split_col = f.split(nyc_ghc_2018_sdf['DEW'], ',')
nyc_ghc_2018_sdf = nyc_ghc_2018_sdf.withColumn('dew_point_celsius', split_col.getItem(0)) \
                                   .withColumn('dew_point_qcode', split_col.getItem(1)) \
                                   .drop('DEW')

# ATMOSPHERIC PRESSURE OBSERVATION sea level pressure
split_col = f.split(nyc_ghc_2018_sdf['SLP'], ',')
nyc_ghc_2018_sdf = nyc_ghc_2018_sdf.withColumn('atp_hectopascals', split_col.getItem(0)) \
                                   .withColumn('atp_qcode', split_col.getItem(1)) \
                                   .drop('SLP')

In [10]:
nyc_ghc_2018_sdf = nyc_ghc_2018_sdf.withColumn('wind_angle_degrees', f.regexp_replace('wind_angle_degrees', r"^0+(?!$)", '')) \
                                   .withColumn('wind_speed_mps', f.regexp_replace('wind_speed_mps', r"^0+(?!$)", '')) \
                                   .withColumn('sky_ceil_height_meters', f.regexp_replace('sky_ceil_height_meters', r"^0+(?!$)", '')) \
                                   .withColumn('vis_distance_meters', f.regexp_replace('vis_distance_meters', r"^0+(?!$)", '')) \
                                   .withColumn('air_temp_celsius', f.regexp_replace('air_temp_celsius', r"(^\+)|(^\-)?0+(?!$)", '')) \
                                   .withColumn('dew_point_celsius', f.regexp_replace('dew_point_celsius', r"(^\+)|(^\-)?0+(?!$)", '')) \
                                   .withColumn('atp_hectopascals', f.regexp_replace('atp_hectopascals', r"^0+(?!$)", ''))

 #### Setting Schema

In [11]:
from shutil import rmtree
from os import path

fpath = '../preprocessed_data/nyc_ghc_2018'

if path.exists(fpath):
    rmtree(fpath)
        
nyc_ghc_2018_sdf.write.option("header", True).csv("../preprocessed_data/nyc_ghc_2018")

In [12]:
nyc_ghc_2018_sdf = spark.read.csv('../preprocessed_data/nyc_ghc_2018/', header = True)

ints = ('wind_angle_degrees', 'wind_speed_mps', 'sky_ceil_height_meters', 'vis_distance_meters', 
        'air_temp_celsius', 'dew_point_celsius', 'atp_hectopascals')

strings = ('wind_angle_qcode', 'wind_type_code', 'wind_speed_qcode', 'sky_ceil_qcode',
           'sky_ceil_det_code', 'sky_cavok', 'vis_distance_qcode', 'vis_vary_code', 
           'vis_vary_qcode', 'air_temp_qcode', 'dew_point_qcode', 'atp_qcode')

dtimes = ('DATE',)

dtypes = {column: IntegerType() for column in ints}
dtypes.update({column: StringType() for column in strings})
dtypes.update({column: TimestampType() for column in dtimes})

schema = StructType()

for column in nyc_ghc_2018_sdf.columns:
    schema.add(column, # column name
               dtypes[column], # data type
               True # is nullable?
              )
    
nyc_ghc_2018_sdf = spark.read.csv('../preprocessed_data/nyc_ghc_2018/', header = True, schema = schema)
nyc_ghc_2018_sdf.printSchema()

root
 |-- DATE: timestamp (nullable = true)
 |-- wind_angle_degrees: integer (nullable = true)
 |-- wind_angle_qcode: string (nullable = true)
 |-- wind_type_code: string (nullable = true)
 |-- wind_speed_mps: integer (nullable = true)
 |-- wind_speed_qcode: string (nullable = true)
 |-- sky_ceil_height_meters: integer (nullable = true)
 |-- sky_ceil_qcode: string (nullable = true)
 |-- sky_ceil_det_code: string (nullable = true)
 |-- sky_cavok: string (nullable = true)
 |-- vis_distance_meters: integer (nullable = true)
 |-- vis_distance_qcode: string (nullable = true)
 |-- vis_vary_code: string (nullable = true)
 |-- vis_vary_qcode: string (nullable = true)
 |-- air_temp_celsius: integer (nullable = true)
 |-- air_temp_qcode: string (nullable = true)
 |-- dew_point_celsius: integer (nullable = true)
 |-- dew_point_qcode: string (nullable = true)
 |-- atp_hectopascals: integer (nullable = true)
 |-- atp_qcode: string (nullable = true)



In [13]:
from pyspark.sql.functions import col,when

nyc_ghc_2018_sdf = nyc_ghc_2018_sdf.withColumn("wind_angle_degrees", 
                            when(nyc_ghc_2018_sdf["wind_angle_degrees"] == '999',None) \
                            .otherwise(nyc_ghc_2018_sdf["wind_angle_degrees"]))

nyc_ghc_2018_sdf = nyc_ghc_2018_sdf.withColumn("wind_speed_mps", 
                            when(nyc_ghc_2018_sdf["wind_speed_mps"] == '9999',None) \
                            .otherwise(nyc_ghc_2018_sdf["wind_speed_mps"]))

nyc_ghc_2018_sdf = nyc_ghc_2018_sdf.withColumn("air_temp_celsius", 
                            when(nyc_ghc_2018_sdf["air_temp_celsius"] == '9999',None) \
                            .otherwise(nyc_ghc_2018_sdf["air_temp_celsius"]))

nyc_ghc_2018_sdf = nyc_ghc_2018_sdf.withColumn("dew_point_celsius", 
                            when(nyc_ghc_2018_sdf["dew_point_celsius"] == '9999',None) \
                            .otherwise(nyc_ghc_2018_sdf["dew_point_celsius"]))

nyc_ghc_2018_sdf = nyc_ghc_2018_sdf.withColumn("sky_ceil_height_meters", 
                            when(nyc_ghc_2018_sdf["sky_ceil_height_meters"] == '99999',None) \
                            .otherwise(nyc_ghc_2018_sdf["sky_ceil_height_meters"]))

nyc_ghc_2018_sdf = nyc_ghc_2018_sdf.withColumn("atp_hectopascals", 
                            when(nyc_ghc_2018_sdf["atp_hectopascals"] == '99999',None) \
                            .otherwise(nyc_ghc_2018_sdf["atp_hectopascals"]))

nyc_ghc_2018_sdf = nyc_ghc_2018_sdf.withColumn("vis_distance_meters", 
                            when(nyc_ghc_2018_sdf["vis_distance_meters"] == '999999',None) \
                            .otherwise(nyc_ghc_2018_sdf["vis_distance_meters"]))

In [14]:
from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols = ['wind_angle_degrees', "wind_speed_mps", "air_temp_celsius", 
                              "dew_point_celsius", "sky_ceil_height_meters", "atp_hectopascals", 
                               "vis_distance_meters"],
                  outputCols = ["{}_imputed".format(c) 
                                for c in ['wind_angle_degrees', "wind_speed_mps", "air_temp_celsius", 
                                          "dew_point_celsius", "sky_ceil_height_meters", "atp_hectopascals", 
                                           "vis_distance_meters"]]
                 ).setStrategy("mean")

nyc_ghc_2018_sdf = imputer.fit(nyc_ghc_2018_sdf).transform(nyc_ghc_2018_sdf)

In [15]:
nyc_ghc_2018_sdf = nyc_ghc_2018_sdf.select('DATE', 'wind_angle_degrees_imputed', 
                                           'wind_speed_mps_imputed', 'air_temp_celsius_imputed', 
                                           'dew_point_celsius_imputed', 'sky_ceil_height_meters_imputed', 
                                           'atp_hectopascals_imputed', 'vis_distance_meters_imputed')

In [16]:
# Filter through the minimum and maximum set in the dictionary
nyc_ghc_2018_sdf = nyc_ghc_2018_sdf.filter('wind_angle_degrees_imputed >= 0 and wind_angle_degrees_imputed <= 360') \
                                    .filter('wind_speed_mps_imputed >= 0 and wind_speed_mps_imputed <= 900') \
                                    .filter('air_temp_celsius_imputed >= -932 and air_temp_celsius_imputed <= 618') \
                                    .filter('dew_point_celsius_imputed >= -982 and dew_point_celsius_imputed <= 368') \
                                    .filter('sky_ceil_height_meters_imputed >= 0 and sky_ceil_height_meters_imputed <= 22000') \
                                    .filter('atp_hectopascals_imputed >= 8600 and atp_hectopascals_imputed <= 10900') \
                                    .filter('vis_distance_meters_imputed >= 0 and vis_distance_meters_imputed <= 160000')

In [17]:
# Descaling some features given in the dictionary
# Wind speed scaled by 10 
# Air temp scaled by 10
# Dew point scaled by 10
# ATP scaled by 10
# All descaled by 10

nyc_ghc_2018_sdf = nyc_ghc_2018_sdf.withColumn('wind_speed_mps', nyc_ghc_2018_sdf['wind_speed_mps_imputed'] / 10) \
                                    .withColumn('air_temp_celsius', nyc_ghc_2018_sdf['air_temp_celsius_imputed'] / 10) \
                                    .withColumn('dew_temp_celsius', nyc_ghc_2018_sdf['dew_point_celsius_imputed'] / 10) \
                                    .withColumn('atp_hectopascals', nyc_ghc_2018_sdf['atp_hectopascals_imputed'] / 10)

In [18]:
# Renaming and drop columns
nyc_ghc_2018_sdf = nyc_ghc_2018_sdf.withColumnRenamed('wind_angle_degrees_imputed', 'wind_angle_degrees') \
                                   .withColumnRenamed('sky_ceil_height_meters_imputed', 'sky_ceil_height_meters') \
                                   .withColumnRenamed('vis_distance_meters_imputed', 'vis_distance_meters')

nyc_ghc_2018_sdf = nyc_ghc_2018_sdf.drop('wind_speed_mps_imputed', 'air_temp_celsius_imputed', 
                                         'dew_point_celsius_imputed', 'atp_hectopascals_imputed')

# Reorder columns
nyc_ghc_2018_sdf = nyc_ghc_2018_sdf.select('DATE', 'wind_angle_degrees', 
                                           'wind_speed_mps', 'air_temp_celsius', 
                                           'dew_temp_celsius','sky_ceil_height_meters', 
                                           'vis_distance_meters', 'atp_hectopascals')

nyc_ghc_2018_sdf = nyc_ghc_2018_sdf.orderBy('DATE')

In [19]:
nyc_ghc_2018_sdf.limit(5)

DATE,wind_angle_degrees,wind_speed_mps,air_temp_celsius,dew_temp_celsius,sky_ceil_height_meters,vis_distance_meters,atp_hectopascals
2018-01-01 00:00:00,320,8.2,1.6,18.3,22000,16000,1026.3
2018-01-01 00:00:00,310,9.3,11.1,2.0,22000,16000,1026.8
2018-01-01 00:00:00,320,5.1,1.6,2.0,22000,16000,1026.7
2018-01-01 00:00:00,310,9.8,1.5,9.9,10472,14287,1027.1
2018-01-01 00:00:00,10,7.2,1.4,9.9,10472,14287,1027.3


In [20]:
nyc_ghc_2018_sdf = nyc_ghc_2018_sdf.groupBy('DATE').agg(f.mean('wind_angle_degrees'), f.mean('wind_speed_mps'),
                                                        f.mean('air_temp_celsius'), f.mean('dew_temp_celsius'),
                                                        f.mean('sky_ceil_height_meters'), f.mean('vis_distance_meters'), 
                                                        f.mean('atp_hectopascals')).orderBy('Date')

In [21]:
nyc_ghc_2018_sdf = nyc_ghc_2018_sdf.withColumnRenamed('avg(wind_angle_degrees)', 'wind_angle_degrees') \
                                   .withColumnRenamed('avg(wind_speed_mps)', 'wind_speed_mps') \
                                   .withColumnRenamed('avg(air_temp_celsius)', 'air_temp_celsius') \
                                   .withColumnRenamed('avg(dew_temp_celsius)', 'dew_temp_celsius') \
                                   .withColumnRenamed('avg(sky_ceil_height_meters)', 'sky_ceil_height_meters') \
                                   .withColumnRenamed('avg(vis_distance_meters)', 'vis_distance_meters') \
                                   .withColumnRenamed('avg(atp_hectopascals)', 'atp_hectopascals')

In [22]:
nyc_ghc_2018_sdf = nyc_ghc_2018_sdf.withColumn("closest_hour", f.date_trunc("hour", f.col("DATE")))

In [23]:
nyc_ghc_2018_sdf = nyc_ghc_2018_sdf.groupBy("closest_hour").agg(f.mean('wind_angle_degrees'), f.mean('wind_speed_mps'),
                                                        f.mean('air_temp_celsius'), f.mean('dew_temp_celsius'),
                                                        f.mean('sky_ceil_height_meters'), f.mean('vis_distance_meters'),
                                                        f.mean('atp_hectopascals'))

nyc_ghc_2018_sdf = nyc_ghc_2018_sdf.withColumnRenamed('avg(wind_angle_degrees)', 'wind_angle_degrees') \
                                   .withColumnRenamed('avg(wind_speed_mps)', 'wind_speed_mps') \
                                   .withColumnRenamed('avg(air_temp_celsius)', 'air_temp_celsius') \
                                   .withColumnRenamed('avg(dew_temp_celsius)', 'dew_temp_celsius') \
                                   .withColumnRenamed('avg(sky_ceil_height_meters)', 'sky_ceil_height_meters') \
                                   .withColumnRenamed('avg(vis_distance_meters)', 'vis_distance_meters') \
                                   .withColumnRenamed('avg(atp_hectopascals)', 'atp_hectopascals')

In [24]:
nyc_ghc_2018_sdf.limit(5)

closest_hour,wind_angle_degrees,wind_speed_mps,air_temp_celsius,dew_temp_celsius,sky_ceil_height_meters,vis_distance_meters,atp_hectopascals
2018-01-01 02:00:00,256.0,3.766666666666667,8.304166666666667,10.429166666666667,20078.666666666668,15792.0,1021.9208333333332
2018-01-03 15:00:00,217.41666666666663,1.6583333333333332,5.611111111111111,14.390277777777776,18601.38888888889,15887.166666666666,1021.9875
2018-01-13 09:00:00,202.9642857142857,4.01904761904762,11.127380952380951,9.229166666666668,876.8392857142857,4544.178571428572,1015.5845238095236
2018-01-14 15:00:00,260.52777777777777,3.368055555555556,6.683333333333334,16.070833333333336,20719.11111111111,15887.166666666666,1025.522222222222
2018-01-16 08:00:00,136.25,1.2041666666666666,3.695833333333333,7.041666666666667,2103.5,15792.0,1023.9958333333334


In [25]:
from shutil import rmtree
from os import path

fpath = '../preprocessed_data/nyc_ghc_2018_pre.parquet/'

if path.exists(fpath):
    rmtree(fpath)
        
nyc_ghc_2018_sdf.write.format('parquet').save('../preprocessed_data/nyc_ghc_2018_pre.parquet')

### Local Climatological Data 2019

In [26]:
ghc_2019_sdf = spark.read.csv('../raw_data/GHC/2019', header=True, inferSchema = True)

In [27]:
# JFK INTERNATIONAL AIRPORT, NY US WBAN: 74486094789
# NY CITY CENTRAL PARK, NY US WBAN: 72505394728 
# LAGUARDIA AIRPORT, NY US WBAN: 72503014732
# LINDEN AIRPORT, NJ US WBAN: 72058100178
# NEWARK LIBERTY INTERNATIONAL AIRPORT, NJ US WBAN: 72502014734 

nyc_ghc_2019_sdf = ghc_2019_sdf.filter((ghc_2019_sdf.STATION.contains('99727299999')) | # BERGEN POINT
                                       (ghc_2019_sdf.STATION.contains('72505394728')) | # CENTRAL PARK
                                       (ghc_2019_sdf.STATION.contains('74486094789')) | # JOHN F KENNEDY INTERNATIONAL AIRPORT
                                       (ghc_2019_sdf.STATION.contains('99728099999')) | # KINGS POINT
                                       (ghc_2019_sdf.STATION.contains('72503014732')) | # LA GUARDIA AIRPORT
                                       (ghc_2019_sdf.STATION.contains('72058100178')) | # LINDEN AIRPORT
                                       (ghc_2019_sdf.STATION.contains('72502014734')) | # NEWARK LIBERTY INTERNATIONAL AP
                                       (ghc_2019_sdf.STATION.contains('72055399999')) | # PORT AUTH DOWNTN MANHATTAN WALL ST HEL
                                       (ghc_2019_sdf.STATION.contains('99774399999')) | # ROBBINS REEF
                                       (ghc_2019_sdf.STATION.contains('99727199999')))  # THE BATTERY

nyc_ghc_2019_sdf = nyc_ghc_2019_sdf.select('DATE', 'WND', 'CIG', 'CIG', 
                                           'VIS', 'TMP', 'DEW', 'SLP')

# WIND OBSERVATION
# Split column value with ',' in it
# Example, "9999, 1, N, 99" string become a list of ["9999", "1", "N", "99"]
split_col = f.split(nyc_ghc_2019_sdf['WND'], ',')
nyc_ghc_2019_sdf = nyc_ghc_2019_sdf.withColumn('wind_angle_degrees', split_col.getItem(0)) \
                                   .withColumn('wind_angle_qcode', split_col.getItem(1)) \
                                   .withColumn('wind_type_code', split_col.getItem(2)) \
                                   .withColumn('wind_speed_mps', split_col.getItem(3)) \
                                   .withColumn('wind_speed_qcode', split_col.getItem(4)) \
                                   .drop('WND')

# SKY CONDITION OBSERVATION
split_col = f.split(nyc_ghc_2019_sdf['CIG'], ',')
nyc_ghc_2019_sdf = nyc_ghc_2019_sdf.withColumn('sky_ceil_height_meters', split_col.getItem(0)) \
                                   .withColumn('sky_ceil_qcode', split_col.getItem(1)) \
                                   .withColumn('sky_ceil_det_code', split_col.getItem(2)) \
                                   .withColumn('sky_cavok', split_col.getItem(3)) \
                                   .drop('CIG')

# VISIBILITY OBSERVATION
split_col = f.split(nyc_ghc_2019_sdf['VIS'], ',')
nyc_ghc_2019_sdf = nyc_ghc_2019_sdf.withColumn('vis_distance_meters', split_col.getItem(0)) \
                                   .withColumn('vis_distance_qcode', split_col.getItem(1)) \
                                   .withColumn('vis_vary_code', split_col.getItem(2)) \
                                   .withColumn('vis_vary_qcode', split_col.getItem(3)) \
                                   .drop('VIS')

# AIR TEMP OBSERVATION air
split_col = f.split(nyc_ghc_2019_sdf['TMP'], ',')
nyc_ghc_2019_sdf = nyc_ghc_2019_sdf.withColumn('air_temp_celsius', split_col.getItem(0)) \
                                   .withColumn('air_temp_qcode', split_col.getItem(1)) \
                                   .drop('TMP')

# AIR TEMP OBSERVATION dew
split_col = f.split(nyc_ghc_2019_sdf['DEW'], ',')
nyc_ghc_2019_sdf = nyc_ghc_2019_sdf.withColumn('dew_point_celsius', split_col.getItem(0)) \
                                   .withColumn('dew_point_qcode', split_col.getItem(1)) \
                                   .drop('DEW')

# ATMOSPHERIC PRESSURE OBSERVATION sea level pressure
split_col = f.split(nyc_ghc_2019_sdf['SLP'], ',')
nyc_ghc_2019_sdf = nyc_ghc_2019_sdf.withColumn('atp_hectopascals', split_col.getItem(0)) \
                                   .withColumn('atp_qcode', split_col.getItem(1)) \
                                   .drop('SLP')

In [28]:
nyc_ghc_2019_sdf = nyc_ghc_2019_sdf.withColumn('wind_angle_degrees', f.regexp_replace('wind_angle_degrees', r"^0+(?!$)", '')) \
                                   .withColumn('wind_speed_mps', f.regexp_replace('wind_speed_mps', r"^0+(?!$)", '')) \
                                   .withColumn('sky_ceil_height_meters', f.regexp_replace('sky_ceil_height_meters', r"^0+(?!$)", '')) \
                                   .withColumn('vis_distance_meters', f.regexp_replace('vis_distance_meters', r"^0+(?!$)", '')) \
                                   .withColumn('air_temp_celsius', f.regexp_replace('air_temp_celsius', r"(^\+)|(^\-)?0+(?!$)", '')) \
                                   .withColumn('dew_point_celsius', f.regexp_replace('dew_point_celsius', r"(^\+)|(^\-)?0+(?!$)", '')) \
                                   .withColumn('atp_hectopascals', f.regexp_replace('atp_hectopascals', r"^0+(?!$)", ''))

 #### Setting Schema

In [29]:
from shutil import rmtree
from os import path

fpath = '../preprocessed_data/nyc_ghc_2019'

if path.exists(fpath):
    rmtree(fpath)
        
nyc_ghc_2019_sdf.write.option("header", True).csv("../preprocessed_data/nyc_ghc_2019")

In [30]:
nyc_ghc_2019_sdf = spark.read.csv('../preprocessed_data/nyc_ghc_2019/', header = True)

ints = ('wind_angle_degrees', 'wind_speed_mps', 'sky_ceil_height_meters', 'vis_distance_meters', 
        'air_temp_celsius', 'dew_point_celsius', 'atp_hectopascals')

strings = ('wind_angle_qcode', 'wind_type_code', 'wind_speed_qcode', 'sky_ceil_qcode',
           'sky_ceil_det_code', 'sky_cavok', 'vis_distance_qcode', 'vis_vary_code', 
           'vis_vary_qcode', 'air_temp_qcode', 'dew_point_qcode', 'atp_qcode')

dtimes = ('DATE',)

dtypes = {column: IntegerType() for column in ints}
dtypes.update({column: StringType() for column in strings})
dtypes.update({column: TimestampType() for column in dtimes})

schema = StructType()

for column in nyc_ghc_2019_sdf.columns:
    schema.add(column, # column name
               dtypes[column], # data type
               True # is nullable?
              )
    
nyc_ghc_2019_sdf = spark.read.csv('../preprocessed_data/nyc_ghc_2019/', header = True, schema = schema)
nyc_ghc_2019_sdf.printSchema()

root
 |-- DATE: timestamp (nullable = true)
 |-- wind_angle_degrees: integer (nullable = true)
 |-- wind_angle_qcode: string (nullable = true)
 |-- wind_type_code: string (nullable = true)
 |-- wind_speed_mps: integer (nullable = true)
 |-- wind_speed_qcode: string (nullable = true)
 |-- sky_ceil_height_meters: integer (nullable = true)
 |-- sky_ceil_qcode: string (nullable = true)
 |-- sky_ceil_det_code: string (nullable = true)
 |-- sky_cavok: string (nullable = true)
 |-- vis_distance_meters: integer (nullable = true)
 |-- vis_distance_qcode: string (nullable = true)
 |-- vis_vary_code: string (nullable = true)
 |-- vis_vary_qcode: string (nullable = true)
 |-- air_temp_celsius: integer (nullable = true)
 |-- air_temp_qcode: string (nullable = true)
 |-- dew_point_celsius: integer (nullable = true)
 |-- dew_point_qcode: string (nullable = true)
 |-- atp_hectopascals: integer (nullable = true)
 |-- atp_qcode: string (nullable = true)



In [31]:
from pyspark.sql.functions import col,when

nyc_ghc_2019_sdf = nyc_ghc_2019_sdf.withColumn("wind_angle_degrees", 
                            when(nyc_ghc_2019_sdf["wind_angle_degrees"] == '999',None) \
                            .otherwise(nyc_ghc_2019_sdf["wind_angle_degrees"]))

nyc_ghc_2019_sdf = nyc_ghc_2019_sdf.withColumn("wind_speed_mps", 
                            when(nyc_ghc_2019_sdf["wind_speed_mps"] == '9999',None) \
                            .otherwise(nyc_ghc_2019_sdf["wind_speed_mps"]))

nyc_ghc_2019_sdf = nyc_ghc_2019_sdf.withColumn("air_temp_celsius", 
                            when(nyc_ghc_2019_sdf["air_temp_celsius"] == '9999',None) \
                            .otherwise(nyc_ghc_2019_sdf["air_temp_celsius"]))

nyc_ghc_2019_sdf = nyc_ghc_2019_sdf.withColumn("dew_point_celsius", 
                            when(nyc_ghc_2019_sdf["dew_point_celsius"] == '9999',None) \
                            .otherwise(nyc_ghc_2019_sdf["dew_point_celsius"]))

nyc_ghc_2019_sdf = nyc_ghc_2019_sdf.withColumn("sky_ceil_height_meters", 
                            when(nyc_ghc_2019_sdf["sky_ceil_height_meters"] == '99999',None) \
                            .otherwise(nyc_ghc_2019_sdf["sky_ceil_height_meters"]))

nyc_ghc_2019_sdf = nyc_ghc_2019_sdf.withColumn("atp_hectopascals", 
                            when(nyc_ghc_2019_sdf["atp_hectopascals"] == '99999',None) \
                            .otherwise(nyc_ghc_2019_sdf["atp_hectopascals"]))

nyc_ghc_2019_sdf = nyc_ghc_2019_sdf.withColumn("vis_distance_meters", 
                            when(nyc_ghc_2019_sdf["vis_distance_meters"] == '999999',None) \
                            .otherwise(nyc_ghc_2019_sdf["vis_distance_meters"]))

In [32]:
from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols = ['wind_angle_degrees', "wind_speed_mps", "air_temp_celsius", 
                              "dew_point_celsius", "sky_ceil_height_meters", "atp_hectopascals", 
                               "vis_distance_meters"],
                  outputCols = ["{}_imputed".format(c) 
                                for c in ['wind_angle_degrees', "wind_speed_mps", "air_temp_celsius", 
                                          "dew_point_celsius", "sky_ceil_height_meters", "atp_hectopascals", 
                                           "vis_distance_meters"]]
                 ).setStrategy("mean")

nyc_ghc_2019_sdf = imputer.fit(nyc_ghc_2019_sdf).transform(nyc_ghc_2019_sdf)

In [33]:
nyc_ghc_2019_sdf = nyc_ghc_2019_sdf.select('DATE', 'wind_angle_degrees_imputed', 
                                           'wind_speed_mps_imputed', 'air_temp_celsius_imputed', 
                                           'dew_point_celsius_imputed', 'sky_ceil_height_meters_imputed', 
                                           'atp_hectopascals_imputed', 'vis_distance_meters_imputed')

In [34]:
# Filter through the minimum and maximum set in the dictionary
nyc_ghc_2019_sdf = nyc_ghc_2019_sdf.filter('wind_angle_degrees_imputed >= 0 and wind_angle_degrees_imputed <= 360') \
                                    .filter('wind_speed_mps_imputed >= 0 and wind_speed_mps_imputed <= 900') \
                                    .filter('air_temp_celsius_imputed >= -932 and air_temp_celsius_imputed <= 618') \
                                    .filter('dew_point_celsius_imputed >= -982 and dew_point_celsius_imputed <= 368') \
                                    .filter('sky_ceil_height_meters_imputed >= 0 and sky_ceil_height_meters_imputed <= 22000') \
                                    .filter('atp_hectopascals_imputed >= 8600 and atp_hectopascals_imputed <= 10900') \
                                    .filter('vis_distance_meters_imputed >= 0 and vis_distance_meters_imputed <= 160000')

In [35]:
# Descaling some features given in the dictionary
# Wind speed scaled by 10 
# Air temp scaled by 10
# Dew point scaled by 10
# ATP scaled by 10
# All descaled by 10

nyc_ghc_2019_sdf = nyc_ghc_2019_sdf.withColumn('wind_speed_mps', nyc_ghc_2019_sdf['wind_speed_mps_imputed'] / 10) \
                                    .withColumn('air_temp_celsius', nyc_ghc_2019_sdf['air_temp_celsius_imputed'] / 10) \
                                    .withColumn('dew_temp_celsius', nyc_ghc_2019_sdf['dew_point_celsius_imputed'] / 10) \
                                    .withColumn('atp_hectopascals', nyc_ghc_2019_sdf['atp_hectopascals_imputed'] / 10)

In [36]:
# Renaming and drop columns
nyc_ghc_2019_sdf = nyc_ghc_2019_sdf.withColumnRenamed('wind_angle_degrees_imputed', 'wind_angle_degrees') \
                                   .withColumnRenamed('sky_ceil_height_meters_imputed', 'sky_ceil_height_meters') \
                                   .withColumnRenamed('vis_distance_meters_imputed', 'vis_distance_meters')

nyc_ghc_2019_sdf = nyc_ghc_2019_sdf.drop('wind_speed_mps_imputed', 'air_temp_celsius_imputed', 
                                         'dew_point_celsius_imputed', 'atp_hectopascals_imputed')

# Reorder columns
nyc_ghc_2019_sdf = nyc_ghc_2019_sdf.select('DATE', 'wind_angle_degrees', 
                                           'wind_speed_mps', 'air_temp_celsius', 
                                           'dew_temp_celsius','sky_ceil_height_meters', 
                                           'vis_distance_meters', 'atp_hectopascals')

nyc_ghc_2019_sdf = nyc_ghc_2019_sdf.orderBy('DATE')

In [37]:
nyc_ghc_2019_sdf = nyc_ghc_2019_sdf.groupBy('DATE').agg(f.mean('wind_angle_degrees'), f.mean('wind_speed_mps'),
                                                        f.mean('air_temp_celsius'), f.mean('dew_temp_celsius'),
                                                        f.mean('sky_ceil_height_meters'), f.mean('vis_distance_meters'), 
                                                        f.mean('atp_hectopascals')).orderBy('Date')

In [38]:
nyc_ghc_2019_sdf = nyc_ghc_2019_sdf.withColumnRenamed('avg(wind_angle_degrees)', 'wind_angle_degrees') \
                                   .withColumnRenamed('avg(wind_speed_mps)', 'wind_speed_mps') \
                                   .withColumnRenamed('avg(air_temp_celsius)', 'air_temp_celsius') \
                                   .withColumnRenamed('avg(dew_temp_celsius)', 'dew_temp_celsius') \
                                   .withColumnRenamed('avg(sky_ceil_height_meters)', 'sky_ceil_height_meters') \
                                   .withColumnRenamed('avg(vis_distance_meters)', 'vis_distance_meters') \
                                   .withColumnRenamed('avg(atp_hectopascals)', 'atp_hectopascals')

In [39]:
nyc_ghc_2019_sdf = nyc_ghc_2019_sdf.withColumn("closest_hour", f.date_trunc("hour", f.col("DATE")))

In [40]:
nyc_ghc_2019_sdf = nyc_ghc_2019_sdf.groupBy("closest_hour").agg(f.mean('wind_angle_degrees'), f.mean('wind_speed_mps'),
                                                        f.mean('air_temp_celsius'), f.mean('dew_temp_celsius'),
                                                        f.mean('sky_ceil_height_meters'), f.mean('vis_distance_meters'),
                                                        f.mean('atp_hectopascals'))

nyc_ghc_2019_sdf = nyc_ghc_2019_sdf.withColumnRenamed('avg(wind_angle_degrees)', 'wind_angle_degrees') \
                                   .withColumnRenamed('avg(wind_speed_mps)', 'wind_speed_mps') \
                                   .withColumnRenamed('avg(air_temp_celsius)', 'air_temp_celsius') \
                                   .withColumnRenamed('avg(dew_temp_celsius)', 'dew_temp_celsius') \
                                   .withColumnRenamed('avg(sky_ceil_height_meters)', 'sky_ceil_height_meters') \
                                   .withColumnRenamed('avg(vis_distance_meters)', 'vis_distance_meters') \
                                   .withColumnRenamed('avg(atp_hectopascals)', 'atp_hectopascals')

In [41]:
from shutil import rmtree
from os import path

fpath = '../preprocessed_data/nyc_ghc_2019_pre.parquet/'

if path.exists(fpath):
    rmtree(fpath)
        
nyc_ghc_2019_sdf.write.format('parquet').save('../preprocessed_data/nyc_ghc_2019_pre.parquet')

In [42]:
nyc_ghc_2019_sdf.limit(5)

closest_hour,wind_angle_degrees,wind_speed_mps,air_temp_celsius,dew_temp_celsius,sky_ceil_height_meters,vis_distance_meters,atp_hectopascals
2019-01-01 06:00:00,203.33035714285717,0.9401785714285716,6.775,7.761607142857143,1597.875,6314.366071428572,1012.9821428571428
2019-01-18 05:00:00,129.96875,1.725,0.475,5.746875,2243.0,10891.90625,1019.4375
2019-01-30 04:00:00,255.0192307692308,4.509615384615385,2.3500000000000005,1.948076923076923,3772.942307692308,14612.384615384615,1015.4403846153848
2019-01-31 10:00:00,274.7083333333333,5.420833333333333,15.858333333333334,19.5,20231.83333333333,15796.5,1022.5916666666668
2019-02-02 05:00:00,223.95833333333331,1.2291666666666667,5.791666666666668,14.25,20231.83333333333,15796.5,1023.3125
