### Import Package

In [None]:
!pip install stockstats

Collecting stockstats
  Downloading stockstats-0.4.1-py2.py3-none-any.whl (19 kB)
Installing collected packages: stockstats
Successfully installed stockstats-0.4.1


In [None]:
import pandas as pd
import numpy as np
import datetime as dt
import matplotlib.pyplot as plt
from stockstats import StockDataFrame

from sklearn.model_selection import train_test_split
from sklearn import linear_model
from sklearn.svm import SVR

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import datasets, layers, models

from math import floor

import pandas_datareader.data as web
import pickle
import time

In [None]:
# Mount Google Drive so the notebook can read the saved model and write results.
from google.colab import drive
drive.mount('/content/drive')

# Name of the model under test; also the sub-folder the model is loaded from.
model = "CNN_LSTM"

# Output locations (all under the Drive mount).
volatility_folder = "/content/drive/MyDrive/fyp/Final report/4. Backtesting/Volatility_result/"
backtest_summary_folder = volatility_folder + "backtest_summary/"  # summary CSVs
record_folder = volatility_folder + "trade_record/"  # per-run trade-record CSVs
# Input location of the saved models.
backtest_folder= "/content/drive/MyDrive/fyp/Final report/4. Backtesting/"
model_folder = backtest_folder+model+"/"

Mounted at /content/drive


# Backtesting Setting

In [None]:
start_date = []
num_period = 10
period_year = 1

start = 2022-(num_period+period_year-1)
for i in range(num_period):
    start_date.append((start+i)*10000+101)


date_range_list={}
for i in start_date:
    date_range_list[i] = (i-101)+(period_year-1)*10000+1231

print(date_range_list)

stock_list = {"BlockChain":["COIN","NVDA","FB"],
              "Airline":["BA","GD","LMT",],
              "Traveling":["UBER","ABNB","MAR","BKNG"],
              "Semiconductors":["INTC","NVDA","QCOM","MU","AMD"],
              "Cloud Computing":["IBM", "AMZN","GOOG","CRM"],
              "Social Media":["TWTR","SNAP","PINS","FB"],
              "Entertainment":["DIS","NFLX","FB"],
              "Retail":["WMT","COST","TGT","BBY","HD"],
              "Franchise":["MCD", "YUM", "SBUX", "DPZ"],
              "Real Estate":["HST", "EQR", "AVB", "PLD", "SPG"],
              "Telecommunication":["T","TMUS","VZ","CMCSA","CHTR"],
              "Energy & Resources":["DOW", "DD"],
              "Luxury goods":["RACE","EL","PVH"]
                            }

start_date = "2000-01-01"
end_date = "2022-12-31"


do_nth_name = "Buy&Hold"
model_name = "WithModel"

backtest_col = ["Sector","Stock",
                "Range_Start","Range_End",
                do_nth_name+"_Profit",model_name+"_Profit",
                do_nth_name+"_CAGR(in %)",model_name+"_CAGR(in %)",
                "CAGR_Performance(in %)",
                do_nth_name+"_MDD(in %)",model_name+"_MDD(in %)",
                "MDD_Performance(in %)"]

{20120101: 20121231, 20130101: 20131231, 20140101: 20141231, 20150101: 20151231, 20160101: 20161231, 20170101: 20171231, 20180101: 20181231, 20190101: 20191231, 20200101: 20201231, 20210101: 20211231}


# Backtesting Module

