In [None]:
from dotenv import load_dotenv
load_dotenv()

import sys, os
sys.path.insert(0, os.path.abspath(".."))
from src.core2 import nodes_jasosu, nodes_job, nodes_main, graph

In [None]:
# 주피터 셀에서 그대로 실행
import os
from typing import Literal
from typing_extensions import TypedDict

from agents import Agent, Runner, function_tool, ItemHelpers, RunConfig
from openai.types.responses import ResponseTextDeltaEvent, ResponseFunctionCallArgumentsDeltaEvent

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

class CalcInput(TypedDict):
    op: Literal["add", "sub", "mul", "div", "pow"]
    a: float
    b: float

@function_tool
def calculator(args: CalcInput) -> float:
    # 도구에 실제로 전달된 인자/결과를 직접 로그
    print(f"\n[calculator 호출] args={args}")
    op = args["op"]
    a = float(args["a"]); b = float(args["b"])
    if op == "add": res = a + b
    elif op == "sub": res = a - b
    elif op == "mul": res = a * b
    elif op == "div":
        if b == 0: raise ValueError("division by zero")
        res = a / b
    elif op == "pow": res = a ** b
    else: raise ValueError(f"unsupported op: {op}")
    print(f"[calculator 결과] {res}\n")
    return res

agent = Agent(
    name="수학도우미",
    instructions=(
        "너는 수학 도우미다. 한국어로 간결하게 답하라. "
        "산술이 필요하면 반드시 'calculator' 도구를 사용하고, "
        "복합식은 단계로 나눠 여러 번 호출하라."
    ),
    tools=[calculator],
)

# (선택) 모델 호출 직전 입력 로깅
def log_model_input(data):
    mi = getattr(data, "model_data", None)
    print("\n=== [LLM 입력 직전] ===")
    try:
        print(f"[Instructions]\n{getattr(mi, 'instructions', None)}")
        items = getattr(mi, "input", [])
        print(f"[Input items] count={len(items)}")
    finally:
        print("========================\n")
    return mi

async def main():
    result = Runner.run_streamed(
        agent,
        input="한국어로 답해줘. (12.5 * 3) + (2^5) - 7 를 계산해줘.",
        run_config=RunConfig(
            call_model_input_filter=log_model_input,  # 선택
        ),
    )

    print("=== 실행 시작 ===")
    final_text = ""

    async for event in result.stream_events():
        # 1) LLM의 '원시' 스트림 이벤트 처리 (토큰 단위)
        if event.type == "raw_response_event":
            data = event.data
            # LLM 텍스트 토큰
            if isinstance(data, ResponseTextDeltaEvent):
                print(data.delta, end="", flush=True)  # [LLM 토큰]
                final_text += data.delta
            # 함수 인자 토큰(JSON 조각)
            elif isinstance(data, ResponseFunctionCallArgumentsDeltaEvent):
                print(f"\n[인자 토큰] {data.delta}", end="", flush=True)
            continue

        # 2) 에이전트 처리된 아이템 이벤트 (메시지/도구 호출/결과)
        if event.type == "run_item_stream_event":
            # 메시지 완성 시점 (사양상 이름 문자열로 구분)
            if event.name == "message_output_created":
                try:
                    msg = ItemHelpers.extract_last_text(getattr(event.item, "raw_item", event.item))
                    if msg:
                        print(f"\n[LLM 메시지 완성]\n{msg}")
                except Exception:
                    pass

            # 도구 호출(완성된 name/arguments)
            elif event.name == "tool_called":
                raw = getattr(event.item, "raw_item", event.item)
                tool_name = getattr(raw, "name", None) or (raw.get("name") if isinstance(raw, dict) else None)
                tool_args = getattr(raw, "arguments", None) or (raw.get("arguments") if isinstance(raw, dict) else None)
                print(f"\n[도구 호출] name={tool_name} arguments={tool_args}")

            # 도구 실행 결과
            elif event.name == "tool_output":
                out = getattr(event.item, "output", None)
                if out is None:
                    raw = getattr(event.item, "raw_item", None)
                    if isinstance(raw, dict):
                        out = raw.get("output")
                print(f"[도구 결과] {out}")

        # 3) 에이전트 핸드오프 등
        if event.type == "agent_updated_stream_event":
            print(f"\n[에이전트 변경] -> {event.new_agent.name}")

    print("\n=== 실행 종료 ===")
    print("\n[최종 출력]")
    print(final_text)

# 노트북에서는 최상위 await 사용
await main()


In [None]:
# Jupyter 셀: GPT API 요청/응답 '전문' + Agents SDK 스트리밍 실행
import os, sys, json, asyncio, httpx
from typing import Literal
from typing_extensions import TypedDict

from openai import AsyncOpenAI
from agents import (
    set_default_openai_client,
    Agent, Runner, function_tool,
    RunConfig,
)

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

# --- 요청/응답 전문 로깅용 스트림/트랜스포트 구현 ---
class LoggingAStream(httpx.AsyncByteStream):
    def __init__(self, inner):
        self.inner = inner  # 보통 httpx._client.BoundAsyncStream

    # HTTPX가 실제로 소비하는 인터페이스
    async def __aiter__(self):
        async for chunk in self.inner:  # inner.aiter_bytes()가 아님
            # 응답 바이트 원문 그대로 흘려보내되, 텍스트 가능한 경우에만 콘솔로 출력
            try:
                sys.stdout.write(chunk.decode("utf-8", errors="ignore"))
                sys.stdout.flush()
            except Exception:
                pass  # 바이너리면 그대로 통과
            yield chunk

    # 호환성: aiter_bytes도 __aiter__에 위임
    async def aiter_bytes(self):
        async for chunk in self.__aiter__():
            yield chunk

    async def aclose(self):
        if hasattr(self.inner, "aclose"):
            await self.inner.aclose()

class LoggingAsyncTransport(httpx.AsyncBaseTransport):
    def __init__(self, transport: httpx.AsyncBaseTransport):
        self.transport = transport

    async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
        # 요청 라인/헤더/본문(가능할 때) 출력 - 전문 그대로
        try:
            sys.stdout.write(f">>> REQUEST {request.method} {request.url}\n")
            sys.stdout.write(json.dumps(dict(request.headers), ensure_ascii=False) + "\n")
            body = request.content
            if body:
                try:
                    sys.stdout.write(body.decode("utf-8") + ("" if body.endswith(b"\n") else "\n"))
                except Exception:
                    # 바이너리/멀티파트 등은 길이만 표시
                    sys.stdout.write(f"[{len(body)} bytes]\n")
            sys.stdout.flush()
        except Exception:
            pass

        # 실제 전송
        response = await self.transport.handle_async_request(request)

        # 응답 헤더 출력, 바디는 스트리밍으로 Tee
        try:
            sys.stdout.write(f"<<< RESPONSE {request.method} {request.url} {response.status_code}\n")
            sys.stdout.write(json.dumps(dict(response.headers), ensure_ascii=False) + "\n")
            sys.stdout.flush()
        except Exception:
            pass

        wrapped_stream = LoggingAStream(response.stream)
        return httpx.Response(
            status_code=response.status_code,
            headers=response.headers,
            stream=wrapped_stream,     # 바디는 위 스트림을 통해 그대로 출력됨
            extensions=response.extensions,
            request=request,
        )

def jdump(obj):
    try:
        if hasattr(obj, "model_dump"):
            return json.dumps(obj.model_dump(), ensure_ascii=False, separators=(",",":"))
        if hasattr(obj, "to_dict"):
            return json.dumps(obj.to_dict(), ensure_ascii=False, separators=(",",":"))
        return json.dumps(obj, ensure_ascii=False, separators=(",",":"))
    except Exception:
        return str(obj)

# --- OpenAI 클라이언트 생성 및 Agents SDK에 주입 ---
async_http_client = httpx.AsyncClient(
    transport=LoggingAsyncTransport(httpx.AsyncHTTPTransport()),
    timeout=None,
)
openai_client = AsyncOpenAI(api_key=OPENAI_API_KEY, http_client=async_http_client)
set_default_openai_client(openai_client)  # 이후 모든 LLM 호출에 동일 로깅 적용

# --- 테스트용 에이전트/도구 정의 ---
class CalcInput(TypedDict):
    op: Literal["add","sub","mul","div","pow"]
    a: float
    b: float

@function_tool
def calculator(args: CalcInput) -> float:
    op = args["op"]; a = float(args["a"]); b = float(args["b"])
    if op == "add": return a + b
    if op == "sub": return a - b
    if op == "mul": return a * b
    if op == "div":
        if b == 0: raise ValueError("division by zero")
        return a / b
    if op == "pow": return a ** b
    raise ValueError(f"unsupported op: {op}")

agent = Agent(
    name="수학도우미",
    instructions=(
        "너는 수학 도우미다. 한국어로 간결하게 답하라. "
        "산술이 필요하면 반드시 'calculator' 도구를 사용하고, "
        "복합식은 단계로 나눠 여러 번 호출하라."
    ),
    tools=[calculator],
)

# --- 스트리밍 실행: 이벤트는 가공 최소화(원한다면 추가 출력 제거 가능) ---
async def main():
    result = Runner.run_streamed(
        agent,
        input="한국어로 답해줘. (12.5 * 3) + (2^5) - 7 를 계산해줘.",
        run_config=RunConfig(),  # 필요 시 trace_include_sensitive_data 등 추가 가능
    )

    # HTTP 전문은 위 트랜스포트에서 이미 출력됨. 아래는 이벤트를 소비만 함.
    async for _ in result.stream_events():
        pass

# 주피터: 최상위 await로 실행
await main()
