## 0) Install & Imports

> Colab 기준. 이미 API 키는 Colab userdata / 환경변수에 세팅되어 있다고 가정합니다.

In [None]:
#!pip -q install -U openai==1.81.0 langgraph langchain-upstage langchain-community chromadb transformers python-dotenv pydantic rich


In [None]:
#!pip install chromadb

In [None]:
#!pip -q install -U trafilatura readability-lxml beautifulsoup4 lxml

In [None]:
import os, json, time, math, re, hashlib, textwrap
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple, TypedDict, Literal

import requests
from dotenv import load_dotenv
from pydantic import BaseModel, Field, ValidationError

from openai import OpenAI
from transformers import AutoTokenizer

import chromadb
from chromadb import Documents, EmbeddingFunction, Embeddings, PersistentClient

from rich import print as rprint


## 1) API Keys & Clients

- `UPSTAGE_API_KEY`, `SERPER_API_KEY`는 **이미 등록되어 있고 변수명도 동일**하다고 했으니 그대로 씁니다.
- 모델 라인업(예시):  
  - Solar: `solar-pro2-250909`  
  - Document Parse: `document-parse-250618`  
  - Embedding: `solar-embedding-1-large-query`


In [None]:
load_dotenv()

UPSTAGE_API_KEY = os.getenv("UPSTAGE_API_KEY")
SERPER_API_KEY  = os.getenv("SERPER_API_KEY")
DART_API_KEY    = os.getenv("DART_API_KEY")  # 선택(없어도 동작)

# Colab userdata (선택)
try:
    from google.colab import userdata
    UPSTAGE_API_KEY = UPSTAGE_API_KEY or userdata.get("UPSTAGE_API_KEY")
    SERPER_API_KEY  = SERPER_API_KEY  or userdata.get("SERPER_API_KEY")
    DART_API_KEY    = DART_API_KEY    or userdata.get("DART_API_KEY")
except Exception:
    pass

assert UPSTAGE_API_KEY, "UPSTAGE_API_KEY not found"
assert SERPER_API_KEY, "SERPER_API_KEY not found"

client = OpenAI(base_url="https://api.upstage.ai/v1", api_key=UPSTAGE_API_KEY)

# 토큰 추정용 (HF 토크나이저는 model_max_length=4096 경고가 뜨는 경우가 많아서 무력화)
tokenizer = AutoTokenizer.from_pretrained("upstage/solar-pro-preview-instruct")
tokenizer.model_max_length = 1_000_000

# Solar-Pro2 컨텍스트(≈64K) 안전값
MAX_CONTEXT_LIMIT = 65000

# Vector DB
CHROMA_PATH = "./chroma_db_ideaproof"
chroma_client = PersistentClient(path=CHROMA_PATH)


## 2) Shared Utils (schema, tool runner, token budget)

- Upstage Solar-Pro 계열은 컨텍스트가 대략 64K 수준이므로, 안전한 한계치로 60K를 사용합니다.


In [None]:
import inspect

def function_to_schema(func) -> dict:
    sig = inspect.signature(func)
    props = {}
    required = []
    for name, param in sig.parameters.items():
        if name in ("self",):
            continue
        ann = param.annotation
        jtype = "string"
        if ann in (int,):
            jtype = "integer"
        elif ann in (float,):
            jtype = "number"
        elif ann in (bool,):
            jtype = "boolean"
        elif ann in (list, List):
            jtype = "array"
        elif ann in (dict, Dict):
            jtype = "object"
        props[name] = {"type": jtype}
        if param.default is inspect._empty:
            required.append(name)
    return {
        "type": "function",
        "function": {
            "name": func.__name__,
            "description": (func.__doc__ or "").strip(),
            "parameters": {"type": "object", "properties": props, "required": required}
        }
    }

def truncate_tokens_if_needed(tokenizer, agent_instructions, messages, content, max_token_limit=None):
    """
    - base가 이미 limit을 넘으면(히스토리 과다) 에러 내지 말고 내용을 최소화해서 계속 진행
    - content가 넘치면 content만 잘라서 limit 안으로 넣기
    """
    if max_token_limit is None:
        max_token_limit = MAX_CONTEXT_LIMIT

    inputs = tokenizer.apply_chat_template(
        [{"role": "system", "content": agent_instructions}] + messages,
        tokenize=True
    )
    base_tokens = len(inputs)

    if base_tokens >= max_token_limit:
        return "[...omitted due to context budget...]"

    enc = tokenizer.encode(content)
    if base_tokens + len(enc) > max_token_limit:
        keep = max_token_limit - base_tokens
        enc = enc[:max(0, keep)]
        content = tokenizer.decode(enc, skip_special_tokens=True) + "\n\n[...Content Truncated due to Context Limit...]"
    return content

def execute_tool_call(tool_name: str, tools: Dict[str, Any], args: Dict[str, Any]) -> str:
    if tool_name not in tools:
        raise KeyError(f"Tool not found: {tool_name}")
    return tools[tool_name](**args)

def safe_json_loads(s: str) -> Any:
    s = s.strip()
    try:
        return json.loads(s)
    except Exception:
        pass
    s2 = re.sub(r"^```(json)?\s*|\s*```$", "", s, flags=re.MULTILINE).strip()
    try:
        return json.loads(s2)
    except Exception:
        pass
    m = re.search(r"(\{.*\}|\[.*\])", s2, flags=re.DOTALL)
    if not m:
        raise ValueError("No JSON object found in text")
    return json.loads(m.group(1))

def hash_key(*parts: str) -> str:
    h = hashlib.sha256()
    for p in parts:
        h.update(p.encode("utf-8"))
    return h.hexdigest()[:16]


## 3) Core Tools

필수 Tool 기능:
1) 인터넷 검색(serper.dev)  
2) 인터넷 파일 다운로드  
3) PDF → Markdown 파싱(Upstage Document Parse)  
4) Vector DB 저장/조회(Chroma + Upstage embedding)  
5) LLM-as-Judge Rerank (Top-K 재정렬)

> 한국 기업/시장 분석을 우선하기 위해 검색 쿼리에 한국 소스 힌트를 자동으로 섞습니다.


In [None]:
import logging
import time
import trafilatura
from urllib.parse import urlparse

logging.getLogger("trafilatura").setLevel(logging.ERROR)
logging.getLogger("trafilatura.core").setLevel(logging.ERROR)
logging.getLogger("trafilatura.utils").setLevel(logging.ERROR)

def web_search(query: str, k: int = 10) -> List[Dict[str, str]]:
    url = "https://google.serper.dev/search"
    payload = json.dumps({"q": query, "num": k})
    headers = {"X-API-KEY": SERPER_API_KEY, "Content-Type": "application/json"}
    r = requests.post(url, headers=headers, data=payload, timeout=60)
    r.raise_for_status()
    data = r.json()
    out = []
    for item in data.get("organic", [])[:k]:
        out.append({"title": item.get("title",""), "link": item.get("link",""), "snippet": item.get("snippet","")})
    return out

