In [65]:
import os
import sys
import pyspark
from datetime import datetime
from functools import reduce
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import StructField
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType
from pyspark.sql.types import DateType
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import col, lit, count, split, avg, udf

In [2]:
spark = SparkSession.builder.master("local").appName("weather").getOrCreate()

In [3]:
sc = spark.sparkContext

In [4]:
pwd

'/Users/anirudh'

## Loading Data

In [6]:
df1 = spark.read.csv("./Desktop/paytmteam-de-weather-challenge-beb4fc53605c/data/2019/part-00000-890686c0-c142-4c69-a744-dfdc9eca7df4-c000.csv.gz", sep=',', header=True)

In [7]:
df2 = spark.read.csv("./Desktop/paytmteam-de-weather-challenge-beb4fc53605c/data/2019/part-00001-890686c0-c142-4c69-a744-dfdc9eca7df4-c000.csv.gz", sep=',', header=True)

In [8]:
df3 = spark.read.csv("./Desktop/paytmteam-de-weather-challenge-beb4fc53605c/data/2019/part-00002-890686c0-c142-4c69-a744-dfdc9eca7df4-c000.csv.gz", sep=',', header=True)

In [9]:
df4 = spark.read.csv("./Desktop/paytmteam-de-weather-challenge-beb4fc53605c/data/2019/part-00003-890686c0-c142-4c69-a744-dfdc9eca7df4-c000.csv.gz", sep=',', header=True)

In [10]:
df5 = spark.read.csv("./Desktop/paytmteam-de-weather-challenge-beb4fc53605c/data/2019/part-00004-890686c0-c142-4c69-a744-dfdc9eca7df4-c000.csv.gz", sep=',', header=True)

In [11]:
df1.count()

897202

Combining all the dataframes

In [12]:
dfs = [df1,df2,df3, df4, df5]
df = reduce(DataFrame.unionAll, dfs)

In [13]:
df.count()

4158416

In [14]:
df.show(5)

+------+-----+--------+----+----+------+------+-----+----+-----+----+-----+-----+-----+-----+------+
|STN---| WBAN|YEARMODA|TEMP|DEWP|   SLP|   STP|VISIB|WDSP|MXSPD|GUST|  MAX|  MIN| PRCP| SNDP|FRSHTT|
+------+-----+--------+----+----+------+------+-----+----+-----+----+-----+-----+-----+-----+------+
|010260|99999|20190101|26.1|21.2|1001.9| 987.5| 20.6| 9.0| 15.9|29.7| 29.8|21.7*|0.02G| 18.5|001000|
|010260|99999|20190102|24.9|22.1|1020.1|1005.5|  5.4| 5.6| 13.6|22.1|27.1*| 20.7|0.48G| 22.8|001000|
|010260|99999|20190103|31.7|29.1|1008.9| 994.7| 13.6|11.6| 21.4|49.5|37.4*|26.8*|0.25G|999.9|011000|
|010260|99999|20190104|32.9|30.3|1011.4| 997.1| 15.8| 4.9|  7.8|10.9| 36.1| 31.8|0.52G|999.9|001000|
|010260|99999|20190105|35.5|33.0|1015.7|1001.4| 12.0|10.4| 13.6|21.0|38.5*| 32.7|0.02G| 23.6|010000|
+------+-----+--------+----+----+------+------+-----+----+-----+----+-----+-----+-----+-----+------+
only showing top 5 rows



In [15]:
station_df = spark.read.csv("./Desktop/paytmteam-de-weather-challenge-beb4fc53605c/stationlist.csv", sep=',', header=True)

In [16]:
station_df.show(5)

+------+------------+
|STN_NO|COUNTRY_ABBR|
+------+------------+
|012240|          NO|
|020690|          SW|
|020870|          SW|
|021190|          SW|
|032690|          UK|
+------+------------+
only showing top 5 rows



In [17]:
country_df = spark.read.csv("./Desktop/paytmteam-de-weather-challenge-beb4fc53605c/countrylist.csv", sep=',', header=True)

In [18]:
country_df.show(5)

+------------+-------------------+
|COUNTRY_ABBR|       COUNTRY_FULL|
+------------+-------------------+
|          AA|              ARUBA|
|          AC|ANTIGUA AND BARBUDA|
|          AF|        AFGHANISTAN|
|          AG|            ALGERIA|
|          AI|   ASCENSION ISLAND|
+------------+-------------------+
only showing top 5 rows



In [19]:
country_station_df = country_df.join(station_df,['COUNTRY_ABBR'])

In [20]:
country_station_df.show(5)

