# Setup environment and load in jupyter

If you already have a python environment with pyspark , please use that and do not run the next 3 steps. Please directly move on to creating a spark context


After running the setup-env script the kernel "spark-env" should show up in jupyter list of 
kernels. Please change the kernel before moving forward



In [None]:
!pwd

In [None]:
!ls -la

In [None]:
%%bash
sh setup-env.sh

# Create spark session

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('test').getOrCreate()

# Import packages

In [2]:
from pyspark.sql.window import Window
from pyspark.sql.types import *
import pyspark.sql.functions as F
import glob

# Read data

In [3]:
## create list of all gz files 
files_2019 = glob.glob("./data/2019/*.gz")

## changing column name of station number for equi joins 
weather = spark.read.option("header","true").csv(files_2019)\
                .withColumnRenamed("STN---","STN_NO")

countryList= spark.read.option("header","true").csv("countrylist.csv")

stationList= spark.read.option("header","true").csv("stationlist.csv")

# Join station list with country list

In [4]:
station_country = stationList.join(countryList,"COUNTRY_ABBR","left").select("STN_NO","COUNTRY_FULL")

# Join weather data with station number to get country names 

In [5]:
weather = weather.join(station_country,"STN_NO","left")

# Q1 Which country had the hottest average mean temperature over the year?

In [6]:
weather.printSchema()

root
 |-- STN_NO: string (nullable = true)
 |-- WBAN: string (nullable = true)
 |-- YEARMODA: string (nullable = true)
 |-- TEMP: string (nullable = true)
 |-- DEWP: string (nullable = true)
 |-- SLP: string (nullable = true)
 |-- STP: string (nullable = true)
 |-- VISIB: string (nullable = true)
 |-- WDSP: string (nullable = true)
 |-- MXSPD: string (nullable = true)
 |-- GUST: string (nullable = true)
 |-- MAX: string (nullable = true)
 |-- MIN: string (nullable = true)
 |-- PRCP: string (nullable = true)
 |-- SNDP: string (nullable = true)
 |-- FRSHTT: string (nullable = true)
 |-- COUNTRY_FULL: string (nullable = true)



In [7]:
weather.withColumn("year",F.substring(F.col("YEARMODA"),1,4)).select("year").distinct().show()

+----+
|year|
+----+
|2020|
|2019|
+----+



In [8]:
### Filtering for the 2019
### convert to float
### remove nulls
weather_q1 = weather.withColumn("year",F.substring(F.col("YEARMODA"),1,4))\
                    .filter(F.col("year")=="2019")\
                    .select("COUNTRY_FULL","YEARMODA","Temp")\
                    .withColumn("Temp",F.col("Temp").cast(FloatType()))\
                    .filter(F.col("Temp")!= 9999.9)\
                    .distinct()

In [9]:
# viewing data for random country
weather_q1.filter(F.col("COUNTRY_FULL")=="ARMENIA").orderBy(F.desc("YEARMODA")).show(10,False)

+------------+--------+----+
|COUNTRY_FULL|YEARMODA|Temp|
+------------+--------+----+
|ARMENIA     |20191231|22.6|
|ARMENIA     |20191231|34.1|
|ARMENIA     |20191231|29.5|
|ARMENIA     |20191231|15.2|
|ARMENIA     |20191231|21.8|
|ARMENIA     |20191231|35.8|
|ARMENIA     |20191231|24.5|
|ARMENIA     |20191231|36.5|
|ARMENIA     |20191231|35.4|
|ARMENIA     |20191231|18.6|
+------------+--------+----+
only showing top 10 rows



There is an inconsistency , there are different mean temp for same day. Will epxlore this later

We are dropping duplicates and just keeping one for now

In [10]:
weather_q1 = weather_q1.dropDuplicates(subset=["COUNTRY_FULL","YEARMODA"])

In [11]:
# viewing data for random country
weather_q1.filter(F.col("COUNTRY_FULL")=="ARMENIA").orderBy(F.desc("YEARMODA")).show(10,False)

+------------+--------+----+
|COUNTRY_FULL|YEARMODA|Temp|
+------------+--------+----+
|ARMENIA     |20191231|18.6|
|ARMENIA     |20191230|25.8|
|ARMENIA     |20191229|24.0|
|ARMENIA     |20191228|40.6|
|ARMENIA     |20191227|43.1|
|ARMENIA     |20191226|28.9|
|ARMENIA     |20191225|40.4|
|ARMENIA     |20191224|32.0|
|ARMENIA     |20191223|35.3|
|ARMENIA     |20191222|34.9|
+------------+--------+----+
only showing top 10 rows



In [12]:
#create window function
windowspec_q1 = Window.orderBy(F.desc("avg(Temp)"))

