In [1]:
# Name: ANAY ABHIJIT JOSHI
# Date: 30 OCTOBER 2024

# PROJECT 4: Big Data with PySpark using Anaconda & Jupyter notebook

In [2]:
# PySpark is now available in pypi. To install, I will just run pip install pyspark
!pip install pyspark



In [3]:
# Beautiful Soup is a Python package for parsing HTML and XML documents, including those with malformed markup.
!pip install requests beautifulsoup4



In [4]:
# Here are the required Python modules and libraries to be imported for QUESTION 2
import os
import requests
from bs4 import BeautifulSoup
from pyspark.sql import SparkSession

# Base URL for NCEI Bulk Data Download
base_url = "https://www.ncei.noaa.gov/data/global-summary-of-the-day/access"

# Local directory to store the downloaded files of Cincinnati and Florida
data_directory = "./weather_data"

# Create the directory if it does not exist in the current working directory
if not os.path.exists(data_directory):
    # Create the directory now
    os.makedirs(data_directory)

# Here, I am downloading the weather data for Cincinnati and Florida for the years 2015 to 2024
years = range(2015, 2025)   # Last year is exclusive
# Stations for Cincinnati and Florida, respectively
stations = ["72429793812", "99495199999"]

# Now, let's download the weather data for Cincinnati and Florida for the years 2015 to 2024
def download_file(url, local_filename):
    # Download the file from the URL
    with requests.get(url, stream=True) as r:
        # Check if the request was successful, i.e., HTTP status code of '200'
        if r.status_code == 200:
            # Save the downloaded file to the local directory
            with open(local_filename, 'wb') as f:
                # Iterate over the content of the response
                for chunk in r.iter_content(chunk_size=8192):
                    # Write the content to the file
                    f.write(chunk)
            # Now, let's print the local filename to check the downloaded file's sta
            print(f"Successfully Downloaded: {local_filename} !")
        # If the request was not successful
        else:
            print(f"Error! Failed to download: {url}")

# Now, let's download the weather data for Cincinnati and Florida for the years 2015 to 2024, via a loop (for)
for year in years:
    # Construct the URL for the year
    year_url = f"{base_url}/{year}/"

    # Now, let's get the response from the URL
    response = requests.get(year_url)
    # Again, check if the response was successful, i.e., HTTP status code of '200'
    if response.status_code != 200:
        # Print the error message
        print(f"Error! Failed to access: {year_url}")
        # Now, skip to the next iteration
        continue

    # Parse the HTML content of the response using the 'BeautifulSoup' library
    soup = BeautifulSoup(response.content, 'html.parser')
    # Find all the links in the HTML content on the page
    links = soup.find_all('a')

    # Now, let's download the weather data for Cincinnati and Florida for the specific year
    for station in stations:
        # Construct the filename
        filename = f"{station}.csv"
        # Iterate over the links
        for link in links:
            # Check if the link's 'href' attribute matches the filename
            if link.get('href') == filename:
                # Construct the URL for the file
                file_url = f"{year_url}{filename}"
                # Construct the local path for the file
                local_path = os.path.join(data_directory, f"{year}_{filename}")
                # Download the file
                download_file(file_url, local_path)
                # Finally, break the loop
                break

# Now, let's check the number of rows in each dataset
spark = SparkSession.builder \
    .appName("Weather Data Analysis") \
    .getOrCreate()

# List to store the counts of the datasets. Initially, it is empty because I have not counted the rows yet
dataset_counts = []
print(f" ")

# Now, let's count the number of rows in each dataset
for year in years:
    # Iterate over the stations
    for station in stations:
        # Skip the year 2016 for the station "99495199999" because weather data for Florida (station #99495199999) does not exist for the year 2016
        if year == 2016 and station == "99495199999":
            # Skip to the next iteration
            continue

        # Construct the file path again
        file_path = os.path.join(data_directory, f"{year}_{station}.csv")

        # Check if the file exists already
        if os.path.exists(file_path):
            # Read the CSV file using Spark
            df = spark.read.option("header", "true").csv(file_path)
            # Count the total number of rows in one of the given datasets
            row_count = df.count()

            # Append the year, station, and row count to the list
            dataset_counts.append((year, station, row_count))
            # Cincinnati Station
            if station == "72429793812":
                # Print the year, station, and row count
                print(f"Cincinnati --> Year: {year}, Station: {station}, Count: {row_count}")
            # Florida Station
            elif station == "99495199999":
                # Print the year, station, and row count
                print(f"Florida    --> Year: {year}, Station: {station}, Count: {row_count}")
        # If the file does not exist
        else:
            print(f"Error! File not found for Year: {year}, Station: {station}")

# Now, let's stop the Spark session and see the total number of results
if len(dataset_counts) == 19:
    print("\nTotal Results: 19 (as expected)")
else:
    print(f"\nTotal Results: {len(dataset_counts)} (unexpected)")