def is_pdf_url(url: str) -> bool:
    return url.lower().split("?")[0].endswith(".pdf")

def fetch_url_text(url: str, timeout: int = 30, max_chars: int = 30000) -> str:
    try:
        r = requests.get(
            url,
            timeout=timeout,
            headers={"User-Agent":"Mozilla/5.0"},
            allow_redirects=True,
        )
        r.raise_for_status()

        ctype = (r.headers.get("content-type") or "").lower()
        if ("text/html" not in ctype) and ("application/xhtml" not in ctype):
            return ""

        html = (r.text or "").strip()
        if len(html) < 200:
            return ""

        text = trafilatura.extract(html, include_comments=False, include_tables=False, favor_recall=True) or ""
        text = text.strip()
        if len(text) > max_chars:
            text = text[:max_chars]
        return text
    except Exception:
        return ""

def source_priority_score(url: str) -> int:
    host = (urlparse(url).netloc or "").lower()
    score = 0
    if host.endswith(".go.kr"): score += 50
    if host.endswith(".ac.kr"): score += 35
    if host.endswith(".or.kr"): score += 25
    if host.endswith(".re.kr"): score += 20
    if host.endswith(".kr"): score += 10
    if "kosis" in host or "kostat" in host: score += 50
    if "dart" in host or "fss" in host: score += 40
    if "nipa" in host or "kisdi" in host or "kised" in host: score += 25
    return score

def download_file(url: str, save_dir: str = "./downloads") -> str:
    os.makedirs(save_dir, exist_ok=True)
    fn = re.sub(r"[^a-zA-Z0-9_.-]", "_", url.split("/")[-1]) or f"file_{int(time.time())}"
    path = os.path.join(save_dir, fn)
    r = requests.get(url, timeout=120, headers={"User-Agent":"Mozilla/5.0"}, allow_redirects=True)
    r.raise_for_status()
    with open(path, "wb") as f:
        f.write(r.content)
    return path

def parse_pdf_to_markdown(pdf_path: str) -> str:
    url = "https://api.upstage.ai/v1/document-ai/document-parse"
    headers = {"Authorization": f"Bearer {UPSTAGE_API_KEY}"}
    with open(pdf_path, "rb") as f:
        files = {"document": f}
        data = {
            "model": "document-parse-250618",
            "ocr": "auto",
            "chart_recognition": True,
            "coordinates": True,
            "output_formats": '["markdown"]',
            "base64_encoding": '["figure"]',
        }
        r = requests.post(url, headers=headers, files=files, data=data, timeout=180)
        r.raise_for_status()
        j = r.json()
    return j.get("content", {}).get("markdown", "")

def _normalize_for_embedding(x: Any, max_chars: int = 8000) -> str:
    s = (x if isinstance(x, str) else str(x) if x is not None else "").replace("\x00", "").strip()
    if not s:
        return ""
    if len(s) > max_chars:
        s = s[:max_chars]
    return s

class UpstageEmbeddingFunction(EmbeddingFunction):
    def __init__(
        self,
        client: OpenAI,
        model: str = "solar-embedding-1-large-query",
        batch_size: int = 16,
        max_chars: int = 8000,
        retries: int = 1,
        backoff_sec: float = 1.0,
    ):
        self.client = client
        self.model = model
        self.batch_size = batch_size
        self.max_chars = max_chars
        self.retries = retries
        self.backoff_sec = backoff_sec

    def _embed_batch(self, texts: List[str]) -> List[List[float]]:
        last_err = None
        for attempt in range(self.retries + 1):
            try:
                resp = self.client.embeddings.create(model=self.model, input=texts)
                return [d.embedding for d in resp.data]
            except Exception as e:
                last_err = e
                if attempt < self.retries:
                    time.sleep(self.backoff_sec * (attempt + 1))
        raise last_err

    def _embed_one(self, text: str) -> List[float]:
        resp = self.client.embeddings.create(model=self.model, input=text)
        return resp.data[0].embedding

    def __call__(self, input: Documents) -> Embeddings:
        if isinstance(input, str):
            t = _normalize_for_embedding(input, max_chars=self.max_chars)
            return [self._embed_one(t)] if t else [self._embed_one(".")]

        raw = [_normalize_for_embedding(t, max_chars=self.max_chars) for t in list(input)]
        texts = [t if t else "." for t in raw]

        out: List[List[float]] = []
        i = 0
        while i < len(texts):
            batch = texts[i:i+self.batch_size]
            try:
                out.extend(self._embed_batch(batch))
            except Exception:
                for t in batch:
                    out.append(self._embed_one(t if t else "."))
            i += self.batch_size
        return out

embedding_fn = UpstageEmbeddingFunction(client)

def get_collection(name: str):
    return chroma_client.get_or_create_collection(name=name, embedding_function=embedding_fn)

def vectordb_upsert(collection: str, docs: List[str], metadatas: List[Dict[str, Any]], ids: List[str]) -> int:
    clean_docs, clean_metas, clean_ids = [], [], []
    for d, m, i in zip(docs, metadatas, ids):
        s = _normalize_for_embedding(d, max_chars=8000)
        if not s:
            continue
        clean_docs.append(s)
        clean_metas.append(m)
        clean_ids.append(i)

    if not clean_ids:
        return 0

    col = get_collection(collection)
    col.upsert(documents=clean_docs, metadatas=clean_metas, ids=clean_ids)
    return len(clean_ids)

def vectordb_query(collection: str, query: str, n_results: int = 8) -> Dict[str, Any]:
    col = get_collection(collection)
    return col.query(query_texts=[query], n_results=n_results)

def llm_rerank(query: str, candidates: List[Dict[str, Any]], top_k: int = 5) -> List[Dict[str, Any]]:
    packed = [{"i": i, "text": (c.get("text","")[:1200]), "meta": c.get("meta",{})} for i,c in enumerate(candidates)]
    prompt = {
        "task": "rerank",
        "instruction": "You are a strict relevance judge. Score 0-10 by relevance. Prefer authoritative Korea-specific sources.",
        "query": query,
        "candidates": packed
    }
    resp = client.chat.completions.create(
        model="solar-pro2-250909",
        messages=[
            {"role": "system", "content": "Return JSON only."},
            {"role": "user", "content": json.dumps(prompt, ensure_ascii=False)}
        ],
    )
    data = safe_json_loads(resp.choices[0].message.content)
    scored = data.get("scores", data)
    merged = []
    for item in scored:
        i = int(item["i"])
        merged.append({**candidates[i], "score": float(item.get("score", 0)), "reason": item.get("reason","")})
    merged.sort(key=lambda x: x["score"], reverse=True)
    return merged[:top_k]

