In [1]:
from kiteconnect import KiteConnect
from selenium import webdriver
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
import os 
import time
from time import sleep 
import statsmodels.api as sm
from pyotp import TOTP
import pyotp
import base64
import binascii
from datetime import datetime
from urllib.parse import urlparse,parse_qs
import pandas as pd 
import numpy as np
import datetime as dt
import matplotlib.pyplot as plt
import mplfinance as mpf
import plotly.graph_objects as go
import seaborn as sns

In [2]:
def get_curr_path(folder_name):
    curr_dir = os.getcwd()
    curr_path = os.path.join(curr_dir,folder_name)
    return curr_path

def get_credentials(curr_path,file_name):
    file_dir = os.path.join(curr_path,file_name)
    file = open(file_dir,'r').read().split()
    api_key = file[0]
    api_secret = file[1]
    user_name = file[2]
    pwd = file[3]
    totp_key = file[-1]
    return api_key,api_secret,user_name,pwd,totp_key

def auto_login(api_key,user_name,pwd,totp_key):
    kite = KiteConnect(api_key=api_key)
    service = Service(ChromeDriverManager().install())
    service.start()
    options = Options()
    options.to_capabilities()
    driver = webdriver.Remote(
        command_executor=service.service_url,
        options=options)
    driver.get(kite.login_url())
    driver.implicitly_wait(5)
    username = driver.find_element(By.XPATH, "//input[@type='text']")
    username.send_keys(user_name)
    password = driver.find_element(By.XPATH, "//input[@type='password']")
    password.send_keys(pwd)
    driver.find_element(By.XPATH, "//button[@type='submit']").click()
    sleep(1)
    totp = driver.find_element(By.XPATH,"//input[@type='number']")
    totp_token = TOTP(totp_key)
    token = totp_token.now()
    totp.send_keys(token)
    driver.find_element(By.XPATH,"//button[@type = 'submit']").click()
    sleep(1)
    current_url = driver.current_url
    parsed_url = urlparse(current_url)
    query_params = parse_qs(parsed_url.query)
    request_token = query_params.get('request_token',[None])[0]
    with open('request_token.txt', 'w') as f:
        f.write(request_token)
    request_token = open('request_token.txt','r').read()
    driver.quit()
    return request_token
    

def generate_access_token(request_token,api_key,api_secret):
    request_token = open('request_token.txt','r').read()
    kite = KiteConnect(api_key=api_key)
    data = kite.generate_session(request_token=request_token,api_secret=api_secret)
    data
    with open('access_token.txt','w') as f:
        f.write(data['access_token'])
    access_token = open('access_token.txt','r').read()
    return access_token

In [3]:
folder_name = 'api_keys'
curr_dir = get_curr_path(folder_name)

file_name = 'credentials.txt'
api_key,api_secret,user_name,pwd,totp_key = get_credentials(curr_dir,file_name)

request_token = auto_login(api_key,user_name,pwd,totp_key)

access_token = generate_access_token(request_token,api_key,api_secret)

In [4]:
kite = KiteConnect(api_key=api_key)
kite.set_access_token(access_token=access_token)

In [5]:
instrument_dump = kite.instruments('NSE')
instrument_df = pd.DataFrame(instrument_dump)
instrument_df.to_csv('NSE_instruments.csv',index=False)

In [6]:
def instrumentLookup(instrument_df,symbol):
    """Looks up instrument token for a given script from instrument dump"""
    try:
        return instrument_df[instrument_df.tradingsymbol==symbol].instrument_token.values[0]
    except:
        return -1
    
def fetchOHLC(ticker,interval,duration):
    """extracts historical data and outputs in the form of dataframe"""
    instrument = instrumentLookup(instrument_df,ticker)
    data = pd.DataFrame(kite.historical_data(instrument,dt.date.today()-dt.timedelta(duration), dt.date.today(),interval))
    data.set_index("date",inplace=True)
    return data

def fetchOHLCExtended(ticker, inception_date, interval):
    """Extracts historical data and outputs in the form of a DataFrame.
       inception_date string format - dd-mm-yyyy"""
    instrument = instrumentLookup(instrument_df, ticker)
    from_date = dt.datetime.strptime(inception_date, '%d-%m-%Y')
    data = pd.DataFrame()  # Start with an empty DataFrame
    while True:
        if from_date.date() >= (dt.date.today() - dt.timedelta(100)):
            new_data = pd.DataFrame(kite.historical_data(instrument, from_date, dt.date.today(), interval))
            if not new_data.empty:
                if data.empty:
                    data = new_data
                else:
                    data = pd.concat([data, new_data], ignore_index=True)
            break
        else:
            to_date = from_date + dt.timedelta(100)
            new_data = pd.DataFrame(kite.historical_data(instrument, from_date, to_date, interval))
            if not new_data.empty:
                if data.empty:
                    data = new_data
                else:
                    data = pd.concat([data, new_data], ignore_index=True)
            from_date = to_date
    
    if not data.empty:
        data.set_index("date", inplace=True)
    return data

In [7]:
def compute_super_trend(df, n, multiplier):
    df = df.copy()
    df['hl2'] = (df['high'] + df['low']) / 2
    df['tr'] = np.maximum.reduce([df['high'] - df['low'],
                                  abs(df['high'] - df['close'].shift(1)),
                                  abs(df['low'] - df['close'].shift(1))])
    alpha = 1 / n
    
    df.loc[df.index[n-1], 'atr'] = df['tr'][:n].mean()
    for i in range(n, len(df)):
        df.loc[df.index[i], 'atr'] = alpha * df.loc[df.index[i], 'tr'] + (1 - alpha) * df.loc[df.index[i-1], 'atr']

    df['basic_ub'] = df['hl2'] + (multiplier * df['atr'])
    df['basic_lb'] = df['hl2'] - (multiplier * df['atr'])
    df.loc[df.index[n-1], 'ub'] = df.loc[df.index[n-1], 'basic_ub']
    df.loc[df.index[n-1], 'lb'] = df.loc[df.index[n-1], 'basic_lb']

    for i in range(n, len(df)):
        if ((df.loc[df.index[i], 'basic_ub'] < df.loc[df.index[i-1], 'ub']) or
            (df.loc[df.index[i-1], 'close'] > df.loc[df.index[i-1], 'ub'])):
            df.loc[df.index[i], 'ub'] = df.loc[df.index[i], 'basic_ub']
        else:
            df.loc[df.index[i], 'ub'] = df.loc[df.index[i-1], 'ub']
        
        if ((df.loc[df.index[i], 'basic_lb'] > df.loc[df.index[i-1], 'lb']) or
            (df.loc[df.index[i-1], 'close'] < df.loc[df.index[i-1], 'ub'])):
            df.loc[df.index[i], 'lb'] = df.loc[df.index[i], 'basic_lb']
        else:
            df.loc[df.index[i], 'lb'] = df.loc[df.index[i-1], 'lb']

    isUpTrend, isDownTrend = 'up', 'down'
    init_trend_dir = isDownTrend
    df['trend_direction'] = init_trend_dir
    df['super_trend'] = pd.Series(dtype=float)
    for i in range(len(df)):
        if i < n:
            df.loc[df.index[i], 'trend_direction'] = init_trend_dir
        else:
            prev_super_trend = df.loc[df.index[i-1], 'super_trend'] if i > n else None
            prev_upper_band = df.loc[df.index[i-1], 'ub']
            prev_lower_band = df.loc[df.index[i-1], 'lb']
            
            if prev_super_trend == prev_upper_band:
                if df.loc[df.index[i], 'close'] > df.loc[df.index[i], 'ub']:
                    df.loc[df.index[i], 'trend_direction'] = isUpTrend
                else:
                    df.loc[df.index[i], 'trend_direction'] = isDownTrend
            else:
                if df.loc[df.index[i], 'close'] < df.loc[df.index[i], 'lb']:
                    df.loc[df.index[i], 'trend_direction'] = isDownTrend
                else:
                    df.loc[df.index[i], 'trend_direction'] = isUpTrend
            
            if df.loc[df.index[i], 'trend_direction'] == isUpTrend:
                df.loc[df.index[i], 'super_trend'] = df.loc[df.index[i], 'lb']
            else:
                df.loc[df.index[i], 'super_trend'] = df.loc[df.index[i], 'ub']

    return df['super_trend']

In [37]:
def st_dir_refresh(ohlc, ticker, st_dir):
    """Function to check for Supertrend reversal and update st_dir."""
    # Ensure there are enough data points to check previous values
    if len(ohlc) < 2:
        print("Not enough data to compare previous values.")
        return st_dir  # Not enough data to perform the comparison

    # Check for Supertrend reversal for st1
    if ohlc["st1"].iloc[-1] > ohlc["close"].iloc[-1] and ohlc["st1"].iloc[-2] < ohlc["close"].iloc[-2]:
        st_dir[ticker][0] = "red"
    elif ohlc["st1"].iloc[-1] < ohlc["close"].iloc[-1] and ohlc["st1"].iloc[-2] > ohlc["close"].iloc[-2]:
        st_dir[ticker][0] = "green"

    # Check for Supertrend reversal for st2
    if ohlc["st2"].iloc[-1] > ohlc["close"].iloc[-1] and ohlc["st2"].iloc[-2] < ohlc["close"].iloc[-2]:
        st_dir[ticker][1] = "red"
    elif ohlc["st2"].iloc[-1] < ohlc["close"].iloc[-1] and ohlc["st2"].iloc[-2] > ohlc["close"].iloc[-2]:
        st_dir[ticker][1] = "green"

    # Check for Supertrend reversal for st3
    if ohlc["st3"].iloc[-1] > ohlc["close"].iloc[-1] and ohlc["st3"].iloc[-2] < ohlc["close"].iloc[-2]:
        st_dir[ticker][2] = "red"
    elif ohlc["st3"].iloc[-1] < ohlc["close"].iloc[-1] and ohlc["st3"].iloc[-2] > ohlc["close"].iloc[-2]:
        st_dir[ticker][2] = "green"

    return st_dir

In [9]:
ticker = 'INFY'
# infy_df = fetchOHLCExtended(ticker,'5minute',5)
infy_df = fetchOHLCExtended(ticker,"12-06-2023","5minute")
infy_df.to_csv('infy.csv')

In [10]:
st1 = compute_super_trend(infy_df, 7, 3)
st2 = compute_super_trend(infy_df, 10, 3)
st3 = compute_super_trend(infy_df, 11, 2)

In [11]:
infy_df['st1'] = st1
infy_df['st2'] = st2
infy_df['st3'] = st3

In [12]:
infy_df

Unnamed: 0_level_0,open,high,low,close,volume,st1,st2,st3
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
2023-06-12 09:15:00+05:30,1273.50,1279.50,1273.50,1278.95,236184,,,
2023-06-12 09:20:00+05:30,1278.85,1282.60,1277.50,1282.45,165972,,,
2023-06-12 09:25:00+05:30,1282.50,1284.90,1281.35,1284.85,132046,,,
2023-06-12 09:30:00+05:30,1284.90,1285.40,1282.60,1284.00,110804,,,
2023-06-12 09:35:00+05:30,1284.05,1284.10,1280.50,1281.95,59973,,,
...,...,...,...,...,...,...,...,...
2024-06-12 15:05:00+05:30,1485.00,1486.45,1484.30,1485.20,373635,1479.154985,1479.433764,1481.457576
2024-06-12 15:10:00+05:30,1485.15,1485.80,1484.15,1485.30,312726,1478.936416,1479.132888,1481.113706
2024-06-12 15:15:00+05:30,1485.20,1485.40,1484.00,1484.25,393775,1478.924071,1479.022099,1480.935187
2024-06-12 15:20:00+05:30,1484.20,1486.50,1482.75,1486.05,474567,1478.067060,1478.389889,1480.520624


In [18]:
infy_df.to_csv('three_st.csv')

