In [1]:
import pyspark
from pyspark.sql import SparkSession

In [129]:
from pyspark.sql import functions as f
from pyspark.sql.window import Window
from pyspark.sql.types import *

In [2]:
spark = SparkSession.builder \
            .getOrCreate()

In [4]:
### Step 1 - Settng Up the Data ###

In [54]:
# 1. Load the global weather data into your big data technology of choice.

# Load Country List
country_list = spark.read.format("csv") \
        .option("header", "true") \
        .load(f"countrylist.csv")

# Station Lists
station_list = spark.read.format("csv") \
    .option("header", "true") \
    .load(f"stationlist.csv")

# Global Weather
global_weather = spark.read.format("csv") \
    .option("header", "true") \
    .load(f"./data/2019") \
    .withColumnRenamed('STN---', 'STN')

In [55]:
country_list_suf = country_list.select(*[f.col(column).alias(f"{column}_CL") for column in country_list.columns])
station_list_suf = station_list.select(*[f.col(column).alias(f"{column}_SL") for column in station_list.columns])

In [56]:
# 2. Join the stationlist.csv with the countrylist.csv to get the full 
# country name for each station number.
full_country_name = country_list_suf \
    .join(station_list_suf, country_list_suf.COUNTRY_ABBR_CL == station_list_suf.COUNTRY_ABBR_SL,
          'left') \
    .select(*['STN_NO_SL', 'COUNTRY_FULL_CL', 'COUNTRY_ABBR_CL', 'COUNTRY_ABBR_SL'])

full_country_name.show(n = 5)

+---------+-------------------+---------------+---------------+
|STN_NO_SL|    COUNTRY_FULL_CL|COUNTRY_ABBR_CL|COUNTRY_ABBR_SL|
+---------+-------------------+---------------+---------------+
|   789820|              ARUBA|             AA|             AA|
|   788620|ANTIGUA AND BARBUDA|             AC|             AC|
|   998441|ANTIGUA AND BARBUDA|             AC|             AC|
|   788610|ANTIGUA AND BARBUDA|             AC|             AC|
|   409794|        AFGHANISTAN|             AF|             AF|
+---------+-------------------+---------------+---------------+
only showing top 5 rows



In [58]:
# 3. Join the global weather data with the full country names by station number.
# Leaking : 4158416 (Weather) versus 4156323 After Join
weather_by_station = full_country_name \
    .join(global_weather, full_country_name.STN_NO_SL == global_weather.STN, 'left')

In [77]:
weather_by_station.printSchema()

root
 |-- STN_NO_SL: string (nullable = true)
 |-- COUNTRY_FULL_CL: string (nullable = true)
 |-- COUNTRY_ABBR_CL: string (nullable = true)
 |-- COUNTRY_ABBR_SL: string (nullable = true)
 |-- STN: string (nullable = true)
 |-- WBAN: string (nullable = true)
 |-- YEARMODA: string (nullable = true)
 |-- TEMP: string (nullable = true)
 |-- DEWP: string (nullable = true)
 |-- SLP: string (nullable = true)
 |-- STP: string (nullable = true)
 |-- VISIB: string (nullable = true)
 |-- WDSP: string (nullable = true)
 |-- MXSPD: string (nullable = true)
 |-- GUST: string (nullable = true)
 |-- MAX: string (nullable = true)
 |-- MIN: string (nullable = true)
 |-- PRCP: string (nullable = true)
 |-- SNDP: string (nullable = true)
 |-- FRSHTT: string (nullable = true)



In [106]:
### Step 2 - Questions ###

# NOTE : STATIONs without a Corresponding STN_NO in 'station_list' were intentionally left out...

# 1. Which country had the hottest average mean temperature over the year? # DJIBOUTI
weather_by_station \
    .withColumn("TEMP", weather_by_station['TEMP'].cast(FloatType())) \
    .filter(~f.col("TEMP").isNull()) \
    .groupby(*['COUNTRY_FULL_CL', ]) \
    .agg({"TEMP" : "avg"}) \
    .select(*['COUNTRY_FULL_CL', f.col('avg(TEMP)').alias('TEMP'), ]) \
    .orderBy(f.desc('TEMP')).show(n = 1)

+---------------+-----------------+
|COUNTRY_FULL_CL|             TEMP|
+---------------+-----------------+
|       DJIBOUTI|90.06114474836602|
+---------------+-----------------+
only showing top 1 row



In [135]:
# 2. Which country had the most consecutive days of tornadoes/funnel cloud formations?

# NOTE : Incomplete

dates_for_2019 = weather_by_station \
    .select([f.to_date(f.col('YEARMODA'), 'yyyyMMdd')]) \
    .distinct()

