In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.functions import col, lit, count, split, avg

In [2]:
# Build (or reuse) the Spark session shared by every cell below.
# "local[*]" runs Spark in-process using all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Paytm_weather")
    .getOrCreate()
)

In [3]:
# Glob that matches every file in the weather data directory.
data_path = "/FileStore/tables/data/*"

# Schema-less first read, used only to look at the raw rows;
# df_weather is re-read with an explicit schema further down.
df_weather = spark.read.csv(path=data_path)
df_weather.limit(10).collect()

In [4]:
# Explicit schema for the weather files: three integer columns, twelve float
# measurement columns and the FRSHTT indicator kept as a string.
# Every column is nullable; the order below is the column order of the schema.
integer_fields = ["STN_NO", "WBAN", "YEARMODA"]
float_fields = [
    "TEMP", "DEWP", "SLP", "STP", "VISIB", "WDSP",
    "MXSPD", "GUST", "MAX", "MIN", "PRCP", "SNDP",
]

schema = StructType(
    [StructField(name, IntegerType(), True) for name in integer_fields]
    + [StructField(name, FloatType(), True) for name in float_fields]
    + [StructField("FRSHTT", StringType(), True)]
)

In [5]:
# Re-read the weather files with the explicit schema, treating the first
# line of each file as a header row.
df_weather = spark.read.csv(data_path, schema=schema, header=True)

df_weather.head(10)

In [6]:
# Country list: two nullable string columns, COUNTRY_ABBR and COUNTRY_FULL.
country_schema = StructType(
    [
        StructField(field_name, StringType(), True)
        for field_name in ("COUNTRY_ABBR", "COUNTRY_FULL")
    ]
)

# Read the country list with that schema; the first line is a header.
df_country = spark.read.csv(
    "/FileStore/tables/countrylist.csv",
    schema=country_schema,
    header=True,
)

df_country.head(10)

In [7]:
# Station list: two nullable string columns, STN_NO and COUNTRY_ABBR.
# NOTE(review): STN_NO is a string here but an IntegerType in the weather
# schema, so the later join on STN_NO relies on Spark's implicit cast —
# confirm this is intended.
station_schema = StructType(
    [
        StructField(field_name, StringType(), True)
        for field_name in ("STN_NO", "COUNTRY_ABBR")
    ]
)

# Read the station list with that schema; the first line is a header.
df_stations = spark.read.csv(
    "/FileStore/tables/stationlist.csv",
    schema=station_schema,
    header=True,
)

df_stations.head(10)

In [8]:
# Add the COUNTRY_FULL column to each station row. The left join keeps
# stations whose COUNTRY_ABBR has no match in the country list.
df_country_station = df_stations.join(df_country, "COUNTRY_ABBR", "left")

In [9]:
# Preview the first ten station/country rows.
df_country_station.head(10)

In [10]:
# Attach station and country columns to every weather row. The left join
# keeps weather rows whose STN_NO is absent from the station list.
df = df_weather.join(df_country_station, "STN_NO", "left")

In [11]:
# Preview the first ten joined rows.
df.limit(10).collect()

In [12]:
# Handle missing measurements.
# Readings at or above the thresholds below (TEMP >= 9999, WDSP >= 999) are
# treated as "missing". They are nulled out per column instead of dropping
# the whole row: dropping a row because WDSP is missing would also remove
# that day from the temperature and tornado questions, which do not use
# WDSP at all. Spark aggregates such as avg() and max() skip nulls, so each
# question still ignores exactly the values that are missing for it.

# select columns for questions
cols = ["YEARMODA", "COUNTRY_FULL", "TEMP", "WDSP", "FRSHTT"]

df = (
    df.select(cols)
    # when() without otherwise() yields null for non-matching rows
    .withColumn("TEMP", when(col("TEMP") < 9999, col("TEMP")))
    .withColumn("WDSP", when(col("WDSP") < 999, col("WDSP")))
)

