Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
500 changes: 500 additions & 0 deletions src/specify_cli/__init__.py

Large diffs are not rendered by default.

66 changes: 66 additions & 0 deletions src/specify_cli/workflows/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
"""Workflow engine for multi-step, resumable automation workflows.

Provides:
- ``StepBase`` — abstract base every step type must implement.
- ``StepContext`` — execution context passed to each step.
- ``StepResult`` — return value from step execution.
- ``STEP_REGISTRY`` — maps ``type_key`` to ``StepBase`` subclass instances.
- ``WorkflowEngine`` — orchestrator that loads, validates, and executes
workflow YAML definitions.
"""

from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from .base import StepBase

# Maps step type_key → StepBase instance.
STEP_REGISTRY: dict[str, StepBase] = {}


def _register_step(step: StepBase) -> None:
"""Register a step type instance in the global registry.

Raises ``ValueError`` for falsy keys and ``KeyError`` for duplicates.
"""
key = step.type_key
if not key:
raise ValueError("Cannot register step type with an empty type_key.")
if key in STEP_REGISTRY:
raise KeyError(f"Step type with key {key!r} is already registered.")
STEP_REGISTRY[key] = step


def get_step_type(type_key: str) -> StepBase | None:
"""Return the step type for *type_key*, or ``None`` if not registered."""
return STEP_REGISTRY.get(type_key)


# -- Register built-in step types ----------------------------------------

def _register_builtin_steps() -> None:
"""Register all built-in step types."""
from .steps.command import CommandStep
from .steps.do_while import DoWhileStep
from .steps.fan_in import FanInStep
from .steps.fan_out import FanOutStep
from .steps.gate import GateStep
from .steps.if_then import IfThenStep
from .steps.shell import ShellStep
from .steps.switch import SwitchStep
from .steps.while_loop import WhileStep

_register_step(CommandStep())
_register_step(DoWhileStep())
_register_step(FanInStep())
_register_step(FanOutStep())
_register_step(GateStep())
_register_step(IfThenStep())
_register_step(ShellStep())
_register_step(SwitchStep())
_register_step(WhileStep())


_register_builtin_steps()
132 changes: 132 additions & 0 deletions src/specify_cli/workflows/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
"""Base classes for workflow step types.

Provides:
- ``StepBase`` — abstract base every step type must implement.
- ``StepContext`` — execution context passed to each step.
- ``StepResult`` — return value from step execution.
"""

from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import Any


class StepStatus(str, Enum):
"""Status of a step execution."""

PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
SKIPPED = "skipped"
PAUSED = "paused"


class RunStatus(str, Enum):
"""Status of a workflow run."""

CREATED = "created"
RUNNING = "running"
PAUSED = "paused"
COMPLETED = "completed"
FAILED = "failed"
ABORTED = "aborted"


@dataclass
class StepContext:
"""Execution context passed to each step.

Contains everything the step needs to resolve expressions, dispatch
commands, and record results.
"""

#: Resolved workflow inputs (from user prompts / defaults).
inputs: dict[str, Any] = field(default_factory=dict)

#: Accumulated step results keyed by step ID.
#: Each entry is ``{"integration": ..., "model": ..., "options": ...,
#: "input": ..., "output": ...}``.
steps: dict[str, dict[str, Any]] = field(default_factory=dict)

#: Current fan-out item (set only inside fan-out iterations).
item: Any = None

#: Fan-in aggregated results (set only for fan-in steps).
fan_in: dict[str, Any] = field(default_factory=dict)

#: Workflow-level default integration key.
default_integration: str | None = None

#: Workflow-level default model.
default_model: str | None = None

#: Workflow-level default options.
default_options: dict[str, Any] = field(default_factory=dict)

#: Project root path.
project_root: str | None = None

#: Current run ID.
run_id: str | None = None


@dataclass
class StepResult:
"""Return value from a step execution."""

#: Step status.
status: StepStatus = StepStatus.COMPLETED

#: Output data (stored as ``steps.<id>.output``).
output: dict[str, Any] = field(default_factory=dict)

#: Nested steps to execute (for control-flow steps like if/then).
next_steps: list[dict[str, Any]] = field(default_factory=list)

#: Error message if step failed.
error: str | None = None


class StepBase(ABC):
"""Abstract base class for workflow step types.

Every step type — built-in or extension-provided — implements this
interface and registers in ``STEP_REGISTRY``.
"""

#: Matches the ``type:`` value in workflow YAML.
type_key: str = ""

@abstractmethod
def execute(self, config: dict[str, Any], context: StepContext) -> StepResult:
"""Execute the step with the given config and context.

Parameters
----------
config:
The step configuration from workflow YAML.
context:
The execution context with inputs, accumulated step results, etc.

Returns
-------
StepResult with status, output data, and optional nested steps.
"""

def validate(self, config: dict[str, Any]) -> list[str]:
"""Validate step configuration and return a list of error messages.

An empty list means the configuration is valid.
"""
errors: list[str] = []
if "id" not in config:
errors.append("Step is missing required 'id' field.")
return errors

def can_resume(self, state: dict[str, Any]) -> bool:
"""Return whether this step can be resumed from the given state."""
return True
Loading
Loading