In [1]:
#importing the relevant packages
import matplotlib.pyplot as plt
from pyspark.sql import functions as F
from pyspark.sql.functions import col, year,month, to_date,  col,isnan, when, count, mean,min,max
from pyspark.sql import Window
import time

#supressing the warnings
import warnings
warnings.filterwarnings("ignore")

In [2]:
#looking at the spark context, showing the connection to the spark cluster
sc

### Loading the dataset from HDFS

In [47]:
#reading in the dataset from hdfs and timing how long it takes for comparison to using pandas
start_time = time.perf_counter()

weather_df = spark.read.csv('/user/weather_station_data.csv',header=True)

end_time = time.perf_counter()
total_time = end_time - start_time

print(f'Total execution time: {total_time} seconds')

Total execution time: 0.6626979400007258 seconds


### Performing exploratory data analysis on the dataset

In [48]:
#weather_df = spark.read.csv('/user/weather_station_data.csv',header=True, inferSchema=True)

In [49]:
weather_df.show(5)

+-----------------+---+----+-----+----+-----+----+-----+-----+----+------+-----+----+-----+-----+---+---+---+-----+----+-----+--------------+
|             date|ind|rain|ind.1|temp|ind.2|wetb|dewpt|vappr|rhum|   msl|ind.3|wdsp|ind.4|wddir| ww|  w|sun|  vis|clht|clamt|  station_name|
+-----------------+---+----+-----+----+-----+----+-----+-----+----+------+-----+----+-----+-----+---+---+---+-----+----+-----+--------------+
|01-jan-1944 00:00|  2| 0.0|    0| 7.2|    0| 6.7|  6.1|  9.4|  93|1027.9|    1|  19|    1|  290| 50|  5|0.0|20000|  12|    8|Dublin Airport|
|01-jan-1944 01:00|  0| 0.0|    0| 7.2|    0| 6.6|  6.1|  9.3|  91|1027.6|    1|  19|    1|  280|  2|  2|0.0|30000|  15|    7|Dublin Airport|
|01-jan-1944 02:00|  2| 0.0|    0| 7.2|    0| 6.6|  6.1|  9.3|  92|1027.0|    1|  19|    1|  260| 50|  5|0.0|30000|  15|    8|Dublin Airport|
|01-jan-1944 03:00|  2| 0.0|    0| 7.2|    0| 6.5|  5.5|  9.1|  90|1026.2|    1|  19|    1|  270| 50|  5|0.0|20000|  15|    7|Dublin Airport|
|01-ja

In [50]:
weather_df.columns

['date',
 'ind',
 'rain',
 'ind.1',
 'temp',
 'ind.2',
 'wetb',
 'dewpt',
 'vappr',
 'rhum',
 'msl',
 'ind.3',
 'wdsp',
 'ind.4',
 'wddir',
 'ww',
 'w',
 'sun',
 'vis',
 'clht',
 'clamt',
 'station_name']

In [51]:
weather_df.printSchema()

root
 |-- date: string (nullable = true)
 |-- ind: string (nullable = true)
 |-- rain: string (nullable = true)
 |-- ind.1: string (nullable = true)
 |-- temp: string (nullable = true)
 |-- ind.2: string (nullable = true)
 |-- wetb: string (nullable = true)
 |-- dewpt: string (nullable = true)
 |-- vappr: string (nullable = true)
 |-- rhum: string (nullable = true)
 |-- msl: string (nullable = true)
 |-- ind.3: string (nullable = true)
 |-- wdsp: string (nullable = true)
 |-- ind.4: string (nullable = true)
 |-- wddir: string (nullable = true)
 |-- ww: string (nullable = true)
 |-- w: string (nullable = true)
 |-- sun: string (nullable = true)
 |-- vis: string (nullable = true)
 |-- clht: string (nullable = true)
 |-- clamt: string (nullable = true)
 |-- station_name: string (nullable = true)



In [52]:
#converting the date column from a string to a timestamp
weather_df = weather_df.withColumn('date', F.to_timestamp('date',  "dd-MMM-yyyy HH:mm"))

In [53]:
#looking at the schema the below columns need to be recast to a float as they are being defaulted to a string
columns_to_float = ['rain','temp','wetb','dewpt','vappr','msl','sun']

for col_name in columns_to_float:
    weather_df = weather_df.withColumn(col_name, col(col_name).cast('float'))

In [54]:
#similarly the below columns need to be recast to integers from string
columns_to_integer = ['rhum','wdsp','wddir','ww','w','vis','clht','clamt']

for col_name in columns_to_integer:
    weather_df = weather_df.withColumn(col_name, col(col_name).cast('integer'))

In [14]:
weather_df.printSchema()

root
 |-- date: timestamp (nullable = true)
 |-- ind: string (nullable = true)
 |-- rain: float (nullable = true)
 |-- ind.1: string (nullable = true)
 |-- temp: float (nullable = true)
 |-- ind.2: string (nullable = true)
 |-- wetb: float (nullable = true)
 |-- dewpt: float (nullable = true)
 |-- vappr: float (nullable = true)
 |-- rhum: integer (nullable = true)
 |-- msl: float (nullable = true)
 |-- ind.3: string (nullable = true)
 |-- wdsp: integer (nullable = true)
 |-- ind.4: string (nullable = true)
 |-- wddir: integer (nullable = true)
 |-- ww: integer (nullable = true)
 |-- w: integer (nullable = true)
 |-- sun: float (nullable = true)
 |-- vis: integer (nullable = true)
 |-- clht: integer (nullable = true)
 |-- clamt: integer (nullable = true)
 |-- station_name: string (nullable = true)



In [15]:
#dropping indicator columns which will not be required for modelling therefore are removed from the dataframe
columns_to_drop = ['ind','ind.1','ind.2','ind.3','ind.4']
weather_df = weather_df.drop(*columns_to_drop)

In [16]:
weather_df.show(5)

+-------------------+----+----+----+-----+-----+----+------+----+-----+---+---+---+-----+----+-----+--------------+
|               date|rain|temp|wetb|dewpt|vappr|rhum|   msl|wdsp|wddir| ww|  w|sun|  vis|clht|clamt|  station_name|
+-------------------+----+----+----+-----+-----+----+------+----+-----+---+---+---+-----+----+-----+--------------+
|1944-01-01 00:00:00| 0.0| 7.2| 6.7|  6.1|  9.4|  93|1027.9|  19|  290| 50|  5|0.0|20000|  12|    8|Dublin Airport|
|1944-01-01 01:00:00| 0.0| 7.2| 6.6|  6.1|  9.3|  91|1027.6|  19|  280|  2|  2|0.0|30000|  15|    7|Dublin Airport|
|1944-01-01 02:00:00| 0.0| 7.2| 6.6|  6.1|  9.3|  92|1027.0|  19|  260| 50|  5|0.0|30000|  15|    8|Dublin Airport|
|1944-01-01 03:00:00| 0.0| 7.2| 6.5|  5.5|  9.1|  90|1026.2|  19|  270| 50|  5|0.0|20000|  15|    7|Dublin Airport|
|1944-01-01 04:00:00| 0.0| 7.2| 6.5|  5.5|  9.1|  89|1025.5|  19|  270|  2|  2|0.0|30000|  18|    7|Dublin Airport|
+-------------------+----+----+----+-----+-----+----+------+----+-----+-

In [17]:
#dropping "ww" and "w" columns as these represent synop code for present and past weather respectively.
#These columns will not add any value to the modelling part of this project so are not required.
columns_to_drop = ['ww','w']
weather_df = weather_df.drop(*columns_to_drop)

In [18]:
#renaming some columns for improved clarity
weather_df = weather_df.withColumnRenamed("date", "datetime") \
                       .withColumnRenamed("wetb", "wet_bulb") \
                       .withColumnRenamed("dewpt", "dew_point") \
                       .withColumnRenamed("vappr", "vap_pressure") \
                       .withColumnRenamed("rhum", "humidity") \
                       .withColumnRenamed("msl", "sea_pressure") \
                       .withColumnRenamed("wdsp", "wind_speed") \
                       .withColumnRenamed("wddir", "wind_dir") \
                       .withColumnRenamed("clht", "cloud_height") \
                       .withColumnRenamed("clamt", "clout_amount")
weather_df.show(2)

+-------------------+----+----+--------+---------+------------+--------+------------+----------+--------+---+-----+------------+------------+--------------+
|           datetime|rain|temp|wet_bulb|dew_point|vap_pressure|humidity|sea_pressure|wind_speed|wind_dir|sun|  vis|cloud_height|clout_amount|  station_name|
+-------------------+----+----+--------+---------+------------+--------+------------+----------+--------+---+-----+------------+------------+--------------+
|1944-01-01 00:00:00| 0.0| 7.2|     6.7|      6.1|         9.4|      93|      1027.9|        19|     290|0.0|20000|          12|           8|Dublin Airport|
|1944-01-01 01:00:00| 0.0| 7.2|     6.6|      6.1|         9.3|      91|      1027.6|        19|     280|0.0|30000|          15|           7|Dublin Airport|
+-------------------+----+----+--------+---------+------------+--------+------------+----------+--------+---+-----+------------+------------+--------------+
only showing top 2 rows



In [19]:
cols_to_describe = weather_df.drop('date','station_name').columns
desc = weather_df.describe(cols_to_describe)
desc.show()

24/04/02 16:22:14 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 9:>                                                          (0 + 1) / 1]

