In [1]:
spark

## Countries

In [19]:
countries_df = spark.read.option('header', True).option('inferSchema', True).csv('countrylist.csv')
countries_df.printSchema()

root
 |-- COUNTRY_ABBR: string (nullable = true)
 |-- COUNTRY_FULL: string (nullable = true)



In [20]:
countries_df.limit(5).toPandas()

Unnamed: 0,COUNTRY_ABBR,COUNTRY_FULL
0,AA,ARUBA
1,AC,ANTIGUA AND BARBUDA
2,AF,AFGHANISTAN
3,AG,ALGERIA
4,AI,ASCENSION ISLAND


In [82]:
countries_df.count()

288

## Station list

In [22]:
stations_df = spark.read.option('header', True).option('inferSchema', True).csv('stationlist.csv')
stations_df.printSchema()

root
 |-- STN_NO: string (nullable = true)
 |-- COUNTRY_ABBR: string (nullable = true)



In [23]:
stations_df.limit(5).toPandas()

Unnamed: 0,STN_NO,COUNTRY_ABBR
0,12240,NO
1,20690,SW
2,20870,SW
3,21190,SW
4,32690,UK


In [81]:
stations_df.count()

25306

## Weather data

In [33]:
data_path = 'data/2019'
weather_df = spark.read.option('header', True).option('inferSchema', True).csv(data_path)
weather_df.printSchema()

root
 |-- STN---: integer (nullable = true)
 |-- WBAN: integer (nullable = true)
 |-- YEARMODA: integer (nullable = true)
 |-- TEMP: double (nullable = true)
 |-- DEWP: double (nullable = true)
 |-- SLP: double (nullable = true)
 |-- STP: double (nullable = true)
 |-- VISIB: double (nullable = true)
 |-- WDSP: double (nullable = true)
 |-- MXSPD: double (nullable = true)
 |-- GUST: double (nullable = true)
 |-- MAX: string (nullable = true)
 |-- MIN: string (nullable = true)
 |-- PRCP: string (nullable = true)
 |-- SNDP: double (nullable = true)
 |-- FRSHTT: integer (nullable = true)



In [34]:
weather_df.limit(5).toPandas()

Unnamed: 0,STN---,WBAN,YEARMODA,TEMP,DEWP,SLP,STP,VISIB,WDSP,MXSPD,GUST,MAX,MIN,PRCP,SNDP,FRSHTT
0,10260,99999,20190101,26.1,21.2,1001.9,987.5,20.6,9.0,15.9,29.7,29.8,21.7*,0.02G,18.5,1000
1,10260,99999,20190102,24.9,22.1,1020.1,1005.5,5.4,5.6,13.6,22.1,27.1*,20.7,0.48G,22.8,1000
2,10260,99999,20190103,31.7,29.1,1008.9,994.7,13.6,11.6,21.4,49.5,37.4*,26.8*,0.25G,999.9,11000
3,10260,99999,20190104,32.9,30.3,1011.4,997.1,15.8,4.9,7.8,10.9,36.1,31.8,0.52G,999.9,1000
4,10260,99999,20190105,35.5,33.0,1015.7,1001.4,12.0,10.4,13.6,21.0,38.5*,32.7,0.02G,23.6,10000


## Dataset cleanup

In [109]:
from pyspark.sql.functions import when, col, lit 

# weather_cleaned_df = weather_df \
# .filter( \
#     (weather_df['TEMP'] == 9999.9)
# )
weather_cleaned_df = weather_df \
.select(weather_df['STN---'].alias('STN_NO'), 'WBAN', 'YEARMODA', 'FRSHTT', \
    'TEMP', when(weather_df['TEMP'].isNull(), lit(9999.9)).otherwise(weather_df['TEMP']).alias('TEMP_AVG'), \
'DEWP', when(weather_df['DEWP'].isNull(), lit(9999.9)).otherwise(weather_df['DEWP']).alias('DEWP_MEAN'), \
'SLP', when(weather_df['SLP'].isNull(), lit(9999.9)).otherwise(weather_df['SLP']).alias('SLP_MEAN'), \
'STP', when(weather_df['STP'].isNull(), lit(9999.9)).otherwise(weather_df['STP']).alias('STP_MEAN'), \
'VISIB', when(weather_df['VISIB'].isNull(), lit(999.9)).otherwise(weather_df['VISIB']).alias('VISIB_MEAN'), \
'WDSP', when(weather_df['WDSP'].isNull(), lit(999.9)).otherwise(weather_df['WDSP']).alias('WDSP_MEAN'), \
'MXSPD', when(weather_df['MXSPD'].isNull(), lit(999.9)).otherwise(weather_df['MXSPD']).alias('MXSPD_MAX'), \
'GUST', when(weather_df['GUST'].isNull(), lit(999.9)).otherwise(weather_df['GUST']).alias('GUST_MAX'), \
'MAX', when(weather_df['MAX'].isNull(), lit(9999.9)).otherwise(weather_df['MAX']).alias('MAX_TEMP'), \
'MIN', when(weather_df['MIN'].isNull(), lit(9999.9)).otherwise(weather_df['MIN']).alias('MIN_TEMP'), \
'PRCP', when(weather_df['PRCP'].isNull(), lit(99.9)).otherwise(weather_df['PRCP']).alias('PRCP_TOTAL'), \
'SNDP', when(weather_df['SNDP'].isNull(), lit(999.9)).otherwise(weather_df['SNDP']).alias('SNDP_DEPTH')) \
.drop('STN---') \
.drop('TEMP') \
.drop('DEWP') \
.drop('SLP') \
.drop('STP') \
.drop('VISIB') \
.drop('WDSP') \
.drop('MXSPD') \
.drop('GUST') \
.drop('MAX') \
.drop('MIN') \
.drop('PRCP') \
.drop('SNDP')

