https://medium.com/@Jachowskii/how-to-build-a-backtesting-engine-in-python-using-pandas-bc8e532a9e95

In [None]:
import numpy as np
import pandas as pd
from humpback import *

In [93]:
pd.options.mode.copy_on_write = True

In [81]:
symbol = 'BTCUSDT'
commission = 0.002

In [None]:
data_raw_df = pd.read_csv(f'Binance Data/{symbol}.csv', index_col='index')
data_raw_df = data_raw_df.set_index(pd.DatetimeIndex(pd.to_datetime(data_raw_df.index)))

## Backtesting with Pandas DataFrame

In [None]:
def SignalGenerator(
    data: pd.DataFrame,
) -> pd.Series:
    """Generates a pd.Series of the same size as the data

    Args:
        data     (pd.DataFrame)   : OHLCV

    Returns:
        pd.Series: pd.Series of -1,0,1
            -1 refers to short signal
             0 refers to neutral signal
             1 refers to long signal
    """
    signal = np.zeros(len(data))

    sma20 = sma(data, window=20)
    sma200= sma(data, window=200)

    signal = np.where(sma20 > sma200, 1, -1)

    return pd.Series(signal, index=data.index)

In [33]:
def takeProfitStopLoss(
    close: float
):
    tp = close * 1.05
    sl = close *  .95
    
    return tp, sl

In [80]:
def placeLongMarketOrder(
    data: pd.DataFrame,
    timestep
):
    data.loc[timestep, 'Place Order'] = "Long Market Order"
    # tp, sl = takeProfitStopLoss(data.loc[timestep, 'Close'])
    # data.loc[timestep, 'Take Profit'] = tp
    # data.loc[timestep, 'Stop Loss'] = sl
    return data

In [85]:
def fulfillLongMarketOrder(
    data: pd.DataFrame,
    timestep,
    commission: float,
    size: float = .99,
):
    entry_price                       = data.loc[timestep, 'Open']
    data.loc[timestep, 'Entry Price'] = entry_price
    tp, sl                            = takeProfitStopLoss(entry_price)
    data.loc[timestep, 'Take Profit'] = tp
    data.loc[timestep, 'Stop Loss']   = sl

    cash                              = data.loc[timestep, 'Cash']

    # Allow trading a fraction
    amount                            = cash * size / (entry_price * (1 + commission))
    data.loc[timestep, 'Quantity BOP']= amount
    data.loc[timestep, 'Cash']       -= amount * (entry_price * (1 + commission))

    # Reset Order Status
    data.loc[timestep, 'Place Order'] = 0

    return data

In [102]:
def backtest(
    data: pd.DataFrame,
    signal: pd.Series,
    commission: float,
    cash: float = 1e7,
) -> pd.DataFrame:
    """_summary_

    Args:
        data (pd.DataFrame): OHLCV
        signal  (pd.Series): -1 for short, 1 for long
        cash          (int): initial cash amount

    Returns:
        pd.DataFrame
    """

    # Initialise result_df

    columns = [
        'Cash',
        'Signal',
        'Account Value BOP',
        'Quantity BOP',
        'Entry Price',
        'Take Profit',
        'Stop Loss',
        'Place Order',
        'Account Value EOP',
    ]

    result_df = pd.DataFrame(
        index=data.index,
        columns=columns
    )
    result_df = pd.merge(data, result_df, on='index', how='inner')
    result_df['Cash']   = cash
    result_df['Signal'] = signal
    result_df[[
        'Account Value BOP',
        'Quantity BOP',
        'Entry Price',
        'Take Profit',
        'Stop Loss',
        'Account Value EOP']] = 0.0

    result_df.loc[0,'Account Value BOP'] = cash


    # Loop through time
    prev_timestep = None

    for timestep in result_df.index:
        # Calculate Account Value BOP
        result_df.loc[timestep:, 'Account Value BOP'] = \
            result_df.loc[timestep, 'Cash'] + \
            result_df.loc[timestep, 'Quantity BOP'] * result_df.loc[timestep, 'Open']

        # Place Long Market Order
        if result_df.loc[timestep, 'Signal'] == 1 and result_df.loc[timestep, 'Quantity BOP'] == 0:
            result_df = placeLongMarketOrder(result_df, timestep)

        # Place Short Market Order
        # To be developed
        # Signal == -1
        # if result_df.loc[timestep, 'Signal'] == 1 and result_df.loc[timestep, 'Quantity BOP'] == 0:
        #     result_df = placeLongMarketOrder(result_df, timestep)

        # Fulfill placed market order
        if prev_timestep is not None:
            if result_df.loc[prev_timestep, 'Place Order'] != 0:
                # Fulfill Long Market Order
                if result_df.loc[prev_timestep, 'Place Order'] == 'Long Market Order':
                    result_df = fulfillLongMarketOrder(data=result_df, timestep=timestep, commission=commission)

        prev_timestep = timestep
        

    return result_df

