In [2]:
import time
import os
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.regression import GBTRegressor

In [10]:
def get_spark_session(worker_cores=1):
    """
    Create a fresh local SparkSession that runs with `worker_cores` threads.

    Any previous session is stopped first: both the one tracked in the
    notebook-global name `spark` and any session that is currently active
    but not bound to that name. This matters because `getOrCreate()` reuses
    an existing session, in which case the requested `local[N]` master
    would not be applied.

    Parameters
    ----------
    worker_cores : value interpolated into the master URL `local[<value>]`
        (default 1).

    Returns
    -------
    The newly created SparkSession, which is also stored in the global
    `spark`.
    """
    global spark

    # Stop the session tracked by this notebook; tolerate the name being
    # undefined or bound to None instead of raising on `.stop()`.
    tracked_session = globals().get('spark')
    if tracked_session is not None:
        tracked_session.stop()

    # Also stop a session that was created outside of this helper, so the
    # builder below really creates a new one with the requested master.
    active_session = SparkSession.getActiveSession()
    if active_session is not None:
        active_session.stop()

    spark = (SparkSession.builder
             .appName("spark_model_training")
             .master(f"local[{worker_cores}]")
             .getOrCreate())

    return spark

def load_data(spark, file_path):
    """
    Read the Parquet file at `file_path` through the given Spark session
    and return the resulting DataFrame.
    """
    parquet_reader = spark.read
    return parquet_reader.parquet(file_path)

def preprocess_data(df, feature_columns):
    """
    Return `df` with an added 'features' column that combines the columns
    listed in `feature_columns` via a VectorAssembler.
    """
    return VectorAssembler(
        inputCols=feature_columns,
        outputCol='features',
    ).transform(df)



def train_gbt_with_cv(processed_df, label_col='Impact', sample_fraction=0.5,
                      num_folds=3, seed=42):
    """
    Cross-validate a Spark GBTRegressor and report its error and fit time.

    Parameters
    ----------
    processed_df : DataFrame containing a 'features' column and the label
        column.
    label_col : name of the label column (default 'Impact').
    sample_fraction : fraction passed to `DataFrame.sample` before training
        (default 0.5).
    num_folds : number of cross-validation folds (default 3).
    seed : seed used for the sampling step (default 42).

    Returns
    -------
    (avg_mae_cv, total_training_time) where `avg_mae_cv` is the mean of the
    cross-validated MAE values in `avgMetrics` and `total_training_time` is
    the elapsed time of `CrossValidator.fit` in seconds.
    """
    # Train on a sample only: the kernel was getting killed due to memory
    # issues when using the full data.
    sampled_df = processed_df.sample(fraction=sample_fraction, seed=seed)

    gbt = GBTRegressor(featuresCol='features',
                       labelCol=label_col)

    # Using MAE instead of MAPE as MAPE was not available in RegressionEvaluator.
    evaluator = RegressionEvaluator(labelCol=label_col,
                                    predictionCol='prediction',
                                    metricName='mae')

    # A single parameter combination is used rather than a real grid search
    # (presumably because tuning over more combinations hit the same memory
    # limits -- TODO confirm).
    paramGrid = (ParamGridBuilder()
                 .addGrid(gbt.maxDepth, [6])  # Maximum depth of each tree
                 .addGrid(gbt.maxIter, [500])  # Number of boosting iterations
                 .addGrid(gbt.stepSize, [0.1])  # Learning rate (step size)
                 .addGrid(gbt.subsamplingRate, [0.7])  # Subsampling rate (for each tree)
                 .addGrid(gbt.featureSubsetStrategy, ['onethird'])  # Subsample features
                 .build())

    crossval = CrossValidator(estimator=gbt,
                              estimatorParamMaps=paramGrid,
                              evaluator=evaluator,
                              numFolds=num_folds)

    # perf_counter is monotonic, so the measured interval cannot be skewed
    # by system clock adjustments during the long-running fit.
    start_time = time.perf_counter()
    cv_model = crossval.fit(sampled_df)
    total_training_time = time.perf_counter() - start_time

    # avgMetrics has one entry per parameter map; with the single map above
    # the mean is simply that entry.
    avg_mae_cv = np.mean(cv_model.avgMetrics)

    return avg_mae_cv, total_training_time

def run_experiment(worker_cores, file_path):
    """
    Execute one benchmark run: start a session with `worker_cores` threads,
    load and clean the Parquet data at `file_path`, assemble the features
    and cross-validate the GBT model.

    Returns the tuple (avg_mae_cv, total_training_time) produced by
    `train_gbt_with_cv`.
    """
    session = get_spark_session(worker_cores)
    print("worker cores: ", session.sparkContext.defaultParallelism)

    loaded = load_data(session, file_path).drop('main_author_encoded')
    # Keep only rows with a non-zero label, then drop the leftover index column.
    cleaned = loaded.filter(loaded.Impact != 0).drop('__index_level_0__')

    # Every remaining column except the label is used as a feature.
    feature_names = [name for name in cleaned.columns if name != 'Impact']
    assembled = preprocess_data(cleaned, feature_names)

    return train_gbt_with_cv(assembled)



In [11]:


file_path = "data_preprocessed.parquet"

worker_configs = [1, 2, 4]
results = {}

# Run the experiment once per worker-thread count. Each entry of `results`
# maps workers -> (cross-validated MAE, training time in seconds).
for workers in worker_configs:
    # The metric returned by run_experiment is MAE, so name it accordingly.
    avg_mae_cv, total_training_time = run_experiment(workers, file_path)
    results[workers] = (avg_mae_cv, total_training_time)
    print(f"Workers: {workers}, MAE: {avg_mae_cv}, Training Time: {total_training_time}")

print("Experiment Results:", results)

24/10/01 20:36:37 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


1


24/10/01 20:36:38 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
24/10/01 20:37:04 WARN DAGScheduler: Broadcasting large task binary with size 1000.3 KiB
24/10/01 20:37:05 WARN DAGScheduler: Broadcasting large task binary with size 1001.2 KiB
24/10/01 20:37:05 WARN DAGScheduler: Broadcasting large task binary with size 1002.6 KiB
24/10/01 20:37:05 WARN DAGScheduler: Broadcasting large task binary with size 1005.7 KiB
24/10/01 20:37:05 WARN DAGScheduler: Broadcasting large task binary with size 1011.8 KiB
24/10/01 20:37:05 WARN DAGScheduler: Broadcasting large task binary with size 1011.9 KiB
24/10/01 20:37:05 WARN DAGScheduler: Broadcasting large task binary with size 1012.5 KiB
24/10/01 20:37:05 WARN DAGScheduler: Broadcasting large task binary with size 1013.4 KiB
24/10/01 20:37:05 WARN DAGScheduler: Broadcasting large task binary with size 1014.8 KiB
24/10/01 2

Workers: 1, MAPE: 46.48159303382342, Training Time: 1263.7833709716797


24/10/01 20:57:42 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


2


24/10/01 20:58:08 WARN DAGScheduler: Broadcasting large task binary with size 1000.4 KiB
24/10/01 20:58:08 WARN DAGScheduler: Broadcasting large task binary with size 1001.4 KiB
24/10/01 20:58:08 WARN DAGScheduler: Broadcasting large task binary with size 1002.8 KiB
24/10/01 20:58:09 WARN DAGScheduler: Broadcasting large task binary with size 1005.9 KiB
24/10/01 20:58:09 WARN DAGScheduler: Broadcasting large task binary with size 1011.9 KiB
24/10/01 20:58:09 WARN DAGScheduler: Broadcasting large task binary with size 1012.0 KiB
24/10/01 20:58:09 WARN DAGScheduler: Broadcasting large task binary with size 1012.6 KiB
24/10/01 20:58:09 WARN DAGScheduler: Broadcasting large task binary with size 1013.5 KiB
24/10/01 20:58:09 WARN DAGScheduler: Broadcasting large task binary with size 1014.9 KiB
24/10/01 20:58:09 WARN DAGScheduler: Broadcasting large task binary with size 1018.0 KiB
24/10/01 20:58:09 WARN DAGScheduler: Broadcasting large task binary with size 1024.1 KiB
24/10/01 20:58:09 WAR

Workers: 2, MAPE: 46.48159303382342, Training Time: 1434.486797094345


24/10/01 21:21:38 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


4


24/10/01 21:22:02 WARN DAGScheduler: Broadcasting large task binary with size 1005.3 KiB
24/10/01 21:22:02 WARN DAGScheduler: Broadcasting large task binary with size 1005.4 KiB
24/10/01 21:22:02 WARN DAGScheduler: Broadcasting large task binary with size 1006.0 KiB
24/10/01 21:22:02 WARN DAGScheduler: Broadcasting large task binary with size 1006.9 KiB
24/10/01 21:22:02 WARN DAGScheduler: Broadcasting large task binary with size 1008.3 KiB
24/10/01 21:22:03 WARN DAGScheduler: Broadcasting large task binary with size 1011.4 KiB
24/10/01 21:22:03 WARN DAGScheduler: Broadcasting large task binary with size 1017.6 KiB
24/10/01 21:22:03 WARN DAGScheduler: Broadcasting large task binary with size 1017.7 KiB
24/10/01 21:22:03 WARN DAGScheduler: Broadcasting large task binary with size 1018.3 KiB
24/10/01 21:22:03 WARN DAGScheduler: Broadcasting large task binary with size 1019.2 KiB
24/10/01 21:22:03 WARN DAGScheduler: Broadcasting large task binary with size 1020.6 KiB
24/10/01 21:22:03 WAR

Workers: 4, MAPE: 46.44832508020338, Training Time: 1650.076071023941
Experiment Results: {1: (46.48159303382342, 1263.7833709716797), 2: (46.48159303382342, 1434.486797094345), 4: (46.44832508020338, 1650.076071023941)}


24/10/01 21:49:08 WARN DAGScheduler: Broadcasting large task binary with size 6.1 MiB


In [12]:
# Show the collected results again: worker count -> (metric, training time).
print("Experiment Results:", results)

Experiment Results: {1: (46.48159303382342, 1263.7833709716797), 2: (46.48159303382342, 1434.486797094345), 4: (46.44832508020338, 1650.076071023941)}


24/10/02 05:26:58 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 974172 ms exceeds timeout 120000 ms
24/10/02 05:26:58 WARN SparkContext: Killing executors is not supported by current scheduler.
24/10/02 05:42:57 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$