⚙️ FEATURE-#201: Separate Engine from Execution Strategy#202
⚙️ FEATURE-#201: Separate Engine from Execution Strategy#202FernandoCelmer merged 14 commits intodevelopfrom
Conversation
…mposable behaviors
…tegies to use TaskEngine
…ngine.execute_with_retry()
There was a problem hiding this comment.
🔍 Code Review
Code issues found: 6
| # | Severity | Comment |
|---|---|---|
| 1 | [Blocking] | Executor leak on success path |
| 2 | [Blocking] | Duration None when tracer calls end_task |
| 3 | [Blocking] | ExecutionWithClassError bypasses retry guard |
| 4 | [Suggestion] | Double storage read on checkpoint |
| 5 | [Suggestion] | Action docstring outdated after refactor |
| 6 | [Suggestion] | Parallel still uses deprecated Execution (see below) |
6. [Suggestion] Parallel still uses deprecated Execution
File: dotflow/core/workflow.py ~line 430
Problem — The new Execution docstring explicitly states: "New code should use TaskEngine directly." The strategies Sequential, SequentialGroup, and Background were migrated to TaskEngine, but Parallel.run() still uses Execution:
process = _mp.Process(
target=Execution,
args=(task, self.workflow_id, previous_context, self._flow_callback),
)This creates an architectural inconsistency: 3 out of 4 strategies use the new pattern, 1 uses the old one.
Failure scenario — Future developers may use Parallel as a reference and perpetuate the use of Execution instead of TaskEngine. Additionally, any bug fix or improvement made to TaskEngine will not automatically apply to tasks executed via Parallel, creating behavioral divergence between execution modes.
Fix — If the Parallel migration was intentionally deferred (e.g., multiprocessing pickling complexity), add an explicit TODO comment:
# TODO: Migrate Parallel to use TaskEngine directly.
# Currently uses Execution wrapper due to multiprocessing pickling constraints.
process = _mp.Process(
target=Execution,
args=(task, self.workflow_id, previous_context, self._flow_callback),
)
Description
Refactor the task execution architecture by introducing a
TaskEnginethat encapsulates the task lifecycle, separating it from execution strategies. Moves retry, timeout and backoff from@actionto the engine.Issue: 📌 ISSUE-#201
Summary
Phase 1 — TaskEngine
TaskEngine(dotflow/core/engine.py) — manages lifecycle via context manager: status transitions, duration, tracer, error handlingExecution— refactored as backward-compatible wrapper aroundTaskEngineFlowbase class —_has_checkpoint()and_restore_checkpoint()extracted (DRY)Sequential,SequentialGroup,Background) — useTaskEnginedirectlyPhase 2 — Retry/Timeout in Engine
execute_with_retry()— retry loop, timeout viaThreadPoolExecutor, backoffAction._run_action()— simplified to single attempt@actionattributes —retry,timeout,retry_delay,backoffpropagated to closurePhase 3 — Composable Context Managers
checkpoint_context()— automatic checkpoint post-executionTaskEngineBug Fix
Type of change
Validation
docs_src/examples verifiedChecklist: