#Paytm Labs Data Challenge

####Problem Statement:
Using the global weather data, answer the following:
1. Which country had the hottest average mean temperature over the year?
2. Which country had the most consecutive days of tornadoes/funnel cloud
formations?
3. Which country had the second highest average mean wind speed over the year?

In [0]:
# Importing necessary packages
from pyspark.sql.functions import *
from pyspark.sql import Window
from pyspark.sql.types import *

In [0]:
# Constants
file_format = "csv"
file_header = "true"
file_path = "/FileStore/tables"
file_delimiter = ","
stations_path = file_path + "/stationlist.csv"
countries_path = file_path + "/countrylist.csv"
data_path = file_path + "/*.gz"

In [0]:
#Loading Stations
station_df = spark.read.format(file_format) \
  .options(header=file_header, delimiter = ',') \
  .options(inferSchema="true")\
  .load(stations_path) 

#Loading countries
country_df = spark.read.format(file_format) \
  .options(header=file_header, delimiter = ',') \
  .options(inferSchema="true")\
  .load(countries_path) 

station_df.printSchema()
country_df.printSchema()

In [0]:
#Schema for data DF
data_schema = StructType() \
      .add("STN---",IntegerType(),True) \
      .add("WBAN",IntegerType(),True) \
      .add("YEARMODA",IntegerType(),True) \
      .add("TEMP",DoubleType(),True) \
      .add("DEWP",DoubleType(),True) \
      .add("SLP",DoubleType(),True) \
      .add("STP",DoubleType(),True) \
      .add("VISIB",DoubleType(),True) \
      .add("WDSP",DoubleType(),True) \
      .add("MXSPD",DoubleType(),True) \
      .add("GUST",DoubleType(),True) \
      .add("MAX",DoubleType(),True) \
      .add("MIN",DoubleType(),True) \
      .add("PRCP",DoubleType(),True) \
      .add("SNDP",DoubleType(),True) \
      .add("FRSHTT",StringType(),True) 

#Loading data DF
data_df = spark.read.format(file_format) \
  .options(header=file_header, delimiter = ',') \
  .schema(data_schema)\
  .load(data_path) 

data_df.printSchema()

In [0]:
data_df.show()

In [0]:
#Not Caching - Using once
data_df.count()

In [0]:
#Joining stations DF with country DF while broadcasting country DF since it is smaller in size when compared to stations
station_country_df = station_df.join(country_df.hint("broadcast"), "COUNTRY_ABBR")
station_country_df.show()

In [0]:
#Joining data DF with station_country DF while broadcasting station_country DF since it is smaller in size when compared to data DF

#First converitng STN_NO to match data_df STN_NO
station_country_df = station_country_df.withColumn('STN_NO', station_country_df['STN_NO'].cast('int'))

#Making the join
global_weather_data_df = data_df.join(station_country_df.hint("broadcast"), station_country_df["STN_NO"] == data_df['STN---'])
global_weather_data_df.show()

In [0]:
#Caching for reuse
global_weather_data_df.cache().count()

In [0]:
#Splitting Second FRSHTT column for individual values 
split_frshtt = split(col('FRSHTT'), "")
global_weather_data_df = global_weather_data_df\
          .withColumn('Fog', split_frshtt.getItem(0)) \
          .withColumn('Rain or Drizzle', split_frshtt.getItem(1))\
          .withColumn('Snow or Ice Pellets', split_frshtt.getItem(2))\
          .withColumn('Hail', split_frshtt.getItem(3))\
          .withColumn('Thunder', split_frshtt.getItem(4))\
          .withColumn('Tornado or Funnel', split_frshtt.getItem(5))
      
global_weather_data_df.printSchema()  

In [0]:
global_weather_data_df.show()

#### Question 1: Which country had the hottest average mean temperature over the year?

In [0]:
#Check for any NULL values:
global_weather_data_df.filter(col('TEMP') == '9999.9').count()

