feat(live): add live signal-streaming service#190
Merged
Conversation
New off-car service that fans out decoded signals to downstream WS/SSE consumers and serves a 60s in-memory backfill window to bridge Clickhouse ingestion delay. - Subscribes to query/live/# on MQTT without $share/ so every replica caches the full firehose; clients can land on any replica. - SignalHub supports exact subscriptions, glob patterns (path.Match: ecu_*, *_status, *) and per-publish dedup across both routes. - Per-(vehicle, signal_name) ring buffer evicts by producer timestamp; snapshot drives initial backfill on connect/reconnect. - WS (/live/ws) and SSE (/live/sse) endpoints share the hub. SSE Last-Event-ID is the last signal microsecond timestamp and is used as the resume cursor. - gr26's in-pod hub remains in place for legacy/TCM-dash/debugging. Adds the service to docker-compose, kerbecs routing (ws + sse as passthrough), the release script, deploy workflow, and a per-service image-build workflow modeled on gr26.yml.
gr26 now stamps signals[i].CreatedAt = time.Now().UTC() in CreateSignals
so the field is meaningful on the MQTT wire (previously zero, since
Clickhouse's DEFAULT now64(6) only applies to the persisted row, not
the published struct). live's cache eviction, Snapshot filter, suppress
maps, and SSE event ids all switch to CreatedAt — a wall-clock anchor
that doesn't drift with producer/CAN-frame clock skew.
Backfill is now sent as a single batched message per transport instead
of one frame/event per signal:
- WS: first frame is always a JSON array (possibly empty); subsequent
frames are single Signal objects. Clients distinguish by Array.isArray.
- SSE: `event: backfill` carries the array in `data:`; `event: signal`
carries one Signal per event. `id:` on both is CreatedAt micros.
CH insert keeps using the now64(6) default — created_at is not added to
the INSERT column list, so persisted timestamps remain server-authoritative.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
livethat fans out decoded signals fromquery/live/#to downstream WS/SSE consumers$share/prefix so every replica caches the full firehose; clients can land on any replica via round-robinSignalHubsupports exact subscriptions, glob patterns (ecu_*,*_status,*) and per-publish dedup across both routes(vehicle, signal_name)ring buffer evicted by producer timestamp; snapshot drives initial backfill on connect/reconnect/live/ws, SSE at/live/sse; SSELast-Event-IDis the last signal microsecond timestamp and resumes from theredocker-compose.yaml(port 7015),kerbecs.yaml(ws/sse passthrough + default),live.ymlGH workflow modeled ongr26.yml,deploy.ymlSERVICES list + awk regex,scripts/release.shGO_DEPENDENTS + GO_CONFIG_SERVICESGaucho-Racing/infrastructurekustomization needs amapache/liveimage entry before the first release will deploy it