In [26]:
signal = SignalGenerator(data_raw_df)

In [None]:
signal

In [103]:
result = backtest(data_raw_df, signal=signal, commission=commission)

In [101]:
result[result['Signal'] == 1]

Unnamed: 0_level_0,Open,High,Low,Close,Volume,Cash,Signal,Account Value BOP,Quantity BOP,Entry Price,Take Profit,Stop Loss,Place Order,Account Value EOP
index,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1
2020-01-08 23:00:00,8076.13,8125.57,8023.55,8055.98,2651.309243,10000000.0,1.0,0.0,0.000000,0.00,0.0000,0.0000,Long Market Order,0.0
2020-01-09 00:00:00,8054.72,8055.96,7928.00,7956.55,3517.430950,100000.0,1.0,0.0,1226.639724,8054.72,8457.4560,7651.9840,0,0.0
2020-01-09 01:00:00,7956.88,8037.37,7950.00,8004.80,2413.123473,10000000.0,1.0,0.0,0.000000,0.00,0.0000,0.0000,Long Market Order,0.0
2020-01-09 02:00:00,8004.05,8026.01,7980.00,7982.75,1579.537464,100000.0,1.0,0.0,1234.405023,8004.05,8404.2525,7603.8475,0,0.0
2020-01-09 03:00:00,7983.09,7993.46,7935.56,7954.72,2480.168472,10000000.0,1.0,0.0,0.000000,0.00,0.0000,0.0000,Long Market Order,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2021-12-28 11:00:00,49034.56,49240.08,48897.22,49200.39,1185.664460,10000000.0,1.0,0.0,0.000000,0.00,0.0000,0.0000,Long Market Order,0.0
2021-12-28 12:00:00,49200.39,49371.18,49077.64,49100.37,961.449550,100000.0,1.0,0.0,200.816285,49200.39,51660.4095,46740.3705,0,0.0
2021-12-28 13:00:00,49100.37,49284.00,48974.24,49024.57,1357.318110,10000000.0,1.0,0.0,0.000000,0.00,0.0000,0.0000,Long Market Order,0.0
2021-12-28 14:00:00,49024.56,49155.92,48301.88,49078.29,3094.231550,100000.0,1.0,0.0,201.536526,49024.56,51475.7880,46573.3320,0,0.0


## Reference

In [None]:
def SMA(array, period):
    return array.rolling(period).mean()

In [None]:
sma14 = SMA(data_raw_df['Close'], 14)    
sma200 = SMA(data_raw_df['Close'], 200)

In [None]:
def crossover(array1, array2):
    return array1 > array2

def crossunder(array1, array2):
    return array1 < array2

In [None]:
enter_rules = crossover(sma14, sma200)
exit_rules  = crossunder(sma14, sma200)

In [None]:
def marketposition_generator(
    dataset: pd.DataFrame,
    enter_rules: pd.Series,
    exit_rules : pd.Series) -> pd.Series:
    """
    Read enter_rules and exit_rules.
    Return a series of status
        -1: Short
        0 : No position (i.e. cash)
        1 : Long

    Args:
        dataset     (pd.DataFrame)
        enter_rules (pd.Series)   : pd.Series of Boolean. True = Buy
        exit_rules  (pd.Series)   : pd.Series of Boolean. True = Sell

    Returns:
        pd.Series: pd.Series of -1,0,1
    """
    
    # dataset['enter_rules'] = enter_rules
    # dataset['exit_rules'] = exit_rules
    
    status = 0
    mp = []
    for (i, j) in zip(enter_rules, exit_rules):
        # Check if is in a position
        if status == 0:
            # Check should go long       
            if i == 1 and j != -1:
                # Go long
                status = 1 
        else:
            if j == -1:
                # Close long position
                status = 0
        mp.append(status)
        
    dataset['mp'] = mp

    # Not sure if should shift by one period
    
    # dataset['mp'] = dataset['mp'].shift(1)
    # dataset.iloc[0,2] = 0
    
    return dataset['mp']

In [None]:
def apply_trading_system(
    dataset    : pd.DataFrame,
    direction  : str,
    order_type : str,
    # enter_level: pd.Series,
    enter_rules: pd.Series,
    exit_rules : pd.Series):
    """Backtest with a specified Strategy.

    Args:
        dataset     (pd.DataFrame) : OHLCV
        direction   (str)          : long/short
        order_type  (str)          : market/limit
        enter_level (pd.Series)    : Entry Price. Should be the Open price
        enter_rules (pd.Series)    : pd.Series of Boolean. True = Buy
        exit_rules  (pd.Series)    : pd.Series of Boolean. True = Sell

    Returns:
        _type_: _description_
    """

    # enter_rules and exit_rules = 1 or -1 regardless of position
    # enter_rules and exit_rules only depend on strategy (i.e. whether it is worth long/short)
    dataset['enter_rules'] = enter_rules.apply(lambda x: 1 if x else 0)
    dataset['exit_rules']  = exit_rules.apply(lambda x: -1 if x else 0)

    # mp should be a column of -1,0,1. Referring to short/cash/long position resp.
    dataset['mp'] = marketposition_generator(dataset['enter_rules'], dataset['exit_rules'])
    
    if order_type == "market":
        # If the strategy generates a price signal,
        # Set the entry price equal to the open price of the next period
        dataset['entry_price'] = np.where((dataset.mp.shift(1) == 0) & 
                                             (dataset.mp == 1), dataset.Open.shift(-1), np.nan)

        # Use all available cash to buy
        if INSTRUMENT == 1:
            dataset['number_of_stocks'] = np.where((dataset.mp.shift(1) == 0) & 
                                                     (dataset.mp == 1), OPERATION_MONEY / dataset.Open, np.nan)
    
    dataset['entry_price'] = dataset['entry_price'].fillna(method='ffill')
    
    if INSTRUMENT == 1:
        dataset['number_of_stocks'] = dataset['number_of_stocks']\
                                        .apply(lambda x: round(x, 0)).fillna(method='ffill')
    
    dataset['events_in'] = np.where((dataset.mp == 1) & (dataset.mp.shift(1) == 0), 'entry', '')
    
    # Calculate Gain/Loss
    if direction == 'long':
        if INSTRUMENT == 1:
            dataset['open_operations'] = (dataset.Close - dataset.entry_price) * dataset.number_of_stocks
            dataset['open_operations'] = np.where((dataset.mp == 1) & (dataset.mp.shift(-1) == 0),
                                                    (dataset.Open.shift(-1) - dataset.entry_price) * dataset.number_of_stocks - 2 * COSTS, 
                                                     dataset.open_operations)
    elif direction == 'short':
        if INSTRUMENT == 1:
            dataset['open_operations'] = (dataset.entry_price - dataset.Close) * dataset.number_of_stocks
            dataset['open_operations'] = np.where((dataset.mp == 1) & (dataset.mp.shift(-1) == 0),
                                                    (dataset.entry_price - dataset.Open.shift(-1)) * dataset.number_of_stocks - 2 * COSTS, 
                                                     dataset.open_operations)
            
    dataset['open_operations'] = np.where(dataset.mp == 1, dataset.open_operations, 0)
    dataset['events_out'] = np.where((dataset.mp == 1) & (dataset.exit_rules == -1), 'exit', '')
    dataset['operations'] = np.where((dataset.exit_rules == -1) & 
                                       (dataset.mp == 1), dataset.open_operations, np.nan)
    dataset['closed_equity'] = dataset.operations.fillna(0).cumsum()
    dataset['open_equity'] = dataset.closed_equity + dataset.open_operations - dataset.operations.fillna(0)
    
    dataset.to_csv('trading_system_export.csv')
    
    return dataset

In [None]:
trading_system = apply_trading_system(amzn, DIRECTION, ORDER_TYPE, ENTER_LEVEL, enter_rules, exit_rules)