In [None]:
import asyncio
import logging
import math
import os
import time
from collections import defaultdict, deque
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional, Callable, Union

import aiofiles
import nest_asyncio

# Patch asyncio so asyncio.run() can be called while an event loop is already
# running (e.g. when this code is executed inside a Jupyter cell).
nest_asyncio.apply()

@dataclass
class Candle:
    """One OHLC bar covering a single time bucket.

    ``timestamp_str`` is the bucket's start time as an ``HH:MM:SS`` string
    (e.g. ``"10:50:15"``). The price fields hold the first, highest, lowest
    and most recent tick price (LTP) seen inside that bucket; they are
    mutated in place while the bucket is still open.
    """

    # Explicit slots keep instances free of a per-instance __dict__.
    __slots__ = ['timestamp_str', 'open', 'high', 'low', 'close']

    timestamp_str: str  # bucket start, "HH:MM:SS"
    open: float         # first price in the bucket
    high: float         # highest price so far
    low: float          # lowest price so far
    close: float        # latest price

class TimeframeEngine:
    """Aggregate a stream of ticks into fixed-width OHLC candles.

    Bucket boundaries are anchored at the session open (09:15:00 unless
    overridden), so a 5-second engine produces buckets starting at
    09:15:00, 09:15:05, ... A candle is closed -- and returned by
    ``process_tick`` -- when the first tick belonging to a different bucket
    arrives.
    """

    # 09:15:00 expressed as seconds since midnight; the default anchor.
    DEFAULT_MARKET_OPEN_SECONDS = 9 * 3600 + 15 * 60

    def __init__(self, interval_seconds: int, max_history: Optional[int] = 1000,
                 market_open_seconds: int = DEFAULT_MARKET_OPEN_SECONDS):
        """Create an engine for a single timeframe.

        Args:
            interval_seconds: candle width in seconds; must be positive.
            max_history: number of completed candles retained in
                ``completed_candles`` (oldest dropped first); ``None`` keeps all.
            market_open_seconds: seconds since midnight at which bucket
                boundaries are anchored; earlier ticks fall into the first bucket.

        Raises:
            ValueError: if ``interval_seconds`` is not positive.
        """
        if interval_seconds <= 0:
            # A zero width would otherwise fail with ZeroDivisionError on
            # every tick; a negative one would produce nonsensical buckets.
            raise ValueError(f"interval_seconds must be positive, got {interval_seconds}")
        self.interval_seconds = interval_seconds
        self.market_open_seconds = market_open_seconds
        self.current_candle = None          # candle still being built, or None
        self.completed_candles = deque(maxlen=max_history)
        self.current_interval_start = None  # "HH:MM:SS" bucket of current_candle

    def process_tick(self, tick_data: dict) -> Optional[Candle]:
        """Fold one tick into the running candle.

        Args:
            tick_data: mapping with ``'LTP'`` (price; anything ``float()``
                accepts) and ``'LTT'`` (time of day as an ``"HH:MM:SS"`` str).

        Returns:
            The candle that was just closed when this tick starts a new
            bucket, otherwise ``None``. ``None`` is also returned (and the
            problem logged) when the tick cannot be parsed.
        """
        try:
            price = float(tick_data['LTP'])
            time_str = tick_data['LTT']
            if not isinstance(time_str, str):
                raise TypeError(f"LTT must be a str, got {type(time_str).__name__}")
        except (ValueError, KeyError, IndexError, TypeError) as e:
            logging.error(f"Error processing tick: {e}, tick_data: {tick_data}")
            return None

        interval_start_str = self._get_interval_start_str(time_str)
        completed_candle = None

        # First tick of a different bucket: close and archive the running candle.
        if self.current_candle is not None and interval_start_str != self.current_interval_start:
            completed_candle = self.current_candle
            self.completed_candles.append(completed_candle)
            self.current_candle = None

        if self.current_candle is None:
            self.current_candle = Candle(
                timestamp_str=interval_start_str,
                open=price,
                high=price,
                low=price,
                close=price,
            )
            self.current_interval_start = interval_start_str
        else:
            candle = self.current_candle
            candle.high = max(candle.high, price)
            candle.low = min(candle.low, price)
            candle.close = price

        return completed_candle

    def _get_interval_start_str(self, time_str: str) -> str:
        """Return the ``"HH:MM:SS"`` start of the bucket containing ``time_str``.

        Times before the session open are clamped into the first bucket.
        If ``time_str`` cannot be parsed, the error is logged and
        ``time_str`` itself is returned unchanged.
        """
        try:
            time_parts = time_str.split(':')
            if len(time_parts) != 3:
                raise ValueError(f"Invalid time format: {time_str}")
            hours, minutes, seconds = (int(part) for part in time_parts)
        except (ValueError, IndexError) as e:
            logging.error(f"Error parsing time {time_str}: {e}")
            return time_str  # Fallback to original time

        market_open = self.market_open_seconds
        current_seconds = hours * 3600 + minutes * 60 + seconds

        # Pre-open ticks are folded into the first bucket rather than dropped.
        elapsed = max(current_seconds - market_open, 0)
        interval_number = elapsed // self.interval_seconds
        interval_start_seconds = market_open + interval_number * self.interval_seconds

        start_hours, remainder = divmod(interval_start_seconds, 3600)
        start_minutes, start_secs = divmod(remainder, 60)
        return f"{start_hours:02d}:{start_minutes:02d}:{start_secs:02d}"

