In [None]:
from pathlib import Path
import findspark
from pprint import pprint

findspark.init()

import pyspark
from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as F
import pandas as pd
import numpy as np

# Reuse the active Spark session if there is one, otherwise start a new one.
spark = SparkSession.builder.getOrCreate()

# Every entry exactly one directory level below ./weather_data.
# Sorted so the input order is reproducible (glob order is filesystem-dependent).
weather_paths = sorted(str(p.resolve()) for p in Path("weather_data").glob("*/*"))
if not weather_paths:
    # Fail early with a readable message instead of an obscure Spark read error.
    raise FileNotFoundError("No input files matched 'weather_data/*/*'")

rows = spark.read.csv(weather_paths, header=True, inferSchema=True)
# NOTE: toPandas() materialises the full dataset in driver memory.
pd_df = rows.toPandas()
# registerTempTable() is deprecated; createOrReplaceTempView() is the supported
# equivalent and makes this cell safely re-runnable.
rows.createOrReplaceTempView("weather")


In [None]:

# Distinct calendar years present in the "weather" view, in ascending order.
years = (
    spark.sql("select distinct year(date) as year from weather")
    .orderBy('year')
    .collect()
)
# One lazily filtered DataFrame per year, keyed by the year value.
df_dict = {row.year: rows.filter(F.year("date") == row.year) for row in years}


# Task 1 
* Find the hottest day (column MAX) for each year, and provide the corresponding station code, station name and the date (columns STATION, NAME, DATE).   > There should be 13 results.


In [None]:
# Value assumed to mark a missing MAX reading (NOAA GSOD convention) — TODO confirm
# against the dataset documentation. Without this filter a sentinel would always
# win a descending sort.
MAX_MISSING = 9999.9

max_temp = []
for key, df_weather in df_dict.items():
    # first() on a sorted frame fetches a single row, instead of collecting the
    # whole year to the driver just to read element [0].
    hottest = (
        df_weather.select("date", "max", "station", "name")
        .filter(F.col("max").isNotNull() & (F.col("max") != MAX_MISSING))
        .orderBy(F.desc("max"))
        .first()
    )
    # first() returns None for a year without usable readings; skip it rather
    # than fail with an IndexError.
    if hottest is not None:
        max_temp.append(hottest)
max_df = spark.createDataFrame(max_temp)
# Show the full date (requested by the task) alongside the year.
max_df.select(F.year("date").alias("year"), "date", "max", "station", "name").orderBy(F.asc("date")).show()

## Task 2:

* Find the coldest day (column MIN) for the month of January across all years (2010–2022), and provide the corresponding station code, station name and the date (columns STATION, NAME, DATE). > There should be 1 result.

In [None]:

min_temp = []
for key, df_weather in df_dict.items():
    # Ascending sorts place NULLs first in Spark, so null readings are removed
    # up front; otherwise a row with no MIN value could be reported as coldest.
    coldest_january = (
        df_weather.select("date", "min", "station", "name")
        .filter((F.month("date") == 1) & F.col("min").isNotNull())
        .orderBy(F.asc("min"))
        .first()  # one row only, not the whole month collected to the driver
    )
    # A year without January data yields None; skip it instead of raising.
    if coldest_january is not None:
        min_temp.append(coldest_january)
min_df = spark.createDataFrame(min_temp)
# The overall coldest January day is the minimum of the per-year candidates.
min_df.select("date", "min", "station", "name").orderBy(F.asc("min")).show(1)


## Task 3
* Find the maximum and minimum precipitation (column PRCP) for the year 2015, and provide the corresponding station code, station name and the date (columns STATION, NAME, DATE). > There should be 2 results. Any max or min would do; just choose one of each.


In [None]:
# Value assumed to mark a missing PRCP reading (NOAA GSOD convention) — TODO confirm
# against the dataset documentation.
PRCP_MISSING = 99.99

# Usable 2015 precipitation readings: nulls would sort first in the ascending
# query and the sentinel would sort first in the descending one.
prcp_2015 = (
    df_dict[2015]
    .select("date", "prcp", "station", "name")
    .filter(F.col("prcp").isNotNull() & (F.col("prcp") != PRCP_MISSING))
)
# Names now match their content: *_max_* holds the maximum row, *_min_* the minimum.
# first() fetches a single row instead of collecting the whole year.
max_min_max_filter = prcp_2015.orderBy(F.desc("prcp")).first()
max_min_min_filter = prcp_2015.orderBy(F.asc("prcp")).first()
if max_min_max_filter is None or max_min_min_filter is None:
    raise ValueError("No usable PRCP readings found for 2015")