In [None]:
################### Input ##########################
# For hist_price_data: index=["date"], columns = ["Open"]
# For pred_action: index=["date"], columns = ["Action"] (Buy/Sell)
################### Output #########################
# 1. trading record
# 2. total profit
class backtest:
    """Replay Buy/Sell signals on historical prices and compare with buy-and-hold.

    The strategy is long-only and all-in/all-out: a "Buy" spends as much cash
    as possible on whole shares, a "Sell" liquidates the whole position.
    A handling fee (fraction of traded value) is charged on both sides.

    Parameters
    ----------
    hist_price_data : DataFrame indexed by date. Its first column must be the
        price used for trading; it is read positionally (first/last row) and
        by the label "Open".
    pred_action : DataFrame indexed by date with an "Action" column of
        strings. "buy"/"sell" (case-insensitive) trigger trades; any other
        value is recorded as a hold.
    capital : starting cash.
    handling_fee : fee as a fraction of traded value (e.g. 0.0005).
    num_year : length of the test period in years, used for CAGR.

    Outputs
    -------
    ``trade_record`` (one row per signal date) plus the profit / CAGR / MDD
    getters. Getters print "No Backtesting Record." and return ``None`` until
    ``start_test`` has run.
    """

    _RECORD_COLUMNS = ["Action","Price","Position","Cash","Pos_Bal","Cash_Bal","Cum_Profit","Total_Bal"]

    def __init__(self,hist_price_data,pred_action,capital,handling_fee,num_year=1):
        self.hpd = hist_price_data
        self.pred_action = pred_action
        self.capital = capital
        self.cash_balance = capital
        self.handle_fee = handling_fee
        self.num_year = num_year

        # All mutable state lives on the instance. A class-level DataFrame
        # would be shared (and appended to) by every backtest object.
        self.trade_record = self._new_trade_record()
        self.profit = 0
        self.position = 0
        self.last_price = 0
        self.do_nth_profit = 0
        self._tested = False

        # Drawdown bookkeeping for the traded price (buy-and-hold view).
        # *_all_time_high doubles as the running peak; (_peak, _trough) is the
        # peak/trough pair of the deepest drawdown seen so far.
        self._stock_trough = 0
        self._stock_peak = 0
        self._stock_all_time_low = 0
        self._stock_all_time_high = 0

        # Same bookkeeping for the strategy's portfolio value.
        self._portfolio_trough = capital
        self._portfolio_peak = capital
        self._portfolio_all_time_low = capital
        self._portfolio_all_time_high = capital

    # ------------------------------------------------------------------ helpers
    def _new_trade_record(self):
        """Return an empty trade-record frame with the standard columns."""
        return pd.DataFrame(index=[], columns=list(self._RECORD_COLUMNS))

    def _has_result(self):
        """Return True once a backtest has run; otherwise print the notice."""
        if not self._tested:
            print("No Backtesting Record.")
        return self._tested

    @staticmethod
    def _update_drawdown(value, running_peak, worst_peak, worst_trough):
        """Fold one observation into a maximum-drawdown tracker.

        Returns the updated ``(running_peak, worst_peak, worst_trough)``.
        The comparison is cross-multiplied so no division is needed; it is
        valid for positive peaks (prices / portfolio values).
        """
        if value > running_peak:
            running_peak = value
        # value/running_peak < worst_trough/worst_peak  <=> deeper drawdown
        if value * worst_peak < worst_trough * running_peak:
            worst_peak, worst_trough = running_peak, value
        return running_peak, worst_peak, worst_trough

    # --------------------------------------------------------------- public API
    def clear_trade_record(self,sec):
        """Reset the trade record, then pause for ``sec`` seconds."""
        self.trade_record = self._new_trade_record()
        print("Clearing trade_record...")
        time.sleep(sec)

    def start_test(self):
        """Run the simulation once; later calls only print a notice."""
        if self._tested:
            print("Backtesting has been completed...")
            return

        self._tested = True
        print("Start Backtesting...")
        first_price = self.hpd.iloc[0,0]
        self._stock_all_time_low = first_price
        self._stock_all_time_high = first_price
        self._stock_trough = first_price
        self._stock_peak = first_price

        holding = False
        for ind in self.pred_action.index:
            # Update latest price
            self.last_price = self.hpd.loc[ind,"Open"]

            # Buy-and-hold drawdown: the trough must come AFTER its peak.
            if self.last_price < self._stock_all_time_low:
                self._stock_all_time_low = self.last_price
            (self._stock_all_time_high,
             self._stock_peak,
             self._stock_trough) = self._update_drawdown(self.last_price,
                                                         self._stock_all_time_high,
                                                         self._stock_peak,
                                                         self._stock_trough)

            # Do the action. The holding state only flips when a trade was
            # really executed, and every date gets exactly one record row.
            action = self.pred_action.loc[ind,"Action"].lower()
            traded = False
            if action == "buy" and not holding:
                traded = self._buy(ind,self.last_price)
                holding = traded
            elif action == "sell" and holding:
                traded = self._sell(ind,self.last_price)
                holding = not traded
            if not traded:
                self._hold(ind,self.last_price)

            # Strategy drawdown on the portfolio value after today's action.
            curr_portfolio_val = self.get_capital()+self.get_profit()
            if curr_portfolio_val < self._portfolio_all_time_low:
                self._portfolio_all_time_low = curr_portfolio_val
            (self._portfolio_all_time_high,
             self._portfolio_peak,
             self._portfolio_trough) = self._update_drawdown(curr_portfolio_val,
                                                             self._portfolio_all_time_high,
                                                             self._portfolio_peak,
                                                             self._portfolio_trough)

        self._run_do_nothing() # Calculate do nothing profit

    def _mark_down_record(self,date,action,price,pos_delta,cash_delta):
        """Append one row (keyed by ``date``) to the trade record."""
        self.trade_record.loc[date] = [action,price,pos_delta,cash_delta,
                                       round(self.position,4),round(self.cash_balance,3),
                                       round(self.get_profit(),3),round(self.get_amount(),3)]

    def _buy(self,date,price):
        """Buy as many whole shares as cash allows; return True if any were bought."""
        qty = floor(self.cash_balance / price)
        # The fee can push the cost above the balance: step down until affordable.
        # ">=" semantics (spend up to the full balance) match _run_do_nothing.
        while qty > 0 and qty*price*(1+self.handle_fee) > self.cash_balance:
            qty -= 1
        if qty <= 0:
            return False

        total_amt = qty*price*(1+self.handle_fee)
        self.position += qty
        self.cash_balance -= total_amt
        self._mark_down_record(date,"Buy",price,qty,-total_amt)
        print("Bought at",date,"with price =", price, "\tPos:", qty)
        return True

    def _sell(self,date,price):
        """Sell the whole position; return True if there was anything to sell."""
        if self.position < 1:
            return False

        sell_pos = self.position
        total_amt = sell_pos*price*(1-self.handle_fee)
        self.position -= sell_pos
        self.cash_balance += total_amt
        self._mark_down_record(date,"Sell",price,-sell_pos,total_amt)
        print("Sold at",date,"with price =", price, "\tPos:", sell_pos)
        return True

    def _hold(self,date,price):
        """Record a day without a trade."""
        self._mark_down_record(date,"Hold",price,0,0)

    def _run_do_nothing(self):
        """Compute the buy-and-hold profit: buy on the first row, value at the last."""
        balance = self.capital
        first_day_price = self.hpd.iloc[0,0]
        last_day_price = self.hpd.iloc[-1,0]
        buy_pos = floor(self.capital / first_day_price) # Calculate how many position can buy
        total_amt = buy_pos*first_day_price*(1+self.handle_fee) # Check if okay to buy (including fee)
        while self.capital < total_amt: # If not enough, reduce buy_pos by 1
            buy_pos -= 1
            total_amt = buy_pos*first_day_price*(1+self.handle_fee)
        balance -= total_amt
        self.do_nth_profit = last_day_price*buy_pos+balance-self.capital

    def get_performance(self):
        """Return (model profit - baseline profit) / |baseline profit|.

        With a zero baseline the ratio is undefined: returns NaN when the
        model profit is also zero, otherwise +/-inf by the sign of the gap.
        """
        if not self._has_result():
            return None
        model_profit = self.get_profit()
        do_nth_profit = self.get_do_nothing()
        if do_nth_profit == 0:
            if model_profit == 0:
                return float("nan")
            return float("inf") if model_profit > 0 else float("-inf")
        return (model_profit-do_nth_profit)/abs(do_nth_profit)

    def get_do_nothing(self):
        """Return the buy-and-hold profit."""
        if not self._has_result():
            return None
        return self.do_nth_profit

    def get_profit(self):
        """Return the strategy's profit, marking any open position at the last price."""
        if not self._has_result():
            return None
        return self.get_cash_balance()+self.get_last_price()*self.get_position()-self.get_capital()

    def get_last_price(self):
        """Return the most recent price seen by the simulation."""
        if not self._has_result():
            return None
        return self.last_price

    def get_position(self):
        """Return the number of shares currently held."""
        if not self._has_result():
            return None
        return self.position

    def get_do_nothing_CAGR(self):
        """Return the buy-and-hold compound annual growth rate (fraction)."""
        if not self._has_result():
            return None
        start_bal = self.get_capital()
        end_bal = self.get_do_nothing()+self.get_capital()
        return pow(end_bal/start_bal,1/self.get_num_year())-1

    def get_model_CAGR(self):
        """Return the strategy's compound annual growth rate (fraction)."""
        if not self._has_result():
            return None
        start_bal = self.get_capital()
        end_bal = self.get_profit()+self.get_capital()
        return pow(end_bal/start_bal,1/self.get_num_year())-1

    def get_diff_in_CAGR(self):
        """Return strategy CAGR minus buy-and-hold CAGR."""
        if not self._has_result():
            return None
        return self.get_model_CAGR() - self.get_do_nothing_CAGR()

    def get_do_nothing_MDD(self):
        """Return the price's maximum drawdown as a fraction <= 0."""
        if not self._has_result():
            return None
        return (self._stock_trough-self._stock_peak)/self._stock_peak

    def get_model_MDD(self):
        """Return the portfolio's maximum drawdown as a fraction <= 0."""
        if not self._has_result():
            return None
        return (self._portfolio_trough-self._portfolio_peak)/self._portfolio_peak

    def get_diff_in_MDD(self):
        """Return strategy MDD minus buy-and-hold MDD."""
        if not self._has_result():
            return None
        return self.get_model_MDD() - self.get_do_nothing_MDD()

    def get_num_year(self):
        return self.num_year

    def get_cash_balance(self):
        return self.cash_balance

    def get_capital(self):
        return self.capital

    def get_amount(self):
        """Return the current portfolio value (capital + profit)."""
        return self.get_capital()+self.get_profit()

    def print_do_nothing(self):
        if self._has_result():
            print("If buy at", self.hpd.index[0],"with price =",self.hpd.iloc[0,0])
            print("and do nothing")
            print("Current Profit:",self.get_do_nothing())

    def print_performance(self):
        if self._has_result():
            print("Performance:", str(round(self.get_performance()*100,2))+"%")

    def print_CAGR_performance(self):
        if self._has_result():
            print("CAGR Performance:", str(round(self.get_diff_in_CAGR()*100,2))+"%")

    def print_trade_record(self):
        # The (possibly empty) record is printed even before a test has run.
        self._has_result()
        print(self.trade_record)

    def print_profit(self):
        if self._has_result():
            print("Current Profit with model:",self.get_profit())

    def export_trade_record(self,stock,add_msg = ""):
        """Write the date-sorted trade record to ``record_folder`` as CSV.

        The file name is ``stock`` plus, when given, ``"_" + add_msg``.
        ``record_folder`` is a module-level path defined elsewhere in the file.
        """
        if not self._has_result():
            return
        if not add_msg == "":
            add_msg = "_" + add_msg
        self.trade_record = self.trade_record.sort_index()
        self.trade_record.to_csv(record_folder+stock+add_msg+".csv")
        print("Trade record exported.")