class AsyncCandleEngine:
    """Multi-instrument, multi-timeframe candle builder with async CSV output.

    Each configured instrument (keyed by integer ``security_id``) owns one
    ``TimeframeEngine`` per timeframe string (e.g. ``'1s'``, ``'5s'``,
    ``'1m'``). Completed candles are appended to
    ``<output_folder>/<tf>/<security_id>_<tf>.csv`` when ``output_folder`` is
    set, and passed to the ``on_candle_complete`` callback.
    """

    def __init__(self, output_folder: str = None, default_timeframes: List[str] = None):
        """
        Args:
            output_folder: directory for CSV output; ``None`` disables writing.
            default_timeframes: timeframes used for instruments configured
                without an explicit list; defaults to ``['1s', '5s', '1m']``.
        """
        self.output_folder = output_folder
        self.default_timeframes = default_timeframes or ['1s', '5s', '1m']

        # security_id -> {timeframe string -> TimeframeEngine}
        self.instruments: Dict[int, Dict[str, TimeframeEngine]] = defaultdict(dict)
        self.configured_instruments = set()

        # Performance tracking
        self.tick_count = 0
        self.candle_count = 0
        self.error_count = 0

        # Callbacks
        self.on_candle_complete: Optional[Callable] = None
        self.on_error: Optional[Callable] = None

        # One lock per CSV path, created lazily inside the running loop.
        # Serializes appends so concurrent writers cannot both emit the
        # header or interleave rows out of order.
        self._file_locks: Dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

        # NOTE(review): configures the root logger as a side effect of construction.
        logging.basicConfig(level=logging.ERROR)

        if output_folder:
            os.makedirs(output_folder, exist_ok=True)

    def set_default_timeframes(self, timeframes: List[str]):
        """Set default timeframes for all instruments"""
        self.default_timeframes = timeframes
        print(f"Default timeframes set: {timeframes}")

    def configure_instrument(self, security_id: Union[int, List[int]], timeframes: List[str] = None):
        """Configure one or more instruments with a set of timeframes.

        Any existing configuration for an instrument -- including its
        in-progress candles -- is replaced. All timeframes are parsed before
        anything is replaced, so an invalid timeframe leaves the previous
        configuration intact. Errors are logged and reported via ``on_error``.

        Args:
            security_id: a single id, or an iterable of ids; each id is
                coerced with ``int()``.
            timeframes: timeframe strings; defaults to ``self.default_timeframes``.
        """
        if timeframes is None:
            timeframes = self.default_timeframes

        # A lone id (including a numeric string) must not be iterated char by char.
        if isinstance(security_id, (str, bytes)) or not hasattr(security_id, '__iter__'):
            security_ids = [security_id]
        else:
            security_ids = security_id

        for sid in security_ids:
            try:
                sid = int(sid)
                # Build the full replacement first; swapping in a new dict also
                # keeps any in-flight iteration over the old one valid.
                new_engines = {
                    tf_str: TimeframeEngine(self._parse_timeframe(tf_str))
                    for tf_str in timeframes
                }
                self.instruments[sid] = new_engines
                self.configured_instruments.add(sid)
                print(f"Configured instrument {sid} with timeframes: {timeframes}")

            except Exception as e:
                logging.error(f"Error configuring instrument {sid}: {e}")
                self._notify_error(f"Configuration error for {sid}: {e}")

    def configure_multiple_instruments(self, security_ids: List[int], timeframes: List[str] = None):
        """Configure multiple instruments at once"""
        self.configure_instrument(security_ids, timeframes)

    def _parse_timeframe(self, tf_str: str) -> int:
        """Convert a timeframe string to a positive number of seconds.

        Accepts an integer with an optional unit suffix: ``s`` (seconds),
        ``m`` (minutes), ``h`` (hours) or ``d`` (days); a bare integer means
        seconds. Case and surrounding whitespace are ignored.

        Raises:
            ValueError: if the string is malformed or the duration is not positive.
        """
        multipliers = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}
        tf_str = tf_str.lower().strip()
        try:
            if tf_str and tf_str[-1] in multipliers:
                seconds = int(tf_str[:-1]) * multipliers[tf_str[-1]]
            else:
                seconds = int(tf_str)
        except ValueError as e:
            logging.error(f"Invalid timeframe format: {tf_str}")
            raise ValueError(f"Invalid timeframe format: {tf_str}") from e

        if seconds <= 0:
            # A zero/negative width would break bucket arithmetic downstream.
            logging.error(f"Timeframe must be positive: {tf_str}")
            raise ValueError(f"Timeframe must be positive: {tf_str}")
        return seconds

    async def process_tick(self, tick_data: dict) -> Dict[str, Candle]:
        """Feed one tick to every timeframe of its instrument.

        Unconfigured instruments are auto-configured with the default
        timeframes. Completed candles are written to CSV (awaited before
        returning) and passed to ``on_candle_complete``.

        Returns:
            ``{timeframe: completed Candle}`` for the timeframes this tick
            closed; empty when none closed or the tick was rejected.
        """
        try:
            self.tick_count += 1

            if not self._validate_tick_data(tick_data):
                self.error_count += 1
                return {}

            security_id = int(tick_data['security_id'])

            # Unknown instruments are auto-configured with the defaults.
            if security_id not in self.configured_instruments:
                self.configure_instrument(security_id)

            completed_candles = {}
            write_tasks = []

            for tf_name, tf_engine in self.instruments[security_id].items():
                completed_candle = tf_engine.process_tick(tick_data)
                if completed_candle is None:
                    continue

                self.candle_count += 1
                completed_candles[tf_name] = completed_candle

                if self.output_folder:
                    write_tasks.append(self._write_candle_async(security_id, tf_name, completed_candle))

                if self.on_candle_complete:
                    try:
                        self.on_candle_complete(security_id, tf_name, completed_candle)
                    except Exception as e:
                        logging.error(f"Error in candle callback: {e}")

            # All candle state above is updated before the first await, so
            # concurrent calls aggregate ticks in the order they start running.
            if write_tasks:
                await asyncio.gather(*write_tasks)

            return completed_candles

        except Exception as e:
            self.error_count += 1
            logging.error(f"Error processing tick: {e}, tick_data: {tick_data}")
            self._notify_error(f"Tick processing error: {e}")
            return {}

    async def process_batch(self, tick_batch: List[dict]) -> List[Dict[str, Candle]]:
        """Process a batch of ticks concurrently, in list order.

        Returns:
            The non-empty per-tick results only; positions therefore do not
            necessarily line up with ``tick_batch``.
        """
        results = await asyncio.gather(*(self.process_tick(tick) for tick in tick_batch))
        return [result for result in results if result]

    def _validate_tick_data(self, tick_data: dict) -> bool:
        """Return True if ``tick_data`` carries a usable security_id, LTP and LTT.

        security_id must be ``int()``-convertible, LTP ``float()``-convertible
        and finite, and LTT a str with exactly three ':'-separated parts.
        Validation failures are logged and reported as False.
        """
        try:
            for field in ('security_id', 'LTP', 'LTT'):
                if field not in tick_data:
                    logging.error(f"Missing required field: {field}")
                    return False

            int(tick_data['security_id'])

            # NaN/inf would permanently corrupt a candle's high/low via max()/min().
            if not math.isfinite(float(tick_data['LTP'])):
                logging.error(f"Non-finite LTP: {tick_data['LTP']}")
                return False

            time_str = tick_data['LTT']
            if not isinstance(time_str, str) or len(time_str.split(':')) != 3:
                logging.error(f"Invalid time format: {time_str}")
                return False

            return True

        except (ValueError, TypeError, OverflowError) as e:
            logging.error(f"Tick validation error: {e}")
            return False

    async def _write_candle_async(self, security_id: int, tf_name: str, candle: Candle):
        """Append one candle as a CSV row to ``<output_folder>/<tf>/<id>_<tf>.csv``.

        The header row is written only when the file does not exist yet.
        Writes to the same file are serialized with a per-file
        ``asyncio.Lock``. Failures are logged, counted in ``error_count`` and
        reported via ``on_error``; they are never raised.
        """
        try:
            candle_dir = os.path.join(self.output_folder, tf_name)
            filename = os.path.join(candle_dir, f"{security_id}_{tf_name}.csv")
            # Snapshot the row now, before any await, in case the candle is
            # still live (shutdown writes in-progress candles).
            line = f"{candle.timestamp_str},{candle.open},{candle.high},{candle.low},{candle.close},0.0\n"

            async with self._file_locks[filename]:
                os.makedirs(candle_dir, exist_ok=True)
                write_header = not os.path.exists(filename)
                async with aiofiles.open(filename, 'a', newline='') as f:
                    if write_header:
                        await f.write('timestamp,open,high,low,close,volume\n')
                    await f.write(line)
                    await f.flush()

        except Exception as e:
            self.error_count += 1
            logging.error(f"Error writing candle to file: {e}")
            self._notify_error(f"File write error for {security_id} {tf_name}: {e}")

    def _notify_error(self, message: str):
        """Invoke the user error callback, if set, without letting it raise."""
        if self.on_error:
            try:
                self.on_error(message)
            except Exception as e:
                logging.error(f"Error in error callback: {e}")

    def get_current_candles(self, security_id: int) -> Dict[str, Candle]:
        """Get current incomplete candles for instrument"""
        if security_id not in self.instruments:
            return {}
        return {
            tf_name: tf_engine.current_candle
            for tf_name, tf_engine in self.instruments[security_id].items()
            if tf_engine.current_candle
        }

    def get_completed_candles(self, security_id: int, tf_name: str, count: int = None) -> List[Candle]:
        """Get completed candles for a timeframe, oldest first.

        Args:
            count: if given, only the last ``count`` candles; ``0`` or a
                negative value yields an empty list. ``None`` returns all.
        """
        if security_id not in self.instruments or tf_name not in self.instruments[security_id]:
            return []

        candles = list(self.instruments[security_id][tf_name].completed_candles)
        if count is None:
            return candles
        # Slicing with -0 would return everything, so handle <= 0 explicitly.
        return candles[-count:] if count > 0 else []

    def get_stats(self) -> Dict:
        """Get engine statistics"""
        return {
            'tick_count': self.tick_count,
            'candle_count': self.candle_count,
            'error_count': self.error_count,
            'instruments': len(self.configured_instruments),
            'total_timeframes': sum(len(tfs) for tfs in self.instruments.values()),
            'pending_writes': 0  # Writes are awaited inside process_tick/shutdown
        }

    def get_configured_instruments(self) -> List[int]:
        """Get list of configured instruments"""
        return list(self.configured_instruments)

    def get_instrument_timeframes(self, security_id: int) -> List[str]:
        """Get timeframes for specific instrument"""
        if security_id in self.instruments:
            return list(self.instruments[security_id].keys())
        return []

    def set_candle_callback(self, callback: Callable):
        """Set callback for candle completion"""
        self.on_candle_complete = callback

    def set_error_callback(self, callback: Callable):
        """Set callback for errors"""
        self.on_error = callback

    async def shutdown(self):
        """Write every in-progress candle to CSV (if enabled) and print final stats.

        In-progress candles are written but not cleared, so calling this
        twice writes them twice.
        """
        print("Shutting down engine...")

        if self.output_folder:
            write_tasks = [
                self._write_candle_async(security_id, tf_name, tf_engine.current_candle)
                for security_id, timeframes in self.instruments.items()
                for tf_name, tf_engine in timeframes.items()
                if tf_engine.current_candle
            ]
            if write_tasks:
                await asyncio.gather(*write_tasks)

        final_stats = self.get_stats()
        print(f"Shutdown complete. Final stats: {final_stats}")

