
feat: bring livepeer runner Kafka events to parity with cloud-relay#969

Merged
emranemran merged 1 commit into main from feat/livepeer-runner-kafka-parity
Apr 21, 2026

Conversation

@emranemran
Contributor

Summary

Follow-up to #956. That PR added websocket_connected / websocket_disconnected from the livepeer fal wrapper, but the rest of the session lifecycle (pipeline_loaded, session_created, stream_started, stream_heartbeat, stream_stopped, playback_ready, error variants) was either not firing or firing with null identifiers in livepeer mode. Cloud-relay mode (fal_app.py path) already had these events working; this PR brings livepeer mode to the same shape.

Root causes (all in the runner, `src/scope/cloud/livepeer_app.py`)

  1. `FrameProcessor(...)` was constructed without `user_id` / `connection_id` / `session_id` / `connection_info`, so every event it emits (`stream_started`, `stream_heartbeat`, `stream_stopped`, etc.) carried nulls.
  2. `session.manifest_id` was never populated from `job_info.manifest_id`, so there was no identifier available that matched the wrapper's `websocket_connected.connection_id`.
  3. The pipeline/load body injection used the runner's random internal UUID as `connection_id` instead of the manifest_id, so `pipeline_loaded` didn't correlate.
  4. `session_created` / `session_closed` are only emitted from `webrtc.py`, which isn't on the livepeer code path — they have to be emitted explicitly.
  5. `connection_info` env vars (`NOMAD_DC`, `FAL_JOB_ID`, `FAL_LOG_LABELS`, etc.) weren't in the runner subprocess allowlist, so the runner couldn't rebuild the dict even if asked.

Changes

`src/scope/cloud/livepeer_app.py`

  • Add `manifest_id` / `session_id` / `connection_info` fields to `LivepeerSession`; populate them right after parsing `ScopeJobInfo`.
  • New `_build_connection_info()` helper that mirrors the shape built by `livepeer_fal_app.py` from env vars.
  • Pass `session_id`, `user_id`, `connection_id=manifest_id`, `connection_info` into `FrameProcessor` so every downstream event from `frame_processor.py` / `pipeline_processor.py` carries identifiers that join with `websocket_connected`.
  • Emit `session_created` right after `FrameProcessor.start()`, and `session_closed` right after `FrameProcessor.stop()`, using the same shape as `webrtc.py:731` and `:1150`.
  • Flip the pipeline/load body injection from `session.connection_id` to `session.manifest_id`, and pass `connection_info` too.

`src/scope/cloud/livepeer_fal_app.py`

  • Add `NOMAD_DC`, `FAL_JOB_ID`, `FAL_RUNNER_ID`, `FAL_LOG_LABELS`, `FAL_MACHINE_TYPE` to `env_allowlist` so the runner subprocess can reconstruct `connection_info`.
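The allowlist change only matters because `_build_connection_info()` reads these variables inside the runner subprocess. A hedged sketch of that idea — the exact keys the wrapper emits are not shown in this PR, so the mapping below is illustrative:

```python
# Illustrative mapping: the real key names produced by
# livepeer_fal_app.py may differ; only the env var names are from the PR.
import os

ALLOWLISTED_VARS = {
    "NOMAD_DC": "datacenter",
    "FAL_JOB_ID": "job_id",
    "FAL_RUNNER_ID": "runner_id",
    "FAL_LOG_LABELS": "log_labels",
    "FAL_MACHINE_TYPE": "machine_type",
}


def build_connection_info(env=os.environ) -> dict:
    """Rebuild the connection_info dict from allowlisted env vars,
    skipping any that are unset or empty."""
    return {
        field: env[var]
        for var, field in ALLOWLISTED_VARS.items()
        if env.get(var)
    }


info = build_connection_info({"NOMAD_DC": "us-east-1", "FAL_JOB_ID": "job-42"})
assert info == {"datacenter": "us-east-1", "job_id": "job-42"}
```

Without the allowlist entries, every `env.get(var)` returns `None` in the subprocess and the dict comes back empty, even though the parent wrapper had the values.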

Test plan

  • `uv run ruff check src/` and `ruff format --check src/` pass (already verified)
  • `uv run daydream-scope` starts without import/init errors
  • `./test-cloud-connect.sh --skip-push` after CI build-cloud succeeds → exit 0 with CONNECTED
  • Manual UI test: connect to cloud, load pipeline (`longlive`), start stream ~30s, disconnect
  • ClickHouse query on `scope_cloud_events` filtered by `user_id` and `connection_id = <manifest_id>` shows all of: `websocket_connected`, `pipeline_load_start`, `pipeline_loaded`, `session_created`, `stream_started`, ≥2 `stream_heartbeat`, `stream_stopped`, `session_closed`, `websocket_disconnected` — all sharing the same connection_id
  • Regression check on cloud-relay path (`fal_app.py` deploy): no changes, events unchanged
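The ClickHouse assertion in the test plan can be expressed as a small check: every lifecycle event present, at least two heartbeats, and all rows sharing one `connection_id`. The event names come from the PR; the row shape (`event_type`, `connection_id` keys) is an assumption for illustration:

```python
# Sketch of the test-plan check; row dict keys are assumed, not taken
# from the real scope_cloud_events schema.
EXPECTED = {
    "websocket_connected", "pipeline_load_start", "pipeline_loaded",
    "session_created", "stream_started", "stream_stopped",
    "session_closed", "websocket_disconnected",
}


def check_lifecycle(rows: list[dict]) -> None:
    types = [r["event_type"] for r in rows]
    missing = EXPECTED - set(types)
    assert not missing, f"missing events: {missing}"
    assert types.count("stream_heartbeat") >= 2, "expected >=2 heartbeats"
    ids = {r["connection_id"] for r in rows}
    assert len(ids) == 1, f"events split across connection_ids: {ids}"


rows = [{"event_type": t, "connection_id": "m-abc"}
        for t in list(EXPECTED) + ["stream_heartbeat", "stream_heartbeat"]]
check_lifecycle(rows)  # a healthy session passes
```

Pre-fix sessions fail this check two ways: most lifecycle events are missing entirely, and the few that fire carry a different (null or random-UUID) `connection_id` than `websocket_connected`.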

Not in scope

  • Local-scope-side events (`session_created` from WebRTC path on user's laptop, `session_closed`, webrtc connection errors) still require the user's local env to have `KAFKA_*` configured; that's a config concern, not code.
  • `/api/v1/session/start` not being livepeer-compatible (`mcp_router.py:252` TODO) — untouched; only affects headless test flows.

🤖 Co-authored with Claude Code

PR #956 started publishing websocket_connected / websocket_disconnected
from the livepeer fal wrapper using the orchestrator-provided
manifest_id. But the rest of the session lifecycle
(pipeline_loaded, session_created, stream_started, stream_heartbeat,
stream_stopped, playback_ready, and the error variants) continued to
either not fire or fire with null user_id / connection_id in livepeer
mode because the runner built FrameProcessor without those fields and
never persisted manifest_id.

- Add manifest_id / session_id / connection_info fields to
  LivepeerSession and populate them right after parsing the
  job_info (src/scope/cloud/livepeer_app.py).
- Thread user_id, session_id, manifest_id (as connection_id), and
  connection_info into FrameProcessor so every event it emits
  matches the wrapper's websocket_connected.
- Explicitly publish session_created after FrameProcessor.start() and
  session_closed after stop(), mirroring the shape of the existing
  webrtc.py emissions — livepeer mode doesn't hit the WebRTC offer
  handler so this has to happen here.
- Swap the pipeline/load body injection to use manifest_id instead of
  the runner's random internal UUID, so pipeline_loaded correlates
  too; pass connection_info along.
- Allow NOMAD_DC / FAL_JOB_ID / FAL_RUNNER_ID / FAL_LOG_LABELS /
  FAL_MACHINE_TYPE through the runner subprocess env_allowlist so
  _build_connection_info() can reconstruct the same dict the wrapper
  uses.

After this, ClickHouse queries filtered by user_id or connection_id
(= manifest_id) see the full session lifecycle for livepeer mode, not
just the two wrapper-layer events.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: emranemran <emran.mah@gmail.com>
@coderabbitai

coderabbitai Bot commented Apr 20, 2026

Important

Review skipped

Auto reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.


@github-actions
Contributor

github-actions Bot commented Apr 20, 2026

🚀 fal.ai Preview Deployment

App ID daydream/scope-pr-969--preview
WebSocket wss://fal.run/daydream/scope-pr-969--preview/ws
Commit 291c5c9

Livepeer Runner

App ID daydream/scope-livepeer-pr-969--preview
WebSocket wss://fal.run/daydream/scope-livepeer-pr-969--preview/ws
Auth private

Testing Livepeer Mode

SCOPE_CLOUD_MODE=livepeer SCOPE_CLOUD_APP_ID="daydream/scope-livepeer-pr-969--preview/ws" uv run daydream-scope

media_publishes: list[MediaPublish | None] = field(default_factory=list)
user_id: str | None = None
connection_id: str | None = None
manifest_id: str | None = None
Contributor

Pretty sure manifest_id is the same as the connection_id above

Contributor

... oh dear it's not. Well that's a simple change then

Contributor Author

I made manifest_id the same as connection_id, and the old connection_id should be gone now.

Contributor

I guess the manifest_id does not need to be passed in anymore if that's the case? Would simplify some of the ad hoc checks around here.

Contributor Author

Yeah, that's true. I'll clean this up in a follow-up and file a ticket so I don't forget.

emranemran added a commit that referenced this pull request Apr 20, 2026
The previous iteration of this test passed with a false positive. It polled
any <video> for playback, which always finds the local input preview
playing even when the browser↔local-scope WebRTC never completes and
no frames ever reach the cloud. The result: ClickHouse saw only
websocket_connected / pipeline_loaded / websocket_disconnected —
nothing that requires a real round-trip through the livepeer runner.

Two fixes:

1. Feed the browser a synthetic camera via
   --use-fake-device-for-media-stream (plus the Camera input toggle in
   the UI). This lets getUserMedia() succeed and a real WebRTC peer
   connection between browser and local scope complete end to end,
   which triggers CloudTrack._start() → LivepeerClient.start_media()
   and the "start_stream" trickle control message the runner needs.

2. Assert on the video inside the "Video Output" card, not any
   <video>. That element only renders when a remoteStream is set,
   so waiting on its visibility and currentTime > 0 is a true
   round-trip signal. After frames start flowing, idle 15s so
   stream_heartbeat events (~every 10s on the runner side) have
   a chance to fire.

Verified locally: test passes in ~2.8 min against scope-livepeer-emran
with passthrough. Full event set lands in ClickHouse when paired with
the parity PR (#969).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: emranemran <emran.mah@gmail.com>
emranemran added a commit that referenced this pull request Apr 21, 2026
Squash of feat/test-cloud-connect-tooling (PR #962) onto this
branch so we can exercise the parity changes end-to-end via
Playwright + skill-driven "test cloud" flow.

This commit is a throwaway for verification — once the parity code
is signed off, revert this single commit before opening PR #969 for
review so the diff stays focused.

Squashed from:
- feat: add end-to-end cloud-connect test harness and skill
- fix(e2e): update cloud-streaming test for graph-mode UI redesign
- fix(e2e): actually exercise the livepeer trickle path
- feat: lead SKILL with Playwright + fix run-app.sh env var quoting
- docs: make the testing-livepeer-fal-deploy skill discoverable
- docs: route all "test cloud" prompts to the livepeer skill
- feat: skill asks for fal app+env, deploys, then runs Playwright

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: emranemran <emran.mah@gmail.com>
@emranemran emranemran force-pushed the feat/livepeer-runner-kafka-parity branch from 9f655d2 to 291c5c9 Compare April 21, 2026 02:38
@emranemran emranemran merged commit 905991a into main Apr 21, 2026
20 checks passed