In [1]:
import asyncio
import logging
import os
from collections import deque
from datetime import datetime, timedelta
import pandas as pd
import pyotp
from NorenRestApiPy.NorenApi import NorenApi
import numba_indicators
import nest_asyncio
import multiprocessing
import zmq
import uvloop
import numpy as np
import time
import signal
import structlog
from dotenv import load_dotenv

load_dotenv()

class TickCollector(multiprocessing.Process):
    def __init__(self, credentials_file="usercred.xlsx", zmq_context=None):
        super().__init__()
        self.api = None
        self.feed_opened = False
        self.ring_buffers = {}
        self.active_subscriptions = set()
        self.RING_BUFFER_SIZE = int(os.getenv('RING_BUFFER_SIZE', 1000))
        self.VALID_TIMEFRAMES = ['1min', '5min']
        
        self.logger = self._setup_logger()
        self.zmq_context = zmq_context or zmq.Context()
        self.tick_pub_socket = self.zmq_context.socket(zmq.PUB)
        self.ZMQ_PUB_PORT = int(os.getenv('ZMQ_PUB_PORT', 5555))
        self.tick_pub_socket.bind(f"tcp://*:{self.ZMQ_PUB_PORT}")

        self.tick_count = 0
        self.start_time = time.time()

        self._initialize_api(credentials_file)
        signal.signal(signal.SIGTERM, self.shutdown)

    def _setup_logger(self):
        structlog.configure(
            processors=[
                structlog.processors.TimeStamper(fmt="iso"),
                structlog.stdlib.add_log_level,
                structlog.stdlib.PositionalArgumentsFormatter(),
                structlog.processors.StackInfoRenderer(),
                structlog.processors.format_exc_info,
                structlog.processors.UnicodeDecoder(),
                structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
            ],
            logger_factory=structlog.stdlib.LoggerFactory(),
            wrapper_class=structlog.stdlib.BoundLogger,
            cache_logger_on_first_use=True,
        )
        return structlog.get_logger()

    def _initialize_api(self, credentials_file):
        try:
            cred = pd.read_excel(credentials_file)
            userid = cred['Value'][cred['Key'] == 'user'].values[0]
            password = cred['Value'][cred['Key'] == 'pwd'].values[0]
            totp_key = cred['Value'][cred['Key'] == 'totp'].values[0]
            mpin = cred['Value'][cred['Key'] == 'mpin'].values[0]
            vendor = cred['Value'][cred['Key'] == 'vendor'].values[0]
            api_key = cred['Value'][cred['Key'] == 'api_secret'].values[0]

            self.api = NorenApi(host='https://api.shoonya.com/NorenWClientTP/',
                                websocket='wss://api.shoonya.com/NorenWSTP/')
            
            totp = pyotp.TOTP(totp_key).now()
            login_status = self.api.login(userid=userid, password=password, twoFA=totp,
                                          vendor_code=vendor, api_secret=api_key, imei='abc1234')
            
            if login_status:
                self.logger.info("API login successful")
            else:
                self.logger.error("API login failed")
                raise Exception("API login failed")
        except Exception as e:
            self.logger.error(f"Error initializing API: {e}")
            raise

    def create_ring_buffers(self, tokens):
        for token in tokens:
            if token not in self.ring_buffers:
                self.ring_buffers[token] = deque(maxlen=self.RING_BUFFER_SIZE)
                self.logger.info(f"Created ring buffer for token: {token}")

    def event_handler_feed_update(self, tick_data):
        try:
            if not all(key in tick_data for key in ['lp', 'tk', 'ft']):
                raise ValueError("Missing required fields in tick data")

            timest = datetime.fromtimestamp(int(tick_data['ft'])).isoformat()
            token = tick_data['tk']
            new_tick = {'tt': timest, 'ltp': float(tick_data['lp'])}
            
            if token in self.ring_buffers:
                self.ring_buffers[token].append(new_tick)
                self.tick_pub_socket.send_pyobj({"token": token, "tick": new_tick})
                
                self.tick_count += 1
                if self.tick_count % 1000 == 0:
                    elapsed_time = time.time() - self.start_time
                    ticks_per_second = self.tick_count / elapsed_time
                    self.logger.info(f"Processing {ticks_per_second:.2f} ticks/second")
            else:
                self.logger.warning(f"Token {token} not found in ring buffers. Ignoring tick.")
        except (KeyError, ValueError) as e:
            self.logger.error(f"Error processing tick data: {e}")

    async def connect_and_subscribe(self):
        while True:
            try:
                await self.api.connect()
                self.logger.info("WebSocket connected")
                self.feed_opened = True

                # Re-subscribe to all active subscriptions
                for token in self.active_subscriptions:
                    await self.api.subscribe(token)
                
                await self.api.set_on_feed_update(self.event_handler_feed_update)
                self.logger.info("Feed update handler set")
                
                # Keep the connection alive
                while self.feed_opened:
                    await asyncio.sleep(1)
            except Exception as e:
                self.logger.error(f"WebSocket connection error: {e}. Retrying in 5 seconds...")
                self.feed_opened = False
                await asyncio.sleep(5)

    async def manage_subscriptions(self, command, subscription):
        try:
            if command == 'subscribe':
                await self.api.subscribe(subscription)
                self.active_subscriptions.add(subscription)
                self.logger.info(f"Subscribed to {subscription}")
            elif command == 'unsubscribe':
                await self.api.unsubscribe(subscription)
                self.active_subscriptions.remove(subscription)
                self.logger.info(f"Unsubscribed from {subscription}")
        except Exception as e:
            self.logger.error(f"Error managing subscription: {e}")

    def run(self):
        asyncio.run(self.connect_and_subscribe())

    def shutdown(self, signum, frame):
        self.logger.info("Shutting down TickCollector...")
        self.feed_opened = False
        self.api.close()
        self.tick_pub_socket.close()
        self.zmq_context.term()

class DataProcessor(multiprocessing.Process):
    def __init__(self, zmq_context=None):
        super().__init__()
        self.zmq_context = zmq_context or zmq.Context()
        self.tick_sub_socket = self.zmq_context.socket(zmq.SUB)
        self.ZMQ_SUB_PORT = int(os.getenv('ZMQ_SUB_PORT', 5555))
        self.tick_sub_socket.connect(f"tcp://localhost:{self.ZMQ_SUB_PORT}")
        self.tick_sub_socket.setsockopt_string(zmq.SUBSCRIBE, "")
        self.logger = self._setup_logger()
        self.resampled_buffers = {}
        self.RING_BUFFER_RESAMPLE_SIZE = int(os.getenv('RING_BUFFER_RESAMPLE_SIZE', 1000))
        self.VALID_TIMEFRAMES = ['1min', '5min']
        signal.signal(signal.SIGTERM, self.shutdown)

    def _setup_logger(self):
        structlog.configure(
            processors=[
                structlog.processors.TimeStamper(fmt="iso"),
                structlog.stdlib.add_log_level,
                structlog.stdlib.PositionalArgumentsFormatter(),
                structlog.processors.StackInfoRenderer(),
                structlog.processors.format_exc_info,
                structlog.processors.UnicodeDecoder(),
                structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
            ],
            logger_factory=structlog.stdlib.LoggerFactory(),
            wrapper_class=structlog.stdlib.BoundLogger,
            cache_logger_on_first_use=True,
        )
        return structlog.get_logger()

    async def run_async(self):
        while True:
            try:
                tick_data = await self.tick_sub_socket.recv_pyobj()
                token, tick = tick_data['token'], tick_data['tick']
                await self.process_tick(token, tick)
            except Exception as e:
                self.logger.error(f"Error processing tick: {e}")

    def run(self):
        asyncio.run(self.run_async())

    async def process_tick(self, token, tick):
        self.logger.info(f"Processing tick for token: {token}")
        for timeframe in self.VALID_TIMEFRAMES:
            await self.resample_tick(token, tick, timeframe)

    async def resample_tick(self, token, tick, timeframe):
        if token not in self.resampled_buffers:
            self.resampled_buffers[token] = {}
        if timeframe not in self.resampled_buffers[token]:
            self.resampled_buffers[token][timeframe] = deque(maxlen=self.RING_BUFFER_RESAMPLE_SIZE)

        df = pd.DataFrame(list(self.resampled_buffers[token][timeframe]) + [tick])
        df['tt'] = pd.to_datetime(df['tt'])
        df.set_index('tt', inplace=True)

        resampled = df['ltp'].resample(timeframe).agg({
            'open': 'first',
            'high': 'max',
            'low': 'min',
            'close': 'last'
        }).dropna()

        if len(resampled) > 0:
            last_candle = {
                'open': resampled['open'].values[-1],
                'high': resampled['high'].values[-1],
                'low': resampled['low'].values[-1],
                'close': resampled['close'].values[-1],
                'tt': resampled.index[-1]
            }
            self.resampled_buffers[token][timeframe].append(last_candle)
            self.logger.info(f"Resampled candle for {token} at {timeframe}: {last_candle}")

    def shutdown(self, signum, frame):
        self.logger.info("Shutting down DataProcessor...")
        self.tick_sub_socket.close()
        self.zmq_context.term()

if __name__ == "__main__":
    uvloop.install()
    zmq_context = zmq.Context()
    
    tick_collector = TickCollector(zmq_context=zmq_context)
    data_processor = DataProcessor(zmq_context=zmq_context)

    tick_collector.start()
    data_processor.start()
    
    try:
        tick_collector.join()
        data_processor.join()
    except KeyboardInterrupt:
        print("Stopping processes...")
        tick_collector.terminate()
        data_processor.terminate()
        tick_collector.join()
        data_processor.join()
        print("Processes stopped.")

{'event': "Error initializing API: 'Value'", 'timestamp': '2024-10-07T09:16:42.680439Z', 'level': 'error'}


KeyError: 'Value'

In [None]:
pip install structlog