In [1]:
import pandas as pd
from datetime import datetime, timedelta, time
import numpy as np
import matplotlib.pyplot as plt
from glob import glob
from dateutil.relativedelta import relativedelta, TH ,WE , TU
from dateparser import parse
import os
import re
import concurrent.futures

# Build `path`: a lookup table indexed by expiry date whose `location` column
# is the folder that holds that expiry's option CSVs. Folder names are parsed
# as "Expiry <day> <month>" inside one directory per year (2018-2022).
expiry_frames = []
for i in range(2018, 2023):
    df_path = pd.DataFrame(glob(f'banknifty fno bt_v6/{i}/*'), columns=['location'])
    # os.path.basename splits on the platform's own separators, so the folder
    # name is extracted correctly on Windows and on POSIX systems alike
    # (splitting on '\\' only works for Windows-style glob results).
    df_path['expiry'] = df_path['location'].apply(os.path.basename)
    # Literal replacement: identical behaviour regardless of the pandas
    # version's default for `regex`.
    df_path['expiry'] = df_path['expiry'].str.replace('Expiry ', '', regex=False)
    # The folder name carries no year, so append the year directory's name.
    df_path['expiry'] = pd.to_datetime(df_path['expiry'] + f' {i}')
    expiry_frames.append(df_path.set_index('expiry'))

# Concatenate and sort once, instead of on every loop iteration.
path = pd.concat(expiry_frames).sort_index()
path

Unnamed: 0_level_0,location
expiry,Unnamed: 1_level_1
2022-01-06,banknifty fno bt_v6/2022\Expiry 06th January
2022-01-13,banknifty fno bt_v6/2022\Expiry 13th January
2022-01-20,banknifty fno bt_v6/2022\Expiry 20th January
2022-01-27,banknifty fno bt_v6/2022\Expiry 27th January
2022-02-03,banknifty fno bt_v6/2022\Expiry 03rd February
2022-02-10,banknifty fno bt_v6/2022\Expiry 10th February
2022-02-17,banknifty fno bt_v6/2022\Expiry 17th February
2022-02-24,banknifty fno bt_v6/2022\Expiry 24th February
2022-03-03,banknifty fno bt_v6/2022\Expiry 03rd March
2022-03-10,banknifty fno bt_v6/2022\Expiry 10th March


