In [1]:
import os 
import sys 
import csv 
import yaml
import pickle 
import numpy as np 
import pandas as pd 
from tqdm import tqdm 
import datetime 

In [2]:
# Fixed constants and column layouts used throughout the notebook.

# Fixed UTC+05:30 offset; convert_ns_to_time() uses it to render epoch values as wall-clock time.
IST = datetime.timezone(datetime.timedelta(hours=5, minutes=30))
 
# Column names for every field of the obo log, in file order. The log is read with
# header=None, and the position of a name in this list is used as its column index
# (see `usecols` at the end of this cell).
# NOTE(review): 'expiry date' (with a space) and 'capture_ts timestamptz' look like they were
# copied from a schema definition -- confirm these are the intended labels.
header_list = [
'trading_date','data_source','stream_id','exchange_epoch_nanos','server_epoch_nanos','capture_epoch_nanos',
'contract_id','venue_token','contract_name','market_segment_id','symbol','contract_type','strike_price','expiry date',
'display_factor','handle_value','currency_unit','is_touchline_change','feed_message_id','inpacket_sequence_num',
'is_last_in_feed_message','event_type','packet_sequence_num','contract_sequence_num','is_implied','transact_epoch_nanos',
'bp1','bq1','boc1','has_hidden_qty_bid','ap1','aq1','aoc1','has_hidden_qty_ask','bp2','bq2','boc2','ap2','aq2','aoc2',
'bp3','bq3','boc3','ap3','aq3','aoc3','bp4','bq4','boc4','ap4','aq4','aoc4','bp5','bq5','boc5','ap5','aq5','aoc5',
'implied_bp1','implied_bq1','implied_ap1','implied_aq1','implied_bp2','implied_bq2','implied_ap2','implied_aq2',
'previous_mid','mid','is_mid_change','weighted_mid','contract_status','side','price','qty','order_count','qpos',
'old_qpos','oid1','oid2','priority','old_price','old_qty','aggressor_type','qty_in_last_trade','level','capture_ts timestamptz'
]

# Subset of header_list that is actually loaded: get_iter() passes this list as `names`
# together with the matching positional `usecols`. The entries are listed in the same
# relative order as in header_list.
header_list_to_keep = [
'exchange_epoch_nanos','contract_name','expiry date','is_touchline_change','event_type',
'bp1','bq1','boc1','ap1','aq1','aoc1','bp2','bq2','boc2','ap2','aq2','aoc2',
'bp3','bq3','boc3','ap3','aq3','aoc3','bp4','bq4','boc4','ap4','aq4','aoc4','bp5','bq5','boc5','ap5','aq5','aoc5',
'previous_mid','mid','is_mid_change','weighted_mid','side','price','qty','qpos',
'old_qpos','oid1','oid2','old_price','old_qty','aggressor_type','qty_in_last_trade','level'
]

# Columns earmarked for a distinct-value inspection.
# NOTE(review): this module-level list is not referenced in the visible cells; get_unique()
# takes its own parameter of the same name, which shadows it. Several entries are also
# absent from header_list_to_keep, so they are not present in the loaded chunks.
cols_to_check = [
'market_segment_id','contract_type',
'display_factor','handle_value','is_touchline_change','inpacket_sequence_num',
'is_last_in_feed_message','contract_sequence_num','is_implied','contract_status','qpos',
'old_qpos'
]

# Quantity columns; the chunk-processing cells divide these by `lotsize`.
qty_cols = [
    'bq1','aq1','bq2','aq2','bq3','aq3',
    'bq4','aq4','bq5','aq5','qty',
    'old_qty','qty_in_last_trade'
]


# Five-level book columns for both sides; the chunk-processing cells fill NaNs with 0.0
# and cast them to float. Each level has three fields (bp/bq/boc and ap/aq/aoc) --
# presumably price, quantity and order count; confirm against the feed specification.
book_cols = [
    'bp1','bq1','boc1','ap1','aq1','aoc1','bp2','bq2','boc2',
    'ap2','aq2','aoc2','bp3','bq3','boc3','ap3','aq3','aoc3','bp4','bq4',
    'boc4','ap4','aq4','aoc4','bp5','bq5','boc5','ap5','aq5','aoc5'
]

# Ask-side slice of book_cols, ordered level by level (three fields per level).
# The main loop uses it to snapshot the near contract's ask book.
ask_cols = [
    'ap1','aq1','aoc1',
    'ap2','aq2','aoc2','ap3','aq3','aoc3',
    'ap4','aq4','aoc4','ap5','aq5','aoc5'
]

# Bid-side slice of book_cols, ordered level by level (not referenced in the visible cells).
bid_cols = [
    'bp1','bq1','boc1','bp2','bq2','boc2',
    'bp3','bq3','boc3','bp4','bq4',
    'boc4','bp5','bq5','boc5'
]



# Name suggests the columns that hold nanosecond values -- not referenced in the visible
# cells. NOTE(review): 'priority' is grouped with the epoch columns; confirm it is a timestamp.
ns_cols = ['exchange_epoch_nanos' , 'server_epoch_nanos' , 'capture_epoch_nanos' , 'transact_epoch_nanos' , 'priority']

# Load the run configuration. Keys read by the visible cells: parent_dir, date, chunk_size,
# contract_dir, underlying, near, far, farfar.
with open('user_input.yaml' , 'r' ) as f : 
    inputs = yaml.safe_load( f ) 

# yaml.safe_load returns None for an empty document; fail here with a clear message
# instead of a confusing TypeError at the first inputs[...] lookup further down.
if not isinstance(inputs, dict) : 
    raise ValueError("user_input.yaml must contain a mapping of settings.")

# Positional indices (within the raw log) of the columns that are loaded.
usecols = [header_list.index(col) for col in header_list_to_keep ]

# pandas ignores the order of `usecols` and always yields columns in file order, while
# `names` is applied positionally. The kept columns must therefore be unique and listed
# in the same order as header_list, otherwise the data would be silently mislabelled.
if usecols != sorted(set(usecols)) : 
    raise ValueError("header_list_to_keep must be unique and follow the column order of header_list.")

In [3]:
# Locate the obo log for the configured date: the first file (in sorted order, so the
# choice is deterministic) whose name starts with 'obo' inside <parent_dir>/<date>.
date_dir =  os.path.join(inputs['parent_dir'] , inputs['date'] )
for file in sorted( os.listdir( date_dir ) ) : 
    if file.startswith('obo') : 
        file_path = os.path.join( date_dir , file )
        break 
else : 
    # Without this branch `file_path` would stay undefined (or, worse, keep a stale value
    # from an earlier run of the cell) when the directory has no obo file.
    raise FileNotFoundError(f"No file starting with 'obo' found in {date_dir}")

file_path 

'/Users/ashmitbathla/Documents/quote-dist/data/20250519/obo_20250519.log'

In [4]:
def get_iter() : 
    """Return a fresh chunked reader over the obo log.

    The log has no header row: only the columns at positions `usecols` are read and
    they are labelled with `header_list_to_keep`. Each iteration yields a DataFrame of
    at most inputs['chunk_size'] rows. File column 82 is read as text.
    """
    reader_options = dict(
        header=None,
        names=header_list_to_keep,
        usecols=usecols,
        dtype={82: str},
        chunksize=inputs['chunk_size'],
    )
    return pd.read_csv(file_path, **reader_options)

In [5]:
def get_unique( cols_to_check ): 
    """Print the sorted distinct non-null values of each requested column.

    Streams the whole log through get_iter(), so memory use is bounded by the number of
    distinct values per column rather than by the file size.

    Parameters
    ----------
    cols_to_check : iterable of str
        Column names to inspect; each must be present in the chunks yielded by get_iter().
    """
    # A set per column: repeated values across chunks are collapsed immediately instead
    # of being appended to an ever-growing list that is re-copied on every chunk.
    seen = { col : set() for col in cols_to_check }
    for chunk in get_iter() : 
        for col , values in seen.items() : 
            # Drop NaNs on the single column only; no need to copy the whole chunk.
            values.update( chunk[col].dropna().unique().tolist() )
    for col , values in seen.items() : 
        print(f'for {col} : {np.unique(list(values))}')

