In [1]:
# Import the required modules. Run this cell first: it prepares the data used by every later cell.
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# Load a feather file from the given path. The same helper can load other feather datasets, e.g. stk_fin_cashflow.
def data_load(path):
    """Read the feather file located at ``path`` and return it as a DataFrame."""
    return pd.read_feather(path)

# Load the daily stock table; it is the only dataset this notebook uses.
# The hard-coded values below are fallbacks: any key present in
# configSettingFile.txt overrides the matching default, and a key that is
# missing from the file keeps its fallback instead of raising KeyError.
stk_daily = data_load("../data/"+"stk_daily.feather")
stkname = set(stk_daily["stk_id"])
defaultStartDate,defaultEndDate,default_lookback_period,default_decline_days="2020-01-02","2022-12-30",20,5
# Snapshot of the built-in fallbacks, taken before the config file is applied.
defaultParameters=[defaultStartDate,defaultEndDate,default_lookback_period,default_decline_days]
with open('configSettingFile.txt', mode='r', encoding='utf-8') as cf:
    # SECURITY NOTE(review): eval() executes whatever expression the file
    # contains. If the file only ever holds a plain dict literal, prefer
    # ast.literal_eval; left unchanged here so existing config files keep working.
    configSetting=eval(cf.read())

defaultStartDate = configSetting.get("defaultStartDate", defaultStartDate)
defaultEndDate = configSetting.get("defaultEndDate", defaultEndDate)
default_lookback_period = configSetting.get("default_lookback_period", default_lookback_period)
default_decline_days = configSetting.get("default_decline_days", default_decline_days)

# print(default_lookback_period)
# print(defaultParameters)

# Get the rows whose stock id is 'stkid'. The caller may pass a start date and an end date.
def filter_data(stkid, startDate = defaultStartDate, endDate = defaultEndDate):
    """Return the daily rows of one stock inside a date window.

    Parameters
    ----------
    stkid : value compared against the ``stk_id`` column of ``stk_daily``.
    startDate, endDate : inclusive window bounds. They are clamped to
        ``defaultStartDate`` / ``defaultEndDate`` with ``max`` / ``min``, so
        the window can never extend beyond the configured defaults.
        Note that the parameter defaults are bound once, when this function
        is defined.

    Returns
    -------
    pandas.DataFrame
        An independent copy of the matching rows whose ``date`` column has
        been passed through ``pd.to_datetime``.
    """
    startDate, endDate = max(startDate,defaultStartDate) ,min(endDate, defaultEndDate)
    in_window = (stk_daily["stk_id"] == stkid) & (startDate<=stk_daily["date"]) & (stk_daily["date"]<=endDate)
    # .copy() detaches the selection from stk_daily: the column assignment
    # below (and the columns callers add later) then write to a real frame
    # instead of a slice, which avoids SettingWithCopyWarning.
    filteredData = stk_daily.loc[in_window].copy()
    # Ensure the 'date' column is in the correct format
    filteredData['date'] = pd.to_datetime(filteredData['date'])
    return filteredData

