In [None]:
import os
import uuid
import faiss
import sqlite3
import traceback
import numpy as np
from sentence_transformers import SentenceTransformer

from google.genai import types
from google.adk.events import Event
from google.adk.runners import Runner
from google.adk.models.google_llm import Gemini
from google.adk.memory import InMemoryMemoryService
from google.adk.tools.tool_context import ToolContext
from google.adk.sessions import InMemorySessionService, Session
from google.adk.tools import google_search, load_memory
from google.adk.plugins.logging_plugin import LoggingPlugin
from google.adk.agents.callback_context import CallbackContext
from google.adk.agents import Agent, SequentialAgent, ParallelAgent, LoopAgent
from google.adk.apps.app import App, ResumabilityConfig, EventsCompactionConfig

In [None]:
retry_config=types.HttpRetryOptions(attempts=3, exp_base=5, initial_delay=1, http_status_codes=[429, 500, 503, 504])
model = Gemini(model="gemini-2.5-flash-lite", retry_options=retry_config)

# Idea Refinement Loop

In [None]:
STORAGE_SQLITE_FILENAME = "docs.db"
STORAGE_FAISS_FILENAME = "vector_index.faiss"

# SQLite DB to store texts
STORAGE_SQLITE_CONNECTION = sqlite3.connect("docs.db")
STORAGE_SQLITE_CURSOR = STORAGE_SQLITE_CONNECTION.cursor()
_ = STORAGE_SQLITE_CURSOR.execute("""
CREATE TABLE IF NOT EXISTS documents (
    id INTEGER PRIMARY KEY,
    text TEXT NOT NULL
)""")

STORAGE_FAISS_EMBEDDING_MODEL = SentenceTransformer("all-MiniLM-L6-v2")
if os.path.exists(STORAGE_FAISS_FILENAME):
    STORAGE_FAISS_INDEX = faiss.read_index(STORAGE_FAISS_FILENAME)
else:
    STORAGE_FAISS_INDEX = faiss.IndexFlatL2(384) # MiniLM-L6-v2 dimension = 384
    STORAGE_FAISS_INDEX = faiss.IndexIDMap2(STORAGE_FAISS_INDEX)

In [None]:
def get_similar_ideas(idea: str, summary: str) -> dict:
    """Fetch similar ideas already used from the database.
    Up to 3 closest results will be returned.
    If no idea is within the similarity range, an empty list will be returned.

    Args:
        idea (str): the full generated idea
        summary (str): the simple summary of main concepts of the idea

    Returns:
        Dictionary with status and a list of similar ideas already generated.
        Success: {"status": "success", "similar": ["<question>...", "<question>..."]}
        Error: {"status": "error", "error_message": "Couldn't validate idea"}
    """
    try:
        # embed query
        vec = STORAGE_FAISS_EMBEDDING_MODEL.encode([summary], convert_to_numpy=True)
        # search FAISS
        _, indices = STORAGE_FAISS_INDEX.search(vec, 3)
        # fetch matched documents from SQLite
        results = []
        for idx in indices[0]:
            idx = int(idx) # FAISS returns numpy.int64 and SQLite expects pythonic int
            if idx == -1: continue
            text_i = STORAGE_SQLITE_CURSOR.execute("SELECT text FROM documents WHERE id=?", (idx,)).fetchone()
            if text_i:
                results.append(text_i[0])
        # store text in SQLite
        STORAGE_SQLITE_CURSOR.execute("INSERT INTO documents (text) VALUES (?)", (idea,))
        doc_id = STORAGE_SQLITE_CURSOR.lastrowid
        STORAGE_SQLITE_CONNECTION.commit()
        # embed text
        vec = STORAGE_FAISS_EMBEDDING_MODEL.encode([summary], convert_to_numpy=True)
        # store vector in FAISS
        STORAGE_FAISS_INDEX.add_with_ids(vec, np.array([doc_id], dtype=np.int64))
        faiss.write_index(STORAGE_FAISS_INDEX, "vector_index.faiss")
        return {"status": "success", "similar": results}
    except Exception as ex:
        print(f"### Exception occured {ex}")
        return {"status": "error", "error_message": f"Couldn't validate idea: {ex}"}

In [None]:
async def auto_save_to_memory(callback_context: CallbackContext):
    """Call this function to save the current session to memory."""
    ctx = callback_context._invocation_context
    await ctx.memory_service.add_session_to_memory(ctx.session)