In [0]:
#Creating the Temperature DF
temperature_df = global_weather_data_df.select('STN---', 'YEARMODA', 'COUNTRY_FULL', 'TEMP' )
temperature_df.show()

In [0]:
temperature_df.groupBy('COUNTRY_FULL')\
              .agg(avg('TEMP').alias('Avg'))\
              .createOrReplaceTempView('countryTemperature')


In [0]:
%sql
select ct.COUNTRY_FULL, ct.Avg 
from countryTemperature ct
where ct.Avg IN (select MAX(ctt.Avg) from countryTemperature ctt)

COUNTRY_FULL,Avg
DJIBOUTI,90.06114457831325


The country which had the hottest average mean temperature over the year is __DJIBOUTI__

#### Question 3: Which country had the second highest average mean wind speed over the year?

In [0]:
#Check for any NULL values
global_weather_data_df.filter(col('TEMP') == '999.9').count()

In [0]:
windspeed_df = global_weather_data_df.select('STN---', 'YEARMODA', 'COUNTRY_FULL', 'WDSP' )
windspeed_df.show()

In [0]:
windspeed_df.groupBy('COUNTRY_FULL')\
              .agg(avg('WDSP').alias('Avg'))\
              .createOrReplaceTempView('countryWindspeed')

In [0]:
%sql
select * from countryWindspeed

COUNTRY_FULL,Avg
SOUTH AFRICA,5.568036944632776
JAPAN,6.756455810670751
TANZANIA,5.190688184010396
BELARUS,153.37015143017527
LITHUANIA,25.48795405982908
UNITED KINGDOM,31.700467401221324
CZECH REPUBLIC,8.80699532741923
ESTONIA,49.47997024150183
CANADA,24.665389873002272
POLAND,8.217994430861172


In [0]:
%sql
select c.COUNTRY_FULL
from countryWindspeed c
where c.Avg IN (select MAX( cw.Avg )
                from countryWindspeed cw
                where cw.Avg NOT IN ( SELECT MAX( cww.Avg )
                                      FROM countryWindspeed cww
                                      )
                 )

COUNTRY_FULL
ARMENIA


The country which had the second highest average mean wind speed over the year is __ARMENIA__

#### Question 2: Which country had the most consecutive days of tornadoes/funnel cloud formations?

In [0]:
#Creating country with Tornado DF
countryTornado_df = global_weather_data_df.select('STN---', 'YEARMODA', 'COUNTRY_FULL', 'Tornado or Funnel' )
countryTornado_df.show()

In [0]:
from datetime import datetime

#Convert YEARMODA to yyyy-mm-dd format
func =  udf (lambda x: datetime.strptime(x, '%Y%m%d'), DateType())
countryTornado_df = countryTornado_df.withColumn('new_date', date_format(func(col('YEARMODA').astype('string')   ), 'yyyy-mm-dd'))

#Split the date into respective parts
split_date = split(col('new_date'), "-")
countryTornado_df = countryTornado_df.withColumn('year', split_date.getItem(0)) \
       .withColumn('month', split_date.getItem(1)) \
       .withColumn('day', split_date.getItem(2))
countryTornado_df.show()

In [0]:
w1 = Window.partitionBy('COUNTRY_FULL').orderBy('YEARMODA')
w2 = Window.partitionBy('COUNTRY_FULL', 'Tornado or Funnel').orderBy('YEARMODA')

res = countryTornado_df.withColumn('GROUP', row_number().over(w1)- row_number().over(w2))

#Window definition for streak to calculate the highest streaks
w3 = Window.partitionBy('COUNTRY_FULL', 'Tornado or Funnel', 'GROUP').orderBy('YEARMODA')
streak_res = res.withColumn('streak_0', when(col('Tornado or Funnel') == 1,0).otherwise(row_number().over(w3))) \
                .withColumn('streak_1', when(col('Tornado or Funnel') == 0,0).otherwise(row_number().over(w3)))
