In [39]:
import os
import requests
from bs4 import BeautifulSoup
import numpy as np
from scipy import stats
from tabulate import tabulate
import requests
from bs4 import BeautifulSoup
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

In [40]:
# Base URL for NCEI Bulk Data Download
base_url = "https://www.ncei.noaa.gov/data/global-summary-of-the-day/access"
data_directory = "./weather_data"

# Create the directory if it does not exist
os.makedirs(data_directory, exist_ok=True)

# Download weather data for Cincinnati and Florida for years 2015 to 2024
years = range(2015, 2025)
stations = ["72429793812", "99495199999"]

def download_file(url, local_filename):
    """Download file from a URL and save it locally."""
    try:
        with requests.get(url, stream=True) as r:
            r.raise_for_status()  # Raise an error for bad responses
            with open(local_filename, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
        print(f"Downloaded: {local_filename}")
    except Exception as e:
        print(f"Failed to download {url}. Error: {e}")

def download_data_for_year(year):
    """Download data for all stations for a specific year."""
    year_url = f"{base_url}/{year}/"
    response = requests.get(year_url)

    if response.status_code != 200:
        print(f"Error accessing: {year_url}")
        return

    # Parse HTML to find all file links
    soup = BeautifulSoup(response.content, 'html.parser')
    links = soup.find_all('a')

    # Download data for each station
    for station in stations:
        filename = f"{station}.csv"
        for link in links:
            if link.get('href') == filename:
                file_url = f"{year_url}{filename}"
                local_path = os.path.join(data_directory, f"{year}_{filename}")
                download_file(file_url, local_path)
                break

# Download data for each year
for year in years:
    download_data_for_year(year)

# List to store the counts of the datasets
dataset_counts = []

for year in years:
    for station in stations:
        # Skip 2016 for Florida as data is not available
        if year == 2016 and station == "99495199999":
            continue

        file_path = os.path.join(data_directory, f"{year}_{station}.csv")

        if os.path.exists(file_path):
            try:
                # Read the CSV file using Pandas
                df = pd.read_csv(file_path)
                row_count = len(df)
                dataset_counts.append((year, station, row_count))
                location = "Cincinnati" if station == "72429793812" else "Florida"
                print(f"{location} --> Year: {year}, Station: {station}, Count: {row_count}")
            except pd.errors.EmptyDataError:
                print(f"File is empty: {file_path}")
            except Exception as e:
                print(f"Error reading {file_path}. Error: {e}")
        else:
            print(f"File not found for Year: {year}, Station: {station}")

# Display total results
if len(dataset_counts) == 19:
    print("\nTotal Results: 19 (as expected)")
else:
    print(f"\nTotal Results: {len(dataset_counts)} (unexpected)")


Downloaded: ./weather_data/2015_72429793812.csv
Downloaded: ./weather_data/2015_99495199999.csv
Downloaded: ./weather_data/2016_72429793812.csv
Downloaded: ./weather_data/2017_72429793812.csv
Downloaded: ./weather_data/2017_99495199999.csv
Downloaded: ./weather_data/2018_72429793812.csv
Downloaded: ./weather_data/2018_99495199999.csv
Downloaded: ./weather_data/2019_72429793812.csv
Downloaded: ./weather_data/2019_99495199999.csv
Downloaded: ./weather_data/2020_72429793812.csv
Downloaded: ./weather_data/2020_99495199999.csv
Downloaded: ./weather_data/2021_72429793812.csv
Downloaded: ./weather_data/2021_99495199999.csv
Downloaded: ./weather_data/2022_72429793812.csv
Downloaded: ./weather_data/2022_99495199999.csv
Downloaded: ./weather_data/2023_72429793812.csv
Downloaded: ./weather_data/2023_99495199999.csv
Downloaded: ./weather_data/2024_72429793812.csv
Downloaded: ./weather_data/2024_99495199999.csv
Cincinnati --> Year: 2015, Station: 72429793812, Count: 365
Florida --> Year: 2015, Stat

In [50]:
# Create a Spark session
spark = SparkSession.builder \
    .appName("Weather Data Analysis") \
    .getOrCreate()

data_directory = "./weather_data"

# Years for which data is available
years = range(2015, 2025)
stations = ["72429793812", "99495199999"]

# Lists to store hottest day data
hottest_days_cincinnati = []
hottest_days_florida = []
hottest_days_overall = []

for year in years:
    year_data = []
    for station in stations:
        # Skip 2016 for Florida as data may not be available
        if year == 2016 and station == "99495199999":
            continue

        file_path = f"{data_directory}/{year}_{station}.csv"
        if os.path.exists(file_path):
            # Load data and filter out invalid temperature values
            df = spark.read.csv(file_path, header=True, inferSchema=True)
            df = df.filter(df["MAX"] != 9999.9)

            # Find the hottest day for this station if data is available
            if df.count() > 0:
                hottest_day = df.orderBy(df["MAX"].desc()).first()
                hottest_day_data = {
                    "YEAR": year,
                    "STATION": hottest_day["STATION"],
                    "NAME": hottest_day["NAME"],
                    "DATE": hottest_day["DATE"],
                    "MAX": hottest_day["MAX"]
                }
                year_data.append(hottest_day_data)

                # Separate lists for Cincinnati and Florida
                if station == "72429793812":
                    hottest_days_cincinnati.append(hottest_day_data)
                elif station == "99495199999":
                    hottest_days_florida.append(hottest_day_data)

    # Determine the hottest day overall for the year by comparing both stations
    if year_data:
        hottest_day_year = max(year_data, key=lambda x: x["MAX"])
        hottest_days_overall.append(hottest_day_year)

# Convert the results into Spark DataFrames
hottest_days_cincinnati_df = spark.createDataFrame(hottest_days_cincinnati)
hottest_days_florida_df = spark.createDataFrame(hottest_days_florida)
hottest_days_overall_df = spark.createDataFrame(hottest_days_overall)

# Display results using show() for Spark DataFrames
print("\nHottest Days by Year (Cincinnati):")
hottest_days_cincinnati_df.sort("YEAR").show(truncate=False)

print("\nHottest Days by Year (Florida):")
hottest_days_florida_df.sort("YEAR").show(truncate=False)

print("\nOverall Hottest Day by Year (Cincinnati and Florida):")
hottest_days_overall_df.sort("YEAR").show(truncate=False)

# Stop the Spark session
spark.stop()


24/11/03 20:46:23 WARN Utils: Your hostname, Harshs-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.4.58 instead (on interface en0)
24/11/03 20:46:23 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/03 20:46:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/11/03 20:46:26 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.



Hottest Days by Year (Cincinnati):
+----------+-----+------------------------------------------------+-----------+----+
|DATE      |MAX  |NAME                                            |STATION    |YEAR|
+----------+-----+------------------------------------------------+-----------+----+
|2015-06-12|91.9 |CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US|72429793812|2015|
|2016-07-24|93.9 |CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US|72429793812|2016|
|2017-07-22|91.9 |CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US|72429793812|2017|
|2018-07-04|96.1 |CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US|72429793812|2018|
|2019-09-30|95.0 |CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US|72429793812|2019|
|2020-07-05|93.9 |CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US|72429793812|2020|
|2021-08-12|95.0 |CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US|72429793812|2021|
|2022-06-14|96.1 |CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US|72429793812|2022|
|2023-08-23|96.1 |CINCINNATI 

In [52]:
# Create a Spark session
spark = SparkSession.builder \
    .appName("Weather Data Analysis") \
    .getOrCreate()

# Directory and years range
data_directory = "./weather_data"
years = range(2015, 2025)
stations = ["72429793812", "99495199999"]

# List to store coldest day data for each station across all years
march_min_temps = []

# Find the coldest day in March across all years for each station
for year in years:
    for station in stations:
        file_path = f"{data_directory}/{year}_{station}.csv"
        
        if os.path.exists(file_path):
            # Load data into a Spark DataFrame
            df = spark.read.csv(file_path, header=True, inferSchema=True)
            df = df.withColumn("DATE", F.to_date(df["DATE"]))

            # Filter for March and valid temperature values
            march_df = df.filter((F.month(df["DATE"]) == 3) & (df["MIN"] != 9999.9))
            
            # Find the coldest day in March for this DataFrame, if available
            if march_df.count() > 0:
                coldest_day = march_df.orderBy("MIN").first()
                march_min_temps.append({
                    "YEAR": year,
                    "STATION": str(coldest_day["STATION"]),  # Ensure Station ID is stored as a string
                    "NAME": coldest_day["NAME"],
                    "DATE": coldest_day["DATE"],
                    "MIN": coldest_day["MIN"]
                })

# Create a DataFrame from the results
all_march_min_df = spark.createDataFrame(march_min_temps)

# Find the overall coldest day in March across all years
if all_march_min_df.count() > 0:
    coldest_overall_day = all_march_min_df.orderBy("MIN").first()

    # Prepare data for display
    results = [
        {
            "Year": int(coldest_overall_day["YEAR"]),
            "Station ID": coldest_overall_day["STATION"],
            "Station Name": coldest_overall_day["NAME"],
            "Date": coldest_overall_day["DATE"],
            "Min Temp (°F)": round(coldest_overall_day["MIN"], 1)
        }
    ]

    # Display the result in a well-formatted table
    print("\nColdest Day Overall in March (2015-2024) across Cincinnati and Florida:\n")
    print("+" + "-" * 49 + "+")
    print("| {:<5} | {:<15} | {:<25} | {:<10} | {:<15} |".format("Year", "Station ID", "Station Name", "Date", "Min Temp (°F)"))
    print("+" + "-" * 49 + "+")
    for result in results:
        print("| {:<5} | {:<15} | {:<25} | {:<10} | {:<15} |".format(result["Year"], result["Station ID"], result["Station Name"], result["Date"], result["Min Temp (°F)"]))
    print("+" + "-" * 49 + "+")
else:
    print("No data available for the specified range.")

# Stop the Spark session
spark.stop()



Coldest Day Overall in March (2015-2024) across Cincinnati and Florida:

+-------------------------------------------------+
| Year  | Station ID      | Station Name              | Date       | Min Temp (°F)   |
+-------------------------------------------------+
| 2015  | 72429793812     | CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US | <10 | 3.2             |
+-------------------------------------------------+


In [54]:
# Create a Spark session
spark = SparkSession.builder \
    .appName("Weather Data Analysis") \
    .getOrCreate()

# Directory and years range
data_directory = "./weather_data"
years = range(2015, 2025)

# Calculate mean precipitation by year for Cincinnati
cincinnati_precip_data = []

for year in years:
    file_path = f"{data_directory}/{year}_72429793812.csv"
    if os.path.exists(file_path):
        # Load data into a Spark DataFrame
        df = spark.read.csv(file_path, header=True, inferSchema=True)
        # Filter out invalid precipitation values
        df = df.filter(df["PRCP"] != 9999.9)

        # Calculate mean precipitation if data is available
        if df.count() > 0:
            mean_prcp = df.agg(F.mean("PRCP")).first()[0]
            cincinnati_precip_data.append({
                "YEAR": year,
                "STATION": str(df.select("STATION").first()[0]),  # Convert to string
                "NAME": df.select("NAME").first()[0],
                "Mean_PRCP": mean_prcp
            })

cincinnati_precip_df = spark.createDataFrame(cincinnati_precip_data)
cincinnati_result = cincinnati_precip_df.orderBy(F.desc("Mean_PRCP")).first() if cincinnati_precip_df.count() > 0 else None

# Calculate mean precipitation by year for Florida (excluding 2016 as data is unavailable)
florida_precip_data = []

for year in years:
    if year == 2016:
        continue
    file_path = f"{data_directory}/{year}_99495199999.csv"
    if os.path.exists(file_path):
        df = spark.read.csv(file_path, header=True, inferSchema=True)
        df = df.filter(df["PRCP"] != 9999.9)

        if df.count() > 0:
            mean_prcp = df.agg(F.mean("PRCP")).first()[0]
            florida_precip_data.append({
                "YEAR": year,
                "STATION": str(df.select("STATION").first()[0]),  # Convert to string
                "NAME": df.select("NAME").first()[0],
                "Mean_PRCP": mean_prcp
            })

florida_precip_df = spark.createDataFrame(florida_precip_data)
florida_result = florida_precip_df.orderBy(F.desc("Mean_PRCP")).first() if florida_precip_df.count() > 0 else None

# Prepare data for display
results = []

if cincinnati_result:
    results.append({
        "Year": int(cincinnati_result["YEAR"]),
        "Station": cincinnati_result["STATION"],
        "Station Name": cincinnati_result["NAME"],
        "Mean PRCP": round(cincinnati_result["Mean_PRCP"], 2)
    })

if florida_result:
    results.append({
        "Year": int(florida_result["YEAR"]),
        "Station": florida_result["STATION"],
        "Station Name": florida_result["NAME"],
        "Mean PRCP": round(florida_result["Mean_PRCP"], 2)
    })

# Display the results in a well-formatted table
if results:
    print("\nYear with Most Precipitation for Cincinnati and Florida:\n")
    print("+" + "-" * 58 + "+")
    print("| {:<5} | {:<15} | {:<25} | {:<15} |".format("Year", "Station", "Station Name", "Mean PRCP"))
    print("+" + "-" * 58 + "+")
    for result in results:
        print("| {:<5} | {:<15} | {:<25} | {:<15} |".format(result["Year"], result["Station"], result["Station Name"], result["Mean PRCP"]))
    print("+" + "-" * 58 + "+")
else:
    print("No precipitation data available for the specified range.")

# Stop the Spark session
spark.stop()



Year with Most Precipitation for Cincinnati and Florida:

+----------------------------------------------------------+
| Year  | Station         | Station Name              | Mean PRCP       |
+----------------------------------------------------------+
| 2024  | 72429793812     | CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US | 5.36            |
| 2021  | 99495199999     | SEBASTIAN INLET STATE PARK, FL US | 0.0             |
+----------------------------------------------------------+


In [56]:
# Create a Spark session
spark = SparkSession.builder \
    .appName("Weather Data Analysis") \
    .getOrCreate()

# Define file paths for 2024 data
cincinnati_2024_file = "./weather_data/2024_72429793812.csv"
florida_2024_file = "./weather_data/2024_99495199999.csv"

# Initialize missing percentage variables
cincinnati_missing_percentage = None
florida_missing_percentage = None

# Load 2024 data for Cincinnati if the file exists
if os.path.exists(cincinnati_2024_file):
    cincinnati_df = spark.read.csv(cincinnati_2024_file, header=True, inferSchema=True)
    # Count missing GUST values (marked as 999.9)
    cincinnati_missing_count = cincinnati_df.filter(cincinnati_df["GUST"] == 999.9).count()
    cincinnati_total_count = cincinnati_df.count()
    if cincinnati_total_count > 0:
        cincinnati_missing_percentage = (cincinnati_missing_count / cincinnati_total_count) * 100

# Load 2024 data for Florida if the file exists
if os.path.exists(florida_2024_file):
    florida_df = spark.read.csv(florida_2024_file, header=True, inferSchema=True)
    florida_missing_count = florida_df.filter(florida_df["GUST"] == 999.9).count()
    florida_total_count = florida_df.count()
    if florida_total_count > 0:
        florida_missing_percentage = (florida_missing_count / florida_total_count) * 100

# Display the results
print("\nPercentage of Missing Values for Wind Gust (column GUST) for Cincinnati and Florida in 2024:\n")
if cincinnati_missing_percentage is not None:
    print(f"Cincinnati: {cincinnati_missing_percentage:.2f}%")
else:
    print("Cincinnati data file for 2024 not found.")

if florida_missing_percentage is not None:
    print(f"Florida: {florida_missing_percentage:.2f}%")
else:
    print("Florida data file for 2024 not found.")

# Stop the Spark session
spark.stop()



Percentage of Missing Values for Wind Gust (column GUST) for Cincinnati and Florida in 2024:

Cincinnati: 39.67%
Florida: 100.00%


In [66]:
import os
import pandas as pd
import numpy as np
from scipy import stats
from tabulate import tabulate


# File path for Cincinnati 2020 data
cincinnati_2020_file = "./weather_data/2020_72429793812.csv"

# Load 2020 data for Cincinnati if the file exists
if os.path.exists(cincinnati_2020_file):
    df = pd.read_csv(cincinnati_2020_file)
    
    # Filter out invalid temperature values and rows with missing TEMP values
    df = df[df["TEMP"] != 9999.9].dropna(subset=["TEMP"])
    df["TEMP"] = df["TEMP"].astype(float)
    df["DATE"] = pd.to_datetime(df["DATE"])
    df["MONTH"] = df["DATE"].dt.month_name()  # Get month name directly

    # Define the month order
    month_order = {
        "January": 1, "February": 2, "March": 3, "April": 4, "May": 5, "June": 6,
        "July": 7, "August": 8, "September": 9, "October": 10, "November": 11, "December": 12
    }

    # Calculate statistics for each month
    stats_results = []
    for month in df["MONTH"].unique():
        month_df = df[df["MONTH"] == month]
        if not month_df.empty:
            mean_temp = month_df["TEMP"].mean()
            std_dev_temp = month_df["TEMP"].std()
            median_temp = month_df["TEMP"].median()
            
            # Calculate mode, handling cases where mode might be a scalar
            mode_result = stats.mode(month_df["TEMP"], nan_policy='omit')
            mode_temp = mode_result.mode[0] if hasattr(mode_result.mode, "_len_") else mode_result.mode
            
            stats_results.append({
                "MONTH": month,
                "Mean_TEMP": mean_temp,
                "StandardDeviation_TEMP": std_dev_temp,
                "Median_TEMP": median_temp,
                "Mode_TEMP": mode_temp
            })

    # Convert results to DataFrame and sort by month order
    final_stats_df = pd.DataFrame(stats_results)
    final_stats_df["MONTH_ORDER"] = final_stats_df["MONTH"].map(month_order)
    final_stats_df = final_stats_df.sort_values(by="MONTH_ORDER").drop(columns="MONTH_ORDER")

    # Display results in a well-formatted table
    print("\nTemperature Statistics for Cincinnati for Each Month in 2020:\n")
    print(tabulate(final_stats_df, headers="keys", tablefmt="fancy_grid", floatfmt=".2f", showindex=False))
else:
    print("Cincinnati 2020 data file not found.")


Temperature Statistics for Cincinnati for Each Month in 2020:

╒═══════════╤═════════════╤══════════════════════════╤═══════════════╤═════════════╕
│ MONTH     │   Mean_TEMP │   StandardDeviation_TEMP │   Median_TEMP │   Mode_TEMP │
╞═══════════╪═════════════╪══════════════════════════╪═══════════════╪═════════════╡
│ January   │       37.95 │                     8.35 │         37.70 │       24.70 │
├───────────┼─────────────┼──────────────────────────┼───────────────┼─────────────┤
│ February  │       36.59 │                     7.90 │         36.00 │       25.90 │
├───────────┼─────────────┼──────────────────────────┼───────────────┼─────────────┤
│ March     │       49.07 │                     8.78 │         47.80 │       39.60 │
├───────────┼─────────────┼──────────────────────────┼───────────────┼─────────────┤
│ April     │       51.78 │                     7.31 │         51.10 │       39.20 │
├───────────┼─────────────┼──────────────────────────┼───────────────┼─────────────┤
│

In [68]:
# Create a Spark session
spark = SparkSession.builder \
    .appName("Lowest Wind Chill for Cincinnati 2017") \
    .getOrCreate()

# File path for Cincinnati 2017 data
cincinnati_2017_file = "./weather_data/2017_72429793812.csv"

# Load the 2017 data for Cincinnati if the file exists
if os.path.exists(cincinnati_2017_file):
    # Load data into a Spark DataFrame
    df = spark.read.csv(cincinnati_2017_file, header=True, inferSchema=True)

    # Convert TEMP and WDSP to float and filter for relevant conditions
    df_filtered = df.withColumn("TEMP", F.col("TEMP").cast("float")) \
                    .withColumn("WDSP", F.col("WDSP").cast("float")) \
                    .filter((F.col("TEMP") < 50) & (F.col("WDSP") > 3)) \
                    .dropna(subset=["TEMP", "WDSP"])

    # Calculate Wind Chill using the formula
    wind_chill_formula = (
        35.74 + (0.6215 * F.col("TEMP")) - (35.75 * (F.col("WDSP") ** 0.16)) +
        (0.4275 * F.col("TEMP") * (F.col("WDSP") ** 0.16))
    )
    
    df_filtered = df_filtered.withColumn("Wind_Chill", wind_chill_formula)

    # Get the top 10 days with the lowest Wind Chill
    top_10_lowest_wc = df_filtered.select("NAME", "DATE", "TEMP", "WDSP", "Wind_Chill") \
                                   .orderBy("Wind_Chill") \
                                   .limit(10)

    # Collect results to display
    top_10_results = top_10_lowest_wc.toPandas()

    # Display results in a well-formatted table
    print("\nTop 10 Days with the Lowest Wind Chill for Cincinnati in 2017:\n")
    print(tabulate(top_10_results, headers="keys", tablefmt="fancy_grid", floatfmt=".2f", showindex=False))

else:
    print("Cincinnati 2017 data file not found.")

# Stop the Spark session
spark.stop()


24/11/03 20:53:57 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.



Top 10 Days with the Lowest Wind Chill for Cincinnati in 2017:

╒══════════════════════════════════════════════════╤════════════╤════════╤════════╤══════════════╕
│ NAME                                             │ DATE       │   TEMP │   WDSP │   Wind_Chill │
╞══════════════════════════════════════════════════╪════════════╪════════╪════════╪══════════════╡
│ CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US │ 2017-01-07 │  10.50 │   7.00 │        -0.41 │
├──────────────────────────────────────────────────┼────────────┼────────┼────────┼──────────────┤
│ CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US │ 2017-12-31 │  11.00 │   5.30 │         2.03 │
├──────────────────────────────────────────────────┼────────────┼────────┼────────┼──────────────┤
│ CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US │ 2017-12-27 │  13.00 │   5.80 │         3.82 │
├──────────────────────────────────────────────────┼────────────┼────────┼────────┼──────────────┤
│ CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD,

In [70]:
# Create a Spark session
spark = SparkSession.builder \
    .appName("Extreme Weather Days Count for Florida") \
    .getOrCreate()

# Directory and years for Florida files
data_directory = "./weather_data"
years = [y for y in range(2015, 2025) if y != 2016]  # Exclude 2016 if data is unavailable

# Initialize a counter for extreme weather days
extreme_weather_days_count = 0

# Load and process data for each year in the specified range
for year in years:
    florida_file = f"{data_directory}/{year}_99495199999.csv"
    
    if os.path.exists(florida_file):
        # Load data into a Spark DataFrame
        df = spark.read.csv(florida_file, header=True, inferSchema=True)

        # Ensure each FRSHTT value is a six-character string
        df = df.withColumn('FRSHTT', F.lpad(F.col('FRSHTT').cast('string'), 6, '0'))

        # Count days with any extreme weather indicator
        extreme_weather_days = df.filter(
            F.expr("array_contains(split(FRSHTT, ''), '1')")
        )

        # Update the total count of extreme weather days
        extreme_weather_days_count += extreme_weather_days.count()

# Display the result
print(f"\nNumber of Days with Extreme Weather Conditions in Florida from 2015 to 2024: {extreme_weather_days_count}\n")

# Stop the Spark session
spark.stop()



Number of Days with Extreme Weather Conditions in Florida from 2015 to 2024: 0



In [90]:
import os
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Function to evaluate model performance
def evaluate_model(model, data):
    predictions = model.transform(data)
    evaluator = RegressionEvaluator(labelCol="MAX_TEMP", predictionCol="prediction")
    
    rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
    mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
    r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})

    return rmse, mae, r2

