⚙️ FEATURE: Managed mode — CLI reads WORKFLOW_ID and wires ServerAPI from environment #257


Context

The library ships an abstract Server provider so that workflow and task status can be reported to a remote HTTP backend in real time. A working reference implementation lives in examples/server_flow/server_flow/server.py.

This is enough for a user who writes their own Python entry-point and wires the provider manually. It is not enough for a managed runner — any orchestrator that launches a container and calls:

dotflow start --step user_pkg.actions.step_one

…expecting status to flow back over HTTP without the user's code having to import anything extra.

A managed runner typically injects these environment variables into the container:

SERVER_BASE_URL=https://example.com/api/v1
SERVER_USER_TOKEN=<short-lived token>
WORKFLOW_ID=<UUID already known to the backend>
TASK_ID=<pre-created task row id>

For the loop to close, two things have to happen inside that container:

  1. The DotFlow instance must use the same workflow_id the backend knows about — otherwise status updates get written to a ghost workflow that the backend never sees.
  2. A ServerAPI provider must call back over HTTP so the remote observer sees Not started → In progress → Completed in real time.

Today neither happens automatically. The user would have to hand-roll both for every workflow — which defeats the "connect a repo, click run" UX a managed runner is selling.


Current behaviour (end-to-end trace)

Real container run captured in a managed environment:

  • Backend creates a workflow with id W = 0545d4ca-99d1-4e65-a2eb-90f199773faf.
  • Container is dispatched with the env vars above, including WORKFLOW_ID=W.
  • dotflow start --step user_pkg.actions.step_one runs.
  • Logs:
    INFO [dotflow]: ID 4e5016e5-3f8a-4041-a7b1-c3d9ec6f73b3 - 1 - In progress
    INFO [dotflow]: ID 4e5016e5-3f8a-4041-a7b1-c3d9ec6f73b3 - 1 - Completed
    
    The id 4e5016e5-... is not the W the backend tracks — it was generated randomly by DotFlow().
  • No PATCH /workflows/{W}/tasks/{tid} is ever issued because the default Server provider is a no-op.
  • From the observer's point of view the task stays in its initial state forever while the container has long since exited.

Root cause — what the lib does today

1. dotflow/cli/commands/start.py

class StartCommand(Command):
    def setup(self):
        workflow = self._new_workflow()
        workflow.task.add(
            step=self.params.step,
            callback=self.params.callback,
            initial_context=self.params.initial_context,
        )
        workflow.start(mode=self.params.mode)

    def _new_workflow(self):
        if not self.params.storage:
            return DotFlow()   # ← no workflow_id, no server config
        ...
        config = Config(storage=storage)
        return DotFlow(config=config)   # ← same problem

The CLI does not read WORKFLOW_ID, SERVER_BASE_URL or SERVER_USER_TOKEN. It also never attaches a Server provider that talks to a real HTTP endpoint.

2. dotflow/core/dotflow.py

class DotFlow:
    def __init__(self, config=None, workflow_id=None):
        self.workflow_id = workflow_id or uuid4()      # ← random if not passed
        self._config = config if config else Config()
        self._config.server.create_workflow(workflow=self.workflow_id)   # ← always fires

When workflow_id is not provided, a random UUID is generated, and create_workflow is called unconditionally — even when the caller passed an id that already exists on the backend.

3. dotflow/providers/server_default.py

class ServerDefault(Server):
    def create_workflow(self, workflow): pass
    def update_workflow(self, workflow, status=""): pass
    def create_task(self, task): pass
    def update_task(self, task): pass

Config() defaults to ServerDefault, which is a no-op. So unless the user explicitly wires a real provider, no HTTP calls happen.

4. The working reference lives in examples/

A working HTTP implementation already exists at examples/server_flow/server_flow/server.py:

class ServerAPI(Server):
    def __init__(self, base_url, user_token, timeout=15.0):
        self._base_url = base_url.rstrip("/")
        self._user_token = user_token
        self._timeout = timeout

    def create_workflow(self, workflow):
        self._post(f"{self._base_url}/workflows", json={"id": str(workflow)})

    def update_workflow(self, workflow, status=""):
        self._patch(f"{self._base_url}/workflows/{workflow}", json={"status": status})

    def create_task(self, task):
        data = task.result(max=5_000_000)
        data["id"] = data.pop("task_id", task.task_id)
        self._post(f"{self._base_url}/workflows/{task.workflow_id}/tasks", json=data)

    def update_task(self, task):
        data = task.result(max=5_000_000)
        self._patch(
            f"{self._base_url}/workflows/{task.workflow_id}/tasks/{task.task_id}",
            json=data,
        )

The example is excellent for self-hosted scenarios where the user copies the file into their project. It is not usable by a managed runner that cannot expect the user to write any integration code.
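
Concretely, the integration code a self-hosted user has to write today looks roughly like this — a sketch, assuming the example's package layout for the import and a Config(server=...) keyword, which the proposed CLI code below also relies on; call shapes follow the StartCommand excerpt above:

# user's own entry-point (explicit-wire pattern, sketch)
from dotflow import Config, DotFlow
from server_flow.server import ServerAPI  # copied from examples/

config = Config(
    server=ServerAPI(
        base_url="https://example.com/api/v1",
        user_token="my-token",
    )
)
flow = DotFlow(config=config)
flow.task.add(step="user_pkg.actions.step_one")
flow.start()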


Proposed changes

1. Move ServerAPI into the library

Create dotflow/providers/server_api.py and expose it from dotflow.providers:

# dotflow/providers/__init__.py

from dotflow.providers.server_default import ServerDefault
from dotflow.providers.server_api import ServerAPI
from dotflow.providers.storage_default import StorageDefault
from dotflow.providers.storage_file import StorageFile
from dotflow.providers.storage_gcs import StorageGCS
from dotflow.providers.storage_s3 import StorageS3

__all__ = [
    "ServerAPI",
    "ServerDefault",
    "StorageDefault",
    "StorageFile",
    "StorageGCS",
    "StorageS3",
]

# dotflow/providers/server_api.py

"""ServerAPI — sends workflow/task events to a remote HTTP endpoint."""

from __future__ import annotations

import logging
from typing import Any

from requests import patch, post
from requests.exceptions import RequestException

from dotflow.abc.server import Server

logger = logging.getLogger(__name__)


class ServerAPI(Server):
    """HTTP implementation of the Server ABC.

    Posts workflow creation and task status updates to a remote API
    using a user token in the ``X-User-Token`` header. Failures are
    logged and swallowed so a temporary network glitch never crashes
    a running workflow.
    """

    def __init__(
        self,
        base_url: str,
        user_token: str,
        timeout: float = 15.0,
    ):
        self._base_url = base_url.rstrip("/")
        self._user_token = user_token
        self._timeout = timeout

    @property
    def _headers(self) -> dict:
        return {
            "X-User-Token": self._user_token,
            "Content-Type": "application/json",
        }

    def _post(self, url: str, json: dict) -> None:
        try:
            response = post(
                url,
                json=json,
                headers=self._headers,
                timeout=self._timeout,
            )
            logger.info(
                "POST %s [%s]",
                url,
                response.status_code,
            )
        except RequestException as error:
            logger.error("POST %s failed: %s", url, error)

    def _patch(self, url: str, json: dict) -> None:
        try:
            response = patch(
                url,
                json=json,
                headers=self._headers,
                timeout=self._timeout,
            )
            logger.info(
                "PATCH %s [%s]",
                url,
                response.status_code,
            )
        except RequestException as error:
            logger.error("PATCH %s failed: %s", url, error)

    def create_workflow(self, workflow: Any) -> None:
        self._post(
            f"{self._base_url}/workflows",
            json={"id": str(workflow)},
        )

    def update_workflow(self, workflow: Any, status: str = "") -> None:
        self._patch(
            f"{self._base_url}/workflows/{workflow}",
            json={"status": status},
        )

    def create_task(self, task: Any) -> None:
        data = task.result(max=5_000_000)
        data["id"] = data.pop("task_id", task.task_id)
        self._post(
            f"{self._base_url}/workflows/{task.workflow_id}/tasks",
            json=data,
        )

    def update_task(self, task: Any) -> None:
        data = task.result(max=5_000_000)
        self._patch(
            f"{self._base_url}/workflows/{task.workflow_id}/tasks/{task.task_id}",
            json=data,
        )

2. Make DotFlow.__init__ skip create_workflow when the id comes from outside

When a managed runner provides the workflow id, the workflow row already exists on the remote side. Calling create_workflow from inside the container would either duplicate the row or fail with a unique-id violation. The safest contract:

If the caller passed an explicit workflow_id, assume the workflow already exists on the server and do not call create_workflow.

# dotflow/core/dotflow.py

class DotFlow:
    def __init__(
        self,
        config: Config | None = None,
        workflow_id: str | None = None,
    ) -> None:
        self._externally_provided_id = workflow_id is not None
        self.workflow_id = workflow_id or uuid4()
        self._config = config if config else Config()

        if not self._externally_provided_id:
            self._config.server.create_workflow(workflow=self.workflow_id)

        self.task = TaskBuilder(
            config=self._config, workflow_id=self.workflow_id
        )

        self.start = partial(
            Manager,
            tasks=self.task.queue,
            workflow_id=self.workflow_id,
            config=self._config,
        )

        self.schedule = partial(
            self._config.scheduler.start, workflow=self.start
        )

Existing usage without an explicit workflow_id keeps working identically. Only callers that already pass workflow_id= see a change — create_workflow is no longer fired for them (see Migration below).
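
Illustratively, under this contract:

# Stand-alone (unchanged): a random id is generated and
# create_workflow fires exactly once.
flow = DotFlow()

# Managed: the id comes from outside, create_workflow is skipped, and
# every status update references the id the backend already tracks.
flow = DotFlow(workflow_id="0545d4ca-99d1-4e65-a2eb-90f199773faf")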

3. CLI auto-wires managed mode from environment

StartCommand reads three optional env vars. If present, it activates managed mode:

# dotflow/cli/commands/start.py

import os

from dotflow import Config, DotFlow
from dotflow.cli.command import Command
from dotflow.core.types.execution import TypeExecution
from dotflow.providers import (
    ServerAPI,
    ServerDefault,
    StorageDefault,
    StorageFile,
    StorageGCS,
    StorageS3,
)


class StartCommand(Command):
    def setup(self):
        workflow = self._new_workflow()

        workflow.task.add(
            step=self.params.step,
            callback=self.params.callback,
            initial_context=self.params.initial_context,
        )

        workflow.start(mode=self.params.mode)

        if self.params.mode == TypeExecution.BACKGROUND:
            os.system("/bin/bash")

    def _new_workflow(self):
        config = self._build_config()
        workflow_id = os.getenv("WORKFLOW_ID")
        return DotFlow(config=config, workflow_id=workflow_id)

    def _build_config(self):
        storage = self._build_storage()
        server = self._build_server()

        if storage is None and server is None:
            return None
        return Config(
            storage=storage or StorageDefault(),
            server=server or ServerDefault(),
        )

    def _build_server(self):
        base_url = os.getenv("SERVER_BASE_URL")
        user_token = os.getenv("SERVER_USER_TOKEN")
        if not base_url or not user_token:
            return None
        return ServerAPI(base_url=base_url, user_token=user_token)

    def _build_storage(self):
        if not self.params.storage:
            return None

        storage_classes = {
            "default": StorageDefault,
            "file": StorageFile,
            "s3": StorageS3,
            "gcs": StorageGCS,
        }
        storage_cls = storage_classes.get(self.params.storage)
        if storage_cls is None:
            raise ValueError(f"Unknown storage backend: {self.params.storage!r}")

        storage_with_path = {StorageDefault, StorageFile}
        if storage_cls in storage_with_path:
            return storage_cls(path=self.params.path)
        return storage_cls()

Key properties:

  • dotflow start --step ... with no env vars keeps getting a no-op ServerDefault (unchanged behaviour).
  • With SERVER_BASE_URL + SERVER_USER_TOKEN set, a real ServerAPI is wired automatically.
  • With WORKFLOW_ID set, that id is reused everywhere — logs, internal state, and HTTP callbacks all reference the same UUID the remote observer is watching.
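
Putting it together, a managed invocation would look like this (illustrative values, same command as the trace above):

SERVER_BASE_URL=https://example.com/api/v1 \
SERVER_USER_TOKEN=<short-lived token> \
WORKFLOW_ID=0545d4ca-99d1-4e65-a2eb-90f199773faf \
dotflow start --step user_pkg.actions.step_one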

Behaviour matrix

Condition                           Stand-alone usage    Managed runner usage
----------------------------------  -------------------  --------------------
WORKFLOW_ID set                     no                   yes
SERVER_BASE_URL set                 no                   yes
SERVER_USER_TOKEN set               no                   yes
create_workflow fires on start      yes                  no
update_task fires after each task   no (ServerDefault)   yes (ServerAPI)
Workflow id in logs                 random UUID          matches WORKFLOW_ID

Acceptance criteria

  • dotflow.providers.ServerAPI is importable and documented
  • dotflow start --step X with no env vars behaves exactly as today
  • dotflow start --step X with WORKFLOW_ID set uses that id as workflow.workflow_id
  • dotflow start --step X with SERVER_BASE_URL and SERVER_USER_TOKEN set emits one HTTP call per task finish: PATCH {base}/workflows/{wid}/tasks/{tid}
  • dotflow start --step X with WORKFLOW_ID set does not call POST /workflows (no ghost workflow is created)
  • Network failures on the server provider are logged and swallowed — they never abort the running workflow
  • Unit tests cover each of the four matrix rows above
  • examples/server_flow/ continues to work unchanged (explicit-wire pattern remains supported)

Testing plan

Unit

  • ServerAPI constructor builds the expected headers and URL.
  • ServerAPI.update_task sends the right URL shape with workflow_id and task_id from the task object.
  • DotFlow(workflow_id="abc") does not call server.create_workflow.
  • DotFlow() (no id) calls server.create_workflow exactly once (both sketched after this list).
  • StartCommand._build_server returns None when either env var is missing, returns a ServerAPI when both are set.
  • StartCommand._new_workflow passes os.environ["WORKFLOW_ID"] into DotFlow when set.
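
The two DotFlow bullets lend themselves to a compact test. A minimal pytest-style sketch, assuming Config(server=...) accepts any object implementing the Server ABC (as the proposed CLI code does); the MagicMock stands in for a real provider:

# test_dotflow_managed_id.py (sketch)
from unittest.mock import MagicMock

from dotflow import Config, DotFlow


def test_external_id_skips_create_workflow():
    server = MagicMock()
    DotFlow(config=Config(server=server), workflow_id="abc")
    server.create_workflow.assert_not_called()


def test_generated_id_creates_workflow_once():
    server = MagicMock()
    DotFlow(config=Config(server=server))
    server.create_workflow.assert_called_once()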

Integration

  • Spin up a local HTTP stub that accepts PATCH /workflows/{wid}/tasks/{tid} and records calls (see the sketch after this list).
  • Set SERVER_BASE_URL, SERVER_USER_TOKEN, WORKFLOW_ID env vars.
  • Run dotflow start --step tests.fixtures.dummy:workflow.
  • Assert the stub got at least one PATCH whose URL contains the WORKFLOW_ID and whose body contains the expected terminal status.
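
A standard-library-only stub is enough here — a sketch (port and module layout are illustrative, not part of the library):

# tests/stub_server.py (sketch)
import json
from http.server import BaseHTTPRequestHandler, HTTPServer
from threading import Thread

calls = []  # (path, body) tuples recorded for the assertions


class StubHandler(BaseHTTPRequestHandler):
    def do_PATCH(self):
        length = int(self.headers.get("Content-Length", 0))
        calls.append((self.path, json.loads(self.rfile.read(length) or b"{}")))
        self.send_response(200)
        self.end_headers()

    do_POST = do_PATCH  # accept workflow/task creation calls the same way

    def log_message(self, *args):
        pass  # keep test output quiet


stub = HTTPServer(("127.0.0.1", 8000), StubHandler)
Thread(target=stub.serve_forever, daemon=True).start()
# Now point SERVER_BASE_URL at http://127.0.0.1:8000 and run the CLI.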

Migration and backwards compatibility

  • Self-hosted usage without any of the three env vars sees identical behaviour.
  • Anyone already passing workflow_id= explicitly in Python code keeps working — the only delta is that create_workflow is no longer called automatically in that case. The workflow is assumed to exist on the server, which matches the common case (checkpoint resume, managed runner). If a legacy script depended on the old implicit creation, the upgrade note should instruct it either to drop the explicit workflow_id or to call server.create_workflow itself (sketched after this list).
  • examples/server_flow/ stays in the repo as a reference for the explicit-wire pattern.
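
The opt-in for that legacy case is a single extra call — a sketch with illustrative names, using the create_workflow signature shown in dotflow/core/dotflow.py above:

from dotflow import Config, DotFlow

config = Config()  # or Config(server=ServerAPI(...)) for real reporting
flow = DotFlow(config=config, workflow_id="my-existing-id")

# Restore the creation call that used to happen implicitly:
config.server.create_workflow(workflow=flow.workflow_id)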

Non-goals

Explicitly out of scope for this issue (tracked separately):

  • Streaming live stdout/stderr from the container to a remote observer (would need WebSocket/SSE; today only final task status is reported).
  • Automatic retry with backoff on transient ServerAPI failures — current behaviour is log + continue, which is intentional for a first pass.
  • Per-task TASK_ID injection from env. The library already generates task ids; the remote backend should honour whatever the lib reports rather than pre-creating rows.