def simple_chunk(text: str, max_chars: int = 1500, overlap: int = 200) -> List[str]:
    text = (text or "").strip()
    if not text:
        return []
    chunks = []
    i = 0
    step = max(1, max_chars - overlap)
    while i < len(text):
        c = text[i:i+max_chars].strip()
        if c:
            chunks.append(c)
        i += step
    return chunks
EVIDENCE_PLAN_PROMPT = '''
역할: 아이디어를 검증하기 위한 '질문 리스트'와 각 질문별 '검색 쿼리'를 만든다.
출력(JSON only):
[
  {"question":"...", "queries":["...","...","..."], "preferred_sources":["gov","kosis","dart","research"]},
  ...
]
규칙:
- 한국 시장/한국 기업 중심 쿼리로 작성
- 각 question당 queries는 3개 이내
- question은 4~6개
'''
evidence_plan_agent = Agent(
    name="EvidencePlanMaker",
    instructions=EVIDENCE_PLAN_PROMPT,
    tools=[],
)

def make_evidence_plan(idea_schema: Dict[str, Any]) -> List[Dict[str, Any]]:
    messages = [{"role":"user","content": json.dumps({"idea_schema": idea_schema}, ensure_ascii=False)}]
    out = run_agent(messages, evidence_plan_agent)
    plan = safe_json_loads(out)
    if not isinstance(plan, list):
        raise ValueError("evidence_plan must be a list")
    return plan

def check_vectordb_cache(collection: str) -> bool:
    try:
        chroma_client.get_collection(name=collection)
        return True
    except Exception:
        return False

def build_expanded_queries(q: str) -> List[str]:
    years = ["2025", "2024", "2023"]
    tails = ["시장 규모", "시장 동향 보고서", "통계", "백서", "TAM SAM SOM", "경쟁사", "규제", "지원사업"]
    out = []
    for y in years:
        out.append(f"{q} {y}")
    for t in tails:
        out.append(f"{q} {t}")
    out += [f"{q} site:go.kr", f"{q} site:kosis.kr", f"{q} DART 공시"]
    return list(dict.fromkeys(out))

def evidence_builder_node(state: WorkflowState) -> WorkflowState:
    logs = state.get("logs", [])
    t0 = time.time()

    idea = state["idea_schema"]
    collection = f"ideaproof_{hash_key(json.dumps(idea, ensure_ascii=False))}"

    mode = (state.get("request", {}) or {}).get("mode", "standard")
    if mode == "fast":
        MAX_RESULTS_PER_QUERY_1 = 5
        MAX_SOURCES_1 = 12
        MAX_CHUNKS_PER_SOURCE = 6
        MAX_TOTAL_CHUNKS = 120
        MIN_STORED_CHUNKS = 50
        DO_EXPAND = False
        DO_PDF_PARSE = False
        TIME_BUDGET_SEC = 6 * 60
    elif mode == "deep":
        MAX_RESULTS_PER_QUERY_1 = 10
        MAX_SOURCES_1 = 45
        MAX_CHUNKS_PER_SOURCE = 14
        MAX_TOTAL_CHUNKS = 450
        MIN_STORED_CHUNKS = 180
        DO_EXPAND = True
        DO_PDF_PARSE = True
        TIME_BUDGET_SEC = 18 * 60
    else:
        MAX_RESULTS_PER_QUERY_1 = 8
        MAX_SOURCES_1 = 25
        MAX_CHUNKS_PER_SOURCE = 10
        MAX_TOTAL_CHUNKS = 260
        MIN_STORED_CHUNKS = 120
        DO_EXPAND = True
        DO_PDF_PARSE = True
        TIME_BUDGET_SEC = 12 * 60

    def time_left() -> float:
        return TIME_BUDGET_SEC - (time.time() - t0)

    if check_vectordb_cache(collection):
        state["evidence_store"] = EvidenceStoreModel(collection=collection, items=[], version="v3").model_dump()
        logs.append({"node":"evidence_builder", "t": time.time()-t0, "cache":"HIT", "collection": collection, "mode": mode})
        state["logs"] = logs
        return state

    plan = make_evidence_plan(idea)
    state["evidence_plan"] = plan

    def run_harvest(queries: List[str], max_results_per_query: int, max_sources_total: int):
        results = []
        for q in queries:
            if time_left() <= 0:
                break
            try:
                for r in web_search(q, k=max_results_per_query):
                    if r.get("link"):
                        results.append(r)
            except Exception:
                continue

        uniq = {}
        for r in results:
            uniq[r["link"]] = r
        ranked = list(uniq.values())
        ranked.sort(key=lambda x: source_priority_score(x["link"]), reverse=True)
        ranked = ranked[:max_sources_total]

        items: List[EvidenceItem] = []
        all_chunks, all_metas, all_ids = [], [], []

        for r in ranked:
            if time_left() <= 0 or len(all_chunks) >= MAX_TOTAL_CHUNKS:
                break

            url = r["link"]
            title = r.get("title","")
            snippet = r.get("snippet","")

            text = ""
            local = None
            md_text = None

            if is_pdf_url(url):
                if DO_PDF_PARSE and source_priority_score(url) >= 30 and time_left() > 60:
                    try:
                        local = download_file(url)
                        md_text = parse_pdf_to_markdown(local)
                        text = (md_text or "").strip()
                    except Exception:
                        text = ""
                else:
                    text = ""
            else:
                text = fetch_url_text(url)

            if not text:
                text = f"{title}\n{snippet}\nURL: {url}"

            items.append(EvidenceItem(source_url=url, title=title, snippet=snippet, local_path=local, parsed_markdown=md_text))

            chunks = simple_chunk(text, max_chars=1500, overlap=200)[:MAX_CHUNKS_PER_SOURCE]
            for j, ch in enumerate(chunks):
                if len(all_chunks) >= MAX_TOTAL_CHUNKS:
                    break
                cid = hash_key(collection, url, str(j))
                all_chunks.append(ch)
                all_metas.append({"url": url, "title": title, "chunk": j})
                all_ids.append(cid)

        stored = 0
        if all_chunks:
            stored = vectordb_upsert(collection, all_chunks, all_metas, all_ids)
        return items, stored

    base_queries = []
    for p in plan:
        for q in (p.get("queries") or [])[:3]:
            base_queries.append(q)
    base_queries = list(dict.fromkeys(base_queries))[:15]

    items, stored = run_harvest(base_queries, max_results_per_query=MAX_RESULTS_PER_QUERY_1, max_sources_total=MAX_SOURCES_1)

    if DO_EXPAND and (stored < MIN_STORED_CHUNKS) and (time_left() > 90):
        expanded = []
        for p in plan:
            expanded += build_expanded_queries(p.get("question",""))
        expanded = list(dict.fromkeys(expanded))[:20]

        items2, stored2 = run_harvest(expanded, max_results_per_query=MAX_RESULTS_PER_QUERY_1, max_sources_total=max(MAX_SOURCES_1, 30))
        merged = {it.source_url: it for it in (items + items2)}
        items = list(merged.values())
        stored = max(stored, stored2)

    state["evidence_store"] = EvidenceStoreModel(collection=collection, items=items, version="v3").model_dump()

    logs.append({
        "node":"evidence_builder",
        "t": time.time()-t0,
        "cache":"MISS",
        "collection": collection,
        "mode": mode,
        "sources": len(items),
        "stored_chunks": stored,
        "time_budget_sec": TIME_BUDGET_SEC,
        "time_budget_hit": (time_left() <= 0)
    })
    state["logs"] = logs
    return state


