## Imports

In [13]:
import datetime
from pathlib import Path

import joblib
import numpy as np
import pandas as pd
import yfinance as yf
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import TimeSeriesSplit
from sklearn.model_selection import train_test_split

try:
  from finta import TA

  from backtesting import Backtest, Strategy

except:
  !pip install -qq backtesting
  !pip install -qq finta

  from finta import TA

  from backtesting import Backtest, Strategy

## Data Fetching and Processing

In [31]:
class StockDataProcessor:
    """Downloads price history for one ticker and derives the model inputs.

    Args:
        ticker: Symbol passed to ``yf.download`` as ``tickers``.
        start: Start of the download window, passed through to ``yf.download``.
        end: End of the download window, passed through to ``yf.download``.
        interval: Candle size passed through to ``yf.download``.
    """

    def __init__(self, ticker, start, end, interval):
        self.ticker = ticker
        self.start = start
        self.end = end
        self.interval = interval
        # Columns handed to the model as inputs, and the column it learns to predict.
        self.features = ['EMA1', 'EMA2', 'ER', 'High', 'Low', 'Dividends', 'Stock Splits']
        self.target = ['Close']

    def download_data(self):
        """Downloads historical stock data.

        Returns:
            The frame returned by ``yf.download`` for the configured ticker,
            window and interval (requested with ``auto_adjust=True`` and
            ``actions=True``).
        """
        return yf.download(
            tickers=self.ticker,
            start=self.start,
            end=self.end,
            interval=self.interval,
            auto_adjust=True,
            actions=True,
        )

    def process_data(self, ohlcv_df):
        """Adds technical indicators and drops rows with missing values.

        The input frame is left untouched: all work happens on a copy, so the
        cell calling this method can be re-run safely.

        Args:
            ohlcv_df: Price frame as returned by ``download_data``. Columns may
                be two-level (the second level is dropped) or already flat.

        Returns:
            A tuple ``(processed, features, target)`` where ``processed`` is the
            full frame with the ``EMA1``, ``EMA2`` and ``ER`` columns added and
            incomplete rows removed, ``features`` is ``processed[self.features]``
            and ``target`` is ``processed[self.target]``.
        """
        processed = ohlcv_df.copy()

        # Only drop the second column level when there is one; a flat frame
        # (for example one that was already processed) would otherwise raise.
        if processed.columns.nlevels > 1:
            processed.columns = processed.columns.droplevel(level=1)
        processed.columns = processed.columns.rename(None)

        processed['EMA1'] = TA.EMA(processed, period=10)
        processed['EMA2'] = TA.EMA(processed, period=20)
        processed['ER'] = TA.ER(processed)

        processed = processed.dropna()
        return processed, processed[self.features], processed[self.target]


In [32]:
"""Data Processing"""

# Download-window settings for the training data
offset = 1            # most recent years to leave out of the download
ticker = 'KO'         # yfinance ticker symbol
period = 4            # window length, in years
interval = '1wk'      # candle size

# The window ends `offset` 365-day years before now and spans `period` such years.
end_date = datetime.datetime.now() - datetime.timedelta(days=365 * offset)
start_date = end_date - datetime.timedelta(days=365 * period)

# Processor configured for that window
data_processor = StockDataProcessor(
    ticker=ticker,
    start=start_date.strftime('%Y-%m-%d'),
    end=end_date.strftime('%Y-%m-%d'),
    interval=interval,
)

# Download, then derive the full frame, the feature frame and the target frame
ohlcv_df, X_processed, y_processed = data_processor.process_data(
    data_processor.download_data()
)

[*********************100%***********************]  1 of 1 completed


The final dataframe's format is as follows:


In [33]:
# First rows of the feature frame, followed by the first rows of the target frame
display(X_processed.head(), y_processed.head())

Unnamed: 0_level_0,EMA1,EMA2,ER,High,Low,Dividends,Stock Splits
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
2020-10-12,43.061296,42.897894,0.252421,44.637511,42.705789,0.0,0.0
2020-10-19,43.201418,43.015693,0.241077,44.351661,42.844396,0.0,0.0
2020-10-26,42.893321,42.834571,0.100841,43.753942,40.973302,0.0,0.0
2020-11-02,42.882177,42.834717,0.001941,43.494067,41.78757,0.0,0.0
2020-11-09,43.535952,43.259468,0.214276,47.037014,45.105292,0.0,0.0


Unnamed: 0_level_0,Close
Date,Unnamed: 1_level_1
2020-10-12,43.338146
2020-10-19,43.762615
2020-10-26,41.631649
2020-11-02,42.835724
2020-11-09,46.300709


## Model Training

