In [318]:
pip install pyspark




In [319]:
try:
    import pyspark
    from pyspark.sql import SparkSession
    # Printing the PySpark version
    print(f"PySpark Installed. Version: {pyspark.__version__}")
    spark = SparkSession.builder.appName("TestInstallation").getOrCreate()
    print("Spark session has successfully been created.")
except ImportError:
    print("PySpark is not installed.")

PySpark Installed. Version: 3.5.3
Spark session has successfully been created.


In [320]:
def load_data(year, location):
    path = f"C:/Users/fazil/Documents/WeatherData/{year}/{location}.csv"
    return spark.read.csv(path, header=True, inferSchema=True)
# List of years and locations
years = range(2015, 2025)
locations = ['72429793812', '99495199999'] # Cincinnati and Florida
# Create a dictionary to hold DataFrames
dfs = {}
for year in years:
    for location in locations:
        if year == 2016 and location == '99495199999':
            continue
        dfs[(year, location)] = load_data(year, location)
print(len(dfs))


19


In [321]:
# Load the CSV files and display the count of each dataset:
from pyspark.sql import SparkSession
# Initialize Spark Session
spark = SparkSession.builder.appName("WeatherAnalysis").getOrCreate()
# Define paths for the datasets
#Years 
years = range(2015, 2025)
#Station codes Cincinnati and Florida :
locations = ['72429793812', '99495199999']
datasets = {}
# Load CSV files for each year and location
for year in years:
    for location in locations:
        # if year == 2016 and location == '99495199999':
        #     break
        file_path = f"C:/Users/fazil/Documents/WeatherData/{year}/{location}.csv"
        if os.path.exists(file_path):
            datasets[f"{year}_{location}"] = spark.read.csv(file_path, header=True, inferSchema=True)
            print(f"Row count for {year} {location}: {datasets[f'{year}_{location}'].count()}")
print(f'The number of results displayed are:{len(datasets)}')


Row count for 2015 72429793812: 365
Row count for 2015 99495199999: 355
Row count for 2016 72429793812: 366
Row count for 2017 72429793812: 365
Row count for 2017 99495199999: 283
Row count for 2018 72429793812: 365
Row count for 2018 99495199999: 363
Row count for 2019 72429793812: 365
Row count for 2019 99495199999: 345
Row count for 2020 72429793812: 366
Row count for 2020 99495199999: 365
Row count for 2021 72429793812: 365
Row count for 2021 99495199999: 104
Row count for 2022 72429793812: 365
Row count for 2022 99495199999: 259
Row count for 2023 72429793812: 365
Row count for 2023 99495199999: 276
Row count for 2024 72429793812: 301
Row count for 2024 99495199999: 133
The number of results displayed are:19


In [322]:
from pyspark.sql import functions as F

def hottest_day_each_year(dfs):
    hottest_days = []
    for (year, location), df in dfs.items():
        # Check if the DataFrame is not empty
        if not df.isEmpty():  # Ensure the DataFrame has data
            hottest_day = df.orderBy(F.col("MAX").desc()).first()
            # Check if hottest_day is not None
            if hottest_day is not None:  # Ensure we have a valid hottest day
                hottest_days.append((year, hottest_day['STATION'], hottest_day['NAME'], hottest_day['DATE'], hottest_day['MAX']))
    return hottest_days

hottest_days_results = hottest_day_each_year(dfs)
print("The Station, Name, Date and Maximum Teamparature of each year:")
print(f"   Station   |                 Station Name                       |    Date    | Maximum Temperature")
for result in hottest_days_results:
    # print(result)
    if result[1]==72429793812:
        print(f"-------------|----------------------------------------------------|------------|---------------------")
    print(f"{result[1]:^12} | {result[2]:<50} | {result[3]} | {result[4]:>7.1f}")

The Station, Name, Date and Maximum Teamparature of each year:
   Station   |                 Station Name                       |    Date    | Maximum Temperature
-------------|----------------------------------------------------|------------|---------------------
72429793812  | CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US   | 2015-06-12 |    91.9
99495199999  | SEBASTIAN INLET STATE PARK, FL US                  | 2015-07-28 |    90.0
-------------|----------------------------------------------------|------------|---------------------
72429793812  | CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US   | 2016-07-24 |    93.9
-------------|----------------------------------------------------|------------|---------------------
72429793812  | CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US   | 2017-07-22 |    91.9
99495199999  | SEBASTIAN INLET STATE PARK, FL US                  | 2017-02-22 |  9999.9
-------------|----------------------------------------------------|------------|-----

In [323]:
def coldest_day_march(dfs):
    coldest_day = None
    for (year, location), df in dfs.items():
        march_data = df.filter(F.month("DATE") == 3)
        if coldest_day is None or march_data.agg(F.min("MIN")).first()[0] < coldest_day['MIN']:
            coldest_day = march_data.orderBy("MIN").first()
    return (coldest_day['STATION'], coldest_day['NAME'], coldest_day['DATE'], coldest_day['MIN'])
coldest_day_result = coldest_day_march(dfs)
print("The coldest Day in March is:")
print(f"   Station   |                 Station Name                       |    Date    | Minimum Temperature")
print(f"-------------|----------------------------------------------------|------------|---------------------")
print(f"{coldest_day_result[0]:^12} | {coldest_day_result[1]:<50} | {coldest_day_result[2]} | {coldest_day_result[3]}")


The coldest Day in March is:
   Station   |                 Station Name                       |    Date    | Minimum Temperature
-------------|----------------------------------------------------|------------|---------------------
72429793812  | CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US   | 2015-03-06 | 3.2


In [324]:
def year_most_precipitation(dfs):
    precipitation_results = {}
    for (year, location), df in dfs.items():
        year_mean_prcp = df.groupBy(F.year("DATE")).agg(F.mean("PRCP").alias("Mean_PRCP"))
        if location not in precipitation_results or year_mean_prcp.first()['Mean_PRCP'] > precipitation_results[location][1]:
            precipitation_results[location] = (year,  year_mean_prcp.first()['Mean_PRCP'])
    return precipitation_results
most_precipitation_results = year_most_precipitation(dfs)
print("Year with Most Precipitation:")
print("---------------------------------------------------------------------------------------")
print(f"   Station   |                 Station Name                       | Year | Mean PRCP")
print(f"-------------|----------------------------------------------------|------|------------")
for location, result in most_precipitation_results.items():
    if location == '72429793812':
        name = 'CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US'
    else:
        name = 'SEBASTIAN INLET STATE PARK, FL US'
    print(f"{location:^12} | {name:<50} | {result[0]} | {result[1]}")

Year with Most Precipitation:
---------------------------------------------------------------------------------------
   Station   |                 Station Name                       | Year | Mean PRCP
-------------|----------------------------------------------------|------|------------
72429793812  | CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US   | 2024 | 5.435647840531555
99495199999  | SEBASTIAN INLET STATE PARK, FL US                  | 2015 | 0.0


In [325]:
def missing_wind_gust_percentage(dfs):
    missing_percentages = {}
    for (year, location), df in dfs.items():
        if year == 2024:
            total = df.count()
            missing = df.filter(df["GUST"].isNull()).count()
            percentage = (missing / total) * 100 if total > 0 else 0
            missing_percentages[location] = percentage
    return missing_percentages
missing_gust_results = missing_wind_gust_percentage(dfs)
print("Missing Wind Gust Percentages 2024 are:")
print(" location  | Missing Gust Percentage")
for location, percentage in missing_gust_results.items():
    print("-----------|-------------------------")
    print(f"{location}|  {percentage:.2f}%")


Missing Wind Gust Percentages 2024 are:
 location  | Missing Gust Percentage
-----------|-------------------------
72429793812|  0.00%
-----------|-------------------------
99495199999|  0.00%


In [326]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, month, mean, stddev, expr, count
from pyspark.sql import Window

# Define file path for Cincinnati 2020 data of the weather
cin_2020_file = "C:/Users/fazil/Documents/WeatherData/2020/72429793812.csv"
# Load 2020 data for Cincinnati, now!
cin_2020_df = spark.read.option("header", "true").csv(cin_2020_file)
# Convert TEMP to float now
cin_2020_df = cin_2020_df.withColumn("TEMP", col("TEMP").cast("float"))
# Filter out rows with missing or placeholder values for TEMP, e.g., 9999.9
cin_2020_df = cin_2020_df.filter((col("TEMP") != 9999.9) & (col("TEMP").isNotNull()))
# Group data by month and calculate mean and standard deviation, EACH MONTH of 2020
temp_stats_df = cin_2020_df.withColumn("MONTH", month(col("DATE"))) \
    .groupBy("MONTH") \
    .agg(
        mean("TEMP").alias("Mean "),
        stddev("TEMP").alias("StandardDeviation")
    )
# Calculate the median for each month of 2020
temp_median_df = cin_2020_df.withColumn("MONTH", month(col("DATE"))) \
    .groupBy("MONTH") \
    .agg(expr("percentile_approx(TEMP, 0.5)").alias("Median"))
# Calculate the mode for each month of 2020
window = Window.partitionBy("MONTH")
temp_mode_df = cin_2020_df.withColumn("MONTH", month(col("DATE"))) \
    .groupBy("MONTH", "TEMP") \
    .agg(count("TEMP").alias("Frequency")) \
    .withColumn("Max_Freq", expr("max(Frequency) over (PARTITION BY MONTH)")) \
    .filter(col("Frequency") == col("Max_Freq")) \
    .groupBy("MONTH") \
    .agg(expr("first(TEMP)").alias("Mode"))
# Combine all statistics into a single DataFrame now
final_stats_df = temp_stats_df \
    .join(temp_median_df, "MONTH") \
    .join(temp_mode_df, "MONTH") \
    .orderBy("MONTH")
# Display results in table format for the final answer
print("\nThe Temperature Statistics(Mean, Standard Deviation, Median, Mode) for Cincinnati for Each Month in 2020:\n")
# Let's not truncate the values for better results (accurate)!
final_stats_df.show(truncate=False)


The Temperature Statistics(Mean, Standard Deviation, Median, Mode) for Cincinnati for Each Month in 2020:

+-----+------------------+------------------+------+----+
|MONTH|Mean              |StandardDeviation |Median|Mode|
+-----+------------------+------------------+------+----+
|1    |37.945161081129505|8.345810838316384 |37.7  |24.7|
|2    |36.58965525133856 |7.901597947537755 |36.0  |25.9|
|3    |49.0741934007214  |8.77940669347644  |47.8  |39.6|
|4    |51.77999992370606 |7.3131621276074465|51.0  |49.4|
|5    |60.89032290058751 |9.314768319579512 |63.7  |73.9|
|6    |72.54666570027669 |4.8999458590264515|73.7  |70.7|
|7    |77.6000001968876  |2.337947626620972 |77.9  |78.4|
|8    |73.34516143798828 |3.4878690606063563|73.7  |78.3|
|9    |66.09999961853028 |7.118261579669542 |65.8  |74.5|
|10   |55.19354851015152 |6.7286914818367975|54.0  |52.2|
|11   |48.00333340962728 |6.825938707865554 |47.7  |47.7|
|12   |35.99354830095845 |6.6427872766495755|35.2  |32.1|
+-----+---------------

In [327]:
import pandas as pd
# Assuming you have a CSV file with columns 'DATE', 'TEMP', and 'WDSP'
df = pd.read_csv('C:/Users/fazil/Documents/WeatherData/2017/72429793812.csv')
# Filter data
df = df[(df['TEMP'] < 50) & (df['WDSP'] > 3)]
# Calculate Wind Chill
df['Wind_Chill'] = 35.74 + 0.6215 * df['TEMP'] - 35.75 * df['WDSP']**0.16 + 0.4275 * df['TEMP'] * df['WDSP']**0.16
# Sort by Wind Chill
df_sorted = df.sort_values(by='Wind_Chill', ascending=True)
# Display top 10 days
df_sorted_top10 = df_sorted.head(10)
print(df_sorted_top10)

         STATION        DATE  LATITUDE  LONGITUDE  ELEVATION  \
6    72429793812  2017-01-07    39.106  -84.41609      144.8   
364  72429793812  2017-12-31    39.106  -84.41609      144.8   
360  72429793812  2017-12-27    39.106  -84.41609      144.8   
361  72429793812  2017-12-28    39.106  -84.41609      144.8   
5    72429793812  2017-01-06    39.106  -84.41609      144.8   
7    72429793812  2017-01-08    39.106  -84.41609      144.8   
358  72429793812  2017-12-25    39.106  -84.41609      144.8   
363  72429793812  2017-12-30    39.106  -84.41609      144.8   
4    72429793812  2017-01-05    39.106  -84.41609      144.8   
359  72429793812  2017-12-26    39.106  -84.41609      144.8   

                                                 NAME  TEMP  TEMP_ATTRIBUTES  \
6    CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US  10.5               24   
364  CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US  11.0               24   
360  CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US  

In [328]:
def extreme_weather_days(dfs):
    for (year, location), df in dfs.items():
        if location == '99495199999':
            extreme_conditions = df.filter(df['FRSHTT'] != 0).count()
            return extreme_conditions
    return 0