Successfully Downloaded: ./weather_data/2015_72429793812.csv !
Successfully Downloaded: ./weather_data/2015_99495199999.csv !
Successfully Downloaded: ./weather_data/2016_72429793812.csv !
Successfully Downloaded: ./weather_data/2017_72429793812.csv !
Successfully Downloaded: ./weather_data/2017_99495199999.csv !
Successfully Downloaded: ./weather_data/2018_72429793812.csv !
Successfully Downloaded: ./weather_data/2018_99495199999.csv !
Successfully Downloaded: ./weather_data/2019_72429793812.csv !
Successfully Downloaded: ./weather_data/2019_99495199999.csv !
Successfully Downloaded: ./weather_data/2020_72429793812.csv !
Successfully Downloaded: ./weather_data/2020_99495199999.csv !
Successfully Downloaded: ./weather_data/2021_72429793812.csv !
Successfully Downloaded: ./weather_data/2021_99495199999.csv !
Successfully Downloaded: ./weather_data/2022_72429793812.csv !
Successfully Downloaded: ./weather_data/2022_99495199999.csv !
Successfully Downloaded: ./weather_data/2023_7242979381

In [5]:
# Here are some more more Python modules and libraries to be imported
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, mean, stddev, count, when, expr, max as spark_max, min as spark_min, year, month

# Let's create a Spark session
spark = SparkSession.builder \
    .appName("Weather Data Analysis") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

# Now, let's load the data into DataFrames
data_directory = "./weather_data"

# Load CSV files into DataFrames for Cincinnati and Florida
cincinnati_files = [f"{data_directory}/{year}_72429793812.csv" for year in years]
# As I stated earlier, weather data for Florida (station #99495199999) does not exist for the year 2016
florida_files = [f"{data_directory}/{year}_99495199999.csv" for year in years if year != 2016]

# Read the data from the CSV files
cincinnati_df = spark.read.option("header", "true").csv(cincinnati_files)
florida_df = spark.read.option("header", "true").csv(florida_files)

# Display the total number of row counts
print(f"Cincinnati Data Count (Total Number of Rows): {cincinnati_df.count()}")
print(f"Florida Data Count (Total Number of Rows)   : {florida_df.count()}")

Cincinnati Data Count (Total Number of Rows): 3588
Florida Data Count (Total Number of Rows)   : 2483


In [6]:
# QUESTION 3
# Find the hottest day (column MAX) for each year:
# Provide the corresponding station code, station name, date, and temperature (columns: STATION, NAME, DATE, MAX).
# There should be 10 results.

# NOTE: Ignoring all the Missing Values of 9999.9 for fetching the accurate data

# Now, let's find the hottest day (column MAX) for each year in Cincinnati and Florida - excluding the year 2016 for Florida

# Import required modules and libraries of Python
from pyspark.sql import Row
from pyspark.sql.functions import col, max as spark_max, lit

# List to store the hottest days, initially, it would be empty
hottest_days = []

# Iterating over the years and stations, defined earlier, via a "for" loop
for year in years:
    for station in stations:
        file_path = f"{data_directory}/{year}_{station}.csv"

        # Check if the file exists
        if os.path.exists(file_path):
            df = spark.read.option("header", "true").csv(file_path)

            # Remove rows with 9999.9 values and find the maximum temperature
            valid_df = df.filter(col("MAX") != 9999.9)
            max_temp = valid_df.agg(spark_max("MAX").alias("Max_Temp")).collect()[0]["Max_Temp"]

            # If the max temperature is still 9999.9 after filtering, use the second-highest value
            hottest_day = valid_df.filter(col("MAX") == max_temp) \
                                  .select("STATION", "NAME", "DATE", "MAX") \
                                  .withColumn("YEAR", lit(year)) \
                                  .collect()[0]

            # Append the hottest day to the list now
            hottest_days.append(Row(YEAR=year, STATION=hottest_day["STATION"], NAME=hottest_day["NAME"], DATE=hottest_day["DATE"], MAX=hottest_day["MAX"]))

# Create a DataFrame from the list of hottest days for each year
hottest_days_df = spark.createDataFrame(hottest_days)

# Finally, let's display the hottest days for each year in Cincinnati and Florida (10 for Cincinnati and 9 for Florida)
hottest_days_df = hottest_days_df.orderBy("YEAR")

# Group the hottest days by year
grouped_years = hottest_days_df.collect()

# Print the hottest days by year
print("Hottest Days by Year (Cincinnati and Florida):\n")
print(f" Year  |    Station   |                 Station Name                       |    Date      | Maximum Temperature")
print(f"-------|--------------|----------------------------------------------------|--------------|---------------------")