+-------+-------------------+------------------+-----------------+-----------------+------------------+------------------+------------------+-----------------+-----------------+-------------------+------------------+------------------+------------------+
|summary|               rain|              temp|         wet_bulb|        dew_point|      vap_pressure|          humidity|      sea_pressure|       wind_speed|         wind_dir|                sun|               vis|      cloud_height|      clout_amount|
+-------+-------------------+------------------+-----------------+-----------------+------------------+------------------+------------------+-----------------+-----------------+-------------------+------------------+------------------+------------------+
|  count|            2175331|           2177539|          2177441|          2177539|           2177536|           2177536|           2177521|          2177477|          2177474|            2175380|           2177539|           2177539|

                                                                                

In [20]:
#displaying available weather station names in the dataset
weather_df.select("station_name").distinct().show() 



+---------------+
|   station_name|
+---------------+
| Dublin Airport|
|Shannon Airport|
|  Knock Airport|
|   Cork Airport|
+---------------+



                                                                                

In [21]:
#checking the count of available rows for each weather station
weather_df.groupBy('station_name').count().show()

[Stage 13:>                                                         (0 + 2) / 2]

+---------------+------+
|   station_name| count|
+---------------+------+
| Dublin Airport|702721|
|Shannon Airport|688104|
|  Knock Airport|241794|
|   Cork Airport|544920|
+---------------+------+



                                                                                

In [22]:
# As part of the modelling phase, only one weather station will be utilsed. 
# Dublin Airport is chosen for this and the dataframe is filtered to only contain weather data related to this station:
weather_df = weather_df.filter(weather_df.station_name=='Dublin Airport')
weather_df.select("station_name").distinct().show() 



+--------------+
|  station_name|
+--------------+
|Dublin Airport|
+--------------+



                                                                                

In [23]:
#station_name is no longer required and can be dropped from the dataframe
weather_df = weather_df.drop('station_name')

In [24]:
weather_df.show()

+-------------------+----+----+--------+---------+------------+--------+------------+----------+--------+---+-----+------------+------------+
|           datetime|rain|temp|wet_bulb|dew_point|vap_pressure|humidity|sea_pressure|wind_speed|wind_dir|sun|  vis|cloud_height|clout_amount|
+-------------------+----+----+--------+---------+------------+--------+------------+----------+--------+---+-----+------------+------------+
|1944-01-01 00:00:00| 0.0| 7.2|     6.7|      6.1|         9.4|      93|      1027.9|        19|     290|0.0|20000|          12|           8|
|1944-01-01 01:00:00| 0.0| 7.2|     6.6|      6.1|         9.3|      91|      1027.6|        19|     280|0.0|30000|          15|           7|
|1944-01-01 02:00:00| 0.0| 7.2|     6.6|      6.1|         9.3|      92|      1027.0|        19|     260|0.0|30000|          15|           8|
|1944-01-01 03:00:00| 0.0| 7.2|     6.5|      5.5|         9.1|      90|      1026.2|        19|     270|0.0|20000|          15|           7|
|1944-

### Checking for missing values in the dataframe

In [25]:
weather_df.select([count(when(col(c).isNull(), c)).alias(c) for c in weather_df.columns]).show()



+--------+----+----+--------+---------+------------+--------+------------+----------+--------+---+---+------------+------------+
|datetime|rain|temp|wet_bulb|dew_point|vap_pressure|humidity|sea_pressure|wind_speed|wind_dir|sun|vis|cloud_height|clout_amount|
+--------+----+----+--------+---------+------------+--------+------------+----------+--------+---+---+------------+------------+
|       0|   0|   0|       0|        0|           1|       1|           0|         0|       1|  0|  0|           0|           0|
+--------+----+----+--------+---------+------------+--------+------------+----------+--------+---+---+------------+------------+



                                                                                

In [26]:
#taking a look at the null values
weather_df.filter(weather_df.vap_pressure.isNull()).show()

[Stage 24:>                                                         (0 + 1) / 1]

+-------------------+----+----+--------+---------+------------+--------+------------+----------+--------+---+----+------------+------------+
|           datetime|rain|temp|wet_bulb|dew_point|vap_pressure|humidity|sea_pressure|wind_speed|wind_dir|sun| vis|cloud_height|clout_amount|
+-------------------+----+----+--------+---------+------------+--------+------------+----------+--------+---+----+------------+------------+
|1998-09-26 04:00:00| 0.2|13.7|    13.7|     13.7|        null|    null|      1003.8|        10|      50|0.0|1300|           3|           8|
+-------------------+----+----+--------+---------+------------+--------+------------+----------+--------+---+----+------------+------------+



                                                                                

In [27]:
weather_df.filter(weather_df.wind_dir.isNull()).show()

[Stage 26:>                                                         (0 + 1) / 1]

