
Refactor: Separate Engine from Execution Strategy #201

@FernandoCelmer

Description

Motivation

Currently, the task lifecycle logic (managing status, handling errors, measuring duration) is mixed with the logic of how tasks are executed (sequential, parallel, background). This creates two problems:

  1. Execution does too much — it runs in __init__, mixing status control, function invocation, error handling, and callbacks into a single _execution() method.
  2. Strategies (Sequential, Parallel, etc.) duplicate logic — each one re-implements checkpoint handling, previous_context control, and the call to Execution.

Separating these responsibilities makes the code more testable, extensible, and easier to understand.


Proposal

Introduce a TaskEngine that encapsulates the task lifecycle (status, retry, timeout, error handling, duration), and keep the execution strategies (Sequential, Parallel, etc.) responsible only for ordering and parallelism.

Proposed Architecture

Strategy (Sequential, Parallel, ...)
  └── decides the ORDER of execution
       └── TaskEngine
            └── manages the LIFECYCLE of each task
                 └── calls the user function (@action)

Current Problem in Detail

1. Execution.__init__ runs the task in the constructor

# dotflow/core/execution.py (current)
class Execution:
    def __init__(self, task, workflow_id, previous_context=None, _flow_callback=basic_callback):
        self.task = task
        self.task.status = TypeStatus.IN_PROGRESS
        self.task.previous_context = previous_context
        self.task.workflow_id = workflow_id
        self._execution(_flow_callback)  # ← runs in __init__!

This prevents any external control over the execution lifecycle. For example, it's not possible to pause between creation and execution, or inject intermediate behavior.

2. _execution() mixes responsibilities

# dotflow/core/execution.py (current)
def _execution(self, _flow_callback):
    try:
        start = datetime.now()
        current_context = self.task.step(...)        # calls the function
        # ... handles class result ...
        self.task.current_context = current_context   # persists
        self.task.duration = (datetime.now() - start).total_seconds()  # measures time
        self.task.status = TypeStatus.COMPLETED       # changes status
    except Exception as err:
        self.task.errors = err                        # records error
        self.task.status = TypeStatus.FAILED          # changes status
    finally:
        self.task.callback(task=self.task)             # task callback
        _flow_callback(task=self.task)                 # flow callback

Status, duration, persistence, callbacks — all in one place. If we want to add new behavior (e.g., structured logging, metrics, pre/post execution hooks), we need to modify this method.

3. Strategies repeat the same pattern

Sequential, SequentialGroup, Background, and Parallel all repeat the same checkpoint + Execution + failure control pattern:

# This block appears in Sequential.run(), SequentialGroup._run_group(), Background._run_sequential()
for task in tasks:
    if self._has_checkpoint(task):
        # ... restore context ...
        continue

    Execution(task=task, workflow_id=..., previous_context=..., _flow_callback=...)

    previous_context = task.config.storage.get(...)

    if not self.ignore and task.status == TypeStatus.FAILED:
        break

Proposed Design

TaskEngine — manages the lifecycle

# dotflow/core/engine.py (proposal)
from contextlib import contextmanager
from datetime import datetime

from dotflow.core.action import Action
from dotflow.core.context import Context
from dotflow.core.types import TypeStatus


class TaskEngine:
    """Manages the execution lifecycle of a single task."""

    def __init__(self, task, workflow_id, previous_context=None):
        self.task = task
        self.workflow_id = workflow_id
        self.previous_context = previous_context
        self._start_time = None

    @contextmanager
    def start(self):
        """Prepares the task for execution and manages its lifecycle."""
        self.task.workflow_id = self.workflow_id
        self.task.previous_context = self.previous_context
        self.task.status = TypeStatus.IN_PROGRESS
        self._start_time = datetime.now()

        try:
            yield self
        except Exception as err:
            self.task.errors = err
            self.task.current_context = None
            self.task.status = TypeStatus.FAILED
        else:
            self.task.duration = (datetime.now() - self._start_time).total_seconds()
            if self.task.status == TypeStatus.IN_PROGRESS:
                self.task.status = TypeStatus.COMPLETED

    def execute(self):
        """Executes the task function and returns the context."""
        current_context = self.task.step(
            initial_context=self.task.initial_context,
            previous_context=self.task.previous_context,
            task=self.task,
        )

        if type(current_context.storage) not in Execution.VALID_OBJECTS:  # TODO: move VALID_OBJECTS into TaskEngine so engine.py does not depend on the wrapper
            current_context = self._execute_class(current_context.storage)

        self.task.current_context = current_context
        return current_context

    def _execute_class(self, class_instance):
        """Existing _execution_with_class logic, moved here."""
        # ... (same current logic)
        pass
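
To make the proposed start()/execute() split concrete, here is a self-contained sketch of the lifecycle. It uses SimpleNamespace tasks and a stub TypeStatus as stand-ins for dotflow's real Task and status types, and a simplified execute() without the class-result handling:

```python
from contextlib import contextmanager
from datetime import datetime
from enum import Enum
from types import SimpleNamespace


class TypeStatus(Enum):  # stand-in for dotflow.core.types.TypeStatus
    NOT_STARTED = "not_started"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"


class TaskEngine:
    """Lifecycle sketch: status, duration, and error capture in one place."""

    def __init__(self, task, workflow_id, previous_context=None):
        self.task = task
        self.workflow_id = workflow_id
        self.previous_context = previous_context
        self._start_time = None

    @contextmanager
    def start(self):
        self.task.workflow_id = self.workflow_id
        self.task.previous_context = self.previous_context
        self.task.status = TypeStatus.IN_PROGRESS
        self._start_time = datetime.now()
        try:
            yield self
        except Exception as err:
            # swallowing the exception here is what keeps FAILED tasks
            # from aborting the whole strategy loop
            self.task.errors = err
            self.task.current_context = None
            self.task.status = TypeStatus.FAILED
        else:
            self.task.duration = (datetime.now() - self._start_time).total_seconds()
            if self.task.status == TypeStatus.IN_PROGRESS:
                self.task.status = TypeStatus.COMPLETED

    def execute(self):
        # simplified: the real engine would also handle class results
        self.task.current_context = self.task.step()
        return self.task.current_context


# success path
ok = SimpleNamespace(step=lambda: "result", status=TypeStatus.NOT_STARTED)
with TaskEngine(task=ok, workflow_id="wf-1").start() as engine:
    engine.execute()
print(ok.status)  # TypeStatus.COMPLETED


# failure path: the exception is captured by start(), not propagated
def boom():
    raise ValueError("task failed")

bad = SimpleNamespace(step=boom, status=TypeStatus.NOT_STARTED)
with TaskEngine(task=bad, workflow_id="wf-1").start() as engine:
    engine.execute()
print(bad.status)  # TypeStatus.FAILED
```

Note that because start() swallows the exception inside the generator, the with block suppresses it, so a strategy loop can simply inspect task.status afterwards.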

Simplified Strategies

# dotflow/core/workflow.py — Simplified Sequential (proposal)
class Sequential(Flow):
    def run(self):
        previous_context = Context(workflow_id=self.workflow_id)

        for task in self.tasks:
            if self._has_checkpoint(task):
                previous_context = self._restore_checkpoint(task)
                continue

            engine = TaskEngine(
                task=task,
                workflow_id=self.workflow_id,
                previous_context=previous_context,
            )

            with engine.start():
                engine.execute()

            self._flow_callback(task=task)

            previous_context = task.config.storage.get(
                key=task.config.storage.key(task=task)
            )

            if not self.ignore and task.status == TypeStatus.FAILED:
                break

Composable Behaviors via Context Managers

The context manager pattern allows adding new behaviors without modifying the engine:

# Example: adding metrics in the future (without changing TaskEngine)
with engine.start():
    with engine.timeout_context():     # future: timeout managed by engine
        engine.execute()
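
Such a behavior can live entirely outside the engine. A minimal sketch of a user-defined metrics hook, where metrics_context and the sink dict are hypothetical names, not dotflow API:

```python
import time
from contextlib import contextmanager


@contextmanager
def metrics_context(task_name, sink):
    # hypothetical user-defined hook: records wall time per task
    # without modifying TaskEngine at all
    start = time.perf_counter()
    try:
        yield
    finally:
        sink[task_name] = time.perf_counter() - start


# composition: behaviors stack as nested context managers around execute()
recorded = {}
with metrics_context("extract", recorded):
    time.sleep(0.01)  # stands in for engine.execute()

print("extract" in recorded)  # True
```

Each cross-cutting concern (metrics, logging, tracing) becomes one more layer in the with stack, which is the extensibility argument from the Benefits table.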

Benefits

| Aspect | Before | After |
| --- | --- | --- |
| Testability | The entire Execution must be mocked to test a strategy | Engine and strategy are independently testable |
| Retry | Lives inside the @action decorator, invisible to the engine | Can be moved to the engine (future phase), making the RETRYING status visible |
| Extensibility | Adding pre/post execution hooks requires modifying _execution() | Just add a context manager |
| DRY | Four strategies repeat the same checkpoint + execution pattern | Pattern centralized in the engine |
| Debugging | Stack trace starts in Execution.__init__ | Explicit and traceable lifecycle |

Implementation Plan

Phase 1 — Introduce TaskEngine (non-breaking)

  • Create dotflow/core/engine.py with the TaskEngine class
  • Move the logic from Execution._execution() and Execution._execution_with_class() to TaskEngine
  • Keep Execution as a compatibility wrapper that uses TaskEngine internally
  • Adapt strategies (Sequential, Parallel, etc.) to use TaskEngine
  • Extract the repeated checkpoint pattern into a method in the base Flow class
  • Unit tests for TaskEngine in isolation
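
One possible shape for the compatibility wrapper: the old constructor-runs-everything contract is preserved, but the body delegates to the engine. This is a hypothetical sketch with a minimal stand-in engine; in the real Phase 1 the wrapper would use the TaskEngine proposed above:

```python
from contextlib import contextmanager
from types import SimpleNamespace


class TaskEngine:  # minimal stand-in for the proposed engine
    def __init__(self, task, workflow_id, previous_context=None):
        self.task = task
        self.task.workflow_id = workflow_id
        self.task.previous_context = previous_context

    @contextmanager
    def start(self):
        self.task.status = "in_progress"
        try:
            yield self
        except Exception as err:
            self.task.errors = err
            self.task.status = "failed"
        else:
            self.task.status = "completed"

    def execute(self):
        self.task.current_context = self.task.step()


class Execution:
    """Phase 1 facade: same signature and behavior, engine inside."""

    def __init__(self, task, workflow_id, previous_context=None, _flow_callback=None):
        self.task = task
        engine = TaskEngine(task=task, workflow_id=workflow_id,
                            previous_context=previous_context)
        with engine.start():
            engine.execute()
        if _flow_callback is not None:
            _flow_callback(task=task)  # old callback contract preserved


calls = []
task = SimpleNamespace(step=lambda: "ctx")
Execution(task=task, workflow_id="wf-1",
          _flow_callback=lambda task: calls.append(task.status))
print(task.status, calls)  # completed ['completed']
```

Existing callers of Execution keep working unchanged, while new code (and the adapted strategies) can talk to TaskEngine directly.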

Phase 2 — Move retry to the engine (controlled breaking change)

  • Add while engine.is_running() loop with retry support in the engine
  • Keep @action compatible but delegate retry to the engine when available
  • RETRYING status transitions are now managed by the engine
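
One possible shape for that loop, as a sketch only: is_running(), attempt, and the string statuses are assumed names, and the real engine would reuse TypeStatus and the lifecycle above.

```python
class RetryEngine:
    """Hypothetical Phase 2 sketch: retry driven by the engine loop."""

    def __init__(self, func, retry=3):
        self.func = func
        self.retry = retry       # maximum number of attempts
        self.attempt = 0
        self.status = "not_started"

    def is_running(self):
        return self.status in ("not_started", "retrying")

    def execute(self):
        self.attempt += 1
        try:
            result = self.func()
        except Exception:
            # RETRYING becomes a visible engine status instead of
            # being hidden inside the @action decorator
            self.status = "retrying" if self.attempt < self.retry else "failed"
        else:
            self.status = "completed"
            return result


attempts = {"n": 0}

def flaky():
    # fails twice, then succeeds on the third attempt
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RuntimeError("transient")
    return "ok"

engine = RetryEngine(flaky, retry=5)
while engine.is_running():
    engine.execute()

print(engine.status, attempts["n"])  # completed 3
```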

Phase 3 — Composable context managers

  • engine.timeout_context() — timeout managed by the engine
  • engine.checkpoint_context() — automatic checkpoint pre/post execution
  • Document how to add custom behaviors via composition
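
engine.timeout_context() could be sketched with a worker thread bounding the wait. This is one possible design, not dotflow API; a real engine integration would also mark the task FAILED when the limit expires:

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout
from contextlib import contextmanager


@contextmanager
def timeout_context(seconds):
    # hypothetical sketch: run the task function in a worker thread
    # and bound how long the caller waits for its result
    executor = ThreadPoolExecutor(max_workers=1)

    def run(fn, *args):
        return executor.submit(fn, *args).result(timeout=seconds)

    try:
        yield run
    finally:
        executor.shutdown(wait=False)


with timeout_context(1.0) as run:
    result = run(lambda: "fast")    # completes well within the limit

timed_out = False
try:
    with timeout_context(0.05) as run:
        run(time.sleep, 1)          # exceeds the 50 ms budget
except FutureTimeout:
    timed_out = True

print(result, timed_out)  # fast True
```

A thread-based bound can only stop waiting, not interrupt the running function, which is exactly the kind of trade-off that should be documented when this context manager is designed for real.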

Notes

  • Phase 1 is backward-compatible — Execution continues to work as a wrapper
  • Phase 2 may be breaking for users who depend on @action retry directly
  • The @action decorator continues to exist for parameter validation and async handling — the engine does not replace it, it orchestrates it

Metadata

Assignees: none

Labels: 0.15.0 (Introduced in v0.15.0), enhancement (New feature or request)