# Create a Spark session
spark = SparkSession.builder \
    .appName("Cincinnati Temperature Prediction") \
    .getOrCreate()

# Load 2022 and 2023 data for Cincinnati
data_directory = "./weather_data"
cincinnati_files = [f"{data_directory}/{year}_72429793812.csv" for year in [2022, 2023]]

# Load and concatenate data
cincinnati_data = spark.read.csv(cincinnati_files[0], header=True, inferSchema=True)
for file in cincinnati_files[1:]:
    if os.path.exists(file):
        temp_df = spark.read.csv(file, header=True, inferSchema=True)
        cincinnati_data = cincinnati_data.union(temp_df)

# Filter data for valid MAX temperatures and convert DATE
cincinnati_data = cincinnati_data.filter(cincinnati_data["MAX"] != 9999.9)
cincinnati_data = cincinnati_data.withColumn("DATE", F.to_date("DATE")) \
                                   .withColumn("YEAR", F.year("DATE")) \
                                   .withColumn("MONTH", F.month("DATE"))

# Extract max temperatures for November and December for each year
november_data = cincinnati_data.filter(cincinnati_data["MONTH"] == 11) \
                               .groupBy("YEAR") \
                               .agg(F.max("MAX").alias("MAX_TEMP"))

december_data = cincinnati_data.filter(cincinnati_data["MONTH"] == 12) \
                               .groupBy("YEAR") \
                               .agg(F.max("MAX").alias("MAX_TEMP"))

# Prepare data for modeling
november_data = november_data.select("YEAR", "MAX_TEMP")
december_data = december_data.select("YEAR", "MAX_TEMP")

# Assemble features for November and December
nov_vector_assembler = VectorAssembler(inputCols=["YEAR"], outputCol="features")
november_data = nov_vector_assembler.transform(november_data)

dec_vector_assembler = VectorAssembler(inputCols=["YEAR"], outputCol="features")
december_data = dec_vector_assembler.transform(december_data)

# Train Linear Regression and Random Forest Regression models for November and December
lin_model_nov = LinearRegression(featuresCol="features", labelCol="MAX_TEMP").fit(november_data)
lin_model_dec = LinearRegression(featuresCol="features", labelCol="MAX_TEMP").fit(december_data)

rf_model_nov = RandomForestRegressor(featuresCol="features", labelCol="MAX_TEMP").fit(november_data)
rf_model_dec = RandomForestRegressor(featuresCol="features", labelCol="MAX_TEMP").fit(december_data)

# Evaluate models
lin_nov_metrics = evaluate_model(lin_model_nov, november_data)
lin_dec_metrics = evaluate_model(lin_model_dec, december_data)

rf_nov_metrics = evaluate_model(rf_model_nov, november_data)
rf_dec_metrics = evaluate_model(rf_model_dec, december_data)

# Print evaluation metrics
print("Linear Regression November Metrics: RMSE = {:.2f}, MAE = {:.2f}, R2 = {:.2f}".format(*lin_nov_metrics))
print("Linear Regression December Metrics: RMSE = {:.2f}, MAE = {:.2f}, R2 = {:.2f}".format(*lin_dec_metrics))

print("Random Forest November Metrics: RMSE = {:.2f}, MAE = {:.2f}, R2 = {:.2f}".format(*rf_nov_metrics))
print("Random Forest December Metrics: RMSE = {:.2f}, MAE = {:.2f}, R2 = {:.2f}".format(*rf_dec_metrics))

# Compare models and find the highest accuracy model
best_model = None
best_r2 = -float("inf")
best_month = ""

# Check November models
if lin_nov_metrics[2] > best_r2:
    best_r2 = lin_nov_metrics[2]
    best_model = lin_model_nov
    best_month = "November (Linear Regression)"
    
if rf_nov_metrics[2] > best_r2:
    best_r2 = rf_nov_metrics[2]
    best_model = rf_model_nov
    best_month = "November (Random Forest)"

# Check December models
if lin_dec_metrics[2] > best_r2:
    best_r2 = lin_dec_metrics[2]
    best_model = lin_model_dec
    best_month = "December (Linear Regression)"
    
