In [1]:
from utils import initialize_spark, load_df
from pyspark.sql import functions as F 
from pyspark.sql import Window 

In [2]:
fmat = 'csv'
weather_data = f'data/2019/*.gz'
station_list = f'data/stationlist.csv'
country_list = f'data/countrylist.csv'

In [3]:
spark = initialize_spark()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

### Load the global weather data into your big data technology of choice

In [4]:
dataDF = load_df(spark, weather_data, fmat=fmat)
dataDF

Total Records = 4158416


STN---,WBAN,YEARMODA,TEMP,DEWP,SLP,STP,VISIB,WDSP,MXSPD,GUST,MAX,MIN,PRCP,SNDP,FRSHTT
10260,99999,20190101,26.1,21.2,1001.9,987.5,20.6,9.0,15.9,29.7,29.8,21.7*,0.02G,18.5,1000
10260,99999,20190102,24.9,22.1,1020.1,1005.5,5.4,5.6,13.6,22.1,27.1*,20.7,0.48G,22.8,1000
10260,99999,20190103,31.7,29.1,1008.9,994.7,13.6,11.6,21.4,49.5,37.4*,26.8*,0.25G,999.9,11000
10260,99999,20190104,32.9,30.3,1011.4,997.1,15.8,4.9,7.8,10.9,36.1,31.8,0.52G,999.9,1000
10260,99999,20190105,35.5,33.0,1015.7,1001.4,12.0,10.4,13.6,21.0,38.5*,32.7,0.02G,23.6,10000
10260,99999,20190106,38.5,34.1,1008.2,994.2,12.8,10.0,17.5,28.9,41.4,33.8*,0.12G,23.2,10000
10260,99999,20190107,32.1,29.8,996.8,982.7,6.9,11.3,15.5,28.6,35.1*,30.4,0.00G,999.9,1000
10260,99999,20190108,31.6,28.0,997.4,983.3,22.9,5.9,11.7,19.0,34.3,28.0*,0.53G,0.4,11000
10260,99999,20190109,29.9,27.7,1011.6,997.3,29.8,7.6,15.2,26.6,32.4,26.1,0.20G,23.6,1000
10260,99999,20190110,33.1,30.6,979.1,965.3,5.3,17.8,24.9,41.8,41.4,28.8*,0.00G,999.9,11000


### Join the global weather data with the full country names by station number

In [5]:
stationDF = load_df(spark, station_list, fmat=fmat)
countryDF = load_df(spark, country_list, fmat=fmat)
countryMappingDF = stationDF.join(countryDF, on=['COUNTRY_ABBR'], how='inner')
countryMappingDF

Total Records = 25306
Total Records = 288


COUNTRY_ABBR,STN_NO,COUNTRY_FULL
NO,12240,NORWAY
SW,20690,SWEDEN
SW,20870,SWEDEN
SW,21190,SWEDEN
UK,32690,UNITED KINGDOM
UK,33450,UNITED KINGDOM
UK,39290,UNITED KINGDOM
EI,39790,IRELAND
IC,40480,ICELAND
IC,41300,ICELAND


 ### Join the global weather data with the full country names by station number

In [6]:
dataDF = dataDF.withColumnRenamed("STN---","STN_NO")
sparkDF = countryMappingDF.join(dataDF, on=['STN_NO'], how='inner')
sparkDF

