Fix Phase.on_error signature#23574
Conversation
This comment has been minimized.
This comment has been minimized.
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files🚀 New features to boost your workflow:
|
AAraKKe
left a comment
There was a problem hiding this comment.
Thanks! I am approving since the comment is not a blocker but I would like to undrestand a bit what the intention is with the final raise.
My Feedback Legend
Here's a quick guide to the prefixes I use in my comments:
praise: no action needed, just celebrate!
note: just a comment/information, no need to take any action.
question: I need clarification or I'm seeking to understand your approach.
nit: A minor, non-blocking issue (e.g., style, typo). Feel free to ignore.
suggestion: I'm proposing an improvement. This is optional but recommended.
request: A change I believe is necessary before this can be merged.
The only blocking comments are request, any other type of comment can be applied at discretion of the developer.
| async def on_finalize(self, exception: Exception | None) -> None: | ||
| if self._failed_phase is not None: | ||
| raise RuntimeError( | ||
| raise FatalProcessingError( |
There was a problem hiding this comment.
question: why are we raising the error again? This is the finalization step of the orchestrator. We can define the on_error hook in the orchestrator and see where we have failed and do whatever we decide there. Raising a FatalProcessingError again here does not make a lot of sense, I believe, since it is going to blow up at the end, nothing else can catch this.
Not sure if I am missing something but what is the intention of raising here? If the intention is to allow us to catch an error from the command when we launch the run method, maybe we can check whether on_finalize has been called with an exception right? If on_finalize does not receive an exception is that whatever happened we are ok with it. And then we can just printout an error message and raise again so we can catch it later if we need to.
There was a problem hiding this comment.
Thank you for your comment!! The raise was redundant: when a phase fails, on_message_received raises FatalProcessingError, which propagates out of process_messages() and is re-raised in _entry_point's except block before finalize() is even called. The raise in on_finalize was just replacing one FatalProcessingError with another — the caller of run() sees an exception either way.
Fixed in 07a42b0: on_finalize now uses exception as the primary guard (if it's None, the run was clean) and logs the pipeline failure message instead of raising.
Validation ReportAll 20 validations passed. Show details
|
* Fix Phase.on_error signature and test_orchestrator * Tighten on_error's error type * Log instead of raise in on_finalize
* Fix Phase.on_error signature and test_orchestrator * Tighten on_error's error type * Log instead of raise in on_finalize
What does this PR do?
Fixes three issues in the AI phase framework:
Phase.on_errorsignature — the override had(self, message: PhaseTrigger, error: Exception)but the base classBaseProcessor.on_errornow only takes(self, error: MessageProcessingError | ProcessorHookError)(see Route event bus hook errors through on_error with fail_fast policy #23489 and Tighten on_error handler type to OrchestratorHookError #23575). The orchestrator's_task_wrapperalways callsprocessor.on_error(wrapped_error)with a single argument, so the old signature meantPhase.on_errorwas never actually invoked — it silently failed with aTypeErrorthat was swallowed by thefail_fast=Falsepolicy.Pipeline abort propagation —
PhaseOrchestrator.on_finalizewas raisingRuntimeError, but the baseEventBusOrchestrator.finalizeonly letsFatalProcessingErrorandCancelledErrorpass through. Any other exception gets wrapped inOrchestratorHookErrorand swallowed by thefail_fast=Falsepolicy, so the abort error was silently dropped andrun()appeared to succeed after a phase failure. Changed to raiseFatalProcessingErrorso it propagates correctly. Updated the two affected tests accordingly.PhaseTrigger.idsimplification — the id was constructed asf"{phase_id}_finished_{message.id}", causing the string to accumulate the full causal chain of every preceding trigger (e.g.phase_B_finished_phase_A_finished_start). The id only needs to be unique within the queue, and since each phase emits at most one success trigger (guarded by_executed),f"{phase_id}_finished"is sufficient.Motivation
The
on_errormismatch was a latent bug introduced when the orchestrator was refactored to wrap errors before callingon_error. As a result, a failing phase would never write its failure checkpoint or emitPhaseFailedMessage, so the pipeline would silently stall instead of aborting. The other two changes were discovered while fixing this.Review checklist (to be filled by reviewers)
qa/skip-qalabel if the PR doesn't need to be tested during QA.backport/<branch-name>label to the PR and it will automatically open a backport PR once this one is merged