Skip to content

fix: restore DAG status live updates#2028

Merged
yottahmd merged 5 commits intomainfrom
fix-dag-status-live-updates
Apr 22, 2026
Merged

fix: restore DAG status live updates#2028
yottahmd merged 5 commits intomainfrom
fix-dag-status-live-updates

Conversation

@yottahmd
Copy link
Copy Markdown
Collaborator

@yottahmd yottahmd commented Apr 22, 2026

Summary

  • preserve request-scoped SSE payloads while keeping browser clients on the existing multiplexed stream
  • restore live updates for DAG-run list, DAG definition, and exact DAG-run status views
  • recompute aggregate run status after manual step status changes so UI state reflects backend mutations

Root Cause

The SSE multiplexer shared one topic payload/hash across sessions even when fetchers depended on request context such as auth, workspace, or remote node. Some DAG-run list topics were also moved to pure event-driven invalidation even though their visible summaries can change without a reliable event for every field. On the DAG definition page, the start/enqueue flow discarded the returned dagRunId, so the page stayed on the DAG definition topic instead of subscribing to the exact newly-started DAG-run.

Validation

  • go test ./internal/service/frontend/... -count=1
  • pnpm typecheck
  • pnpm exec vitest run src/features/dags/components/__tests__/DAGStatus.test.tsx src/features/dags/components/dag-details/__tests__/DAGDetailsPanel.test.tsx src/features/dags/components/dag-details/__tests__/DAGDetailsSidePanel.test.tsx src/features/dags/components/common/__tests__/DAGActions.test.tsx

Summary by CodeRabbit

  • New Features

    • Track newly started DAG runs and follow a specific run in the UI; expose a callback when a run is started.
  • Improvements

    • Aggregate DAG-run status now recomputes automatically when individual step statuses change.
    • UI applies status updates optimistically and triggers targeted refreshes for affected views.
    • Real-time updates refined: per-session streaming behaves more reliably and queue views use on-demand invalidation while DAG-run topics remain polling.
  • Tests

    • Added tests covering status recomputation, streaming/polling behavior, and related UI flows.

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Apr 22, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 38744ec3-67b1-4148-93ad-866c74525553

📥 Commits

Reviewing files that changed from the base of the PR and between b361916 and 1ece5a7.

📒 Files selected for processing (2)
  • internal/service/frontend/api/v1/dags.go
  • internal/service/frontend/api/v1/dags_internal_test.go

📝 Walkthrough

Walkthrough

Recomputes DAG-run aggregate status from node statuses after step updates, routes multiple read paths through a read-timeout wrapper, refactors SSE multiplexer to per-session fetch contexts and per-session hashing, and adds optimistic frontend node-status updates, tracked-run wiring, and associated tests.

Changes

