In [1]:
import backtrader as bt
#import yfinance as yf
import pandas as pd
#import pyfolio as pf
import datetime
import math
# import the package after installation
from backtrader_plotly.plotter import BacktraderPlotly
from backtrader_plotly.scheme import PlotScheme
import plotly.io
import warnings
import backtrader.feeds as btfeeds
warnings.filterwarnings("ignore")


This trading strategy takes both long and short positions based on specific conditions. The "cross rate" (bias) is the closing price divided by its moving average. A long position is opened when the cross rate is below the 10th percentile of the cross rates over the past 10 bars, 1 minus the cross rate is greater than a threshold of 0.05, and the closing price is above the 5-bar moving average. A short position is opened when the cross rate is above the 90th percentile of the cross rates over the past 10 bars, the cross rate minus 1 is greater than 0.05, and the closing price is below the 5-bar moving average. Long and short positions are exited based solely on the closing price relative to the 5-bar moving average. The code below backtests this strategy.

# BIAS Research

In [2]:
import pandas as pd
import os
import plotly.graph_objs as go
import numpy as np

def load_data(filename):
    """Read a CSV of price bars and index it by its parsed ``datetime`` column.

    Args:
        filename: Path or file-like object accepted by ``pd.read_csv``. The
            file must contain a ``datetime`` column.

    Returns:
        DataFrame indexed by the parsed ``datetime`` column.
    """
    frame = pd.read_csv(filename, parse_dates=['datetime'])
    return frame.set_index('datetime')

def filter_trading_hours(data):
    """Keep only the rows whose time of day lies between 09:00 and 15:00.

    Args:
        data: DataFrame with a ``DatetimeIndex``.

    Returns:
        The subset of ``data`` inside the 09:00-15:00 window (both ends are
        included by ``between_time``'s defaults).
    """
    return data.between_time('9:00', '15:00')

def resample_data(data, frequency):
    """Aggregate bars to a coarser frequency using proper OHLC rules.

    Each resampled bar takes the first ``open``, the highest ``high``, the
    lowest ``low`` and the last ``close`` of the bars it covers. ``volume``
    and ``turnover`` are flows and are summed; ``open_interest`` is a level,
    so the last observation in the bin is kept.

    Args:
        data: DataFrame with a ``DatetimeIndex`` and the columns ``open``,
            ``high``, ``low``, ``close``, ``volume``, ``turnover`` and
            ``open_interest``.
        frequency: pandas offset alias for the target bar size (e.g. ``'15min'``).

    Returns:
        Resampled DataFrame; bins that contain no source bars are dropped.
    """
    aggregation = {
        'open': 'first',
        'high': 'max',
        'low': 'min',
        'close': 'last',
        'volume': 'sum',
        'turnover': 'sum',
        'open_interest': 'last',
    }
    resampled_data = data.resample(frequency).agg(aggregation)
    # Empty bins yield NaN prices (e.g. the gap between sessions); drop them.
    return resampled_data.dropna()

def calculate_moving_average(data, N, ma_type='simple'):
    """Add an ``N_MA`` column holding the N-period moving average of ``close``.

    Args:
        data: DataFrame with a ``close`` column. It is modified in place.
        N: Window length (``'simple'``) or span (``'exponential'``).
        ma_type: ``'simple'`` for a rolling mean, ``'exponential'`` for an
            exponentially weighted mean.

    Returns:
        The same DataFrame, with the ``N_MA`` column added.

    Raises:
        ValueError: If ``ma_type`` is not one of the supported values. Without
            this check no ``N_MA`` column would be written and later steps
            would fail with an unrelated ``KeyError``.
    """
    close = data['close']
    if ma_type == 'simple':
        data['N_MA'] = close.rolling(window=N).mean()
    elif ma_type == 'exponential':
        data['N_MA'] = close.ewm(span=N).mean()
    else:
        raise ValueError(
            f"Unsupported ma_type {ma_type!r}; expected 'simple' or 'exponential'")
    return data

def calculate_bias(data):
    """Add a ``BIAS`` column: ``close`` divided by the ``N_MA`` moving average.

    Args:
        data: DataFrame with ``close`` and ``N_MA`` columns. Modified in place.

    Returns:
        The same DataFrame, with the ``BIAS`` column added.
    """
    ratio = data['close'].div(data['N_MA'])
    data['BIAS'] = ratio
    return data