In [2]:
# Generate mean-reversion signals with Bollinger bands.
# Buy when the close has declined on at least "decline_days" consecutive days and the open is below the lower band.
# Sell when the high is above the upper band.
def mean_reversion_strategy_with_bollinger(stkidentity, lookback_period, decline_days, num_std_dev=2, startDate = defaultStartDate, endDate = defaultEndDate):
    """Attach Bollinger-band columns and buy/sell flags to one stock's rows.

    The rows come from ``filter_data(stkidentity, startDate, endDate)``.
    Columns added, in this order: ``Mean``, ``Std Dev``, ``Upper Band``,
    ``Lower Band``, ``Decline``, ``Consecutive Decline``, ``Buy Signal``,
    ``Sell Signal``. The augmented frame is returned.

    NOTE(review): the bands and the decline count of a row include that row's
    own close, while the buy test compares them with the same row's open.
    Confirm this is the intended timing.
    """
    df = filter_data(stkidentity, startDate, endDate)
    closes = df["close"]

    # Rolling statistics of the close over the lookback window.
    lookback = closes.rolling(window=lookback_period)
    df['Mean'] = lookback.mean()
    df['Std Dev'] = lookback.std()

    # Bands sit num_std_dev standard deviations either side of the mean.
    band_offset = df['Std Dev'] * num_std_dev
    df['Upper Band'] = df['Mean'] + band_offset
    df['Lower Band'] = df['Mean'] - band_offset

    # A row counts as a decline when its close is below the previous close;
    # the rolling sum counts such rows inside the last decline_days rows.
    df['Decline'] = closes.lt(closes.shift(1))
    df['Consecutive Decline'] = df['Decline'].rolling(window=decline_days).sum()

    # Buy: every row of the window declined and the open is under the lower band.
    enough_declines = df['Consecutive Decline'].ge(decline_days)
    opened_below_band = df['open'].lt(df['Lower Band'])
    df['Buy Signal'] = enough_declines & opened_below_band

    # Sell: the high of the row went above the upper band.
    df['Sell Signal'] = df['high'].gt(df['Upper Band'])
    return df

