Skip to content
4 changes: 2 additions & 2 deletions samples/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
import httpx

from a2a.client import A2ACardResolver, ClientConfig, create_client
from a2a.helpers import get_artifact_text, get_message_text
from a2a.helpers.agent_card import display_agent_card
from a2a.types import Message, Part, Role, SendMessageRequest, TaskState
from a2a.utils import get_artifact_text, get_message_text
from a2a.utils.agent_card import display_agent_card


async def _handle_stream(
Expand Down
5 changes: 2 additions & 3 deletions scripts/test_minimal_install.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,13 @@
'a2a.server.tasks',
'a2a.types',
'a2a.utils',
'a2a.utils.artifact',
'a2a.utils.constants',
'a2a.utils.error_handlers',
'a2a.utils.helpers',
'a2a.utils.message',
'a2a.utils.parts',
'a2a.utils.proto_utils',
'a2a.utils.task',
'a2a.helpers.agent_card',
'a2a.helpers.proto_helpers',
]


Expand Down
2 changes: 0 additions & 2 deletions src/a2a/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
A2AClientTimeoutError,
AgentCardResolutionError,
)
from a2a.client.helpers import create_text_message_object
from a2a.client.interceptors import ClientCallInterceptor


Expand All @@ -41,6 +40,5 @@
'CredentialService',
'InMemoryContextCredentialStore',
'create_client',
'create_text_message_object',
'minimal_agent_card',
]
20 changes: 1 addition & 19 deletions src/a2a/client/helpers.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
"""Helper functions for the A2A client."""

from typing import Any
from uuid import uuid4

from google.protobuf.json_format import ParseDict

from a2a.types.a2a_pb2 import AgentCard, Message, Part, Role
from a2a.types.a2a_pb2 import AgentCard


def parse_agent_card(agent_card_data: dict[str, Any]) -> AgentCard:
Expand Down Expand Up @@ -111,20 +110,3 @@ def _handle_security_compatibility(agent_card_data: dict[str, Any]) -> None:
new_scheme_wrapper = {mapped_name: scheme.copy()}
scheme.clear()
scheme.update(new_scheme_wrapper)


def create_text_message_object(
role: Role = Role.ROLE_USER, content: str = ''
) -> Message:
"""Create a Message object containing a single text Part.

Args:
role: The role of the message sender (user or agent). Defaults to Role.ROLE_USER.
content: The text content of the message. Defaults to an empty string.

Returns:
A `Message` object with a new UUID message_id.
"""
return Message(
role=role, parts=[Part(text=content)], message_id=str(uuid4())
)
34 changes: 34 additions & 0 deletions src/a2a/helpers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""Helper functions for the A2A Python SDK."""

from a2a.helpers.agent_card import display_agent_card
from a2a.helpers.proto_helpers import (
get_artifact_text,
get_message_text,
get_stream_response_text,
get_text_parts,
new_artifact,
new_message,
new_task,
new_task_from_user_message,
new_text_artifact,
new_text_artifact_update_event,
new_text_message,
new_text_status_update_event,
)


__all__ = [
'display_agent_card',
'get_artifact_text',
'get_message_text',
'get_stream_response_text',
'get_text_parts',
'new_artifact',
'new_message',
'new_task',
'new_task_from_user_message',
'new_text_artifact',
'new_text_artifact_update_event',
'new_text_message',
'new_text_status_update_event',
]
File renamed without changes.
214 changes: 214 additions & 0 deletions src/a2a/helpers/proto_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
"""Unified helper functions for creating and handling A2A types."""

import uuid

from collections.abc import Sequence

from a2a.types.a2a_pb2 import (
Artifact,
Message,
Part,
Role,
StreamResponse,
Task,
TaskArtifactUpdateEvent,
TaskState,
TaskStatus,
TaskStatusUpdateEvent,
)


# --- Message Helpers ---


def new_message(
parts: list[Part],
role: Role = Role.ROLE_AGENT,
context_id: str | None = None,
task_id: str | None = None,
) -> Message:
"""Creates a new message containing a list of Parts."""
return Message(
role=role,
parts=parts,
message_id=str(uuid.uuid4()),
task_id=task_id,
context_id=context_id,
)


def new_text_message(
text: str,
context_id: str | None = None,
task_id: str | None = None,
role: Role = Role.ROLE_AGENT,
) -> Message:
"""Creates a new message containing a single text Part."""
return new_message(
parts=[Part(text=text)],
role=role,
task_id=task_id,
context_id=context_id,
)


def get_message_text(message: Message, delimiter: str = '\n') -> str:
"""Extracts and joins all text content from a Message's parts."""
return delimiter.join(get_text_parts(message.parts))