def calculate_rolling_statistics(data, X):
    """Add rolling max, min and quantile columns of ``BIAS`` over ``X`` rows.

    Columns written: ``bias_max``, ``bias_min`` and ``bias_{q}pct`` for
    q in 10, 30, 50, 70, 90.

    Args:
        data: DataFrame with a ``BIAS`` column. Modified in place.
        X: Rolling window length, in rows.

    Returns:
        The same DataFrame, with the statistics columns added.
    """
    window = data['BIAS'].rolling(window=X)
    data['bias_max'] = window.max()
    data['bias_min'] = window.min()
    for pct in (10, 30, 50, 70, 90):
        data[f'bias_{pct}pct'] = window.quantile(pct/100)
    return data

def calculate_bias_high_low(data):
    """Flag rows whose ``BIAS`` lies outside the rolling 10%/90% quantile band.

    ``BIAS_high`` is -1 where ``BIAS`` exceeds ``bias_90pct`` and 0 elsewhere;
    ``BIAS_low`` is 1 where ``BIAS`` is below ``bias_10pct`` and 0 elsewhere.
    Rows where a comparison involves NaN get 0.

    Args:
        data: DataFrame with ``BIAS``, ``bias_90pct`` and ``bias_10pct``
            columns. Modified in place.

    Returns:
        The same DataFrame, with the two flag columns added.
    """
    above_upper_band = data['BIAS'] > data['bias_90pct']
    below_lower_band = data['BIAS'] < data['bias_10pct']
    data['BIAS_high'] = np.where(above_upper_band, -1, 0)
    data['BIAS_low'] = np.where(below_lower_band, 1, 0)
    return data

def calculate_future_returns(data, Y, Z):
    """Add forward price-ratio columns based on ``close``.

    * ``BIAS_high_fut_ret``: close ``Y`` rows ahead divided by the current close.
    * ``BIAS_low_fut_ret``: the negated ratio of the close ``Z`` rows ahead to
      the current close.
    * ``BIAS_high_low_ret``: the sum of the two columns above.

    NOTE(review): these are gross price ratios (around 1), not net returns, and
    they are not multiplied by the ``BIAS_high`` / ``BIAS_low`` flags. Confirm
    this is the intended definition.

    Args:
        data: DataFrame with a ``close`` column. Modified in place.
        Y: Look-ahead, in rows, for the "high" leg.
        Z: Look-ahead, in rows, for the "low" leg.

    Returns:
        The same DataFrame, with the three columns added. The trailing rows
        are NaN where no future close exists.
    """
    close = data['close']
    high_leg = close.shift(-Y) / close
    low_leg = -(close.shift(-Z) / close)
    data['BIAS_high_fut_ret'] = high_leg
    data['BIAS_low_fut_ret'] = low_leg
    data['BIAS_high_low_ret'] = high_leg + low_leg
    return data

def visualize_close_MA_bias(data):
    """Plot ``close`` and ``N_MA`` as lines with markers on the bias-signal rows.

    Marker traces are drawn at the close price of rows where ``BIAS_high`` or
    ``BIAS_low`` equals 1 or -1. The x axis is rendered as a category axis.

    Args:
        data: DataFrame with ``close``, ``N_MA``, ``BIAS_high`` and
            ``BIAS_low`` columns.
    """
    fig = go.Figure()

    # Line traces: the price and its moving average.
    for column in ('close', 'N_MA'):
        fig.add_trace(go.Scatter(x=data.index, y=data[column], mode='lines', name=column))

    # Marker traces: (signal column, signal value, marker symbol, legend label).
    marker_specs = (
        ('BIAS_high', 1, 'triangle-up', 'BIAS_high=1 (Buy)'),
        ('BIAS_high', -1, 'triangle-down', 'BIAS_high=-1 (Sell)'),
        ('BIAS_low', 1, 'circle', 'BIAS_low=1 (Buy)'),
        ('BIAS_low', -1, 'circle-open', 'BIAS_low=-1 (Sell)'),
    )
    for signal_column, signal_value, symbol, label in marker_specs:
        hits = data[data[signal_column] == signal_value]
        fig.add_trace(go.Scatter(x=hits.index, y=hits['close'],
                                 mode='markers', marker_symbol=symbol, name=label))

    fig.update_layout(xaxis=dict(type='category'))
    fig.show()

