In [65]:
pip install xgboost



In [66]:
from google.colab import drive

# Mount Google Drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [67]:
cd /content/drive/MyDrive/AllFolders/Master/'Data Analytics & Business Intelligence'/Project

/content/drive/.shortcut-targets-by-id/132D0hj6T_LSOLQAErSF1GqYKBI7TOocF/AllFolders/Master/Data Analytics & Business Intelligence/Project


In [68]:
from xgboost.spark import SparkXGBRegressor
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
spark = SparkSession.builder.appName("XGBoost Model").getOrCreate()


In [69]:
df = spark.read.csv("FINAL_USO.csv", header=True,inferSchema=True)

In [70]:
df.columns

['Date',
 'Open',
 'High',
 'Low',
 'Close',
 'Adj Close',
 'Volume',
 'SP_open',
 'SP_high',
 'SP_low',
 'SP_close',
 'SP_Ajclose',
 'SP_volume',
 'DJ_open',
 'DJ_high',
 'DJ_low',
 'DJ_close',
 'DJ_Ajclose',
 'DJ_volume',
 'EG_open',
 'EG_high',
 'EG_low',
 'EG_close',
 'EG_Ajclose',
 'EG_volume',
 'EU_Price',
 'EU_open',
 'EU_high',
 'EU_low',
 'EU_Trend',
 'OF_Price',
 'OF_Open',
 'OF_High',
 'OF_Low',
 'OF_Volume',
 'OF_Trend',
 'OS_Price',
 'OS_Open',
 'OS_High',
 'OS_Low',
 'OS_Trend',
 'SF_Price',
 'SF_Open',
 'SF_High',
 'SF_Low',
 'SF_Volume',
 'SF_Trend',
 'USB_Price',
 'USB_Open',
 'USB_High',
 'USB_Low',
 'USB_Trend',
 'PLT_Price',
 'PLT_Open',
 'PLT_High',
 'PLT_Low',
 'PLT_Trend',
 'PLD_Price',
 'PLD_Open',
 'PLD_High',
 'PLD_Low',
 'PLD_Trend',
 'RHO_PRICE',
 'USDI_Price',
 'USDI_Open',
 'USDI_High',
 'USDI_Low',
 'USDI_Volume',
 'USDI_Trend',
 'GDX_Open',
 'GDX_High',
 'GDX_Low',
 'GDX_Close',
 'GDX_Adj Close',
 'GDX_Volume',
 'USO_Open',
 'USO_High',
 'USO_Low',
 'USO

In [71]:
inputCols = list(set(df.columns) - set(["Close", "Adj Close", "Date"]))
assembler = VectorAssembler(inputCols=inputCols, outputCol="features")
df2 = assembler.transform(df)

In [72]:
df2.show()

+----------+----------+----------+------------------+------------------+------------------+--------+----------+----------+------------------+------------------+------------------+---------+-----------+-----------+-----------+-----------+-----------+---------+---------+---------+---------+---------+-----------------+---------+--------+------------------+-------+------+--------+--------+-------+-------+------+---------+--------+--------+-------+-------+------+--------+--------+-------+-------+------+---------+--------+---------+--------+--------+-------+---------+---------+--------+--------+-------+---------+---------+--------+--------+-------+---------+---------+-----------------+---------+---------+--------+-----------+----------+---------+---------+---------+---------+-------------+----------+------------------+------------------+------------------+------------------+------------------+----------+--------------------+
|      Date|      Open|      High|               Low|             C

In [73]:
xgb_regressor = SparkXGBRegressor(objective='reg:squarederror',
                                  eval_metric='rmse',
                                  max_depth=6,
                                  eta=0.1,
                                  num_round=1000,
                                  features_col="features",
                                  label_col="Close")


In [74]:
train_data, test_data = df2.randomSplit([0.7, 0.3], seed=42)

In [75]:

# Train model
xgb_model = xgb_regressor.fit(train_data)

# Predict
predictions = xgb_model.transform(test_data)
predictions.show()


INFO:XGBoost-PySpark:Running xgboost-2.1.3 on 1 workers with
	booster params: {'objective': 'reg:squarederror', 'device': 'cpu', 'eval_metric': 'rmse', 'max_depth': 6, 'eta': 0.1, 'num_round': 1000, 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
INFO:XGBoost-PySpark:Finished xgboost training!


+----------+------------------+------------------+----------+----------+----------+--------+------------------+----------+------------------+------------------+----------+---------+-----------+-----------+-----------+-----------+-----------+---------+---------+---------+---------+---------+-----------------+---------+------------------+------------------+-------+------+--------+--------+-------+-------+------+---------+--------+--------+-------+-------+------+--------+--------+-------+-------+------+---------+--------+------------------+------------------+------------------+------------------+---------+---------+--------+--------+-------+---------+---------+--------+--------+-------+---------+---------+-----------------+---------+---------+--------+-----------+----------+---------+---------+---------+---------+-----------------+----------+------------------+------------------+------------------+------------------+------------------+----------+--------------------+------------------+
| 

In [76]:
evaluator = RegressionEvaluator(labelCol="Close", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
rmse

0.6198773925974421

In [77]:

# gbt = GBTRegressor(featuresCol="features", labelCol="Close") # Split data into training and testing sets
# train_data, test_data = df.randomSplit([0.7, 0.3], seed=42)

# Assemble all the steps (indexing, assembling, and model building) into apipeline.
# pipeline = Pipeline(stages=[assembler, gbt])


xgb_model = SparkXGBRegressor(
    objective='reg:squarederror',
    eval_metric='rmse',
    seed=42,
    label_col="Close"
)


# Define a parameter grid for hyperparameter tuning
paramGrid = (ParamGridBuilder()
              .addGrid(xgb_model.n_estimators, [500, 1000, 1200])
              .addGrid(xgb_model.max_depth, [3, 5, 7])
              .addGrid(xgb_model.learning_rate, [0.05, 0.01, 0.008])
              .addGrid(xgb_model.subsample, [0.6, 0.8])
              # .addGrid(xgb_model.num_boost_round, [10, 100, 500, 1000])
              .build())

# Create an evaluator for model evaluation
evaluator = RegressionEvaluator(labelCol="Close", predictionCol="prediction", metricName="rmse")

# Create a CrossValidator object
crossval = CrossValidator(estimator=xgb_model,
                         estimatorParamMaps=paramGrid,
                         evaluator=evaluator,
                         numFolds=5,
                          parallelism=2)



# Train the model with the best hyperparameters
cv_model = crossval.fit(train_data)
# Apply on test set
predictions = cv_model.transform(test_data)




INFO:XGBoost-PySpark:Running xgboost-2.1.3 on 1 workers with
	booster params: {'device': 'cpu', 'eval_metric': 'rmse', 'learning_rate': 0.05, 'max_depth': 3, 'objective': 'reg:squarederror', 'subsample': 0.6, 'seed': 42, 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 500}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
INFO:XGBoost-PySpark:Running xgboost-2.1.3 on 1 workers with
	booster params: {'device': 'cpu', 'eval_metric': 'rmse', 'learning_rate': 0.05, 'max_depth': 3, 'objective': 'reg:squarederror', 'subsample': 0.8, 'seed': 42, 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 500}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
INFO:XGBoost-PySpark:Finished xgboost training!
INFO:XGBoost-PySpark:Finished xgboost training!
INFO:XGBoost-PySpark:Running xgboost-2.1.3 on 1 workers with
	booster params: {'device': 'cpu', 'eval_metric': 'rmse', 'learning_rate': 0.01, 'max_depth': 3, 'objective': 'reg:squarederror',

In [78]:
rmse = evaluator.evaluate(predictions)


In [79]:
rmse

0.5713033798865612

In [80]:
best_model = cv_model.bestModel
bestparams=best_model.extractParamMap()
for param, value in bestparams.items():

    print(f"{param.name}:{value}")

enable_sparse_data_optim:False
featuresCol:features
features_cols:[]
labelCol:Close
predictionCol:prediction
arbitrary_params_dict:{'seed': 42}
base_score:None
booster:None
callbacks:None
colsample_bylevel:None
colsample_bynode:None
colsample_bytree:None
device:cpu
early_stopping_rounds:None
eval_metric:rmse
feature_names:None
feature_types:None
feature_weights:None
force_repartition:False
gamma:None
grow_policy:None
importance_type:None
interaction_constraints:None
iteration_range:None
learning_rate:0.008
max_bin:None
max_cat_threshold:None
max_cat_to_onehot:None
max_delta_step:None
max_depth:5
max_leaves:None
min_child_weight:None
missing:nan
monotone_constraints:None
multi_strategy:None
n_estimators:1200
num_parallel_tree:None
num_workers:1
objective:reg:squarederror
random_state:None
reg_alpha:None
reg_lambda:None
repartition_random_shuffle:False
sampling_method:None
scale_pos_weight:None
subsample:0.6
tree_method:None
use_gpu:False
validate_parameters:None
verbose:True
verbosity:N

In [81]:
evaluator_r2 = RegressionEvaluator(
    labelCol="Close",          # Actual values column
    predictionCol="prediction", # Predicted values column
    metricName="r2"             # Metric for R-squared
)

# Calculate R^2
r2 = evaluator_r2.evaluate(predictions)
print(f"R^2: {r2}")


R^2: 0.9988910568903872