### calculating hottest temp
answer_q1 = weather_q1.groupBy("COUNTRY_FULL").avg("Temp")\
                .withColumn("rn", F.dense_rank().over(windowspec_q1))\
                .filter(F.col("rn")==1)

In [13]:
answer_q1.show()

+------------+-----------------+---+
|COUNTRY_FULL|        avg(Temp)| rn|
+------------+-----------------+---+
|    DJIBOUTI|90.06114474836602|  1|
+------------+-----------------+---+



# Q2 Which country had the most consecutive days of tornadoes/funnel cloud formations?

In [14]:
weather.printSchema()

root
 |-- STN_NO: string (nullable = true)
 |-- WBAN: string (nullable = true)
 |-- YEARMODA: string (nullable = true)
 |-- TEMP: string (nullable = true)
 |-- DEWP: string (nullable = true)
 |-- SLP: string (nullable = true)
 |-- STP: string (nullable = true)
 |-- VISIB: string (nullable = true)
 |-- WDSP: string (nullable = true)
 |-- MXSPD: string (nullable = true)
 |-- GUST: string (nullable = true)
 |-- MAX: string (nullable = true)
 |-- MIN: string (nullable = true)
 |-- PRCP: string (nullable = true)
 |-- SNDP: string (nullable = true)
 |-- FRSHTT: string (nullable = true)
 |-- COUNTRY_FULL: string (nullable = true)



In [15]:
weather.select("FRSHTT").show(2,False)

+------+
|FRSHTT|
+------+
|011010|
|010000|
+------+
only showing top 2 rows



In [16]:
### no mention of year, therefore we do nopt filter for 2019
### create new column for 6th digit (Tornado/Funnel cloud)
weather_q2 = weather.withColumn("Tornado_Funnel", F.substring(F.col("FRSHTT"),6,1))\
                    .withColumn("YEARMODA", F.from_unixtime(F.unix_timestamp('YEARMODA', 'yyyyMMdd')).cast("timestamp"))\
                    .select("COUNTRY_FULL","YEARMODA","FRSHTT","Tornado_Funnel")

In [17]:
# viewing data for random country
weather_q2.filter(F.col("COUNTRY_FULL")=="ARMENIA").distinct().orderBy(F.desc("YEARMODA")).show(10,False)

+------------+-------------------+------+--------------+
|COUNTRY_FULL|YEARMODA           |FRSHTT|Tornado_Funnel|
+------------+-------------------+------+--------------+
|ARMENIA     |2019-12-31 00:00:00|000000|0             |
|ARMENIA     |2019-12-31 00:00:00|100000|0             |
|ARMENIA     |2019-12-30 00:00:00|000000|0             |
|ARMENIA     |2019-12-30 00:00:00|100000|0             |
|ARMENIA     |2019-12-29 00:00:00|100000|0             |
|ARMENIA     |2019-12-29 00:00:00|001000|0             |
|ARMENIA     |2019-12-29 00:00:00|010000|0             |
|ARMENIA     |2019-12-29 00:00:00|000000|0             |
|ARMENIA     |2019-12-28 00:00:00|000000|0             |
|ARMENIA     |2019-12-28 00:00:00|100000|0             |
+------------+-------------------+------+--------------+
only showing top 10 rows



There is an inconsistency, for example day 20191231 in armenia could be foggy and not foggy, please look at values 1 and 2 in above sample

we are dropping duplicates for now and just keeping one

In [18]:
weather_q2 = weather_q2.dropDuplicates(subset=["COUNTRY_FULL","YEARMODA"]).filter(F.col("Tornado_Funnel")==1)

In [19]:
# viewing data for random country
weather_q2.distinct().orderBy(F.col("COUNTRY_FULL"),F.desc("YEARMODA")).show(10,False)

+--------------+-------------------+------+--------------+
|COUNTRY_FULL  |YEARMODA           |FRSHTT|Tornado_Funnel|
+--------------+-------------------+------+--------------+
|ARUBA         |2019-09-23 00:00:00|010011|1             |
|BAHAMAS THE   |2019-08-29 00:00:00|010011|1             |
|BAHAMAS THE   |2019-07-04 00:00:00|010011|1             |
|BAHAMAS THE   |2019-02-25 00:00:00|000001|1             |
|BURKINA FASO  |2019-08-13 00:00:00|000001|1             |
|CAMEROON      |2019-08-13 00:00:00|010011|1             |
|CAYMAN ISLANDS|2019-12-21 00:00:00|000001|1             |
|CAYMAN ISLANDS|2019-11-12 00:00:00|010011|1             |
|CAYMAN ISLANDS|2019-11-08 00:00:00|010001|1             |
|CAYMAN ISLANDS|2019-11-03 00:00:00|000001|1             |
+--------------+-------------------+------+--------------+
only showing top 10 rows