In [3]:
# trade process
def backtest_strategy(df, initial_capital, risk_free_rate=0.01):
    """Simulate the buy/sell signals of ``df`` and compute summary metrics.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain ``Buy Signal``, ``Sell Signal``, ``low``, ``high`` and
        ``close``. The frame is modified in place: ``Position``,
        ``Portfolio Value``, ``Returns`` and ``NAV`` columns are (re)written.
    initial_capital : number
        Cash available before the first row.
    risk_free_rate : float
        Annual rate; divided by 252 to obtain the per-row rate.

    Trading rules (one decision per row, buy has priority over sell):
    * buy signal and cash >= low: buy as many whole shares as possible at ``low``;
    * otherwise, sell signal and shares held: sell everything at ``high``.

    Returns
    -------
    (dict, DataFrame)
        The metrics dict (``final_capital``, ``max_drawdown``,
        ``sharpe_ratio``, ``excess_earnings``, ``annualized_return``,
        ``annualized_volatility``) and the same ``df`` object.
        ``sharpe_ratio`` is NaN when the returns have no dispersion
        (for example when no trade happened), instead of dividing by zero.

    Raises
    ------
    ValueError
        If ``df`` has no rows, or if the simulated NAV ever becomes negative.
    """
    if len(df) == 0:
        # Without rows there is no last portfolio value and no period length.
        raise ValueError("Cannot backtest an empty price table.")

    capital = initial_capital
    position = 0
    previous_portfolio_value = initial_capital

    # Results are collected in plain lists and written to the frame once.
    # This is positional (safe with duplicate index labels), avoids the
    # per-row df.loc writes, and avoids storing floats in an int column.
    positions, portfolio_values, returns = [], [], []

    rows = zip(df['Buy Signal'], df['Sell Signal'], df['low'], df['high'], df['close'])
    for buy_signal, sell_signal, low, high, close in rows:
        if buy_signal and capital >= low:
            # Buy as many shares as possible at the 'Low' price
            shares_to_buy = int(capital // low)
            capital -= shares_to_buy * low
            position += shares_to_buy

        elif sell_signal and position > 0:
            # Sell all shares at the 'High' price
            capital += position * high
            position = 0

        # The current portfolio value is calculated based on the 'Close' price
        current_portfolio_value = capital + position * close
        positions.append(position)
        portfolio_values.append(current_portfolio_value)

        # Row-over-row return; left at 0.0 when the previous value is zero.
        if previous_portfolio_value != 0:
            returns.append((current_portfolio_value - previous_portfolio_value) / previous_portfolio_value)
        else:
            returns.append(0.0)
        previous_portfolio_value = current_portfolio_value

    df['Position'] = np.asarray(positions, dtype='int64')
    df['Portfolio Value'] = np.asarray(portfolio_values, dtype=float)
    df['Returns'] = np.asarray(returns, dtype=float)

    # Final capital if position is still held at the end
    if position > 0:
        capital += position * df['close'].iloc[-1]

    # Ensure NAV never goes negative
    df['NAV'] = df['Portfolio Value']
    if (df['NAV'] < 0).any():
        raise ValueError("NAV should not be negative. Check the logic and data.")

    # Maximum Drawdown: worst relative drop from the running NAV peak
    rolling_max = df['NAV'].cummax()
    drawdown = df['NAV'] / rolling_max - 1
    max_drawdown = drawdown.min()

    # Sharpe Ratio (annualised with sqrt(252)); undefined without dispersion
    daily_volatility = df['Returns'].std()
    excess_returns = df['Returns'] - risk_free_rate / 252
    if daily_volatility > 0:
        sharpe_ratio = excess_returns.mean() / daily_volatility * np.sqrt(252)
    else:
        # std is 0 (flat returns) or NaN (single row): report "undefined".
        sharpe_ratio = np.nan

    # Excess Earnings
    excess_earnings = (df['Returns'].sum() - risk_free_rate / 252 * len(df)) * initial_capital

    # Annualized Rate of Return
    annualized_return = np.power(df['Portfolio Value'].iloc[-1] / initial_capital, 252 / len(df)) - 1

    # Annualized Fluctuation (Volatility)
    annualized_volatility = daily_volatility * np.sqrt(252)

    return {
        'final_capital': capital,
        'max_drawdown': max_drawdown,
        'sharpe_ratio': sharpe_ratio,
        'excess_earnings': excess_earnings,
        'annualized_return': annualized_return,
        'annualized_volatility': annualized_volatility
    }, df

In [4]:
# show NAV
%matplotlib qt

def plot_nav(df, metricsGet):
    """Plot the NAV column of ``df`` against its date column and annotate metrics.

    ``metricsGet`` is iterated for its keys and indexed by them; each entry
    becomes one yellow text box, stacked downwards from the top-left corner
    of the axes. The keys listed in ``as_percent`` are rendered with ``.0%``,
    everything else with ``.2f``.

    NOTE(review): 'sharpe_ratio' is in the percent group, so a Sharpe ratio of
    1.2 is displayed as "120%" — confirm this is intended.
    """
    as_percent = {'max_drawdown', 'sharpe_ratio', 'annualized_return', 'annualized_volatility'}

    plt.figure(figsize=(18, 12))
    plt.plot(df["date"], df['NAV'], label='NAV')
    plt.title("Net Asset Value Over Time, stock id = {}".format(df['stk_id'].iloc[0]))
    plt.xlabel("Date")
    plt.ylabel("NAV")
    plt.grid(True)

    for slot, effect in enumerate(metricsGet):
        this = metricsGet[effect]
        if effect in as_percent:
            caption = f'{effect}: {this:.0%}'
        else:
            caption = f'{effect}: {this:.2f}'
        # Each box sits 0.1 axes-fraction below the previous one.
        plt.annotate(caption, xy=(0.05, 0.95-slot*0.1), xycoords='axes fraction',
                     fontsize=18, bbox=dict(boxstyle="round,pad=0.3", edgecolor='black', facecolor='yellow'))

    plt.legend()
    plt.show()

In [5]:
# join the computation part as a function
def process_data_using_mean_reversion(stkidentity, lookback_period, decline_days,  initial_capital, stDate = defaultStartDate, edDate = defaultEndDate, num_std_dev=2):
    """Run the whole pipeline for one stock: signals, backtest, NAV plot.

    Returns the ``(metrics, dataframe)`` pair produced by
    ``backtest_strategy``; the backtest uses that function's default
    risk-free rate.
    """
    signals = mean_reversion_strategy_with_bollinger(
        stkidentity,
        lookback_period,
        decline_days,
        num_std_dev,
        stDate,
        edDate,
    )
    metrics, backtested = backtest_strategy(signals, initial_capital)
    plot_nav(backtested, metrics)
    return metrics, backtested

In [6]:
# main programme, user interface
import tkinter as tk
from tkinter import Tk, Label, Button
from tkinter.ttk import Frame
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
import mplfinance as mpf

# Fall back to the default dates when the user leaves a date field empty
def initialiseDate(stdate,eddate):
    """Replace empty date inputs with the configured defaults.

    Each field is handled independently: an empty ``stdate`` becomes
    ``defaultStartDate`` and an empty ``eddate`` becomes ``defaultEndDate``.
    (Previously the end date was chosen by testing ``stdate``, so an empty
    end date was kept as '' whenever a start date was typed, and a typed end
    date was discarded whenever the start date was empty.)

    Returns a ``(start, end)`` tuple.
    """
    start = defaultStartDate if stdate == '' else stdate
    end = defaultEndDate if eddate == '' else eddate
    return start, end

# check dates input
def checkDates(stdate,eddate):
    """Validate a start/end date pair typed by the user.

    Both values must be strict, zero-padded ``YYYY-MM-DD`` strings. The
    checks below (and the date clamping elsewhere in this file) compare dates
    as strings, which is only meaningful for that exact format. After the
    format check the function rejects a start after the end, a start after
    ``defaultEndDate`` and an end before ``defaultStartDate``.

    Returns True when the pair is acceptable. Otherwise an error dialog is
    shown and False is returned.
    """
    from datetime import datetime

    def reject(message):
        # `import tkinter as tk` does not load the messagebox submodule, so it
        # is imported explicitly (and only when a dialog is really needed).
        from tkinter import messagebox
        messagebox.showerror("Error", message)
        return False

    for label, value in (("Start date", stdate), ("End date", eddate)):
        try:
            # Round-tripping rejects unpadded forms such as "2021-1-5", which
            # strptime accepts but which break string comparison.
            well_formed = datetime.strptime(value, "%Y-%m-%d").strftime("%Y-%m-%d") == value
        except (TypeError, ValueError):
            well_formed = False
        if not well_formed:
            return reject("{} must be a valid date in YYYY-MM-DD format".format(label))

    if stdate>eddate:
        return reject("Start date cannot greater than end date")
    if stdate>defaultEndDate:
        return reject("Start date cannot greater than {} ".format(defaultEndDate))
    if eddate<defaultStartDate:
        return reject("End date cannot smaller than {}".format(defaultStartDate))
    return True

# name should be in stock name list
def checkNames(name):
    """Return True when ``name`` is one of the ids in ``stkname``.

    For an unknown id an error dialog is shown and False is returned.
    """
    if name in stkname:
        return True
    # `import tkinter as tk` does not load the messagebox submodule; import it
    # explicitly instead of relying on another library having loaded it.
    from tkinter import messagebox
    messagebox.showerror("Error", "Invalid stock ID entered")
    return False

# Read the help text displayed by GuidancePage. The path is relative to the
# current working directory, and the file is decoded as UTF-8.
with open('guidance_text.txt', mode='r', encoding='utf-8') as f:
    guidance_text=f.read()

# Home Page
class HomePage(tk.Frame):
    """Landing page: a frame that only offers an Exit button."""

    def __init__(self, master=None):
        super().__init__(master)
        self.create_widgets()

    def create_widgets(self):
        """Create the Exit button and place it at the top of the frame."""
        # Exit button
        self.exit_button = tk.Button(self, text="Exit", command=self.call_exit)
        self.exit_button.pack(side='top', pady=20)  # Centering the button at the top, with padding

    def call_exit(self):
        """Ask for confirmation, then close the window that hosts this page."""
        # `import tkinter as tk` does not load the messagebox submodule, so it
        # is imported explicitly here.
        from tkinter import messagebox

        res = messagebox.askquestion('Exit Appliccation', 'Do you really want to exit?')
        if res == 'yes':
            # Destroy the top-level window this frame lives in, rather than a
            # module-level `root` variable that is only defined after the class.
            self.winfo_toplevel().destroy()
        else:
            messagebox.showinfo('Return', 'Returning to main application')

# BacktestInterface
# User type parameters to backtest
class BacktestInterface(tk.Frame):
    """Form page: the user enters back-test parameters and sees the NAV chart.

    Empty fields fall back to the module-level defaults. Invalid input is
    reported in an error dialog and the run is cancelled.
    """

    def __init__(self, master=None):
        super().__init__(master)
        self.create_widgets()
        self.show_numbers = False

    def create_widgets(self):
        """Build the labelled entry fields and the Run button, top to bottom."""
        # Stock ID Entry
        self.stock_id_label = tk.Label(self, text="Stock ID")
        self.stock_id_label.pack()
        self.stock_id_entry = tk.Entry(self)
        self.stock_id_entry.pack()

        # Start Date Entry
        self.start_date_label = tk.Label(self, text="Start Date (YYYY-MM-DD); default: {} ".format(defaultStartDate))
        self.start_date_label.pack()
        self.start_date_entry = tk.Entry(self)
        self.start_date_entry.pack()

        # End Date Entry
        self.end_date_label = tk.Label(self, text="End Date (YYYY-MM-DD); default: {} ".format(defaultEndDate))
        self.end_date_label.pack()
        self.end_date_entry = tk.Entry(self)
        self.end_date_entry.pack()

        # Lookback Period Entry
        self.lookback_period_label = tk.Label(self, text="Lookback Period; default: {} ".format(default_lookback_period))
        self.lookback_period_label.pack()
        self.lookback_period_entry = tk.Entry(self)
        self.lookback_period_entry.pack()

        # Decline Days Entry
        self.decline_days_label = tk.Label(self, text="Decline Days; default: {} ".format(default_decline_days))
        self.decline_days_label.pack()
        self.decline_days_entry = tk.Entry(self)
        self.decline_days_entry.pack()

        # Run Backtest Button
        self.run_backtest_button = tk.Button(self, text="Run Backtest", command=self.run_backtest)
        self.run_backtest_button.pack()

    @staticmethod
    def _parse_positive_int(raw, default, field_name):
        """Turn the text of an entry field into a window length.

        Blank text yields ``default``. Anything that is not a whole number of
        at least 1 raises ValueError with a message naming ``field_name``.
        """
        text = raw.strip()
        if text == "":
            return default
        try:
            value = int(text)
        except ValueError:
            raise ValueError("{} must be a whole number".format(field_name)) from None
        if value < 1:
            raise ValueError("{} must be at least 1".format(field_name))
        return value

    # check the input and run
    def run_backtest(self):
        """Validate the form, run the back-test and show the resulting NAV."""
        stock_id = self.stock_id_entry.get()
        if not checkNames(stock_id):
            return
        start_date, end_date = initialiseDate(self.start_date_entry.get(), self.end_date_entry.get())
        if not checkDates(start_date, end_date):
            return
        try:
            lookback_period = self._parse_positive_int(
                self.lookback_period_entry.get(), default_lookback_period, "Lookback Period")
            decline_days = self._parse_positive_int(
                self.decline_days_entry.get(), default_decline_days, "Decline Days")
        except ValueError as problem:
            # Report bad numbers to the user instead of letting int() raise
            # inside the Tk callback, where the error is easy to miss.
            from tkinter import messagebox
            messagebox.showerror("Error", str(problem))
            return
        # Despite its name, metrics_label holds the metrics dict of the last run.
        self.metrics_label,result_df = process_data_using_mean_reversion(stock_id,  lookback_period, decline_days, initial_capital= 100000, stDate=start_date,edDate=end_date)
        self.display_graph(result_df)

    def display_graph(self, df):
        """Embed a NAV-versus-date line chart of ``df`` in this frame."""
        # Dispose of the previous chart completely: destroy its widget and
        # close its figure, so repeated runs do not pile up hidden widgets
        # and open pyplot figures.
        if hasattr(self, 'canvas'):
            self.canvas.get_tk_widget().destroy()
            plt.close(self.canvas.figure)

        fig, ax = plt.subplots(figsize=(10,4))
        ax.grid(True)
        ax.plot(df['date'], df['NAV'])
        ax.set_xlabel('date')
        ax.set_ylabel('NAV')

        # Embedding the plot in the Tkinter window
        self.canvas = FigureCanvasTkAgg(fig, master=self)
        self.canvas.draw()
        self.canvas_widget = self.canvas.get_tk_widget()
        self.canvas_widget.pack()

# KLineInterface
# user can type id and dates to check the Kline chart
class KLineInterface(tk.Frame):
    """Form page: the user enters a stock id and dates and sees a candlestick chart."""

    def __init__(self, master=None):
        super().__init__(master)
        self.master = master
        self.create_widgets()

    def create_widgets(self):
        """Build the stock-id and date entry fields and the Load button."""
        # Stock ID Entry
        self.stock_id_label = tk.Label(self, text="Stock ID")
        self.stock_id_label.pack()
        self.stock_id_entry = tk.Entry(self)
        self.stock_id_entry.pack()

        # Start Date Entry
        self.start_date_label = tk.Label(self, text="Start Date (YYYY-MM-DD); default: {} ".format(defaultStartDate))
        self.start_date_label.pack()
        self.start_date_entry = tk.Entry(self)
        self.start_date_entry.pack()

        # End Date Entry
        self.end_date_label = tk.Label(self, text="End Date (YYYY-MM-DD); default: {} ".format(defaultEndDate))
        self.end_date_label.pack()
        self.end_date_entry = tk.Entry(self)
        self.end_date_entry.pack()

        self.load_data_button = tk.Button(self, text="Load K-Line Data", command=self.load_k_line_data)
        self.load_data_button.pack()

    def load_k_line_data(self):
        """Validate the form, then draw the candlestick chart inside this frame."""
        stock_id = self.stock_id_entry.get()
        if not checkNames(stock_id):
            return
        start_date, end_date = initialiseDate(self.start_date_entry.get(), self.end_date_entry.get())
        if not checkDates(start_date, end_date):
            return

        prices = filter_data(stock_id, start_date, end_date)
        if prices.empty:
            # A valid window can still contain no rows for this stock; tell
            # the user instead of handing an empty table to the plotter.
            from tkinter import messagebox
            messagebox.showerror("Error", "No data available for the selected stock and date range")
            return
        # Index the rows by date on a new frame (the 'date' column is kept),
        # leaving the frame returned by filter_data untouched.
        prices = prices.set_index('date', drop=False)

        # Dispose of the previous chart completely so repeated loads do not
        # pile up hidden widgets and open pyplot figures.
        if hasattr(self, 'canvas'):
            self.canvas.get_tk_widget().destroy()
            plt.close(self.canvas.figure)

        fig, ax = plt.subplots(figsize=(10,4))
        ax.grid(True)
        mpf.plot(prices, type='candle', mav=(3,6,9), ax=ax)
        # Title the axes directly instead of relying on pyplot's "current axes".
        ax.set_title(stock_id)

        # Embedding the plot in the Tkinter window
        self.canvas = FigureCanvasTkAgg(fig, master=self)
        self.canvas.draw()  # render explicitly, as BacktestInterface.display_graph does
        self.canvas_widget = self.canvas.get_tk_widget()
        self.canvas_widget.pack()

# GuidancePage
# User can scroll when the text is too long
class GuidancePage(tk.Frame):
    """Read-only page that shows ``guidance_text`` with a vertical scrollbar."""

    def __init__(self, master=None):
        super().__init__(master)
        self.create_widgets()

    def create_widgets(self):
        """Create the scrollbar and the text box and link them to each other."""
        # Scrollbar on the right edge, stretched vertically.
        scrollbar = tk.Scrollbar(self)
        scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
        self.scrollbar = scrollbar

        # Word-wrapped text area that reports its scroll position to the bar.
        text_box = tk.Text(self, wrap=tk.WORD, yscrollcommand=scrollbar.set)
        text_box.pack(expand=False, fill='both')
        self.text_box = text_box

        # Fill in the guidance text, then lock the widget against editing.
        text_box.insert(tk.END, guidance_text)
        text_box.config(state='disabled')

        # Dragging the scrollbar moves the text view.
        scrollbar.config(command=text_box.yview)
        
class SwitchFrame(object):
    """Main window controller: a navigation bar plus four switchable pages.

    Exactly one of ``frame1`` .. ``frame4`` is packed at a time and is
    remembered in ``currentpage``. On the back-test and K-line pages the
    <Return> key triggers the page's main action; on the other two pages the
    binding is removed.
    """

    def __init__(self, root):
        self.root = root
        root.title('backtest system')
        root.geometry('2200x1600')

        # Navigation bar: one button per page, laid out left to right.
        nav_bar = Label(root)
        nav_bar.pack()
        nav_entries = (
            ('返回', self.firstpage),
            ('进入回测页面', self.secondpage),
            ('进入K线图页面', self.thirdpage),
            ('进入说明页面', self.fourthpage),
        )
        for column, (caption, handler) in enumerate(nav_entries, start=1):
            Button(nav_bar, text=caption, command=handler).grid(row=1, column=column, padx=10, pady=10)

        # Sunken container that hosts whichever page is currently visible.
        page_area = Label(root, borderwidth=20, relief="sunken")
        page_area.pack()

        # One holder frame per page, created in page order.
        self.frame1, self.frame2, self.frame3, self.frame4 = (
            Frame(page_area, padding=(5, 20, 10, 10)) for _ in range(4)
        )

        # Home page lives in frame1.
        self.home_page = HomePage(self.frame1)
        self.home_page.grid(row=2, column=1, padx=10, pady=10)

        # Back-test form lives in frame2.
        self.backtest_interface = BacktestInterface(self.frame2)
        self.backtest_interface.grid(row=2, column=1, padx=10, pady=10)

        # K-line form lives in frame3.
        self.k_line_interface = KLineInterface(self.frame3)
        self.k_line_interface.grid(row=3, column=1, padx=10, pady=10)

        # Guidance text lives in frame4.
        self.GuidencePage_interface = GuidancePage(self.frame4)
        self.GuidencePage_interface.grid(row=3, column=1, padx=10, pady=10)

        # Start on the home page.
        self.currentpage = self.frame1
        self.currentpage.pack()

    def _show_page(self, page, on_return=None):
        """Swap the visible page; bind <Return> to ``on_return`` or unbind it.

        Does nothing at all when ``page`` is already the visible one.
        """
        if self.currentpage == page:
            return
        self.currentpage.pack_forget()
        self.currentpage = page
        self.currentpage.pack()
        if on_return is None:
            self.root.unbind('<Return>')
        else:
            self.root.bind('<Return>', on_return)

    # Page transitions used by the navigation buttons.
    def firstpage(self):
        self._show_page(self.frame1)

    def secondpage(self):
        self._show_page(self.frame2, lambda event: self.backtest_interface.run_backtest())

    def thirdpage(self):
        self._show_page(self.frame3, lambda event: self.k_line_interface.load_k_line_data())

    def fourthpage(self):
        self._show_page(self.frame4)

# Create the main window, build the page switcher inside it and start the Tk
# event loop. mainloop() blocks until the window is destroyed.
# HomePage.call_exit refers to this module-level name `root`.
root = Tk()
SwitchFrame(root)
root.mainloop()