## 4) State Models (Design Doc)

설계서의 상태(State) 모델을 그대로 반영합니다:
- request, idea_schema, evidence_plan, evidence_store, signals, verdict, artifacts, guards


In [None]:
class RequestModel(BaseModel):
    raw_request: str
    language: str = "ko"
    tone: str = "concise"
    mode: Literal["fast","standard","deep"] = "standard"

class IdeaSchemaModel(BaseModel):
    problem: str
    target: str
    solution: str
    differentiation: str
    business_model: str
    industry: str
    keywords: List[str] = Field(default_factory=list)
    persona_hypotheses: List[str] = Field(default_factory=list)

class EvidenceQueryPlan(BaseModel):
    question: str
    queries: List[str]
    preferred_sources: List[str] = Field(default_factory=list)

class EvidenceItem(BaseModel):
    source_url: str
    title: str = ""
    snippet: str = ""
    local_path: Optional[str] = None
    parsed_markdown: Optional[str] = None

class EvidenceStoreModel(BaseModel):
    collection: str
    items: List[EvidenceItem] = Field(default_factory=list)
    version: str = "v1"

class SignalsModel(BaseModel):
    market: str
    competition: str
    customer: str
    risks: str
    score_explainable: Dict[str, float]

class VerdictModel(BaseModel):
    decision: Literal["GO","NO_GO","PIVOT"]
    key_reasons: List[str]
    evidence_links: List[str]
    next_actions: List[str]

class ArtifactsModel(BaseModel):
    prd_1p: str
    scope_must_should_could: str
    erd_mermaid: str
    user_flow: str
    roadmap_2_4_weeks: str
    validation_plan: str

class GuardsModel(BaseModel):
    policy_violation: bool = False
    token_overflow: bool = False
    copyright_risk: bool = False
    evidence_insufficient: bool = False
    notes: List[str] = Field(default_factory=list)

class WorkflowState(TypedDict, total=False):
    request: Dict[str, Any]
    idea_schema: Dict[str, Any]
    evidence_plan: List[Dict[str, Any]]
    evidence_store: Dict[str, Any]
    evidence_pack: List[Dict[str, Any]]
    signals: Dict[str, Any]
    verdict: Dict[str, Any]
    artifacts: Dict[str, Any]
    guards: Dict[str, Any]
    final_report_markdown: str
    intake: Dict[str, Any]
    logs: List[Dict[str, Any]]


## 5) Agent Base (Vanilla tool-calling loop)


In [None]:
class Agent(BaseModel):
    name: str = "Agent"
    model: str = "solar-pro2-250909"
    instructions: str = "You are a helpful agent."
    tools: List[Any] = Field(default_factory=list)

def run_agent(messages: List[Dict[str, Any]], agent: Agent, max_context_limit: int = None) -> str:
    """
    OpenAI tool-calling 스타일 루프 실행.
    - tools가 없으면 tools/tool_choice 파라미터를 아예 보내지 않는다(Upstage 400 방지)
    """
    if max_context_limit is None:
        max_context_limit = MAX_CONTEXT_LIMIT

    tool_schemas = [function_to_schema(t) for t in agent.tools]
    tool_map = {t.__name__: t for t in agent.tools}

    while True:
        kwargs = dict(
            model=agent.model,
            messages=[{"role":"system","content": agent.instructions}] + messages,
        )
        if tool_schemas:
            kwargs["tools"] = tool_schemas
            kwargs["tool_choice"] = "auto"

        resp = client.chat.completions.create(**kwargs)
        msg = resp.choices[0].message

        if not getattr(msg, "tool_calls", None):
            content = msg.content or ""
            content = truncate_tokens_if_needed(tokenizer, agent.instructions, messages, content, max_token_limit=max_context_limit)
            return content

        for tc in msg.tool_calls:
            tool_name = tc.function.name
            args = json.loads(tc.function.arguments or "{}")
            try:
                out = execute_tool_call(tool_name, tool_map, args)
                if not isinstance(out, str):
                    out = json.dumps(out, ensure_ascii=False)
            except Exception as e:
                out = f"ToolError: {e}"

            out = truncate_tokens_if_needed(tokenizer, agent.instructions, messages, out, max_token_limit=max_context_limit)

            messages.append({"role":"assistant","content": None, "tool_calls":[tc]})
            messages.append({"role":"tool","tool_call_id": tc.id, "content": out})


## 6) Agents (per design nodes)


### 6-1) Intake Route Clarify Agent


In [None]:
INTAKE_PROMPT = """
역할: 요청을 '아이디어 검증/설계 워크플로'로 처리할지, 일상대화로 처리할지 라우팅한다.
목표:
1) request 정규화(언어/톤/모드)
2) 아이디어 입력이 부족하면 '최소 질문'으로 보완 질문을 만든다.
출력(JSON only):
{
  "route": "workflow" | "chat",
  "request": {"raw_request": "...", "language": "ko", "tone": "concise", "mode":"fast|standard|deep"},
  "missing_fields": ["problem","target","solution","differentiation","business_model"],
  "clarifying_questions": ["...","..."]
}
규칙:
- 질문은 최대 5개. 선택형/단답형 우선.
"""

intake_agent = Agent(name="IntakeRouteClarify", instructions=INTAKE_PROMPT, tools=[])

def intake_node(state: WorkflowState) -> WorkflowState:
    logs = state.get("logs", [])
    t0 = time.time()

    raw = state["request"]["raw_request"]
    messages = [{"role":"user","content": raw}]
    out = run_agent(messages, intake_agent)
    data = safe_json_loads(out)

    state["request"] = data["request"]
    state["intake"] = data

    guards = state.get("guards", {})
    guards.setdefault("notes", [])
    guards["notes"].append(f"route={data.get('route')}")
    state["guards"] = guards

    logs.append({"node":"intake", "t": time.time()-t0, "route": data.get("route")})
    state["logs"] = logs
    return state

def route_after_intake(state: WorkflowState) -> str:
    route = state.get("intake", {}).get("route", "workflow")
    return "chat_end" if route == "chat" else "structurer"


### 6-2) Structurer Agent


In [None]:
STRUCTURER_PROMPT = """
역할: 아이디어를 문제/대상/해결/차별/BM로 구조화하고, 산업 분류 및 키워드를 만든다.
입력:
- raw idea text(자유형)
출력(JSON only):
{
  "problem": "...",
  "target": "...",
  "solution": "...",
  "differentiation": "...",
  "business_model": "...",
  "industry": "...",
  "keywords": ["..."],
  "persona_hypotheses": ["..."]
}
규칙:
- 모호하면 가능한 가설을 1~2개로 제한해 persona_hypotheses에 넣고, 단정하지 말 것.
"""

structurer_agent = Agent(name="Structurer", instructions=STRUCTURER_PROMPT, tools=[])

def structurer_node(state: WorkflowState) -> WorkflowState:
    logs = state.get("logs", [])
    t0 = time.time()

    raw = state["request"]["raw_request"]
    messages = [{"role":"user","content": raw}]
    out = run_agent(messages, structurer_agent)
    data = safe_json_loads(out)

    obj = IdeaSchemaModel(**data)
    state["idea_schema"] = obj.model_dump()

    logs.append({"node":"structurer", "t": time.time()-t0, "industry": obj.industry})
    state["logs"] = logs
    return state


### 6-3) Evidence Builder Agent (cache + web search + optional pdf parse + vectordb upsert)


In [None]:
EVIDENCE_PLAN_PROMPT = '''
역할: 아이디어를 검증하기 위한 '질문 리스트'와 각 질문별 '검색 쿼리'를 만든다.
출력(JSON only):
[
  {"question":"...", "queries":["...","...","..."], "preferred_sources":["gov","kosis","dart","research"]},
  ...
]
규칙:
- 한국 시장/한국 기업 중심 쿼리로 작성
- 각 question당 queries는 3개 이내
- question은 4~6개
'''
evidence_plan_agent = Agent(
    name="EvidencePlanMaker",
    instructions=EVIDENCE_PLAN_PROMPT,
    tools=[],
)

def make_evidence_plan(idea_schema: Dict[str, Any]) -> List[Dict[str, Any]]:
    messages = [{"role":"user","content": json.dumps({"idea_schema": idea_schema}, ensure_ascii=False)}]
    out = run_agent(messages, evidence_plan_agent)
    plan = safe_json_loads(out)
    if not isinstance(plan, list):
        raise ValueError("evidence_plan must be a list")
    return plan

def check_vectordb_cache(collection: str) -> bool:
    try:
        chroma_client.get_collection(name=collection)
        return True
    except Exception:
        return False

def build_expanded_queries(q: str) -> List[str]:
    years = ["2025", "2024", "2023"]
    tails = ["시장 규모", "시장 동향 보고서", "통계", "백서", "TAM SAM SOM", "경쟁사", "규제", "지원사업"]
    out = []
    for y in years:
        out.append(f"{q} {y}")
    for t in tails:
        out.append(f"{q} {t}")
    out += [f"{q} site:go.kr", f"{q} site:kosis.kr", f"{q} DART 공시"]
    return list(dict.fromkeys(out))

def evidence_builder_node(state: WorkflowState) -> WorkflowState:
    logs = state.get("logs", [])
    t0 = time.time()

    idea = state["idea_schema"]
    collection = f"ideaproof_{hash_key(json.dumps(idea, ensure_ascii=False))}"

    mode = (state.get("request", {}) or {}).get("mode", "standard")
    if mode == "fast":
        MAX_RESULTS_PER_QUERY_1 = 5
        MAX_SOURCES_1 = 12
        MAX_CHUNKS_PER_SOURCE = 6
        MAX_TOTAL_CHUNKS = 120
        MIN_STORED_CHUNKS = 50
        DO_EXPAND = False
        DO_PDF_PARSE = False
        TIME_BUDGET_SEC = 6 * 60
    elif mode == "deep":
        MAX_RESULTS_PER_QUERY_1 = 10
        MAX_SOURCES_1 = 45
        MAX_CHUNKS_PER_SOURCE = 14
        MAX_TOTAL_CHUNKS = 450
        MIN_STORED_CHUNKS = 180
        DO_EXPAND = True
        DO_PDF_PARSE = True
        TIME_BUDGET_SEC = 18 * 60
    else:
        MAX_RESULTS_PER_QUERY_1 = 8
        MAX_SOURCES_1 = 25
        MAX_CHUNKS_PER_SOURCE = 10
        MAX_TOTAL_CHUNKS = 260
        MIN_STORED_CHUNKS = 120
        DO_EXPAND = True
        DO_PDF_PARSE = True
        TIME_BUDGET_SEC = 12 * 60

    def time_left() -> float:
        return TIME_BUDGET_SEC - (time.time() - t0)

    if check_vectordb_cache(collection):
        state["evidence_store"] = EvidenceStoreModel(collection=collection, items=[], version="v3").model_dump()
        logs.append({"node":"evidence_builder", "t": time.time()-t0, "cache":"HIT", "collection": collection, "mode": mode})
        state["logs"] = logs
        return state

    plan = make_evidence_plan(idea)
    state["evidence_plan"] = plan

    def run_harvest(queries: List[str], max_results_per_query: int, max_sources_total: int):
        results = []
        for q in queries:
            if time_left() <= 0:
                break
            try:
                for r in web_search(q, k=max_results_per_query):
                    if r.get("link"):
                        results.append(r)
            except Exception:
                continue

        uniq = {}
        for r in results:
            uniq[r["link"]] = r
        ranked = list(uniq.values())
        ranked.sort(key=lambda x: source_priority_score(x["link"]), reverse=True)
        ranked = ranked[:max_sources_total]

        items: List[EvidenceItem] = []
        all_chunks, all_metas, all_ids = [], [], []

        for r in ranked:
            if time_left() <= 0 or len(all_chunks) >= MAX_TOTAL_CHUNKS:
                break

            url = r["link"]
            title = r.get("title","")
            snippet = r.get("snippet","")

            text = ""
            local = None
            md_text = None

            if is_pdf_url(url):
                if DO_PDF_PARSE and source_priority_score(url) >= 30 and time_left() > 60:
                    try:
                        local = download_file(url)
                        md_text = parse_pdf_to_markdown(local)
                        text = (md_text or "").strip()
                    except Exception:
                        text = ""
                else:
                    text = ""
            else:
                text = fetch_url_text(url)

            if not text:
                text = f"{title}\n{snippet}\nURL: {url}"

            items.append(EvidenceItem(source_url=url, title=title, snippet=snippet, local_path=local, parsed_markdown=md_text))

            chunks = simple_chunk(text, max_chars=1500, overlap=200)[:MAX_CHUNKS_PER_SOURCE]
            for j, ch in enumerate(chunks):
                if len(all_chunks) >= MAX_TOTAL_CHUNKS:
                    break
                cid = hash_key(collection, url, str(j))
                all_chunks.append(ch)
                all_metas.append({"url": url, "title": title, "chunk": j})
                all_ids.append(cid)

        stored = 0
        if all_chunks:
            stored = vectordb_upsert(collection, all_chunks, all_metas, all_ids)
        return items, stored

    base_queries = []
    for p in plan:
        for q in (p.get("queries") or [])[:3]:
            base_queries.append(q)
    base_queries = list(dict.fromkeys(base_queries))[:15]

    items, stored = run_harvest(base_queries, max_results_per_query=MAX_RESULTS_PER_QUERY_1, max_sources_total=MAX_SOURCES_1)

    if DO_EXPAND and (stored < MIN_STORED_CHUNKS) and (time_left() > 90):
        expanded = []
        for p in plan:
            expanded += build_expanded_queries(p.get("question",""))
        expanded = list(dict.fromkeys(expanded))[:20]

        items2, stored2 = run_harvest(expanded, max_results_per_query=MAX_RESULTS_PER_QUERY_1, max_sources_total=max(MAX_SOURCES_1, 30))
        merged = {it.source_url: it for it in (items + items2)}
        items = list(merged.values())
        stored = max(stored, stored2)

    state["evidence_store"] = EvidenceStoreModel(collection=collection, items=items, version="v3").model_dump()

    logs.append({
        "node":"evidence_builder",
        "t": time.time()-t0,
        "cache":"MISS",
        "collection": collection,
        "mode": mode,
        "sources": len(items),
        "stored_chunks": stored,
        "time_budget_sec": TIME_BUDGET_SEC,
        "time_budget_hit": (time_left() <= 0)
    })
    state["logs"] = logs
    return state


