In [None]:
# === Install packages (if missing) ===
# !pip install pandas websocket-client SmartApi

# === Import libraries ===
import os             # File and directory handling
import time           # Time functions and sleep
import json           # Parse JSON from API responses
import threading      # For running collector in background
import urllib.request # For fetching instrument list JSON
from datetime import datetime, time as dt_time
import pandas as pd   # Handling and saving data to CSV
import sys
import os
# SmartApi WebSocket for real-time market data
from SmartApi.smartWebSocketV2 import SmartWebSocketV2


In [None]:
# Set the working directory where your API keys and files are stored.
# Override the default with the SMARTAPI_KEY_PATH environment variable instead
# of editing this machine-specific absolute path.
KEY_PATH = os.environ.get("SMARTAPI_KEY_PATH", r"C:\Users\Ekraj\OneDrive\SmartApi")
os.chdir(KEY_PATH)

# Load API credentials from key.txt
# File format (whitespace-separated): API_KEY CLIENT_CODE PASSWORD TOTP_KEY
with open("key.txt", "r") as f:
    key_fields = f.read().split()
if len(key_fields) < 4:
    raise ValueError("key.txt must contain 4 fields: API_KEY CLIENT_CODE PASSWORD TOTP_KEY")
API_KEY, CLIENT_CODE, PASSWORD, TOTP_KEY = key_fields[:4]
del key_fields  # avoid keeping a second copy of the credentials in the namespace

# Output CSV file to save market data (relative to KEY_PATH after the chdir).
# DataFrame.to_csv does not create missing directories, so ensure data/ exists.
FILENAME = f"data/market_data_{datetime.now().strftime('%Y%m%d')}.csv"
os.makedirs(os.path.dirname(FILENAME), exist_ok=True)

# List of stock tickers to monitor
TICKERS = ["WIPRO", "INFY", "RELIANCE"]

# NSE regular-session hours
MARKET_START = dt_time(9, 15)
MARKET_END = dt_time(15, 30)


In [None]:
# Function to check if market is currently open
def market_open(now=None, start=None, end=None) -> bool:
    """Return True if ``now`` falls on a weekday inside regular NSE session hours.

    Args:
        now: ``datetime`` to test; defaults to ``datetime.now()`` (the machine's
            local clock — NOTE(review): assumes the machine runs on IST; confirm).
        start: Session start ``time`` (inclusive); defaults to ``MARKET_START``.
        end: Session end ``time`` (inclusive); defaults to ``MARKET_END``.

    Exchange holidays and special weekend sessions are not considered.
    """
    now = datetime.now() if now is None else now
    # Saturday (5) and Sunday (6) have no regular session; without this check the
    # collector loop, which runs while this returns True, would keep snapshotting
    # on weekends.
    if now.weekday() >= 5:
        return False
    start = MARKET_START if start is None else start
    end = MARKET_END if end is None else end
    return start <= now.time() <= end

# Function to fetch the instrument master list from Angel Broking API
def fetch_instrument_list(
    url: str = "https://margincalculator.angelbroking.com/OpenAPI_File/files/OpenAPIScripMaster.json",
    timeout: float = 60,
) -> list:
    """Download and decode the SmartAPI scrip-master JSON.

    Args:
        url: Location of the scrip-master file (defaults to Angel Broking's public URL).
        timeout: Socket timeout in seconds, so a stalled download cannot hang the
            notebook indefinitely.

    Returns:
        The decoded JSON document (consumed by ``get_token_map`` as a list of
        instrument dicts).
    """
    # Context manager closes the HTTP response instead of leaking it.
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return json.load(resp)

# Function to map tickers to their corresponding API tokens
def get_token_map(instrument_list: list, tickers: list) -> dict:
    """Map instrument tokens to ticker names for NSE equity (``-EQ``) instruments.

    Args:
        instrument_list: Instrument records with ``name``, ``exch_seg``, ``symbol``
            and ``token`` fields (e.g. from ``fetch_instrument_list``).
        tickers: Ticker names to look up, matched against each record's ``name``.

    Returns:
        Dict ``token -> ticker``. Tickers with no matching record are simply absent.
    """
    wanted = set(tickers)
    # One pass over the (large) master list rather than one pass per ticker;
    # .get() tolerates records that lack one of the compared fields.
    return {
        inst["token"]: inst["name"]
        for inst in instrument_list
        if inst.get("name") in wanted
        and inst.get("exch_seg") == "NSE"
        and str(inst.get("symbol", "")).endswith("-EQ")
    }


In [None]:
# Calculate Order Book Imbalance (OBI)
def calculate_obi(bid: list, ask: list) -> float:
    """Order Book Imbalance = (bid qty - ask qty) / (bid qty + ask qty), rounded to 4 dp.

    ``bid`` and ``ask`` are sequences of ``(price, quantity)`` pairs. Returns 0
    when the combined quantity is zero (e.g. both sides empty).
    """
    bid_qty = sum(qty for _price, qty in bid)
    ask_qty = sum(qty for _price, qty in ask)
    total_qty = bid_qty + ask_qty
    if total_qty == 0:
        return 0
    return round((bid_qty - ask_qty) / total_qty, 4)

# Calculate volume imbalance from executed trades
def calculate_vol_imbalance(trades: list) -> float:
    """Volume imbalance = (buy vol - sell vol) / (buy vol + sell vol), rounded to 4 dp.

    ``trades`` is a sequence of dicts with ``"volume"`` and ``"type"`` keys;
    records whose type is neither ``"buy"`` nor ``"sell"`` are ignored.
    Returns 0 when the combined volume is zero.
    """
    buy_vol = 0
    sell_vol = 0
    for trade in trades:
        side = trade["type"]
        if side == "buy":
            buy_vol += trade["volume"]
        elif side == "sell":
            sell_vol += trade["volume"]
    gross_vol = buy_vol + sell_vol
    if gross_vol == 0:
        return 0
    return round((buy_vol - sell_vol) / gross_vol, 4)

# Calculate spread between best bid and best ask
def calculate_spread(bid: list, ask: list) -> float:
    """Best ask price minus best bid price, rounded to 4 dp; 0 if either side is empty.

    Uses element 0 of each side as the best level.
    NOTE(review): assumes the feed lists levels best-first — confirm.
    """
    if not (bid and ask):
        return 0
    best_bid_price = bid[0][0]
    best_ask_price = ask[0][0]
    return round(best_ask_price - best_bid_price, 4)


In [None]:
# Class to handle WebSocket data collection and processing
class DataCollector:
    """Collect SmartAPI WebSocket quotes and periodically save order-flow metrics to CSV.

    ``on_data`` (the WebSocket data callback) stores the latest bid/ask levels and
    trade records per symbol. ``collect_loop`` (started in a background thread by
    ``run``) snapshots that state every ``interval`` seconds into metric rows and
    appends them to ``filename``. ``order_books`` and ``trades`` are shared by
    both and are only accessed while holding ``self.lock``.
    """

    def __init__(self, jwt_token, api_key, client_code, feed_token, tokens, symbol_map, filename):
        """Store collection settings and register this object's callbacks on a SmartWebSocketV2.

        Args:
            jwt_token, api_key, client_code, feed_token: Credentials passed
                unchanged to ``SmartWebSocketV2``.
            tokens: Instrument tokens to subscribe to.
            symbol_map: ``token -> ticker`` mapping used to label incoming messages.
            filename: CSV path that metric rows are appended to.
        """
        self.tokens = tokens
        self.symbol_map = symbol_map
        self.order_books = {}  # symbol -> {"bid": [(price, qty), ...], "ask": [...]}
        self.trades = {}       # symbol -> [{"volume": int, "type": "buy"|"sell"}, ...]; never cleared
        self.data = []         # metric rows not yet written; only touched by the collector thread
        self.lock = threading.Lock()  # guards order_books and trades
        self.filename = filename

        # Initialize SmartApi WebSocket and route its callbacks to this instance
        self.ws = SmartWebSocketV2(jwt_token, api_key, client_code, feed_token)
        self.ws.on_open = self.on_open
        self.ws.on_data = self.on_data
        self.ws.on_error = self.on_error

    def on_open(self, ws):
        """WebSocket open callback: subscribe to every configured token."""
        print("✅ WebSocket connected.")
        # NOTE(review): mode 3 / exchangeType 1 are SmartAPI enum values
        # (presumably SNAP_QUOTE on NSE cash market) — confirm against SmartWebSocketV2.
        self.ws.subscribe("stream_1", 3, [{"exchangeType": 1, "tokens": self.tokens}])

    def on_data(self, ws, msg):
        """WebSocket data callback: record the latest book and trade for a known token.

        ``msg`` may arrive as a dict or a JSON string. Messages whose ``token`` is
        not in ``symbol_map`` are ignored.
        """
        msg = msg if isinstance(msg, dict) else json.loads(msg)
        token = msg.get("token")
        symbol = self.symbol_map.get(token, "UNKNOWN")
        if symbol == "UNKNOWN":
            return

        # Best bid/ask levels as (price, quantity) pairs.
        # NOTE(review): SmartAPI WebSocket 2.0 docs describe prices in paise; if so,
        # spread and mid_price are in paise as well — confirm and scale if needed.
        bid = [(float(b["price"]), int(b["quantity"])) for b in msg.get("best_5_buy_data", [])]
        ask = [(float(a["price"]), int(a["quantity"])) for a in msg.get("best_5_sell_data", [])]

        # Last traded quantity and trade side.
        # NOTE(review): confirm "ltq" and "c" are keys of the parsed message; if they
        # are absent, ltq is always 0 and every trade defaults to "buy".
        ltq = int(msg.get("ltq", 0))
        side = msg.get("c", "B")

        with self.lock:
            self.order_books[symbol] = {"bid": bid, "ask": ask}
            # Zero-quantity records cannot change the volume imbalance, so skip them
            # instead of growing the (never-cleared) trade list.
            if ltq:
                self.trades.setdefault(symbol, []).append(
                    {"volume": ltq, "type": "buy" if side == "B" else "sell"}
                )

    def on_error(self, ws, err):
        """WebSocket error callback: report the error."""
        print("❌ WebSocket error:", err)

    def collect_loop(self, market_open_func, save_every=30, interval=1.0):
        """Snapshot metrics every ``interval`` seconds while ``market_open_func()`` is True.

        Each tick appends one row per symbol with a non-empty bid and ask side
        (OBI, cumulative volume imbalance, spread, mid price). Rows are written to
        ``self.filename`` once ``save_every`` are pending. Any remainder is flushed
        when the loop ends, so the last rows of the session are not lost.
        """
        try:
            while market_open_func():
                stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                with self.lock:
                    for symbol, book in self.order_books.items():
                        bid, ask = book["bid"], book["ask"]
                        if not bid or not ask:
                            continue
                        trades = self.trades.get(symbol, [])
                        self.data.append({
                            "timestamp": stamp,
                            "symbol": symbol,
                            "obi": calculate_obi(bid, ask),
                            "volume_imbalance": calculate_vol_imbalance(trades),
                            "spread": calculate_spread(bid, ask),
                            "mid_price": (bid[0][0] + ask[0][0]) / 2,
                        })
                # Disk I/O happens outside the lock so on_data is never blocked by it.
                self._flush(save_every)
                time.sleep(interval)
        finally:
            self._flush(1)

    def _flush(self, min_rows=1):
        """Append buffered rows to the CSV if at least ``min_rows`` are pending.

        The header is written only when the file does not exist yet. The buffer is
        cleared only after a successful write.
        """
        if len(self.data) < max(min_rows, 1):
            return
        directory = os.path.dirname(self.filename)
        if directory:
            os.makedirs(directory, exist_ok=True)  # to_csv does not create parent dirs
        pd.DataFrame(self.data).to_csv(
            self.filename,
            mode='a',
            header=not os.path.exists(self.filename),
            index=False,
        )
        self.data = []

    def run(self, market_open_func):
        """Start ``collect_loop`` in a daemon thread, then connect the WebSocket.

        NOTE(review): whether this call returns before the connection closes
        depends on ``SmartWebSocketV2.connect()`` — confirm.
        """
        threading.Thread(
            target=self.collect_loop,
            args=(market_open_func,),
            daemon=True,
            name="DataCollector",
        ).start()
        self.ws.connect()
        self.ws.connect()


In [None]:
# Import authenticate function (make sure collector/auth.py exists).
# Without this import the call below raises NameError on a fresh kernel.
# NOTE(review): assumes the `collector` package is importable from the working
# directory set in the config cell — confirm.
from collector.auth import authenticate

# Authenticate and get JWT + feed tokens
jwt_token, feed_token = authenticate(API_KEY, CLIENT_CODE, PASSWORD, TOTP_KEY)

# Fetch instruments and map tickers to tokens
instrument_list = fetch_instrument_list()
symbol_map = get_token_map(instrument_list, TICKERS)
tokens = list(symbol_map.keys())

# Surface tickers that did not resolve: they would otherwise be silently skipped,
# and an empty token list would subscribe to nothing at all.
missing_tickers = sorted(set(TICKERS) - set(symbol_map.values()))
if missing_tickers:
    print(f"⚠️ No NSE -EQ instrument found for: {missing_tickers}")
if not tokens:
    raise RuntimeError("No instrument tokens resolved for TICKERS; nothing to subscribe to.")


In [None]:
# Bundle everything the collector needs: credentials, subscription tokens, the
# token -> ticker map and the CSV output path.
collector_settings = dict(
    jwt_token=jwt_token,
    api_key=API_KEY,
    client_code=CLIENT_CODE,
    feed_token=feed_token,
    tokens=tokens,
    symbol_map=symbol_map,
    filename=FILENAME,
)
collector = DataCollector(**collector_settings)

# Launch the background metrics loop (gated by market_open) and open the WebSocket.
collector.run(market_open)


In [None]:
# Heartbeat: keep this cell busy and print a liveness line every 10 seconds
# until the kernel is interrupted (Kernel → Interrupt raises KeyboardInterrupt).
# NOTE(review): if collector.run() blocks on the WebSocket connection, this cell
# only starts after that connection closes — confirm.
heartbeat_seconds = 10
try:
    while True:
        stamp = datetime.now().strftime('%H:%M:%S')
        print(f"🟢 {stamp} | Collector running...")
        time.sleep(heartbeat_seconds)
except KeyboardInterrupt:
    print("🛑 Manual exit triggered.")