# Processing Function

In [None]:
# Split the train and test data
def custom_split(data,start,end):
    """Return the rows of ``data`` whose index lies in ``[start, end]`` (both inclusive)."""
    not_too_early = data.index >= start
    not_too_late = data.index <= end
    return data[not_too_early & not_too_late]

In [None]:
# Window length in trading days: 5 consecutive days of data are used to
# predict for the following (6th) day.
num_day_to_predict = 5

In [None]:
def produce_result_target_price(X,num_day,result_col_name = "Action"):
    """Label each day 1 (be long) or 0 (be flat) from its price vs. the prior window.

    For every position ``t >= num_day`` the value in the first column of ``X``
    is compared with the mean of the preceding ``num_day`` values:

    * more than 1% above the mean  -> switch to "Buy"  (label 1)
    * more than 1% below the mean  -> switch to "Sell" (label 0)
    * otherwise                    -> keep the previous state (flat before
      the first signal)

    Parameters
    ----------
    X : DataFrame whose first column holds the price series.
    num_day : length of the look-back window.
    result_col_name : name of the single output column.

    Returns
    -------
    DataFrame indexed like ``X``; the first ``num_day`` rows are NaN.
    """
    y = pd.DataFrame(np.nan, index=X.index, columns=[result_col_name])
    status = "Hold"
    for i in range(len(X)-num_day):
        # The label belongs to the day right after the window. Use the
        # ``num_day`` argument (not a module-level constant) so labels stay
        # aligned for any window length.
        target = i+num_day
        window_mean = np.mean(X.iloc[i:target,0])
        price = X.iloc[target,0]
        if price > window_mean*1.01:
            status = "Buy"
        elif price < window_mean/1.01:
            status = "Sell"
        y.iloc[target,0] = 1 if status == "Buy" else 0
    return y

