In [1]:
# import
import pandas as pd
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from ipywidgets import interactive, HBox, VBox
from numba import njit

In [2]:
# np.load on an .npz archive returns a file-backed NpzFile object. Using it as
# a context manager closes the underlying file handle as soon as the one array
# we need has been read out of it.
with np.load('tick.npz') as archive:
    # View the structured array as a recarray so fields read as attributes
    # (T.PriceA, T.VolumeA, T.DateTimeA are used further down).
    T = archive['arr_0'].view(np.recarray)
len(T)

889360

In [3]:
# Default fee expressed in percent: backtestLimit divides fee_percent by 100
# and applies it to the sum of the buy and sell prices of a matched pair.
default_fee = 0.02
# Short alias for float32; backtestLimit uses it when spelling out the element
# layout of its initially empty lists (presumably so numba can infer the type).
fp32 = np.float32

@njit(nogil=True)
def vwap(priceA: np.ndarray, volumeA: np.ndarray, period: int, destA: np.ndarray = None) -> np.ndarray:
    """Rolling volume-weighted average price over the last ``period`` ticks.

    For every index ``i >= period`` the result is
    ``sum(|volume| * price) / sum(|volume|)`` over ticks ``i-period+1 .. i``,
    maintained incrementally by adding tick ``i`` and dropping tick
    ``i-period``. The first ``period`` entries (no complete rolling step yet)
    are back-filled with the value at index ``period``.

    Parameters
    ----------
    priceA : array of tick prices.
    volumeA : array of tick volumes; only the absolute value is used.
    period : window length in ticks, ``1 <= period < len(priceA)``.
    destA : optional output array; a new float array of zeros is allocated
        when omitted. The caller is responsible for it being at least as long
        as ``priceA``.

    Returns
    -------
    ``destA`` filled with the rolling VWAP.

    Raises
    ------
    ValueError
        If ``period`` is outside ``[1, len(priceA))`` or ``volumeA`` is shorter
        than ``priceA``.
    """
    n = len(priceA)

    # Validate up front: the loops below index priceA/volumeA/destA directly,
    # and destA[period] must exist for the warm-up back-fill.
    if period < 1 or period >= n or len(volumeA) < n:
        raise ValueError("vwap: need 1 <= period < len(priceA) and len(volumeA) >= len(priceA)")

    if destA is None:
        destA = np.zeros(n)

    sizeA = np.abs(volumeA)
    turnover = 0.
    size = 0.

    # Prime the running sums with the first full window (ticks 0 .. period-1).
    for i in range(period):
        size += sizeA[i]
        turnover += sizeA[i] * priceA[i]

    for i in range(period, n):
        k = i - period  # tick that leaves the window
        size += sizeA[i] - sizeA[k]
        turnover += sizeA[i] * priceA[i] - sizeA[k] * priceA[k]
        if size == 0.:
            # No volume traded inside the window: the weighted average is
            # undefined, so fall back to the tick price instead of dividing by zero.
            destA[i] = priceA[i]
        else:
            destA[i] = turnover / size

    # Warm-up region: reuse the first rolled value so the series has no zeros.
    destA[:period] = destA[period]
    return destA


