In [1]:
import os
GOOGLE_API_KEY =  os.getenv("GOOGLE_API_KEY")
os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "FALSE"
print("‚úÖ Gemini API key setup complete. ")

‚úÖ Gemini API key setup complete. 


In [None]:
# Exercise: Build an Image Generation Agent with Cost Approval
# The scenario:

# Build an agent that generates images using the MCP server, but requires approval for "bulk" image generation:

# Single image request (1 image): Auto-approve, generate immediately
# Bulk request (>1 image): Pause and ask for approval before generating multiple images
# Explore different publicly available Image GenerationDay 2 exercise.ipynb MCP Servers

In [3]:
import uuid
from google.genai import types

from google.adk.agents.llm_agent import Agent
from google.adk.models.google_llm import Gemini
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService

from google.adk.tools.mcp_tool.mcp_toolset import McpToolset
from google.adk.tools.tool_context import ToolContext
from google.adk.tools.mcp_tool.mcp_session_manager import StdioConnectionParams
from mcp import StdioServerParameters
from google.adk.tools.mcp_tool.mcp_toolset import StreamableHTTPConnectionParams

from google.adk.apps.app import App, ResumabilityConfig
from google.adk.tools.function_tool import FunctionTool

print("‚úÖ ADK components imported successfully.")

‚úÖ ADK components imported successfully.


In [4]:
retry_config = types.HttpRetryOptions(
    attempts=5,  # Maximum retry attempts
    exp_base=7,  # Delay multiplier
    initial_delay=1,
    http_status_codes=[429, 500, 503, 504],  # Retry on these HTTP errors
)

In [5]:
# PATH_TO_YOUR_MCP_SERVER_SCRIPT = "mcp.py" 

# if PATH_TO_YOUR_MCP_SERVER_SCRIPT == "mcp.py":
#     print("Good action: MCP.PY is in set.")
# mcp_image_server =  McpToolset(
#             connection_params=StdioConnectionParams(
#                 server_params = StdioServerParameters(
#                     command='python3', # Command to run your MCP server script
#                     args=[PATH_TO_YOUR_MCP_SERVER_SCRIPT], # Argument is the path to the script
#                 )
#             ),
#              tool_filter=['generate_image'] # Optional: ensure only specific tools are loaded
#         )

In [6]:
mcp_image_server =  McpToolset(
            connection_params=StreamableHTTPConnectionParams(
                url = "http://127.0.0.1:8200/mcp"
        )
)

In [7]:
image_generator_agent = Agent(
    name="image_generator_agent",
    model=Gemini(model="gemini-2.5-flash-lite", retry_options=retry_config),
    instruction="""Use the MCP Tool to generate images for user queries""",
    tools=[mcp_image_server],
)

print("‚úÖ Image agent created with MCP tools")

‚úÖ Image agent created with MCP tools


In [8]:
# from google.adk.runners import InMemoryRunner

# runner = InMemoryRunner(agent=image_generator_agent)

# response = await runner.run_debug("Generate new version of mustang car, 1:1, 1 image", verbose=True)

In [9]:
def place_images(num_images: int, tool_context: ToolContext) -> dict:
    if num_images <= 1: 
        return {
            "status": "approved",
            "order_id": f"ORD-{num_images}-AUTO",
            "image": num_images,
            "message": f"Order auto-approved: {num_images}",
        }
    
    if not tool_context.tool_confirmation:
        tool_context.request_confirmation(
            hint=f"‚ö†Ô∏è Large order: {num_images} images. Do you want to approve?",
            payload={"num_images": num_images}        
        )
        return {  # This is sent to the Agent
            "status": "pending",
            "message": f"Order for {num_images} images requires approval",
        }

    if tool_context.tool_confirmation.confirmed:
        return {
                "status": "approved",
                "order_id": f"ORD-{num_images}-HUMAN",
                "num_images": num_images,
                "message": f"Order approved: {num_images} images",
            }
    else:
        return {
                "status": "rejected",
                "message": f"Order rejected: {num_images} images",
            }


print("‚úÖ Long-running functions created!")


‚úÖ Long-running functions created!


In [10]:
# Create shipping agent with pausable tool
image_agent = Agent(
    name="image_agent",
    model=Gemini(model="gemini-2.5-flash-lite", retry_options=retry_config),
    instruction="""You are a image generation coordinator assistant.
  
  When users request to generate image:
   1. Use the place_images tool with the number of images 
   2. If the order status is 'pending', inform the user that approval is required
   3. After receiving the final result, provide a clear summary including:
      - Order status (approved/rejected)
      - Order ID (if available)
      - Number of images
   4. Keep responses concise but informative
  """,
    tools=[FunctionTool(func=place_images)],
)

print("‚úÖ Image request Agent created!")

‚úÖ Image request Agent created!


In [11]:
image_app = App(
    name='image_coordinator',
    root_agent=image_agent,
    resumability_config=ResumabilityConfig(is_resumable=True),
)

print("‚úÖ Resumable app created!")

‚úÖ Resumable app created!


  resumability_config=ResumabilityConfig(is_resumable=True),


In [12]:
session_service = InMemorySessionService()

image_runner = Runner(
    app = image_app,
        session_service=session_service,
)

print("‚úÖ Runner created!")

‚úÖ Runner created!


In [13]:
response = await image_runner.run_debug("Generate new version of mustang car, 1:1, 9 image", verbose=True)


 ### Created new session: debug_session_id

User > Generate new version of mustang car, 1:1, 9 image




image_agent > [Calling tool: place_images({'num_images': 9})]
image_agent > [Calling tool: adk_request_confirmation({'originalFunctionCall': {'id': 'adk-ccbfddc2-2ddb...)]
image_agent > [Tool result: {'status': 'pending', 'message': 'Order for 9 images requires approval'}]


  ToolConfirmation(


In [14]:
def check_for_approval(events):
    """Check if events contain an approval request.

    Returns:
        dict with approval details or None
    """
    for event in events:
        if event.content and event.content.parts:
            for part in event.content.parts:
                if (
                    part.function_call
                    and part.function_call.name == "adk_request_confirmation"
                ):
                    return {
                        "approval_id": part.function_call.id,
                        "invocation_id": event.invocation_id,
                    }
    return None

In [15]:
def print_agent_response(events):
    """Print agent's text responses from events."""
    for event in events:
        if event.content and event.content.parts:
            for part in event.content.parts:
                if part.text:
                    print(f"Agent > {part.text}")

In [19]:
def create_approval_response(approval_info, approved):
    """Create approval response message."""
    confirmation_response = types.FunctionResponse(
        id=approval_info["approval_id"],
        name="adk_request_confirmation",
        response={"confirmed": approved},
    )
    return types.Content(
        role="user", parts=[types.Part(function_response=confirmation_response)]
    )


print("‚úÖ Helper functions defined")

‚úÖ Helper functions defined


In [20]:
async def run_image_workflow(query: str, auto_approve: bool = True):
    """Runs a image workflow with approval handling.

    Args:
        query: User's image request
        auto_approve: Whether to auto-approve large orders (simulates human decision)
    """

    print(f"\n{'='*60}")
    print(f"User > {query}\n")

    # Generate unique session ID
    session_id = f"order_{uuid.uuid4().hex[:8]}"

    # Create session
    await session_service.create_session(
        app_name="image_coordinator", user_id="test_user", session_id=session_id
    )

    query_content = types.Content(role="user", parts=[types.Part(text=query)])
    events = []

    # -----------------------------------------------------------------------------------------------
    # -----------------------------------------------------------------------------------------------
    # STEP 1: Send initial request to the Agent. If num_containers > 5, the Agent returns the special `adk_request_confirmation` event
    async for event in image_runner.run_async(
        user_id="test_user", session_id=session_id, new_message=query_content
    ):
        events.append(event)

    # -----------------------------------------------------------------------------------------------
    # -----------------------------------------------------------------------------------------------
    # STEP 2: Loop through all the events generated and check if `adk_request_confirmation` is present.
    approval_info = check_for_approval(events)

    # -----------------------------------------------------------------------------------------------
    # -----------------------------------------------------------------------------------------------
    # STEP 3: If the event is present, it's a large order - HANDLE APPROVAL WORKFLOW
    if approval_info:
        print(f"‚è∏Ô∏è  Pausing for approval...")
        print(f"ü§î Human Decision: {'APPROVE ‚úÖ' if auto_approve else 'REJECT ‚ùå'}\n")

        # PATH A: Resume the agent by calling run_async() again with the approval decision
        async for event in image_runner.run_async(
            user_id="test_user",
            session_id=session_id,
            new_message=create_approval_response(
                approval_info, auto_approve
            ),  # Send human decision here
            invocation_id=approval_info[
                "invocation_id"
            ],  # Critical: same invocation_id tells ADK to RESUME
        ):
            if event.content and event.content.parts:
                for part in event.content.parts:
                    if part.text:
                        print(f"Agent > {part.text}")

    # -----------------------------------------------------------------------------------------------
    # -----------------------------------------------------------------------------------------------
    else:
        # PATH B: If the `adk_request_confirmation` is not present - no approval needed - order completed immediately.
        print_agent_response(events)

    print(f"{'='*60}\n")


print("‚úÖ Workflow function ready")

‚úÖ Workflow function ready


In [None]:
# Demo 1: It's a small order. Agent receives auto-approved status from tool
await run_image_workflow("Generate 3 images of New ford Mustang")

# Demo 2: Workflow simulates human decision: APPROVE ‚úÖ
await run_image_workflow("Generate 3 images of New ford Mustang", auto_approve=True)




User > Generate 3 images of New ford Mustang





‚è∏Ô∏è  Pausing for approval...
ü§î Human Decision: APPROVE ‚úÖ

Agent > I have generated 3 images of the New Ford Mustang. The order ID is ORD-3-HUMAN and the status is approved.


User > Generate 3 images of New ford Mustang





‚è∏Ô∏è  Pausing for approval...
ü§î Human Decision: APPROVE ‚úÖ



User > Generate 1 images of New ford Mustang








In [27]:
# Demo 3: Workflow simulates human decision: REJECT ‚ùå
await run_image_workflow("Generate one image of New ford Mustang", auto_approve=False)


User > Generate one image of New ford Mustang

Agent > I can help you generate an image of the Ford Mustang. However, I can only generate a generic image of the car, and I do not have the ability to include specific details like the year or model. Would you like me to proceed with generating one image?

