In [17]:
import os
import ast
import json
import time
import base64
import requests
import threading
import websocket
import pandas as pd
from dotenv import load_dotenv
from datetime import datetime, timezone, date
from collections import defaultdict, deque
from datetime import datetime, timedelta
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding


load_dotenv()


# -------------------------------------------------
# CAPITAL.COM CLIENT
# -------------------------------------------------
class CapitalClient:
    CANDLES_PER_MINUTE = {
        "MINUTE": 1,
        "MINUTE_5": 1 / 5,
        "MINUTE_15": 1 / 15,
        "MINUTE_30": 1 / 30,
        "HOUR": 1 / 60,
        "HOUR_4": 1 / 240,
        "DAY": 1 / 1440,
        "WEEK": 1 / 10080,
    }

    CHUNK_MINUTES = {
        "MINUTE": 600,
        "MINUTE_5": 3000,
        "MINUTE_15": 6000,
        "MINUTE_30": 12000,
        "HOUR": 43200,
        "HOUR_4": 172800,
        "DAY": 525600,
        "WEEK": 1048320,
    }

    def __init__(self, api_key: str, identifier: str, password: str):
        self.api_key = api_key
        self.identifier = identifier
        self.password = password
        self.base_url = os.getenv("CAPITAL_BASE_URL")

        # Session management
        self.cst = None
        self.security_token = None
        self.session_expiry = None
        self._session_lock = threading.Lock()

        # Real-time tick data
        self._tick_callback = None
        self._tick_epics = []
        self._ws_active = False
        self._ws_stop = threading.Event()
        self._ws_instance = None

    # ----------------------------------
    # SESSION MANAGEMENT
    # ----------------------------------
    def _is_session_valid(self) -> bool:
        """Check if the current session is still valid."""
        if not self.cst or not self.security_token:
            return False

        # If we don't have expiry time, assume it's valid
        if not self.session_expiry:
            return True

        # Add a buffer of 5 minutes before actual expiry
        return datetime.now(timezone.utc) < (self.session_expiry - timedelta(minutes=5))

    def _renew_session(self):
        """Renew the session by logging in again."""
        with self._session_lock:
            print("üîÑ Renewing session...")
            self.login()

    def _ensure_valid_session(self):
        """Ensure we have a valid session, renew if needed."""
        if not self._is_session_valid():
            self._renew_session()

    # ----------------------------------
    # AUTH
    # ----------------------------------
    def login(self):
        """Login and store session expiry information."""
        r = requests.get(
            f"{self.base_url}/session/encryptionKey",
            headers={"X-CAP-API-KEY": self.api_key},
        )
        r.raise_for_status()

        encryption_key = r.json()["encryptionKey"]
        timestamp = r.json()["timeStamp"]

        message = f"{self.password}|{timestamp}".encode()
        message_b64 = base64.b64encode(message)

        public_key = serialization.load_der_public_key(base64.b64decode(encryption_key))
        encrypted = public_key.encrypt(message_b64, padding.PKCS1v15())
        encrypted_password = base64.b64encode(encrypted).decode()

        r = requests.post(
            f"{self.base_url}/session",
            headers={
                "X-CAP-API-KEY": self.api_key,
                "Content-Type": "application/json",
            },
            json={
                "identifier": self.identifier,
                "password": encrypted_password,
                "encryptedPassword": True,
            },
        )
        r.raise_for_status()

        self.cst = r.headers["CST"]
        self.security_token = r.headers["X-SECURITY-TOKEN"]

        # Try to parse expiry from response if available
        try:
            account_info = r.json().get("accountInfo", {})
            # Capital.com sessions typically last 24 hours
            self.session_expiry = datetime.now(timezone.utc) + timedelta(
                hours=23
            )  # Conservative estimate
        except:
            # If we can't parse expiry, set to 23 hours from now
            self.session_expiry = datetime.now(timezone.utc) + timedelta(hours=23)

        print("‚úÖ Logged in successfully")

    @property
    def headers(self):
        """Get headers with automatic session renewal."""
        self._ensure_valid_session()

        if not self.cst or not self.security_token:
            raise RuntimeError("Call login() first")

        return {
            "X-CAP-API-KEY": self.api_key,
            "CST": self.cst,
            "X-SECURITY-TOKEN": self.security_token,
        }

    # ----------------------------------
    # WEB SOCKET SESSION MANAGEMENT
    # ----------------------------------
    def get_websocket_headers(self):
        """Get headers specifically for WebSocket connections."""
        self._ensure_valid_session()
        return {
            "X-CAP-API-KEY": self.api_key,
            "CST": self.cst,
            "X-SECURITY-TOKEN": self.security_token,
        }

    # ----------------------------------
    # RAW PRICE CALL (UPDATED WITH TIMEZONE HANDLING)
    # ----------------------------------
    def _fetch_prices(
        self,
        epic: str,
        resolution: str,
        start: datetime,
        end: datetime,
    ) -> pd.DataFrame:

        minutes = (end - start).total_seconds() / 60
        max_points = int(minutes * self.CANDLES_PER_MINUTE[resolution]) + 5

        # Convert to UTC and remove timezone info for Capital.com API
        if start.tzinfo is not None:
            start = start.astimezone(timezone.utc).replace(tzinfo=None)

        if end.tzinfo is not None:
            end = end.astimezone(timezone.utc).replace(tzinfo=None)

        # Format as simple ISO string without timezone
        start_iso = start.strftime("%Y-%m-%dT%H:%M:%S")
        end_iso = end.strftime("%Y-%m-%dT%H:%M:%S")

        params = {
            "resolution": resolution,
            "from": start_iso,
            "to": end_iso,
            "max": max_points,
        }

        r = requests.get(
            f"{self.base_url}/prices/{epic}",
            headers=self.headers,
            params=params,
        )
        r.raise_for_status()

        prices = r.json().get("prices", [])
        if not prices:
            return pd.DataFrame()

        df = pd.DataFrame(prices)

        # Convert to UTC timezone-aware datetime
        df["timestamp"] = pd.to_datetime(df["snapshotTime"]).dt.tz_localize(None)

        # For 15-minute candles, check if we need to round to the nearest 15 minutes
        if resolution == "MINUTE_15":
            # Round down to the nearest 15 minutes
            df["timestamp"] = df["timestamp"].dt.floor("15min")

        df = df.sort_values("timestamp")

        return df[["timestamp", "openPrice", "highPrice", "lowPrice", "closePrice"]]

    # ----------------------------------
    # SMART HISTORICAL DOWNLOAD (UPDATED)
    # ----------------------------------
    def get_historical_prices(
        self,
        epic: str,
        resolution: str,
        from_date: str,
        to_date: str,
        timezone_offset: int = 0,  # Timezone offset in hours (e.g., 5.5 for IST)
    ) -> pd.DataFrame:
        """
        Get historical prices with timezone handling.
        """

        # Parse dates (handle both date-only and datetime strings)
        try:
            start_dt = datetime.fromisoformat(from_date)
        except ValueError:
            # If only date is provided, add time
            start_dt = datetime.strptime(from_date, "%Y-%m-%d")

        try:
            end_dt = datetime.fromisoformat(to_date)
        except ValueError:
            # If only date is provided, add end of day
            end_dt = datetime.strptime(to_date, "%Y-%m-%d")
            end_dt = end_dt.replace(hour=23, minute=59, second=59, microsecond=999999)

        # Apply timezone offset if needed
        if timezone_offset != 0:
            start_dt = start_dt + timedelta(hours=timezone_offset)
            end_dt = end_dt + timedelta(hours=timezone_offset)
            print(f"üìÖ Applied {timezone_offset:+} hour timezone offset")

        delta = timedelta(minutes=self.CHUNK_MINUTES[resolution])

        all_chunks = []
        current = start_dt

        print(f"\nüì• Downloading {epic} {resolution} candles")
        print(
            f"   Date range: {start_dt.strftime('%Y-%m-%d %H:%M:%S')} to {end_dt.strftime('%Y-%m-%d %H:%M:%S')}"
        )

        while current < end_dt:
            chunk_end = min(current + delta, end_dt)

            print(
                f"  {current.strftime('%Y-%m-%d %H:%M:%S')} ‚Üí {chunk_end.strftime('%Y-%m-%d %H:%M:%S')}"
            )

            df = self._fetch_prices(
                epic=epic,
                resolution=resolution,
                start=current,
                end=chunk_end,
            )

            if not df.empty:
                all_chunks.append(df)

            current = chunk_end

        if not all_chunks:
            return pd.DataFrame()

        df = (
            pd.concat(all_chunks)
            .drop_duplicates("timestamp")
            .sort_values("timestamp")
            .reset_index(drop=True)
        )

        # Apply reverse timezone offset to display in local time
        if timezone_offset != 0:
            df["timestamp"] = df["timestamp"] - pd.Timedelta(hours=timezone_offset)

        print(
            f"‚úÖ Done: {len(df):,} candles "
            f"({df.timestamp.min().strftime('%Y-%m-%d %H:%M:%S')} ‚Üí {df.timestamp.max().strftime('%Y-%m-%d %H:%M:%S')})"
        )

        # Check for missing candles
        if len(df) > 1:
            expected_freq = "15min" if resolution == "MINUTE_15" else "1D"
            full_range = pd.date_range(
                start=df.timestamp.min(), end=df.timestamp.max(), freq=expected_freq
            )
            missing = set(full_range) - set(df.timestamp)
            if missing:
                print(f"‚ö†Ô∏è  Missing {len(missing)} candles at: {list(missing)[:5]}...")

        return df

    # ----------------------------------
    # ACCOUNTS (BALANCE / MARGIN / P&L)
    # ----------------------------------
    def get_accounts(self) -> pd.DataFrame:
        """
        Fetch all trading accounts with balances and margin info.
        """
        r = requests.get(
            f"{self.base_url}/accounts",
            headers=self.headers,
        )
        r.raise_for_status()

        accounts = r.json().get("accounts", [])

        if not accounts:
            print("‚ö†Ô∏è  No accounts found")
            return None

        return accounts

    # ----------------------------------
    # MARKET SEARCH
    # ----------------------------------
    def search_markets(self, search_term=""):
        r = requests.get(
            f"{self.base_url}/markets",
            headers=self.headers,
            params={"searchTerm": search_term, "limit": 20},
        )
        r.raise_for_status()
        return r.json().get("markets", [])

        # ----------------------------------

    # WORKING ORDERS (LIMIT/STOP ORDERS)
    # ----------------------------------
    def create_working_order(
        self,
        epic: str,
        direction: str,
        size: float,
        level: float,
        order_type: str,
        good_till_date: str = None,
        guaranteed_stop: bool = False,
        trailing_stop: bool = False,
        stop_level: float = None,
        stop_distance: float = None,
        stop_amount: float = None,
        profit_level: float = None,
        profit_distance: float = None,
        profit_amount: float = None,
        deal_reference: str = None,
    ) -> dict:
        """Create a limit or stop order (working order)."""

        # Validate required parameters
        if direction not in ["BUY", "SELL"]:
            raise ValueError("direction must be either 'BUY' or 'SELL'")

        if size <= 0:
            raise ValueError("size must be greater than 0")

        if level <= 0:
            raise ValueError("level must be greater than 0")

        if order_type not in ["LIMIT", "STOP"]:
            raise ValueError("order_type must be either 'LIMIT' or 'STOP'")

        # Validate stop parameters
        if guaranteed_stop and trailing_stop:
            raise ValueError("Cannot set both guaranteedStop and trailingStop to True")

        if guaranteed_stop:
            if not any([stop_level, stop_distance, stop_amount]):
                raise ValueError(
                    "If guaranteedStop=True, must provide stopLevel, stopDistance, or stopAmount"
                )

        if trailing_stop:
            if stop_distance is None:
                raise ValueError("If trailingStop=True, must provide stopDistance")

        # Validate good_till_date format if provided
        if good_till_date:
            try:
                datetime.fromisoformat(good_till_date.replace("Z", ""))
            except ValueError:
                raise ValueError("good_till_date must be in format YYYY-MM-DDTHH:MM:SS")

        # Prepare request body
        body = {
            "epic": epic,
            "direction": direction,
            "size": size,
            "level": level,
            "type": order_type,
            "guaranteedStop": guaranteed_stop,
            "trailingStop": trailing_stop,
        }

        # Add optional parameters if provided
        if good_till_date is not None:
            body["goodTillDate"] = good_till_date

        if stop_level is not None:
            body["stopLevel"] = stop_level

        if stop_distance is not None:
            body["stopDistance"] = stop_distance

        if stop_amount is not None:
            body["stopAmount"] = stop_amount

        if profit_level is not None:
            body["profitLevel"] = profit_level

        if profit_distance is not None:
            body["profitDistance"] = profit_distance

        if profit_amount is not None:
            body["profitAmount"] = profit_amount

        if deal_reference is not None:
            body["dealReference"] = deal_reference

        # Make the API request
        r = requests.post(
            f"{self.base_url}/workingorders", headers=self.headers, json=body
        )
        r.raise_for_status()

        response = r.json()

        # Add convenience fields
        if "dealReference" in response:
            response["order_id"] = response["dealReference"].replace("o_", "")
            response["dealId"] = response["order_id"]

        order_type_desc = "Limit" if order_type == "LIMIT" else "Stop"
        print(
            f"‚úÖ {order_type_desc} order created: {direction} {size} {epic} @ {level}"
        )
        if "dealReference" in response:
            print(f"   Deal Reference: {response['dealReference']}")

        return response

    # ----------------------------------
    # GET ALL WORKING ORDERS
    # ----------------------------------
    def get_working_orders(self) -> pd.DataFrame:
        """
        Get all pending working orders (limit/stop orders).

        """
        r = requests.get(f"{self.base_url}/workingorders", headers=self.headers)
        r.raise_for_status()

        orders = r.json().get("workingOrders", [])
        if not orders:
            return pd.DataFrame()

        df = pd.json_normalize(orders)

        # Optional: Clean up column names
        df = df.rename(columns=lambda x: x.replace(".", "_"))

        # Convert timestamp columns if they exist
        timestamp_cols = ["createdDate", "goodTillDate"]
        for col in timestamp_cols:
            if col in df.columns:
                df[col] = pd.to_datetime(df[col])

        return df

    # ----------------------------------
    # GET SPECIFIC WORKING ORDER
    # ----------------------------------
    def get_working_order(self, deal_id: str) -> dict:
        """
        Get details of a specific working order.
        """
        r = requests.get(
            f"{self.base_url}/workingorders/{deal_id}", headers=self.headers
        )
        r.raise_for_status()
        return r.json()

    # ----------------------------------
    # UPDATE WORKING ORDER
    # ----------------------------------
    def update_working_order(
        self,
        deal_id: str,
        level: float = None,
        good_till_date: str = None,
        guaranteed_stop: bool = None,
        trailing_stop: bool = None,
        stop_level: float = None,
        stop_distance: float = None,
        stop_amount: float = None,
        profit_level: float = None,
        profit_distance: float = None,
        profit_amount: float = None,
    ) -> dict:
        """
        Update a limit or stop working order.

        """

        # Validate parameter combinations
        if guaranteed_stop is not None and trailing_stop is not None:
            if guaranteed_stop and trailing_stop:
                raise ValueError(
                    "Cannot set both guaranteedStop and trailingStop to True"
                )

        if guaranteed_stop and guaranteed_stop is True:
            if not any([stop_level, stop_distance, stop_amount]):
                raise ValueError(
                    "If guaranteedStop=True, must provide stopLevel, stopDistance, or stopAmount"
                )

        if trailing_stop and trailing_stop is True:
            if stop_distance is None:
                raise ValueError("If trailingStop=True, must provide stopDistance")

        # Validate good_till_date format if provided
        if good_till_date:
            try:
                datetime.fromisoformat(good_till_date.replace("Z", ""))
            except ValueError:
                raise ValueError("good_till_date must be in format YYYY-MM-DDTHH:MM:SS")

        # Prepare request body
        body = {}

        # Add parameters if provided
        if level is not None:
            if level <= 0:
                raise ValueError("level must be greater than 0")
            body["level"] = level

        if good_till_date is not None:
            body["goodTillDate"] = good_till_date

        if guaranteed_stop is not None:
            body["guaranteedStop"] = guaranteed_stop

        if trailing_stop is not None:
            body["trailingStop"] = trailing_stop

        if stop_level is not None:
            body["stopLevel"] = stop_level

        if stop_distance is not None:
            body["stopDistance"] = stop_distance

        if stop_amount is not None:
            body["stopAmount"] = stop_amount

        if profit_level is not None:
            body["profitLevel"] = profit_level

        if profit_distance is not None:
            body["profitDistance"] = profit_distance

        if profit_amount is not None:
            body["profitAmount"] = profit_amount

        # Make the API request
        r = requests.put(
            f"{self.base_url}/workingorders/{deal_id}", headers=self.headers, json=body
        )
        r.raise_for_status()

        response = r.json()
        print(f"‚úÖ Working order updated: Deal ID {deal_id}")

        return response

    # ----------------------------------
    # DELETE WORKING ORDER
    # ----------------------------------
    def delete_working_order(self, deal_id: str) -> dict:
        """
        Delete (cancel) a working order.
        """
        r = requests.delete(
            f"{self.base_url}/workingorders/{deal_id}", headers=self.headers
        )
        r.raise_for_status()

        response = r.json()
        print(f"‚úÖ Working order deleted: Deal ID {deal_id}")

        return response

    # ----------------------------------
    # REAL-TIME TICK DATA STREAM
    # ----------------------------------
    def stream_ticks(
        self,
        epics: list[str],
        on_tick=None,
        auto_reconnect: bool = True,
        reconnect_delay: int = 2,
    ):
        """Stream real-time tick data (not delayed OHLC)."""
        if not on_tick:
            raise ValueError("on_tick callback is required")

        # Store callback
        self._tick_callback = on_tick
        self._tick_epics = epics

        ws_url = "wss://api-streaming-capital.backend-capital.com/connect"

        # Reset state
        self._ws_active = False
        self._ws_stop.clear()

        def on_message(ws, raw):
            try:
                msg = json.loads(raw)

                # Handle ping response
                if msg.get("destination") == "ping":
                    return

                # Handle subscription response
                if msg.get("destination") == "marketData.subscribe":
                    status = msg.get("status")
                    if status == "OK":
                        subscriptions = msg.get("payload", {}).get("subscriptions", {})
                        print(
                            f"‚úÖ Real-time tick subscription successful: {subscriptions}"
                        )
                    else:
                        error_code = msg.get("payload", {}).get("errorCode")
                        print(f"‚ùå Subscription failed: {error_code}")

                        if error_code == "error.invalid.session.token":
                            print("üîÑ Session expired, renewing...")
                            self._renew_session()
                            subscribe_to_ticks(ws)
                    return

                # Handle real-time tick data
                if msg.get("destination") == "quote":
                    if self._ws_stop.is_set():
                        return

                    p = msg["payload"]
                    epic = p["epic"]

                    # Process tick immediately
                    tick_data = {
                        "epic": epic,
                        "bid": p.get("bid"),
                        "ask": p.get("ofr"),  # Note: "ofr" is ask price
                        "bid_qty": p.get("bidQty"),
                        "ask_qty": p.get("ofrQty"),
                        "timestamp": datetime.fromtimestamp(p["timestamp"] / 1000),
                        "received_at": datetime.now(),
                    }

                    # Call the tick callback
                    try:
                        self._tick_callback(tick_data)
                    except Exception as e:
                        print(f"‚ö†Ô∏è Error in tick callback: {e}")

            except Exception as e:
                if not self._ws_stop.is_set():
                    print(f"‚ùå Error processing tick message: {e}")
                    import traceback

                    traceback.print_exc()

        def subscribe_to_ticks(ws):
            """Helper function to subscribe to real-time tick data."""
            ws_headers = self.get_websocket_headers()

            sub_msg = {
                "destination": "marketData.subscribe",
                "correlationId": str(int(time.time() * 1000)),
                "cst": ws_headers["CST"],
                "securityToken": ws_headers["X-SECURITY-TOKEN"],
                "payload": {
                    "epics": epics,
                },
            }
            ws.send(json.dumps(sub_msg))
            print(f"üì° Sent real-time tick subscription for {epics}")

        def on_open(ws):
            print("‚úÖ Real-time WebSocket connection opened")
            connect_time = datetime.now()
            print(f"‚è±Ô∏è  Connected at: {connect_time.strftime('%H:%M:%S.%f')[:-3]}")

            ws_headers = self.get_websocket_headers()

            ping_msg = {
                "destination": "ping",
                "correlationId": str(int(time.time() * 1000)),
                "cst": ws_headers["CST"],
                "securityToken": ws_headers["X-SECURITY-TOKEN"],
            }
            ws.send(json.dumps(ping_msg))

            # Minimal delay before subscription
            time.sleep(0.1)
            subscribe_to_ticks(ws)

            self._ws_active = True

        def on_error(ws, error):
            if not self._ws_stop.is_set():
                print(f"‚ùå WebSocket error: {error}")
                if auto_reconnect:
                    print(f"‚è≥ Reconnecting in {reconnect_delay} seconds...")
                    time.sleep(reconnect_delay)
                    if not self._ws_stop.is_set():
                        connect_websocket()

        def on_close(ws, close_status_code, close_msg):
            if not self._ws_stop.is_set():
                print(f"üîå WebSocket closed: Code={close_status_code}")

            self._ws_active = False

            if auto_reconnect and not self._ws_stop.is_set():
                print(f"‚è≥ Reconnecting in {reconnect_delay} seconds...")
                time.sleep(reconnect_delay)
                if not self._ws_stop.is_set():
                    connect_websocket()

        def connect_websocket():
            """Connect to WebSocket with current session."""
            if self._ws_stop.is_set():
                return None

            print(f"üåê Connecting to real-time WebSocket...")

            ws_headers = self.get_websocket_headers()

            ws = websocket.WebSocketApp(
                ws_url,
                on_open=on_open,
                on_message=on_message,
                on_error=on_error,
                on_close=on_close,
                header=[
                    f"X-CAP-API-KEY: {self.api_key}",
                    f"CST: {ws_headers['CST']}",
                    f"X-SECURITY-TOKEN: {ws_headers['X-SECURITY-TOKEN']}",
                ],
            )

            self._ws_instance = ws

            # Run in background thread
            def run_ws():
                ws.run_forever(
                    ping_interval=30,
                    ping_timeout=10,
                    ping_payload=json.dumps(
                        {
                            "destination": "ping",
                            "correlationId": str(int(time.time() * 1000)),
                        }
                    ),
                    reconnect=5,
                )

            thread = threading.Thread(target=run_ws, daemon=True)
            thread.start()

            # Wait a moment for connection to establish
            time.sleep(0.5)

            return ws

        # Start the WebSocket connection
        return connect_websocket()

    def stop_streaming(self):
        """Stop the WebSocket streaming."""
        print("üõë Stopping WebSocket stream...")
        self._ws_stop.set()
        self._ws_active = False

        if hasattr(self, "_ws_instance") and self._ws_instance:
            try:
                self._ws_instance.close()
                print("‚úÖ WebSocket closed successfully")
            except Exception as e:
                print(f"‚ö†Ô∏è  Error closing WebSocket: {e}")

        # Clear callback
        self._tick_callback = None


