# [Lv4-Day2-Lab2] Part 3: ADK `Custom Agent` - The Power of Full Control

### 실습 목표
Part 1에서 `LLM Agent`, Part 2에서 `Workflow Agent`를 경험했습니다. 이제 ADK의 최고 수준인 **`Custom Agent`**를 학습합니다.

1. **`Custom Agent` 개념 이해**: `BaseAgent`를 상속하여 Agent의 핵심 동작 로직을 직접 정의
2. **`_run_async_impl` 구현**: Agent의 심장인 핵심 메소드를 오버라이딩하여 독자적인 로직 구현
3. **상태 관리와 이벤트 시스템**: ADK의 이벤트 기반 아키텍처 이해
4. **Agent 유형 선택 기준**: 언제 어떤 Agent를 사용해야 하는지 명확한 기준 확립

## 🚀 1. 환경 설정 및 라이브러리 설치

In [1]:
# ADK 설치
# !pip install --upgrade --quiet google-adk

In [2]:
import os
from getpass import getpass

# API 키 설정
if "GOOGLE_API_KEY" not in os.environ:
    api_key = getpass("Google AI Studio API 키를 입력하세요: ")
    os.environ["GOOGLE_API_KEY"] = api_key
    print("✅ API 키가 설정되었습니다.")
else:
    print("✅ 기존 API 키를 사용합니다.")

Google AI Studio API 키를 입력하세요: ········
✅ API 키가 설정되었습니다.


## 🏗️ 2. Custom Agent 설계: Turn Counter Agent

ADK의 `BaseAgent`를 상속하여 대화 턴을 세는 Custom Agent를 만들어보겠습니다. 이 Agent는:
1. 대화 턴 수를 세션 상태에 저장
2. 매 응답마다 현재 턴 수를 포함
3. Tool 사용 기능 유지
4. ADK의 이벤트 시스템 활용

In [7]:
from google.adk.agents import BaseAgent
from google.adk.agents.invocation_context import InvocationContext
from google.adk.events import Event, EventActions
from google.genai import types
import math
from typing import AsyncGenerator, List, Optional, Any
from pydantic import Field, PrivateAttr


# Tool 함수 정의 (Part 1과 동일)
def calculate_circle_area(radius: float) -> dict:
    """반지름(radius)을 입력받아 원의 넓이를 계산합니다.

    Args:
        radius (float): 원의 반지름

    Returns:
        dict: 계산 결과를 포함한 딕셔너리
    """
    if radius <= 0:
        return {"status": "error", "error_message": "반지름은 0보다 큰 값이어야 합니다."}

    area = math.pi * (radius**2)
    return {"status": "success", "radius": radius, "area": round(area, 4), "formula": "π × r²"}


def get_conversation_stats() -> dict:
    """현재 대화 통계를 반환합니다.

    Returns:
        dict: 대화 관련 통계 정보
    """
    return {
        "agent_type": "Custom Turn Counter Agent",
        "features": ["턴 카운팅", "상태 관리", "이벤트 처리", "Tool 통합"],
        "description": "BaseAgent를 상속한 완전한 커스텀 Agent",
    }


print("✅ Tool 함수들이 정의되었습니다.")

✅ Tool 함수들이 정의되었습니다.


In [8]:
# Custom Agent 클래스 정의
class TurnCountingAgent(BaseAgent):
    """대화 턴을 세고 상태를 관리하는 Custom Agent"""

    # 공개 Pydantic 필드 선언
    model_name: str = Field(description="사용할 LLM 모델명")
    tools: List = Field(default_factory=list, description="사용 가능한 도구들")

    # 비공개 속성 선언
    _llm: Any = PrivateAttr()

    # Pydantic 설정
    model_config = {"arbitrary_types_allowed": True}

    def __init__(self, name: str, model: str, tools=None, **kwargs):
        """Custom Agent 초기화"""
        super().__init__(name=name, model_name=model, tools=tools or [], **kwargs)

        # LLM 클라이언트 초기화
        import google.generativeai as genai

        genai.configure(api_key=os.environ["GOOGLE_API_KEY"])
        self._llm = genai.GenerativeModel(model)

    async def _run_async_impl(self, ctx: InvocationContext) -> AsyncGenerator[Event, None]:
        """Custom Agent의 핵심 실행 로직"""

        # 1. 세션 상태에서 턴 수 관리
        current_turn = ctx.session.state.get("turn_count", 0) + 1

        print(f"🔄 [Custom Agent] Turn {current_turn} 시작")

        # 2. 사용자 메시지 추출
        user_message = ""
        if ctx.user_content and ctx.user_content.parts:
            user_message = ctx.user_content.parts[0].text

        print(f"   📝 사용자: {user_message}")

        # 3. 상태 업데이트 이벤트 생성
        yield Event(author=self.name, actions=EventActions(state_delta={"turn_count": current_turn}))

        try:
            # 4. 사용자 질문 분석하여 적절한 Tool 직접 호출
            if "원의 넓이" in user_message or "계산" in user_message:
                # 반지름 추출 시도
                import re

                radius_match = re.search(r"반지름.*?(\d+(?:\.\d+)?)", user_message)
                if radius_match:
                    radius = float(radius_match.group(1))
                    final_text = self._execute_circle_calculation(radius, current_turn)
                else:
                    final_text = f"[턴 {current_turn}] 반지름 값을 찾을 수 없습니다. 예: '반지름이 5인 원의 넓이를 계산해주세요.'"

            elif "대화 통계" in user_message or "통계" in user_message:
                final_text = self._execute_conversation_stats(current_turn)

            elif "몇 번" in user_message and "대화" in user_message:
                final_text = f"[턴 {current_turn}] 현재까지 총 {current_turn}번의 대화 턴이 진행되었습니다. 저는 각 대화 턴을 추적하고 있습니다."

            else:
                # 일반적인 질문은 LLM에게 전달
                final_text = self._handle_general_question(user_message, current_turn)

            # 5. 최종 응답 이벤트 생성
            yield Event(author=self.name, content=types.Content(role="model", parts=[types.Part(text=final_text)]))

        except Exception as e:
            print(f"   ❌ 오류: {str(e)}")
            error_msg = f"[턴 {current_turn}] 죄송합니다. 오류가 발생했습니다: {str(e)}"

            yield Event(author=self.name, content=types.Content(role="model", parts=[types.Part(text=error_msg)]))

    def _execute_circle_calculation(self, radius: float, current_turn: int) -> str:
        """원의 넓이 계산 Tool 직접 실행"""
        print(f"   🛠️ 직접 Tool 호출: calculate_circle_area(radius={radius})")

        try:
            result = calculate_circle_area(radius)
            print(f"   ✅ Tool 실행 성공: {result}")

            if result.get("status") == "success":
                return f"[턴 {current_turn}] 원의 넓이 계산 결과:\n- 반지름: {result['radius']}\n- 넓이: {result['area']}\n- 공식: {result['formula']}\n\n계산이 완료되었습니다!"
            else:
                return f"[턴 {current_turn}] 계산 오류: {result.get('error_message', '알 수 없는 오류')}"

        except Exception as e:
            print(f"   ❌ Tool 실행 오류: {str(e)}")
            return f"[턴 {current_turn}] Tool 실행 중 오류가 발생했습니다: {str(e)}"

    def _execute_conversation_stats(self, current_turn: int) -> str:
        """대화 통계 Tool 직접 실행"""
        print(f"   🛠️ 직접 Tool 호출: get_conversation_stats()")

        try:
            result = get_conversation_stats()
            print(f"   ✅ Tool 실행 성공: {result}")

            return f"""[턴 {current_turn}] 현재 대화 통계:

🤖 Agent 정보:
- 유형: {result['agent_type']}
- 설명: {result['description']}

🔧 주요 기능:
{chr(10).join(f"- {feature}" for feature in result['features'])}

📊 현재 상태:
- 현재 턴: {current_turn}
- 상태: 정상 작동 중"""

        except Exception as e:
            print(f"   ❌ Tool 실행 오류: {str(e)}")
            return f"[턴 {current_turn}] Tool 실행 중 오류가 발생했습니다: {str(e)}"

    def _handle_general_question(self, user_message: str, current_turn: int) -> str:
        """일반적인 질문 처리"""
        print(f"   🧠 일반 질문 처리 중...")

        try:
            prompt = f"""당신은 Turn Counting Agent입니다. 현재 턴 번호는 {current_turn}입니다.

사용자 질문: {user_message}

간단하고 친절하게 응답하세요. "[턴 {current_turn}]"로 시작하세요."""

            response = self._llm.generate_content(prompt)

            if response.candidates and response.candidates[0].content.parts:
                text = response.candidates[0].content.parts[0].text
                if not text.startswith(f"[턴 {current_turn}]"):
                    text = f"[턴 {current_turn}] {text}"
                print(f"   ✅ 일반 응답 생성 완료")
                return text
            else:
                return f"[턴 {current_turn}] 응답을 생성할 수 없습니다."

        except Exception as e:
            print(f"   ❌ LLM 호출 오류: {str(e)}")
            return f"[턴 {current_turn}] LLM 호출 중 오류가 발생했습니다: {str(e)}"


print("✅ Tool 직접 호출 방식의 TurnCountingAgent가 정의되었습니다.")
print("   🔄 기능: 턴 카운팅, 확실한 Tool 실행, 명확한 응답")

✅ Tool 직접 호출 방식의 TurnCountingAgent가 정의되었습니다.
   🔄 기능: 턴 카운팅, 확실한 Tool 실행, 명확한 응답


## 🧪 3. Custom Agent 인스턴스 생성 및 테스트

In [9]:
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
import asyncio

# Custom Agent 인스턴스 생성
custom_agent = TurnCountingAgent(
    name="turn_counter_agent",
    model="gemini-2.0-flash",
    tools=[calculate_circle_area, get_conversation_stats],
    description="대화 턴을 세고 상태를 관리하는 Custom Agent",
)

# Runner와 SessionService 설정
session_service = InMemorySessionService()
runner = Runner(agent=custom_agent, app_name="custom_agent_app", session_service=session_service)

print("✅ Custom Agent와 Runner가 생성되었습니다.")
print(f"   📋 Agent 이름: {custom_agent.name}")
print(f"   🛠️ 사용 가능한 Tools: {len(custom_agent.tools)}개")

✅ Custom Agent와 Runner가 생성되었습니다.
   📋 Agent 이름: turn_counter_agent
   🛠️ 사용 가능한 Tools: 2개


In [10]:
async def call_custom_agent_async(agent, runner, session_id, query):
    """Custom Agent를 비동기적으로 호출하는 함수"""
    content = types.Content(role="user", parts=[types.Part(text=query)])

    events = runner.run_async(user_id="custom_user", session_id=session_id, new_message=content)

    responses = []

    async for event in events:
        print(f"   📧 이벤트 수신: author={event.author}")

        # 컨텐츠가 있는 이벤트 처리
        if event.content and event.content.parts:
            for part in event.content.parts:
                if hasattr(part, "text") and part.text:
                    responses.append(part.text)
                    print(f"      💬 텍스트: {part.text[:50]}...")

        # 액션이 있는 이벤트 처리 (상태 변경 등)
        if event.actions:
            if event.actions.state_delta:
                print(f"      🔄 상태 변경: {event.actions.state_delta}")

        # 최종 응답 확인 (여러 조건으로 확인)
        if event.is_final_response() or (
            event.content
            and event.content.parts
            and any(hasattr(part, "text") and part.text for part in event.content.parts)
        ):

            # 마지막 텍스트 응답 반환
            if responses:
                return responses[-1]

    # 응답이 수집된 경우 마지막 응답 반환
    if responses:
        return responses[-1]

    return "응답을 받지 못했습니다."