STN_NO,COUNTRY_ABBR,COUNTRY_FULL,WBAN,YEARMODA,TEMP,DEWP,SLP,STP,VISIB,WDSP,MXSPD,GUST,MAX,MIN,PRCP,SNDP,FRSHTT
10875,NO,NORWAY,99999,20190101,41.1,30.1,9999.9,9999.9,5.9,46.7,59.1,74.0,44.6*,37.4*,99.99,999.9,11010
10875,NO,NORWAY,99999,20190102,40.5,29.0,9999.9,9999.9,6.2,20.5,32.1,44.1,41.0*,37.4*,99.99,999.9,10000
10875,NO,NORWAY,99999,20190103,43.0,36.6,9999.9,9999.9,6.1,13.5,21.0,999.9,44.6*,41.0*,0.00I,999.9,0
10875,NO,NORWAY,99999,20190104,46.7,44.4,9999.9,9999.9,5.8,27.4,33.0,40.0,48.2*,42.8*,99.99,999.9,10000
10875,NO,NORWAY,99999,20190105,46.5,44.1,9999.9,9999.9,6.1,18.3,25.1,999.9,48.2*,44.6*,99.99,999.9,10000
10875,NO,NORWAY,99999,20190106,45.5,39.4,9999.9,9999.9,5.7,16.4,24.1,999.9,46.4*,42.8*,99.99,999.9,10000
10875,NO,NORWAY,99999,20190107,45.1,40.5,9999.9,9999.9,5.4,28.5,47.0,56.9,48.2*,42.8*,99.99,999.9,10000
10875,NO,NORWAY,99999,20190108,42.3,33.6,9999.9,9999.9,6.1,35.7,44.1,55.9,46.4*,37.4*,99.99,999.9,10000
10875,NO,NORWAY,99999,20190109,40.7,31.0,9999.9,9999.9,6.2,16.8,25.1,35.0,42.8*,39.2*,99.99,999.9,10000
10875,NO,NORWAY,99999,20190110,47.2,45.3,9999.9,9999.9,3.5,25.7,29.9,999.9,48.2*,41.0*,99.99,999.9,110000


### Which country had the hottest average mean temperature over the year

In [7]:
sparkDF[sparkDF['TEMP'] != 9999.9].groupBy("COUNTRY_FULL").agg(F.mean('TEMP')).sort(F.col("avg(TEMP)").desc()).show()

+--------------------+-----------------+
|        COUNTRY_FULL|        avg(TEMP)|
+--------------------+-----------------+
|            DJIBOUTI|90.06114457831325|
|                CHAD|87.36099706744866|
|               NIGER|85.06022291247945|
|               SUDAN| 84.4549418604651|
|         EL SALVADOR|84.44045944678854|
|              TUVALU|84.32217090069284|
| JUAN DE NOVA ISLAND|84.15945945945947|
|             TOKELAU|83.96142857142854|
|BRITISH INDIAN OC...|83.91150684931505|
|      CAYMAN ISLANDS| 83.7715458276334|
|        BURKINA FASO|83.74875078076202|
|            MALDIVES|83.67879452054797|
|           SINGAPORE|83.63278538812786|
|            CAMBODIA|83.49766803840879|
|                MALI|83.38503218884121|
|ST. VINCENT AND T...|83.35714285714286|
|    MARSHALL ISLANDS|83.24540636042401|
|          MICRONESIA|83.22937823834197|
|             SENEGAL|83.11452655889146|
|               HAITI|83.10185185185185|
+--------------------+-----------------+
only showing top

### Which country had the most consecutive days of tornadoes/funnel cloud formations?

In [8]:
tmpDF = sparkDF.withColumn("TFC", F.substring(F.col("FRSHTT"), -1, 1))
w1 = Window.partitionBy(tmpDF.COUNTRY_FULL).orderBy(tmpDF.YEARMODA)
w2 = Window.partitionBy(tmpDF.COUNTRY_FULL,tmpDF.TFC).orderBy(tmpDF.YEARMODA)

res = tmpDF.withColumn('grp',F.row_number().over(w1)-F.row_number().over(w2))

#Window definition for streak
w3 = Window.partitionBy(res.COUNTRY_FULL,res.TFC,res.grp).orderBy(res.YEARMODA)
streak_res = res.withColumn('streak_1',F.when(res.TFC == 0,0).otherwise(F.row_number().over(w3)))
streak_res

STN_NO,COUNTRY_ABBR,COUNTRY_FULL,WBAN,YEARMODA,TEMP,DEWP,SLP,STP,VISIB,WDSP,MXSPD,GUST,MAX,MIN,PRCP,SNDP,FRSHTT,TFC,grp,streak_1
353946,AM,ARMENIA,99999,20190101,31.4,27.9,9999.9,9999.9,2.2,0.9,4.1,999.9,41.0*,23.0*,0.00I,999.9,100000,0,0,0
377850,AM,ARMENIA,99999,20190101,29.7,29.7,9999.9,897.3,15.5,1.9,1.9,999.9,37.4,23.0,0.13G,999.9,0,0,0,0
376990,AM,ARMENIA,99999,20190101,19.0,11.7,9999.9,811.5,999.9,999.9,999.9,999.9,25.7,8.6*,0.28G,999.9,1000,0,0,0
378010,AM,ARMENIA,99999,20190101,22.7,12.7,9999.9,802.4,999.9,2.4,3.9,999.9,31.6,14.0*,0.00G,999.9,0,0,0,0
377170,AM,ARMENIA,99999,20190101,23.0,18.1,9999.9,808.7,8.9,3.4,5.8,999.9,30.6,18.0*,0.09G,999.9,1000,0,0,0
376820,AM,ARMENIA,99999,20190101,17.0,15.4,9999.9,813.7,5.7,3.1,5.8,999.9,24.6,9.5*,0.00G,999.9,0,0,0,0
377040,AM,ARMENIA,99999,20190101,28.2,22.7,9999.9,865.6,8.7,4.4,7.8,999.9,36.5,21.9*,0.00G,999.9,0,0,0,0
377870,AM,ARMENIA,99999,20190101,31.6,25.0,1026.5,922.5,13.2,999.9,999.9,999.9,39.2,27.5,0.09G,999.9,0,0,0,0
377880,AM,ARMENIA,99999,20190101,31.6,28.0,1027.7,925.0,1.9,3.2,5.8,999.9,39.9,27.5,0.00I,999.9,100000,0,0,0
379590,AM,ARMENIA,99999,20190101,36.0,32.9,1026.8,941.6,999.9,999.9,999.9,999.9,50.7,28.4,0.00G,999.9,0,0,0,0


In [9]:
streak_res.groupby('COUNTRY_FULL').agg({'streak_1': 'max'}).sort(F.col("max(streak_1)").desc()).show()

+--------------------+-------------+
|        COUNTRY_FULL|max(streak_1)|
+--------------------+-------------+
|              CANADA|            2|
|         BAHAMAS THE|            2|
|      CAYMAN ISLANDS|            2|
|              ISRAEL|            2|
|               ITALY|            2|
|       UNITED STATES|            2|
|            MALDIVES|            1|
|              JERSEY|            1|
|             NIGERIA|            1|
|            TANZANIA|            1|
|            COLOMBIA|            1|
|               GABON|            1|
|COCOS (KEELING) I...|            1|
| ST. KITTS AND NEVIS|            1|
|               ARUBA|            1|
|             SENEGAL|            1|
|              POLAND|            1|
|             ALGERIA|            1|
|          COSTA RICA|            1|
|               MALTA|            1|
+--------------------+-------------+
only showing top 20 rows



### Which country had the second highest average mean wind speed over the year?

In [10]:
sparkDF[sparkDF['WDSP'] != 999.9].groupBy("COUNTRY_FULL").agg(F.mean('WDSP')).sort(F.col("avg(WDSP)").desc()).show()

+--------------------+------------------+
|        COUNTRY_FULL|         avg(WDSP)|
+--------------------+------------------+
|FALKLAND ISLANDS ...| 17.87783300198807|
|               ARUBA|15.975683060109283|
|       FAROE ISLANDS| 15.28067010309278|
|FRENCH SOUTHERN A...|14.203721841332031|
|            BARBADOS|14.097540983606562|
|ST. PIERRE AND MI...| 13.90767123287671|
|          CAPE VERDE| 13.61522184300341|
|     TROMELIN ISLAND|13.005277777777751|
|          ST. HELENA|12.730518394648827|
|          MAURITANIA|12.723464912280699|
|          ANTARCTICA| 12.27517270608179|
|             SOMALIA|12.274880611270296|
|COCOS (KEELING) I...|12.054545454545453|
|            GUERNSEY|12.018019257221466|
|        MAN  ISLE OF|11.893424657534247|
|          MONTSERRAT|11.808813559322036|
|             ICELAND|11.734775540827808|
|      WESTERN SAHARA|11.647547169811322|
|           ST. LUCIA|11.575239398084811|
|              JERSEY|11.477260273972604|
+--------------------+------------