# Iterating over the grouped years for the final time
last_year = None
for row in grouped_years:
    if last_year and row['YEAR'] != last_year:
        print("\n")  # Extra line for readability
    print(f"{row['YEAR']:^6} | {row['STATION']:^12} | {row['NAME']:<50} | {row['DATE']:^12} | {float(row['MAX']):>7.1f}")
    last_year = row['YEAR']

Hottest Days by Year (Cincinnati and Florida):

 Year  |    Station   |                 Station Name                       |    Date      | Maximum Temperature
-------|--------------|----------------------------------------------------|--------------|---------------------
 2015  | 72429793812  | CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US   |  2015-06-12  |    91.9
 2015  | 99495199999  | SEBASTIAN INLET STATE PARK, FL US                  |  2015-07-28  |    90.0


 2016  | 72429793812  | CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US   |  2016-07-24  |    93.9


 2017  | 72429793812  | CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US   |  2017-07-22  |    91.9
 2017  | 99495199999  | SEBASTIAN INLET STATE PARK, FL US                  |  2017-05-13  |    88.3


 2018  | 72429793812  | CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US   |  2018-07-04  |    96.1
 2018  | 99495199999  | SEBASTIAN INLET STATE PARK, FL US                  |  2018-09-15  |    90.1


 2019  | 7242979381

In [7]:
# QUESTION 4:
  # Find the coldest day (column MIN) for the month of March across all years (2015-2024) (Points: 1):
  # Provide the corresponding station code, station name, date, and temperature (columns: STATION, NAME, DATE, MIN).
  # There should be 1 result.

# Here is an empty list to store the coldest days in March
march_min_temps = []

# Iterate over the years and stations
for year in years:
    # Iterate over the stations
    for station in stations:
        # Now, here is the file path to each CSV file for reading the data
        file_path = f"{data_directory}/{year}_{station}.csv"

        # Check if the file exists
        if os.path.exists(file_path):
            # Read the CSV file using Spark
            df = spark.read.option("header", "true").csv(file_path)

            # Filter the data for the month of March
            march_df = df.filter(month(col("DATE")) == 3)
            min_temp = march_df.agg(spark_min("MIN").alias("Min_Temp")).collect()[0]["Min_Temp"]

            # If the minimum temperature is not None then, find the coldest day
            if min_temp is not None:
                # Coldest day for the month of March
                coldest_day = march_df.filter(col("MIN") == min_temp) \
                                      .select("STATION", "NAME", "DATE", "MIN") \
                                      .withColumn("YEAR", lit(year)) \
                                      .collect()[0]
                # Append the coldest day to the list
                march_min_temps.append(coldest_day)

# Create a DataFrame from the list of coldest days in March
march_min_temps_df = spark.createDataFrame(march_min_temps)
# Finally, find that one coldest day in March across all years (2015-2024)
coldest_march_day = march_min_temps_df.orderBy("MIN").limit(1).collect()[0]

# Display the result in table format for better visualization and readability
print("\nOne Coldest Day in March (2015-2024) from Cincinnati and Florida (both):\n")
print(f" Year  |    Station   |                 Station Name                       |    Date      | Minimum Temperature")
print(f"-------|--------------|----------------------------------------------------|--------------|---------------------")
print(f"{coldest_march_day['YEAR']:^6} | {coldest_march_day['STATION']:^12} | {coldest_march_day['NAME']:<50} | {coldest_march_day['DATE']:^12} | {float(coldest_march_day['MIN']):>7.1f}")



One Coldest Day in March (2015-2024) from Cincinnati and Florida (both):

 Year  |    Station   |                 Station Name                       |    Date      | Minimum Temperature
-------|--------------|----------------------------------------------------|--------------|---------------------
 2015  | 72429793812  | CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US   |  2015-03-06  |     3.2


In [8]:
# QUESTION 4 (continued) -
# Here is an empty list to store the coldest days in March as a dictionary for each station (Cincinnati and Florida)
march_min_temps = {"72429793812": [], "99495199999": []}

# Iterate over the years and stations
for year in years:
    # Iterate over the stations
    for station in stations:
        # Now, construct the file path to each CSV file for reading the data
        file_path = f"{data_directory}/{year}_{station}.csv"

        # Finally, check if the file exists
        if os.path.exists(file_path):
            # Let's read the CSV file using Spark
            df = spark.read.option("header", "true").csv(file_path)

            # Now, filter the data for the month of March, as per the requirement
            march_df = df.filter(month(col("DATE")) == 3)
            # Find the minimum temperature for the month of March
            min_temp = march_df.agg(spark_min("MIN").alias("Min_Temp")).collect()[0]["Min_Temp"]

            # If the minimum temperature is not None, then, find the coldest day
            if min_temp is not None:
                # Find the coldest day for the month of March
                coldest_day = march_df.filter(col("MIN") == min_temp) \
                                      .select("STATION", "NAME", "DATE", "MIN") \
                                      .withColumn("YEAR", lit(year)) \
                                      .collect()[0]

                # Now, append the coldest day to the list
                march_min_temps[station].append(coldest_day)