# -----------------------------------------

# Usage examples
async def async_usage_example():
    """Walk through the basic engine API on a handful of hand-written ticks.

    Demonstrates per-instrument configuration, the candle callback,
    sequential and batched processing, the inspection helpers and shutdown.
    CSV output goes to ``./async_candles``.
    """
    engine = AsyncCandleEngine(
        output_folder="./async_candles",
        default_timeframes=['1s', '5s', '1m']
    )

    # Explicit configuration; 99999 is then overridden with its own timeframes.
    engine.configure_instrument([99997, 99998, 99999])
    engine.configure_instrument(99999, ['1s', '5s'])

    def on_candle_complete(security_id, tf_name, candle):
        print(f"[ASYNC SAVED] {security_id} {tf_name}: {candle.timestamp_str} "
              f"OHLC=({candle.open:.2f},{candle.high:.2f},{candle.low:.2f},{candle.close:.2f})")

    engine.set_candle_callback(on_candle_complete)

    # 62402/62403 are not configured above, so their first tick
    # auto-configures them with the default timeframes (1s, 5s, 1m).
    ticks = [
        {'security_id': 62402, 'LTP': '1600.00', 'LTT': '09:15:00'},  # opens candles, nothing closed
        {'security_id': 62402, 'LTP': '1600.50', 'LTT': '09:15:01'},  # closes 1s 09:15:00
        {'security_id': 62402, 'LTP': '1599.75', 'LTT': '09:15:02'},  # closes 1s 09:15:01
        {'security_id': 62402, 'LTP': '1601.00', 'LTT': '09:15:03'},  # closes 1s 09:15:02
        {'security_id': 62402, 'LTP': '1600.25', 'LTT': '09:15:04'},  # closes 1s 09:15:03
        {'security_id': 62403, 'LTP': '2800.50', 'LTT': '09:15:01'},  # opens candles, nothing closed
        {'security_id': 62403, 'LTP': '2799.75', 'LTT': '09:15:05'},  # closes 1s 09:15:01 + 5s 09:15:00
    ]

    print("Processing ticks with ASYNC instant saving...")

    # Method 1: Process one by one -----
    for tick in ticks:
        completed_candles = await engine.process_tick(tick)
        if completed_candles:
            print(f"Processed {tick['security_id']}: {len(completed_candles)} candles saved")

    # Method 2: Process batch (parallel) -----
    # The batch continues *after* the ticks above; replaying the same older
    # ticks would make time run backwards and close bogus candles.
    batch_ticks = [
        {'security_id': 62402, 'LTP': '1600.75', 'LTT': '09:15:05'},  # closes 1s 09:15:04 + 5s 09:15:00
        {'security_id': 62402, 'LTP': '1601.25', 'LTT': '09:15:06'},  # closes 1s 09:15:05
        {'security_id': 62403, 'LTP': '2800.00', 'LTT': '09:15:06'},  # closes 1s 09:15:05
    ]
    print("\nProcessing batch in parallel...")
    batch_results = await engine.process_batch(batch_ticks)
    print(f"Batch processed: {len(batch_results)} results")

    stats = engine.get_stats()
    print(f"Final stats: {stats}")

    # Show current states
    for security_id in engine.get_configured_instruments():
        current = engine.get_current_candles(security_id)
        completed = {tf: len(engine.get_completed_candles(security_id, tf))
                     for tf in engine.get_instrument_timeframes(security_id)}
        print(f"Instrument {security_id}: {len(current)} current, completed: {completed}")

    await engine.shutdown()