# Visualize data using Plotly
def visualize_data(data):
    """Show one Plotly line chart per non-object column of ``data``.

    Args:
        data: DataFrame to plot; each column whose dtype is not ``object`` is
            drawn against the index in its own figure.
    """
    # plotly.express was never imported in this notebook, so ``px`` used to
    # raise NameError; import it here so the function works on its own.
    import plotly.express as px

    for column in data.columns:
        if data[column].dtype != 'object':  # skip object (e.g. string) columns
            fig = px.line(data, y=column)
            fig.show()

def main(filename, frequency, N, ma_type, X, Y, Z):
    """Run the full BIAS research pipeline on one CSV file and plot the result.

    Stages, in order: load, restrict to trading hours, resample, moving
    average, bias ratio, rolling bias statistics, high/low flags, forward
    price ratios, then the close/MA/signal chart.

    Args:
        filename: CSV file passed to ``load_data``.
        frequency: Resampling frequency passed to ``resample_data``.
        N: Moving-average window.
        ma_type: Moving-average type (``'simple'`` or ``'exponential'``).
        X: Window for the rolling bias statistics.
        Y: Look-ahead for the "high" forward ratio.
        Z: Look-ahead for the "low" forward ratio.

    Returns:
        DataFrame holding the resampled bars and every derived column.
    """
    bars = resample_data(filter_trading_hours(load_data(filename)), frequency)
    bars = calculate_bias(calculate_moving_average(bars, N, ma_type))
    bars = calculate_bias_high_low(calculate_rolling_statistics(bars, X))
    bars = calculate_future_returns(bars, Y, Z)
    visualize_close_MA_bias(bars)
    # visualize_data(bars)
    return bars

if __name__ == "__main__":
    # Arguments: file, bar size, MA window, MA type, stats window, and the two
    # look-ahead horizons. '15min' replaces the '15T' offset alias, which
    # recent pandas releases deprecate.
    # data = main('ag2306.csv', '15min', 5, 'simple', 20, 5, 5)
    data = main('a2307.csv', '15min', 5, 'simple', 20, 5, 5)

The trading output below demonstrates the performance of the Bias Strategy when it is applied to a specific dataset with 15-minute bars and a 10-period moving average (MA) window.
The Bias Strategy follows a simple rule based on the moving average of the asset's price.
It uses a 10-period moving average, i.e. the average price of the asset over the last 10 periods (each period being 15 minutes in this case).
The strategy generates a buy signal if the moving average is 10 units above the current (spot) price, which indicates that the price is expected to rise.
Conversely, it generates a sell signal if the moving average is 10 units below the current price, which indicates that the price is expected to drop.
Note: this description does not match the `BiasStrategy` code below, which defaults to a 20-period moving average and trades on the close/MA ratio relative to rolling percentile bands and a 0.05 threshold; the two should be reconciled.


In [2]:
class BiasStrategy(bt.Strategy):
    """Long/short bias strategy driven by indicator lines supplied by the data feed.

    The strategy computes no indicators itself. It reads the lines ``ma``,
    ``risk_ma``, ``bias``, ``bias_hi`` and ``bias_lo`` from ``self.datas[0]``,
    so the feed must expose them.

    Rules, evaluated once per bar when no order is pending:

    * Flat: go short when ``bias > bias_hi``, ``bias - 1 > bias_thresh`` and
      ``close < risk_ma``; go long when ``bias < bias_lo``,
      ``1 - bias > bias_thresh`` and ``close > risk_ma``.
    * Long: exit when ``close < risk_ma``.
    * Short: exit when ``close > risk_ma``.

    Params:
        para_ma, bias_len, para_risk_ma, high_q, low_q: Only reported by
            ``stop()``; they do not influence the trading logic here, because
            the indicator lines arrive pre-computed on the feed.
        bias_thresh: Minimum distance of ``bias`` from 1 required for an entry.
    """

    params = (("para_ma", 20),
              ("bias_len", 10),
              ("para_risk_ma", 5),
              ("high_q", 90),
              ("low_q", 10),
              ("bias_thresh", 0.05),
              )

    def __init__(self):
        self.close_len = 1000
        self.order_history = []  # one dict per completed order, see notify_order
        self.order = None        # the pending order, or None when free to trade
        self.buyprice = None
        self.buycomm = None
        self.bar_executed = None  # len(self) at the most recent fill

        # Grab all lines from the feed. The close line is stored as
        # ``dataclose`` (not ``close``) so that it does not shadow the
        # inherited ``Strategy.close()`` method used to exit positions.
        feed = self.datas[0]
        self.dataclose = feed.close
        self.ma = feed.ma
        self.risk_ma = feed.risk_ma
        self.bias = feed.bias
        self.bias_hi = feed.bias_hi
        self.bias_lo = feed.bias_lo

    def next(self):
        """Apply the entry/exit rules to the current bar."""
        # New orders are not sent until the previous one has been filled,
        # canceled or rejected; notify_order resets self.order at that point.
        if self.order:
            return

        close = self.dataclose[0]
        risk_ma = self.risk_ma[0]

        if self.position.size > 0:  # currently long
            if close < risk_ma:
                # close() sizes the exit to the open position, so this stays
                # correct whichever sizer is configured.
                self.order = self.close()

        elif self.position.size < 0:  # currently short
            if close > risk_ma:
                self.order = self.close()

        else:  # flat
            bias = self.bias[0]
            threshold = self.params.bias_thresh
            if bias > self.bias_hi[0] and bias - 1 > threshold and close < risk_ma:
                self.order = self.sell()  # enter short
            elif bias < self.bias_lo[0] and 1 - bias > threshold and close > risk_ma:
                self.order = self.buy()  # enter long

    # outputting information
    def log(self, txt):
        """Print ``txt`` prefixed with the current bar's date."""
        dt = self.datas[0].datetime.date(0)
        print('%s, %s' % (dt.isoformat(), txt))

    def notify_order(self, order):
        """Record completed orders and release the pending-order guard."""
        if order.status in [order.Submitted, order.Accepted, order.Partial]:
            # Still working: keep self.order set so next() does not stack a
            # second order on top of it.
            return

        if order.status == order.Completed:
            self.order_history.append({'datetime': bt.num2date(order.executed.dt),  # execution date and time
                            'price': order.executed.price,  # execution price
                            'commission': order.executed.comm,  # commission paid
                            'value': order.executed.value,  # order value
                            'size': order.executed.size,  # order size (units bought/sold)
                            'type': 'buy' if order.isbuy() else 'sell',  # order side
                            'ref': order.ref  # order reference number
                            })
            self.bar_executed = len(self)
        elif order.status in [order.Canceled, order.Margin, order.Rejected]:
            self.log("Order was canceled/margin/rejected")

        # The order reached a final state; allow next() to trade again.
        self.order = None

    def stop(self):
        """Log the strategy parameters and the final portfolio value."""
        ending_value = self.broker.getvalue()
        p = self.params
        # Built from adjacent literals so that source indentation does not
        # leak into the logged message.
        self.log(
            f"para_ma:{p.para_ma}, bias_len:{p.bias_len}, "
            f"para_risk_ma:{p.para_risk_ma}, high_q:{p.high_q}, "
            f"low_q:{p.low_q}, bias_thresh:{p.bias_thresh}, "
            f"ending value: {ending_value}")


In [3]:
# Choose one of the backtesting processes, either single backtesting or performance optimization

