In [11]:
from google.adk.agents import Agent, SequentialAgent, ParallelAgent, LoopAgent
from google.adk.models.google_llm import Gemini
from google.adk.runners import InMemoryRunner, Runner
from google.adk.sessions import  InMemorySessionService
from google.adk.tools import AgentTool, FunctionTool, google_search, BaseTool
from google.genai import types

import os
import json
import logging
from dotenv import load_dotenv
import uuid
import time

from pathlib import Path    
from typing import Literal, Optional, Dict, Any, Final



print("✅ ADK components imported successfully.")

✅ ADK components imported successfully.


In [2]:
# Raise the threshold of these two loggers to ERROR so that their
# WARNING-level (and lower) messages no longer appear in cell output.
logging.getLogger("google.adk.runners").setLevel(logging.ERROR)
logging.getLogger("google_genai.types").setLevel(logging.ERROR)

In [3]:
# Load variables from the local .env file into the process environment.
load_dotenv()

# Fetch the Gemini key; os.getenv returns None when the variable is not set.
gemini_api_key = os.getenv("GEMINI_API_KEY")

if gemini_api_key:
    # Only success is reported here; the key's value itself is not printed.
    print("Successfully loaded GEMINI_API_KEY.")
else:
    # NOTE: this branch only prints a message. Execution continues, and later
    # cells will receive a missing/empty key in `gemini_api_key`.
    print("Error: GEMINI_API_KEY not found. Make sure it's in your .env file.")


Successfully loaded GEMINI_API_KEY.


In [4]:
# Shared HTTP retry policy; passed as `retry_options` when the Gemini model
# wrappers (`agent_LLM`, `router_LLM`) are constructed.
retry_config=types.HttpRetryOptions(
    attempts=5,  # Maximum retry attempts
    exp_base=2,  # Delay multiplier
    initial_delay=5,  # Delay before the first retry (presumably seconds — confirm)
    http_status_codes=[429, 500, 503, 504], # Retry on these HTTP errors
)


In [None]:
import json
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, List, Optional, Final

# Import necessary ADK components
from google.adk.plugins import BasePlugin
from google.adk.sessions import Session
from google.adk.agents.invocation_context import InvocationContext

# --- Configuration & Constants ---
# JSON file that the persistence plugin loads, appends to, and rewrites.
SESSION_DIARY_PATH: Final[Path] = Path("data/session_diary.json")
# NOTE(review): GEMINI_MODEL and MAX_RETRIES are not referenced by any code
# visible in this notebook; presumably reserved for the real (non-stub)
# summarization call — confirm or remove.
GEMINI_MODEL: Final[str] = "gemini-2.5-flash"
MAX_RETRIES: Final[int] = 3

# --- JSON Schema for Structured Output ---
# Schema describing the summary object: four string fields, all required.
# It is placed under "responseSchema" in the payload assembled by
# `_call_gemini_for_summary`; in the stub version of that function the payload
# is built but never sent, so the schema is not enforced by anything yet.
SUMMARY_SCHEMA: Final[Dict[str, Any]] = {
    "type": "object",
    "properties": {
        # One field per cluster type (DAILY / WEEKLY / long-term / ON-DEMAND).
        "last_daily_focus": {
            "type": "string",
            "description": "A short sentence capturing the main virtue/lesson/focus from the DAILY cluster session, if one occurred."
        },
        "last_weekly_goal": {
            "type": "string",
            "description": "A short sentence describing the main goal, reflection, or virtue defined in the WEEKLY cluster session, if one occurred."
        },
        "long_term_objective": {
            "type": "string",
            "description": "A concise statement of the user's primary long-term objective mentioned in the session, if one was set or discussed."
        },
        "on_demand_diary_note": {
            "type": "string",
            "description": "A short, narrative diary note (max 50 words) summarizing the most important reactive (ON-DEMAND) interaction, including the core problem and the philosophical advice given."
        }
    },
    # Every field is required; the mocked responses use "" for fields that do not apply.
    "required": ["last_daily_focus", "last_weekly_goal", "long_term_objective", "on_demand_diary_note"]
}

# --- Utility Functions ---

def _get_text_from_event(event: Any) -> str:
    """Safely extracts and formats text from an event's content structure."""
    if not hasattr(event, 'content') or not event.content or not hasattr(event.content, 'parts'):
        return ""
    
    parts = []
    for part in event.content.parts:
        if hasattr(part, 'text') and part.text:
            parts.append(part.text.strip())
        # Add handling for other part types if needed (e.g., function_call)
    
    return " | ".join(parts)

def _generate_transcript(events: List[Any]) -> str:
    """
    Build a readable conversation transcript from session events.

    Each event that yields text (via `_get_text_from_event`) becomes one line
    of the form "[USER]: ..." or "[AGENT]: ...".

    The speaker is taken from the event's `author` attribute when it has one
    ("user" means USER, anything else means AGENT). Labelling by list position
    alone would mark every user turn after the first as AGENT in a multi-turn
    session. Events without an `author` fall back to the positional rule
    (first event = USER).

    Args:
        events: Sequence of event objects; None is treated as empty.

    Returns:
        The transcript lines joined with newlines ("" when nothing has text).
    """
    lines = []
    for index, event in enumerate(events or []):
        text = _get_text_from_event(event)
        if not text:
            continue

        author = getattr(event, 'author', None)
        if author is None:
            # No author information: keep the original positional heuristic.
            role = "USER" if index == 0 else "AGENT"
        else:
            role = "USER" if author == "user" else "AGENT"

        lines.append(f"[{role}]: {text}")
    return "\n".join(lines)

def _load_diary() -> List[Dict[str, Any]]:
    """
    Load the existing diary entries from SESSION_DIARY_PATH.

    Returns:
        The list stored in the diary file. An empty list is returned (with a
        printed warning where applicable) when the file does not exist, cannot
        be decoded as JSON, or decodes to something other than a list. Callers
        append to the result, so a non-list value would only fail later.
    """
    if not SESSION_DIARY_PATH.exists():
        return []

    # JSON is read as UTF-8 explicitly rather than with the platform default.
    with open(SESSION_DIARY_PATH, 'r', encoding='utf-8') as f:
        try:
            entries = json.load(f)
        except json.JSONDecodeError:
            print("Warning: Could not decode JSON in diary file. Starting with empty diary.")
            return []

    if not isinstance(entries, list):
        print("Warning: Diary file does not contain a JSON list. Starting with empty diary.")
        return []
    return entries

def _save_diary(diary: List[Dict[str, Any]]):
    """
    Write the diary entries to SESSION_DIARY_PATH as indented JSON.

    The data is serialized to a sibling temporary file first and only moved
    over the real diary once serialization has succeeded. Opening the diary
    itself in 'w' mode truncates it immediately, so a failure inside
    `json.dump` (for example a non-serializable value in an entry) would
    otherwise leave a truncated file and lose every previous entry.

    Args:
        diary: The complete list of entries to persist.

    Raises:
        TypeError / ValueError: Propagated from `json.dump` when an entry is
            not JSON-serializable; the existing diary file is left untouched.
        OSError: Propagated from the file operations.
    """
    SESSION_DIARY_PATH.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = SESSION_DIARY_PATH.with_name(SESSION_DIARY_PATH.name + ".tmp")
    try:
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(diary, f, indent=2)
        # Swap the fully written file into place.
        tmp_path.replace(SESSION_DIARY_PATH)
    finally:
        # Only still present if something above failed before the replace.
        if tmp_path.exists():
            tmp_path.unlink()

async def _call_gemini_for_summary(transcript: str, session_id: str) -> Dict[str, str]:
    """
    STUB: return a canned structured summary instead of calling Gemini.

    *** Replace the mock section with a real asynchronous API call. ***

    The prompts and request body that a real implementation would send are
    assembled below for reference only; nothing is transmitted.

    Args:
        transcript: Conversation transcript to summarize.
        session_id: Session identifier, used only in the progress message.

    Returns:
        A dict with the four keys listed under "required" in SUMMARY_SCHEMA.
        Which of the two canned dicts is returned depends on whether the
        transcript contains the phrase "quitting my stable job".
    """
    print(f"\t[LLM Call]: Preparing to summarize session {session_id}...")

    # System instruction for the (future) model call.
    instruction_text = (
        "You are a highly efficient memory compression and extraction service. "
        "Your task is to analyze the session transcript and extract the most "
        "relevant information into the STRICT JSON format provided in the schema."
    )

    # User-facing prompt, with the transcript embedded.
    prompt_text = (
        "### Session Transcript\n"
        f"{transcript}\n\n"
        "### Task\n"
        "Analyze the transcript and generate the required structured JSON summary."
    )

    # Request body shape for the Gemini API. Unused by the stub.
    generation_config = {
        "responseMimeType": "application/json",
        "responseSchema": SUMMARY_SCHEMA,
        "temperature": 0.1,  # Low temperature for reliable structure
    }
    request_body = {
        "contents": [{"parts": [{"text": prompt_text}]}],
        "systemInstruction": {"parts": [{"text": instruction_text}]},
        "generationConfig": generation_config,
    }

    # --- MOCKED RESPONSES (local testing only) ---
    career_change_summary = {
        "last_daily_focus": "",
        "last_weekly_goal": "Investigate financial viability of artistic pursuits.",
        "long_term_objective": "Transition to a full-time artistic career.",
        "on_demand_diary_note": "User faced fear about career change; advised on existential freedom and necessity of choice (DeBeauvoir).",
    }
    default_summary = {
        "last_daily_focus": "Successfully mediated conflict with a structured, scientific approach.",
        "last_weekly_goal": "",
        "long_term_objective": "Improve personal communication skills.",
        "on_demand_diary_note": "Roommate conflict resolved using Dewey's scientific method of hypothesis testing and evaluation.",
    }
    return career_change_summary if "quitting my stable job" in transcript else default_summary

# --- The Plugin ---

class SessionPersistencePlugin(BasePlugin):
    """
    Runner plugin that records every finished run in the local session diary.

    After a run, the session's events are turned into a transcript, handed to
    `_call_gemini_for_summary`, and the returned summary is stored together
    with basic session metadata in the JSON file at SESSION_DIARY_PATH.
    """

    def __init__(self, name: str = "session_persistence_plugin"):
        """Register the plugin under `name` and create the diary's parent directory."""
        super().__init__(name)
        diary_dir = SESSION_DIARY_PATH.parent
        diary_dir.mkdir(parents=True, exist_ok=True)

        # NOTE: a real implementation would create its async HTTP client here.

        print(f"[Plugin Init]: Ready to write session data to: {SESSION_DIARY_PATH}")

    def _load_diary(self) -> List[Dict[str, Any]]:
        """Return the stored diary entries (delegates to the module-level helper)."""
        return _load_diary()

    def _save_diary(self, diary: List[Dict[str, Any]]):
        """Persist `diary` (delegates to the module-level helper)."""
        _save_diary(diary)

    async def after_run_callback(
        self,
        *,
        invocation_context: InvocationContext
    ) -> Optional[None]:
        """
        Summarize the session held by `invocation_context` and append it to the diary.

        Any exception raised while summarizing or saving is caught and reported
        with a printed message; it is not re-raised.
        """
        current_session = invocation_context.session
        session_id = current_session.id
        print(f"\n[AFTER_RUN Hook]: Starting summary and auto-save for ID: {session_id}")

        try:
            events = current_session.events

            # Transcript first, then the (stubbed) structured summary.
            transcript = _generate_transcript(events)
            summary_data = await _call_gemini_for_summary(transcript, session_id)

            # First event is taken as the query, last event as the final response.
            if events:
                user_query = _get_text_from_event(events[0])
                final_response_text = _get_text_from_event(events[-1])
            else:
                user_query = "N/A"
                final_response_text = "N/A"

            # Metadata, then the summary keys, then the raw state: this keeps
            # the key order of the stored JSON object.
            log_entry: Dict[str, Any] = {
                "timestamp": datetime.now().isoformat(),
                "session_id": session_id,
                "user_id": current_session.user_id,
                "user_query": user_query,
                "final_response_text": final_response_text,
            }
            log_entry.update(summary_data)

            state = current_session.state
            if hasattr(state, 'to_dict'):
                log_entry["raw_final_state"] = state.to_dict()
            else:
                # No to_dict(): store the state's string form instead.
                log_entry["raw_final_state"] = str(state)

            entries = self._load_diary()
            entries.append(log_entry)
            self._save_diary(entries)

            print("[AFTER_RUN Hook]: Successfully generated summary and saved log entry.")

        except Exception as e:
            print(f"[AFTER_RUN Hook ERROR]: Failed to generate summary or save session {session_id}. Error: {e}")

In [None]:
import json
from datetime import datetime
from typing import Dict, Any, List, Final
# We need to import the context object used by the low-level hook
from google.adk.agents.invocation_context import InvocationContext
# We need types.Content to safely parse content, but since we cannot reliably import 
# internal types, we will rely on duck-typing the context objects.
# We will still use BasePlugin and Session for context.
from google.adk.plugins import BasePlugin

# --- Utility Function for Safe Extraction (Revised to use invocation_context data) ---

def _get_text_from_event(event: Any) -> str:
    """Safely extracts text from an event's content structure."""
    
    # We must check for content/parts structure which is what the ADK uses for message bodies
    if not hasattr(event, 'content') or not event.content or not hasattr(event.content, 'parts'):
        return ""
    
    parts = []
    for part in event.content.parts:
        if hasattr(part, 'text') and part.text:
            parts.append(part.text.strip())
        elif hasattr(part, 'function_call') and part.function_call:
             parts.append(f"Function Call: {part.function_call.name}")
        elif hasattr(part, 'function_response') and part.function_response:
             parts.append(f"Function Response: {part.function_response.name}")
    
    return " | ".join(parts)

# --- The Plugin ---

class SessionPersistencePlugin(BasePlugin):
    """
    Plugin that appends a record of each finished run to the session diary.

    The record holds a timestamp, the session and user ids, the text of the
    first and last session events, and a snapshot of the session state. It is
    appended to the JSON list stored at SESSION_DIARY_PATH.

    Defining this class rebinds the name `SessionPersistencePlugin`, so it
    supersedes any earlier definition made in the same kernel.
    """

    def __init__(self, name: str = "session_persistence_plugin"):
        """
        Initialize the plugin and ensure the diary's directory exists.

        Args:
          name: The name of the plugin instance, passed to BasePlugin.
        """
        super().__init__(name)
        SESSION_DIARY_PATH.parent.mkdir(parents=True, exist_ok=True)
        print(f"[Plugin Init]: Ready to write session data to: {SESSION_DIARY_PATH}")

    def _load_diary(self) -> List[Dict[str, Any]]:
        """Return the diary file's content, or [] if it is missing or not valid JSON."""
        if SESSION_DIARY_PATH.exists():
            with open(SESSION_DIARY_PATH, 'r') as f:
                try:
                    return json.load(f)
                except json.JSONDecodeError:
                    return []
        return []

    def _save_diary(self, diary: List[Dict[str, Any]]):
        """Overwrite the diary file with `diary`, serialized as indented JSON."""
        with open(SESSION_DIARY_PATH, 'w') as f:
            json.dump(diary, f, indent=2)

    @staticmethod
    def _snapshot_state(state: Any) -> Dict[str, Any]:
        """
        Copy a session state object into a plain dict.

        Mapping-like states (anything with `.items()`) are copied directly and
        states exposing `to_dict()` are converted through it. As a last resort
        `dict(state)` is attempted, which accepts an iterable of key/value
        pairs. A state that cannot be converted yields {} instead of raising,
        so the rest of the record is still saved.
        """
        if hasattr(state, 'items'):
            return dict(state.items())
        if hasattr(state, 'to_dict'):
            return dict(state.to_dict())
        try:
            return dict(state)
        except (TypeError, ValueError):
            return {}

    # ADK picks this hook up by its method name (no decorator is used).
    async def after_run_callback(
        self,
        *,
        invocation_context: InvocationContext
    ) -> None:
        """
        Save a record of the session found on `invocation_context`.

        Failures while building or writing the record are caught and reported
        with a printed message; they are not re-raised.
        """
        session = invocation_context.session

        print(f"\n[AFTER_RUN Hook]: Starting session auto-save for ID: {session.id}")

        try:
            events = session.events

            # First event is taken as the user's query, last event as the final response.
            user_query = _get_text_from_event(events[0]) if events else "N/A"
            final_response_text = _get_text_from_event(events[-1]) if events else "N/A"

            session_data: Dict[str, Any] = {
                "timestamp": datetime.now().isoformat(),
                "session_id": session.id,
                "user_id": session.user_id,
                "user_query": user_query,
                "final_state": self._snapshot_state(session.state),
                "final_response_text": final_response_text
            }

            diary = self._load_diary()
            diary.append(session_data)
            self._save_diary(diary)

            # Reported once per save.
            print("[AFTER_RUN Hook]: Successfully saved session data.")

        except Exception as e:
            print(f"[AFTER_RUN Hook ERROR]: Failed to save session {session.id}. Error: {e}")
In [44]:
# App name and user id shared by every test run. The session id is not set
# here: `test_agent` generates a fresh one for each query.
APP_NAME = "agents"
USER_ID = "user_1"


async def test_agent(query, agent):
    """
    Send one query to `agent` in a fresh session and return its final response.

    A new in-memory session service, session (with a random UUID id), and
    Runner (with a `SessionPersistencePlugin` attached) are created per call.

    The runner's event stream is consumed to the end instead of exiting the
    loop at the final response. Leaving the `async for` early abandons the
    runner's generator, so work the runner performs after yielding that event
    (such as post-run plugin callbacks) is not guaranteed to execute.

    Args:
        query: The user's message text.
        agent: The agent to run.

    Returns:
        The string "<<< Agent Response: <text>", where <text> is the first
        part's text of the first final-response event that has content, or
        None if no such event was produced.
    """
    session_service = InMemorySessionService()

    print(f"\n>>> User Query: {query}")

    # A unique id per call keeps test runs from sharing session history.
    session = await session_service.create_session(
        app_name=APP_NAME,
        user_id=USER_ID,
        session_id=str(uuid.uuid4()),
    )

    runner = Runner(
        app_name=APP_NAME,
        agent=agent,
        session_service=session_service,
        plugins=[SessionPersistencePlugin()],
    )

    # Wrap the query in the ADK message format.
    content = types.Content(role='user', parts=[types.Part(text=query)])

    final_response_text = None
    async for event in runner.run_async(user_id=USER_ID, session_id=session.id, new_message=content):
        # Keep the first final response; keep iterating so the run can finish.
        if (
            final_response_text is None
            and event.is_final_response()
            and event.content
            and event.content.parts
        ):
            final_response_text = event.content.parts[0].text
    return f"<<< Agent Response: {final_response_text}"


In [None]:
# Directory that `load_config` reads the per-team JSON files from.
CONFIG_DIR = Path("the_teams")
# Known teams; each key maps to an identical value.
TEAM_NAMES = {
    "daily": "daily",
    "weekly": "weekly",
    "long_term": "long_term",
    "on_demand": "on_demand",
    "oracle": "oracle"
}

# No visible effect: this is not the cell's last expression, so its value is
# computed and discarded.
TEAM_NAMES.keys()

# Registry of built router agents, keyed by team name. Re-running this cell
# empties it.
ALL_TEAM_ROUTERS = {}

In [8]:
def load_config(filename: str) -> Dict[str, Any]:
    """
    Load the JSON configuration file `<CONFIG_DIR>/<filename>.json`.

    Args:
        filename: File name without the ".json" extension.

    Returns:
        The parsed JSON content, or an empty dict (after printing an error)
        when the file does not exist. Callers that index the result should be
        prepared for the empty dict.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON.
    """
    try:
        # JSON text is UTF-8; read it as such instead of relying on the
        # platform's default encoding.
        with open(CONFIG_DIR / f"{filename}.json", 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        print(f"Error: Configuration file not found at {CONFIG_DIR / f'{filename}.json'}")
        return {}

In [49]:
# Model used for the specialist agents (see `define_agents`).
# The original `agent_LLM = model=Gemini(...)` was a chained assignment that
# also bound an unintended global named `model`; only the intended name is
# assigned now.
agent_LLM = Gemini(
    model="gemini-2.0-flash-lite",
    api_key=gemini_api_key,
    retry_options=retry_config
    )
# Model used for the router agents.
router_LLM = Gemini(
    model="gemini-2.5-flash",
    api_key=gemini_api_key,
    retry_options=retry_config
    )

def define_agents(team_name) -> Agent:
    """
    Build the router agent for one team, with its specialists attached as tools.

    Two JSON files are read through `load_config`: `<team_name>_descriptions`
    and `<team_name>_instructions`. Every key of the descriptions file other
    than `team_name` becomes a specialist `Agent` running on `agent_LLM`. The
    `team_name` entry configures the router itself, which runs on `router_LLM`
    and receives each specialist wrapped in an `AgentTool`.

    Each specialist's name is printed as it is created.

    Args:
        team_name: Team identifier. It is the config file prefix, the router's
            name, and part of every agent's `output_key`.

    Returns:
        The router `Agent` for the team.

    Raises:
        KeyError: If the router entry is missing from the descriptions, or the
            router or any specialist is missing from the instructions. This
            includes a missing config file, for which `load_config` returns {}.
    """
    descriptions = load_config(f"{team_name}_descriptions")
    instructions = load_config(f"{team_name}_instructions")

    # Everything in the descriptions file except the team's own entry is a specialist.
    specialist_names = [name for name in descriptions if name != team_name]

    # Validate up front so a missing file or entry produces one clear error
    # instead of a bare KeyError partway through construction.
    missing_descriptions = [] if team_name in descriptions else [team_name]
    missing_instructions = [
        name for name in [team_name, *specialist_names] if name not in instructions
    ]
    if missing_descriptions or missing_instructions:
        raise KeyError(
            f"Incomplete configuration for team '{team_name}': "
            f"missing descriptions for {missing_descriptions}, "
            f"missing instructions for {missing_instructions}"
        )

    specialists = []
    for agent_name in specialist_names:
        print(agent_name)
        specialists.append(
            Agent(
                name=agent_name,
                description=descriptions[agent_name],
                model=agent_LLM,
                instruction=instructions[agent_name],
                output_key=f"{team_name}_response_by_{agent_name}",
            )
        )

    # The router reaches its specialists as tools.
    return Agent(
        name=team_name,
        description=descriptions[team_name],
        model=router_LLM,
        instruction=instructions[team_name],
        output_key=f"{team_name}_response_by_{team_name}",
        tools=[AgentTool(specialist) for specialist in specialists],
    )


In [None]:
# NOTE(review): this repeats the TEAM_NAMES definition from the earlier
# configuration cell with the same content.
TEAM_NAMES = {
    "daily": "daily",
    "weekly": "weekly",
    "long_term": "long_term",
    "on_demand": "on_demand",
    "oracle": "oracle"
}

# Build and register a router for every team except "oracle", which is
# assembled separately because it takes the other routers as tools.
for team in TEAM_NAMES:
    if team != "oracle":
        print(team)
        ALL_TEAM_ROUTERS[team] = define_agents(team)


daily
Camus
Seneca
Epicurus
Aristotle
Nagarjuna
Socrates
DeBeauvoir
Hooks
weekly
Aristotle
DeBeauvoir
Hooks
Kant
Nagarjuna
Nietzsche
Nussbaum
Dewey
long_term
DeBeauvoir
Nietzsche
Nussbaum
Aristotle
Dewey
Nagarjuna
on_demand
Confucius
Socrates
DeBeauvoir
Seneca
Kant
Aristotle
Epicurus
Dewey
Camus
Nussbaum


In [50]:
this_team = "on_demand"

# Rebuild this team's router so the test runs against newly constructed agents.
ALL_TEAM_ROUTERS[this_team] = define_agents(this_team)


# --- Step 1: Load the test queries from the team's JSON file ---
queries_path = f'the_teams/{this_team}_queries.json'
with open(queries_path, 'r') as f:
    test_queries = json.load(f)


# --- Step 2: Loop through the queries and run the tests ---
print(f"--- Running tests from {queries_path} ---\n")

for philosopher, query in test_queries.items():
    # The key only labels the test case in the printed output; the query text
    # alone is sent to the team's router.
    full_query = f"{query}"
    
    print(f"--- Testing: {philosopher} ---")
    
    # test_agent is a coroutine function, so it has to be awaited.
    response = await test_agent(full_query, ALL_TEAM_ROUTERS[this_team])

    
    # test_agent returns the already formatted "<<< Agent Response: ..." string.
    print(response)
    print("----------------------------------------\n")
    # Pause between queries (presumably to stay under API rate limits — confirm).
    time.sleep(5)



Confucius
Socrates
DeBeauvoir
Seneca
Kant
Aristotle
Epicurus
Dewey
Camus
Nussbaum
--- Running tests from the_teams/on_demand_queries.json ---

--- Testing: Confucius ---

>>> User Query: I need to write an email to respectfully decline a request from a superior. How should I phrase it?
[Plugin Init]: Ready to write session data to: data/session_diary.json

[AFTER_RUN Hook]: Starting session auto-save for ID: 0dd28b9f-764c-4609-8c76-a4f05f444d14
[AFTER_RUN Hook]: Successfully saved session data.
[AFTER_RUN Hook]: Successfully saved session data.
<<< Agent Response: Analyze your situation with diligence. To decline a request from a superior is a matter of utmost importance. Your role is one of a subordinate, and your duty is to fulfill requests to the best of your ability. However, when you cannot, you must decline with both clarity and respect.

Begin by expressing gratitude for the opportunity. State your inability to fulfill the request directly and honestly, but frame it as a constra

CancelledError: 

In [15]:
this_team = "daily"

ALL_TEAM_ROUTERS[this_team] = define_agents(this_team, memory_tool)


# --- Step 1: Load the test queries from your JSON file ---
queries_path = f'the_teams/{this_team}_queries.json'
with open(queries_path, 'r') as f:
    test_queries = json.load(f)


# --- Step 3: Loop through the queries and run the tests ---
print(f"--- Running tests from {queries_path} ---\n")

for philosopher, query in test_queries.items():
    # The ADK runner routes to the agent based on its name in the query.
    # We'll construct the full input string.
    # Note: The 'LaoTzu' query will fail gracefully if you don't have a 'LaoTzu' agent.
    #full_query = f"{philosopher}, {query}"
    full_query = f"{query}"
    
    print(f"--- Testing: {philosopher} ---")
    #print(f"[User Query]: {full_query}\n")
    
    # Await the response and capture the DebugInfo object
    # Note: The current google-adk `run_debug` is synchronous. If your code
    # doesn't need 'await', you can simply remove it.
    response = await test_agent(full_query, ALL_TEAM_ROUTERS[this_team])

    
    # Now you can print the specific part of the response you want
    print(response)
    print("----------------------------------------\n")
    time.sleep(5)



Camus
Seneca
Epicurus
Aristotle
Nagarjuna
Socrates
DeBeauvoir
Hooks
--- Running tests from the_teams/daily_queries.json ---

--- Testing: Camus ---

>>> User Query: I need a quick prompt to start my day. How should I approach it?
<<< Agent Response: The world offers no inherent meaning, yet we yearn for it. Embrace the absurdity: Today, seek out one small act of rebellion through joy. Laugh in the face of the void.

**Camus's Core Message:** The world is indifferent, but you are not. Find your joy, choose your rebellion. Your action: Find one thing today that makes you truly *feel* alive, and do it.

----------------------------------------

--- Testing: Seneca ---

>>> User Query: I just got frustrated by a long customer service hold time. Help me log this quickly.
<<< Agent Response: Let us begin. Quickly, describe the stressful event: What specifically caused your frustration during the customer service hold?

Now, let's apply the Dichotomy of Control:

*   What aspects of this situ

In [14]:
this_team = "weekly"

ALL_TEAM_ROUTERS[this_team] = define_agents(this_team, memory_tool)


# --- Step 1: Load the test queries from your JSON file ---
queries_path = f'the_teams/{this_team}_queries.json'
with open(queries_path, 'r') as f:
    test_queries = json.load(f)


# --- Step 3: Loop through the queries and run the tests ---
print(f"--- Running tests from {queries_path} ---\n")

for philosopher, query in test_queries.items():
    # The ADK runner routes to the agent based on its name in the query.
    # We'll construct the full input string.
    # Note: The 'LaoTzu' query will fail gracefully if you don't have a 'LaoTzu' agent.
    #full_query = f"{philosopher}, {query}"
    full_query = f"{query}"
    
    print(f"--- Testing: {philosopher} ---")
    #print(f"[User Query]: {full_query}\n")
    
    # Await the response and capture the DebugInfo object
    # Note: The current google-adk `run_debug` is synchronous. If your code
    # doesn't need 'await', you can simply remove it.
    response = await test_agent(full_query, ALL_TEAM_ROUTERS[this_team])

    
    # Now you can print the specific part of the response you want
    print(response)
    print("----------------------------------------\n")
    time.sleep(5)



Aristotle
DeBeauvoir
Hooks
Kant
Nagarjuna
Nietzsche
Nussbaum
Dewey
--- Running tests from the_teams/weekly_queries.json ---

--- Testing: Aristotle ---

>>> User Query: I need to review my week and see if I handled a conflict with my sibling with the right amount of assertiveness (courage).
<<< Agent Response: Ah, a worthy endeavor! To examine our actions through the lens of virtue is to embark on the path of flourishing. Let us delve into your conflict with your sibling, examining your assertiveness – the virtue of courage – and its delicate balance.

1.  **Select Virtue:** You've chosen well. Courage, as it applies to conflict, is the mean between the vices of cowardice (deficiency) and rashness (excess). It is the ability to stand your ground, to speak your mind, and to defend what is right, all while avoiding recklessness and undue aggression.

2.  **Review the Mean:** Let us consider your assertiveness in that conflict. Please recall **two or three specific instances** where you i

In [12]:
this_team = "long_term"

ALL_TEAM_ROUTERS[this_team] = define_agents(this_team, memory_tool)


# --- Step 1: Load the test queries from your JSON file ---
queries_path = f'the_teams/{this_team}_queries.json'
with open(queries_path, 'r') as f:
    test_queries = json.load(f)


# --- Step 3: Loop through the queries and run the tests ---
print(f"--- Running tests from {queries_path} ---\n")

for philosopher, query in test_queries.items():
    # The ADK runner routes to the agent based on its name in the query.
    # We'll construct the full input string.
    # Note: The 'LaoTzu' query will fail gracefully if you don't have a 'LaoTzu' agent.
    #full_query = f"{philosopher}, {query}"
    full_query = f"{query}"
    
    print(f"--- Testing: {philosopher} ---")
    #print(f"[User Query]: {full_query}\n")
    
    # Await the response and capture the DebugInfo object
    # Note: The current google-adk `run_debug` is synchronous. If your code
    # doesn't need 'await', you can simply remove it.
    response = await test_agent(full_query, ALL_TEAM_ROUTERS[this_team])

    
    # Now you can print the specific part of the response you want
    print(response)
    print("----------------------------------------\n")
    time.sleep(5)



DeBeauvoir
Nietzsche
Nussbaum
Aristotle
Dewey
Nagarjuna
--- Running tests from the_teams/long_term_queries.json ---

--- Testing: DeBeauvoir ---

>>> User Query: I need help figuring out my life's purpose and defining a project that is truly my own.
<<< Agent Response: Ah, so you seek a purpose, a project worthy of your unique existence? Do not be mistaken; this is not a pre-packaged answer you can simply *find*. Life's meaning is not a hidden treasure waiting to be unearthed. It is a structure you *create*, a project you *build* with the very substance of your freedom.

First, tell me: What do you *value* above all else? Not what you *should* value, according to society or your upbringing, but what truly ignites the flame within you? Be brutally honest. Is it love? Justice? Creation? Knowledge? Name it, and be prepared to defend it against the indifference of the world.

Now, let us move beyond mere abstract values. How will you *act* on these values? What concrete commitments are you

In [None]:
# Assemble the top-level "oracle" router: its tools are one BalanceCoach
# specialist plus every router currently registered in ALL_TEAM_ROUTERS.
team_name = "oracle"
agent_list = []

# Load the oracle team's descriptions and instructions.
descriptions = load_config(f"{team_name}_descriptions")
# NOTE(review): all_agent_names is not used anywhere else in this cell.
all_agent_names = list(descriptions.keys())
instructions = load_config(f"{team_name}_instructions")

agent_name = "BalanceCoach"
print(agent_name)
# State key under which this agent's response is stored.
output_key = f"{team_name}_response_by_{agent_name}"

current_agent =  Agent(
    name=agent_name,
    description=descriptions[agent_name],
    model=agent_LLM,
    instruction=instructions[agent_name],
    output_key=output_key,
    # NOTE(review): `memory_tool` is not defined in any cell visible in this
    # notebook section — confirm it exists before this cell runs.
    tools=[memory_tool]
)
agent_list.append(current_agent)

# Add every registered router. If this cell is re-run after the oracle itself
# has been registered (last line), the previous oracle agent is included too.
for agent in list(ALL_TEAM_ROUTERS.keys()):
    agent_list.append(ALL_TEAM_ROUTERS[agent])


# Wrap each agent so the oracle can call it as a tool.
agents_as_tools = [AgentTool(agent) for agent in agent_list]        

oracle_agent = Agent(
        name=team_name,
        description=descriptions[team_name],
        model=router_LLM,
        instruction=instructions[team_name],
        tools = agents_as_tools # BalanceCoach and the team routers, as tools
    )
ALL_TEAM_ROUTERS[team_name] = oracle_agent


BalanceCoach


In [26]:
# Display which team routers are currently registered.
ALL_TEAM_ROUTERS.keys()

dict_keys(['long_term', 'weekly', 'daily', 'on_demand', 'oracle'])

In [28]:
# Uses the oracle router already stored in ALL_TEAM_ROUTERS; unlike the other
# test cells, this one does not rebuild it.
this_team = "oracle"

# --- Step 1: Load the test queries from the team's JSON file ---
queries_path = f'the_teams/{this_team}_queries.json'
with open(queries_path, 'r') as f:
    test_queries = json.load(f)


# --- Step 2: Loop through the queries and run the tests ---
print(f"--- Running tests from {queries_path} ---\n")

for philosopher, query in test_queries.items():
    # The key only labels the test case in the printed output; the query text
    # alone is sent to the oracle router.
    full_query = f"{query}"
    
    print(f"--- Testing: {philosopher} ---")
    
    # test_agent is a coroutine function, so it has to be awaited.
    response = await test_agent(full_query, ALL_TEAM_ROUTERS[this_team])

    
    # test_agent returns the already formatted "<<< Agent Response: ..." string.
    print(response)
    print("----------------------------------------\n")
    # Pause between queries (presumably to stay under API rate limits — confirm).
    time.sleep(5)


--- Running tests from the_teams/oracle_queries.json ---

--- Testing: on_demand ---

>>> User Query: I'm about to send a risky email, should I use the Categorical Imperative on it first?
<<< Agent Response: Ah, a precarious undertaking! Let us dissect the moral implications of this "risky email" through the rigorous lens of the Categorical Imperative. To do this, we must proceed methodically:

1.  **Formulate the Maxim:**
    The first step is to distill the underlying principle of your action. What rule are you, in essence, following by sending this email? Let us assume your maxim is: "I will send this email, even though it carries potential risks, to achieve a desired outcome."

2.  **Test Universality (Formula of Universal Law):**
    Now, we must consider whether this maxim could become a universal law. Could everyone, in all circumstances, send risky emails to achieve their desired outcomes?

    Consider the implications: If everyone were to send risky emails whenever they sough

In [18]:
await test_agent("What is the true nature of 'success' in a modern context?", on_demand_router_agent)



>>> User Query: What is the true nature of 'success' in a modern context?
<<< Agent Response: The true nature of 'success' in a modern context is a profound question. If one achieves stated goals, wealth, and acclaim but feels emptiness or compromises principles, have they truly succeeded? Conversely, can one be deemed 'unsuccessful' by societal standards yet live a life of purpose, strong relationships, and inner peace?

Is success an external metric, an internal feeling, or merely the efficient attainment of any chosen aim? Does the 'modern context' change the superficial trappings of success, or does it fundamentally shift the essence of a life well-lived?

The true measure of any concept lies not in fleeting opinions, but in the rigorous examination of one's own fundamental assumptions. Therefore, challenge what you believe to be true and scrutinize the foundations upon which you build your understanding of a flourishing life.


In [None]:
import json
import os
from pathlib import Path
from typing import Literal, Optional, Dict, Any, Final
from google.adk.agents import Agent
from google.adk.models.google_llm import Gemini
from google.adk.tools import FunctionTool, AgentTool

# --- 1. CORE CONFIGURATION PATHS ---
# CONFIG_DIR is where load_config() looks for "<name>.json" files.
CONFIG_DIR = Path("config")
# NOTE(review): AGENT_DIR is not referenced anywhere in the visible part of this cell — confirm it is still needed.
AGENT_DIR = Path("agents")

# Cluster key -> file-name prefix for the four Tier 2 clusters. The prefix selects the
# "<prefix>_instructions.json" / "<prefix>_descriptions.json" pair; key and prefix are identical today.
LEVEL_CONFIGS = {
    level: level
    for level in ("daily", "weekly", "long_term", "on_demand")
}

# --- 2. LLM SETUP ---
# Single model instance shared by every agent built in this cell.
# NOTE(review): `temperature` does not look like a field of ADK's Gemini model class, so it may be
# silently ignored here; sampling settings are normally passed per agent through
# `generate_content_config=types.GenerateContentConfig(temperature=...)` — TODO confirm.
LLM = Gemini(model="gemini-2.5-pro", temperature=0.1)


        
# --- 4. CONFIG LOADING HELPERS ---

def load_config(filename: str) -> Dict[str, Any]:
    """Load ``CONFIG_DIR/<filename>.json`` and return its top-level JSON object.

    Args:
        filename: Base name of the configuration file, without the ``.json`` extension.

    Returns:
        The parsed JSON object as a dict. An empty dict is returned, after printing an
        error, when the file is missing, cannot be decoded/parsed as JSON, or its
        top-level value is not an object. Callers can therefore always chain ``.get(...)``
        or ``.items()`` on the result.
    """
    config_path = CONFIG_DIR / f"{filename}.json"
    try:
        # Be explicit about UTF-8: the platform default encoding is not UTF-8 everywhere,
        # and the prompt texts stored in these files may contain non-ASCII characters.
        with open(config_path, 'r', encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        print(f"Error: Configuration file not found at {config_path}")
        return {}
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        # Handle a corrupt file the same way as a missing one instead of crashing the caller.
        print(f"Error: Configuration file {config_path} is not valid JSON: {exc}")
        return {}

    if not isinstance(data, dict):
        # Every caller treats the result as a mapping; a list/str/number would break them later.
        print(f"Error: Configuration file {config_path} must contain a JSON object at the top level.")
        return {}
    return data

def create_specialist_agent(name: str, cluster_name: str, memory_tool: FunctionTool) -> Agent:
    """Build one Tier 3 specialist agent equipped with the shared memory tool.

    Args:
        name: Agent name; also the key looked up in the cluster's instruction file.
        cluster_name: Prefix of the ``<cluster_name>_instructions.json`` file to read.
        memory_tool: Tool instance handed to the specialist as its only tool.

    Returns:
        A new ``Agent`` using the shared ``LLM``. When the instruction file has no entry
        for ``name`` the agent receives a ``SYSTEM ERROR`` placeholder instruction instead.
    """
    # Load instructions from the specific cluster file
    instructions = load_config(f"{cluster_name}_instructions").get(
        name, f"SYSTEM ERROR: No instructions found for {name}."
    )

    # All specialist agents get the memory tool
    return Agent(
        name=name,
        description=f"A specialist philosophical agent ({name}) ready to execute a task.",
        model=LLM,
        # ADK's Agent (LlmAgent) receives its prompt through `instruction`; it has no
        # `system_instruction` field and rejects unknown keyword arguments.
        instruction=instructions,
        tools=[memory_tool],  # Pass the shared memory tool to the specialist
    )

# --- 5. AGENT DEFINITION FUNCTION ---

def define_agents(memory_tool: FunctionTool) -> Agent:
    """Build the three-tier agent hierarchy and return its root.

    RootRouter (Tier 1) -> Cluster Routers (Tier 2) -> Specialist Agents (Tier 3)

    For every prefix in ``LEVEL_CONFIGS`` the files ``<prefix>_instructions.json`` and
    ``<prefix>_descriptions.json`` are read once. Each name listed in a description file
    (except the reserved ``"ClusterRouter"`` key) becomes one specialist agent, created
    from the first cluster whose instruction file contains that name and shared by every
    cluster that lists it.

    ``AgentTool`` is constructed from the agent alone and advertises the wrapped agent's
    own name and description, so the configured descriptions are stored on the agents
    themselves. A specialist listed in several clusters is advertised with the
    description from the first cluster that lists it.

    Args:
        memory_tool: Shared tool passed to every specialist agent.

    Returns:
        The root ``Agent`` ("LifeArchitect"), whose tools are the cluster routers that
        have a description in ``root_router_descriptions.json``.
    """
    router_key = "ClusterRouter"

    # Read each cluster's two files exactly once: prefix -> (instructions, descriptions).
    cluster_configs: Dict[str, Any] = {
        config_prefix: (
            load_config(f"{config_prefix}_instructions"),
            load_config(f"{config_prefix}_descriptions"),
        )
        for config_prefix in LEVEL_CONFIGS.values()
    }

    # --- 5a. TIER 3: SPECIALIST AGENTS ---
    specialist_agents: Dict[str, Agent] = {}
    seen_names = set()

    for _, descriptions_data in cluster_configs.values():
        for agent_name, description in descriptions_data.items():
            # "ClusterRouter" describes the Tier 2 router itself, not a specialist.
            if agent_name == router_key or agent_name in seen_names:
                continue
            seen_names.add(agent_name)

            # One instruction file holds all agents of a cluster, so search the files
            # (in LEVEL_CONFIGS order) for the first one that defines this agent.
            owning_cluster = next(
                (
                    config_prefix
                    for config_prefix, (instructions_data, _) in cluster_configs.items()
                    if agent_name in instructions_data
                ),
                None,
            )
            if owning_cluster is None:
                print(f"Warning: Could not find instructions for {agent_name}. Skipping creation.")
                continue

            specialist = create_specialist_agent(
                name=agent_name,
                cluster_name=owning_cluster,
                memory_tool=memory_tool,
            )
            # The description a router sees for this tool is the agent's own description.
            specialist.description = description
            specialist_agents[agent_name] = specialist

    # --- 5b. TIER 2: CLUSTER ROUTERS ---
    cluster_routers: Dict[str, Agent] = {}

    for cluster_name, config_prefix in LEVEL_CONFIGS.items():
        instructions_data, descriptions_data = cluster_configs[config_prefix]

        # Extract the instruction for the router itself
        router_instruction = instructions_data.get(
            router_key, f"ERROR: Router instruction missing for {cluster_name}"
        )

        # Wrap the specialists listed in this cluster's description file as tools.
        cluster_tools = [
            AgentTool(agent=specialist_agents[agent_name])
            for agent_name in descriptions_data
            if agent_name in specialist_agents
        ]

        cluster_routers[config_prefix] = Agent(
            name=f"{config_prefix.title()}Router",
            description=descriptions_data.get(
                router_key, f"Tier 2 Router for {config_prefix} tasks."
            ),
            model=LLM,
            instruction=router_instruction,
            tools=cluster_tools,
        )

    # --- 5c. TIER 1: ROOT ROUTER ---
    root_instruction = load_config("root_router_instructions").get(
        "RootRouter", "SYSTEM ERROR: Root instruction missing."
    )
    root_descriptions = load_config("root_router_descriptions")

    final_root_tools = []
    for config_prefix, router_agent in cluster_routers.items():
        if config_prefix not in root_descriptions:
            print(f"Warning: Missing definition or description for {config_prefix} cluster at the root level.")
            continue
        # The root-level description replaces the cluster-level one before wrapping.
        router_agent.description = root_descriptions[config_prefix]
        final_root_tools.append(AgentTool(agent=router_agent))

    return Agent(
        name="LifeArchitect",
        description="The ultimate philosophical guidance system.",
        model=LLM,
        instruction=root_instruction,
        tools=final_root_tools,
    )

# --- 6. EXAMPLE RUNNER SETUP ---
if __name__ == "__main__":
    # Demo driver: builds the agent hierarchy and sends a handful of sample queries through it.
    import asyncio

    from google.adk.runners import InMemoryRunner
    from google.genai import types

    required_config_files = [
        CONFIG_DIR / f"{config_prefix}_{kind}.json"
        for config_prefix in LEVEL_CONFIGS.values()
        for kind in ("instructions", "descriptions")
    ]

    # Ensure all necessary files/directories exist before trying to run
    if not CONFIG_DIR.exists():
        print("Error: 'config' directory not found. Please ensure all JSON files are generated and placed in a 'config' folder.")
    elif not all(path.exists() for path in required_config_files):
        print("Error: Some required config JSON files are missing. Please check the 'config' folder.")
    else:
        # 1. Initialize Memory Tool
        # NOTE(review): SharedMemoryTool is not defined in this cell — presumably an earlier cell provides it.
        memory_tool = SharedMemoryTool()

        # 2. Define the Agent Hierarchy
        root_agent = define_agents(memory_tool)

        # 3. Initialize Runner
        runner = InMemoryRunner(agent=root_agent)
        print("✅ ADK Agent and Memory Tool initialized successfully.")
        print("-" * 30)

        demo_user_id = "demo_user"

        async def start_session(session_id: str) -> None:
            """Create the session that subsequent queries with this id will run in."""
            await runner.session_service.create_session(
                app_name=runner.app_name, user_id=demo_user_id, session_id=session_id
            )

        async def ask(query: str, session_id: str) -> str:
            """Send one user message and return the text of the agent's final response."""
            # The runner takes keyword-only user_id/session_id/new_message and yields events;
            # the answer is the text carried by the final-response event.
            message = types.Content(role="user", parts=[types.Part(text=query)])
            final_text = ""
            async for event in runner.run_async(
                user_id=demo_user_id, session_id=session_id, new_message=message
            ):
                if event.is_final_response() and event.content and event.content.parts:
                    final_text = "".join(part.text or "" for part in event.content.parts)
            return final_text

        # --- Test Scenarios ---

        async def run_test(query: str, session_id: str) -> None:
            """Print one query together with the agent's answer."""
            print(f"\nUser Query: {query}")
            print(f"Agent Response: {await ask(query, session_id)}")
            print("-" * 30)

        async def memory_test(session_id: str) -> None:
            """Ask an agent to store a value, read it directly, then ask an agent to recall it."""
            # Write a value (this will be routed through an agent that decides to use the tool)
            write_response = await ask("My current focus for the week is to practice temperance.", session_id)
            print(f"Write attempt: {write_response}")

            # Directly read the memory key using the tool instance (for demonstration)
            read_value = memory_tool.read_key_value(key="current_virtue")
            print(f"Direct Read: Current Virtue is '{read_value}'")

            # Attempt to use an agent to read it back (weekly/daily Aristotle should use this)
            read_response = await ask("What was my current virtue focus set for this week?", session_id)
            print(f"Agent Read attempt: {read_response}")

        test_scenarios = [
            ("--- Running Test 1 (Long-Term Goal Setting) ---",
             "I need to define my long-term goal for the next 5 years."),
            ("--- Running Test 2 (Daily Log) ---",
             "What is a simple thing I can log right now to practice mindfulness?"),
            ("--- Running Test 3 (Weekly Review) ---",
             "I need help reviewing my moral consistency from the past week."),
            ("--- Running Test 4 (On-Demand Crisis) ---",
             "I have a crisis: my friend betrayed me. What should I do right now?"),
        ]

        async def run_all_tests() -> None:
            """Run every scenario in its own session, then the memory read/write test."""
            for index, (banner, query) in enumerate(test_scenarios, start=1):
                session_id = f"demo_session_{index}"
                await start_session(session_id)
                print(banner)
                await run_test(query, session_id)

            # Example using the memory tool (requires prior setting)
            print("--- Running Test 5 (Memory Read/Write Test) ---")
            await start_session("demo_session_memory")
            await memory_test("demo_session_memory")

        # asyncio.run() raises when an event loop is already running (as in a notebook kernel),
        # so schedule the demo on the running loop there and only start a loop when none exists.
        try:
            running_loop = asyncio.get_running_loop()
        except RuntimeError:
            asyncio.run(run_all_tests())
        else:
            # Keep a reference so the task is not garbage-collected; `await demo_task`
            # in a later cell to wait for completion and surface any exception.
            demo_task = running_loop.create_task(run_all_tests())