[V2] Run Service - Watch action cache#7017
Conversation
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
There was a problem hiding this comment.
Pull request overview
Introduces server-side state tracking for WatchActions so the service can stream filtered, tree-aware action updates (including child phase aggregates), and updates persistence/querying to associate actions with a run via a run_name column.
Changes:
- Add a per-watch
runStateManagerto compute action-tree visibility under filters and maintain per-node child phase counts. - Update
WatchActionsto send an initial filtered/tree-aware snapshot and then stream incremental changes using the state manager. - Add
run_nameto theactionsmodel and use it in action creation and listing queries; add a helper script for callingWatchActions.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| runs/test/scripts/watch_actions.sh | Adds a buf curl helper script to call RunService/WatchActions. |
| runs/service/run_state_manager.go | New in-memory tree/state manager for filtered visibility and child-phase aggregation. |
| runs/service/run_state_manager_test.go | Unit tests validating phase-count aggregation, filter visibility behavior, and parent-missing errors. |
| runs/service/run_service.go | Refactors WatchActions to use the new state manager; adds helpers for initial listing + streaming updates. |
| runs/repository/models/action.go | Adds RunName column and a Clone() helper; updates GetRunName to prefer stored RunName. |
| runs/repository/impl/project.go | Changes project creation to use ON CONFLICT DO NOTHING. |
| runs/repository/impl/action.go | Populates RunName on inserts and changes ListActions to filter by run_name. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| result := r.db.WithContext(ctx). | ||
| Clauses(clause.OnConflict{DoNothing: true}). | ||
| Create(project) |
There was a problem hiding this comment.
CreateProject now uses ON CONFLICT DO NOTHING, which means a duplicate insert will not return an error (RowsAffected=0). As a result, this method will no longer return ErrProjectAlreadyExists for duplicates, and the isDuplicateProjectError branch becomes ineffective. Consider explicitly checking result.RowsAffected == 0 and returning interfaces.ErrProjectAlreadyExists (or reverting to the prior behavior) to preserve the API contract and existing tests.
| query := r.db.WithContext(ctx).Model(&models.Action{}). | ||
| Where("org = ? AND project = ? AND domain = ?", | ||
| runID.Org, runID.Project, runID.Domain). | ||
| Where("parent_action_name IS NOT NULL") // Exclude the root action/run itself | ||
| Where("org = ? AND project = ? AND domain = ? AND run_name = ?", | ||
| runID.Org, runID.Project, runID.Domain, runID.Name) | ||
|
|
There was a problem hiding this comment.
ListActions now filters by run_name = runID.Name, but existing rows created before this column was populated will likely have run_name set to the default empty string after AutoMigrate. That will cause ListActions (and WatchActions initial load) to return an empty set for pre-existing runs/actions unless you add a backfill migration/upgrade path to populate run_name for existing records.
| Where("org = ? AND project = ? AND domain = ?", | ||
| runID.Org, runID.Project, runID.Domain). | ||
| Where("parent_action_name IS NOT NULL") // Exclude the root action/run itself | ||
| Where("org = ? AND project = ? AND domain = ? AND run_name = ?", |
There was a problem hiding this comment.
This query no longer excludes the root action/run itself (previously parent_action_name IS NOT NULL). If ListActions is intended to return only non-root actions (as the removed comment indicated), this will change the ListActions API behavior and may break clients that assume the run action is not included. If WatchActions needs the root for tree rendering, consider keeping ListActions semantics by reintroducing the exclusion here and adding a separate repo method/flag for WatchActions to include the root when needed.
| Where("org = ? AND project = ? AND domain = ? AND run_name = ?", | |
| Where("org = ? AND project = ? AND domain = ? AND run_name = ? AND parent_action_name IS NOT NULL", |
| case updated, ok := <-updatesCh: | ||
| if !ok { | ||
| return nil | ||
| } | ||
| if err := stream.Send(&workflow.WatchActionsResponse{ | ||
| EnrichedActions: []*workflow.EnrichedAction{s.convertActionToEnrichedProto(updated)}, | ||
| }); err != nil { | ||
| updates, err := rsm.upsertActions(ctx, []*models.Action{updated}) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| if err := s.sendChangedActions(runID, updates, stream); err != nil { |
There was a problem hiding this comment.
WatchActions now assumes that every updated action received from WatchActionUpdates belongs to the requested run, and it constructs the outgoing ActionIdentifier using the request runID. However, the SQLite polling implementation of WatchActionUpdates currently only filters by org/project/domain (not run), so updates from other runs in the same org/project/domain can leak into this stream. That can produce incorrect action IDs on the wire and may also trigger runStateManager errors (e.g., missing parents). Consider adding a server-side guard here (drop updates whose run doesn't match) and/or updating the repository watcher to include run_name = runID.Name (now that the column exists).
| // GetRunName extracts the run name from the action | ||
| // For root actions (runs), returns the action's own name | ||
| // For child actions, extracts from ActionSpec JSON | ||
| func (a *Action) GetRunName() string { | ||
| if a.RunName != "" { | ||
| return a.RunName | ||
| } | ||
|
|
||
| if a.ParentActionName == nil { | ||
| // Root action - the run name is the action name | ||
| return a.Name | ||
| } | ||
|
|
||
| // TODO: Extract run name from ActionSpec JSON | ||
| // For now, return empty string as placeholder | ||
| return "" | ||
| } |
There was a problem hiding this comment.
GetRunName now returns an empty string for child actions when RunName isn't populated. With the repository now relying on run_name for ListActions filtering and convertActionToEnrichedProto using GetRunName to populate ActionIdentifier.Run.Name, legacy records (or any code path that forgets to set RunName) will produce actions that can't be associated with a run and may be impossible to query/stream correctly. Consider ensuring RunName is always set for all actions (including backfill for existing DB rows) or implementing a fallback extraction for child actions if that’s still required.
runs/service/run_service.go
Outdated
|
|
||
| enriched := make([]*workflow.EnrichedAction, 0, len(updates)) | ||
| for _, update := range updates { | ||
| enriched = append(enriched, s.convertNodeUpdateToEnrichedProto(runID, update)) |
There was a problem hiding this comment.
Should we check if s.convertNodeUpdateToEnrichedProto() is nil
|
|
||
| result := r.db.WithContext(ctx).Create(project) | ||
| result := r.db.WithContext(ctx). | ||
| Clauses(clause.OnConflict{DoNothing: true}). |
There was a problem hiding this comment.
Shouldn't we return an error on conflict?
There was a problem hiding this comment.
This is to prevent duplicated key not allowed logs when start up (adding seed/default projects)
2026/03/13 09:15:49 /Users/nary/workData/open-source/flyte-v2/runs/repository/impl/project.go:32 duplicated key not allowed
[0.433ms] [rows:0] INSERT INTO `projects` (`identifier`,`name`,`description`,`labels`,`state`,`created_at`,`updated_at`) VALUES ("flytesnacks","flytesnacks","","",0,"2026-03-13 01:15:49.009","2026-03-13 01:15:49.009")Fixed in 1b1ff43 to only raise error and log in CreateProject endpoint
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
* feat: simple state manager to store actions Signed-off-by: machichima <nary12321@gmail.com> * feat: run state manager record node tree Signed-off-by: machichima <nary12321@gmail.com> * fix: also include root action in ListActions Signed-off-by: machichima <nary12321@gmail.com> * fix: prevent error when create default project on startup Signed-off-by: machichima <nary12321@gmail.com> * feat: add watch_actions.sh test script Signed-off-by: machichima <nary12321@gmail.com> * fix: continue for nil enrichedAction for defense Signed-off-by: machichima <nary12321@gmail.com> * fix: raise error in CreateProject when already exists Signed-off-by: machichima <nary12321@gmail.com> --------- Signed-off-by: machichima <nary12321@gmail.com>
Tracking issue
Related to: #6971
Why are the changes needed?
We didn't return the
children_phase_countsfromWatchAction. This PR is to count and return this info efficiently.What changes were proposed in this pull request?
runs/service/run_state_manager.goas an in memory cache forWatchAction, to store the node tree infoHow was this patch tested?
runs/test/scripts/watch_actions.shLabels
Please add one or more of the following labels to categorize your PR:
This is important to improve the readability of release notes.
Setup process
Screenshots
Check all the applicable boxes
Related PRs
Docs link
main