From 58afa3e9ee08b8ec65d1c4cf12f96c95f7b5e827 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 10 Apr 2026 16:30:34 +0000 Subject: [PATCH 1/4] Initial plan From 9ef46b4f200423413b56c6edc038df6c3efb09c6 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 10 Apr 2026 16:44:45 +0000 Subject: [PATCH 2/4] Add workflow engine with step registry, expression engine, catalog system, and CLI commands Agent-Logs-Url: https://github.com/github/spec-kit/sessions/72a7bb5d-071f-4d67-a507-7e1abae2384d Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com> --- src/specify_cli/__init__.py | 500 +++++++++++++++ src/specify_cli/workflows/__init__.py | 66 ++ src/specify_cli/workflows/base.py | 132 ++++ src/specify_cli/workflows/catalog.py | 482 +++++++++++++++ src/specify_cli/workflows/engine.py | 575 ++++++++++++++++++ src/specify_cli/workflows/expressions.py | 290 +++++++++ src/specify_cli/workflows/steps/__init__.py | 1 + .../workflows/steps/command/__init__.py | 65 ++ .../workflows/steps/do_while/__init__.py | 51 ++ .../workflows/steps/fan_in/__init__.py | 52 ++ .../workflows/steps/fan_out/__init__.py | 56 ++ .../workflows/steps/gate/__init__.py | 64 ++ .../workflows/steps/if_then/__init__.py | 55 ++ .../workflows/steps/shell/__init__.py | 63 ++ .../workflows/steps/switch/__init__.py | 70 +++ .../workflows/steps/while_loop/__init__.py | 64 ++ workflows/catalog.community.json | 6 + workflows/catalog.json | 6 + workflows/speckit/workflow.yml | 59 ++ 19 files changed, 2657 insertions(+) create mode 100644 src/specify_cli/workflows/__init__.py create mode 100644 src/specify_cli/workflows/base.py create mode 100644 src/specify_cli/workflows/catalog.py create mode 100644 src/specify_cli/workflows/engine.py create mode 100644 src/specify_cli/workflows/expressions.py create mode 100644 src/specify_cli/workflows/steps/__init__.py create mode 100644 
src/specify_cli/workflows/steps/command/__init__.py create mode 100644 src/specify_cli/workflows/steps/do_while/__init__.py create mode 100644 src/specify_cli/workflows/steps/fan_in/__init__.py create mode 100644 src/specify_cli/workflows/steps/fan_out/__init__.py create mode 100644 src/specify_cli/workflows/steps/gate/__init__.py create mode 100644 src/specify_cli/workflows/steps/if_then/__init__.py create mode 100644 src/specify_cli/workflows/steps/shell/__init__.py create mode 100644 src/specify_cli/workflows/steps/switch/__init__.py create mode 100644 src/specify_cli/workflows/steps/while_loop/__init__.py create mode 100644 workflows/catalog.community.json create mode 100644 workflows/catalog.json create mode 100644 workflows/speckit/workflow.yml diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index e37c4b45f..edfa47fea 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -4072,6 +4072,506 @@ def extension_set_priority( console.print("\n[dim]Lower priority = higher precedence in template resolution[/dim]") +# ===== Workflow Commands ===== + +workflow_app = typer.Typer( + name="workflow", + help="Manage and run automation workflows", + add_completion=False, +) +app.add_typer(workflow_app, name="workflow") + +workflow_catalog_app = typer.Typer( + name="catalog", + help="Manage workflow catalogs", + add_completion=False, +) +workflow_app.add_typer(workflow_catalog_app, name="catalog") + + +@workflow_app.command("run") +def workflow_run( + source: str = typer.Argument(..., help="Workflow ID or YAML file path"), + input_values: list[str] = typer.Option( + None, "--input", "-i", help="Input values as key=value pairs" + ), +): + """Run a workflow from an installed ID or local YAML path.""" + from .workflows.engine import WorkflowEngine + + project_root = Path.cwd() + engine = WorkflowEngine(project_root) + + try: + definition = engine.load_workflow(source) + except FileNotFoundError: + console.print(f"[red]Error:[/red] 
Workflow not found: {source}") + raise typer.Exit(1) + except ValueError as exc: + console.print(f"[red]Error:[/red] Invalid workflow: {exc}") + raise typer.Exit(1) + + # Validate + errors = engine.validate(definition) + if errors: + console.print("[red]Workflow validation failed:[/red]") + for err in errors: + console.print(f" • {err}") + raise typer.Exit(1) + + # Parse inputs + inputs: dict[str, Any] = {} + if input_values: + for kv in input_values: + if "=" not in kv: + console.print(f"[red]Error:[/red] Invalid input format: {kv!r} (expected key=value)") + raise typer.Exit(1) + key, _, value = kv.partition("=") + inputs[key.strip()] = value.strip() + + console.print(f"\n[bold cyan]Running workflow:[/bold cyan] {definition.name} ({definition.id})") + console.print(f"[dim]Version: {definition.version}[/dim]\n") + + try: + state = engine.execute(definition, inputs) + except ValueError as exc: + console.print(f"[red]Error:[/red] {exc}") + raise typer.Exit(1) + except Exception as exc: + console.print(f"[red]Workflow failed:[/red] {exc}") + raise typer.Exit(1) + + status_colors = { + "completed": "green", + "paused": "yellow", + "failed": "red", + "aborted": "red", + } + color = status_colors.get(state.status.value, "white") + console.print(f"\n[{color}]Status: {state.status.value}[/{color}]") + console.print(f"[dim]Run ID: {state.run_id}[/dim]") + + if state.status.value == "paused": + console.print(f"\nResume with: [cyan]specify workflow resume {state.run_id}[/cyan]") + + +@workflow_app.command("resume") +def workflow_resume( + run_id: str = typer.Argument(..., help="Run ID to resume"), +): + """Resume a paused or failed workflow run.""" + from .workflows.engine import WorkflowEngine + + project_root = Path.cwd() + engine = WorkflowEngine(project_root) + + try: + state = engine.resume(run_id) + except FileNotFoundError: + console.print(f"[red]Error:[/red] Run not found: {run_id}") + raise typer.Exit(1) + except ValueError as exc: + console.print(f"[red]Error:[/red] 
{exc}") + raise typer.Exit(1) + except Exception as exc: + console.print(f"[red]Resume failed:[/red] {exc}") + raise typer.Exit(1) + + status_colors = { + "completed": "green", + "paused": "yellow", + "failed": "red", + "aborted": "red", + } + color = status_colors.get(state.status.value, "white") + console.print(f"\n[{color}]Status: {state.status.value}[/{color}]") + + +@workflow_app.command("status") +def workflow_status( + run_id: str = typer.Argument(None, help="Run ID to inspect (shows all if omitted)"), +): + """Show workflow run status.""" + from .workflows.engine import WorkflowEngine + + project_root = Path.cwd() + engine = WorkflowEngine(project_root) + + if run_id: + try: + from .workflows.engine import RunState + state = RunState.load(run_id, project_root) + except FileNotFoundError: + console.print(f"[red]Error:[/red] Run not found: {run_id}") + raise typer.Exit(1) + + status_colors = { + "completed": "green", + "paused": "yellow", + "failed": "red", + "aborted": "red", + "running": "blue", + "created": "dim", + } + color = status_colors.get(state.status.value, "white") + + console.print(f"\n[bold cyan]Workflow Run: {state.run_id}[/bold cyan]") + console.print(f" Workflow: {state.workflow_id}") + console.print(f" Status: [{color}]{state.status.value}[/{color}]") + console.print(f" Created: {state.created_at}") + console.print(f" Updated: {state.updated_at}") + + if state.current_step_id: + console.print(f" Current: {state.current_step_id}") + + if state.step_results: + console.print(f"\n [bold]Steps ({len(state.step_results)}):[/bold]") + for step_id, step_data in state.step_results.items(): + s = step_data.get("status", "unknown") + sc = {"completed": "green", "failed": "red", "paused": "yellow"}.get(s, "white") + console.print(f" [{sc}]●[/{sc}] {step_id}: {s}") + else: + runs = engine.list_runs() + if not runs: + console.print("[yellow]No workflow runs found.[/yellow]") + return + + console.print("\n[bold cyan]Workflow Runs:[/bold cyan]\n") + for 
run_data in runs: + s = run_data.get("status", "unknown") + sc = {"completed": "green", "failed": "red", "paused": "yellow", "running": "blue"}.get(s, "white") + console.print( + f" [{sc}]●[/{sc}] {run_data['run_id']} " + f"{run_data.get('workflow_id', '?')} " + f"[{sc}]{s}[/{sc}] " + f"[dim]{run_data.get('updated_at', '?')}[/dim]" + ) + + +@workflow_app.command("list") +def workflow_list(): + """List installed workflows.""" + from .workflows.catalog import WorkflowRegistry + + project_root = Path.cwd() + specify_dir = project_root / ".specify" + if not specify_dir.exists(): + console.print("[red]Error:[/red] Not a spec-kit project (no .specify/ directory)") + raise typer.Exit(1) + + registry = WorkflowRegistry(project_root) + installed = registry.list() + + if not installed: + console.print("[yellow]No workflows installed.[/yellow]") + console.print("\nInstall a workflow with:") + console.print(" [cyan]specify workflow add [/cyan]") + return + + console.print("\n[bold cyan]Installed Workflows:[/bold cyan]\n") + for wf_id, wf_data in installed.items(): + console.print(f" [bold]{wf_data.get('name', wf_id)}[/bold] ({wf_id}) v{wf_data.get('version', '?')}") + desc = wf_data.get("description", "") + if desc: + console.print(f" {desc}") + console.print() + + +@workflow_app.command("add") +def workflow_add( + source: str = typer.Argument(..., help="Workflow ID, URL, or local path"), +): + """Install a workflow from catalog, URL, or local path.""" + from .workflows.catalog import WorkflowCatalog, WorkflowRegistry, WorkflowCatalogError + from .workflows.engine import WorkflowDefinition + + project_root = Path.cwd() + specify_dir = project_root / ".specify" + if not specify_dir.exists(): + console.print("[red]Error:[/red] Not a spec-kit project (no .specify/ directory)") + raise typer.Exit(1) + + registry = WorkflowRegistry(project_root) + workflows_dir = project_root / ".specify" / "workflows" + + # Try as a local file/directory + source_path = Path(source) + if 
source_path.exists(): + if source_path.is_file() and source_path.suffix in (".yml", ".yaml"): + # Install from local YAML file + try: + definition = WorkflowDefinition.from_yaml(source_path) + except (ValueError, yaml.YAMLError) as exc: + console.print(f"[red]Error:[/red] Invalid workflow YAML: {exc}") + raise typer.Exit(1) + + dest_dir = workflows_dir / definition.id + dest_dir.mkdir(parents=True, exist_ok=True) + import shutil + shutil.copy2(source_path, dest_dir / "workflow.yml") + + registry.add(definition.id, { + "name": definition.name, + "version": definition.version, + "description": definition.description, + "source": str(source_path), + }) + console.print(f"[green]✓[/green] Workflow '{definition.name}' ({definition.id}) installed") + return + elif source_path.is_dir(): + wf_file = source_path / "workflow.yml" + if not wf_file.exists(): + console.print(f"[red]Error:[/red] No workflow.yml found in {source}") + raise typer.Exit(1) + try: + definition = WorkflowDefinition.from_yaml(wf_file) + except (ValueError, yaml.YAMLError) as exc: + console.print(f"[red]Error:[/red] Invalid workflow YAML: {exc}") + raise typer.Exit(1) + + dest_dir = workflows_dir / definition.id + dest_dir.mkdir(parents=True, exist_ok=True) + import shutil + shutil.copy2(wf_file, dest_dir / "workflow.yml") + + registry.add(definition.id, { + "name": definition.name, + "version": definition.version, + "description": definition.description, + "source": str(source_path), + }) + console.print(f"[green]✓[/green] Workflow '{definition.name}' ({definition.id}) installed") + return + + # Try from catalog + catalog = WorkflowCatalog(project_root) + try: + info = catalog.get_workflow_info(source) + except WorkflowCatalogError as exc: + console.print(f"[red]Error:[/red] {exc}") + raise typer.Exit(1) + + if not info: + console.print(f"[red]Error:[/red] Workflow '{source}' not found in catalog") + raise typer.Exit(1) + + if not info.get("_install_allowed", True): + 
console.print(f"[yellow]Warning:[/yellow] Workflow '{source}' is from a discovery-only catalog") + console.print("Direct installation is not enabled for this catalog source.") + raise typer.Exit(1) + + # Register as available (actual download would happen from the URL) + registry.add(source, { + "name": info.get("name", source), + "version": info.get("version", "0.0.0"), + "description": info.get("description", ""), + "source": "catalog", + "catalog_name": info.get("_catalog_name", ""), + }) + console.print(f"[green]✓[/green] Workflow '{info.get('name', source)}' registered from catalog") + + +@workflow_app.command("remove") +def workflow_remove( + workflow_id: str = typer.Argument(..., help="Workflow ID to uninstall"), +): + """Uninstall a workflow.""" + from .workflows.catalog import WorkflowRegistry + + project_root = Path.cwd() + specify_dir = project_root / ".specify" + if not specify_dir.exists(): + console.print("[red]Error:[/red] Not a spec-kit project (no .specify/ directory)") + raise typer.Exit(1) + + registry = WorkflowRegistry(project_root) + + if not registry.is_installed(workflow_id): + console.print(f"[red]Error:[/red] Workflow '{workflow_id}' is not installed") + raise typer.Exit(1) + + # Remove workflow files + workflow_dir = project_root / ".specify" / "workflows" / workflow_id + if workflow_dir.exists(): + import shutil + shutil.rmtree(workflow_dir) + + registry.remove(workflow_id) + console.print(f"[green]✓[/green] Workflow '{workflow_id}' removed") + + +@workflow_app.command("search") +def workflow_search( + query: str = typer.Argument(None, help="Search query"), + tag: str = typer.Option(None, "--tag", help="Filter by tag"), +): + """Search workflow catalogs.""" + from .workflows.catalog import WorkflowCatalog, WorkflowCatalogError + + project_root = Path.cwd() + catalog = WorkflowCatalog(project_root) + + try: + results = catalog.search(query=query, tag=tag) + except WorkflowCatalogError as exc: + console.print(f"[red]Error:[/red] {exc}") + 
raise typer.Exit(1) + + if not results: + console.print("[yellow]No workflows found.[/yellow]") + return + + console.print(f"\n[bold cyan]Workflows ({len(results)}):[/bold cyan]\n") + for wf in results: + console.print(f" [bold]{wf.get('name', wf.get('id', '?'))}[/bold] ({wf.get('id', '?')}) v{wf.get('version', '?')}") + desc = wf.get("description", "") + if desc: + console.print(f" {desc}") + tags = wf.get("tags", []) + if tags: + console.print(f" [dim]Tags: {', '.join(tags)}[/dim]") + console.print() + + +@workflow_app.command("info") +def workflow_info( + workflow_id: str = typer.Argument(..., help="Workflow ID"), +): + """Show workflow details and step graph.""" + from .workflows.catalog import WorkflowCatalog, WorkflowRegistry, WorkflowCatalogError + from .workflows.engine import WorkflowEngine + + project_root = Path.cwd() + + # Check installed first + registry = WorkflowRegistry(project_root) + installed = registry.get(workflow_id) + + engine = WorkflowEngine(project_root) + + definition = None + try: + definition = engine.load_workflow(workflow_id) + except FileNotFoundError: + pass + + if definition: + console.print(f"\n[bold cyan]{definition.name}[/bold cyan] ({definition.id})") + console.print(f" Version: {definition.version}") + if definition.author: + console.print(f" Author: {definition.author}") + if definition.description: + console.print(f" Description: {definition.description}") + if definition.default_integration: + console.print(f" Integration: {definition.default_integration}") + if installed: + console.print(" [green]Installed[/green]") + + if definition.inputs: + console.print("\n [bold]Inputs:[/bold]") + for name, inp in definition.inputs.items(): + if isinstance(inp, dict): + req = "required" if inp.get("required") else "optional" + console.print(f" {name} ({inp.get('type', 'string')}) — {req}") + + if definition.steps: + console.print(f"\n [bold]Steps ({len(definition.steps)}):[/bold]") + for step in definition.steps: + stype = 
step.get("type", "command") + console.print(f" → {step.get('id', '?')} [{stype}]") + return + + # Try catalog + catalog = WorkflowCatalog(project_root) + try: + info = catalog.get_workflow_info(workflow_id) + except WorkflowCatalogError: + info = None + + if info: + console.print(f"\n[bold cyan]{info.get('name', workflow_id)}[/bold cyan] ({workflow_id})") + console.print(f" Version: {info.get('version', '?')}") + if info.get("description"): + console.print(f" Description: {info['description']}") + if info.get("tags"): + console.print(f" Tags: {', '.join(info['tags'])}") + console.print(" [yellow]Not installed[/yellow]") + else: + console.print(f"[red]Error:[/red] Workflow '{workflow_id}' not found") + raise typer.Exit(1) + + +@workflow_catalog_app.command("list") +def workflow_catalog_list(): + """List configured workflow catalog sources.""" + from .workflows.catalog import WorkflowCatalog, WorkflowCatalogError + + project_root = Path.cwd() + catalog = WorkflowCatalog(project_root) + + try: + configs = catalog.get_catalog_configs() + except WorkflowCatalogError as exc: + console.print(f"[red]Error:[/red] {exc}") + raise typer.Exit(1) + + console.print("\n[bold cyan]Workflow Catalog Sources:[/bold cyan]\n") + for i, cfg in enumerate(configs): + install_status = "[green]install allowed[/green]" if cfg["install_allowed"] else "[yellow]discovery only[/yellow]" + console.print(f" [{i}] [bold]{cfg['name']}[/bold] — {install_status}") + console.print(f" {cfg['url']}") + if cfg.get("description"): + console.print(f" [dim]{cfg['description']}[/dim]") + console.print() + + +@workflow_catalog_app.command("add") +def workflow_catalog_add( + url: str = typer.Argument(..., help="Catalog URL to add"), + name: str = typer.Option(None, "--name", help="Catalog name"), +): + """Add a workflow catalog source.""" + from .workflows.catalog import WorkflowCatalog, WorkflowValidationError + + project_root = Path.cwd() + specify_dir = project_root / ".specify" + if not 
specify_dir.exists(): + console.print("[red]Error:[/red] Not a spec-kit project (no .specify/ directory)") + raise typer.Exit(1) + + catalog = WorkflowCatalog(project_root) + try: + catalog.add_catalog(url, name) + except WorkflowValidationError as exc: + console.print(f"[red]Error:[/red] {exc}") + raise typer.Exit(1) + + console.print(f"[green]✓[/green] Catalog source added: {url}") + + +@workflow_catalog_app.command("remove") +def workflow_catalog_remove( + index: int = typer.Argument(..., help="Catalog index to remove (from 'catalog list')"), +): + """Remove a workflow catalog source by index.""" + from .workflows.catalog import WorkflowCatalog, WorkflowValidationError + + project_root = Path.cwd() + specify_dir = project_root / ".specify" + if not specify_dir.exists(): + console.print("[red]Error:[/red] Not a spec-kit project (no .specify/ directory)") + raise typer.Exit(1) + + catalog = WorkflowCatalog(project_root) + try: + removed_name = catalog.remove_catalog(index) + except WorkflowValidationError as exc: + console.print(f"[red]Error:[/red] {exc}") + raise typer.Exit(1) + + console.print(f"[green]✓[/green] Catalog source '{removed_name}' removed") + + def main(): app() diff --git a/src/specify_cli/workflows/__init__.py b/src/specify_cli/workflows/__init__.py new file mode 100644 index 000000000..047d6e74b --- /dev/null +++ b/src/specify_cli/workflows/__init__.py @@ -0,0 +1,66 @@ +"""Workflow engine for multi-step, resumable automation workflows. + +Provides: +- ``StepBase`` — abstract base every step type must implement. +- ``StepContext`` — execution context passed to each step. +- ``StepResult`` — return value from step execution. +- ``STEP_REGISTRY`` — maps ``type_key`` to ``StepBase`` subclass instances. +- ``WorkflowEngine`` — orchestrator that loads, validates, and executes + workflow YAML definitions. 
+""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from .base import StepBase + +# Maps step type_key → StepBase instance. +STEP_REGISTRY: dict[str, StepBase] = {} + + +def _register_step(step: StepBase) -> None: + """Register a step type instance in the global registry. + + Raises ``ValueError`` for falsy keys and ``KeyError`` for duplicates. + """ + key = step.type_key + if not key: + raise ValueError("Cannot register step type with an empty type_key.") + if key in STEP_REGISTRY: + raise KeyError(f"Step type with key {key!r} is already registered.") + STEP_REGISTRY[key] = step + + +def get_step_type(type_key: str) -> StepBase | None: + """Return the step type for *type_key*, or ``None`` if not registered.""" + return STEP_REGISTRY.get(type_key) + + +# -- Register built-in step types ---------------------------------------- + +def _register_builtin_steps() -> None: + """Register all built-in step types.""" + from .steps.command import CommandStep + from .steps.do_while import DoWhileStep + from .steps.fan_in import FanInStep + from .steps.fan_out import FanOutStep + from .steps.gate import GateStep + from .steps.if_then import IfThenStep + from .steps.shell import ShellStep + from .steps.switch import SwitchStep + from .steps.while_loop import WhileStep + + _register_step(CommandStep()) + _register_step(DoWhileStep()) + _register_step(FanInStep()) + _register_step(FanOutStep()) + _register_step(GateStep()) + _register_step(IfThenStep()) + _register_step(ShellStep()) + _register_step(SwitchStep()) + _register_step(WhileStep()) + + +_register_builtin_steps() diff --git a/src/specify_cli/workflows/base.py b/src/specify_cli/workflows/base.py new file mode 100644 index 000000000..b144ca903 --- /dev/null +++ b/src/specify_cli/workflows/base.py @@ -0,0 +1,132 @@ +"""Base classes for workflow step types. + +Provides: +- ``StepBase`` — abstract base every step type must implement. 
+- ``StepContext`` — execution context passed to each step. +- ``StepResult`` — return value from step execution. +""" + +from __future__ import annotations + +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from enum import Enum +from typing import Any + + +class StepStatus(str, Enum): + """Status of a step execution.""" + + PENDING = "pending" + RUNNING = "running" + COMPLETED = "completed" + FAILED = "failed" + SKIPPED = "skipped" + PAUSED = "paused" + + +class RunStatus(str, Enum): + """Status of a workflow run.""" + + CREATED = "created" + RUNNING = "running" + PAUSED = "paused" + COMPLETED = "completed" + FAILED = "failed" + ABORTED = "aborted" + + +@dataclass +class StepContext: + """Execution context passed to each step. + + Contains everything the step needs to resolve expressions, dispatch + commands, and record results. + """ + + #: Resolved workflow inputs (from user prompts / defaults). + inputs: dict[str, Any] = field(default_factory=dict) + + #: Accumulated step results keyed by step ID. + #: Each entry is ``{"integration": ..., "model": ..., "options": ..., + #: "input": ..., "output": ...}``. + steps: dict[str, dict[str, Any]] = field(default_factory=dict) + + #: Current fan-out item (set only inside fan-out iterations). + item: Any = None + + #: Fan-in aggregated results (set only for fan-in steps). + fan_in: dict[str, Any] = field(default_factory=dict) + + #: Workflow-level default integration key. + default_integration: str | None = None + + #: Workflow-level default model. + default_model: str | None = None + + #: Workflow-level default options. + default_options: dict[str, Any] = field(default_factory=dict) + + #: Project root path. + project_root: str | None = None + + #: Current run ID. + run_id: str | None = None + + +@dataclass +class StepResult: + """Return value from a step execution.""" + + #: Step status. + status: StepStatus = StepStatus.COMPLETED + + #: Output data (stored as ``steps..output``). 
+ output: dict[str, Any] = field(default_factory=dict) + + #: Nested steps to execute (for control-flow steps like if/then). + next_steps: list[dict[str, Any]] = field(default_factory=list) + + #: Error message if step failed. + error: str | None = None + + +class StepBase(ABC): + """Abstract base class for workflow step types. + + Every step type — built-in or extension-provided — implements this + interface and registers in ``STEP_REGISTRY``. + """ + + #: Matches the ``type:`` value in workflow YAML. + type_key: str = "" + + @abstractmethod + def execute(self, config: dict[str, Any], context: StepContext) -> StepResult: + """Execute the step with the given config and context. + + Parameters + ---------- + config: + The step configuration from workflow YAML. + context: + The execution context with inputs, accumulated step results, etc. + + Returns + ------- + StepResult with status, output data, and optional nested steps. + """ + + def validate(self, config: dict[str, Any]) -> list[str]: + """Validate step configuration and return a list of error messages. + + An empty list means the configuration is valid. + """ + errors: list[str] = [] + if "id" not in config: + errors.append("Step is missing required 'id' field.") + return errors + + def can_resume(self, state: dict[str, Any]) -> bool: + """Return whether this step can be resumed from the given state.""" + return True diff --git a/src/specify_cli/workflows/catalog.py b/src/specify_cli/workflows/catalog.py new file mode 100644 index 000000000..8d6665c38 --- /dev/null +++ b/src/specify_cli/workflows/catalog.py @@ -0,0 +1,482 @@ +"""Workflow catalog — discovery, install, and management of workflows. 
+ +Mirrors the existing extension/preset catalog pattern with: +- Multi-catalog stack (env var → project → user → built-in) +- SHA256-hashed per-URL caching with 1-hour TTL +- Workflow registry for installed workflow tracking +- Search across all configured catalog sources +""" + +from __future__ import annotations + +import hashlib +import json +import os +import time +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +import yaml + + +# --------------------------------------------------------------------------- +# Errors +# --------------------------------------------------------------------------- + + +class WorkflowCatalogError(Exception): + """Base error for workflow catalog operations.""" + + +class WorkflowValidationError(WorkflowCatalogError): + """Validation error for catalog config or workflow data.""" + + +# --------------------------------------------------------------------------- +# CatalogEntry +# --------------------------------------------------------------------------- + + +@dataclass +class WorkflowCatalogEntry: + """Represents a single catalog source in the catalog stack.""" + + url: str + name: str + priority: int + install_allowed: bool + description: str = "" + + +# --------------------------------------------------------------------------- +# WorkflowRegistry +# --------------------------------------------------------------------------- + + +class WorkflowRegistry: + """Manages the registry of installed workflows. + + Tracks installed workflows and their metadata in + ``.specify/workflows/workflow-registry.json``. 
+ """ + + REGISTRY_FILE = "workflow-registry.json" + SCHEMA_VERSION = "1.0" + + def __init__(self, project_root: Path) -> None: + self.project_root = project_root + self.workflows_dir = project_root / ".specify" / "workflows" + self.registry_path = self.workflows_dir / self.REGISTRY_FILE + self.data = self._load() + + def _load(self) -> dict[str, Any]: + """Load registry from disk or create default.""" + if self.registry_path.exists(): + with open(self.registry_path, encoding="utf-8") as f: + return json.load(f) + return {"schema_version": self.SCHEMA_VERSION, "workflows": {}} + + def save(self) -> None: + """Persist registry to disk.""" + self.workflows_dir.mkdir(parents=True, exist_ok=True) + with open(self.registry_path, "w", encoding="utf-8") as f: + json.dump(self.data, f, indent=2) + + def add(self, workflow_id: str, metadata: dict[str, Any]) -> None: + """Add or update an installed workflow entry.""" + from datetime import datetime, timezone + + existing = self.data["workflows"].get(workflow_id, {}) + metadata["installed_at"] = existing.get( + "installed_at", datetime.now(timezone.utc).isoformat() + ) + metadata["updated_at"] = datetime.now(timezone.utc).isoformat() + self.data["workflows"][workflow_id] = metadata + self.save() + + def remove(self, workflow_id: str) -> bool: + """Remove an installed workflow entry. 
Returns True if found.""" + if workflow_id in self.data["workflows"]: + del self.data["workflows"][workflow_id] + self.save() + return True + return False + + def get(self, workflow_id: str) -> dict[str, Any] | None: + """Get metadata for an installed workflow.""" + return self.data["workflows"].get(workflow_id) + + def list(self) -> dict[str, dict[str, Any]]: + """Return all installed workflows.""" + return dict(self.data["workflows"]) + + def is_installed(self, workflow_id: str) -> bool: + """Check if a workflow is installed.""" + return workflow_id in self.data["workflows"] + + +# --------------------------------------------------------------------------- +# WorkflowCatalog +# --------------------------------------------------------------------------- + + +class WorkflowCatalog: + """Manages workflow catalog fetching, caching, and searching. + + Resolution order for catalog sources: + 1. ``SPECKIT_WORKFLOW_CATALOG_URL`` env var (overrides all) + 2. Project-level ``.specify/workflow-catalogs.yml`` + 3. User-level ``~/.specify/workflow-catalogs.yml`` + 4. 
Built-in defaults (official + community)

    NOTE(review): the enclosing class header (and WorkflowCatalogEntry /
    WorkflowValidationError / WorkflowCatalogError) are defined earlier in
    this module, outside this excerpt.
    """

    # Pinned raw-GitHub URLs for the built-in catalog stack (step 4 of
    # get_active_catalogs below).
    DEFAULT_CATALOG_URL = (
        "https://raw.githubusercontent.com/github/spec-kit/main/"
        "workflows/catalog.json"
    )
    COMMUNITY_CATALOG_URL = (
        "https://raw.githubusercontent.com/github/spec-kit/main/"
        "workflows/catalog.community.json"
    )
    CACHE_DURATION = 3600  # 1 hour

    def __init__(self, project_root: Path) -> None:
        # All workflow data lives under <project>/.specify/workflows; the
        # HTTP catalog cache is a hidden subdirectory of it.
        self.project_root = project_root
        self.workflows_dir = project_root / ".specify" / "workflows"
        self.cache_dir = self.workflows_dir / ".cache"

    # -- Catalog resolution -----------------------------------------------

    def _validate_catalog_url(self, url: str) -> None:
        """Validate that a catalog URL uses HTTPS (localhost HTTP allowed).

        Raises WorkflowValidationError for non-HTTPS (except http://localhost)
        or host-less URLs.
        """
        from urllib.parse import urlparse

        parsed = urlparse(url)
        # ::1 covers IPv6 loopback; hostname is already lowercased by urlparse.
        is_localhost = parsed.hostname in ("localhost", "127.0.0.1", "::1")
        if parsed.scheme != "https" and not (
            parsed.scheme == "http" and is_localhost
        ):
            raise WorkflowValidationError(
                f"Catalog URL must use HTTPS (got {parsed.scheme}://). "
                "HTTP is only allowed for localhost."
            )
        if not parsed.netloc:
            raise WorkflowValidationError(
                "Catalog URL must be a valid URL with a host."
            )

    def _load_catalog_config(
        self, config_path: Path
    ) -> list[WorkflowCatalogEntry] | None:
        """Load catalog stack configuration from a YAML file.

        Returns None when the file does not exist (callers fall through to the
        next configuration tier); raises WorkflowValidationError for files
        that exist but are malformed or contain no usable entries.
        """
        if not config_path.exists():
            return None
        try:
            data = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {}
        except (yaml.YAMLError, OSError, UnicodeError) as exc:
            # NOTE(review): consider `raise ... from exc` to keep the chain.
            raise WorkflowValidationError(
                f"Failed to read catalog config {config_path}: {exc}"
            )
        catalogs_data = data.get("catalogs", [])
        if not catalogs_data:
            raise WorkflowValidationError(
                f"Catalog config {config_path} exists but contains no "
                f"'catalogs' entries."
            )
        if not isinstance(catalogs_data, list):
            raise WorkflowValidationError(
                f"Invalid catalog config: 'catalogs' must be a list, "
                f"got {type(catalogs_data).__name__}"
            )

        entries: list[WorkflowCatalogEntry] = []
        # Indices of entries silently dropped for having no URL.
        # NOTE(review): `skipped` is collected but never reported to the user.
        skipped: list[int] = []
        for idx, item in enumerate(catalogs_data):
            if not isinstance(item, dict):
                raise WorkflowValidationError(
                    f"Invalid catalog entry at index {idx}: "
                    f"expected a mapping, got {type(item).__name__}"
                )
            url = str(item.get("url", "")).strip()
            if not url:
                skipped.append(idx)
                continue
            self._validate_catalog_url(url)
            try:
                # Priority defaults to the entry's 1-based position.
                priority = int(item.get("priority", idx + 1))
            except (TypeError, ValueError):
                raise WorkflowValidationError(
                    f"Invalid priority for catalog "
                    f"'{item.get('name', idx + 1)}': "
                    f"expected integer, got {item.get('priority')!r}"
                )
            raw_install = item.get("install_allowed", False)
            if isinstance(raw_install, str):
                # Accept common YAML-ish truthy strings; anything else is False.
                install_allowed = raw_install.strip().lower() in (
                    "true",
                    "yes",
                    "1",
                )
            else:
                install_allowed = bool(raw_install)
            entries.append(
                WorkflowCatalogEntry(
                    url=url,
                    name=str(item.get("name", f"catalog-{idx + 1}")),
                    priority=priority,
                    install_allowed=install_allowed,
                    description=str(item.get("description", "")),
                )
            )
        # Lower priority number = consulted first.
        entries.sort(key=lambda e: e.priority)
        if not entries:
            raise WorkflowValidationError(
                f"Catalog config {config_path} contains {len(catalogs_data)} "
                f"entries but none have valid URLs."
            )
        return entries

    def get_active_catalogs(self) -> list[WorkflowCatalogEntry]:
        """Get the ordered list of active catalogs.

        Resolution order (first tier that yields entries wins):
        1. SPECKIT_WORKFLOW_CATALOG_URL environment variable
        2. Project-level .specify/workflow-catalogs.yml
        3. User-level ~/.specify/workflow-catalogs.yml
        4. Built-in defaults (official + community)
        """
        # 1. Environment variable override
        env_url = os.environ.get("SPECKIT_WORKFLOW_CATALOG_URL", "").strip()
        if env_url:
            self._validate_catalog_url(env_url)
            return [
                WorkflowCatalogEntry(
                    url=env_url,
                    name="env-override",
                    priority=1,
                    install_allowed=True,
                    description="From SPECKIT_WORKFLOW_CATALOG_URL",
                )
            ]

        # 2. Project-level config
        project_config = self.project_root / ".specify" / "workflow-catalogs.yml"
        project_entries = self._load_catalog_config(project_config)
        if project_entries is not None:
            return project_entries

        # 3. User-level config
        home = Path.home()
        user_config = home / ".specify" / "workflow-catalogs.yml"
        user_entries = self._load_catalog_config(user_config)
        if user_entries is not None:
            return user_entries

        # 4. Built-in defaults
        return [
            WorkflowCatalogEntry(
                url=self.DEFAULT_CATALOG_URL,
                name="default",
                priority=1,
                install_allowed=True,
                description="Official workflows",
            ),
            WorkflowCatalogEntry(
                url=self.COMMUNITY_CATALOG_URL,
                name="community",
                priority=2,
                install_allowed=False,
                description="Community-contributed workflows (discovery only)",
            ),
        ]

    # -- Caching ----------------------------------------------------------

    def _get_cache_paths(self, url: str) -> tuple[Path, Path]:
        """Get cache file paths for a URL (hash-based).

        Returns (data_file, meta_file); the hash prefix keeps one cache pair
        per distinct catalog URL.
        """
        url_hash = hashlib.sha256(url.encode()).hexdigest()[:16]
        cache_file = self.cache_dir / f"workflow-catalog-{url_hash}.json"
        meta_file = self.cache_dir / f"workflow-catalog-{url_hash}-meta.json"
        return cache_file, meta_file

    def _is_url_cache_valid(self, url: str) -> bool:
        """Check if cached data for a URL is still fresh (< CACHE_DURATION)."""
        _, meta_file = self._get_cache_paths(url)
        if not meta_file.exists():
            return False
        try:
            with open(meta_file, encoding="utf-8") as f:
                meta = json.load(f)
            fetched_at = meta.get("fetched_at", 0)
            return (time.time() - fetched_at) < self.CACHE_DURATION
        except (json.JSONDecodeError, OSError):
            # Unreadable metadata is treated as a cache miss, not an error.
            return False

    def _fetch_single_catalog(
        self, entry: WorkflowCatalogEntry, force_refresh: bool = False
    ) -> dict[str, Any]:
        """Fetch a single catalog, using cache when possible."""
        cache_file, meta_file = self._get_cache_paths(entry.url)

        if not force_refresh and self._is_url_cache_valid(entry.url):
            try:
                with
open(cache_file, encoding="utf-8") as f:
                    return json.load(f)
            except (json.JSONDecodeError, OSError):
                # Corrupt cache: fall through to a fresh network fetch.
                pass

        # Fetch from URL
        from urllib.request import urlopen

        try:
            # URL scheme was validated when the entry was created (HTTPS-only).
            with urlopen(entry.url, timeout=30) as resp:  # noqa: S310
                data = json.loads(resp.read().decode("utf-8"))
        except Exception as exc:
            # Fall back to cache if available (stale data beats no data).
            if cache_file.exists():
                with open(cache_file, encoding="utf-8") as f:
                    return json.load(f)
            raise WorkflowCatalogError(
                f"Failed to fetch catalog from {entry.url}: {exc}"
            ) from exc

        # Write cache
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        with open(cache_file, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
        with open(meta_file, "w", encoding="utf-8") as f:
            json.dump({"url": entry.url, "fetched_at": time.time()}, f)

        return data

    def _get_merged_workflows(
        self, force_refresh: bool = False
    ) -> dict[str, dict[str, Any]]:
        """Merge workflows from all active catalogs (higher priority wins)."""
        catalogs = self.get_active_catalogs()
        merged: dict[str, dict[str, Any]] = {}

        # Process in reverse priority (lower priority first, higher overwrites)
        for entry in reversed(catalogs):
            try:
                data = self._fetch_single_catalog(entry, force_refresh)
            except WorkflowCatalogError:
                # A single unreachable catalog must not break discovery.
                continue
            workflows = data.get("workflows", {})
            # Handle both dict and list formats
            if isinstance(workflows, dict):
                for wf_id, wf_data in workflows.items():
                    # Provenance markers consumed by search/install commands.
                    wf_data["_catalog_name"] = entry.name
                    wf_data["_install_allowed"] = entry.install_allowed
                    merged[wf_id] = wf_data
            elif isinstance(workflows, list):
                for wf_data in workflows:
                    wf_id = wf_data.get("id", "")
                    if wf_id:
                        wf_data["_catalog_name"] = entry.name
                        wf_data["_install_allowed"] = entry.install_allowed
                        merged[wf_id] = wf_data
        return merged

    # -- Public API -------------------------------------------------------

    def search(
        self,
        query: str | None = None,
        tag: str | None = None,
    ) -> list[dict[str, Any]]:
        """Search workflows across all configured catalogs.

        ``query`` does a case-insensitive substring match over name,
        description and id; ``tag`` filters on an exact (case-insensitive)
        tag match. Both filters are ANDed.
        """
        merged = self._get_merged_workflows()
        results: list[dict[str, Any]] = []

        for wf_id, wf_data in merged.items():
            wf_data.setdefault("id", wf_id)
            if query:
                q = query.lower()
                searchable = " ".join(
                    [
                        wf_data.get("name", ""),
                        wf_data.get("description", ""),
                        wf_data.get("id", ""),
                    ]
                ).lower()
                if q not in searchable:
                    continue
            if tag:
                tags = wf_data.get("tags", [])
                if tag.lower() not in [t.lower() for t in tags]:
                    continue
            results.append(wf_data)
        return results

    def get_workflow_info(self, workflow_id: str) -> dict[str, Any] | None:
        """Get details for a specific workflow from the catalog.

        Returns None when the id is unknown across all catalogs.
        """
        merged = self._get_merged_workflows()
        wf = merged.get(workflow_id)
        if wf:
            wf.setdefault("id", workflow_id)
        return wf

    def get_catalog_configs(self) -> list[dict[str, Any]]:
        """Return current catalog configuration as a list of dicts."""
        entries = self.get_active_catalogs()
        return [
            {
                "name": e.name,
                "url": e.url,
                "priority": e.priority,
                "install_allowed": e.install_allowed,
                "description": e.description,
            }
            for e in entries
        ]

    def add_catalog(self, url: str, name: str | None = None) -> None:
        """Add a catalog source to the project-level config.

        Raises WorkflowValidationError for invalid or duplicate URLs.
        """
        self._validate_catalog_url(url)
        config_path = self.project_root / ".specify" / "workflow-catalogs.yml"

        data: dict[str, Any] = {"catalogs": []}
        if config_path.exists():
            # NOTE(review): assumes the YAML root is a mapping; a list-rooted
            # file would break the .get below — TODO confirm upstream schema.
            data = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {
                "catalogs": []
            }

        catalogs = data.get("catalogs", [])
        # Check for duplicate URL
        for cat in catalogs:
            if cat.get("url") == url:
                raise WorkflowValidationError(
                    f"Catalog URL already configured: {url}"
                )

        catalogs.append(
            {
                "name": name or f"catalog-{len(catalogs) + 1}",
                "url": url,
                # New catalogs go to the end of the priority order.
                "priority": len(catalogs) + 1,
                "install_allowed": True,
                "description": "",
            }
        )
        data["catalogs"] = catalogs

        config_path.parent.mkdir(parents=True, exist_ok=True)
        with open(config_path, "w", encoding="utf-8") as f:
            yaml.dump(data, f, default_flow_style=False)

    def remove_catalog(self, index: int) -> str:
        """Remove a catalog source by index (0-based). Returns the removed name."""
        config_path = self.project_root / ".specify" / "workflow-catalogs.yml"
        if not config_path.exists():
            raise WorkflowValidationError("No catalog config file found.")

        data = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {}
        catalogs = data.get("catalogs", [])

        if index < 0 or index >= len(catalogs):
            raise WorkflowValidationError(
                f"Catalog index {index} out of range (0-{len(catalogs) - 1})."
            )

        removed = catalogs.pop(index)
        data["catalogs"] = catalogs

        with open(config_path, "w", encoding="utf-8") as f:
            yaml.dump(data, f, default_flow_style=False)

        return removed.get("name", f"catalog-{index + 1}")
diff --git a/src/specify_cli/workflows/engine.py b/src/specify_cli/workflows/engine.py
new file mode 100644
index 000000000..183a12374
--- /dev/null
+++ b/src/specify_cli/workflows/engine.py
@@ -0,0 +1,575 @@
"""Workflow engine — loads, validates, and executes workflow YAML definitions.

The engine is the orchestrator that:
- Parses workflow YAML definitions
- Validates step configurations and requirements
- Executes steps sequentially, dispatching to the correct step type
- Manages state persistence for resume capability
- Handles control flow (branching, loops, fan-out/fan-in)
"""

from __future__ import annotations

import json
import re
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

import yaml

from .base import RunStatus, StepContext, StepResult, StepStatus


# -- Workflow Definition --------------------------------------------------


class WorkflowDefinition:
    """Parsed and validated workflow YAML definition.

    Thin accessor over the raw YAML mapping: every attribute is read once in
    ``__init__`` with a permissive default, and ``validate_workflow`` (below)
    is responsible for reporting missing/invalid fields.
    """

    def __init__(self, data: dict[str, Any], source_path: Path | None = None) -> None:
        # Keep the raw mapping around for consumers that need fields this
        # class does not surface.
        self.data = data
        self.source_path = source_path

        workflow = data.get("workflow", {})
        self.id: str = workflow.get("id", "")
        self.name: str = workflow.get("name", "")
        self.version: str = workflow.get("version", "0.0.0")
        self.author: str = workflow.get("author", "")
        self.description: str = workflow.get("description", "")
        self.schema_version: str = data.get("schema_version", "1.0")

        # Defaults — per-step settings fall back to these (see CommandStep).
        self.default_integration: str | None = workflow.get("integration")
        self.default_model: str | None = workflow.get("model")
        self.default_options: dict[str, Any] = workflow.get("options", {})

        # Requirements
        self.requires: dict[str, Any] = data.get("requires", {})

        # Inputs
        self.inputs: dict[str, Any] = data.get("inputs", {})

        # Steps
        self.steps: list[dict[str, Any]] = data.get("steps", [])

    @classmethod
    def from_yaml(cls, path: Path) -> WorkflowDefinition:
        """Load a workflow definition from a YAML file.

        Raises ValueError when the document root is not a mapping.
        """
        with open(path, encoding="utf-8") as f:
            data = yaml.safe_load(f)
        if not isinstance(data, dict):
            msg = f"Workflow YAML must be a mapping, got {type(data).__name__}."
            raise ValueError(msg)
        return cls(data, source_path=path)

    @classmethod
    def from_string(cls, content: str) -> WorkflowDefinition:
        """Load a workflow definition from a YAML string."""
        data = yaml.safe_load(content)
        if not isinstance(data, dict):
            msg = f"Workflow YAML must be a mapping, got {type(data).__name__}."
            raise ValueError(msg)
        return cls(data)


# -- Workflow Validation --------------------------------------------------

# ID format: lowercase alphanumeric with hyphens (no leading/trailing hyphen;
# the second alternative admits single-character ids).
_ID_PATTERN = re.compile(r"^[a-z0-9][a-z0-9-]*[a-z0-9]$|^[a-z0-9]$")

# Valid step types (matching STEP_REGISTRY keys)
VALID_STEP_TYPES = {
    "command",
    "shell",
    "gate",
    "if",
    "switch",
    "while",
    "do-while",
    "fan-out",
    "fan-in",
}


def validate_workflow(definition: WorkflowDefinition) -> list[str]:
    """Validate a workflow definition and return a list of error messages.

    An empty list means the workflow is valid.
    """
    errors: list[str] = []

    # -- Top-level fields -------------------------------------------------
    if not definition.id:
        errors.append("Workflow is missing 'workflow.id'.")
    elif not _ID_PATTERN.match(definition.id):
        errors.append(
            f"Workflow ID {definition.id!r} must be lowercase alphanumeric "
            f"with hyphens."
        )

    if not definition.name:
        errors.append("Workflow is missing 'workflow.name'.")

    if not definition.version:
        errors.append("Workflow is missing 'workflow.version'.")

    # -- Inputs -----------------------------------------------------------
    for input_name, input_def in definition.inputs.items():
        if not isinstance(input_def, dict):
            errors.append(f"Input {input_name!r} must be a mapping.")
            continue
        input_type = input_def.get("type")
        # A missing type is allowed (defaults are untyped).
        if input_type and input_type not in ("string", "number", "boolean"):
            errors.append(
                f"Input {input_name!r} has invalid type {input_type!r}. "
                f"Must be 'string', 'number', or 'boolean'."
            )

    # -- Steps ------------------------------------------------------------
    if not definition.steps:
        errors.append("Workflow has no steps defined.")

    # One shared id-set across ALL nesting levels: duplicate ids are rejected
    # globally, even across sibling branches.
    seen_ids: set[str] = set()
    _validate_steps(definition.steps, seen_ids, errors)

    return errors


def _validate_steps(
    steps: list[dict[str, Any]],
    seen_ids: set[str],
    errors: list[str],
) -> None:
    """Recursively validate a list of steps (appends messages to *errors*)."""
    # Imported lazily to avoid a circular import at module load time.
    from . import STEP_REGISTRY

    for step_config in steps:
        if not isinstance(step_config, dict):
            errors.append(f"Step must be a mapping, got {type(step_config).__name__}.")
            continue

        step_id = step_config.get("id")
        if not step_id:
            errors.append("Step is missing 'id' field.")
            continue

        if step_id in seen_ids:
            errors.append(f"Duplicate step ID {step_id!r}.")
        seen_ids.add(step_id)

        # Determine step type ("command" is the implicit default).
        step_type = step_config.get("type", "command")
        if step_type not in VALID_STEP_TYPES:
            errors.append(
                f"Step {step_id!r} has invalid type {step_type!r}."
            )
            continue

        # Delegate to step-specific validation
        step_impl = STEP_REGISTRY.get(step_type)
        if step_impl:
            step_errors = step_impl.validate(step_config)
            errors.extend(step_errors)

        # Recursively validate nested steps (if/then/else bodies, loop bodies)
        for nested_key in ("then", "else", "steps"):
            nested = step_config.get(nested_key)
            if isinstance(nested, list):
                _validate_steps(nested, seen_ids, errors)

        # Validate switch cases
        cases = step_config.get("cases")
        if isinstance(cases, dict):
            for _case_key, case_steps in cases.items():
                if isinstance(case_steps, list):
                    _validate_steps(case_steps, seen_ids, errors)

        # Validate switch default
        default = step_config.get("default")
        if isinstance(default, list):
            _validate_steps(default, seen_ids, errors)

        # Validate fan-out nested step (a single mapping, wrapped in a list)
        fan_step = step_config.get("step")
        if isinstance(fan_step, dict):
            _validate_steps([fan_step], seen_ids, errors)


# -- Run State Persistence ------------------------------------------------


class RunState:
    """Manages workflow run state for persistence and resume.

    State is persisted under ``.specify/workflows/runs/<run_id>/`` as
    ``state.json`` (status + step results), ``inputs.json`` (resolved
    inputs) and ``log.jsonl`` (append-only event log).
    """

    def __init__(
        self,
        run_id: str | None = None,
        workflow_id: str = "",
        project_root: Path | None = None,
    ) -> None:
        # Short 8-char id is enough for a per-project runs directory.
        self.run_id = run_id or str(uuid.uuid4())[:8]
        self.workflow_id = workflow_id
        self.project_root = project_root or Path(".")
        self.status = RunStatus.CREATED
        self.current_step_index = 0
        self.current_step_id: str | None = None
        self.step_results: dict[str, dict[str, Any]] = {}
        self.inputs: dict[str, Any] = {}
        self.created_at = datetime.now(timezone.utc).isoformat()
        self.updated_at = self.created_at
        # In-memory mirror of log.jsonl for the current process only.
        self.log_entries: list[dict[str, Any]] = []

    @property
    def runs_dir(self) -> Path:
        # Per-run persistence directory.
        return self.project_root / ".specify" / "workflows" / "runs" / self.run_id

    def save(self) -> None:
        """Persist current state to disk (state.json + inputs.json)."""
        self.updated_at = datetime.now(timezone.utc).isoformat()
        runs_dir = self.runs_dir
        runs_dir.mkdir(parents=True, exist_ok=True)

        state_data = {
            "run_id": self.run_id,
            "workflow_id": self.workflow_id,
            "status": self.status.value,
            "current_step_index": self.current_step_index,
            "current_step_id": self.current_step_id,
            "step_results": self.step_results,
            "created_at": self.created_at,
            "updated_at": self.updated_at,
        }
        with open(runs_dir / "state.json", "w", encoding="utf-8") as f:
            json.dump(state_data, f, indent=2)

        inputs_data = {"inputs": self.inputs}
        with open(runs_dir / "inputs.json", "w", encoding="utf-8") as f:
            json.dump(inputs_data, f, indent=2)

    @classmethod
    def load(cls, run_id: str, project_root: Path) -> RunState:
        """Load a run state from disk.

        Raises FileNotFoundError when no state.json exists for *run_id*.
        NOTE(review): log_entries are not reloaded here; only log.jsonl on
        disk retains pre-resume events.
        """
        runs_dir = project_root / ".specify" / "workflows" / "runs" / run_id
        state_path = runs_dir / "state.json"
        if not state_path.exists():
            msg = f"Run state not found: {state_path}"
            raise FileNotFoundError(msg)

        with open(state_path, encoding="utf-8") as f:
            state_data = json.load(f)

        state = cls(
            run_id=state_data["run_id"],
            workflow_id=state_data["workflow_id"],
            project_root=project_root,
        )
        state.status = RunStatus(state_data["status"])
        state.current_step_index = state_data.get("current_step_index", 0)
        state.current_step_id = state_data.get("current_step_id")
        state.step_results = state_data.get("step_results", {})
        state.created_at = state_data.get("created_at", "")
        state.updated_at = state_data.get("updated_at", "")

        inputs_path = runs_dir / "inputs.json"
        if inputs_path.exists():
            with open(inputs_path, encoding="utf-8") as f:
                inputs_data = json.load(f)
            state.inputs = inputs_data.get("inputs", {})

        return state

    def append_log(self, entry: dict[str, Any]) -> None:
        """Append a log entry to the run log (in memory and on disk)."""
        entry["timestamp"] = datetime.now(timezone.utc).isoformat()
        self.log_entries.append(entry)

        runs_dir = self.runs_dir
        runs_dir.mkdir(parents=True, exist_ok=True)
        with open(runs_dir / "log.jsonl", "a", encoding="utf-8") as f:
            f.write(json.dumps(entry) + "\n")


# -- Workflow Engine ------------------------------------------------------


class WorkflowEngine:
    """Orchestrator that loads, validates, and executes workflow definitions."""

    def __init__(self, project_root: Path | None = None) -> None:
        self.project_root = project_root or Path(".")

    def load_workflow(self, source: str | Path) -> WorkflowDefinition:
        """Load a workflow from an installed ID or a local YAML path.

        Parameters
        ----------
        source:
            Either a workflow ID (looked up in the installed workflows
            directory) or a path to a YAML file.

        Returns
        -------
        A validated ``WorkflowDefinition``.

        Raises
        ------
        FileNotFoundError:
            If the workflow file cannot be found.
        ValueError:
            If the workflow YAML is invalid.
        """
        path = Path(source)

        # Try as a direct file path first (only .yml/.yaml are accepted as
        # paths; anything else is treated as an installed workflow id).
        if path.suffix in (".yml", ".yaml") and path.exists():
            return WorkflowDefinition.from_yaml(path)

        # Try as an installed workflow ID
        installed_path = (
            self.project_root
            / ".specify"
            / "workflows"
            / str(source)
            / "workflow.yml"
        )
        if installed_path.exists():
            return WorkflowDefinition.from_yaml(installed_path)

        msg = f"Workflow not found: {source}"
        raise FileNotFoundError(msg)

    def validate(self, definition: WorkflowDefinition) -> list[str]:
        """Validate a workflow definition (delegates to validate_workflow)."""
        return validate_workflow(definition)

    def execute(
        self,
        definition: WorkflowDefinition,
        inputs: dict[str, Any] | None = None,
        run_id: str | None = None,
    ) -> RunState:
        """Execute a workflow definition.

        Parameters
        ----------
        definition:
            The validated workflow definition.
        inputs:
            User-provided input values.
        run_id:
            Optional run ID (auto-generated if not provided).

        Returns
        -------
        The final ``RunState`` after execution completes (or pauses).
        """
        from .
import STEP_REGISTRY

        state = RunState(
            run_id=run_id,
            workflow_id=definition.id,
            project_root=self.project_root,
        )

        # Resolve inputs (defaults applied, required inputs enforced).
        resolved_inputs = self._resolve_inputs(definition, inputs or {})
        state.inputs = resolved_inputs
        state.status = RunStatus.RUNNING
        state.save()

        context = StepContext(
            inputs=resolved_inputs,
            default_integration=definition.default_integration,
            default_model=definition.default_model,
            default_options=definition.default_options,
            project_root=str(self.project_root),
            run_id=state.run_id,
        )

        # Execute steps
        try:
            self._execute_steps(definition.steps, context, state, STEP_REGISTRY)
        except WorkflowAbortError:
            state.status = RunStatus.ABORTED
            state.append_log({"event": "workflow_aborted"})
        except Exception as exc:
            # Persist the failure before re-raising so the run can be resumed.
            state.status = RunStatus.FAILED
            state.append_log({"event": "workflow_failed", "error": str(exc)})
            state.save()
            raise

        # Still RUNNING means no step paused/failed → the run completed.
        if state.status == RunStatus.RUNNING:
            state.status = RunStatus.COMPLETED
        state.append_log({"event": "workflow_finished", "status": state.status.value})
        state.save()
        return state

    def resume(self, run_id: str) -> RunState:
        """Resume a paused or failed workflow run.

        Raises ValueError for runs in any other status.
        """
        state = RunState.load(run_id, self.project_root)
        if state.status not in (RunStatus.PAUSED, RunStatus.FAILED):
            msg = f"Cannot resume run {run_id!r} with status {state.status.value!r}."
            raise ValueError(msg)

        # Load the workflow definition (by the id recorded in the state).
        definition = self.load_workflow(state.workflow_id)

        # Restore context
        context = StepContext(
            inputs=state.inputs,
            steps=state.step_results,
            default_integration=definition.default_integration,
            default_model=definition.default_model,
            default_options=definition.default_options,
            project_root=str(self.project_root),
            run_id=state.run_id,
        )

        from . import STEP_REGISTRY

        state.status = RunStatus.RUNNING
        state.save()

        # Resume from the current step.
        # NOTE(review): _execute_steps records indices relative to the list
        # it is given, so after a resume (or inside nested control-flow)
        # current_step_index no longer maps onto definition.steps — a second
        # resume may skip or repeat steps. TODO confirm and track an absolute
        # index instead.
        remaining_steps = definition.steps[state.current_step_index :]
        try:
            self._execute_steps(remaining_steps, context, state, STEP_REGISTRY)
        except WorkflowAbortError:
            state.status = RunStatus.ABORTED
            state.append_log({"event": "workflow_aborted"})
        except Exception as exc:
            state.status = RunStatus.FAILED
            state.append_log({"event": "resume_failed", "error": str(exc)})
            state.save()
            raise

        if state.status == RunStatus.RUNNING:
            state.status = RunStatus.COMPLETED
        state.save()
        return state

    def _execute_steps(
        self,
        steps: list[dict[str, Any]],
        context: StepContext,
        state: RunState,
        registry: dict[str, Any],
    ) -> None:
        """Execute a list of steps sequentially.

        Recurses for control-flow steps via ``result.next_steps``. Sets
        ``state.status`` (PAUSED/FAILED) and returns early when a step pauses
        or fails; callers check the status after the call.
        """
        for i, step_config in enumerate(steps):
            step_id = step_config.get("id", f"step-{i}")
            step_type = step_config.get("type", "command")

            # Persist progress before running the step so a crash mid-step
            # can be resumed from this step.
            state.current_step_id = step_id
            state.current_step_index = i
            state.save()

            state.append_log(
                {"event": "step_started", "step_id": step_id, "type": step_type}
            )

            step_impl = registry.get(step_type)
            if not step_impl:
                state.status = RunStatus.FAILED
                state.append_log(
                    {
                        "event": "step_failed",
                        "step_id": step_id,
                        "error": f"Unknown step type: {step_type!r}",
                    }
                )
                return

            result: StepResult = step_impl.execute(step_config, context)

            # Record step results in context so later steps can reference
            # them through expressions (steps.<id>.output...).
            step_data = {
                "integration": step_config.get("integration")
                or context.default_integration,
                "model": step_config.get("model") or context.default_model,
                "options": step_config.get("options", {}),
                "input": step_config.get("input", {}),
                "output": result.output,
                "status": result.status.value,
            }
            context.steps[step_id] = step_data
            state.step_results[step_id] = step_data

            state.append_log(
                {
                    "event": "step_completed",
                    "step_id": step_id,
                    "status": result.status.value,
                }
            )

            # Handle gate pauses
            if result.status == StepStatus.PAUSED:
                state.status = RunStatus.PAUSED
                state.save()
                return

            # Handle failures
            if result.status == StepStatus.FAILED:
                state.status = RunStatus.FAILED
                state.append_log(
                    {
                        "event": "step_failed",
                        "step_id": step_id,
                        "error": result.error,
                    }
                )
                state.save()
                return

            # Execute nested steps (from control flow)
            if result.next_steps:
                self._execute_steps(result.next_steps, context, state, registry)
                if state.status in (
                    RunStatus.PAUSED,
                    RunStatus.FAILED,
                    RunStatus.ABORTED,
                ):
                    return

    def _resolve_inputs(
        self,
        definition: WorkflowDefinition,
        provided: dict[str, Any],
    ) -> dict[str, Any]:
        """Resolve workflow inputs against definitions and provided values.

        Precedence: provided value → declared default → error if required.
        Optional inputs with no default are simply omitted.
        """
        resolved: dict[str, Any] = {}
        for name, input_def in definition.inputs.items():
            if not isinstance(input_def, dict):
                # Malformed declarations are reported by validate_workflow.
                continue
            if name in provided:
                resolved[name] = provided[name]
            elif "default" in input_def:
                resolved[name] = input_def["default"]
            elif input_def.get("required", False):
                msg = f"Required input {name!r} not provided."
                raise ValueError(msg)
        return resolved

    def list_runs(self) -> list[dict[str, Any]]:
        """List all workflow runs in the project (sorted by run directory)."""
        runs_dir = self.project_root / ".specify" / "workflows" / "runs"
        if not runs_dir.exists():
            return []

        runs: list[dict[str, Any]] = []
        for run_dir in sorted(runs_dir.iterdir()):
            if not run_dir.is_dir():
                continue
            state_path = run_dir / "state.json"
            if state_path.exists():
                # NOTE(review): a corrupt state.json will raise here and
                # abort the whole listing — consider skipping bad entries.
                with open(state_path, encoding="utf-8") as f:
                    state_data = json.load(f)
                runs.append(state_data)
        return runs


class WorkflowAbortError(Exception):
    """Raised when a workflow is aborted (e.g., gate rejection)."""
diff --git a/src/specify_cli/workflows/expressions.py b/src/specify_cli/workflows/expressions.py
new file mode 100644
index 000000000..af606722d
--- /dev/null
+++ b/src/specify_cli/workflows/expressions.py
@@ -0,0 +1,290 @@
"""Sandboxed expression evaluator for workflow templates.

Provides a safe Jinja2 subset for evaluating expressions in workflow YAML.
No file I/O, no imports, no arbitrary code execution.
+""" + +from __future__ import annotations + +import re +from typing import Any + + +# -- Custom filters ------------------------------------------------------- + +def _filter_default(value: Any, default_value: Any = "") -> Any: + """Return *default_value* when *value* is ``None`` or empty string.""" + if value is None or value == "": + return default_value + return value + + +def _filter_join(value: Any, separator: str = ", ") -> str: + """Join a list into a string with *separator*.""" + if isinstance(value, list): + return separator.join(str(v) for v in value) + return str(value) + + +def _filter_map(value: Any, attr: str) -> list[Any]: + """Map a list of dicts to a specific attribute.""" + if isinstance(value, list): + result = [] + for item in value: + if isinstance(item, dict): + # Support dot notation: "result.status" → item["result"]["status"] + parts = attr.split(".") + v = item + for part in parts: + if isinstance(v, dict): + v = v.get(part) + else: + v = None + break + result.append(v) + else: + result.append(item) + return result + return [] + + +def _filter_contains(value: Any, substring: str) -> bool: + """Check if a string or list contains *substring*.""" + if isinstance(value, str): + return substring in value + if isinstance(value, list): + return substring in value + return False + + +# -- Expression resolution ------------------------------------------------ + +_EXPR_PATTERN = re.compile(r"\{\{(.+?)\}\}") + + +def _resolve_dot_path(obj: Any, path: str) -> Any: + """Resolve a dotted path like ``steps.specify.output.file`` against *obj*. + + Supports dict key access and list indexing (e.g., ``task_list[0]``). 
+ """ + parts = path.split(".") + current = obj + for part in parts: + # Handle list indexing: name[0] + idx_match = re.match(r"^(\w+)\[(\d+)\]$", part) + if idx_match: + key, idx = idx_match.group(1), int(idx_match.group(2)) + if isinstance(current, dict): + current = current.get(key) + else: + return None + if isinstance(current, list) and 0 <= idx < len(current): + current = current[idx] + else: + return None + elif isinstance(current, dict): + current = current.get(part) + else: + return None + if current is None: + return None + return current + + +def _build_namespace(context: Any) -> dict[str, Any]: + """Build the variable namespace from a StepContext.""" + ns: dict[str, Any] = {} + if hasattr(context, "inputs"): + ns["inputs"] = context.inputs or {} + if hasattr(context, "steps"): + ns["steps"] = context.steps or {} + if hasattr(context, "item"): + ns["item"] = context.item + if hasattr(context, "fan_in"): + ns["fan_in"] = context.fan_in or {} + return ns + + +def _evaluate_simple_expression(expr: str, namespace: dict[str, Any]) -> Any: + """Evaluate a simple expression against the namespace. 
+ + Supports: + - Dot-path access: ``steps.specify.output.file`` + - Comparisons: ``==``, ``!=``, ``>``, ``<``, ``>=``, ``<=`` + - Boolean operators: ``and``, ``or``, ``not`` + - ``in``, ``not in`` + - Pipe filters: ``| default('...')``, ``| join(', ')``, ``| contains('...')``, ``| map('...')`` + - String and numeric literals + """ + expr = expr.strip() + + # Handle pipe filters first + if "|" in expr: + parts = expr.split("|", 1) + value = _evaluate_simple_expression(parts[0].strip(), namespace) + filter_expr = parts[1].strip() + + # Parse filter name and argument + filter_match = re.match(r"(\w+)\((.+)\)", filter_expr) + if filter_match: + fname = filter_match.group(1) + farg = filter_match.group(2).strip().strip("'\"") + if fname == "default": + return _filter_default(value, farg) + if fname == "join": + return _filter_join(value, farg) + if fname == "map": + return _filter_map(value, farg) + if fname == "contains": + return _filter_contains(value, farg) + # Filter without args + filter_name = filter_expr.strip() + if filter_name == "default": + return _filter_default(value) + return value + + # Boolean operators + if " and " in expr: + parts = expr.split(" and ", 1) + left = _evaluate_simple_expression(parts[0].strip(), namespace) + right = _evaluate_simple_expression(parts[1].strip(), namespace) + return bool(left) and bool(right) + + if " or " in expr: + parts = expr.split(" or ", 1) + left = _evaluate_simple_expression(parts[0].strip(), namespace) + right = _evaluate_simple_expression(parts[1].strip(), namespace) + return bool(left) or bool(right) + + if expr.startswith("not "): + inner = _evaluate_simple_expression(expr[4:].strip(), namespace) + return not bool(inner) + + # Comparison operators (order matters — check multi-char ops first) + for op in ("!=", "==", ">=", "<=", ">", "<", " not in ", " in "): + if op in expr: + parts = expr.split(op, 1) + left = _evaluate_simple_expression(parts[0].strip(), namespace) + right = 
_evaluate_simple_expression(parts[1].strip(), namespace) + if op == "==": + return left == right + if op == "!=": + return left != right + if op == ">": + return _safe_compare(left, right, ">") + if op == "<": + return _safe_compare(left, right, "<") + if op == ">=": + return _safe_compare(left, right, ">=") + if op == "<=": + return _safe_compare(left, right, "<=") + if op == " in ": + return left in right if right is not None else False + if op == " not in ": + return left not in right if right is not None else True + + # String literal + if (expr.startswith("'") and expr.endswith("'")) or ( + expr.startswith('"') and expr.endswith('"') + ): + return expr[1:-1] + + # Numeric literal + try: + if "." in expr: + return float(expr) + return int(expr) + except (ValueError, TypeError): + pass + + # Boolean literal + if expr.lower() == "true": + return True + if expr.lower() == "false": + return False + + # Null + if expr.lower() in ("none", "null"): + return None + + # List literal (simple) + if expr.startswith("[") and expr.endswith("]"): + inner = expr[1:-1].strip() + if not inner: + return [] + items = [_evaluate_simple_expression(i.strip(), namespace) for i in inner.split(",")] + return items + + # Variable reference (dot-path) + return _resolve_dot_path(namespace, expr) + + +def _safe_compare(left: Any, right: Any, op: str) -> bool: + """Safely compare two values, coercing types when possible.""" + try: + if isinstance(left, str): + left = float(left) if "." in left else int(left) + if isinstance(right, str): + right = float(right) if "." 
in right else int(right) + except (ValueError, TypeError): + return False + try: + if op == ">": + return left > right # type: ignore[operator] + if op == "<": + return left < right # type: ignore[operator] + if op == ">=": + return left >= right # type: ignore[operator] + if op == "<=": + return left <= right # type: ignore[operator] + except TypeError: + return False + return False + + +def evaluate_expression(template: str, context: Any) -> Any: + """Evaluate a template string with ``{{ ... }}`` expressions. + + If the entire string is a single expression, returns the raw value + (preserving type). Otherwise, substitutes each expression inline + and returns a string. + + Parameters + ---------- + template: + The template string (e.g., ``"{{ steps.plan.output.task_count }}"`` + or ``"Processed {{ inputs.feature_name }}"``. + context: + A ``StepContext`` or compatible object. + + Returns + ------- + The resolved value (any type for single-expression templates, + string for multi-expression or mixed templates). + """ + if not isinstance(template, str): + return template + + namespace = _build_namespace(context) + + # Single expression: return typed value + match = _EXPR_PATTERN.fullmatch(template.strip()) + if match: + return _evaluate_simple_expression(match.group(1).strip(), namespace) + + # Multi-expression: string interpolation + def _replacer(m: re.Match[str]) -> str: + val = _evaluate_simple_expression(m.group(1).strip(), namespace) + return str(val) if val is not None else "" + + return _EXPR_PATTERN.sub(_replacer, template) + + +def evaluate_condition(condition: str, context: Any) -> bool: + """Evaluate a condition expression and return a boolean. + + Convenience wrapper around ``evaluate_expression`` that coerces + the result to bool. 
+ """ + result = evaluate_expression(condition, context) + return bool(result) diff --git a/src/specify_cli/workflows/steps/__init__.py b/src/specify_cli/workflows/steps/__init__.py new file mode 100644 index 000000000..0aa9182dd --- /dev/null +++ b/src/specify_cli/workflows/steps/__init__.py @@ -0,0 +1 @@ +"""Auto-discovery for built-in step types.""" diff --git a/src/specify_cli/workflows/steps/command/__init__.py b/src/specify_cli/workflows/steps/command/__init__.py new file mode 100644 index 000000000..acfa690af --- /dev/null +++ b/src/specify_cli/workflows/steps/command/__init__.py @@ -0,0 +1,65 @@ +"""Command step — dispatches to an integration CLI.""" + +from __future__ import annotations + +from typing import Any + +from specify_cli.workflows.base import StepBase, StepContext, StepResult, StepStatus +from specify_cli.workflows.expressions import evaluate_expression + + +class CommandStep(StepBase): + """Default step type — dispatches a spec-kit command to a CLI integration.""" + + type_key = "command" + + def execute(self, config: dict[str, Any], context: StepContext) -> StepResult: + """Execute a command step by resolving integration and building output. + + In the current implementation this records the resolved command + configuration into the step output. Actual CLI dispatch is + handled by the engine layer. 
+ """ + command = config.get("command", "") + input_data = config.get("input", {}) + + # Resolve expressions in input + resolved_input: dict[str, Any] = {} + for key, value in input_data.items(): + resolved_input[key] = evaluate_expression(value, context) + + # Resolve integration (step → workflow default → project default) + integration = config.get("integration") or context.default_integration + if integration and isinstance(integration, str) and "{{" in integration: + integration = evaluate_expression(integration, context) + + # Resolve model + model = config.get("model") or context.default_model + if model and isinstance(model, str) and "{{" in model: + model = evaluate_expression(model, context) + + # Merge options (workflow defaults ← step overrides) + options = dict(context.default_options) + step_options = config.get("options", {}) + if step_options: + options.update(step_options) + + return StepResult( + status=StepStatus.COMPLETED, + output={ + "command": command, + "integration": integration, + "model": model, + "options": options, + "input": resolved_input, + "exit_code": 0, + }, + ) + + def validate(self, config: dict[str, Any]) -> list[str]: + errors = super().validate(config) + if "command" not in config: + errors.append( + f"Command step {config.get('id', '?')!r} is missing 'command' field." + ) + return errors diff --git a/src/specify_cli/workflows/steps/do_while/__init__.py b/src/specify_cli/workflows/steps/do_while/__init__.py new file mode 100644 index 000000000..c67f45e4c --- /dev/null +++ b/src/specify_cli/workflows/steps/do_while/__init__.py @@ -0,0 +1,51 @@ +"""Do-While loop step — execute at least once, then repeat while condition is truthy.""" + +from __future__ import annotations + +from typing import Any + +from specify_cli.workflows.base import StepBase, StepContext, StepResult, StepStatus + + +class DoWhileStep(StepBase): + """Execute body at least once, then check condition. + + Continues while condition is truthy. 
``max_iterations`` is + required as a safety cap. + """ + + type_key = "do-while" + + def execute(self, config: dict[str, Any], context: StepContext) -> StepResult: + max_iterations = config.get("max_iterations", 10) + nested_steps = config.get("steps", []) + + # Always execute at least once + return StepResult( + status=StepStatus.COMPLETED, + output={ + "condition_result": True, + "max_iterations": max_iterations, + "loop_type": "do-while", + }, + next_steps=nested_steps, + ) + + def validate(self, config: dict[str, Any]) -> list[str]: + errors = super().validate(config) + if "condition" not in config: + errors.append( + f"Do-while step {config.get('id', '?')!r} is missing " + f"'condition' field." + ) + if "max_iterations" not in config: + errors.append( + f"Do-while step {config.get('id', '?')!r} is missing " + f"'max_iterations' field." + ) + nested = config.get("steps", []) + if not isinstance(nested, list): + errors.append( + f"Do-while step {config.get('id', '?')!r}: 'steps' must be a list." + ) + return errors diff --git a/src/specify_cli/workflows/steps/fan_in/__init__.py b/src/specify_cli/workflows/steps/fan_in/__init__.py new file mode 100644 index 000000000..520a3532e --- /dev/null +++ b/src/specify_cli/workflows/steps/fan_in/__init__.py @@ -0,0 +1,52 @@ +"""Fan-in step — join point for parallel steps.""" + +from __future__ import annotations + +from typing import Any + +from specify_cli.workflows.base import StepBase, StepContext, StepResult, StepStatus +from specify_cli.workflows.expressions import evaluate_expression + + +class FanInStep(StepBase): + """Join point — blocks until all ``wait_for:`` steps complete. + + Aggregates their results into ``fan_in.results``. 
+ """ + + type_key = "fan-in" + + def execute(self, config: dict[str, Any], context: StepContext) -> StepResult: + wait_for = config.get("wait_for", []) + output_config = config.get("output", {}) + + # Collect results from referenced steps + results = [] + for step_id in wait_for: + step_data = context.steps.get(step_id, {}) + results.append(step_data.get("output", {})) + + # Resolve output expressions with fan_in in context + context.fan_in = {"results": results} + resolved_output: dict[str, Any] = {"results": results} + + for key, expr in output_config.items(): + if isinstance(expr, str) and "{{" in expr: + resolved_output[key] = evaluate_expression(expr, context) + else: + resolved_output[key] = expr + + return StepResult( + status=StepStatus.COMPLETED, + output=resolved_output, + ) + + def validate(self, config: dict[str, Any]) -> list[str]: + errors = super().validate(config) + wait_for = config.get("wait_for", []) + if not isinstance(wait_for, list) or not wait_for: + errors.append( + f"Fan-in step {config.get('id', '?')!r}: " + f"'wait_for' must be a non-empty list of step IDs." + ) + return errors diff --git a/src/specify_cli/workflows/steps/fan_out/__init__.py b/src/specify_cli/workflows/steps/fan_out/__init__.py new file mode 100644 index 000000000..66bbfb734 --- /dev/null +++ b/src/specify_cli/workflows/steps/fan_out/__init__.py @@ -0,0 +1,56 @@ +"""Fan-out step — parallel dispatch over a collection.""" + +from __future__ import annotations + +from typing import Any + +from specify_cli.workflows.base import StepBase, StepContext, StepResult, StepStatus +from specify_cli.workflows.expressions import evaluate_expression + + +class FanOutStep(StepBase): + """Parallel dispatch over ``items:`` collection. + + Iterates over items and dispatches the nested ``step:`` template + for each item, up to ``max_concurrency:`` at a time. 
+ """ + + type_key = "fan-out" + + def execute(self, config: dict[str, Any], context: StepContext) -> StepResult: + items_expr = config.get("items", "[]") + items = evaluate_expression(items_expr, context) + if not isinstance(items, list): + items = [] + + max_concurrency = config.get("max_concurrency", 1) + step_template = config.get("step", {}) + + return StepResult( + status=StepStatus.COMPLETED, + output={ + "items": items, + "max_concurrency": max_concurrency, + "step_template": step_template, + "item_count": len(items), + }, + ) + + def validate(self, config: dict[str, Any]) -> list[str]: + errors = super().validate(config) + if "items" not in config: + errors.append( + f"Fan-out step {config.get('id', '?')!r} is missing " + f"'items' field." + ) + if "step" not in config: + errors.append( + f"Fan-out step {config.get('id', '?')!r} is missing " + f"'step' field (nested step template)." + ) + step = config.get("step") + if step is not None and not isinstance(step, dict): + errors.append( + f"Fan-out step {config.get('id', '?')!r}: 'step' must be a mapping." + ) + return errors diff --git a/src/specify_cli/workflows/steps/gate/__init__.py b/src/specify_cli/workflows/steps/gate/__init__.py new file mode 100644 index 000000000..7925a40ab --- /dev/null +++ b/src/specify_cli/workflows/steps/gate/__init__.py @@ -0,0 +1,64 @@ +"""Gate step — human review gate.""" + +from __future__ import annotations + +from typing import Any + +from specify_cli.workflows.base import StepBase, StepContext, StepResult, StepStatus +from specify_cli.workflows.expressions import evaluate_expression + + +class GateStep(StepBase): + """Pause for human review with interactive options. + + The user's choice is stored in ``output.choice``. ``on_reject`` + controls abort / skip / retry behaviour. 
+ """ + + type_key = "gate" + + def execute(self, config: dict[str, Any], context: StepContext) -> StepResult: + message = config.get("message", "Review required.") + if isinstance(message, str) and "{{" in message: + message = evaluate_expression(message, context) + + options = config.get("options", ["approve", "reject"]) + on_reject = config.get("on_reject", "abort") + + show_file = config.get("show_file") + if show_file and isinstance(show_file, str) and "{{" in show_file: + show_file = evaluate_expression(show_file, context) + + # In non-interactive mode, auto-approve + # The engine layer is responsible for presenting the gate to the user + # and pausing execution. This default implementation records the + # gate config so the engine can act on it. + return StepResult( + status=StepStatus.PAUSED, + output={ + "message": message, + "options": options, + "on_reject": on_reject, + "show_file": show_file, + "choice": None, # Filled by engine after user interaction + }, + ) + + def validate(self, config: dict[str, Any]) -> list[str]: + errors = super().validate(config) + if "message" not in config: + errors.append( + f"Gate step {config.get('id', '?')!r} is missing 'message' field." + ) + options = config.get("options", []) + if options and not isinstance(options, list): + errors.append( + f"Gate step {config.get('id', '?')!r}: 'options' must be a list." + ) + on_reject = config.get("on_reject", "abort") + if on_reject not in ("abort", "skip", "retry"): + errors.append( + f"Gate step {config.get('id', '?')!r}: 'on_reject' must be " + f"'abort', 'skip', or 'retry'." 
+ ) + return errors diff --git a/src/specify_cli/workflows/steps/if_then/__init__.py b/src/specify_cli/workflows/steps/if_then/__init__.py new file mode 100644 index 000000000..3a9090ad2 --- /dev/null +++ b/src/specify_cli/workflows/steps/if_then/__init__.py @@ -0,0 +1,55 @@ +"""If/Then/Else step — conditional branching.""" + +from __future__ import annotations + +from typing import Any + +from specify_cli.workflows.base import StepBase, StepContext, StepResult, StepStatus +from specify_cli.workflows.expressions import evaluate_condition + + +class IfThenStep(StepBase): + """Branch based on a boolean condition expression. + + Both ``then:`` and ``else:`` contain inline step arrays — full step + definitions, not ID references. + """ + + type_key = "if" + + def execute(self, config: dict[str, Any], context: StepContext) -> StepResult: + condition = config.get("condition", "false") + result = evaluate_condition(condition, context) + + if result: + branch = config.get("then", []) + else: + branch = config.get("else", []) + + return StepResult( + status=StepStatus.COMPLETED, + output={"condition_result": result}, + next_steps=branch, + ) + + def validate(self, config: dict[str, Any]) -> list[str]: + errors = super().validate(config) + if "condition" not in config: + errors.append( + f"If step {config.get('id', '?')!r} is missing 'condition' field." + ) + if "then" not in config: + errors.append( + f"If step {config.get('id', '?')!r} is missing 'then' field." + ) + then_branch = config.get("then", []) + if not isinstance(then_branch, list): + errors.append( + f"If step {config.get('id', '?')!r}: 'then' must be a list of steps." + ) + else_branch = config.get("else", []) + if else_branch and not isinstance(else_branch, list): + errors.append( + f"If step {config.get('id', '?')!r}: 'else' must be a list of steps." 
+ ) + return errors diff --git a/src/specify_cli/workflows/steps/shell/__init__.py b/src/specify_cli/workflows/steps/shell/__init__.py new file mode 100644 index 000000000..f1330c8e6 --- /dev/null +++ b/src/specify_cli/workflows/steps/shell/__init__.py @@ -0,0 +1,63 @@ +"""Shell step — run a local shell command.""" + +from __future__ import annotations + +import subprocess +from typing import Any + +from specify_cli.workflows.base import StepBase, StepContext, StepResult, StepStatus +from specify_cli.workflows.expressions import evaluate_expression + + +class ShellStep(StepBase): + """Run a local shell command (non-agent). + + Captures exit code and stdout/stderr. + """ + + type_key = "shell" + + def execute(self, config: dict[str, Any], context: StepContext) -> StepResult: + run_cmd = config.get("run", "") + if isinstance(run_cmd, str) and "{{" in run_cmd: + run_cmd = evaluate_expression(run_cmd, context) + + cwd = context.project_root or "." + + try: + proc = subprocess.run( + run_cmd, + shell=True, + capture_output=True, + text=True, + cwd=cwd, + timeout=300, + ) + return StepResult( + status=StepStatus.COMPLETED, + output={ + "exit_code": proc.returncode, + "stdout": proc.stdout, + "stderr": proc.stderr, + }, + ) + except subprocess.TimeoutExpired: + return StepResult( + status=StepStatus.FAILED, + error="Shell command timed out after 300 seconds.", + output={"exit_code": -1, "stdout": "", "stderr": "timeout"}, + ) + except OSError as exc: + return StepResult( + status=StepStatus.FAILED, + error=f"Shell command failed: {exc}", + output={"exit_code": -1, "stdout": "", "stderr": str(exc)}, + ) + + def validate(self, config: dict[str, Any]) -> list[str]: + errors = super().validate(config) + if "run" not in config: + errors.append( + f"Shell step {config.get('id', '?')!r} is missing 'run' field." 
+ ) + return errors diff --git a/src/specify_cli/workflows/steps/switch/__init__.py b/src/specify_cli/workflows/steps/switch/__init__.py new file mode 100644 index 000000000..e58d3c23c --- /dev/null +++ b/src/specify_cli/workflows/steps/switch/__init__.py @@ -0,0 +1,70 @@ +"""Switch step — multi-branch dispatch.""" + +from __future__ import annotations + +from typing import Any + +from specify_cli.workflows.base import StepBase, StepContext, StepResult, StepStatus +from specify_cli.workflows.expressions import evaluate_expression + + +class SwitchStep(StepBase): + """Multi-branch dispatch on an expression. + + Evaluates ``expression:`` once, matches against ``cases:`` keys + (exact match, string-coerced). Falls through to ``default:`` if + no case matches. + """ + + type_key = "switch" + + def execute(self, config: dict[str, Any], context: StepContext) -> StepResult: + expression = config.get("expression", "") + value = evaluate_expression(expression, context) + + # String-coerce for matching + str_value = str(value) if value is not None else "" + + cases = config.get("cases", {}) + for case_key, case_steps in cases.items(): + if str(case_key) == str_value: + return StepResult( + status=StepStatus.COMPLETED, + output={"matched_case": str(case_key), "expression_value": value}, + next_steps=case_steps, + ) + + # Default fallback + default_steps = config.get("default", []) + return StepResult( + status=StepStatus.COMPLETED, + output={"matched_case": "__default__", "expression_value": value}, + next_steps=default_steps, + ) + + def validate(self, config: dict[str, Any]) -> list[str]: + errors = super().validate(config) + if "expression" not in config: + errors.append( + f"Switch step {config.get('id', '?')!r} is missing " + f"'expression' field." + ) + cases = config.get("cases", {}) + if not isinstance(cases, dict): + errors.append( + f"Switch step {config.get('id', '?')!r}: 'cases' must be a mapping." 
class WhileStep(StepBase):
    """Pre-checked loop: repeat nested steps while the condition is truthy.

    The condition is evaluated *before* each iteration; if it is falsy on
    the first check the body never runs.  ``max_iterations`` is required
    as a safety cap (enforced by the engine).
    """

    type_key = "while"

    def execute(self, config: dict[str, Any], context: StepContext) -> StepResult:
        """Evaluate the loop condition and dispatch the body when truthy."""
        # BUG FIX: the previous default was the bare string "false".  The
        # expression engine only treats "{{ ... }}" templates specially, so
        # a plain "false" evaluated as a non-empty (truthy) string and a
        # step with a missing 'condition' entered the loop instead of
        # skipping it.  "{{ false }}" evaluates to the boolean False.
        condition = config.get("condition", "{{ false }}")
        result = evaluate_condition(condition, context)

        output = {
            "condition_result": result,
            "max_iterations": config.get("max_iterations", 10),
            "loop_type": "while",
        }

        if result:
            return StepResult(
                status=StepStatus.COMPLETED,
                output=output,
                next_steps=config.get("steps", []),
            )
        return StepResult(status=StepStatus.COMPLETED, output=output)

    def validate(self, config: dict[str, Any]) -> list[str]:
        """Require 'condition' and 'max_iterations'; 'steps' must be a list."""
        errors = super().validate(config)
        if "condition" not in config:
            errors.append(
                f"While step {config.get('id', '?')!r} is missing "
                f"'condition' field."
            )
        if "max_iterations" not in config:
            errors.append(
                f"While step {config.get('id', '?')!r} is missing "
                f"'max_iterations' field."
            )
        nested = config.get("steps", [])
        if not isinstance(nested, list):
            errors.append(
                f"While step {config.get('id', '?')!r}: 'steps' must be a list."
            )
        return errors
+ ) + return errors diff --git a/workflows/catalog.community.json b/workflows/catalog.community.json new file mode 100644 index 000000000..c654f5ed2 --- /dev/null +++ b/workflows/catalog.community.json @@ -0,0 +1,6 @@ +{ + "schema_version": "1.0", + "updated_at": "2026-04-10T00:00:00Z", + "catalog_url": "https://raw.githubusercontent.com/github/spec-kit/main/workflows/catalog.community.json", + "workflows": {} +} diff --git a/workflows/catalog.json b/workflows/catalog.json new file mode 100644 index 000000000..97f8986b8 --- /dev/null +++ b/workflows/catalog.json @@ -0,0 +1,6 @@ +{ + "schema_version": "1.0", + "updated_at": "2026-04-10T00:00:00Z", + "catalog_url": "https://raw.githubusercontent.com/github/spec-kit/main/workflows/catalog.json", + "workflows": {} +} diff --git a/workflows/speckit/workflow.yml b/workflows/speckit/workflow.yml new file mode 100644 index 000000000..d04b62a26 --- /dev/null +++ b/workflows/speckit/workflow.yml @@ -0,0 +1,59 @@ +schema_version: "1.0" +workflow: + id: "speckit" + name: "Full SDD Cycle" + version: "1.0.0" + author: "github" + description: "Runs specify → plan → tasks → implement with review gates" + integration: claude + +requires: + speckit_version: ">=0.15.0" + integrations: + any: ["claude", "gemini"] + +inputs: + feature_name: + type: string + required: true + prompt: "Feature name" + scope: + type: string + default: "full" + enum: ["full", "backend-only", "frontend-only"] + +steps: + - id: specify + command: speckit.specify + input: + args: "{{ inputs.feature_name }}" + output: + spec_file: "{{ result.file }}" + + - id: review-spec + type: gate + message: "Review the generated spec before planning." + show_file: "{{ steps.specify.output.spec_file }}" + options: [approve, edit, reject] + on_reject: abort + + - id: plan + command: speckit.plan + input: + args: "{{ steps.specify.output.spec_file }}" + + - id: review-plan + type: gate + message: "Review the plan before generating tasks." 
+ options: [approve, reject] + on_reject: abort + + - id: tasks + command: speckit.tasks + input: + args: "{{ steps.plan.output.plan_file }}" + + - id: implement + command: speckit.implement + input: + args: "{{ steps.tasks.output.task_file }}" From 51d09c05bb0ad165804e6dd5e7194fc5a10fbac7 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 10 Apr 2026 16:48:22 +0000 Subject: [PATCH 3/4] Add comprehensive tests for workflow engine (94 tests) Agent-Logs-Url: https://github.com/github/spec-kit/sessions/72a7bb5d-071f-4d67-a507-7e1abae2384d Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com> --- tests/test_workflows.py | 1391 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 1391 insertions(+) create mode 100644 tests/test_workflows.py diff --git a/tests/test_workflows.py b/tests/test_workflows.py new file mode 100644 index 000000000..91519af2e --- /dev/null +++ b/tests/test_workflows.py @@ -0,0 +1,1391 @@ +"""Tests for the workflow engine subsystem. 
+ +Covers: +- Step registry & auto-discovery +- Base classes (StepBase, StepContext, StepResult) +- Expression engine +- All 9 built-in step types +- Workflow definition loading & validation +- Workflow engine execution & state persistence +- Workflow catalog & registry +""" + +from __future__ import annotations + +import json +import shutil +import tempfile +from pathlib import Path + +import pytest +import yaml + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +@pytest.fixture +def temp_dir(): + """Create a temporary directory for tests.""" + tmpdir = tempfile.mkdtemp() + yield Path(tmpdir) + shutil.rmtree(tmpdir) + + +@pytest.fixture +def project_dir(temp_dir): + """Create a mock spec-kit project with .specify/ directory.""" + specify_dir = temp_dir / ".specify" + specify_dir.mkdir() + (specify_dir / "workflows").mkdir() + return temp_dir + + +@pytest.fixture +def sample_workflow_yaml(): + """Return a valid minimal workflow YAML string.""" + return """ +schema_version: "1.0" +workflow: + id: "test-workflow" + name: "Test Workflow" + version: "1.0.0" + description: "A test workflow" + +inputs: + feature_name: + type: string + required: true + scope: + type: string + default: "full" + +steps: + - id: step-one + command: speckit.specify + input: + args: "{{ inputs.feature_name }}" + + - id: step-two + command: speckit.plan + input: + args: "{{ steps.step-one.output.command }}" +""" + + +@pytest.fixture +def sample_workflow_file(project_dir, sample_workflow_yaml): + """Write a sample workflow YAML to a file and return its path.""" + wf_dir = project_dir / ".specify" / "workflows" / "test-workflow" + wf_dir.mkdir(parents=True, exist_ok=True) + wf_path = wf_dir / "workflow.yml" + wf_path.write_text(sample_workflow_yaml, encoding="utf-8") + return wf_path + + +# ===== Step Registry Tests ===== + +class TestStepRegistry: + """Test STEP_REGISTRY 
and auto-discovery.""" + + def test_registry_populated(self): + from specify_cli.workflows import STEP_REGISTRY + + assert len(STEP_REGISTRY) == 9 + + def test_all_step_types_registered(self): + from specify_cli.workflows import STEP_REGISTRY + + expected = { + "command", "shell", "gate", "if", "switch", + "while", "do-while", "fan-out", "fan-in", + } + assert set(STEP_REGISTRY.keys()) == expected + + def test_get_step_type(self): + from specify_cli.workflows import get_step_type + + step = get_step_type("command") + assert step is not None + assert step.type_key == "command" + + def test_get_step_type_missing(self): + from specify_cli.workflows import get_step_type + + assert get_step_type("nonexistent") is None + + def test_register_step_duplicate_raises(self): + from specify_cli.workflows import _register_step + from specify_cli.workflows.steps.command import CommandStep + + with pytest.raises(KeyError, match="already registered"): + _register_step(CommandStep()) + + def test_register_step_empty_key_raises(self): + from specify_cli.workflows import _register_step + from specify_cli.workflows.base import StepBase, StepResult + + class EmptyStep(StepBase): + type_key = "" + def execute(self, config, context): + return StepResult() + + with pytest.raises(ValueError, match="empty type_key"): + _register_step(EmptyStep()) + + +# ===== Base Classes Tests ===== + +class TestBaseClasses: + """Test StepBase, StepContext, StepResult.""" + + def test_step_context_defaults(self): + from specify_cli.workflows.base import StepContext + + ctx = StepContext() + assert ctx.inputs == {} + assert ctx.steps == {} + assert ctx.item is None + assert ctx.fan_in == {} + assert ctx.default_integration is None + + def test_step_context_with_data(self): + from specify_cli.workflows.base import StepContext + + ctx = StepContext( + inputs={"name": "test"}, + default_integration="claude", + default_model="sonnet-4", + ) + assert ctx.inputs == {"name": "test"} + assert ctx.default_integration 
== "claude" + assert ctx.default_model == "sonnet-4" + + def test_step_result_defaults(self): + from specify_cli.workflows.base import StepResult, StepStatus + + result = StepResult() + assert result.status == StepStatus.COMPLETED + assert result.output == {} + assert result.next_steps == [] + assert result.error is None + + def test_step_status_values(self): + from specify_cli.workflows.base import StepStatus + + assert StepStatus.PENDING == "pending" + assert StepStatus.RUNNING == "running" + assert StepStatus.COMPLETED == "completed" + assert StepStatus.FAILED == "failed" + assert StepStatus.SKIPPED == "skipped" + assert StepStatus.PAUSED == "paused" + + def test_run_status_values(self): + from specify_cli.workflows.base import RunStatus + + assert RunStatus.CREATED == "created" + assert RunStatus.RUNNING == "running" + assert RunStatus.PAUSED == "paused" + assert RunStatus.COMPLETED == "completed" + assert RunStatus.FAILED == "failed" + assert RunStatus.ABORTED == "aborted" + + +# ===== Expression Engine Tests ===== + +class TestExpressions: + """Test sandboxed expression evaluator.""" + + def test_simple_variable(self): + from specify_cli.workflows.expressions import evaluate_expression + from specify_cli.workflows.base import StepContext + + ctx = StepContext(inputs={"name": "login"}) + assert evaluate_expression("{{ inputs.name }}", ctx) == "login" + + def test_step_output_reference(self): + from specify_cli.workflows.expressions import evaluate_expression + from specify_cli.workflows.base import StepContext + + ctx = StepContext( + steps={"specify": {"output": {"file": "spec.md"}}} + ) + assert evaluate_expression("{{ steps.specify.output.file }}", ctx) == "spec.md" + + def test_string_interpolation(self): + from specify_cli.workflows.expressions import evaluate_expression + from specify_cli.workflows.base import StepContext + + ctx = StepContext(inputs={"name": "login"}) + result = evaluate_expression("Feature: {{ inputs.name }} done", ctx) + assert result 
== "Feature: login done" + + def test_comparison_equals(self): + from specify_cli.workflows.expressions import evaluate_expression + from specify_cli.workflows.base import StepContext + + ctx = StepContext(inputs={"scope": "full"}) + assert evaluate_expression("{{ inputs.scope == 'full' }}", ctx) is True + assert evaluate_expression("{{ inputs.scope == 'partial' }}", ctx) is False + + def test_comparison_not_equals(self): + from specify_cli.workflows.expressions import evaluate_expression + from specify_cli.workflows.base import StepContext + + ctx = StepContext( + steps={"run-tests": {"output": {"exit_code": 1}}} + ) + result = evaluate_expression("{{ steps.run-tests.output.exit_code != 0 }}", ctx) + assert result is True + + def test_numeric_comparison(self): + from specify_cli.workflows.expressions import evaluate_expression + from specify_cli.workflows.base import StepContext + + ctx = StepContext( + steps={"plan": {"output": {"task_count": 7}}} + ) + assert evaluate_expression("{{ steps.plan.output.task_count > 5 }}", ctx) is True + assert evaluate_expression("{{ steps.plan.output.task_count < 5 }}", ctx) is False + + def test_boolean_and(self): + from specify_cli.workflows.expressions import evaluate_expression + from specify_cli.workflows.base import StepContext + + ctx = StepContext(inputs={"a": True, "b": True}) + assert evaluate_expression("{{ inputs.a and inputs.b }}", ctx) is True + + def test_boolean_or(self): + from specify_cli.workflows.expressions import evaluate_expression + from specify_cli.workflows.base import StepContext + + ctx = StepContext(inputs={"a": False, "b": True}) + assert evaluate_expression("{{ inputs.a or inputs.b }}", ctx) is True + + def test_filter_default(self): + from specify_cli.workflows.expressions import evaluate_expression + from specify_cli.workflows.base import StepContext + + ctx = StepContext() + assert evaluate_expression("{{ inputs.missing | default('fallback') }}", ctx) == "fallback" + + def test_filter_join(self): 
+ from specify_cli.workflows.expressions import evaluate_expression + from specify_cli.workflows.base import StepContext + + ctx = StepContext(inputs={"tags": ["a", "b", "c"]}) + assert evaluate_expression("{{ inputs.tags | join(', ') }}", ctx) == "a, b, c" + + def test_filter_contains(self): + from specify_cli.workflows.expressions import evaluate_expression + from specify_cli.workflows.base import StepContext + + ctx = StepContext(inputs={"text": "hello world"}) + assert evaluate_expression("{{ inputs.text | contains('world') }}", ctx) is True + + def test_condition_evaluation(self): + from specify_cli.workflows.expressions import evaluate_condition + from specify_cli.workflows.base import StepContext + + ctx = StepContext(inputs={"ready": True}) + assert evaluate_condition("{{ inputs.ready }}", ctx) is True + assert evaluate_condition("{{ inputs.missing }}", ctx) is False + + def test_non_string_passthrough(self): + from specify_cli.workflows.expressions import evaluate_expression + from specify_cli.workflows.base import StepContext + + ctx = StepContext() + assert evaluate_expression(42, ctx) == 42 + assert evaluate_expression(None, ctx) is None + + def test_string_literal(self): + from specify_cli.workflows.expressions import evaluate_expression + from specify_cli.workflows.base import StepContext + + ctx = StepContext() + assert evaluate_expression("{{ 'hello' }}", ctx) == "hello" + + def test_numeric_literal(self): + from specify_cli.workflows.expressions import evaluate_expression + from specify_cli.workflows.base import StepContext + + ctx = StepContext() + assert evaluate_expression("{{ 42 }}", ctx) == 42 + + def test_boolean_literal(self): + from specify_cli.workflows.expressions import evaluate_expression + from specify_cli.workflows.base import StepContext + + ctx = StepContext() + assert evaluate_expression("{{ true }}", ctx) is True + assert evaluate_expression("{{ false }}", ctx) is False + + def test_list_indexing(self): + from 
class TestCommandStep:
    """Behavioural tests for the default 'command' step type."""

    def test_execute_basic(self):
        from specify_cli.workflows.base import StepContext, StepStatus
        from specify_cli.workflows.steps.command import CommandStep

        context = StepContext(inputs={"name": "login"}, default_integration="claude")
        outcome = CommandStep().execute(
            {
                "id": "test",
                "command": "speckit.specify",
                "input": {"args": "{{ inputs.name }}"},
            },
            context,
        )
        assert outcome.status == StepStatus.COMPLETED
        assert outcome.output["command"] == "speckit.specify"
        assert outcome.output["integration"] == "claude"
        assert outcome.output["input"]["args"] == "login"

    def test_validate_missing_command(self):
        from specify_cli.workflows.steps.command import CommandStep

        problems = CommandStep().validate({"id": "test"})
        assert any("missing 'command'" in p for p in problems)

    def test_step_override_integration(self):
        from specify_cli.workflows.base import StepContext
        from specify_cli.workflows.steps.command import CommandStep

        outcome = CommandStep().execute(
            {
                "id": "test",
                "command": "speckit.plan",
                "integration": "gemini",
                "input": {},
            },
            StepContext(default_integration="claude"),
        )
        assert outcome.output["integration"] == "gemini"

    def test_step_override_model(self):
        from specify_cli.workflows.base import StepContext
        from specify_cli.workflows.steps.command import CommandStep

        outcome = CommandStep().execute(
            {
                "id": "test",
                "command": "speckit.implement",
                "model": "opus-4",
                "input": {},
            },
            StepContext(default_model="sonnet-4"),
        )
        assert outcome.output["model"] == "opus-4"

    def test_options_merge(self):
        from specify_cli.workflows.base import StepContext
        from specify_cli.workflows.steps.command import CommandStep

        outcome = CommandStep().execute(
            {
                "id": "test",
                "command": "speckit.plan",
                "options": {"thinking-budget": 32768},
                "input": {},
            },
            StepContext(default_options={"max-tokens": 8000}),
        )
        assert outcome.output["options"]["max-tokens"] == 8000
        assert outcome.output["options"]["thinking-budget"] == 32768


class TestShellStep:
    """Behavioural tests for the 'shell' step type."""

    def test_execute_echo(self):
        from specify_cli.workflows.base import StepContext, StepStatus
        from specify_cli.workflows.steps.shell import ShellStep

        outcome = ShellStep().execute({"id": "test", "run": "echo hello"}, StepContext())
        assert outcome.status == StepStatus.COMPLETED
        assert outcome.output["exit_code"] == 0
        assert "hello" in outcome.output["stdout"]

    def test_execute_failure(self):
        from specify_cli.workflows.base import StepContext, StepStatus
        from specify_cli.workflows.steps.shell import ShellStep

        outcome = ShellStep().execute({"id": "test", "run": "exit 1"}, StepContext())
        # A non-zero exit is still a COMPLETED step; the code is in the output.
        assert outcome.status == StepStatus.COMPLETED
        assert outcome.output["exit_code"] == 1

    def test_validate_missing_run(self):
        from specify_cli.workflows.steps.shell import ShellStep

        problems = ShellStep().validate({"id": "test"})
        assert any("missing 'run'" in p for p in problems)
"Review the spec.", + "options": ["approve", "reject"], + "on_reject": "abort", + } + result = step.execute(config, ctx) + assert result.status == StepStatus.PAUSED + assert result.output["message"] == "Review the spec." + assert result.output["options"] == ["approve", "reject"] + + def test_validate_missing_message(self): + from specify_cli.workflows.steps.gate import GateStep + + step = GateStep() + errors = step.validate({"id": "test", "options": ["approve"]}) + assert any("missing 'message'" in e for e in errors) + + def test_validate_invalid_on_reject(self): + from specify_cli.workflows.steps.gate import GateStep + + step = GateStep() + errors = step.validate({ + "id": "test", + "message": "Review", + "on_reject": "invalid", + }) + assert any("on_reject" in e for e in errors) + + +class TestIfThenStep: + """Test the if/then/else step type.""" + + def test_execute_then_branch(self): + from specify_cli.workflows.steps.if_then import IfThenStep + from specify_cli.workflows.base import StepContext + + step = IfThenStep() + ctx = StepContext(inputs={"scope": "full"}) + config = { + "id": "check", + "condition": "{{ inputs.scope == 'full' }}", + "then": [{"id": "a", "command": "speckit.tasks"}], + "else": [{"id": "b", "command": "speckit.plan"}], + } + result = step.execute(config, ctx) + assert result.output["condition_result"] is True + assert len(result.next_steps) == 1 + assert result.next_steps[0]["id"] == "a" + + def test_execute_else_branch(self): + from specify_cli.workflows.steps.if_then import IfThenStep + from specify_cli.workflows.base import StepContext + + step = IfThenStep() + ctx = StepContext(inputs={"scope": "backend"}) + config = { + "id": "check", + "condition": "{{ inputs.scope == 'full' }}", + "then": [{"id": "a", "command": "speckit.tasks"}], + "else": [{"id": "b", "command": "speckit.plan"}], + } + result = step.execute(config, ctx) + assert result.output["condition_result"] is False + assert result.next_steps[0]["id"] == "b" + + def 
test_validate_missing_condition(self): + from specify_cli.workflows.steps.if_then import IfThenStep + + step = IfThenStep() + errors = step.validate({"id": "test", "then": []}) + assert any("missing 'condition'" in e for e in errors) + + +class TestSwitchStep: + """Test the switch step type.""" + + def test_execute_matches_case(self): + from specify_cli.workflows.steps.switch import SwitchStep + from specify_cli.workflows.base import StepContext + + step = SwitchStep() + ctx = StepContext( + steps={"review": {"output": {"choice": "approve"}}} + ) + config = { + "id": "route", + "expression": "{{ steps.review.output.choice }}", + "cases": { + "approve": [{"id": "plan", "command": "speckit.plan"}], + "reject": [{"id": "log", "type": "shell", "run": "echo rejected"}], + }, + "default": [{"id": "abort", "type": "gate", "message": "Unknown"}], + } + result = step.execute(config, ctx) + assert result.output["matched_case"] == "approve" + assert result.next_steps[0]["id"] == "plan" + + def test_execute_falls_to_default(self): + from specify_cli.workflows.steps.switch import SwitchStep + from specify_cli.workflows.base import StepContext + + step = SwitchStep() + ctx = StepContext( + steps={"review": {"output": {"choice": "unknown"}}} + ) + config = { + "id": "route", + "expression": "{{ steps.review.output.choice }}", + "cases": { + "approve": [{"id": "plan", "command": "speckit.plan"}], + }, + "default": [{"id": "fallback", "type": "gate", "message": "Fallback"}], + } + result = step.execute(config, ctx) + assert result.output["matched_case"] == "__default__" + assert result.next_steps[0]["id"] == "fallback" + + +class TestWhileStep: + """Test the while loop step type.""" + + def test_execute_condition_true(self): + from specify_cli.workflows.steps.while_loop import WhileStep + from specify_cli.workflows.base import StepContext + + step = WhileStep() + ctx = StepContext( + steps={"run-tests": {"output": {"exit_code": 1}}} + ) + config = { + "id": "retry", + "condition": 
"{{ steps.run-tests.output.exit_code != 0 }}", + "max_iterations": 5, + "steps": [{"id": "fix", "command": "speckit.implement"}], + } + result = step.execute(config, ctx) + assert result.output["condition_result"] is True + assert len(result.next_steps) == 1 + + def test_execute_condition_false(self): + from specify_cli.workflows.steps.while_loop import WhileStep + from specify_cli.workflows.base import StepContext + + step = WhileStep() + ctx = StepContext( + steps={"run-tests": {"output": {"exit_code": 0}}} + ) + config = { + "id": "retry", + "condition": "{{ steps.run-tests.output.exit_code != 0 }}", + "max_iterations": 5, + "steps": [{"id": "fix", "command": "speckit.implement"}], + } + result = step.execute(config, ctx) + assert result.output["condition_result"] is False + assert result.next_steps == [] + + def test_validate_missing_fields(self): + from specify_cli.workflows.steps.while_loop import WhileStep + + step = WhileStep() + errors = step.validate({"id": "test", "steps": []}) + assert any("missing 'condition'" in e for e in errors) + assert any("missing 'max_iterations'" in e for e in errors) + + +class TestDoWhileStep: + """Test the do-while loop step type.""" + + def test_execute_always_runs_once(self): + from specify_cli.workflows.steps.do_while import DoWhileStep + from specify_cli.workflows.base import StepContext + + step = DoWhileStep() + ctx = StepContext() + config = { + "id": "cycle", + "condition": "{{ false }}", + "max_iterations": 3, + "steps": [{"id": "refine", "command": "speckit.specify"}], + } + result = step.execute(config, ctx) + assert len(result.next_steps) == 1 + assert result.output["loop_type"] == "do-while" + + +class TestFanOutStep: + """Test the fan-out step type.""" + + def test_execute_with_items(self): + from specify_cli.workflows.steps.fan_out import FanOutStep + from specify_cli.workflows.base import StepContext + + step = FanOutStep() + ctx = StepContext( + steps={"tasks": {"output": {"task_list": [ + {"file": "a.md"}, 
+ {"file": "b.md"}, + ]}}} + ) + config = { + "id": "parallel", + "items": "{{ steps.tasks.output.task_list }}", + "max_concurrency": 3, + "step": {"id": "impl", "command": "speckit.implement"}, + } + result = step.execute(config, ctx) + assert result.output["item_count"] == 2 + assert result.output["max_concurrency"] == 3 + + def test_validate_missing_fields(self): + from specify_cli.workflows.steps.fan_out import FanOutStep + + step = FanOutStep() + errors = step.validate({"id": "test"}) + assert any("missing 'items'" in e for e in errors) + assert any("missing 'step'" in e for e in errors) + + +class TestFanInStep: + """Test the fan-in step type.""" + + def test_execute_collects_results(self): + from specify_cli.workflows.steps.fan_in import FanInStep + from specify_cli.workflows.base import StepContext + + step = FanInStep() + ctx = StepContext( + steps={ + "parallel": {"output": {"item_count": 2, "status": "done"}} + } + ) + config = { + "id": "collect", + "wait_for": ["parallel"], + "output": {}, + } + result = step.execute(config, ctx) + assert len(result.output["results"]) == 1 + assert result.output["results"][0]["item_count"] == 2 + + def test_validate_empty_wait_for(self): + from specify_cli.workflows.steps.fan_in import FanInStep + + step = FanInStep() + errors = step.validate({"id": "test", "wait_for": []}) + assert any("non-empty list" in e for e in errors) + + +# ===== Workflow Definition Tests ===== + +class TestWorkflowDefinition: + """Test WorkflowDefinition loading and parsing.""" + + def test_from_yaml(self, sample_workflow_file): + from specify_cli.workflows.engine import WorkflowDefinition + + definition = WorkflowDefinition.from_yaml(sample_workflow_file) + assert definition.id == "test-workflow" + assert definition.name == "Test Workflow" + assert definition.version == "1.0.0" + assert len(definition.steps) == 2 + + def test_from_string(self, sample_workflow_yaml): + from specify_cli.workflows.engine import WorkflowDefinition + + definition 
= WorkflowDefinition.from_string(sample_workflow_yaml) + assert definition.id == "test-workflow" + assert len(definition.inputs) == 2 + + def test_from_string_invalid(self): + from specify_cli.workflows.engine import WorkflowDefinition + + with pytest.raises(ValueError, match="must be a mapping"): + WorkflowDefinition.from_string("- just a list") + + def test_inputs_parsed(self, sample_workflow_yaml): + from specify_cli.workflows.engine import WorkflowDefinition + + definition = WorkflowDefinition.from_string(sample_workflow_yaml) + assert "feature_name" in definition.inputs + assert definition.inputs["feature_name"]["required"] is True + assert definition.inputs["scope"]["default"] == "full" + + +# ===== Workflow Validation Tests ===== + +class TestWorkflowValidation: + """Test workflow validation.""" + + def test_valid_workflow(self, sample_workflow_yaml): + from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow + + definition = WorkflowDefinition.from_string(sample_workflow_yaml) + errors = validate_workflow(definition) + assert errors == [] + + def test_missing_id(self): + from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow + + definition = WorkflowDefinition.from_string(""" +workflow: + name: "Test" + version: "1.0.0" +steps: + - id: step-one + command: speckit.specify +""") + errors = validate_workflow(definition) + assert any("workflow.id" in e for e in errors) + + def test_invalid_id_format(self): + from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow + + definition = WorkflowDefinition.from_string(""" +workflow: + id: "Invalid ID!" 
+ name: "Test" + version: "1.0.0" +steps: + - id: step-one + command: speckit.specify +""") + errors = validate_workflow(definition) + assert any("lowercase alphanumeric" in e for e in errors) + + def test_no_steps(self): + from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow + + definition = WorkflowDefinition.from_string(""" +workflow: + id: "test" + name: "Test" + version: "1.0.0" +steps: [] +""") + errors = validate_workflow(definition) + assert any("no steps" in e.lower() for e in errors) + + def test_duplicate_step_ids(self): + from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow + + definition = WorkflowDefinition.from_string(""" +workflow: + id: "test" + name: "Test" + version: "1.0.0" +steps: + - id: same-id + command: speckit.specify + - id: same-id + command: speckit.plan +""") + errors = validate_workflow(definition) + assert any("Duplicate" in e for e in errors) + + def test_invalid_step_type(self): + from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow + + definition = WorkflowDefinition.from_string(""" +workflow: + id: "test" + name: "Test" + version: "1.0.0" +steps: + - id: bad + type: nonexistent +""") + errors = validate_workflow(definition) + assert any("invalid type" in e.lower() for e in errors) + + def test_nested_step_validation(self): + from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow + + definition = WorkflowDefinition.from_string(""" +workflow: + id: "test" + name: "Test" + version: "1.0.0" +steps: + - id: branch + type: if + condition: "{{ true }}" + then: + - id: nested-a + command: speckit.specify + else: + - id: nested-b + command: speckit.plan +""") + errors = validate_workflow(definition) + assert errors == [] + + def test_invalid_input_type(self): + from specify_cli.workflows.engine import WorkflowDefinition, validate_workflow + + definition = WorkflowDefinition.from_string(""" +workflow: + id: "test" + name: "Test" + version: 
"1.0.0" +inputs: + bad: + type: array +steps: + - id: step-one + command: speckit.specify +""") + errors = validate_workflow(definition) + assert any("invalid type" in e.lower() for e in errors) + + +# ===== Workflow Engine Tests ===== + +class TestWorkflowEngine: + """Test WorkflowEngine execution.""" + + def test_load_from_file(self, sample_workflow_file, project_dir): + from specify_cli.workflows.engine import WorkflowEngine + + engine = WorkflowEngine(project_dir) + definition = engine.load_workflow(str(sample_workflow_file)) + assert definition.id == "test-workflow" + + def test_load_from_installed_id(self, sample_workflow_file, project_dir): + from specify_cli.workflows.engine import WorkflowEngine + + engine = WorkflowEngine(project_dir) + definition = engine.load_workflow("test-workflow") + assert definition.id == "test-workflow" + + def test_load_not_found(self, project_dir): + from specify_cli.workflows.engine import WorkflowEngine + + engine = WorkflowEngine(project_dir) + with pytest.raises(FileNotFoundError): + engine.load_workflow("nonexistent") + + def test_execute_simple_workflow(self, project_dir): + from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition + from specify_cli.workflows.base import RunStatus + + yaml_str = """ +schema_version: "1.0" +workflow: + id: "simple" + name: "Simple" + version: "1.0.0" + integration: claude +inputs: + name: + type: string + default: "test" +steps: + - id: step-one + command: speckit.specify + input: + args: "{{ inputs.name }}" +""" + definition = WorkflowDefinition.from_string(yaml_str) + engine = WorkflowEngine(project_dir) + state = engine.execute(definition, {"name": "login"}) + + assert state.status == RunStatus.COMPLETED + assert "step-one" in state.step_results + assert state.step_results["step-one"]["output"]["command"] == "speckit.specify" + assert state.step_results["step-one"]["output"]["input"]["args"] == "login" + + def test_execute_with_gate_pauses(self, project_dir): + from 
specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition + from specify_cli.workflows.base import RunStatus + + yaml_str = """ +schema_version: "1.0" +workflow: + id: "gated" + name: "Gated" + version: "1.0.0" +steps: + - id: step-one + command: speckit.specify + input: + args: "test" + - id: gate + type: gate + message: "Review?" + options: [approve, reject] + on_reject: abort + - id: step-two + command: speckit.plan + input: + args: "test" +""" + definition = WorkflowDefinition.from_string(yaml_str) + engine = WorkflowEngine(project_dir) + state = engine.execute(definition) + + assert state.status == RunStatus.PAUSED + assert "gate" in state.step_results + assert state.step_results["gate"]["status"] == "paused" + + def test_execute_with_shell_step(self, project_dir): + from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition + from specify_cli.workflows.base import RunStatus + + yaml_str = """ +schema_version: "1.0" +workflow: + id: "shell-test" + name: "Shell Test" + version: "1.0.0" +steps: + - id: echo + type: shell + run: "echo workflow-output" +""" + definition = WorkflowDefinition.from_string(yaml_str) + engine = WorkflowEngine(project_dir) + state = engine.execute(definition) + + assert state.status == RunStatus.COMPLETED + assert "workflow-output" in state.step_results["echo"]["output"]["stdout"] + + def test_execute_with_if_then(self, project_dir): + from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition + from specify_cli.workflows.base import RunStatus + + yaml_str = """ +schema_version: "1.0" +workflow: + id: "branching" + name: "Branching" + version: "1.0.0" +inputs: + scope: + type: string + default: "full" +steps: + - id: check + type: if + condition: "{{ inputs.scope == 'full' }}" + then: + - id: full-tasks + type: shell + run: "echo full" + else: + - id: partial-tasks + type: shell + run: "echo partial" +""" + definition = WorkflowDefinition.from_string(yaml_str) + engine = 
WorkflowEngine(project_dir) + state = engine.execute(definition, {"scope": "full"}) + + assert state.status == RunStatus.COMPLETED + assert "full-tasks" in state.step_results + assert "partial-tasks" not in state.step_results + + def test_execute_missing_required_input(self, project_dir): + from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition + + yaml_str = """ +schema_version: "1.0" +workflow: + id: "needs-input" + name: "Needs Input" + version: "1.0.0" +inputs: + name: + type: string + required: true +steps: + - id: step-one + command: speckit.specify + input: + args: "{{ inputs.name }}" +""" + definition = WorkflowDefinition.from_string(yaml_str) + engine = WorkflowEngine(project_dir) + + with pytest.raises(ValueError, match="Required input"): + engine.execute(definition, {}) + + +# ===== State Persistence Tests ===== + +class TestRunState: + """Test RunState persistence and loading.""" + + def test_save_and_load(self, project_dir): + from specify_cli.workflows.engine import RunState + from specify_cli.workflows.base import RunStatus + + state = RunState( + run_id="test-run", + workflow_id="test-workflow", + project_root=project_dir, + ) + state.status = RunStatus.RUNNING + state.inputs = {"name": "login"} + state.step_results = { + "step-one": { + "output": {"file": "spec.md"}, + "status": "completed", + } + } + state.save() + + loaded = RunState.load("test-run", project_dir) + assert loaded.run_id == "test-run" + assert loaded.workflow_id == "test-workflow" + assert loaded.status == RunStatus.RUNNING + assert loaded.inputs == {"name": "login"} + assert "step-one" in loaded.step_results + + def test_load_not_found(self, project_dir): + from specify_cli.workflows.engine import RunState + + with pytest.raises(FileNotFoundError): + RunState.load("nonexistent", project_dir) + + def test_append_log(self, project_dir): + from specify_cli.workflows.engine import RunState + + state = RunState( + run_id="log-test", + workflow_id="test", + 
project_root=project_dir, + ) + state.append_log({"event": "test_event", "data": "hello"}) + + log_file = state.runs_dir / "log.jsonl" + assert log_file.exists() + lines = log_file.read_text().strip().split("\n") + entry = json.loads(lines[0]) + assert entry["event"] == "test_event" + assert "timestamp" in entry + + +class TestListRuns: + """Test listing workflow runs.""" + + def test_list_empty(self, project_dir): + from specify_cli.workflows.engine import WorkflowEngine + + engine = WorkflowEngine(project_dir) + assert engine.list_runs() == [] + + def test_list_after_execution(self, project_dir): + from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition + + yaml_str = """ +schema_version: "1.0" +workflow: + id: "list-test" + name: "List Test" + version: "1.0.0" +steps: + - id: step-one + type: shell + run: "echo test" +""" + definition = WorkflowDefinition.from_string(yaml_str) + engine = WorkflowEngine(project_dir) + engine.execute(definition) + + runs = engine.list_runs() + assert len(runs) == 1 + assert runs[0]["workflow_id"] == "list-test" + + +# ===== Workflow Registry Tests ===== + +class TestWorkflowRegistry: + """Test WorkflowRegistry operations.""" + + def test_add_and_get(self, project_dir): + from specify_cli.workflows.catalog import WorkflowRegistry + + registry = WorkflowRegistry(project_dir) + registry.add("test-wf", {"name": "Test", "version": "1.0.0"}) + + entry = registry.get("test-wf") + assert entry is not None + assert entry["name"] == "Test" + assert "installed_at" in entry + + def test_remove(self, project_dir): + from specify_cli.workflows.catalog import WorkflowRegistry + + registry = WorkflowRegistry(project_dir) + registry.add("test-wf", {"name": "Test"}) + assert registry.is_installed("test-wf") + + registry.remove("test-wf") + assert not registry.is_installed("test-wf") + + def test_list(self, project_dir): + from specify_cli.workflows.catalog import WorkflowRegistry + + registry = WorkflowRegistry(project_dir) + 
registry.add("wf-a", {"name": "A"}) + registry.add("wf-b", {"name": "B"}) + + installed = registry.list() + assert "wf-a" in installed + assert "wf-b" in installed + + def test_is_installed(self, project_dir): + from specify_cli.workflows.catalog import WorkflowRegistry + + registry = WorkflowRegistry(project_dir) + assert not registry.is_installed("missing") + + registry.add("exists", {"name": "Exists"}) + assert registry.is_installed("exists") + + def test_persistence(self, project_dir): + from specify_cli.workflows.catalog import WorkflowRegistry + + registry1 = WorkflowRegistry(project_dir) + registry1.add("test-wf", {"name": "Test"}) + + # Load fresh + registry2 = WorkflowRegistry(project_dir) + assert registry2.is_installed("test-wf") + + +# ===== Workflow Catalog Tests ===== + +class TestWorkflowCatalog: + """Test WorkflowCatalog catalog resolution.""" + + def test_default_catalogs(self, project_dir): + from specify_cli.workflows.catalog import WorkflowCatalog + + catalog = WorkflowCatalog(project_dir) + entries = catalog.get_active_catalogs() + assert len(entries) == 2 + assert entries[0].name == "default" + assert entries[1].name == "community" + + def test_env_var_override(self, project_dir, monkeypatch): + from specify_cli.workflows.catalog import WorkflowCatalog + + monkeypatch.setenv("SPECKIT_WORKFLOW_CATALOG_URL", "https://example.com/catalog.json") + catalog = WorkflowCatalog(project_dir) + entries = catalog.get_active_catalogs() + assert len(entries) == 1 + assert entries[0].name == "env-override" + assert entries[0].url == "https://example.com/catalog.json" + + def test_project_level_config(self, project_dir): + from specify_cli.workflows.catalog import WorkflowCatalog + + config_path = project_dir / ".specify" / "workflow-catalogs.yml" + config_path.write_text(yaml.dump({ + "catalogs": [{ + "name": "custom", + "url": "https://example.com/wf-catalog.json", + "priority": 1, + "install_allowed": True, + }] + })) + + catalog = 
WorkflowCatalog(project_dir) + entries = catalog.get_active_catalogs() + assert len(entries) == 1 + assert entries[0].name == "custom" + + def test_validate_url_http_rejected(self, project_dir): + from specify_cli.workflows.catalog import WorkflowCatalog, WorkflowValidationError + + catalog = WorkflowCatalog(project_dir) + with pytest.raises(WorkflowValidationError, match="HTTPS"): + catalog._validate_catalog_url("http://evil.com/catalog.json") + + def test_validate_url_localhost_http_allowed(self, project_dir): + from specify_cli.workflows.catalog import WorkflowCatalog + + catalog = WorkflowCatalog(project_dir) + # Should not raise + catalog._validate_catalog_url("http://localhost:8080/catalog.json") + + def test_add_catalog(self, project_dir): + from specify_cli.workflows.catalog import WorkflowCatalog + + catalog = WorkflowCatalog(project_dir) + catalog.add_catalog("https://example.com/new-catalog.json", "my-catalog") + + config_path = project_dir / ".specify" / "workflow-catalogs.yml" + assert config_path.exists() + data = yaml.safe_load(config_path.read_text()) + assert len(data["catalogs"]) == 1 + assert data["catalogs"][0]["url"] == "https://example.com/new-catalog.json" + + def test_add_catalog_duplicate_rejected(self, project_dir): + from specify_cli.workflows.catalog import WorkflowCatalog, WorkflowValidationError + + catalog = WorkflowCatalog(project_dir) + catalog.add_catalog("https://example.com/catalog.json") + + with pytest.raises(WorkflowValidationError, match="already configured"): + catalog.add_catalog("https://example.com/catalog.json") + + def test_remove_catalog(self, project_dir): + from specify_cli.workflows.catalog import WorkflowCatalog + + catalog = WorkflowCatalog(project_dir) + catalog.add_catalog("https://example.com/c1.json", "first") + catalog.add_catalog("https://example.com/c2.json", "second") + + removed = catalog.remove_catalog(0) + assert removed == "first" + + config_path = project_dir / ".specify" / "workflow-catalogs.yml" + 
data = yaml.safe_load(config_path.read_text()) + assert len(data["catalogs"]) == 1 + + def test_remove_catalog_invalid_index(self, project_dir): + from specify_cli.workflows.catalog import WorkflowCatalog, WorkflowValidationError + + catalog = WorkflowCatalog(project_dir) + catalog.add_catalog("https://example.com/c1.json") + + with pytest.raises(WorkflowValidationError, match="out of range"): + catalog.remove_catalog(5) + + def test_get_catalog_configs(self, project_dir): + from specify_cli.workflows.catalog import WorkflowCatalog + + catalog = WorkflowCatalog(project_dir) + configs = catalog.get_catalog_configs() + assert len(configs) == 2 + assert configs[0]["name"] == "default" + assert isinstance(configs[0]["install_allowed"], bool) + + +# ===== Integration Test ===== + +class TestWorkflowIntegration: + """End-to-end workflow execution tests.""" + + def test_full_sequential_workflow(self, project_dir): + """Execute a multi-step sequential workflow end to end.""" + from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition + from specify_cli.workflows.base import RunStatus + + yaml_str = """ +schema_version: "1.0" +workflow: + id: "e2e-test" + name: "E2E Test" + version: "1.0.0" + integration: claude +inputs: + feature: + type: string + default: "login" +steps: + - id: specify + command: speckit.specify + input: + args: "{{ inputs.feature }}" + + - id: check-scope + type: if + condition: "{{ inputs.feature == 'login' }}" + then: + - id: echo-full + type: shell + run: "echo full scope" + else: + - id: echo-partial + type: shell + run: "echo partial scope" + + - id: plan + command: speckit.plan + input: + args: "{{ steps.specify.output.command }}" +""" + definition = WorkflowDefinition.from_string(yaml_str) + engine = WorkflowEngine(project_dir) + state = engine.execute(definition) + + assert state.status == RunStatus.COMPLETED + assert "specify" in state.step_results + assert "check-scope" in state.step_results + assert "echo-full" in 
state.step_results + assert "echo-partial" not in state.step_results + assert "plan" in state.step_results + + def test_switch_workflow(self, project_dir): + """Test switch step type in a workflow.""" + from specify_cli.workflows.engine import WorkflowEngine, WorkflowDefinition + from specify_cli.workflows.base import RunStatus + + yaml_str = """ +schema_version: "1.0" +workflow: + id: "switch-test" + name: "Switch Test" + version: "1.0.0" +inputs: + action: + type: string + default: "plan" +steps: + - id: route + type: switch + expression: "{{ inputs.action }}" + cases: + specify: + - id: do-specify + type: shell + run: "echo specify" + plan: + - id: do-plan + type: shell + run: "echo plan" + default: + - id: do-default + type: shell + run: "echo default" +""" + definition = WorkflowDefinition.from_string(yaml_str) + engine = WorkflowEngine(project_dir) + state = engine.execute(definition) + + assert state.status == RunStatus.COMPLETED + assert "do-plan" in state.step_results + assert "do-specify" not in state.step_results From 273dd70876bde5e765ed6ec20e7bb6857ba47756 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 10 Apr 2026 16:51:05 +0000 Subject: [PATCH 4/4] Address review feedback: do-while condition preservation and URL scheme validation Agent-Logs-Url: https://github.com/github/spec-kit/sessions/72a7bb5d-071f-4d67-a507-7e1abae2384d Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com> --- src/specify_cli/workflows/catalog.py | 12 +++++++++++- src/specify_cli/workflows/steps/do_while/__init__.py | 10 ++++++++-- tests/test_workflows.py | 1 + 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/src/specify_cli/workflows/catalog.py b/src/specify_cli/workflows/catalog.py index 8d6665c38..aa76c1707 100644 --- a/src/specify_cli/workflows/catalog.py +++ b/src/specify_cli/workflows/catalog.py @@ -319,9 +319,19 @@ def _fetch_single_catalog( except (json.JSONDecodeError, OSError): 
pass - # Fetch from URL + # Fetch from URL — validate scheme before opening + from urllib.parse import urlparse from urllib.request import urlopen + parsed = urlparse(entry.url) + is_localhost = parsed.hostname in ("localhost", "127.0.0.1", "::1") + if parsed.scheme != "https" and not ( + parsed.scheme == "http" and is_localhost + ): + raise WorkflowCatalogError( + f"Refusing to fetch catalog from non-HTTPS URL: {entry.url}" + ) + try: with urlopen(entry.url, timeout=30) as resp: # noqa: S310 data = json.loads(resp.read().decode("utf-8")) diff --git a/src/specify_cli/workflows/steps/do_while/__init__.py b/src/specify_cli/workflows/steps/do_while/__init__.py index c67f45e4c..af5b35101 100644 --- a/src/specify_cli/workflows/steps/do_while/__init__.py +++ b/src/specify_cli/workflows/steps/do_while/__init__.py @@ -12,6 +12,10 @@ class DoWhileStep(StepBase): Continues while condition is truthy. ``max_iterations`` is required as a safety cap. + + The first invocation always returns the nested steps for execution. + The ``condition`` field is stored in the output so the engine can + evaluate it after the body runs and decide whether to re-invoke. """ type_key = "do-while" @@ -19,12 +23,14 @@ class DoWhileStep(StepBase): def execute(self, config: dict[str, Any], context: StepContext) -> StepResult: max_iterations = config.get("max_iterations", 10) nested_steps = config.get("steps", []) + condition = config.get("condition", "false") - # Always execute at least once + # Always execute body at least once; the engine layer evaluates + # `condition` after each iteration to decide whether to loop. 
return StepResult( status=StepStatus.COMPLETED, output={ - "condition_result": True, + "condition": condition, "max_iterations": max_iterations, "loop_type": "do-while", }, diff --git a/tests/test_workflows.py b/tests/test_workflows.py index 91519af2e..b9243c20f 100644 --- a/tests/test_workflows.py +++ b/tests/test_workflows.py @@ -640,6 +640,7 @@ def test_execute_always_runs_once(self): result = step.execute(config, ctx) assert len(result.next_steps) == 1 assert result.output["loop_type"] == "do-while" + assert result.output["condition"] == "{{ false }}" class TestFanOutStep: