In [2]:
import findspark
findspark.init()

In [111]:
import pyspark
from pyspark.sql import SparkSession

import pyspark.sql.functions as F
from pyspark.sql import Window 

In [4]:
# Spark Session for notebook
spark = SparkSession.builder.getOrCreate()

In [27]:
data_path = 'paytmteam-de-weather-challenge/'

### Step 1

#### Load weather data

In [45]:
data=spark.read.csv(data_path+'data/2019', header=True)

In [46]:
data.show(5)

+------+-----+--------+----+----+------+------+-----+----+-----+----+-----+-----+-----+-----+------+
|STN---| WBAN|YEARMODA|TEMP|DEWP|   SLP|   STP|VISIB|WDSP|MXSPD|GUST|  MAX|  MIN| PRCP| SNDP|FRSHTT|
+------+-----+--------+----+----+------+------+-----+----+-----+----+-----+-----+-----+-----+------+
|010260|99999|20190101|26.1|21.2|1001.9| 987.5| 20.6| 9.0| 15.9|29.7| 29.8|21.7*|0.02G| 18.5|001000|
|010260|99999|20190102|24.9|22.1|1020.1|1005.5|  5.4| 5.6| 13.6|22.1|27.1*| 20.7|0.48G| 22.8|001000|
|010260|99999|20190103|31.7|29.1|1008.9| 994.7| 13.6|11.6| 21.4|49.5|37.4*|26.8*|0.25G|999.9|011000|
|010260|99999|20190104|32.9|30.3|1011.4| 997.1| 15.8| 4.9|  7.8|10.9| 36.1| 31.8|0.52G|999.9|001000|
|010260|99999|20190105|35.5|33.0|1015.7|1001.4| 12.0|10.4| 13.6|21.0|38.5*| 32.7|0.02G| 23.6|010000|
+------+-----+--------+----+----+------+------+-----+----+-----+----+-----+-----+-----+-----+------+
only showing top 5 rows



In [47]:
data.count()

4158416

#### Get full station-country info

In [35]:
# Join Station and Country lists
station = spark.read.csv(data_path+'stationlist.csv', header=True)
country = spark.read.csv(data_path+'countrylist.csv', header=True)

In [34]:
station.show(5)

+------+------------+
|STN_NO|COUNTRY_ABBR|
+------+------------+
|012240|          NO|
|020690|          SW|
|020870|          SW|
|021190|          SW|
|032690|          UK|
+------+------------+
only showing top 5 rows



In [36]:
country.show(5)

+------------+-------------------+
|COUNTRY_ABBR|       COUNTRY_FULL|
+------------+-------------------+
|          AA|              ARUBA|
|          AC|ANTIGUA AND BARBUDA|
|          AF|        AFGHANISTAN|
|          AG|            ALGERIA|
|          AI|   ASCENSION ISLAND|
+------------+-------------------+
only showing top 5 rows



In [37]:
station.count(), country.count()

(25306, 288)

In [55]:
stn_cntry = station.join(country, on='COUNTRY_ABBR', how='left_outer')

Note an inner join gives 25,209 rows here. There are some country abbreviations in the station dataset not covered in the country set.

In [56]:
stn_cntry.count()

25306

#### Join global data with station information

In [49]:
# Rename the station number from the global data
data = data.withColumnRenamed('STN---', 'STN_NO')

In [138]:
df = data.join(stn_cntry, on='STN_NO', how='left_outer')

In [139]:
df.count()

4161334

### Step 2

#### Data Cleanup

In [140]:
# It looks like there are 366 days for this year. Not a leap year though
df.select('YEARMODA').distinct().count()

366

In [141]:
# Contains the first day of 2020 in the dataset. I want to remove that.
df.agg(F.max('YEARMODA'), F.min('YEARMODA')).show()

+-------------+-------------+
|max(YEARMODA)|min(YEARMODA)|
+-------------+-------------+
|     20200101|     20190101|
+-------------+-------------+



In [142]:
df = df.filter((df.YEARMODA >= 20190101) & (df.YEARMODA <= 20191231))

In [146]:
# Also the miss entry for station number is has many measurements. Since I don't know what country these belong
# to I will remove them completely
day_count = df.groupBy('STN_NO').agg(F.count(F.lit(1)).alias('num_days'))

In [147]:
day_count.orderBy('num_days', ascending=False).show(10)

+------+--------+
|STN_NO|num_days|
+------+--------+
|999999|   75225|
|785510|     730|
|788660|     730|
|917920|     730|
|789900|     730|
|785140|     730|
|789880|     730|
|788730|     730|
|785265|     716|
|153000|     365|
+------+--------+
only showing top 10 rows



In [148]:
# Move this to data cleaning
df = df.filter(df.STN_NO != 999999)

In [149]:
#Note there are a lot of stations with few measurements.
day_count.orderBy('num_days').show(10)

