In [1]:
import pandas as pd
import datetime
import numpy as np
import matplotlib.pyplot as plt
from glob import glob
from dateutil.relativedelta import relativedelta, TH
import py_vollib.black_scholes.implied_volatility as iv
from datetime import datetime as dt
import py_vollib.black_scholes.greeks.analytical as greeks

In [2]:
# Storing path of all data files and date in a dataframe
path = pd.DataFrame(glob('BANKNIFTY 2022/*'), columns=['location'])
path['data_date'] = path['location'].apply(lambda x : datetime.datetime.strptime(x.split('_')[-1].split('.')[0], '%Y%m%d'))
path.sort_values(['data_date'], inplace=True)
path.reset_index(drop=True, inplace=True)

In [3]:
trade_log = pd.DataFrame(columns=['Entry_Date' , 'BANKNIFTY', 'Days To Expiry', 'CE_Symbol', 'CE_Entry_Price', 'CE_Exit_price', 'CE_Exit_Time', 'PE_Symbol', 'PE_Entry_Price', 'PE_Exit_price', 'PE_Exit_Time', 'PnL'])

In [4]:
# Generates a dataframe containing all call options data along with their delta values 
def call_open_data(df_open, underlying_price_open):
    call_option_data = df_open[df_open['symbol'].str.endswith('CE')].copy() #Get all call symbols
    call_option_data['time_to_expiration'] = None
    call_option_data['implied_volatility'] = 0.2

    for index, row in call_option_data.iterrows():
        option_price = float(row['close']) # OPTION PRICE
        stock_price = float(underlying_price_open) # BANKNIFTY PRICE
        strike_price = float(row['symbol'].split('CE')[0][-5:]) # STRIKE PRICE
        expiration_date_str = row['symbol'][9:16]
        expiration_date = datetime.datetime.strptime(expiration_date_str, '%d%b%y').date()
        current_date = datetime.datetime.strptime(row['date'], '%Y-%m-%d').date()
        call_option_data.loc[index, 'time_to_expiration'] = (expiration_date - current_date).days + 1 #Time to Expire (in Days)
        time_to_expiration = ((expiration_date - current_date).days + 1) / 365.0 #Time to expire (in years)
        risk_free_rate = 0.065 # Annual risk free rate
        initial_guess = 0.2  
        
        # Calculating implied volatility 
        try:
            implied_volatility = iv.implied_volatility(option_price, stock_price, strike_price, time_to_expiration, risk_free_rate, 'c')
            call_option_data.loc[index, 'implied_volatility'] = implied_volatility
        except:
            continue
            
    # Calculating Delta
    for index, row in call_option_data.iterrows():
        option_price = float(row['close'])
        stock_price = float(row['close'])
        strike_price = float(row['symbol'].split('CE')[0][-5:])
        time_to_expiration = row['time_to_expiration'] / 365.0
        risk_free_rate = 0.065
        volatility = row['implied_volatility']

        delta = greeks.delta('c', underlying_price_open, strike_price, time_to_expiration, risk_free_rate, volatility)
        delta_rounded = round(delta, 3)
        call_option_data.loc[index, 'delta'] = abs(delta_rounded)

        
    return call_option_data

In [5]:
# Generates a dataframe containing all put options data along with their delta values 
def put_open_data(df_open, underlying_price_open):
    put_option_data = df_open[df_open['symbol'].str.endswith('PE')].copy()
    put_option_data['time_to_expiration'] = None        
    put_option_data['implied_volatility'] = 0.2

    for index, row in put_option_data.iterrows():
        option_price = float(row['close'])
        stock_price = float(underlying_price_open)
        strike_price = float(row['symbol'].split('PE')[0][-5:])
        expiration_date_str = row['symbol'][9:16]
        expiration_date = datetime.datetime.strptime(expiration_date_str, '%d%b%y').date()
        current_date = datetime.datetime.strptime(row['date'], '%Y-%m-%d').date()
        put_option_data.loc[index, 'time_to_expiration'] = (expiration_date - current_date).days + 1
        time_to_expiration = ((expiration_date - current_date).days + 1) / 365.0
        risk_free_rate = 0.065  
        initial_guess = 0.2  

        try:
            implied_volatility = iv.implied_volatility(option_price, stock_price, strike_price, time_to_expiration, risk_free_rate, 'p')
            put_option_data.loc[index, 'implied_volatility'] = implied_volatility
        except:
            continue
            
            
    for index, row in put_option_data.iterrows():
        option_price = row['close']
        stock_price = float(row['close'])
        strike_price = float(row['symbol'].split('PE')[0][-5:])
        time_to_expiration = row['time_to_expiration'] / 365.0
        risk_free_rate = 0.065
        volatility = row['implied_volatility']

        delta = greeks.delta('p', underlying_price_open, strike_price, time_to_expiration, risk_free_rate, volatility)
        delta_rounded = round(delta, 3)
        put_option_data.loc[index, 'delta'] = abs(delta_rounded)
        
    
    return put_option_data