if __name__ == '__main__':
    # Indicator windows. They build the indicator columns below and are also
    # forwarded to BiasStrategy so the parameters it reports match them.
    para_ma = 20
    bias_len = 10
    para_risk_ma = 5
    high_q = 90
    low_q = 10
    strategy_params = dict(para_ma=para_ma, bias_len=bias_len, para_risk_ma=para_risk_ma,
                           high_q=high_q, low_q=low_q)

    # read csv data
    filename = 'a2307.csv'
    price_df = pd.read_csv(filename)
    # calculate all indicators on the raw bars
    price_df['ma'] = price_df['close'].rolling(window=para_ma).mean()
    price_df['risk_ma'] = price_df['close'].rolling(window=para_risk_ma).mean()
    price_df['bias'] = price_df['close'] / price_df['ma']
    price_df['bias_hi'] = price_df['bias'].rolling(window=bias_len).quantile(high_q/100)
    price_df['bias_lo'] = price_df['bias'].rolling(window=bias_len).quantile(low_q/100)
    price_df['datetime'] = pd.to_datetime(price_df['datetime'], format='%Y/%m/%d %H:%M')

    # Feed with the extra indicator lines. Columns are mapped by NAME rather
    # than by position: hard-coded positions silently bind a line to the wrong
    # column whenever the CSV layout differs from what was assumed.
    # NOTE(review): the previously recorded run (0 long / 205 short trades
    # despite a 5% bias threshold) suggests the positional mapping was
    # misaligned - confirm against the CSV header.
    class NewData(bt.feeds.PandasData):
        lines = ('ma', 'risk_ma', 'bias', 'bias_hi', 'bias_lo')
        # -1 = auto-detect the column by (case-insensitive) name
        params = (('ma', -1), ('risk_ma', -1), ('bias', -1), ('bias_hi', -1), ('bias_lo', -1))

    data = NewData(
        dataname=price_df,
        datetime='datetime',
        open='open',
        high='high',
        low='low',
        close='close',
        volume='volume',
        openinterest='open_interest',
        ma='ma',
        risk_ma='risk_ma',
        bias='bias',
        bias_hi='bias_hi',
        bias_lo='bias_lo',
        # Intraday bars: declare the timeframe (backtrader defaults to Days).
        timeframe=bt.TimeFrame.Minutes,
        fromdate=datetime.datetime(2023, 5, 18),
        todate=datetime.datetime(2023, 6, 8)
    )

    optimize = False
    if not optimize:
        print("Single Backtest..")
        # %matplotlib inline
        # Create a cerebro instance, add our strategy and starting cash; this
        # run uses zero commission and zero slippage.
        cerebro = bt.Cerebro()
        cerebro.addstrategy(BiasStrategy, **strategy_params)
        cerebro.broker.setcash(1000000)
        cerebro.broker.setcommission(commission=0.0000)
        cerebro.broker.set_slippage_perc(0.0000)
        # data = bt.feeds.PandasData(dataname=yf.download("SPY", "2015-1-2", "2023-1-30"))

        # cerebro.resampledata(data, timeframe=bt.TimeFrame.Minutes, compression=15)
        cerebro.adddata(data) #if using resample, then comment this line of code
        # cerebro.addsizer(bt.sizers.PercentSizer, percents=80)
        cerebro.addsizer(bt.sizers.SizerFix, stake=1)
        cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name = 'SharpeRatio', timeframe=bt.TimeFrame.Minutes, riskfreerate=0.01)
        cerebro.addanalyzer(bt.analyzers.DrawDown, _name='DD')
        cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name="ta")

        print("========================")
        print(f'<START> Brokerage account: ${round(cerebro.broker.getvalue(),2)}')
        results = cerebro.run()
        strat = results[0]
        trades = strat.analyzers.ta.get_analysis()
        freq_convertion = 252*7.5*4

        # Print the results
        print(f'<FINISH> Brokerage account: ${round(cerebro.broker.getvalue(),2)}')
        print(strat.params.para_ma, strat.params.bias_len, strat.params.para_risk_ma, strat.params.high_q,strat.params.low_q,strat.params.bias_thresh)
        # print('Sharpe Ratio:', strat.analyzers.SharpeRatio.get_analysis()['sharperatio']/math.sqrt(freq_convertion))
        # print('Max Drawdown %:', round(float(strat.analyzers.DD.get_analysis()['drawdown'])*100, 2))
        # print('Max Drawdown $ or RMB:', round(float(strat.analyzers.DD.get_analysis()['moneydown']), 2))

        # define plot scheme with new additional scheme arguments
        scheme = PlotScheme(decimal_places=2, max_legend_text_width=25)
        figs = cerebro.plot(BacktraderPlotly(show=False, scheme=scheme))

        # directly manipulate object using methods provided by `plotly`
        # (figs is a list of runs, each holding one figure per strategy)
        for i, each_run in enumerate(figs):
            for j, each_strategy_fig in enumerate(each_run):
                each_strategy_fig.update_layout(xaxis=dict(type='category'))
                each_strategy_fig.show()
                # write html to disk
                plotly.io.write_html(each_strategy_fig, f'{i}_{j}.html', full_html=True)

        # # use Pyfolio to show all the results:
        # cerebro.addanalyzer(bt.analyzers.PyFolio, _name='pyfolio')
        # results = cerebro.run()
        # strat = results[0]
        # pyfoliozer = strat.analyzers.getbyname('pyfolio')

        # # %matplotlib inline
        # returns, positions, transactions, gross_lev = pyfoliozer.get_pf_items()
        # print(f'The return of the strategy is: {returns}')
        # pf.create_full_tear_sheet(returns)
        # # Eventually solved the following issue by trying: pip install git+https://github.com/quantopian/pyfolio:
        # # AttributeError: 'numpy.int64' object has no attribute 'to_pydatetime'

    else:
        print("Start Optimizing..")
        cerebro = bt.Cerebro()
        # optstrategy expects an iterable of candidate values per parameter.
        # NOTE: the indicator columns are pre-computed above, so only
        # bias_thresh would change trading behavior if a range were supplied.
        cerebro.optstrategy(BiasStrategy, **{name: (value,) for name, value in strategy_params.items()})
        cerebro.broker.setcash(1000000)
        cerebro.broker.setcommission(commission=0.0003)
        cerebro.broker.set_slippage_perc(0.0001)

        # The indicator lines were computed on the raw bars, so the feed is
        # added as-is (same as the single backtest). To test 15-minute bars,
        # resample the DataFrame BEFORE computing the indicators instead of
        # resampling the feed here.
        # cerebro.resampledata(data, timeframe=bt.TimeFrame.Minutes, compression=15)
        cerebro.adddata(data)
        # cerebro.addsizer(bt.sizers.PercentSizer, percents=80)
        cerebro.addsizer(bt.sizers.SizerFix, stake=1)
        cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name = 'SharpeRatio', timeframe=bt.TimeFrame.Minutes, riskfreerate=0.01)
        cerebro.addanalyzer(bt.analyzers.DrawDown, _name='DD')
        print("========================")
        print(f'<START> Brokerage account: ${round(cerebro.broker.getvalue(),2)}')
        results = cerebro.run(maxcpus=1)

        # Print the results
        print("========================")
        for res in results:
            res_ = res[0]
            print("--------------------------")
            print('Parameters:')
            print(res_.params.para_ma, res_.params.bias_len, res_.params.para_risk_ma,
                  res_.params.high_q, res_.params.low_q, res_.params.bias_thresh)
            print('Sharpe Ratio:', res_.analyzers.SharpeRatio.get_analysis())
            print('Drawdown:', res_.analyzers.DD.get_analysis())

        # Find the result with the smallest maximum drawdown
        print("========================")
        best_result = None
        best_sharpe = None
        best_dd = None

        for res in results:
            # based on max drawdown (the top-level 'drawdown' entry is only the
            # drawdown at the end of the run)
            dd = res[0].analyzers.DD.get_analysis()['max']['drawdown']
            if best_dd is None or dd < best_dd:
                best_dd = dd
                best_result = res

            # # based on sharpe
            # sharpe = res[0].analyzers.SharpeRatio.get_analysis()['sharperatio']
            # if best_sharpe is None or sharpe > best_sharpe:
            # 	best_sharpe = sharpe
            # 	best_result = res
        # Print the best result
        best_params = best_result[0].params
        print('Best Parameters:', best_params.para_ma, best_params.bias_len, best_params.para_risk_ma,
              best_params.high_q, best_params.low_q, best_params.bias_thresh)
        # print('Best Sharpe:', best_sharpe)
        print('Best Drawdown:', best_result[0].analyzers.DD.get_analysis())


Single Backtest..
<START> Brokerage account: $1000000
2023-06-01, para_ma:20,                  bias_len:10,                  para_risk_ma:5,                  high_q:90,                 low_q:10,                 bias_thresh:0.05,                 ending value: 999992.0
<FINISH> Brokerage account: $999992.0
20 10 5 90 10 0.05


## Order History

In [None]:
# Print every completed order recorded by the strategy, one dict per line.
for executed_order in strat.order_history:
    print(executed_order)

{'datetime': datetime.datetime(2023, 5, 18, 9, 33), 'price': 5056.4943, 'commission': 1.51694829, 'value': -5056.4943, 'size': -1, 'type': 'sell', 'ref': 1}
{'datetime': datetime.datetime(2023, 5, 18, 9, 52), 'price': 5049.5049, 'commission': 1.5148514699999998, 'value': -5056.4943, 'size': 1, 'type': 'buy', 'ref': 2}
{'datetime': datetime.datetime(2023, 5, 18, 9, 54), 'price': 5046.4953000000005, 'commission': 1.51394859, 'value': -5046.4953000000005, 'size': -1, 'type': 'sell', 'ref': 3}
{'datetime': datetime.datetime(2023, 5, 18, 9, 55), 'price': 5048.0, 'commission': 1.5144, 'value': -5046.4953000000005, 'size': 1, 'type': 'buy', 'ref': 4}
{'datetime': datetime.datetime(2023, 5, 18, 10, 4), 'price': 5047.4952, 'commission': 1.51424856, 'value': -5047.4952, 'size': -1, 'type': 'sell', 'ref': 5}
{'datetime': datetime.datetime(2023, 5, 18, 10, 5), 'price': 5049.0, 'commission': 1.5147, 'value': -5047.4952, 'size': 1, 'type': 'buy', 'ref': 6}
{'datetime': datetime.datetime(2023, 5, 18,

## Trade Analysis

In [None]:
def _print_analysis(prefix, value):
    """Print ``value`` as ``key:subkey:... - leaf`` lines.

    Recurses through every nesting level, so deeply nested analyzer results
    are flattened instead of being dumped as a raw dict repr.
    """
    if isinstance(value, dict):
        for key, nested in value.items():
            _print_analysis(key if prefix is None else f"{prefix}:{key}", nested)
    else:
        print(f"{prefix} - {value}")


_print_analysis(None, trades)

total:total - 206
total:open - 1
total:closed - 205
streak:won - AutoOrderedDict([('current', 0), ('longest', 2)])
streak:lost - AutoOrderedDict([('current', 4), ('longest', 31)])
pnl:gross - AutoOrderedDict([('total', -151.8168999999707), ('average', -0.7405702439022961)])
pnl:net - AutoOrderedDict([('total', -772.5620447499704), ('average', -3.768595340243758)])
won:total - 23
won:pnl - AutoOrderedDict([('total', 296.07581563000383), ('average', 12.8728615491306), ('max', 47.967201559999786)])
lost:total - 182
lost:pnl - AutoOrderedDict([('total', -1068.6378603799742), ('average', -5.871636595494364), ('max', -19.528251409999502)])
long:total - 0
long:pnl - AutoOrderedDict([('total', 0.0), ('average', 0.0), ('won', AutoOrderedDict([('total', 0.0), ('average', 0.0), ('max', 0.0)])), ('lost', AutoOrderedDict([('total', 0.0), ('average', 0.0), ('max', 0.0)]))])
long:won - 0
long:lost - 0
short:total - 205
short:pnl - AutoOrderedDict([('total', -772.5620447499704), ('average', -3.7685953

In [5]:
# Headline figures from the TradeAnalyzer results.
print("Total Trades:",trades['total']['total'],"| Still Opened Trades:",trades['total']['open'],"| Closed Trades:",trades['total']['closed'])
print("Streak Won - Current", trades['streak']['won']['current'], "| Streak Won - Longest", trades['streak']['won']['longest'])
# Both values on this line are LOSING streaks; the second label used to say "Won".
print("Streak Lost - Current", trades['streak']['lost']['current'], "| Streak Lost - Longest", trades['streak']['lost']['longest'])
print("Pnl Gross Total:", trades['pnl']['gross']['total'],"| Pnl Gross Avg:",trades['pnl']['gross']['average'])
print("Pnl Net Total:", trades['pnl']['net']['total'],"| Pnl Net Avg:",trades['pnl']['net']['average'])

Total Trades: 206 | Still Opened Trades: 1 | Closed Trades: 205
Streak Won - Current 0 | Streak Won - Longest 3
Streak Lost Current 1 | Streak Won Longest 11
Pnl Gross Total: -7.0 | Pnl Gross Avg: -0.03414634146341464
Pnl Net Total: -7.0 | Pnl Net Avg: -0.03414634146341464


After executing the Bias Strategy, the ending portfolio value comes to 999,224.44. Relative to the starting cash of 1,000,000 this is a net loss of about 775.56, not a profit (the previously quoted profit figure of 79.29 is not consistent with these values).
A total of 206 trades were opened during the backtesting period.
Of these 206 trades, 205 were closed and 1 remained open at the end of the backtesting period.
Performance may vary with different datasets or historical periods and should not be assumed to be consistent in every scenario. Further analysis or optimization may be required to improve the performance.
