In [2]:
from backtest import BacktestEngine
from Bardata import BarData
import sys
from typing import List
from datetime import datetime, timedelta
import pytz
from kiteconnect import KiteConnect
import pandas as pd
import os

In [3]:
class SSO_strategy(BacktestEngine):
    def __init__(self, config):
        super().__init__(config)
        self.buy = True
        self.closepx = {}
        self.ticks = {}
        self.today = datetime.strptime(config['start_date'], '%Y-%m-%d').date()
        self.start_time = pytz.timezone('Asia/Kolkata').localize(datetime.combine(self.today, datetime.min.time()).replace(hour=9, minute=25, second=0))
        self.end_time = pytz.timezone('Asia/Kolkata').localize(datetime.combine(self.today, datetime.min.time()).replace(hour=10, minute=30, second=0))
        self.executed = {t: False for t in self.trading_symbols.keys()}
        self.high_925 = {}
        self.low_925 = {}
        self.top_gainers = config.get("top_gainers", [])
        self.top_losers = config.get("top_losers", [])

        self.active_orders = {}
        self.option_data = {}
        print(self.trading_symbols)

    def update_bars(self, new_bars: List[BarData]):
        # print(f"Updating {new_bars[0].date} bars")
        for b in new_bars:
            # print(f"Updating bar: {b.date} for instrument {self.trading_symbols[b.instrument_token]}")
            if b.instrument_token not in self.ticks:
                self.ticks[b.instrument_token] = []
            self.ticks[b.instrument_token].append(b)
            if b.instrument_token not in self.closepx:
                self.closepx[b.instrument_token] = []
            self.closepx[b.instrument_token].append(b.close)
            # print(f"Close price for {self.trading_symbols[b.instrument_token]} at {b.date} is {b.close}")
            if b.date > self.start_time and b.instrument_token not in self.high_925:
                print(f"Setting high_925 and low_925 for {self.trading_symbols[b.instrument_token]} at {b.date}")
                self.high_925[b.instrument_token] = max([t.high for t in self.ticks[b.instrument_token][-10:]])
                self.low_925[b.instrument_token] = min([t.low for t in self.ticks[b.instrument_token][-10:]])
            # print("checking all conditions")
            self.check_all_conditions(b)

    def get_option(self,instrument_token,underlying_price,option_type):
        instr_df = pd.DataFrame(self.kite.instruments('NFO'))
        
        instr_tokens = instr_df[instr_df['name'] == self.trading_symbols[instrument_token]]
        expiries = sorted(instr_tokens['expiry'].unique())
        print(f"Available expiries for {self.trading_symbols[instrument_token]}: {expiries}")
        current_month_options = instr_tokens[
            (instr_df['expiry'] == expiries[0]) & 
            (instr_df['instrument_type'].isin(['CE', 'PE']))]
        
        if option_type == 'CE':
            ce_strike = current_month_options[
                            (current_month_options['strike'] >= underlying_price) & 
                            (current_month_options['instrument_type'] == 'CE')
                        ]['strike'].nsmallest(3).nlargest(1)
            ce_tokens = current_month_options[
                            (current_month_options['strike'].isin(ce_strike.values)) & 
                            (current_month_options['instrument_type'] == 'CE')
                        ]['instrument_token'].tolist()
            return ce_tokens[0] if ce_tokens else None
        
        elif option_type == 'PE':
            pe_strike = current_month_options[
                            (current_month_options['strike'] <= underlying_price) & 
                            (current_month_options['instrument_type'] == 'PE')
                        ]['strike'].nlargest(3).nsmallest(1)
            pe_tokens = current_month_options[
                            (current_month_options['strike'].isin(pe_strike.values)) & 
                            (current_month_options['instrument_type'] == 'PE')
                        ]['instrument_token'].tolist()
            return pe_tokens[0] if pe_tokens else None

        return
    
    def check_entry_conditions(self,tick: BarData):
        print(f"Checking entry conditions for {self.trading_symbols[tick.instrument_token]} at {tick.close} on {tick.date}")
        print(f"Current high_925: {self.high_925.get(tick.instrument_token, 0)}")
        print(f"Current low_925: {self.low_925.get(tick.instrument_token, 0)}")
        if tick.close > self.high_925.get(tick.instrument_token, 0) and tick.instrument_token in self.top_gainers:
            print(f"Buy signal for {self.trading_symbols[tick.instrument_token]} at {tick.close} on {tick.date}")
            token = self.get_option(tick.instrument_token, tick.close, 'CE')
            today = tick.date.date().strftime('%Y-%m-%d')

            self.option_data[tick.instrument_token] = self.kite.historical_data(token,today,today,interval='minute')
            # Find the option candle with the same timestamp as tick.date
            print(self.option_data[tick.instrument_token])
            option_candle = next((c for c in self.option_data[tick.instrument_token] if c['date'] == tick.date), None)
            option_candle['instrument_token']=tick.instrument_token
            option_tick = BarData(**option_candle)
            self.trading_symbols[token] = self.trading_symbols[tick.instrument_token] + 'CE'
            if token:
                self.place_order(option_tick, 1, 'BUY', option_tick.close)
                self.active_orders[tick.instrument_token] = token
            else:
                print(f"No suitable CE option found for {self.trading_symbols[tick.instrument_token]} at {tick.close}")
            
            self.executed[tick.instrument_token] = True
            
        
        if tick.close < self.low_925.get(tick.instrument_token, 0) and tick.instrument_token in self.top_losers:

            print(f"BUY signal for {self.trading_symbols[tick.instrument_token]} at {tick.close} on {tick.date}")
            token = self.get_option(tick.instrument_token, tick.close, 'PE')
            today = tick.date.date().strftime('%Y-%m-%d')
            print(token,today)
            self.option_data[tick.instrument_token] = self.kite.historical_data(token,today,today,interval='minute')
            # Find the option candle with the same timestamp as tick.date
            option_candle = next((c for c in self.option_data[tick.instrument_token] if c['date'] == tick.date), None)
            print(self.option_data)
            option_candle['instrument_token']=tick.instrument_token
            option_tick = BarData(**option_candle)
            self.trading_symbols[token] = self.trading_symbols[tick.instrument_token] + 'PE'
            if token:
                self.place_order(option_tick, 1, 'BUY', option_tick.close)
                self.active_orders[tick.instrument_token] = token
            else:
                print(f"No suitable PE option found for {self.trading_symbols[tick.instrument_token]} at {tick.close}")

            self.executed[tick.instrument_token] = True
        return
    
    def check_exit_conditions(self,tick: BarData):
        if tick.instrument_token in self.top_gainers:
            sma_40 = sum(self.closepx[tick.instrument_token][-40:]) / min(40, len(self.closepx[tick.instrument_token]))
            # Check if close price is less than sma_40 for 5 or more consecutive minutes
            if len(self.closepx[tick.instrument_token]) >= 40:
                last_5_closes = self.closepx[tick.instrument_token][-10:]
                if all(px < sma_40 for px in last_5_closes):
                    print(f"Exit signal for {self.trading_symbols[tick.instrument_token]} at {tick.close} on {tick.date} (close < sma_40 for 5+ mins)")
                    option_candle = next((c for c in self.option_data[tick.instrument_token] if c['date'] == tick.date), None)
                    option_candle['instrument_token'] = tick.instrument_token
                    option_tick = BarData(**option_candle)
                    self.place_order(option_tick, 1, 'SELL', option_tick.close)
                    self.executed[tick.instrument_token] = False
        else :
            sma_40 = sum(self.closepx[tick.instrument_token][-40:]) / min(40, len(self.closepx[tick.instrument_token]))
            # Check if close price is greater than sma_40 for 5 or more consecutive minutes
            if len(self.closepx[tick.instrument_token]) >= 40:
                last_5_closes = self.closepx[tick.instrument_token][-10:]
                if all(px > sma_40 for px in last_5_closes):
                    print(f"Exit signal for {self.trading_symbols[tick.instrument_token]} at {tick.close} on {tick.date} (close > sma_40 for 5+ mins)")
                    option_candle = next((c for c in self.option_data[tick.instrument_token] if c['date'] == tick.date), None)
                    option_candle['instrument_token'] = tick.instrument_token
                    option_tick = BarData(**option_candle)
                    self.place_order(option_tick, 1, 'SELL', option_tick.close)
                    self.executed[tick.instrument_token] = False
        return

    def check_all_conditions(self,b):
        if b.date < self.end_time and b.date > self.start_time and not self.executed[b.instrument_token]:
            self.check_entry_conditions(b)
        if self.executed[b.instrument_token] and self.executed[b.instrument_token]:
            self.check_exit_conditions(b)
        return