In [26]:
copy_df = pd.read_csv('three_st.csv')
copy_df.set_index('date',inplace=True)

In [28]:
copy_df

Unnamed: 0_level_0,open,high,low,close,volume,st1,st2,st3
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
2023-06-12 09:15:00+05:30,1273.50,1279.50,1273.50,1278.95,236184,,,
2023-06-12 09:20:00+05:30,1278.85,1282.60,1277.50,1282.45,165972,,,
2023-06-12 09:25:00+05:30,1282.50,1284.90,1281.35,1284.85,132046,,,
2023-06-12 09:30:00+05:30,1284.90,1285.40,1282.60,1284.00,110804,,,
2023-06-12 09:35:00+05:30,1284.05,1284.10,1280.50,1281.95,59973,,,
...,...,...,...,...,...,...,...,...
2024-06-12 15:05:00+05:30,1485.00,1486.45,1484.30,1485.20,373635,1479.154985,1479.433764,1481.457576
2024-06-12 15:10:00+05:30,1485.15,1485.80,1484.15,1485.30,312726,1478.936416,1479.132888,1481.113706
2024-06-12 15:15:00+05:30,1485.20,1485.40,1484.00,1484.25,393775,1478.924071,1479.022099,1480.935187
2024-06-12 15:20:00+05:30,1484.20,1486.50,1482.75,1486.05,474567,1478.067060,1478.389889,1480.520624


In [38]:
ticker = "INFY"
st_dir = {ticker: ["None", "None", "None"]}
st_dir = st_dir_refresh(copy_df, ticker, st_dir)
print(st_dir)

{'INFY': ['None', 'None', 'None']}


In [20]:
# Initialize st_dir with default trend directions
st_dir = {ticker: ['None', 'None', 'None']}

In [24]:
st_dir = st_dir_refresh(infy_df, ticker, st_dir)
print(st_dir)

{'INFY': ['None', 'None', 'None']}


In [15]:
def sl_price(ohlc):
    st = ohlc.iloc[-1,-3:]
    latest_close = ohlc.loc[ohlc.index[-1],'close']
    if st.min() > latest_close:
        sl = (0.6 * st.sort_values(ascending=True).iloc[0]) + ((0.4)* st.sort_values(ascending=True).iloc[1])
    elif st.max() < latest_close:
        sl = (0.6 * st.sort_values(ascending=True).iloc[0]) + ((0.4)* st.sort_values(ascending=True).iloc[1])
    else:
        sl = st.mean()
    return round(sl,1)

In [16]:
stop_loss = sl_price(infy_df)
print(f"Stop loss: {stop_loss}")

Stop loss: 1479.5


In [17]:
def placeSLOrder(symbol,buy_sell,quantity,sl_price):    
    # Place an intraday stop loss order on NSE
    if buy_sell == "buy":
        t_type=kite.TRANSACTION_TYPE_BUY
        t_type_sl=kite.TRANSACTION_TYPE_SELL
    elif buy_sell == "sell":
        t_type=kite.TRANSACTION_TYPE_SELL
        t_type_sl=kite.TRANSACTION_TYPE_BUY
    kite.place_order(tradingsymbol=symbol,
                    exchange=kite.EXCHANGE_NSE,
                    transaction_type=t_type,
                    quantity=quantity,
                    order_type=kite.ORDER_TYPE_MARKET,
                    product=kite.PRODUCT_MIS,
                    variety=kite.VARIETY_REGULAR)
    kite.place_order(tradingsymbol=symbol,
                    exchange=kite.EXCHANGE_NSE,
                    transaction_type=t_type_sl,
                    quantity=quantity,
                    order_type=kite.ORDER_TYPE_SL,
                    price=sl_price,
                    trigger_price = sl_price,
                    product=kite.PRODUCT_MIS,
                    variety=kite.VARIETY_REGULAR)


def ModifyOrder(order_id,price):    
    # Modify order given order id
    kite.modify_order(order_id=order_id,
                    price=price,
                    trigger_price=price,
                    order_type=kite.ORDER_TYPE_SL,
                    variety=kite.VARIETY_REGULAR)      