# Convert the list of coldest days to DataFrames for Cincinnati and Florida each
cincinnati_min_df = spark.createDataFrame(march_min_temps["72429793812"]).orderBy("MIN").limit(1)
florida_min_df = spark.createDataFrame(march_min_temps["99495199999"]).orderBy("MIN").limit(1)

# Collect the results from the DataFrames
coldest_cincinnati_day = cincinnati_min_df.collect()[0]
coldest_florida_day = florida_min_df.collect()[0]

# Display the result in table format for better readability and visualization
print("\nColdest Day in March (2015-2024) for Cincinnati and Florida (each):\n")
print(f" Year  |    Station   |                 Station Name                       |    Date      | Minimum Temperature")
print(f"-------|--------------|----------------------------------------------------|--------------|---------------------")
print(f"{coldest_cincinnati_day['YEAR']:^6} | {coldest_cincinnati_day['STATION']:^12} | {coldest_cincinnati_day['NAME']:<50} | {coldest_cincinnati_day['DATE']:^12} | {float(coldest_cincinnati_day['MIN']):>7.1f}")
print(f"{coldest_florida_day['YEAR']:^6} | {coldest_florida_day['STATION']:^12} | {coldest_florida_day['NAME']:<50} | {coldest_florida_day['DATE']:^12} | {float(coldest_florida_day['MIN']):>7.1f}")


Coldest Day in March (2015-2024) for Cincinnati and Florida (each):

 Year  |    Station   |                 Station Name                       |    Date      | Minimum Temperature
-------|--------------|----------------------------------------------------|--------------|---------------------
 2015  | 72429793812  | CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US   |  2015-03-06  |     3.2
 2022  | 99495199999  | SEBASTIAN INLET STATE PARK, FL US                  |  2022-03-13  |    45.5


In [9]:
# QUESTION 5:
# Find the year with the most precipitation for Cincinnati and Florida (Points: 1):
# Provide the corresponding station code, station name, and year (columns: STATION, NAME, YEAR, Mean of PRCP).
# There should be 2 results.

# Again, necessary imports
from pyspark.sql.functions import year, col, mean

# Again, load the data into DataFrames from the CSV files for Cincinnati
cincinnati_files = [f"{data_directory}/{year}_72429793812.csv" for year in years]
cincinnati_df = spark.read.option("header", "true").csv(cincinnati_files)

# Calculate mean precipitation by year for Cincinnati only
cincinnati_precip = cincinnati_df.withColumn("YEAR", year(col("DATE"))) \
    .groupBy("YEAR", "STATION", "NAME") \
    .agg(mean("PRCP").alias("Mean_PRCP")) \
    .orderBy(col("Mean_PRCP").desc()) \
    .limit(1)

# Now, load the data into DataFrames from the CSV files for Florida
florida_files = [f"{data_directory}/{year}_99495199999.csv" for year in years if year != 2016]
florida_df = spark.read.option("header", "true").csv(florida_files)

# Calculate mean precipitation by year for Florida only
florida_precip = florida_df.withColumn("YEAR", year(col("DATE"))) \
    .groupBy("YEAR", "STATION", "NAME") \
    .agg(mean("PRCP").alias("Mean_PRCP")) \
    .orderBy(col("Mean_PRCP").desc()) \
    .limit(1)

# Collect results from the DataFrames for both Cincinnati and Florida
cincinnati_result = cincinnati_precip.collect()[0]
florida_result = florida_precip.collect()[0]

# Display the result in table format for better readability and visualization
print("\nYear with Most Precipitation for Cincinnati and Florida:\n")
print(f" Year  |    Station   |                 Station Name                       | Mean PRCP")
print(f"-------|--------------|----------------------------------------------------|----------")
print(f"{cincinnati_result['YEAR']:^6} | {cincinnati_result['STATION']:^12} | {cincinnati_result['NAME']:<50} | {cincinnati_result['Mean_PRCP']:.2f}")
print(f"{florida_result['YEAR']:^6} | {florida_result['STATION']:^12} | {florida_result['NAME']:<50} | {florida_result['Mean_PRCP']:.2f}")


Year with Most Precipitation for Cincinnati and Florida:

 Year  |    Station   |                 Station Name                       | Mean PRCP
-------|--------------|----------------------------------------------------|----------
 2024  | 72429793812  | CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US   | 5.44
 2020  | 99495199999  | SEBASTIAN INLET STATE PARK, FL US                  | 0.00


In [10]:
# QUESTION 6
# Count the percentage of missing values for wind gust (column GUST) for Cincinnati and Florida in the year 2024!

# There should be 2 results.

# Again, as usual, import the necessary libraries and modules.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Again, define file paths for 2024 data for Cincinnati and Florida
cincinnati_2024_file = "./weather_data/2024_72429793812.csv"
florida_2024_file = "./weather_data/2024_99495199999.csv"