async def dhan_websocket_async_example():
    """Push a few DhanHQ-style 'Ticker Data' messages through the engine.

    Each message is reduced to the engine's tick shape (security_id, LTP,
    LTT) and all messages are processed concurrently. CSV output goes to
    ``./dhan_async_data``.
    """
    engine = AsyncCandleEngine(
        output_folder="./dhan_async_data",
        default_timeframes=['1s', '5s', '15s', '1m']
    )
    engine.configure_multiple_instruments([62402, 62403, 62404])

    async def process_dhan_tick(dhan_tick_data):
        """Forward a 'Ticker Data' message to the engine; ignore other message types."""
        if dhan_tick_data.get('type') != 'Ticker Data':
            return {}
        # Keep only the fields the engine consumes, in the same order.
        tick_data = {field: dhan_tick_data[field] for field in ('security_id', 'LTP', 'LTT')}
        return await engine.process_tick(tick_data)

    # Simulated feed messages.
    sample_messages = [
        {'type': 'Ticker Data', 'exchange_segment': 2, 'security_id': 62402, 'LTP': '160.00', 'LTT': '14:15:00'},
        {'type': 'Ticker Data', 'exchange_segment': 2, 'security_id': 62402, 'LTP': '160.50', 'LTT': '14:15:01'},
        {'type': 'Ticker Data', 'exchange_segment': 2, 'security_id': 62403, 'LTP': '85.25', 'LTT': '14:15:01'},
        {'type': 'Ticker Data', 'exchange_segment': 2, 'security_id': 62404, 'LTP': '500.75', 'LTT': '14:15:02'},
    ]

    print("Processing DhanHQ ticks asynchronously...")

    results = await asyncio.gather(*map(process_dhan_tick, sample_messages))

    for i, result in enumerate(results):
        if result:
            print(f"DhanHQ tick {i+1}: {len(result)} candles saved")

    await engine.shutdown()