In [None]:
def transform_X_data_to_tensor(X,num_day):
    """Cut ``X`` into overlapping, per-window standardised frames for the model.

    Window ``i`` covers rows ``i .. i+num_day-1``. Each column of a window is
    standardised with that window's own mean and (pandas default) standard
    deviation. Windows start at ``i = 0 .. len(X)-num_day-1``, i.e. the last
    ``num_day`` rows never start a window.

    Parameters
    ----------
    X : DataFrame of numeric features, one row per day.
    num_day : number of rows per window.

    Returns
    -------
    float32 tensor of shape ``(n_windows, num_day, n_features, 1)``.

    Notes
    -----
    * A column that is constant inside a window has zero standard deviation,
      so its standardised values are NaN/inf.
    * For backward compatibility one window is still produced when
      ``len(X) == num_day``.
    """
    n_windows = max(1, len(X)-num_day)
    windows = []
    for i in range(n_windows):
        x_window = X.iloc[i:i+num_day,:]
        x_mean = x_window.mean(axis=0)
        x_std = x_window.std(axis=0)
        windows.append(np.array(x_window.sub(x_mean, axis=1).div(x_std, axis=1)))

    # Stack once instead of growing a tensor with tf.concat on every
    # iteration (which re-copies all previous windows each time).
    x_tf_data = tf.convert_to_tensor(np.stack(windows),dtype = tf.float32)

    # Feature count comes from the data rather than a hard-coded 14.
    return tf.reshape(x_tf_data,(-1,num_day,X.shape[1],1))