In [6]:
# Calculate PnL of a position (Lot Size = 25)
def calculate_pnl(entry_price, exit_price, lot_size):
    return (exit_price - entry_price)*-1*lot_size

In [7]:
# PnL earned by shorting a call (SL 30%)
def call_pnl(ce_data, ce_entry_price, ce_sl_price):

    for index, row in ce_data.iterrows():
        close_price = float(row['close'])
        time = row['time']

        # Check if the close price exceeds the stop-loss price or it's 3:15 PM
        if close_price >= ce_sl_price or time == '15:15:00' or index == ce_data.index[-1]:
            # Square-off the position (booking a loss)
            ce_data.loc[index, 'pos'] = 0
            ce_exit_price = close_price
            ce_pnl = calculate_pnl(ce_entry_price, ce_exit_price, 25)
            break
            
    return ce_exit_price, ce_pnl

In [8]:
# PnL earned by shorting a put (SL 30%)
def put_pnl(pe_data, pe_entry_price, pe_sl_price):
    for index, row in pe_data.iterrows():
        close_price = float(row['close'])
        time = row['time']

        # Check if the close price exceeds the stop-loss price or it's 3:15 PM
        if close_price >= pe_sl_price or time == '15:15:00' or index == pe_data.index[-1]:
            # Square-off the position (booking a loss)
            pe_data.loc[index, 'pos'] = 0
            pe_exit_price = close_price
            pe_pnl = calculate_pnl(pe_entry_price, pe_exit_price, 25)
            break
    return pe_exit_price, pe_pnl

In [9]:
# Function to read each data file and calculate PnL for each day. Also stores the daily trade log in a file
def read_paths(path):
    for index, row in path.iterrows():
        try :
            data = pd.read_parquet(row['location'])
            date = row['data_date']
            try :
                underlying_price_open = float(data.loc[(data['symbol'] == 'BANKNIFTY-I') & (data['time'] == '09:15:00'), 'close'].values[0])
            except :
                underlying_price_open = float(data.loc[(data['symbol'] == 'BANKNIFTY') & (data['time'] == '09:15:00'), 'close'].values[0])

            df_open = data[data['time'] == '09:15:00']

            call_option_data = call_open_data(df_open, underlying_price_open)
            put_option_data = put_open_data(df_open, underlying_price_open)

            call_option_data['delta_diff'] = abs(call_option_data['delta'] - 0.3) 
            put_option_data['delta_diff'] = abs(put_option_data['delta'] - 0.3)

            call_option_data = call_option_data.sort_values('delta_diff')
            put_option_data = put_option_data.sort_values('delta_diff')

            nearest_call_row = call_option_data.iloc[0] # Call option with delta closest to 0.3
            nearest_put_row = put_option_data.iloc[0] # Put option with delta closest to 0.3

            ce_entry_price = float(nearest_call_row['close']) #Call entry price
            pe_entry_price = float(nearest_put_row['close']) #Put entry price
            days_to_expiry = nearest_call_row['time_to_expiration']
            ce_symbol = nearest_call_row['symbol'] # Call symbol
            pe_symbol = nearest_put_row['symbol'] # Put Symbol

            ce_data = data[data['symbol'] == ce_symbol].copy() # Tick by Tick data for Call symbol
            pe_data = data[data['symbol'] == pe_symbol].copy() # Tick by Tick data for Put symbol

            ce_data['pos'] = None
            pe_data['pos'] = None

            ce_sl_price = 1.3*float(ce_entry_price) # Stop Loss Price
            pe_sl_price = 1.3*float(pe_entry_price)

            lot_size = 25

            ce_exit_price, ce_pnl = call_pnl(ce_data, ce_entry_price, ce_sl_price)
            pe_exit_price, pe_pnl = put_pnl(pe_data, pe_entry_price, pe_sl_price)

            pnl = ce_pnl + pe_pnl
            ce_exit_time = ce_data[ce_data['pos']==0]['time']
            pe_exit_time = pe_data[pe_data['pos']==0]['time']
            
            try :
                banknifty_close = float(data.loc[(data['symbol'] == 'BANKNIFTY-I') & (data['time'] == '15:15:00'), 'close'].values[0])
            except :
                banknifty_close = float(data.loc[(data['symbol'] == 'BANKNIFTY') & (data['time'] == '15:15:00'), 'close'].values[0])

            result = [date , banknifty_close, days_to_expiry, ce_symbol, ce_entry_price, ce_exit_price, ce_exit_time, pe_symbol, pe_entry_price, pe_exit_price, pe_exit_time, pnl]

            result = [
                pd.to_datetime(result[0]),         
                float(result[1]),                  
                int(result[2]),                    
                result[3],
                float(result[4]),                  
                float(result[5]),                  
                result[6].values[0],         
                result[7],
                float(result[8]),                  
                float(result[9]),                  
                result[10].values[0],        
                float(result[11])                  
            ]

            trade_log.loc[len(trade_log)] = result
            
        except :
            continue


        

In [10]:
def backtest():
    path = pd.DataFrame(glob('BANKNIFTY 2022/*'), columns=['location'])
    path['data_date'] = path['location'].apply(lambda x : datetime.datetime.strptime(x.split('_')[-1].split('.')[0], '%Y%m%d'))
    path.sort_values(['data_date'], inplace=True)
    path.reset_index(drop=True, inplace=True)
    
    read_paths(path)
    
    

In [11]:
backtest()

In [12]:
trade_log

Unnamed: 0,Entry_Date,BANKNIFTY,Days To Expiry,CE_Symbol,CE_Entry_Price,CE_Exit_price,CE_Exit_Time,PE_Symbol,PE_Entry_Price,PE_Exit_price,PE_Exit_Time,PnL
0,2022-01-03,36564.8008,4,BANKNIFTY06JAN2236200CE,138.40,181.15,10:30:00,BANKNIFTY06JAN2235300PE,190.30,48.70,15:15:00,2471.25
1,2022-01-04,36940.1484,3,BANKNIFTY06JAN2237000CE,107.55,144.55,09:17:00,BANKNIFTY06JAN2236200PE,172.20,61.20,15:15:00,1850.00
2,2022-01-05,37828.6016,2,BANKNIFTY06JAN2237200CE,86.90,113.75,09:17:00,BANKNIFTY06JAN2236600PE,118.15,17.25,15:15:00,1851.25
3,2022-01-06,37615.0000,1,BANKNIFTY06JAN2237700CE,73.90,0.35,15:15:00,BANKNIFTY06JAN2237200PE,106.70,139.70,09:17:00,1013.75
4,2022-01-07,37806.1992,7,BANKNIFTY13JAN2238500CE,189.95,259.20,09:30:00,BANKNIFTY13JAN2237300PE,250.00,325.00,12:42:00,-3606.25
...,...,...,...,...,...,...,...,...,...,...,...,...
242,2022-12-26,42650.0000,4,BANKNIFTY29DEC2242200CE,139.10,182.55,09:24:00,BANKNIFTY29DEC2241300PE,156.50,41.35,15:15:00,1792.50
243,2022-12-27,42865.0000,3,BANKNIFTY29DEC2243100CE,126.60,167.05,09:19:00,BANKNIFTY29DEC2242400PE,143.00,185.90,09:27:00,-2083.75
244,2022-12-28,42844.6992,2,BANKNIFTY29DEC2243100CE,95.30,125.40,09:26:00,BANKNIFTY29DEC2242400PE,119.80,50.55,15:15:00,978.75
245,2022-12-29,43240.0000,1,BANKNIFTY29DEC2242800CE,78.45,102.75,13:46:00,BANKNIFTY29DEC2242400PE,69.70,91.60,09:20:00,-1155.00


In [13]:
trade_log['PnL'].describe()

count     247.000000
mean      379.569838
std      1669.636784
min     -4497.500000
25%      -545.625000
50%       657.500000
75%      1388.750000
max      5202.500000
Name: PnL, dtype: float64

In [14]:
pnl_values = np.array(trade_log['PnL'])

In [15]:
total_pnl = np.sum(pnl_values)

In [16]:
total_pnl

93753.75

In [17]:
trade_log.to_csv('backtest_results.csv')