Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 27 additions & 5 deletions src/fast_agent/acp/server/agent_acp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ async def prompt(self, params: PromptRequest) -> PromptResponse:
# Set up streaming if connection is available and agent supports it
stream_listener = None
remove_listener: Callable[[], None] | None = None
streaming_tasks: list[asyncio.Task] = []
if self._connection and isinstance(agent, StreamingAgentProtocol):
update_lock = asyncio.Lock()

Expand Down Expand Up @@ -336,7 +337,9 @@ def on_stream_chunk(chunk: str):
)

# Send update asynchronously (don't await in sync callback)
asyncio.create_task(send_stream_update(chunk))
# Track task to ensure all chunks complete before returning PromptResponse
task = asyncio.create_task(send_stream_update(chunk))
streaming_tasks.append(task)

# Register the stream listener and keep the cleanup function
stream_listener = on_stream_chunk
Expand All @@ -359,15 +362,34 @@ def on_stream_chunk(chunk: str):
response_length=len(response_text),
)

# Always send final update with complete response
# (streaming sends chunks during execution, this is the final complete message)
if self._connection and response_text:
# Wait for all streaming tasks to complete before sending final message
# and returning PromptResponse. This ensures all chunks arrive before END_TURN.
if streaming_tasks:
try:
await asyncio.gather(*streaming_tasks)
logger.debug(
f"All {len(streaming_tasks)} streaming tasks completed",
name="acp_streaming_complete",
session_id=session_id,
task_count=len(streaming_tasks),
)
except Exception as e:
logger.error(
f"Error waiting for streaming tasks: {e}",
name="acp_streaming_wait_error",
exc_info=True,
)

# Only send final update if no streaming chunks were sent
# When chunks were streamed, the final chunk already contains the complete response
# This prevents duplicate messages from being sent to the client
if not streaming_tasks and self._connection and response_text:
try:
message_chunk = update_agent_message_text(response_text)
notification = session_notification(session_id, message_chunk)
await self._connection.sessionUpdate(notification)
logger.info(
"Sent final sessionUpdate with complete response",
"Sent final sessionUpdate with complete response (no streaming)",
name="acp_final_update",
session_id=session_id,
)
Expand Down
Loading