weather_cleaned_df.limit(5).toPandas()

Unnamed: 0,STN_NO,WBAN,YEARMODA,FRSHTT,TEMP_AVG,DEWP_MEAN,SLP_MEAN,STP_MEAN,VISIB_MEAN,WDSP_MEAN,MXSPD_MAX,GUST_MAX,MAX_TEMP,MIN_TEMP,PRCP_TOTAL,SNDP_DEPTH
0,10260,99999,20190101,1000,26.1,21.2,1001.9,987.5,20.6,9.0,15.9,29.7,29.8,21.7*,0.02G,18.5
1,10260,99999,20190102,1000,24.9,22.1,1020.1,1005.5,5.4,5.6,13.6,22.1,27.1*,20.7,0.48G,22.8
2,10260,99999,20190103,11000,31.7,29.1,1008.9,994.7,13.6,11.6,21.4,49.5,37.4*,26.8*,0.25G,999.9
3,10260,99999,20190104,1000,32.9,30.3,1011.4,997.1,15.8,4.9,7.8,10.9,36.1,31.8,0.52G,999.9
4,10260,99999,20190105,10000,35.5,33.0,1015.7,1001.4,12.0,10.4,13.6,21.0,38.5*,32.7,0.02G,23.6


## Country names by station

In [97]:
country_names_by_station_df = stations_df.join(countries_df, 'COUNTRY_ABBR', 'left')
country_names_by_station_df.limit(1).toPandas()

Unnamed: 0,COUNTRY_ABBR,STN_NO,COUNTRY_FULL
0,NO,12240,NORWAY


In [98]:
country_names_by_station_df.count()

25306

## weather data by station

In [99]:
from pyspark.sql.functions import broadcast

weather_data_by_station = weather_cleaned_df.join(broadcast(country_names_by_station_df), 'STN_NO', 'right')
weather_data_by_station.limit(5).toPandas()

Unnamed: 0,STN_NO,WBAN,YEARMODA,FRSHTT,TEMP_AVG,DEWP_MEAN,SLP_MEAN,STP_MEAN,VISIB_MEAN,WDSP_MEAN,MXSPD_MAX,GUST_MAX,MAX_TEMP,MIN_TEMP,PRCP_TOTAL,SNDP_DEPTH,COUNTRY_ABBR,COUNTRY_FULL
0,13840,99999,20190101,0,38.6,24.8,9999.9,9999.9,999.9,11.7,33.6,44.1,42.8*,33.1*,0.00G,999.9,NO,NORWAY
1,13840,99999,20190102,0,30.5,16.6,9999.9,9999.9,999.9,8.0,23.7,35.0,33.8*,27.5*,0.02G,999.9,NO,NORWAY
2,13840,99999,20190103,0,32.2,21.7,9999.9,9999.9,999.9,3.5,13.0,999.9,37.4*,26.6*,0.00G,999.9,NO,NORWAY
3,13840,99999,20190104,0,34.2,27.2,9999.9,9999.9,999.9,2.6,6.0,999.9,41.0*,26.6*,0.00G,999.9,NO,NORWAY
4,13840,99999,20190105,100000,30.6,27.4,9999.9,9999.9,15.5,2.0,6.4,999.9,36.9*,21.2*,0.00G,999.9,NO,NORWAY


## Which country had the hottest average mean temperature over the year?

In [103]:
from pyspark.sql.functions import max, col
weather_data_by_station.select('COUNTRY_FULL', 'TEMP_AVG') \
.groupBy('COUNTRY_FULL').agg(max(weather_data_by_station['TEMP_AVG']).alias('MAX_AVG_TEMP_BY_COUNTRY')) \
.sort(col('MAX_AVG_TEMP_BY_COUNTRY').desc()) \
.limit(1).toPandas()


Unnamed: 0,COUNTRY_FULL,MAX_AVG_TEMP_BY_COUNTRY
0,KUWAIT,110.0


## Which country had the most consecutive days of tornadoes/funnel cloud formations?

In [111]:
weather_data_by_station.select('COUNTRY_FULL', 'FRSHTT') \
.limit(1).toPandas()



Unnamed: 0,COUNTRY_FULL,FRSHTT
0,NORWAY,0


In [112]:
a = 100
str(a).zfill(5)

'00100'

In [113]:
# We can use an UDF here and a Window function to calculate the result
def is_tornado_or_funnel_cloud(val):
    str(a).zfill(5)[5]


SyntaxError: invalid syntax (<ipython-input-113-1a593b6386a5>, line 2)

##  Which country had the second highest average mean wind speed over the year?

In [122]:
weather_data_by_station.select('COUNTRY_FULL', 'WDSP_MEAN') \
.groupBy('COUNTRY_FULL').agg(max(weather_data_by_station['WDSP_MEAN']).alias('MAX_WDSP_MEAN')) \
.limit(2).sort(col('MAX_WDSP_MEAN').desc()).toPandas()

Unnamed: 0,COUNTRY_FULL,MAX_WDSP_MEAN
0,SOUTH AFRICA,999.9
1,ARMENIA,999.9
