# Backtesting on EMR with PySpark and Backtrader

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import datetime as dt
import pyspark.sql.functions as F
import pyspark.sql.types as T
import matplotlib.pyplot as plt

# Short date window used while testing: the first week of January 2020.
sDate = dt.datetime(year=2020, month=1, day=1)
eDate = dt.datetime(year=2020, month=1, day=7)

# Naming used in this notebook:
#   df  - the full DataFrame
#   df2 - df filtered to one event type, date window and ticker, e.g.
#
# df2 = (
#     df.filter(df.eventtype == "TRADE NB")
#       .filter(df.date.between(sDate, eDate))
#       .filter(df.ticker == "NFLX")
#       .select(['start', 'open', 'high', 'low', 'close', 'volume', 'vwap'])
# )

In [None]:
import backtrader as bt
import backtrader.feeds as btfeeds
import backtrader.analyzers as btanalyzers
from backtrader.feed import DataBase
from backtrader import date2num
from backtrader import TimeFrame
import os
import pytz
from pytz import timezone
import json
import time
import itertools
import datetime

# More documentation about backtrader: https://www.backtrader.com/

class AlgoStrategy():
    """Run one backtrader strategy class through Cerebro and report its metrics.

    ``strategy`` is a ``bt.Strategy`` subclass that also provides two hooks,
    called on the class itself: ``init_broker(broker)`` to configure the broker
    and ``add_data(cerebro)`` to attach a data feed and return it.

    After :meth:`run` the instance exposes ``total_closed``, ``strike_rate``,
    ``max_drawdown``, ``sqn``, ``sharpe_ratio`` and ``pnl``.
    """

    def __init__(self, strategy):
        self.cerebro = bt.Cerebro()
        strategy.init_broker(self.cerebro.broker)
        data = strategy.add_data(self.cerebro)
        strategy.data = data  # stored on the strategy class itself

        self.cerebro.addstrategy(strategy)

        # Read after init_broker so it reflects the configured starting cash.
        self.portfolioStartValue = self.cerebro.broker.getvalue()
        self.cerebro.addanalyzer(btanalyzers.DrawDown, _name='dd')
        self.cerebro.addanalyzer(btanalyzers.SharpeRatio_A, _name='sharpe')
        self.cerebro.addanalyzer(btanalyzers.SQN, _name='sqn')
        self.cerebro.addanalyzer(btanalyzers.TradeAnalyzer, _name='ta')

    @staticmethod
    def _lookup(analysis, *keys, default=0):
        """Return ``analysis[k1][k2]...``, or *default* if any level is missing or None.

        Uses ``.get`` at every level so a missing section does not raise.
        backtrader's TradeAnalyzer can leave out sections such as
        ``won``/``lost``/``pnl`` (e.g. when no trade has closed), which makes
        plain attribute access fail.
        """
        node = analysis
        for key in keys:
            getter = getattr(node, 'get', None)
            if getter is None:
                return default
            node = getter(key)
            if node is None:
                return default
        return node

    def performance(self):
        """Print a trade-analysis table and store headline metrics on ``self``.

        Must be called after :meth:`run`, which sets ``self.thestrat``.
        Any metric missing from an analyzer's output is reported as 0.
        """
        get = self._lookup
        ta = self.thestrat.analyzers.ta.get_analysis()
        dd = self.thestrat.analyzers.dd.get_analysis()

        # Get the results we are interested in
        total_open = get(ta, 'total', 'open')
        total_closed = get(ta, 'total', 'closed')
        total_won = get(ta, 'won', 'total')
        total_lost = get(ta, 'lost', 'total')
        win_streak = get(ta, 'streak', 'won', 'longest')
        lose_streak = get(ta, 'streak', 'lost', 'longest')
        pnl_net = round(get(ta, 'pnl', 'net', 'total'), 2)
        strike_rate = 0
        if total_closed > 0:
            strike_rate = (total_won / total_closed) * 100
        max_drawdown = get(dd, 'max', 'drawdown')
        money_down = get(dd, 'max', 'moneydown')

        self.total_closed = total_closed
        self.strike_rate = strike_rate
        self.max_drawdown = max_drawdown

        # Three header rows, each followed by its value row.
        h1 = ['Total Open', 'Total Closed', 'Total Won', 'Total Lost']
        h2 = ['Strike Rate', 'Win Streak', 'Losing Streak', 'PnL Net']
        h3 = ['DrawDown Pct', 'MoneyDown', '', '']
        r1 = [total_open, total_closed, total_won, total_lost]
        r2 = [('%.2f%%' % (strike_rate)), win_streak, lose_streak, pnl_net]
        r3 = [('%.2f%%' % (max_drawdown)), money_down, '', '']
        header_length = max(len(h1), len(h2), len(h3))
        print_list = [h1, r1, h2, r2, h3, r3]
        # One leading empty column plus one 15-char left-aligned column per header.
        row_format = "{:<15}" * (header_length + 1)
        print("Trade Analysis Results:")
        for row in print_list:
            print(row_format.format('', *row))

        self.sqn = get(self.thestrat.analyzers.sqn.get_analysis(), 'sqn')
        # The Sharpe ratio may come back as None; _lookup maps that to 0.
        self.sharpe_ratio = get(self.thestrat.analyzers.sharpe.get_analysis(), 'sharperatio')
        final_value = self.cerebro.broker.getvalue()
        self.pnl = final_value - self.portfolioStartValue
        print('[SQN:%.2f, Sharpe Ratio:%.2f, Final Portfolio:%.2f, Total PnL:%.2f]' % (self.sqn, self.sharpe_ratio, final_value, self.pnl))

    def run(self):
        """Run the backtest, keep the first strategy instance and report performance."""
        thestrats = self.cerebro.run()
        self.thestrat = thestrats[0]
        self.performance()