In [None]:
def exit_loop(tool_context: ToolContext) -> dict:
    """Call this function ONLY when the idea is approved, signaling the iterative process should end."""
    tool_context.actions.escalate = True
    return {}

In [None]:
idea_generator_agent = Agent(
    name="IdeaGeneratorAgent",
    model=model,
    instruction="""
    Generate a coding trivia question, suitable for an interview or quiz, based on the user's prompt.
    If you need to chack some unknown concepts, use `google_search` tool.
    If you need to reference previous ideas, use `load_memory` tool.
    The question should be short, clear, and fit in one line.
    The code block should be clear, formatted, short (maximum 10 lines of code), and in the programming language specified by the user.
    The answer options each should be short and clear (maximum 1 line). Only one answer should be correct.
    Respond in the following JSON format:
    {
        "question": "<the generated question>",
        "code_block": "<the code block illustrating the question>",
        "options": ["<option 1>", "<option 2>", "<option 3>", "<option 4>"],
        "correct_answer": "<the correct answer>"
    }
    """,
    after_agent_callback=auto_save_to_memory,
    tools=[google_search, load_memory],
    output_key="idea",
)

In [None]:
idea_checker_agent = Agent(
    name="IdeaCheckerAgent",
    model=model,
    instruction="""
    Here is a generated idea:
    {{idea}}

    Create a summary of the question and answers that could identify this problem type (without specific values, but also not too general).
    Your task is to check if this question idea has already been asked, to avoid repeating.
    When creating a summary, remember:
    - the same question but with different numeric values shouldn't repeat.
    - similar question, but one asking about time complexity and another about space complexity are ok.
    This should give you a clue on how to summarise the generated idea.
    
    To check if the idea was used already, fetch similar used ideas by calling the `get_similar_ideas` tool.
    Based on the analysis of the most similar already used ideas, decide if the current idea is different enough.
    - If the idea is interesting and different from already used, you MUST call the 'exit_loop' function. Do not output any text.
    - Otherwise, you MUST respond with the exact phrase: "EXISTS"
    """,
    tools=[get_similar_ideas, exit_loop],
)

In [None]:
idea_refinement_loop_agent = LoopAgent(
    name="IdeaRefinementLoop",
    sub_agents=[idea_generator_agent, idea_checker_agent],
    max_iterations=3,
)

# Human Approval

In [None]:
def approve_idea(tool_context: ToolContext, idea: str) -> dict:
    """Waits for user to approve the idea.

    Args:
        idea [str]: summary of the idea to approve

    Returns:
        Dictionary with approval status: "approved", "rejected", or "pending".
    """
    # On first time run, wait for human approval - pause here.
    if not tool_context.tool_confirmation:
        tool_context.request_confirmation(hint=f"Idea: {idea}. Do you want to approve?", payload={"idea": idea})
        return {"status": "pending", "message": f"Idea requires approval"}
    # On the resumption, handle the approval response.
    if tool_context.tool_confirmation.confirmed:
        return {"status": "approved", "message": f"Idea approved: {idea}"}
    return {"status": "rejected", "message": f"Idea rejected: {idea}"}

In [None]:
human_approval_agent = Agent(
    name="HumanApprovalAgent",
    model=model,
    instruction="""
    Here is a generated idea:
    {{idea}}

    You are a generator assistant.
    When you receive the generated idea:
    1. Use the `approve_idea` tool with the exact idea
    2. If the status is 'pending', inform the user that approval is required
    3. After receiving the final result, don't respond.
    """,
    tools=[approve_idea],
)

# Video Partials Parallel

In [None]:
def generate_video(language: str, code: str, question: str, answers: list[str]) -> dict:
    """Generate an MP4 video featuring a code snippet, a question, and a list of possible answers.

    Args:
        language (str): Coding language used in the code snippet.
        code (str): Formatted code snippet.
        question (str): Question regarding the code snippet.
        answers (list[str]): List of 4 answer choices (A, B, C, or D).

    Returns:
        Dictionary with status and path to the file.
        Success: {"status": "success", "video": "generated.mp4"}
        Error: {"status": "error", "error_message": "Video not generated"}
    """
    # placeholder implementation, details in writeup https://www.kaggle.com/code/kamilmatejuk/coding-video-framework
    filename = os.path.abspath(f"generated_{uuid.uuid4().hex[:8]}.mp4")
    return {"status": "success", "video": filename}

