Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 213 additions & 0 deletions agentflow_cli/cli/commands/a2a.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
"""A2A server command implementation."""

from __future__ import annotations

import importlib
import os
import sys
from pathlib import Path
from typing import Any

from dotenv import load_dotenv

from agentflow_cli.cli.commands import BaseCommand
from agentflow_cli.cli.constants import (
DEFAULT_A2A_DESCRIPTION,
DEFAULT_A2A_HOST,
DEFAULT_A2A_NAME,
DEFAULT_A2A_PORT,
DEFAULT_A2A_VERSION,
DEFAULT_CONFIG_FILE,
)
from agentflow_cli.cli.core.config import ConfigManager
from agentflow_cli.cli.core.validation import validate_cli_options
from agentflow_cli.cli.exceptions import ConfigurationError, ServerError


class A2ACommand(BaseCommand):
"""Command to start an A2A-protocol-compliant agent server."""

def execute(
self,
config: str = DEFAULT_CONFIG_FILE,
host: str = DEFAULT_A2A_HOST,
port: int = DEFAULT_A2A_PORT,
name: str | None = None,
description: str | None = None,
streaming: bool = False,
**kwargs: Any,
) -> int:
try:
self.output.print_banner(
"A2A Server",
"Starting A2A-protocol agent server via Uvicorn.",
)

# Validate host/port/config
validated_options = validate_cli_options(host, port, config)

# Load agentflow.json
config_manager = ConfigManager()
actual_config_path = config_manager.find_config_file(validated_options["config"])
config_data = config_manager.load_config(str(actual_config_path))

# Load .env
env_file_path = config_manager.resolve_env_file()
if env_file_path:
self.logger.info("Loading environment from: %s", env_file_path)
load_dotenv(env_file_path)
else:
load_dotenv()

# Make user graph importable
os.environ["GRAPH_PATH"] = str(actual_config_path)
sys.path.insert(0, str(actual_config_path.parent))
sys.path.insert(0, str(Path(__file__).parent.parent.parent))

# Read optional "a2a" section from agentflow.json
a2a_config: dict[str, Any] = config_data.get("a2a") or {}

# Resolve final values: CLI flag > agentflow.json > defaults
agent_name = name or a2a_config.get("name") or DEFAULT_A2A_NAME
agent_description = description or a2a_config.get("description") or DEFAULT_A2A_DESCRIPTION
use_streaming = streaming or bool(a2a_config.get("streaming", False))
agent_version = a2a_config.get("version") or DEFAULT_A2A_VERSION
executor_path: str | None = a2a_config.get("executor")
skills_config: list[dict] = a2a_config.get("skills") or []

agent_url = f"http://{validated_options['host']}:{validated_options['port']}/"

self.logger.info(
"Starting A2A server: name=%s, host=%s, port=%d, streaming=%s",
agent_name,
validated_options["host"],
validated_options["port"],
use_streaming,
)

# ---------------------------------------------------------------- #
# Import a2a-sdk — give a clear error if not installed #
# ---------------------------------------------------------------- #
try:
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import AgentSkill
except ImportError as exc:
raise ServerError(
"a2a-sdk is not installed. "
"Run: pip install 'agentflow-cli[a2a]' or pip install a2a-sdk",
host=host,
port=port,
) from exc

# ---------------------------------------------------------------- #
# Import agentflow a2a helpers #
# ---------------------------------------------------------------- #
try:
from agentflow.a2a_integration import make_agent_card
from agentflow.a2a_integration.executor import AgentFlowExecutor
except ImportError as exc:
raise ServerError(
"agentflow a2a_integration is not available. "
"Make sure you have 'agentflow[a2a_sdk]' installed.",
host=host,
port=port,
) from exc

# ---------------------------------------------------------------- #
# Load the CompiledGraph #
# ---------------------------------------------------------------- #
import asyncio

from agentflow_cli.src.app.loader import load_graph

agent_path: str = config_data["agent"]
compiled_graph = asyncio.get_event_loop().run_until_complete(
load_graph(agent_path)
)

# ---------------------------------------------------------------- #
# Build skills list from config (optional) #
# ---------------------------------------------------------------- #
skills = []
for s in skills_config:
skills.append(
AgentSkill(
id=s.get("id", "run_graph"),
name=s.get("name", agent_name),
description=s.get("description", agent_description),
tags=s.get("tags", []),
examples=s.get("examples", []),
)
)

# ---------------------------------------------------------------- #
# Build the AgentCard #
# ---------------------------------------------------------------- #
card = make_agent_card(
name=agent_name,
description=agent_description,
url=agent_url,
streaming=use_streaming,
version=agent_version,
skills=skills if skills else None,
)

# ---------------------------------------------------------------- #
# Resolve executor — custom class or default AgentFlowExecutor #
# ---------------------------------------------------------------- #
if executor_path:
try:
module_name, class_name = executor_path.rsplit(":", 1)
module = importlib.import_module(module_name)
executor_cls = getattr(module, class_name)
self.logger.info("Loaded custom executor: %s", executor_path)
executor = executor_cls(compiled_graph)
except Exception as exc:
raise ConfigurationError(
f"Failed to load custom executor '{executor_path}': {exc}",
config_path=str(actual_config_path),
) from exc
else:
executor = AgentFlowExecutor(compiled_graph, streaming=use_streaming)

# ---------------------------------------------------------------- #
# Build and start the A2A server #
# ---------------------------------------------------------------- #
handler = DefaultRequestHandler(
agent_executor=executor,
task_store=InMemoryTaskStore(),
)
starlette_app = A2AStarletteApplication(
agent_card=card,
http_handler=handler,
)

self.output.info(
f"A2A agent '{agent_name}' listening on "
f"http://{validated_options['host']}:{validated_options['port']}/"
)
self.output.info(
f"Agent card: "
f"http://{validated_options['host']}:{validated_options['port']}"
f"/.well-known/agent-card.json"
)

import uvicorn

uvicorn.run(
starlette_app.build(),
host=validated_options["host"],
port=validated_options["port"],
)

return 0

except (ConfigurationError, ServerError) as e:
return self.handle_error(e)
except Exception as e:
server_error = ServerError(
f"Failed to start A2A server: {e}", host=host, port=port
)
return self.handle_error(server_error)
7 changes: 7 additions & 0 deletions agentflow_cli/cli/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@
DEFAULT_PYTHON_VERSION: Final[str] = "3.13"
DEFAULT_SERVICE_NAME: Final[str] = "agentflow-api"

# A2A server defaults
DEFAULT_A2A_HOST: Final[str] = "0.0.0.0"
DEFAULT_A2A_PORT: Final[int] = 9999
DEFAULT_A2A_NAME: Final[str] = "AgentFlowAgent"
DEFAULT_A2A_DESCRIPTION: Final[str] = "An agentflow-powered A2A agent"
DEFAULT_A2A_VERSION: Final[str] = "1.0.0"

# File paths and names
CONFIG_FILENAMES: Final[list[str]] = [
"agentflow.json",
Expand Down
77 changes: 76 additions & 1 deletion agentflow_cli/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,18 @@
import typer
from dotenv import load_dotenv

from agentflow_cli.cli.commands.a2a import A2ACommand
from agentflow_cli.cli.commands.api import APICommand
from agentflow_cli.cli.commands.build import BuildCommand
from agentflow_cli.cli.commands.init import InitCommand
from agentflow_cli.cli.commands.version import VersionCommand
from agentflow_cli.cli.constants import DEFAULT_CONFIG_FILE, DEFAULT_HOST, DEFAULT_PORT
from agentflow_cli.cli.constants import (
DEFAULT_A2A_HOST,
DEFAULT_A2A_PORT,
DEFAULT_CONFIG_FILE,
DEFAULT_HOST,
DEFAULT_PORT,
)
from agentflow_cli.cli.core.output import OutputFormatter
from agentflow_cli.cli.exceptions import PyagenityCLIError
from agentflow_cli.cli.logger import setup_cli_logging
Expand Down Expand Up @@ -106,6 +113,74 @@ def api(
sys.exit(handle_exception(e))


@app.command()
def a2a(
config: str = typer.Option(
DEFAULT_CONFIG_FILE,
"--config",
"-c",
help="Path to agentflow.json config file",
),
host: str = typer.Option(
DEFAULT_A2A_HOST,
"--host",
"-H",
help="Host to bind the A2A server on (default: 0.0.0.0)",
),
port: int = typer.Option(
DEFAULT_A2A_PORT,
"--port",
"-p",
help="Port to run the A2A server on (default: 9999)",
),
name: str = typer.Option(
None,
"--name",
"-n",
help="Agent name shown in the agent card (overrides agentflow.json)",
),
description: str = typer.Option(
None,
"--description",
"-d",
help="Agent description shown in the agent card (overrides agentflow.json)",
),
streaming: bool = typer.Option(
False,
"--streaming/--no-streaming",
help="Enable A2A streaming (SSE) responses",
),
verbose: bool = typer.Option(
False,
"--verbose",
"-v",
help="Enable verbose logging",
),
quiet: bool = typer.Option(
False,
"--quiet",
"-q",
help="Suppress all output except errors",
),
) -> None:
"""Start an A2A-protocol agent server for the configured graph."""
setup_cli_logging(verbose=verbose, quiet=quiet)

try:
command = A2ACommand(output)
exit_code = command.execute(
config=config,
host=host,
port=port,
name=name,
description=description,
streaming=streaming,
)
sys.exit(exit_code)
except Exception as e:
sys.exit(handle_exception(e))


@app.command()
def version(
verbose: bool = typer.Option(
Expand Down
23 changes: 23 additions & 0 deletions examples/currency_agent/agentflow.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"agent": "graph:app",
"env": ".env",
"a2a": {
"name": "CurrencyAgent",
"description": "Currency conversion with INPUT_REQUIRED for missing info.",
"version": "1.0.0",
"streaming": true,
"executor": "executor:CurrencyAgentExecutor",
"skills": [
{
"id": "currency_conversion",
"name": "Currency Conversion",
"description": "Convert between currencies. Asks if info is missing.",
"tags": ["currency", "finance", "exchange-rate"],
"examples": [
"How much is 100 USD in EUR?",
"Convert 50 GBP to JPY"
]
}
]
}
}
Loading
Loading