In [3]:
from dotenv import load_dotenv
from google.adk.agents import SequentialAgent, Agent

# Load environment variables (GOOGLE_API_KEY)
load_dotenv()

# This agent's output will be saved to session.state["data"]
step1 = Agent(
    name="Step1_Fetch",
    model="gemini-2.0-flash-exp",
    instruction="Fetch information about the user's request and provide detailed data.",
    output_key="data"
)

# This agent will use the data from the previous step.
# We instruct it on how to find and use this data.
step2 = Agent(
    name="Step2_Process",
    model="gemini-2.0-flash-exp",
    instruction="Analyze the information found in state['data'] and provide a summary."
)

pipeline = SequentialAgent(
    name="MyPipeline",
    sub_agents=[step1, step2]
)

# When the pipeline is run with an initial input, Step1 will execute,
# its response will be stored in session.state["data"], and then
# Step2 will execute, using the information from the state as instructed.

In [4]:
from google.genai import types
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService

# Define variables for session setup
APP_NAME = "pipeline_app"
USER_ID = "user_123"
SESSION_ID = "session_001"

# Session and Runner setup
session_service = InMemorySessionService()
session = await session_service.create_session(
    app_name=APP_NAME,
    user_id=USER_ID,
    session_id=SESSION_ID
)

runner = Runner(
    agent=pipeline,
    app_name=APP_NAME,
    session_service=session_service
)

content = types.Content(
    role='user',
    parts=[types.Part(text="Research the current weather in San Francisco and then analyze it.")]
)

print("--- Running Sequential Pipeline Agent ---\n")

# Execute the agent
async for event in runner.run_async(
    user_id=USER_ID,
    session_id=SESSION_ID,
    new_message=content
):
    if hasattr(event, 'author'):
        print(f"Event from: {event.author}")
    
    if event.is_final_response() and event.content:
        final_response = ""
        
        # Extract text response
        if hasattr(event.content, 'text') and event.content.text:
            final_response = event.content.text
        elif event.content.parts:
            text_parts = [part.text for part in event.content.parts if part.text]
            final_response = "".join(text_parts)
        
        print("\n" + "=" * 80)
        print(f"{event.author} Response:")
        print("=" * 80)
        print(final_response)
        print("=" * 80 + "\n")

# Get final session state
final_session = await session_service.get_session(
    app_name=APP_NAME,
    user_id=USER_ID,
    session_id=SESSION_ID
)

print("\n--- Final Session State ---")
print(f"Data stored in state: {final_session.state.get('data')}")

--- Running Sequential Pipeline Agent ---

Event from: Step1_Fetch

Step1_Fetch Response:
Okay, I can do that. First, I need to fetch the current weather information for San Francisco. Then I will analyze it based on common weather patterns and provide a summary.


Event from: Step2_Process

Step2_Process Response:
Okay, I understand. I'm waiting for the weather data to be placed in `state['data']`. Once it's there, I will analyze it and provide a summary. I will look for things like temperature, humidity, wind speed, precipitation, and cloud cover. I'll then compare it to typical weather for San Francisco this time of year.



--- Final Session State ---
Data stored in state: Okay, I can do that. First, I need to fetch the current weather information for San Francisco. Then I will analyze it based on common weather patterns and provide a summary.

