In [23]:
from pyspark.sql import functions as F
from pyspark.sql import types as T

In [5]:
# Countries
countries = spark.read.option('header', True).option('inferSchema', True).csv('countrylist.csv')
countries.limit(5).show()
countries.printSchema()
countries_count = countries.count()
print(f'Countries count: {countries_count}')

+------------+-------------------+
|COUNTRY_ABBR|       COUNTRY_FULL|
+------------+-------------------+
|          AA|              ARUBA|
|          AC|ANTIGUA AND BARBUDA|
|          AF|        AFGHANISTAN|
|          AG|            ALGERIA|
|          AI|   ASCENSION ISLAND|
+------------+-------------------+

root
 |-- COUNTRY_ABBR: string (nullable = true)
 |-- COUNTRY_FULL: string (nullable = true)

Countries count: 288


In [9]:
# Station list
stations = spark.read.option('header', True).option('inferSchema', True).csv('stationlist.csv')
stations.printSchema()
stations.limit(5).show()
stations_count = stations.count()
print(f'Stations count: {stations_count}')

root
 |-- STN_NO: string (nullable = true)
 |-- COUNTRY_ABBR: string (nullable = true)

+------+------------+
|STN_NO|COUNTRY_ABBR|
+------+------------+
|012240|          NO|
|020690|          SW|
|020870|          SW|
|021190|          SW|
|032690|          UK|
+------+------------+

Stations count: 25306


In [31]:
# Weather
schema = StructType() \
.add('STN---', T.IntegerType(),True) \
.add('WBAN', T.IntegerType(),True) \
.add('YEARMODA', T.IntegerType(),True) \
.add('TEMP', T.DoubleType(),True) \
.add('DEWP', T.DoubleType(),True) \
.add('SLP', T.DoubleType(),True) \
.add('STP', T.DoubleType(),True) \
.add('VISIB', T.DoubleType(),True) \
.add('WDSP', T.DoubleType(),True) \
.add('MXSPD', T.DoubleType(),True) \
.add('GUST', T.DoubleType(),True) \
.add('MAX', T.StringType(),True) \
.add('MIN', T.StringType(),True) \
.add('PRCP', T.DoubleType(),True) \
.add('SNDP', T.DoubleType(),True) \
.add('FRSHTT', T.StringType(),True)

In [32]:
# Weather
weather = spark.read.option('header', True).option('inferSchema', False).schema(schema).csv('data/2019')
weather.printSchema()

root
 |-- STN---: integer (nullable = true)
 |-- WBAN: integer (nullable = true)
 |-- YEARMODA: integer (nullable = true)
 |-- TEMP: double (nullable = true)
 |-- DEWP: double (nullable = true)
 |-- SLP: double (nullable = true)
 |-- STP: double (nullable = true)
 |-- VISIB: double (nullable = true)
 |-- WDSP: double (nullable = true)
 |-- MXSPD: double (nullable = true)
 |-- GUST: double (nullable = true)
 |-- MAX: string (nullable = true)
 |-- MIN: string (nullable = true)
 |-- PRCP: double (nullable = true)
 |-- SNDP: double (nullable = true)
 |-- FRSHTT: string (nullable = true)



In [33]:
weather.limit(5).show()

+------+-----+--------+----+----+------+------+-----+----+-----+----+-----+-----+----+-----+------+
|STN---| WBAN|YEARMODA|TEMP|DEWP|   SLP|   STP|VISIB|WDSP|MXSPD|GUST|  MAX|  MIN|PRCP| SNDP|FRSHTT|
+------+-----+--------+----+----+------+------+-----+----+-----+----+-----+-----+----+-----+------+
| 10260|99999|20190101|26.1|21.2|1001.9| 987.5| 20.6| 9.0| 15.9|29.7| 29.8|21.7*|null| 18.5|001000|
| 10260|99999|20190102|24.9|22.1|1020.1|1005.5|  5.4| 5.6| 13.6|22.1|27.1*| 20.7|null| 22.8|001000|
| 10260|99999|20190103|31.7|29.1|1008.9| 994.7| 13.6|11.6| 21.4|49.5|37.4*|26.8*|null|999.9|011000|
| 10260|99999|20190104|32.9|30.3|1011.4| 997.1| 15.8| 4.9|  7.8|10.9| 36.1| 31.8|null|999.9|001000|
| 10260|99999|20190105|35.5|33.0|1015.7|1001.4| 12.0|10.4| 13.6|21.0|38.5*| 32.7|null| 23.6|010000|
+------+-----+--------+----+----+------+------+-----+----+-----+----+-----+-----+----+-----+------+



