feat(engine): add jump-to-operator support #4444
I think the coordinator, rather than the "schedule" or the "scheduler", should be the one that jumps back to a stage.
Signed-off-by: Xinyuan Lin <xinyual3@uci.edu>
Updated based on the review. I refactored the implementation so that:
Thanks a lot! Let's keep data classes such as plan and schedule immutable. I can add tests later to guard this property.
A random comment: the PR number 4444 is very special :-)
one day we will reach 44444 ;)
@Yicong-Huang, please take another pass.
Making a major revision based on the offline discussion with @Yicong-Huang. I will let people know when the PR is ready for review.
@Yicong-Huang, please take a pass. The PR is updated based on the discussion.
Yicong-Huang left a comment
LGTM in general; we can defer minor fixes to later. Please add more tests, as this is a very new behavior.
Add cases for multi-jump sequences, unknown-target rejection, jump before any pull, jump after schedule exhaustion, and forward jump.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Reject any `Schedule` constructed with gaps or non-zero starting level keys. The schedule generator already produces contiguous-from-0 keys, so this only tightens the contract for direct callers and tests.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
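For readers unfamiliar with the contract in the commit above, here is a minimal sketch of a "contiguous from 0" check. `LevelSchedule` and its `Map[Int, Set[String]]` shape are hypothetical stand-ins, not the real `Schedule` class, and the actual validation in the PR may differ.

```scala
object ContiguousLevelsSketch {
  // Hypothetical stand-in for Schedule: level key -> names of regions at that level.
  final case class LevelSchedule(levels: Map[Int, Set[String]]) {
    // Keys must be exactly 0, 1, ..., n-1: no gaps and no non-zero start.
    require(
      levels.keySet == (0 until levels.size).toSet,
      s"level keys must be contiguous from 0, got: ${levels.keys.toSeq.sorted.mkString(", ")}"
    )
  }
}
```

With this check, `Map(0 -> ..., 1 -> ...)` is accepted, while `Map(1 -> ...)` or `Map(0 -> ..., 2 -> ...)` fails at construction time with an `IllegalArgumentException`.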
# Conflicts:
#   amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
#   amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinatorSpec.scala
What changes were proposed in this PR?
Add a generic controller-side primitive for jumping execution to the region containing a target operator (`JumpToOperatorRegion`).
Design:
- `WorkflowScheduler` remains static; jump behavior does not live in `WorkflowScheduler`.
- Jump behavior lives in `WorkflowExecutionCoordinator`, which holds a mutable `Schedule` reference and rewrites the iteration tail when a jump is requested.
- `Schedule` exposes an O(1) `getLevelIndexOfOperator` lookup plus a `rewriteExecutionFrom(levelIndex)` data primitive; jump policy stays out of `Schedule`.
- `ControllerProcessor.updateExecutionSchedule(...)` is called after `WorkflowScheduler.updateSchedule(...)`, avoiding inline lambdas in `ControllerProcessor`.
Any related issues, documentation, discussions?
Closes #4443
Precursor test coverage for related modules (separate PRs against `main`):
- `ScheduleSpec` covering `Schedule` iterator semantics
- `WorkflowSchedulerSpec` covering the `WorkflowScheduler` contract
How was this PR tested?
- `sbt "WorkflowExecutionService/compile"`: passes.
- `sbt "WorkflowExecutionService/testOnly org.apache.texera.amber.engine.architecture.scheduling.WorkflowExecutionCoordinatorSpec"`: 1/1 tests pass.
Was this PR authored or co-authored using generative AI tooling?
Generated-by: ChatGPT (Codex), Claude Code (Claude Opus 4.7)
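The split described under Design (an immutable `Schedule` offering data primitives, with jump policy held by the coordinator) could look roughly like the sketch below. It is an illustration under stated assumptions, not the code in this PR: the method names `getLevelIndexOfOperator` and `rewriteExecutionFrom` come from the description, but their signatures, the `Vector[Set[String]]` representation, the `cursor` field, and the `Coordinator` class with `pullNextLevel` and `jumpToOperatorRegion` are all hypothetical.

```scala
object JumpSketch {
  // Hypothetical immutable schedule: levels(i) holds the operator ids that run at level i.
  // `cursor` is the next level that iteration will hand out.
  final case class Schedule(levels: Vector[Set[String]], cursor: Int = 0) {
    // Index built lazily per instance, so each lookup is a single hash-map access.
    private lazy val levelOf: Map[String, Int] =
      levels.zipWithIndex.flatMap { case (ops, i) => ops.map(_ -> i) }.toMap

    def getLevelIndexOfOperator(opId: String): Option[Int] = levelOf.get(opId)

    // Data primitive only: returns a copy whose iteration resumes at levelIndex.
    def rewriteExecutionFrom(levelIndex: Int): Schedule = {
      require(levelIndex >= 0 && levelIndex < levels.size, s"no level $levelIndex")
      copy(cursor = levelIndex)
    }

    def hasNext: Boolean = cursor < levels.size
    def next: (Set[String], Schedule) = (levels(cursor), copy(cursor = cursor + 1))
  }

  // Jump policy lives here: the coordinator owns the only mutable reference.
  final class Coordinator(initial: Schedule) {
    private var schedule: Schedule = initial

    // Hands out the next level, or None once the schedule is exhausted.
    def pullNextLevel(): Option[Set[String]] =
      if (!schedule.hasNext) None
      else {
        val (ops, rest) = schedule.next
        schedule = rest
        Some(ops)
      }

    // Rejects unknown targets; otherwise swaps in a schedule rewritten from the target's level.
    def jumpToOperatorRegion(opId: String): Unit = {
      val level = schedule
        .getLevelIndexOfOperator(opId)
        .getOrElse(throw new IllegalArgumentException(s"unknown operator: $opId"))
      schedule = schedule.rewriteExecutionFrom(level)
    }
  }
}
```

In this sketch, a backward jump, a forward jump, and a jump after exhaustion all reduce to replacing the coordinator's reference with a rewritten copy, so the original `Schedule` value is never mutated. That mirrors the immutability request from the review thread, though the real classes work with regions rather than bare operator-id sets.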