In [18]:
##########################
#  LOAD WEATHER CSVs     #
##########################
from pyspark.sql import SparkSession
import os

spark = SparkSession.builder.appName("WeatherAnalysis").getOrCreate()

base_path = "Big Data With PySpark"   


years = range(2015, 2025)
stations = ["72429793812", "99495199999"]  
df_list = []  

for y in years:
    for stn in stations:
        csv_path = os.path.join(base_path, str(y), f"{stn}.csv")
        
        if os.path.exists(csv_path):
            # Read the CSV
            df_temp = spark.read.csv(csv_path, header=True, inferSchema=True)
            row_count = df_temp.count()
            print(f"Loaded {csv_path} -> row count: {row_count}")
            from pyspark.sql.functions import lit
            df_temp = df_temp.withColumn("YEAR", lit(y)) \
                             .withColumn("STATION_ID", lit(stn))
            
            df_list.append(df_temp)
        else:
            # If file not found, just note it
            print(f"File NOT found: {csv_path}")

if df_list:
    from functools import reduce
    from pyspark.sql import DataFrame
    weather_df = reduce(lambda a, b: a.union(b), df_list)
    
    print("\nUnified DataFrame Schema:")
    weather_df.printSchema()
    
    print("\nShowing a few rows from the combined DataFrame:")
    weather_df.show(5)
else:
    print("No CSVs loaded at all. Please check your folder structure.")


Loaded Big Data With PySpark/2015/72429793812.csv -> row count: 365
Loaded Big Data With PySpark/2015/99495199999.csv -> row count: 355
Loaded Big Data With PySpark/2016/72429793812.csv -> row count: 366
File NOT found: Big Data With PySpark/2016/99495199999.csv
Loaded Big Data With PySpark/2017/72429793812.csv -> row count: 365
Loaded Big Data With PySpark/2017/99495199999.csv -> row count: 283
Loaded Big Data With PySpark/2018/72429793812.csv -> row count: 365
Loaded Big Data With PySpark/2018/99495199999.csv -> row count: 363
Loaded Big Data With PySpark/2019/72429793812.csv -> row count: 365
Loaded Big Data With PySpark/2019/99495199999.csv -> row count: 345
Loaded Big Data With PySpark/2020/72429793812.csv -> row count: 366
Loaded Big Data With PySpark/2020/99495199999.csv -> row count: 365
Loaded Big Data With PySpark/2021/72429793812.csv -> row count: 365
Loaded Big Data With PySpark/2021/99495199999.csv -> row count: 104
Loaded Big Data With PySpark/2022/72429793812.csv -> row 

In [9]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, desc

window_year = Window.partitionBy("YEAR").orderBy(desc("MAX"))
hottest_df = weather_df.withColumn("rn", row_number().over(window_year)) \
                       .filter("rn = 1") \
                       .select("YEAR", "STATION_ID", "NAME", "DATE", "MAX")
hottest_df.show(10, truncate=False)


+----+-----------+------------------------------------------------+----------+------+
|YEAR|STATION_ID |NAME                                            |DATE      |MAX   |
+----+-----------+------------------------------------------------+----------+------+
|2015|72429793812|CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US|2015-06-12|91.9  |
|2016|72429793812|CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US|2016-07-24|93.9  |
|2017|99495199999|SEBASTIAN INLET STATE PARK, FL US               |2017-02-22|9999.9|
|2018|72429793812|CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US|2018-07-04|96.1  |
|2019|72429793812|CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US|2019-09-30|95.0  |
|2020|72429793812|CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US|2020-07-05|93.9  |
|2021|72429793812|CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US|2021-08-12|95.0  |
|2022|72429793812|CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US|2022-12-23|9999.9|
|2023|72429793812|CINCINNATI MUNICIPAL AIRPORT LUNKEN 

In [10]:
from pyspark.sql.functions import month

march_df = weather_df.filter(month(weather_df.DATE) == 3)

coldest_march = (
    march_df
    .orderBy("MIN")
    .limit(1)
)

coldest_march.select("STATION_ID", "NAME", "DATE", "MIN").show(truncate=False)


+-----------+------------------------------------------------+----------+---+
|STATION_ID |NAME                                            |DATE      |MIN|
+-----------+------------------------------------------------+----------+---+
|72429793812|CINCINNATI MUNICIPAL AIRPORT LUNKEN FIELD, OH US|2015-03-06|3.2|
+-----------+------------------------------------------------+----------+---+