async def high_frequency_async_example(num_ticks: int = 1000):
    """Stress the engine with synthetic ticks for 10 instruments, processed concurrently.

    Args:
        num_ticks: total ticks to generate, assigned round-robin to the
            instruments; every 10 consecutive ticks share one timestamp second.
    """
    import random

    engine = AsyncCandleEngine(
        output_folder="./hft_async_data",
        default_timeframes=['1s', '3s', '5s']
    )

    instruments = list(range(62400, 62410))  # 10 instruments
    engine.configure_multiple_instruments(instruments)

    print("Simulating high-frequency async processing...")

    session_open = 9 * 3600 + 15 * 60  # 09:15:00 as seconds since midnight

    def format_ltt(offset_seconds):
        """Render session_open + offset as HH:MM:SS.

        Carrying into minutes/hours avoids wrapping back to 09:15:00 after
        60 seconds, which would make time run backwards.
        """
        hours, remainder = divmod(session_open + offset_seconds, 3600)
        minutes, seconds = divmod(remainder, 60)
        return f"{hours:02d}:{minutes:02d}:{seconds:02d}"

    async def generate_and_process_tick(security_id, base_price, tick_num):
        """Generate and process a single tick"""
        price = base_price + random.uniform(-5, 5)
        second = tick_num // 10  # 10 ticks per second across all instruments

        tick = {
            'security_id': security_id,
            'LTP': f'{price:.2f}',
            'LTT': format_ltt(second),
        }
        return await engine.process_tick(tick)

    tasks = []
    for i in range(num_ticks):
        security_id = instruments[i % len(instruments)]
        base_price = 1000 + (security_id - 62400) * 100
        tasks.append(generate_and_process_tick(security_id, base_price, i))

    # perf_counter is monotonic and high-resolution, unlike time.time().
    start_time = time.perf_counter()
    results = await asyncio.gather(*tasks)
    processing_time = time.perf_counter() - start_time

    total_candles = sum(len(result) for result in results if result)

    print(f"Processed {num_ticks} ticks in {processing_time:.2f}s")
    print(f"Generated {total_candles} candles")
    if processing_time > 0:
        print(f"Speed: {num_ticks/processing_time:.0f} ticks/sec")

    stats = engine.get_stats()
    print(f"Final stats: {stats}")

    await engine.shutdown()

