In [None]:
import os 
from google.adk.agents import Agent
from google.adk.models.lite_llm import LiteLlm  # For OpenAI support
from google.adk.sessions import InMemorySessionService 
from google.adk.runners import Runner 
from google.genai import types # For GenAI types like Text, Image, etc.
from typing import Optional, Dict, Any

import warnings
# Suppress every Python warning for the rest of the session (this also hides deprecation notices).
warnings.filterwarnings("ignore")

import logging
# Request CRITICAL as the root logging level so lower-severity records are not shown.
logging.basicConfig(level=logging.CRITICAL)
print("Libraries imported successfully.")

In [None]:
# Model identifier handed to LiteLlm
MODEL_GPT = "openai/gpt-4o"

llm = LiteLlm(model=MODEL_GPT)

# Smoke test: send a single chat message directly through the wrapped client
readiness_messages = [{"role": "user", "content": "Are you ready?"}]
readiness_reply = llm.llm_client.completion(
    model=llm.model,
    messages=readiness_messages,
    tools=[],
)
print(readiness_reply)

print("\nOpenAI is ready for use.")

In [None]:
# Convenience helper for working with Neo4j inside of Google ADK.
# `graphdb` comes from the project-local neo4j_for_adk.py module (the one that reads the
# NEO4J_* environment variables); the official `neo4j` driver package has no `graphdb` export.
from neo4j_for_adk import graphdb

In [None]:
# Round-trip a trivial Cypher statement to check that the database answers
readiness_query = "RETURN 'Neo4j is Ready!' as message"
neo4j_is_ready = graphdb.send_query(readiness_query)

print(neo4j_is_ready)

Optional Note: Neo4j Database Setup
We set up the database as a sidecar container. You can find the Docker installation instructions (and others) here. We configured the username and password as part of the database setup. We also installed a plugin called APOC (which will be needed in the last notebook). We defined these environment variables, which are used by neo4j_for_adk.py:

NEO4J_URI="bolt://localhost:7687"
NEO4J_USERNAME="your_database_username"
NEO4J_PASSWORD="your_database_password"

In [None]:
# Define a basic tool -- send a parameterized cypher query
def say_hello(person_name: str) -> dict:
    """Formats a welcome message to a named person.

    Args:
        person_name (str): the name of the person saying hello

    Returns:
        dict: A dictionary containing the results of the query.
              Includes a 'status' key ('success' or 'error').
              If 'success', includes a 'query_result' key with an array of result rows.
              If 'error', includes an 'error_message' key.
    """
    # NOTE(review): docstring wording intentionally left untouched -- ADK function tools
    # expose the docstring to the model as the tool description, so it acts as runtime text.

    # The name travels as a query parameter ($person_name), never spliced into the Cypher text.
    greeting_query = "RETURN 'Hello to you, ' + $person_name AS reply"
    query_parameters = {"person_name": person_name}
    return graphdb.send_query(greeting_query, query_parameters)

In [None]:
# Call the tool directly (no agent involved) to confirm it works
greeting_result = say_hello("ABK")
print(greeting_result)

# Expected output:
# {'status': 'success', 'query_result': [{'reply': 'Hello to you, ABK'}]}

In [None]:
# Define the greeting agent.
# ADK validates that an agent name is a valid Python identifier (letters, digits and
# underscores only), so the name must not contain a hyphen.
hello_agent = Agent(
    name="HelloAgent_v1_by_basan_ta",
    model=llm,  # LiteLlm instance defined above
    description="An agent that greets people by name using a Neo4j database.",
    instruction="""You are a helpful assistant, chatting with a user. 
                Be polite and friendly, introducing yourself and asking who the user is. 

                If the user provides their name, use the 'say_hello' tool to get a custom greeting.
                If the tool returns an error, inform the user politely. 
                If the tool is successful, present the reply.
                """,
    tools=[say_hello],  # plain Python function passed directly as a tool
)

print(f"Agent '{hello_agent.name}' created successfully.")

Create the Runner and SessionService

SessionService: Responsible for managing conversation history and state for different users and sessions. The InMemorySessionService is a simple implementation that stores everything in memory, suitable for testing and simple applications. It keeps track of the messages exchanged.

Runner: The engine that orchestrates the interaction flow. It takes user input, routes it to the appropriate agent, manages calls to the LLM and tools based on the agent's logic, handles session updates via the SessionService, and yields events representing the progress of the interaction.

In [None]:
#runner and session service
app_name = hello_agent.name + "_basan-ta_app"
user_id = hello_agent.name + "_basan-ta_user"
session_id = hello_agent.name + "session_01"

#initialize session service and runner
session_service = InMemorySessionService()
await session_service.create_session(
    app_name=app_name,
    user_id=user_id,
    session_id=session_id
)

runner = Runner(
    app_name=app_name,
    session_service=session_service
)


In [None]:
#Run the agent 
user_imput = "Hi there! I'm ABK. Can you greet me?"
print(f"User Input: {user_imput}")

# prepare message in ADK format 
content = types.Content(role = 'user', parts = [types.Part(texxt = user_imput)])

final_response_text = "agent did not respond" #default response

#we iterate through the events to get the final response
verbose = False
async for event in runner.run_async(
    user_id = user_id, 
    session_id = session_id,
    new_message= = content
):
    if verbose:
        if verbose:
            print(f"  [Event] Author: {event.author}, Type: {type(event).__name__}, Final: {event.is_final_response()}, Content: {event.content}")
    
    # Key Concept: is_final_response() marks the concluding message for the turn.
    if event.is_final_response():
        if event.content and event.content.parts:
            final_response_text = event.content.parts[0].text # Assuming text response in the first part
        elif event.actions and event.actions.escalate: # Handle potential errors/escalations
            final_response_text = f"Agent escalated: {event.error_message or 'No specific message.'}"
        break # Stop processing events once the final response is found

print(f"<<< Agent Response: {final_response_text}")

In [None]:
# Helper Class: AgentCaller

class AgentCaller:
    """Thin convenience wrapper that sends messages to an ADK agent through a Runner."""

    def __init__(self, agent: Agent, runner: Runner, user_id: str, session_id: str):
        """Store the agent, its runner, and the user/session ids used for every call."""
        self.agent, self.runner = agent, runner
        self.user_id, self.session_id = user_id, session_id

    def get_session(self):
        """Look up this caller's session through the runner's session service.

        Returns whatever ``session_service.get_session`` returns, unmodified.
        """
        service = self.runner.session_service
        return service.get_session(
            app_name=self.runner.app_name,
            user_id=self.user_id,
            session_id=self.session_id,
        )

    async def call(self, user_message: str, verbose: bool = False):
        """Send one user message to the agent and return the final response text.

        Args:
            user_message: Text to deliver as the user's turn.
            verbose: When True, print a one-line summary of every event received.

        Returns:
            The text of the first part of the final response event, an
            "Agent escalated: ..." message when the final event carries an
            escalation instead of content, or a fallback string when no final
            response event is received.
        """
        print(f"\n>>> User Message: {user_message}")

        # Wrap the raw text in the ADK message structure
        message_part = types.Part(text=user_message)
        content = types.Content(role="user", parts=[message_part])

        reply = "Agent did not produce a final response."

        # run_async yields events while the agent works; scan them for the final one
        event_stream = self.runner.run_async(
            user_id=self.user_id, session_id=self.session_id, new_message=content
        )
        async for event in event_stream:
            if verbose:
                print(
                    f"  [Event] Author: {event.author}, Type: {type(event).__name__}, Final: {event.is_final_response()}, Content: {event.content}"
                )

            # Anything that is not the concluding event for the turn is skipped
            if not event.is_final_response():
                continue

            if event.content and event.content.parts:
                # Assumes the text response sits in the first part
                reply = event.content.parts[0].text
            elif event.actions and event.actions.escalate:
                # Final event reports an escalation rather than content
                reply = f"Agent escalated: {event.error_message or 'No specific message.'}"
            break  # the final response has been handled; stop consuming events

        print(f"<<< Agent Response: {reply}")
        return reply

In [None]:
# Factory: build an AgentCaller (with its own session service and runner) for an agent
async def make_agent_caller(agent: Agent, initial_state: Optional[Dict[str, Any]] = None) -> AgentCaller:
    """Create and return an AgentCaller instance for the given agent.

    A fresh InMemorySessionService, session, and Runner are created on every call.

    Args:
        agent: The agent to wrap. Its ``name`` is used to derive the app, user
            and session identifiers.
        initial_state: Optional initial session state. ``None`` (the default)
            means an empty state.

    Returns:
        An AgentCaller bound to the new runner and session.
    """
    # Derive identifiers from the agent that was passed in, not from a notebook global.
    app_name = agent.name + "_basan-ta_app"
    user_id = agent.name + "_basan-ta_user"
    session_id = agent.name + "session_01"

    # A None default plus a fresh dict here avoids sharing one mutable default across calls.
    state = initial_state if initial_state is not None else {}

    # Initialize a session service and a session
    session_service = InMemorySessionService()
    await session_service.create_session(
        app_name=app_name, user_id=user_id, session_id=session_id, state=state
    )

    runner = Runner(agent=agent, app_name=app_name, session_service=session_service)
    return AgentCaller(agent, runner, user_id, session_id)