In [None]:
# https://zenn.dev/google_cloud_jp/articles/1b1cbd5318bdfe
# 【超速報】Agent Development Kit で会話型エージェントを作成する
# 20250608 : ADK Version 1.2.1

%pip install --upgrade --user \
    google-adk==1.2.1 \
    google-cloud-aiplatform==1.96.0

In [None]:
# 事前準備 @cloud shell

# gcloud services enable \
#   aiplatform.googleapis.com \
#   notebooks.googleapis.com \
#   cloudresourcemanager.googleapis.com

# Create Workbench instance
# PROJECT_ID=$(gcloud config list --format 'value(core.project)')
# gcloud workbench instances create agent-development \
#   --project=$PROJECT_ID \
#   --location=us-central1-a \
#   --machine-type=e2-standard-2


In [None]:
# https://zenn.dev/google_cloud_jp/articles/1b1cbd5318bdfe
# 【超速報】Agent Development Kit で会話型エージェントを作成する
# 20250608 : ADK Version 1.2.1

%pip install --upgrade --user \
    google-adk==1.2.1 \
    google-cloud-aiplatform==1.96.0

In [None]:
import IPython
app = IPython.Application.instance()
_ = app.kernel.do_shutdown(True)

In [None]:
import json, os, pprint, time, uuid
import vertexai
from google import genai
from google.genai.types import (
    HttpOptions, GenerateContentConfig, Part, Content
)

[PROJECT_ID] = !gcloud config list --format 'value(core.project)'
LOCATION = 'us-central1'

vertexai.init(
    project=PROJECT_ID,
    location=LOCATION,
    staging_bucket=f'gs://{PROJECT_ID}'
)

os.environ['GOOGLE_CLOUD_PROJECT'] = PROJECT_ID
os.environ['GOOGLE_CLOUD_LOCATION'] = LOCATION
os.environ['GOOGLE_GENAI_USE_VERTEXAI'] = 'True'

In [None]:
def generate_response(system_instruction, contents,
                      response_schema, model='gemini-2.0-flash-001'):
    client = genai.Client(vertexai=True,
                          project=PROJECT_ID, location=LOCATION,
                          http_options=HttpOptions(api_version='v1'))
    response = client.models.generate_content(
        model=model,
        contents=contents,
        config=GenerateContentConfig(
            system_instruction=system_instruction,
            temperature=0.4,
            response_mime_type='application/json',
            response_schema=response_schema,
        )
    )
    return '\n'.join(
        [p.text for p in response.candidates[0].content.parts if p.text]
    )

In [None]:
response_schema = {
    "type": "object",
    "properties": {
        "greeting": {"type": "string"},
        },
    "required": ["greeting"],
}
system_instruction = '''
ネットショップの仮想店員として、丁寧で、かつ、フレンドリーな雰囲気の挨拶を返してください。
架空の商品名などは含めないこと。
'''
contents = 'こんにちは、中井です。何か、おすすめはありますか？'

print(generate_response(system_instruction, contents, response_schema))

In [None]:
def _generate_plan(goal):
    system_instruction = '''
You are a professional event planner. Work on the following tasks.

[task]
A. generate event contents to achieve the given [goal].

[format instruction]
In Japanese. No markdowns. The output has the following three items:
"title": a short title of the event
"summary": three sentence summary of the event
"timeline": timeline of the event such as durations and contents in a bullet list
'''

    response_schema = {
        "type": "object",
        "properties": {
            "title": {"type": "string"},
            "summary": {"type": "string"},
            "timeline":  {"type": "string"},
        },
        "required": ["title", "summary", "timeline"],
    }

    parts = []
    parts.append(Part.from_text(text=f'[goal]\n{goal}'))
    contents=[Content(role='user', parts=parts)]
    return generate_response(system_instruction, contents, response_schema)


def generate_plan(goal:str) -> dict:
    """
    Create an initial plan to achieve the goal.
   
    Args:
        goal: The goal of the event.
       
    Returns:
        dict: A dictionary containing the plan with the following keys:
            title: title of the event
            summary: a short summary of the event
            timeline: timeline of the event
    """
    response = _generate_plan(goal)
    return json.loads(response)