def transform_y_data_to_tensor(y,num_day):
    """One-hot encode the "Action" labels of ``y`` into a tensor.

    Rows containing NaN are dropped first. Label 1 becomes ``[1, 0]``,
    label 0 becomes ``[0, 1]``; rows with any other label are skipped.
    ``num_day`` is accepted but not used.
    """
    one_hot = {1: [1,0], 0: [0,1]}
    labelled = y.dropna()
    encoded = []
    for ind in labelled.index:
        label = labelled.loc[ind,"Action"]
        if label in one_hot:
            encoded.append(one_hot[label])
    return tf.convert_to_tensor(encoded)



In [None]:
def convert_decision(test,pred,n,window=5):
    """Turn per-day class scores into Buy/Sell/Hold actions aligned with prices.

    Each row of ``pred`` is a pair of scores; when the first score is the
    row maximum (ties included) the model wants to be long. Actions are
    emitted only on state changes: "Buy" when going long from flat, "Sell"
    when going flat from long, "Hold" otherwise.

    Parameters
    ----------
    test : DataFrame with an "open" column, one row per day.
    pred : array-like of shape (n_predictions, 2).
    n : extra number of leading rows of ``test`` to skip.
    window : number of leading rows consumed by the first input window
        (default 5, the value previously hard-coded).

    Returns
    -------
    (prices, actions): ``prices`` is ``test`` from row ``n + window`` on with
    the single column "Open"; ``actions`` has one "Action" column on the same
    index. ``pred`` must therefore contain ``len(test) - n - window`` rows.
    """
    scores = np.array(pred)
    action = []
    holding = False
    for row in scores:
        wants_long = row[0] == max(row)
        if wants_long and not holding:
            action.append("Buy")
            holding = True
        elif holding and not wants_long:
            action.append("Sell")
            holding = False
        else:
            action.append("Hold")

    offset = n+window
    # Named `prices` so the module-level `backtest` class is not shadowed.
    prices = test[["open"]][offset:]
    prices.columns = ["Open"]
    return (prices,pd.DataFrame(action,index=test[offset:].index,columns=["Action"]))



# Load Model

In [None]:
# Load the saved CNN-LSTM classification model from the model folder on Drive.
loaded_cnn_lstm_class_model = keras.models.load_model(model_folder+"cnn_lstm_classify_best")




# Backtest Result of CNN_LSTM (Direction)

In [None]:
import os

# Delete trade-record files left over from a previous CNN_LSTM classification
# run, so the folder only contains the results of the run that follows.
file_list = [name for name in os.listdir(record_folder) if "CNN_LSTM_CLASS_" in name]
if not file_list:
    print("No such files...")
