Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 112 additions & 64 deletions tests/core/test_live_streaming.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Deterministic tests for SignalR live streaming helpers."""
"""Deterministic tests for the live streaming session abstraction."""

from __future__ import annotations

Expand All @@ -8,7 +8,7 @@
import types
from typing import Any, Callable, Dict, List, Tuple

if "httpx" not in sys.modules: # pragma: no cover - import shim for optional dependency
if "httpx" not in sys.modules: # pragma: no cover - optional dependency shim
httpx_stub = types.ModuleType("httpx")

class _StubResponse:
Expand Down Expand Up @@ -44,6 +44,27 @@ def close(self) -> None: # noqa: D401 - interface compatibility
from toptek.core import live


class DummyGateway:
"""Gateway double that only tracks auth header refreshes."""

def __init__(self) -> None:
self._counter = 0
self.base_url = "https://example.com/api"
self.auth_requests: List[str] = []

def auth_headers(self) -> Dict[str, str]:
token = f"token-{self._counter}"
self._counter += 1
self.auth_requests.append(token)
return {"Authorization": token}

def search_open_orders(self, payload: Dict[str, Any]) -> Dict[str, Any]:
return {"orders": payload}

def search_positions(self, payload: Dict[str, Any]) -> Dict[str, Any]:
return {"positions": payload}


class DummySignalRConnection:
"""Test double that mimics the minimal SignalR hub API surface."""

Expand Down Expand Up @@ -85,7 +106,7 @@ def off(self, event: str, identifier: Any | None = None) -> None:
self.remove_listener(event, identifier)

def send(self, method: str, args: List[Any]) -> None:
self.sent.append((method, args))
self.sent.append((method, list(args)))

def on_open(self, callback: Callable[[], None]) -> None:
self._open_callbacks.append(callback)
Expand All @@ -107,7 +128,7 @@ def emit(self, event: str, payload: Any) -> None:


class DummyHubConnectionBuilder:
"""Builder double compatible with :func:`connect_market_hub`."""
"""Builder double compatible with :class:`GatewayStreamingSession`."""

instances: List["DummyHubConnectionBuilder"] = []

Expand All @@ -119,7 +140,7 @@ def __init__(self) -> None:

def with_url(self, url: str, options: Dict[str, Any] | None = None) -> "DummyHubConnectionBuilder":
self.url = url
self.options = options
self.options = options or {}
return self

def build(self) -> DummySignalRConnection:
Expand All @@ -131,88 +152,114 @@ def reset_builder_instances() -> None:
DummyHubConnectionBuilder.instances.clear()


def test_connect_market_hub_merges_headers_and_closes(monkeypatch: pytest.MonkeyPatch) -> None:
def test_streaming_session_fanout_and_resubscribe(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(live, "_require_signalr_builder", lambda: DummyHubConnectionBuilder)

opened: List[bool] = []
closed: List[Any] = []
gateway = DummyGateway()
session = live.GatewayStreamingSession(gateway)

handle = live.connect_market_hub(
"https://example.com/api",
hub_path="stream",
headers={"Authorization": "token"},
options={"headers": {"User-Agent": "ProjectX"}},
on_open=lambda: opened.append(True),
on_close=lambda exc: closed.append(exc),
ticker_events: List[Tuple[str, Any]] = []
bar_events: List[Tuple[str, str, Any]] = []
depth_events: List[Tuple[str, Any]] = []
order_events: List[Tuple[str, Any]] = []
position_events: List[Tuple[str, Any]] = []
trade_events: List[Tuple[str, Any]] = []
account_events: List[Any] = []

ticker_handle = session.market.subscribe_ticker(
"ES=F", lambda symbol, payload: ticker_events.append((symbol, payload))
)
session.market.subscribe_bars(
"NQ=F",
"1m",
lambda symbol, timeframe, payload: bar_events.append((symbol, timeframe, payload)),
)
session.market.subscribe_depth(
"CL=F",
lambda symbol, payload: depth_events.append((symbol, payload)),
)

assert isinstance(handle, live.HubConnectionHandle)
builder = DummyHubConnectionBuilder.instances[-1]
assert builder.url == "https://example.com/api/stream"
assert builder.options == {
"headers": {"User-Agent": "ProjectX", "Authorization": "token"}
}
session.user.subscribe_orders(
"ACCT1", lambda account, payload: order_events.append((account, payload))
)
session.user.subscribe_positions(
"ACCT1", lambda account, payload: position_events.append((account, payload))
)
session.user.subscribe_trades(
"ACCT1", lambda account, payload: trade_events.append((account, payload))
)
session.user.subscribe_accounts(lambda payload: account_events.append(payload))

connection = handle.connection
assert connection.started is True
assert len(DummyHubConnectionBuilder.instances) == 2

connection.trigger_open()
assert opened == [True]
market_builder, user_builder = DummyHubConnectionBuilder.instances
assert market_builder.url == "https://example.com/api/marketHub"
assert user_builder.url == "https://example.com/api/userHub"

connection.trigger_close(None)
assert closed == [None]
market_connection = market_builder.connection
user_connection = user_builder.connection
assert market_connection.started is True
assert user_connection.started is True

handle.close()
assert connection.stopped is True
market_connection.trigger_open()
user_connection.trigger_open()

market_connection.emit("ticker_update", {"bid": 1})
market_connection.emit("bar_update", {"close": 4100})
market_connection.emit("depth_update", {"levels": []})
user_connection.emit("order_update", {"id": 1})
user_connection.emit("position_update", {"symbol": "ES=F"})
user_connection.emit("trade_update", {"qty": 2})
user_connection.emit("account_update", {"margin": 1000})

def test_subscribe_ticker_dispatch_and_unsubscribe() -> None:
connection = DummySignalRConnection()
events: List[Tuple[str, Any]] = []
assert ticker_events == [("ES=F", {"bid": 1})]
assert bar_events == [("NQ=F", "1m", {"close": 4100})]
assert depth_events == [("CL=F", {"levels": []})]
assert order_events == [("ACCT1", {"id": 1})]
assert position_events == [("ACCT1", {"symbol": "ES=F"})]
assert trade_events == [("ACCT1", {"qty": 2})]
assert account_events == [{"margin": 1000}]

handle = live.subscribe_ticker(
connection,
"ES=F",
lambda symbol, payload: events.append((symbol, payload)),
event="ticker",
)
sent_methods = [method for method, _ in market_connection.sent]
assert sent_methods.count("SubscribeTicker") == 1
assert sent_methods.count("SubscribeBars") == 1
assert sent_methods.count("SubscribeDepth") == 1

assert handle.connection is connection
assert connection.sent == [("SubscribeTicker", ["ES=F"])]
market_connection.trigger_close(None)

connection.emit("ticker", {"bid": 1})
assert events == [("ES=F", {"bid": 1})]
assert len(DummyHubConnectionBuilder.instances) == 3
reconnect_builder = DummyHubConnectionBuilder.instances[-1]
new_market_connection = reconnect_builder.connection
assert reconnect_builder.options == {"headers": {"Authorization": "token-3"}}

handle.unsubscribe()
assert ("UnsubscribeTicker", ["ES=F"]) in connection.sent
new_market_connection.trigger_open()
resubscribe_calls = [method for method, _ in new_market_connection.sent]
assert resubscribe_calls.count("SubscribeTicker") == 1
assert resubscribe_calls.count("SubscribeBars") == 1
assert resubscribe_calls.count("SubscribeDepth") == 1

connection.emit("ticker", {"bid": 2})
assert events == [("ES=F", {"bid": 1})]
new_market_connection.emit("ticker_update", {"bid": 2})
assert ticker_events[-1] == ("ES=F", {"bid": 2})

ticker_handle.close()
assert ("UnsubscribeTicker", ["ES=F"]) in new_market_connection.sent

def test_subscribe_bars_uses_handle_and_timeframe() -> None:
handle = live.HubConnectionHandle(DummySignalRConnection())
events: List[Tuple[str, str, Any]] = []
session.close()
assert new_market_connection.stopped is True
assert user_connection.stopped is True

subscription = live.subscribe_bars(
handle,
"NQ=F",
"1m",
lambda symbol, timeframe, payload: events.append((symbol, timeframe, payload)),
event="bars",
)
assert gateway.auth_requests[:2] == ["token-0", "token-1"]

connection = handle.connection
assert connection.sent == [("SubscribeBars", ["NQ=F", "1m"])]

connection.emit("bars", {"close": 4100})
assert events == [("NQ=F", "1m", {"close": 4100})]
def test_poll_helpers_use_gateway() -> None:
gateway = DummyGateway()
context = live.ExecutionContext(gateway=gateway, account_id="ACCT2")

subscription.unsubscribe()
assert ("UnsubscribeBars", ["NQ=F", "1m"]) in connection.sent
orders = live.poll_open_orders(context)
positions = live.poll_positions(context)

connection.emit("bars", {"close": 4200})
assert events == [("NQ=F", "1m", {"close": 4100})]
assert orders == {"orders": {"accountId": "ACCT2"}}
assert positions == {"positions": {"accountId": "ACCT2"}}


def test_utils_module_behaviour(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
Expand Down Expand Up @@ -289,3 +336,4 @@ def _version_with_error(package: str) -> str:
message = exc.value.args[0]
assert "Missing packages" in message
assert "Version mismatches" in message

1 change: 1 addition & 0 deletions tests/gui/test_ui_live_tab_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import pytest

tk = pytest.importorskip("tkinter")
pytest.importorskip("pandas")
from tkinter import ttk # noqa: E402

from toptek.ui.live_tab import LiveTab # noqa: E402
Expand Down
15 changes: 15 additions & 0 deletions toptek/core/gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,21 @@ def _headers(self) -> Dict[str, str]:
"Content-Type": "application/json",
}

@property
def base_url(self) -> str:
"""Expose the configured base URL for downstream consumers."""

return self._config.base_url

def auth_headers(self) -> Dict[str, str]:
"""Return authorization headers, refreshing the JWT if required."""

if not self._token:
self.login()
else:
self._validate()
return dict(self._headers)

def _request(self, endpoint: str, payload: Dict[str, Any]) -> Dict[str, Any]:
"""Send a POST request with automatic token validation."""

Expand Down
Loading
Loading