In [None]:
def _update_plan(goal, plan, evaluation):
    system_instruction = '''
You are a professional event planner. Work on the following tasks.

[task]
A. given [goal] and current [plan] for event contents.
   Generate an improved plan based on the given [evaluation].

[format instruction]
In Japanese. No markdowns. The output has the following three items:
"title": a short title of the event
"summary": three sentence summary of the event
"timeline": timeline of the event such as durations and contents in a bullet list
"update: one sentence summary of the update from the previous plan
'''

    response_schema = {
        "type": "object",
        "properties": {
            "title": {"type": "string"},
            "summary": {"type": "string"},
            "timeline":  {"type": "string"},
            "update": {"type": "string"},
        },
        "required": ["title", "summary", "timeline", "update"],
    }

    parts = []
    parts.append(Part.from_text(text=f'[goal]\n{goal}'))
    parts.append(Part.from_text(text=f'[plan]\n{plan}'))
    parts.append(Part.from_text(text=f'[evaluation]\n{evaluation}'))
    contents=[Content(role='user', parts=parts)]
    return generate_response(system_instruction, contents, response_schema)


def update_plan(goal:str, plan:str, evaluation:str) -> dict:
    """
    Create an updated plan to achieve the goal given the current plan and an evaluation comment.

    Args:
        goal: The goal of the event
        plan: Current plan
        evaluation: Evaluation comment in plain text or a JSON string
       
    Returns:
        dict: A dictionary containing the plan with the following keys:
            title: title of the event
            summary: a short summary of the event
            timeline: timeline of the event
            update: one sentence summary of the update from the previous plan
    """
    response = _update_plan(goal, plan, evaluation)
    return json.loads(response)

In [None]:
def _evaluate_plan(goal, plan):
    system_instruction = '''
You are a professional event planner. Work on the following tasks.

[task]
A. given [goal] and [plan] for event contents, evaluate if the plan is effective to achieve the goal.
B. Also give 3 ideas to improve the plan.

[condition]
A. Event contents should include detailed descriptions.

[format instruction]
In Japanese. No markdowns. The output has the following three items:
"evaluation": three sentence evaluation of the plan.
"improvements": a list of 3 ideas to improve the plan. Each idea is in a single sentence.
'''

    response_schema = {
        "type": "object",
        "properties": {
            "evaluation": {"type": "string"},
            "improvements": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "idea": {"type": "string"},
                    },
                    "required": ["idea"],
                },
            },
        },
        "required": ["evaluation", "improvements"],
    }

    parts = []
    parts.append(Part.from_text(text=f'[goal]\n{goal}'))
    parts.append(Part.from_text(text=f'[plan]\n{plan}'))
    contents=[Content(role='user', parts=parts)]
    return generate_response(system_instruction, contents, response_schema)


def evaluate_plan(goal:str, plan:str) -> dict:
    """
    Generate an evaluation for the plan against the goal.

    Args:
        goal: The goal of the event
        plan: Current plan
       
    Returns:
        dict: A dictionary containing the evaluation comment with the following keys:
            evaluation: evaluation comment
            improvements: list of ideas for improvements
    """
    response = _evaluate_plan(goal, plan)
    return json.loads(response)

In [None]:
goal = 'クラウドネイティブなアプリ開発企業の新入社員歓迎イベントを11:00-14:00の180分の構成で考える'
plan = generate_plan(goal)
pprint.pp(plan)

In [None]:
evaluation = evaluate_plan(goal, plan)
pprint.pp(evaluation)

In [None]:
plan2 = update_plan(goal, plan, json.dumps(evaluation))
pprint.pp(plan2)

In [None]:
from google.adk.agents.llm_agent import LlmAgent
from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService

In [None]:
instruction = """
    You are an agent who handles event contents.
    Your outputs should be in Japanese without markdown.
    
    **Interaction flow:**

    1.  Initial plan:
        * When you receive a goal of the event, you should first generate an initial plan using generate_plan().

    2. Present the plan and ask for evaluation:
        * Present the plan to the user, and ask the evaluation and improvement ideas.
            - Show in a human readable format.

    3. (Optional) Get an evaluation from 主任
        * If the user requests to get an evaluation from "主任", get an evaluation using evaluate_plan()
            - Present the result to the user in a human readable format, and ask if the user accept it or not.
        * If the user accept it, go to step 4.
            - When a user say something affirmative, think about if it means to accept 主任's evaluation, or other things.

    4. Upadate plan:
        * Once you get an evaluation from the user or "主任", generate an updated plan using update_plan().
        * Go back to step 2.
"""