max_min_df = spark.createDataFrame([max_min_max_filter, max_min_min_filter])
max_min_df.show()


## Task 4
* Compute the percentage of missing values for wind gust (column GUST) for the year 2019. > There should be 1 result.

In [None]:
# Value this notebook treats as "no gust reading".
GUST_MISSING = 999.9

# One pass over the 2019 data: total rows, and rows whose gust is null or the
# missing-value sentinel. No sort is needed just to count.
gust_counts = df_dict[2019].agg(
    F.count(F.lit(1)).alias("total"),
    F.count(
        F.when(F.col("gust").isNull() | (F.col("gust") == GUST_MISSING), 1)
    ).alias("missing"),
).first()
gust_total = gust_counts["total"]
gust_missing = gust_counts["missing"]
if gust_total:
    print("Percentage of missing gusts is: " + format(gust_missing/gust_total, ".2%"))
else:
    # Avoid a ZeroDivisionError when the year has no rows at all.
    print("No observations found for 2019; missing-gust percentage is undefined.")

## Task 5
* Find the mean, median, mode and standard deviation of the Temperature (column TEMP) for each month for the year 2020. > There should be 12 results, one for each month with 4 values for each result(row).

In [None]:
from pyspark.sql.types import FloatType
import statistics as st

def find_median(val_list):
    """Return the median of ``val_list`` rounded to two decimal places.

    Args:
        val_list: A sequence of numeric values (for example the list produced
            by ``collect_list`` for one group).

    Returns:
        The median as a ``float`` rounded to 2 decimals, or ``None`` when the
        input is empty or cannot be reduced to a number.
    """
    try:
        # np.median([]) yields NaN (with a RuntimeWarning) instead of failing;
        # report "no data" explicitly so empty input maps to None, the same
        # result find_mode gives for empty input.
        if np.size(val_list) == 0:
            return None
        return round(float(np.median(val_list)), 2)
    except (TypeError, ValueError):
        # Non-numeric or otherwise unusable input: best-effort "no value".
        return None
    
def find_mode(val_list):
    """Return the most common value of ``val_list`` rounded to two decimals.

    Any failure while determining the mode or converting it to ``float``
    (for example empty input) results in ``None``.
    """
    try:
        most_frequent = float(st.mode(val_list))
    except Exception:
        return None
    else:
        return round(most_frequent, 2)

# registerTempTable() is deprecated; createOrReplaceTempView() is the supported
# equivalent. The view is kept so SQL cells can still query "2020_weather".
df_dict[2020].createOrReplaceTempView("2020_weather")

# Distinct months present in the 2020 data, ascending. Computed with the
# DataFrame API so the query does not depend on how the SQL parser treats a
# view name that starts with a digit.
months = (
    df_dict[2020]
    .select(F.month("date").alias("month"))
    .distinct()
    .orderBy('month')
    .collect()
)
# One lazily filtered DataFrame per month of 2020, keyed by month number.
df_dict_2020 = {}
for m in months:
    df_dict_2020[m.month] = df_dict[2020].filter(F.month("date") == m.month)

# Value assumed to mark a missing TEMP reading (NOAA GSOD convention) — TODO confirm
# against the dataset documentation. Left in, it would distort every statistic.
TEMP_MISSING = 9999.9

# Spark UDF wrappers around the pure-Python helpers; each receives the list of
# TEMP values collected for one month.
median_finder = F.udf(find_median, FloatType())
mode_finder = F.udf(find_mode, FloatType())

# A single grouped aggregation over all of 2020 yields one row per month in one
# table, instead of one Spark job and one single-row table per month.
monthly_stats_2020 = (
    df_dict[2020]
    .filter(F.col("TEMP").isNotNull() & (F.col("TEMP") != TEMP_MISSING))
    .groupBy(F.month("date").alias("Month"))
    .agg(
        F.collect_list("TEMP").alias("TEMP"),  # raw values, input to the UDFs
        F.mean("TEMP").alias("MEAN"),
        F.stddev("TEMP").alias("STDDEV"),
    )
    .withColumn("MEAN", F.round("MEAN", 3))
    .withColumn("MEDIAN", median_finder("TEMP"))
    .withColumn("MODE", mode_finder("TEMP"))
    .withColumn("STDDEV", F.round("STDDEV", 3))
    .drop("TEMP")  # the collected list is only an intermediate
    .orderBy("Month")
)
monthly_stats_2020.show()