**SETTING UP THE SPARK SESSION**

In [None]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m1.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=c3f77319eeae134e490f7e6e29ad7dafc56ecceebe89ef83473f59e48fdf9ca9
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, max, min, avg, count, countDistinct, lit, sum, when, year, month, dayofmonth, stddev, expr, first, row_number
from pyspark.sql.window import Window
from pyspark.sql.types import DateType
import pyspark.sql.functions as F

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Weather Data Analysis") \
    .getOrCreate()

print('Spark Session Initialized')

Spark Session Initialized


In [13]:
import zipfile
import os

# Path to the zip file assuming the data.zip is in the same directory as this ipynb file
zip_file_path = 'data.zip'

# Directory where the zip file will be extracted
extract_to_dir = 'data'

# Check if the extraction directory exists, create if it doesn't
if not os.path.exists(extract_to_dir):
    os.makedirs(extract_to_dir)

# Open the zip file
with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
    # Extract all the contents into the directory
    zip_ref.extractall(extract_to_dir)

print(f"Extracted all files in {zip_file_path} to {extract_to_dir}")

Extracted all files in data.zip to data


In [14]:
# Assuming the data.zip has been extracted into a folder named 'data'
weather_df = spark.read.csv("data/data/*/*.csv", header=True, inferSchema=True)

In [15]:
# Filter out missing values and add a column for the year
filtered_weather_df = weather_df.filter(
    (weather_df["MAX"] != 9999.9) &
    (weather_df["MIN"] != 9999.9) &
    (weather_df["PRCP"] != 99.99) &
    (weather_df["TEMP"] != 9999.9)
).withColumn("YEAR", year("DATE"))

**TASK 1**

Finding the hottest day (column MAX) for each year, and providing the corresponding station code, station name and the date (columns STATION, NAME, DATE)

In [16]:
# Creating a window partitioned by year and ordered by maximum temperature
window_spec = Window.partitionBy("YEAR").orderBy(filtered_weather_df["MAX"].desc())

# Adding a row number column to identify the row with maximum temperature for each year
max_temps_per_year = filtered_weather_df.withColumn("row_number", row_number().over(window_spec)) \
    .where(col("row_number") == 1) \
    .drop("row_number")

# Selecting columns in the desired order
max_temps_per_year.select("STATION", "NAME", "DATE", "MAX").orderBy("DATE", ascending=False).show()

+-----------+--------------------+----------+----+
|    STATION|                NAME|      DATE| MAX|
+-----------+--------------------+----------+----+
| 2095099999|          PAJALA, SW|2022-07-01|85.5|
| 1065099999|        KARASJOK, NO|2021-07-05|88.3|
| 1023099999|       BARDUFOSS, NO|2020-06-22|79.9|
| 1023099999|       BARDUFOSS, NO|2019-07-21|78.8|
| 1025099999|          TROMSO, NO|2018-07-29|84.2|
| 1023099999|       BARDUFOSS, NO|2017-06-09|78.6|
| 1023199999|         DRAUGEN, NO|2016-07-21|77.0|
| 1025099999|          TROMSO, NO|2015-07-30|71.6|
| 1023099999|       BARDUFOSS, NO|2014-07-10|89.6|
| 1001499999|      SORSTOKKEN, NO|2013-08-02|80.6|
| 1023099999|       BARDUFOSS, NO|2012-07-05|72.0|
| 1046099999|       SORKJOSEN, NO|2011-07-09|87.8|
|99407099999|DESTRUCTION IS. W...|2010-08-15|74.8|
+-----------+--------------------+----------+----+



**TASK 2**

Finding the coldest day (column MIN) for the month of January across all years (2010 - 2022), and providing the corresponding station code, station name and the date (columns STATION, NAME, DATE).

In [17]:
# Find the coldest day in January
coldest_january = filtered_weather_df.withColumn("MONTH", month("DATE")) \
    .filter(col("MONTH") == 1) \
    .select("STATION", "NAME", "DATE", "MIN") \
    .orderBy("MIN").limit(1)

coldest_january.show()

+----------+-------------+----------+-----+
|   STATION|         NAME|      DATE|  MIN|
+----------+-------------+----------+-----+
|1023099999|BARDUFOSS, NO|2017-01-05|-28.3|
+----------+-------------+----------+-----+



**TASK 3**

Finding the maximum and minimum precipitation (column PRCP) for the year 2015, and providing the corresponding station code, station name and the date (columns STATION, NAME, DATE).

In [18]:
weather_2015 = filtered_weather_df.filter(col("YEAR") == 2015)

# Calculate maximum and minimum precipitation for the year 2015
max_precipitation = weather_2015.agg(max("PRCP").alias("MAX_PRCP")).collect()[0]["MAX_PRCP"]
min_precipitation = weather_2015.agg(min("PRCP").alias("MIN_PRCP")).collect()[0]["MIN_PRCP"]

# Find corresponding station code, station name, and date for maximum precipitation
max_precipitation_data = weather_2015.filter(col("PRCP") == max_precipitation).select("STATION", "NAME", "DATE", "PRCP")

# Find corresponding station code, station name, and date for minimum precipitation
min_precipitation_data = weather_2015.filter(col("PRCP") == min_precipitation).select("STATION", "NAME", "DATE", "PRCP").limit(1)

# Show the results
print("Maximum Precipitation for 2015:")
max_precipitation_data.show()

