Presence
Topic-keyed who's-here registry, agent-aware. Tracks (principal, session, topic) tuples with structured status; emits diff events when state changes so subscribers (typically WS clients via Tep::Broadcast) get real-time join/leave/status updates.
Battery 3 in docs/BATTERIES-DESIGN.md.
class Tep::PresenceEntry
  attr_reader :topic          # String
  attr_reader :principal_id   # String, opaque
  attr_reader :kind           # :human | :agent_for
  attr_reader :agent_id       # String, empty when kind == :human
  attr_reader :fd             # session-id surrogate
  attr_reader :since          # unix epoch sec
  # Structured status (Tep::PresenceStatus inlined):
  attr_accessor :status_state # :available | :busy | :blocked
  attr_accessor :status_note  # free text, ~140 char soft hint
  attr_accessor :status_until # unix epoch sec; 0 = no expiry
end

fd is the session-id surrogate: one WS connection = one fd =
one entry. A human in three browser tabs gets three entries
(kind :human, three fds, same principal_id). An agent acting
on behalf of that human gets its own entry (kind :agent_for,
agent_id populated, separate fd). That's the Phoenix.Presence
shape with the agentic kind+agent_id pair grafted on.
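
As a rough illustration of the entry model described above, the standalone sketch below uses a plain Struct in place of Tep::PresenceEntry. The `Entry` struct and the sample ids are hypothetical, and it assumes the agent's entry carries the principal_id of the human it acts for; it only shows how one human in three tabs plus one agent yields four entries.

```ruby
# Hypothetical stand-in for Tep::PresenceEntry; only identity fields are modeled.
Entry = Struct.new(:topic, :principal_id, :kind, :agent_id, :fd, keyword_init: true)

entries = []

# One human, three browser tabs: three WS connections, hence three fds.
[5, 6, 7].each do |fd|
  entries << Entry.new(topic: "room:lobby", principal_id: "user:42",
                       kind: :human, agent_id: "", fd: fd)
end

# An agent acting on behalf of that human gets its own entry and its own fd.
# Assumption: it shares the human's principal_id.
entries << Entry.new(topic: "room:lobby", principal_id: "user:42",
                     kind: :agent_for, agent_id: "summarizer-bot", fd: 8)

humans     = entries.count { |e| e.kind == :human }
agents     = entries.count { |e| e.kind == :agent_for }
principals = entries.map(&:principal_id).uniq
```

Counting by kind, as count_humans / count_agents presumably do, separates the tabs from the agent even though all four entries share one principal.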
Tep::Presence.track(req, topic, fd)
Tep::Presence.untrack(topic, fd)
Tep::Presence.untrack_by_fd(fd) # WS-close hook shape
Tep::Presence.list(topic) # Array[PresenceEntry] (local)
Tep::Presence.count(topic)
Tep::Presence.count_humans(topic)
Tep::Presence.count_agents(topic)
Tep::Presence.set_status(topic, fd, state, note, until_ts)
Tep::Presence.clear_status(topic, fd)
Tep::Presence.find_entry(topic, fd)
Tep::Presence.sweep_expired_status # reset expired + emit "status" diffs
Tep::Presence.clear # tests / shutdown
# Cross-worker (PG mirror):
Tep::Presence.enable_pg_mirror(conninfo)
Tep::Presence.disable_pg_mirror
Tep::Presence.list_global(topic) # across all workers
Tep::Presence.count_global(topic)

track extracts principal_id / kind / agent_id from
req.identity (set by Tep::Auth). kind is :human
when req.identity.human?, :agent_for when agent?.
Dedup: repeat calls for the same (topic, fd) leave the existing
entry in place. Apps can call track freely from before-filters /
reconnect paths.
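
The dedup rule can be pictured with a small in-memory model. This is a sketch under stated assumptions, not Tep's implementation: `SketchRegistry` and its method signatures are hypothetical, and the real track takes a request object rather than explicit identity fields.

```ruby
# Hypothetical in-memory registry keyed by (topic, fd).
class SketchRegistry
  def initialize
    @entries = {} # [topic, fd] => entry Hash
  end

  # A repeat call for the same (topic, fd) returns the existing entry
  # instead of replacing it, so `since` keeps its original value.
  def track(topic, fd, principal_id, kind, now = Time.now.to_i)
    @entries[[topic, fd]] ||= {
      topic: topic, fd: fd, principal_id: principal_id, kind: kind, since: now
    }
  end

  # WS-close hook shape: drop every entry that belongs to this fd.
  def untrack_by_fd(fd)
    @entries.delete_if { |(_, entry_fd), _| entry_fd == fd }
  end

  def count(topic)
    @entries.count { |(t, _), _| t == topic }
  end
end

reg    = SketchRegistry.new
first  = reg.track("room:lobby", 5, "user:42", :human, 1_716_392_400)
second = reg.track("room:lobby", 5, "user:42", :human, 1_716_392_999) # reconnect path
```

Because the second call is a no-op, calling track from both a before-filter and a reconnect handler would not produce duplicate entries in this model.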
Three states, no more:
| State | Meaning |
|---|---|
| :available | Default. Ready to handle work. UI: green dot. |
| :busy | Working on something, will respond eventually. UI: yellow. |
| :blocked | Waiting on something external; won't respond until unblocked. UI: red. |
The state vocabulary is the minimum that lets a collaborating
bot decide whether to pick up work. Anything finer (away vs
extended-away, dnd vs offline) is a UI concern that the note
field carries.
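
One way a collaborating bot might act on the three states is sketched below. The `routing_for` helper and its return values are hypothetical; the policy is an assumption for illustration, not something Tep prescribes.

```ruby
# Hypothetical routing policy over the three presence states.
def routing_for(state)
  case state
  when :available then :assign_now      # ready to handle work
  when :busy      then :queue_for_later # will respond eventually
  when :blocked   then :skip            # won't respond until unblocked
  else raise ArgumentError, "unknown presence state: #{state.inspect}"
  end
end

decisions = [:available, :busy, :blocked].map { |s| routing_for(s) }
```

Anything finer than these three outcomes would have to be read out of the free-text note, which a bot can display but should not need to parse.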
Examples mapping to the agentic use case:
# Bot blocked on rate limit:
Tep::Presence.set_status(topic, fd, :blocked,
                         "Claude API throttled", Time.now.to_i + 600)
# Human on vacation:
Tep::Presence.set_status(topic, fd, :blocked,
                         "vacation", Time.parse("2026-05-28").to_i)
# Bot working, no expiry:
Tep::Presence.set_status(topic, fd, :busy,
                         "summarizing thread #1234", 0)

sweep_expired_status resets entries whose status_until has
passed back to :available and emits a "status" diff for each.
Apps call this periodically: from a before-filter, from a
periodic timer, or, once Scheduled is reliable, from a
background fiber.
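
The sweep behavior can be approximated with the standalone model below. It is a sketch, not Tep's code: `sweep_expired`, the Hash-based entries, and the decisions to clear the note and to treat `status_until == now` as expired are all assumptions.

```ruby
# Hypothetical model of a status sweep over plain Hash entries.
# Returns one "status" diff per entry that was reset.
def sweep_expired(entries, now)
  diffs = []
  entries.each do |e|
    next if e[:status_until] == 0  # 0 = no expiry
    next if e[:status_until] > now # not yet expired
    e[:status_state] = :available
    e[:status_note]  = ""          # assumption: the note is cleared too
    e[:status_until] = 0
    diffs << { "kind" => "status", "topic" => e[:topic], "fd" => e[:fd],
               "state" => "available", "note" => "", "until_ts" => 0 }
  end
  diffs
end

entries = [
  { topic: "room:lobby", fd: 5, status_state: :blocked, status_note: "throttled", status_until: 900 },
  { topic: "room:lobby", fd: 6, status_state: :busy,    status_note: "working",   status_until: 0 },
  { topic: "room:lobby", fd: 7, status_state: :blocked, status_note: "vacation",  status_until: 2000 }
]

first_pass  = sweep_expired(entries, 1000)
second_pass = sweep_expired(entries, 1000)
```

Only the entry whose expiry has passed is reset, and a second sweep at the same time finds nothing left to do, so calling the sweep more often than necessary is harmless in this model.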
Every track / untrack / set_status / sweep emits a diff event
via Tep::Broadcast on
Tep::Presence.diff_topic(topic) (which is "presence:<topic>").
Subscribers receive the diff as flat JSON:
{
  "kind": "join" | "leave" | "status",
  "topic": "room:lobby",
  "principal": "user:42",
  "ekind": "human" | "agent_for",
  "agent_id": "summarizer-bot",
  "fd": 5,
  "since": 1716392400,
  "state": "available" | "busy" | "blocked",
  "note": "summarizing thread #1234",
  "until_ts": 0
}

Typical WS handler shape:
websocket "/chat_live" do |ws|
  on_open do |evt|
    Tep::Presence.track(req, "room:lobby", ws.fd)
    Tep::Broadcast.subscribe_ws(
      Tep::Presence.diff_topic("room:lobby"), ws.fd)
  end
  on_close do |evt|
    Tep::Broadcast.unsubscribe_fd(ws.fd)
    Tep::Presence.untrack_by_fd(ws.fd)
  end
end

To enable the cross-worker PG mirror at boot:

on_start do
  Tep::Presence.enable_pg_mirror(ENV["PG_URL"])
end

When enabled, every track / untrack / set_status also writes
the canonical tep_presence row, tagged with this worker's
worker_id (PID + boot-epoch). list_global(topic) / count_global(topic)
SELECT across all workers; the local list / count
remain fast paths for per-worker queries.
Realtime cross-worker diffs are already handled by the PG-LISTEN
backend in Tep::Broadcast: every diff fan-out goes through
NOTIFY tep_broadcast, so every worker's subscribers see the
message.
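
On the receiving side, a subscriber could fold these diff events into a local roster. The sketch below is hypothetical: `apply_diff` is not part of Tep, the field names follow the flat JSON diff shape, and keying by (topic, fd) is an assumption that ignores possible fd collisions between workers.

```ruby
require "json"

# Hypothetical client-side fold of presence diffs into a roster Hash.
def apply_diff(roster, raw_json)
  diff = JSON.parse(raw_json)
  key  = [diff["topic"], diff["fd"]]
  case diff["kind"]
  when "join"
    roster[key] = diff
  when "leave"
    roster.delete(key)
  when "status"
    # Only update entries already known; a status diff for an unknown entry is ignored.
    if roster.key?(key)
      roster[key] = roster[key].merge(
        "state" => diff["state"], "note" => diff["note"], "until_ts" => diff["until_ts"]
      )
    end
  end
  roster
end

base = { "topic" => "room:lobby", "principal" => "user:42", "ekind" => "agent_for",
         "agent_id" => "summarizer-bot", "fd" => 5, "since" => 1_716_392_400 }

roster = {}
apply_diff(roster, JSON.generate(base.merge(
  "kind" => "join", "state" => "available", "note" => "", "until_ts" => 0)))
apply_diff(roster, JSON.generate(base.merge(
  "kind" => "status", "state" => "busy", "note" => "summarizing thread #1234", "until_ts" => 0)))
busy_state = roster[["room:lobby", 5]]["state"]

apply_diff(roster, JSON.generate(base.merge(
  "kind" => "leave", "state" => "busy", "note" => "", "until_ts" => 0)))
remaining = roster.size
```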
A worker that crashes leaves its rows behind until the next
start with the same worker_id (enable_pg_mirror defensively
DELETEs the rows for its own worker_id at boot). Auto-pruning
via heartbeat is a planned follow-up.
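
The stale-row behavior can be seen in a small in-memory model of the mirror. Everything here is an assumption for illustration: the `MirrorRow` struct, the `"pid-epoch"` worker_id format, and the helper names are hypothetical, and the real tep_presence schema may differ.

```ruby
# Hypothetical in-memory stand-in for the tep_presence table.
MirrorRow = Struct.new(:worker_id, :topic, :fd)

# Assumed format; the source only says worker_id is PID + boot-epoch.
def sketch_worker_id(pid, boot_epoch)
  "#{pid}-#{boot_epoch}"
end

# Models the defensive boot-time DELETE of this worker's own rows.
def sketch_enable_mirror(table, my_worker_id)
  table.reject! { |row| row.worker_id == my_worker_id }
  table
end

# Models a SELECT count across all workers.
def sketch_count_global(table, topic)
  table.count { |row| row.topic == topic }
end

table = [
  MirrorRow.new(sketch_worker_id(100, 1_716_392_000), "room:lobby", 5), # left by a crashed worker
  MirrorRow.new(sketch_worker_id(101, 1_716_392_000), "room:lobby", 5)  # live worker
]

# A replacement worker boots with a different PID and boot-epoch,
# so its boot-time cleanup does not match the crashed worker's row.
sketch_enable_mirror(table, sketch_worker_id(102, 1_716_392_600))
count_with_stale_row = sketch_count_global(table, "room:lobby")

# Only a start with the crashed worker's exact worker_id clears the row.
sketch_enable_mirror(table, sketch_worker_id(100, 1_716_392_000))
count_after_cleanup = sketch_count_global(table, "room:lobby")
```

Until heartbeat-based pruning exists, global counts in this model can overstate presence by whatever a crashed worker left behind.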