Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,22 @@ on:

permissions:
contents: read
env:
FORCE_JAVASCRIPT_ACTIONS_TO_NODE24: "true"

jobs:
quality-gate:
runs-on: ubuntu-latest

steps:
- name: Checkout
uses: actions/checkout@v4
uses: actions/checkout@v6

- name: Setup Python
uses: actions/setup-python@v5
uses: actions/setup-python@v6
with:
python-version: "3.13"

- name: Setup uv
uses: astral-sh/setup-uv@v4
uses: astral-sh/setup-uv@v7
with:
enable-cache: false

Expand Down Expand Up @@ -70,15 +68,15 @@ jobs:

steps:
- name: Checkout
uses: actions/checkout@v4
uses: actions/checkout@v6

- name: Setup Python
uses: actions/setup-python@v5
uses: actions/setup-python@v6
with:
python-version: ${{ matrix.python-version }}

- name: Setup uv
uses: astral-sh/setup-uv@v4
uses: astral-sh/setup-uv@v7
with:
enable-cache: false

Expand Down
8 changes: 3 additions & 5 deletions .github/workflows/dependency-health.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,22 @@ on:

permissions:
contents: read
env:
FORCE_JAVASCRIPT_ACTIONS_TO_NODE24: "true"

jobs:
dependency-health:
runs-on: ubuntu-latest

steps:
- name: Checkout
uses: actions/checkout@v4
uses: actions/checkout@v6

- name: Setup Python
uses: actions/setup-python@v5
uses: actions/setup-python@v6
with:
python-version: "3.13"

- name: Setup uv
uses: astral-sh/setup-uv@v4
uses: astral-sh/setup-uv@v7
with:
enable-cache: false

Expand Down
8 changes: 3 additions & 5 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,24 @@ on:
permissions:
contents: write
id-token: write
env:
FORCE_JAVASCRIPT_ACTIONS_TO_NODE24: "true"

jobs:
publish:
runs-on: ubuntu-latest

steps:
- name: Checkout
uses: actions/checkout@v4
uses: actions/checkout@v6
with:
fetch-depth: 0

- name: Setup Python
uses: actions/setup-python@v5
uses: actions/setup-python@v6
with:
python-version: "3.13"

- name: Setup uv
uses: astral-sh/setup-uv@v4
uses: astral-sh/setup-uv@v7
with:
enable-cache: false

Expand Down
211 changes: 175 additions & 36 deletions src/opencode_a2a/server/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,21 @@
DefaultRequestHandler,
)
from a2a.types import (
Artifact,
InternalError,
Message,
Part,
Role,
Task,
TaskArtifactUpdateEvent,
TaskIdParams,
TaskNotCancelableError,
TaskNotFoundError,
TaskQueryParams,
TaskState,
TaskStatus,
TaskStatusUpdateEvent,
TextPart,
)
from a2a.utils.errors import ServerError
from fastapi import FastAPI, Request
Expand Down Expand Up @@ -84,9 +94,15 @@
build_session_state_repository,
initialize_state_repository,
)
from .task_store import build_database_engine, build_task_store, initialize_task_store
from .task_store import (
TaskStoreOperationError,
build_database_engine,
build_task_store,
initialize_task_store,
)

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Value reported as the error "type" in the task-store failure metadata built by the request handler.
TASK_STORE_ERROR_TYPE = "TASK_STORE_UNAVAILABLE"

__all__ = [
"_RequestBodyTooLargeError",
Expand Down Expand Up @@ -134,52 +150,155 @@
class OpencodeRequestHandler(DefaultRequestHandler):
"""Custom request handler to gracefully handle client disconnects and prevent dead loops."""

@staticmethod
def _task_store_failure_message(operation: str) -> str:
if operation == "get":
return "Task store unavailable while loading task state."
if operation == "save":
return "Task store unavailable while persisting task state."
if operation == "delete":
return "Task store unavailable while deleting task state."
return "Task store unavailable."

@classmethod
def _task_store_failure_metadata(cls, operation: str) -> dict[str, dict[str, dict[str, str]]]:
    """Build the metadata payload that tags a result as a task-store failure.

    Args:
        operation: Name of the store operation that failed.

    Returns:
        A nested dict of the form
        ``{"opencode": {"error": {"type": ..., "operation": ...}}}`` where
        ``type`` is ``TASK_STORE_ERROR_TYPE`` and ``operation`` is the
        value passed in.
    """
    error_details = {
        "type": TASK_STORE_ERROR_TYPE,
        "operation": operation,
    }
    return {"opencode": {"error": error_details}}

@classmethod
def _task_store_server_error(cls, exc: TaskStoreOperationError) -> ServerError:
    """Wrap a task-store failure in a ``ServerError`` carrying an ``InternalError``.

    Args:
        exc: The store exception; its ``operation`` attribute selects the
            message via ``_task_store_failure_message``.

    Returns:
        A ``ServerError`` whose ``error`` is an ``InternalError`` with the
        operation-specific message. The caller is responsible for raising it.
    """
    failure_text = cls._task_store_failure_message(exc.operation)
    return ServerError(error=InternalError(message=failure_text))

@classmethod
def _task_store_failure_task(
    cls,
    *,
    task_id: str,
    context_id: str,
    operation: str,
) -> Task:
    """Build a failed ``Task`` describing a task-store failure.

    Args:
        task_id: Identifier used for the task and for the error message id.
        context_id: Context identifier set on both the task and the message.
        operation: Name of the store operation that failed.

    Returns:
        A ``Task`` in ``TaskState.failed`` whose status message and sole
        history entry are the same agent-role text message, and whose
        metadata comes from ``_task_store_failure_metadata``.
    """
    failure_text = cls._task_store_failure_message(operation)
    # The same message object is used as the status message and as history.
    failure_message = Message(
        message_id=f"{task_id}:task-store-error",
        role=Role.agent,
        parts=[Part(root=TextPart(text=failure_text))],
        task_id=task_id,
        context_id=context_id,
    )
    failed_status = TaskStatus(state=TaskState.failed, message=failure_message)
    return Task(
        id=task_id,
        context_id=context_id,
        status=failed_status,
        history=[failure_message],
        metadata=cls._task_store_failure_metadata(operation),
    )

@classmethod
def _task_store_failure_events(
    cls,
    *,
    task_id: str,
    context_id: str,
    operation: str,
) -> tuple[TaskArtifactUpdateEvent, TaskStatusUpdateEvent]:
    """Build the pair of stream events emitted for a task-store failure.

    Args:
        task_id: Identifier set on both events and used in the artifact id.
        context_id: Context identifier set on both events.
        operation: Name of the store operation that failed.

    Returns:
        A 2-tuple, in emission order:

        1. a ``TaskArtifactUpdateEvent`` carrying the failure text as a
           single, non-appended, last-chunk artifact;
        2. a final ``TaskStatusUpdateEvent`` in ``TaskState.failed`` with
           the metadata from ``_task_store_failure_metadata``.
    """
    failure_text = cls._task_store_failure_message(operation)
    error_artifact = Artifact(
        artifact_id=f"{task_id}:error",
        parts=[Part(root=TextPart(text=failure_text))],
    )
    artifact_event = TaskArtifactUpdateEvent(
        task_id=task_id,
        context_id=context_id,
        artifact=error_artifact,
        append=False,
        last_chunk=True,
    )
    status_event = TaskStatusUpdateEvent(
        task_id=task_id,
        context_id=context_id,
        status=TaskStatus(state=TaskState.failed),
        metadata=cls._task_store_failure_metadata(operation),
        final=True,
    )
    return (artifact_event, status_event)

@staticmethod
def _resolve_context_id_from_params(params, task_id: str) -> str: # noqa: ANN001
message = getattr(params, "message", None)
return (
getattr(message, "contextId", None) or getattr(message, "context_id", None) or task_id
)

async def on_get_task(
    self,
    params: TaskQueryParams,
    context=None,
) -> Task | None:
    """Delegate to the parent ``on_get_task`` and translate task-store failures.

    Args:
        params: Query parameters forwarded unchanged to the parent handler.
        context: Optional call context forwarded unchanged.

    Returns:
        Whatever the parent handler returns.

    Raises:
        ServerError: Built by ``_task_store_server_error`` when the parent
            call raises ``TaskStoreOperationError``; the original exception
            is chained as the cause.
    """
    try:
        task = await super().on_get_task(params, context)
    except TaskStoreOperationError as exc:
        raise self._task_store_server_error(exc) from exc
    return task

# Cancel a task; a task that is already canceled is returned as-is instead of raising.
# NOTE(review): this span is a rendered diff with indentation and +/- markers stripped.
# The lookup/cancel sequence appears twice, presumably once per side of the diff
# (before and after the change) — confirm against the repository before relying on it.
async def on_cancel_task(
self,
params: TaskIdParams,
context=None,
) -> Task | None:
# Load the stored task; a missing task is reported as TaskNotFoundError.
task = await self.task_store.get(params.id, context)
if not task:
raise ServerError(error=TaskNotFoundError())

# Idempotent contract:
# repeated cancel on already-canceled task returns current terminal state.
if task.status.state == TaskState.canceled:
return task

# Any other state listed in TERMINAL_TASK_STATES is rejected as not cancelable.
if task.status.state in TERMINAL_TASK_STATES:
raise ServerError(
error=TaskNotCancelableError(
message=f"Task cannot be canceled - current state: {task.status.state}"
)
)
try:
return await super().on_cancel_task(params, context)
except ServerError as exc:
# Race-safe idempotency: task may become canceled between pre-check and super call.
if isinstance(exc.error, TaskNotCancelableError):
refreshed = await self.task_store.get(params.id, context)
if refreshed and refreshed.status.state == TaskState.canceled:
return refreshed
raise
# NOTE(review): second copy of the sequence above — presumably the post-change side,
# nested under the `try:` whose handler is the final `except TaskStoreOperationError`; confirm.
task = await self.task_store.get(params.id, context)
if not task:
raise ServerError(error=TaskNotFoundError())

# Idempotent contract:
# repeated cancel on already-canceled task returns current terminal state.
if task.status.state == TaskState.canceled:
return task

if task.status.state in TERMINAL_TASK_STATES:
raise ServerError(
error=TaskNotCancelableError(
message=f"Task cannot be canceled - current state: {task.status.state}"
)
)
try:
return await super().on_cancel_task(params, context)
except ServerError as exc:
# Race-safe idempotency: task may become canceled between pre-check and super call.
if isinstance(exc.error, TaskNotCancelableError):
refreshed = await self.task_store.get(params.id, context)
if refreshed and refreshed.status.state == TaskState.canceled:
return refreshed
raise
# A task-store failure is re-raised as the ServerError built by _task_store_server_error.
except TaskStoreOperationError as exc:
raise self._task_store_server_error(exc) from exc

# Async generator: yields a terminal task once and stops; otherwise relays the parent's events.
# NOTE(review): this span is a rendered diff with indentation and +/- markers stripped.
# Each statement group appears twice, presumably the pre-change line(s) followed by the
# post-change line(s) nested under the new `try:` — confirm against the repository.
async def on_resubscribe_to_task(
self,
params: TaskIdParams,
context=None,
):
# Load the stored task; a missing task is reported as TaskNotFoundError.
task = await self.task_store.get(params.id, context)
if not task:
raise ServerError(error=TaskNotFoundError())
try:
task = await self.task_store.get(params.id, context)
if not task:
raise ServerError(error=TaskNotFoundError())

# Subscribe contract: terminal tasks replay once and then close stream.
if task.status.state in TERMINAL_TASK_STATES:
yield task
return
# Subscribe contract: terminal tasks replay once and then close stream.
if task.status.state in TERMINAL_TASK_STATES:
yield task
return

# Non-terminal tasks: forward every event produced by the parent implementation.
async for event in super().on_resubscribe_to_task(params, context):
yield event
async for event in super().on_resubscribe_to_task(params, context):
yield event
# A task-store failure is re-raised as the ServerError built by _task_store_server_error.
except TaskStoreOperationError as exc:
raise self._task_store_server_error(exc) from exc

async def on_message_send_stream(self, params, context=None):
(
Expand All @@ -202,6 +321,18 @@ async def on_message_send_stream(self, params, context=None):
await self._send_push_notification_if_needed(task_id, result_aggregator)
yield event
stream_completed = True
except TaskStoreOperationError as exc:
logger.exception(
"Task store operation failed during streaming task_id=%s operation=%s",
task_id,
exc.operation,
)
for event in self._task_store_failure_events(
task_id=task_id,
context_id=self._resolve_context_id_from_params(params, task_id),
operation=exc.operation,
):
yield event
except (asyncio.CancelledError, GeneratorExit):
logger.warning("Client disconnected. Cancelling producer task %s", task_id)
producer_task.cancel()
Expand Down Expand Up @@ -253,6 +384,17 @@ async def push_notification_callback() -> None:
if bg_consume_task is not None:
bg_consume_task.set_name(f"continue_consuming:{task_id}")
self._track_background_task(bg_consume_task)
except TaskStoreOperationError as exc:
logger.exception(
"Task store operation failed during message/send task_id=%s operation=%s",
task_id,
exc.operation,
)
return self._task_store_failure_task(
task_id=task_id,
context_id=self._resolve_context_id_from_params(params, task_id),
operation=exc.operation,
)
except Exception:
logger.exception("Agent execution failed")
raise
Expand All @@ -276,9 +418,6 @@ async def push_notification_callback() -> None:
pass

if not result:
from a2a.types import InternalError
from a2a.utils.errors import ServerError

raise ServerError(error=InternalError())

if hasattr(result, "id") and result.id:
Expand Down
Loading