In [1]:
# --- First cell: force Java 11 inside the notebook process ---
import os, subprocess, sys

# Point JAVA_HOME to JDK 11 (Temurin 11) on macOS
os.environ["JAVA_HOME"] = subprocess.check_output(
    ["/usr/libexec/java_home", "-v", "11"], text=True
).strip()

# sanity check
print("JAVA_HOME =", os.environ["JAVA_HOME"])
print("java -version:")
print(subprocess.check_output(["java", "-version"], stderr=subprocess.STDOUT, text=True))

# EDA: Setup & Load
from pyspark.sql import SparkSession, functions as F

spark = (
    SparkSession.builder.appName("ISD_2024_RF")
      .master("local[*]")  # local run
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.shuffle.partitions", "500")   # up from 200 for 18M rows
      .config("spark.default.parallelism", "500")
      .config("spark.driver.memory", "16g")             # if you have the RAM; else keep 8g
      .config("spark.memory.fraction", "0.6")
      .config("spark.memory.storageFraction", "0.4")
      .config("spark.kryoserializer.buffer.max", "1024m")
      .getOrCreate()
)
# PARQUET_PATH = "../data/cleansed/2024_cleansed.parquet"  
# df = spark.read.parquet(PARQUET_PATH)

# print("Loaded:", PARQUET_PATH)
# df.printSchema()
# print("Rows:", f"{df.count():,}")

JAVA_HOME = /Library/Java/JavaVirtualMachines/temurin-11.jdk/Contents/Home
java -version:
openjdk version "11.0.28" 2025-07-15
OpenJDK Runtime Environment Temurin-11.0.28+6 (build 11.0.28+6)
OpenJDK 64-Bit Server VM Temurin-11.0.28+6 (build 11.0.28+6, mixed mode)



Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/16 12:40:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
train_output = "../data/train_2024.parquet"
test_output  = "../data/test_2024.parquet"

train_df = spark.read.parquet(train_output)
test_df = spark.read.parquet(test_output)

# print("Loaded:", train_df)
# train_df.printSchema()
# print("Rows:", f"{train_df.count():,}")

# print("Loaded:", test_df)
# test_df.printSchema()
# print("Rows:", f"{test_df.count():,}")

                                                                                

In [5]:
from pyspark.sql import functions as F
from pyspark.ml.feature import Imputer, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark import StorageLevel
# Serialized MEMORY_AND_DISK equivalent
StorageLevel(useDisk=True, useMemory=True, useOffHeap=False, deserialized=False, replication=1)

label_col = "TMP_C"
feature_cols = [
    "LATITUDE","LONGITUDE","ELEVATION",
    "WND_DIR_DEG","WND_SPD_MS",
    "CIG_HEIGHT_M","VIS_DIST_M",
    "DEW_C","SLP_hPa",
    "year","month","day","hour"
]

# Keep DATE only in test for reporting
train_df = train_df.select(label_col, *feature_cols).repartition(500)
test_df  = test_df.select("DATE", label_col, *feature_cols)

# --- Fit lightweight stages ONCE (minimize broadcast size) ---
imp_cols = [c + "_imp" for c in feature_cols]
imputer = Imputer(strategy="median", inputCols=feature_cols, outputCols=imp_cols)
imputerModel = imputer.fit(train_df)

assembler = VectorAssembler(inputCols=imp_cols, outputCol="features")

# Transform & cache the vectorized train once (serialized)
train_feat = assembler.transform(imputerModel.transform(train_df)) \
                       .select(label_col, "features") \
                       .persist(StorageLevel(True, True, False, False, 1))
_ = train_feat.count()  # materialize to break lineage

test_feat = assembler.transform(imputerModel.transform(test_df)) \
                      .select("DATE", label_col, "features")