In [11]:
from pyspark.sql.functions import avg, row_number

prcp_df = weather_df.groupBy("STATION_ID", "YEAR").agg(avg("PRCP").alias("mean_prcp"))

window_station = Window.partitionBy("STATION_ID").orderBy(desc("mean_prcp"))

prcp_ranked = prcp_df.withColumn("rn", row_number().over(window_station))
top_prcp = prcp_ranked.filter("rn = 1").select("STATION_ID", "YEAR", "mean_prcp")

top_prcp.show(truncate=False)


+-----------+----+-----------------+
|STATION_ID |YEAR|mean_prcp        |
+-----------+----+-----------------+
|72429793812|2024|4.503497267759559|
|99495199999|2015|0.0              |
+-----------+----+-----------------+



In [12]:
from pyspark.sql.functions import when, sum as spark_sum, count, col

df_2024 = weather_df.filter(col("YEAR") == 2024)

missing_cond = (col("GUST").isNull()) | (col("GUST") == 999.9) | (col("GUST") == 9999.9)

gust_missing = (
    df_2024
    .groupBy("STATION_ID")
    .agg(
        (spark_sum(when(missing_cond, 1).otherwise(0)) / count("*") * 100).alias("pct_missing_gust")
    )
)

gust_missing.show(truncate=False)


+-----------+-----------------+
|STATION_ID |pct_missing_gust |
+-----------+-----------------+
|72429793812|39.07103825136612|
|99495199999|100.0            |
+-----------+-----------------+



In [13]:
from pyspark.sql.functions import month, mean as spark_mean, stddev as spark_stddev
from pyspark.sql.functions import percentile_approx

# 1) Filter data
cincy_2020 = weather_df.filter(
    (col("STATION_ID") == "72429793812") &
    (col("YEAR") == 2020)
)

# 2) Create a MONTH column
cincy_2020 = cincy_2020.withColumn("MONTH", month(cincy_2020.DATE))

# 3) Calculate mean, median, stddev
stats_df = cincy_2020.groupBy("MONTH").agg(
    spark_mean("TEMP").alias("mean_temp"),
    percentile_approx("TEMP", 0.5).alias("median_temp"),
    spark_stddev("TEMP").alias("stddev_temp")
)

# 4) Calculate mode
#    Count how often each TEMP occurs in each month
temp_count = cincy_2020.groupBy("MONTH", "TEMP").count()

from pyspark.sql.window import Window
window_mode = Window.partitionBy("MONTH").orderBy(desc("count"))

mode_df = (
    temp_count
    .withColumn("rn", row_number().over(window_mode))
    .filter("rn = 1")
    .select(col("MONTH").alias("MODE_MONTH"), col("TEMP").alias("mode_temp"))
)

# 5) Join the mode back to the stats
final_stats = (
    stats_df
    .join(mode_df, stats_df.MONTH == mode_df.MODE_MONTH, "left")
    .drop("MODE_MONTH")
    .orderBy("MONTH")
)

final_stats.show(12, truncate=False)


+-----+------------------+-----------+-----------------+---------+
|MONTH|mean_temp         |median_temp|stddev_temp      |mode_temp|
+-----+------------------+-----------+-----------------+---------+
|1    |37.94516129032259 |37.7       |8.345810873712928|24.7     |
|2    |36.5896551724138  |36.0       |7.90159770587055 |25.9     |
|3    |49.0741935483871  |47.8       |8.779406500135623|43.0     |
|4    |51.779999999999994|51.0       |7.313162436838541|55.7     |
|5    |60.89032258064518 |63.7       |9.314768017820217|73.9     |
|6    |72.54666666666667 |73.7       |4.899946047087439|74.7     |
|7    |77.6              |77.9       |2.33794781806609 |77.5     |
|8    |73.34516129032258 |73.7       |3.487868375734898|73.2     |
|9    |66.1              |65.8       |7.118262089331474|75.3     |
|10   |55.193548387096776|54.0       |6.72869157582517 |63.8     |
|11   |48.003333333333345|47.7       |6.825938527529321|47.7     |
|12   |35.99354838709677 |35.2       |6.642787340861814|32.1  