if __name__ == "__main__":
    for menu_line in (
        "Choose async example:",
        "1. Basic async usage",
        "2. DhanHQ WebSocket integration",
        "3. High-frequency simulation",
    ):
        print(menu_line)

    examples = {
        "1": async_usage_example,
        "2": dhan_websocket_async_example,
        "3": high_frequency_async_example,
    }

    choice = input("Enter choice (1-3): ").strip()
    # Any unrecognized choice falls back to the basic example.
    asyncio.run(examples.get(choice, async_usage_example)())

Choose async example:
1. Basic async usage
2. DhanHQ WebSocket integration
3. High-frequency simulation
Configured instrument 62402 with timeframes: ['1s', '4s', '5s']
Configured instrument 62403 with timeframes: ['1s', '5s']
Processing ticks with ASYNC instant saving...
[ASYNC SAVED] 62402 1s: 09:15:00 OHLC=(1600.00,1600.00,1600.00,1600.00)
Processed 62402: 1 candles saved
[ASYNC SAVED] 62402 1s: 09:15:01 OHLC=(1600.50,1600.50,1600.50,1600.50)
Processed 62402: 1 candles saved
[ASYNC SAVED] 62402 1s: 09:15:02 OHLC=(1599.75,1599.75,1599.75,1599.75)
Processed 62402: 1 candles saved
[ASYNC SAVED] 62402 1s: 09:15:03 OHLC=(1601.00,1601.00,1601.00,1601.00)
[ASYNC SAVED] 62402 4s: 09:15:00 OHLC=(1600.00,1601.00,1599.75,1601.00)
Processed 62402: 2 candles saved
[ASYNC SAVED] 62403 1s: 09:15:01 OHLC=(2800.50,2800.50,2800.50,2800.50)
[ASYNC SAVED] 62403 5s: 09:15:00 OHLC=(2800.50,2800.50,2800.50,2800.50)
Processed 62403: 2 candles saved

Processing batch in parallel...
[ASYNC SAVED] 62402 1s: 09