In [None]:
from sklearn.datasets import load_iris, load_diabetes
from sklearn.model_selection import train_test_split
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import StratifiedKFold
from sklearn.model_selection import KFold, TimeSeriesSplit, ParameterGrid
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.metrics import accuracy_score
from xgboost import XGBClassifier, XGBRegressor
from xgboost import plot_tree
from sklearn.metrics import mean_squared_error, make_scorer, mean_absolute_error, r2_score

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

In [None]:
# Raw string literal: the Windows path contains backslashes followed by letters
# (\C, \I, \M, \D, \V) that are invalid escape sequences in a normal string and
# raise a warning on recent Python versions. The resulting path is unchanged.
# NOTE(review): this is an absolute, machine-specific path; consider a
# configurable data directory so the notebook runs elsewhere.
filepath = r'E:\C\ICT\Machine Learning and Data Mining\ML-DM-Stock-Price-Prediction\Dataset\VNM.csv'

df = pd.read_csv(filepath)
df.tail()

In [None]:
# Standardise the closing price. The column is reshaped to a single-feature
# 2-D array before it is handed to the scaler.
# NOTE(review): the scaler is fitted on the whole series, i.e. also on the rows
# that are later held out as the test set - confirm this is intended.
scaler = StandardScaler()
# scaler = MinMaxScaler()
close_column = np.array(df['close']).reshape(-1, 1)
scaled_data = scaler.fit_transform(close_column)
# scaled_data = scaled_data.reshape(scaled_data.shape[0])
print(scaled_data.shape)

In [None]:
# Build sliding-window samples from a sequence
def data_generator(datasets, timestep):
    """Turn a sequence into (window, next value) training pairs.

    For every position ``day`` from ``timestep`` up to the end of ``datasets``,
    the ``timestep`` elements preceding ``day`` form one input sample and the
    element at ``day`` is the matching target.

    Args:
        datasets: Indexable sequence supporting ``len()`` and integer indexing.
        timestep: Number of preceding elements used as input for each target.

    Returns:
        A tuple ``(inputs, targets)`` of numpy arrays. ``inputs`` stacks one
        window per sample; ``targets`` is reshaped to a single column.
    """
    target_days = range(timestep, len(datasets))
    windows = [
        [datasets[prev_day] for prev_day in range(day - timestep, day)]
        for day in target_days
    ]
    targets = [datasets[day] for day in target_days]
    return (np.array(windows), np.array(targets).reshape(-1, 1))

In [None]:
# Chronological hold-out split: the first `train_size` rows are used for
# training, everything after them for testing (no shuffling).
train_size = 1500

data_for_train = scaled_data[:train_size]
data_for_test = scaled_data[train_size:]
print(data_for_train.shape, data_for_test.shape, sep="\n")

In [None]:
timestep = 30  # number of previous days that each target day depends on

# Build (window, next value) pairs separately for the train and test segments.
newX_train, newY_train = data_generator(data_for_train, timestep)
newX_test, newY_test = data_generator(data_for_test, timestep)

# Flatten every window so that each sample is a single row of features.
newX_train = newX_train.reshape(len(newX_train), -1)
newX_test = newX_test.reshape(len(newX_test), -1)

# Sanity output: shapes, then the last training sample next to the first test sample.
print(newX_train.shape, newX_test.shape)
print(newY_train.shape, newY_test.shape)
print(newX_train[-1], newX_test[0])
print(newY_train[-1], newY_test[0])

In [None]:
# xgboost = XGBRegressor()
# max_depth = [2, 10, 20, 50]
# n_estimators = [10, 50, 100, 200, 500]
# learning_rate = [0.0001, 0.001, 0.01, 0.1]
# param_grid = dict(max_depth = max_depth, n_estimators = n_estimators,
#                  learning_rate = learning_rate)

# tscv = TimeSeriesSplit(n_splits=10)
# kfold = KFold(n_splits=10)

# grid_search = GridSearchCV(xgboost, param_grid, n_jobs=-1, cv=tscv,
#                           scoring='r2', verbose=0,
#                            return_train_score=True)
# grid_result = grid_search.fit(newX_train, newY_train)
# print(f"Best {grid_result.best_score_} using {grid_result.best_params_}")

Some important parameters of the model:

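– **learning_rate α**: The step-size shrinkage applied to each new tree's contribution, playing the same role as the learning rate in the gradient descent algorithm; smaller values usually need more trees but reduce the risk of overfitting.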

– **max_depth**: The maximum depth of each tree in the model.

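– **gamma**: The minimum loss reduction required to split a leaf node further; larger values prune the tree more aggressively, so it acts as a regularization term.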

– **n_estimators**: The number of trees in the model.



In [None]:
# Base estimator that the grid search re-configures for every candidate
xgboost = XGBRegressor(n_jobs=-1, objective="reg:squarederror")

# Candidate values for each tuned hyper-parameter
max_depth = [2, 10, 20, 50]
n_estimators = [10, 50, 100, 200, 500]
learning_rate = [0.0001, 0.001, 0.01, 0.1]
gamma = [0, 0.1, 1.0]

param_grid = {
    "max_depth": max_depth,
    "n_estimators": n_estimators,
    "learning_rate": learning_rate,
    "gamma": gamma,
}

# Cross-validation strategy: 10 time-ordered splits
tscv = TimeSeriesSplit(n_splits=10)

# Scoring metric: MSE is an error, so it is registered as lower-is-better
mse_scorer = make_scorer(mean_squared_error, greater_is_better=False)

# Custom function to print each fold and parameters being evaluated
def print_grid_search_details(param_grid, X_train, y_train):
    """Run a verbose, manual grid search with time-series cross-validation.

    Every parameter combination in ``param_grid`` is fitted on each split
    produced by the module-level ``tscv`` splitter, using the module-level
    ``xgboost`` estimator (re-configured in place via ``set_params``). Train
    and validation MSE / R2 are averaged over all splits, printed, and the
    combination with the lowest mean validation MSE is returned.

    Args:
        param_grid: Mapping of parameter name to a list of candidate values,
            in the form accepted by ``ParameterGrid``.
        X_train: Feature array; it is indexed with the integer index arrays
            yielded by ``tscv.split`` and must expose ``.shape``.
        y_train: Target array aligned row-by-row with ``X_train``.

    Returns:
        The parameter dict with the lowest mean validation MSE, or ``None``
        if no combination produced a score below infinity.
    """
    # The splits depend only on the data, so build them once and reuse them
    # for every parameter combination instead of re-splitting each time.
    splits = list(tscv.split(X_train))
    n_folds = tscv.get_n_splits()
    print(f"Size train set: {X_train.shape}")
    # Report the validation-fold size actually produced by the splitter rather
    # than n_samples // n_folds, which is not what TimeSeriesSplit uses.
    print(f"Size of each fold: {len(splits[0][1])}\n")

    for split_idx in range(1, len(splits) + 1):
        print(f"Splitting the first {split_idx} chunks at {split_idx}/{n_folds}")

    best_params = None
    best_score = float('inf')
    for idx, params in enumerate(ParameterGrid(param_grid), start=1):
        print(f"Evaluating parameters {idx}: {params}")

        # Collect the metrics of every fold; previously each fold overwrote the
        # last one, so only the final fold decided which parameters won.
        train_mse_folds, valid_mse_folds = [], []
        train_r2_folds, valid_r2_folds = [], []
        for train_idx, valid_idx in splits:
            X_tr, X_val = X_train[train_idx], X_train[valid_idx]
            y_tr, y_val = y_train[train_idx], y_train[valid_idx]
            model = xgboost.set_params(**params)
            model.fit(X_tr, y_tr)
            y_tr_pred = model.predict(X_tr)
            y_val_pred = model.predict(X_val)
            train_mse_folds.append(mean_squared_error(y_tr, y_tr_pred))
            valid_mse_folds.append(mean_squared_error(y_val, y_val_pred))
            train_r2_folds.append(r2_score(y_tr, y_tr_pred))
            valid_r2_folds.append(r2_score(y_val, y_val_pred))

        # Mean over all folds is the cross-validated estimate for this candidate.
        train_mse = float(np.mean(train_mse_folds))
        valid_mse = float(np.mean(valid_mse_folds))
        train_r2 = float(np.mean(train_r2_folds))
        valid_r2 = float(np.mean(valid_r2_folds))

        print(f"Parameters: {params}")
        # These values are R2 scores, not accuracies, so label them as such.
        print(f"Train R2: {train_r2}")
        print(f"Valid R2: {valid_r2}")
        print(f"Train MSE: {train_mse}")
        print(f"Valid MSE: {valid_mse}")
        print("-" * 64)

        if valid_mse < best_score:
            best_score = valid_mse
            best_params = params

    return best_params

# Run the verbose grid search on the training windows and keep the winning parameters
best_params = print_grid_search_details(
    param_grid=param_grid,
    X_train=newX_train,
    y_train=newY_train,
)

In [None]:

# Refit on the full training set with the parameters chosen by the grid search.
# Every tuned parameter is forwarded (previously `gamma` was searched but then
# silently dropped, so the final model differed from the selected one).
final_params = dict(best_params)
# Fixed settings that are not part of the search space.
# NOTE(review): reg_lambda=0.1 was not used during the search, so the fitted
# model still differs from the validated configuration in this one setting -
# confirm this is intended.
final_params.update(n_jobs=-1, objective="reg:squarederror", reg_lambda=0.1)

xgb = XGBRegressor(**final_params)
xgb.fit(newX_train, newY_train)

In [None]:
# Predict the test windows (as a single column), then map both predictions and
# targets back to the original price scale for plotting.
newy_pred = xgb.predict(newX_test).reshape(-1, 1)
newy_pred_rt, newY_test_rt = (
    scaler.inverse_transform(values) for values in (newy_pred, newY_test)
)
print(newy_pred.shape)

# Predicted vs. real test series
plt.plot(newy_pred_rt, color='red', label='Test Predict')
plt.plot(newY_test_rt, color='blue', label='Test Real')
plt.legend()
plt.show()

In [None]:
# Evaluate train set (metrics are computed on the scaled values, not on prices)
newy_train_pred = xgb.predict(newX_train)  # predict once and reuse for every metric
# RMSE is the square root of the MSE; the bare MSE was previously reported as RMSE.
rmse_train = np.sqrt(mean_squared_error(newY_train, newy_train_pred))
mae_train = mean_absolute_error(newY_train, newy_train_pred)
r2_train = r2_score(newY_train, newy_train_pred)
print("MAE train: ", mae_train)
print("RMSE train: ", rmse_train)
print("R2 train: ", r2_train)

In [None]:
# Evaluate test set (metrics are computed on the scaled values, not on prices)
# RMSE is the square root of the MSE; the bare MSE was previously reported as RMSE.
rmse_test = np.sqrt(mean_squared_error(newY_test, newy_pred))
mae_test = mean_absolute_error(newY_test, newy_pred)
r2_test = r2_score(newY_test, newy_pred)
print("MAE test: ", mae_test)
print("RMSE test: ", rmse_test)
print("R2 test: ", r2_test)