# -------------------------------------------------
# CANDLE BUILDER FOR REAL-TIME TICKS
# -------------------------------------------------
class RealTimeCandleBuilder:
    """Build candles from real-time tick data."""

    def __init__(
        self, resolution_seconds: int = 60, csv_storage: CandleCSVStorage = None
    ):
        self.resolution_seconds = resolution_seconds
        self.candle_data = {}
        self.current_candle_start = {}
        self.last_candle = {}
        # Statistics
        self.tick_count = 0
        self.last_tick_time = None
        self.last_candle_time = None
        # CSV storage
        self.csv_storage = csv_storage

    def set_csv_storage(self, csv_storage: CandleCSVStorage):
        """Set CSV storage for saving candles."""
        self.csv_storage = csv_storage

    def process_tick(self, tick: dict) -> dict:
        """Process a tick and return candle if it just closed."""
        epic = tick["epic"]

        # Use bid price for calculations (or mid price: (bid+ask)/2)
        price = tick["bid"]
        timestamp = tick["timestamp"]

        # Update statistics
        self.tick_count += 1
        self.last_tick_time = datetime.now()

        # Determine candle start time (floor to resolution)
        seconds_in_minute = self.resolution_seconds // 60
        if seconds_in_minute > 0:
            # For minute-based resolutions
            candle_start = timestamp.replace(
                second=0,
                microsecond=0,
                minute=timestamp.minute - (timestamp.minute % seconds_in_minute),
            )
        else:
            # For second-based resolutions (if needed)
            candle_start = timestamp.replace(
                microsecond=0,
                second=timestamp.second - (timestamp.second % self.resolution_seconds),
            )

        # Initialize candle data for this epic if needed
        if epic not in self.candle_data:
            self.candle_data[epic] = {
                "open": price,
                "high": price,
                "low": price,
                "close": price,
                "volume": 1,
                "start_time": candle_start,
            }
            self.current_candle_start[epic] = candle_start
            return None

        # Check if we moved to a new candle
        if candle_start > self.current_candle_start[epic]:
            # Candle just closed! Return the completed candle
            closed_candle = self.candle_data[epic].copy()
            closed_candle["epic"] = epic
            closed_candle["end_time"] = self.current_candle_start[epic] + timedelta(
                seconds=self.resolution_seconds
            )
            closed_candle["closed_at"] = tick["received_at"]

            # Store as last candle
            self.last_candle[epic] = closed_candle
            self.last_candle_time = closed_candle["end_time"]

            # Save to CSV if storage is configured
            if self.csv_storage:
                try:
                    self.csv_storage.save_candle(closed_candle, self.resolution_seconds)
                except Exception as e:
                    print(f"‚ö†Ô∏è Error saving candle to CSV: {e}")

            # Start new candle
            self.candle_data[epic] = {
                "open": price,
                "high": price,
                "low": price,
                "close": price,
                "volume": 1,
                "start_time": candle_start,
            }
            self.current_candle_start[epic] = candle_start

            return closed_candle
        else:
            # Update current candle
            candle = self.candle_data[epic]
            candle["high"] = max(candle["high"], price)
            candle["low"] = min(candle["low"], price)
            candle["close"] = price
            candle["volume"] += 1

            # Also update the current price for tracking
            candle["current_price"] = price
            candle["current_time"] = timestamp

            return None

    def get_current_candle(self, epic: str) -> dict:
        """Get the current incomplete candle."""
        if epic in self.candle_data:
            candle = self.candle_data[epic].copy()
            candle["epic"] = epic
            candle["is_complete"] = False
            return candle
        return None

    def get_last_candle(self, epic: str) -> dict:
        """Get the last completed candle."""
        return self.last_candle.get(epic)

    def get_stats(self) -> dict:
        """Get statistics about the candle builder."""
        time_since_last = None
        if self.last_tick_time:
            time_since_last = (datetime.now() - self.last_tick_time).total_seconds()

        return {
            "tick_count": self.tick_count,
            "time_since_last_tick": time_since_last,
            "last_candle_time": self.last_candle_time,
            "active_epics": list(self.candle_data.keys()),
        }

In [18]:
# -------------------------------------------------
# CANDLE CSV STORAGE
# -------------------------------------------------
import os
import csv
import pandas as pd
from datetime import datetime


# -------------------------------------------------
# CSV STORAGE CLASS (FIXED VERSION)
# -------------------------------------------------
class CandleCSVStorage:
    """Store candle data in CSV files organized by market and resolution."""

    def __init__(self, base_path: str = "data/historical-data"):
        self.base_path = base_path
        os.makedirs(base_path, exist_ok=True)

        # File handle management
        self._file_handles = {}
        self._csv_writers = {}
        self._current_dates = {}
        self._file_paths = {}

    def _get_market_name(self, epic: str) -> str:
        """Extract market name from epic."""
        # Simple mapping or extraction logic
        epic_lower = epic.lower()

        if "gold" in epic_lower:
            return "gold"
        elif "btc" in epic_lower or "bitcoin" in epic_lower:
            return "bitcoin"
        elif "eurusd" in epic_lower:
            return "eurusd"
        elif "gbpusd" in epic_lower:
            return "gbpusd"
        elif "usdjpy" in epic_lower:
            return "usdjpy"
        elif "nasdaq" in epic_lower or "nas100" in epic_lower:
            return "nasdaq"
        elif "spx" in epic_lower or "sp500" in epic_lower:
            return "sp500"
        else:
            # Use epic as fallback (clean it up)
            clean_epic = epic.replace(".", "_").replace("-", "_").lower()
            return clean_epic

    def _get_resolution_name(self, resolution_seconds: int) -> str:
        """Convert resolution in seconds to string name."""
        if resolution_seconds == 60:
            return "m1"
        elif resolution_seconds == 300:
            return "m5"
        elif resolution_seconds == 900:
            return "m15"
        elif resolution_seconds == 1800:
            return "m30"
        elif resolution_seconds == 3600:
            return "h1"
        elif resolution_seconds == 14400:
            return "h4"
        elif resolution_seconds == 86400:
            return "d1"
        elif resolution_seconds == 604800:
            return "w1"
        else:
            return f"{resolution_seconds}s"

    def _get_file_path(
        self, epic: str, resolution_seconds: int, date: datetime = None
    ) -> str:
        """Get the CSV file path for a specific date."""
        if date is None:
            date = datetime.now()

        market_name = self._get_market_name(epic)
        resolution_name = self._get_resolution_name(resolution_seconds)
        date_str = date.strftime("%Y-%m-%d")

        # Create market directory
        market_dir = os.path.join(self.base_path, market_name)
        os.makedirs(market_dir, exist_ok=True)

        # File name pattern: market-resolution-date.csv
        filename = f"{market_name}_{resolution_name}_{date_str}.csv"
        return os.path.join(market_dir, filename)

    def _get_file_key(self, epic: str, resolution_seconds: int) -> str:
        """Get a unique key for file management."""
        return f"{epic}_{resolution_seconds}"

    def _ensure_file_open(self, epic: str, resolution_seconds: int, date: datetime):
        """Ensure the CSV file is open and ready for writing."""
        file_key = self._get_file_key(epic, resolution_seconds)

        # Get new file path
        new_file_path = self._get_file_path(epic, resolution_seconds, date)

        # Check if we need to open a new file
        if (
            file_key not in self._file_paths
            or self._file_paths[file_key] != new_file_path
        ):

            # Close existing file if open
            self._close_file_if_open(file_key)

            # Open new file
            self._open_new_file(file_key, new_file_path, date)

    def _close_file_if_open(self, file_key: str):
        """Close file if it's open."""
        if file_key in self._file_handles:
            try:
                self._file_handles[file_key].close()
                print(f"üìÅ Closed file: {self._file_paths.get(file_key, 'unknown')}")
            except Exception as e:
                print(f"‚ö†Ô∏è Error closing file: {e}")

            # Clean up
            for dict_name in [
                "_file_handles",
                "_csv_writers",
                "_current_dates",
                "_file_paths",
            ]:
                if file_key in getattr(self, dict_name):
                    getattr(self, dict_name).pop(file_key, None)

    def _open_new_file(self, file_key: str, file_path: str, date: datetime):
        """Open a new CSV file for writing."""
        try:
            # Check if file exists to determine if we need a header
            file_exists = os.path.exists(file_path)

            # Open file in append mode
            file_handle = open(file_path, "a", newline="", encoding="utf-8")
            csv_writer = csv.writer(file_handle)

            # Write header if file is new
            if not file_exists:
                csv_writer.writerow(
                    [
                        "timestamp",
                        "open",
                        "high",
                        "low",
                        "close",
                        "volume",
                        "start_time",
                        "end_time",
                        "closed_at",
                        "epic",
                    ]
                )
                print(f"üìÑ Created new file with header: {file_path}")

            # Store handles and metadata
            self._file_handles[file_key] = file_handle
            self._csv_writers[file_key] = csv_writer
            self._current_dates[file_key] = date
            self._file_paths[file_key] = file_path

            print(f"üìÅ Opened file for writing: {file_path}")

        except Exception as e:
            print(f"‚ùå Error opening file {file_path}: {e}")
            raise

    def save_candle(self, candle: dict, resolution_seconds: int):
        """Save a completed candle to CSV."""
        try:
            epic = candle["epic"]
            end_time = candle["end_time"]

            # Ensure file is open for this date
            self._ensure_file_open(epic, resolution_seconds, end_time)

            file_key = self._get_file_key(epic, resolution_seconds)

            if file_key not in self._csv_writers:
                print(f"‚ùå CSV writer not found for {file_key}")
                return False

            csv_writer = self._csv_writers[file_key]

            # Prepare row data
            row = [
                candle["start_time"].isoformat(),  # timestamp
                float(candle["open"]),
                float(candle["high"]),
                float(candle["low"]),
                float(candle["close"]),
                int(candle["volume"]),
                candle["start_time"].isoformat(),
                candle["end_time"].isoformat(),
                candle["closed_at"].isoformat(),
                epic,
            ]

            # Write to CSV
            csv_writer.writerow(row)

            # Flush immediately to ensure data is written
            self._file_handles[file_key].flush()

            # Print confirmation with file info
            file_path = self._file_paths[file_key]
            filename = os.path.basename(file_path)
            print(
                f"üíæ Saved candle to {filename}: "
                f"{candle['start_time'].strftime('%H:%M:%S')} - "
                f"{candle['end_time'].strftime('%H:%M:%S')} "
                f"(O:{candle['open']:.2f} H:{candle['high']:.2f} "
                f"L:{candle['low']:.2f} C:{candle['close']:.2f})"
            )

            return True

        except Exception as e:
            print(f"‚ùå Error saving candle to CSV: {e}")
            import traceback

            traceback.print_exc()
            return False

    def get_recent_candles(
        self, epic: str, resolution_seconds: int, num_candles: int = 100
    ):
        """Read recent candles from CSV files."""
        try:
            market_name = self._get_market_name(epic)
            resolution_name = self._get_resolution_name(resolution_seconds)
            market_dir = os.path.join(self.base_path, market_name)

            if not os.path.exists(market_dir):
                print(f"üìÅ Directory not found: {market_dir}")
                return []

            # Find all CSV files for this market and resolution
            csv_files = []
            for filename in os.listdir(market_dir):
                if filename.startswith(
                    f"{market_name}_{resolution_name}_"
                ) and filename.endswith(".csv"):
                    csv_files.append(os.path.join(market_dir, filename))

            if not csv_files:
                print(f"üìÅ No CSV files found for {market_name}_{resolution_name}_*")
                return []

            # Sort by date (newest first)
            csv_files.sort(reverse=True)
            print(f"üìÅ Found {len(csv_files)} CSV files")

            # Read candles from files
            all_candles = []
            for csv_file in csv_files:
                try:
                    print(f"üìñ Reading: {os.path.basename(csv_file)}")
                    df = pd.read_csv(csv_file)

                    # Check if dataframe is empty
                    if df.empty:
                        continue

                    # Convert timestamp strings back to datetime
                    datetime_columns = [
                        "timestamp",
                        "start_time",
                        "end_time",
                        "closed_at",
                    ]
                    for col in datetime_columns:
                        if col in df.columns:
                            df[col] = pd.to_datetime(df[col])

                    # Filter for this epic if multiple in same file
                    if "epic" in df.columns:
                        df = df[df["epic"] == epic]

                    # Add to collection
                    candles = df.to_dict("records")
                    all_candles.extend(candles)

                    print(f"   ‚Ü≥ Read {len(candles)} candles")

                    if len(all_candles) >= num_candles:
                        print(f"üìä Reached requested limit of {num_candles} candles")
                        break

                except Exception as e:
                    print(f"‚ö†Ô∏è Error reading CSV file {csv_file}: {e}")

            # Sort by timestamp and return requested number
            if all_candles:
                all_candles.sort(
                    key=lambda x: x.get("timestamp", x.get("start_time", datetime.min)),
                    reverse=True,
                )
                return all_candles[:num_candles]
            else:
                return []

        except Exception as e:
            print(f"‚ùå Error in get_recent_candles: {e}")
            import traceback

            traceback.print_exc()
            return []

    def close_all(self):
        """Close all open file handles."""
        print("üìÅ Closing all CSV files...")
        for file_key in list(self._file_handles.keys()):
            self._close_file_if_open(file_key)

        print(f"‚úÖ All files closed. Total handles: {len(self._file_handles)}")

    def __del__(self):
        """Destructor to ensure files are closed."""
        self.close_all()

    def get_file_info(self):
        """Get information about open files."""
        info = []
        for file_key, file_path in self._file_paths.items():
            info.append(
                {
                    "file_key": file_key,
                    "path": file_path,
                    "date": self._current_dates.get(file_key, "unknown"),
                    "is_open": file_key in self._file_handles,
                }
            )
        return info

