In [422]:
# Run a shell command to ensure pyspark is downloaded in the Python environment.
!python -m pip install pyspark

from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.functions import year, month, col, row_number, trim, input_file_name, split, mean, count, when, sum




[notice] A new release of pip is available: 23.0.1 -> 24.3.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [423]:
# Set up Spark Session, which is an entry point to the PySpark application.
spark = SparkSession.builder \
    .appName("Weather Insights") \
    .getOrCreate()

In [424]:
# Load CSV files using filepath wildcard to open all files of the same type at once.
# In some analysis we are concerned with the dataset to which specific data belongs.
# To capture that information, we can add a filename column to the data frame on creation.
filepath = "C:/Users/benci/College/Class/6th Year/2024 Fall (CS)/INTRO TO CLOUD COMPUTING/Projects/P4/rashmi-p4/data/*.csv"
dataframe = spark.read.option("header", "true").option("mode", "DROPMALFORMED").csv(filepath).withColumn("filename", input_file_name())

# It's beneficial to trim the filename for readability in results, hence the following line.
# To be clear, it splits the filename by delimiters then selects the last listed value, the
# actual name.
dataframe = dataframe.withColumn("filename", split(dataframe["filename"], "/")[15]) # Indexing by -1 wasn't working???

# Looking at the printed schema, it's clear that all columns have string type by default.
# That won't work, as we need to do numeric computations on some values. So, I'll be cleaning
# the data by casting essential columns to floats and removing empty values, where needed.
dataframe = dataframe.withColumn("MAX", trim(col("MAX")).cast("float")) # Convert MAX to float.
dataframe = dataframe.withColumn("MIN", trim(col("MIN")).cast("float")) # Convert MIN to float.
dataframe = dataframe.withColumn("PRCP", trim(col("PRCP")).cast("float")) # Convert PRCP to float.
# I'M NOT TYPE CASTING "GUST" BECAUSE IT CAUSES FLOAT PRECISION ERRORS.
dataframe = dataframe.withColumn("DATE", col("DATE").cast("date")) # Convert DATE to DATE.

dataframe = dataframe \
    .withColumn("YEAR", year(col("DATE"))) \
    .withColumn("MONTH", month(col("DATE")))

In [425]:
# Check if the data was loaded correctly by printing the schema and a few rows.
dataframe.printSchema()
dataframe.show(5)

root
 |-- STATION: string (nullable = true)
 |-- DATE: date (nullable = true)
 |-- LATITUDE: string (nullable = true)
 |-- LONGITUDE: string (nullable = true)
 |-- ELEVATION: string (nullable = true)
 |-- NAME: string (nullable = true)
 |-- TEMP: string (nullable = true)
 |-- TEMP_ATTRIBUTES: string (nullable = true)
 |-- DEWP: string (nullable = true)
 |-- DEWP_ATTRIBUTES: string (nullable = true)
 |-- SLP: string (nullable = true)
 |-- SLP_ATTRIBUTES: string (nullable = true)
 |-- STP: string (nullable = true)
 |-- STP_ATTRIBUTES: string (nullable = true)
 |-- VISIB: string (nullable = true)
 |-- VISIB_ATTRIBUTES: string (nullable = true)
 |-- WDSP: string (nullable = true)
 |-- WDSP_ATTRIBUTES: string (nullable = true)
 |-- MXSPD: string (nullable = true)
 |-- GUST: string (nullable = true)
 |-- MAX: float (nullable = true)
 |-- MAX_ATTRIBUTES: string (nullable = true)
 |-- MIN: float (nullable = true)
 |-- MIN_ATTRIBUTES: string (nullable = true)
 |-- PRCP: float (nullable = true)


In [426]:
# Group by filename and count rows
file_counts = dataframe.groupBy("filename").count()
file_counts.show(truncate=False)

+--------------------+-----+
|filename            |count|
+--------------------+-----+
|2020_72429793812.csv|366  |
|2016_72429793812.csv|366  |
|2017_72429793812.csv|365  |
|2015_72429793812.csv|365  |
|2018_72429793812.csv|365  |
|2019_72429793812.csv|365  |
|2022_72429793812.csv|365  |
|2021_72429793812.csv|365  |
|2020_99495199999.csv|365  |
|2023_72429793812.csv|365  |
|2018_99495199999.csv|363  |
|2015_99495199999.csv|355  |
|2019_99495199999.csv|345  |
|2024_72429793812.csv|301  |
|2023_99495199999.csv|276  |
|2017_99495199999.csv|283  |
|2022_99495199999.csv|259  |
|2024_99495199999.csv|133  |
|2021_99495199999.csv|104  |
+--------------------+-----+



In [427]:
# THIS CELL COMPUTES THE HOTTEST DAY IN EACH YEAR

# Before doing anything, we need to get rid of any erroneously high temperature readings.
# Using my personal discretion, I'm choosing to consider any temperature above 150F as
# error.
hottest_cleaned = dataframe \
    .filter(col("MAX") < 150)

# Create a window specification for ranking the days within each year by their
# highest temperature.
hottest_window = Window.partitionBy("YEAR").orderBy(col("MAX").desc())

# Add a column to store the "RANK" of each day. That is, how its high temperature
# compared with other days in the same year.
hottest_days = hottest_cleaned.withColumn("RANK", row_number().over(hottest_window))

# Run a filter to get the single hottest day of each year.
hottest_days = hottest_days.filter(col("RANK") == 1) \
    .select("STATION", "NAME", "DATE", "MAX") \
    .orderBy(col("MAX").desc())

hottest_days.show()

