In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import GBTClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# ------------------------------------------------------
# Spark session + gold-layer dataset
# ------------------------------------------------------
from pathlib import Path

# getOrCreate() hands back the active session when one already exists,
# so re-running this cell does not start a second session.
spark = (
    SparkSession.builder
    .appName("WeatherPredictor-MLTraining")
    .getOrCreate()
)

# Absolute Windows path written with forward slashes (a plain string, not a
# raw string) and wrapped in pathlib.Path; it is converted back to str below
# because spark.read.parquet is given a string path.
gold_path = Path("A:/Personal Files/Education/Data Science/Weather_Predictor/data/gold/ml_ready_hourly.parquet")

print("Loading ML-ready dataset...")
df = spark.read.parquet(str(gold_path))

# Quick sanity check: column types, then the first few rows.
df.printSchema()
df.show(5)


Loading ML-ready dataset...
root
 |-- time: timestamp_ntz (nullable = true)
 |-- temperature_2m: double (nullable = true)
 |-- relative_humidity_2m: long (nullable = true)
 |-- dew_point_2m: double (nullable = true)
 |-- precipitation_probability: long (nullable = true)
 |-- precipitation: double (nullable = true)
 |-- cloud_cover: long (nullable = true)
 |-- surface_pressure: double (nullable = true)
 |-- wind_speed_10m: double (nullable = true)
 |-- wind_gusts_10m: double (nullable = true)
 |-- wind_direction_10m: long (nullable = true)
 |-- temp_lag_1h: double (nullable = true)
 |-- humidity_lag_1h: double (nullable = true)
 |-- wind_lag_1h: double (nullable = true)
 |-- temp_lag_2h: double (nullable = true)
 |-- humidity_lag_2h: double (nullable = true)
 |-- wind_lag_2h: double (nullable = true)
 |-- temp_lag_3h: double (nullable = true)
 |-- humidity_lag_3h: double (nullable = true)
 |-- wind_lag_3h: double (nullable = true)
 |-- temp_change_1h: double (nullable = true)
 |-- humid

In [2]:
# Column the classifier is trained to predict.
label_col = "rain_next_hour"

# Columns kept out of the feature vector: the timestamp, the label itself,
# and "rain_next_3h" (presumably an alternative target -- confirm).
# "date" is listed defensively; it is filtered out only if present.
exclude = ["time", "date", "rain_next_hour", "rain_next_3h"]

# Every remaining column, in the DataFrame's own column order.
feature_cols = list(filter(lambda name: name not in exclude, df.columns))

print("Using features:", feature_cols, sep="\n")


Using features:
['temperature_2m', 'relative_humidity_2m', 'dew_point_2m', 'precipitation_probability', 'precipitation', 'cloud_cover', 'surface_pressure', 'wind_speed_10m', 'wind_gusts_10m', 'wind_direction_10m', 'temp_lag_1h', 'humidity_lag_1h', 'wind_lag_1h', 'temp_lag_2h', 'humidity_lag_2h', 'wind_lag_2h', 'temp_lag_3h', 'humidity_lag_3h', 'wind_lag_3h', 'temp_change_1h', 'humidity_change_1h', 'wind_change_1h']


In [3]:
# Discard every row that has a null in ANY column (how="any" is the default).
# NOTE(review): this also drops rows that are null only in columns the model
# never uses (e.g. "rain_next_3h") -- confirm that is intended.
df = df.dropna(how="any")

# Pack the individual feature columns into the single vector column that
# Spark ML estimators read.
assembler = VectorAssembler(outputCol="features", inputCols=feature_cols)

# Keep only what the classifier needs: the assembled vector and the label.
assembled = assembler.transform(df)
df_ml = assembled.select("features", label_col)

# 80/20 random split; the fixed seed keeps the split repeatable.
splits = df_ml.randomSplit([0.8, 0.2], seed=42)
train_df, test_df = splits

train_df.show(5)