In [36]:
# Country names by station
full_country_names_by_station = stations.join(F.broadcast(countries), 'COUNTRY_ABBR', 'left')

Check for missing country full names

In [37]:
full_country_names_by_station.where('COUNTRY_FULL IS NULL').groupBy('COUNTRY_ABBR').count().limit(100).show()

+------------+-----+
|COUNTRY_ABBR|count|
+------------+-----+
|          UC|    1|
|          RI|   37|
|          OD|   14|
|          AE|   22|
|          KV|    6|
|          MJ|   12|
|          NN|    1|
|          TT|    4|
+------------+-----+



According to: https://en.wikipedia.org/wiki/List_of_FIPS_country_codes the missing codes are:
* UC Curaçao
* RI Serbia
* OD South Sudan
* AE United Arab Emirates
* KV Kosovo
* MJ Montenegro
* NN Sint Maarten
* TT Timor-Leste

In [38]:
missing_countries = spark.createDataFrame([
    ('UC', 'CURAÇAO'), 
    ('RI', 'SERBIA'),
    ('OD', 'SOUTH SUDAN'),
    ('AE', 'UNITED ARAB EMIRATES'),
    ('KV', 'KOSOVO'),
    ('MJ', 'MONTENEGRO'),
    ('NN', 'SINT MAARTEN'),
    ('TT', 'TIMOR-LESTE')], ("COUNTRY_ABBR", "COUNTRY_FULL"))

completed_countries = countries.union(missing_countries)
full_country_names_by_station = stations.join(F.broadcast(completed_countries), 'COUNTRY_ABBR', 'left')

Check for missing country full names again

In [39]:
full_country_names_by_station.where('COUNTRY_FULL IS NULL').groupBy('COUNTRY_ABBR').count().limit(100).show()

+------------+-----+
|COUNTRY_ABBR|count|
+------------+-----+
+------------+-----+



In [41]:
# Weather data by station
weather_data_by_station = weather.withColumnRenamed('STN---', 'STN_NO') \
.join(F.broadcast(full_country_names_by_station), 'STN_NO', 'left')

weather_data_by_station.limit(5).show()

+------+-----+--------+----+----+------+------+-----+----+-----+----+-----+-----+----+-----+------+------------+------------+
|STN_NO| WBAN|YEARMODA|TEMP|DEWP|   SLP|   STP|VISIB|WDSP|MXSPD|GUST|  MAX|  MIN|PRCP| SNDP|FRSHTT|COUNTRY_ABBR|COUNTRY_FULL|
+------+-----+--------+----+----+------+------+-----+----+-----+----+-----+-----+----+-----+------+------------+------------+
| 10260|99999|20190101|26.1|21.2|1001.9| 987.5| 20.6| 9.0| 15.9|29.7| 29.8|21.7*|null| 18.5|001000|          NO|      NORWAY|
| 10260|99999|20190102|24.9|22.1|1020.1|1005.5|  5.4| 5.6| 13.6|22.1|27.1*| 20.7|null| 22.8|001000|          NO|      NORWAY|
| 10260|99999|20190103|31.7|29.1|1008.9| 994.7| 13.6|11.6| 21.4|49.5|37.4*|26.8*|null|999.9|011000|          NO|      NORWAY|
| 10260|99999|20190104|32.9|30.3|1011.4| 997.1| 15.8| 4.9|  7.8|10.9| 36.1| 31.8|null|999.9|001000|          NO|      NORWAY|
| 10260|99999|20190105|35.5|33.0|1015.7|1001.4| 12.0|10.4| 13.6|21.0|38.5*| 32.7|null| 23.6|010000|          NO|      

