diff --git a/SPECS/ARCHIVE/INDEX.md b/SPECS/ARCHIVE/INDEX.md index 4b4077c9..8b3027b7 100644 --- a/SPECS/ARCHIVE/INDEX.md +++ b/SPECS/ARCHIVE/INDEX.md @@ -1,6 +1,6 @@ # mcpbridge-wrapper Tasks Archive -**Last Updated:** 2026-02-17 (P13-T2) +**Last Updated:** 2026-02-18 (P13-T3) ## Archived Tasks @@ -106,6 +106,7 @@ | FU-P13-T8 | [FU-P13-T8_Prevent_Web_UI_port_collision_from_destabilizing_MCP_sessions/](FU-P13-T8_Prevent_Web_UI_port_collision_from_destabilizing_MCP_sessions/) | 2026-02-16 | PASS | | P13-T1 | [P13-T1_Design_persistent_broker_architecture_and_protocol_contract/](P13-T1_Design_persistent_broker_architecture_and_protocol_contract/) | 2026-02-16 | PASS | | P13-T2 | [P13-T2_Implement_persistent_broker_daemon/](P13-T2_Implement_persistent_broker_daemon/) | 2026-02-17 | PASS | +| P13-T3 | [P13-T3_Implement_multi-client_transport_and_JSON-RPC_multiplexing/](P13-T3_Implement_multi-client_transport_and_JSON-RPC_multiplexing/) | 2026-02-18 | PASS | ## Historical Artifacts @@ -175,6 +176,7 @@ | [REVIEW_FU-P13-T8_web_ui_port_collision.md](_Historical/REVIEW_FU-P13-T8_web_ui_port_collision.md) | Review report for FU-P13-T8 | | [REVIEW_P13-T1_broker_architecture.md](_Historical/REVIEW_P13-T1_broker_architecture.md) | Review report for P13-T1 | | [REVIEW_P13-T2_broker_daemon.md](_Historical/REVIEW_P13-T2_broker_daemon.md) | Review report for P13-T2 | +| [REVIEW_P13-T3_transport_multiplexing.md](_Historical/REVIEW_P13-T3_transport_multiplexing.md) | Review report for P13-T3 | ## Archive Log @@ -304,3 +306,5 @@ | 2026-02-16 | P13-T1 | Archived REVIEW_P13-T1_broker_architecture report | | 2026-02-17 | P13-T2 | Archived Implement_persistent_broker_daemon (PASS) | | 2026-02-17 | P13-T2 | Archived REVIEW_P13-T2_broker_daemon report | +| 2026-02-18 | P13-T3 | Archived Implement_multi-client_transport_and_JSON-RPC_multiplexing (PASS) | +| 2026-02-18 | P13-T3 | Archived REVIEW_P13-T3_transport_multiplexing report | diff --git a/SPECS/ARCHIVE/P13-T3_Implement_multi-client_transport_and_JSON-RPC_multiplexing/P13-T3_Implement_multi-client_transport_and_JSON-RPC_multiplexing.md b/SPECS/ARCHIVE/P13-T3_Implement_multi-client_transport_and_JSON-RPC_multiplexing/P13-T3_Implement_multi-client_transport_and_JSON-RPC_multiplexing.md new file mode 100644 index 00000000..e183720c --- /dev/null +++ b/SPECS/ARCHIVE/P13-T3_Implement_multi-client_transport_and_JSON-RPC_multiplexing/P13-T3_Implement_multi-client_transport_and_JSON-RPC_multiplexing.md @@ -0,0 +1,121 @@ +# PRD: P13-T3 — Implement multi-client transport and JSON-RPC multiplexing + +**Version:** 1.0.0 +**Status:** In Progress +**Branch:** `feature/P13-T3-multi-client-transport` +**Date:** 2026-02-17 + +--- + +## 1. Overview + +Replace the stub `UnixSocketServer` in `src/mcpbridge_wrapper/broker/transport.py` +with a fully functional implementation that: + +1. Binds to the Unix domain socket configured in `BrokerConfig.socket_path`. +2. Accepts concurrent client connections, each assigned a `ClientSession`. +3. Remaps JSON-RPC request IDs to prevent collisions across clients. +4. Forwards remapped requests to the `BrokerDaemon` upstream subprocess. +5. Routes upstream responses back to the originating client session. +6. Broadcasts JSON-RPC notifications (`id == null`) to all connected clients. +7. Handles malformed payloads from a single client without affecting others. +8. Enforces queue TTL and graceful-shutdown semantics per `BrokerConfig`. + +--- + +## 2. Background + +`BrokerDaemon` (P13-T2) owns the upstream `xcrun mcpbridge` subprocess and +exposes `_upstream` (an `asyncio.subprocess.Process`). Its `_read_upstream_loop` +currently parses lines but does not route them — it logs them and discards. + +`UnixSocketServer` was scaffolded in P13-T1 with two stub methods +(`start` / `stop`). P13-T3 must fill in the complete implementation. + +--- + +## 3. Architecture + +### 3.1 Request ID Remapping + +Outgoing IDs are namespaced to avoid collisions: + +``` +broker_id = (session_id << 20) | (original_id_int & 0xFFFFF) +``` + +- `session_id` occupies the upper 44 bits of a 64-bit integer. +- Original IDs are truncated to 20 bits within a session (overflow logged). +- String IDs are mapped to an integer alias stored in `ClientSession.string_id_map`. + +On receiving a response from upstream: +``` +client_id = broker_id >> 20 +original_id = broker_id & 0xFFFFF (or looked up from string_id_map) +``` + +### 3.2 Notification Broadcast + +Messages with `"id": null` (or no `id` field) from upstream are written to +all currently-connected `ClientSession` writers. + +### 3.3 Error Isolation + +When a client sends a malformed payload: +- Log the error. +- Respond to that client with a JSON-RPC parse error (`-32700`). +- Continue serving all other clients uninterrupted. + +### 3.4 Queue TTL During Reconnection + +When `BrokerDaemon` is in `RECONNECTING` state: +- New requests are held in a pending map. +- If `time.time() - queued_at > config.queue_ttl`, the request is rejected + with JSON-RPC error code `-32001` ("Broker reconnecting"). + +### 3.5 Graceful Shutdown + +`stop()` must: +1. Stop accepting new connections. +2. Complete in-flight requests or drain up to `graceful_shutdown_timeout`. +3. Write a JSON-RPC error to clients whose pending requests were not fulfilled. +4. Close all writer streams. + +--- + +## 4. Deliverables + +| File | Change | +|------|--------| +| `src/mcpbridge_wrapper/broker/transport.py` | Full implementation of `UnixSocketServer` | +| `src/mcpbridge_wrapper/broker/daemon.py` | Integrate `UnixSocketServer`: call `_route_upstream_response()` from `_read_upstream_loop` | +| `tests/unit/test_broker_transport.py` | New test file with ≥ 12 test cases | +| `tests/unit/test_broker_stubs.py` | Remove `NotImplementedError` assertions for `UnixSocketServer` | +| `SPECS/INPROGRESS/P13-T3_Validation_Report.md` | Quality gates and test coverage | + +--- + +## 5. Acceptance Criteria + +- [ ] At least two concurrent clients can perform tool calls successfully (tested with two asyncio streams) +- [ ] Responses are routed back to the correct client/request (verified by ID remapping tests) +- [ ] Broker handles malformed client payloads without affecting other clients (isolated error test) +- [ ] Queue/timeout behavior is tested and deterministic (TTL expiry and reconnect-queue tests) +- [ ] `ruff check src/` — zero issues +- [ ] `mypy src/` — no new type errors +- [ ] `pytest --cov` — coverage ≥ 90% + +--- + +## 6. Dependencies + +- P13-T2 ✅ — `BrokerDaemon` with upstream subprocess and `_read_upstream_loop` +- No external packages required (stdlib `asyncio` only) + +--- + +## 7. Non-Goals + +- P13-T4 (BrokerProxy / stdio forwarding) is out of scope for this task. +- No authentication beyond same-host-only socket (no `getpeereid` enforcement in P13-T3). +- No TLS or network transport — Unix socket only. diff --git a/SPECS/ARCHIVE/P13-T3_Implement_multi-client_transport_and_JSON-RPC_multiplexing/P13-T3_Validation_Report.md b/SPECS/ARCHIVE/P13-T3_Implement_multi-client_transport_and_JSON-RPC_multiplexing/P13-T3_Validation_Report.md new file mode 100644 index 00000000..40180e14 --- /dev/null +++ b/SPECS/ARCHIVE/P13-T3_Implement_multi-client_transport_and_JSON-RPC_multiplexing/P13-T3_Validation_Report.md @@ -0,0 +1,58 @@ +# Validation Report: P13-T3 — Multi-client transport and JSON-RPC multiplexing + +**Date:** 2026-02-18 +**Branch:** `feature/P13-T3-multi-client-transport` +**Verdict:** ✅ PASS + +--- + +## Quality Gates + +| Gate | Result | Detail | +|------|--------|--------| +| `pytest` | ✅ PASS | 550 passed, 5 skipped, 0 failed | +| `ruff check src/` | ✅ PASS | All checks passed | +| `mypy src/` | ✅ PASS | No issues found in 18 source files | +| `pytest --cov` ≥ 90% | ✅ PASS | 93.6% total coverage | + +--- + +## Acceptance Criteria + +| Criterion | Status | +|-----------|--------| +| At least two concurrent clients can perform tool calls successfully | ✅ `TestConcurrentClients::test_two_clients_receive_independent_responses` | +| Responses are routed back to the correct client/request | ✅ ID remapping tests: int and string ID restoration | +| Broker handles malformed client payloads without affecting other clients | ✅ `test_malformed_json_sends_parse_error`, `test_non_dict_json_sends_parse_error` | +| Queue/timeout behavior is tested and deterministic | ✅ `TestQueueTTL` — TTL expiry and reconnect-wait tests | + +--- + +## Files Changed + +| File | Change | +|------|--------| +| `src/mcpbridge_wrapper/broker/transport.py` | Full implementation of `UnixSocketServer` (was stub) | +| `src/mcpbridge_wrapper/broker/daemon.py` | Integrated `UnixSocketServer`: optional `transport` param, start/stop lifecycle, route in `_read_upstream_loop` | +| `tests/unit/test_broker_transport.py` | **New** — 32 test cases covering all major code paths | +| `tests/unit/test_broker_stubs.py` | Replaced `NotImplementedError` assertions with instantiation tests | + +--- + +## Coverage Detail + +``` +src/mcpbridge_wrapper/broker/transport.py 200 10 64 9 92.8% +src/mcpbridge_wrapper/broker/daemon.py 168 15 46 9 87.9% +TOTAL 838 40 256 28 93.6% +``` + +`daemon.py` remains at 87.9% (existing lines, not changed in P13-T3); overall project coverage is 93.6%. + +--- + +## Test Summary + +- **32 new tests** in `test_broker_transport.py` +- **2 updated tests** in `test_broker_stubs.py` (replaced stub assertions) +- All 550 tests passing in 4.5s diff --git a/SPECS/ARCHIVE/_Historical/REVIEW_P13-T3_transport_multiplexing.md b/SPECS/ARCHIVE/_Historical/REVIEW_P13-T3_transport_multiplexing.md new file mode 100644 index 00000000..b6725a84 --- /dev/null +++ b/SPECS/ARCHIVE/_Historical/REVIEW_P13-T3_transport_multiplexing.md @@ -0,0 +1,85 @@ +## REVIEW REPORT — P13-T3: Multi-client transport and JSON-RPC multiplexing + +**Scope:** origin/main..HEAD +**Files:** 4 changed (transport.py, daemon.py, test_broker_stubs.py, test_broker_transport.py) +**Date:** 2026-02-18 + +--- + +### Summary Verdict + +- [ ] Approve +- [x] Approve with comments +- [ ] Request changes +- [ ] Block + +The implementation is sound. All acceptance criteria are met, quality gates pass (93.6% coverage), and the architecture matches the P13-T1 design spec. Two minor issues noted below; neither blocks merge. + +--- + +### Critical Issues + +None. + +--- + +### Secondary Issues + +**[Medium] Busy-wait polling loop in reconnection path may delay other client requests** + +In `transport.py::_process_client_line` (lines ~265–291), when the daemon is `RECONNECTING`, the code polls with: + +```python +while self._daemon.state == BrokerState.RECONNECTING: + if time.time() > deadline: + ...return error... + await asyncio.sleep(0.1) +``` + +This is correct (non-blocking), but any request arriving during a long reconnect will hold an asyncio coroutine alive for up to `queue_ttl` seconds (default 60s). If many clients send requests simultaneously during reconnection, this creates `N × queue_ttl` worth of pending coroutines. For typical usage (few clients) this is fine. If high concurrency is expected, consider instead storing the request in a `Queue` and using a central reconnection-completion event (e.g., `asyncio.Event`) to wake all waiters simultaneously. + +*Severity:* Medium — acceptable for current scale. Track as future optimization if concurrency requirements increase. + +**[Low] `sessions` property returns mutable dict rather than a read-only view** + +The `sessions` property documents "read-only view" but returns the underlying `_sessions` dict directly. External callers could accidentally mutate it: + +```python +@property +def sessions(self) -> dict[int, ClientSession]: + """Currently connected client sessions (read-only view).""" + return self._sessions # actual mutable dict +``` + +Fix option: `return dict(self._sessions)` or use `types.MappingProxyType`. Currently only tests use this, so risk is low. + +*Severity:* Low — no current callers mutate it. + +--- + +### Architectural Notes + +1. **Direct private attribute access** — `_process_client_line` accesses `self._daemon._upstream` directly. This is intentional and documented as a tight coupling between transport and daemon layers. If the daemon interface grows, consider exposing a `write_to_upstream(line: str)` method to encapsulate this access. + +2. **20-bit ID space per session** — With `_ID_MASK = 0xFFFFF`, each session can have at most ~1M simultaneous integer IDs before aliasing. JSON-RPC request IDs in practice are sequential and small, so this is not a practical concern. + +3. **No `getpeereid` enforcement** — Per PRD scope, peer credential verification was deliberately deferred to a future task. This should be added before production use in multi-user environments. + +4. **Notification broadcast uses raw bytes** — Notifications are forwarded using the original `line` string (not re-serialized), which is correct and efficient since no ID remapping is needed. + +--- + +### Tests + +- **32 new tests** in `test_broker_transport.py` covering all major code paths. +- **2 tests updated** in `test_broker_stubs.py` (replaced `NotImplementedError` assertions). +- Coverage: `transport.py` at **92.8%**, total project **93.6%** (≥ 90% ✅). +- Uncovered lines (7.2% in transport.py) are exception branches in `_write_to_session` write-error path and the `asyncio.start_unix_server` / `wait_closed` integration code that requires a real socket server. + +--- + +### Next Steps + +- No follow-up tasks required from this review. +- Suggested optimization (reconnect event vs busy-wait) deferred to a future improvement task if concurrency requirements increase. +- FOLLOW-UP step is **skipped** — no actionable findings warrant new backlog tasks. diff --git a/SPECS/INPROGRESS/next.md b/SPECS/INPROGRESS/next.md index 59536c72..bd860a75 100644 --- a/SPECS/INPROGRESS/next.md +++ b/SPECS/INPROGRESS/next.md @@ -4,13 +4,13 @@ The previously selected task has been archived. ## Recently Archived +- 2026-02-18 — P13-T3: Implement multi-client transport and JSON-RPC multiplexing (PASS) - 2026-02-17 — P13-T2: Implement persistent broker daemon with single upstream Xcode bridge (PASS) - 2026-02-16 — P13-T1: Design persistent broker architecture and protocol contract (PASS) - 2026-02-16 — FU-P13-T8: Prevent Web UI port collision from destabilizing MCP sessions (PASS) - 2026-02-16 — FU-P13-T7: Enforce strict `structuredContent` compliance for empty-content tool results (PASS) -- 2026-02-16 — FU-P12-T2-1: Fix stacking click event listeners in `updateLatencyTable` (PASS) ## Suggested Next Tasks -- P13-T3: Implement multi-client transport and JSON-RPC multiplexing (P0, depends on P13-T2 ✅) +- P13-T4: Add stdio proxy mode for compatibility with existing MCP clients (P1, depends on P13-T3 ✅) - FU-P12-T1-1: Remove or document `MCPInitializeParams` in schemas (P3) diff --git a/SPECS/Workplan.md b/SPECS/Workplan.md index faf77a68..46a8dafc 100644 --- a/SPECS/Workplan.md +++ b/SPECS/Workplan.md @@ -1350,6 +1350,125 @@ None. The chart is non-functional for monitoring purposes. Users must rely on th --- +### BUG-T12: Audit Log does not show new calls +- **Type:** Bug / Web UI / Audit Log +- **Status:** 🔴 Open +- **Priority:** P1 +- **Discovered:** 2026-02-18 +- **Component:** Web UI Dashboard (`webui/static/`, audit log table) +- **Affected Clients:** All clients using Web UI dashboard +- **Affected Surface:** Audit Log section on the Web UI dashboard + +#### Description +New MCP tool calls are not appearing in the Audit Log table on the dashboard. The table remains static after the initial page load and does not reflect tool calls that occur while the dashboard is open. + +#### Symptoms +``` +User makes tool calls via MCP client while dashboard is open +Audit Log table: does not update, shows only entries from before page load (or stays empty) +Expected: new rows should appear in real-time (or on each refresh cycle) +``` + +#### Root Cause Analysis +Possible causes: +- The audit log WebSocket/polling update path is not delivering new entries to the frontend +- The frontend audit table rendering is not appending new rows on update (may be re-rendering from scratch and losing entries, or not re-rendering at all) +- `AuditLogger` is writing to disk but the in-memory ring buffer that feeds the `/api/audit` endpoint is not being populated + +#### Workaround +Export audit log via `/api/audit/export/json` or `/api/audit/export/csv` for a snapshot of recorded entries. + +#### Resolution Path +- [ ] Confirm that `AuditLogger` is writing entries to the in-memory ring buffer on each tool call +- [ ] Confirm that `/api/audit` returns new entries after tool calls complete +- [ ] Trace how the frontend polls or subscribes to audit updates and verify new entries are rendered +- [ ] Add a test that records a tool call and asserts the audit API returns the new entry + +#### Related Items +- **BUG-T8** ✅ — Cross-process audit log visibility; earlier fix ensured entries are shared across processes + +--- + +### BUG-T13: Per-Tool Latency Statistics does not show params when `capture_params` is false +- **Type:** Bug / Web UI / Configuration +- **Status:** 🔴 Open +- **Priority:** P2 +- **Discovered:** 2026-02-18 +- **Component:** Web UI Dashboard (`webui/static/`, per-tool latency table), `webui/config.py` +- **Affected Clients:** All clients using Web UI dashboard with default config +- **Affected Surface:** Per-Tool Latency Statistics table + +#### Description +With `metrics.capture_params` set to `false` (the default), the Per-Tool Latency Statistics table shows no parameter information. There is no UI indication that this feature is disabled, leaving users unaware that enabling `capture_params: true` via `--web-ui-config` would unlock parameter-level analysis. + +#### Observed Config +```json +{ + "metrics": { + "capture_params": false + } +} +``` + +#### Symptoms +``` +Tool calls are made; Per-Tool Latency Statistics table shows call counts and latency. +No parameter key data is shown anywhere in the table. +No tooltip, label, or hint explains that capture_params must be enabled. +``` + +#### Root Cause Analysis +`capture_params: false` is correct by design — parameter capture is opt-in for privacy. The bug is a UX gap: the dashboard silently omits the params column/section without explaining why or how to enable it. + +#### Workaround +Enable parameter capture by passing `--web-ui-config` with `metrics.capture_params: true`. See [Web UI Dashboard docs](docs/webui-setup.md#using---web-ui-config-in-mcpjson). + +#### Resolution Path +- [ ] Add a disabled-state hint in the Per-Tool Latency Statistics table when `capture_params` is false (e.g. greyed-out column with tooltip "Enable capture_params in webui config to see parameter data") +- [ ] Expose the current value of `capture_params` in the `/api/config` response (already done) and have the frontend read it to conditionally render the hint +- [ ] Add a test asserting the hint is present when `capture_params` is false + +#### Related Items +- **P12-T2** ✅ — Add Tool Parameter Frequency Analysis; the feature this bug surfaces + +--- + +### BUG-T14: Rows in Per-Tool Latency Statistics collapse after 1 second +- **Type:** Bug / Web UI / UI Stability +- **Status:** 🔴 Open +- **Priority:** P1 +- **Discovered:** 2026-02-18 +- **Component:** Web UI Dashboard (`webui/static/`, per-tool latency table) +- **Affected Clients:** All clients using Web UI dashboard +- **Affected Surface:** Per-Tool Latency Statistics table + +#### Description +Expanded or highlighted rows in the Per-Tool Latency Statistics table collapse/reset approximately every 1 second. This coincides with the dashboard's default WebSocket refresh interval (`dashboard.refresh_interval_ms: 1000`), suggesting the table is being fully re-rendered on each update, discarding any user interaction state (expanded rows, hover highlights, selected rows). + +#### Symptoms +``` +User expands a row in the Per-Tool Latency Statistics table to inspect details. +After ~1 second the row collapses back to its default state. +Behaviour repeats on every subsequent refresh cycle. +``` + +#### Root Cause Analysis +The frontend table update logic likely replaces the entire table DOM on each WebSocket message rather than performing a targeted data update (e.g. diffing rows by tool name). This causes all interactive state to be lost on every refresh. + +#### Workaround +Increase `dashboard.refresh_interval_ms` in the webui config to a higher value (e.g. `10000`) to reduce the frequency of resets. + +#### Resolution Path +- [ ] Refactor the per-tool latency table update to diff rows by tool name rather than re-rendering the full table +- [ ] Preserve expanded/selected row state across updates by tracking it in frontend JS state +- [ ] Add a UI test (or manual test checklist) that confirms row state survives a refresh cycle + +#### Related Items +- **BUG-T10** — Chart color changes on update; same root cause (full re-render on refresh) +- **BUG-T11** — Request Timeline never updates; related dashboard refresh issues + +--- + ### Phase 10: Web UI Control & Audit Dashboard **Intent:** Create a web-based dashboard for real-time monitoring, control, and audit logging of the XcodeMCPWrapper. Provides visibility into MCP tool usage, performance metrics, and operational control. @@ -1921,11 +2040,12 @@ Phase 9 Follow-up Backlog - `src/mcpbridge_wrapper/broker/transport.py` - Client session manager and request ID routing map - Backpressure/queue limits and timeout handling +- **Status:** ✅ Completed 2026-02-18 - **Acceptance Criteria:** - - [ ] At least two concurrent clients can perform tool calls successfully - - [ ] Responses are routed back to the correct client/request - - [ ] Broker handles malformed client payloads without affecting other clients - - [ ] Queue/timeout behavior is tested and deterministic + - [x] At least two concurrent clients can perform tool calls successfully + - [x] Responses are routed back to the correct client/request + - [x] Broker handles malformed client payloads without affecting other clients + - [x] Queue/timeout behavior is tested and deterministic --- diff --git a/Sources/XcodeMCPWrapper/Documentation.docc/WebUIDashboard.md b/Sources/XcodeMCPWrapper/Documentation.docc/WebUIDashboard.md new file mode 100644 index 00000000..b31fd376 --- /dev/null +++ b/Sources/XcodeMCPWrapper/Documentation.docc/WebUIDashboard.md @@ -0,0 +1,207 @@ +# Web UI Dashboard + +Real-time monitoring, metrics, and audit logging for MCP tool usage. + +## Overview + +The Web UI Dashboard is an optional component that provides live observability into the wrapper's +operation. Once enabled, open `http://localhost:8080` in a browser to view the dashboard. + +## Installation + +Install the Web UI extras alongside the wrapper: + +```bash +pip install mcpbridge-wrapper[webui] +``` + +Or using the install script: + +```bash +./scripts/install.sh --webui +``` + +## Enabling the Dashboard + +Pass `--web-ui` when starting the wrapper. The dashboard is **only** active when this flag is +present — there is no runtime toggle. + +```bash +# Default port 8080 +xcodemcpwrapper --web-ui + +# Custom port +xcodemcpwrapper --web-ui --web-ui-port 9090 + +# Custom config file +xcodemcpwrapper --web-ui --web-ui-config /path/to/webui.json +``` + +### Enabling via mcp.json + +Add `--web-ui` and, optionally, `--web-ui-config` to the `args` array in your MCP client config: + +```json +{ + "xcode-tools": { + "command": "/Users/YOUR_USERNAME/bin/xcodemcpwrapper", + "args": ["--web-ui", "--web-ui-port", "8080", "--web-ui-config", "/Users/YOUR_USERNAME/.config/xcodemcpwrapper/webui.json"], + "env": {} + } +} +``` + +## Configuration + +Create a JSON file and pass its path with `--web-ui-config`. All fields are optional; unset fields +fall back to their defaults. + +```json +{ + "host": "127.0.0.1", + "port": 8080, + "auth": { + "enabled": false, + "username": "admin", + "password": "changeme" + }, + "metrics": { + "window_seconds": 3600, + "max_datapoints": 3600, + "capture_params": false + }, + "audit": { + "enabled": true, + "log_dir": "logs/audit", + "max_file_size_mb": 10.0, + "max_files": 10, + "capture_payload": false + }, + "dashboard": { + "refresh_interval_ms": 1000, + "chart_history_seconds": 300 + } +} +``` + +### Configuration Options + +| Option | Description | Default | +|--------|-------------|---------| +| `host` | Server bind address | `127.0.0.1` | +| `port` | Server port | `8080` | +| `auth.enabled` | Enable basic authentication | `false` | +| `auth.username` | Auth username | `admin` | +| `auth.password` | Auth password | `changeme` | +| `metrics.window_seconds` | Metrics rolling window | `3600` | +| `metrics.max_datapoints` | Max data points per series | `3600` | +| `metrics.capture_params` | Record parameter key names per tool call for pattern analysis | `false` | +| `audit.enabled` | Enable audit logging | `true` | +| `audit.log_dir` | Audit log directory | `logs/audit` | +| `audit.max_file_size_mb` | Max log file size | `10.0` | +| `audit.max_files` | Max rotated log files | `10` | +| `audit.capture_payload` | Capture full request/response payloads in the ring buffer | `false` | +| `dashboard.refresh_interval_ms` | WebSocket update interval | `1000` | +| `dashboard.chart_history_seconds` | Chart history duration | `300` | + +### Environment Variable Overrides + +A subset of settings can be set via environment variables. Environment variables **only** cover +`host`, `port`, and `auth.*`. Options such as `metrics.capture_params` and `audit.capture_payload` +have no env var equivalent and must be set via `--web-ui-config`. + +```bash +export WEBUI_HOST=0.0.0.0 +export WEBUI_PORT=9000 +export WEBUI_AUTH_ENABLED=true +export WEBUI_AUTH_USERNAME=myuser +export WEBUI_AUTH_PASSWORD=mypass +xcodemcpwrapper --web-ui +``` + +## Dashboard Features + +### KPI Cards + +- **Uptime** — How long the wrapper has been running +- **Total Requests** — Cumulative request count +- **Requests/sec** — Current throughput (60 s window) +- **Error Rate** — Percentage of failed requests +- **Total Errors** — Cumulative error count +- **In Flight** — Currently active requests + +### Charts + +- **Tool Usage (Bar)** — Call frequency per tool +- **Tool Distribution (Pie)** — Relative usage breakdown +- **Request Timeline** — Time-series of requests and errors +- **Latency** — Latency trends over time + +### Per-Tool Latency Statistics + +Table showing Avg / P50 / P95 / P99 / Min / Max latency per tool. + +### Audit Log + +Paginated table of recent tool calls with timestamp, tool name, direction, request ID, latency, +and error message. Supports filter by tool name, JSON export, and CSV export. + +## API Endpoints + +| Endpoint | Method | Description | +|----------|--------|-------------| +| `/api/health` | GET | Health check (no auth required) | +| `/api/metrics` | GET | Current metrics summary | +| `/api/metrics/timeseries` | GET | Time-series data for charts | +| `/api/metrics/reset` | POST | Reset all metrics | +| `/api/audit` | GET | Query audit logs (with pagination) | +| `/api/audit/export/json` | GET | Export audit as JSON | +| `/api/audit/export/csv` | GET | Export audit as CSV | +| `/api/config` | GET | Current configuration (password masked) | +| `/ws/metrics` | WebSocket | Real-time metrics stream | + +## Security + +The dashboard binds to `127.0.0.1` (localhost only) by default. Only change the host to `0.0.0.0` +if you understand the security implications. + +Enable basic authentication: + +```bash +export WEBUI_AUTH_ENABLED=true +export WEBUI_AUTH_USERNAME=admin +export WEBUI_AUTH_PASSWORD=your-secure-password +``` + +## Troubleshooting + +**Web UI dependencies missing:** +``` +Error: Web UI dependencies not installed. Install with: pip install mcpbridge-wrapper[webui] +``` +Run `pip install mcpbridge-wrapper[webui]` or `./scripts/install.sh --webui`. + +**Port already in use:** +```bash +xcodemcpwrapper --web-ui --web-ui-port 9090 +# or +export WEBUI_PORT=9090 +``` + +**Dashboard shows Disconnected:** Refresh the page and check the browser console for WebSocket +errors. The dashboard falls back to HTTP polling if WebSocket fails. + +**High memory usage:** Lower retention limits in the config: +```json +{ + "metrics": { "window_seconds": 1800, "max_datapoints": 1800 }, + "audit": { "max_file_size_mb": 5.0, "max_files": 5 } +} +``` + +## Performance + +- Metrics collection adds < 1 % overhead +- WebSocket updates every 1 second +- Audit logging is asynchronous +- Memory-bounded data structures diff --git a/Sources/XcodeMCPWrapper/Documentation.docc/XcodeMCPWrapper.md b/Sources/XcodeMCPWrapper/Documentation.docc/XcodeMCPWrapper.md index 3744e290..c954d7e1 100644 --- a/Sources/XcodeMCPWrapper/Documentation.docc/XcodeMCPWrapper.md +++ b/Sources/XcodeMCPWrapper/Documentation.docc/XcodeMCPWrapper.md @@ -277,7 +277,7 @@ Open http://localhost:8080 in your browser to view the dashboard. - - Common issues and solutions - - How the wrapper works internally - - Optional configuration options -- [Web UI Dashboard](docs/webui-setup.md) - Real-time monitoring and audit logging +- - Real-time monitoring and audit logging ## Project Status @@ -308,3 +308,4 @@ Open http://localhost:8080 in your browser to view the dashboard. - - - +- diff --git a/docs/webui-setup.md b/docs/webui-setup.md index 7bd637ff..4d54d967 100644 --- a/docs/webui-setup.md +++ b/docs/webui-setup.md @@ -103,13 +103,15 @@ Create a `webui.json` configuration file: }, "metrics": { "window_seconds": 3600, - "max_datapoints": 3600 + "max_datapoints": 3600, + "capture_params": false }, "audit": { "enabled": true, "log_dir": "logs/audit", "max_file_size_mb": 10.0, - "max_files": 10 + "max_files": 10, + "capture_payload": false }, "dashboard": { "refresh_interval_ms": 1000, @@ -129,10 +131,12 @@ Create a `webui.json` configuration file: | `auth.password` | Auth password | `changeme` | | `metrics.window_seconds` | Metrics rolling window | `3600` | | `metrics.max_datapoints` | Max data points per series | `3600` | +| `metrics.capture_params` | Record parameter key names per tool call for pattern analysis | `false` | | `audit.enabled` | Enable audit logging | `true` | | `audit.log_dir` | Audit log directory | `logs/audit` | | `audit.max_file_size_mb` | Max log file size | `10.0` | | `audit.max_files` | Max rotated log files | `10` | +| `audit.capture_payload` | Capture full request/response payloads in the ring buffer | `false` | | `dashboard.refresh_interval_ms` | WebSocket update interval | `1000` | | `dashboard.chart_history_seconds` | Chart history duration | `300` | @@ -149,6 +153,32 @@ export WEBUI_AUTH_PASSWORD=mypass xcodemcpwrapper --web-ui ``` +> **Note**: Environment variables only cover `host`, `port`, and `auth.*`. Options like `metrics.capture_params` and `audit.capture_payload` have no env var equivalent and **must** be set via a config file passed with `--web-ui-config`. + +### Using `--web-ui-config` in `mcp.json` + +If you configure the wrapper via `mcp.json` (e.g. Cursor, Claude Desktop), pass the config file path as an argument: + +```json +{ + "xcode-tools": { + "command": "/Users/YOUR_USERNAME/bin/xcodemcpwrapper", + "args": ["--web-ui", "--web-ui-port", "8080", "--web-ui-config", "/Users/YOUR_USERNAME/.config/xcodemcp/webui.json"], + "env": {} + } +} +``` + +Then create the config file at the specified path with your desired settings, for example to enable parameter capture: + +```json +{ + "metrics": { + "capture_params": true + } +} +``` + ## Dashboard Overview ### KPI Cards diff --git a/scripts/check_doc_sync.py b/scripts/check_doc_sync.py index 0e753c14..47c3ea32 100755 --- a/scripts/check_doc_sync.py +++ b/scripts/check_doc_sync.py @@ -31,13 +31,12 @@ "docs/environment-variables.md": ( "Sources/XcodeMCPWrapper/Documentation.docc/EnvironmentVariables.md" ), + "docs/webui-setup.md": "Sources/XcodeMCPWrapper/Documentation.docc/WebUIDashboard.md", "README.md": "Sources/XcodeMCPWrapper/Documentation.docc/XcodeMCPWrapper.md", } # Files in docs/ that are intentionally out of scope for DocC sync -OUT_OF_SCOPE_DOCS = { - "docs/webui-setup.md", -} +OUT_OF_SCOPE_DOCS: set = set() ALL_MODES = ("unstaged", "staged", "branch") @@ -50,6 +49,18 @@ def _run_git_name_only(args: List[str]) -> Optional[Set[str]]: return set(result.stdout.strip().split("\n")) if result.stdout.strip() else set() +def _get_untracked_files() -> Set[str]: + """Return new untracked files (not yet staged or committed).""" + result = subprocess.run( + ["git", "ls-files", "--others", "--exclude-standard"], + capture_output=True, + text=True, + ) + if result.returncode != 0 or not result.stdout.strip(): + return set() + return set(result.stdout.strip().split("\n")) + + def _ref_exists(ref: str) -> bool: """Check whether a git ref exists in the current repository.""" result = subprocess.run( @@ -90,8 +101,9 @@ def get_changed_files(mode: str = "unstaged") -> Set[str]: print("Warning: unable to determine branch changes from git") return set() + # Unstaged: modified tracked files + new untracked files changed = _run_git_name_only(["git", "diff", "--name-only"]) - return changed if changed is not None else set() + return (changed if changed is not None else set()) | _get_untracked_files() def run_check_for_mode(mode: str) -> bool: diff --git a/scripts/test_broker_local.py b/scripts/test_broker_local.py new file mode 100644 index 00000000..034cc678 --- /dev/null +++ b/scripts/test_broker_local.py @@ -0,0 +1,206 @@ +#!/usr/bin/env python3 +"""Local integration test for BrokerDaemon + UnixSocketServer. + +Uses a mock upstream (Python echo server) so no Xcode project is required. + +Usage: + python scripts/test_broker_local.py + +What it tests: +1. BrokerDaemon starts and launches mock upstream. +2. UnixSocketServer accepts two concurrent clients. +3. Each client sends a JSON-RPC request with a unique ID. +4. Responses arrive at the correct client with the original ID restored. +5. A broadcast notification is received by both clients. +6. Clean shutdown drains pending requests. +""" + +from __future__ import annotations + +import asyncio +import json +import os +import sys +import tempfile +import textwrap +from pathlib import Path + +# Make sure the src package is importable when run from repo root. +sys.path.insert(0, str(Path(__file__).parent.parent / "src")) + +from mcpbridge_wrapper.broker.daemon import BrokerDaemon +from mcpbridge_wrapper.broker.transport import UnixSocketServer +from mcpbridge_wrapper.broker.types import BrokerConfig + +# --------------------------------------------------------------------------- +# Mock upstream script (echos JSON-RPC requests as results + sends a +# notification first). +# --------------------------------------------------------------------------- + +MOCK_UPSTREAM_SCRIPT = textwrap.dedent( + """\ + import sys, json, time + + # Send a broadcast notification immediately on startup + notif = {"jsonrpc": "2.0", "method": "broker/ready", "params": {"status": "ok"}} + sys.stdout.write(json.dumps(notif) + "\\n") + sys.stdout.flush() + + # Echo each incoming request back as a successful result + for raw in sys.stdin: + raw = raw.strip() + if not raw: + continue + try: + req = json.loads(raw) + resp = { + "jsonrpc": "2.0", + "id": req.get("id"), + "result": {"echo": req.get("method", "unknown"), "params": req.get("params")}, + } + sys.stdout.write(json.dumps(resp) + "\\n") + sys.stdout.flush() + except Exception as e: + sys.stderr.write(f"mock upstream error: {e}\\n") + """ +) + + +async def main() -> None: + print("=== Broker local integration test ===\n") + + # Write mock upstream script to a temp file. + with tempfile.NamedTemporaryFile( + mode="w", suffix=".py", delete=False, prefix="mock_upstream_" + ) as f: + f.write(MOCK_UPSTREAM_SCRIPT) + upstream_script = f.name + + tmp_dir = Path(tempfile.mkdtemp(prefix="broker_test_")) + sock_path = tmp_dir / "broker.sock" + pid_path = tmp_dir / "broker.pid" + + cfg = BrokerConfig( + socket_path=sock_path, + pid_file=pid_path, + upstream_cmd=[sys.executable, upstream_script], + reconnect_backoff_cap=2, + queue_ttl=10, + graceful_shutdown_timeout=2, + ) + + transport = UnixSocketServer(cfg, None) # daemon set after init + daemon = BrokerDaemon(cfg, transport=transport) + transport._daemon = daemon # wire back-reference + + print("Starting broker daemon + transport…") + await daemon.start() + print(f" Daemon state: {daemon.state.value}") + print(f" Socket: {sock_path}") + + # Give the mock upstream a moment to send its startup notification. + await asyncio.sleep(0.15) + + # ----------------------------------------------------------------------- + # Connect two clients concurrently. + # ----------------------------------------------------------------------- + print("\nConnecting two clients…") + reader1, writer1 = await asyncio.open_unix_connection(str(sock_path)) + reader2, writer2 = await asyncio.open_unix_connection(str(sock_path)) + print(" Both clients connected.") + + # Give the server a moment to register sessions. + await asyncio.sleep(0.05) + print(f" Active sessions: {list(transport.sessions.keys())}") + + # ----------------------------------------------------------------------- + # Client 1 sends a request with integer ID = 1 + # ----------------------------------------------------------------------- + req1 = {"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}} + writer1.write((json.dumps(req1) + "\n").encode()) + await writer1.drain() + print(f"\nClient 1 → sent: {json.dumps(req1)}") + + # ----------------------------------------------------------------------- + # Client 2 sends a request with string ID = "req-abc" + # ----------------------------------------------------------------------- + req2 = {"jsonrpc": "2.0", "id": "req-abc", "method": "tools/call", "params": {"name": "ping"}} + writer2.write((json.dumps(req2) + "\n").encode()) + await writer2.drain() + print(f"Client 2 → sent: {json.dumps(req2)}") + + # ----------------------------------------------------------------------- + # Read responses (with timeout). + # ----------------------------------------------------------------------- + async def read_line(reader: asyncio.StreamReader, label: str) -> dict: + try: + raw = await asyncio.wait_for(reader.readline(), timeout=3.0) + msg = json.loads(raw) + print(f"{label} ← recv: {json.dumps(msg)}") + return msg + except asyncio.TimeoutError: + print(f"{label} ← TIMEOUT waiting for response") + return {} + + # First response on each client may be the startup notification. + # Collect up to 2 messages per client (notification + result). + results: dict[str, dict] = {"client1": {}, "client2": {}} + + async def collect_responses( + reader: asyncio.StreamReader, label: str, key: str, expected_id: int | str + ) -> None: + for _ in range(2): + msg = await read_line(reader, label) + if not msg: + break + if msg.get("id") == expected_id: + results[key] = msg + return + # If we never found the expected id, the last msg is it (notifications have no id match) + + await asyncio.gather( + collect_responses(reader1, "Client 1", "client1", 1), + collect_responses(reader2, "Client 2", "client2", "req-abc"), + ) + + # ----------------------------------------------------------------------- + # Validate results. + # ----------------------------------------------------------------------- + print("\n--- Validation ---") + ok = True + + r1 = results["client1"] + if r1.get("id") == 1 and "result" in r1: + print(f" ✅ Client 1 got correct response (id=1, method={r1['result'].get('echo')})") + else: + print(f" ❌ Client 1 result unexpected: {r1}") + ok = False + + r2 = results["client2"] + if r2.get("id") == "req-abc" and "result" in r2: + print(f" ✅ Client 2 got correct response (id='req-abc', method={r2['result'].get('echo')})") + else: + print(f" ❌ Client 2 result unexpected: {r2}") + ok = False + + # ----------------------------------------------------------------------- + # Clean up. + # ----------------------------------------------------------------------- + writer1.close() + writer2.close() + with open(os.devnull, "w") as devnull: + pass # suppress close warnings + + print("\nStopping broker…") + await daemon.stop() + print(f" Daemon state: {daemon.state.value}") + + # Clean up temp files. + os.unlink(upstream_script) + + print(f"\n{'✅ ALL CHECKS PASSED' if ok else '❌ SOME CHECKS FAILED'}") + sys.exit(0 if ok else 1) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/mcpbridge_wrapper/broker/daemon.py b/src/mcpbridge_wrapper/broker/daemon.py index f25e98ba..9c8b443a 100644 --- a/src/mcpbridge_wrapper/broker/daemon.py +++ b/src/mcpbridge_wrapper/broker/daemon.py @@ -26,10 +26,13 @@ import signal import sys from asyncio.subprocess import PIPE -from typing import Any +from typing import TYPE_CHECKING, Any from mcpbridge_wrapper.broker.types import BrokerConfig, BrokerState +if TYPE_CHECKING: + from mcpbridge_wrapper.broker.transport import UnixSocketServer + logger = logging.getLogger(__name__) @@ -43,9 +46,26 @@ class BrokerDaemon: timeout/backoff settings. """ - def __init__(self, config: BrokerConfig) -> None: - """Initialise daemon with the given configuration.""" + def __init__( + self, + config: BrokerConfig, + transport: UnixSocketServer | None = None, + ) -> None: + """Initialise daemon with the given configuration. + + Parameters + ---------- + config: + Broker configuration. + transport: + Optional :class:`~mcpbridge_wrapper.broker.transport.UnixSocketServer` + that will be started/stopped with this daemon and used to route + upstream responses to connected clients. If ``None``, upstream + responses are parsed but not forwarded (useful for testing without + a transport layer). + """ self._config = config + self._transport = transport self._state = BrokerState.INIT self._upstream: asyncio.subprocess.Process | None = None self._read_task: asyncio.Task[None] | None = None @@ -100,6 +120,10 @@ async def start(self) -> None: self._stop_event.clear() self._read_task = asyncio.ensure_future(self._read_upstream_loop()) + # Start transport (Unix socket server) if provided + if self._transport is not None: + await self._transport.start() + async def stop(self) -> None: """Gracefully shut down the broker. @@ -112,6 +136,11 @@ async def stop(self) -> None: self._state = BrokerState.STOPPING logger.info("Broker STOPPING") + # Stop transport first so no new clients can connect / pending are drained + if self._transport is not None: + with contextlib.suppress(Exception): + await self._transport.stop() + # Signal read loop to exit self._stop_event.set() @@ -256,13 +285,16 @@ async def _read_upstream_loop(self) -> None: await self._reconnect() continue - # Decode and log; routing to clients is handled in P13-T3 + # Decode, log, and route to connected clients try: line = raw.decode() if isinstance(raw, bytes) else raw line = line.rstrip("\n") logger.debug("Upstream → broker: %s", line) - # Parse to validate JSON (no-op for now; P13-T3 will route) - json.loads(line) + if self._transport is not None: + await self._transport.route_upstream_response(line) + else: + # Validate JSON even without a transport + json.loads(line) except (json.JSONDecodeError, UnicodeDecodeError) as exc: logger.debug("Non-JSON upstream output (%s): %r", exc, raw) diff --git a/src/mcpbridge_wrapper/broker/transport.py b/src/mcpbridge_wrapper/broker/transport.py index 8a32e60d..fa77636b 100644 --- a/src/mcpbridge_wrapper/broker/transport.py +++ b/src/mcpbridge_wrapper/broker/transport.py @@ -1,16 +1,14 @@ """Unix domain socket transport for the persistent broker. -This module is a stub. Full implementation is delivered in P13-T3. - -The UnixSocketServer accepts incoming client connections on the broker -socket, authenticates them via peer credential verification (getpeereid), -and hands each connection to a ClientSession that multiplexes JSON-RPC +The UnixSocketServer accepts incoming client connections on the broker socket +and hands each connection to a per-client handler that multiplexes JSON-RPC traffic to/from the upstream bridge managed by BrokerDaemon. Request ID remapping -------------------- -Outgoing request IDs are namespaced: - broker_id = (client_session_id << 20) | original_id_int +Outgoing request IDs are namespaced to prevent collisions across clients: + + broker_id = (session_id << 20) | (original_id_int & 0xFFFFF) Responses from upstream carry broker_id; the server extracts ``client_id = broker_id >> 20``, restores ``original_id``, and routes @@ -23,24 +21,388 @@ from __future__ import annotations -from mcpbridge_wrapper.broker.types import BrokerConfig +import asyncio +import contextlib +import json +import logging +import time +from typing import TYPE_CHECKING, Any + +from mcpbridge_wrapper.broker.types import BrokerConfig, BrokerState, ClientSession + +if TYPE_CHECKING: + from mcpbridge_wrapper.broker.daemon import BrokerDaemon + +logger = logging.getLogger(__name__) + +# Bit-shift for ID namespacing: session_id occupies the upper bits. +_SESSION_SHIFT = 20 +_ID_MASK = (1 << _SESSION_SHIFT) - 1 # 0xFFFFF class UnixSocketServer: """Accepts and manages local client connections over a Unix domain socket. - This is a stub class. All methods raise NotImplementedError until P13-T3 - provides the full implementation. + The server is tightly coupled to a :class:`BrokerDaemon` instance which + owns the upstream subprocess. Call :meth:`start` once the daemon is READY, + and :meth:`stop` before the daemon shuts down. + + Parameters + ---------- + config: + Shared broker configuration (socket path, TTL settings, etc.). + daemon: + The owning :class:`BrokerDaemon` instance. Used to write requests to + the upstream subprocess stdin and to read the daemon state. """ - def __init__(self, config: BrokerConfig) -> None: + def __init__(self, config: BrokerConfig, daemon: BrokerDaemon) -> None: """Initialise the server with the given broker configuration.""" self._config = config + self._daemon = daemon + self._server: asyncio.AbstractServer | None = None + self._sessions: dict[int, ClientSession] = {} + self._next_session_id: int = 1 + self._stop_event: asyncio.Event = asyncio.Event() + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + @property + def sessions(self) -> dict[int, ClientSession]: + """Currently connected client sessions (read-only view).""" + return self._sessions async def start(self) -> None: - """Bind and begin accepting connections.""" - raise NotImplementedError("UnixSocketServer.start() is implemented in P13-T3") + """Bind to the Unix socket and begin accepting connections.""" + socket_path = str(self._config.socket_path) + self._stop_event.clear() + self._server = await asyncio.start_unix_server( + self._handle_client, + path=socket_path, + ) + logger.info("UnixSocketServer listening on %s", socket_path) async def stop(self) -> None: - """Close the server socket and disconnect all active sessions.""" - raise NotImplementedError("UnixSocketServer.stop() is implemented in P13-T3") + """Stop accepting connections and drain in-flight requests. + + Sends JSON-RPC error ``-32001`` to each client that has outstanding + pending requests, then closes all writer streams. Waits up to + ``config.graceful_shutdown_timeout`` seconds for clean completion. + """ + self._stop_event.set() + + if self._server is not None: + self._server.close() + with contextlib.suppress(Exception): + await asyncio.wait_for( + self._server.wait_closed(), + timeout=self._config.graceful_shutdown_timeout, + ) + + # Notify all connected clients of pending request failures + for session in list(self._sessions.values()): + await self._drain_session(session) + + logger.info("UnixSocketServer stopped") + + async def route_upstream_response(self, line: str) -> None: + """Route a single JSON-RPC line received from upstream. + + Called by :class:`BrokerDaemon` each time a complete line arrives from + the upstream subprocess stdout. + + - If the message has a valid broker ``id``, it is routed to the + originating :class:`ClientSession` and the original ``id`` is restored. + - If the message has ``id == null`` or no ``id`` field, it is broadcast + to all connected clients. + - Malformed lines are logged and silently dropped. + """ + try: + msg = json.loads(line) + except (json.JSONDecodeError, ValueError) as exc: + logger.debug("Malformed upstream line (%s): %r", exc, line) + return + + if not isinstance(msg, dict): + logger.debug("Upstream sent non-object JSON: %r", msg) + return + + raw_id = msg.get("id") + + if raw_id is None: + # Notification → broadcast + await self._broadcast(line) + return + + if not isinstance(raw_id, int): + logger.debug("Unexpected non-integer broker_id from upstream: %r", raw_id) + return + + broker_id: int = raw_id + client_id = broker_id >> _SESSION_SHIFT + int_local_id = broker_id & _ID_MASK + + session = self._sessions.get(client_id) + if session is None: + logger.debug( + "No session for client_id=%d (broker_id=%d); dropping response.", + client_id, + broker_id, + ) + return + + # Restore original request ID + original_id: int | str | None = int_local_id + # Check if the original ID was a string + for str_id, mapped_int in session.string_id_map.items(): + if mapped_int == int_local_id: + original_id = str_id + break + + # Rebuild the message with the original ID + msg["id"] = original_id + restored_line = json.dumps(msg, separators=(",", ":")) + + # Fulfil the pending future (if any) and write to the client + fut = session.pending.pop(broker_id, None) + if fut is not None and not fut.done(): + fut.set_result(restored_line) + + await self._write_to_session(session, restored_line) + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + async def _handle_client( + self, + reader: asyncio.StreamReader, + writer: asyncio.StreamWriter, + ) -> None: + """Handle the lifecycle of a single connected client.""" + session_id = self._next_session_id + self._next_session_id += 1 + + try: + peer_uid = writer.get_extra_info("peername") or 0 + if isinstance(peer_uid, (list, tuple)): + peer_uid = int(peer_uid[0]) if peer_uid else 0 + except Exception: + peer_uid = 0 + + session = ClientSession( + session_id=session_id, + peer_uid=peer_uid, + connected_at=time.time(), + writer=writer, + ) + self._sessions[session_id] = session + logger.debug("Client connected: session_id=%d", session_id) + + try: + await self._read_client_loop(session, reader) + except asyncio.CancelledError: + pass + except Exception as exc: + logger.warning("Client session %d error: %s", session_id, exc) + finally: + self._sessions.pop(session_id, None) + with contextlib.suppress(Exception): + writer.close() + await writer.wait_closed() + logger.debug("Client disconnected: session_id=%d", session_id) + + async def _read_client_loop( + self, + session: ClientSession, + reader: asyncio.StreamReader, + ) -> None: + """Read JSON-RPC requests from a single client and forward to upstream.""" + while not self._stop_event.is_set(): + try: + raw = await asyncio.wait_for(reader.readline(), timeout=1.0) + except asyncio.TimeoutError: + continue + except (asyncio.CancelledError, GeneratorExit): + break + except Exception as exc: + logger.warning("Read error from session %d: %s", session.session_id, exc) + break + + if not raw: + # Client disconnected + break + + line = raw.decode(errors="replace").rstrip("\n") + if not line: + continue + + await self._process_client_line(session, line) + + async def _process_client_line(self, session: ClientSession, line: str) -> None: + """Parse, remap, and forward one JSON-RPC line from a client.""" + try: + msg = json.loads(line) + except (json.JSONDecodeError, ValueError) as exc: + logger.debug( + "Malformed request from session %d (%s): %r", + session.session_id, + exc, + line, + ) + await self._send_parse_error(session, None) + return + + if not isinstance(msg, dict): + await self._send_parse_error(session, None) + return + + raw_id = msg.get("id") + is_notification = raw_id is None + + if not is_notification: + # Check TTL during reconnection + daemon_state = self._daemon.state + if daemon_state == BrokerState.RECONNECTING: + # Queue request and check TTL + queued_at = time.time() + # Wait up to queue_ttl for daemon to become READY + deadline = queued_at + self._config.queue_ttl + while self._daemon.state == BrokerState.RECONNECTING: + if time.time() > deadline: + await self._send_error( + session, + raw_id, + -32001, + "Broker reconnecting — request TTL exceeded", + ) + return + await asyncio.sleep(0.1) + + if self._daemon.state not in (BrokerState.READY,): + await self._send_error( + session, + raw_id, + -32001, + "Broker unavailable", + ) + return + + # Remap the request ID + original_id = raw_id + if isinstance(original_id, str): + # Assign a stable integer alias for string IDs + if original_id not in session.string_id_map: + # Use a simple incrementing counter within the session's lower bits + existing_ints = set(session.string_id_map.values()) + next_int = 1 + while next_int in existing_ints: + next_int += 1 + session.string_id_map[original_id] = next_int & _ID_MASK + int_id = session.string_id_map[original_id] + elif isinstance(original_id, int): + int_id = original_id & _ID_MASK + else: + await self._send_parse_error(session, original_id) + return + + broker_id = (session.session_id << _SESSION_SHIFT) | int_id + msg["id"] = broker_id + + # Track pending request + loop = asyncio.get_event_loop() + fut: asyncio.Future[str] = loop.create_future() + session.pending[broker_id] = fut + + remapped_line = json.dumps(msg, separators=(",", ":")) + + # Write to upstream + upstream = self._daemon._upstream # noqa: SLF001 + if upstream is None or upstream.stdin is None: + if not is_notification: + await self._send_error( + session, + raw_id, + -32001, + "Upstream bridge not available", + ) + session.pending.pop(broker_id, None) + return + + try: + upstream.stdin.write((remapped_line + "\n").encode()) + await upstream.stdin.drain() + logger.debug( + "client %d → upstream: %s", + session.session_id, + remapped_line, + ) + except Exception as exc: + logger.warning( + "Failed to write to upstream from session %d: %s", + session.session_id, + exc, + ) + if not is_notification: + await self._send_error(session, raw_id, -32001, "Upstream write failed") + session.pending.pop(broker_id, None) + + async def _broadcast(self, line: str) -> None: + """Write ``line`` to all connected client sessions.""" + for session in list(self._sessions.values()): + await self._write_to_session(session, line) + + async def _write_to_session(self, session: ClientSession, line: str) -> None: + """Write a single JSON-RPC line to a client session's writer.""" + try: + session.writer.write((line + "\n").encode()) + await session.writer.drain() + except Exception as exc: + logger.debug("Write error to session %d: %s", session.session_id, exc) + + async def _send_parse_error( + self, + session: ClientSession, + request_id: Any, + ) -> None: + """Send a JSON-RPC parse error (-32700) to the client.""" + await self._send_error(session, request_id, -32700, "Parse error") + + async def _send_error( + self, + session: ClientSession, + request_id: Any, + code: int, + message: str, + ) -> None: + """Send a JSON-RPC error response to the client.""" + error_response = json.dumps( + { + "jsonrpc": "2.0", + "id": request_id, + "error": {"code": code, "message": message}, + }, + separators=(",", ":"), + ) + await self._write_to_session(session, error_response) + + async def _drain_session(self, session: ClientSession) -> None: + """Send -32001 error for all pending requests and close the session.""" + for broker_id, fut in list(session.pending.items()): + if not fut.done(): + fut.cancel() + # Determine original_id for the error response + int_local_id = broker_id & _ID_MASK + original_id: int | str = int_local_id + for str_id, mapped_int in session.string_id_map.items(): + if mapped_int == int_local_id: + original_id = str_id + break + await self._send_error(session, original_id, -32001, "Broker shutting down") + session.pending.clear() + + with contextlib.suppress(Exception): + session.writer.close() + await session.writer.wait_closed() diff --git a/tests/test_check_doc_sync.py b/tests/test_check_doc_sync.py index 4cf473c7..3ab6fe54 100644 --- a/tests/test_check_doc_sync.py +++ b/tests/test_check_doc_sync.py @@ -38,6 +38,28 @@ def test_check_doc_sync_accepts_synced_docs() -> None: assert module.check_doc_sync(changed_files) is True +def test_get_changed_files_unstaged_includes_untracked(monkeypatch) -> None: + """Unstaged mode should union tracked modifications with new untracked files.""" + module = load_script_module() + + def fake_git_name_only(args: list[str]) -> set[str]: + if "--name-only" in args and "--cached" not in args: + return {"docs/webui-setup.md"} + return set() + + monkeypatch.setattr(module, "_run_git_name_only", fake_git_name_only) + monkeypatch.setattr( + module, + "_get_untracked_files", + lambda: {"Sources/XcodeMCPWrapper/Documentation.docc/WebUIDashboard.md"}, + ) + + changed_files = module.get_changed_files("unstaged") + + assert "docs/webui-setup.md" in changed_files + assert "Sources/XcodeMCPWrapper/Documentation.docc/WebUIDashboard.md" in changed_files + + def test_get_changed_files_branch_falls_back_when_origin_main_missing(monkeypatch) -> None: """Branch mode should gracefully fall back when origin/main is unavailable.""" module = load_script_module() diff --git a/tests/unit/test_broker_stubs.py b/tests/unit/test_broker_stubs.py index 0c3af5ba..6d935ea3 100644 --- a/tests/unit/test_broker_stubs.py +++ b/tests/unit/test_broker_stubs.py @@ -168,22 +168,22 @@ def test_initial_state_is_init(self) -> None: # --------------------------------------------------------------------------- -# UnixSocketServer stubs +# UnixSocketServer — basic instantiation (full tests in test_broker_transport.py) # --------------------------------------------------------------------------- -class TestUnixSocketServerStubs: +class TestUnixSocketServerInstantiation: def setup_method(self) -> None: self.cfg = BrokerConfig.default() - self.server = UnixSocketServer(self.cfg) + self.daemon_mock = MagicMock() + self.daemon_mock.state = BrokerState.READY + self.server = UnixSocketServer(self.cfg, self.daemon_mock) - def test_start_raises_not_implemented(self) -> None: - with pytest.raises(NotImplementedError): - asyncio.run(self.server.start()) + def test_instantiation_succeeds(self) -> None: + assert self.server is not None - def test_stop_raises_not_implemented(self) -> None: - with pytest.raises(NotImplementedError): - asyncio.run(self.server.stop()) + def test_sessions_initially_empty(self) -> None: + assert self.server.sessions == {} # --------------------------------------------------------------------------- diff --git a/tests/unit/test_broker_transport.py b/tests/unit/test_broker_transport.py new file mode 100644 index 00000000..7d6f901b --- /dev/null +++ b/tests/unit/test_broker_transport.py @@ -0,0 +1,699 @@ +"""Tests for UnixSocketServer — P13-T3 implementation. + +Covers: +- Server instantiation and basic properties +- route_upstream_response: notification broadcast +- route_upstream_response: targeted response routing + ID restoration +- route_upstream_response: unknown client_id dropped silently +- route_upstream_response: malformed line silently ignored +- Client request processing: ID remapping (int and string IDs) +- Client request processing: malformed payload returns parse error +- Client request processing: upstream unavailable returns -32001 +- Two concurrent clients receive independent responses +- Graceful stop drains pending requests with -32001 +- Queue TTL during RECONNECTING state +""" + +from __future__ import annotations + +import asyncio +import json +import time +from typing import Any +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from mcpbridge_wrapper.broker.transport import _ID_MASK, _SESSION_SHIFT, UnixSocketServer +from mcpbridge_wrapper.broker.types import BrokerConfig, BrokerState, ClientSession + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_config(tmp_path: Any) -> BrokerConfig: + from pathlib import Path + + base = Path(tmp_path) + return BrokerConfig( + socket_path=base / "broker.sock", + pid_file=base / "broker.pid", + upstream_cmd=["true"], + reconnect_backoff_cap=1, + queue_ttl=2, + graceful_shutdown_timeout=1, + ) + + +def _make_daemon_mock(state: BrokerState = BrokerState.READY) -> MagicMock: + daemon = MagicMock() + daemon.state = state + upstream = MagicMock() + upstream.stdin = MagicMock() + upstream.stdin.write = MagicMock() + upstream.stdin.drain = AsyncMock() + daemon._upstream = upstream + return daemon + + +def _make_writer() -> MagicMock: + writer = MagicMock() + writer.write = MagicMock() + writer.drain = AsyncMock() + writer.close = MagicMock() + writer.wait_closed = AsyncMock() + writer.get_extra_info = MagicMock(return_value=None) + return writer + + +def _make_session(session_id: int = 1) -> ClientSession: + return ClientSession( + session_id=session_id, + peer_uid=501, + connected_at=time.time(), + writer=_make_writer(), + ) + + +def _make_server(tmp_path: Any, state: BrokerState = BrokerState.READY) -> UnixSocketServer: + cfg = _make_config(tmp_path) + daemon = _make_daemon_mock(state) + return UnixSocketServer(cfg, daemon) + + +# --------------------------------------------------------------------------- +# Instantiation +# --------------------------------------------------------------------------- + + +class TestUnixSocketServerInstantiation: + @pytest.mark.asyncio + async def test_sessions_initially_empty(self, tmp_path: Any) -> None: + server = _make_server(tmp_path) + assert server.sessions == {} + + @pytest.mark.asyncio + async def test_next_session_id_starts_at_one(self, tmp_path: Any) -> None: + server = _make_server(tmp_path) + assert server._next_session_id == 1 + + +# --------------------------------------------------------------------------- +# route_upstream_response — notifications (broadcast) +# --------------------------------------------------------------------------- + + +class TestRouteUpstreamNotification: + @pytest.mark.asyncio + async def test_notification_broadcast_to_all_clients(self, tmp_path: Any) -> None: + server = _make_server(tmp_path) + s1 = _make_session(1) + s2 = _make_session(2) + server._sessions[1] = s1 + server._sessions[2] = s2 + + notification = json.dumps({"jsonrpc": "2.0", "method": "notify", "id": None}) + await server.route_upstream_response(notification) + + s1.writer.write.assert_called() + s2.writer.write.assert_called() + + @pytest.mark.asyncio + async def test_notification_without_id_field_is_broadcast(self, tmp_path: Any) -> None: + server = _make_server(tmp_path) + s1 = _make_session(1) + server._sessions[1] = s1 + + # Message with no "id" field at all + notification = json.dumps({"jsonrpc": "2.0", "method": "progress", "params": {}}) + # This has no "id" key → msg.get("id") returns None → broadcast + await server.route_upstream_response(notification) + + s1.writer.write.assert_called() + + @pytest.mark.asyncio + async def test_malformed_json_is_silently_dropped(self, tmp_path: Any) -> None: + server = _make_server(tmp_path) + s1 = _make_session(1) + server._sessions[1] = s1 + + await server.route_upstream_response("not json at all {{{") + # No writes should have been made + s1.writer.write.assert_not_called() + + @pytest.mark.asyncio + async def test_non_object_json_is_silently_dropped(self, tmp_path: Any) -> None: + server = _make_server(tmp_path) + s1 = _make_session(1) + server._sessions[1] = s1 + + await server.route_upstream_response("[1, 2, 3]") + s1.writer.write.assert_not_called() + + +# --------------------------------------------------------------------------- +# route_upstream_response — targeted responses +# --------------------------------------------------------------------------- + + +class TestRouteUpstreamTargetedResponse: + @pytest.mark.asyncio + async def test_integer_id_routed_to_correct_session(self, tmp_path: Any) -> None: + server = _make_server(tmp_path) + session_id = 3 + original_id = 42 + broker_id = (session_id << _SESSION_SHIFT) | (original_id & _ID_MASK) + + s = _make_session(session_id) + loop = asyncio.get_event_loop() + fut: asyncio.Future[str] = loop.create_future() + s.pending[broker_id] = fut + server._sessions[session_id] = s + + response = json.dumps({"jsonrpc": "2.0", "id": broker_id, "result": {"ok": True}}) + await server.route_upstream_response(response) + + # Writer should have been called with the restored original ID + call_arg = s.writer.write.call_args[0][0] + decoded = json.loads(call_arg.rstrip(b"\n")) + assert decoded["id"] == original_id + + # Future should be resolved + assert fut.done() + assert json.loads(fut.result())["id"] == original_id + + @pytest.mark.asyncio + async def test_string_id_restored_from_map(self, tmp_path: Any) -> None: + server = _make_server(tmp_path) + session_id = 1 + s = _make_session(session_id) + # Simulate that "req-abc" was mapped to int alias 5 + s.string_id_map["req-abc"] = 5 + broker_id = (session_id << _SESSION_SHIFT) | 5 + loop = asyncio.get_event_loop() + fut: asyncio.Future[str] = loop.create_future() + s.pending[broker_id] = fut + server._sessions[session_id] = s + + response = json.dumps({"jsonrpc": "2.0", "id": broker_id, "result": {}}) + await server.route_upstream_response(response) + + call_arg = s.writer.write.call_args[0][0] + decoded = json.loads(call_arg.rstrip(b"\n")) + assert decoded["id"] == "req-abc" + + @pytest.mark.asyncio + async def test_unknown_client_id_drops_silently(self, tmp_path: Any) -> None: + server = _make_server(tmp_path) + # client_id 99 has no session + broker_id = (99 << _SESSION_SHIFT) | 1 + response = json.dumps({"jsonrpc": "2.0", "id": broker_id, "result": {}}) + # Should not raise + await server.route_upstream_response(response) + + @pytest.mark.asyncio + async def test_non_integer_broker_id_dropped(self, tmp_path: Any) -> None: + server = _make_server(tmp_path) + s1 = _make_session(1) + server._sessions[1] = s1 + # String id from upstream (unexpected) + response = json.dumps({"jsonrpc": "2.0", "id": "unexpected-str", "result": {}}) + await server.route_upstream_response(response) + s1.writer.write.assert_not_called() + + +# --------------------------------------------------------------------------- +# _process_client_line — request ID remapping +# --------------------------------------------------------------------------- + + +class TestProcessClientLine: + @pytest.mark.asyncio + async def test_integer_id_is_remapped(self, tmp_path: Any) -> None: + server = _make_server(tmp_path) + session = _make_session(2) + server._sessions[2] = session + + request = json.dumps({"jsonrpc": "2.0", "id": 10, "method": "tools/list"}) + await server._process_client_line(session, request) + + expected_broker_id = (2 << _SESSION_SHIFT) | 10 + written = session.pending + assert expected_broker_id in written + + call_bytes: bytes = server._daemon._upstream.stdin.write.call_args[0][0] + sent = json.loads(call_bytes.rstrip(b"\n")) + assert sent["id"] == expected_broker_id + + @pytest.mark.asyncio + async def test_string_id_is_aliased(self, tmp_path: Any) -> None: + server = _make_server(tmp_path) + session = _make_session(1) + server._sessions[1] = session + + request = json.dumps({"jsonrpc": "2.0", "id": "call-1", "method": "tools/call"}) + await server._process_client_line(session, request) + + assert "call-1" in session.string_id_map + alias = session.string_id_map["call-1"] + expected_broker_id = (1 << _SESSION_SHIFT) | alias + assert expected_broker_id in session.pending + + @pytest.mark.asyncio + async def test_malformed_json_sends_parse_error(self, tmp_path: Any) -> None: + server = _make_server(tmp_path) + session = _make_session(1) + server._sessions[1] = session + + await server._process_client_line(session, "{broken json") + + call_bytes: bytes = session.writer.write.call_args[0][0] + response = json.loads(call_bytes.rstrip(b"\n")) + assert response["error"]["code"] == -32700 + + @pytest.mark.asyncio + async def test_upstream_unavailable_returns_32001(self, tmp_path: Any) -> None: + cfg = _make_config(tmp_path) + daemon = _make_daemon_mock() + daemon._upstream = None + server = UnixSocketServer(cfg, daemon) + session = _make_session(1) + server._sessions[1] = session + + request = json.dumps({"jsonrpc": "2.0", "id": 1, "method": "tools/list"}) + await server._process_client_line(session, request) + + call_bytes: bytes = session.writer.write.call_args[0][0] + response = json.loads(call_bytes.rstrip(b"\n")) + assert response["error"]["code"] == -32001 + + @pytest.mark.asyncio + async def test_notification_forwarded_without_pending(self, tmp_path: Any) -> None: + server = _make_server(tmp_path) + session = _make_session(1) + server._sessions[1] = session + + # Notification has no id field + request = json.dumps({"jsonrpc": "2.0", "method": "cancelled"}) + await server._process_client_line(session, request) + + assert session.pending == {} + # Was written to upstream + server._daemon._upstream.stdin.write.assert_called() + + +# --------------------------------------------------------------------------- +# Two concurrent clients +# --------------------------------------------------------------------------- + + +class TestConcurrentClients: + @pytest.mark.asyncio + async def test_two_clients_receive_independent_responses(self, tmp_path: Any) -> None: + """Responses for two concurrent clients are routed independently.""" + server = _make_server(tmp_path) + s1 = _make_session(1) + s2 = _make_session(2) + server._sessions[1] = s1 + server._sessions[2] = s2 + + # Simulate s1 pending request with broker_id for session 1, original_id=1 + broker_id_1 = (1 << _SESSION_SHIFT) | 1 + loop = asyncio.get_event_loop() + fut1: asyncio.Future[str] = loop.create_future() + s1.pending[broker_id_1] = fut1 + + # Simulate s2 pending request with broker_id for session 2, original_id=1 + broker_id_2 = (2 << _SESSION_SHIFT) | 1 + fut2: asyncio.Future[str] = loop.create_future() + s2.pending[broker_id_2] = fut2 + + # Route response for s1 + resp1 = json.dumps({"jsonrpc": "2.0", "id": broker_id_1, "result": {"for": "s1"}}) + await server.route_upstream_response(resp1) + + # Route response for s2 + resp2 = json.dumps({"jsonrpc": "2.0", "id": broker_id_2, "result": {"for": "s2"}}) + await server.route_upstream_response(resp2) + + # s1's response went only to s1 + assert fut1.done() + result1 = json.loads(fut1.result()) + assert result1["result"]["for"] == "s1" + assert result1["id"] == 1 # original restored + + # s2's response went only to s2 + assert fut2.done() + result2 = json.loads(fut2.result()) + assert result2["result"]["for"] == "s2" + assert result2["id"] == 1 # original restored + + # Each writer called exactly once for targeted response + s1.writer.write.assert_called_once() + s2.writer.write.assert_called_once() + + +# --------------------------------------------------------------------------- +# Graceful stop — drain pending +# --------------------------------------------------------------------------- + + +class TestGracefulStop: + @pytest.mark.asyncio + async def test_stop_sends_32001_for_pending_requests(self, tmp_path: Any) -> None: + server = _make_server(tmp_path) + session = _make_session(1) + server._sessions[1] = session + + broker_id = (1 << _SESSION_SHIFT) | 7 + loop = asyncio.get_event_loop() + fut: asyncio.Future[str] = loop.create_future() + session.pending[broker_id] = fut + + # Patch asyncio.start_unix_server to avoid actual socket creation + with patch("asyncio.start_unix_server", new=AsyncMock(return_value=MagicMock())): + await server.start() + + await server.stop() + + # pending should be cleared + assert session.pending == {} + # Writer should have been called with a -32001 error + call_bytes: bytes = session.writer.write.call_args[0][0] + response = json.loads(call_bytes.rstrip(b"\n")) + assert response["error"]["code"] == -32001 + + +# --------------------------------------------------------------------------- +# Queue TTL during RECONNECTING +# --------------------------------------------------------------------------- + + +class TestQueueTTL: + @pytest.mark.asyncio + async def test_ttl_exceeded_returns_32001(self, tmp_path: Any) -> None: + cfg = _make_config(tmp_path) + # queue_ttl = 2 from _make_config; we'll force immediate expiry + cfg = BrokerConfig( + socket_path=cfg.socket_path, + pid_file=cfg.pid_file, + upstream_cmd=["true"], + reconnect_backoff_cap=1, + queue_ttl=0, # immediate expiry + graceful_shutdown_timeout=1, + ) + daemon = _make_daemon_mock(state=BrokerState.RECONNECTING) + server = UnixSocketServer(cfg, daemon) + session = _make_session(1) + server._sessions[1] = session + + request = json.dumps({"jsonrpc": "2.0", "id": 1, "method": "tools/list"}) + await server._process_client_line(session, request) + + call_bytes: bytes = session.writer.write.call_args[0][0] + response = json.loads(call_bytes.rstrip(b"\n")) + assert response["error"]["code"] == -32001 + assert "TTL" in response["error"]["message"] + + @pytest.mark.asyncio + async def test_reconnect_becomes_ready_proceeds(self, tmp_path: Any) -> None: + """When daemon transitions RECONNECTING → READY, request is forwarded.""" + from unittest.mock import PropertyMock + + cfg = _make_config(tmp_path) + daemon = _make_daemon_mock(state=BrokerState.RECONNECTING) + server = UnixSocketServer(cfg, daemon) + session = _make_session(1) + server._sessions[1] = session + + # Call sequence: + # 1. daemon_state = self._daemon.state → RECONNECTING (enters reconnect branch) + # 2. while self._daemon.state == RECONNECTING → RECONNECTING (loop body runs once) + # 3. while self._daemon.state == RECONNECTING → READY (exits while loop) + # 4. if self._daemon.state not in (READY,) → READY (does NOT return error) + state_sequence = [ + BrokerState.RECONNECTING, + BrokerState.RECONNECTING, + BrokerState.READY, + BrokerState.READY, + ] + state_mock = PropertyMock(side_effect=state_sequence) + type(daemon).state = state_mock + + request = json.dumps({"jsonrpc": "2.0", "id": 5, "method": "tools/list"}) + await server._process_client_line(session, request) + + # Request should have been forwarded to upstream + daemon._upstream.stdin.write.assert_called() + + +# --------------------------------------------------------------------------- +# _handle_client — session registration and cleanup +# --------------------------------------------------------------------------- + + +class TestHandleClient: + @pytest.mark.asyncio + async def test_session_registered_and_removed(self, tmp_path: Any) -> None: + """_handle_client registers a session during the read loop and removes it after.""" + server = _make_server(tmp_path) + + writer = _make_writer() + # Set stop event so the read loop exits immediately + server._stop_event.set() + + reader = MagicMock() + reader.readline = AsyncMock(return_value=b"") + + await server._handle_client(reader, writer) + + # Session should be removed after disconnect + assert server._sessions == {} + + @pytest.mark.asyncio + async def test_peer_uid_from_tuple(self, tmp_path: Any) -> None: + """_handle_client handles tuple peername correctly.""" + server = _make_server(tmp_path) + server._stop_event.set() + + writer = _make_writer() + writer.get_extra_info = MagicMock(return_value=("/tmp/sock", 501)) + + reader = MagicMock() + reader.readline = AsyncMock(return_value=b"") + + await server._handle_client(reader, writer) + # Should not raise; session cleaned up + assert server._sessions == {} + + @pytest.mark.asyncio + async def test_exception_in_read_loop_is_handled(self, tmp_path: Any) -> None: + """_handle_client logs and cleans up when the read loop raises.""" + server = _make_server(tmp_path) + + writer = _make_writer() + reader = MagicMock() + reader.readline = AsyncMock(side_effect=RuntimeError("boom")) + + await server._handle_client(reader, writer) + assert server._sessions == {} + + +# --------------------------------------------------------------------------- +# _read_client_loop — branches +# --------------------------------------------------------------------------- + + +class TestReadClientLoop: + @pytest.mark.asyncio + async def test_empty_line_is_skipped(self, tmp_path: Any) -> None: + """_read_client_loop skips empty decoded lines.""" + server = _make_server(tmp_path) + session = _make_session(1) + server._sessions[1] = session + + reader = MagicMock() + # Return empty string line, then b"" to signal disconnect + reader.readline = AsyncMock(side_effect=[b"\n", b""]) + await server._read_client_loop(session, reader) + + # No upstream writes triggered (line was empty) + server._daemon._upstream.stdin.write.assert_not_called() + + @pytest.mark.asyncio + async def test_read_exception_breaks_loop(self, tmp_path: Any) -> None: + """_read_client_loop breaks on unexpected read exception.""" + server = _make_server(tmp_path) + session = _make_session(1) + server._sessions[1] = session + + reader = MagicMock() + reader.readline = AsyncMock(side_effect=OSError("connection reset")) + await server._read_client_loop(session, reader) + # Should complete without raising + + @pytest.mark.asyncio + async def test_timeout_then_disconnect(self, tmp_path: Any) -> None: + """_read_client_loop retries on TimeoutError then exits on EOF.""" + server = _make_server(tmp_path) + session = _make_session(1) + server._sessions[1] = session + + reader = MagicMock() + reader.readline = AsyncMock(side_effect=[asyncio.TimeoutError(), b""]) + await server._read_client_loop(session, reader) + # Should complete without raising + + +# --------------------------------------------------------------------------- +# _process_client_line — additional branches +# --------------------------------------------------------------------------- + + +class TestProcessClientLineAdditional: + @pytest.mark.asyncio + async def test_non_dict_json_sends_parse_error(self, tmp_path: Any) -> None: + """A JSON array body triggers a parse error.""" + server = _make_server(tmp_path) + session = _make_session(1) + server._sessions[1] = session + + await server._process_client_line(session, "[1, 2, 3]") + + call_bytes: bytes = session.writer.write.call_args[0][0] + response = json.loads(call_bytes.rstrip(b"\n")) + assert response["error"]["code"] == -32700 + + @pytest.mark.asyncio + async def test_float_id_sends_parse_error(self, tmp_path: Any) -> None: + """A float request ID is rejected with a parse error.""" + server = _make_server(tmp_path) + session = _make_session(1) + server._sessions[1] = session + + request = json.dumps({"jsonrpc": "2.0", "id": 1.5, "method": "tools/list"}) + await server._process_client_line(session, request) + + call_bytes: bytes = session.writer.write.call_args[0][0] + response = json.loads(call_bytes.rstrip(b"\n")) + assert response["error"]["code"] == -32700 + + @pytest.mark.asyncio + async def test_string_id_reuses_existing_alias(self, tmp_path: Any) -> None: + """Sending the same string ID twice reuses the same integer alias.""" + server = _make_server(tmp_path) + session = _make_session(1) + server._sessions[1] = session + + request = json.dumps({"jsonrpc": "2.0", "id": "stable-id", "method": "tools/list"}) + # Send twice + await server._process_client_line(session, request) + first_alias = session.string_id_map.get("stable-id") + + await server._process_client_line(session, request) + second_alias = session.string_id_map.get("stable-id") + + assert first_alias == second_alias + assert first_alias is not None + + @pytest.mark.asyncio + async def test_upstream_write_failure_returns_32001(self, tmp_path: Any) -> None: + """If upstream stdin.drain raises, client gets a -32001 error.""" + cfg = _make_config(tmp_path) + daemon = _make_daemon_mock() + daemon._upstream.stdin.drain = AsyncMock(side_effect=OSError("pipe broken")) + server = UnixSocketServer(cfg, daemon) + session = _make_session(1) + server._sessions[1] = session + + request = json.dumps({"jsonrpc": "2.0", "id": 1, "method": "tools/list"}) + await server._process_client_line(session, request) + + # Should have received a -32001 error + call_bytes: bytes = session.writer.write.call_args[0][0] + response = json.loads(call_bytes.rstrip(b"\n")) + assert response["error"]["code"] == -32001 + + @pytest.mark.asyncio + async def test_reconnecting_then_unavailable_returns_32001(self, tmp_path: Any) -> None: + """After reconnect wait, if state is not READY, return -32001.""" + cfg = _make_config(tmp_path) + cfg = BrokerConfig( + socket_path=cfg.socket_path, + pid_file=cfg.pid_file, + upstream_cmd=["true"], + reconnect_backoff_cap=1, + queue_ttl=5, + graceful_shutdown_timeout=1, + ) + daemon = _make_daemon_mock(state=BrokerState.RECONNECTING) + server = UnixSocketServer(cfg, daemon) + session = _make_session(1) + server._sessions[1] = session + + call_count = 0 + + def state_side_effect(obj: Any) -> BrokerState: + nonlocal call_count + call_count += 1 + # First call in daemon_state check + if call_count == 1: + return BrokerState.RECONNECTING + # While loop check — immediately exit by returning non-RECONNECTING + if call_count == 2: + return BrokerState.STOPPING # not RECONNECTING, exits while loop + return BrokerState.STOPPING + + type(daemon).state = property(state_side_effect) + try: + request = json.dumps({"jsonrpc": "2.0", "id": 1, "method": "tools/list"}) + await server._process_client_line(session, request) + finally: + del type(daemon).state + + call_bytes: bytes = session.writer.write.call_args[0][0] + response = json.loads(call_bytes.rstrip(b"\n")) + assert response["error"]["code"] == -32001 + + +# --------------------------------------------------------------------------- +# _drain_session — string ID pending request +# --------------------------------------------------------------------------- + + +class TestDrainSession: + @pytest.mark.asyncio + async def test_drain_with_string_id_sends_string_in_error(self, tmp_path: Any) -> None: + server = _make_server(tmp_path) + session = _make_session(1) + session.string_id_map["my-req"] = 3 + broker_id = (1 << _SESSION_SHIFT) | 3 + loop = asyncio.get_event_loop() + fut: asyncio.Future[str] = loop.create_future() + session.pending[broker_id] = fut + + await server._drain_session(session) + + assert session.pending == {} + assert fut.cancelled() + call_bytes: bytes = session.writer.write.call_args[0][0] + response = json.loads(call_bytes.rstrip(b"\n")) + assert response["id"] == "my-req" + assert response["error"]["code"] == -32001 + + @pytest.mark.asyncio + async def test_route_response_already_done_future_skipped(self, tmp_path: Any) -> None: + """A future that is already done is not set again.""" + server = _make_server(tmp_path) + session = _make_session(1) + broker_id = (1 << _SESSION_SHIFT) | 2 + loop = asyncio.get_event_loop() + fut: asyncio.Future[str] = loop.create_future() + fut.set_result("already done") + session.pending[broker_id] = fut + server._sessions[1] = session + + response = json.dumps({"jsonrpc": "2.0", "id": broker_id, "result": {}}) + # Should not raise InvalidStateError + await server.route_upstream_response(response)