Conversation
Important: Review skipped. Auto reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the ⚙️ Run configuration. Configuration used: Organization UI. Review profile: CHILL. Plan: Pro.
📝 Walkthrough
This PR introduces graph-based pipeline execution with new schema and executor modules, refactors pipeline management to support multi-instance loading via instance keys, redesigns pipeline processing with multi-port queuing, removes WebSocket fal-connection-id tracking and WebRTC endpoints, consolidates Docker workflows, simplifies logging, and eliminates internal caching mechanisms.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant API as API Endpoint
    participant GraphState as Graph State
    participant GraphExec as Graph Executor
    participant PipelineMgr as Pipeline Manager
    participant FrameProc as Frame Processor

    Client->>API: POST /pipelines/load (with graph)
    API->>GraphState: set_api_graph(GraphConfig)
    GraphState->>GraphState: Persist to disk
    Client->>API: POST /process (with frames)
    API->>FrameProc: put(frame)
    API->>GraphExec: build_graph(GraphConfig, PipelineManager)
    GraphExec->>PipelineMgr: load_pipelines([(node_id, pipeline_id, params)])
    PipelineMgr->>PipelineMgr: Create instance keys, load per graph
    GraphExec->>GraphExec: Wire queues between pipeline nodes
    GraphExec->>GraphExec: Set sink processor and source queues
    GraphExec->>FrameProc: _setup_graph(GraphRun)
    FrameProc->>FrameProc: Initialize per-node processors with port queues
    FrameProc->>FrameProc: Connect output_consumers for downstream routing
    Client->>API: PUT /parameters (node_id specified)
    API->>FrameProc: update_parameters(node_id, params)
    FrameProc->>FrameProc: Route to specific node processor
    Note over FrameProc: Graph execution flow<br/>(frames fan-out through queues)
    FrameProc->>FrameProc: put() → _graph_source_queues
    loop For each pipeline node
        FrameProc->>FrameProc: process_chunk(per-port frames)
        FrameProc->>FrameProc: output to downstream output_queues
    end
    Client->>API: GET /output
    API->>FrameProc: get()
    FrameProc->>FrameProc: last_processor() → sink output
    API->>Client: Return frame from sink
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes

🚥 Pre-merge checks: ✅ 3 passed

✏️ Tip: You can configure your own custom pre-merge checks in the settings.
…hanges
Introduce server-side graph execution infrastructure including graph schema definitions, graph executor for processing node graphs, and graph state management. Update pipeline manager, frame processor, and pipeline processor to support graph-based execution. Add graph inputs/outputs to pipeline schemas.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Signed-off-by: Rafał Leszko <rafal@livepeer.org>
3a90176 to f466988
Actionable comments posted: 13
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/scope/server/frame_processor.py (2)
171-191: ⚠️ Potential issue | 🔴 Critical — Allow graph-only sessions to reach graph setup.
`start()` still hard-fails when `self.pipeline_ids` is empty, but in graph mode `pipeline_ids` is only populated later by `_setup_pipeline_chain_sync()`. A request that sends just the new `graph` field never reaches graph construction.
Suggested fix:
```diff
-        if not self.pipeline_ids:
+        if not self.pipeline_ids and not self.parameters.get("graph"):
             error_msg = "No pipeline IDs provided, cannot start"
             logger.error(error_msg)
             self.running = False
             # Publish error for startup failure
             publish_event(
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/scope/server/frame_processor.py` around lines 171 - 191, The startup currently aborts when self.pipeline_ids is empty inside start(), which blocks graph-only sessions because pipeline_ids are populated later by _setup_pipeline_chain_sync(); change the check in start() to only treat missing pipeline_ids as a fatal error when not running in graph mode (e.g., check self.mode != "graph" or presence of self.graph), so graph-mode sessions proceed to call _setup_pipeline_chain_sync() to build pipeline_ids; keep the existing error logging/publish_event branch for true local-mode failures (referencing start(), self.pipeline_ids, and _setup_pipeline_chain_sync()).
893-918: ⚠️ Potential issue | 🟠 Major — Route input-source frames through the graph source fan-out too.
`put()` already uses `_graph_source_queues`, but the input-source receiver still writes only to `first_processor.input_queue`. Spout/NDI/camera input will therefore bypass explicit source wiring and miss multi-source graphs.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/scope/server/frame_processor.py` around lines 893 - 918, The input-source branch is bypassing the graph fan-out by writing directly to first_processor.input_queue; replace that direct write with the existing put() path that uses _graph_source_queues so spout/NDI/camera frames reach all wired sources. Specifically, after preparing frame_tensor (the same tensor you currently unsqueeze and would send), call self.put(frame_tensor) instead of first_processor.input_queue.put_nowait(...); remove or adapt the queue.Full handling around first_processor.input_queue since put() handles fan-out/queue semantics via _graph_source_queues.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In @.github/workflows/docker-build.yml:
- Around line 117-123: The workflow currently injects the user-controlled
github.head_ref directly into an inline shell expression (used to build
BRANCH_TAG) which enables command injection; fix it by first assigning
github.head_ref to an environment variable (e.g., export BRANCH_REF="${{
github.head_ref }}") and then use that safe variable in the shell script (e.g.,
BRANCH_TAG=$(printf '%s' "$BRANCH_REF" | sed 's/\//-/g')), ensuring all
expansions are quoted; update the places referencing BRANCH_TAG and avoid using
github.head_ref directly in the inline script or unquoted expansions.
In `@app/package.json`:
- Line 4: The package.json "version" was incorrectly downgraded from 0.1.7 to
0.1.6; update the "version" field in app/package.json to a higher patch than the
already released 0.1.7 (e.g., 0.1.8) so it is not a downgrade and ensure it is
kept synchronized with the Python package's version in pyproject.toml; change
the "version" string value accordingly.
In `@pyproject.toml`:
- Line 3: The package version was downgraded to version = "0.1.6" which breaks
semantic versioning; verify whether 0.1.7 was already published and if so bump
the version in pyproject.toml to the next appropriate release (e.g., "0.1.8" or
higher) and update any release notes/changelog accordingly so the published
package version monotonically increases.
In `@src/scope/core/pipelines/base_schema.py`:
- Around line 248-250: BasePipelineConfig currently exposes implicit graph ports
by default: change the ClassVar definitions inputs and outputs on
BasePipelineConfig from ["video"] to empty lists (e.g., ClassVar[list[str]] =
[]) so ports are opt-in; then update each pipeline config that should expose a
video port (ControllerVisualizerConfig, GrayConfig, OpticalFlowConfig,
PassthroughConfig, RIFEConfig, ScribbleConfig, VideoDepthAnythingConfig) to
explicitly declare their own inputs and outputs = ["video"]; ensure
get_schema_with_metadata() continues to read these per-class attributes so only
configs that opt in publish graph ports.
In `@src/scope/server/frame_processor.py`:
- Around line 946-955: The current code only runs
GraphConfig.model_validate(...) which checks shape but not graph structure;
after obtaining api_graph (either via GraphConfig.model_validate(...) or via
_build_linear_graph(...)) call api_graph.validate_structure() and raise or
propagate any validation errors so structurally invalid graphs (duplicate node
IDs, dangling edges, etc.) are rejected before _setup_graph(...) or build_graph
runs; update the block that assigns api_graph to ensure validate_structure() is
invoked and its exceptions are forwarded to fail fast.
- Around line 627-633: The current branch treats missing or misspelled node_id
as a broadcast; change the logic so broadcast only occurs when node_id is
explicitly None: if node_id is None iterate self.pipeline_processors and call
update_parameters(parameters); else if node_id exists in
self._processors_by_node_id call that processor's update_parameters(parameters);
otherwise do not broadcast—log or raise a clear error/warning (e.g., mentioning
node_id) so stale/unknown IDs do not turn into global updates; reference the
variables node_id, self._processors_by_node_id, self.pipeline_processors and the
method update_parameters to locate where to change the conditional flow.
In `@src/scope/server/graph_executor.py`:
- Around line 68-73: The code populates stream_queues keyed by (to_node,
to_port) which silently overwrites a previous stream edge when multiple edges
fan into the same input; update the logic in the block that iterates graph.edges
(and the analogous block at lines ~104-109) to detect duplicate stream targets
and either (A) raise a clear validation error (e.g., ValueError) when more than
one edge has the same (e.to_node, e.to_port) to make fan-in explicit and
rejected, or (B) change the keying to be per-edge (use a unique edge id or the
edge object as the key) and implement an explicit merge/dispatcher for multiple
queues into the single destination port; reference the symbols stream_queues,
graph.edges, e.kind, e.to_node, e.to_port and DEFAULT_INPUT_QUEUE_MAXSIZE when
making the change so the fix is applied in both places.
- Around line 154-163: The loop that wires throttlers stops after the first
match because of the trailing break; remove the break so every edge with e.kind
== "stream" and e.from_port == "video" sets
producer.throttler.set_next_processor(consumer) for all matching
producer/consumer pairs (loop over graph.edges) and optionally guard against
overwriting an existing downstream by checking the producer.throttler state
(e.g., only call set_next_processor if not already set) before assigning.
In `@src/scope/server/graph_schema.py`:
- Around line 116-158: validate_structure currently omits cycle detection so
cyclic graphs pass validation; add a DAG check (e.g., Kahn's algorithm or DFS
back-edge detection) inside validate_structure using the existing node id set
and edges (use self.nodes, self.edges, edge.from_node, edge.to_node, node_ids)
and if a cycle is found append an error like "Graph contains cycle: <describe
nodes involved>" to errors and return it along with the other checks; ensure the
algorithm uses node_ids/node_id_set for lookup and is efficient for large
graphs.
In `@src/scope/server/graph_state.py`:
- Around line 80-89: set_api_graph() and clear_api_graph() currently release
_lock before performing file I/O which allows get_api_graph() to race and
observe or recreate stale files; fix this by moving the disk mutations
(_write_to_file(graph) in set_api_graph and the file deletion call used in
clear_api_graph) inside the same critical section guarded by _lock so the
in-memory update and corresponding file mutation occur atomically; ensure any
call to _write_to_file, _delete_file (or similar file helpers) occurs while
holding _lock and keep logging after releasing the lock if desired.
In `@src/scope/server/pipeline_manager.py`:
- Around line 487-491: The logger.info call uses zip(node_ids, pipeline_ids)
without an explicit strict parameter; update that call to pass strict=True
(i.e., zip(node_ids, pipeline_ids, strict=True)) to satisfy the B905 linter rule
and ensure mismatched lengths raise immediately—modify the expression inside the
f-string in the function that builds node_ids/pipeline_ids from pipelines (the
logger.info line referencing node_ids, pipeline_ids, pipelines).
In `@src/scope/server/pipeline_processor.py`:
- Around line 513-550: The code is currently duplicating non-video tensor
outputs into parameters_queue even after they've been enqueued to downstream
output_queues; update the logic that builds extra_params so it skips adding any
port that was already delivered via output_queues (i.e., where
self.output_queues.get(port) exists and self.output_consumers.get(port) is
true), and only add truly persistent parameter outputs to parameters_queue (also
ensure tensors added to parameters_queue are detached/moved to CPU to avoid
keeping GPU memory). Apply the same guard to the second occurrence mentioned
(the block around the 555-567 range) and reference output_dict, output_queues,
output_consumers, parameters_queue, extra_params, and pipeline_id when making
the change.
- Around line 540-549: The fan-out loop in pipeline_processor.py currently
breaks out of the per-port iteration on a queue.Full which prevents later queues
from getting the frame; change the exception handling in the inner loop that
iterates over queues (where q.put_nowait(frame if q is queues[0] else
frame.clone())) so that on queue.Full you log (as you do for port == "video")
but do NOT break — simply continue to the next queue so each downstream consumer
is attempted independently; keep using frame.clone() for non-first queues and
preserve the existing logging via logger.info(f"Output queue full for
{self.pipeline_id}, dropping frame").
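Of the graph_executor items above, the duplicate stream-target check (option A, rejecting implicit fan-in) can be sketched standalone; the edge fields follow the review's references to `e.kind`, `e.to_node`, `e.to_port`, while the dict-based edge representation here is illustrative:

```python
def check_fan_in(edges: list[dict]) -> None:
    """Raise if two stream edges target the same (to_node, to_port) input."""
    seen: dict[tuple[str, str], dict] = {}
    for e in edges:
        if e["kind"] != "stream":
            continue  # parameter edges may legitimately share a target
        key = (e["to_node"], e["to_port"])
        if key in seen:
            raise ValueError(
                f"Multiple stream edges target {key}; explicit fan-in is not supported"
            )
        seen[key] = e
```

Called before `stream_queues` is populated, this turns the silent overwrite into a clear validation error.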
---
Outside diff comments:
In `@src/scope/server/frame_processor.py`:
- Around line 171-191: The startup currently aborts when self.pipeline_ids is
empty inside start(), which blocks graph-only sessions because pipeline_ids are
populated later by _setup_pipeline_chain_sync(); change the check in start() to
only treat missing pipeline_ids as a fatal error when not running in graph mode
(e.g., check self.mode != "graph" or presence of self.graph), so graph-mode
sessions proceed to call _setup_pipeline_chain_sync() to build pipeline_ids;
keep the existing error logging/publish_event branch for true local-mode
failures (referencing start(), self.pipeline_ids, and
_setup_pipeline_chain_sync()).
- Around line 893-918: The input-source branch is bypassing the graph fan-out by
writing directly to first_processor.input_queue; replace that direct write with
the existing put() path that uses _graph_source_queues so spout/NDI/camera
frames reach all wired sources. Specifically, after preparing frame_tensor (the
same tensor you currently unsqueeze and would send), call self.put(frame_tensor)
instead of first_processor.input_queue.put_nowait(...); remove or adapt the
queue.Full handling around first_processor.input_queue since put() handles
fan-out/queue semantics via _graph_source_queues.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: c9173233-66dc-428e-b7f0-e78eabaea0b6
⛔ Files ignored due to path filters (2)
- app/package-lock.json is excluded by !**/package-lock.json
- uv.lock is excluded by !**/*.lock
📒 Files selected for processing (22)
- .github/workflows/docker-build-image.yml
- .github/workflows/docker-build.yml
- app/package.json
- pyproject.toml
- src/scope/cloud/fal_app.py
- src/scope/core/pipelines/base_schema.py
- src/scope/core/pipelines/krea_realtime_video/schema.py
- src/scope/core/pipelines/longlive/schema.py
- src/scope/core/pipelines/memflow/schema.py
- src/scope/core/pipelines/reward_forcing/schema.py
- src/scope/core/pipelines/streamdiffusionv2/schema.py
- src/scope/server/app.py
- src/scope/server/cloud_connection.py
- src/scope/server/frame_processor.py
- src/scope/server/graph_executor.py
- src/scope/server/graph_schema.py
- src/scope/server/graph_state.py
- src/scope/server/logs_config.py
- src/scope/server/pipeline_manager.py
- src/scope/server/pipeline_processor.py
- src/scope/server/schema.py
- tests/test_plugin_api.py
💤 Files with no reviewable changes (5)
- src/scope/cloud/fal_app.py
- tests/test_plugin_api.py
- .github/workflows/docker-build-image.yml
- src/scope/server/logs_config.py
- src/scope/server/cloud_connection.py
.github/workflows/docker-build.yml (Outdated)
```shell
elif [ "${{ github.event_name }}" == "pull_request" ]; then
  # Pull request: use sanitized branch name + short SHA
  BRANCH_TAG=$(echo "${{ github.head_ref }}" | sed 's/\//-/g')
  SHORT_SHA=$(echo "${{ github.event.pull_request.head.sha }}" | cut -c1-7)
  TAGS="$DOCKERHUB:$BRANCH_TAG$SUFFIX,$DOCKERHUB:$SHORT_SHA$SUFFIX"
  TAGS="$TAGS,$GHCR:$BRANCH_TAG$SUFFIX,$GHCR:$SHORT_SHA$SUFFIX"
fi
```
Script injection vulnerability via github.head_ref.
github.head_ref is user-controlled and can contain shell metacharacters. Using it directly in an inline script allows command injection attacks. For example, a malicious branch name like `whoami` or $(id) could execute arbitrary commands in the workflow.
🔒 Proposed fix: Pass through an environment variable
```diff
   - name: Set image tag
     id: tag
+    env:
+      HEAD_REF: ${{ github.head_ref }}
     run: |
       DOCKERHUB="daydreamlive/scope"
       GHCR="ghcr.io/daydreamlive/scope"
       SUFFIX="${{ matrix.suffix }}"
       if [ "${{ github.event_name }}" == "workflow_dispatch" ]; then
         # Manual dispatch: use provided version + latest
         VERSION="${{ github.event.inputs.version }}"
         TAGS="$DOCKERHUB:$VERSION$SUFFIX,$DOCKERHUB:latest$SUFFIX"
         TAGS="$TAGS,$GHCR:$VERSION$SUFFIX,$GHCR:latest$SUFFIX"
       elif [[ "${{ github.ref }}" == refs/tags/* ]]; then
         # Git tag: extract version (e.g., v0.1.0 -> 0.1.0) + latest
         VERSION_TAG="${{ github.ref_name }}"
         VERSION="${VERSION_TAG#v}"  # Remove 'v' prefix if present
         TAGS="$DOCKERHUB:$VERSION$SUFFIX,$DOCKERHUB:latest$SUFFIX"
         TAGS="$TAGS,$GHCR:$VERSION$SUFFIX,$GHCR:latest$SUFFIX"
       elif [ "${{ github.ref }}" == "refs/heads/main" ]; then
         # Main branch
         SHORT_SHA=$(echo "${{ github.sha }}" | cut -c1-7)
         TAGS="$DOCKERHUB:main$SUFFIX,$DOCKERHUB:$SHORT_SHA$SUFFIX"
         TAGS="$TAGS,$GHCR:main$SUFFIX,$GHCR:$SHORT_SHA$SUFFIX"
       elif [ "${{ github.ref }}" == "refs/heads/runpod-serverless" ]; then
         # Runpod serverless branch
         TAGS="$DOCKERHUB:runpod-serverless$SUFFIX"
         TAGS="$TAGS,$GHCR:runpod-serverless$SUFFIX"
       elif [ "${{ github.event_name }}" == "pull_request" ]; then
         # Pull request: use sanitized branch name + short SHA
-        BRANCH_TAG=$(echo "${{ github.head_ref }}" | sed 's/\//-/g')
+        BRANCH_TAG=$(echo "$HEAD_REF" | sed 's/\//-/g')
         SHORT_SHA=$(echo "${{ github.event.pull_request.head.sha }}" | cut -c1-7)
         TAGS="$DOCKERHUB:$BRANCH_TAG$SUFFIX,$DOCKERHUB:$SHORT_SHA$SUFFIX"
         TAGS="$TAGS,$GHCR:$BRANCH_TAG$SUFFIX,$GHCR:$SHORT_SHA$SUFFIX"
       fi
       echo "tags=$TAGS" >> $GITHUB_OUTPUT
```

Environment variables are expanded by the runner before shell execution, preventing injection. See GitHub's security hardening guide for details.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In @.github/workflows/docker-build.yml around lines 117 - 123, The workflow
currently injects the user-controlled github.head_ref directly into an inline
shell expression (used to build BRANCH_TAG) which enables command injection; fix
it by first assigning github.head_ref to an environment variable (e.g., export
BRANCH_REF="${{ github.head_ref }}") and then use that safe variable in the
shell script (e.g., BRANCH_TAG=$(printf '%s' "$BRANCH_REF" | sed 's/\//-/g')),
ensuring all expansions are quoted; update the places referencing BRANCH_TAG and
avoid using github.head_ref directly in the inline script or unquoted
expansions.
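As a quick standalone check of the sanitization step itself (independent of the workflow; assumes a POSIX shell with `sed` available):

```shell
# Simulate a hostile head ref delivered via an environment variable.
HEAD_REF='feature/$(id)/nested'
# Quoted expansion: the shell treats the value as data, so $(id) never runs.
BRANCH_TAG=$(printf '%s' "$HEAD_REF" | sed 's/\//-/g')
printf '%s\n' "$BRANCH_TAG"
```

The command substitution survives only as literal text in the tag, which is exactly the behavior the proposed fix relies on.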
app/package.json (Outdated)
```diff
   "name": "daydream-scope-desktop",
   "productName": "Daydream Scope",
-  "version": "0.1.7",
+  "version": "0.1.6",
```
Version downgrade detected: 0.1.7 → 0.1.6.
Same issue as in pyproject.toml. The desktop app version should remain synchronized with the Python package, but both should increase rather than decrease. If 0.1.7 was already released/distributed, this should be bumped to 0.1.8 or higher.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app/package.json` at line 4, The package.json "version" was incorrectly
downgraded from 0.1.7 to 0.1.6; update the "version" field in app/package.json
to a higher patch than the already released 0.1.7 (e.g., 0.1.8) so it is not a
downgrade and ensure it is kept synchronized with the Python package's version
in pyproject.toml; change the "version" string value accordingly.
pyproject.toml (Outdated)
```diff
 [project]
 name = "daydream-scope"
-version = "0.1.7"
+version = "0.1.6"
```
Version downgrade detected: 0.1.7 → 0.1.6.
Decreasing the version number violates semantic versioning conventions and can cause practical issues:
- Package managers expect monotonically increasing versions; users on 0.1.7 won't receive this as an upgrade.
- PyPI rejects re-uploads of existing versions but also won't treat 0.1.6 as newer than 0.1.7.
If 0.1.7 was already released, this should be 0.1.8 (or higher). If 0.1.7 was never published, please verify this is intentional.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pyproject.toml` at line 3, The package version was downgraded to version =
"0.1.6" which breaks semantic versioning; verify whether 0.1.7 was already
published and if so bump the version in pyproject.toml to the next appropriate
release (e.g., "0.1.8" or higher) and update any release notes/changelog
accordingly so the published package version monotonically increases.
```python
# Graph port declaration: which stream ports this pipeline exposes
inputs: ClassVar[list[str]] = ["video"]
outputs: ClassVar[list[str]] = ["video"]
```
🛠️ Refactor suggestion | 🟠 Major
🧩 Analysis chain
🏁 Script executed:

```bash
#!/bin/bash
set -euo pipefail
python - <<'PY'
import ast
from pathlib import Path

root = Path("src/scope/core/pipelines")
for path in root.rglob("*.py"):
    try:
        tree = ast.parse(path.read_text())
    except SyntaxError as exc:
        print(f"Syntax error in {path}: {exc}")
        continue
    for node in tree.body:
        if not isinstance(node, ast.ClassDef):
            continue
        base_names = set()
        for base in node.bases:
            if isinstance(base, ast.Name):
                base_names.add(base.id)
            elif isinstance(base, ast.Attribute):
                base_names.add(base.attr)
        if "BasePipelineConfig" not in base_names:
            continue
        assigned = set()
        for stmt in node.body:
            if isinstance(stmt, ast.Assign):
                for target in stmt.targets:
                    if isinstance(target, ast.Name):
                        assigned.add(target.id)
            elif isinstance(stmt, ast.AnnAssign) and isinstance(stmt.target, ast.Name):
                assigned.add(stmt.target.id)
        if "inputs" not in assigned or "outputs" not in assigned:
            print(f"{path}:{node.lineno}: {node.name} missing explicit graph ports")
PY
```

Repository: daydreamlive/scope
Length of output: 735
Make graph ports opt-in instead of implicit base defaults.
Seven pipeline configs inherit the new video input/output defaults from BasePipelineConfig without explicitly declaring them:
- ControllerVisualizerConfig (controller_viz/schema.py:6)
- GrayConfig (gray/schema.py:4)
- OpticalFlowConfig (optical_flow/schema.py:8)
- PassthroughConfig (passthrough/schema.py:4)
- RIFEConfig (rife/schema.py:5)
- ScribbleConfig (scribble/schema.py:5)
- VideoDepthAnythingConfig (video_depth_anything/schema.py:5)
This means get_schema_with_metadata() now publishes video ports for all of them, even though they never opted in. Defaulting to empty lists and requiring explicit port declaration would keep the metadata honest and prevent accidental exposure of graph interfaces for configs that haven't been audited for graph mode.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/scope/core/pipelines/base_schema.py` around lines 248 - 250,
BasePipelineConfig currently exposes implicit graph ports by default: change the
ClassVar definitions inputs and outputs on BasePipelineConfig from ["video"] to
empty lists (e.g., ClassVar[list[str]] = []) so ports are opt-in; then update
each pipeline config that should expose a video port
(ControllerVisualizerConfig, GrayConfig, OpticalFlowConfig, PassthroughConfig,
RIFEConfig, ScribbleConfig, VideoDepthAnythingConfig) to explicitly declare
their own inputs and outputs = ["video"]; ensure get_schema_with_metadata()
continues to read these per-class attributes so only configs that opt in publish
graph ports.
```python
# Route to specific node or broadcast to all pipeline processors
node_id = parameters.pop("node_id", None)
if node_id and node_id in self._processors_by_node_id:
    self._processors_by_node_id[node_id].update_parameters(parameters)
else:
    for processor in self.pipeline_processors:
        processor.update_parameters(parameters)
```
Unknown node_id should not fall back to broadcast.
The schema says broadcast happens when node_id is None; this branch also broadcasts on typos or stale node IDs. That turns a targeted update into a global one.
Suggested fix
```diff
-        if node_id and node_id in self._processors_by_node_id:
-            self._processors_by_node_id[node_id].update_parameters(parameters)
-        else:
+        if node_id is None:
             for processor in self.pipeline_processors:
                 processor.update_parameters(parameters)
+        elif node_id in self._processors_by_node_id:
+            self._processors_by_node_id[node_id].update_parameters(parameters)
+        else:
+            logger.warning("Ignoring update for unknown node_id '%s'", node_id)
+            return False
```
📝 Committable suggestion
+ return False📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
# Route to specific node or broadcast to all pipeline processors
node_id = parameters.pop("node_id", None)
if node_id is None:
    for processor in self.pipeline_processors:
        processor.update_parameters(parameters)
elif node_id in self._processors_by_node_id:
    self._processors_by_node_id[node_id].update_parameters(parameters)
else:
    logger.warning("Ignoring update for unknown node_id '%s'", node_id)
    return False
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/scope/server/frame_processor.py` around lines 627 - 633, The current
branch treats missing or misspelled node_id as a broadcast; change the logic so
broadcast only occurs when node_id is explicitly None: if node_id is None
iterate self.pipeline_processors and call update_parameters(parameters); else if
node_id exists in self._processors_by_node_id call that processor's
update_parameters(parameters); otherwise do not broadcast—log or raise a clear
error/warning (e.g., mentioning node_id) so stale/unknown IDs do not turn into
global updates; reference the variables node_id, self._processors_by_node_id,
self.pipeline_processors and the method update_parameters to locate where to
change the conditional flow.
```python
def validate_structure(self) -> list[str]:
    """Validate the graph structure and return a list of error messages.

    Checks:
    - No duplicate node IDs
    - At least one source and one sink node
    - Pipeline nodes have a pipeline_id
    - All edge references point to existing nodes
    """
    errors: list[str] = []
    node_ids = [n.id for n in self.nodes]

    # Check for duplicate node IDs
    seen: set[str] = set()
    for nid in node_ids:
        if nid in seen:
            errors.append(f"Duplicate node ID: '{nid}'")
        seen.add(nid)

    # At least one source and one sink
    if not self.get_source_node_ids():
        errors.append("Graph must have at least one source node")
    if not self.get_sink_node_ids():
        errors.append("Graph must have at least one sink node")

    # Pipeline nodes must have pipeline_id
    for node in self.nodes:
        if node.type == "pipeline" and not node.pipeline_id:
            errors.append(f"Pipeline node '{node.id}' is missing pipeline_id")

    # Edge references must point to existing nodes
    node_id_set = set(node_ids)
    for edge in self.edges:
        if edge.from_node not in node_id_set:
            errors.append(
                f"Edge references non-existent source node: '{edge.from_node}'"
            )
        if edge.to_node not in node_id_set:
            errors.append(
                f"Edge references non-existent target node: '{edge.to_node}'"
            )

    return errors
```
validate_structure() still accepts cyclic graphs.
The module and docs describe a DAG, but this validator never checks for cycles. Because processors wait for all upstream queues before running, a cyclic graph can deadlock the stream at startup instead of being rejected up front.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/scope/server/graph_schema.py` around lines 116 - 158, validate_structure
currently omits cycle detection so cyclic graphs pass validation; add a DAG
check (e.g., Kahn's algorithm or DFS back-edge detection) inside
validate_structure using the existing node id set and edges (use self.nodes,
self.edges, edge.from_node, edge.to_node, node_ids) and if a cycle is found
append an error like "Graph contains cycle: <describe nodes involved>" to errors
and return it along with the other checks; ensure the algorithm uses
node_ids/node_id_set for lookup and is efficient for large graphs.
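The missing DAG check can be sketched with Kahn's algorithm; node and edge field names follow the review's references (`self.nodes`, `self.edges`, `edge.from_node`, `edge.to_node`), but this standalone function over plain IDs and tuples is illustrative:

```python
from collections import deque


def find_cycle_nodes(node_ids: list[str], edges: list[tuple[str, str]]) -> list[str]:
    """Return node IDs on (or downstream of) a cycle, or [] if the graph is a DAG.

    Kahn's algorithm: repeatedly remove nodes with indegree 0; any node that
    never reaches indegree 0 is part of a cycle. Runs in O(nodes + edges).
    """
    indegree = {nid: 0 for nid in node_ids}
    adjacency: dict[str, list[str]] = {nid: [] for nid in node_ids}
    for from_node, to_node in edges:
        adjacency[from_node].append(to_node)
        indegree[to_node] += 1

    queue = deque(nid for nid, deg in indegree.items() if deg == 0)
    visited = 0
    while queue:
        nid = queue.popleft()
        visited += 1
        for nxt in adjacency[nid]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                queue.append(nxt)

    if visited == len(node_ids):
        return []  # every node was eliminated, so no cycle exists
    return sorted(nid for nid, deg in indegree.items() if deg > 0)
```

Inside `validate_structure()` this would append something like `f"Graph contains cycle: {cycle_nodes}"` to `errors` whenever the returned list is non-empty.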
src/scope/server/graph_state.py (Outdated)
```python
def set_api_graph(graph: GraphConfig) -> None:
    """Store a graph config set via the API (memory + file)."""
    with _lock:
        global _graph_config
        _graph_config = graph
    # Persist outside the lock to avoid holding it during I/O
    _write_to_file(graph)
    logger.info(
        f"API graph set with {len(graph.nodes)} nodes and {len(graph.edges)} edges"
    )
```
Keep the file mutation in the same critical section as the in-memory update.
set_api_graph() and clear_api_graph() release _lock before touching disk. get_api_graph() can interleave in that window and either repopulate memory from a file that clear_api_graph() is about to delete, or observe a stale file recreated after a later clear.
Also applies to: 92-98
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/scope/server/graph_state.py` around lines 80 - 89, set_api_graph() and
clear_api_graph() currently release _lock before performing file I/O which
allows get_api_graph() to race and observe or recreate stale files; fix this by
moving the disk mutations (_write_to_file(graph) in set_api_graph and the file
deletion call used in clear_api_graph) inside the same critical section guarded
by _lock so the in-memory update and corresponding file mutation occur
atomically; ensure any call to _write_to_file, _delete_file (or similar file
helpers) occurs while holding _lock and keep logging after releasing the lock if
desired.
```python
node_ids = [node_id for node_id, _, _ in pipelines]
pipeline_ids = [pid for _, pid, _ in pipelines]
logger.info(
    f"Loading {len(pipelines)} pipeline(s): {list(zip(node_ids, pipeline_ids))}"
)
```
Fix linter error: zip() without explicit strict= parameter.
The pipeline failure indicates zip() should have an explicit strict= parameter per B905.
Proposed fix:

```diff
 logger.info(
-    f"Loading {len(pipelines)} pipeline(s): {list(zip(node_ids, pipeline_ids))}"
+    f"Loading {len(pipelines)} pipeline(s): {list(zip(node_ids, pipeline_ids, strict=True))}"
 )
```

📝 Committable suggestion

‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```python
node_ids = [node_id for node_id, _, _ in pipelines]
pipeline_ids = [pid for _, pid, _ in pipelines]
logger.info(
    f"Loading {len(pipelines)} pipeline(s): {list(zip(node_ids, pipeline_ids, strict=True))}"
)
```
🧰 Tools
🪛 GitHub Actions: Lint
[error] 490-490: Ruff check failed: B905 'zip()' without an explicit 'strict=' parameter. Add explicit value for parameter 'strict='.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/scope/server/pipeline_manager.py` around lines 487 - 491, The logger.info
call uses zip(node_ids, pipeline_ids) without an explicit strict parameter;
update that call to pass strict=True (i.e., zip(node_ids, pipeline_ids,
strict=True)) to satisfy the B905 linter rule and ensure mismatched lengths
raise immediately—modify the expression inside the f-string in the function that
builds node_ids/pipeline_ids from pipelines (the logger.info line referencing
node_ids, pipeline_ids, pipelines).
```python
for port, value in output_dict.items():
    if value is None or not isinstance(value, torch.Tensor):
        continue
    queues = self.output_queues.get(port)
    if not queues:
        continue
    # Resize output queues to meet target max size.
    # Only resize when there are downstream pipeline consumers;
    # the sink has no consumers so its queues stay fixed for
    # frame_processor.get().
    if self.output_consumers.get(port):
        target_size = value.shape[0] * OUTPUT_QUEUE_MAX_SIZE_FACTOR
        self._resize_output_queue(port, target_size)
        # Re-read queues after potential resize – _resize_output_queue
        # may replace self.output_queues[port] with a new list.
        queues = self.output_queues.get(port)
        if not queues:
            continue
    if value.dtype != torch.uint8:
        value = (
            (value * 255.0)
            .clamp(0, 255)
            .to(dtype=torch.uint8)
            .contiguous()
            .detach()
        )
    frames = [value[i].unsqueeze(0) for i in range(value.shape[0])]
    for frame in frames:
        for q in queues:
            try:
                q.put_nowait(frame if q is queues[0] else frame.clone())
            except queue.Full:
                if port == "video":
                    logger.info(
                        f"Output queue full for {self.pipeline_id}, dropping frame"
                    )
                break
```
Don't mirror streamed tensor outputs into parameters_queue.
extra_params forwards every non-video output to every downstream processor, even when that same port was already delivered via output_queues above. That duplicates batch tensors into persistent parameter state, routes ports to unrelated consumers, and keeps stale GPU tensors alive after the chunk has been consumed.
Also applies to: 555-567
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/scope/server/pipeline_processor.py` around lines 513 - 550, The code is
currently duplicating non-video tensor outputs into parameters_queue even after
they've been enqueued to downstream output_queues; update the logic that builds
extra_params so it skips adding any port that was already delivered via
output_queues (i.e., where self.output_queues.get(port) exists and
self.output_consumers.get(port) is true), and only add truly persistent
parameter outputs to parameters_queue (also ensure tensors added to
parameters_queue are detached/moved to CPU to avoid keeping GPU memory). Apply
the same guard to the second occurrence mentioned (the block around the 555-567
range) and reference output_dict, output_queues, output_consumers,
parameters_queue, extra_params, and pipeline_id when making the change.
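A hedged sketch of the guard the prompt asks for, with plain dicts standing in for the processor's attributes; `build_extra_params` is a hypothetical helper name, not code from the PR:

```python
def build_extra_params(output_dict, output_queues, output_consumers):
    """Collect only outputs that were NOT already streamed downstream.

    Ports with live output queues and registered consumers were already
    delivered as frames via the fan-out loop, so mirroring them into
    persistent parameter state would duplicate tensors and keep stale
    memory alive after the chunk is consumed.
    """
    extra_params = {}
    for port, value in output_dict.items():
        if value is None:
            continue
        # Skip ports already fanned out via output_queues.
        if output_queues.get(port) and output_consumers.get(port):
            continue
        # For real torch tensors, this is where detach()/cpu() would be
        # applied so parameter state does not pin GPU memory; the
        # hasattr guard keeps this sketch torch-free.
        if hasattr(value, "detach"):
            value = value.detach()
        extra_params[port] = value
    return extra_params
```

Only the ports that fall through both guards end up in `parameters_queue`-style state.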
```python
for frame in frames:
    for q in queues:
        try:
            q.put_nowait(frame if q is queues[0] else frame.clone())
        except queue.Full:
            if port == "video":
                logger.info(
                    f"Output queue full for {self.pipeline_id}, dropping frame"
                )
            break
```
Keep fanning out even when one downstream queue is full.
On Line 549, break exits the per-port fan-out loop, so one slow consumer prevents every later queue on that port from receiving the frame. In a branched graph, that turns localized backpressure into dropped frames for unrelated consumers.
Suggested fix
for frame in frames:
for q in queues:
try:
q.put_nowait(frame if q is queues[0] else frame.clone())
except queue.Full:
if port == "video":
logger.info(
f"Output queue full for {self.pipeline_id}, dropping frame"
)
- break
+ continue📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| for frame in frames: | |
| for q in queues: | |
| try: | |
| q.put_nowait(frame if q is queues[0] else frame.clone()) | |
| except queue.Full: | |
| if port == "video": | |
| logger.info( | |
| f"Output queue full for {self.pipeline_id}, dropping frame" | |
| ) | |
| break | |
| for frame in frames: | |
| for q in queues: | |
| try: | |
| q.put_nowait(frame if q is queues[0] else frame.clone()) | |
| except queue.Full: | |
| if port == "video": | |
| logger.info( | |
| f"Output queue full for {self.pipeline_id}, dropping frame" | |
| ) | |
| continue |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/scope/server/pipeline_processor.py` around lines 540 - 549, The fan-out
loop in pipeline_processor.py currently breaks out of the per-port iteration on
a queue.Full which prevents later queues from getting the frame; change the
exception handling in the inner loop that iterates over queues (where
q.put_nowait(frame if q is queues[0] else frame.clone())) so that on queue.Full
you log (as you do for port == "video") but do NOT break — simply continue to
the next queue so each downstream consumer is attempted independently; keep
using frame.clone() for non-first queues and preserve the existing logging via
logger.info(f"Output queue full for {self.pipeline_id}, dropping frame").
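The behavioral difference can be demonstrated with a small, self-contained model of the fan-out loop (plain dicts stand in for tensor frames, and `fan_out` is illustrative, not the PR's code):

```python
import queue


def fan_out(frame, queues, clone=lambda f: dict(f)):
    """Deliver a frame to every downstream queue independently.

    A full queue drops only its own copy (continue) instead of aborting
    delivery to the remaining consumers on the same port (break).
    """
    delivered = 0
    for i, q in enumerate(queues):
        try:
            q.put_nowait(frame if i == 0 else clone(frame))
        except queue.Full:
            continue  # slow consumer: drop locally, keep fanning out
        delivered += 1
    return delivered


fast = queue.Queue(maxsize=4)
slow = queue.Queue(maxsize=1)
slow.put({"seq": 0})  # slow consumer is already backed up
other = queue.Queue(maxsize=4)

count = fan_out({"seq": 1}, [fast, slow, other])
print(count)  # 2 — only the full queue drops its copy; the others still receive one
```

With `break` in place of `continue`, `other` would never see the frame once `slow` filled up, which is the bug the comment describes.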
🚀 fal.ai Preview Deployment

Testing: Connect to this preview deployment by running this on your branch. 🧪 E2E tests will run automatically against this deployment.

✅ E2E Tests passed

Test Artifacts: Check the workflow run for screenshots.
Force-pushed 4e46b52 to 39df79a (Compare)
Squashed follow-up fixes and improvements:

- Fix graph executor pipeline lookup to use node ID instead of pipeline ID
- Fix review issues in pipeline processor and frame processor
- Validate graph edge ports and clean up `_load_events` on unload
- Acquire `input_queue_lock` when resizing graph queues
- Remove pipeline throttler and add graph validation
- Move VACE input video routing from runtime to graph construction
- Remove redundant video-only normalization from `pipeline_processor`
- Reject duplicate stream edges targeting the same input port
- Fix unknown `node_id` falling back to broadcast in `update_parameters`
- Simplify `graph_executor.py` `build_graph` wiring
- Remove unused `graph_state.py` module
- Remove old pipeline chaining remnants
- Move `build_linear_graph` from `frame_processor.py` to `graph_schema.py`
- Fix queue resize
- Refactor pipeline reconciliation logic for readability and add unit tests
- Fix ruff B905 lint error: add `strict=True` to `zip()` call

Signed-off-by: Rafał Leszko <rafal@livepeer.org>
Force-pushed 39df79a to 8a9c6ec (Compare)
ryanontheinside left a comment:

lgtm. i couldnt break it.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Signed-off-by: Rafał Leszko <rafal@livepeer.org>
Summary
- Pipeline nodes can expose multiple named input/output ports (e.g. `video`, `conditioning_video`), rather than being limited to a single input and single output
- VACE input video routing is expressed as an extra input port (`conditioning_video`) on the graph node, replacing the previous special-case runtime logic that handled VACE mode separately

API changes
1. Pipeline load
Each pipeline can now have a `node_id` and its own `load_params`. Top-level `pipeline_ids` and `load_params` can be null when using the new format.

Example request body:
```json
{
  "pipelines": [
    {
      "node_id": "pipeline",
      "pipeline_id": "passthrough",
      "load_params": {"height": 320, "width": 576}
    },
    {
      "node_id": "pipeline_1",
      "pipeline_id": "longlive",
      "load_params": {"height": 320, "width": 576}
    }
  ],
  "pipeline_ids": null,
  "load_params": null,
  "connection_id": null,
  "connection_info": null,
  "user_id": null
}
```

2. Initial parameters (WebRTC connection start)
Initial parameters sent when the WebRTC connection is started can include a `graph` field that describes the pipeline graph (nodes and edges). Other fields (e.g. `input_mode`, `prompts`, …) are unchanged.

Example `graph` field:

(Full initial payload still includes `input_mode`, `prompts`, `prompt_interpolation_method`, `noise_scale`, etc.)

3. Update params
Update requests can include `node_id` so parameters are applied to a specific graph node (e.g. a pipeline instance).

Example:

```json
{"node_id": "pipeline_1", "prompts": [{"text": "batman", "weight": 100}]}
```
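For illustration, such an update could be sent like the sketch below; the base URL and the `/parameters` path are assumptions for this example, since the PR only documents the request body shape:

```python
import json
import urllib.request

# Hypothetical server address and endpoint path — adjust to the real deployment.
BASE_URL = "http://localhost:8000"

payload = {
    "node_id": "pipeline_1",  # target a specific graph node, not a broadcast
    "prompts": [{"text": "batman", "weight": 100}],
}
req = urllib.request.Request(
    f"{BASE_URL}/parameters",
    data=json.dumps(payload).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="PUT",
)
# urllib.request.urlopen(req)  # uncomment to send against a running server
```

Omitting `node_id` keeps the previous broadcast behavior, per the summary above.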