+------+--------+
|STN_NO|num_days|
+------+--------+
|871271|       1|
|649300|       1|
|406220|       1|
|111370|       1|
|227270|       1|
|112340|       1|
|854520|       1|
|618020|       1|
|766810|       1|
|627510|       1|
+------+--------+
only showing top 10 rows



#### 1. Country with the hottest average mean temperature over the year

Want the average of the mean temperatures. That's the 'TEMP' column.

In [150]:
temp = df.select('STN_NO', 'YEARMODA', 'TEMP', 'COUNTRY_ABBR', 'COUNTRY_FULL').distinct()
temp.count()

4083991

In [151]:
temp_agg = temp.groupBy('STN_NO').agg(F.count(F.lit(1)).alias('num_days')).orderBy('num_days', ascending=False)
temp_agg.show(10)

+------+--------+
|STN_NO|num_days|
+------+--------+
|917920|     730|
|785510|     730|
|789900|     730|
|785140|     730|
|789880|     730|
|788660|     730|
|788730|     730|
|785265|     716|
|722678|     365|
|153000|     365|
+------+--------+
only showing top 10 rows



In [152]:
temp_agg.count(), temp_agg.filter(temp_agg.num_days==365).count()

(12143, 6859)

In [153]:
temp.filter((temp.STN_NO == 788660) & (temp.YEARMODA==20190101)).show()

+------+--------+----+------------+--------------------+
|STN_NO|YEARMODA|TEMP|COUNTRY_ABBR|        COUNTRY_FULL|
+------+--------+----+------------+--------------------+
|788660|20190101|78.9|          NN|                null|
|788660|20190101|78.9|          NT|NETHERLANDS ANTILLES|
+------+--------+----+------------+--------------------+



There are still a few minor problems/duplicate station entries. I'm going to clean up the data by making some assumptions.
1. Remove the country name Null
2. Remove the no-record value for the TEMP column, which is 9999.9

In [122]:
temp = temp.filter((temp.STN_NO != 999999) & (temp.COUNTRY_FULL.isNotNull()) & (temp.TEMP != 9999.9))

In [154]:
temp.count()

4083991

##### Calculation for average

In [155]:
# Calculation
avg_temp = temp.groupBy('COUNTRY_ABBR', 'COUNTRY_FULL').agg(F.avg('TEMP').alias('temp'), F.count(F.lit(1)).alias('rec_cnt'))

In [156]:
avg_temp.orderBy('temp', ascending=False).show()

+------------+--------------------+-----------------+-------+
|COUNTRY_ABBR|        COUNTRY_FULL|             temp|rec_cnt|
+------------+--------------------+-----------------+-------+
|          DJ|            DJIBOUTI|90.06114457831325|    332|
|          OD|                null|88.38571428571429|     21|
|          CD|                CHAD|87.36099706744866|   2728|
|          NG|               NIGER|85.06022291247945|   5473|
|          SU|               SUDAN| 84.4549418604651|   1032|
|          ES|         EL SALVADOR|84.44045944678854|   2133|
|          TV|              TUVALU|84.32217090069284|    433|
|          JU| JUAN DE NOVA ISLAND|84.15945945945947|     37|
|          TL|             TOKELAU|83.96142857142854|     70|
|          IO|BRITISH INDIAN OC...|83.91150684931505|    365|
|          CJ|      CAYMAN ISLANDS|83.77780821917811|    730|
|          UV|        BURKINA FASO|83.74875078076202|   3202|
|          MV|            MALDIVES|83.67879452054797|   1825|
|       

Based on this data I'm seeing that the country with the hottest average temperature over 2019 was Djibouti, with a temp of 90.1F. It should be noted that it only has 332 records, which is not even one measurement each day from a single weather station.

It would need to be considered what portion of the year was covered and by how many stations to decide if this was an accurate picture.

### 2. Which country has the most consecutive days of tornados/funnel cloud formation

Looking at the 6th digit in the FRSHTT column. I'm going to assume that if a country has tornado formation at any of its weather station it counts for the country as a whole.

In [158]:
# The last digit is 1 when there's a tornado. Use a modulo 2 to extract this digit into a new column for a tornado flag
df = df.withColumn('tornado', F.col('FRSHTT') % 2)

In [159]:
df.filter(df.tornado==0).select('FRSHTT', 'tornado').show(5)

+------+-------+
|FRSHTT|tornado|
+------+-------+
|011010|    0.0|
|010000|    0.0|
|000000|    0.0|
|010000|    0.0|
|010000|    0.0|
+------+-------+
only showing top 5 rows



In [160]:
df.filter(df.tornado==1).select('FRSHTT', 'tornado').show(5)

+------+-------+
|FRSHTT|tornado|
+------+-------+
|010011|    1.0|
|000011|    1.0|
|010011|    1.0|
|010001|    1.0|
|010001|    1.0|
+------+-------+
only showing top 5 rows



In [161]:
# Group by country code and day. Give the country a 1 for tornado formation if there was a tornado detected at
# one of the weather stations
tornado = df.groupBy('YEARMODA', 'COUNTRY_ABBR').agg(F.max('tornado').alias('ntnl_tornado'))