# Let's load 2024 data for Cincinnati again
cincinnati_2024_df = spark.read.option("header", "true").csv(cincinnati_2024_file)
florida_2024_df = spark.read.option("header", "true").csv(florida_2024_file)

# Count missing GUST values and total rows for Cincinnati file of 2024
cincinnati_missing_count = cincinnati_2024_df.filter(col("GUST") == 999.9).count()
cincinnati_total_count = cincinnati_2024_df.count()
cincinnati_missing_percentage = (cincinnati_missing_count / cincinnati_total_count) * 100

# Count missing GUST values and total rows for Florida file of 2024
florida_missing_count = florida_2024_df.filter(col("GUST") == 999.9).count()
florida_total_count = florida_2024_df.count()
florida_missing_percentage = (florida_missing_count / florida_total_count) * 100

# Display the results for QUESTION 6
print("\nPercentage of Missing Values for Wind Gust (column GUST) for Cincinnati and Florida in 2024:\n")
print(f"Cincinnati: {cincinnati_missing_percentage:.2f}%")
print(f"Florida: {florida_missing_percentage:.2f}%")


Percentage of Missing Values for Wind Gust (column GUST) for Cincinnati and Florida in 2024:

Cincinnati: 39.53%
Florida: 100.00%


In [11]:
# QUESTION 7
# Find the mean, median, mode, and standard deviation of the temperature (column TEMP) for Cincinnati in each month for the year 2020.
# There should be 12 results (one for each month, with 4 values for each result).

# Here are the imported modules and libraries, again
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, month, mean, stddev, expr, count
from pyspark.sql import Window

# Define file path for Cincinnati 2020 data of the weather
cincinnati_2020_file = "./weather_data/2020_72429793812.csv"

# Load 2020 data for Cincinnati, now!
cincinnati_2020_df = spark.read.option("header", "true").csv(cincinnati_2020_file)

# Convert TEMP to float now
cincinnati_2020_df = cincinnati_2020_df.withColumn("TEMP", col("TEMP").cast("float"))

# Filter out rows with missing or placeholder values for TEMP, e.g., 9999.9
cincinnati_2020_df = cincinnati_2020_df.filter((col("TEMP") != 9999.9) & (col("TEMP").isNotNull()))

# Group data by month and calculate mean and standard deviation, EACH MONTH of 2020
temp_stats_df = cincinnati_2020_df.withColumn("MONTH", month(col("DATE"))) \
    .groupBy("MONTH") \
    .agg(
        mean("TEMP").alias("Mean_TEMP"),
        stddev("TEMP").alias("StandardDeviation_TEMP")
    )

# Calculate the median for each month of 2020
temp_median_df = cincinnati_2020_df.withColumn("MONTH", month(col("DATE"))) \
    .groupBy("MONTH") \
    .agg(expr("percentile_approx(TEMP, 0.5)").alias("Median_TEMP"))

# Calculate the mode for each month of 2020
window = Window.partitionBy("MONTH")
temp_mode_df = cincinnati_2020_df.withColumn("MONTH", month(col("DATE"))) \
    .groupBy("MONTH", "TEMP") \
    .agg(count("TEMP").alias("Frequency")) \
    .withColumn("Max_Freq", expr("max(Frequency) over (PARTITION BY MONTH)")) \
    .filter(col("Frequency") == col("Max_Freq")) \
    .groupBy("MONTH") \
    .agg(expr("first(TEMP)").alias("Mode_TEMP"))

# Combine all statistics into a single DataFrame now
final_stats_df = temp_stats_df \
    .join(temp_median_df, "MONTH") \
    .join(temp_mode_df, "MONTH") \
    .orderBy("MONTH")

# Display results in table format for the final answer
print("\nTemperature Statistics for Cincinnati for Each Month in 2020:\n")
# Let's not truncate the values for better results (accurate)!
final_stats_df.show(truncate=False)


Temperature Statistics for Cincinnati for Each Month in 2020:

+-----+------------------+----------------------+-----------+---------+
|MONTH|Mean_TEMP         |StandardDeviation_TEMP|Median_TEMP|Mode_TEMP|
+-----+------------------+----------------------+-----------+---------+
|1    |37.945161081129505|8.345810838316384     |37.7       |24.7     |
|2    |36.58965525133856 |7.901597947537755     |36.0       |25.9     |
|3    |49.0741934007214  |8.77940669347644      |47.8       |39.6     |
|4    |51.77999992370606 |7.3131621276074465    |51.0       |49.4     |
|5    |60.89032290058751 |9.314768319579512     |63.7       |73.9     |
|6    |72.54666570027669 |4.8999458590264515    |73.7       |70.7     |
|7    |77.6000001968876  |2.337947626620972     |77.9       |78.4     |
|8    |73.34516143798828 |3.4878690606063563    |73.7       |78.3     |
|9    |66.09999961853028 |7.118261579669542     |65.8       |74.5     |
|10   |55.19354851015152 |6.7286914818367975    |54.0       |52.2     |


