In [None]:
from google.adk.agents import BaseAgent
from google.adk.events import Event, EventActions
from google.adk.agents.invocation_context import InvocationContext
from typing import AsyncGenerator

class SimpleValidationAgent(BaseAgent):
    async def _run_async_impl(
        self, ctx: InvocationContext
    ) -> AsyncGenerator[Event, None]:
        # Retrieve input data from the context state
        user_input = ctx.state.get("user_input")

        # Perform validation: check if input is a non-empty string
        if isinstance(user_input, str) and user_input.strip():
            validation_status = "valid"
        else:
            validation_status = "invalid"

        # Update the context state with the validation result
        ctx.state["validation_status"] = validation_status

        # Yield an event indicating the validation result
        yield Event(
            action=EventActions.TOOL_RESULT,
            name=self.name,
            output={"validation_status": validation_status},
        )


In [3]:
from google.adk.agents import SequentialAgent, BaseAgent
from google.adk.events import Event, EventActions
from google.adk.agents.invocation_context import InvocationContext
from typing import AsyncGenerator

# Define a processing agent that acts based on validation status
class ProcessingAgent(BaseAgent):
    async def _run_async_impl(
        self, ctx: InvocationContext
    ) -> AsyncGenerator[Event, None]:
        validation_status = ctx.state.get("validation_status")
        if validation_status == "valid":
            # Proceed with processing
            result = f"Processed input: {ctx.state.get('user_input')}"
        else:
            # Handle invalid input
            result = "Input validation failed."

        # Update the context state with the result
        ctx.state["processing_result"] = result

        # Yield an event indicating the processing result
        yield Event(
            action=EventActions.TOOL_RESULT,
            name=self.name,
            output={"processing_result": result},
        )

# Instantiate the agents
validator = SimpleValidationAgent(name="Validator")
processor = ProcessingAgent(name="Processor")

# Create a sequential workflow
workflow = SequentialAgent(name="ValidationWorkflow", sub_agents=[validator, processor])


In [None]:
import asyncio
from google.adk.agents.invocation_context import InvocationContext

async def run_workflow():
    # Initialize the context with user input
    ctx = InvocationContext(state={"user_input": "Sample input"})

    # Run the workflow
    async for event in workflow.run_async(ctx):
        print(f"Event: {event}")

# Execute the asynchronous function
asyncio.run(run_workflow())


In [5]:
pip install yfinance

Collecting yfinance
  Obtaining dependency information for yfinance from https://files.pythonhosted.org/packages/73/b5/d50eec88bc731bb8570ae42a9b764a36144e217361c33fa068391ff59ba3/yfinance-0.2.61-py2.py3-none-any.whl.metadata
  Downloading yfinance-0.2.61-py2.py3-none-any.whl.metadata (5.8 kB)
Collecting multitasking>=0.0.7 (from yfinance)
  Obtaining dependency information for multitasking>=0.0.7 from https://files.pythonhosted.org/packages/3e/8a/bb3160e76e844db9e69a413f055818969c8acade64e1a9ac5ce9dfdcf6c1/multitasking-0.0.11-py3-none-any.whl.metadata
  Downloading multitasking-0.0.11-py3-none-any.whl.metadata (5.5 kB)
Collecting frozendict>=2.3.4 (from yfinance)
  Obtaining dependency information for frozendict>=2.3.4 from https://files.pythonhosted.org/packages/04/13/d9839089b900fa7b479cce495d62110cddc4bd5630a04d8469916c0e79c5/frozendict-2.4.6-py311-none-any.whl.metadata
  Downloading frozendict-2.4.6-py311-none-any.whl.metadata (23 kB)
Collecting peewee>=3.16.2 (from yfinance)
  Do


[notice] A new release of pip is available: 23.2.1 -> 25.1.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [6]:
from google.adk.agents import Agent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types

import yfinance as yf


APP_NAME = "stock_app"
USER_ID = "1234"
SESSION_ID = "session1234"

def get_stock_price(symbol: str):
    """
    Retrieves the current stock price for a given symbol.

    Args:
        symbol (str): The stock symbol (e.g., "AAPL", "GOOG").

    Returns:
        float: The current stock price, or None if an error occurs.
    """
    try:
        stock = yf.Ticker(symbol)
        historical_data = stock.history(period="1d")
        if not historical_data.empty:
            current_price = historical_data['Close'].iloc[-1]
            return current_price
        else:
            return None
    except Exception as e:
        print(f"Error retrieving stock price for {symbol}: {e}")
        return None


stock_price_agent = Agent(
    model='gemini-2.0-flash',
    name='stock_agent',
    instruction= 'You are an agent who retrieves stock prices. If a ticker symbol is provided, fetch the current price. If only a company name is given, first perform a Google search to find the correct ticker symbol before retrieving the stock price. If the provided ticker symbol is invalid or data cannot be retrieved, inform the user that the stock price could not be found.',
    description='This agent specializes in retrieving real-time stock prices. Given a stock ticker symbol (e.g., AAPL, GOOG, MSFT) or the stock name, use the tools and reliable data sources to provide the most up-to-date price.',
    tools=[get_stock_price], # You can add Python functions directly to the tools list; they will be automatically wrapped as FunctionTools.
)


# Session and Runner
session_service = InMemorySessionService()
session = session_service.create_session(app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID)
runner = Runner(agent=stock_price_agent, app_name=APP_NAME, session_service=session_service)


# Agent Interaction
def call_agent(query):
    content = types.Content(role='user', parts=[types.Part(text=query)])
    events = runner.run(user_id=USER_ID, session_id=SESSION_ID, new_message=content)

    for event in events:
        if event.is_final_response():
            final_response = event.content.parts[0].text
            print("Agent Response: ", final_response)

call_agent("stock price of GOOG")

Exception in thread Thread-30 (_asyncio_thread_main):
Traceback (most recent call last):
  File "C:\Users\gsbal\AppData\Local\Programs\Python\Python311\Lib\threading.py", line 1038, in _bootstrap_inner
    self.run()
  File "d:\projects\VectorSearch\venv\Lib\site-packages\ipykernel\ipkernel.py", line 766, in run_closure
    _threading_Thread_run(self)
  File "C:\Users\gsbal\AppData\Local\Programs\Python\Python311\Lib\threading.py", line 975, in run
    self._target(*self._args, **self._kwargs)
  File "d:\projects\VectorSearch\venv\Lib\site-packages\google\adk\runners.py", line 137, in _asyncio_thread_main
    asyncio.run(_invoke_run_async())
  File "C:\Users\gsbal\AppData\Local\Programs\Python\Python311\Lib\asyncio\runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "C:\Users\gsbal\AppData\Local\Programs\Python\Python311\Lib\asyncio\runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^

In [11]:
import asyncio

# Define an async generator that yields data
async def simple_async_generator():
    for i in range(3):
        await asyncio.sleep(1)  # simulate a delay (like a server or sensor)
        yield f"Message {i + 1}"

# Define an async function that uses 'async for' to consume the generator
async def main():
    async for msg in simple_async_generator():
        print(f"Received: {msg}")

# Run the main function
await main()


Received: Message 1
Received: Message 2
Received: Message 3


In [16]:
import asyncio
import random

# Async generator simulating a live chat message stream
async def chat_message_stream():
    messages = [
        "Muppuram ericha sivaney",
        "Ingey eppuram ponaalum erivadhu enney?",
        "saadhiku oru samayam sonnaaney",
        "endha samayathilum kaakum or saadhi solvaana?",
        "..."
    ]
    for msg in messages:
        await asyncio.sleep(random.uniform(0.5, 2.0))  # simulate unpredictable message delays
        yield f"{msg}"

# Async function consuming messages as they arrive
async def receive_messages():
    async for message in chat_message_stream():
        print(f"{message}", end=" ")

# In a notebook, use:
await receive_messages()

# In a script, use:
# asyncio.run(receive_messages())


Muppuram ericha sivaney Ingey eppuram ponaalum erivadhu enney? saadhiku oru samayam sonnaaney endha samayathilum kaakum or saadhi solvaana? ... 

  return compile(source, filename, mode, flags,


ValidationError: 5 validation errors for InvocationContext
session_service
  Field required [type=missing, input_value={'state': {}}, input_type=dict]
    For further information visit https://errors.pydantic.dev/2.11/v/missing
invocation_id
  Field required [type=missing, input_value={'state': {}}, input_type=dict]
    For further information visit https://errors.pydantic.dev/2.11/v/missing
agent
  Field required [type=missing, input_value={'state': {}}, input_type=dict]
    For further information visit https://errors.pydantic.dev/2.11/v/missing
session
  Field required [type=missing, input_value={'state': {}}, input_type=dict]
    For further information visit https://errors.pydantic.dev/2.11/v/missing
state
  Extra inputs are not permitted [type=extra_forbidden, input_value={}, input_type=dict]
    For further information visit https://errors.pydantic.dev/2.11/v/extra_forbidden