In [19]:
from datetime import date, timedelta
import pandas as pd


def get_previous_trading_day(today: date) -> date:
    if today.weekday() == 0:  # Monday
        return today - timedelta(days=3)
    elif today.weekday() == 6:  # Sunday
        return today - timedelta(days=2)
    else:
        return today - timedelta(days=1)


class YesterdayHighLowStrategy:
    def __init__(
        self,
        epic,
        levels_csv,
        account_balance=10000,
        risk_percent=2.0,
        tp_pips=300,
        pip_size=0.01,
        contract_size=100,
    ):
        self.epic = epic
        self.levels_csv = levels_csv
        self.account_balance = account_balance
        self.risk_percent = risk_percent / 100
        self.tp_pips = tp_pips
        self.pip_size = pip_size
        self.contract_size = contract_size

        # ---- daily state
        self.today = None
        self.traded_today = False

        # ---- levels
        self.y_high = None
        self.y_low = None

        # ---- setup state
        self.c1 = None
        self.c2 = None
        self.direction = None

    # -------------------------------------------------
    def load_yesterday_levels(self, trading_date):
        prev_day = get_previous_trading_day(trading_date)
        df = pd.read_csv(self.levels_csv)

        row = df[df["trading_day"] == prev_day.strftime("%Y-%m-%d")]
        if row.empty:
            raise ValueError(f"No levels for {prev_day}")

        self.y_high = float(row.iloc[-1]["prev_high_bid"])
        self.y_low = float(row.iloc[-1]["prev_low_bid"])

        self.today = trading_date
        self.traded_today = False
        self._reset_setup()

        print(f"\nüìÖ Trading day: {trading_date}")
        print(f"üìä Using levels from: {prev_day}")
        print(f"‚¨ÜÔ∏è High: {self.y_high} | ‚¨áÔ∏è Low: {self.y_low}")

    # -------------------------------------------------
    def _calc_size(self, entry, stop):
        risk_amount = self.account_balance * self.risk_percent
        dist = abs(entry - stop)
        if dist <= 0:
            return 0
        return round(risk_amount / (dist * self.contract_size), 2)

    # -------------------------------------------------
    def on_candle_close(self, candle: dict):
        result = {
            "time": candle["start_time"],
            "decision": "NO_TRADE",
            "reason": "",
        }

        candle_date = candle["start_time"].date()

        # -------------------------------------------------
        # NEW TRADING DAY
        # -------------------------------------------------
        if self.today != candle_date:
            self.load_yesterday_levels(candle_date)
            result["decision"] = "INIT_DAY"
            result["reason"] = (
                f"New trading day detected. Loaded previous trading day levels. "
                f"High: {self.y_high:.2f}, Low: {self.y_low:.2f}. "
                "Waiting for breakout candle (C1)."
            )
            return result

        # -------------------------------------------------
        # ONE TRADE PER DAY ‚Äî HARD BLOCK
        # -------------------------------------------------
        if self.traded_today:
            result["decision"] = "BLOCKED"
            result["reason"] = (
                "Trade already executed today. Strategy locked until next trading day."
            )
            return result

        # -------------------------------------------------
        # C1 ‚Äî BREAKOUT
        # -------------------------------------------------
        if self.c1 is None:
            if candle["close"] > self.y_high:
                self.c1 = candle
                self.direction = "BUY"
                result["decision"] = "C1"
                result["reason"] = (
                    f"Breakout detected: candle closed above yesterday high "
                    f"({self.y_high:.2f}). Waiting for acceptance candle (C2)."
                )
                return result

            if candle["close"] < self.y_low:
                self.c1 = candle
                self.direction = "SELL"
                result["decision"] = "C1"
                result["reason"] = (
                    f"Breakout detected: candle closed below yesterday low "
                    f"({self.y_low:.2f}). Waiting for acceptance candle (C2)."
                )
                return result

            result["reason"] = (
                "No breakout detected. Candle closed inside yesterday range. "
                "Waiting for valid C1 breakout."
            )
            return result

        # -------------------------------------------------
        # C2 ‚Äî ACCEPTANCE
        # -------------------------------------------------
        if self.c2 is None:
            if self.direction == "BUY" and candle["close"] > self.y_high:
                self.c2 = candle
                result["decision"] = "C2"
                result["reason"] = (
                    "Acceptance confirmed: second candle closed above yesterday high. "
                    "Entry planned at next candle open (C3)."
                )
                return result

            if self.direction == "SELL" and candle["close"] < self.y_low:
                self.c2 = candle
                result["decision"] = "C2"
                result["reason"] = (
                    "Acceptance confirmed: second candle closed below yesterday low. "
                    "Entry planned at next candle open (C3)."
                )
                return result

            self._reset_setup()
            result["decision"] = "INVALIDATED"
            result["reason"] = (
                "Breakout failed: acceptance candle closed back inside yesterday range. "
                "Setup cancelled. Waiting for a fresh breakout (C1)."
            )
            return result

        # -------------------------------------------------
        # C3 ‚Äî ENTRY
        # -------------------------------------------------
        entry = candle["open"]
        direction = self.direction  # cache before reset

        if direction == "BUY":
            sl = min(self.c1["low"], self.c2["low"])
            tp = entry + self.tp_pips * self.pip_size
        else:
            sl = max(self.c1["high"], self.c2["high"])
            tp = entry - self.tp_pips * self.pip_size

        size = self._calc_size(entry, sl)
        if size <= 0:
            self._reset_setup()
            result["decision"] = "REJECTED"
            result["reason"] = (
                "Entry rejected: invalid position size calculated. " "Setup reset."
            )
            return result

        # üîí LOCK STRATEGY FOR THE DAY
        self.traded_today = True

        result["decision"] = "SIGNAL"
        result["reason"] = (
            f"Entry triggered: C1 breakout and C2 acceptance confirmed. "
            f"{direction} order placed at C3 price."
        )
        result["order"] = {
            "epic": self.epic,
            "direction": direction,
            "size": size,
            "orderType": "STOP",  # ‚¨ÖÔ∏è recommended
            "level": entry,
            "stopLevel": sl,
            "profitLevel": tp,
        }

        self._reset_setup()
        return result

    # -------------------------------------------------
    def _reset_setup(self):
        self.c1 = None
        self.c2 = None
        self.direction = None

In [20]:
# -------------------------------------------------
# MAIN LOOP WITH REAL-TIME TICK DATA
# -------------------------------------------------
if __name__ == "__main__":
    # Initialize client
    client = CapitalClient(
        api_key=os.getenv("CAPITAL_DEMO_API_KEY"),
        identifier=os.getenv("CAPITAL_IDENTIFIER"),
        password=os.getenv("CAPITAL_PASSWORD"),
    )

    strategy = YesterdayHighLowStrategy(
        epic="GOLD",
        levels_csv="gold_yesterday_levels.csv",
        account_balance=10000,
    )

    # Login initially
    client.login()

    # Initialize CSV storage
    csv_storage = CandleCSVStorage(base_path="data/historical-data")

    # Initialize candle builder with CSV storage
    resolution_seconds = 60  # 15-minute candles
    candle_builder = RealTimeCandleBuilder(
        resolution_seconds=resolution_seconds, csv_storage=csv_storage
    )

    # Search for markets
    try:
        markets = client.search_markets("GOLD")
        if markets:
            epic_to_use = markets[0]["epic"]
            print(f"\n‚úÖ Using epic: {epic_to_use}")
        else:
            epic_to_use = "CS.D.EURUSD.MINI.IP"  # Fallback
            print(f"\n‚ö†Ô∏è  No markets found, using: {epic_to_use}")
    except Exception as e:
        print(f"‚ùå Could not fetch markets: {e}")
        epic_to_use = "BTCUSD"  # Default for BTC

    def on_tick_received(tick: dict):
        """Handle incoming tick data."""
        # Process tick through candle builder
        print("=" * 60)
        for k, v in tick.items():
            print(f"{k}: {v}")
        print("=" * 60)

        # closed_candle = candle_builder.process_tick(tick)

        # if closed_candle:
        #     # Candle just closed! Process it immediately
        #     candle_time = closed_candle["start_time"]
        #     latency = (
        #         closed_candle["closed_at"] - closed_candle["end_time"]
        #     ).total_seconds() * 1000

        #     print("\n" + "=" * 60)
        #     print(f"üïØÔ∏è REAL-TIME CANDLE CLOSED: {closed_candle['epic']}")
        #     print(
        #         f"‚è∞ Candle Period: {candle_time.strftime('%H:%M:%S')} ‚Üí {closed_candle['end_time'].strftime('%H:%M:%S')}"
        #     )
        #     print(
        #         f"üïí Closed At: {closed_candle['closed_at'].strftime('%H:%M:%S.%f')[:-3]}"
        #     )
        #     print(f"üìà Latency: {latency:.1f}ms")
        #     print(
        #         f"üìä O:{closed_candle['open']:.2f} H:{closed_candle['high']:.2f} L:{closed_candle['low']:.2f} C:{closed_candle['close']:.2f}"
        #     )
        #     print(f"üìà Volume: {closed_candle['volume']} ticks")
        #     print("=" * 60)

        #     signal = strategy.on_candle_close(closed_candle)

        #     if signal and signal["decision"] == "SIGNAL":
        #         print("\nüöÄ TRADE SIGNAL")
        #         for k, v in signal.items():
        #             print(f"{k}: {v}")

        #         order = signal["order"]

        #         try:
        #             response = client.create_working_order(
        #                 epic=order["epic"],
        #                 direction=order["direction"],
        #                 size=order["size"],
        #                 level=order["level"],  # C3 price
        #                 order_type=order["orderType"],  # STOP
        #                 stop_level=order["stopLevel"],
        #                 profit_level=order["profitLevel"],
        #                 trailing_stop=False,
        #                 guaranteed_stop=False,
        #             )

        #             print("\n‚úÖ Order created successfully:")
        #             print(json.dumps(response, indent=2))

        #         except Exception as e:
        #             print(f"\n‚ùå Order creation failed: {e}")

        # üö® TRADING LOGIC GOES HERE! üö®
        # This runs as soon as the candle closes (within milliseconds)
        # Example:
        # if closed_candle['close'] > closed_candle['open']:
        #     print("üìà Bullish candle - BUY!")
        #     # client.create_working_order(...)

        # Optional: Display current candle info periodically
        # if candle_builder.tick_count % 100 == 0:
        #     current_candle = candle_builder.get_current_candle(tick["epic"])
        #     if current_candle:
        #         current_price = current_candle.get(
        #             "current_price", current_candle["close"]
        #         )
        #         print(
        #             f"üìä Current Candle [{current_candle['start_time'].strftime('%H:%M')}]: "
        #             f"O:{current_candle['open']:.2f} H:{current_candle['high']:.2f} "
        #             f"L:{current_candle['low']:.2f} C:{current_price:.2f} "
        #             f"(@ {datetime.now().strftime('%H:%M:%S')})"
        #         )

    # Start real-time tick streaming
    print(f"\nüöÄ Starting REAL-TIME tick stream for: {epic_to_use}")
    print("‚è≥ Waiting for ticks...")

    ws = client.stream_ticks(
        epics=[epic_to_use],
        on_tick=on_tick_received,
        auto_reconnect=True,
        reconnect_delay=2,
    )

    # Keep main thread alive with minimal overhead
    try:
        print("\nüéØ Active and receiving real-time ticks...")
        print("Press Ctrl+C to stop\n")

        # Display stats periodically
        stats_timer = time.time()

        while True:
            # Minimal sleep for maximum responsiveness
            time.sleep(0.001)  # 1ms

            # Display stats every 30 seconds
            # current_time = time.time()
            # if current_time - stats_timer > 30:
            #     stats = candle_builder.get_stats()
            #     print(f"\nüìà Statistics:")
            #     print(f"   Ticks received: {stats['tick_count']:,}")
            #     if stats["time_since_last_tick"]:
            #         print(f"   Last tick: {stats['time_since_last_tick']:.1f}s ago")
            #     if stats["last_candle_time"]:
            #         print(
            #             f"   Last candle: {stats['last_candle_time'].strftime('%H:%M:%S')}"
            #         )
            #     print(f"   Active epics: {stats['active_epics']}")

            #     stats_timer = current_time

    except KeyboardInterrupt:
        print("\nüëã Stopping WebSocket...")
        client.stop_streaming()
        time.sleep(0.5)

‚úÖ Logged in successfully

‚úÖ Using epic: GOLD

üöÄ Starting REAL-TIME tick stream for: GOLD
‚è≥ Waiting for ticks...
üåê Connecting to real-time WebSocket...

üéØ Active and receiving real-time ticks...
Press Ctrl+C to stop

‚úÖ Real-time WebSocket connection opened
‚è±Ô∏è  Connected at: 18:46:35.317
üì° Sent real-time tick subscription for ['GOLD']
‚úÖ Real-time tick subscription successful: {'GOLD': 'PROCESSED'}
epic: GOLD
bid: 5041.25
ask: 5042.0
bid_qty: 40.0
ask_qty: 40.0
timestamp: 2026-02-10 18:46:36.480000
received_at: 2026-02-10 18:46:36.345682
epic: GOLD
bid: 5041.27
ask: 5042.02
bid_qty: 40.0
ask_qty: 40.0
timestamp: 2026-02-10 18:46:36.605000
received_at: 2026-02-10 18:46:36.442418
epic: GOLD
bid: 5041.3
ask: 5042.05
bid_qty: 40.0
ask_qty: 40.0
timestamp: 2026-02-10 18:46:37.118000
received_at: 2026-02-10 18:46:37.002717
epic: GOLD
bid: 5041.32
ask: 5042.07
bid_qty: 40.0
ask_qty: 40.0
timestamp: 2026-02-10 18:46:37.363000
received_at: 2026-02-10 18:46:37.249554
epic: