# Step 1 - Setting Up the Data
## 1. Load the global weather data into your big data technology of choice.
## 2. Join the stationlist.csv with the countrylist.csv to get the full country name for each station number.
## 3. Join the global weather data with the full country names by station number.

In [36]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType,BooleanType,DoubleType
spark = SparkSession.builder \
    .master("local[1]") \
    .appName("PaytmWeatherChallenge") \
    .getOrCreate()

In [94]:
countries = spark.read.option("header",True).csv("countrylist.csv")
stations = spark.read.option("header",True).csv("stationlist.csv")
weather = spark.read.option("header", True).csv("data/2019/*.gz")
# countries.show(5)
# stations.show(5)
# weather.show(5)

In [85]:
stations_country = stations.join(countries,countries["COUNTRY_ABBR"] ==  stations["COUNTRY_ABBR"],"left")\
.select("STN_NO", "COUNTRY_FULL", stations.COUNTRY_ABBR)

global_weather = weather.join(stations_country, weather["STN---"] == stations_country["STN_NO"], "inner")

# Step 2 - Questions

## 1. Which country had the hottest average mean temperature over the year?


In [65]:
from pyspark.sql import functions as F
df1 = global_weather.groupBy("COUNTRY_FULL").agg(F.mean('TEMP').alias('mean_TEMP')).orderBy("mean_TEMP",ascending=False)
df1 = df1.limit(1).select("COUNTRY_FULL")
df1.show()

+------------+
|COUNTRY_FULL|
+------------+
|    DJIBOUTI|
+------------+



## 2. Which country had the most consecutive days of tornadoes/funnel cloud formations?

In [93]:
from pyspark.sql.window import Window

df2 = global_weather.select("YEARMODA", "FRSHTT", "COUNTRY_FULL").orderBy("COUNTRY_FULL","YEARMODA")
df2 = df2.na.drop()
df2 = df2.withColumn('Tornado', F.substring(df2.FRSHTT, 6,1))
w=Window.orderBy(F.lit(1))

df2 = df2.withColumn("row_id",F.row_number().over(w))
df2 = df2.withColumn("date", F.to_date("YEARMODA", 'yyyyMMdd'))
# df2 = df2.withColumn('substracted_dates',F.date_sub(df2.date,df2.row_id))
# df2.show()


## 3. Which country had the second highest average mean wind speed over the year?

In [64]:
df3 = global_weather.groupBy("COUNTRY_FULL").agg(F.mean('WDSP').alias('mean_WDSP')).orderBy("mean_WDSP",ascending=False)
df3 = df3.limit(2).orderBy("mean_WDSP").limit(1).select("COUNTRY_FULL")
df3.show()


+------------+
|COUNTRY_FULL|
+------------+
|     ARMENIA|
+------------+

