In [1]:
import ira
%alphalab dark
import qlearn as q
from sklearn.base import TransformerMixin, BaseEstimator

from alpha.utils.tick_loaders import load_data, ls_data
from tqdm.auto import tqdm
import seaborn as sns

# Testing basics

<hr/>

QLearn makes it possible to backtest any type of signals, including manually specified ones. For example, we can test manual portfolio trades.

In [72]:
# Load data for five NYSED instruments and one NASDAQD instrument (prefix before ':' is the data source).
data = load_data(
    'NYSED:XOM', 'NYSED:BAC', 'NYSED:CL', 'NYSED:TGT', 'NYSED:RL', 'NASDAQD:MSFT'
) 

In [73]:
# Display the loaded data container (one summary line per instrument)
data

XOM (2005-01-03 00:00:00 / 2020-08-17 00:00:00 [4076] records)
BAC (2005-01-03 00:00:00 / 2020-08-17 00:00:00 [4076] records)
CL (2005-01-03 00:00:00 / 2020-08-17 00:00:00 [4076] records)
TGT (2005-01-03 00:00:00 / 2020-08-17 00:00:00 [4076] records)
RL (2005-01-03 00:00:00 / 2020-08-17 00:00:00 [4076] records)
MSFT (2005-01-03 00:00:00 / 2020-08-17 00:00:00 [4076] records)

In [None]:
# Plot the 'close' column of all six instruments on one chart.
# The trailing ';' suppresses the textual repr of the plot objects.
fig(18, 5)
plt.plot(retain_columns_and_join(data.ticks('XOM','BAC','CL', 'TGT', 'RL', 'MSFT'), 'close'));


<hr/>
Make manual entries

In [None]:
# Manual signal table: one row per date, one column per instrument.
# +100 for every instrument on 2015-01-02, reset to 0 on 2020-01-01.
manual_signals = pd.DataFrame(
    [
        [+100, +100, +100, +100, +100, +100],
        [   0,    0,    0,    0,    0,    0],
    ],
    index=pd.DatetimeIndex(['2015-01-02', '2020-01-01']),
    columns=['XOM', 'BAC', 'CL', 'TGT', 'RL', 'MSFT'],
)

manual_signals

In [None]:
# Second manual signal table with mixed signs:
# -100/+100 per instrument on 2015-01-02, doubled on 2018-01-01, reset to 0 on 2020-01-01.
manual_signals_2 = pd.DataFrame(
    [
        [-100, -100, +100, +100, -100, +100],
        [-200, -200, +200, +200, -200, +200],
        [   0,    0,    0,    0,    0,    0],
    ],
    index=pd.DatetimeIndex(['2015-01-02', '2018-01-01', '2020-01-01']),
    columns=['XOM', 'BAC', 'CL', 'TGT', 'RL', 'MSFT'],
)

manual_signals_2


<hr/>
Run portfolio backtest

In [78]:
# Backtest both manual signal tables in one run; the dict keys name the strategies.
# Second argument: market data to test on; third: 'stock'
# (presumably the broker / commission profile, as in the later simulations — confirm).
r = q.simulation({
        'Manual 1': manual_signals,
        'Manual 2': manual_signals_2,
    }, 
    data.ticks(), 'stock' )

HBox(children=(HTML(value='backtest'), FloatProgress(value=0.0, max=200.0), HTML(value='')))




Quick report view

In [None]:
# Summary report for all simulated strategies.
# NOTE(review): 5000 is presumably the initial capital used for the statistics — confirm.
fig(16, 8)
r.report(5000)

More detailed report statistics

In [None]:
# Tearsheet for the first simulation result (presumably 'Manual 1', following dict order)
tearsheet(r.results[0], 5000)

In [None]:
# Tearsheet for the second simulation result (presumably 'Manual 2', following dict order)
tearsheet(r.results[1], 5000)

In [None]:
# Portfolio log of the second simulation result
r.results[1].portfolio

In [None]:
# Executions log of the second simulation result
r.results[1].executions

# Automatic signals generation  

<hr/>

In [84]:
# Load BTCUSDT data from the BINANCEF source
btc = load_data('BINANCEF:BTCUSDT')

In [86]:
# Build 60-minute OHLC bars for the loaded data and take the BTCUSDT frame.
ohlc1H = btc.ohlc('60Min')
# Fixed: this used to index `ohlc5M`, a name that is never defined in the notebook
# (the cell only worked with leftover kernel state). The 1H frame built above is the intended source.
btc1H = ohlc1H['BTCUSDT']

In [None]:
# Display the 1H BTCUSDT OHLC frame
btc1H

In [88]:
# Smooth the 1H close with the 'kama' smoother, period 250
ma = smooth(btc1H.close, 'kama', 250)

In [89]:
# Chart object combining the 1H bars and the moving average
g = LookingGlass([
    btc1H, 
    ma
])

In [None]:
# Show the chart for 2022-02-01 .. 2022-02-10
g.look('2022-02-01', '2022-02-10')

In [91]:
# Distance of the close price from its moving average
delta = btc1H.close - ma

In [None]:
# Plot the close-to-MA distance over the whole history
fig(18, 7)
plt.plot(delta)

In [None]:
# Compare the density of delta before and after 2021-01-01
sns.kdeplot(delta[:'2021-01-01'], cut=1)
sns.kdeplot(delta['2021-01-01':], cut=1)

In [None]:
# Join MA ('K') and delta ('d') into one frame
xd = scols(ma, delta, names=['K', 'd'])
fig(16,5)
sbp(12,1)  # NOTE(review): presumably selects panel 1 of a 1x2 subplot grid — confirm
# Density of delta on bars where the MA rose vs. bars where it fell
sns.kdeplot(xd[xd.K > xd.K.shift(1)].d)
sns.kdeplot(xd[xd.K < xd.K.shift(1)].d)
plt.xlim(-15000, +15000)

# Disabled second panel: same comparison with a 0.1% MA change threshold
# sbp(12,2)
# sns.kdeplot(xd[xd.K > 1.001*xd.K.shift(1)].d)
# sns.kdeplot(xd[xd.K < 0.999*xd.K.shift(1)].d)
# plt.xlim(-15000, +15000)

In [None]:
# Same frame restricted to data from 2021-01-01 onwards
xd = scols(ma, delta, names=['K', 'd'])['2021-01-01':]

# Density of delta where the MA rose by more than 0.1% vs. fell by more than 0.1% per bar
sns.kdeplot(xd[xd.K > 1.001*xd.K.shift(1)].d)
sns.kdeplot(xd[xd.K < 0.999*xd.K.shift(1)].d)
plt.xlim(-15000, +15000)

<hr/>

Now let's try to generate signals based on the following logic:
1. Buy when the close price crosses the MA upwards and `MA[t] > (1 + f) * MA[t-1]`
2. Sell when the close price crosses the MA downwards and `MA[t] < (1 - f) * MA[t-1]`

<hr/>

In [116]:
F = 0.01      # minimal MA change per bar, in percent (divided by 100 where it is used)
PERIOD = 250  # smoothing period, in 1H bars

ma = smooth(btc1H.close, 'kama', PERIOD)

# Fixed: this used `btc5M.close`, but `btc5M` is never defined in the notebook.
# The MA and the 'close' column below are both built from the 1H frame, so delta must be too.
delta = btc1H.close - ma

# K - moving average, d - close-to-MA distance, close - 1H close price
xd = scols(ma,  delta,  btc1H.close,  names=['K', 'd', 'close'])

# Long: MA rose by more than F percent and the close crossed the MA from below
longs = pd.Series(
    +1, index=xd[(xd.K > (1 + F/100)*xd.K.shift(1)) & (xd.close > xd.K) & (xd.close.shift(1) < xd.K.shift(1))].index
)

# Short: MA fell by more than F percent and the close crossed the MA from above
shorts = pd.Series(
    -1, index=xd[(xd.K < (1 - F/100)*xd.K.shift(1)) & (xd.close < xd.K) & (xd.close.shift(1) > xd.K.shift(1))].index
)

# Shift the signals to the end of the bar, since the close is only known once the bar is complete
signals = shift_signals(
    srows(longs, shorts).rename('BTCUSDT'), '59Min59S'
)

In [None]:
# Display the generated signal series
signals

In [118]:
r_ma = q.simulation({
    
    # Simple reversal system: signals from 2021 onwards, scaled from +/-1 to +/-1000
    'MA trader': 1000 * signals['2021':],
    
    # Fixed risk management: 3% take and 1.5% loss (disabled)
#     'MA trader RM': [signals['2021':], q.FixedPctTrader(1000, 3/100, 1.5/100) ], 
    
    # Fixed risk management + closing positions by time (after 3 hours) (disabled)
#     'MA trader RM + time stop': [signals['2021':], q.PipelineTracker(
#         q.FixedPctTrader(1000, 3/100, 1.5/100), q.TimeExpirationTracker('3h'))
#     ]
    
    },
    
    btc.ohlc('5Min'),  # data for testing (we use the 5Min frame)
    'binance_um_vip0', # broker - it allows commissions etc. to be calculated
)

HBox(children=(HTML(value='backtest'), FloatProgress(value=0.0, max=300.0), HTML(value='')))




In [None]:
# Only 'MA trader' is active in the `r_ma` simulation (the other two entries are commented out),
# so index 0 is the only result available; the former `results[1]` referred to a disabled entry.
# NOTE(review): re-enable 'MA trader RM' in the simulation and use index 1 to see that tracker's statistics.
r_ma.results[0].trackers_stat

In [None]:
# Top panel: simulation report; bottom panel: 1H close price from 2021 for comparison.
# NOTE(review): sbp(21, i) presumably selects panel i of a 2x1 subplot grid — confirm.
fig(18, 6)
sbp(21,1); r_ma.report(1000)
sbp(21,2); 
plt.plot(btc1H.close['2021':])

In [None]:
# Tearsheet for the first result of the MA simulation
tearsheet(r_ma.results[0], 1000)

## Make generator

Now we put all the code we wrote above into a signal generator.

In [126]:
@q.signal_generator                     # - 1: class must be decorated as signal_generator
class CrossMovingEntry(BaseEstimator):  # - 2: class extends BaseEstimator
    """
    Entry signals from the price crossing its moving average, filtered by the MA slope.

    A long signal (+1) is produced on a bar where
      - MA[t] > (1 + step/100) * MA[t-1], and
      - price[t] > MA[t] while price[t-1] < MA[t-1] (cross up).
    A short signal (-1) is the mirrored condition.

    :param timeframe: timeframe the input data is resampled to before the calculations (e.g. '1H')
    :param period: period passed to the 'kama' smoother, in bars of `timeframe`
    :param step: minimal MA change per bar, in percent (it is divided by 100 when applied)
    """

    def __init__(self, timeframe, period, step):
        # Parameters are stored under their own names, as BaseEstimator expects
        self.timeframe = timeframe
        self.period = period
        self.step = step

    def fit(self, X, y=None, **fit_params):   # - 3: class override fit method (for ML applications)
        """
        Nothing is learned here; the method only returns the estimator itself.

        `y` is unused and defaults to None (sklearn convention), so `fit(X)` works too.
        """
        return self

    def predict(self, x):                # - 4: class override predict method (contains signal generation logic)
        """
        Generate +1 / -1 entry signals for the data in `x`.

        :param x: market data accepted by `ohlc_resample`
        :return: series of signals; shifted to the bar's end when the applied column is 'close'
        """
        # self.market_info_.column contains the applied price column (close by default)
        applied_column = self.market_info_.column
        price = ohlc_resample(x, self.timeframe)[applied_column]

        # price moving average and distance of the price from it
        ma = smooth(price, 'kama', self.period)
        delta = price - ma

        # combined frame: K - moving average, d - distance, price - applied price
        xd = scols(ma, delta, price, names=['K', 'd', 'price'])

        # values on the previous bar and the relative MA threshold
        prev = xd.shift(1)
        threshold = self.step / 100

        ma_rising = xd.K > (1 + threshold) * prev.K
        ma_falling = xd.K < (1 - threshold) * prev.K
        crossed_up = (xd.price > xd.K) & (prev.price < prev.K)
        crossed_down = (xd.price < xd.K) & (prev.price > prev.K)

        # long and short signals
        longs = pd.Series(+1, index=xd[ma_rising & crossed_up].index)
        shorts = pd.Series(-1, index=xd[ma_falling & crossed_down].index)

        # collect signals into a single series
        signals = srows(longs, shorts)

        # adjust to the bar's end for the 'close' case
        if applied_column == 'close':
            signals = q.shift_for_timeframe(signals, x, self.timeframe)

        return signals

Make the generator: here we use a single-instrument composer (each instrument in the data is processed one by one).

In [128]:
# Generator on the 1H timeframe with period 250 and step 0.01 (percent), fitted on the BTC data
gen = q.SingleInstrumentComposer( CrossMovingEntry('1H', 250, 0.01) ).fit(btc.ticks())

In [None]:
# Run the same generator through three different trackers, starting from 2021-01-01.
r_ma_gen = q.simulation({
    
    # NOTE(review): FixedTrader(1000, 0, 0) presumably means size 1000 with no take/stop — confirm
    'MA trader': [gen, q.FixedTrader(1000, 0, 0)],
    
    # Fixed risk management: size 1000 with 3/100 and 1.5/100 levels
    'MA trader RM': [gen, q.FixedPctTrader(1000, 3/100, 1.5/100) ], 
    
    # Same risk management combined with a '3h' time expiration tracker
    'MA trader RM + time stop': [gen, q.PipelineTracker(
        q.FixedPctTrader(1000, 3/100, 1.5/100), q.TimeExpirationTracker('3h'))
    ]
    
    },
    btc.ohlc('5Min'), 'binance_um_vip0',
    start='2021-01-01'
)

In [None]:
# Summary report for the three generator-based strategies
r_ma_gen.report(1000)

# Custom tracker

In [5]:
from ira.simulator.SignalTester import Tracker 

In [65]:
class SimpleFixedRiskManagementTracker(Tracker):
    """
    Example tracker that scales the position size by the PnL made since the previous signal.

    With ``n = abs(pnl_change) // risk_change_step + 1``:
      - after a loss the new position size is ``trading_size / n``
      - after a gain the new position size is ``trading_size * n``
      - with no PnL change the size stays ``trading_size``

    :param trading_size: base position size
    :param risk_change_step: PnL change that adds one unit to the scaling factor (must be non-zero)
    """

    def __init__(self, trading_size, risk_change_step):
        self.trading_size = trading_size
        self.risk_change_step = risk_change_step
        # position PnL recorded at the previous signal
        self.prev_pl = 0

        # just statistics
        self.pos_increased = 0
        self.pos_decreased = 0

    def initialize(self):
        """
        Init method is called on tracker start
        """
        # Reset the whole per-run state. Previously only the counters were reset, so a
        # re-initialized tracker would measure its first PnL change against a stale baseline.
        self.prev_pl = 0
        self.pos_increased = 0
        self.pos_decreased = 0
        # self.ohlc = self.get_ohlc_series(self.timeframe)

    def on_signal(self, signal_time, signal_qty, quote_time, bid, ask, bid_size, ask_size):
        """
        This method is called on a new signal from the signal generator.

        Here we can adjust the position size and decide whether the signal is processed or cancelled.

        :return: signed position size: sign of `signal_qty` times the adjusted size
                 (0 when `signal_qty` is 0)
        """
        # PnL change since the previous signal
        # (self._position is not set in this class; presumably provided by Tracker)
        current_pnl = self._position.pnl - self.prev_pl

        # number of full risk steps covered by the PnL change, plus one
        n_x_factor = abs(current_pnl) // self.risk_change_step + 1
        position_size = self.trading_size

        if current_pnl < 0:
            # loss since the last signal: reduce the size
            position_size = position_size / n_x_factor
            self.pos_decreased += 1

        elif current_pnl > 0:
            # gain since the last signal: increase the size
            position_size = position_size * n_x_factor
            self.pos_increased += 1

        # remember the PnL for the next signal
        self.prev_pl = self._position.pnl

        return np.sign(signal_qty) * position_size

    def update_market_data(self, instrument: str, quote_time, bid, ask, bid_size, ask_size, is_service_quote, **kwargs):
        """
        Called on every new market data update; here it only delegates to the base class.
        """
        super().update_market_data(instrument, quote_time, bid, ask, bid_size, ask_size, is_service_quote, **kwargs)

    def statistics(self) -> dict:
        """
        Return how many times the position size was increased / decreased.
        """
        # builtin `dict` is used for the annotation so the class does not depend on a typing import
        return {
            'positions_incr': self.pos_increased,
            'positions_decr': self.pos_decreased,
        }

In [None]:
# Same generator as before, rebuilt here so this cell does not depend on the earlier one
gen = q.SingleInstrumentComposer(CrossMovingEntry('1H', 250, 0.01)).fit(btc.ticks())

# Compare fixed-size trading with the custom tracker (base size 1000, risk step 50)
r_ma_gen_track = q.simulation({
    
    'Raw signals': [gen, q.FixedTrader(1000, 0, 0)],
    
    'My Tracker': [gen, SimpleFixedRiskManagementTracker(1000, 50)],
    
    },
    btc.ohlc('5Min'), 'binance_um_vip0',
    start='2021-01-01'
)

In [None]:
# Summary report for both strategies
r_ma_gen_track.report(2000)

In [None]:
# Tearsheet for the first result (presumably 'Raw signals', following dict order)
tearsheet(r_ma_gen_track.results[0], 2000)

In [None]:
# Tracker statistics of the second result (presumably 'My Tracker'; see its statistics() method)
r_ma_gen_track.results[1].trackers_stat