In [None]:
"""
StoryFlowAgent - Custom Orchestrated Agent Example
Compatible with Latest Google ADK (2025)
"""

import os
import logging
import asyncio
from typing import AsyncGenerator
from typing_extensions import override

from google.adk.agents import LlmAgent, BaseAgent, LoopAgent, SequentialAgent
from google.adk.agents.invocation_context import InvocationContext
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.adk.events import Event
from google.adk.models import Gemini
from google.genai import types


# ============================================================
# CONFIG
# ============================================================

APP_NAME = "story_app"
USER_ID = "12345"
SESSION_ID = "123344"

MODEL = Gemini(model="gemini-2.5-flash")

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# ============================================================
# CUSTOM ORCHESTRATOR
# ============================================================

class StoryFlowAgent(BaseAgent):

    story_generator: LlmAgent
    critic: LlmAgent
    reviser: LlmAgent
    grammar_check: LlmAgent
    tone_check: LlmAgent
    loop_agent: LoopAgent
    sequential_agent: SequentialAgent

    model_config = {"arbitrary_types_allowed": True}

    def __init__(
        self,
        name: str,
        story_generator: LlmAgent,
        critic: LlmAgent,
        reviser: LlmAgent,
        grammar_check: LlmAgent,
        tone_check: LlmAgent,
    ):

        loop_agent = LoopAgent(
            name="CriticReviserLoop",
            sub_agents=[critic, reviser],
            max_iterations=2,
        )

        sequential_agent = SequentialAgent(
            name="PostProcessing",
            sub_agents=[grammar_check, tone_check],
        )

        super().__init__(
            name=name,
            story_generator=story_generator,
            critic=critic,
            reviser=reviser,
            grammar_check=grammar_check,
            tone_check=tone_check,
            loop_agent=loop_agent,
            sequential_agent=sequential_agent,
            sub_agents=[
                story_generator,
                loop_agent,
                sequential_agent,
            ],
        )

    @override
    async def _run_async_impl(
        self, ctx: InvocationContext
    ) -> AsyncGenerator[Event, None]:

        logger.info("Starting Story Workflow")

        # 1️⃣ Generate Story
        async for event in self.story_generator.run_async(ctx):
            yield event

        if not ctx.session.state.get("current_story"):
            logger.error("Story generation failed.")
            return

        # 2️⃣ Critic → Reviser Loop
        async for event in self.loop_agent.run_async(ctx):
            yield event

        # 3️⃣ Grammar + Tone Check
        async for event in self.sequential_agent.run_async(ctx):
            yield event

        # 4️⃣ Conditional Regeneration
        if ctx.session.state.get("tone_check_result") == "negative":
            logger.info("Tone negative → Regenerating story")
            async for event in self.story_generator.run_async(ctx):
                yield event

        logger.info("Story Workflow Completed")


# ============================================================
# LLM AGENTS
# ============================================================

story_generator = LlmAgent(
    name="StoryGenerator",
    model=MODEL,
    instruction="""
Write a short story (~100 words) about: {topic}
""",
    output_key="current_story",
)

critic = LlmAgent(
    name="Critic",
    model=MODEL,
    instruction="""
Critique this story:
{{current_story}}

Provide 1-2 sentences of constructive feedback.
""",
    output_key="criticism",
)

reviser = LlmAgent(
    name="Reviser",
    model=MODEL,
    instruction="""
Revise the story:
{{current_story}}

Using this criticism:
{{criticism}}

Output only the revised story.
""",
    output_key="current_story",
)

grammar_check = LlmAgent(
    name="GrammarCheck",
    model=MODEL,
    instruction="""
Check grammar of:
{current_story}

Return corrections list or 'Grammar is good!'
""",
    output_key="grammar_suggestions",
)

tone_check = LlmAgent(
    name="ToneCheck",
    model=MODEL,
    instruction="""
Analyze tone of:
{current_story}

Return one word only:
positive / negative / neutral
""",
    output_key="tone_check_result",
)


# ============================================================
# CREATE CUSTOM AGENT
# ============================================================

story_flow_agent = StoryFlowAgent(
    name="StoryFlowAgent",
    story_generator=story_generator,
    critic=critic,
    reviser=reviser,
    grammar_check=grammar_check,
    tone_check=tone_check,
)


# ============================================================
# EXECUTION
# ============================================================

async def call_agent_async(topic: str):

    session_service = InMemorySessionService()

    # ✅ MUST CREATE SESSION FIRST
    await session_service.create_session(
        app_name=APP_NAME,
        user_id=USER_ID,
        session_id=SESSION_ID,
        state={"topic": topic},
    )

    runner = Runner(
        agent=story_flow_agent,
        app_name=APP_NAME,
        session_service=session_service,
    )

    user_message = types.Content(
        role="user",
        parts=[types.Part(text="Generate the story.")],
    )

    final_response = ""

    async for event in runner.run_async(
        user_id=USER_ID,
        session_id=SESSION_ID,
        new_message=user_message,
    ):
        if event.is_final_response() and event.content:
            for part in event.content.parts:
                if hasattr(part, "text") and part.text:
                    final_response += part.text

    print("\n--- FINAL STORY ---\n")
    print(final_response)

    final_session = await session_service.get_session(
        app_name=APP_NAME,
        user_id=USER_ID,
        session_id=SESSION_ID,
    )

    print("\n--- FINAL SESSION STATE ---\n")
    import json
    print(json.dumps(final_session.state, indent=2))


# ============================================================
# RUN (Jupyter-safe)
# ============================================================

await call_agent_async("a lonely robot finding a friend in a junkyard")
