In [208]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import DataFrameStatFunctions as statFunc
import numpy as np
import pandas as pd
import os
from pyspark.sql.types import DecimalType

spark = SparkSession.builder.appName("Spark").getOrCreate() #INITIALIZE SPARK SESSION

fileList = [] 
spark = SparkSession.builder.appName("Spark").getOrCreate()

for f in os.listdir(fileDir):
    if f!=".DS_Store":
        file = os.path.join(fileDir, f)
        for year_file in os.listdir(file):
            if year_file != ".ipynb_checkpoints" and year_file != ".DS_Store":
                fileList.append(file+"/"+year_file)       #ALL FILE NAMES

In [209]:
df = spark.read.option("header","true").option("delimiter",",").csv(fileList) #CREATING THE DATAFRAME

#COUNT FOR PART 4 (for the NULL values) We do this before we drop the NULL Values 

df_2019 = df.filter(year(col("DATE")) == 2019)  # Filter rows for 2019

total2019 = df_2019.filter(year(col("DATE")) == 2019).count()
null2019 = df_2019.filter(col("GUST").isNull()).count()
out2019 = df_2019.filter(col("GUST") == 999).count()
miss2019 = null2019 + out2019
percent = miss2019/total2019 * 100


df = df.dropna() #DROP ALL NULL COLUMNS

In [210]:
#EXTRACT YEAR FROM DATE 
dfn = df.withColumn("YEAR", year(df["DATE"])).withColumn("DATE", date_format("DATE","yyyy-MM-dd"))#month(df["DATE"], date_format("DATE","yyyy-MM-dd")))
dfn = dfn.dropna() #REMOVE THE NULLS

#PART 1: Find the hottest day (column MAX) for each year, and provide the corresponding station code, station name and the date (columns STATION, NAME, DATE).  
dfn = dfn.filter(dfn.MAX != 9999)

(dfn.withColumn("MAX", col("MAX").cast("Decimal")).orderBy(desc("MAX"))
.withColumn("DATE", date_format("DATE", "yyyy-MM-dd"))
.groupBy(year("DATE").alias("YEAR")).agg(max("MAX"), first("STATION"), first("DATE"), first("NAME"))
.orderBy(asc("YEAR")).show())

+----+--------+--------------+-----------+--------------------+
|YEAR|max(MAX)|first(STATION)|first(DATE)|         first(NAME)|
+----+--------+--------------+-----------+--------------------+
|2010|      75|   99407099999| 2010-08-15|DESTRUCTION IS. W...|
|2011|      88|   01046099999| 2011-07-09|       SORKJOSEN, NO|
|2012|      72|   01023099999| 2012-07-05|       BARDUFOSS, NO|
|2013|      81|   01001499999| 2013-08-02|      SORSTOKKEN, NO|
|2014|      90|   01023099999| 2014-07-10|       BARDUFOSS, NO|
|2015|      72|   01025099999| 2015-07-30|          TROMSO, NO|
|2016|      77|   01023199999| 2016-07-21|         DRAUGEN, NO|
|2017|      79|   01023099999| 2017-06-09|       BARDUFOSS, NO|
|2018|      84|   01025099999| 2018-07-29|          TROMSO, NO|
|2019|      79|   01023099999| 2019-07-21|       BARDUFOSS, NO|
|2020|      80|   01023099999| 2020-06-22|       BARDUFOSS, NO|
|2021|      88|   01065099999| 2021-07-05|        KARASJOK, NO|
|2022|      86|   02095099999| 2022-07-0

In [211]:
#PART 2: Find the coldest day (column MIN) for the month of January across all years (2010 - 2022) ,and provide the corresponding station code, station name and the date (columns STATION,NAME, DATE). 
min_temps = df.filter(month("DATE") == 1) \
    .groupBy("STATION", "DATE", "NAME") \
    .agg(min("MIN").alias("MIN"))

result = min_temps.orderBy("MIN").limit(1)
result.show()

+-----------+----------+------------+------+
|    STATION|      DATE|        NAME|   MIN|
+-----------+----------+------------+------+
|01008099999|2020-01-03|LONGYEAR, SV|   0.1|
+-----------+----------+------------+------+



In [212]:
#PART 3: Maximum and Minimum precipitation (column PRCP ) for the year 2015, and provide the corresponding station code, station name and the date (columns STATION, NAME, DATE). 

#max
df = df.filter(df.PRCP != 99)

max_precipitation = df.filter(year("DATE") == 2015) \
    .groupBy("STATION", "DATE", "NAME") \
    .agg(max("PRCP").alias("PRCP"))

result = max_precipitation.orderBy("PRCP", ascending=False).limit(1)
result.show()

#min
min_precipitation = df.filter(year("DATE") == 2015) \
    .groupBy("STATION", "DATE", "NAME") \
    .agg(min("PRCP").alias("PRCP"))

result = min_precipitation.orderBy("PRCP").limit(1)
result.show()

+-----------+----------+----------+-----+
|    STATION|      DATE|      NAME| PRCP|
+-----------+----------+----------+-----+
|01025099999|2015-11-02|TROMSO, NO| 2.11|
+-----------+----------+----------+-----+

+-----------+----------+------------+-----+
|    STATION|      DATE|        NAME| PRCP|
+-----------+----------+------------+-----+
|01008099999|2015-01-01|LONGYEAR, SV| 0.00|
+-----------+----------+------------+-----+



In [213]:
#PART 4:Count percentage missing values for wind gust (column GUST) for the year 2019
print("Percentage of missing values in the GUST Column: " + str(percent) + " %") #VARIABLE DECLARED IN THE FIRST BLOCK

Percentage of missing values in the GUST Column: 82.87671232876713 %


In [214]:
#PART 5: Find the mean, median, mode and standard deviation of the Temperature (column TEMP) for each month for the year 2020.
results = df.filter(year(col("DATE")) == 2020)\
            .groupBy(month(col("DATE")).alias("MONTH"))\
            .agg(
                avg(col("TEMP")).alias("MEAN"),
                expr("percentile_approx(TEMP, 0.5)").alias("MEDIAN"),
                approx_count_distinct(col("TEMP")).alias("MODE"),
                stddev(col("TEMP")).alias("STDDEV")
            )\
            .orderBy("MONTH")

# display the results
results.show()


+-----+------------------+------+----+------------------+
|MONTH|              MEAN|MEDIAN|MODE|            STDDEV|
+-----+------------------+------+----+------------------+
|    1|15.210169491525424|  14.7|  57|12.653031460610185|
|    2|13.577358490566034|  15.3|  48|13.186832615404862|
|    3|15.023333333333333|  18.6|  54|15.829465837499537|
|    4|23.329999999999995|  26.0|  56| 13.02209725617009|
|    5| 36.21935483870967|  36.0|  53| 8.077246704851957|
|    6| 47.42999999999999|  46.0|  54| 8.877190347997287|
|    7| 52.88709677419356|  51.4|  52|  6.66378723291517|
|    8| 49.37666666666669|  48.7|  50| 6.615066692379807|
|    9| 40.92727272727273|  39.0|  11| 8.161138512375699|
|   10|29.690322580645166|  24.3|  31|10.800072679962533|
|   11|             31.01|  29.8|   9|   7.7448836157958|
|   12|18.642857142857142|  19.5|  19| 9.619956934860543|
+-----+------------------+------+----+------------------+