In [12]:
# QUESTION 8
# Find the top 10 days with the lowest Wind Chill for Cincinnati in 2017 (Points: 1):
# For days where TEMP is below 50°F and WDSP (wind speed) is above 3 mph, calculate Wind Chill using the formula:
# WC = 35.74 + 0.6215 × TEMP − 35.75 × (WDSP)^0.16 + 0.4275 × TEMP × (WDSP)^0.16
# Add a new column for Wind Chill and display the top 10 days with the lowest Wind Chill.


# Here are the necessary imports again...
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr

# Defining the file path for Cincinnati (2017)
cincinnati_2017_file = "./weather_data/2017_72429793812.csv"

# Loading the 2017's data for Cincinnati
cincinnati_2017_df = spark.read.option("header", "true").csv(cincinnati_2017_file)

# Convert TEMP and WDSP to float for calculations
cincinnati_2017_df = cincinnati_2017_df.withColumn("TEMP", col("TEMP").cast("float")) \
                                       .withColumn("WDSP", col("WDSP").cast("float"))

# Filtering for days where TEMP < 50°F and WDSP > 3 mph
filtered_df = cincinnati_2017_df.filter((col("TEMP") < 50) & (col("WDSP") > 3))

# Calculating Wind Chill using the given formula
wind_chill_df = filtered_df.withColumn("Wind_Chill",
    (35.74 + (0.6215 * col("TEMP")) - (35.75 * (col("WDSP") ** 0.16))) + (0.4275 * col("TEMP") * (col("WDSP") ** 0.16))
)

# Now, finally, let's select and display the top 10 days with the lowest Wind Chill
top_10_lowest_wc = wind_chill_df.orderBy(col("Wind_Chill").asc()).select("NAME", "DATE", "TEMP", "WDSP", "Wind_Chill").limit(10)

# Finally, let's show the results/answer of this question
print("\nTop 10 Days with the Lowest Wind Chill for Cincinnati in 2017:\n")
# Let's not truncate the values for better results (accurate)!
top_10_lowest_wc.show(truncate=False)


Top 10 Days with the Lowest Wind Chill for Cincinnati in 2017:

+------------------------------------------------+----------+----+----+-------------------+
|NAME                                            |DATE      |TEMP|WDSP|Wind_Chill         |
+------------------------------------------------+----------+----+----+-------------------+
|CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US|2017-01-07|10.5|7.0 |-0.4140156367932173|
|CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US|2017-12-31|11.0|5.3 |2.0339764741541018 |
|CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US|2017-12-27|13.0|5.8 |3.8206452986638073 |
|CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US|2017-12-28|13.6|5.8 |4.533355513517824  |
|CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US|2017-01-06|13.6|5.5 |4.868933492954463  |
|CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US|2017-01-08|15.9|5.2 |7.929747979856229  |
|CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US|2017-12-25|25.8|13.5|14.285112249501509 |
|CINCINNATI MUN

In [13]:
# QUESTION 9 (for just the year of 2024)
# Investigate how many days in 2024 had extreme weather conditions for Florida (fog, rain, snow, etc.) using the FRSHTT column (Points: 1).
# There should be 1 result.

# FRSHTT - Indicators (1 = yes, 0 = no/not reported) for the occurrence during the day of:
    # Fog ('F' - 1st digit).
    # Rain or Drizzle ('R' - 2nd digit).
    # Snow or Ice Pellets ('S' - 3rd digit).
    # Hail ('H' - 4th digit).
    # Thunder ('T' - 5th digit).
    # Tornado or Funnel Cloud ('T' - 6th digit).

# Here are the necessary imports, again
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Defining file path for Florida data for 2024
florida_file = "./weather_data/2024_99495199999.csv"  # Replace with the file path as needed, I am choosing Florida

# There are 158 days for Cincinnati in 2024, and I get the same result (I have Tested it as well)
# Cincinnati for 2024 (please ignore the "file name" variable, file path is for Cincinnati 2024)
# florida_file = "./weather_data/2024_72429793812.csv"  # Replace with the file path as needed, I am choosing Florida

# Loading the data for Florida of 2024
florida_df = spark.read.option("header", "true").csv(florida_file)

# Filtering for extreme weather days where any FRSHTT indicator is 1 (Yes) as stated in the README.pdf
extreme_weather_df = florida_df.filter(
    (col("FRSHTT").substr(1, 1) == "1") |  # Fog
    (col("FRSHTT").substr(2, 1) == "1") |  # Rain or Drizzle
    (col("FRSHTT").substr(3, 1) == "1") |  # Snow or Ice Pellets
    (col("FRSHTT").substr(4, 1) == "1") |  # Hail
    (col("FRSHTT").substr(5, 1) == "1") |  # Thunder
    (col("FRSHTT").substr(6, 1) == "1")    # Tornado or Funnel Cloud
)

# Counting the number of extreme weather days in Florida for 2024
extreme_weather_days_count = extreme_weather_df.count()

# Finally, let's display the results now
print(f"\nNumber of Days with Extreme Weather Conditions in Florida in 2024: {extreme_weather_days_count}\n")


Number of Days with Extreme Weather Conditions in Florida in 2024: 0



In [14]:
# QUESTION 9 (for years from 2015-2024) -- continued
# Investigate how many days from 2015-2024 had extreme weather conditions for Florida (fog, rain, snow, etc.) using the FRSHTT column (Points: 1).
# There should be 1 result.

# FRSHTT - Indicators (1 = yes, 0 = no/not reported) for the occurrence during the day of:
    # Fog ('F' - 1st digit).
    # Rain or Drizzle ('R' - 2nd digit).
    # Snow or Ice Pellets ('S' - 3rd digit).
    # Hail ('H' - 4th digit).
    # Thunder ('T' - 5th digit).
    # Tornado or Funnel Cloud ('T' - 6th digit).

# Here are the necessary imports, again
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Again, define the base directory and years for Florida files
data_directory = "./weather_data"
years = [y for y in range(2015, 2025) if y != 2016]  # Skip 2016 if file does not exist

# Let's list to store DataFrames for each year, it's empty, initially
florida_dfs = []

# Loading data for each year and append to the list - only FLORIDA
for year in years:
    florida_file = f"{data_directory}/{year}_99495199999.csv"
    florida_df = spark.read.option("header", "true").csv(florida_file)
    florida_dfs.append(florida_df)

# ==================================================================================================================
# ==================================================================================================================

# # For Cincinnati, please execute this "for" loop and make sure to comment-out the "for" loop written above.
# # Ignore the variable names, because the file path is of Cincinnati

# for year in years:
#     florida_file = f"{data_directory}/{year}_72429793812.csv"
#     florida_df = spark.read.option("header", "true").csv(florida_file)
#     florida_dfs.append(florida_df)

# ==================================================================================================================
# ==================================================================================================================

# Union/Combine all DataFrames for Florida from 2015 to 2024
all_florida_df = florida_dfs[0]
for df in florida_dfs[1:]:
    all_florida_df = all_florida_df.union(df)

# Convert FRSHTT to string for substring operations
all_florida_df = all_florida_df.withColumn("FRSHTT", col("FRSHTT").cast("string"))

# Filter for extreme weather days where any FRSHTT indicator is 1
extreme_weather_df = all_florida_df.filter(
    (col("FRSHTT").substr(1, 1) == "1") |  # Fog
    (col("FRSHTT").substr(2, 1) == "1") |  # Rain or Drizzle
    (col("FRSHTT").substr(3, 1) == "1") |  # Snow or Ice Pellets
    (col("FRSHTT").substr(4, 1) == "1") |  # Hail
    (col("FRSHTT").substr(5, 1) == "1") |  # Thunder
    (col("FRSHTT").substr(6, 1) == "1")    # Tornado/Funnel Cloud
)

# Count the number of extreme weather days for the years ranging from 2015-2024
extreme_weather_days_count = extreme_weather_df.count()

# Finally, let's display the results now
print(f"\nNumber of Days with Extreme Weather Conditions in Florida for all years from 2015 to 2024: {extreme_weather_days_count}\n")


Number of Days with Extreme Weather Conditions in Florida for all years from 2015 to 2024: 0



In [15]:
# QUESTION 10
# Predict the maximum Temperature for Cincinnati for November and December 2024, based on the previous 2 years of weather data (Points: 1):
# There should be 2 results.
# You can use any model  to forecast on the historical weather data.

# Here are the necessary imports
from pyspark.sql.functions import col, avg, month, max, year

# Filtering the data for 2022 and 2023 and remove invalid MAX values
cincinnati_2022_2023 = cincinnati_df.filter((year(col("DATE")).isin([2022, 2023])) & (col("MAX") != 9999.9))

# Calculating the max temperature for November in 2022 and 2023 in Cincinnati
# 2022 NOVEMBER
nov_max_temp_2022 = cincinnati_2022_2023.filter((year(col("DATE")) == 2022) & (month(col("DATE")) == 11)) \
    .agg(max("MAX").alias("Max_Nov_2022")) \
    .collect()[0]["Max_Nov_2022"]

# 2023 NOVEMBER
nov_max_temp_2023 = cincinnati_2022_2023.filter((year(col("DATE")) == 2023) & (month(col("DATE")) == 11)) \
    .agg(max("MAX").alias("Max_Nov_2023")) \
    .collect()[0]["Max_Nov_2023"]