Cohort / File(s) Summary
Backend: DAG-run status & log helpers
internal/service/frontend/api/v1/dagruns.go, internal/service/frontend/api/v1/dagruns_internal_test.go, internal/service/frontend/api/v1/dagruns_test.go
Add deriveManualDAGRunStatus(...); recompute and persist DAG-run aggregate status after per-step status updates; add log/step-log helper read wrappers and tests validating derived status behavior.
Backend: SSE registration & server tests
internal/service/frontend/server.go, internal/service/frontend/server_test.go
Change SSE refresh registration so DAG-run-related topics remain on polling mode when eventstore is present (only queues use OnDemand); add test asserting repeated fetches for DAG-run topics with eventstore.
Backend: SSE multiplexer (per-session fetch)
internal/service/frontend/sse/multiplex.go, internal/service/frontend/sse/multiplex_test.go
Introduce per-session detached fetch contexts (fetchCtx via context.WithoutCancel), replace shared lastHash with lastHashBySession, fetch/enqueue payloads per session in poll, and update session lifecycle handling; add tests ensuring detached fetch contexts preserve per-session payloads after cancellation.
Backend: read-timeout wrapper usage
internal/service/frontend/api/v1/dags.go, internal/service/frontend/api/v1/queues.go, internal/service/frontend/api/v1/docs.go
Wrap multiple read operations in withDAGRunReadTimeout(...), switching internal calls to use the derived readCtx; some handlers now return zero-value response structs alongside errors for validation/access failures.
Frontend: optimistic node-status utilities
ui/src/features/dags/lib/nodeStatus.ts
Add pure helpers to immutably update node statuses: updateDAGRunNodeStatus and updateDAGRunsNodeStatus, plus status-label mapping.
Frontend: node-status UI wiring & modal typing
ui/src/features/dags/components/dag-details/NodeStatusTable.tsx, ui/src/features/dags/components/dag-details/NodeStatusTableRow.tsx, ui/src/features/dags/components/dag-execution/StatusUpdateModal.tsx
Add optional onNodeStatusUpdated callback to table/row components and invoke it on successful PATCH; call dagContext.refresh() after success; allow async onSubmit in StatusUpdateModal and safely await it.
Frontend: optimistic DAG UI & tests
ui/src/features/dags/components/DAGStatus.tsx, ui/src/features/dags/components/__tests__/DAGStatus.test.tsx
Introduce local displayDAGRun state and applyDisplayNodeStatus for optimistic updates, call dagContext.refresh() after backend success, and add tests for success/error flows verifying PATCH shape and refresh behavior.
Frontend: tracked-run start flow & context
ui/src/features/dags/contexts/DAGContext.ts, ui/src/features/dags/components/DAGActions.tsx, ui/src/features/dags/components/dag-details/DAGDetailsContent.tsx
Add optional `onRunStarted?: (dagRunId) => void
Frontend: tracked-run selection, SSE integration & history
ui/src/features/dags/components/dag-details/DAGDetailsPanel.tsx, ui/src/pages/dags/dag/index.tsx, ui/src/features/dags/components/dag-details/__tests__/DAGDetailsPanel.test.tsx, ui/src/features/dags/components/dag-execution/DAGExecutionHistory.tsx
Add trackedDagRunId state and SSE-backed tracked-run query; handle onRunStarted to switch to tracked run and status tab, update selection/navigation/URL composition, and optimistically update history lists; update tests accordingly.
Frontend: tests & misc UI updates
various files under ui/src/features/dags/... and test suites
Adjust modal submit typing and related call sites; update multiple tests/components to use display/tracked-run behavior and new callbacks; add/adjust tests covering deriveManualDAGRunStatus and SSE behaviors.

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant DAGStatus as DAGStatus UI
    participant API as Backend API
    participant DB as Database
    participant DAGContext

    User->>DAGStatus: Click status update (e.g., "Mark Failed")
    DAGStatus->>DAGStatus: applyDisplayNodeStatus (optimistic update)
    DAGStatus->>API: PATCH /dag-runs/{dag}/{id}/steps/{step}/status
    API->>DB: persist node status
    API->>API: deriveManualDAGRunStatus(nodes, fallback)
    API->>DB: persist recomputed DAG-run aggregate status
    DB-->>API: OK
    API-->>DAGStatus: 200 OK
    DAGStatus->>DAGContext: refresh()
    DAGContext-->>DAGStatus: refetch/update
    DAGStatus->>User: render final state
Loading
sequenceDiagram
    participant Client1 as Client 1
    participant Client2 as Client 2
    participant Multiplexer as SSE Multiplexer
    participant Fetcher as Topic Fetcher
    participant EventStore

    Client1->>Multiplexer: Subscribe(topic, ctx=sessionCtx1)
    Client2->>Multiplexer: Subscribe(topic, ctx=sessionCtx2)
    Multiplexer->>Multiplexer: createSession(fetchCtx = WithoutCancel(sessionCtx))
    Multiplexer->>Fetcher: WakeTopic
    Fetcher->>EventStore: Fetch using session1.fetchCtx
    Fetcher->>EventStore: Fetch using session2.fetchCtx
    EventStore-->>Fetcher: payloads
    Fetcher-->>Multiplexer: per-session payloads/hashes
    Multiplexer-->>Client1: Message(payload1)
    Multiplexer-->>Client2: Message(payload2)
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 19.51% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'fix: restore DAG status live updates' accurately summarizes the main objective of the PR, which is to restore live updates for DAG-run status and address SSE payload sharing issues across sessions.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch fix-dag-status-live-updates

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (5)
ui/src/features/dags/components/DAGStatus.tsx (1)

1-3: ⚠️ Potential issue | 🟡 Minor

Add the GPL header to this modified source file.

This TSX file is missing the repository license header. Please run make addlicense or add the standard header.

Proposed header
+// Copyright (C) 2026 Yota Hamada
+// SPDX-License-Identifier: GPL-3.0-or-later
+
 import { useErrorModal } from '@/components/ui/error-modal';

As per coding guidelines, **/*.{go,ts,tsx,js}: Apply GPL v3 license headers on source files, managed via make addlicense.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/src/features/dags/components/DAGStatus.tsx` around lines 1 - 3, This file
(DAGStatus.tsx) is missing the repository GPL v3 license header; add the
standard GPL v3 header to the top of the file (above all imports) for this
modified source file (ui/src/features/dags/components/DAGStatus.tsx). You can
run the repository utility `make addlicense` to insert the correct header
automatically, or manually paste the standard GPL v3 boilerplate header used
across TS/TSX/JS files in this repo, ensuring it appears before the existing
imports and preserves existing file encoding/comments.
ui/src/features/dags/components/dag-execution/DAGExecutionHistory.tsx (1)

1-5: ⚠️ Potential issue | 🟡 Minor

Add the GPL header to this modified source file.

This TSX file is missing the repository license header. Please run make addlicense or add the standard header.

