Importing Required Libraries

In [15]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lag, avg
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

Initialize Spark Session

In [16]:
spark = SparkSession.builder.appName("XGBoost_PySpark").getOrCreate()

Load Dataset

In [17]:
df = spark.read.csv("Data/coin_ChainLink.csv", header=True, inferSchema=True)
df = df.orderBy("Date")

Feature Engineering

In [18]:
windowSpec = Window.orderBy("Date")
df = df.withColumn("RSI", (col("Close") - lag("Close", 14).over(windowSpec)) / col("Close") * 100) #This calculates the Relative Strength Index (RSI)
df = df.withColumn("MA7", avg("Close").over(Window.orderBy("Date").rowsBetween(-6, 0)))     #7-Day Moving Average (MA7)
df = df.withColumn("MA21", avg("Close").over(Window.orderBy("Date").rowsBetween(-20, 0)))  #21-Day Moving Average (MA21)
df = df.dropna()

Feature Selection

In [19]:
features = ["High", "Low", "Open", "Volume", "RSI", "MA7", "MA21"]
target = "Close"
assembler = VectorAssembler(inputCols=features, outputCol="features")
df = assembler.transform(df)

Normalize Features

In [20]:
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
df = scaler.fit(df).transform(df)


Train-Test Split

In [21]:
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

Define XGBoost Model

In [22]:
model = GBTRegressor(featuresCol="scaledFeatures", labelCol="Close", maxIter=50, maxDepth=5)


Train Model

In [23]:
xgb_model = model.fit(train_df)

Predictions

In [24]:
predictions = xgb_model.transform(test_df)

Evaluation


In [25]:
mse_evaluator = RegressionEvaluator(labelCol="Close", predictionCol="prediction", metricName="mse")
mae_evaluator = RegressionEvaluator(labelCol="Close", predictionCol="prediction", metricName="mae")
r2_evaluator = RegressionEvaluator(labelCol="Close", predictionCol="prediction", metricName="r2")

mse = mse_evaluator.evaluate(predictions)
mae = mae_evaluator.evaluate(predictions)
r2 = r2_evaluator.evaluate(predictions)

print(f"Mean Squared Error (MSE): {mse:.4f}")
print(f"Mean Absolute Error (MAE): {mae:.4f}")
print(f"R-Squared (R2 Score): {r2:.4f}")


Mean Squared Error (MSE): 0.5583
Mean Absolute Error (MAE): 0.3244
R-Squared (R2 Score): 0.9936


Stop Spark Session

In [26]:
spark.stop()