In [0]:
# Coding on Microsoft Azure notebook
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import FloatType
from pyspark.sql.functions import col
from pyspark.sql.functions import substring
from pyspark.sql.functions import to_date
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [0]:
# Step 1.1: load the global weather data.
# Every gzipped daily CSV under data/2019 is read into one DataFrame. The first
# line of each file is used as the header. No inferSchema option is set, so
# Spark reads every column as a string.
data_w = (
    spark.read
    .option("header", "true")
    .csv("/FileStore/tables/data/2019/*.gz")
)

In [0]:
# Step 1.2: attach the full country name to every station number.
# Both lookup tables have a header row, and their column types are inferred.
stationlist = spark.read.csv("/FileStore/tables/stationlist.csv", header=True, inferSchema=True)
countrylist = spark.read.csv("/FileStore/tables/countrylist.csv", header=True, inferSchema=True)

# Join on the country *abbreviation* (COUNTRY_ABBR).
# - It is a left join, so stations whose abbreviation is missing from
#   countrylist are kept; their countrylist columns are null.
# - countrylist's copy of COUNTRY_ABBR is dropped so that the result has only
#   one COUNTRY_ABBR column.
stationlist_fullname = (
    stationlist
    .join(
        countrylist,
        on=stationlist.COUNTRY_ABBR == countrylist.COUNTRY_ABBR,
        how='left',
    )
    .drop(countrylist.COUNTRY_ABBR)
)
stationlist_fullname.show()

In [0]:
# Step 1.3: attach station/country metadata to every daily weather record.
# - Weather rows identify the station by 'STN---'; the station table uses 'STN_NO'.
# - A left join keeps weather rows whose station is not in the station list.
# - The now-redundant STN_NO key is dropped afterwards.
# NOTE(review): 'STN---' is read as a string while STN_NO is type-inferred
# (likely an integer). Confirm that Spark's implicit cast matches station IDs
# that have leading zeros.
data_full_Cname = (
    data_w
    .join(
        stationlist_fullname,
        on=data_w['STN---'] == stationlist_fullname['STN_NO'],
        how='left',
    )
    .drop(stationlist_fullname.STN_NO)
)
data_full_Cname.show()

In [0]:
# Cast the measurement columns from string to float so they can be averaged.
data_full_Cname = (
    data_full_Cname
    .withColumn("TEMP", data_full_Cname["TEMP"].cast(FloatType()))
    .withColumn("WDSP", data_full_Cname["WDSP"].cast(FloatType()))
)

# step 2.1 Which country had the hottest average mean temperature over the year?
# A missing daily mean temperature is encoded as the sentinel 9999.9.
# TEMP is now a 32-bit float, and Spark widens it to double when comparing it
# with a Python float literal:
#   float(9999.9) -> 9999.900390625, which is not equal to the double 9999.9.
# A plain `!= 9999.9` therefore keeps every missing value. Rounding to one
# decimal in double precision makes the sentinel compare exactly.
TEMP_MISSING = 9999.9
data_full_Cname_rmM = data_full_Cname.where(
    (F.round(col('TEMP').cast('double'), 1) != TEMP_MISSING)
    # Stations with no matching country would otherwise form a null group
    # that could be reported as the "hottest country".
    & col('COUNTRY_FULL').isNotNull()
)

avg_temp = (
    data_full_Cname_rmM
    .groupBy('COUNTRY_FULL')
    .agg(F.avg('TEMP').alias('avg_temp'))
)
hottest_row = avg_temp.orderBy(col('avg_temp').desc()).first()
# first() returns None when no rows survive the filter; avoid a TypeError.
hot_country_1st = hottest_row['COUNTRY_FULL'] if hottest_row is not None else None
print('The country with the hottest average mean temperature over the year is: %s' % hot_country_1st)

In [0]:
# step 2.2 Which country had the most consecutive days of tornadoes/funnel cloud formations?
# FRSHTT is a per-day indicator string. Its last character is the
# tornado/funnel-cloud flag.
tor = (
    data_full_Cname
    # Cast the flag character to int so that later sums are numeric, rather
    # than relying on Spark's implicit string-to-number coercion.
    .withColumn("tor", substring(col("FRSHTT"), -1, 1).cast("int"))
    .withColumn("dt", to_date(col("YEARMODA"), "yyyyMMdd"))
    # Drop only rows that lack a value this analysis needs. A blanket
    # na.drop() would also discard valid tornado days just because some
    # unrelated column (e.g. a station attribute) was null, which can break
    # consecutive-day runs. Rows whose date failed to parse are dropped too.
    .na.drop(subset=["COUNTRY_FULL", "dt", "tor"])
)
tor.show()

In [0]:
# Days with tornadoes.
# For each (country, day), add up the tornado flags across all of that
# country's stations. Keep only days where the total is positive, i.e. at
# least one station reported a tornado/funnel cloud.
df = (
    tor
    .groupby(['COUNTRY_FULL', 'dt'])
    .agg(F.sum('tor').alias('tor'))
)
df_order = df.where(col('tor') > 0).orderBy('COUNTRY_FULL', 'dt')
df_order.show()

In [0]:
# Length of every run of consecutive tornado days, using the gaps-and-islands
# technique:
# - Within each country, tornado days are numbered 1, 2, 3, ... in date order.
# - Across a run of consecutive days, the date and the row number both
#   advance by one, so (date - row number) stays constant for the whole run.
# - That value changes as soon as a day is skipped.
# Grouping by it and counting rows therefore yields each run's length.
# This assumes df_order holds at most one row per (country, day), which the
# groupby on COUNTRY_FULL/dt that builds it guarantees.
# NOTE(review): relies on Spark evaluating `date - int` as "subtract that many
# days". Confirm on the cluster's Spark version.
w = Window.partitionBy('COUNTRY_FULL').orderBy('dt')
df_order = (
    df_order
    .withColumn("rowN", F.row_number().over(w))
    .withColumn("discriminater", col('dt') - col('rowN'))
)
df_count = df_order.groupBy('COUNTRY_FULL', 'discriminater').count()
df_count.show()

In [0]:
# The longest run of consecutive tornado days, across all countries.
df_max = df_count.select(F.max('count').alias('max'))
df_max.show()

In [0]:
# The country (or countries, if tied) with the most consecutive days of
# tornadoes/funnel clouds: every run whose length equals the global maximum.
(
    df_count
    .join(df_max, df_count['count'] == df_max['max'])
    .select('COUNTRY_FULL')
    .distinct()
    .show()
)

In [0]:
# step 2.3 Which country had the second highest average mean wind speed over the year?
# GSOD encodes a missing daily mean wind speed as 999.9. Note this is not the
# 9999.9 sentinel used for TEMP.
# WDSP may have been cast to FloatType, and Spark widens a float to double
# when comparing it with a Python float literal (float(999.9) != 999.9).
# Rounding to one decimal in double precision makes the sentinel compare
# exactly whether WDSP is a string, float or double.
WDSP_MISSING = 999.9
data_full_Cname_rmM_wind = data_full_Cname.where(
    (F.round(col('WDSP').cast('double'), 1) != WDSP_MISSING)
    # Exclude stations without a matching country so that a null group
    # cannot be ranked.
    & col('COUNTRY_FULL').isNotNull()
)
avg_WDSP = (
    data_full_Cname_rmM_wind
    .groupBy('COUNTRY_FULL')
    .agg(F.avg('WDSP').alias('avg_WDSP'))
)
top_two = avg_WDSP.orderBy(col('avg_WDSP').desc()).take(2)
# Indexing with [-1] would silently report the *first* country when only one
# country exists, so require exactly two rows.
wind_country_2nd = top_two[1]['COUNTRY_FULL'] if len(top_two) == 2 else None
print('The country with the second highest average mean wind speed over the year is: %s' % wind_country_2nd)