if rf_dec_metrics[2] > best_r2:
    best_r2 = rf_dec_metrics[2]
    best_model = rf_model_dec
    best_month = "December (Random Forest)"

print(f"\nBest model is {best_month} with R2 = {best_r2:.2f}")

# Predict max temperatures for November and December 2024 using the best model
year_to_predict = spark.createDataFrame([(2024,)], ["YEAR"])
year_vector_assembler = VectorAssembler(inputCols=["YEAR"], outputCol="features")
prediction_data = year_vector_assembler.transform(year_to_predict)

# Use the best model for November
nov_pred_2024 = best_model.transform(prediction_data).select("prediction").collect()[0][0]

# Predict for December using Linear Regression (since it's the best model for December)
dec_pred_2024 = lin_model_dec.transform(prediction_data).select("prediction").collect()[0][0]

# Display predictions
print(f"Predicted Maximum Temperature for Cincinnati in November 2024: {nov_pred_2024:.2f}°F")
print(f"Predicted Maximum Temperature for Cincinnati in December 2024: {dec_pred_2024:.2f}°F")

# Stop the Spark session
spark.stop()


24/11/03 21:18:36 WARN Instrumentation: [d88dcc24] regParam is zero, which might cause numerical instability and overfitting.
24/11/03 21:18:36 WARN Instrumentation: [e7a15bac] regParam is zero, which might cause numerical instability and overfitting.
24/11/03 21:18:37 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 2 (= number of training instances)
24/11/03 21:18:37 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 2 (= number of training instances)


Linear Regression November Metrics: RMSE = 0.00, MAE = 0.00, R2 = 1.00
Linear Regression December Metrics: RMSE = 0.00, MAE = 0.00, R2 = 1.00
Random Forest November Metrics: RMSE = 8.29, MAE = 8.22, R2 = -14.57
Random Forest December Metrics: RMSE = 6.32, MAE = 6.30, R2 = -38.94

Best model is November (Linear Regression) with R2 = 1.00
Predicted Maximum Temperature for Cincinnati in November 2024: 84.30°F
Predicted Maximum Temperature for Cincinnati in December 2024: 62.00°F