### 6-4) Extractor Agent (VectorDB retrieve + LLM rerank)


In [None]:
def extractor_node(state: WorkflowState) -> WorkflowState:
    logs = state.get("logs", [])
    t0 = time.time()

    plan = state.get("evidence_plan", [])
    collection = state["evidence_store"]["collection"]

    evidence_pack = []
    for p in plan[:6]:
        q = p.get("question","")
        if not q:
            continue

        raw = vectordb_query(collection=collection, query=q, n_results=12)
        docs = raw.get("documents", [[]])[0]
        metas = raw.get("metadatas", [[]])[0]

        if not docs:
            continue

        candidates = [{"text": d, "meta": m} for d, m in zip(docs, metas)]
        reranked = llm_rerank(query=q, candidates=candidates, top_k=5)

        for r in reranked:
            r["text"] = (r.get("text","")[:800]).strip()

        evidence_pack.append({"question": q, "top_chunks": reranked})

    state["evidence_pack"] = evidence_pack

    guards = state.get("guards", {})
    if not evidence_pack:
        guards["evidence_insufficient"] = True
        guards.setdefault("notes", []).append("Extractor: evidence_pack is empty.")
    state["guards"] = guards

    logs.append({"node":"extractor", "t": time.time()-t0, "questions": len(evidence_pack)})
    state["logs"] = logs
    return state


### 6-5) Analysis Agent (signals + explainable scoring)


In [None]:
ANALYSIS_PROMPT = """
역할: evidence_pack을 기반으로 시장/경쟁/고객/리스크 신호를 요약하고,
설명가능한 점수(0~5)를 만든다.
출력(JSON only):
{
  "market": "...",
  "competition": "...",
  "customer": "...",
  "risks": "...",
  "score_explainable": {"market":3.0,"competition":2.5,"customer":3.5,"risks":2.0}
}
규칙:
- fact / interpretation / hypothesis를 문장 앞 라벨로 구분해라.
- 수치(성장률 등)는 evidence_pack에 근거가 없으면 생성하지 마라.
"""

analysis_agent = Agent(name="Analysis", instructions=ANALYSIS_PROMPT, tools=[])

def analysis_node(state: WorkflowState) -> WorkflowState:
    logs = state.get("logs", [])
    t0 = time.time()

    payload = {
        "idea_schema": state["idea_schema"],
        "evidence_pack": state.get("evidence_pack", [])
    }
    messages = [{"role":"user","content": json.dumps(payload, ensure_ascii=False)}]
    out = run_agent(messages, analysis_agent)
    data = safe_json_loads(out)

    obj = SignalsModel(**data)
    state["signals"] = obj.model_dump()

    logs.append({"node":"analysis", "t": time.time()-t0, "score": obj.score_explainable})
    state["logs"] = logs
    return state


### 6-6) Decision Agent


In [None]:
DECISION_PROMPT = """
역할: signals + evidence_pack을 바탕으로
GO / NO_GO / PIVOT 결론을 내리고, 근거 링크와 다음 액션(검증 실험 포함)을 제안한다.
출력(JSON only):
{
  "decision":"GO|NO_GO|PIVOT",
  "key_reasons":["..."],
  "evidence_links":["..."],
  "next_actions":["..."]
}
규칙:
- evidence_links는 evidence_pack.meta.url에서만 가져와라(최소 3개).
- 단정 금지: 불확실하면 PIVOT 또는 조건부 GO로 표현.
"""

decision_agent = Agent(name="Decision", instructions=DECISION_PROMPT, tools=[])

def decision_node(state: WorkflowState) -> WorkflowState:
    logs = state.get("logs", [])
    t0 = time.time()

    payload = {
        "idea_schema": state["idea_schema"],
        "signals": state.get("signals", {}),
        "evidence_pack": state.get("evidence_pack", [])
    }
    messages = [{"role":"user","content": json.dumps(payload, ensure_ascii=False)}]
    out = run_agent(messages, decision_agent)
    data = safe_json_loads(out)

    obj = VerdictModel(**data)
    state["verdict"] = obj.model_dump()

    logs.append({"node":"decision", "t": time.time()-t0, "decision": obj.decision})
    state["logs"] = logs
    return state

def route_after_decision(state: WorkflowState) -> str:
    return "blueprint"


### 6-7) Blueprint Agent


In [None]:
BLUEPRINT_PROMPT = '''
역할: verdict를 반영하여 MVP 설계 산출물을 만든다.
출력(JSON only):
{
  "prd_1p": "<markdown string>",
  "scope_must_should_could": "<markdown string>",
  "erd_mermaid": "```mermaid ...```",
  "user_flow": "<markdown string>",
  "roadmap_2_4_weeks": "<markdown string>",
  "validation_plan": "<markdown string>"
}
규칙:
- 위 6개 필드는 '문자열'이어야 한다. (객체/리스트 JSON으로 내지 말 것)
- scope/roadmap/validation은 사람이 읽기 좋은 bullet markdown으로 작성.
- ERD는 Mermaid ER diagram 또는 flowchart 형식.
- 검증 플랜은 '실험-지표-판정기준'이 포함되어야 함.
'''
blueprint_agent = Agent(
    name="BlueprintMaker",
    instructions=BLUEPRINT_PROMPT,
    tools=[],
)

def _to_markdown(x: Any, indent: int = 0) -> str:
    pad = "  " * indent
    if x is None:
        return ""
    if isinstance(x, str):
        return x.strip()
    if isinstance(x, list):
        lines = []
        for item in x:
            if isinstance(item, (dict, list)):
                lines.append(f"{pad}-")
                child = _to_markdown(item, indent+1)
                if child:
                    lines.append(child)
            else:
                lines.append(f"{pad}- {str(item)}")
        return "\n".join(lines).strip()
    if isinstance(x, dict):
        lines = []
        for k, v in x.items():
            if isinstance(v, (dict, list)):
                lines.append(f"{pad}- **{k}**")
                child = _to_markdown(v, indent+1)
                if child:
                    lines.append(child)
            else:
                lines.append(f"{pad}- **{k}**: {str(v)}")
        return "\n".join(lines).strip()
    return str(x).strip()

def blueprint_node(state: WorkflowState) -> WorkflowState:
    logs = state.get("logs", [])
    t0 = time.time()

    payload = {
        "idea_schema": state["idea_schema"],
        "signals": state.get("signals", {}),
        "verdict": state.get("verdict", {})
    }
    messages = [{"role":"user","content": json.dumps(payload, ensure_ascii=False)}]
    out = run_agent(messages, blueprint_agent)
    data = safe_json_loads(out)

    for key in ["prd_1p", "scope_must_should_could", "erd_mermaid", "user_flow", "roadmap_2_4_weeks", "validation_plan"]:
        if key in data and not isinstance(data[key], str):
            data[key] = _to_markdown(data[key])

    obj = ArtifactsModel(**data)
    state["artifacts"] = obj.model_dump()

    logs.append({"node":"blueprint", "t": time.time()-t0})
    state["logs"] = logs
    return state


### 6-8) Guardrail / Validator Agent


In [None]:
GUARDRAIL_PROMPT = """
역할: 최종 산출물(analysis/verdict/artifacts)이 아래 가드를 충족하는지 점검하고,
위반/부족 플래그를 설정하며, 필요한 최소 수정(라벨/단정 표현 완화/링크 누락 보완)을 제안한다.
출력(JSON only):
{
  "policy_violation": false,
  "token_overflow": false,
  "copyright_risk": false,
  "evidence_insufficient": false,
  "notes": ["..."]
}
체크리스트:
- 핵심 주장에 출처 링크가 최소 3개 이상인가?
- fact/interpretation/hypothesis 라벨이 존재하는가?
- 수치가 '근거 없이' 생성되지 않았는가?
- 뉴스/리포트 전문을 길게 인용하지 않았는가?
"""

guardrail_agent = Agent(name="GuardrailValidator", instructions=GUARDRAIL_PROMPT, tools=[])

def guardrail_node(state: WorkflowState) -> WorkflowState:
    logs = state.get("logs", [])
    t0 = time.time()

    payload = {
        "signals": state.get("signals", {}),
        "verdict": state.get("verdict", {}),
        "artifacts": state.get("artifacts", {}),
        "evidence_pack": state.get("evidence_pack", [])
    }
    messages = [{"role":"user","content": json.dumps(payload, ensure_ascii=False)}]
    out = run_agent(messages, guardrail_agent)
    data = safe_json_loads(out)

    obj = GuardsModel(**data)
    state["guards"] = obj.model_dump()

    logs.append({"node":"guardrail", "t": time.time()-t0, "flags": {
        "policy_violation": obj.policy_violation,
        "token_overflow": obj.token_overflow,
        "copyright_risk": obj.copyright_risk,
        "evidence_insufficient": obj.evidence_insufficient
    }})
    state["logs"] = logs
    return state


### 6-9) Render Agent


In [None]:
def render_report(state: WorkflowState) -> str:
    idea = state.get("idea_schema", {})
    signals = state.get("signals", {})
    verdict = state.get("verdict", {})
    artifacts = state.get("artifacts", {})
    guards = state.get("guards", {})
    evidence_pack = state.get("evidence_pack", [])

    links = []
    for ep in evidence_pack:
        for ch in ep.get("top_chunks", []):
            url = (ch.get("meta") or {}).get("url")
            if url:
                links.append(url)
    links = list(dict.fromkeys(links))[:10]

    out = []
    out.append("# IdeaProof 결과 리포트 (Prototype)\n")
    out.append("## 1) 아이디어 구조화\n")
    out.append(f"- 문제: {idea.get('problem','')}\n- 대상: {idea.get('target','')}\n- 해결: {idea.get('solution','')}\n- 차별: {idea.get('differentiation','')}\n- BM: {idea.get('business_model','')}\n- 산업: {idea.get('industry','')}\n- 키워드: {', '.join(idea.get('keywords',[]))}\n")

    out.append("\n## 2) 신호(시장/경쟁/고객/리스크)\n")
    out.append(f"### Market\n{signals.get('market','')}\n\n### Competition\n{signals.get('competition','')}\n\n### Customer\n{signals.get('customer','')}\n\n### Risks\n{signals.get('risks','')}\n")
    out.append(f"\n**Explainable Score(0~5):** {json.dumps(signals.get('score_explainable',{}), ensure_ascii=False)}\n")

    out.append("\n## 3) 결론\n")
    out.append(f"**Decision:** {verdict.get('decision','')}\n\n")
    out.append("**Key reasons**\n" + "\n".join([f"- {x}" for x in verdict.get("key_reasons",[])]) + "\n")
    out.append("\n**Next actions**\n" + "\n".join([f"- {x}" for x in verdict.get("next_actions",[])]) + "\n")

    out.append("\n## 4) MVP 설계 산출물\n")
    out.append("### PRD 1p\n" + artifacts.get("prd_1p","") + "\n")
    out.append("\n### Scope (Must / Should / Could)\n" + artifacts.get("scope_must_should_could","") + "\n")
    out.append("\n### ERD (Mermaid)\n" + artifacts.get("erd_mermaid","") + "\n")
    out.append("\n### User Flow\n" + artifacts.get("user_flow","") + "\n")
    out.append("\n### Roadmap (2~4 weeks)\n" + artifacts.get("roadmap_2_4_weeks","") + "\n")
    out.append("\n### Validation Plan\n" + artifacts.get("validation_plan","") + "\n")

    out.append("\n## 5) 근거 링크(Top)\n" + "\n".join([f"- {u}" for u in links]) + "\n")
    out.append("\n## 6) Guardrail Check\n")
    out.append("```json\n" + json.dumps(guards, ensure_ascii=False, indent=2) + "\n```\n")

    return "\n".join(out)

def render_node(state: WorkflowState) -> WorkflowState:
    logs = state.get("logs", [])
    t0 = time.time()

    report = render_report(state)
    state["final_report_markdown"] = report

    logs.append({"node":"render", "t": time.time()-t0, "report_chars": len(report)})
    state["logs"] = logs
    return state


## 7) Super Graph (LangGraph)


In [None]:
from langgraph.graph import StateGraph, START, END

graph = StateGraph(WorkflowState)

graph.add_node("intake", intake_node)
graph.add_node("structurer", structurer_node)
graph.add_node("evidence_builder", evidence_builder_node)
graph.add_node("extractor", extractor_node)
graph.add_node("analysis", analysis_node)
graph.add_node("decision", decision_node)
graph.add_node("blueprint", blueprint_node)
graph.add_node("guardrail", guardrail_node)
graph.add_node("render", render_node)

graph.add_edge(START, "intake")
graph.add_conditional_edges("intake", route_after_intake, {"chat_end": END, "structurer": "structurer"})
graph.add_edge("structurer", "evidence_builder")
graph.add_edge("evidence_builder", "extractor")
graph.add_edge("extractor", "analysis")
graph.add_edge("analysis", "decision")
graph.add_conditional_edges("decision", route_after_decision, {"blueprint": "blueprint"})
graph.add_edge("blueprint", "guardrail")
graph.add_edge("guardrail", "render")
graph.add_edge("render", END)

app = graph.compile()


## 8) Single-Agent Test Logs


In [None]:
# 아이디어 입력: "기획서 자체"를 아이디어로 사용 (사용자 요구사항 반영)
IDEA_TEXT = """
IdeaProof(가칭)는 창업 아이디어를 입력하면 산업 분류 → 시장/경쟁/고객 신호 수집 → 근거 기반 검증 →
Go/No-Go/Pivot 결론을 제공하고, 결론에 따라 피벗 제안과 MVP 설계 산출물(ERD, 로드맵, 검증 플랜)을
패키지로 생성하는 서비스이다.
""".strip()

state0: WorkflowState = {"request": {"raw_request": IDEA_TEXT, "language":"ko", "tone":"concise", "mode":"standard"}, "logs":[]}

# 예: intake + structurer만 단독 테스트
tmp = intake_node(state0.copy())
tmp = structurer_node(tmp)
rprint(tmp["idea_schema"])
rprint(tmp["logs"])


## 9) Super Graph End-to-End Run Logs


In [None]:
# 실행 + 진행상황(노드별) 출력: 어디서 오래 걸리는지 바로 보이게 함
state_init: WorkflowState = {"request": {"raw_request": IDEA_TEXT, "language":"ko", "tone":"concise", "mode":"standard"}, "logs":[]}

last_log_len = 0
final_state = None

for s in app.stream(state_init, stream_mode="values"):
    logs = s.get("logs", [])
    if len(logs) > last_log_len:
        for item in logs[last_log_len:]:
            print("[LOG]", item)
        last_log_len = len(logs)
    final_state = s

print("\n\n================ FINAL REPORT (markdown) ================\n")
print((final_state or {}).get("final_report_markdown","")[:4000])


[LOG] {'node': 'intake', 't': 2.6452674865722656, 'route': 'workflow'}
[LOG] {'node': 'structurer', 't': 2.6188158988952637, 'industry': 'AI SaaS/스타트업 인큐베이션/비즈니스 인텔리전스'}
[LOG] {'node': 'evidence_builder', 't': 457.4857060909271, 'cache': 'MISS', 'collection': 'ideaproof_35b91439639a2ad8', 'mode': 'standard', 'sources': 51, 'stored_chunks': 108, 'time_budget_sec': 720, 'time_budget_hit': False}
[LOG] {'node': 'extractor', 't': 35.4906370639801, 'questions': 5}
[LOG] {'node': 'analysis', 't': 4.323936223983765, 'score': {'market': 3.0, 'competition': 2.5, 'customer': 3.5, 'risks': 2.0}}
[LOG] {'node': 'decision', 't': 4.2830810546875, 'decision': 'GO'}
[LOG] {'node': 'blueprint', 't': 7.513864040374756}
[LOG] {'node': 'guardrail', 't': 2.804211139678955, 'flags': {'policy_violation': False, 'token_overflow': False, 'copyright_risk': False, 'evidence_insufficient': False}}
[LOG] {'node': 'render', 't': 0.0003123283386230469, 'report_chars': 6034}



# IdeaProof 결과 리포트 (Prototype)

## 1) 아

## 10) (Optional) Save report as Markdown file


In [None]:
out_path = "ideaproof_report.md"
with open(out_path, "w", encoding="utf-8") as f:
    f.write(final_state.get("final_report_markdown",""))
print("saved:", out_path)


saved: ideaproof_report.md


## (Optional) Step1/Step2 수행 요약 (제출용)

### Step 1) Vanilla Python vs LangGraph 비교 (내 구현 기준)
**공통점**
- LLM(Upstage) 호출 + Tool(Web search/Download/PDF parse/VectorDB/Rerank) 조합으로 근거 수집 → 요약/판단
- 상태(요청/중간 산출물/로그)를 누적하며 최종 리포트 생성

**차이점**
- Vanilla: 순차 호출/예외처리/토큰 관리 등을 코드 흐름으로 직접 제어(단순하지만 규모 커지면 유지보수 비용↑)
- LangGraph: 노드/엣지로 제어 흐름이 명확하고, 실행 흐름 추적(stream)과 디버깅이 쉬움(구성 초기 비용은↑)

### Step 2) Query 생성(Expansion) 및 search→rerank 관찰
- Evidence Builder(검색) 단계에서 mode(standard/deep)에 따라 검색 쿼리 다양화/확장 정도가 달라짐
- Extractor 단계에서 VectorDB 후보를 가져온 뒤 LLM-as-Judge로 rerank하여 최종 근거를 좁힘
- 관찰 포인트(로그):
  - 생성된 queries 목록(다양성/키워드 커버리지)
  - 수집된 소스 수 및 chunk 수
  - rerank 상위 근거의 점수 분포(상위-하위 격차)