window = Window.partitionBy('COUNTRY_FULL_CL').orderBy('YEARMODA')

weather_by_station_tornado_funnel = weather_by_station \
    .select([f.to_date(f.col('YEARMODA'), 'yyyyMMdd').alias('YEARMODA'), 'FRSHTT', 'COUNTRY_FULL_CL', ]) \
    .filter(f.col('FRSHTT').endswith('1').alias('FRSHTT')) \
    .select(*['COUNTRY_FULL_CL', 'YEARMODA', 'FRSHTT', ]) \
    .distinct() \
    .orderBy(*['COUNTRY_FULL_CL', 'YEARMODA', 'FRSHTT'], asc = [True, True, True, ]) \
    .select(*['COUNTRY_FULL_CL', 'YEARMODA', 'FRSHTT', f.lag('YEARMODA', 1).over(window).alias('PREV_YEARMODA')])

In [136]:
weather_by_station_tornado_funnel.filter(f.col('PREV_YEARMODA').isNotNull() == True).count()

232

In [109]:
# 3. Which country had the second highest average mean wind speed over the year?
weather_by_station \
    .withColumn("WDSP", weather_by_station['WDSP'].cast(FloatType())) \
    .filter(~f.col('WDSP').isNull()) \
    .groupby(*['COUNTRY_FULL_CL', ]) \
    .agg({"WDSP" : "avg"}) \
    .select(*['COUNTRY_FULL_CL', f.col('avg(WDSP)').alias('WDSP')]) \
    .orderBy(f.desc("WDSP")) \
    .show(n = 2)

+---------------+------------------+
|COUNTRY_FULL_CL|              WDSP|
+---------------+------------------+
|          GABON|485.17948985044114|
|        ARMENIA| 457.3659429499847|
+---------------+------------------+
only showing top 2 rows



In [10]:
#### Data Discovery ####
# 288
country_list.count()
# 288
country_list.select(*['COUNTRY_ABBR', ]).distinct().count()

288

In [11]:
# 25 297
station_list.select(*['STN_NO', ]).distinct().count()
# 25 306
station_list.count()

25297

In [47]:
full_country_name \
    .groupby(*['STN_NO_SL', ]) \
    .agg(f.countDistinct(f.col('COUNTRY_ABBR_SL'))) \
    .orderBy(*[f.col('count(COUNTRY_ABBR_SL)').alias('COUNTRY_ABBR_SL'), ], asc = [False]) \
    .take(5)

[Row(STN_NO_SL=None, count(COUNTRY_ABBR_SL)=0),
 Row(STN_NO_SL='847570', count(COUNTRY_ABBR_SL)=1),
 Row(STN_NO_SL='726838', count(COUNTRY_ABBR_SL)=1),
 Row(STN_NO_SL='957560', count(COUNTRY_ABBR_SL)=1),
 Row(STN_NO_SL='704140', count(COUNTRY_ABBR_SL)=1)]

In [30]:
global_weather.take(5)

[Row(STN---='010260', WBAN='99999', YEARMODA='20190101', TEMP='26.1', DEWP='21.2', SLP='1001.9', STP='987.5', VISIB='20.6', WDSP='9.0', MXSPD='15.9', GUST='29.7', MAX='29.8', MIN='21.7*', PRCP='0.02G', SNDP='18.5', FRSHTT='001000'),
 Row(STN---='010260', WBAN='99999', YEARMODA='20190102', TEMP='24.9', DEWP='22.1', SLP='1020.1', STP='1005.5', VISIB='5.4', WDSP='5.6', MXSPD='13.6', GUST='22.1', MAX='27.1*', MIN='20.7', PRCP='0.48G', SNDP='22.8', FRSHTT='001000'),
 Row(STN---='010260', WBAN='99999', YEARMODA='20190103', TEMP='31.7', DEWP='29.1', SLP='1008.9', STP='994.7', VISIB='13.6', WDSP='11.6', MXSPD='21.4', GUST='49.5', MAX='37.4*', MIN='26.8*', PRCP='0.25G', SNDP='999.9', FRSHTT='011000'),
 Row(STN---='010260', WBAN='99999', YEARMODA='20190104', TEMP='32.9', DEWP='30.3', SLP='1011.4', STP='997.1', VISIB='15.8', WDSP='4.9', MXSPD='7.8', GUST='10.9', MAX='36.1', MIN='31.8', PRCP='0.52G', SNDP='999.9', FRSHTT='001000'),
 Row(STN---='010260', WBAN='99999', YEARMODA='20190105', TEMP='35.

In [None]:
weather_by_station.take(2)