extreme_weather_result = extreme_weather_days(dfs)
print("Extreme Weather Days for Florida:", extreme_weather_result)


Extreme Weather Days for Florida: 0


In [329]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import os
# Create a Spark session
spark = SparkSession.builder \
.appName("Extreme Weather Conditions for Florida") \
.getOrCreate()
# Path to the directory containing Florida weather data
data_directory = "C:/Users/fazil/Documents/WeatherData/" # Update this with your actual path
# Initialize an empty DataFrame
florida_df = None
# Load data for the years 2015 to 2024
for year in range(2015, 2025):
    try:
        # Construct the file path for the Florida weather data
        file_path = os.path.join(data_directory, str(year), "99495199999.csv") # Update with the correct station code
        if (os.path.exists(file_path)):
            yearly_data = spark.read.csv(file_path, header=True, inferSchema=True)
        # Union the current year's data with the overall DataFrame
        if florida_df is None:
            florida_df = yearly_data
        else:
            florida_df = florida_df.union(yearly_data)
    except Exception as e:
        print(f"Error loading data for {year}: {e}")
# Check if the DataFrame is empty
if florida_df is not None:
    # Define extreme weather conditions: F = Fog, R = Rain, S = Snow
    extreme_weather_conditions = ['F', 'R', 'S']
    # Count distinct days with extreme weather conditions
    extreme_weather_days_count = florida_df.filter(F.col("FRSHTT") != 0) \
    .select("DATE") \
    .distinct() \
    .count()
    # Print the result
    print(f"The number of days with extreme weather conditions in Florida from 2015-2024 is: {extreme_weather_days_count}")
else:
    print("data was not loaded for Florida.")
spark.stop()

The number of days with extreme weather conditions in Florida from 2015-2024 is: 0


In [330]:
from sklearn.metrics import mean_squared_error
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
# Load the data
cincinnati_2022 = pd.read_csv('C:/Users/fazil/Documents/WeatherData/2015/72429793812.csv') # Adjust the path accordingly
cincinnati_2023 = pd.read_csv('C:/Users/fazil/Documents/WeatherData/2016/72429793812.csv') # Adjust the path accordingly
# Assuming the max temperature is under a different name, e.g., 'TMAX'
# Prepare the data
cincinnati_data = pd.concat([cincinnati_2022, cincinnati_2023])
cincinnati_data['DATE'] = pd.to_datetime(cincinnati_data['DATE']) # Ensure DATE is in datetime format
cincinnati_data['year'] = cincinnati_data['DATE'].dt.year
cincinnati_data['month'] = cincinnati_data['DATE'].dt.month
cincinnati_data['day'] = cincinnati_data['DATE'].dt.day
# Replace 'TMAX' with the actual max temperature column name
cincinnati_data = cincinnati_data[['year', 'month', 'day', 'TEMP']] # Use the correct column
# Create lagged features
for lag_year in range(1, 3):
    lagged = cincinnati_data.copy()
    lagged['year'] += lag_year
    lagged.rename(columns={'TEMP': f'lag_max_temp_{lag_year}'}, inplace=True)
    cincinnati_data = cincinnati_data.merge(lagged[['year', 'month', 'day', f'lag_max_temp_{lag_year}']],
                                            on=['year', 'month', 'day'], how='left')
# Prepare data for model training
X = cincinnati_data[[f'lag_max_temp_{year}' for year in range(1, 3)]]
y = cincinnati_data['TEMP'] # Use the actual max temp column
# Train-test split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Train the model
model = RandomForestRegressor()
model.fit(X_train, y_train)
# Predict for November and December 2024
nov_dec_2024 = pd.DataFrame({'lag_max_temp_1': [X['lag_max_temp_1'].iloc[-1]],
'lag_max_temp_2': [X['lag_max_temp_2'].iloc[-1]]})
predictions = model.predict(nov_dec_2024)
print("Predictions for November and December 2024:", predictions)
# Evaluate the model
mse = mean_squared_error(y_test, model.predict(X_test))
print("Mean Squared Error:", mse)

Predictions for November and December 2024: [38.46783333]
Mean Squared Error: 259.26903786657556


In [355]:
# QUESTION 10
# Predict the maximum Temperature for Cincinnati for November and December 2024, based on the previous 2 years of weather data (Points: 1):
# There should be 2 results.
# You can use any model  to forecast on the historical weather data.

