In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import MinMaxScaler
from pyspark.sql.functions import col, split
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.ml.regression import LinearRegression, RandomForestRegressor, GBTRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline
from pyspark.sql.types import FloatType
from pyspark.ml.evaluation import RegressionEvaluator


### Build Spark session and read data
- Change file path of csv folder accordingly

In [None]:
# spark = SparkSession.builder.appName("Project_2").getOrCreate()
# geo_params = spark.read.option("header", True).\
#                         csv('gs://dataproc-staging-us-central1-480803309829-llnlpada/notebooks/jupyter/tar_unzipped')

spark = SparkSession.builder.appName("Project_2")\
    .config("spark.driver.memory", "12g") \
    .config("spark.executor.memory", "12g") \
    .config("spark.local.dir", "/home/wsw/spark-temp") \
    .config("spark.driver.extraJavaOptions","-Xss800M") \
    .config("spark.memory.offHeap.enabled", True) \
    .config("spark.memory.offHeap.size","12g") \
    .getOrCreate()
    
geo_params = spark.read.option("header", True).\
                        csv('./2024') # Change the path to the folder containing CSV files

25/11/01 20:29:13 WARN Utils: Your hostname, wsw-Z790M-AORUS-ELITE-AX-ICE resolves to a loopback address: 127.0.1.1; using 192.168.0.87 instead (on interface wlo1)
25/11/01 20:29:13 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/01 20:29:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/11/01 20:29:13 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).
                                                                                

### Data cleaning and pre-processing

In [3]:
#Filter rows with missing values
geo_params_filtered = geo_params.filter((col("LATITUDE") != "+99999") & 
              (col("LONGITUDE") != "+999999") & (col("ELEVATION") != "+9999") &
              (split(col("WND"), ",")[0] != "999") &(split(col("WND"), ",")[3] != "9999") & 
              (split(col("CIG"), ",")[0] != "99999") & (split(col("VIS"), ",")[0] != "999999") & 
              (split(col("TMP"), ",")[0] != "+9999") & (split(col("DEW"), ",")[0] != "+9999") & 
              (split(col("SLP"), ",")[0] != "99999")).\
              select(col("LATITUDE").cast("float"),
                     col("LONGITUDE").cast("float"),
                     col("ELEVATION").cast("float"),
                     split(col("WND"), ",").getItem(0).cast("float").alias("WIND_ANGLE"),
                     split(col("WND"), ",").getItem(3).cast("float").alias("WIND_SPEED"),
                     split(col("CIG"), ",").getItem(0).cast("float").alias("CEILING_HEIGHT"),
                     split(col("VIS"), ",").getItem(0).cast("float").alias("VISIBILITY_DIST"),
                     split(col("TMP"), ",").getItem(0).cast("float").alias("AIR_TEMPERATURE"),
                     split(col("DEW"), ",").getItem(0).cast("float").alias("DEW_POINT_TEMPERATURE"),
                     split(col("SLP"), ",").getItem(0).cast("float").alias("SEA_LEVEL_PRESSURE"))

#Combine feature columns into one and apply MinMax Scaling
va = VectorAssembler(inputCols = ["LATITUDE", "LONGITUDE", "ELEVATION", "WIND_ANGLE", "WIND_SPEED",\
                                 "CEILING_HEIGHT", "VISIBILITY_DIST", "AIR_TEMPERATURE", "DEW_POINT_TEMPERATURE"],\
                                 outputCol="features", handleInvalid="skip")
scaler = MinMaxScaler(inputCol = "features", outputCol = "scaled_features",)



### Machine learning model pipeline

In [4]:
gbt = GBTRegressor().setFeaturesCol("scaled_features").setLabelCol("SEA_LEVEL_PRESSURE")
ridge_reg = LinearRegression().setFeaturesCol("scaled_features").setLabelCol("SEA_LEVEL_PRESSURE")
rf = RandomForestRegressor().setFeaturesCol("scaled_features").setLabelCol("SEA_LEVEL_PRESSURE")


evaluator = RegressionEvaluator(labelCol = "SEA_LEVEL_PRESSURE")

gbt_params =(ParamGridBuilder().\
                       addGrid(gbt.stepSize, [0.05, 0.1, 0.2]).\
                       build())

ridge_params = (ParamGridBuilder()
                .addGrid(ridge_reg.regParam, [0.01, 0.1, 1.0])
                .build())

rf_params = (ParamGridBuilder()
                .addGrid(rf.numTrees, [20, 30, 40])
                .addGrid(rf.maxDepth, [5, 7, 9])
                .build())

gbt_pipeline = Pipeline(stages = [va, scaler, gbt])
ridge_pipeline = Pipeline(stages = [va, scaler, ridge_reg])
rf_pipeline = Pipeline(stages = [va, scaler, rf])