In [14]:
from pyspark.sql.functions import pow, asc

df_cincy_2017 = weather_df.filter(
    (col("STATION_ID") == "72429793812") &
    (col("YEAR") == 2017) &
    (col("TEMP") < 50) &
    (col("WDSP") > 3)
)

df_cincy_2017 = df_cincy_2017.withColumn(
    "WIND_CHILL",
    35.74 + 0.6215*col("TEMP")
    - 35.75*pow(col("WDSP"), 0.16)
    + 0.4275*col("TEMP")*pow(col("WDSP"), 0.16)
)

lowest_wc = df_cincy_2017.orderBy(asc("WIND_CHILL")).limit(10)

lowest_wc.select("DATE", "TEMP", "WDSP", "WIND_CHILL").show(10, truncate=False)


+----------+----+----+-------------------+
|DATE      |TEMP|WDSP|WIND_CHILL         |
+----------+----+----+-------------------+
|2017-01-07|10.5|7.0 |-0.4140156367932173|
|2017-12-31|11.0|5.3 |2.0339767075993116 |
|2017-12-27|13.0|5.8 |3.820645509123832  |
|2017-12-28|13.6|5.8 |4.533355269061226  |
|2017-01-06|13.6|5.5 |4.868933041653884  |
|2017-01-08|15.9|5.2 |7.929748208036862  |
|2017-12-25|25.8|13.5|14.285113218297408 |
|2017-12-30|21.6|5.3 |14.539211253038193 |
|2017-01-05|22.2|5.8 |14.748861828163854 |
|2017-12-26|23.3|6.2 |15.688977805634499 |
+----------+----+----+-------------------+



In [15]:
df_fl = weather_df.filter(col("STATION_ID") == "99495199999")

extreme_count = df_fl.filter(col("FRSHTT") != 0).count()

print("Number of days with extreme weather in Florida:", extreme_count)


Number of days with extreme weather in Florida: 0


In [17]:
from pyspark.sql.functions import udf
import datetime

def day_of_year_func(d):
    if d is None:
        return None
    return d.timetuple().tm_yday

from pyspark.sql.types import IntegerType
day_of_year_udf = udf(day_of_year_func, IntegerType())

# 1) Training data (Cincinnati, 2022–2023)
train_df = weather_df.filter(
    (col("STATION_ID") == "72429793812") &
    (col("YEAR").isin([2022, 2023]))
).select("DATE", "MAX")

train_df = train_df.withColumn("day_of_year", day_of_year_udf(col("DATE")))
train_df = train_df.dropna(subset=["day_of_year", "MAX"])

# 2) Spark ML setup
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

assembler = VectorAssembler(inputCols=["day_of_year"], outputCol="features")
train_model_df = assembler.transform(train_df).select("features", col("MAX").alias("label"))

lr = LinearRegression(featuresCol="features", labelCol="label")
lr_model = lr.fit(train_model_df)

print("Coefficients:", lr_model.coefficients)
print("Intercept:", lr_model.intercept)
print("Training RMSE:", lr_model.summary.rootMeanSquaredError)

# 3) Predict for 2024 (Nov/Dec)
predict_2024 = weather_df.filter(
    (col("STATION_ID") == "72429793812") &
    (col("YEAR") == 2024)
    & (month(col("DATE")).isin([11, 12]))
).select("DATE", "MAX")

predict_2024 = predict_2024.withColumn("day_of_year", day_of_year_udf(col("DATE")))
predict_2024_df = assembler.transform(predict_2024).select("DATE", "MAX", "features")

predictions = lr_model.transform(predict_2024_df)

from pyspark.sql.functions import month, max as spark_max

# If you just want a single predicted max for each month
pred_month = (
    predictions
    .withColumn("MONTH", month(col("DATE")))
    .groupBy("MONTH")
    .agg(spark_max("prediction").alias("PredictedMax"))
    .orderBy("MONTH")
)

pred_month.show()


25/03/23 19:06:51 WARN Instrumentation: [28a8ae06] regParam is zero, which might cause numerical instability and overfitting.


Coefficients: [0.24552368695755147]
Intercept: 38.237247478548916
Training RMSE: 366.7695009694051
+-----+------------------+
|MONTH|      PredictedMax|
+-----+------------------+
|   11|120.48768260932866|
|   12|128.09891690501274|
+-----+------------------+