print("Minimum Precipitation for 2015:")
min_precipitation_data.show()

Maximum Precipitation for 2015:
+----------+----------+----------+----+
|   STATION|      NAME|      DATE|PRCP|
+----------+----------+----------+----+
|1025099999|TROMSO, NO|2015-11-02|2.11|
+----------+----------+----------+----+

Minimum Precipitation for 2015:
+----------+------------+----------+----+
|   STATION|        NAME|      DATE|PRCP|
+----------+------------+----------+----+
|1008099999|LONGYEAR, SV|2015-01-01| 0.0|
+----------+------------+----------+----+



**TASK 4**

Counting the percentage of missing values for wind gust (column GUST) for the year 2019.

In [19]:
# Calculate the percentage of missing gust data for the year 2019
gust_missing_2019 = weather_df.filter(year("DATE") == 2019) \
    .select((count(when(col("GUST") == 999.9, True)) / count("*") * 100).alias("MISSING GUST PERCENTAGE"))

gust_missing_2019.show()

+-----------------------+
|MISSING GUST PERCENTAGE|
+-----------------------+
|      82.87671232876713|
+-----------------------+



**TASK 5**

Finding the mean, median, mode and standard deviation of the temperature (column TEMP) for each month for the year 2020.

In [20]:
weather_2020 = filtered_weather_df.filter(year("DATE") == 2020)

# Extract month from the date
weather_2020 = weather_2020.withColumn("Month", month("DATE"))

# Calculate mean, median, mode, and standard deviation for each month
monthly_stats_2020 = weather_2020.groupBy("Month") \
    .agg(avg("TEMP").alias("Mean TEMP"),
         expr("percentile_approx(TEMP, 0.5)").alias("Median TEMP"),
         expr("sort_array(collect_list(TEMP))[(cast(size(collect_list(TEMP)) as int) + 1) DIV 2]").alias("Mode TEMP"),
         stddev("TEMP").alias("Stddev TEMP")) \
    .orderBy("Month")

monthly_stats_2020.show()

+-----+------------------+-----------+---------+------------------+
|Month|         Mean TEMP|Median TEMP|Mode TEMP|       Stddev TEMP|
+-----+------------------+-----------+---------+------------------+
|    1|15.210169491525424|       14.7|     14.9|12.653031460610185|
|    2|13.577358490566038|       15.3|     15.5|13.186832615404859|
|    3|15.023333333333335|       18.6|     19.9|15.829465837499535|
|    4|23.329999999999995|       26.0|     28.6|13.022097256170087|
|    5| 36.21935483870968|       36.0|     36.1| 8.077246704851957|
|    6|47.429999999999986|       46.0|     46.2| 8.877190347997288|
|    7| 52.88709677419355|       51.4|     51.7| 6.663787232915164|
|    8|49.376666666666665|       48.7|     49.0| 6.615066692379813|
|    9| 40.92727272727273|       39.0|     43.0| 8.161138512375697|
|   10|29.690322580645162|       24.3|     25.2|10.800072679962533|
|   11|             31.01|       29.8|     30.4| 7.744883615795801|
|   12|18.642857142857142|       19.5|     20.1|

**PRINTING ALL THE DESIRED OUTPUT TO "result.txt"**

In [21]:
import pandas as pd
from tabulate import tabulate

max_temps_per_year_df = max_temps_per_year.select("STATION", "NAME", "DATE", "MAX").orderBy("DATE", ascending=False)

# Convert to Pandas DataFrame
max_temps_per_year_pd = max_temps_per_year_df.toPandas()
coldest_january_pd = coldest_january.toPandas()
max_precipitation_data_pd = max_precipitation_data.toPandas()
min_precipitation_data_pd = min_precipitation_data.toPandas()
gust_missing_2019_pd = gust_missing_2019.toPandas()
monthly_stats_2020_pd = monthly_stats_2020.toPandas()

# Define the file path
file_path = "/result.txt"

# Formatting the output file
with open(file_path, "w") as f:
    f.write("TASK 1:\n")
    f.write("The hottest day for each year.\n")
    f.write(tabulate(max_temps_per_year_pd, headers='keys', tablefmt='psql', showindex=False))

    f.write("\n\nTASK 2:\n")
    f.write("The coldest day for the month of January across all years.\n")
    f.write(tabulate(coldest_january_pd, headers='keys', tablefmt='psql', showindex=False))

    f.write("\n\nTASK 3:\n")
    f.write("Maximum Precipitation for 2015.\n")
    f.write(tabulate(max_precipitation_data_pd, headers='keys', tablefmt='psql', showindex=False))
    f.write("\nMinimum Precipitation for 2015.\n")
    f.write(tabulate(min_precipitation_data_pd, headers='keys', tablefmt='psql', showindex=False))

    f.write("\n\nTASK 4:\n")
    f.write("Percentage of missing values for wind gust for the year 2019.\n")
    f.write(tabulate(gust_missing_2019_pd, headers='keys', tablefmt='psql', showindex=False))

    f.write("\n\nTASK 5:\n")
    f.write("The mean, median, mode and standard deviation of the temperature for each month for the year 2020.\n")
    f.write(tabulate(monthly_stats_2020_pd, headers='keys', tablefmt='psql', showindex=False))

print("Data has been saved to: ", file_path)

Data has been saved to:  /result.txt