In [34]:
class ModelTraining:
    """Tunes a RandomForestRegressor with an exhaustive grid search.

    Args:
        random_state: Seed given to the forest so repeated runs select and fit
            the same model. Pass ``None`` for an unseeded forest.
    """

    def __init__(self, random_state=42):
        self.rfr = RandomForestRegressor(random_state=random_state)

    def train_model(self, X_processed, y_processed, cv=None):
        """Trains a RandomForestRegressor model using GridSearchCV.

        Args:
            X_processed: Feature table, one row per observation.
            y_processed: Target values; flattened to one dimension before fitting.
            cv: Cross-validation strategy forwarded to ``GridSearchCV``. When
                omitted, a 3-split ``TimeSeriesSplit`` is used so every
                validation fold comes after the rows it was trained on.
                This assumes the rows are in chronological order.

        Returns:
            The best estimator found by the search.
        """
        if cv is None:
            cv = TimeSeriesSplit(n_splits=3)

        param_grid = {
            'n_estimators': [100, 200, 300],
            'max_features': ['sqrt', 'log2'],
            'max_depth': [10, 20, None],
            'min_samples_split': [2, 5, 10],
            'min_samples_leaf': [1, 2, 4],
            'bootstrap': [True, False]
        }

        grid_search = GridSearchCV(
            estimator=self.rfr,
            param_grid=param_grid,
            cv=cv,
            n_jobs=-1,
            verbose=2,
            scoring='neg_mean_squared_error',
        )
        # The target is flattened to 1-D for fitting.
        grid_search.fit(X_processed, np.asarray(y_processed).ravel())

        return grid_search.best_estimator_

In [35]:
"""Train Model"""
"""
# Intializing Class Model training
model_trainer = ModelTraining()

# Run this to train the model once and save it as random_forest_model.pkl
# Train the model using the instance of ModelTraining
best_est = model_trainer.train_model(X_processed,y_processed)

#Saving the Random Forest Model so as to not run again and again
joblib.dump(best_est, 'random_forest_model.pkl')
"""

Fitting 3 folds for each of 324 candidates, totalling 972 fits


['random_forest_model.pkl']

## Model Deployment for Backtesting

### Data Preparation

In [36]:
"""Data Processing"""

# Download-window settings for the backtest data
offset = 0            # most recent years to leave out of the download
ticker = 'KO'         # yfinance ticker symbol
period = 1            # window length, in years
interval = '1wk'      # candle size

# The window ends `offset` 365-day years before now and spans `period` such years.
end_date = datetime.datetime.now() - datetime.timedelta(days=365 * offset)
start_date = end_date - datetime.timedelta(days=365 * period)

# Processor configured for that window
data_processor = StockDataProcessor(
    ticker=ticker,
    start=start_date.strftime('%Y-%m-%d'),
    end=end_date.strftime('%Y-%m-%d'),
    interval=interval,
)

# Download, then derive the full frame, the feature frame and the target frame.
# Note: this rebinds ohlcv_df, X_processed and y_processed to the backtest window.
ohlcv_df, X_processed, y_processed = data_processor.process_data(
    data_processor.download_data()
)

[*********************100%***********************]  1 of 1 completed


### Model Deployment

In [37]:
class StockPredictor:
    """Thin wrapper that holds a fitted model and forwards prediction calls to it."""

    def __init__(self, model):
        """
        Stores the model used for all later predictions.

        Args:
            model: A fitted estimator exposing a ``predict`` method.
        """
        self.model = model

    def predict(self, X_new):
        """
        Runs the stored model on new feature rows.

        Args:
            X_new: Features to predict from, passed unchanged to the model.

        Returns:
            Whatever the model's ``predict`` returns for ``X_new``.
        """
        return self.model.predict(X_new)

In [38]:
# Load the estimator that was saved as random_forest_model.pkl.
# joblib.load unpickles the file, so only point this at files you trust.
joblib_file = "random_forest_model.pkl"                 # your file path here
best_est = joblib.load(joblib_file)

# Wrap the loaded estimator and predict on the prepared feature frame
stock_predictor = StockPredictor(best_est)
predictions = stock_predictor.predict(X_processed)

# Show only the five most recent predictions
display(predictions[-5:])

array([65.12159431, 65.12159431, 65.60126257, 65.60126257, 65.60126257])

## Backtesting

In [39]:
class MyStrategy(Strategy):
  """Buys when the model's prediction is above the close, sells when it is below.

  The predictions are read from the ``predictions`` column of the data given
  to ``Backtest`` rather than from a notebook-level variable, so the strategy
  depends only on the frame it is run on.
  """

  def init(self):
    # Prediction column of the frame passed to Backtest, kept on the instance.
    self.prediction = self.data.predictions

  def next(self):
    # Inside next() the data arrays end at the current bar, so [-1] is the
    # prediction and the close for the bar being processed.
    pred_value = self.data.predictions[-1]
    close_price = self.data.Close[-1]
    if pred_value > close_price:
        self.buy()
    elif pred_value < close_price:
        self.sell()

In [40]:
# Pass the predictions as data to the Backtest object
# The price frame is extended with a `predictions` column before being handed to Backtest
bt = Backtest(
    ohlcv_df.assign(predictions=predictions),
    MyStrategy,
    cash=10000,
    exclusive_orders=True,
    finalize_trades=True,
)
stats = bt.run()

display(stats)

Backtest.run:   0%|          | 0/42 [00:00<?, ?bar/s]

Unnamed: 0,0
Start,2024-10-14 00:00:00
End,2025-08-04 00:00:00
Duration,294 days 00:00:00
Exposure Time [%],95.348837
Equity Final [$],10490.935246
Equity Peak [$],10958.729358
Return [%],4.909352
Buy & Hold Return [%],0.075887
Return (Ann.) [%],5.967007
Volatility (Ann.) [%],16.891158


In [41]:
# Plot the backtest held in `bt`
bt.plot()