In [None]:
!pip install pyspark



In [None]:
from pyspark.sql import SparkSession

# Create the Spark session (or reuse an already running one) for this notebook.
spark = (
    SparkSession.builder
    .appName("Load Parquet Files")
    .getOrCreate()
)

In [None]:
# Read the Parquet file at the path below into a Spark DataFrame.
parquet_path = '/content/drive/MyDrive/bigdata_data/df_DE_solar_new.parquet'
df = spark.read.parquet(parquet_path)

In [None]:
# Preview the loaded DataFrame as a text table.
df.show()

+--------------+------------------------------+-------------------------------+--------------------------+
|DE_temperature|DE_radiation_direct_horizontal|DE_radiation_diffuse_horizontal|DE_solar_generation_actual|
+--------------+------------------------------+-------------------------------+--------------------------+
|        -1.692|                0.432794925491|                  6.76800507451|                      71.0|
|        -1.046|                 8.87734881749|                  51.9463511825|                     773.0|
|         0.072|                 34.1582502313|                  97.0929497687|                    2117.0|
|         0.866|                 56.5458478345|                  120.290652165|                    3364.0|
|         1.493|                  62.869001592|                  126.211198408|                    4198.0|
|         1.818|                 54.2607041126|                  115.727495887|                    3500.0|
|         1.726|                 28.5

In [None]:
# Number of rows in the loaded DataFrame.
df.count()

500000

In [None]:
# Bind a second name to the loaded DataFrame; later cells reassign df1
# while df keeps referring to the data as loaded.
df1 = df

In [None]:
# Keep only the rows whose DE_solar_generation_actual is strictly positive.
df1 = df1.where(df1.DE_solar_generation_actual > 0)

In [None]:
# Row count of df1 after the positive-generation filter.
df1.count()

500000

In [None]:
# Preview the filtered DataFrame.
df1.show()

+--------------+------------------------------+-------------------------------+--------------------------+
|DE_temperature|DE_radiation_direct_horizontal|DE_radiation_diffuse_horizontal|DE_solar_generation_actual|
+--------------+------------------------------+-------------------------------+--------------------------+
|        -1.692|                0.432794925491|                  6.76800507451|                      71.0|
|        -1.046|                 8.87734881749|                  51.9463511825|                     773.0|
|         0.072|                 34.1582502313|                  97.0929497687|                    2117.0|
|         0.866|                 56.5458478345|                  120.290652165|                    3364.0|
|         1.493|                  62.869001592|                  126.211198408|                    4198.0|
|         1.818|                 54.2607041126|                  115.727495887|                    3500.0|
|         1.726|                 28.5

In [None]:
from pyspark.sql.functions import mean, stddev,col

In [None]:
# Compute the label's mean and sample standard deviation in a single
# aggregation (one Spark job instead of two), and take them from df1 -- the
# frame the standardization is applied to -- rather than from the unfiltered df.
# NOTE(review): these statistics are computed before the train/test split, so
# the test rows influence the scaling; consider fitting them on the training
# split only.
label_stats = df1.agg(
    mean(col('DE_solar_generation_actual')),
    stddev(col('DE_solar_generation_actual')),
).first()
mean_value, stddev_value = label_stats[0], label_stats[1]

In [None]:
# Show the label mean and standard deviation, one value per line.
print(mean_value, stddev_value, sep="\n")

7604.360789692682
7204.896696490558


In [None]:
# Standardize the label: subtract mean_value and divide by stddev_value.
# Note: the column is overwritten in place on df1, so running this cell a
# second time would standardize the already-standardized values again.
centered_label = col('DE_solar_generation_actual') - mean_value
df1 = df1.withColumn('DE_solar_generation_actual', centered_label / stddev_value)

In [None]:
# Preview df1 after the label column has been standardized.
df1.show()

+--------------+------------------------------+-------------------------------+--------------------------+
|DE_temperature|DE_radiation_direct_horizontal|DE_radiation_diffuse_horizontal|DE_solar_generation_actual|
+--------------+------------------------------+-------------------------------+--------------------------+
|        -1.692|                0.432794925491|                  6.76800507451|       -1.0455890080092498|
|        -1.046|                 8.87734881749|                  51.9463511825|       -0.9481552723747141|
|         0.072|                 34.1582502313|                  97.0929497687|       -0.7616154708179963|
|         0.866|                 56.5458478345|                  120.290652165|         -0.58853873529625|
|         1.493|                  62.869001592|                  126.211198408|      -0.47278412629453676|
|         1.818|                 54.2607041126|                  115.727495887|       -0.5696626839482488|
|         1.726|                 28.5

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [None]:
# Cast the three feature columns and the label column to Spark's "float" type,
# one column at a time; every other column of df1 is left as it is.
for column_name in (
    "DE_temperature",
    "DE_radiation_direct_horizontal",
    "DE_radiation_diffuse_horizontal",
    "DE_solar_generation_actual",
):
    df1 = df1.withColumn(column_name, df1[column_name].cast("float"))

In [None]:
# Pack the three input columns into a single "features" vector column and keep
# only that vector together with the label.
feature_columns = [
    "DE_temperature",
    "DE_radiation_direct_horizontal",
    "DE_radiation_diffuse_horizontal",
]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
assembled = assembler.transform(df1)
data = assembled.select("features", "DE_solar_generation_actual")

In [None]:
# Random 80/20 split into training and test data, seeded for repeatability.
split_weights = [0.8, 0.2]
train_data, test_data = data.randomSplit(split_weights, seed=42)

In [None]:
# Gradient-boosted-tree regressor reading the assembled feature vector and
# predicting the solar-generation label.
gbt = GBTRegressor(
    featuresCol="features",
    labelCol="DE_solar_generation_actual",
)

In [None]:
# Candidate values of maxIter to try during cross-validation of the GBT model.
param_grid = (
    ParamGridBuilder()
    .addGrid(gbt.maxIter, [50, 60, 70, 80, 90])
    .build()
)

In [None]:
# RMSE between the label column and the model's "prediction" column.
evaluator = RegressionEvaluator(
    labelCol="DE_solar_generation_actual",
    predictionCol="prediction",
    metricName="rmse",
)

In [None]:
# 5-fold cross-validation over the GBT parameter grid, scored with the RMSE
# evaluator. The seed is fixed (same value as the train/test split) so the
# fold assignment -- and therefore the selected model -- can be reproduced
# from one run to the next.
cv = CrossValidator(estimator=gbt,
                    estimatorParamMaps=param_grid,
                    evaluator=evaluator,
                    numFolds=5,
                    seed=42)

In [None]:
# Run the cross-validated grid search on the training split; this is the cell
# in which the candidate models are actually trained.
cv_model = cv.fit(train_data)

In [None]:
# Keep the model that the cross-validator selected as best.
best_gbt_model = cv_model.bestModel

In [None]:
# Apply the selected GBT model to the held-out test split.
gbt_predictions = best_gbt_model.transform(test_data)

In [None]:
# Score the test-set predictions with the RMSE evaluator and report the value.
# The label was standardized earlier in the notebook, so this RMSE is expressed
# in standardized units of the label rather than its original units.
rmse = evaluator.evaluate(gbt_predictions)
print("Root Mean Squared Error (RMSE) on test data:", rmse)

Root Mean Squared Error (RMSE) on test data: 0.23657889037984967


In [None]:
# Report the maxIter value carried by the selected GBT model.
best_maxIter = best_gbt_model.getMaxIter()
print("Best value for maxIter:", best_maxIter)

Best value for maxIter: 90


In [None]:
# Persist the selected GBT model. A plain save() raises once the target path
# already exists, which breaks every re-run of the notebook; going through the
# writer with overwrite() replaces the previous copy instead.
model_path = "/content/drive/MyDrive/final_best_gbt_model"
best_gbt_model.write().overwrite().save(model_path)

In [None]:
from pyspark.ml.regression import RandomForestRegressor

In [None]:
# Random-forest regressor on the same features and label. Random forests draw
# random row/feature subsets, so the seed is fixed (same value as the
# train/test split) to make the fitted model reproducible.
rf = RandomForestRegressor(featuresCol="features", labelCol="DE_solar_generation_actual", seed=42)

In [None]:
# 3 x 3 grid of forest sizes and tree depths for the random-forest search.
# This rebinds param_grid, replacing the GBT grid defined earlier.
param_grid = (
    ParamGridBuilder()
    .addGrid(rf.numTrees, [50, 100, 150])
    .addGrid(rf.maxDepth, [5, 10, 15])
    .build()
)

In [None]:
# RMSE evaluator for the random-forest search (same settings as for the GBT).
evaluator = RegressionEvaluator(
    labelCol="DE_solar_generation_actual",
    predictionCol="prediction",
    metricName="rmse",
)

In [None]:
# 5-fold cross-validation over the random-forest grid, scored with RMSE.
# The seed is fixed (same value as the train/test split) so the fold
# assignment can be reproduced from one run to the next.
# This rebinds cv, replacing the GBT cross-validator defined earlier.
cv = CrossValidator(estimator=rf,
                    estimatorParamMaps=param_grid,
                    evaluator=evaluator,
                    numFolds=5,
                    seed=42)

In [None]:
# Run the random-forest grid search on the training split. This rebinds
# cv_model, so the GBT cross-validation result is no longer reachable by name.
cv_model = cv.fit(train_data)

In [None]:
# Keep the random-forest model that the cross-validator selected as best.
best_model = cv_model.bestModel

In [None]:
# Apply the selected random-forest model to the held-out test split.
predictions = best_model.transform(test_data)

In [None]:
# Score the random-forest test predictions with the RMSE evaluator.
# NOTE(review): the recorded output for this cell (~1845.57) is on a very
# different scale from the GBT RMSE recorded earlier (~0.2366). That suggests
# the two numbers were produced with the label in different units (raw vs.
# standardized) -- re-run the notebook top to bottom before comparing them.
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data:", rmse)

Root Mean Squared Error (RMSE) on test data: 1845.5682630262443


In [None]:
# Report the hyperparameters of the selected random-forest model.
# getNumTrees is read as an attribute (no call parentheses); the recorded
# output shows it yields the tree count directly.
best_numTrees = best_model.getNumTrees
# maxDepth is looked up by parameter name.
best_maxDepth = best_model.getOrDefault("maxDepth")
print("Best value for numTrees:", best_numTrees)
print("Best value for maxDepth:", best_maxDepth)

Best value for numTrees: 150
Best value for maxDepth: 15


In [None]:
# Persist the selected GBT model under a second path. overwrite() lets the
# cell be re-run; a plain save() raises once the path already exists.
# NOTE(review): this cell sits after the random-forest section but saves the
# GBT model again -- confirm whether the random-forest `best_model` was meant
# to be saved here instead.
model_path = "/content/drive/MyDrive/best_gbt_model"
best_gbt_model.write().overwrite().save(model_path)

In [None]:
# Show summary statistics for the columns of the GBT test predictions.
gbt_predictions.summary().show()

In [None]:
# Standardize the label and the prediction with the SAME statistics (the
# label's mean and sample standard deviation), computed from the data instead
# of being pasted in as constants. Scaling the prediction with its own
# mean/std would put the two columns on different scales, and an RMSE between
# them would no longer measure prediction error.
# NOTE(review): the previously hard-coded constants look like values copied
# from the summary() output of an earlier run -- confirm. If the label is
# already standardized at this point, the statistics are close to 0 and 1 and
# this step leaves both columns almost unchanged.
label_mean, label_std = gbt_predictions.agg(
    mean(col('DE_solar_generation_actual')),
    stddev(col('DE_solar_generation_actual')),
).first()

scaled_actual = (gbt_predictions['DE_solar_generation_actual'] - label_mean) / label_std
scaled_pred = (gbt_predictions['prediction'] - label_mean) / label_std

gbt_predictions = gbt_predictions.withColumn("scaled_actual", scaled_actual)
gbt_predictions = gbt_predictions.withColumn("scaled_pred", scaled_pred)

In [None]:
import pyspark.sql.functions as psf

def compute_RMSE(expected_col, actual_col, predictions=None):
  """Compute the root mean squared error between two columns.

  Args:
    expected_col: name of the first column in the comparison.
    actual_col: name of the second column in the comparison. The difference is
      squared, so the order of the two columns does not affect the result.
    predictions: DataFrame holding both columns. When omitted, the notebook's
      global `gbt_predictions` is used, which keeps existing two-argument
      calls working unchanged.

  Returns:
    A one-row DataFrame with the columns "mse" and "rmse".
  """
  if predictions is None:
    # Resolved at call time so the function picks up the current global frame.
    predictions = gbt_predictions

  squared_error = psf.pow(psf.col(actual_col) - psf.col(expected_col), psf.lit(2))

  return (
      predictions
      .agg(psf.avg(squared_error).alias("mse"))
      .withColumn("rmse", psf.sqrt(psf.col("mse")))
  )

In [None]:
# RMSE between the scaled label and the scaled prediction columns.
compute_RMSE("scaled_actual", "scaled_pred").show()

In [None]:
# RMSE between the label column and the model's prediction column as stored in
# gbt_predictions (no additional scaling applied in this cell).
compute_RMSE("DE_solar_generation_actual", "prediction").show()