# --- Random Forest: start lean; scale up if metrics improve materially ---
rf = RandomForestRegressor(
    featuresCol="features", labelCol=label_col,
    numTrees=80,          # try 60–100; increase only if you see clear gains
    maxDepth=10,          # 8–10 is a sweet spot on laptop
    subsamplingRate=0.7,
    featureSubsetStrategy="sqrt",
    seed=42
)
rf_model = rf.fit(train_feat)

# Predict + cache for metrics/report
predictions = rf_model.transform(test_feat) \
    .select("DATE", label_col, F.col("prediction").alias("prediction")) \
    .persist(StorageLevel(True, True, False, False, 1))
_ = predictions.count()

# Evaluate
e_rmse = RegressionEvaluator(labelCol=label_col, predictionCol="prediction", metricName="rmse")
e_mae  = RegressionEvaluator(labelCol=label_col, predictionCol="prediction", metricName="mae")
e_r2   = RegressionEvaluator(labelCol=label_col, predictionCol="prediction", metricName="r2")
print(f"[RF-local] RMSE={e_rmse.evaluate(predictions):.4f} | "
      f"MAE={e_mae.evaluate(predictions):.4f} | R²={e_r2.evaluate(predictions):.4f}")

25/10/15 21:44:00 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
25/10/15 21:49:46 WARN DAGScheduler: Broadcasting large task binary with size 1676.2 KiB
25/10/15 21:52:05 WARN DAGScheduler: Broadcasting large task binary with size 3.2 MiB
25/10/15 21:54:58 WARN DAGScheduler: Broadcasting large task binary with size 6.4 MiB
25/10/15 21:58:30 WARN DAGScheduler: Broadcasting large task binary with size 1986.1 KiB
25/10/15 21:58:51 WARN DAGScheduler: Broadcasting large task binary with size 12.6 MiB
25/10/15 22:03:58 WARN DAGScheduler: Broadcasting large task binary with size 3.9 MiB

[RF-local] RMSE=3.7224 | MAE=2.6887 | R²=0.9123


                                                                                

In [7]:
spark.stop()

[RF-local] RMSE=3.7224 | MAE=2.6887 | R²=0.9123

In [6]:
# Save the trained RF model
rf_model.write().overwrite().save("models/rf_weather_model")

# Save predictions for reuse (optional but useful for analysis)
predictions.write.mode("overwrite").parquet("results/rf_predictions.parquet")

# Save imputer model too if you fitted one separately
imputerModel.write().overwrite().save("models/rf_imputer_model")

                                                                                

## Run the below one tomorrow

In [7]:
from pyspark.sql import functions as F
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark import StorageLevel

label_col = "TMP_C"
feature_cols = [
    "LATITUDE","LONGITUDE","ELEVATION",
    "WND_DIR_DEG","WND_SPD_MS",
    "CIG_HEIGHT_M","VIS_DIST_M",
    "DEW_C","SLP_hPa",
    "year","month","day","hour"
]
imp_cols = [c + "_imp" for c in feature_cols]

# 0) Sanity: confirm the saved imputer expects the same inputs
assert list(imputerModel.getInputCols()) == feature_cols, \
    f"Imputer inputs {imputerModel.getInputCols()} don't match expected {feature_cols}"

# 1) Keep only what you need; ensure DATE is present for reporting
req_cols = ["DATE", label_col] + feature_cols
missing = [c for c in req_cols if c not in test_df.columns]
if missing:
    raise ValueError(f"Missing columns in test_df: {missing}")

test_df = test_df.select(*req_cols)

# 2) Transform with the *fitted* imputer, then assemble
assembler = VectorAssembler(inputCols=imp_cols, outputCol="features")
test_feat = (
    assembler
    .transform(imputerModel.transform(test_df))
    .select("DATE", label_col, "features")
)

# 3) Predict + cache (serialized)
SERIALIZED_STORAGE = StorageLevel(True, True, False, False, 1)
predictions = (
    rf_model.transform(test_feat)
            .select("DATE", F.col(label_col).alias("TMP_C"), F.col("prediction"))
            .persist(SERIALIZED_STORAGE)
)
_ = predictions.count()  # materialize

