feat(subscriptions): implement WebSocket channel — Phase 2#62
assertions and monotonic event checks
│ needless lifetimes in websocket integration helpers
Additional fixes: resolves the subscriptions smoke regression and closes the R4 backport gaps that caused it. 1) R4 backport topic handling (
smunini
left a comment
Overall a good PR. However, I think it was compiled at some point without any FHIR version feature enabled, and that caused a number of unnecessary changes. For example, I think we can revert all of the changes to the fhirpath crate. It's OK to assume that at least one FHIR version feature (or all of them) is turned on during a compile. Perhaps that is something to add to CLAUDE.md.
34c0915 to 13be1ac
default-features=false on helios-fhir dep
default_cli_fhir_version cfg ladder
restore cfg-gated fallback arm in parse_fhir_resource
unconditional fallback arm in extract_parameters
keep helios-fhirpath/R<X> forwarding for cfg-gated arms
default-features=false on intra-workspace deps; keep axum ws
helios-subscriptions?/R<X> forwarding and default-features=false
FhirVersion::notification_bundle_type helper
assumption [skip ci]
The match in get_compartment_params_for_version cfg-gates each arm by FHIR feature, but FhirVersion variants are compiled based on helios-fhir's own features (which default to R4). Builds that exclude R4 on helios-rest (e.g. single-R4B) leave FhirVersion::R4 uncovered. A wildcard fallback previously handled this but was dropped in e10a951. Return Result<_, String> with an explicit Err for the fallback, matching the pattern used in responses/format.rs, and map it to RestError::InternalError at the call site.
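A minimal sketch of the pattern this commit message describes, assuming simplified stand-ins for the real types: `FhirVersion`, `RestError`, the parameter lists, and the `compartment_params` call-site wrapper below are hypothetical, and the `#[cfg(feature = ...)]` gates are shown only as comments so the snippet compiles on its own.

```rust
// Hypothetical stand-ins; the real FhirVersion and RestError live in the Helios crates.
#[derive(Debug, Clone, Copy, PartialEq)]
enum FhirVersion {
    R4,
    R4B,
    R5,
}

#[derive(Debug, PartialEq)]
enum RestError {
    InternalError(String),
}

// Pretend this crate was built with only the R4B feature enabled, while
// FhirVersion (compiled by another crate) still carries the other variants.
fn get_compartment_params_for_version(
    version: FhirVersion,
) -> Result<&'static [&'static str], String> {
    match version {
        // In the real code this arm would sit behind #[cfg(feature = "R4B")].
        FhirVersion::R4B => Ok(&["patient", "subject"]),
        // Explicit fallback: covers variants whose feature is off in this
        // crate, and reports the problem instead of panicking.
        #[allow(unreachable_patterns)]
        other => Err(format!("FHIR version {other:?} is not enabled in this build")),
    }
}

// Call site: map the String error into the REST layer's internal error.
fn compartment_params(version: FhirVersion) -> Result<&'static [&'static str], RestError> {
    get_compartment_params_for_version(version).map_err(RestError::InternalError)
}
```

With this shape, a request for an uncovered version surfaces as an internal error at the call site rather than as a non-exhaustive match at compile time.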
Internal workspace crates pulled in helios-fhir, helios-persistence, helios-fhirpath, helios-sof, helios-serde, and helios-subscriptions with their default features on. Because cargo unifies features across the build graph, helios-fhir/R4 was forced on by default even in builds targeting only R4B/R5/R6, producing FhirVersion variants that the version-gated match arms in callers could not cover. Add default-features = false to internal consumers. Each parent crate's R<x>/backend feature table already forwards what it needs, so no functional change when default features are selected upstream. Known leak: helios-audit still pulls helios-fhir/R4 unconditionally because its source hardcodes helios_fhir::r4::AuditEvent. Wildcard fallback arms (already used in responses/format.rs and fhir_types.rs) are extended to the four analogous matches in helios-fhirpath so the leak stays harmless. Fix a pre-existing bug in fhir_types.rs where get_r5_resource_types / get_r6_resource_types referenced R4_RESOURCE_TYPES in branches where R4 was not enabled.
The hand-maintained R*_RESOURCE_TYPES constants in fhir_types.rs were gated with `not(feature = "R4")` etc., so any multi-version build that also enabled R4 dropped the later-version constants entirely. The extractor's `is_valid_resource_type` then rejected types like SubscriptionTopic with HTTP 400 whenever the server ran as R4B/R5/R6 on a build that also included R4. Replace the constants with OnceLock-cached lookups that delegate to each version's generated Resource enum via FhirResourceTypeProvider. `is_valid_resource_type` now checks the union across all enabled versions; `get_resource_type_names_for_version` returns the correct per-version list. Adds regression tests covering single- and multi-version feature combinations.
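The replacement described above could look roughly like the following sketch. The trait signature, the `R4Resource`/`R5Resource` stand-ins, and the short type lists are assumptions made for illustration; the real code delegates to each version's generated `Resource` enum and gates each version behind its feature flag.

```rust
use std::collections::BTreeSet;
use std::sync::OnceLock;

// Assumed shape of the provider trait; the real signature may differ.
trait FhirResourceTypeProvider {
    fn get_resource_type_names() -> Vec<&'static str>;
}

// Hypothetical stand-ins for the generated per-version Resource enums.
struct R4Resource;
struct R5Resource;

impl FhirResourceTypeProvider for R4Resource {
    fn get_resource_type_names() -> Vec<&'static str> {
        vec!["Observation", "Patient", "Subscription"]
    }
}

impl FhirResourceTypeProvider for R5Resource {
    fn get_resource_type_names() -> Vec<&'static str> {
        vec!["Observation", "Patient", "Subscription", "SubscriptionTopic"]
    }
}

// Per-version list, computed once on first use and cached for the process lifetime.
fn r4_resource_types() -> &'static [&'static str] {
    static CACHE: OnceLock<Vec<&'static str>> = OnceLock::new();
    CACHE.get_or_init(R4Resource::get_resource_type_names)
}

fn r5_resource_types() -> &'static [&'static str] {
    static CACHE: OnceLock<Vec<&'static str>> = OnceLock::new();
    CACHE.get_or_init(R5Resource::get_resource_type_names)
}

// Union across every enabled version (both are "enabled" in this sketch).
fn all_resource_types() -> &'static BTreeSet<&'static str> {
    static UNION: OnceLock<BTreeSet<&'static str>> = OnceLock::new();
    UNION.get_or_init(|| {
        let mut set = BTreeSet::new();
        set.extend(r4_resource_types().iter().copied());
        set.extend(r5_resource_types().iter().copied());
        set
    })
}

fn is_valid_resource_type(name: &str) -> bool {
    all_resource_types().contains(name)
}
```

Because validity is checked against the union, a type that exists only in a later version is accepted even when R4 is also compiled in, while the per-version functions still return each version's own list.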
Summary
Implements Phase 2 of the FHIR Topic-Based Subscriptions roadmap: the WebSocket channel. Builds on top of the Phase 1 rest-hook infrastructure without any breaking changes.
Clients subscribe with `channel.type = "websocket"`, obtain a short-lived binding token from `$get-ws-binding-token`, connect to `/ws/subscriptions/bind?token=<token>`, and receive notification bundles as JSON text frames. The protocol is unidirectional (server → client).

Related: discussion #59, Phase 1 PR #61.
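The short-lived, single-use binding token can be illustrated with a standard-library sketch. This is not the PR's `WsBindingTokenManager`: the `TokenManager` name, its methods, and the counter-based token strings are hypothetical, and a `Mutex<HashMap>` stands in for `DashMap` and UUID v4 so the snippet has no external dependencies. The idea it shows is the same: consuming a token removes it in one step, so it can only be redeemed once, and only before it expires.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;
use std::time::{Duration, Instant};

// Hypothetical stand-in for a binding-token manager.
struct TokenManager {
    lifetime: Duration,
    next_id: AtomicU64,
    // token -> (subscription_id, expiry instant)
    tokens: Mutex<HashMap<String, (String, Instant)>>,
}

impl TokenManager {
    fn new(lifetime: Duration) -> Self {
        Self {
            lifetime,
            next_id: AtomicU64::new(0),
            tokens: Mutex::new(HashMap::new()),
        }
    }

    // Issue a token bound to one subscription. A real implementation would
    // use an unguessable value such as a UUID v4 instead of a counter.
    fn issue(&self, subscription_id: &str) -> String {
        let token = format!("tok-{}", self.next_id.fetch_add(1, Ordering::Relaxed));
        let expires = Instant::now() + self.lifetime;
        self.tokens
            .lock()
            .unwrap()
            .insert(token.clone(), (subscription_id.to_string(), expires));
        token
    }

    // Redeem a token: the entry is removed under the lock, so a second
    // attempt finds nothing. Expired tokens are removed and rejected.
    fn consume(&self, token: &str) -> Option<String> {
        let (subscription_id, expires) = self.tokens.lock().unwrap().remove(token)?;
        (Instant::now() < expires).then_some(subscription_id)
    }
}
```

A bind handler would call `consume` once during the upgrade request and refuse the connection when it returns `None`.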
What changed
`helios-subscriptions` crate

New files:
- `src/channels/ws_token.rs`: `WsBindingTokenManager` generates UUID v4 tokens with configurable expiry (default 30 s), single-use via atomic `DashMap::remove()`
- `src/channels/ws_manager.rs`: `WebSocketManager` tracks `(tenant_id, subscription_id) → Vec<(client_id, UnboundedSender)>`; broadcasts notifications, prunes closed channels, removes all clients on subscription deletion
- `src/channels/websocket.rs`: `WebSocketChannel` implementing `ChannelDispatcher`; dispatches by broadcasting to connected clients (best-effort, always `Success`); handshake is a no-op (activation is immediate)
- `tests/websocket_integration.rs`: 8 integration tests covering activation, delivery to single/multiple clients, zero-client no-panic, disconnected-client cleanup, binding token lifecycle, subscription-delete closes clients, tenant isolation

Modified files:
- `src/channels/mod.rs`: declare the three new modules
- `src/config.rs`: add `ws_token_lifetime_secs: i64` (default `30`)
- `src/engine/mod.rs`: add `ws_manager`, `ws_channel`, `ws_token_manager` fields; expose `ws_manager()` / `ws_token_manager()` accessors; add `ChannelType::Websocket` arms to `activate_subscription()` and `dispatch_with_retry()`; call `ws_manager.remove_all_clients()` when a Subscription is deleted
- `src/lib.rs`: re-export `WebSocketManager` and `WsBindingTokenManager` for use by the REST layer

`helios-rest` crate

New files:
- `src/handlers/ws.rs`: `ws_bind_handler` validates the binding token, upgrades the HTTP connection, sends the handshake bundle as the first frame, then loops `tokio::select!` over the notification channel and the socket (for close detection); cleans up via `ws_manager.remove_client()` on exit

Modified files:
- `Cargo.toml`: add `"ws"` to axum features (pulls in `tokio-tungstenite` automatically)
- `src/handlers/subscriptions.rs`: add `get_ws_binding_token_handler`, which validates that the channel type is `websocket`, generates a token, and returns `Parameters` with `token` / `expiration` / `websocket-url`
- `src/handlers/mod.rs`: register the `ws` module behind `#[cfg(feature = "subscriptions")]`
- `src/routing/fhir_routes.rs`: add `/{resource_type}/{id}/$get-ws-binding-token` and `/ws/subscriptions/bind` routes
- `src/lib.rs`: add `"websocket"` to `supported_channel_types` in the engine initializer

Design decisions
WebSocket dispatch is best-effort. `dispatch()` always returns `DispatchResult::Success` even when zero clients are connected. Treating zero clients as an error would incorrectly drive the subscription into `error`/`off` status; a WebSocket subscription is valid before any client connects.

Handshake is deferred to connect time. Unlike rest-hook (which POSTs a handshake to validate the endpoint), WebSocket subscriptions activate immediately. The handshake bundle is sent as the first WebSocket frame when a client connects and binds.

Binding tokens are single-use. `DashMap::remove()` is atomic, so a token can be consumed only once even under concurrent connection attempts.

Subscription deletion closes clients. When a `Subscription` resource is deleted, `ws_manager.remove_all_clients()` drops all sender halves. This causes the `rx.recv()` calls in the WebSocket handlers to return `None`, triggering graceful close without an explicit close frame.

No retry for WebSocket. The retry loop in `dispatch_with_retry` still runs, but since `WebSocketChannel.dispatch()` always returns `Success`, it exits on the first call. Retries are meaningless for in-process channel broadcasting.

Test plan

- `cargo test -p helios-subscriptions`: 116 tests pass (106 unit + 2 rest-hook integration + 8 WebSocket integration)
- `cargo test -p helios-rest`: 170 tests pass
- `cargo clippy --all-targets --all-features -- -D warnings …`: zero warnings
- `cargo fmt --all`: clean
- `HFS_SUBSCRIPTIONS_ENABLED=true cargo run --bin hfs --features subscriptions`, create a websocket subscription, call `$get-ws-binding-token`, connect with `websocat "ws://localhost:8080/ws/subscriptions/bind?token=<token>"`, POST a matching resource, verify notification bundle arrives
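The broadcast, pruning, and delete-closes-clients behavior from the design decisions can be sketched with the standard library alone. This is an illustration under assumptions, not the PR's `WebSocketManager`: `WsManager` and its method signatures are hypothetical, and `std::sync::mpsc` stands in for tokio's unbounded channel (so a closed channel shows up as `Err` from `recv()` rather than `None`).

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

// (tenant_id, subscription_id)
type Key = (String, String);

// Hypothetical stand-in for a per-subscription client registry.
#[derive(Default)]
struct WsManager {
    clients: Mutex<HashMap<Key, Vec<(u64, Sender<String>)>>>,
}

impl WsManager {
    // Register a client and hand back the receiving half for its socket loop.
    fn add_client(&self, tenant: &str, sub: &str, client_id: u64) -> Receiver<String> {
        let (tx, rx) = channel();
        self.clients
            .lock()
            .unwrap()
            .entry((tenant.to_string(), sub.to_string()))
            .or_default()
            .push((client_id, tx));
        rx
    }

    // Best-effort broadcast: send to every client, drop the ones whose
    // receiver is gone, and return how many were reached. Zero is not an error.
    fn broadcast(&self, tenant: &str, sub: &str, payload: &str) -> usize {
        let mut map = self.clients.lock().unwrap();
        let Some(list) = map.get_mut(&(tenant.to_string(), sub.to_string())) else {
            return 0;
        };
        list.retain(|(_, tx)| tx.send(payload.to_string()).is_ok());
        list.len()
    }

    // Dropping the senders closes each client's channel, which lets the
    // per-connection loops observe the closure and exit.
    fn remove_all_clients(&self, tenant: &str, sub: &str) {
        self.clients
            .lock()
            .unwrap()
            .remove(&(tenant.to_string(), sub.to_string()));
    }
}
```

Keying the map by tenant as well as subscription is what keeps a notification for one tenant from reaching another tenant's clients, even when subscription ids collide.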