gbt_cv = CrossValidator().\
        setNumFolds(3).\
        setEstimatorParamMaps(gbt_params).\
        setEstimator(gbt_pipeline).\
        setEvaluator(evaluator)

ridge_cv = CrossValidator().\
        setNumFolds(3).\
        setEstimatorParamMaps(ridge_params).\
        setEstimator(ridge_pipeline).\
        setEvaluator(evaluator)

rf_cv = CrossValidator().\
        setNumFolds(3).\
        setEstimatorParamMaps(rf_params).\
        setEstimator(rf_pipeline).\
        setEvaluator(evaluator)

train, test = geo_params_filtered.randomSplit([.7, .3], seed = 4)
train.cache()

gbt_model = gbt_cv.fit(train)
ridge_model = ridge_cv.fit(train)
rf_model = rf_cv.fit(train)

gbt_train_rmse = evaluator.evaluate(gbt_model.transform(train))
gbt_test_rmse = evaluator.evaluate(gbt_model.transform(test))

ridge_train_rmse = evaluator.evaluate(ridge_model.transform(train))
ridge_test_rmse = evaluator.evaluate(ridge_model.transform(test))

rf_train_rmse = evaluator.evaluate(rf_model.transform(train))
rf_test_rmse = evaluator.evaluate(rf_model.transform(test))

print(f"----------------- Ridge Regression -----------------------")
print(f"Root Mean Squared Error of train dataset: {ridge_train_rmse}")
print(f"Root Mean Squared Error of test dataset: {ridge_test_rmse}")

print(f"------------ Gradient Boosted Tree ---------------------")
print(f"Root Mean Squared Error of train dataset: {gbt_train_rmse}")
print(f"Root Mean Squared Error of test dataset: {gbt_test_rmse}")

print(f"------------------ Random Forest --------------------------")
print(f"Root Mean Squared Error of train dataset: {rf_train_rmse}")
print(f"Root Mean Squared Error of test dataset: {rf_test_rmse}")


25/11/01 20:30:50 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
25/11/01 20:31:10 WARN BlockManager: Asked to remove block rdd_654_257, which does not exist
25/11/01 20:31:10 WARN BlockManager: Asked to remove block rdd_654_433, which does not exist
25/11/01 20:31:10 WARN BlockManager: Asked to remove block rdd_654_352, which does not exist
25/11/01 20:31:10 WARN BlockManager: Asked to remove block rdd_654_200, which does not exist
25/11/01 20:31:10 WARN BlockManager: Asked to remove block rdd_654_493, which does not exist
25/11/01 20:31:10 WARN BlockManager: Asked to remove block rdd_654_218, which does not exist
25/11/01 20:31:10 WARN BlockManager: Asked to remove block rdd_654_749, which does not exist
25/11/01 20:31:10 WARN BlockManager: Asked to remove block rdd_654_824, which does not exist
25/11/01 20:31:10 WARN BlockManager: Asked to remove block rdd_654_307, which does not exist
25/11/01 20:31:10 WARN BlockManager: Asked to remove 

----------------- Ridge Regression -----------------------
Root Mean Squared Error of train dataset: 83.89886083044854
Root Mean Squared Error of test dataset: 83.85277357251191
------------ Gradient Boosted Tree ---------------------
Root Mean Squared Error of train dataset: 74.97530390285782
Root Mean Squared Error of test dataset: 74.92233374174717
------------------ Random Forest --------------------------
Root Mean Squared Error of train dataset: 76.09043239800525
Root Mean Squared Error of test dataset: 76.05855446825043


                                                                                

### Best hyperparameters for the model

In [5]:
ridge_bestModel = ridge_model.bestModel.stages[2]
gbt_bestModel = gbt_model.bestModel.stages[2]
rf_bestModel = rf_model.bestModel.stages[2]

print(f"Best hyperparameters for Ridge Regression")
print(f"Reg Param = {ridge_bestModel._java_obj.getRegParam()}")
print("-------------------------------------------------------")

print(f"Best hyperparameters for Gradient Boosted Tree")
print(f"Step size = {gbt_bestModel._java_obj.getStepSize()}")
print("-------------------------------------------------------")

print(f"Best hyperparameters for Random Forest")
print(f"Num Trees = {rf_bestModel._java_obj.getNumTrees()}")
print(f"Max Depth = {rf_bestModel._java_obj.getMaxDepth()}")
print("-------------------------------------------------------")


Best hyperparameters for Ridge Regression
Reg Param = 0.01
-------------------------------------------------------
Best hyperparameters for Gradient Boosted Tree
Step size = 0.2
-------------------------------------------------------
Best hyperparameters for Random Forest
Num Trees = 20
Max Depth = 9
-------------------------------------------------------