+-----------+--------------------+----------+-----+
|    STATION|                NAME|      DATE|  MAX|
+-----------+--------------------+----------+-----+
|72429793812|CINCINNATI MUNICI...|2024-08-30|100.9|
|72429793812|CINCINNATI MUNICI...|2018-07-04| 96.1|
|72429793812|CINCINNATI MUNICI...|2022-06-14| 96.1|
|72429793812|CINCINNATI MUNICI...|2023-08-23| 96.1|
|72429793812|CINCINNATI MUNICI...|2019-09-30| 95.0|
|72429793812|CINCINNATI MUNICI...|2021-08-12| 95.0|
|72429793812|CINCINNATI MUNICI...|2016-07-24| 93.9|
|72429793812|CINCINNATI MUNICI...|2020-07-05| 93.9|
|72429793812|CINCINNATI MUNICI...|2015-06-12| 91.9|
|72429793812|CINCINNATI MUNICI...|2017-07-22| 91.9|
+-----------+--------------------+----------+-----+



In [428]:
# THIS CELL COMPUTES THE COLDEST DAY OF EACH YEAR, BUT ONLY FROM MARCH

# Run a filter to ensure we're only looking at data from March. There was no obviously
# erroneus data for the coldest temperatures, so I didn't worry about filtering it.
coldest_cleaned = dataframe \
    .filter(col("MONTH") == 3)

# Create a window specification for ranking the days within each year by their
# lowest temperature. This time we'll rank in ascending order, so the coldest
# days get the highest ranks.
coldest_window = Window.partitionBy("YEAR").orderBy(col("MIN").asc())

# Add a column to store the "RANK" of each day.
coldest_days = coldest_cleaned.withColumn("RANK", row_number().over(coldest_window))

# Run a filter to get the single hottest day of each year.
coldest_days = coldest_days.filter(col("RANK") == 1) \
    .select("STATION", "NAME", "DATE", "MIN") \
    .orderBy(col("MIN").asc())

coldest_days.show()

+-----------+--------------------+----------+----+
|    STATION|                NAME|      DATE| MIN|
+-----------+--------------------+----------+----+
|72429793812|CINCINNATI MUNICI...|2015-03-06| 3.2|
|72429793812|CINCINNATI MUNICI...|2019-03-05|10.0|
|72429793812|CINCINNATI MUNICI...|2023-03-15|17.1|
|72429793812|CINCINNATI MUNICI...|2022-03-13|18.0|
|72429793812|CINCINNATI MUNICI...|2017-03-15|19.0|
|72429793812|CINCINNATI MUNICI...|2020-03-01|19.0|
|72429793812|CINCINNATI MUNICI...|2018-03-22|21.0|
|72429793812|CINCINNATI MUNICI...|2024-03-01|23.0|
|72429793812|CINCINNATI MUNICI...|2021-03-02|24.1|
|72429793812|CINCINNATI MUNICI...|2016-03-02|26.1|
+-----------+--------------------+----------+----+



In [429]:
# THIS CELL COMPUTES THE YEAR WITH THE MOST PRECIPITATION AT EACH STATION

# Clean erroneous precipitation readings.
wettest_cleaned = dataframe \
    .filter(col("PRCP") < 99)

# Aggregate the mean precipitation by station and year.
wettest_year_at_station = wettest_cleaned \
    .groupBy("YEAR", "NAME") \
    .agg(mean("PRCP").alias("MEAN_PRECIPITATION"))
    
# Rank the mean precipitations in order to find the greatest for both station. Filter
# the dataframe for only those top ranked records.
wettest_window = Window.partitionBy("NAME").orderBy(col("MEAN_PRECIPITATION").desc())
    
wettest_year_at_station = wettest_year_at_station \
    .withColumn("RANK", row_number().over(wettest_window)) \
    .filter(col("RANK") == 1) \
    .select("NAME", "YEAR", "MEAN_PRECIPITATION")

wettest_year_at_station.show()

+--------------------+----+-------------------+
|                NAME|YEAR| MEAN_PRECIPITATION|
+--------------------+----+-------------------+
|CINCINNATI MUNICI...|2018|0.15789040991500633|
|SEBASTIAN INLET S...|2020|                0.0|
+--------------------+----+-------------------+



In [430]:
# THIS CELL COMPUTES THE PERCENTAGE OF MISSING VALUES FOR EACH STATION IN 2024

# Clean the data. In this case, we have no interest in removing missing values. Rather,
# we want to filter out any data not from the year 2024.
gust_cleaned = dataframe \
    .filter(col("YEAR") == 2024)

# Compute total records and missing GUST data, grouped by name of station to get
# distinct values for each. THIS RESULT DEPENDS ON WHAT YOU CONSIDER TO BE A MISSING
# VALUE: NULL OR ANY ERRONEOUS CASE. I'M CONSIDERING ERRONEOUS DATA AS MISSING. THAT
# IS, GUSTS OF 999.9 ARE MISSING.
missing_gust = gust_cleaned \
    .groupBy("NAME") \
    .agg(
        count("*").alias("total"),  # Count total records per station
        count(when(col("GUST").isNull() | (col("GUST") == "999.9"), True)).alias("missing")  # Count missing values
    )

# Derive % from the above.
missing_gust_pct = missing_gust \
    .withColumn("MISSING_%", (col("missing") / col("total")) * 100) \
    .select("NAME", "MISSING_%")

missing_gust_pct.show()

+--------------------+-----------------+
|                NAME|        MISSING_%|
+--------------------+-----------------+
|CINCINNATI MUNICI...|39.53488372093023|
|SEBASTIAN INLET S...|            100.0|
+--------------------+-----------------+