In [6]:
def convert_ns_to_time(time_ns ) : 
    """Format an epoch value in nanoseconds as an 'HH:MM:SS.nnnnnnnnn' wall-clock string in IST.

    Parameters
    ----------
    time_ns : int or float
        Nanoseconds since the Unix epoch. Values reduced modulo one day (and negative
        values) are accepted as well; only the time of day is rendered.

    Returns
    -------
    str
        Time of day in the IST offset with a nine-digit fractional part.
    """
    # Split with floor division instead of `time_ns / 1e9`: a float64 cannot represent
    # ~1.7e18 exactly (spacing is 256 ns), so a timestamp just below a second boundary
    # used to round up and print the *next* second alongside a .999999xxx fraction.
    whole_seconds , nanos = divmod( time_ns , 1_000_000_000 )
    clock = datetime.datetime.fromtimestamp( int(whole_seconds) , tz=IST ).time()
    # The remainder is below 1e9, so scaling it to seconds is safe; keep '.nnnnnnnnn'.
    fraction = f"{nanos*1e-9:.9f}"

    return clock.strftime('%H:%M:%S') + fraction[1:]

In [7]:
# Sanity check: int(36*24e11) is the number of nanoseconds in a day, so the full epoch
# value and its remainder modulo one day must render as the same time of day.
convert_ns_to_time( 1747626300018421950 ) , convert_ns_to_time( 1747626300018421950%(int(36*24e11)) )

('09:15:00.018421950', '09:15:00.018421950')

In [8]:
# 33300 s = 9 h 15 min: subtracting it (in ns) shifts the 09:15:00 timestamp to 00:00:00.
# Note that the argument becomes a (negative) float here because of the `1e9` factor.
convert_ns_to_time( 1747626300018421950%(int(36*24e11)) - 33300*1e9  )

'00:00:00.018421950'

In [9]:
# get_unique(['event_type' , 'qpos' , 'level'])
# get_unique(['order_count' , 'has_hidden_qty_bid' , 'aggressor_type'])
# get_unique(['order_count'])

In [10]:
# next(get_iter()).fillna('--').to_csv('temp.csv' , index = False )

In [11]:
def get_lot_size_path(inputs) : 
    """Return the path of the pickled lot-size table for the configured near contract.

    The file lives in inputs['contract_dir'] and is named
    '<near>_NSE_FO_FUT_LOT_SIZE.pickel'.
    """
    directory = inputs['contract_dir']
    cache_name = inputs['near'] + '_NSE_FO_FUT_LOT_SIZE.pickel'
    return os.path.join( directory , cache_name )

def get_contract_master_path(inputs) : 
    """Return the contract-master file for the month of inputs['date'].

    Looks in inputs['contract_dir'] for regular files whose name starts with 'YYYY_MM'
    (derived from the date) and returns the first match in sorted order.

    Raises
    ------
    ValueError
        If no matching file exists.
    """
    contract_dir = inputs['contract_dir']
    # str(): a date that YAML parsed as an integer (e.g. 20250519) would otherwise be
    # interpreted by pandas as nanoseconds since the epoch.
    year_month_str = pd.to_datetime( str(inputs['date']) ).strftime('%Y_%m')
    # sorted(): os.listdir returns entries in arbitrary order, so without it the file
    # picked when several match could differ between runs or machines.
    contract_path = sorted(
        os.path.join(contract_dir, f)
        for f in os.listdir(contract_dir)
        if f.startswith(year_month_str) and os.path.isfile(os.path.join(contract_dir, f))
    )
    if not contract_path : 
        raise ValueError(f"Contrate Path for {inputs['date']} not found.")
    return contract_path[0]

