In [None]:
import time

# Wall-clock start of the notebook; the final cells subtract this from t2 to report the total run time.
t1 = time.time()

# Project Topic: Application of Blending Ensemble Learning in predicting positive moves (uptrend), a case-study on Nvidia Corporation stock price.
### - Caleb Fowowe

#### Import Libraries

In [None]:
#Import the internal modules written for the purpose of this project
from src.utils_data_processing import (LoadData, cwts, getpath, rnd_state)
from src.utils_features_engineering import (FeaturesCreation, FeaturesTransformation, FeaturesSelection)
from src.utils_model_and_tuning import (Blending, HpTuning, SimpleBacktest, Btest)

#Import external modules for the basemodels and blender (metamodel)
from xgboost import XGBClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.linear_model import LogisticRegression

#data manipulation modules
import numpy as np
from datetime import datetime

# Resolve the output location through the project helper getpath(); per the notebook's intent this is
# the folder in which the generated graphics and the trading-strategy report are saved.
output_path = getpath()

### Load Data, EDA, Fix Null Data, and Plots

##### Load data

In [None]:
# File names of the data used, supplied to LoadData as the 'files' keyword argument.
# The first entry doubles as the company name/ticker below.
data_files = {
    'files': [
        'NVDA',
        'VVIX_History',
        'USCPI',
        'USGDP',
        'FedFundRate',
        '2yrTreasury',
        '10yrTreasury',
    ]
}

# Period range of interest, for the case where the provided data goes back further than required.
time_period = ['2008', '2024']

# Extract the company name (ticker) from the first file name.
company_name = data_files['files'][0]

# Instantiate the loader, then merge all the data into a single dataframe stored in 'df'.
ldata = LoadData(*time_period, **data_files)
df = ldata.joinData()

#### Exploratory Data Analysis (EDA) of Original dataset

##### Cleaning and Imputation

In [None]:
# Summary statistics of the combined dataframe as loaded, before any missing-data handling.
df.describe()

In [None]:
# Check the combined dataset for missing values by passing df to the loader's checkNullData method.
ldata.checkNullData(df)

In [None]:
# Initial step of fixing missing data: backfill ('bfill') the quarterly and monthly macro data.
df = ldata.fixNullData(df, method='bfill')

In [None]:
# Re-check for null values after the backfilling step.
ldata.checkNullData(df)

In [None]:
# Fix the remaining gaps (data points that are not available yet). Dropping those rows is an
# alternative; the 'knnimpute' method is used here.
df = ldata.fixNullData(df, method='knnimpute')

In [None]:
# Summary statistics again, now on the dataframe after both missing-data fixes.
df.describe()

In [None]:
# Preview the latest five (5) rows of the cleaned-up data.
df.tail()

In [None]:
# Plot a candlestick chart of the historical stock data, passing the stock-split dates as events.
# Only the 2021 and 2024 splits are listed; the earlier splits (2007-09-11, 2006-04-07, 2001-09-12,
# 2000-06-27) predate the 2008 start of time_period.
stock_split = {'event_dates': ['2024-06-10', '2021-07-20'], 'event_title': 'stock-split'}
ldata.plotCandleStick(df, events=stock_split)

In [None]:
# Plot the individual open/high/low/close/volume (OHLCV) series.
ldata.plotPrices(df)

## Feature Engineering

##### Feature engineering is handled by the FeaturesCreation, FeaturesTransformation, and FeaturesSelection sub-classes

#### Feature Creation/Extraction

##### Define the parameters to be used in the target variable (y)/ Label

In [None]:
# The target variable is a trend-and-volatility play which creates a signal based on two conditions:
# 1. Trend: the return over a relatively short period (5 days, i.e. the mean of the 5-day rolling return)
#    crosses over the return trend over a relatively medium period (10 days), with the difference being
#    equal to or above a specified hurdle rate (hurdle).
# 2. Volatility: the standard deviation of the short-period return is less than or equal to the upper
#    boundary of the medium-period return's standard deviation. That upper boundary is characterised as
#    2 standard deviations from the mean medium-period return.