In [162]:
tornado.show(5)

+--------+------------+------------+
|YEARMODA|COUNTRY_ABBR|ntnl_tornado|
+--------+------------+------------+
|20190818|          NO|         0.0|
|20190820|          NO|         0.0|
|20190204|          FI|         0.0|
|20190123|          UK|         0.0|
|20190325|          SZ|         0.0|
+--------+------------+------------+
only showing top 5 rows



Now I want to create a window function to find the streaks of tornado days.

I'm running out of time here - but my plan was to partition by country and order by date. Then start counting a streak. If the current day did not have a tornado then reset the streak to 0. If the current day did have a tornado, continue the streak. If there was a lost measurement (i.e. gap between days was > 1) then make an assumption that the streak was reset, and take the current days value.

In [164]:
# Only need to keep the days where there was a tornado
tornado = tornado.filter(tornado.ntnl_tornado == 1)

In [165]:
tornado.show()

+--------+------------+------------+
|YEARMODA|COUNTRY_ABBR|ntnl_tornado|
+--------+------------+------------+
|20190124|          IT|         1.0|
|20190607|          IN|         1.0|
|20190124|          JA|         1.0|
|20190914|          BD|         1.0|
|20190113|          US|         1.0|
|20190816|          IN|         1.0|
|20191031|          SC|         1.0|
|20190115|          SC|         1.0|
|20190806|          BF|         1.0|
|20190611|          NO|         1.0|
|20190514|          MT|         1.0|
|20190418|          TU|         1.0|
|20191108|          CJ|         1.0|
|20191107|          IT|         1.0|
|20191111|          US|         1.0|
|20190405|          AO|         1.0|
|20191204|          JA|         1.0|
|20191029|          PE|         1.0|
|20190810|          JA|         1.0|
|20191109|          IT|         1.0|
+--------+------------+------------+
only showing top 20 rows



In [166]:
# Calculate the number of days between records - needs to be consecutive.
day_window = Window.partitionBy('COUNTRY_ABBR').orderBy('YEARMODA')

In [167]:
tornado = tornado.withColumn('day_gap', F.col('YEARMODA') - F.lag(F.col('YEARMODA')).over(day_window))

In [169]:
tornado.orderBy('COUNTRY_ABBR', 'YEARMODA').show(10)

+--------+------------+------------+-------+
|YEARMODA|COUNTRY_ABBR|ntnl_tornado|day_gap|
+--------+------------+------------+-------+
|20190923|          AA|         1.0|   null|
|20190704|          AG|         1.0|   null|
|20190206|          AO|         1.0|   null|
|20190405|          AO|         1.0|  199.0|
|20190510|          AR|         1.0|   null|
|20190120|          AU|         1.0|   null|
|20190616|          AU|         1.0|  496.0|
|20190619|          AU|         1.0|    3.0|
|20191014|          AU|         1.0|  395.0|
|20191019|          AU|         1.0|    5.0|
+--------+------------+------------+-------+
only showing top 10 rows



My next step here would be to find the groups of continues days, then calculate which group had the most rows

### 3. Second highest average mean wind speed

I will be looking at the WDSP column to calculate averages, and remove the no-record value of 999.9. It's important here because there are many no records and it's heavily skewing the calculation.

In [126]:
df.count()

4083991

In [127]:
# Remove the no record value for windspeed
wind = df.filter(df.WDSP != 999.9)
wind.count()

3958697

In [120]:
avg_wind = wind.groupBy('COUNTRY_ABBR', 'COUNTRY_FULL').agg(F.avg('WDSP').alias('wdsp'), F.count(F.lit(1)).alias('rec_cnt'))

In [121]:
avg_wind.orderBy('WDSP', ascending=False).show()

+------------+--------------------+------------------+-------+
|COUNTRY_ABBR|        COUNTRY_FULL|              wdsp|rec_cnt|
+------------+--------------------+------------------+-------+
|          FK|FALKLAND ISLANDS ...| 17.87783300198807|   2012|
|          AA|               ARUBA|15.981917808219173|    365|
|          FO|       FAROE ISLANDS| 15.28067010309278|    776|
|          FS|FRENCH SOUTHERN A...|14.203721841332031|   1021|
|          BB|            BARBADOS|14.101643835616441|    365|
|          SB|ST. PIERRE AND MI...| 13.90767123287671|    365|
|          CV|          CAPE VERDE| 13.61522184300341|   1465|
|          TE|     TROMELIN ISLAND|13.005277777777751|    360|
|          SH|          ST. HELENA|12.730518394648827|   1196|
|          MR|          MAURITANIA|12.723464912280699|    456|
|          AY|          ANTARCTICA| 12.27517270608179|  23595|
|          SO|             SOMALIA|12.274880611270296|   1047|
|          CK|COCOS (KEELING) I...|12.054545454545453| 

I'm seeing that ARUBA had the second highest average wind speed over the year at 16.0 Knots with 365 measurements.