+--------------------+--------------+
|            features|rain_next_hour|
+--------------------+--------------+
|[0.0,89.0,-1.6,0....|             0|
|[0.0,90.0,-1.5,0....|             0|
|[0.3,93.0,-0.7,1....|             0|
|[0.4,91.0,-0.9,1....|             0|
|[0.5,91.0,-0.8,0....|             0|
+--------------------+--------------+
only showing top 5 rows



In [4]:
# Gradient-boosted tree classifier: 50 boosting iterations, each tree limited
# to depth 5, trained on the assembled "features" vector.
gbt = GBTClassifier(
    featuresCol="features",
    labelCol=label_col,
    maxIter=50,
    maxDepth=5,
)

# Fit on the training split only; test_df stays untouched for evaluation.
model = gbt.fit(train_df)

print("Model trained!")


Model trained!


In [5]:
# Score the held-out split; transform() appends the rawPrediction,
# probability and prediction columns to test_df.
predictions = model.transform(test_df)

preview = predictions.select("prediction", "probability", label_col)
preview.show(10)

# Area under the ROC curve, computed from the raw (pre-threshold) scores.
evaluator = BinaryClassificationEvaluator(
    metricName="areaUnderROC",
    rawPredictionCol="rawPrediction",
    labelCol=label_col,
)

# NOTE(review): the recorded run reports AUC 1.0 on the hold-out split, which
# is suspicious. Check for label leakage in the features and for a very small
# or single-class test split before trusting this number.
auc = evaluator.evaluate(predictions)
print("AUC:", auc)


+----------+--------------------+--------------+
|prediction|         probability|rain_next_hour|
+----------+--------------------+--------------+
|       0.0|[0.97847911444165...|             0|
|       0.0|[0.97847911444165...|             0|
|       0.0|[0.97847911444165...|             0|
|       0.0|[0.97847911444165...|             0|
|       0.0|[0.97847911444165...|             0|
|       0.0|[0.97847911444165...|             0|
|       0.0|[0.97847911444165...|             0|
|       1.0|[0.02152088555834...|             1|
|       0.0|[0.97847911444165...|             0|
|       0.0|[0.97847911444165...|             0|
+----------+--------------------+--------------+
only showing top 10 rows

AUC: 1.0


In [6]:
# Destination directory for the persisted GBT model (raw string so the
# backslashes in the Windows path are not treated as escapes).
model_path = r"A:\Personal Files\Education\Data Science\Weather_Predictor\models\gbt_weather_model"

# NOTE(review): the recorded run failed at save() with
# java.lang.UnsatisfiedLinkError in NativeIO$Windows.access0. That usually
# points to missing or mismatched Hadoop native Windows binaries
# (winutils.exe / hadoop.dll via HADOOP_HOME) rather than a problem in this
# cell -- verify the local Hadoop setup before re-running.
writer = model.write().overwrite()  # overwrite() replaces an existing model directory
writer.save(model_path)

print(f"Model saved to: {model_path}")

Py4JJavaError: An error occurred while calling o294.save.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:106)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopDataset$1(PairRDDFunctions.scala:1091)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1089)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$4(PairRDDFunctions.scala:1062)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1027)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$3(PairRDDFunctions.scala:1009)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1008)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$2(PairRDDFunctions.scala:965)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:963)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$2(RDD.scala:1623)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1623)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$1(RDD.scala:1609)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1609)
	at org.apache.spark.ml.util.DefaultParamsWriter$.saveMetadata(ReadWrite.scala:413)
	at org.apache.spark.ml.tree.EnsembleModelReadWrite$.saveImpl(treeModels.scala:473)
	at org.apache.spark.ml.classification.GBTClassificationModel$GBTClassificationModelWriter.saveImpl(GBTClassifier.scala:412)
	at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:168)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:334)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:404)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
	at org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136)
	at org.apache.hadoop.mapred.OutputCommitter.commitJob(OutputCommitter.java:291)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:192)
	at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$write$3(SparkHadoopWriter.scala:100)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:552)
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:100)
	... 52 more


In [None]:
# --------------------------------------------------
# 1. Spark session
# --------------------------------------------------
from pathlib import Path

from pyspark.sql import SparkSession

# getOrCreate() returns the already-active session if there is one.
spark = SparkSession.builder.appName("WeatherPredictor-ML-Training").getOrCreate()

# --------------------------------------------------
# 2. Read the ML-ready gold dataset
# --------------------------------------------------
# Relative location -- presumably resolved against the notebook's working
# directory; confirm where the notebook is launched from.
gold_path = Path("../../data/gold")
ml_file = gold_path.joinpath("ml_ready_hourly.parquet")

df = spark.read.parquet(str(ml_file))

# Inspect the schema and a small sample before modelling.
df.printSchema()
df.show(10)

# --------------------------------------------------
# 3. Choose the label and the feature columns
# --------------------------------------------------
from pyspark.sql.functions import col

label = "rain_next_hour"

# Columns that are never features: the timestamp, both rain targets and the
# "silver_loaded_at" column; anything whose name starts with "date" is
# skipped as well.
non_feature_cols = ("time", "rain_next_hour", "rain_next_3h", "silver_loaded_at")

feature_cols = [
    c for c in df.columns
    if not (c in non_feature_cols or c.startswith("date"))
]

print("Using features:", feature_cols)

# --------------------------------------------------
# 4. Assemble Features Vector
# --------------------------------------------------
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features"
)

# VectorAssembler's default handleInvalid="error" raises as soon as a row with
# a null feature is evaluated, and a null label would break training too.
# Drop incomplete rows first, but only look at the columns the model actually
# uses, so rows are not lost to nulls in unrelated columns.
# (Lag/lead-derived columns are presumably null at the edges of the series.)
df_complete = df.dropna(subset=feature_cols + [label])

# Keep only the assembled vector and the label for modelling.
df_vec = assembler.transform(df_complete).select("features", label)

df_vec.show(10)

# --------------------------------------------------
# 5. Train/Test Split
# --------------------------------------------------
# 80/20 random split; the fixed seed keeps the split repeatable.
train_df, test_df = df_vec.randomSplit([0.8, 0.2], seed=42)

# --------------------------------------------------
# 6. Train a Classifier (Random Forest)
# --------------------------------------------------
from pyspark.ml.classification import RandomForestClassifier

# 150 trees of depth <= 12. The seed is fixed (same value as the train/test
# split) so the forest's bootstrap sampling and per-node feature subsetting
# are reproducible from one run to the next.
rf = RandomForestClassifier(
    labelCol=label,
    featuresCol="features",
    numTrees=150,
    maxDepth=12,
    seed=42
)

# Fit on the training split only; test_df is reserved for evaluation.
model = rf.fit(train_df)

# --------------------------------------------------
# 7. Score the hold-out split
# --------------------------------------------------
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# transform() appends rawPrediction / probability / prediction columns.
preds = model.transform(test_df)

# Area under the ROC curve is the evaluator's default metric; it is spelled
# out here so the reported number is unambiguous.
evaluator = BinaryClassificationEvaluator(
    metricName="areaUnderROC",
    rawPredictionCol="rawPrediction",
    labelCol=label,
)

auc = evaluator.evaluate(preds)

print("AUC:", auc)

# --------------------------------------------------
# 8. Persist the trained model (optional)
# --------------------------------------------------
# Relative destination -- presumably resolved against the notebook's working
# directory; confirm before relying on it.
model_path = "../../models/rf_rain_next_hour"

# overwrite() replaces a model previously saved at the same location.
writer = model.write().overwrite()
writer.save(model_path)
print("Model saved to:", model_path)