Proposed header
+// Copyright (C) 2026 Yota Hamada
+// SPDX-License-Identifier: GPL-3.0-or-later
+
 /**
  * DAGExecutionHistory component displays the execution history of a DAG.

As per coding guidelines, **/*.{go,ts,tsx,js}: Apply GPL v3 license headers on source files, managed via make addlicense.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/src/features/dags/components/dag-execution/DAGExecutionHistory.tsx` around
lines 1 - 5, The file defining the DAGExecutionHistory React component is
missing the repository GPL v3 license header; fix by running the repository
tooling (run `make addlicense`) to automatically prepend the standard GPL v3
header to this TSX file or manually add the standard header block at the very
top of the file before the existing comments, ensuring it appears above the
DAGExecutionHistory component declaration and any imports so the file passes the
**/*.{go,ts,tsx,js} license check.
ui/src/pages/dags/dag/index.tsx (1)

175-193: ⚠️ Potential issue | 🟠 Major

Ensure a just-started run overrides any existing URL dagRunId.

With dagRunId || trackedDagRunId, handleRunStarted() is ineffective when the page already has a dagRunId query param: the SSE/query stays pinned to the old run. Either replace the URL dagRunId with nextDAGRunId in handleRunStarted(), or explicitly prioritize the just-started ID while clearing it on URL-driven run selection.

One possible direction
-  const effectiveDAGRunId = dagRunId || trackedDagRunId;
+  const effectiveDAGRunId = trackedDagRunId || dagRunId;

If using this precedence, also clear trackedDagRunId when the user intentionally navigates to a different URL run ID, otherwise local state can mask URL selection.

Also applies to: 270-276

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/src/pages/dags/dag/index.tsx` around lines 175 - 193, The current
precedence effectiveDAGRunId = dagRunId || trackedDagRunId prevents
handleRunStarted from switching SSE/query to a freshly started run; update logic
so a just-started run (nextDAGRunId used by handleRunStarted) takes priority
over a URL dagRunId: modify effectiveDAGRunId computation (and places using it
such as useDAGRunSSE and the '/dag-runs/{name}/{dagRunId}' useQuery call) to
prefer nextDAGRunId when present, and ensure handleRunStarted replaces or clears
dagRunId tracking state (trackedDagRunId) so URL-driven selection doesn’t mask
the new run; also add symmetrical behavior when the user intentionally navigates
to a different URL run id to clear nextDAGRunId/trackedDagRunId as appropriate.
ui/src/features/dags/components/dag-details/DAGDetailsPanel.tsx (1)

156-164: ⚠️ Potential issue | 🟠 Major

Preserve the tracked run when opening fullscreen.

After onRunStarted() sets trackedDagRunId, fullscreen navigation drops that state by navigating to /dags/${fileName} without query params. The page then falls back to latest/local instead of the exact just-started run, especially in a new tab.

