Define runtime workspace selection contract#2
Conversation
Introduce a shared resolver API in hotdata-runtime and enforce it with contract tests so adapters can rely on one source of workspace selection semantics.
| @dataclass(frozen=True) | ||
| class WorkspaceSelection: | ||
| workspace_id: str | ||
| source: str | ||
| workspaces: list |
There was a problem hiding this comment.
nit: (not blocking) workspaces: list is loosely typed and source: str accepts any string. Since source has a small closed set of values ("explicit_env", "active", "first") it would be more contract-friendly to type it as Literal["explicit_env", "active", "first"], and workspaces as list[Any] (or the SDK workspace type). This makes the contract self-documenting and lets type-checkers catch typos in downstream consumers.
Also: the dataclass is frozen=True, but workspaces is a mutable list — consumers could still mutate it in place. Using tuple[...] would make immutability real, though that's a stylistic call.
| return WorkspaceSelection( | ||
| workspace_id=explicit, | ||
| source="explicit_env", | ||
| workspaces=[], | ||
| ) |
There was a problem hiding this comment.
super nit: (not blocking) returning workspaces=[] when the source is "explicit_env" conflates "we didn't list" with "we listed and got nothing." Since the empty-list case in the auto path raises, this is unambiguous today, but workspaces: list | None = None (with None meaning "not discovered") would make the contract clearer to adapters that key behavior off selection.workspaces.
| def test_public_exports_contract(): | ||
| assert hr.__all__ == [ | ||
| "__version__", | ||
| "HotdataClient", | ||
| "QueryResult", | ||
| "workspace_health_lines", | ||
| "default_api_key", | ||
| "default_host", | ||
| "default_session_id", | ||
| "explicit_workspace_id", | ||
| "from_env", | ||
| "list_workspaces", | ||
| "normalize_host", | ||
| "pick_workspace", | ||
| "resolve_workspace_selection", | ||
| "WorkspaceSelection", | ||
| ] |
There was a problem hiding this comment.
nit: (not blocking) asserting hr.__all__ == [...] checks both membership and order. Any future reordering of __all__ breaks the test even when the public surface is unchanged. If order isn't part of the contract, compare as sets (set(hr.__all__) == {...}) plus a length check to catch duplicates. If order is intentional, a comment here saying so would help.
| def test_resolve_workspace_selection_returns_workspaces_and_source( | ||
| monkeypatch: pytest.MonkeyPatch, | ||
| ): | ||
| monkeypatch.delenv("HOTDATA_WORKSPACE", raising=False) | ||
| monkeypatch.delenv("HOTDATA_WORKSPACE_ID", raising=False) | ||
|
|
||
| items = [ | ||
| SimpleNamespace(public_id="ws_1", active=False), | ||
| SimpleNamespace(public_id="ws_2", active=True), | ||
| ] | ||
| listing = SimpleNamespace(workspaces=items) | ||
|
|
||
| with patch("hotdata_runtime.env.WorkspacesApi") as Api: | ||
| Api.return_value.list_workspaces.return_value = listing | ||
| resolved = resolve_workspace_selection( | ||
| "k", "https://api.hotdata.dev", None | ||
| ) | ||
| assert resolved.workspace_id == "ws_2" | ||
| assert resolved.source == "active" | ||
| assert resolved.workspaces == items |
There was a problem hiding this comment.
super nit: (not blocking) the new resolver tests cover source="explicit_env" and source="active" but not source="first" (the fallback when no workspace is active). Since source is part of the contract surface, a third case locking that value down would round out coverage.
Treat cancelled result states as immediate runtime errors and add coverage so result polling no longer times out on terminal cancellation.
Document the adapter-facing HotdataClient methods in the runtime contract, add duplicate-connection-name protection, and support explicit connection_id column resolution with regression coverage.
Introduce canonical result/history summary models and QueryResult metadata/record helpers in hotdata-runtime, then lock the expanded contract and behavior with dedicated tests.
| def list_recent_results( | ||
| self, | ||
| *, | ||
| limit: int = 50, | ||
| offset: int = 0, | ||
| ) -> list[ResultSummary]: | ||
| listing = self.results().list_results(limit=limit, offset=offset) | ||
| return [ | ||
| ResultSummary( | ||
| result_id=r.id, | ||
| status=r.status, | ||
| created_at=r.created_at, | ||
| ) | ||
| for r in listing.results | ||
| ] | ||
|
|
||
| def list_run_history( | ||
| self, | ||
| *, | ||
| limit: int = 20, | ||
| ) -> list[RunHistoryItem]: | ||
| listing = self.query_runs().list_query_runs(limit=limit) | ||
| return [ | ||
| RunHistoryItem( | ||
| query_run_id=r.id, | ||
| status=r.status, | ||
| created_at=r.created_at, | ||
| execution_time_ms=r.execution_time_ms, | ||
| result_id=r.result_id, | ||
| ) | ||
| for r in listing.query_runs | ||
| ] |
There was a problem hiding this comment.
super nit: (not blocking) list_recent_results accepts offset but list_run_history does not, and their default limits differ (50 vs 20). Both wrap paginated server endpoints and adapters will likely want to page through both. Adding a matching offset: int = 0 to list_run_history (and considering aligning default limits) would make these helpers consistent and avoid surprise when adapters reuse pagination code across the two.
Add offset support to list_run_history and extend tests/contracts so runtime pagination semantics stay consistent across normalized helper APIs.
Provide a concrete script demonstrating query execution, metadata, records, and normalized history/result helper usage in hotdata-runtime.
Require HOTDATA_API_KEY as the single environment contract so runtime behavior and docs stay unambiguous.
Summary
WorkspaceSelectionandresolve_workspace_selection()as the canonical workspace-selection contract inhotdata-runtimepick_workspace()through the canonical resolver and export the new symbols from package rootCONTRACT.mdand add tests that lock public API and resolver behaviorTest plan
uv run pytest