# Target parameters (passed to FeaturesCreation in the next code cell)
short_prd= 5
medium_prd = 10
upper_std=2
lower_std = 1
hurdle = 0.005

##### Generate Features - all features

In [None]:
# Instantiate the FeaturesCreation subclass with the cleaned dataframe and the target-variable
# parameters defined above.
feat_df = FeaturesCreation(df, short_prd, medium_prd, upper_std, lower_std, hurdle) 

# Call create_all_features, setting fundamental_features and macro_features to True when that data is
# provided (False otherwise). The returned dataframe with all created features is stored in new_ft.
new_ft = feat_df.create_all_features(fundamental_features=True, macro_features=True) 
# 1. Company fundamentals-related features (required column labels: 'PriceToEarnings', 'PriceToCash', 'PriceToBook', 'DividendYield')
# 2. Macroeconomic-related features (required column labels: 'CPI', 'GDP', '2yrTreasury', '10yrTreasury')
# 3. Technical-indicator features (based on the pandas ta-library) (required column labels: 'Open', 'High', 'Low', 'Close', 'Volume';
#    these can also be in lower-case format)

# The OHLCV columns are dropped after being used to generate the technical indicators.
# Below is a preview of the first five rows of the generated features (339 according to the author),
# covering macroeconomic, fundamental and technical indicators.
new_ft.head()

In [None]:
# (rows, columns) of the dataframe holding all generated features.
new_ft.shape

#### Feature Transformation & Selection

##### Transform day of the week feature

In [None]:
# Before starting the feature-selection process, the days feature, which consists of the trading days
# (Monday - Friday), is transformed into two features.
feat_transform = FeaturesTransformation(new_ft) # Instantiate the FeaturesTransformation subclass with the generated features as input
new_ft2 = feat_transform.transformDaysColumn() # Transform the day-of-week column and store the result in new_ft2

###### To optimize the dataframe's performance, all features outside the target column are converted to 'float64', and the target variable column is converted to the 'int16' datatype

In [None]:
# Cast every column to float64, then replace the 'predict' target column with its int16 version.
new_ft2 = new_ft2.astype('float64').assign(
    predict=lambda frame: frame['predict'].values.astype('int16')
)

In [None]:
# Display the transformed feature dataframe (features as float64, 'predict' as int16).
new_ft2

#### Feature Selection

###### Feature selection - Wrapper Method: Boruta and Recursive Feature Elimination (RFE)

In [None]:
# Instantiate the FeaturesSelection subclass with the dataframe from above (days column already
# transformed) as the required input, and the test size as an optional input (stated default: 0.20).
feat_select = FeaturesSelection(new_ft2, testsize = 0.20) 
feat1 = feat_select.wrapper_boruta(max_iter=150) # Run the wrapper_boruta method of the FeaturesSelection subclass.

##### Feature selection - Filtering Method: Addressing Multicollinearity among features

###### Using the same FeaturesSelection class, specify the correlation coefficient threshold of choice. (The project tested correlation thresholds in the 0.60 - 0.90 range.)

In [None]:
# Call the filter_multicollinearity method of the class, providing the desired correlation threshold.
# The multicollinearity step follows the Boruta and RFE intersection step, so per the author there is no
# need to pass a dataframe: the method already takes the features output by that intersection as input.
# For testing purposes, the author notes that the method can optionally be given both a correlation
# coefficient and a dataframe, and it will filter that dataframe for multicollinearity among features.
filtered_features = feat_select.filter_multicollinearity(corr_coeff=0.90)  

##### applying K-Means clustering to Feature selection

In [None]:
# Take an explicit copy of the filtered feature columns: adding a column to a frame that was selected
# out of new_ft2 otherwise triggers pandas' SettingWithCopyWarning (chained assignment).
data2 = new_ft2[filtered_features].copy()
data2['predict'] = new_ft2['predict'].values.astype('int')

# Run the K-Means based selector on the filtered features plus target.
# The upper threshold should be above the lower threshold.
kmeans_features = feat_select.kmeans_selector(data2, cluster_size=len(filtered_features), upper_threshold=0.065, lower_threshold=0.03)

##### Finally selected Features set - KMeans

In [None]:
# Final modelling frame: the K-Means selected features plus the target column.
# .copy() makes data3 an independent frame so that adding 'predict' does not trigger
# pandas' SettingWithCopyWarning (chained assignment on a selection of new_ft2).
data3 = new_ft2[kmeans_features].copy()
data3['predict'] = new_ft2['predict'].values.astype('int')

data3.head()

In [None]:
# (rows, columns) of the final modelling frame, target column included.
data3.shape

### Ensemble Model - Blending Ensemble

##### Initial parameterization of basemodels and metamodel

In [None]:
cls_weight = cwts(data3) # generate class weights to treat class imbalance

# Logistic regression algorithm
lr_params = {'random_state': rnd_state(), 'class_weight': cls_weight}
lr = LogisticRegression(**lr_params)

# Decision Tree algorithm
dt_params = {'class_weight': cls_weight, 'random_state': rnd_state()}
dt = DecisionTreeClassifier(**dt_params)

# K-nearest Neighbour algorithm (no class_weight option is passed for this estimator)
knn_params = {'algorithm': 'auto', 'n_jobs': -1}
knn = KNeighborsClassifier(**knn_params)

# Gaussian Naive Bayes algorithm; the (initially empty) parameter dict is kept so it can be updated later
bayes_params = {}
bayes = GaussianNB()
bayes.set_params(**bayes_params)

# Support Vector Machine (SVM): Support Vector Classifier (SVC); probability=True enables predict_proba
svc_params = {'class_weight': cls_weight,'random_state': rnd_state(), 'probability': True}
svc = SVC(**svc_params)

# Combining all the algorithms into basemodels
# NOTE(review): the tuned dictionary further down uses the key 'lre' for logistic regression while this
# one uses 'lr' - confirm whether Blending relies on the key names.
basemodels = {'lr': lr, 'dte': dt, 'knn': knn, 'bayes': bayes, 'svc': svc}

# Extreme Gradient Boost algorithm.
# XGBClassifier has no 'class_weight' or 'verbose' parameter (XGBoost reports such keys as unused), so
# the class imbalance was silently left untreated for the blender. The XGBoost equivalents are
# 'scale_pos_weight' (negative count / positive count) and 'verbosity'.
# Assumes 'predict' is a binary 0/1 target - TODO confirm.
target_counts = data3['predict'].value_counts()
xgb_params = {
    'n_jobs': -1,
    'scale_pos_weight': target_counts.get(0, 0) / max(target_counts.get(1, 0), 1),
    'random_state': rnd_state(),
    'verbosity': 1,
}
xgb = XGBClassifier(**xgb_params)

# Extreme gradient boosting stated as metamodel or blender.
blender = xgb

##### Initial run of the blending model

In [None]:
# Separate the final features (X) and target (y) by column name rather than by position, so the split
# stays correct even if 'predict' is not the last column of data3.
X_final = data3.drop(columns='predict').values
y_final = data3['predict'].values

# Run the blending ensemble with the untuned base models and blender; 0.20 is passed as valsize.
Blnd = Blending(X_final, y_final, basemodels, blender, valsize=0.20)
acc, f1score, ypred, yprob, yfull = Blnd.runBlendingEnsemble()

print(f"Accuracy Score: {acc: .1%}, f1score: {f1score:.1%}")

#### Hyperparameter Tuning

In [None]:
# Instantiate the tuner on the final features and target, with n_trials set to 40.
tune_model = HpTuning(X_final, y_final, n_trials=40)

# Run one optimisation per model, in the order lr, dt, svc, knn, bayes, xgb.
tuned_lr = tune_model.optimize_lr()
tuned_dt = tune_model.optimize_dt()
tuned_svc = tune_model.optimize_svc()
tuned_knn = tune_model.optimize_knn()
tuned_bayes = tune_model.optimize_bayes()
tuned_xgb = tune_model.optimize_xgb()

# Report the '.values' attribute of each tuning result on a single tab-separated line.
print(
    "optimal_lr:", tuned_lr.values, "\t",
    "optimal_dt:", tuned_dt.values, "\t",
    "optimal_svc:", tuned_svc.values, "\t",
    "optimal_knn:", tuned_knn.values, "\t",
    "optimal_bayes:", tuned_bayes.values, "\t",
    "optimal_xgb:", tuned_xgb.values,
)

