diff --git a/agentflow_cli/cli/commands/a2a.py b/agentflow_cli/cli/commands/a2a.py new file mode 100644 index 0000000..f2dc786 --- /dev/null +++ b/agentflow_cli/cli/commands/a2a.py @@ -0,0 +1,213 @@ +"""A2A server command implementation.""" + +from __future__ import annotations + +import importlib +import os +import sys +from pathlib import Path +from typing import Any + +from dotenv import load_dotenv + +from agentflow_cli.cli.commands import BaseCommand +from agentflow_cli.cli.constants import ( + DEFAULT_A2A_DESCRIPTION, + DEFAULT_A2A_HOST, + DEFAULT_A2A_NAME, + DEFAULT_A2A_PORT, + DEFAULT_A2A_VERSION, + DEFAULT_CONFIG_FILE, +) +from agentflow_cli.cli.core.config import ConfigManager +from agentflow_cli.cli.core.validation import validate_cli_options +from agentflow_cli.cli.exceptions import ConfigurationError, ServerError + + +class A2ACommand(BaseCommand): + """Command to start an A2A-protocol-compliant agent server.""" + + def execute( + self, + config: str = DEFAULT_CONFIG_FILE, + host: str = DEFAULT_A2A_HOST, + port: int = DEFAULT_A2A_PORT, + name: str | None = None, + description: str | None = None, + streaming: bool = False, + **kwargs: Any, + ) -> int: + try: + self.output.print_banner( + "A2A Server", + "Starting A2A-protocol agent server via Uvicorn.", + ) + + # Validate host/port/config + validated_options = validate_cli_options(host, port, config) + + # Load agentflow.json + config_manager = ConfigManager() + actual_config_path = config_manager.find_config_file(validated_options["config"]) + config_data = config_manager.load_config(str(actual_config_path)) + + # Load .env + env_file_path = config_manager.resolve_env_file() + if env_file_path: + self.logger.info("Loading environment from: %s", env_file_path) + load_dotenv(env_file_path) + else: + load_dotenv() + + # Make user graph importable + os.environ["GRAPH_PATH"] = str(actual_config_path) + sys.path.insert(0, str(actual_config_path.parent)) + sys.path.insert(0, str(Path(__file__).parent.parent.parent)) + + # Read optional "a2a" section from agentflow.json + a2a_config: dict[str, Any] = config_data.get("a2a") or {} + + # Resolve final values: CLI flag > agentflow.json > defaults + agent_name = name or a2a_config.get("name") or DEFAULT_A2A_NAME + agent_description = description or a2a_config.get("description") or DEFAULT_A2A_DESCRIPTION + use_streaming = streaming or bool(a2a_config.get("streaming", False)) + agent_version = a2a_config.get("version") or DEFAULT_A2A_VERSION + executor_path: str | None = a2a_config.get("executor") + skills_config: list[dict] = a2a_config.get("skills") or [] + + agent_url = f"http://{validated_options['host']}:{validated_options['port']}/" + + self.logger.info( + "Starting A2A server: name=%s, host=%s, port=%d, streaming=%s", + agent_name, + validated_options["host"], + validated_options["port"], + use_streaming, + ) + + # ---------------------------------------------------------------- # + # Import a2a-sdk — give a clear error if not installed # + # ---------------------------------------------------------------- # + try: + from a2a.server.apps import A2AStarletteApplication + from a2a.server.request_handlers import DefaultRequestHandler + from a2a.server.tasks import InMemoryTaskStore + from a2a.types import AgentSkill + except ImportError as exc: + raise ServerError( + "a2a-sdk is not installed. " + "Run: pip install 'agentflow-cli[a2a]' or pip install a2a-sdk", + host=host, + port=port, + ) from exc + + # ---------------------------------------------------------------- # + # Import agentflow a2a helpers # + # ---------------------------------------------------------------- # + try: + from agentflow.a2a_integration import make_agent_card + from agentflow.a2a_integration.executor import AgentFlowExecutor + except ImportError as exc: + raise ServerError( + "agentflow a2a_integration is not available. " + "Make sure you have 'agentflow[a2a_sdk]' installed.", + host=host, + port=port, + ) from exc + + # ---------------------------------------------------------------- # + # Load the CompiledGraph # + # ---------------------------------------------------------------- # + import asyncio + + from agentflow_cli.src.app.loader import load_graph + + agent_path: str = config_data["agent"] + compiled_graph = asyncio.get_event_loop().run_until_complete( + load_graph(agent_path) + ) + + # ---------------------------------------------------------------- # + # Build skills list from config (optional) # + # ---------------------------------------------------------------- # + skills = [] + for s in skills_config: + skills.append( + AgentSkill( + id=s.get("id", "run_graph"), + name=s.get("name", agent_name), + description=s.get("description", agent_description), + tags=s.get("tags", []), + examples=s.get("examples", []), + ) + ) + + # ---------------------------------------------------------------- # + # Build the AgentCard # + # ---------------------------------------------------------------- # + card = make_agent_card( + name=agent_name, + description=agent_description, + url=agent_url, + streaming=use_streaming, + version=agent_version, + skills=skills if skills else None, + ) + + # ---------------------------------------------------------------- # + # Resolve executor — custom class or default AgentFlowExecutor # + # ---------------------------------------------------------------- # + if executor_path: + try: + module_name, class_name = executor_path.rsplit(":", 1) + module = importlib.import_module(module_name) + executor_cls = getattr(module, class_name) + self.logger.info("Loaded custom executor: %s", executor_path) + executor = executor_cls(compiled_graph) + except Exception as exc: + raise ConfigurationError( + f"Failed to load custom executor '{executor_path}': {exc}", + config_path=str(actual_config_path), + ) from exc + else: + executor = AgentFlowExecutor(compiled_graph, streaming=use_streaming) + + # ---------------------------------------------------------------- # + # Build and start the A2A server # + # ---------------------------------------------------------------- # + handler = DefaultRequestHandler( + agent_executor=executor, + task_store=InMemoryTaskStore(), + ) + starlette_app = A2AStarletteApplication( + agent_card=card, + http_handler=handler, + ) + + self.output.info( + f"A2A agent '{agent_name}' listening on " + f"http://{validated_options['host']}:{validated_options['port']}/" + ) + self.output.info( + f"Agent card: " + f"http://{validated_options['host']}:{validated_options['port']}" + f"/.well-known/agent-card.json" + ) + + import uvicorn + + uvicorn.run( + starlette_app.build(), + host=validated_options["host"], + port=validated_options["port"], + ) + + return 0 + + except (ConfigurationError, ServerError) as e: + return self.handle_error(e) + except Exception as e: + server_error = ServerError( + f"Failed to start A2A server: {e}", host=host, port=port + ) + return self.handle_error(server_error) diff --git a/agentflow_cli/cli/constants.py b/agentflow_cli/cli/constants.py index c0241ab..f9981a8 100644 --- a/agentflow_cli/cli/constants.py +++ b/agentflow_cli/cli/constants.py @@ -14,6 +14,13 @@ DEFAULT_PYTHON_VERSION: Final[str] = "3.13" DEFAULT_SERVICE_NAME: Final[str] = "agentflow-api" +# A2A server defaults +DEFAULT_A2A_HOST: Final[str] = "0.0.0.0" +DEFAULT_A2A_PORT: Final[int] = 9999 +DEFAULT_A2A_NAME: Final[str] = "AgentFlowAgent" +DEFAULT_A2A_DESCRIPTION: Final[str] = "An agentflow-powered A2A agent" +DEFAULT_A2A_VERSION: Final[str] = "1.0.0" + # File paths and names CONFIG_FILENAMES: Final[list[str]] = [ "agentflow.json", diff --git a/agentflow_cli/cli/main.py b/agentflow_cli/cli/main.py index 2f1ad39..24cf502 100644 --- a/agentflow_cli/cli/main.py +++ b/agentflow_cli/cli/main.py @@ -5,11 +5,18 @@ import typer from dotenv import load_dotenv +from agentflow_cli.cli.commands.a2a import A2ACommand from agentflow_cli.cli.commands.api import APICommand from agentflow_cli.cli.commands.build import BuildCommand from agentflow_cli.cli.commands.init import InitCommand from agentflow_cli.cli.commands.version import VersionCommand -from agentflow_cli.cli.constants import DEFAULT_CONFIG_FILE, DEFAULT_HOST, DEFAULT_PORT +from agentflow_cli.cli.constants import ( + DEFAULT_A2A_HOST, + DEFAULT_A2A_PORT, + DEFAULT_CONFIG_FILE, + DEFAULT_HOST, + DEFAULT_PORT, +) from agentflow_cli.cli.core.output import OutputFormatter from agentflow_cli.cli.exceptions import PyagenityCLIError from agentflow_cli.cli.logger import setup_cli_logging @@ -106,6 +113,74 @@ def api( sys.exit(handle_exception(e)) +@app.command() +def a2a( + config: str = typer.Option( + DEFAULT_CONFIG_FILE, + "--config", + "-c", + help="Path to agentflow.json config file", + ), + host: str = typer.Option( + DEFAULT_A2A_HOST, + "--host", + "-H", + help="Host to bind the A2A server on (default: 0.0.0.0)", + ), + port: int = typer.Option( + DEFAULT_A2A_PORT, + "--port", + "-p", + help="Port to run the A2A server on (default: 9999)", + ), + name: str = typer.Option( + None, + "--name", + "-n", + help="Agent name shown in the agent card (overrides agentflow.json)", + ), + description: str = typer.Option( + None, + "--description", + "-d", + help="Agent description shown in the agent card (overrides agentflow.json)", + ), + streaming: bool = typer.Option( + False, + "--streaming/--no-streaming", + help="Enable A2A streaming (SSE) responses", + ), + verbose: bool = typer.Option( + False, + "--verbose", + "-v", + help="Enable verbose logging", + ), + quiet: bool = typer.Option( + False, + "--quiet", + "-q", + help="Suppress all output except errors", + ), +) -> None: + """Start an A2A-protocol agent server for the configured graph.""" + setup_cli_logging(verbose=verbose, quiet=quiet) + + try: + command = A2ACommand(output) + exit_code = command.execute( + config=config, + host=host, + port=port, + name=name, + description=description, + streaming=streaming, + ) + sys.exit(exit_code) + except Exception as e: + sys.exit(handle_exception(e)) + + @app.command() def version( verbose: bool = typer.Option( diff --git a/examples/currency_agent/agentflow.json b/examples/currency_agent/agentflow.json new file mode 100644 index 0000000..b06214e --- /dev/null +++ b/examples/currency_agent/agentflow.json @@ -0,0 +1,23 @@ +{ + "agent": "graph:app", + "env": ".env", + "a2a": { + "name": "CurrencyAgent", + "description": "Currency conversion with INPUT_REQUIRED for missing info.", + "version": "1.0.0", + "streaming": true, + "executor": "executor:CurrencyAgentExecutor", + "skills": [ + { + "id": "currency_conversion", + "name": "Currency Conversion", + "description": "Convert between currencies. Asks if info is missing.", + "tags": ["currency", "finance", "exchange-rate"], + "examples": [ + "How much is 100 USD in EUR?", + "Convert 50 GBP to JPY" + ] + } + ] + } +} diff --git a/examples/currency_agent/executor.py b/examples/currency_agent/executor.py new file mode 100644 index 0000000..1c4781e --- /dev/null +++ b/examples/currency_agent/executor.py @@ -0,0 +1,81 @@ +""" +Custom executor for the CurrencyAgent. + +Extends AgentFlowExecutor to emit INPUT_REQUIRED when the LLM asks +for missing information (e.g. "Which currency do you want to convert to?"). + +Referenced in agentflow.json as: + "executor": "executor:CurrencyAgentExecutor" +""" + +from __future__ import annotations + +import logging + +from a2a.server.agent_execution.context import RequestContext +from a2a.server.events.event_queue import EventQueue +from a2a.server.tasks.task_updater import TaskUpdater +from a2a.types import TaskState, TextPart + +from agentflow.a2a_integration.executor import AgentFlowExecutor +from agentflow.state import Message as AFMessage +from agentflow.utils.constants import ResponseGranularity + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- # +# Heuristic: is the LLM asking for more input? # +# --------------------------------------------------------------------------- # + +_ASKING_PHRASES = [ + "could you", "please provide", "please specify", + "what amount", "which currency", "what currency", + "let me know", "can you tell", "i need", + "please tell", "what is the", "what date", +] + + +def _is_asking_for_input(text: str) -> bool: + low = text.lower().strip() + return low.endswith("?") or any(p in low for p in _ASKING_PHRASES) + + +# --------------------------------------------------------------------------- # +# Executor # +# --------------------------------------------------------------------------- # + +class CurrencyAgentExecutor(AgentFlowExecutor): + """Runs the currency graph; emits INPUT_REQUIRED for vague queries.""" + + async def execute(self, context: RequestContext, event_queue: EventQueue) -> None: + updater = TaskUpdater( + event_queue=event_queue, + task_id=context.task_id or "", + context_id=context.context_id or "", + ) + await updater.submit() + await updater.start_work() + + try: + user_text = context.get_user_input() if context.message else "" + messages = [AFMessage.text_message(user_text, role="user")] + run_config = {"thread_id": context.context_id or context.task_id or ""} + + result = await self.graph.ainvoke( + {"messages": messages}, + config=run_config, + response_granularity=ResponseGranularity.FULL, + ) + response_text = self._extract_response_text(result) + + if _is_asking_for_input(response_text): + msg = updater.new_agent_message(parts=[TextPart(text=response_text)]) + await updater.update_status(TaskState.input_required, message=msg) + else: + await updater.add_artifact([TextPart(text=response_text)]) + await updater.complete() + + except Exception as exc: + logger.exception("CurrencyAgentExecutor failed") + err = updater.new_agent_message(parts=[TextPart(text=f"Error: {exc!s}")]) + await updater.failed(message=err) diff --git a/examples/currency_agent/graph.py b/examples/currency_agent/graph.py new file mode 100644 index 0000000..7085cc0 --- /dev/null +++ b/examples/currency_agent/graph.py @@ -0,0 +1,123 @@ +""" +Currency agent graph — built with agentflow. + +Uses the Frankfurter API (free, no key needed) for live exchange rates. +The compiled graph is exposed as ``app`` — referenced in agentflow.json as +``"agent": "graph:app"``. +""" + +from __future__ import annotations + +import httpx +from dotenv import load_dotenv +from litellm import acompletion + +from agentflow.adapters.llm.model_response_converter import ModelResponseConverter +from agentflow.graph import StateGraph, ToolNode +from agentflow.state import AgentState +from agentflow.utils.constants import END +from agentflow.utils.converter import convert_messages + +load_dotenv() + +# --------------------------------------------------------------------------- # +# Tool — Frankfurter API # +# --------------------------------------------------------------------------- # + +async def get_exchange_rate( + currency_from: str, + currency_to: str, + currency_date: str = "latest", + amount: float = 1.0, +) -> dict: + """Get exchange rate between two currencies using the Frankfurter API. + + Args: + currency_from: Source currency code (e.g. USD). + currency_to: Target currency code (e.g. INR). + currency_date: Date in YYYY-MM-DD format or 'latest'. + amount: Amount to convert. + + Returns: + dict with keys: amount, base, date, rates. + """ + url = f"https://api.frankfurter.app/{currency_date}" + params = {"from": currency_from, "to": currency_to, "amount": amount} + async with httpx.AsyncClient() as client: + response = await client.get(url, params=params) + response.raise_for_status() + return response.json() + + +tool_node = ToolNode([get_exchange_rate]) + +# --------------------------------------------------------------------------- # +# LLM node # +# --------------------------------------------------------------------------- # + +SYSTEM_PROMPT = ( + "You are a helpful currency conversion assistant. " + "Use the get_exchange_rate tool to look up live exchange rates " + "from the Frankfurter API. Always tell the user the converted " + "amount and the date of the rate." +) + + +async def llm_node(state: AgentState): + messages = convert_messages( + system_prompts=[{"role": "system", "content": SYSTEM_PROMPT}], + state=state, + ) + + # If last message is a tool result, summarise without tools + if state.context and state.context[-1].role == "tool": + response = await acompletion( + model="gemini/gemini-2.5-flash", + messages=messages, + ) + else: + tools = await tool_node.all_tools() + response = await acompletion( + model="gemini/gemini-2.5-flash", + messages=messages, + tools=tools, + ) + + return ModelResponseConverter(response, converter="litellm") + + +# --------------------------------------------------------------------------- # +# Routing # +# --------------------------------------------------------------------------- # + +def should_use_tools(state: AgentState) -> str: + if not state.context: + return END + + last = state.context[-1] + + if ( + hasattr(last, "tools_calls") + and last.tools_calls + and last.role == "assistant" + ): + return "TOOL" + + if last.role == "tool": + return "MAIN" + + return END + + +# --------------------------------------------------------------------------- # +# Graph # +# --------------------------------------------------------------------------- # + +graph = StateGraph() +graph.add_node("MAIN", llm_node) +graph.add_node("TOOL", tool_node) +graph.add_conditional_edges("MAIN", should_use_tools, {"TOOL": "TOOL", "MAIN": "MAIN", END: END}) +graph.add_edge("TOOL", "MAIN") +graph.set_entry_point("MAIN") + +app = graph.compile() diff --git a/pyproject.toml b/pyproject.toml index 1029bd6..3a79a52 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -54,6 +54,10 @@ Issues = "https://github.com/10xHub/agentflow-cli/issues" Documentation = "https://agentflow-cli.readthedocs.io/" [project.optional-dependencies] +a2a = [ + "a2a-sdk", + "httpx", +] sentry = [ "sentry-sdk>=2.10.0", ]