# Month number (1-12) -> single-letter month code, January through December:
# F, G, H, J, K, M, N, Q, U, V, X, Z.
month_to_nse_code = dict(enumerate('FGHJKMNQUVXZ', start=1))

def convert_date_to_code( date ) : 
    """Turn a date into a '<month letter><two-digit year>' code, e.g. '20250529' -> 'K25'.

    Non-string inputs are converted with str() before being parsed by pandas.
    """
    parsed = pd.to_datetime( date if type(date) == str else str( date ) )
    month_letter = month_to_nse_code[parsed.month]
    return f"{month_letter}{parsed.year % 100:02d}"

def get_lot_size(inputs) : 
    """Return the futures lot-size / tick-size table, indexed by contract symbol.

    The table is cached as a pickle at get_lot_size_path(inputs). On a cache miss (or an
    unreadable cache) it is rebuilt from the gzip-compressed contract master returned by
    get_contract_master_path(inputs): only rows with type 'FUT' are kept and the index is
    'NSEFNO_<symbol>_<expiry code>'. The rebuilt table is written back to the cache.

    Parameters
    ----------
    inputs : dict
        Run configuration; forwarded to the two path helpers.

    Returns
    -------
    pandas.DataFrame
        The contract-master columns that remain after dropping 'type' and 'expiry_date',
        indexed by symbol.
    """
    lot_path = get_lot_size_path(inputs) 
    try : 
        # NOTE(security): pickle.load can execute arbitrary code; only point
        # contract_dir at locations you control.
        with open( lot_path , 'rb' ) as f : 
            return pickle.load(f)
    except FileNotFoundError : 
        pass  # ordinary cache miss: build the table below
    except Exception as err : 
        # Corrupt or incompatible cache: rebuild it, but say so. Unlike a bare `except:`
        # this no longer swallows KeyboardInterrupt / SystemExit.
        print(f'Ignoring unreadable lot-size cache {lot_path!r} ({err!r}); rebuilding it.')

    df = pd.read_csv(
        get_contract_master_path(inputs) , 
        compression='gzip' , 
        usecols = [2,3,7,12,28]
    )
    # .copy(): the columns assigned below must not be written into a slice of the raw frame.
    df = df[df['type'] == 'FUT' ].copy()
    df['expiry_date'] = df['expiry_date'].apply(convert_date_to_code)
    df['symbol'] = 'NSEFNO_' + df['symbol'] + '_' + df['expiry_date']
    df = df.drop(columns=['type' , 'expiry_date']).set_index('symbol')
    with open( lot_path , 'wb' ) as f : 
        pickle.dump( df , f ) 
    return df 

In [12]:
# Contract names follow the 'NSEFNO_<underlying>_<expiry code>' pattern, which is also how
# get_lot_size() builds the index of the lot-size table.
near_symbol = '_'.join(['NSEFNO',inputs['underlying'],inputs['near']]) 
far_symbol = '_'.join(['NSEFNO',inputs['underlying'],inputs['far']])
farfar_symbol = '_'.join(['NSEFNO',inputs['underlying'],inputs['farfar']])
# 'SP' name combining the near and far codes -- presumably the near/far spread contract.
spread_symbol= '_'.join(['NSEFNO' , inputs['underlying'] , 'SP' , inputs['near'] , inputs['far']])

# NOTE: farfar_symbol is built above but not printed here.
print(near_symbol , far_symbol , spread_symbol )

# Lot-size / tick-size table indexed by contract symbol (loaded from cache or rebuilt).
df = get_lot_size(inputs)
df.head()

NSEFNO_HFCL_K25 NSEFNO_HFCL_M25 NSEFNO_HFCL_SP_K25_M25


Unnamed: 0_level_0,lotsize,tick_size
symbol,Unnamed: 1_level_1,Unnamed: 2_level_1
NSEFNO_BANKNIFTY_N25,35,20
NSEFNO_FINNIFTY_N25,65,10
NSEFNO_MIDCPNIFTY_N25,140,5
NSEFNO_NIFTY_N25,75,10
NSEFNO_NIFTYNXT50_N25,25,20


In [13]:
# The near and far contracts must share lot size and tick size; the rest of the notebook
# works with a single `lotsize` / `ticksize` pair.
if df.loc[near_symbol]['lotsize'] == df.loc[far_symbol]['lotsize'] : 
    lotsize = df.loc[near_symbol]['lotsize']
else : 
    raise ValueError(f'Lot size for {near_symbol} and {far_symbol} are not the same ... ')

if df.loc[near_symbol]['tick_size'] == df.loc[far_symbol]['tick_size'] : 
    ticksize = df.loc[near_symbol]['tick_size']
else : 
    raise ValueError(f'Tick size for {near_symbol} and {far_symbol} are not the same ... ')

lotsize , ticksize 

(np.int64(4150), np.int64(1))

In [14]:
# Preview: prepare only the first chunk and show its last rows.
for chunk in get_iter() : 

    # keep only the near and far contracts
    chunk = chunk[chunk['contract_name'].isin([near_symbol , far_symbol])].reset_index(drop=True)
    # wall-clock time of each event
    chunk.insert(0 , 'Time' , chunk['exchange_epoch_nanos'].apply(convert_ns_to_time))
    # InMarketTime -> milliseconds since 03:45:00 UTC (13500 s into the UTC day), i.e. since
    # 09:15:00 in IST; 86_400_000_000_000 is the number of nanoseconds in a day
    chunk.insert(
        0 ,
        'InMarketTime' ,
        chunk['exchange_epoch_nanos'].mod(86_400_000_000_000).mul(1e-6).sub(13_500_000.0) ,
    )
    # quantities expressed in lots
    chunk[qty_cols] = chunk[qty_cols] / lotsize
    # missing book levels become 0.0
    chunk[book_cols] = chunk[book_cols].fillna(0.0).astype( float )
    display( chunk.tail() )
    break                 

Unnamed: 0,InMarketTime,Time,exchange_epoch_nanos,contract_name,expiry date,is_touchline_change,event_type,bp1,bq1,boc1,...,qty,qpos,old_qpos,oid1,oid2,old_price,old_qty,aggressor_type,qty_in_last_trade,level
88072,421428.186497,09:22:01.428186497,1747626721428186497,NSEFNO_HFCL_K25,20250529,0,MODIFY_TICK,89.68,1.0,1.0,...,1.0,2.0,2.0,2200000000000000.0,,89.87,1.0,,0.0,5.0
88073,421428.190699,09:22:01.428190699,1747626721428190699,NSEFNO_HFCL_K25,20250529,1,NEW_TICK,89.69,1.0,1.0,...,1.0,1.0,,2200000000000000.0,,,,,0.0,1.0
88074,421428.253923,09:22:01.428253923,1747626721428253923,NSEFNO_HFCL_K25,20250529,1,MODIFY_TICK,89.7,1.0,1.0,...,1.0,1.0,1.0,2200000000000000.0,,89.68,1.0,,0.0,1.0
88075,421428.259501,09:22:01.428259501,1747626721428259501,NSEFNO_HFCL_K25,20250529,0,MODIFY_TICK,89.7,1.0,1.0,...,1.0,2.0,2.0,2200000000000000.0,,89.86,1.0,,0.0,4.0
88076,421428.33265,09:22:01.428332650,1747626721428332650,NSEFNO_HFCL_K25,20250529,0,MODIFY_TICK,89.7,1.0,1.0,...,1.0,2.0,2.0,2200000000000000.0,,89.85,1.0,,0.0,5.0


In [1]:
from collections import deque , Counter 
from sortedcontainers import SortedList 

In [None]:
# i-th entry denotes the minimum price at which one can buy i quantities of near contracts
NA_Qty_Price = np.zeros( shape = 50 ) 

# known quoting and defensive quoting orders respectively : 
#   orders identified by order id : 
quote = {}  
def_quote = {}
#       cancelled orders awaiting updates : 
quote_cancelled = {}
def_quote_cancelled = {}
#       TODO: container for new orders that come in within the update window 


# current near ask id : 
near_ask_id = 0 

# order book according to book ids 
def make_book_dict(book:np.ndarray) : 
    """Wrap one side of a book together with its cumulative aggregates.

    Parameters
    ----------
    book : np.ndarray
        2-D array with one row per level; column 0 is used as the price and column 1 as
        the quantity (any further columns are stored but not aggregated).

    Returns
    -------
    dict
        'book'            : the array as passed in
        'qty_cumsum'      : cumulative quantity per level, as integers
        'price_cumsum'    : cumulative price * quantity per level
        'price_cumsum_dp' : array with one slot per unit of total quantity, initialised
                            to -1 and meant to be filled in later
    """
    main = {'book' : book}
    main['qty_cumsum'] = np.cumsum(book[: , 1 ] , dtype = int )
    main['price_cumsum'] = np.cumsum(book[:,0]*book[:,1])
    # An empty book has no last element; treat its total quantity as zero.
    total_qty = int(main['qty_cumsum'][-1]) if main['qty_cumsum'].size else 0
    # to be filled later as and when required : 
    main['price_cumsum_dp'] = np.full(total_qty , -1.0)
    return main 

# key : near ask id ! 
# (the original cell called an undefined `make_near_book` here, which raised NameError)
near_ask =  { 0 : make_book_dict(np.zeros((5,3)))}
far_ask =  { 0 : make_book_dict(np.zeros((5,3)))}

# track of near ask updates that are not expired ( open to modifications ) 
update_candidates = {}
possible_updates_queue = deque() 
updates_expiry = {}
verified_updates_queue = deque()

In [None]:
# Walk the log chunk by chunk and record every change of the near contract's ask book.
for chunk in get_iter() : 

    chunk = chunk[(chunk['contract_name'] == near_symbol) | (chunk['contract_name'] == far_symbol ) ].reset_index(drop=True)
    chunk.insert(0,'Time',chunk['exchange_epoch_nanos'].apply(convert_ns_to_time))
    # InMarketTime -> time in milliseconds after market starts !! 
    chunk.insert(0,'InMarketTime' , (chunk['exchange_epoch_nanos']%(int(24*36*1e11))*1e-6 - 13500.0*1e3) )
    chunk[qty_cols] /= lotsize
    chunk[book_cols] = chunk[book_cols].fillna(0.0).astype( float )

    i = 0 
    n = chunk.shape[0]

    while i < n : 
        row = chunk.iloc[ i , : ]
        if (row['contract_name'] == near_symbol) and (row['side'] == 'SELL'): 

            # ask_cols is ordered level by level with three fields per level, so the flat
            # row slice reshapes into the (levels, 3) layout make_book_dict() indexes.
            new_ask_book = row[ask_cols].to_numpy(dtype=float).reshape(-1, 3)
            # Compare against the stored *array*; comparing against the whole book dict
            # is never an element-wise match, so every row looked like an update.
            if not np.array_equal(new_ask_book , near_ask[near_ask_id]['book']) : 
                near_ask_id += 1  
                # add to near_ask under the new id : 
                near_ask[near_ask_id] = make_book_dict(new_ask_book) 
                # add this update to possible_updates_queue : // later promoted to verified 
                #   updates if it stays for more than time t1 
                possible_updates_queue.append(near_ask_id)

        # advance to the next row; without this the loop never terminated
        i += 1 

In [21]:
# Scratch check of `.any()` semantics: it is True as soon as one element satisfies the
# comparison, so for arrays that agree in some positions and differ in others both the
# `==` and the `!=` test report True.
x = np.array([1,2,3,4,5])
y = np.array([1,2,3,2,5])

( x==y ).any() , ( x != y ).any()

(np.True_, np.True_)