# 세션 생성 및 연속 대화 테스트
async def test_custom_agent():
    """Custom Agent 연속 대화 테스트"""

    print("🔄 Custom Agent 연속 대화 테스트")
    print("=" * 60)

    # 세션 생성
    session = await session_service.create_session(app_name="custom_agent_app", user_id="custom_user")

    print(f"📋 세션 생성됨: {session.id}")

    # 테스트 질문들
    queries = [
        "안녕하세요! 이 Agent에 대해 설명해주세요.",
        "반지름이 7인 원의 넓이를 계산해주세요.",
        "현재 대화 통계를 알려주세요.",
        "지금까지 몇 번의 대화를 했나요?",
    ]

    for i, query in enumerate(queries, 1):
        print(f"\n{i}. 👤 사용자: {query}")

        try:
            response = await call_custom_agent_async(custom_agent, runner, session.id, query)

            print(f"   🤖 Custom Agent: {response}")
            print("-" * 50)

        except Exception as e:
            print(f"   ❌ 오류 발생: {str(e)}")
            print("-" * 50)

    print("\n✅ Custom Agent 테스트 완료!")
    return session


# 테스트 실행
test_session = await test_custom_agent()

🔄 Custom Agent 연속 대화 테스트
📋 세션 생성됨: 5458fdd9-3007-4794-8f71-21da5cb60106

1. 👤 사용자: 안녕하세요! 이 Agent에 대해 설명해주세요.
🔄 [Custom Agent] Turn 1 시작
   📝 사용자: 안녕하세요! 이 Agent에 대해 설명해주세요.
   📧 이벤트 수신: author=turn_counter_agent
      🔄 상태 변경: {'turn_count': 1}
   🧠 일반 질문 처리 중...
   ✅ 일반 응답 생성 완료
   📧 이벤트 수신: author=turn_counter_agent
      💬 텍스트: [턴 1] 안녕하세요! 저는 턴 수를 세는 역할을 합니다. 현재 턴은 1번입니다. 궁금한 ...
   🤖 Custom Agent: [턴 1] 안녕하세요! 저는 턴 수를 세는 역할을 합니다. 현재 턴은 1번입니다. 궁금한 점이 있다면 언제든지 물어보세요!

--------------------------------------------------

2. 👤 사용자: 반지름이 7인 원의 넓이를 계산해주세요.
🔄 [Custom Agent] Turn 2 시작
   📝 사용자: 반지름이 7인 원의 넓이를 계산해주세요.
   📧 이벤트 수신: author=turn_counter_agent
      🔄 상태 변경: {'turn_count': 2}
   🛠️ 직접 Tool 호출: calculate_circle_area(radius=7.0)
   ✅ Tool 실행 성공: {'status': 'success', 'radius': 7.0, 'area': 153.938, 'formula': 'π × r²'}
   📧 이벤트 수신: author=turn_counter_agent
      💬 텍스트: [턴 2] 원의 넓이 계산 결과:
- 반지름: 7.0
- 넓이: 153.938
- 공식: ...
   🤖 Custom Agent: [턴 2] 원의 넓이 계산 결과:
- 반지름: 7.0
- 넓이: 1

## 📊 4. Custom Agent vs 기본 Agent 비교

In [11]:
from google.adk.agents import Agent

# 기본 Agent 생성 (Part 1 스타일)
basic_agent = Agent(
    name="basic_agent",
    model="gemini-2.0-flash",
    description="기본 ADK Agent",
    instruction="사용자의 질문에 간단히 답변하세요.",
    tools=[calculate_circle_area],
)

# 기본 Agent용 Runner
basic_runner = Runner(agent=basic_agent, app_name="basic_agent_app", session_service=session_service)