In [2]:
def metrics(x):
    """Summarise a per-day PnL log as a dict of performance statistics.

    Parameters
    ----------
    x : pandas.DataFrame
        One row per trading day with a numeric ``pnl`` column and an index that
        ``pd.to_datetime`` can parse. The frame is modified in place: its index
        is converted to datetimes and the helper columns ``cumsum_pnl``,
        ``equity``, ``day``, ``cont_wins`` and ``cont_losses`` are added.

    Returns
    -------
    dict
        Keys: ``overall``, ``winrate``, ``mdd``, ``mdd_percent``,
        ``expectancy``, ``sharpe``, ``sortino``, ``cons_lose``, ``cons_win``,
        ``outlier``, ``best_day/w`` and ``worst_day/w`` (the last two are
        weekday numbers, Monday = 0).

    Raises
    ------
    ValueError
        If ``x`` has no rows (previously this surfaced as a bare
        ``ZeroDivisionError`` from the win-rate computation).
    """
    if len(x) == 0:
        raise ValueError('metrics() needs at least one row of pnl data')

    initial_capital = 200000
    risk_free_rate = 0.05
    MAR = 0  # Minimum acceptable return

    # Convert the index first so every mask below shares the final index.
    x.index = pd.to_datetime(x.index)
    wins = x['pnl'] > 0
    losses = x['pnl'] < 0

    # overall return
    overall_return = x['pnl'].sum()

    # win rate
    win_rate = round(int(wins.sum()) / len(x), 2)

    # risk reward and expectancy (NaN when there are no wins or no losses)
    mean_win = x.loc[wins, 'pnl'].mean()
    mean_loss = x.loc[losses, 'pnl'].mean()
    risk_reward = abs(mean_win / mean_loss)
    expectancy = round((win_rate * risk_reward) - ((1 - win_rate) * 1), 2)

    # cumulative pnl and equity
    x['cumsum_pnl'] = x['pnl'].cumsum()
    x['equity'] = x['cumsum_pnl'] + initial_capital

    # sharpe ratio (population standard deviation, ddof=0)
    # NOTE(review): risk_free_rate is subtracted from the mean pnl as-is, i.e.
    # in the same units as pnl - confirm this is the intended definition.
    average_return = x['pnl'].mean()
    std_dev = x['pnl'].std(ddof=0)
    sharpe_ratio = (average_return - risk_free_rate) / std_dev

    # sortino ratio: denominator is the std-dev of the pnl values below MAR
    pnl = np.array(x['pnl'])
    downside_dev = np.std(pnl[pnl < MAR])
    sortino_ratio = (average_return - risk_free_rate) / downside_dev

    # best day and worst day of the week, by summed pnl
    x['day'] = x.index.weekday
    day_pnl = x.groupby('day')['pnl'].sum()
    max_day = day_pnl.idxmax()
    min_day = day_pnl.idxmin()

    # maximum drawdown, measured from the running peak of cumulative pnl.
    # The starting capital is not treated as a peak, so losses at the very
    # start are measured from the first day's cumulative value.
    mdd = (x['cumsum_pnl'] - x['cumsum_pnl'].cummax()).min()
    # drawdown as a percentage of the initial capital
    mdd_percent = mdd / (initial_capital / 100)

    def _run_lengths(flags):
        # Every False opens a new group; the cumulative sum of the flags inside
        # a group is the length of the current unbroken run of True values.
        # Array keys/values keep this purely positional (safe with duplicate
        # index labels).
        group_ids = (~flags).cumsum().to_numpy()
        return flags.astype('int64').groupby(group_ids).cumsum().to_numpy()

    # consecutive wins and losses (a zero-pnl day breaks both streaks)
    x['cont_wins'] = _run_lengths(wins)
    x['cont_losses'] = _run_lengths(losses)
    consecutive_wins = x['cont_wins'].max()
    consecutive_losses = x['cont_losses'].max()

    # Outlier Adjusted Profit Factor (profit factor excluding the single
    # biggest pnl value)
    outlier = round((x.loc[wins, 'pnl'].sum() - x['pnl'].max()) / abs(x.loc[losses, 'pnl'].sum()), 2)

    summary = {'overall': overall_return, 'winrate': win_rate, 'mdd': mdd,
               'mdd_percent': mdd_percent, 'expectancy': expectancy, 'sharpe': sharpe_ratio,
               'sortino': sortino_ratio, 'cons_lose': consecutive_losses,
               'cons_win': consecutive_wins, 'outlier': outlier, 'best_day/w': max_day,
               'worst_day/w': min_day}

    return summary