In [None]:
video_generator_agent = Agent(
    name="VideoGeneratorAgent",
    model=model,
    instruction="""
    Here is a generated idea:
    {{idea}}

    Using the `generate_video` tool, create a video based on the idea.
    Output only the path to the generated video.
    """,
    tools=[generate_video],
    output_key="video_path",
)

In [None]:
video_description_agent = Agent(
    name="VideoDescriptionAgent",
    model=model,
    instruction="""
    Here is a generated idea:
    {{idea}}

    Create a video description for the social media short, including programming language, problem description and correct answer explanation.
    Make sure to spark interest, call to action (like, share, comment your answer and explanation) and add hashtags.
    """,
    output_key="description",
)

video_description_agent_safe = SequentialAgent(
    name="VideoDescriptionAgentSafeWrapper",
    sub_agents=[video_description_agent]
)

In [None]:
video_partials_parallel_agent = ParallelAgent(
    name="VideoPartialsParallel",
    sub_agents=[video_generator_agent, video_description_agent_safe],
)

# Publish

In [None]:
def publish_to_yt(video: str, desc: str) -> dict:
    """Publish video to YouTube.

    Args:
        video (str): path to mp4 file
        desc (str): video description

    Returns:
        Dictionary with status.
        Success: {"status": "success"}
        Error: {"status": "error", "error_message": "Video not generated"}
    """
    # placeholder implementation
    return {"status": "success"}

In [None]:
publish_agent = Agent(
    name="PublishAgent",
    model=model,
    instruction="""
    Here is video file {{video_path}} and description:
    {{description}}

    Publish the video to YouTube, using a `publish_to_yt` tool.
    Respond with status
    """,
    tools=[publish_to_yt],
)

# Run

In [None]:
pipeline = SequentialAgent(
    name="VideoGeneratorPipeline",
    sub_agents=[idea_refinement_loop_agent, human_approval_agent, video_partials_parallel_agent, publish_agent],
)

In [None]:
app = App(
    name="VideoGeneratorApp",
    root_agent=pipeline,
    resumability_config=ResumabilityConfig(is_resumable=True),
    events_compaction_config=EventsCompactionConfig(compaction_interval=3, overlap_size=1),
    plugins=[LoggingPlugin()],
)

In [None]:
runner = Runner(
    app=app,
    session_service=InMemorySessionService(),
    memory_service=InMemoryMemoryService(),
)

In [None]:
async def run(query: str):
    try:
        user_id = "user_id"
        session: Session = await runner.session_service.create_session(app_name=app.name, user_id=user_id)

        print(f"User > {query}")

        # Send initial request to the agent
        events: list[Event] = [event async for event in runner.run_async(
            user_id=user_id,
            session_id=session.id,
            new_message=types.Content(role="user", parts=[types.Part(text=query)]))]

        # Find `adk_request_confirmation` event returned by agent
        approval_id, invocation_id = None, None
        for event in events:
            if not event.content: continue
            if not event.content.parts: continue
            for part in event.content.parts:
                if part.function_call and part.function_call.name == "adk_request_confirmation":
                    approval_id = part.function_call.id
                    invocation_id = event.invocation_id
                    break
        assert approval_id is not None and invocation_id is not None, "No approval request found"

        # Handle approval 
        approve = input("HumanInTheLoop > Do you approve the idea? (y/n): ").strip().lower()
        print(f"HumanInTheLoop > Decision: {'✅' if approve == 'y' else '❌'}")

        # Resume the agent with the approval decision
        decision = types.FunctionResponse(
            id=approval_id,
            name="adk_request_confirmation",
            response={"confirmed": approve == 'y'})
        events: list[Event] = [event async for event in runner.run_async(
            user_id=user_id,
            session_id=session.id,
            new_message=types.Content(role="user", parts=[types.Part(function_response=decision)]))]
        for event in events:
            if not event.content: continue
            if not event.content.parts: continue
            for part in event.content.parts:
                if not part.text: continue
                print(f"Agent > {part.text}")
    except* Exception as eg:
        print("=== ROOT EXCEPTIONS FROM TASKGROUP ===")
        for ex in eg.exceptions:
            traceback.print_exception(type(ex), ex, ex.__traceback__)