+-------------------+----+----+--------+---------+------------+--------+------------+----------+--------+---+-----+------------+------------+
|           datetime|rain|temp|wet_bulb|dew_point|vap_pressure|humidity|sea_pressure|wind_speed|wind_dir|sun|  vis|cloud_height|clout_amount|
+-------------------+----+----+--------+---------+------------+--------+------------+----------+--------+---+-----+------------+------------+
|2007-07-16 18:00:00| 0.0|17.8|    14.5|     11.9|        13.9|      68|      1005.2|        10|    null|0.5|30000|          90|           5|
+-------------------+----+----+--------+---------+------------+--------+------------+----------+--------+---+-----+------------+------------+



                                                                                

**Forward filling** is used as the method of dealing with these missing values. This method works by replacing the missing values with the most recent available observations and is particularly useful when dealing with time series data. Since there is only 1 missing value for each affected column forward fillining was deemed a suitable approach

To implement forward filling in pyspark, a *Window* function is used. A window function in pySpark allows one to perform calculations (such as fetching the last value) across a set of rows that are related to the current row. <br>

This allows me to use the *Last* function from pyspark.sql functions across each date partition defined below and ordered by datetime. Using *ignorenulls=True* means that only null values will be updated with the last available value for that column, therefore forward filling the missing values:

In [28]:
#creating temporary date column from datetime for partitioning purposes
weather_df = weather_df.withColumn('date', to_date('datetime'))

In [29]:
#defining the window order by datetime and partitioned by date
#setting rowsBetween -sys.maxsize and 0, includes all rows from the beginning of the partition up until the current row
window = Window.partitionBy('date').orderBy('datetime').rowsBetween(-sys.maxsize,0)

In [30]:
#applying the window function to the affected columns and utilising the last function to fill the missing values
weather_df = weather_df.withColumn('vap_pressure', F.last('vap_pressure',ignorenulls=True).over(window))
weather_df = weather_df.withColumn('humidity', F.last('humidity',ignorenulls=True).over(window))
weather_df = weather_df.withColumn('wind_dir', F.last('wind_dir',ignorenulls=True).over(window))

In [31]:
#checking number of null values aagin
weather_df.select([count(when(col(c).isNull(), c)).alias(c) for c in weather_df.columns]).show()

[Stage 29:>                                                         (0 + 2) / 2]

+--------+----+----+--------+---------+------------+--------+------------+----------+--------+---+---+------------+------------+----+
|datetime|rain|temp|wet_bulb|dew_point|vap_pressure|humidity|sea_pressure|wind_speed|wind_dir|sun|vis|cloud_height|clout_amount|date|
+--------+----+----+--------+---------+------------+--------+------------+----------+--------+---+---+------------+------------+----+
|       0|   0|   0|       0|        0|           0|       0|           0|         0|       0|  0|  0|           0|           0|   0|
+--------+----+----+--------+---------+------------+--------+------------+----------+--------+---+---+------------+------------+----+



                                                                                

In [32]:
weather_df.filter(weather_df.vis.isNull()).show()

                                                                                

+--------+----+----+--------+---------+------------+--------+------------+----------+--------+---+---+------------+------------+----+
|datetime|rain|temp|wet_bulb|dew_point|vap_pressure|humidity|sea_pressure|wind_speed|wind_dir|sun|vis|cloud_height|clout_amount|date|
+--------+----+----+--------+---------+------------+--------+------------+----------+--------+---+---+------------+------------+----+
+--------+----+----+--------+---------+------------+--------+------------+----------+--------+---+---+------------+------------+----+



In [33]:
#Checking for duplicates
weather_df.exceptAll(weather_df.dropDuplicates()).show()

24/04/02 16:23:47 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/04/02 16:23:48 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
                                                                                

+--------+----+----+--------+---------+------------+--------+------------+----------+--------+---+---+------------+------------+----+
|datetime|rain|temp|wet_bulb|dew_point|vap_pressure|humidity|sea_pressure|wind_speed|wind_dir|sun|vis|cloud_height|clout_amount|date|
+--------+----+----+--------+---------+------------+--------+------------+----------+--------+---+---+------------+------------+----+
+--------+----+----+--------+---------+------------+--------+------------+----------+--------+---+---+------------+------------+----+



[Stage 46:>                                                         (0 + 1) / 1]                                                                                

In [34]:
weather_df.count()

                                                                                

702721

For the modelling stage of this project only the last 5 years worth of weather data will be used to ensure the processing time of the neural networks isnt too large. Since the time intervals are hourly, 5 years worth of data will provide me with sufficient amount of data for neural networks.

In [35]:
weather_df = weather_df.filter(col('date')>'2019')

In [36]:
weather_df.count()

                                                                                

45241

In [37]:
#exporting csv for modelling stage
weather_df.write.csv('/user/weatherProcessed.csv',header=True,sep=',',mode='overwrite')

                                                                                

In [None]:
weather_df.printSchema()