In [42]:
weather_data_by_station.count()

4161334

Check if there are stations with no corresponding row in the stations dataframe

In [43]:
weather_data_by_station.where('COUNTRY_FULL IS NULL').show()

+------+----+--------+----+----+---+---+-----+----+-----+----+---+---+----+----+------+------------+------------+
|STN_NO|WBAN|YEARMODA|TEMP|DEWP|SLP|STP|VISIB|WDSP|MXSPD|GUST|MAX|MIN|PRCP|SNDP|FRSHTT|COUNTRY_ABBR|COUNTRY_FULL|
+------+----+--------+----+----+---+---+-----+----+-----+----+---+---+----+----+------+------------+------------+
+------+----+--------+----+----+---+---+-----+----+-----+----+---+---+----+----+------+------------+------------+



In [47]:
# Question 1: Which country had the hottest average mean temperature over the year?

weather_data_by_station.select('COUNTRY_FULL', 'TEMP') \
.filter('TEMP IS NOT NULL AND TEMP < 9999.9') \
.groupBy('COUNTRY_FULL').agg(F.avg(F.col('TEMP')).alias('AVG_MEAN_TEMP_BY_COUNTRY')) \
.sort(F.col('AVG_MEAN_TEMP_BY_COUNTRY').desc()) \
.limit(5).show()


+------------+------------------------+
|COUNTRY_FULL|AVG_MEAN_TEMP_BY_COUNTRY|
+------------+------------------------+
|    DJIBOUTI|       90.06114457831325|
| SOUTH SUDAN|       88.38571428571429|
|        CHAD|       87.36099706744868|
|       NIGER|       85.06022291247946|
|       SUDAN|       84.45494186046503|
+------------+------------------------+



Answer: DJIBOUTI

In [83]:
# Question 2: Which country had the most consecutive days of tornadoes/funnel cloud formations?

t_fc_col = F.split(F.col('FRSHTT'), '').getItem(5).cast('int').alias('T_FC')
t_fc_by_station = weather_data_by_station.select('COUNTRY_FULL', 'YEARMODA', t_fc_col)
t_fc_by_station.limit(5).show()

+------------+--------+----+
|COUNTRY_FULL|YEARMODA|T_FC|
+------------+--------+----+
|      NORWAY|20190101|   0|
|      NORWAY|20190102|   0|
|      NORWAY|20190103|   0|
|      NORWAY|20190104|   0|
|      NORWAY|20190105|   0|
+------------+--------+----+



In [84]:
def max_t_fc_consecutive_days(t_fc_consecutive_days):
    max_consecutive_days = 0
    count = 0
    # all consecutive days
    for t_fc in t_fc_consecutive_days:
        # a day without t or fc restarts the count
        if t_fc == 0: 
            if count > max_consecutive_days:
                max_consecutive_days = count
            count = 0
        # a consecutive day with t or fc
        else:
            count += 1
    return max_consecutive_days 

max_t_fc_consecutive_days_udf = F.udf(max_t_fc_consecutive_days, IntegerType())

In [85]:
t_fc_by_station.groupBy('COUNTRY_FULL', 'YEARMODA') \
.agg(F.max(F.col('T_FC')).alias('T_FC_REPORTED_BY_ANY_STATION_BY_DAY')) \
.orderBy(F.col('COUNTRY_FULL').asc(), F.col('YEARMODA').asc()) \
.groupBy('COUNTRY_FULL').agg(F.collect_list(F.col('T_FC_REPORTED_BY_ANY_STATION_BY_DAY')).alias('T_FC_CONSECUTIVE_DAYS_LIST')) \
.select('COUNTRY_FULL', max_t_fc_consecutive_days_udf(F.col('T_FC_CONSECUTIVE_DAYS_LIST')).alias('MAX_T_FC_CONSECUTIVE_DAYS')) \
.orderBy(F.col('MAX_T_FC_CONSECUTIVE_DAYS').desc()) \
.limit(10).show()