# Here are the necessary imports
from pyspark.sql.functions import col, avg, month, max, year
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, mean, stddev, count, when, expr, max as spark_max, min as spark_min, year, month

# Let's create a Spark session
spark = SparkSession.builder \
    .appName("Weather Data Analysis") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

cincinnati_files = [f"C:/Users/fazil/Documents/WeatherData/{year}/72429793812.csv" for year in years]
cincinnati_df = spark.read.option("header", "true").csv(cincinnati_files)
# Display the total number of row counts
print(f"Cincinnati Data Count (Total Number of Rows): {cincinnati_df.count()}")
print("----------------------------------------------------------------------")

# Filter the data for 2022 and 2023 values
cincinnati_2022_2023 = cincinnati_df.filter((year(col("DATE")).isin([2022, 2023])) & (col("MAX") != 9999.9))

# Calculating the max temperature for November in 2022 and 2023 in Cincinnati
# 2022 Novemver max value
nov_max_temp_2022 = cincinnati_2022_2023.filter((year(col("DATE")) == 2022) & (month(col("DATE")) == 11)) \
    .agg(max("MAX").alias("Max_Nov_2022")) \
    .collect()[0]["Max_Nov_2022"]

# 2023 November max value
nov_max_temp_2023 = cincinnati_2022_2023.filter((year(col("DATE")) == 2023) & (month(col("DATE")) == 11)) \
    .agg(max("MAX").alias("Max_Nov_2023")) \
    .collect()[0]["Max_Nov_2023"]

# Now, 2022 DECEMBER
dec_2022_all_temps = cincinnati_2022_2023.filter((year(col("DATE")) == 2022) & (month(col("DATE")) == 12)) \
    .orderBy(col("MAX").desc())

# Convert results to a list and get the highest values, ignoring the missing data which has a value of 9999.9
dec_2022_max_values = [row["MAX"] for row in dec_2022_all_temps.collect() if row["MAX"] != 9999.9]

if len(dec_2022_max_values) >= 2:
    dec_max_temp_2022 = dec_2022_max_values[1]  # Highest Temperature Value
elif len(dec_2022_max_values) == 1:
    dec_max_temp_2022 = dec_2022_max_values[0]  # Only 1 Value
else:
    dec_max_temp_2022 = None  # No valid values found

# Calculate the MAX temperature for December 2023
dec_max_temp_2023 = cincinnati_2022_2023.filter((year(col("DATE")) == 2023) & (month(col("DATE")) == 12)) \
    .agg(max("MAX").alias("Max_Dec_2023")) \
    .collect()[0]["Max_Dec_2023"]

# Calculating the averages of 2022 and 2023 for predicting the temperatures in November 2024 and December 2024
nov_avg_max_temp = (float(nov_max_temp_2022) + float(nov_max_temp_2023)) / 2
dec_avg_max_temp = (float(dec_max_temp_2022) + float(dec_max_temp_2023)) / 2 if dec_max_temp_2022 is not None else None

#Printing the max temps for 2022 and 2023, and the predictions for 2024
print(f"Maximum Temperature for Cincinnati in November 2022                                     : {float(nov_max_temp_2022):.2f}°F")
print(f"Maximum Temperature for Cincinnati in November 2023                                     : {float(nov_max_temp_2023):.2f}°F")
print(f"The Predicted Maximum Temperature for Cincinnati in November 2024                       : {float(nov_avg_max_temp):.2f}°F\n")

if dec_max_temp_2022 is not None:
    print(f"Maximum Temperature for December 2022                                               : {float(dec_max_temp_2022):.2f}°F")
else:
    print("Maximum Temperature is not found for December 2022.")
print(f"Cincinnati's Maximum Temperature for December 2023                                      : {float(dec_max_temp_2023):.2f}°F")
if dec_avg_max_temp is not None:
    print(f"The Predicted Maximum Temperature for Cincinnati in December 2024                   : {float(dec_avg_max_temp):.2f}°F")
else:
    print("Missing Data")

Cincinnati Data Count (Total Number of Rows): 3588
----------------------------------------------------------------------
Maximum Temperature for Cincinnati in November 2022                                     : 75.90°F
Maximum Temperature for Cincinnati in November 2023                                     : 80.10°F
The Predicted Maximum Temperature for Cincinnati in November 2024                       : 78.00°F

Maximum Temperature for December 2022                                               : 66.00°F
Cincinnati's Maximum Temperature for December 2023                                      : 64.00°F
The Predicted Maximum Temperature for Cincinnati in December 2024                   : 65.00°F