class MyFeed(DataBase):
    """Backtrader data feed that replays the rows of the Spark DataFrame ``testData``.

    All rows are collected to the driver when the feed is created. ``_load``
    copies ``open``/``high``/``low``/``close``/``volume`` into the feed lines
    and keeps each full Row in ``self.m`` keyed by its ``start`` value, so
    strategies can read extra columns such as ``vwap``.
    """

    # backtrader takes a feed's bar size from its ``timeframe`` parameter, so
    # declare it here; assigning a plain ``self.timeframe`` attribute does not
    # update that parameter.
    params = (('timeframe', bt.TimeFrame.Minutes),)

    def __init__(self):
        super(MyFeed, self).__init__()
        # NOTE(review): reads the notebook-global ``testData``; it must exist
        # before the feed is created.
        self.list = testData.select("start", "open", "high", "low", "close", "volume", "vwap", "exponential_moving_average").collect()
        if not self.list:
            # Fail with a clear message instead of an IndexError below.
            raise ValueError("MyFeed: testData has no rows to replay")
        # AlgoSeek Columns:
        # Date,Ticker,TimeBarStart,OpenBarTime,OpenBidPrice,OpenBidSize,OpenAskPrice,OpenAskSize,FirstTradeTime,FirstTradePrice,FirstTradeSize,HighBidTime,HighBidPrice,HighBidSize,HighAskTime,HighAskPrice,HighAskSize,HighTradeTime,HighTradePrice,HighTradeSize,LowBidTime,LowBidPrice,LowBidSize,LowAskTime,LowAskPrice,LowAskSize,LowTradeTime,LowTradePrice,LowTradeSize,CloseBarTime,CloseBidPrice,CloseBidSize,CloseAskPrice,CloseAskSize,LastTradeTime,LastTradePrice,LastTradeSize,MinSpread,MaxSpread,CancelSize,VolumeWeightPrice,NBBOQuoteCount,TradeAtBid,TradeAtBidMid,TradeAtMid,TradeAtMidAsk,TradeAtAsk,TradeAtCrossOrLocked,Volume,TotalTrades,FinraVolume,FinraVolumeWeightPrice,UptickVolume,DowntickVolume,RepeatUptickVolume,RepeatDowntickVolume,UnknownTickVolume,TradeToMidVolWeight,TradeToMidVolWeightRelative,TimeWeightBid,TimeWeightAsk

        self.n = 0  # index of the next row _load() will deliver

        self.fromdate = self.list[0]['start']
        self.todate = self.list[-1]['start']
        self.timeframe = self.p.timeframe  # kept for code that reads the attribute
        print("from=%s,to=%s" % (self.fromdate, self.todate))

        # start value -> full Row, filled in as bars are loaded
        self.m = {}

    def start(self):
        # No external resource to open; still let DataBase run its own setup.
        super(MyFeed, self).start()

    def stop(self):
        # No external resource to close; still let DataBase run its own teardown.
        super(MyFeed, self).stop()

    def _load(self):
        """Deliver the next collected row as a bar; return False once all rows are used."""
        if self.n >= len(self.list):
            return False

        row = self.list[self.n]
        self.lines.datetime[0] = date2num(row['start'])

        self.lines.open[0] = row['open']
        self.lines.high[0] = row['high']
        self.lines.low[0] = row['low']
        self.lines.close[0] = row['close']
        self.lines.volume[0] = row['volume']
        self.m[row['start']] = row

        self.n += 1
        return True

class StrategyTemplate(bt.Strategy):
    """Base strategy that tracks start-of-month and start-of-day boundaries.

    Subclasses override the static hooks ``init_broker(broker)`` and
    ``add_data(cerebro)`` (no-ops here). A subclass that overrides ``next``
    should call ``super().next()`` so the month/day bookkeeping stays current.
    """

    def __init__(self):
        # -1 means "no bar seen yet".
        self.lastDay = -1
        self.lastMonth = -1
        self.dataclose = self.datas[0].close

    @staticmethod
    def init_broker(broker):
        """Hook for configuring the broker; does nothing by default."""

    @staticmethod
    def add_data(cerebro):
        """Hook for attaching a data feed; does nothing by default."""

    def next(self):
        bar_time = self.datas[0].datetime.datetime(0)
        #print("[NEXT]:%s:close=%s" % (bar_time, self.dataclose[0]))

        # Start of month: the first bar whose month differs from the previous bar's.
        if bar_time.month != self.lastMonth:
            if self.lastMonth != -1:
                month_change = self.broker.getvalue() - self.monthCash
                #print("[%s] SOM:chg=%.2f,cash=%.2f" % (bar_time, month_change, self.broker.getvalue()))
            self.lastMonth = bar_time.month
            self.monthCash = self.broker.getvalue()

        # Start of day: the first bar whose day-of-month differs from the previous bar's.
        if bar_time.day != self.lastDay:
            self.lastDay = bar_time.day
            #print("[%s] SOD:cash=%.2f" % (bar_time, self.broker.getvalue()))


In [None]:
class AlgoSeekDataFeed(DataBase):
    """Backtrader data feed that replays the rows of the Spark DataFrame ``testData``.

    All rows are collected to the driver when the feed is created. ``_load``
    copies ``open``/``high``/``low``/``close``/``volume`` into the feed lines
    and keeps each full Row in ``self.m`` keyed by its ``start`` value, so
    strategies can read extra columns such as ``vwap``.
    """

    # backtrader takes a feed's bar size from its ``timeframe`` parameter, so
    # declare it here; assigning a plain ``self.timeframe`` attribute does not
    # update that parameter.
    params = (('timeframe', bt.TimeFrame.Minutes),)

    def __init__(self):
        # Must name this class: super(MyFeed, self) raised TypeError because
        # AlgoSeekDataFeed is not a MyFeed subclass.
        super(AlgoSeekDataFeed, self).__init__()
        # NOTE(review): reads the notebook-global ``testData``; it must exist
        # before the feed is created.
        self.list = testData.select("start", "open", "high", "low", "close", "volume", "vwap", "exponential_moving_average").collect()
        if not self.list:
            # Fail with a clear message instead of an IndexError below.
            raise ValueError("AlgoSeekDataFeed: testData has no rows to replay")
        # AlgoSeek Columns:
        # Date,Ticker,TimeBarStart,OpenBarTime,OpenBidPrice,OpenBidSize,OpenAskPrice,OpenAskSize,FirstTradeTime,FirstTradePrice,FirstTradeSize,HighBidTime,HighBidPrice,HighBidSize,HighAskTime,HighAskPrice,HighAskSize,HighTradeTime,HighTradePrice,HighTradeSize,LowBidTime,LowBidPrice,LowBidSize,LowAskTime,LowAskPrice,LowAskSize,LowTradeTime,LowTradePrice,LowTradeSize,CloseBarTime,CloseBidPrice,CloseBidSize,CloseAskPrice,CloseAskSize,LastTradeTime,LastTradePrice,LastTradeSize,MinSpread,MaxSpread,CancelSize,VolumeWeightPrice,NBBOQuoteCount,TradeAtBid,TradeAtBidMid,TradeAtMid,TradeAtMidAsk,TradeAtAsk,TradeAtCrossOrLocked,Volume,TotalTrades,FinraVolume,FinraVolumeWeightPrice,UptickVolume,DowntickVolume,RepeatUptickVolume,RepeatDowntickVolume,UnknownTickVolume,TradeToMidVolWeight,TradeToMidVolWeightRelative,TimeWeightBid,TimeWeightAsk

        self.n = 0  # index of the next row _load() will deliver

        self.fromdate = self.list[0]['start']
        self.todate = self.list[-1]['start']
        self.timeframe = self.p.timeframe  # kept for code that reads the attribute
        print("from=%s,to=%s" % (self.fromdate, self.todate))

        # start value -> full Row, filled in as bars are loaded
        self.m = {}

    def start(self):
        # No external resource to open; still let DataBase run its own setup.
        super(AlgoSeekDataFeed, self).start()

    def stop(self):
        # No external resource to close; still let DataBase run its own teardown.
        super(AlgoSeekDataFeed, self).stop()

    def _load(self):
        """Deliver the next collected row as a bar; return False once all rows are used."""
        if self.n >= len(self.list):
            return False

        row = self.list[self.n]
        self.lines.datetime[0] = date2num(row['start'])

        self.lines.open[0] = row['open']
        self.lines.high[0] = row['high']
        self.lines.low[0] = row['low']
        self.lines.close[0] = row['close']
        self.lines.volume[0] = row['volume']
        self.m[row['start']] = row

        self.n += 1
        return True


In [None]:
class MyStrategy(StrategyTemplate):
    """Long/short strategy driven by a Spark ML model's VWAP prediction.

    Scenario settings are read from ``self.cerebro.strat_params`` (a dict set
    by the caller): ``size`` (order size), ``pct_chg`` (signal threshold, in
    percent) and ``model`` (an object with ``transform`` returning a
    ``prediction`` column).
    """

    def __init__(self):  # Initiation
        super(MyStrategy, self).__init__()

    @staticmethod
    def init_broker(broker):
        """Start with 1,000,000 cash and zero commission."""
        broker.setcash(1000000.0)
        broker.setcommission(commission=0.0)

    @staticmethod
    def add_data(cerebro):
        """Attach a new MyFeed to *cerebro* and return it."""
        data = MyFeed()
        cerebro.adddata(data)
        return data

    def next(self):  # Processing
        super(MyStrategy, self).next()
        dt = self.datas[0].datetime.datetime(0)
        r = self.data.m[dt]
        params = self.cerebro.strat_params
        size = params['size']
        threshold_PctChg = params['pct_chg']
        model = params['model']

        VWAP = r['vwap']
        if not VWAP:
            # Zero/missing VWAP: the expected percentage change is undefined, skip this bar.
            return

        # NOTE(review): creates a one-row Spark DataFrame and runs a Spark job on
        # every bar, which is slow; consider scoring all bars up front.
        df = spark.createDataFrame([r])
        predictedVWAP = model.transform(df).collect()[0]['prediction']
        expectedPctChg = (predictedVWAP - VWAP) / VWAP * 100.0

        goLong = expectedPctChg > threshold_PctChg
        goShort = expectedPctChg < -threshold_PctChg

        if not self.position:
            # Only open a position on an actual signal; a prediction inside the
            # +/- threshold band leaves the strategy flat.
            if goLong:
                print("%s:%s x BUY @ %.2f" % (dt, size, r['close']))
                self.buy(size=size)  # Go long
            elif goShort:
                print("%s:%s x SELL @ %.2f" % (dt, size, r['close']))
                self.sell(size=size)  # Go short
        elif self.position.size > 0 and goShort:
            # Twice the size: close the long and open a short of equal size.
            print("%s:%s x SELL @ %.2f" % (dt, size * 2, r['close']))
            self.sell(size=size * 2)
        elif self.position.size < 0 and goLong:
            # Twice the size: cover the short and open a long of equal size.
            print("%s:%s x BUY @ %.2f" % (dt, size * 2, r['close']))
            self.buy(size=size * 2)

In [None]:
# One backtest scenario per trained model in `models`. `size` is the order
# size and `pct_chg` the signal threshold in percent, both read by MyStrategy.
scenarios = []
for p, model in enumerate(models):
    c = {
        'scenario': (p + 1),
        "size": 100,
        "pct_chg": 0.01,
        "model": model,
        'model_name': 'model.%s' % (p + 1),
    }
    print(c)
    # Append in the same loop body that builds `c`, so every scenario is kept.
    scenarios.append(c)

In [None]:
# Run every scenario in order and remember the one with the highest PnL.
best_config = None
best_pnl = None
n = 0

for n, c in enumerate(scenarios, start=1):
    print("*** [%s] RUN SCENARIO:%s" % (n, c))
    config = c
    algo = AlgoStrategy(MyStrategy)
    algo.cerebro.strat_params = config
    algo.run()
    # Strict comparison: on a tie the earlier scenario is kept.
    if best_pnl is None or algo.pnl > best_pnl:
        best_config, best_pnl = c, algo.pnl

In [None]:
# Re-run the best scenario on its own so its trade log and metrics are printed.
print("*** BEST SCENARIO ***:%s" % best_config)
if best_config is None:
    # Nothing ran (e.g. `scenarios` was empty); running with strat_params=None
    # would fail inside MyStrategy.next when it reads the 'size' setting.
    raise RuntimeError("No best scenario: no scenarios were run")
algo = AlgoStrategy(MyStrategy)
algo.cerebro.strat_params = best_config
algo.run()