# Infra

### Import

In [1]:
import logging as log
import numpy as np
import pandas as pd
from datetime import datetime as dt
from datetime import time

import concurrent.futures as fut

log.basicConfig(level=log.DEBUG, filename='trade_log.log', filemode='w', format='%(name)s - %(levelname)s - %(message)s')
# log.basicConfig(level=log.DEBUG)

### Order

In [2]:
class Order():
    def __init__(self, ins, qty, dir, price, time, remark, sl_prct=0, tgt_prct=0, tsl_prct=0, exchange='NSE'):
        log.debug('Order constructor-timestamp:'+str(time)+'-dir:'+dir)
        self.ins = ins
        self.qty = qty
        self.dir = dir
        self.price = price
        self.time = time
        self.remark = remark
        self.sl_prct = sl_prct
        self.tgt_prct = tgt_prct
        self.tsl_prct = tsl_prct
        self.exchange = exchange

### Trade

In [3]:
class Trade():
    def __init__(self):
        log.debug('Trade constructor')
        self.ins = ''
        self.qty = 0
        self.side = 0
        self.exchange = ''
        self.entry_price = -1
        self.entry_time = -1
        self.entry_remark = ''
        self.exit_price = -1
        self.exit_time = ''
        self.exit_remark = ''
        self.holding_period = 0
        # self.cum_pnl = 0
        self.pnl = 0
        self.trade_high = -1
        self.trade_low = -1
        # self.pnl_prct = 0
    
    def to_dict(self):
        return {
            'ins': self.ins,
            'qty': self.qty,
            'side': self.side,
            'exchange': self.exchange,
            'entry_price': self.entry_price,
            'entry_time': self.entry_time,
            'entry_remark': self.entry_remark,
            'exit_price': self.exit_price,
            'exit_time': self.exit_time,
            'exit_remark': self.exit_remark,
            'holding_period': self.holding_period,
            'pnl': self.pnl,
            # 'pnl_prct': self.pnl_prct,
            # 'cum_pnl': self.cum_pnl,
            'trade_high': self.trade_high,
            'trade_low': self.trade_low,
            'cost': self.cost,
        }

    def entry_trade(self, order):
        log.debug('Trade-entry_trade' + '-dir:' + order.dir)
        self.ins = order.ins
        self.qty = order.qty
        if order.dir == 'BUY':
            self.side = 'LONG'
        else:
            self.side = 'SHORT'
        self.exchange = order.exchange
        self.entry_price = order.price
        self.trade_high = order.price
        self.trade_low = order.price
        self.entry_time = order.time
        self.entry_remark = order.remark
        self.compute_cost()

    def compute_cost(self):
        self.cost = (self.qty * self.entry_price) * 0.0002
    
    def calc_pnl(self):
        if self.side == 'LONG':
            self.pnl = (self.exit_price - self.entry_price) * self.qty
            log.debug('Trade-calc_pnl-LONG-pnl:'+str(self.pnl))
        else:
            self.pnl = (self.entry_price - self.exit_price) * self.qty
            log.debug('Trade-calc_pnl-SHORT-pnl:'+str(self.pnl))
        # self.pnl_prct = self.pnl/self.entry_price*100
    
    def exit_trade(self, order):
        log.debug('Trade-exit_trade' + '-dir:' + order.dir)
        if order.price > self.trade_high:
            self.trade_high = order.price
        elif order.price < self.trade_low:
            self.trade_low = order.price
        self.exit_price = order.price
        self.exit_time = order.time
        self.exit_remark = order.remark
        self.calc_pnl()
        return True

### Position

In [4]:
class Position():
    def __init__(self, ins):
        log.debug('Position constructor ins:' + ins)
        self.ins = ins
        self.cost = 0
        self.last_traded_price = 0
        self.last_traded_time = 0
        # self.trade_count = 0
        self.pnl = 0
        self.current_trade = Trade()
        global trade_hist
        self.sl_prct = 0
        self.tgt_prct = 0
        self.tsl_prct = 1
        self.sl = 0
        self.curr_price = 0
        self.target_hit = False
        self.sl_hit = False
        self.prev_closed_trade = None
        # self.trade_list = list()


    def place_order(self, order):
        # self.trade_count += 1
        ret_obj = False
        log.debug('Position-place_order-ins:'+self.ins)
        self.curr_price = order.price
        if order.ins == self.ins:
            log.debug('Position-place_order-Taking %s Trade in %s of %s at %s', order.dir, order.ins, order.qty, order.price)
            if self.current_trade.qty == 0:
                log.debug('Position-place_order-ENTRY')
                self.current_trade.entry_trade(order)
                self.sl_prct = order.sl_prct
                if self.current_trade.side == 'LONG':
                  # print(type(self.sl_prct), self.sl_prct, order.sl_prct)
                  log.debug('Position-place_order-side:LONG')
                  self.sl = order.price*(1-self.sl_prct)

                elif self.current_trade.side == 'SHORT':
                  log.debug('Position-place_order-side:SHORT')
                  self.sl = order.price*(1+self.sl_prct)
                self.tgt_prct = order.tgt_prct
                self.tsl_prct = order.tsl_prct
            else:
                log.debug('Position-place_order-EXIT')
                if self.current_trade.exit_trade(order):
                    self.cost += self.current_trade.cost
                    self.pnl += self.current_trade.pnl
                    # self.trade_list.append(self.current_trade)
                    # trade_hist.append(self.current_trade)
                    log.debug(self.current_trade.to_dict())
                    self.prev_closed_trade = self.current_trade
                    # log.debug('Position-place_order-Append trade to trade_hist')
                    log.debug('Position-place_order-Trade Closed')
                    self.current_trade = Trade()
                    # self.sl_prct = 0
                    # self.tgt_prct = 0
                    # self.tsl_prct = 100
                    self.target_hit = False
                    self.sl_hit = False
            self.last_traded_price = order.price
            self.last_traded_time = order.time
            return ret_obj
        else:
            log.critical('Position-place_order-Cant update since order_ins: %s pos_ins: %s'%(order.ins, self.ins))


    def mark_position(self, market_data):
        new_close = market_data['Close']
        log.debug('Position-mark_position-close:'+str(new_close)+'-ins:'+self.ins)
        self.current_trade.holding_period += 1
        if self.current_trade.side == 'LONG':
            if self.curr_price < new_close:
                # check for target hit
                if new_close > self.current_trade.entry_price*(1+self.tgt_prct):
                    log.debug('Position-mark_position-Target Hit-target:'+str(self.current_trade.entry_price*(1+self.tgt_prct)))
                    self.target_hit = True
                # update tsl
                elif (new_close - self.curr_price )/self.curr_price > self.tsl_prct:
                    log.debug('Position-mark_position-Update SL')
                    self.update_sl(new_close)
            else:
                if new_close < self.sl:
                    self.sl_hit = True
                    log.debug('Position-mark_position-SL Hit-SL:'+str(self.sl))
        elif self.current_trade.side == 'SHORT':
            if self.curr_price < new_close:
                # check for sl hit
                if new_close > self.sl:
                    self.sl_hit = True
                    log.debug('Position-mark_position-SL Hit-SL:'+str(self.sl))
            else:
                # check for target hit
                if new_close < self.current_trade.entry_price*(1-self.tgt_prct):
                    log.debug('Position-mark_position-Target Hit-target:'+str(self.current_trade.entry_price*(1-self.tgt_prct)))
                    self.target_hit = True
                # update tsl
                elif (self.curr_price - new_close)/self.curr_price > self.tsl_prct:
                    log.debug('Position-mark_position-Update SL')
                    self.update_sl(new_close)
        self.curr_price  = new_close
        
        new_high = market_data['High']
        new_low = market_data['Low']
        if new_high > self.current_trade.trade_high:
            self.current_trade.trade_high = new_high
        elif new_low < self.current_trade .trade_low:
            self.current_trade .trade_low = new_low

    
    def update_sl(self, new_close):
        if self.current_trade.side == 'LONG':
            self.sl = new_close*(1-self.sl_prct)
        if self.current_trade.side == 'SHORT':
            self.sl = new_close*(1+self.sl_prct)
        log.debug('Position-update_sl-NEW SL:'+str(self.sl))


### Portfolio

In [5]:
class Portfolio():
    def __init__(self):
        log.debug('Portfolio constructor')
        self.pos_map = dict()
        self.trade_hist = [] 

    @property
    def instrument_list(self):
        return list(self.pos_map.keys())

    def has_active_position(self, ins):
      if not ins in self.instrument_list:
        return False
      if (self.pos_map[ins].current_trade.qty == 0):
        return False
      return True

    def place_order(self, order):
        if not order.ins in self.instrument_list:
            self.pos_map[order.ins] = Position(order.ins)
        self.pos_map[order.ins].place_order(order)
        if not self.has_active_position(order.ins):
            self.trade_hist.append(self.pos_map[order.ins].prev_closed_trade)

    @property
    def trade_list(self):
        all_trades = list()
        for ins, pos in self.pos_map.items():
            all_trades.extend(pos.trade_list)
        return all_trades

    @property
    def cost(self):
        cost = 0
        for ins, pos in self.pos_map.items():
            cost += pos.cost
        return cost

    @property
    def pnl(self):
        pnl = 0
        for ins, pos in self.pos_map.items():
            pnl += pos.pnl
        return pnl

# Strategy

In [6]:
class Strategy():
    def __init__(self, trade_cap, EMA1_span, EMA2_span, RSI_span, sl_prct, tgt_prct, tsl_prct, ds):
        log.debug('Portfolio constructor')
        self.trade_cap = trade_cap
        self.EMA1_span = EMA1_span
        self.EMA2_span = EMA2_span
        self.RSI_span = RSI_span
        self.sl_prct = sl_prct
        self.tgt_prct = tgt_prct
        self.tsl_prct = tsl_prct
        self.pf = Portfolio()
        self.dataset = ds
        self.df = pd.DataFrame()
        self.ins_list = []
        self.trade_data = pd.DataFrame()
        self.data_year = {}
        # self.trade_hist = list()
        self.prep_data()
        # self.run()

    # def upload_data(self, ds):
    #     self.dataset = ds
    #     self.prep_data()


    def calc_rsi(self, close, window):
        delta = close.diff()
        up_days = delta.copy()
        up_days[delta<=0]=0.0
        down_days = abs(delta.copy())
        down_days[delta>0]=0.0
        RS_up = up_days.rolling(window).mean()
        RS_down = down_days.rolling(window).mean()
        rsi= 100-100/(1+RS_up/RS_down)
        return rsi

    def prep_data(self):
        if self.dataset is not None:
            for key in self.dataset.keys():
                df = self.dataset[key]
                # print(self.dataset[key])
                self.data_year[df['Ticker'][0]] = (df.index.max().date()-df.index.min().date()).days/365
                # calc_ema()
                df['EMA1'] = df['Close'].ewm(span=self.EMA1_span, adjust=False).mean()
                df['EMA1_prev'] = df['EMA1'].shift(1)
                df['EMA2'] = df['Close'].ewm(span=self.EMA2_span, adjust=False).mean()
                df['EMA2_prev'] = df['EMA2'].shift(1)
                # calc_rsi()
                df['RSI'] = self.calc_rsi(df.Close, self.RSI_span)
                # df['RSI_prev'] = df['RSI'].shift(1)
                df.dropna(inplace=True)
                self.dataset[key] = df
                del df
            self.df = pd.concat(self.dataset.values())
            self.df = self.df.sort_index()
            self.ins_list = self.df['Ticker'].unique()
            self.df = self.df.reset_index().set_index(['DateTime', 'Ticker'])
    
    def get_data_year(self, ins):
        return self.data_year.get(ins)

    def gen_signal(self, row):
        # crosserver
        if (row['EMA1_prev'] < row['EMA2_prev']) and (row['EMA1'] > row['EMA2']) and row['RSI'] > 60:
            # print('LONG Trade '+ str(row['EMA1_prev'] < row['EMA2_prev']) + str(row['EMA1'] > row['EMA2']), row['EMA1_prev'] ,row['EMA2_prev'], row['EMA1'], row['EMA2'])
            return 'LONG'
        elif (row['EMA1_prev'] > row['EMA2_prev']) and (row['EMA1'] < row['EMA2']) and row['RSI'] < 40:
            # print('SHORT Trade '+ str(row['EMA1_prev'] > row['EMA2_prev']) + str(row['EMA1'] < row['EMA2']), row['EMA1_prev'] ,row['EMA2_prev'], row['EMA1'], row['EMA2'])
            return 'SHORT'
        return None
    
    def run(self):
        for idx in self.df.index:
            ts = idx[0]
            ins = idx[1]
            row = self.df.loc[idx]
            entry_cond1 = ts.time() < time(14, 45) and ts.time() > time(9, 30)
            if self.pf.has_active_position(ins):
                self.pf.pos_map[ins].mark_position(row)
                qty = self.pf.pos_map[ins].current_trade.qty
                # check for reverse signal
                side = self.gen_signal(row)
                reverse_cond = side is not None
                # check for SL condition
                sl_cond = self.pf.pos_map[ins].sl_hit
                # check for TARGET condition
                tgt_cond = self.pf.pos_map[ins].target_hit
                time_cond = ts.time() >= time(15, 15)
                if sl_cond or tgt_cond or time_cond or reverse_cond:
                    log.debug('Strategy-MainLoop-timestamp:'+str(ts)+' Squaring Off Position')
                    dir = ''
                    if self.pf.pos_map[ins].current_trade.side == 'LONG':
                        dir = 'SELL'
                    else:
                        dir = 'BUY'
                    ex_remark = ''
                    if sl_cond: ex_remark = "SL HIT"
                    if tgt_cond: ex_remark = "TARGET HIT"
                    if time_cond: ex_remark = "TIME HIT"
                    if reverse_cond: ex_remark = "REVERSE SIGNAL"
                    ord = Order(ins, qty, dir, row['Close'], ts, ex_remark)
                    self.pf.place_order(ord)
                    if reverse_cond and entry_cond1:
                        log.debug('Strategy-MainLoop-timestamp:'+str(ts)+' Reverse Position')
                        qty = int(self.trade_cap//row['Close'])
                        if (side == 'LONG'):
                            dir = 'BUY'
                        else:
                            dir = 'SELL'
                        ord = Order(ins, qty, dir, row['Close'], ts, "Ent cond", self.sl_prct, self.tgt_prct, self.tsl_prct)
                        self.pf.place_order(ord)
                        # print(pf.pos_map[ins].current_trade.to_dict())
            else:
                if entry_cond1:
                    side = self.gen_signal(row)
                    dir = ''
                    if side is not None:
                        log.debug('Strategy-MainLoop-timestamp:'+str(ts)+' Signal to Enter')
                        qty = int(self.trade_cap//row['Close'])
                        # print(sl_prct)
                        if (side == 'LONG'):
                            dir = 'BUY'
                        else:
                            dir = 'SELL'
                        ord = Order(ins, qty, dir, row['Close'], ts, "Ent cond", self.sl_prct, self.tgt_prct, self.tsl_prct)
                        # print(ts, dir)
                        self.pf.place_order(ord)
        return self.gen_trade_report()

    
    def gen_trade_report(self):
        trades_df = pd.DataFrame.from_records([t.to_dict() for t in self.pf.trade_hist])
        trades_df['change%'] = (trades_df['entry_price']-trades_df['exit_price'])/trades_df['entry_price']*100
        trades_df['Turnover'] = trades_df['qty']*trades_df['entry_price']
        # trades_df['pnl%'] = trades_df['pnl']/trades_df['Turnover']
        trades_df['pnl%'] = trades_df['pnl']/self.trade_cap*100
        trades_df['cum_pnl'] = trades_df['pnl'].cumsum()
        trades_df['mfe%'] = trades_df.apply(lambda x: (x['trade_high']-x['entry_price'])/x['entry_price']*100 if(x['side']=='LONG') else (x['trade_low'] -x['entry_price'])/x['entry_price']*100, axis=1)
        trades_df['mae%'] = trades_df.apply(lambda x: (x['trade_low'] -x['entry_price'])/x['entry_price']*100 if(x['side']=='LONG') else (x['trade_high']-x['entry_price'])/x['entry_price']*100, axis=1)
        trades_df['HighValue'] = trades_df['cum_pnl'].cummax()
        trades_df['DD'] = trades_df['cum_pnl'] - trades_df['HighValue']
        trades_df['MaxDD'] = trades_df['DD'].cummin()
        trade_report_name = 'Trade_Report_EMA'+str(self.EMA1_span)+'_EMA'+str(self.EMA2_span)+'_RSI'+str(self.RSI_span)+'_SL'+str(self.sl_prct)+'_TGT'+str(self.tgt_prct)+'_TSL'+str(self.tsl_prct)+'.csv'
        trades_df.to_csv(trade_report_name)
        self.trade_data = trades_df
        self.summary_ins_lvl()
        return self.strategy_summary()

    def summary_ins_lvl(self):
        summ = self.trade_data.groupby('ins').agg(
            pnl = pd.NamedAgg(column="pnl", aggfunc="sum"),
            trades = pd.NamedAgg(column="ins", aggfunc="count"),
            win = pd.NamedAgg(column="pnl", aggfunc=(lambda x: x[x>0].count())),
            loss = pd.NamedAgg(column="pnl", aggfunc=(lambda x: x[x<=0].count())),
            MaxDD = pd.NamedAgg(column="pnl", aggfunc=(lambda x: (x - x.cumsum().max()).min())),
            data_year = pd.NamedAgg(column="ins", aggfunc=(lambda x: self.get_data_year(x.iloc[0]))),
        )
        summ['PnL/trade'] = summ['pnl']/summ['trades']
        summ['RRR'] = (summ['pnl']/summ['data_year'])/abs(summ['MaxDD'])
        summ_report_name = 'Summary_EMA'+str(self.EMA1_span)+'_EMA'+str(self.EMA2_span)+'_RSI'+str(self.RSI_span)+'_SL'+str(self.sl_prct)+'_TGT'+str(self.tgt_prct)+'_TSL'+str(self.tsl_prct)+'.csv'
        summ.to_csv(summ_report_name)
        # return summ
    
    def strategy_summary(self):
        total_trades = self.trade_data.shape[0]
        data_year = max(self.data_year.values())
        return {
            'EMA1'        : self.EMA1_span,
            'EMA2'        : self.EMA2_span,
            'RSI'         : self.RSI_span,
            'SL'          : self.sl_prct,
            'TGT'         : self.tgt_prct,
            'TSL'         : self.tsl_prct,
            'Data Year'   : max(self.data_year.values()),
            'Total Trade' : self.trade_data.shape[0],
            'Win Trades'  : self.trade_data[self.trade_data['pnl']>0].shape[0],
            'Loss Trades' : self.trade_data[self.trade_data['pnl']<=0].shape[0],
            'PnL'         : self.trade_data.iloc[-1]['pnl'],
            'MaxDD'       : self.trade_data.iloc[-1]['MaxDD'],
            'PnL/Trade'   : self.trade_data.iloc[-1]['pnl']/self.trade_data.shape[0],
            'RRR'         : (self.trade_data.iloc[-1]['pnl']/max(self.data_year.values()))/abs(self.trade_data.iloc[-1]['MaxDD'])
        }




# Backtest

### Data prep

In [7]:
dataset = pd.read_excel('dataset.xlsx', sheet_name=None)
# dataset = pd.read_excel('dataset.xlsx', sheet_name=None)
cols = ['Ticker', 'Date/Time', 'Open', 'High', 'Low', 'Close','Volume']

for key in dataset.keys():
    df = dataset[key]
    df = df[cols]
    df.rename(columns={'Date/Time':'DateTime'}, inplace = True)
    df.set_index(df['DateTime'], inplace = True)
    if 'DateTime' in df.columns: df = df.drop(columns=['DateTime'])
    df.sort_index(inplace = True)
    dataset[key] = df
    del df

index_dataset = {}
index_dataset['BANKNIFTY-I'] = dataset.pop('BANKNIFTY-I')
index_dataset['NIFTY-I'] = dataset.pop('NIFTY-I')

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  errors=errors,


### Logic

In [None]:
trade_cap = 500000
# TSL
step = 10
ema1_span = [i for i in range(10, 20+step, step)]
ema2_span = [i for i in range(50, 80+step, step)]
step = 7
rsi_span = [i for i in range(7, 14+step, step)]
step = 25
sl_prct = [i/100 for i in range(100, 200+step, step)]
tgt_prct = [i/100 for i in range(100, 300+step, step)]
tsl_prct = [i/100 for i in range(100, 200+step, step)]

strat_summ = []
threads = []
strats = []
with fut.ThreadPoolExecutor(max_workers=4) as executor:
    for ema1 in ema1_span:
    for ema2 in ema2_span:
        for rsi in rsi_span:
            for sl in sl_prct:
                for tgt in tgt_prct:
                    for tsl in tsl_prct:
                        strat = Strategy(trade_cap, 20, ema2, rsi, sl/100, tgt/100, tsl/100, copy.deepcopy(dataset))
                        strats.append(executor.submit(strat.run))
                        # strat.upload_data(dataset)
                        # strat.run()
                        # strat.gen_trade_report()
                        # strat.summary_ins_lvl()
                        # strat_summ.append(strat.strategy_summary())


In [None]:
pd.DataFrame.from_records([i.result() for i in strats]).to_csv('final_summary.csv')

# Report

In [None]:
df1 = df.drop(columns=['DateTime'])
df1.join(trades_df.set_index('entry_time'), how='left').drop_duplicates().to_csv('backtest_1.csv')