async def call_basic_agent_async(query):
    """기본 Agent 호출 함수"""
    session = await session_service.create_session(app_name="basic_agent_app", user_id="basic_user")

    content = types.Content(role="user", parts=[types.Part(text=query)])
    events = basic_runner.run_async(user_id="basic_user", session_id=session.id, new_message=content)

    async for event in events:
        if event.is_final_response():
            return event.content.parts[0].text
    return "응답 없음"


# 비교 테스트
async def compare_agents():
    """Custom Agent vs 기본 Agent 비교"""

    print("⚖️ Custom Agent vs 기본 Agent 비교")
    print("=" * 70)

    test_query = "반지름이 5인 원의 넓이를 계산해주세요."
    print(f"📝 테스트 질문: {test_query}")
    print("-" * 70)

    # 기본 Agent 테스트
    print("\n🔸 기본 Agent (LLM Agent):")
    basic_response = await call_basic_agent_async(test_query)
    print(f"   응답: {basic_response[:100]}...")

    # Custom Agent 테스트 (기존 세션 사용)
    print("\n🔹 Custom Agent (Turn Counter):")
    custom_response = await call_custom_agent_async(custom_agent, runner, test_session.id, test_query)
    print(f"   응답: {custom_response[:100]}...")

    print("\n" + "=" * 70)


await compare_agents()

⚖️ Custom Agent vs 기본 Agent 비교
📝 테스트 질문: 반지름이 5인 원의 넓이를 계산해주세요.
----------------------------------------------------------------------

🔸 기본 Agent (LLM Agent):




   응답: 반지름이 5인 원의 넓이는 78.5398입니다....

🔹 Custom Agent (Turn Counter):
🔄 [Custom Agent] Turn 5 시작
   📝 사용자: 반지름이 5인 원의 넓이를 계산해주세요.
   📧 이벤트 수신: author=turn_counter_agent
      🔄 상태 변경: {'turn_count': 5}
   🛠️ 직접 Tool 호출: calculate_circle_area(radius=5.0)
   ✅ Tool 실행 성공: {'status': 'success', 'radius': 5.0, 'area': 78.5398, 'formula': 'π × r²'}
   📧 이벤트 수신: author=turn_counter_agent
      💬 텍스트: [턴 5] 원의 넓이 계산 결과:
- 반지름: 5.0
- 넓이: 78.5398
- 공식: ...
   응답: [턴 5] 원의 넓이 계산 결과:
- 반지름: 5.0
- 넓이: 78.5398
- 공식: π × r²

계산이 완료되었습니다!...



## 🎯 5. 세션 상태 확인 및 분석

In [12]:
# 세션 상태 분석
async def analyze_session_state():
    """Custom Agent의 세션 상태 분석"""

    print("🔍 Custom Agent 세션 상태 분석")
    print("=" * 50)

    # 현재 세션 정보 가져오기
    session_info = await session_service.get_session(
        app_name="custom_agent_app", user_id="custom_user", session_id=test_session.id
    )

    print(f"📋 세션 ID: {session_info.id}")
    print(f"👤 사용자 ID: {session_info.user_id}")
    print(f"📱 앱 이름: {session_info.app_name}")

    # 상태 정보 출력
    print("\n🧠 세션 상태:")
    for key, value in session_info.state.items():
        print(f"   {key}: {value}")

    # 턴 카운터 확인
    turn_count = session_info.state.get("turn_count", 0)
    print(f"\n🔄 총 대화 턴 수: {turn_count}")

    print("\n" + "=" * 50)

    return session_info


session_analysis = await analyze_session_state()

🔍 Custom Agent 세션 상태 분석
📋 세션 ID: 5458fdd9-3007-4794-8f71-21da5cb60106
👤 사용자 ID: custom_user
📱 앱 이름: custom_agent_app

🧠 세션 상태:
   turn_count: 5

🔄 총 대화 턴 수: 5