# 4) Evaluate & save
e_rmse = RegressionEvaluator(labelCol="TMP_C", predictionCol="prediction", metricName="rmse")
e_mae  = RegressionEvaluator(labelCol="TMP_C", predictionCol="prediction", metricName="mae")
e_r2   = RegressionEvaluator(labelCol="TMP_C", predictionCol="prediction", metricName="r2")

rmse = float(e_rmse.evaluate(predictions))
mae  = float(e_mae.evaluate(predictions))
r2   = float(e_r2.evaluate(predictions))
print(f"[RF-reload] RMSE={rmse:.4f} | MAE={mae:.4f} | R²={r2:.4f}")

# Save artifacts
predictions.write.mode("overwrite").parquet("results/rf_predictions_reload.parquet")
spark.createDataFrame([(rmse, mae, r2)], ["rmse","mae","r2"]) \
     .coalesce(1).write.mode("overwrite").option("header", True).csv("results/rf_metrics_reload_csv")

                                                                                

[RF-reload] RMSE=3.7224 | MAE=2.6887 | R²=0.9123


                                                                                

In [8]:
from pyspark.sql import functions as F
from pyspark.ml.evaluation import RegressionEvaluator

e_rmse = RegressionEvaluator(labelCol="TMP_C", predictionCol="prediction", metricName="rmse")
e_mae  = RegressionEvaluator(labelCol="TMP_C", predictionCol="prediction", metricName="mae")
e_r2   = RegressionEvaluator(labelCol="TMP_C", predictionCol="prediction", metricName="r2")

rmse = float(e_rmse.evaluate(predictions))
mae  = float(e_mae.evaluate(predictions))
r2   = float(e_r2.evaluate(predictions))

spark.createDataFrame([(rmse, mae, r2)], ["rmse","mae","r2"]) \
     .coalesce(1).write.mode("overwrite").option("header", True).csv("results/rf_metrics_csv")
print(f"[saved] results/rf_metrics_csv")

[Stage 89:>                                                         (0 + 1) / 1]

[saved] results/rf_metrics_csv


                                                                                

In [10]:
## Feature importance CSV
# If rf_model is not in scope, reload it as above
imp_vec = rf_model.featureImportances
feat_imps = [(feature_cols[i], float(imp_vec[i])) for i in range(len(feature_cols))]
spark.createDataFrame(feat_imps, ["feature","importance"]) \
     .orderBy(F.desc("importance")) \
     .coalesce(1).write.mode("overwrite").option("header", True).csv("results/rf_feature_importance_csv")
print(f"[saved] results/rf_feature_importance_csv")



[saved] results/rf_feature_importance_csv


                                                                                

In [11]:
## RMSE by hour and month
from pyspark.sql import functions as F

eval_df = (predictions
    .withColumn("residual", F.col("TMP_C") - F.col("prediction"))
    .withColumn("sq_err", F.pow(F.col("residual"), 2))
    .withColumn("hour",  F.hour("DATE"))
    .withColumn("month", F.month("DATE"))
)

rmse_by_hour = eval_df.groupBy("hour") \
    .agg(F.sqrt(F.avg("sq_err")).alias("rmse"),
         F.avg("residual").alias("bias")) \
    .orderBy("hour")
rmse_by_hour.coalesce(1).write.mode("overwrite").option("header", True).csv("results/rf_rmse_by_hour_csv")

rmse_by_month = eval_df.groupBy("month") \
    .agg(F.sqrt(F.avg("sq_err")).alias("rmse"),
         F.avg("residual").alias("bias")) \
    .orderBy("month")
rmse_by_month.coalesce(1).write.mode("overwrite").option("header", True).csv("results/rf_rmse_by_month_csv")

print(f"[saved] results/rf_rmse_by_hour_csv, results/rf_rmse_by_month_csv")