Proposed fix
   function handleFullscreenClick(e?: React.MouseEvent): void {
     const tabPath = activeTab === 'status' ? '' : `/${activeTab}`;
-    const url = `/dags/${fileName}${tabPath}`;
+    const searchParams = new URLSearchParams();
+    if (trackedDagRunId) {
+      searchParams.set('dagRunId', trackedDagRunId);
+      searchParams.set('dagRunName', data?.dag?.name || '');
+    }
+    if (remoteNode && remoteNode !== 'local') {
+      searchParams.set('remoteNode', remoteNode);
+    }
+    const query = searchParams.toString();
+    const url = `/dags/${fileName}${tabPath}${query ? `?${query}` : ''}`;
 
     if (e?.metaKey || e?.ctrlKey) {
       window.open(url, '_blank');
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/src/features/dags/components/dag-details/DAGDetailsPanel.tsx` around lines
156 - 164, handleFullscreenClick currently builds a URL without preserving the
tracked run state so the trackedDagRunId (from component state/props) is lost;
update handleFullscreenClick to append the trackedDagRunId as a query param
(e.g. ?trackedDagRunId=...) when trackedDagRunId is truthy, making sure to
url-encode it and to preserve any existing query string when building the url
used by window.open and navigate (refer to handleFullscreenClick, activeTab,
fileName, navigate, and trackedDagRunId).
internal/service/frontend/sse/multiplex.go (1)

250-289: ⚠️ Potential issue | 🟡 Minor

Verify that all SSE fetchers have bounded internal timeouts, especially unbounded I/O operations.

The use of context.WithoutCancel(ctx) is correct—it preserves request-scoped values while shielding polls from client disconnect. However, several registered fetchers lack bounded timeouts and will continue running to completion even after the client disconnects:

  • GetDAGRunDetailsData: calls getDAGRunDetailsData() directly without timeout wrapper
  • GetDAGRunLogsData and GetStepLogData: perform file I/O via fileutil.ReadLogContent() without timeout
  • GetQueuesListData: calls a.ListQueues() without timeout
  • GetDAGHistoryData: calls dagStore.GetDetails() and dagRunMgr.ListRecentStatus() without timeout

The withDAGRunReadTimeout() wrapper (10-second timeout) is used by some fetchers but not consistently. Consider adding bounded timeouts to all fetchers that perform I/O, especially log and status reads, since fetchCtx no longer provides one.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/frontend/sse/multiplex.go` around lines 250 - 289,
createSession uses context.WithoutCancel(ctx) so fetchers must enforce their own
timeouts; update any fetchers that perform I/O to use a bounded timeout wrapper
(e.g. the existing withDAGRunReadTimeout(10s)) before calling blocking
operations. Specifically, wrap the calls in GetDAGRunDetailsData
(dagStore.GetDetails), GetDAGRunLogsData and GetStepLogData
(fileutil.ReadLogContent), GetQueuesListData (a.ListQueues), and
GetDAGHistoryData (dagStore.GetDetails and dagRunMgr.ListRecentStatus) with a
timeout context (use withDAGRunReadTimeout or add a similar helper) and handle
context deadline/cancel errors so the fetchers return promptly when the timeout
elapses.
🧹 Nitpick comments (4)
ui/src/features/dags/components/__tests__/DAGStatus.test.tsx (1)

146-189: Consider adding a failure-path test.

The test covers the happy path (PATCH resolves with error: undefined). A complementary case where PATCH resolves with an error would lock in that the optimistic update is not applied and refresh is not invoked on failure — important given the new optimistic-update behavior introduced in this PR.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/src/features/dags/components/__tests__/DAGStatus.test.tsx` around lines
146 - 189, Add a complementary failure-path test for DAGStatus: mock
useClient/PATCH (patchMock) to resolve with an error object, render the
component inside DAGContext with a mock refresh, open the status modal and click
"Mark failed", then assert that patchMock was called with the same endpoint and
body (status: NodeStatus.Failed) but the UI does not show the optimistic update
("Graph status: Failed" should not be present) and refresh was not called
(expect(refresh).not.toHaveBeenCalled()). Use the same helpers/fixtures (dagRun,
appBarValue) and roles used in the existing test to mirror the happy-path flow.
ui/src/features/dags/lib/nodeStatus.ts (1)

10-35: Consider an exhaustive switch instead of a not_started fallback.

The default branch silently maps any unknown NodeStatus value to not_started, which could hide enum drift if the backend adds a new status. Using status satisfies never in the default (after handling every case) would make the TS compiler catch missing cases at build time.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/src/features/dags/lib/nodeStatus.ts` around lines 10 - 35, The switch in
nodeStatusLabel currently falls back to NodeStatusLabel.not_started in a default
branch which can hide added enum values; modify the nodeStatusLabel function to
remove the default return and instead make the switch exhaustive by adding an
unreachable-type check after the switch (e.g., assign status to a never-typed
variable or call an assertUnreachable helper) so the TypeScript compiler will
error when a new NodeStatus case is missing; reference NodeStatus,
NodeStatusLabel and the nodeStatusLabel function when making this change.
ui/src/features/dags/components/dag-execution/StatusUpdateModal.tsx (1)

64-84: Unhandled promise rejections from async onSubmit.

void onSubmit(...) discards the returned promise, so if the async handler ever rejects (e.g., an unexpected throw outside the existing showError path) the rejection becomes unhandled. Current implementations in DAGStatus.tsx and NodeStatusTableRow.tsx catch errors internally, so this is not a live bug — but wrapping the call in .catch(() => {}) or an async IIFE would make the contract safer against future handlers.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/src/features/dags/components/dag-execution/StatusUpdateModal.tsx` around
lines 64 - 84, The keydown handler currently calls void onSubmit(...) which
discards its promise and can lead to unhandled rejections; update handleKeyDown
in StatusUpdateModal so the async onSubmit calls are awaited and errors are
caught — e.g., wrap the onSubmit(...) invocation in an async IIFE with try/catch
(or append .catch(() => {}) to the returned promise) for both branches that use
onSubmit(step, NodeStatus.Success) and onSubmit(step, NodeStatus.Failed); keep
dismissModal behavior unchanged and reference the existing symbols
handleKeyDown, onSubmit, selectedButton, NodeStatus.Success, and
NodeStatus.Failed.
internal/service/frontend/sse/multiplex.go (1)

1197-1271: Per-session polling fan-out: expect N× backend load on shared topics and consider bounded concurrency.

Two operational concerns with the new poll() loop, both stemming from the per-session fetch model:

  1. Load amplification on shared topics. For each polling tick, the topic now issues one fetcher(session.fetchCtx, identifier) call per subscribed session. A dashboard with k viewers watching dagruns previously triggered one DAG-run list fetch per tick; it now triggers k. Combined with the revert to polling mode in registerDedicatedSSEFetchers, the DAG-run list hot path (GetDAGRunsListData) scales with connected browsers. Worth validating on a realistic deployment and considering per-type rate/coalescing safeguards if this becomes visible in DB/store load.

  2. Sequential per-session fetching blocks the topic goroutine. Each topic has a single goroutine; poll() walks sessions one-by-one. A single slow/hung fetcher (e.g., a stalled remote-node RPC inside one session's ctx) delays every other session's update on the same topic, and defers the next tick. At minimum, bounded parallelism (e.g., a small errgroup with semaphore) would decouple sessions. You could also cache by a fetch-context fingerprint (user/workspace/remote-node) so two sessions with identical auth context share a single fetch per tick, restoring most of the old amortization without losing the per-session correctness property.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/frontend/sse/multiplex.go` around lines 1197 - 1271,
Summary: poll() currently issues per-session sequential fetches causing N×
backend load and head-of-line blocking; introduce bounded concurrency and
optional per-tick fetch coalescing by fetch-context fingerprint. Fix: in
multiplexTopic.poll replace the sequential loop over sessions (streamSession,
fetchPayload) with a bounded-parallel fetch strategy: first compute a
fingerprint for each session.fetchCtx (e.g., user/workspace/remote-node) and
group sessions by fingerprint to coalesce identical fetches; spawn concurrent
fetch workers (use an errgroup or worker pool with a semaphore of configurable
size) to call fetchPayload once per fingerprint and capture payload/error and
duration; after each fetch, iterate associated sessions to computeHash,
consult/update lastHashBySession[session], generate eventID via mux.nextID(),
buildMessageData and enqueueMessage for each session; aggregate fetch durations
and successCount from the concurrent results (ensure synchronization for metrics
and lastChangeEventID updates), honor session presence by checking t.sessions
under clientsMu, and keep existing errorBackoff/backoffUntil behavior when all
fetches fail. Ensure any context cancellations per session.fetchCtx are
respected and that updates to shared maps (lastHashBySession, lastChangeEventID,
totalFetchDuration) are protected by existing clientsMu or a dedicated mutex.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@internal/service/frontend/api/v1/dagruns.go`:
- Around line 2311-2388: deriveManualDAGRunStatus currently maps NodeRetrying to
core.Queued and treats steps with ContinueOn.Failure && ContinueOn.MarkSuccess
as "uncontinuable" failures; change the NodeRetrying handling so hasRetrying
results in core.Running (not core.Queued) and adjust the failure detection
inside deriveManualDAGRunStatus (the switch case for core.NodeFailed) so that
any node with node.Step.ContinueOn.Failure is considered a continuable failure
(set hasContinuableFailure) and only nodes without ContinueOn.Failure set
hasUncontinuableFailure; update the return switch accordingly (remove the queued
mapping and ensure hasRetrying -> return core.Running).

In `@internal/service/frontend/server_test.go`:
- Around line 99-133: The test is flaky because the SSE multiplexer (created via
sse.NewMultiplexer and stored in mux) uses a 1s default polling interval; after
creating mux call mux.SetWatcherIntervalsForTest(20*time.Millisecond,
20*time.Millisecond) to shorten the poll intervals (or alternatively increase
the require.Eventually timeout to >=3s) so the second fetch happens within the
test deadline; update the test to set these test intervals immediately after mux
creation before registering fetchers or starting the handler.

In `@internal/service/frontend/sse/multiplex.go`:
- Around line 1218-1263: The change altered metric semantics:
t.mux.metrics.FetchError is now called per-session and RecordFetchDuration
records an average; restore consistent observability by (a) keeping FetchError
at topic-level (call t.mux.metrics.FetchError once per topic-tick, e.g., when
successCount==0 and firstErr!=nil) instead of inside the per-session loop, and
(b) emit per-fetch samples to metrics rather than only the mean — call
t.mux.metrics.RecordFetchDuration(string(t.topicType), fetchDuration) for each
individual fetch sample (or call RecordFetchDuration for each session's duration
before aggregating) using the per-session duration values collected into
totalFetchDuration/sessions so histograms receive raw samples while you may
still compute and log the average using totalFetchDuration/successCount for
summary purposes.

In `@ui/src/features/dags/components/dag-execution/DAGExecutionHistory.tsx`:
- Around line 281-289: The local-only update via updateDAGRunsNodeStatus /
setDisplayDAGRuns doesn't refresh backend-computed DAG-run aggregates or list
metadata; after the PATCH that updates a node status, call the SWR mutate for
the /dags/{fileName}/dag-runs key (the same query this component owns) to
revalidate server state, in addition to the existing dagContext.refresh();
update the code around setDisplayDAGRuns(...) and dagContext.refresh() to invoke
mutate(...) so the history table and overview reflect backend-recomputed run
status and metadata.

---

Outside diff comments:
In `@internal/service/frontend/sse/multiplex.go`:
- Around line 250-289: createSession uses context.WithoutCancel(ctx) so fetchers
must enforce their own timeouts; update any fetchers that perform I/O to use a
bounded timeout wrapper (e.g. the existing withDAGRunReadTimeout(10s)) before
calling blocking operations. Specifically, wrap the calls in
GetDAGRunDetailsData (dagStore.GetDetails), GetDAGRunLogsData and GetStepLogData
(fileutil.ReadLogContent), GetQueuesListData (a.ListQueues), and
GetDAGHistoryData (dagStore.GetDetails and dagRunMgr.ListRecentStatus) with a
timeout context (use withDAGRunReadTimeout or add a similar helper) and handle
context deadline/cancel errors so the fetchers return promptly when the timeout
elapses.

In `@ui/src/features/dags/components/dag-details/DAGDetailsPanel.tsx`:
- Around line 156-164: handleFullscreenClick currently builds a URL without
preserving the tracked run state so the trackedDagRunId (from component
state/props) is lost; update handleFullscreenClick to append the trackedDagRunId
as a query param (e.g. ?trackedDagRunId=...) when trackedDagRunId is truthy,
making sure to url-encode it and to preserve any existing query string when
building the url used by window.open and navigate (refer to
handleFullscreenClick, activeTab, fileName, navigate, and trackedDagRunId).

In `@ui/src/features/dags/components/dag-execution/DAGExecutionHistory.tsx`:
- Around line 1-5: The file defining the DAGExecutionHistory React component is
missing the repository GPL v3 license header; fix by running the repository
tooling (run `make addlicense`) to automatically prepend the standard GPL v3
header to this TSX file or manually add the standard header block at the very
top of the file before the existing comments, ensuring it appears above the
DAGExecutionHistory component declaration and any imports so the file passes the
**/*.{go,ts,tsx,js} license check.

In `@ui/src/features/dags/components/DAGStatus.tsx`:
- Around line 1-3: This file (DAGStatus.tsx) is missing the repository GPL v3
license header; add the standard GPL v3 header to the top of the file (above all
imports) for this modified source file
(ui/src/features/dags/components/DAGStatus.tsx). You can run the repository
utility `make addlicense` to insert the correct header automatically, or
manually paste the standard GPL v3 boilerplate header used across TS/TSX/JS
files in this repo, ensuring it appears before the existing imports and
preserves existing file encoding/comments.

In `@ui/src/pages/dags/dag/index.tsx`:
- Around line 175-193: The current precedence effectiveDAGRunId = dagRunId ||
trackedDagRunId prevents handleRunStarted from switching SSE/query to a freshly
started run; update logic so a just-started run (nextDAGRunId used by
handleRunStarted) takes priority over a URL dagRunId: modify effectiveDAGRunId
computation (and places using it such as useDAGRunSSE and the
'/dag-runs/{name}/{dagRunId}' useQuery call) to prefer nextDAGRunId when
present, and ensure handleRunStarted replaces or clears dagRunId tracking state
(trackedDagRunId) so URL-driven selection doesn’t mask the new run; also add
symmetrical behavior when the user intentionally navigates to a different URL
run id to clear nextDAGRunId/trackedDagRunId as appropriate.

---

Nitpick comments:
In `@internal/service/frontend/sse/multiplex.go`:
- Around line 1197-1271: Summary: poll() currently issues per-session sequential
fetches causing N× backend load and head-of-line blocking; introduce bounded
concurrency and optional per-tick fetch coalescing by fetch-context fingerprint.
Fix: in multiplexTopic.poll replace the sequential loop over sessions
(streamSession, fetchPayload) with a bounded-parallel fetch strategy: first
compute a fingerprint for each session.fetchCtx (e.g.,
user/workspace/remote-node) and group sessions by fingerprint to coalesce
identical fetches; spawn concurrent fetch workers (use an errgroup or worker
pool with a semaphore of configurable size) to call fetchPayload once per
fingerprint and capture payload/error and duration; after each fetch, iterate
associated sessions to computeHash, consult/update lastHashBySession[session],
generate eventID via mux.nextID(), buildMessageData and enqueueMessage for each
session; aggregate fetch durations and successCount from the concurrent results
(ensure synchronization for metrics and lastChangeEventID updates), honor
session presence by checking t.sessions under clientsMu, and keep existing
errorBackoff/backoffUntil behavior when all fetches fail. Ensure any context
cancellations per session.fetchCtx are respected and that updates to shared maps
(lastHashBySession, lastChangeEventID, totalFetchDuration) are protected by
existing clientsMu or a dedicated mutex.

In `@ui/src/features/dags/components/__tests__/DAGStatus.test.tsx`:
- Around line 146-189: Add a complementary failure-path test for DAGStatus: mock
useClient/PATCH (patchMock) to resolve with an error object, render the
component inside DAGContext with a mock refresh, open the status modal and click
"Mark failed", then assert that patchMock was called with the same endpoint and
body (status: NodeStatus.Failed) but the UI does not show the optimistic update
("Graph status: Failed" should not be present) and refresh was not called
(expect(refresh).not.toHaveBeenCalled()). Use the same helpers/fixtures (dagRun,
appBarValue) and roles used in the existing test to mirror the happy-path flow.

In `@ui/src/features/dags/components/dag-execution/StatusUpdateModal.tsx`:
- Around line 64-84: The keydown handler currently calls void onSubmit(...)
which discards its promise and can lead to unhandled rejections; update
handleKeyDown in StatusUpdateModal so the async onSubmit calls are awaited and
errors are caught — e.g., wrap the onSubmit(...) invocation in an async IIFE
with try/catch (or append .catch(() => {}) to the returned promise) for both
branches that use onSubmit(step, NodeStatus.Success) and onSubmit(step,
NodeStatus.Failed); keep dismissModal behavior unchanged and reference the
existing symbols handleKeyDown, onSubmit, selectedButton, NodeStatus.Success,
and NodeStatus.Failed.

In `@ui/src/features/dags/lib/nodeStatus.ts`:
- Around line 10-35: The switch in nodeStatusLabel currently falls back to
NodeStatusLabel.not_started in a default branch which can hide added enum
values; modify the nodeStatusLabel function to remove the default return and
instead make the switch exhaustive by adding an unreachable-type check after the
switch (e.g., assign status to a never-typed variable or call an
assertUnreachable helper) so the TypeScript compiler will error when a new
NodeStatus case is missing; reference NodeStatus, NodeStatusLabel and the
nodeStatusLabel function when making this change.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: c6e6160c-1866-420c-ae55-f723de724ecb

📥 Commits

Reviewing files that changed from the base of the PR and between b9cd41d and 46fa924.

📒 Files selected for processing (19)
  • internal/service/frontend/api/v1/dagruns.go
  • internal/service/frontend/api/v1/dagruns_test.go
  • internal/service/frontend/server.go
  • internal/service/frontend/server_test.go
  • internal/service/frontend/sse/multiplex.go
  • internal/service/frontend/sse/multiplex_test.go
  • ui/src/features/dags/components/DAGStatus.tsx
  • ui/src/features/dags/components/__tests__/DAGStatus.test.tsx
  • ui/src/features/dags/components/common/DAGActions.tsx
  • ui/src/features/dags/components/dag-details/DAGDetailsContent.tsx
  • ui/src/features/dags/components/dag-details/DAGDetailsPanel.tsx
  • ui/src/features/dags/components/dag-details/NodeStatusTable.tsx
  • ui/src/features/dags/components/dag-details/NodeStatusTableRow.tsx
  • ui/src/features/dags/components/dag-details/__tests__/DAGDetailsPanel.test.tsx
  • ui/src/features/dags/components/dag-execution/DAGExecutionHistory.tsx
  • ui/src/features/dags/components/dag-execution/StatusUpdateModal.tsx
  • ui/src/features/dags/contexts/DAGContext.ts
  • ui/src/features/dags/lib/nodeStatus.ts
  • ui/src/pages/dags/dag/index.tsx

Comment thread internal/service/frontend/api/v1/dagruns.go
Comment thread internal/service/frontend/server_test.go Outdated
Comment thread internal/service/frontend/sse/multiplex.go Outdated
@yottahmd
Copy link
Copy Markdown
Collaborator Author

Addressed CodeRabbit feedback in c12675a4.

  • NodeRetrying now derives aggregate Running, and ContinueOn.Failure is treated as continuable even with MarkSuccess.
  • The polling-sensitive SSE test now waits long enough for the default 1s interval instead of using a non-existent helper.
  • SSE metrics now emit per-fetch duration samples and at most one fetch-error metric per topic tick.
  • DAG history status updates now revalidate the owned history query after a successful manual step-status PATCH.
  • Added missing license headers, failure-path optimistic-update coverage, exhaustive node-status mapping, and async status-modal submit guarding.
  • The DAG details page now preserves/overrides exact run IDs correctly when starting a new run or opening fullscreen.
  • SSE fetchers that were using detached request contexts now use the existing bounded read timeout wrapper where applicable.

For the per-session coalescing nitpick: I did not add context-fingerprint coalescing in this PR because there is no safe stable fingerprint for all request-scoped authorization/workspace context today. The per-session fetch behavior is intentional for correctness; the concrete risk was reduced by adding bounded fetch timeouts and restoring metrics semantics without changing stream connection count.

Copy link
Copy Markdown
Collaborator Author

@coderabbitai review

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Apr 22, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@internal/service/frontend/api/v1/dagruns.go`:
- Around line 2381-2384: The current aggregate status logic returns core.Running
for mixed node states (case hasNotStarted && !hasSuccess -> core.NotStarted;
case hasNotStarted -> core.Running), which incorrectly marks manually edited
completed runs as active; update the branch used by the manual-recompute path
(invoked by UpdateDAGRunStepStatus and UpdateSubDAGRunStepStatus) so that when
some nodes are NotStarted while others are Success you return a non-active
aggregate such as core.PartiallySucceeded (or another appropriate non-running
status) instead of core.Running, and ensure dagStatus.Status checks that gate UI
edits still treat this aggregate as editable; adjust the switch/case that
currently returns core.Running to return core.PartiallySucceeded for mixed
NotStarted+Success scenarios.

In `@internal/service/frontend/sse/multiplex.go`:
- Line 251: The four unprotected fetchers GetDAGRunsListData, GetDAGsListData,
GetDocContentData, and GetDocTreeData are running against session.fetchCtx
(created via newStreamSession) without a deadline; wrap each of these fetcher
registrations with the existing timeout wrapper (e.g., withDAGRunReadTimeout or
an equivalent 10s context) so they execute with a bounded per-fetch timeout
(like the other eight fetchers). Locate the registrations for those functions in
multiplex.go and replace their direct usage with the wrapped versions so the
fetch functions use the timed context instead of the detached session.fetchCtx,
ensuring shutdown and topic.stop() cannot be blocked by long-running fetches.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 9aca74d8-5e75-40a9-868f-7a7891203b7b

📥 Commits

Reviewing files that changed from the base of the PR and between 46fa924 and c12675a.

📒 Files selected for processing (13)
  • internal/service/frontend/api/v1/dagruns.go
  • internal/service/frontend/api/v1/dagruns_internal_test.go
  • internal/service/frontend/api/v1/dags.go
  • internal/service/frontend/api/v1/queues.go
  • internal/service/frontend/server_test.go
  • internal/service/frontend/sse/multiplex.go
  • ui/src/features/dags/components/DAGStatus.tsx
  • ui/src/features/dags/components/__tests__/DAGStatus.test.tsx
  • ui/src/features/dags/components/dag-details/DAGDetailsPanel.tsx
  • ui/src/features/dags/components/dag-execution/DAGExecutionHistory.tsx
  • ui/src/features/dags/components/dag-execution/StatusUpdateModal.tsx
  • ui/src/features/dags/lib/nodeStatus.ts
  • ui/src/pages/dags/dag/index.tsx
🚧 Files skipped from review as they are similar to previous changes (2)
  • ui/src/features/dags/components/dag-execution/StatusUpdateModal.tsx
  • ui/src/features/dags/components/tests/DAGStatus.test.tsx

Comment thread internal/service/frontend/api/v1/dagruns.go Outdated
Comment thread internal/service/frontend/sse/multiplex.go
Copy link
Copy Markdown
Collaborator Author

Addressed the follow-up CodeRabbit feedback in b361916a.

  • Manual recompute now returns non-active PartiallySucceeded for mixed NotStarted + successful nodes, so manual edits do not make completed runs look actively running.
  • GetDAGRunsListData, GetDAGsListData, GetDocTreeData, and GetDocContentData now run under the same bounded read timeout wrapper used by the other detached-context SSE fetchers.

Validation: go test ./internal/service/frontend/... -count=1.

Copy link
Copy Markdown
Collaborator Author

@coderabbitai review

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Apr 22, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@internal/service/frontend/api/v1/dags.go`:
- Around line 1761-1768: The SSE DAG-list handler is using hard-coded fallbacks
("name"/"asc") for sortField and sortOrder which diverges from the HTTP ListDAGs
defaults; update the handler to use the UI defaults from a.config.UI.DAGs when
params.Get("sort") or params.Get("order") are empty (the same source used by
ListDAGs) so sortField and sortOrder are initialized from a.config.UI.DAGs
instead of "name"/"asc".
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 9a51e199-fbdf-492f-9d83-3928578815ee

📥 Commits

Reviewing files that changed from the base of the PR and between c12675a and b361916.

📒 Files selected for processing (4)
  • internal/service/frontend/api/v1/dagruns.go
  • internal/service/frontend/api/v1/dagruns_internal_test.go
  • internal/service/frontend/api/v1/dags.go
  • internal/service/frontend/api/v1/docs.go
✅ Files skipped from review due to trivial changes (1)
  • internal/service/frontend/api/v1/dagruns_internal_test.go

Comment thread internal/service/frontend/api/v1/dags.go Outdated
Copy link
Copy Markdown
Collaborator Author

Addressed the latest CodeRabbit DAG-list default feedback in 1ece5a7f.

  • GetDAGsListData now uses a.config.UI.DAGs.SortField / SortOrder as defaults, with explicit SSE query params still taking precedence.
  • Added regression coverage that compares the HTTP list path and SSE list path under configured name/desc defaults.

Validation: go test ./internal/service/frontend/... -count=1.

Copy link
Copy Markdown
Collaborator Author

@coderabbitai review

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Apr 22, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@yottahmd yottahmd merged commit 852cecb into main Apr 22, 2026
11 checks passed
@yottahmd yottahmd deleted the fix-dag-status-live-updates branch April 22, 2026 11:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant