In [None]:
!pip install --upgrade kiteconnect
!pip install python-dotenv
!pip install python-dateutil

In [None]:
import importlib
import logging
import os
import time
from datetime import datetime, timedelta

import dateutil
import dateutil.parser  # `import dateutil` alone does not guarantee the parser submodule is loaded
import pandas as pd
from dotenv import load_dotenv
from kiteconnect import KiteConnect

# Load .env before importing the local module and before the KITE_* variables are read.
load_dotenv()

import bs_threading
importlib.reload(bs_threading)  # pick up edits to bs_threading without restarting the kernel
from bs_threading import bs_threadify, bs_make_throttle_ready_func


In [None]:
# Kite Connect app credentials, read from the process environment (which load_dotenv()
# populates from .env). Each value is None when its variable is not set.
KITE_CONNECT_API_KEY, KITE_CONNECT_API_SECRET = (
    os.environ.get(var_name) for var_name in ("KITE_CONNECT_API_KEY", "KITE_CONNECT_API_SECRET")
)

In [None]:

# Root logger at DEBUG so request-level logging is visible while developing.
logging.basicConfig(level="DEBUG")

# Kite client bound to our app's API key; an access token is attached after login.
kite = KiteConnect(KITE_CONNECT_API_KEY)



### Get the login URL for our API key. After login, Kite redirects to the registered callback URL with a `request_token`; this verifies that the app itself (bsstonks) is valid.

In [None]:
login_url = kite.login_url()
print(login_url)

### Receiving the request token on our callback proves the app is valid, but is the app in the hands (or on the server) of the right person? Only the API secret can verify that.
### We pass the API secret together with the request token to create a session, which gives us, the rightful owner, the access token.

In [None]:
# Log in via the URL from kite.login_url(); the registered redirect URL then receives a
# request_token. Exchange it (plus the API secret) for an access token.
# The request_token comes from the environment or a prompt instead of being hardcoded:
# it can only be used for one session, so a hardcoded value breaks every re-run.
KITE_REQUEST_TOKEN = os.getenv("KITE_REQUEST_TOKEN") or input("request_token from the redirect URL: ").strip()

kite_session_data = kite.generate_session(KITE_REQUEST_TOKEN, api_secret=KITE_CONNECT_API_SECRET)
kite.set_access_token(kite_session_data["access_token"])
# Only show the tail of the token: notebook outputs get saved and shared.
print("THE TOKEN: ...", kite_session_data["access_token"][-4:])

In [None]:
# Alternative to the login cell: reuse an access token from an earlier generate_session
# call via KITE_ACCESS_TOKEN instead of hardcoding a live credential in the notebook.
# Does nothing when the variable is unset, so a token from the login cell is kept.
saved_access_token = os.getenv("KITE_ACCESS_TOKEN")
if saved_access_token:
    kite.set_access_token(saved_access_token)

In [None]:
# Sanity check that the session works. Only a cell's last expression is displayed, so
# keep both results and show them together rather than discarding the orders.
kite_orders = kite.orders()
kite_holdings = kite.holdings()
kite_orders, kite_holdings

In [None]:
# Full instrument dump across exchanges, plus the NSE-only dump.
all_instruments, all_instruments_nse = kite.instruments(), kite.instruments(exchange="NSE")

In [None]:
# Save the full instrument dump as CSV and display it.
instruments_dir = os.path.join("input", "kite_instruments")
os.makedirs(instruments_dir, exist_ok=True)
df_all_instruments = pd.DataFrame(all_instruments)
df_all_instruments.to_csv(os.path.join(instruments_dir, "instrument_list.csv"), index=False)
df_all_instruments

In [None]:
df_all_instruments.loc[:, "segment"].unique()

In [None]:

# Quick overview of the instrument dump, then collect every symbol containing "tcs".
print("Total Instruments:", len(all_instruments))
print("Total EQ", sum(1 for x in all_instruments if x['instrument_type'] == "EQ"))
if all_instruments:  # avoid IndexError when the dump is empty
    print("Sample instrument:", all_instruments[0])
all_tcs_match = [x for x in all_instruments if "tcs" in x['tradingsymbol'].lower()]

In [None]:
# List every TCS match with the fields needed to pick the right instrument token.
print("Total TCS match:", len(all_tcs_match))
tcs_rows = (
    (x['tradingsymbol'], x['name'], x['instrument_type'], x['exchange'], x['instrument_token'])
    for x in all_tcs_match
)
for symbol, name, inst_type, exchange, token in tcs_rows:
    print(symbol, name, "InstrumentType:", inst_type, 'Exchange:', exchange, "InstrumentToken:", token)
all_tcs_match

In [None]:
tcs_nse_token = "2953217"  # TCS on NSE, from the listing above
kite.quote([tcs_nse_token])

In [None]:
# TCS NSE instrument token: 2953217
# Maximum calendar-day span that a single kite.historical_data request may cover, per
# candle interval. The sync worker pages through history in windows of this size.
# Candles exist only for trading days/hours, so a window yields far fewer rows than
# days * candles-per-24h (e.g. 60minute over 400 days is nowhere near 400 * 24 rows).
interval_data_span_limit = {
    'day': 2000,
    '60minute': 400,
    '30minute': 200,
    '15minute': 200,
    '10minute': 100,
    '5minute': 100,
    '3minute': 100,
    'minute': 60,
}
kite.historical_data(2953217, "2021-06-15", "2021-06-16", "day")

In [None]:
# Kite's historical API has no "hour" interval; hourly candles use "60minute".
kite.historical_data(2953217, "2021-06-15", "2021-06-16", "60minute", oi=True)

### We want to stay within Zerodha's limit of 3 requests per second
- we'll use multiple threads
- all threads share one throttle function, which returns True only if no request has been made in the last x seconds (x can be sub-second)

In [None]:
# One throttle shared by every thread, configured with a 1/3 s minimum interval.
throttle_ready_func = bs_make_throttle_ready_func(min_interval_second=1.0/3.0)


def worker_func(data):
    """Demo worker: spin (1 ms naps) until the shared throttle is ready, then print a marker."""
    while True:
        if throttle_ready_func():
            break
        time.sleep(0.001)
    print("this is the real life")


worker_data = [0 for _ in range(6)]  # six dummy tasks
start_time = time.time()
queue = bs_threadify(worker_data, worker_func, num_threads=16)
print("Time took", time.time() - start_time)



### The throttle works with multiple threads, so let's create the data fetcher

In [None]:
# NSE equities only.
all_instruments_nse_eq = list(filter(lambda inst: inst['instrument_type'] == "EQ", all_instruments_nse))
print(len(all_instruments_nse), len(all_instruments_nse_eq))
all_instruments_nse_eq[0]

In [None]:
def kite_instrument_to_filename(instrument_dict, interval="day", expiry="1mo"):
    """Build the CSV filename used to store candles for one Kite instrument.

    Format: ``{instrument_token}_{symbol}_{instrument_type}_{segment}_{exchange}_{interval}.csv``.

    For NFO options (instrument_type CE/PE) the underlying name and the 5 characters
    that follow it are removed from the trading symbol, and ``expiry`` is appended
    instead, so the name does not embed a specific expiry code, e.g.
    ``RELIANCE21JUN2000CE`` -> ``RELIANCE2000CE1mo``.

    Args:
        instrument_dict: one entry from ``kite.instruments()``; reads the keys
            instrument_token, tradingsymbol, instrument_type, segment, name, exchange.
        interval: candle interval, used as the last filename component.
        expiry: suffix for NFO option symbols; ``None`` means the default ``"1mo"``
            (callers forward ``expiry=None`` when they have no explicit value).

    Returns:
        The filename (no directory).
    """
    if expiry is None:
        # `trading_symbol += None` would raise TypeError for options.
        expiry = "1mo"
    kite_instrument_token = instrument_dict['instrument_token']
    trading_symbol = instrument_dict['tradingsymbol']
    instrument_type = instrument_dict['instrument_type']
    segment = instrument_dict['segment']
    name = instrument_dict['name']
    exchange = instrument_dict['exchange']
    if exchange == "NFO" and instrument_type in ("CE", "PE"):
        # Strip the underlying name, then the 5-character expiry code after it
        # (e.g. "21JUN"), and re-prefix the name.
        trading_symbol = name + trading_symbol.replace(name, "")[5:] + expiry
    # we've tested that _ is valid seperator by evaluating for all the instruments (86k)
    return f"{kite_instrument_token}_{trading_symbol}_{instrument_type}_{segment}_{exchange}_{interval}.csv"

In [None]:
# Filename for the first NFO option whose underlying name starts with "REL".
rel_options = [x for x in all_instruments if x['segment'] == "NFO-OPT" and x['name'].startswith("REL")]
kite_instrument_to_filename(rel_options[0])

In [None]:


def make_sync_single_symbol_worker_func(throttle_ready_func, output_path = None, expiry = None):
    """Build a worker that incrementally syncs one instrument's candles into a CSV file.

    The returned function is meant to be passed to ``bs_threadify`` (or called directly).
    It loads the instrument's existing CSV if there is one, then:
      1. Past direction: pages backwards from one day before the oldest stored candle,
         while ``fetch_past`` is true or the file is still empty.
      2. Future direction: pages forwards from one day before the newest stored candle,
         replacing the overlapping tail, so a day fetched mid-session gets overwritten.
    The CSV is rewritten after every API page.

    Args:
        throttle_ready_func: shared callable returning True when a request may be made now.
            Every ``kite.historical_data`` call waits on it so all threads together stay
            within the rate limit.
        output_path: directory for the CSV files (default ``input/kite_historical``);
            created if missing.
        expiry: forwarded to ``kite_instrument_to_filename`` for NFO options. ``None``
            leaves that function's own default in place.

    Returns:
        ``sync_single_symbol_worker_func(data)``. ``data`` is a dict with
        ``'instrument_dict'`` (an entry from ``kite.instruments()``), ``'interval'`` (a key
        of ``interval_data_span_limit``) and optional ``'fetch_past'`` (default True).
    """
    #NOTE: input to our stock prediction/training system
    if output_path is None:
        output_path = "input/kite_historical"
    os.makedirs(output_path, exist_ok=True)
    # Forward expiry only when given; passing expiry=None would make the filename
    # helper evaluate `trading_symbol += None` for options.
    filename_kwargs = {} if expiry is None else {'expiry': expiry}

    def _wait_for_throttle():
        """Block until the shared throttle allows one more API request."""
        # IMPORTANT: THROTTLE LIMIT MUST BE RESPECTED (the throttle is shared by all worker threads)
        while not throttle_ready_func():
            time.sleep(0.001)

    def _as_datetime(value):
        """Parse a stored/stringified candle date; pass datetime values through unchanged."""
        if isinstance(value, str):
            return dateutil.parser.parse(value)
        return value

    def _fetch_candles(instrument_token, start, end, interval):
        """Make a throttled historical_data call for inclusive [start, end] dates; dates come back as str."""
        _wait_for_throttle()
        candles = kite.historical_data(instrument_token, start.strftime("%Y-%m-%d"), end.strftime("%Y-%m-%d"), interval, oi=True)
        for candle in candles:
            candle['date'] = str(candle['date'])
        return candles

    def _is_last_page(candles, span_days):
        """Return True when a page covers far less than the requested window, i.e. the edge of available data."""
        if len(candles) < 3:
            return True
        covered = _as_datetime(candles[-1]['date']) - _as_datetime(candles[0]['date'])
        return covered < timedelta(days=int(span_days * 0.5 - 15))

    def _concat_rows(first, second):
        """Stack two candle frames in order, skipping an empty one (avoids pandas' empty-frame concat dtype issues)."""
        frames = [frame for frame in (first, second) if not frame.empty]
        return pd.concat(frames, ignore_index=True)

    def sync_single_symbol_worker_func(data):
        """Sync the CSV for ``data['instrument_dict']`` at ``data['interval']``."""
        instrument_dict = data['instrument_dict']
        instrument_token = instrument_dict['instrument_token']
        data_interval = data['interval']
        fetch_past = data.get('fetch_past', True)  # don't mutate the caller's dict
        interval_span_days = interval_data_span_limit[data_interval]

        # we've tested that _ is valid seperator by evaluating for all the instruments (86k)
        result_file = kite_instrument_to_filename(instrument_dict, interval=data_interval, **filename_kwargs)
        result_file_path = os.path.join(output_path, result_file)
        if os.path.exists(result_file_path):
            df_to_sync = pd.read_csv(result_file_path, index_col=False)
        else:
            df_to_sync = pd.DataFrame([], columns=["date", "open", "high", "low", "close", "volume"])

        # Past direction. Runs at least once for an empty file even when fetch_past is False.
        while fetch_past or df_to_sync.empty:
            if df_to_sync.empty:
                end = datetime.now()
            else:
                end = _as_datetime(df_to_sync.iloc[0]["date"]) - timedelta(days=1)
            # dates are inclusive, so an N-day window starts N-1 days before `end`
            start = end - timedelta(days=interval_span_days - 1)

            candles = _fetch_candles(instrument_token, start, end, data_interval)
            if not candles:  # WHEN API RETURNS NO DATA; EXIT.
                break

            df_to_sync = _concat_rows(pd.DataFrame(candles), df_to_sync)
            df_to_sync.to_csv(result_file_path, index=False)
            if _is_last_page(candles, interval_span_days):
                break

        # Future direction. Start one day before the newest stored candle so data fetched
        # while the market was open is overwritten after it closes.
        while True:
            if df_to_sync.empty:
                start = datetime.now()
            else:
                start = _as_datetime(df_to_sync.iloc[-1]["date"]) - timedelta(days=1)
            end = start + timedelta(days=interval_span_days - 1)

            candles = _fetch_candles(instrument_token, start, end, data_interval)
            if not candles:  # WHEN API RETURNS NO DATA; EXIT.
                break

            # Drop stored rows the new page overlaps. Dates share one string format, so
            # string order is chronological. Filtering (instead of iloc[:-n]) keeps the
            # data intact when there is no overlap, where iloc[:-0] would empty it.
            first_new_date = candles[0]['date']
            df_to_sync = df_to_sync[df_to_sync['date'] < first_new_date]
            df_to_sync = _concat_rows(df_to_sync, pd.DataFrame(candles))
            df_to_sync.to_csv(result_file_path, index=False)
            if _is_last_page(candles, interval_span_days):
                break

    return sync_single_symbol_worker_func
# Usage: smoke-test the worker by syncing daily candles for the first TCS match.
# Rebind the reloaded module and call through it: reload() alone does not refresh
# names previously bound with `from bs_threading import ...`.
bs_threading = importlib.reload(bs_threading)
sync_single_symbol_worker_func = make_sync_single_symbol_worker_func(bs_threading.bs_make_throttle_ready_func(min_interval_second=1.01/3.0))
sync_single_symbol_worker_func({'instrument_dict':all_tcs_match[0],'interval':'day'})

### Yay! The single-symbol sync works in both directions (past and future)

#### Now sync many instruments, starting with daily candles

In [None]:
def sync_instruments(instuments_list, interval="15minute", need_daily_exist=False, fetch_past=True):
    """Sync historical candles for many instruments using worker threads.

    Args:
        instuments_list: instrument dicts as returned by ``kite.instruments()``.
        interval: candle interval to sync (a key of ``interval_data_span_limit``).
        need_daily_exist: if True, only sync instruments whose daily CSV already exists in
            ``input/kite_historical`` (skips symbols for which no data was ever returned).
        fetch_past: forwarded to each worker task. When False, existing files are only
            extended forwards; empty files still get one backward page.
    """
    should_terminate = False

    # One throttle shared by every thread, set slightly slower than 3 requests/second.
    sync_single_symbol_worker_func = make_sync_single_symbol_worker_func(bs_make_throttle_ready_func(min_interval_second=1.01/3.0))
    output_path = "input/kite_historical"
    os.makedirs(output_path, exist_ok=True)

    symbol_worker_data = []
    for instrument_dict in instuments_list:
        if need_daily_exist:
            # Same naming the worker uses for the 'day' file (default expiry).
            daily_file = kite_instrument_to_filename(instrument_dict, interval="day")
            if not os.path.exists(os.path.join(output_path, daily_file)):
                continue
        symbol_worker_data.append({'instrument_dict': instrument_dict, 'interval': interval, 'fetch_past': fetch_past})

    def should_terminate_func():
        # Polled by bs_threadify; presumably lets workers stop early — confirm in bs_threading.
        return should_terminate

    bs_threadify(symbol_worker_data, sync_single_symbol_worker_func, num_threads=8, should_terminate_func=should_terminate_func)
    should_terminate = True

In [None]:
# Daily candles for every NSE instrument. fetch_past=False only extends existing files
# forwards; instruments without a file still get one backward page.
sync_instruments(all_instruments_nse, fetch_past=False, interval="day")

#### Some symbols have no data at all, so sync sub-day intervals only for instruments whose daily data already exists

In [None]:
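# Optional: 15-minute candles for every NSE instrument that has daily data (many API calls). Uncomment to run.
# sync_instruments(all_instruments_nse, interval='15minute', need_daily_exist=True, fetch_past=False)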

In [None]:
# Optional: 5-minute candles, including history backfill (fetch_past=True). Uncomment to run.
# sync_instruments(all_instruments_nse, interval='5minute', need_daily_exist=True, fetch_past=True)


In [None]:
# NSE instruments whose trading symbol contains "nifty 50" (case-insensitive).
nifty_50_instruments = [
    instrument
    for instrument in all_instruments_nse
    if "nifty 50" in instrument['tradingsymbol'].lower()
]
nifty_50_instruments


In [None]:
# Daily candles (with past backfill) for the NIFTY 50 matches.
instrument_count = len(nifty_50_instruments)
print("Total instruments to fetch:", instrument_count)
sync_instruments(nifty_50_instruments, interval="day")

In [None]:
# Spot-check: daily candles (with OI) for instrument token 136330244, 2015-04-01 to 2015-05-10.
historical_data = kite.historical_data(136330244, "2015-04-01", "2015-05-10", "day", oi=True)
df_to_sync_more_hist = pd.DataFrame(historical_data)
df_to_sync_more_hist