Skip to content

Broadcast

Ori Pekelman edited this page May 22, 2026 · 1 revision

Tep::Broadcast

Topic-keyed pub/sub. Subscribe fds to topics; publish writes the payload to every matching fd. Foundation of Battery 2 (Broadcast) and the seam Presence (Battery 3) + LiveView (Battery 4) lean on.

Public surface

sub_id = Tep::Broadcast.subscribe(topic, fd)         # raw bytes
sub_id = Tep::Broadcast.subscribe_ws(topic, fd)      # WS TEXT frame
Tep::Broadcast.publish(topic, payload)               # → match count
Tep::Broadcast.unsubscribe(sub_id)
Tep::Broadcast.unsubscribe_fd(fd)                    # drop ALL for fd
Tep::Broadcast.subscriber_count
Tep::Broadcast.subscribers_for(topic)
Tep::Broadcast.clear                                 # tests / shutdown

Subscription model is fd-based, not block/callback-based. Spinel can't reliably round-trip blocks-as-values across module boundaries, so the registry stores opaque integer fds.

Two delivery modes:

  • subscribe(topic, fd) writes raw bytes to fd via sphttp_write_str. Suits SSE / log fan-out / anything that doesn't need WS framing.
  • subscribe_ws(topic, fd) wraps payloads in a WS TEXT frame via Tep::WebSocket::Driver.send_frame. The fd is expected to be an established WS connection.

publish returns the matched count (number of subscribers the payload was attempted to). Bad/closed fds silently fail at the underlying write; apps that need delivery confirmation track their own ack channel.

Typical WS handler shape

websocket "/chat_live" do |ws|
  on_open do |evt|
    Tep::Broadcast.subscribe_ws("room:lobby", ws.fd)
    ws.text("welcome")
  end
  on_close do |evt|
    Tep::Broadcast.unsubscribe_fd(ws.fd)
  end
end

# Anywhere else, when a message lands:
Tep::Broadcast.publish("room:lobby", "alice: hello")
# Every WS subscribed to "room:lobby" sees a TEXT frame "alice: hello"

unsubscribe_fd is what your on_close always wants — one call drops every topic the fd was subscribed to, no matter how many subscribe_ws calls happened over the session.

Cross-worker pub/sub (PG LISTEN/NOTIFY)

on_start do
  Tep::Broadcast.enable_pg_backend(ENV["PG_URL"], "tep_broadcast")
end

When enabled, every publish ALSO NOTIFYs the configured PG channel. Other workers subscribed to the same channel receive the message and dispatch to their local subscribers.

Receive side is app-driven in v1: apps drive Tep::Broadcast.poll_pg_once(timeout_ms) from their own loop. Once Tep::Server::Scheduled is reliable upstream (matz/spinel#641), a built-in listener fiber will land alongside enable_pg_backend.

Wire format on PG: length-prefixed "<topic_byte_length>:<topic><payload>" so topics + payloads with arbitrary chars (commas, colons, quotes, newlines) round- trip unambiguously. 8000-byte payload cap per PG default.

Authz

There's no per-topic authz callback in v1 — apps gate authorization at their subscribe call site. Apps that want a global hook compose it themselves:

def allowed?(req, topic)
  return false unless req.identity.may?(:read)
  topic.start_with?("public:") || req.identity.principal_id.start_with?("admin:")
end

websocket "/chat_live" do |ws|
  on_open do |evt|
    if allowed?(req, "room:lobby")
      Tep::Broadcast.subscribe_ws("room:lobby", ws.fd)
    end
  end
end

A central Tep::Broadcast.authorize { |topic, identity, mode| ... } hook is a planned follow-up.

Scope notes

  • Per-process registry. subscribe / publish are local. Cross-worker delivery requires the PG backend (above).
  • No durable subscriptions. A disconnected WS loses its subscription on close; reconnects subscribe fresh.
  • No backpressure. publish writes synchronously; a slow consumer's fd's send buffer fills, sphttp_write_str returns -1 silently, the broadcast counts as "matched" but effectively dropped. Apps with slow consumers handle backpressure upstream.

Clone this wiki locally