##### Preview hyperparameters

In [None]:
# Tuned parameter dictionaries and their display labels, kept in the same order.
hp_list = [
    tuned_lr.params,
    tuned_dt.params,
    tuned_svc.params,
    tuned_knn.params,
    tuned_bayes.params,
    tuned_xgb.params,
]
hp_names = [
    'tuned_lr.params',
    'tuned_dt.params',
    'tuned_svc.params',
    'tuned_knn.params',
    'tuned_bayes.params',
    'tuned_xgb.params',
]

In [None]:
# Preview the tuned hyperparameters, passing the parameter dictionaries and their labels.
tune_model.hp_preview(hp_list, hp_names)

#### Run Ensemble Model with tuned parameters

##### Update the initial parameters dictionary with the hyperparameter tuning parameter values

In [None]:
# Merge each model's tuned hyperparameters into its initial parameter dictionary and rebuild the model.
lr_params.update(tuned_lr.params)
lr = LogisticRegression(**lr_params)

dt_params.update(tuned_dt.params)
dt = DecisionTreeClassifier(**dt_params)

knn_params.update(tuned_knn.params)
knn = KNeighborsClassifier(**knn_params)

# The tuned Naive Bayes parameters were previously never applied (bayes_params stayed empty), so the
# "tuned" GaussianNB was identical to the initial one. Apply them like for every other model.
bayes_params.update(tuned_bayes.params)
bayes = GaussianNB()
bayes.set_params(**bayes_params)

svc_params.update(tuned_svc.params)
svc = SVC(**svc_params)

# Tuned base models
basemodel_upd = {'lre': lr, 'dte': dt, 'knn': knn, 'bayes': bayes, 'svc': svc}

xgb_params.update(tuned_xgb.params)
xgb = XGBClassifier(**xgb_params)

# Tuned metamodel (blender)
blender_upd = xgb

##### TunedModels Output

In [None]:
# Re-run the blending ensemble with the tuned base models and tuned blender (Blnd is rebound here);
# the results are stored under *_tuned names so the initial run's outputs stay available.
Blnd = Blending(X_final, y_final, basemodel_upd, blender_upd, valsize=0.20)
acc_tuned, f1score_tuned, ypred_tuned, yprob_tuned, yfull_tuned = Blnd.runBlendingEnsemble()

print(f"Accuracy Score: {acc_tuned: .1%}, f1score: {f1score_tuned:.1%}")

### Backtest/Strategy Evaluation

#### Approach 1: Using the simple backtest class - Out of sample test

In [None]:
# Return period passed to SimpleBacktest.approach1 below.
return_period = 1

In [None]:
# Simple backtest on the cleaned price dataframe.
# NOTE(review): this uses ypred from the initial (untuned) blending run, not ypred_tuned from the
# tuned run - confirm that this is intended.
btd = SimpleBacktest(df)
btdd = btd.approach1(ypred, return_period)

In [None]:
# Sharpe ratio(s) computed from the approach-1 backtest output.
sharpe2 = btd.sharpe_ratios(btdd)

In [None]:
# Produce the HTML strategy report, labelled with the company name/ticker.
btd.html_report(company_name=company_name)

#### Approach 2: Using the popular Backtesting Library - Out of Sample test

In [None]:
# Second backtest via the project's Btest wrapper.
# NOTE(review): as in approach 1, ypred comes from the initial (untuned) run, not ypred_tuned - confirm.
bto_lib = Btest(df, ypred)
bto_lib.runStrategy()

In [None]:
# Collect and print the strategy statistics.
btostats = bto_lib.runstats()
print(btostats)

In [None]:
# Plot the results of the strategy run.
bto_lib.plotstats()

In [None]:
# Wall-clock end of the notebook, paired with t1 from the first cell.
t2 = time.time()

In [None]:
# Report the elapsed wall-clock time between the first cell (t1) and the previous cell (t2).
print(f"Time-taken to run entire script is: {t2-t1:.2f} seconds")