In [1]:
import warnings
# Ignore all warnings
warnings.filterwarnings("ignore")
import os

import logging
logging.basicConfig(level=logging.ERROR)
import datetime
from zoneinfo import ZoneInfo
from google.adk.agents import Agent
from pydantic import BaseModel, Field
from google.adk.agents.parallel_agent import ParallelAgent
from google.adk.models.lite_llm import LiteLlm # For multi-model support
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.genai import types # For creating message Content/Parts


In [2]:
from yt_transcript import *

yt_url = "https://www.youtube.com/watch?v=sBuoMkJsMsA"

In [3]:
AGENT_MODEL = "gemini-2.0-flash"
root_agent = Agent(
    name="yt_agent_v0",
    model=AGENT_MODEL, 
    description="Extracts transcripts from YouTube videos and summarizes the learnings related to a given topic.",
    instruction = (
    "You are an expert YouTube video summarizer. "
    "Given a topic and a YouTube video URL, use the 'get_youtube_transcript' tool to extract the transcript (returned as a JSON with key 'result'). "
    "From the transcript, generate a clear and concise summary that captures the key learnings relevant to the given topic. "
    "Your output should be only the final summarized content — answer in bullet points."
    "access the video url from session.state['q1']"
    ),
    output_key='yt_agent_v0', 
    tools=[get_youtube_transcript],
)

root_agent_1 = Agent(
    name="yt_agent_v1",
    model=AGENT_MODEL, 
    description="Extracts transcripts from YouTube videos and summarizes the learnings related to a given topic.",
    instruction = (
    "You are an expert YouTube video summarizer. "
    "Given a topic and a YouTube video URL, use the 'get_youtube_transcript' tool to extract the transcript (returned as a JSON with key 'result'). "
    "From the transcript, generate a clear and concise summary that captures the key learnings relevant to the given topic. "
    "Your output should be only the final summarized content — answer in bullet points."
    "access the video url from session.state['q2']"
    ),
    output_key='yt_agent_v1',
    tools=[get_youtube_transcript],
)

root_agent_2 = Agent(
    name="yt_agent_v2",
    model=AGENT_MODEL, 
    description="Extracts transcripts from YouTube videos and summarizes the learnings related to a given topic.",
    instruction = (
    "You are an expert YouTube video summarizer. "
    "Given a topic and a YouTube video URL, use the 'get_youtube_transcript' tool to extract the transcript (returned as a JSON with key 'result'). "
    "From the transcript, generate a clear and concise summary that captures the key learnings relevant to the given topic. "
    "Your output should be only the final summarized content — answer in bullet points."
    "access the video url from session.state['q3']"
    ),
    output_key='yt_agent_v2',
    tools=[get_youtube_transcript],
)

In [4]:
session_service = InMemorySessionService()

APP_NAME = "yt_app"
USER_ID = "user_1"
SESSION_ID = "session_001" 

# Create the specific session where the conversation will happen
session = session_service.create_session(
    app_name=APP_NAME,
    user_id=USER_ID,
    session_id=SESSION_ID
)
print(f"Session created: App='{APP_NAME}', User='{USER_ID}', Session='{SESSION_ID}'")

# --- Runner ---
runner = Runner(
    agent=root_agent, 
    app_name=APP_NAME,   
    session_service=session_service #
)
print(f"Runner created for agent '{runner.agent.name}'.")

Session created: App='yt_app', User='user_1', Session='session_001'
Runner created for agent 'yt_agent_v0'.


In [5]:
import asyncio
from google.genai import types 

async def call_agent_async(query: str):
  """Sends a query to the agent and prints the final response."""
  print(f"\n>>> User Query: {query}")

  # Prepare the user's message in ADK format
  content = types.Content(role='user', parts=[types.Part(text=query)])

  final_response_text = "Agent did not produce a final response." #default

  async for event in runner.run_async(user_id=USER_ID, session_id=SESSION_ID, new_message=content):
      # You can uncomment the line below to see *all* events during execution
      # print(f"  [Event] Author: {event.author}, Type: {type(event).__name__}, Final: {event.is_final_response()}, Content: {event.content}")

      # Key Concept: is_final_response() marks the concluding message for the turn.
      if event.is_final_response():
          if event.content and event.content.parts:
             # Assuming text response in the first part
             final_response_text = event.content.parts[0].text
          elif event.actions and event.actions.escalate: # Handle potential errors/escalations
             final_response_text = f"Agent escalated: {event.error_message or 'No specific message.'}"
          break 

  print(f"<<< Agent Response: {final_response_text}")
  return final_response_text

In [6]:
text = await call_agent_async(f"how to swim? yt_url= {yt_url}")


>>> User Query: how to swim? yt_url= https://www.youtube.com/watch?v=sBuoMkJsMsA
<<< Agent Response: Here's a summary of how to swim freestyle, based on the YouTube video transcript:

*   **Step 1: Streamline.** Practice gliding on the surface with your face in the water, blowing bubbles, arms stretched forward, and chin down.
*   **Step 2: Arm Strokes with a Board.** Use a kickboard to focus on the arm movements. Keep your elbows up and push the water with your hands and forearms. Feel the pressure as you pull through the water. Then practice the same arm motion without the board, keeping your hands gliding on the surface.
*   **Step 3: Freestyle with Breathing.** Incorporate breathing by turning your head to the side while pulling with your hand. Breathe in, keeping your eye on the water, and return your head to the water before your hand re-enters. Practice slowly, alternating sides for breathing.
*   **Real Freestyle.** Instead of touching hands together in front, glide with your 

In [None]:

session = session_service.get_session(
    app_name=APP_NAME,
    user_id=USER_ID,
    session_id=SESSION_ID
)
session.state

{'yt_agent_v0': "Here's a summary of how to swim freestyle, based on the YouTube video transcript:\n\n*   **Step 1: Streamline.** Practice gliding on the surface with your face in the water, blowing bubbles, arms stretched forward, and chin down.\n*   **Step 2: Arm Strokes with a Board.** Use a kickboard to focus on the arm movements. Keep your elbows up and push the water with your hands and forearms. Feel the pressure as you pull through the water. Then practice the same arm motion without the board, keeping your hands gliding on the surface.\n*   **Step 3: Freestyle with Breathing.** Incorporate breathing by turning your head to the side while pulling with your hand. Breathe in, keeping your eye on the water, and return your head to the water before your hand re-enters. Practice slowly, alternating sides for breathing.\n*   **Real Freestyle.** Instead of touching hands together in front, glide with your hands one by one. Relax your shoulders, open your hands, lift your elbows, and t

# Parallel Agent

In [4]:
parallel_research_agent = ParallelAgent(
    name="ParallelWebResearchAgent",
    sub_agents=[root_agent, root_agent_1, root_agent_2]
)

In [5]:
APP_NAME = "yt_app_advanced"
USER_ID = "user_2"
SESSION_ID = "session_002" 

# Session and Runner
session_service = InMemorySessionService()
session = session_service.create_session(app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID)
runner = Runner(agent=parallel_research_agent, app_name=APP_NAME, session_service=session_service)


# # Agent Interaction
# def call_agent(query):
#     '''
#     Helper function to call the agent with a query.
#     '''
#     content = types.Content(role='user', parts=[types.Part(text=query)])
#     events = runner.run(user_id=USER_ID, session_id=SESSION_ID, new_message=content)

#     for event in events:
#         if event.is_final_response():
#             final_response = event.content.parts[0].text
#             print("Agent Response: ", final_response)

# call_agent(f"how to swim? yt_url= {yt_url}")

In [6]:
yt_url_1 ="https://www.youtube.com/watch?v=sBuoMkJsMsA"
yt_url_2 = "https://www.youtube.com/watch?v=1nnndhSc4iw"
yt_url_3 = "https://www.youtube.com/watch?v=_kGESn8ArrU"

In [16]:
async def process_multiple_queries(queries):    
    # Create a trigger message to start processing
    query = f"Process all queries in parallel in the sequence {queries['q1']}, {queries['q2']}, {queries['q3']}"
    message = types.Content(
        role="user", 
        parts=[types.Part(text=query)]
    )
    session = session_service.get_session(app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID)
    # Run the parallel agent
    async for event in runner.run_async(
        user_id=USER_ID, 
        session_id=SESSION_ID, 
        new_message=message
    ):
        if event.is_final_response():
            # Processing complete, now retrieve results from session state
            results = {
                "q1": session.state.get("yt_agent_v0"),
                "q2": session.state.get("yt_agent_v1"),
                "q3": session.state.get("yt_agent_v2")
            }
            return results


queries = {
    "q1": f"how to swim? yt_url= {yt_url_1}",
    "q2": f"how to sing? yt_url= {yt_url_2}",
    "q3": f"how to run? yt_url= {yt_url_3}"
}
results = await process_multiple_queries(queries)
print(results)

{'q1': "Okay, I'm ready to process your queries in parallel. Please provide the topic and YouTube video URL. I will extract the transcript and summarize the key learnings related to the topic.\n", 'q2': None, 'q3': None}


In [11]:
session = session_service.get_session(app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID)
session.state

{'yt_agent_v0': "Okay, I'm ready to process your queries in parallel. Please provide the topic and YouTube video URL. I will extract the transcript and summarize the key learnings related to the topic.\n"}