planning_client_agent = LlmAgent(
    model='gemini-2.0-flash-001',
    name='planning_client_agent',
    description=(
        'This agent creates and updates event contents given the goal of the event.'
    ),
    instruction=instruction,
    tools=[
        generate_plan,
        update_plan,
        evaluate_plan,
    ],
)

In [None]:
class LocalApp:
    def __init__(self, agent, user_id='default_user'):
        self._agent = agent
        self._user_id = user_id
        self._runner = Runner(
            app_name=self._agent.name,
            agent=self._agent,
            artifact_service=InMemoryArtifactService(),
            session_service=InMemorySessionService(),
            memory_service=InMemoryMemoryService(),
        )
        self._session = None
        
    async def stream(self, query):
        if not self._session:
            self._session = await self._runner.session_service.create_session(
                app_name=self._agent.name,
                user_id=self._user_id,
                session_id=uuid.uuid4().hex,
            )
        content = Content(role='user', parts=[Part.from_text(text=query)])
        async_events = self._runner.run_async(
            user_id=self._user_id,
            session_id=self._session.id,
            new_message=content,
        )
        result = []
        async for event in async_events:
            if DEBUG:
                print(f'----\n{event}\n----')
            if (event.content and event.content.parts):
                response = '\n'.join([p.text for p in event.content.parts if p.text])
                if response:
                    print(response)
                    result.append(response)
        return result

In [None]:
client = LocalApp(planning_client_agent)

In [None]:
DEBUG = False
query = 'クラウドネイティブなアプリ開発企業の新入社員歓迎イベントを11:00-14:00の180分の構成で考えて'
_ = await client.stream(query)

In [None]:
query = '主任の意見を聞きたい'
_ = await client.stream(query)

In [None]:
query = 'そうします。'
_ = await client.stream(query)

In [None]:
query = 'ランチ中は、会話のきっかけになるようなプロジェクト紹介の動画を流して欲しい。先輩のメッセージは30分で十分'
_ = await client.stream(query)

In [None]:
query = 'ばっちぐー'
_ = await client.stream(query)

In [None]:
from vertexai import agent_engines

remote_agent = agent_engines.create(
    agent_engine=planning_client_agent,
    display_name='planning_client_agent',
    requirements=[
        'google-adk==1.2.1',
    ]
)

In [None]:
class RemoteApp:
    def __init__(self, remote_agent, user_id='default_user'):
        self._remote_agent = remote_agent
        self._user_id = user_id
        self._session = remote_agent.create_session(user_id=self._user_id)
    
    def _stream(self, query):
        events = self._remote_agent.stream_query(
            user_id=self._user_id,
            session_id=self._session['id'],
            message=query,
        )
        result = []
        for event in events:
            if DEBUG:
                print(f'----\n{event}\n----')
            if ('content' in event and 'parts' in event['content']):
                response = '\n'.join(
                    [p['text'] for p in event['content']['parts'] if 'text' in p]
                )
                if response:
                    print(response)
                    result.append(response)
        return result

    def stream(self, query):
        # Retry 4 times in case of resource exhaustion
        for c in range(4):
            if c > 0:
                time.sleep(2**(c-1))
            result = self._stream(query)
            if result:
                return result
            if DEBUG:
                print('----\nRetrying...\n----')
        return None # Permanent error

In [None]:
client = RemoteApp(remote_agent)

In [None]:
DEBUG = False
query = 'クラウドネイティブなアプリ開発企業の新入社員歓迎イベントを11:00-14:00の180分の構成で考えて'
_ = client.stream(query)

In [None]:
query = '主任の意見を聞きたい'
_ = client.stream(query)

In [None]:
for agent in agent_engines.list():
    print(agent.gca_resource.name)

In [None]:
for agent in agent_engines.list():
    print(agent.gca_resource.name)
    agent.delete(force=True)