In [4]:
kite = KiteConnect(os.getenv("api_key"), os.getenv("access_token"))
instruments=(pd.DataFrame(kite.instruments("NFO")))
instruments_NSE = (pd.DataFrame(kite.instruments("NSE")))

all_stocks = instruments['name'].unique()

def get_instrument_token(symbol):
    try:
        return instruments_NSE[instruments_NSE['tradingsymbol'] == symbol]['instrument_token'].values[0]
    except IndexError:
        # print(f"Instrument token for {symbol} not found.")
        return None
    
def get_all_derivative_token(symbol):
    option_tokens = []
    try:
        tokens = instruments[(instruments['name'] == symbol)]['instrument_token'].values
        # expiries = sorted(instruments[instruments['name'] == symbol]['expiry'].values)
        # expiry = expiries[0] if len(expiries) > 0 else None
        # token = tokens[(tokens['segment'] == 'NFO-OPT')]['instrument_token'].values
        for t in tokens:
            option_tokens.append(t)

    except IndexError:
        print(f"Option token for {symbol} not found.")
        return []
    return option_tokens

def get_option_token(symbol):
    option_tokens = []
    try:
        tokens = instruments[(instruments['name'] == symbol) & (instruments['segment']=='NFO-OPT')]
        expiries = sorted(instruments[instruments['name'] == symbol]['expiry'].values)
        expiry = expiries[0] if len(expiries) > 0 else None
        token = tokens[(tokens['expiry']==expiry)&(tokens['segment'] == 'NFO-OPT')]['instrument_token'].values
        for t in token:
            option_tokens.append(t)

    except IndexError:
        print(f"Option token for {symbol} not found.")
        return []
    return option_tokens


In [5]:
instrument_tokens = [256265]
option_tokens = []
for symbol in all_stocks:
    instrument_token = get_instrument_token(symbol)
    if instrument_token is not None:
        option_tokens += get_option_token(symbol)
        # print(f"Symbol: {symbol}, Instrument Token: {instrument_token}")
        instrument_tokens.append(int(instrument_token))
    else:
        print(f"Symbol: {symbol} not found in instruments.")

Symbol: NIFTY not found in instruments.
Symbol: BANKNIFTY not found in instruments.
Symbol: FINNIFTY not found in instruments.
Symbol: MIDCPNIFTY not found in instruments.
Symbol: NIFTYNXT50 not found in instruments.


In [6]:
strt_date = "2025-05-02"
en_date = "2025-05-28"

start = datetime.strptime(strt_date, "%Y-%m-%d")
end = datetime.strptime(en_date, "%Y-%m-%d")

# Generate list of date strings
all_dates = [(start + timedelta(days=i)).strftime("%Y-%m-%d") for i in range((end - start).days + 1)]

print(all_dates)

for date in all_dates:
    start_date = date
    top_gainers = []
    top_losers = []
    for stocks in instrument_tokens:
        data=kite.historical_data(stocks, date, date, "5minute")
        if data is None or len(data) == 0:
            # print(f"No data available for {stocks} on {date}. Skipping...")
            continue
        def get_previous_trading_day(date_str):
            date = datetime.strptime(date_str, "%Y-%m-%d")
            prev_day = date - timedelta(days=1)
            # Skip weekends (Saturday=5, Sunday=6)
            while prev_day.weekday() >= 5:
                prev_day -= timedelta(days=1)
            return prev_day.strftime("%Y-%m-%d")

        previous_trading_day = get_previous_trading_day(date)
        data2 = kite.historical_data(stocks, previous_trading_day, previous_trading_day, "day")
        while data2 is None or len(data2) == 0:
            previous_trading_day = get_previous_trading_day(previous_trading_day)
            data2 = kite.historical_data(stocks, previous_trading_day, previous_trading_day, "day")
        candle = data[0]
        candle2 = data2[0]
        change= (candle['close'] - candle2['close']) / candle2['close'] * 100
        if change > 2:
            top_gainers.append(stocks)
        elif change < -2:
            top_losers.append(stocks)

    nifty10_candle = kite.historical_data(256265, date, date, "10minute")
    if nifty10_candle is None or len(nifty10_candle) == 0:
        # print(f"No 10 minute candle data available for Nifty on {date}. Skipping...")
        continue
    print(f"Nifty 10 minute candle: {nifty10_candle[0]}")
    if nifty10_candle[0]['open']>nifty10_candle[0]['close']:
        top_gainers=[]
        
    stocks_to_trade = top_gainers + top_losers


        
    final_stocks = []

    for stock in stocks_to_trade:
        symbol = instruments_NSE[instruments_NSE['instrument_token'] == stock]['tradingsymbol'].values[0]
        option_token = get_all_derivative_token(symbol)
        initial_oi = 0
        final_oi = 0
        for token in option_token:
            oi_data = kite.historical_data(token, date, date, "10minute",oi=True)
            oi_data2 = kite.historical_data(token, previous_trading_day, previous_trading_day, "day",oi=True)
            initial_oi += oi_data2[0]['oi'] if oi_data2 else 0
            try:
                final_oi += oi_data[0]['oi']
            except IndexError:
                # print(f"OI data for {token}{symbol} not available for the specified date range.")
                continue
            # if oi_data:
            #     initial_oi += oi_data[0]['oi']
            #     final_oi += oi_data[5]['oi']
        oi_change = (final_oi - initial_oi) / initial_oi * 100 if initial_oi != 0 else 0
        if oi_change > 3:
            final_stocks.append(stock)
        print(f"Stock: {symbol}, Initial OI: {initial_oi}, Final OI: {final_oi}, OI Change: {oi_change:.2f}%")


    print(f"Final stocks to trade: {final_stocks}")
    strategy_name = "SSO_3oi_" + date
    config = {
        "strategy_name": strategy_name,

        "start_date": date,
        "end_date": date,
        "interval": "minute",
        'tokens':final_stocks,

        "top_gainers": top_gainers,
        "top_losers": top_losers,
    }

    backtest = SSO_strategy(config)
    try:
        backtest.run()
        backtest.plot_results()
    except Exception as e:
        print(e)
        continue

['2025-05-02', '2025-05-03', '2025-05-04', '2025-05-05', '2025-05-06', '2025-05-07', '2025-05-08', '2025-05-09', '2025-05-10', '2025-05-11', '2025-05-12', '2025-05-13', '2025-05-14', '2025-05-15', '2025-05-16', '2025-05-17', '2025-05-18', '2025-05-19', '2025-05-20', '2025-05-21', '2025-05-22', '2025-05-23', '2025-05-24', '2025-05-25', '2025-05-26', '2025-05-27', '2025-05-28']


NetworkException: Too many requests