In [0]:
%pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, month, year, avg, to_date
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import GBTRegressor
from pyspark.ml import Pipeline

spark = SparkSession.builder.appName("Evapotranspiration_MLlib").getOrCreate()

df = spark.read.option("header", "true").option("inferSchema", "true").csv("/opt/data/weatherData.csv")

print("Data loaded successfully!")
print(f"Total records: {df.count()}")
# df.printSchema()

In [1]:
%pyspark
df_clean = df.withColumnRenamed("precipitation_hours (h)", "precipitation_hours").withColumnRenamed("sunshine_duration (s)", "sunshine_duration") \
            .withColumnRenamed("wind_speed_10m_max (km/h)", "wind_speed").withColumnRenamed("et0_fao_evapotranspiration (mm)", "evapotranspiration")

df_clean = df_clean.withColumn("date_parsed", to_date(col("date"), "M/d/yyyy")).withColumn("year", year("date_parsed")).withColumn("month", month("date_parsed"))

# Filter for May (month = 5)
df_may = df_clean.filter(col("month") == 5)

df_features = df_may.select("precipitation_hours", "sunshine_duration", "wind_speed", "evapotranspiration").na.drop()

print(f"May records: {df_features.count()}")
df_features.describe().show()


In [2]:
%pyspark
feature_cols = ["precipitation_hours", "sunshine_duration", "wind_speed"]

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_raw")
scaler = StandardScaler(inputCol="features_raw", outputCol="features", withStd=True, withMean=True)

df_assembled = assembler.transform(df_features)
scaler_model = scaler.fit(df_assembled)
df_scaled = scaler_model.transform(df_assembled)

print("Features assembled and scaled!")
df_scaled.select("features", "evapotranspiration").show(5, truncate=False)

In [3]:
%pyspark
# Split data: 80% training, 20% validation
train_data, test_data = df_scaled.randomSplit([0.8, 0.2], seed=42)

print(f"Training samples: {train_data.count()}")
print(f"Validation samples: {test_data.count()}")

In [4]:
%pyspark
# Linear Regression
lr = LinearRegression(
    featuresCol="features",
    labelCol="evapotranspiration",
    maxIter=100,
    regParam=0.3,
    elasticNetParam=0.8)

lr_model = lr.fit(train_data)

# Random Forest Regresssor
rf = RandomForestRegressor(
    featuresCol="features",
    labelCol="evapotranspiration",
    numTrees=100,
    maxDepth=10,
    seed=42)

rf_model = rf.fit(train_data)

print("=== Random Forest Model ===")
print(f"Features: {feature_cols}")
print(f"Feature Importances: {rf_model.featureImportances}")

# Gradient Boosted Trees Regressor
gbt = GBTRegressor(
    featuresCol="features",
    labelCol="evapotranspiration",
    maxIter=100,
    seed=42)

gbt_model = gbt.fit(train_data)
print("\n=== Gradient Boosted Trees (GBT) Model ===")
print(f"Features: {feature_cols}")
print(f"Feature Importances: {gbt_model.featureImportances}")

In [5]:
%pyspark
# Model evaluation
evaluator_rmse = RegressionEvaluator(labelCol="evapotranspiration", predictionCol="prediction", metricName="rmse")
evaluator_r2 = RegressionEvaluator(labelCol="evapotranspiration", predictionCol="prediction", metricName="r2")
evaluator_mae = RegressionEvaluator(labelCol="evapotranspiration", predictionCol="prediction", metricName="mae")

# Linear Regression predictions
lr_predictions = lr_model.transform(test_data)
lr_rmse = evaluator_rmse.evaluate(lr_predictions)
lr_r2 = evaluator_r2.evaluate(lr_predictions)
lr_mae = evaluator_mae.evaluate(lr_predictions)

# Random Forest predictions
rf_predictions = rf_model.transform(test_data)
rf_rmse = evaluator_rmse.evaluate(rf_predictions)
rf_r2 = evaluator_r2.evaluate(rf_predictions)
rf_mae = evaluator_mae.evaluate(rf_predictions)

# GBT predictions
gbt_predictions = gbt_model.transform(test_data)
gbt_rmse = evaluator_rmse.evaluate(gbt_predictions)
gbt_r2 = evaluator_r2.evaluate(gbt_predictions)
gbt_mae = evaluator_mae.evaluate(gbt_predictions)

print("=== Linear Regression Evaluation ===")
print(f"RMSE: {lr_rmse:.4f}")
print(f"R-2: {lr_r2:.4f}")
print(f"MAE: {lr_mae:.4f}")

print("\n=== Random Forest Evaluation ===")
print(f"RMSE: {rf_rmse:.4f}")
print(f"R-2: {rf_r2:.4f}")
print(f"MAE: {rf_mae:.4f}")

print("\n=== Gradient Boosted Trees Evaluation ===")
print(f"RMSE: {gbt_rmse:.4f}")
print(f"R-2: {gbt_r2:.4f}")
print(f"MAE: {gbt_mae:.4f}")

# === Linear Regression Evaluation ===
# RMSE: 0.6757
# R-2: 0.6792
# MAE: 0.5153

# === Random Forest Evaluation ===
# RMSE: 0.4948
# R-2: 0.8279
# MAE: 0.3855

# === Gradient Boosted Trees Evaluation ===
# RMSE: 0.4972
# R-2: 0.8263
# MAE: 0.3871

In [6]:
%pyspark
# Find patterns where evapotranspiration < 1.5mm
low_et = df_features.filter(col("evapotranspiration") < 1.5)

print("=== Conditions for Evapotranspiration < 1.5mm ===")
print(f"Number of days with ET < 1.5mm: {low_et.count()}")

low_et_stats = low_et.agg(
    avg("precipitation_hours").alias("mean_precipitation_hours"),
    avg("sunshine_duration").alias("mean_sunshine_duration"),
    avg("wind_speed").alias("mean_wind_speed"))

print("\nMean values when evapotranspiration < 1.5mm:")
low_et_stats.show()

# === Conditions for Evapotranspiration < 1.5mm ===
# Number of days with ET < 1.5mm: 134

# Mean values when evapotranspiration < 1.5mm:
# +------------------------+----------------------+------------------+
# |mean_precipitation_hours|mean_sunshine_duration|   mean_wind_speed|
# +------------------------+----------------------+------------------+
# |      22.134328358208954|    1593.0579850746274|18.490298507462686|
# +------------------------+----------------------+------------------+

In [7]:
%pyspark
# Check prediction floors
print("=== Model Prediction Ranges on Test Data ===")

lr_preds = lr_model.transform(test_data)
print(f"Linear Regression: Min={lr_preds.agg({'prediction': 'min'}).collect()[0][0]:.4f}, Max={lr_preds.agg({'prediction': 'max'}).collect()[0][0]:.4f}")

rf_preds = rf_model.transform(test_data)
print(f"Random Forest:     Min={rf_preds.agg({'prediction': 'min'}).collect()[0][0]:.4f}, Max={rf_preds.agg({'prediction': 'max'}).collect()[0][0]:.4f}")

gbt_preds = gbt_model.transform(test_data)
print(f"GBT:               Min={gbt_preds.agg({'prediction': 'min'}).collect()[0][0]:.4f}, Max={gbt_preds.agg({'prediction': 'max'}).collect()[0][0]:.4f}")

# === Model Prediction Ranges on Test Data ===
# Linear Regression: Min=1.7946, Max=5.3591
# Random Forest:     Min=1.6081, Max=6.3490
# GBT:               Min=1.2874, Max=6.9398

# Here, in Linear Regression and Random Forest, for unseen data, the predictions' range varies above 1.5mm.
# So those can not be used to predict the mean precipitation_hours, sunshine, and wind_speed during May in 2026 to have evapotranspiration lower than 1.5mm


In [8]:
%pyspark
import random
from pyspark.sql.types import StructType, StructField, DoubleType
from pyspark.sql.functions import avg, col

# Grid Search Parameters
# Based on May analysis: High Rain, Low Sun, Variable Wind
NUM_SAMPLES = 50000
MIN_SUNSHINE = 0.0         # 0 hours
MAX_SUNSHINE = 3600.0      # 1 hour 
MIN_WIND = 15.0            # Range around the 18.5 km/h mean
MAX_WIND = 25.0            # Range around the 18.5 km/h mean
MIN_PRECIP = 20.0          # Range around 22h
MAX_PRECIP = 24.0          # Range around 22h

synthetic_data = []
for _ in range(NUM_SAMPLES):
    synthetic_data.append((
        random.uniform(MIN_PRECIP, MAX_PRECIP),
        random.uniform(MIN_SUNSHINE, MAX_SUNSHINE),
        random.uniform(MIN_WIND, MAX_WIND)
    ))

schema = StructType([
    StructField("precipitation_hours", DoubleType(), True),
    StructField("sunshine_duration", DoubleType(), True),
    StructField("wind_speed", DoubleType(), True)
])

df_synthetic = spark.createDataFrame(synthetic_data, schema)

print(f"Generating {NUM_SAMPLES} synthetic realistic weather points...")

# Transform features
df_synthetic_assembled = assembler.transform(df_synthetic)
df_synthetic_scaled = scaler_model.transform(df_synthetic_assembled)

In [9]:
%pyspark
# Predict using GBT
predictions = gbt_model.transform(df_synthetic_scaled)

# Keep ONLY the days where Predicted ET < 1.5
successful_days = predictions.filter(col("prediction") < 1.5)
success_count = successful_days.count()

print(f"Found {success_count} days with ET < 1.5mm out of {NUM_SAMPLES} samples.")

if success_count > 0:
    # Calculate Mean of those successful days
    mean_values = successful_days.agg(
        avg("precipitation_hours").alias("avg_precip"),
        avg("sunshine_duration").alias("avg_sun"),
        avg("wind_speed").alias("avg_wind"),
        avg("prediction").alias("avg_et")
    ).collect()[0]
    
    recommended_stats = {
        'precip': mean_values["avg_precip"],
        'sun': mean_values["avg_sun"],
        'wind': mean_values["avg_wind"]
    }
    
    print(f"\n*** Recommended Mean Values for May 2026 (Target < 1.5mm) ***")
    print(f"Precipitation Hours: {recommended_stats['precip']:.2f} hours")
    print(f"Sunshine Duration: {recommended_stats['sun']:.2f} seconds ({recommended_stats['sun']/3600:.2f} hours)")
    print(f"Wind Speed: {recommended_stats['wind']:.2f} km/h")
    print(f"\nExpected Average ET with these conditions: {mean_values['avg_et']:.4f} mm")

else:
    print(f"\nFAILURE: Could not find any weather combinations with ET < 1.5mm.")
    print("Model indicates < 1.5mm is extremely difficult/impossible within these search parameters.")


In [10]:
%pyspark
# Predict using Linear Regression model on the SAME synthetic data
lr_predictions = lr_model.transform(df_synthetic_scaled)

# Filter for Target and Calculate Means
lr_successful = lr_predictions.filter(col("prediction") < 1.5)
lr_success_count = lr_successful.count()

print(f"Found {lr_success_count} days with ET < 1.5mm out of {NUM_SAMPLES} samples.")

if lr_success_count > 0:
    lr_mean = lr_successful.agg(
        avg("precipitation_hours").alias("avg_precip"),
        avg("sunshine_duration").alias("avg_sun"),
        avg("wind_speed").alias("avg_wind"),
        avg("prediction").alias("avg_et")
    ).collect()[0]
    
    print(f"\n*** Linear Regression - Recommended Mean Values for May 2026 ***")
    print(f"Precipitation Hours: {lr_mean['avg_precip']:.2f} hours")
    print(f"Sunshine Duration: {lr_mean['avg_sun']:.2f} seconds ({lr_mean['avg_sun']/3600:.2f} hours)")
    print(f"Wind Speed: {lr_mean['avg_wind']:.2f} km/h")
    print(f"Expected Average ET: {lr_mean['avg_et']:.4f} mm")
else:
    print("Linear Regression could not find any scenarios with ET < 1.5mm.")
    lr_mean = None


In [11]:
%pyspark
# Predict using Random Forest model on the SAME synthetic data
rf_predictions = rf_model.transform(df_synthetic_scaled)

# Filter for Target and Calculate Means
rf_successful = rf_predictions.filter(col("prediction") < 1.5)
rf_success_count = rf_successful.count()

print(f"Found {rf_success_count} days with ET < 1.5mm out of {NUM_SAMPLES} samples.")

if rf_success_count > 0:
    rf_mean = rf_successful.agg(
        avg("precipitation_hours").alias("avg_precip"),
        avg("sunshine_duration").alias("avg_sun"),
        avg("wind_speed").alias("avg_wind"),
        avg("prediction").alias("avg_et")
    ).collect()[0]
    
    print(f"\n*** Random Forest - Recommended Mean Values for May 2026 ***")
    print(f"Precipitation Hours: {rf_mean['avg_precip']:.2f} hours")
    print(f"Sunshine Duration: {rf_mean['avg_sun']:.2f} seconds ({rf_mean['avg_sun']/3600:.2f} hours)")
    print(f"Wind Speed: {rf_mean['avg_wind']:.2f} km/h")
    print(f"Expected Average ET: {rf_mean['avg_et']:.4f} mm")
else:
    print("Random Forest could not find any scenarios with ET < 1.5mm.")
    rf_mean = None