+--------------+-------------------------+
|  COUNTRY_FULL|MAX_T_FC_CONSECUTIVE_DAYS|
+--------------+-------------------------+
|         JAPAN|                        2|
|        CANADA|                        2|
|CAYMAN ISLANDS|                        2|
|         INDIA|                        2|
|         ITALY|                        2|
| UNITED STATES|                        2|
|         GHANA|                        2|
|      MALDIVES|                        1|
|UNITED KINGDOM|                        1|
|   BAHAMAS THE|                        1|
+--------------+-------------------------+



Answer: JAPAN, CANADA, CAYMAN ISLANDS, INDIA, ITALY, UNITED STATES, and GHANA.

In [90]:
# WIP to avoid UDFs

# Sorts the data by country and date and calculates if any station reported a tornado or funnel cloud during that day.
country_date_t_fc = t_fc_by_station.groupBy('COUNTRY_FULL', 'YEARMODA') \
.agg(F.max(F.col('T_FC')).alias('T_FC_REPORTED_BY_ANY_STATION_BY_DAY')) \
.orderBy(F.col('COUNTRY_FULL').asc(), F.col('YEARMODA').asc())

country_date_t_fc.limit(10).show()

+------------+--------+-----------------------------------+
|COUNTRY_FULL|YEARMODA|T_FC_REPORTED_BY_ANY_STATION_BY_DAY|
+------------+--------+-----------------------------------+
| AFGHANISTAN|20190101|                                  0|
| AFGHANISTAN|20190102|                                  0|
| AFGHANISTAN|20190103|                                  0|
| AFGHANISTAN|20190104|                                  0|
| AFGHANISTAN|20190105|                                  0|
| AFGHANISTAN|20190106|                                  0|
| AFGHANISTAN|20190107|                                  0|
| AFGHANISTAN|20190108|                                  0|
| AFGHANISTAN|20190109|                                  0|
| AFGHANISTAN|20190110|                                  0|
+------------+--------+-----------------------------------+



In [105]:
from pyspark.sql.window import Window
date_window = Window.partitionBy('COUNTRY_FULL', 'YEARMODA').orderBy('COUNTRY_FULL', 'YEARMODA')

country_date_t_fc.withColumn("PREV_DAY", F.lag(F.col('T_FC_REPORTED_BY_ANY_STATION_BY_DAY')).over(date_window))\
.orderBy(F.col('COUNTRY_FULL').asc(), F.col('YEARMODA').asc()) \
.where("COUNTRY_FULL = 'JAPAN'") \
.limit(10).show()

+------------+--------+-----------------------------------+--------+
|COUNTRY_FULL|YEARMODA|T_FC_REPORTED_BY_ANY_STATION_BY_DAY|PREV_DAY|
+------------+--------+-----------------------------------+--------+
|       JAPAN|20190101|                                  0|    null|
|       JAPAN|20190102|                                  0|    null|
|       JAPAN|20190103|                                  0|    null|
|       JAPAN|20190104|                                  0|    null|
|       JAPAN|20190105|                                  0|    null|
|       JAPAN|20190106|                                  0|    null|
|       JAPAN|20190107|                                  0|    null|
|       JAPAN|20190108|                                  0|    null|
|       JAPAN|20190109|                                  0|    null|
|       JAPAN|20190110|                                  0|    null|
+------------+--------+-----------------------------------+--------+



// TODO Complete the Window to avoid using UDFs.

In [52]:
# Question 3: Which country had the second highest average mean wind speed over the year?

weather_data_by_station.select('COUNTRY_FULL', 'WDSP') \
.filter('WDSP IS NOT NULL AND WDSP < 999.9') \
.groupBy('COUNTRY_FULL').agg(F.avg(F.col('WDSP')).alias('AVG_WDSP_BY_COUNTRY')) \
.sort(F.col('AVG_WDSP_BY_COUNTRY').desc()).limit(2) \
.sort(F.col('AVG_WDSP_BY_COUNTRY').asc()).limit(1) \
.show()

+------------+-------------------+
|COUNTRY_FULL|AVG_WDSP_BY_COUNTRY|
+------------+-------------------+
|       ARUBA| 15.975683060109283|
+------------+-------------------+



Answer: Aruba