Skip to content

Streaming Tool Results for Real-Time Use Cases #3837

@sarojrout

Description

@sarojrout

Is your feature request related to a problem? Please describe.
Yes. Currently, tools must complete execution before results are shown to users or agents. This causes issues for:

  1. Long-Running Operations: Tools that take time (e.g., data processing, API calls) block the entire agent response
  2. Real-Time Monitoring: Can't show progress for monitoring tools (stock prices, system metrics)
  3. User Experience: Users wait with no feedback during tool execution
  4. Interactive Tools: Tools that need user feedback during execution can't provide it
    Describe the solution you'd like
    A clear and concise description of what you want to happen.

Example Problem:

@tool
async def process_large_dataset(file_path: str) -> dict:
    # Takes 30 seconds to process
    # Agent and user wait with no feedback
    result = await expensive_processing(file_path)
    return result

Extend the streaming tools infrastructure to support progressive result delivery:

Proposed API:

@streaming_tool
async def monitor_stock_price(symbol: str) -> AsyncGenerator[dict, None]:
    """Monitor stock price with real-time updates."""
    while True:
        price = await get_current_price(symbol)
        yield {
            "symbol": symbol,
            "price": price,
            "timestamp": time.time(),
            "status": "streaming"  # or "complete" for final result
        }
        await asyncio.sleep(1)

@streaming_tool
async def process_large_dataset(file_path: str) -> AsyncGenerator[dict, None]:
    """Process dataset with progress updates."""
    total_rows = await get_row_count(file_path)
    processed = 0
    
    async for batch in process_in_batches(file_path):
        processed += len(batch)
        yield {
            "progress": processed / total_rows,
            "processed": processed,
            "total": total_rows,
            "status": "streaming"
        }
    
    # Final result
    yield {
        "result": final_result,
        "status": "complete"
    }

Behavior:

  1. Progressive Results: Tools can yield partial results as they become available
  2. Agent Reaction: Agent can react to intermediate results (e.g., stop monitoring if threshold reached)
  3. User Feedback: Users see progress for long-running operations
  4. Cancellation Support: Allow users/agents to cancel tool execution mid-stream
  5. Status Indicators: Each yielded result includes status (streaming vs complete)

Usage:

from google.adk.tools import streaming_tool
from google.adk import Agent

@streaming_tool
async def monitor_system_health() -> AsyncGenerator[dict, None]:
    while True:
        metrics = await collect_metrics()
        yield {"metrics": metrics, "status": "streaming"}
        await asyncio.sleep(5)

agent = Agent(
    name="monitor_agent",
    tools=[monitor_system_health],
    instruction="Monitor system health and alert on issues"
)

Describe alternatives you've considered

  1. Polling: Agent polls tool status repeatedly:

    • Inefficient
    • Adds latency
    • Wastes tokens on status checks
  2. Separate Endpoints: Use separate API endpoints for status:

    • Breaks agent flow
    • Requires additional infrastructure
    • Complex state management
  3. Background Tasks: Run tools in background and check later:

    • No real-time feedback
    • Poor user experience
    • Can't react to intermediate results

Additional context

  • Real-Time Monitoring: Stock prices, system metrics, sensor data
  • Long-Running Processing: Data processing with progress updates
  • Interactive Tools: Tools that need user feedback during execution
  • Streaming Data Sources: Tools that consume streaming APIs

Implementation Notes:

  • Should extend existing streaming tools infrastructure
  • Needs integration with agent's tool execution flow
  • Should support cancellation via agent/user input
  • Can leverage existing AsyncGenerator patterns

Related Code:

  • Streaming tools: contributing/samples/live_bidi_streaming_tools_agent/
  • Tool execution: src/google/adk/tools/
  • Streaming infrastructure: src/google/adk/flows/llm_flows/base_llm_flow.py

Priority:

Medium - Important for real-time use cases, but not critical for core functionality.

Metadata

Metadata

Assignees

No one assigned

    Labels

    tools[Component] This issue is related to tools

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions