
Fix broadcast channel subscriber lag with replay mechanism #6

@kitplummer


Summary

The broadcast channel for change events has a fixed buffer of 256 messages. Slow subscribers lose events, and the only trace is a server-side warning log. There is no replay mechanism: clients that reconnect or fall behind must issue a full List* call to rebuild state, with no way to know what they missed.

Current State

src/node.rs:102:

let (change_tx, _) = broadcast::channel(256);

When a subscriber falls behind (src/node.rs:142-143, 174-175):

Err(broadcast::error::RecvError::Lagged(n)) => {
    warn!("sync change listener lagged {n} messages");
}

The gRPC Subscribe stream (src/service.rs:314-350) wraps this broadcast channel. If a client's network is slow or it pauses processing long enough to fall more than 256 messages behind, the oldest unread events are dropped and the client is never told.
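The following is a simplified, dependency-free model of the lag semantics tokio documents for `broadcast`. It makes the failure mode concrete. It is a sketch, not tokio's implementation, and `Ring`, `Recv`, and `subscribe` are hypothetical names. In the model:

- Each value is stored once in a bounded ring.
- Every subscriber keeps its own cursor.
- Sending never blocks.
- A subscriber whose cursor has fallen off the front learns only how many values it skipped, then resumes at the oldest retained value.

```rust
use std::collections::VecDeque;

/// What one receive attempt yields in this model.
#[derive(Debug, PartialEq)]
pub enum Recv<T> {
    Value(T),
    /// The subscriber fell off the front of the ring and skipped this many values.
    Lagged(u64),
    /// Nothing new to read yet.
    Empty,
}

/// Bounded ring shared by all subscribers; each value is stored once.
pub struct Ring<T> {
    capacity: usize,
    next_pos: u64,           // position the next sent value will receive
    buf: VecDeque<(u64, T)>, // (position, value), oldest first
}

impl<T: Clone> Ring<T> {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        Ring { capacity, next_pos: 0, buf: VecDeque::with_capacity(capacity) }
    }

    /// A new subscriber only sees values sent after it subscribes.
    pub fn subscribe(&self) -> u64 {
        self.next_pos
    }

    /// Sending never blocks: when the ring is full, the oldest value is evicted
    /// regardless of whether every subscriber has read it.
    pub fn send(&mut self, value: T) {
        if self.buf.len() == self.capacity {
            self.buf.pop_front();
        }
        self.buf.push_back((self.next_pos, value));
        self.next_pos += 1;
    }

    /// Reads the value at a subscriber's cursor and advances the cursor.
    pub fn recv(&self, cursor: &mut u64) -> Recv<T> {
        match self.buf.front() {
            None => Recv::Empty,
            Some(&(oldest, _)) if *cursor < oldest => {
                // Report how many values were lost, then jump to the oldest retained one.
                let skipped = oldest - *cursor;
                *cursor = oldest;
                Recv::Lagged(skipped)
            }
            Some(&(oldest, _)) => match self.buf.get((*cursor - oldest) as usize) {
                Some((_, value)) => {
                    *cursor += 1;
                    Recv::Value(value.clone())
                }
                None => Recv::Empty,
            },
        }
    }
}
```

The lagging subscriber gets a count, not the skipped values or their keys. That count is all the current `Lagged(n)` arm has to work with.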

Problems

  1. Silent data loss: Lagged subscribers miss change events with no indication to the client
  2. No reconnect strategy: A client that disconnects has no way to request "changes since X"
  3. No backpressure: Fast producers (e.g., watcher polling 100+ packages) can overwhelm slow consumers
  4. Buffer size is fixed: 256 may be too small for fleet-wide state sync bursts

Proposed Approach

Phase 1: Immediate improvements

  1. Make buffer size configurable via --broadcast-buffer-size / PEAT_SIDECAR_BROADCAST_BUFFER_SIZE
  2. Signal lag to gRPC clients: Send a sentinel DocumentChange with a special CHANGE_TYPE_RESYNC type so clients know they need to re-list
  3. Log with context: Include collection name and doc_id range in lag warnings
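A minimal sketch of the three Phase 1 changes follows. It assumes the CLI flag takes precedence over the environment variable and that both fall back to today's 256.

- `resolve_buffer_size`, `buffer_size_from_env`, `to_outgoing`, `Received`, and the `Upsert`/`Delete` variants are hypothetical names.
- `Received` stands in for tokio's `Result<T, RecvError>` so the example has no dependencies.
- `eprintln!` stands in for the existing `warn!` call.
- The RESYNC sentinel carries the collection name and an empty doc_id. This is one possible convention, not something the proto defines yet.

```rust
use std::env;

/// Default matches the current hard-coded `broadcast::channel(256)`.
pub const DEFAULT_BROADCAST_BUFFER_SIZE: usize = 256;
/// Environment variable name from the proposal.
pub const BUFFER_SIZE_ENV: &str = "PEAT_SIDECAR_BROADCAST_BUFFER_SIZE";

/// Resolves the buffer size. Precedence (an assumption): CLI flag, then
/// environment variable, then default.
pub fn resolve_buffer_size(flag: Option<usize>, env_value: Option<&str>) -> Result<usize, String> {
    let size = match (flag, env_value) {
        (Some(n), _) => n,
        (None, Some(raw)) => raw
            .trim()
            .parse::<usize>()
            .map_err(|e| format!("{}={:?}: {}", BUFFER_SIZE_ENV, raw, e))?,
        (None, None) => DEFAULT_BROADCAST_BUFFER_SIZE,
    };
    // tokio's broadcast::channel panics on a capacity of 0, so reject it here.
    if size == 0 {
        return Err("broadcast buffer size must be greater than 0".to_string());
    }
    Ok(size)
}

/// Convenience wrapper that reads the real environment variable.
pub fn buffer_size_from_env(flag: Option<usize>) -> Result<usize, String> {
    let raw = env::var(BUFFER_SIZE_ENV).ok();
    resolve_buffer_size(flag, raw.as_deref())
}

/// Hypothetical Rust mirror of the proto enum; only `Resync` comes from the proposal.
#[derive(Debug, Clone, PartialEq)]
pub enum ChangeType {
    Upsert,
    Delete,
    Resync,
}

#[derive(Debug, Clone, PartialEq)]
pub struct DocumentChange {
    pub collection: String,
    pub doc_id: String,
    pub change_type: ChangeType,
    pub json_data: Option<String>,
}

/// Outcome of one receive attempt (stand-in for tokio's `Result<T, RecvError>`).
pub enum Received {
    Change(DocumentChange),
    Lagged(u64),
}

/// Maps a receive outcome to the message forwarded on the Subscribe stream.
pub fn to_outgoing(received: Received, collection: &str) -> DocumentChange {
    match received {
        Received::Change(change) => change,
        Received::Lagged(skipped) => {
            // Stand-in for the existing warn! call, now including the collection name.
            eprintln!("subscriber on {collection:?} lagged {skipped} messages; sending RESYNC");
            DocumentChange {
                collection: collection.to_string(),
                doc_id: String::new(), // the sentinel refers to no specific document
                change_type: ChangeType::Resync,
                json_data: None,
            }
        }
    }
}
```

Converting the lag into an in-band message keeps the stream open. The client can then re-list and continue, rather than treating lag as a stream error and reconnecting.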

Phase 2: Sequence-based replay

  1. Add monotonic sequence numbers to the ChangeEvent struct and the DocumentChange proto message
  2. Maintain a bounded replay log (ring buffer of last N changes with sequence numbers)
  3. Accept since_sequence in SubscribeRequest: If the requested sequence is still in the replay log, replay from there; otherwise signal that a full resync is needed
  4. Proto changes:
    message SubscribeRequest {
      repeated string collections = 1;
      optional uint64 since_sequence = 2;  // Resume from sequence number
    }
    
    message DocumentChange {
      string collection = 1;
      string doc_id = 2;
      ChangeType change_type = 3;
      optional string json_data = 4;
      uint64 sequence = 5;  // Monotonic sequence number
    }
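A minimal sketch of the Phase 2 replay log follows. It assumes sequence numbers start at 1 and are assigned where the change is broadcast. `ReplayLog`, `Resume`, `append`, and `since` are hypothetical names, and the entry type is left generic rather than tied to `ChangeEvent`.

`since(n)` returns every retained change with a sequence greater than n. It returns `ResyncRequired` when some of those changes have already been evicted, or when n is ahead of anything this process has issued.

```rust
use std::collections::VecDeque;

/// Outcome of a `since_sequence` resume request.
#[derive(Debug, PartialEq)]
pub enum Resume<T> {
    /// Every retained change with sequence > since, oldest first.
    Replay(Vec<(u64, T)>),
    /// Changes after `since` were evicted (or `since` is unknown): client must re-list.
    ResyncRequired,
}

/// Bounded ring of the last `capacity` changes, keyed by a monotonic sequence.
pub struct ReplayLog<T> {
    capacity: usize,
    next_seq: u64,
    entries: VecDeque<(u64, T)>, // consecutive sequences, oldest first
}

impl<T: Clone> ReplayLog<T> {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "replay log capacity must be non-zero");
        ReplayLog { capacity, next_seq: 1, entries: VecDeque::with_capacity(capacity) }
    }

    /// Assigns the next sequence number, records the change, and evicts the
    /// oldest entry once the log is full. Returns the assigned sequence.
    pub fn append(&mut self, change: T) -> u64 {
        let seq = self.next_seq;
        self.next_seq += 1;
        if self.entries.len() == self.capacity {
            self.entries.pop_front();
        }
        self.entries.push_back((seq, change));
        seq
    }

    /// Changes the client has not seen, given the last sequence it processed.
    pub fn since(&self, since: u64) -> Resume<T> {
        let latest = self.next_seq - 1; // 0 if nothing has been appended yet
        if since > latest {
            // The client is ahead of this process (e.g. it saw a previous run).
            return Resume::ResyncRequired;
        }
        let oldest = match self.entries.front() {
            Some(&(seq, _)) => seq,
            None => return Resume::Replay(Vec::new()), // nothing issued yet
        };
        if since < oldest - 1 {
            // Sequences since+1 ..= oldest-1 were evicted.
            return Resume::ResyncRequired;
        }
        // Sequences are consecutive, so the first unseen entry sits at this index.
        let start = (since + 1 - oldest) as usize;
        Resume::Replay(self.entries.iter().skip(start).cloned().collect())
    }
}
```

The counter lives in memory, so a sidecar restart resets it. The `since > latest` check turns a resume request from before the restart into a resync rather than a silent gap. However, once the new process has issued that many changes, the request becomes ambiguous, so pairing the sequence with a per-process epoch may be worth considering.

In a real node, the log and the broadcast send would also need to share a lock (for example a `Mutex`) so that sequence order matches delivery order.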

Impact

Reliable change notification is critical for:

  • Fleet Command Hub maintaining real-time fleet view
  • Dashboards and monitoring tools consuming the Subscribe stream
  • Any client that can't afford to miss state transitions (e.g., command acknowledgments)

Files

  • src/node.rs:40-47: ChangeEvent struct (add sequence field)
  • src/node.rs:102: broadcast channel creation
  • src/node.rs:142-143, 174-175: lag handling
  • src/service.rs:314-350: Subscribe gRPC implementation
  • proto/sidecar.proto:309-326: SubscribeRequest and DocumentChange messages
