In [39]:
import os
import time
import json
import requests
import yfinance as yf
from datetime import datetime
from dotenv import load_dotenv
from datetime import datetime, timezone

# .env 파일 로드 (환경변수 로드)
load_dotenv()

# 환경 변수에서 API Key 및 기타 기본 설정 불러오기
OPENAI_API_KEY = os.getenv("CAPSTONE_OPENAI_API")
OPENAI_URL = "https://api.openai.com/v1/responses"
UA = "Mozilla/5.0"  # User-Agent 헤더

# 베이스 에이전트 클래스 정의
class BaseAgent:
    def __init__(
        self,
        model: str | None = None,                     # 강제 사용할 모델 지정 (기본 없음)
        preferred_models: list[str] | None = None,    # 후보 모델 리스트
        temperature: float = 0.2,                     # 생성 temperature 설정
        bounds_tolerance: float = 0.15,               # 가격 허용 오차 비율 (±15%)
        reask_on_inconsistent: bool = True,           # 예측값이 비정상일 경우 재질문 여부
        price_period: str = "3mo",                    # 가격 통계 추출 범위
        use_price_snapshot: bool = True               # yfinance로 price_period 기간 가격 스탭샷(최고가, 최저가) 가져 
    ):
        # API 키 로드 및 유효성 검사
        OPENAI_API_KEY = os.getenv("CAPSTONE_OPENAI_API")
        if not OPENAI_API_KEY:
            raise RuntimeError("환경변수 OPENAI_API_KEY가 필요합니다.")

        # API 요청 헤더 세팅
        self.headers = {
            "Authorization": f"Bearer {OPENAI_API_KEY}",
            "Content-Type": "application/json"
        }

        # 주요 하이퍼파라미터 저장
        self.temperature = temperature
        self.bounds_tolerance = bounds_tolerance
        self.reask_on_inconsistent = reask_on_inconsistent
        self.price_period = price_period
        self.use_price_snapshot = use_price_snapshot 

        # 사용할 모델 순위 설정
        self.preferred_models = preferred_models or ["gpt-5-mini", "gpt-4.1-mini"]
        if model:
            self.preferred_models = [model] + [m for m in self.preferred_models if m != model]

        # 출력값 스키마 정의 (모든 Agent가 공유 가능)
        self.schema_obj = {
            # 최상위 응답은 객체(JSON 오브젝트)여야 함
            "type": "object",
            
            # 객체 어떤 속성들이 존재할 수 있는지를 정의
            "properties": {
                "buy_price":  {"type": "number", "description": "오늘 기준 목표 매수가"},
                "sell_price": {"type": "number", "description": "오늘 기준 목표 매도가"},
                "reason":     {"type": "string", "description": "근거 요약 (한국어 4~5문장)"}
            },
            # 객체는 아래 속성을 반드시 포함할 것
            "required": ["buy_price", "sell_price", "reason"],  
            
            # 그외 속성은 포함하지 않음
            "additionalProperties": False
        }

    # ticker 정규화 코드
    def _normalize_ticker(self, ticker: str) -> str:
        # ticker 양옆 공백제거, 대문자로 변경
        t = ticker.strip().upper()
        
        # 한국 6자리면 기본 KOSPI
        if t.isdigit() and len(t) == 6:
            return t + ".KS"  
        return t

    # 통화 단위와 소수점 자리 결정
    def _detect_currency_and_decimals(self, ticker: str) -> tuple[str, int]:
        try:
            info = yf.Ticker(ticker).info
            ccy = (info.get("currency") or "KRW").upper()
        except Exception:
            ccy = "KRW"
        decimals = 0 if ccy in ("KRW", "JPY") else 2
        return ccy, decimals

    # OpenAI API 호출 with fallback (모델 우선순위 순회)
    def _ask_with_fallback(self, msg_sys: dict, msg_user: dict, schema_obj: dict) -> dict:
        body_base = {
            "input": [msg_sys, msg_user],
            "text": {
                "format": {
                    "type": "json_schema",
                    "name": "ValuationTargets",
                    "strict": True,
                    "schema": self.schema_obj
                }
            },
            "temperature": self.temperature
        }

        last_err = None
        for m in self.preferred_models:
            body = dict(body_base, model=m)
            r = requests.post(OPENAI_URL, json=body, headers=self.headers, timeout=120)
            if r.ok:
                return r.json()
            if r.status_code in (400, 404):
                last_err = (r.status_code, r.text)
                continue
            r.raise_for_status()
        raise RuntimeError(f"모든 모델 실패. 마지막 오류: {last_err}")

    # API 응답에서 가격/근거 추출 및 형식 변환
    def _parse_result(self, resp_json: dict, decimals: int) -> tuple[float, float, str]:
        txt = resp_json.get("output_text")
        if not txt:
            txt = resp_json["output"][0]["content"][0]["text"]
        obj = json.loads(txt)
        buy = round(float(obj["buy_price"]), decimals)
        sell = round(float(obj["sell_price"]), decimals)
        reason = obj["reason"]
        return buy, sell, reason

    # 예측값 정합성 검사 (범위와 순서 확인)
    def _sanity_check(self, buy: float, sell: float, price_stats: dict) -> bool:
        if not price_stats:
            return True
        last = price_stats["close_last"]
        lo, hi = price_stats["close_min_3mo"], price_stats["close_max_3mo"]
        tol = self.bounds_tolerance
        span_ok = (lo*(1-tol) <= buy <= hi*(1+tol)) and (lo*(1-tol) <= sell <= hi*(1+tol))
        order_ok = sell >= buy
        # 추가로, 극단적으로 현재가와 동떨어지면 경고
        # (옵션) abs(buy-last)/last <= 50% 등도 넣을 수 있음
        return span_ok and order_ok

    # 예측값을 허용 범위 내로 보정 (클리핑)
    def _clip_to_bounds(self, buy: float, sell: float, price_stats: dict, decimals: int) -> tuple[float, float]:
        lo, hi = price_stats["close_min_3mo"], price_stats["close_max_3mo"]
        tol = self.bounds_tolerance
        lo_b = lo * (1 - tol)
        hi_b = hi * (1 + tol)
        buy2 = min(max(buy, lo_b), hi_b)         # buy를 lo_b ~ hi_b 범위로 클리핑
        sell2 = min(max(sell, buy2), hi_b)       # sell은 buy 이상 hi_b 이하
        return round(buy2, decimals), round(sell2, decimals)

    # 특정 티커의 과거 3개월 가격 데이터 통계 반환
    def _get_price_snapshot(self, ticker: str) -> dict:
        st = yf.Ticker(ticker)
        hist = st.history(period=self.price_period)
        if hist.empty:
            raise ValueError(f"{ticker}에 대한 가격 기록이 없습니다.")
        return {
            "close_last": float(hist["Close"].iloc[-1]),
            "close_mean_3mo": float(hist["Close"].mean()),
            "close_min_3mo": float(hist["Close"].min()),
            "close_max_3mo": float(hist["Close"].max()),
            "vol_mean_3mo": float(hist["Volume"].mean()),
        }

In [40]:
# ValuationAgent는 BaseAgent를 상속받아 단기 주가 예측을 담당하는 에이전트임
class ValuationAgent(BaseAgent):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    # 에이전트의 메인 실행 메서드: 티커 기반 예측 수행
    def run(self, ticker: str) -> list:
        # 1. 티커 정규화 (예: 005930 → 005930.KS)
        tkr = self._normalize_ticker(ticker)

        # 2. 통화 단위 및 소수점 자릿수 판별 (KRW → 0자리, USD → 2자리)
        currency, decimals = self._detect_currency_and_decimals(tkr)

        # 3. 최근 3개월 가격 스냅샷 가져오기 (사용 여부는 self.use_price_snapshot에 따름)
        price = self._get_price_snapshot(tkr) if self.use_price_snapshot else None

        # 4. 시스템 메시지 및 유저 메시지 생성
        context = self._build_context(tkr, price)
        msg_sys, msg_user = self._build_messages(context, currency, decimals)

        # 5. GPT API 요청 → 응답 JSON 파싱
        result = self._ask_with_fallback(msg_sys, msg_user, self.schema_obj)
        buy, sell, reason = self._parse_result(result, decimals)

        # 6. 정합성 검사 및 보정 (범위 초과 or 매도가 < 매수가 → 재질문 or 클리핑)
        if not self._sanity_check(buy, sell, price):
            if self.reask_on_inconsistent:
                # 제약 추가하여 재질문
                msg_user2 = self._add_constraints(msg_user, price, decimals)
                result = self._ask_with_fallback(msg_sys, msg_user2, self.schema_obj)
                buy, sell, reason = self._parse_result(result, decimals)

                # 재질문도 정합성 미충족 시 클리핑 보정
                if not self._sanity_check(buy, sell, price):
                    buy, sell = self._clip_to_bounds(buy, sell, price, decimals)
            else:
                # 재질문 비활성화 시 바로 클리핑
                buy, sell = self._clip_to_bounds(buy, sell, price, decimals)

        # 7. 최종 결과 반환
        return [buy, sell, reason]

    # 프롬프트 Context 구성: GPT에게 전달할 가격 정보 및 실행 시간 포함
    def _build_context(self, ticker: str, price: dict | None) -> str:
        lines = [
            f"[RUN_AT_UTC] {datetime.now(timezone.utc).isoformat()}",
            f"[TICKER] {ticker}"
        ]
        if price:
            lines.append(f"[PRICE_3MO] {price}")
        return "\n".join(lines)

    # GPT 메시지 포맷 생성: 시스템 + 유저 요청
    def _build_messages(self, context: str, currency: str, decimals: int) -> tuple[dict, dict]:
        sys = (
            "너는 기업의 펀더멘털 기반 단기 매수/매도가를 제시하는 애널리스트다. "
            f"통화는 {currency}이며, 숫자는 소수 {decimals}자리로 제시한다. "
            "결과는 JSON 객체로만 반환한다."
        )
        user = (
            "입력값: 최근 3개월 가격 데이터"
            "요구사항:\n"
            "1) 오늘 기준 buy_price(number), sell_price(number) 예측 (단기 목표)\n"
            "2) reason(string) 4~5문장으로 요약\n"
            "3) 다른 텍스트 없이 JSON 객체만 반환\n"
            "4) 한국말로 설명할 것 \n\n"
            f"{context}"
        )
        return {"role": "system", "content": sys}, {"role": "user", "content": user}


In [41]:
value = ValuationAgent().run('RZLV')

In [35]:
value

[4.1,
 4.3,
 '최근 3개월간 RZLV의 주가는 1.92달러에서 4.32달러 사이에서 변동하며 평균 2.85달러를 기록했습니다. 최근 종가는 4.27달러로 최고가 근처에 위치해 단기 상승 모멘텀이 강한 것으로 보입니다. 거래량도 평균적으로 1,076만 주 수준으로 활발하여 투자자 관심이 높습니다. 따라서 단기적으로 4.10달러에 매수하고 4.30달러에 매도하는 전략이 적절하다고 판단됩니다.']