@njit(nogil=True)
def backtestLimit(PriceA, qA, qB, fee_percent=default_fee) -> list:
    """Tick-by-tick backtest of a quote-band limit-order strategy.

    At tick ``i`` a price above ``qA[i]`` moves the position one unit towards
    short, a price below ``qB[i]`` one unit towards long; the position is kept
    within -1 .. +1. The resulting order is recorded at tick ``i + 1`` at that
    tick's quote (``qB`` for a buy, ``qA`` for a sell). As soon as one buy and
    one sell are both pending they are matched into a round-trip trade.

    Returns a list of tuples
    ``(first_idx, first_price, second_idx, second_price, size, profit, fee)``
    where ``size`` is +1 when the buy came first and -1 when the sell came
    first, ``fee`` is ``fee_percent`` percent of the sum of both prices and
    ``profit`` is ``sell - buy - fee``. A fill that is still unmatched when the
    data ends is not reported.
    """
    # Comprehensions over an empty range: the lists start empty but their
    # element layout is spelled out (presumably for numba's type inference).
    pending_buys = [(int(j), fp32(j)) for j in range(0)]
    pending_sells = [(int(j), fp32(j)) for j in range(0)]
    trades = [(int(j), fp32(j), int(j), fp32(j), int(j), fp32(j), fp32(j)) for j in range(0)]

    fee_rate = fee_percent / 100
    position = 0

    # Stop one tick early: every order is recorded at the following tick.
    for i in range(len(PriceA) - 1):
        last = PriceA[i]
        nxt = i + 1

        if last > qA[i]:
            step = -min(position + 1, 1)   # sell one unit unless already short
        elif last < qB[i]:
            step = min(1 - position, 1)    # buy one unit unless already long
        else:
            step = 0

        if step > 0:
            pending_buys.append((nxt, qB[nxt]))
        elif step < 0:
            pending_sells.append((nxt, qA[nxt]))

        position += step

        # A round trip needs one fill on each side.
        if len(pending_buys) == 0 or len(pending_sells) == 0:
            continue

        buy_idx, buy_px = pending_buys.pop(0)
        sell_idx, sell_px = pending_sells.pop(0)
        fee = fee_rate * (sell_px + buy_px)
        net = (sell_px - buy_px) - fee

        if step < 0:
            # The sell just closed an earlier buy: buy leg goes first.
            trades.append((buy_idx, buy_px, sell_idx, sell_px, -step, net, fee))
        else:
            # The buy just closed an earlier sell: sell leg goes first.
            trades.append((sell_idx, sell_px, buy_idx, buy_px, -step, net, fee))

    return trades


def npBacktestLimit(PriceA, qA, qB, fee_percent=default_fee) -> np.ndarray:
    """Run ``backtestLimit`` and pack its trade tuples into a record array.

    The returned recarray has one row per matched round trip with the fields
    ``X0``/``Price0`` (first leg index and price), ``X1``/``Price1`` (second
    leg index and price), ``Size``, ``Profit`` and ``Fee``, in the same order
    as the tuples produced by ``backtestLimit``.
    """
    pair_dtype = np.dtype([
        ('X0', int), ('Price0', float),
        ('X1', int), ('Price1', float),
        ('Size', float), ('Profit', float), ('Fee', float),
    ])

    raw_trades = backtestLimit(PriceA, qA, qB, fee_percent=fee_percent)
    records = np.array(raw_trades, dtype=pair_dtype)
    # recarray view so callers can use attribute access (trades.Size, ...).
    return records.view(np.recarray)

In [4]:
# trade model func
def model(Period: int, StdDev: float):
    """Build a VWAP-centred quote band on the global ticks ``T`` and backtest it.

    The band centre is the rolling VWAP over ``Period`` ticks; the upper and
    lower quotes sit ``StdDev`` rolling standard deviations of the price above
    and below it.

    Returns ``(center, qA, qB, trades)`` where ``trades`` is the record array
    produced by ``npBacktestLimit``.
    """
    prices = T.PriceA

    center = vwap(prices, T.VolumeA, Period)
    # Rolling standard deviation of the raw price, scaled to the band half-width.
    half_width = pd.Series(prices).rolling(Period).std().values * StdDev

    qA = center + half_width
    qB = center - half_width

    return center, qA, qB, npBacktestLimit(prices, qA, qB)

In [5]:
# create widget
# Figure widget with a secondary y-axis (used below for the profit curve).
fig = go.FigureWidget(make_subplots(specs=[[{"secondary_y": True}]]))
fig.update_layout(autosize=True, height=700, template='plotly_white', legend=dict(x=0.1, y=1, orientation="h"),
                  margin=dict(l=45, r=5, b=10, t=30, pad=3), xaxis_rangeslider_visible=False, yaxis2_showgrid=False)

# Decimate the line traces to roughly 1000 points. The stride is floored at 1:
# with fewer than 1000 ticks the integer division yields 0, and a zero slice
# step raises ValueError.
r = max(1, len(T.DateTimeA) // 1000)
# Tick index of every r-th sample; same length and values as indexing T.PriceA[::r].
x = np.arange(0, len(T.DateTimeA), r)

# Trace order matters: update() addresses these as fig.data[1] .. fig.data[6].
fig.add_scattergl(y=T.PriceA[::r], x=x, line_color='gray', name='Trade', opacity=0.25)
fig.add_scattergl(x=x, line_color='blue', name='center', opacity=0.4)
fig.add_scattergl(x=x, line_color='red', name='QAsk', opacity=0.25, line_dash='dot')
fig.add_scattergl(x=x, line_color='green', name='QBid', opacity=0.25, line_dash='dot')

# Trade markers (x/y are filled in by update()).
fig.add_scattergl(mode='markers', name='Buy', marker=dict(color='green', line_color="darkgreen", line_width=1, symbol='triangle-up', size=10))
fig.add_scattergl(mode='markers', name='Sell', marker=dict(color='red', line_color="darkred", line_width=1, symbol='triangle-down', size=10))

# Cumulative profit curve on the secondary axis (filled in by update()).
fig.add_scattergl(mode='lines', line_color='black', line_width=8, name='Profit', opacity=0.1, secondary_y=True, line_shape='hv');

In [6]:
# interactive mode
def update(Period, StdDev):
    """Re-run the model for the given parameters and push the results into ``fig``.

    Refreshes the centre and quote lines (decimated with the global stride
    ``r``), the buy and sell markers, and the cumulative profit curve.
    """
    center, qA, qB, trades = model(Period, StdDev)

    # Size > 0: the buy leg came first (X0 is a buy, X1 a sell).
    # Size < 0: the sell leg came first (X0 is a sell, X1 a buy).
    buy_first = trades.Size > 0
    sell_first = trades.Size < 0

    # Collect every buy fill and every sell fill, whichever leg it was.
    buy_x = np.concatenate((trades.X0[buy_first], trades.X1[sell_first]))
    buy_y = np.concatenate((trades.Price0[buy_first], trades.Price1[sell_first]))
    sell_x = np.concatenate((trades.X0[sell_first], trades.X1[buy_first]))
    sell_y = np.concatenate((trades.Price0[sell_first], trades.Price1[buy_first]))

    # Profit is stored net of the fee; adding the cumulative fee back plots
    # the cumulative P&L before fees.
    cumulative = trades.Profit.cumsum() + trades.Fee.cumsum()

    with fig.batch_update():
        band_traces = (fig.data[1], fig.data[2], fig.data[3])
        for trace, series in zip(band_traces, (center, qA, qB)):
            trace.y = series[::r]

        buy_trace, sell_trace, profit_trace = fig.data[4], fig.data[5], fig.data[6]

        buy_trace.x = buy_x
        buy_trace.y = buy_y

        sell_trace.x = sell_x
        sell_trace.y = sell_y

        profit_trace.x = trades.X1
        profit_trace.y = cumulative

# One slider per model parameter, each specified as (min, max, step).
widget = interactive(
    update,
    Period=(1000, 60000, 1000),
    StdDev=(1, 4, 0.1),
)

# Draw once with the sliders' initial values. The last child is skipped --
# presumably interactive's output area rather than a slider (confirm against ipywidgets).
param = dict((w.description, w.value) for w in widget.children[:-1])
update(**param)

# Controls in a row above the figure; being the cell's last expression, this is what gets displayed.
VBox(children=[HBox(children=widget.children), fig])

VBox(children=(HBox(children=(IntSlider(value=30000, description='Period', max=60000, min=1000, step=1000), Fl…