[saved] results/rf_rmse_by_hour_csv, results/rf_rmse_by_month_csv


In [12]:
predictions.select("DATE","TMP_C","prediction") \
    .coalesce(1).write.mode("overwrite").option("header", True).csv("results/rf_predictions_csv")
print(f"[saved] results/rf_predictions_csv")

[Stage 110:>                                                        (0 + 1) / 1]

[saved] results/rf_predictions_csv


                                                                                

In [16]:
import pandas as pd
import matplotlib.pyplot as plt
import os

# --- Paths ---
FIG_DIR = "results/figures"
os.makedirs(FIG_DIR, exist_ok=True)

# --- 1. Predicted vs Actual Scatter ---
df = pd.read_csv("results/rf_predictions_csv/part-*.csv")
plt.figure(figsize=(6,6))
plt.scatter(df["TMP_C"], df["prediction"], s=5, alpha=0.5, color="teal")
plt.plot([df["TMP_C"].min(), df["TMP_C"].max()],
         [df["TMP_C"].min(), df["TMP_C"].max()],
         'r--', lw=2, label="Perfect fit")
plt.xlabel("Actual Temperature (°C)")
plt.ylabel("Predicted Temperature (°C)")
plt.title("Random Forest – Predicted vs Actual TMP_C")
plt.legend()
plt.tight_layout()
plt.savefig(f"{FIG_DIR}/rf_pred_vs_actual.png", dpi=300)
plt.close()

# --- 2. Residual Distribution Histogram ---
df["residual"] = df["TMP_C"] - df["prediction"]
plt.figure(figsize=(6,4))
plt.hist(df["residual"], bins=50, color="orange", edgecolor="k", alpha=0.7)
plt.title("Residual Distribution (TMP_C - Predicted)")
plt.xlabel("Residual (°C)")
plt.ylabel("Frequency")
plt.tight_layout()
plt.savefig(f"{FIG_DIR}/rf_residual_hist.png", dpi=300)
plt.close()

# --- 3. Feature Importance Bar Chart ---
imp = pd.read_csv("results/rf_feature_importance_csv/part-*.csv")
imp = imp.sort_values("importance", ascending=False)
plt.figure(figsize=(8,5))
plt.barh(imp["feature"], imp["importance"], color="skyblue")
plt.gca().invert_yaxis()
plt.title("Random Forest Feature Importance")
plt.xlabel("Importance Score")
plt.tight_layout()
plt.savefig(f"{FIG_DIR}/rf_feature_importance.png", dpi=300)
plt.close()

# --- 4. RMSE by Hour ---
rmse_hour = pd.read_csv("results/rf_rmse_by_hour_csv/part-*.csv")
plt.figure(figsize=(7,4))
plt.plot(rmse_hour["hour"], rmse_hour["rmse"], marker="o", color="slateblue")
plt.title("RMSE by Hour of Day")
plt.xlabel("Hour")
plt.ylabel("RMSE (°C)")
plt.grid(True, linestyle="--", alpha=0.5)
plt.tight_layout()
plt.savefig(f"{FIG_DIR}/rf_rmse_by_hour.png", dpi=300)
plt.close()

# --- 5. RMSE by Month ---
rmse_month = pd.read_csv("results/rf_rmse_by_month_csv/part-*.csv")
plt.figure(figsize=(7,4))
plt.plot(rmse_month["month"], rmse_month["rmse"], marker="o", color="seagreen")
plt.title("RMSE by Month")
plt.xlabel("Month")
plt.ylabel("RMSE (°C)")
plt.grid(True, linestyle="--", alpha=0.5)
plt.tight_layout()
plt.savefig(f"{FIG_DIR}/rf_rmse_by_month.png", dpi=300)
plt.close()

print(f"✅ All RF figures saved in {FIG_DIR}/")

FileNotFoundError: [Errno 2] No such file or directory: 'results/rf_predictions_csv/part-*.csv'