# Now, 2022 DECEMBER
dec_2022_all_temps = cincinnati_2022_2023.filter((year(col("DATE")) == 2022) & (month(col("DATE")) == 12)) \
    .orderBy(col("MAX").desc())

# Convert results to a list and get the highest values, ignoring the missing data which has a value of 9999.9
dec_2022_max_values = [row["MAX"] for row in dec_2022_all_temps.collect() if row["MAX"] != 9999.9]

if len(dec_2022_max_values) >= 2:
    dec_max_temp_2022 = dec_2022_max_values[1]  # Highest Temperature Value
elif len(dec_2022_max_values) == 1:
    dec_max_temp_2022 = dec_2022_max_values[0]  # Only 1 Value
else:
    dec_max_temp_2022 = None  # No valid values found

# Let's calculate the MAX temperature for December 2023
dec_max_temp_2023 = cincinnati_2022_2023.filter((year(col("DATE")) == 2023) & (month(col("DATE")) == 12)) \
    .agg(max("MAX").alias("Max_Dec_2023")) \
    .collect()[0]["Max_Dec_2023"]

# Now, calculating the averages of 2022 and 2023 for predicting the temperatures in November 2024 and December 2024
nov_avg_max_temp = (float(nov_max_temp_2022) + float(nov_max_temp_2023)) / 2
dec_avg_max_temp = (float(dec_max_temp_2022) + float(dec_max_temp_2023)) / 2 if dec_max_temp_2022 is not None else None

# Finally, let's display the max temps for 2022 and 2023, and the predictions for 2024
print(f"Cincinnati's Maximum Temperature for November 2022                                     : {float(nov_max_temp_2022):.2f}°F")
print(f"Cincinnati's Maximum Temperature for November 2023                                     : {float(nov_max_temp_2023):.2f}°F")
print(f"Cincinnati's Predicted Maximum Temperature for November 2024                           : {float(nov_avg_max_temp):.2f}°F\n")

if dec_max_temp_2022 is not None:
    print(f"Highest Maximum Temperature for December 2022 (ignoring the MISSING VALUES=9999.9)     : {float(dec_max_temp_2022):.2f}°F")
else:
    print("No Valid Maximum Temperature found for December 2022.")

print(f"Cincinnati's Maximum Temperature for December 2023                                     : {float(dec_max_temp_2023):.2f}°F")
if dec_avg_max_temp is not None:
    print(f"Cincinnati's Predicted Maximum Temperature for December 2024                           : {float(dec_avg_max_temp):.2f}°F")
else:
    print("Cannot Calculate Predicted Max Temperature for December 2024 due to Missing Valid Data.")

Cincinnati's Maximum Temperature for November 2022                                     : 75.90°F
Cincinnati's Maximum Temperature for November 2023                                     : 80.10°F
Cincinnati's Predicted Maximum Temperature for November 2024                           : 78.00°F

Highest Maximum Temperature for December 2022 (ignoring the MISSING VALUES=9999.9)     : 66.00°F
Cincinnati's Maximum Temperature for December 2023                                     : 64.00°F
Cincinnati's Predicted Maximum Temperature for December 2024                           : 65.00°F


In [16]:
# QUESTION 10 (Brief Discussion on the model’s performance and potential improvements)

#   In my approach, I focused on predicting Cincinnati's maximum temperatures for November and December 2024 by looking at data
# from the previous two years (i.e., 2022's and 2023's November and December). I filtered out erroneous values, specifically those
# marked as 9999.9, which often indicate missing data, and then calculated the average of the highest valid temperatures from each
# month to estimate the 2024's Maximum Temperatures for the month of November and December in Cincinnati.
#   Now, one might be wondering - "Why I decided/chose to ignore those missing values?", this is because, 9999.9 value(s) represent
# missing or invalid data and including them could have skewed the results. Replacing them with a mean/average temperature of that particular
# month of that specific year would artificially alter the dataset and could misrepresent extreme temperature trends, so skipping these values
# gives a more accurate prediction.
#   In my opinion, this averaging method provides a reasonable estimate, but it doesn’t capture yearly trends or account for other possible
# influences like humidity or precipitation. Here’s how I think the model could be improved in future, if needed:
#       1. Exploring a Time-Series Model (like ARIMA, Prophet or LSTM, etc.): These Deep Learning architectures/models are designed to handle seasonal
#                                                                             variations, which would likely make the predictions more accurate. I would
#                                                                             use Mean Square Error (MSE) as my Loss Function because the data is of
#                                                                             "Regression" type and not "Classification" type.
#       2. Incorporating Additional Weather Features: Including other factors could better capture the dependencies that affect maximum temperatures.

# Finally, in summary, I can definitely say that the current approach works as a quick estimation for predicting the Maximum Temperature for Cincinnati (accurately) for
# November and December 2024, based on the previous 2 years of weather data (i.e., 2022 and 2023), using a time-series model would likely give me a better, more nuanced
# prediction by recognizing patterns in the data.