+------------+--------------+------+
|COUNTRY_ABBR|  COUNTRY_FULL|STN_NO|
+------------+--------------+------+
|          NO|        NORWAY|012240|
|          SW|        SWEDEN|020690|
|          SW|        SWEDEN|020870|
|          SW|        SWEDEN|021190|
|          UK|UNITED KINGDOM|032690|
+------------+--------------+------+
only showing top 5 rows



Renaming column to match the weather dataframe

In [25]:
country_station_df = country_station_df.select(col("STN_NO").alias("STN---"), "COUNTRY_FULL")

In [26]:
country_station_df.show(5)

+------+--------------+
|STN---|  COUNTRY_FULL|
+------+--------------+
|012240|        NORWAY|
|020690|        SWEDEN|
|020870|        SWEDEN|
|021190|        SWEDEN|
|032690|UNITED KINGDOM|
+------+--------------+
only showing top 5 rows



Performing JOIN on the dataframes

In [32]:
weather_df_station_country = country_station_df.join(df, ["STN---"])

In [47]:
weather_df_station_country.count()

4143167

## Q1

In [49]:
weather_df_station_country_filter = weather_df_station_country.filter(weather_df_station_country.TEMP != 9999.9)

In [50]:
weather_df_station_country_filter.count()

4143167

In [33]:
weather_df_station_country.show(5)

+------+------------+-----+--------+----+----+------+------+-----+----+-----+-----+-----+-----+-----+-----+------+
|STN---|COUNTRY_FULL| WBAN|YEARMODA|TEMP|DEWP|   SLP|   STP|VISIB|WDSP|MXSPD| GUST|  MAX|  MIN| PRCP| SNDP|FRSHTT|
+------+------------+-----+--------+----+----+------+------+-----+----+-----+-----+-----+-----+-----+-----+------+
|010875|      NORWAY|99999|20190101|41.1|30.1|9999.9|9999.9|  5.9|46.7| 59.1| 74.0|44.6*|37.4*|99.99|999.9|011010|
|010875|      NORWAY|99999|20190102|40.5|29.0|9999.9|9999.9|  6.2|20.5| 32.1| 44.1|41.0*|37.4*|99.99|999.9|010000|
|010875|      NORWAY|99999|20190103|43.0|36.6|9999.9|9999.9|  6.1|13.5| 21.0|999.9|44.6*|41.0*|0.00I|999.9|000000|
|010875|      NORWAY|99999|20190104|46.7|44.4|9999.9|9999.9|  5.8|27.4| 33.0| 40.0|48.2*|42.8*|99.99|999.9|010000|
|010875|      NORWAY|99999|20190105|46.5|44.1|9999.9|9999.9|  6.1|18.3| 25.1|999.9|48.2*|44.6*|99.99|999.9|010000|
+------+------------+-----+--------+----+----+------+------+-----+----+-----+---

In [43]:
country_avg_temp_df = weather_df_station_country.groupBy("COUNTRY_FULL").agg(F.mean('TEMP'))

In [44]:
country_avg_temp_df.show(5)

+------------+------------------+
|COUNTRY_FULL|         avg(TEMP)|
+------------+------------------+
|SOUTH AFRICA| 65.28051319333144|
|     ARMENIA| 48.81782141533902|
|       BURMA|  80.8630640015626|
|    CAMBODIA| 83.49766803840879|
|       JAPAN|60.230640287611116|
+------------+------------------+
only showing top 5 rows



In [61]:
country_avg_temp_df.orderBy('avg(TEMP)', ascending=False).show(5)

+------------+-----------------+
|COUNTRY_FULL|        avg(TEMP)|
+------------+-----------------+
|    DJIBOUTI|90.06114457831325|
|        CHAD|87.36099706744866|
|       NIGER|85.06022291247945|
|       SUDAN| 84.4549418604651|
| EL SALVADOR|84.44045944678854|
+------------+-----------------+
only showing top 5 rows



In [None]:
# DJIBOUTI is the answer to Q1

## Q2

In [None]:
weather_df_station_country

In [62]:
split_col = pyspark.sql.functions.split(weather_df_station_country['FRSHTT'], '')

In [63]:
weather_df_station_country_tornado = weather_df_station_country.withColumn('TORNADO', split_col.getItem(5))

In [64]:
weather_df_station_country_tornado.show(5)

+------+------------+-----+--------+----+----+------+------+-----+----+-----+-----+-----+-----+-----+-----+------+-------+
|STN---|COUNTRY_FULL| WBAN|YEARMODA|TEMP|DEWP|   SLP|   STP|VISIB|WDSP|MXSPD| GUST|  MAX|  MIN| PRCP| SNDP|FRSHTT|TORNADO|
+------+------------+-----+--------+----+----+------+------+-----+----+-----+-----+-----+-----+-----+-----+------+-------+
|010875|      NORWAY|99999|20190101|41.1|30.1|9999.9|9999.9|  5.9|46.7| 59.1| 74.0|44.6*|37.4*|99.99|999.9|011010|      0|
|010875|      NORWAY|99999|20190102|40.5|29.0|9999.9|9999.9|  6.2|20.5| 32.1| 44.1|41.0*|37.4*|99.99|999.9|010000|      0|
|010875|      NORWAY|99999|20190103|43.0|36.6|9999.9|9999.9|  6.1|13.5| 21.0|999.9|44.6*|41.0*|0.00I|999.9|000000|      0|
|010875|      NORWAY|99999|20190104|46.7|44.4|9999.9|9999.9|  5.8|27.4| 33.0| 40.0|48.2*|42.8*|99.99|999.9|010000|      0|
|010875|      NORWAY|99999|20190105|46.5|44.1|9999.9|9999.9|  6.1|18.3| 25.1|999.9|48.2*|44.6*|99.99|999.9|010000|      0|
+------+--------

In [66]:
# UDF to convert string to date
func =  udf (lambda x: datetime.strptime(x, '%Y%m%d'), DateType())

df = weather_df_station_country_tornado.withColumn('DATE', date_format(func(col('YEARMODA')), 'MM-dd-yyy'))

In [67]:
df.show(3)

+------+------------+-----+--------+----+----+------+------+-----+----+-----+-----+-----+-----+-----+-----+------+-------+----------+
|STN---|COUNTRY_FULL| WBAN|YEARMODA|TEMP|DEWP|   SLP|   STP|VISIB|WDSP|MXSPD| GUST|  MAX|  MIN| PRCP| SNDP|FRSHTT|TORNADO|      DATE|
+------+------------+-----+--------+----+----+------+------+-----+----+-----+-----+-----+-----+-----+-----+------+-------+----------+
|010875|      NORWAY|99999|20190101|41.1|30.1|9999.9|9999.9|  5.9|46.7| 59.1| 74.0|44.6*|37.4*|99.99|999.9|011010|      0|01-01-2019|
|010875|      NORWAY|99999|20190102|40.5|29.0|9999.9|9999.9|  6.2|20.5| 32.1| 44.1|41.0*|37.4*|99.99|999.9|010000|      0|01-02-2019|
|010875|      NORWAY|99999|20190103|43.0|36.6|9999.9|9999.9|  6.1|13.5| 21.0|999.9|44.6*|41.0*|0.00I|999.9|000000|      0|01-03-2019|
+------+------------+-----+--------+----+----+------+------+-----+----+-----+-----+-----+-----+-----+-----+------+-------+----------+
only showing top 3 rows



In [68]:
# Not Sure how to get consicutive days and running out of time, instead will use total days

In [73]:
total_tornado_df = weather_df_station_country_tornado.groupBy("COUNTRY_FULL").agg(F.sum('TORNADO'))

In [74]:
total_tornado_df.orderBy('sum(TORNADO)', ascending=False).show(5)

+--------------+------------+
|  COUNTRY_FULL|sum(TORNADO)|
+--------------+------------+
| UNITED STATES|        34.0|
|         ITALY|        31.0|
|CAYMAN ISLANDS|        28.0|
|   BAHAMAS THE|        17.0|
|        CANADA|        13.0|
+--------------+------------+
only showing top 5 rows



In [75]:
# So my best guess for Q2 would be United States

## Q3

In [81]:
weather_df_station_country_filter_wind = weather_df_station_country.filter(weather_df_station_country.WDSP != 9999.9)

In [82]:
weather_df_station_country_filter_wind.count()

4143167

In [83]:
# So all wind speeds are fine

In [84]:
country_avg_wdsp_df = weather_df_station_country.groupBy("COUNTRY_FULL").agg(F.mean('WDSP'))

In [85]:
country_avg_wdsp_df.orderBy('avg(WDSP)', ascending=False).show(5)

+------------+------------------+
|COUNTRY_FULL|         avg(WDSP)|
+------------+------------------+
|       GABON|485.17947805456754|
|     ARMENIA| 457.3659318266033|
|    ETHIOPIA|441.25152788388027|
|   VENEZUELA|351.98439667128997|
|      LATVIA| 338.4731395914387|
+------------+------------------+
only showing top 5 rows



In [None]:
# ARMENIA is the answer to Q3