else:
    for f in file_list:
        print(f)  # log the file being removed, not the whole list each time
        os.remove(record_folder+f)
    print("All old files deleted...")

In [None]:
#### CNN_LSTM_CLASSIFICATION
# For every sector/stock, download the price history once, then for each
# yearly window: build model inputs, predict, convert predictions to actions,
# backtest them, export the trade record and collect one summary row.
summary_rows = []  # one dict per (stock, window); turned into a DataFrame at the end

# Extra look-back offset handed to convert_decision (num_day_to_predict - 5).
compare_to_n_day_mean = 5

for sector, stocks in stock_list.items():
    for stock in stocks:
        print("Runninng stock: "+ stock)
        stock_data_1 = web.DataReader(stock, "stooq",start=start_date, end=end_date)
        stock_data_1.columns = ["open","high","low","close","volume"]
        x_1 = StockDataFrame(stock_data_1)
        data_1 = x_1[['open','high','low','close','volume',
                      'boll', 'boll_ub', 'boll_lb',
                      'macd', 'macdh', 'macds',
                      'rsi_11', 'rsi_14', 'rsi_21']]
        # Re-key rows by integer YYYYMMDD so they compare with date_range_list.
        data_1.index = [int(str(ind)[0:4]+str(ind)[5:7]+str(ind)[8:10]) for ind in data_1.index]
        data_1 = data_1.sort_index()
        print(stock + " Data loaded...")
        for k,v in date_range_list.items():

            print("Getting Data... - " + stock + " from " + str(k))
            if (data_1.index[0]>v):
                print("No data for this testing period... - " + stock + " from " + str(k))
                continue
            test_1 = custom_split(data_1,start = k,end = v)
            if len(test_1) <= num_day_to_predict:
                print("No enoguh data for testing... - " + stock + " from " + str(k))
                continue

            print("Transforming Data... - " + stock + " from " + str(k))
            tf_test_1 = transform_X_data_to_tensor(test_1,num_day_to_predict)

            print("Predicting... - " + stock + " from " + str(k))
            predictions_1 = loaded_cnn_lstm_class_model.predict(tf_test_1)

            print("Converting... - " + stock + " from " + str(k))
            backtestdata_1,final_pred_1 = convert_decision(test_1,predictions_1,num_day_to_predict - compare_to_n_day_mean)
            final_pred_1 = final_pred_1.sort_index()

            print("Backtesting... - " + stock + " from " + str(k))
            back1 = backtest(backtestdata_1,final_pred_1,10000,0.0005,period_year)
            back1.clear_trade_record(3)

            print("Start Iteration... - " + stock + " from " + str(k))
            back1.start_test()

            print("======= from "+ str(k) + " to " + str(v)+" =========")
            back1.print_do_nothing()
            back1.print_profit()
            back1.print_performance()
            performance_pct = round(back1.get_performance()*100,2)
            back1.export_trade_record("CNN_LSTM_CLASS_"+sector+"-"+stock,str(k)+"-to-"+str(v)+"("+str(performance_pct)+"%)")
            # DataFrame.append was removed in pandas 2.0: collect plain dicts
            # and build the frame once after the loops.
            summary_rows.append({"Sector":sector,
                                 "Stock":stock,
                                 "Range_Start":str(k),
                                 "Range_End":str(v),
                                 do_nth_name+"_Profit":round(back1.get_do_nothing(),2),
                                 model_name+"_Profit":round(back1.get_profit(),2),
                                 do_nth_name+"_CAGR(in %)":round(back1.get_do_nothing_CAGR()*100,2),
                                 model_name+"_CAGR(in %)":round(back1.get_model_CAGR()*100,2),
                                 "CAGR_Performance(in %)":round(back1.get_diff_in_CAGR()*100,2),
                                 do_nth_name+"_MDD(in %)":round(back1.get_do_nothing_MDD()*100,2),
                                 model_name+"_MDD(in %)":round(back1.get_model_MDD()*100,2),
                                 "MDD_Performance(in %)":round(back1.get_diff_in_MDD()*100,2)})
            print("+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")

print("Backtesting Completed...")
backtest_summary = pd.DataFrame(summary_rows, columns=backtest_col)
backtest_summary.to_csv(backtest_summary_folder+"backtest_summary_CNN_LSTM_CLASS.csv")
print("Backtesting Summary Exported...")