In [13]:
# Preview the first ten rows of the trimmed frame.
df.head(10)

In [14]:
# QUESTION 1
# Which country had the hottest average mean temperature over the year?

# The question asks for the average of TEMP per country, so aggregate with
# avg(); max() would instead rank countries by their single hottest reading.
hottest_df = (
    df.groupBy("COUNTRY_FULL")
    .agg(avg("TEMP").alias("HOTTEST_AVG_MEAN_TEMPERATURE"))
)



In [15]:
# ANSWER: ten countries ranked by the HOTTEST_AVG_MEAN_TEMPERATURE column,
# highest value first.
hottest_df.orderBy(col("HOTTEST_AVG_MEAN_TEMPERATURE").desc()).take(10)

In [16]:
# QUESTION 2
# Which country had the most consecutive days of tornadoes/funnel cloud
# formations?

In [17]:
from pyspark.sql.functions import substring

# FRSHTT holds one indicator digit per phenomenon, in the order
# Fog, Rain, Snow, Hail, Thunder, Tornado/funnel cloud. substring() is
# 1-based, so the tornado indicator is position 6; position 5 is thunder.
# Assumes the values keep their leading zeros (the column is read as a
# string) — TODO confirm against the raw files.
df_tornado = df.withColumn('tornado', substring(df.FRSHTT, 6, 1))

In [18]:
# Collapse to one row per (country, day). Taking the maximum of the
# 'tornado' values keeps the largest flag seen for that country on that day.
df_tornado = df_tornado.groupBy("COUNTRY_FULL", "YEARMODA").agg(
    max("tornado").alias("tornado")
)


In [19]:
# Preview three per-day rows.
df_tornado.head(3)

In [20]:
from pyspark.sql import Window
from pyspark.sql import functions as f

# Streak detection ("gaps and islands") over calendar days.
#
# day_index is the number of days since 1970-01-01 for each YEARMODA value.
# Within one (country, tornado flag) partition ordered by date, the value
# day_index - row_number() stays constant while the dates advance one day at
# a time and changes as soon as a day is skipped. Using the calendar day
# rather than a second row number means two reporting days separated by
# missing dates are NOT counted as consecutive.
day_index = f.datediff(
    f.to_date(df_tornado.YEARMODA.cast("string"), "yyyyMMdd"),
    f.to_date(f.lit("1970-01-01")),
)

# Window per country and flag value, ordered by date
w2 = Window.partitionBy(df_tornado.COUNTRY_FULL, df_tornado.tornado).orderBy(df_tornado.YEARMODA)

# 'grp' identifies one unbroken run of days sharing the same flag value
res = df_tornado.withColumn('grp', day_index - f.row_number().over(w2))

# Window definition for streak: position of each day inside its run
w3 = Window.partitionBy(res.COUNTRY_FULL, res.tornado, res.grp).orderBy(res.YEARMODA)
run_position = f.row_number().over(w3)

# streak_0 counts days inside a no-tornado run, streak_1 inside a tornado run.
# Matching the flag explicitly (instead of "everything that is not the other
# value") keeps rows with a null flag at 0 in both columns.
streak_res = (
    res.withColumn('streak_0', f.when(res.tornado == 0, run_position).otherwise(0))
       .withColumn('streak_1', f.when(res.tornado == 1, run_position).otherwise(0))
)
streak_res.take(3)

In [21]:
# Largest streak_1 value per country, shown with the biggest value first.
longest_streaks = streak_res.groupBy("COUNTRY_FULL").agg(
    f.max("streak_1").alias("consecutive_tornado_days")
)
longest_streaks.orderBy(f.col("consecutive_tornado_days").desc()).show()


In [22]:
# QUESTION 3
# Which country had the second highest average mean wind speed over the year?

In [23]:
wind_df = (
    df.groupBy("COUNTRY_FULL")
    .agg(max("WDSP").alias("max_wind_speed"))
        )

In [24]:
wind_df.sort('max_wind_speed', ascending=False).take(10)