In [3]:
# this func takes inputs as param and returns df(each day pnl in rows)
def backtest_short_straddle(params):
    """Backtest one short CE + PE position per 2022 session and return daily PnL.

    Parameters
    ----------
    params : mapping (dict or pandas.Series)
        Must provide:
        ``moneyness``  - multiplied by 100 and added to (CE) / subtracted from
                         (PE) the ATM strike;
        ``sl``         - stop-loss percentage above each leg's entry price;
        ``reentry``    - number of re-entries allowed per leg;
        ``entry_time`` / ``exit_time`` - time-of-day strings;
        ``max_loss``   - positive number; once the day's total PnL falls to
                         ``-max_loss`` the day is closed at that level.

    Returns
    -------
    pandas.DataFrame
        Indexed by trading date (index name ``datetime``) with one ``pnl``
        column holding each day's final PnL.

    Notes
    -----
    Reads ``banknifty spot bt_v6/2022.csv`` and uses the module-level ``path``
    table to locate each expiry folder. Days with no expiry folder are reported
    with ``print`` and skipped.
    """
    moneyness, slpercentage, reentry = params['moneyness'], params['sl'], params['reentry']
    entry_time, exit_time, max_loss = params['entry_time'], params['exit_time'], params['max_loss']

    # Spot candles are only used to derive the ATM strike at the entry time.
    df_spot = pd.read_csv('banknifty spot bt_v6/2022.csv')
    df_spot['datetime'] = pd.to_datetime(df_spot['datetime'])
    df_spot.set_index('datetime', inplace=True)
    entry_clock = pd.to_datetime(entry_time).time()
    df_entry_row_copy = df_spot[df_spot.index.time == entry_clock].copy()
    # ATM = entry-candle open rounded to the nearest 100.
    df_entry_row_copy['atm'] = (df_entry_row_copy['open'] / 100).round() * 100

    moneyness_ = moneyness * 100
    ls = []
    datetime_ = []

    for row in df_entry_row_copy.itertuples():

        # Find the expiry folder: try the next Tuesday, Wednesday, then
        # Thursday (on or after the trade date) and take the first one that
        # exists in `path`.
        data = None
        for day in [TU, WE, TH]:
            nearest_expiry = row.Index.date() + relativedelta(weekday=day(+1))
            nearest_expiry = nearest_expiry.strftime("%Y-%m-%d")
            try:
                expiry_folder = path.loc[nearest_expiry, 'location']
            except KeyError:
                # No folder for this candidate date; try the next weekday.
                continue
            if not isinstance(expiry_folder, str):
                # Ambiguous lookup (several rows for one date): treat as missing.
                continue
            data = pd.DataFrame(glob(os.path.join(expiry_folder, '*')), columns=['csv_files'])
            break
        if data is None:
            print(f'option data not availble for expiry: {nearest_expiry} and date: {row.Index.date()}')
            continue

        # Select the option files whose path contains the wanted strike.
        # int(...) keeps the strike text free of a trailing ".0" when the
        # parameters arrive as floats.
        ce_strike = str(int(round(row.atm + moneyness_)))
        pe_strike = str(int(round(row.atm - moneyness_)))
        strike_filtered_ce = data[data['csv_files'].str.contains(ce_strike, regex=False)]
        strike_filtered_pe = data[data['csv_files'].str.contains(pe_strike, regex=False)]
        # NOTE(review): the CE leg takes the first match and the PE leg the
        # second - this presumably relies on glob listing the CE file before
        # the PE file for a strike; confirm against the file naming.
        ce_csv = _load_option_csv(strike_filtered_ce['csv_files'].iloc[0])
        pe_csv = _load_option_csv(strike_filtered_pe['csv_files'].iloc[1])

        entry_time_index = row.Index
        exit_time_index = str(row.Index.date()) + ' ' + exit_time
        intraday_data = pd.concat([ce_csv, pe_csv], axis=1)
        # Forward-fill so a leg with a missing candle carries its last values.
        intraday_data.ffill(axis=0, inplace=True)
        intraday_data = intraday_data.loc[entry_time_index:exit_time_index]
        intraday_data.columns = ['ce_open', 'ce_high', 'ce_low', 'ce_close',
                                 'pe_open', 'pe_high', 'pe_low', 'pe_close']
        intraday_data = intraday_data[~intraday_data.index.duplicated(keep='first')]

        ls.append(_simulate_straddle_day(intraday_data, slpercentage, reentry, max_loss))
        datetime_.append(row.Index.date())

    log = pd.DataFrame({'pnl': ls}, index=pd.Index(datetime_, name='datetime'))

    return log


def _load_option_csv(csv_file):
    """Read one option CSV and index it by its parsed ``datetime`` column."""
    frame = pd.read_csv(csv_file)
    frame['datetime'] = pd.to_datetime(frame['datetime'])
    frame.set_index('datetime', inplace=True)
    return frame


def _simulate_straddle_day(intraday_data, slpercentage, reentry, max_loss):
    """Walk one day's candles in order and return the final total PnL.

    ``intraday_data`` needs the columns ``ce_open``, ``ce_high``, ``ce_close``,
    ``pe_open``, ``pe_high`` and ``pe_close`` and at least one row (an empty
    frame raises ``IndexError``). Both legs are sold at the first candle's open.
    An explicit loop is used because the stop-loss / re-entry state must be
    carried from one candle to the next in order.
    """
    quantity = 25  # multiplier applied to the per-unit pnl (presumably the lot size - confirm)
    sl = (slpercentage + 100) / 100
    maxloss = max_loss * -1

    ce_entry = intraday_data['ce_open'].iloc[0]
    pe_entry = intraday_data['pe_open'].iloc[0]
    ce_sl, pe_sl = ce_entry * sl, pe_entry * sl
    ce_exist, pe_exist = True, True
    ce_loss, pe_loss = 0, 0          # pnl locked in by stopped-out positions
    ce_reentry, pe_reentry = reentry, reentry
    ce_pnl, pe_pnl = 0, 0
    total_pnl = 0
    maxloss_flag = False

    for candle in intraday_data.itertuples(index=False):
        if maxloss_flag:
            # Max loss was hit on an earlier candle: every later candle,
            # including the last one, is booked at exactly the max-loss level.
            total_pnl = maxloss
            break

        if ce_exist:
            if candle.ce_high > ce_sl:
                # Stop-loss hit: exit at the stop price and lock the result in.
                ce_pnl = (ce_entry - ce_sl) + ce_loss
                ce_exist = False
                ce_loss = ce_pnl
            else:
                ce_pnl = (ce_entry - candle.ce_close) + ce_loss
        elif candle.ce_open < ce_entry and ce_reentry != 0:
            # Re-enter when the price opens below the previous entry price.
            ce_entry = candle.ce_open
            ce_sl = ce_entry * sl
            ce_exist = True
            ce_reentry -= 1

        if pe_exist:
            if candle.pe_high > pe_sl:
                pe_pnl = (pe_entry - pe_sl) + pe_loss
                pe_exist = False
                pe_loss = pe_pnl
            else:
                pe_pnl = (pe_entry - candle.pe_close) + pe_loss
        elif candle.pe_open < pe_entry and pe_reentry != 0:
            pe_entry = candle.pe_open
            pe_sl = pe_entry * sl
            pe_exist = True
            pe_reentry -= 1

        total_pnl = (ce_pnl + pe_pnl) * quantity
        if total_pnl <= maxloss:
            maxloss_flag = True

    return float(total_pnl)

In [None]:
# Run the backtest for every parameter row in pieces_v3/piece_2.csv and
# checkpoint the collected metrics to result_v3/piece_2.csv.
start = datetime.now()
f = glob("pieces_v3/piece_2.csv")
args = pd.read_csv(f[0])

# Resume support: results are saved without an index, so the number of rows
# already on disk is the position of the next parameter row to process.
try:
    result_v3 = pd.read_csv('result_v3/piece_2.csv')
    from_ = len(result_v3)
except (FileNotFoundError, pd.errors.EmptyDataError):
    # No usable earlier run: start from the first parameter row.
    from_ = 0
    result_v3 = pd.DataFrame()

# Make sure the output folder exists before spending time on the backtests.
os.makedirs("result_v3", exist_ok=True)

for index, row in args.iloc[from_:].iterrows():

    # returns backtested pnl(dataframe) respective to its date
    log = backtest_short_straddle(row)

    # returns the metrics dict computed from the pnl log
    metrics_df = metrics(log)

    # `index` is a label coming from iterrows, so select the row by label.
    single_result_v3 = args.loc[[index]].assign(**metrics_df)
    display(single_result_v3)

    # Kept up to date on every iteration so an interrupted run still leaves
    # all finished rows in `result_v3`.
    result_v3 = pd.concat([result_v3, single_result_v3], axis=0)

    # Checkpoint every 10th parameter row.
    if (index + 1) % 10 == 0:
        result_v3.to_csv("result_v3/piece_2.csv", index=False)
        print('saving batch number:', index)

# Save the final DataFrame
result_v3.to_csv("result_v3/piece_2.csv", index=False)
end = datetime.now()
print(end - start)

In [7]:
# NOTE(review): no cell shown here assigns `result`; this looks like a leftover
# from an earlier session and would raise NameError on a fresh
# Restart & Run All - confirm whether it should be removed or point at another frame.
result.to_csv("result/piece_8.csv", index=False)