In [20]:
### create window function
windowspec_q2 = Window.partitionBy("COUNTRY_FULL").orderBy(F.col("YEARMODA"))

In [21]:
weather_q2.printSchema()

root
 |-- COUNTRY_FULL: string (nullable = true)
 |-- YEARMODA: timestamp (nullable = true)
 |-- FRSHTT: string (nullable = true)
 |-- Tornado_Funnel: string (nullable = true)



In [22]:
weather_q2=weather_q2.withColumn("rn",F.row_number().over(windowspec_q2))\
                     .withColumn("groups",F.expr("date_sub(YEARMODA,rn)"))

In [23]:
windowspec_q2_1 = Window.partitionBy("COUNTRY_FULL","groups")

In [24]:
weather_q2_final = weather_q2.withColumn("counts",F.count("COUNTRY_FULL").over(windowspec_q2_1))

In [25]:
windowspec_q2_2 = Window.orderBy(F.desc("counts"))

In [26]:
answer_q2 = weather_q2_final.withColumn("ranks",F.dense_rank().over(windowspec_q2_2))\
            .filter(F.col("ranks")==1)

In [27]:
answer_q2.show()

+--------------+-------------------+------+--------------+---+----------+------+-----+
|  COUNTRY_FULL|           YEARMODA|FRSHTT|Tornado_Funnel| rn|    groups|counts|ranks|
+--------------+-------------------+------+--------------+---+----------+------+-----+
|CAYMAN ISLANDS|2019-10-31 00:00:00|010001|             1| 19|2019-10-12|     2|    1|
|CAYMAN ISLANDS|2019-11-01 00:00:00|000001|             1| 20|2019-10-12|     2|    1|
+--------------+-------------------+------+--------------+---+----------+------+-----+



# Q3 Which country had the second highest average mean wind speed over the year?

In [28]:
### Filtering for the 2019
### convert to float
### remove nulls
weather_q3 = weather.withColumn("year",F.substring(F.col("YEARMODA"),1,4))\
                    .filter(F.col("year")=="2019")\
                    .select("COUNTRY_FULL","YEARMODA","WDSP")\
                    .withColumn("WDSP",F.col("WDSP").cast(FloatType()))\
                    .filter(F.col("WDSP")!= "999.9")\
                    .distinct()

In [29]:
# viewing data for random country
weather_q3.filter(F.col("COUNTRY_FULL")=="ARMENIA").orderBy(F.desc("YEARMODA")).show(10,False)

+------------+--------+----+
|COUNTRY_FULL|YEARMODA|WDSP|
+------------+--------+----+
|ARMENIA     |20191231|3.4 |
|ARMENIA     |20191231|4.4 |
|ARMENIA     |20191231|1.9 |
|ARMENIA     |20191231|2.3 |
|ARMENIA     |20191231|2.9 |
|ARMENIA     |20191231|1.3 |
|ARMENIA     |20191231|2.4 |
|ARMENIA     |20191230|3.9 |
|ARMENIA     |20191230|1.9 |
|ARMENIA     |20191230|2.7 |
+------------+--------+----+
only showing top 10 rows



There is an inconsistency , there are different mean wind speed for same day. Will epxlore this later

We are dropping duplicates and just keeping one for now

In [30]:
weather_q3 = weather_q3.dropDuplicates(subset=["COUNTRY_FULL","YEARMODA"])

In [31]:
# viewing data for random country
weather_q3.filter(F.col("COUNTRY_FULL")=="ARMENIA").orderBy(F.desc("YEARMODA")).show(10,False)

+------------+--------+----+
|COUNTRY_FULL|YEARMODA|WDSP|
+------------+--------+----+
|ARMENIA     |20191231|1.9 |
|ARMENIA     |20191230|2.7 |
|ARMENIA     |20191229|2.4 |
|ARMENIA     |20191228|2.9 |
|ARMENIA     |20191227|3.4 |
|ARMENIA     |20191226|3.2 |
|ARMENIA     |20191225|1.9 |
|ARMENIA     |20191224|5.8 |
|ARMENIA     |20191223|2.9 |
|ARMENIA     |20191222|2.2 |
+------------+--------+----+
only showing top 10 rows



In [32]:
#create window function
windowspec_q3 = Window.orderBy(F.desc("avg(WDSP)"))

### calculating second highest average wind speed
answer_q3 = weather_q3.groupBy("COUNTRY_FULL").avg("WDSP")\
                .withColumn("rn", F.dense_rank().over(windowspec_q3))\
                .filter(F.col("rn")==2)

In [33]:
answer_q3.show()

+------------+------------------+---+
|COUNTRY_FULL|         avg(WDSP)| rn|
+------------+------------------+---+
|       ARUBA|15.981917790190815|  2|
+------------+------------------+---+



In [34]:
spark.stop()