# --- Artifact Helpers ---


def new_artifact(
parts: list[Part],
name: str,
description: str | None = None,
artifact_id: str | None = None,
) -> Artifact:
"""Creates a new Artifact object."""
return Artifact(
artifact_id=artifact_id or str(uuid.uuid4()),
parts=parts,
name=name,
description=description,
)


def new_text_artifact(
name: str,
text: str,
description: str | None = None,
artifact_id: str | None = None,
) -> Artifact:
"""Creates a new Artifact object containing only a single text Part."""
return new_artifact(
[Part(text=text)],
name,
description,
artifact_id=artifact_id,
)


def get_artifact_text(artifact: Artifact, delimiter: str = '\n') -> str:
"""Extracts and joins all text content from an Artifact's parts."""
return delimiter.join(get_text_parts(artifact.parts))


# --- Task Helpers ---


def new_task_from_user_message(user_message: Message) -> Task:
Comment thread
guglielmo-san marked this conversation as resolved.
"""Creates a new Task object from an initial user message."""
if user_message.role != Role.ROLE_USER:
raise ValueError('Message must be from a user')
if not user_message.parts:
raise ValueError('Message parts cannot be empty')
for part in user_message.parts:
if part.HasField('text') and not part.text:
raise ValueError('Message.text cannot be empty')

return Task(
status=TaskStatus(state=TaskState.TASK_STATE_SUBMITTED),
id=user_message.task_id or str(uuid.uuid4()),
context_id=user_message.context_id or str(uuid.uuid4()),
history=[user_message],
)


def new_task(
task_id: str,
context_id: str,
state: TaskState,
artifacts: list[Artifact] | None = None,
history: list[Message] | None = None,
) -> Task:
"""Creates a Task object with a specified status."""
if history is None:
history = []
if artifacts is None:
artifacts = []

return Task(
status=TaskStatus(state=state),
id=task_id,
context_id=context_id,
artifacts=artifacts,
history=history,
)


# --- Part Helpers ---


def get_text_parts(parts: Sequence[Part]) -> list[str]:
"""Extracts text content from all text Parts."""
return [part.text for part in parts if part.HasField('text')]


# --- Event & Stream Helpers ---


def new_text_status_update_event(
task_id: str,
context_id: str,
state: TaskState,
text: str,
) -> TaskStatusUpdateEvent:
"""Creates a TaskStatusUpdateEvent with a single text message."""
return TaskStatusUpdateEvent(
task_id=task_id,
context_id=context_id,
status=TaskStatus(
state=state,
message=new_text_message(
text=text,
role=Role.ROLE_AGENT,
context_id=context_id,
task_id=task_id,
),
),
)


def new_text_artifact_update_event( # noqa: PLR0913
task_id: str,
context_id: str,
name: str,
text: str,
append: bool = False,
last_chunk: bool = False,
artifact_id: str | None = None,
) -> TaskArtifactUpdateEvent:
"""Creates a TaskArtifactUpdateEvent with a single text artifact."""
return TaskArtifactUpdateEvent(
task_id=task_id,
context_id=context_id,
artifact=new_text_artifact(
name=name, text=text, artifact_id=artifact_id
),
append=append,
last_chunk=last_chunk,
)


def get_stream_response_text(
response: StreamResponse, delimiter: str = '\n'
) -> str:
"""Extracts text content from a StreamResponse."""
if response.HasField('message'):
return get_message_text(response.message, delimiter)
if response.HasField('task'):
texts = [
get_artifact_text(a, delimiter) for a in response.task.artifacts
]
return delimiter.join(t for t in texts if t)
if response.HasField('status_update'):
if response.status_update.status.HasField('message'):
return get_message_text(
response.status_update.status.message, delimiter
)
return ''
if response.HasField('artifact_update'):
return get_artifact_text(response.artifact_update.artifact, delimiter)
return ''
2 changes: 1 addition & 1 deletion src/a2a/server/agent_execution/context.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Any

from a2a.helpers.proto_helpers import get_message_text
from a2a.server.context import ServerCallContext
from a2a.server.id_generator import (
IDGenerator,
Expand All @@ -12,7 +13,6 @@
SendMessageRequest,
Task,
)
from a2a.utils import get_message_text
from a2a.utils.errors import InvalidParamsError


Expand Down
Loading
Loading