In [None]:
import os
import re
import json
import warnings
from pathlib import Path
from typing import TypedDict, Annotated, List, Any
from operator import itemgetter

from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.documents import Document
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda

try:
    from dotenv import load_dotenv; load_dotenv()
except ImportError:
    pass

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-4o-mini")

if not OPENAI_API_KEY:
    warnings.warn("OPENAI_API_KEY가 설정되지 않았습니다. LLM 호출이 실패합니다.")

warnings.filterwarnings("ignore")

BASE_DIR = Path(os.getcwd())
DB_DIR = BASE_DIR / "db" / "cafe_db"

class GraphState(TypedDict):
    messages: Annotated[List[Any], add_messages]
    query_type: str 
    db_result: str 
    
def load_db_instance() -> FAISS:
    if not all([FAISS, OpenAIEmbeddings]):
        raise RuntimeError("FAISS 또는 OpenAIEmbeddings 패키지가 필요합니다.")
    db_path = str(DB_DIR)
    if not (DB_DIR / "index.faiss").exists():
        raise FileNotFoundError(f"DB 파일이 {db_path}에 없습니다.")
    embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
    return FAISS.load_local(db_path, embeddings, allow_dangerous_deserialization=True)

try:
    menu_db = load_db_instance()
    print("Vector DB 로드 완료.")
except Exception as e:
    print(f"DB 로드 실패: {e}")
    menu_db = None

def extract_menu_info(doc: Document) -> dict:
    content = doc.page_content
    menu_name = doc.metadata.get('name', 'Unknown')
    price_match = re.search(r'^\s*•\s*가격:\s*(₩[\d,]+)', content, re.MULTILINE)
    description_match = re.search(r'•\s*설명:\s*(.+?)(?:\n\s*•|\Z)', content, re.DOTALL)
    return {
        "name": menu_name,
        "price": price_match.group(1) if price_match else "가격 정보 없음", 
        "description": description_match.group(1).strip().replace('\n', ' ') if description_match else "설명 없음"
    }

def detect_specific_menu(message: str) -> str | None:
    menu_keywords = ["아메리카노", "카페라떼", "카푸치노", "바닐라 라떼", "카라멜 마키아토", "콜드브루", "프라푸치노", "녹차 라떼", "티라미수"]
    for keyword in menu_keywords:
        if keyword.lower() in message.lower():
            return keyword
    return None

def classify_query(state: GraphState) -> GraphState:
    user_message = state["messages"][-1].content
    user_message_lower = user_message.lower()
    detected_menu = detect_specific_menu(user_message)
    if "가격" in user_message_lower or "얼마" in user_message_lower:
        if detected_menu:
            query_type = "price_specific" 
        else:
            query_type = "price_general" 
    elif "추천" in user_message_lower or "뭐 마실까" in user_message_lower:
        query_type = "recommend"
    elif "메뉴" in user_message_lower or "있어" in user_message_lower or "종류" in user_message_lower:
        query_type = "menu"
    else:
        query_type = "unknown"
    print(f"분류 결과: {query_type}")
    return {"query_type": query_type}

def execute_search(state: GraphState) -> GraphState:
    user_message = state["messages"][-1].content
    query_type = state["query_type"]
    if menu_db is None:
        return {"db_result": "Vector DB를 로드할 수 없어 검색이 불가능합니다."}
    if query_type == "price_specific":
        query = user_message
        k = 1
    elif query_type == "price_general":
        query = "모든 메뉴 가격 목록"
        k = 5
    elif query_type == "recommend":
        query = f"{user_message} 인기 메뉴"
        k = 3
    elif query_type == "menu":
        query = user_message
        k = 4
    else:
        query = user_message
        k = 3
    print(f"검색 쿼리: {query}")
    docs = menu_db.similarity_search(query, k=k)
    search_results = []
    for doc in docs:
        info = extract_menu_info(doc)
        search_results.append(
            f"메뉴: {info['name']}, 가격: {info['price']}, 특징: {info['description']}"
        )
    return {"db_result": "\n".join(search_results)}

def generate_response(state: GraphState) -> GraphState:
    db_result = state["db_result"]
    chat_history = state["messages"]
    query_type = state["query_type"]
    if db_result.startswith("Vector DB를 로드할 수 없어"):
        ai_content = "죄송해요. 현재 메뉴 DB에 접근할 수 없어 정확한 정보를 드릴 수 없습니다."
        return {"messages": [AIMessage(content=ai_content)]}
    if not all([ChatOpenAI, OPENAI_API_KEY]):
        ai_content = f"DB 검색 결과: {db_result[:100]}... OpenAI 키가 없어 답변을 생성할 수 없습니다."
        return {"messages": [AIMessage(content=ai_content)]}
    llm = ChatOpenAI(model=OPENAI_MODEL, temperature=0.2)
    system_prompt = (
        "당신은 친절한 카페 메뉴 추천 및 안내 AI입니다. "
        "다음 검색 결과와 이전 대화 이력을 참고하여 사용자에게 가장 적절하고 친절한 답변을 한국어로 생성하세요. "
        "검색 결과에 없는 내용은 추측하지 마세요. "
        "문의 유형은 '{query_type}'입니다."
    )
    final_prompt = ChatPromptTemplate.from_messages([
        ("system", system_prompt.format(query_type=query_type)),
        *chat_history,
        ("user", f"위에 주어진 검색 결과를 바탕으로 답변해줘. 검색 결과:\n{db_result}"),
    ])
    response_chain = final_prompt | llm | StrOutputParser()
    ai_content = response_chain.invoke({})
    return {"messages": [AIMessage(content=ai_content)]}

workflow = StateGraph(GraphState)
workflow.add_node("classify", classify_query)
workflow.add_node("search", execute_search)
workflow.add_node("generate", generate_response)
workflow.set_entry_point("classify")
workflow.add_edge("classify", "search")
workflow.add_edge("search", "generate")
workflow.add_edge("generate", END)
app = workflow.compile()

def run_conversation(prompt: str, app_instance, thread_id: str) -> str:
    initial_state = {"messages": [HumanMessage(content=prompt)]}
    final_state = app_instance.invoke(
        initial_state,
        config={"configurable": {"thread_id": thread_id}}
    )
    final_message = final_state["messages"][-1]
    return final_message.content

if __name__ == "__main__":
    print("=" * 50)
    print("LangGraph 기반 메뉴 추천 시스템 테스트 시작")
    print("=" * 50)
    conversation_id = "user-cafe-003" 
    q1 = "아메리카노의 가격과 주요 특징을 자세히 알려줄래?"
    print(f"Q1: {q1}")
    a1 = run_conversation(q1, app, conversation_id)
    print(f"A1: {a1}")
    q2 = "요즘 인기 있는 메뉴 중에서 달콤하고 시원한 걸로 하나 추천해 줘."
    print(f"\nQ2: {q2}")
    a2 = run_conversation(q2, app, conversation_id)
    print(f"A2: {a2}")
    q3 = "그럼 내가 추천 받은 메뉴 가격은 얼마야?"
    print(f"\nQ3: {q3}")
    a3 = run_conversation(q3, app, conversation_id)
    print(f"A3: {a3}")
    print("=" * 50)
    print(f"테스트 종료 (대화 ID: {conversation_id})")
    print("=" * 50)


✅ Vector DB 로드 완료.

LangGraph 기반 메뉴 추천 시스템 테스트 시작 (가격 정확도 개선)
🙋‍♂️ Q1 (가격/메뉴): 아메리카노의 가격과 주요 특징을 자세히 알려줄래?
분류 결과: price_specific
검색 쿼리: 아메리카노의 가격과 주요 특징을 자세히 알려줄래?
🤖 A1: 아메리카노의 가격은 ₩4,500입니다. 이 음료는 진한 에스프레소에 뜨거운 물을 더해 만든 클래식한 블랙 커피로, 원두 본연의 맛을 가장 잘 느낄 수 있는 음료입니다. 깔끔하고 깊은 풍미가 특징이며, 원하신다면 설탕이나 시럽을 추가하여 드실 수도 있습니다. 커피의 진한 맛을 즐기고 싶으시다면 아메리카노를 추천드립니다!

🙋‍♂️ Q2 (추천 요청): 요즘 인기 있는 메뉴 중에서 달콤하고 시원한 걸로 하나 추천해 줘.
분류 결과: recommend
검색 쿼리: 요즘 인기 있는 메뉴 중에서 달콤하고 시원한 걸로 하나 추천해 줘. 인기 메뉴
🤖 A2: 요즘 인기 있는 메뉴 중에서 달콤하고 시원한 것을 찾으신다면 **프라푸치노**를 추천드립니다! 이 음료는 에스프레소와 우유, 얼음을 블렌더에 갈아 만든 시원한 음료로, 부드럽고 크리미한 질감이 특징입니다. 또한 휘핑크림이 올라가 있어 더욱 달콤한 맛을 즐길 수 있습니다. 여름철에 특히 인기 있는 메뉴이니, 시원하게 즐기기에 안성맞춤입니다! 가격은 ₩7,000입니다.

🙋‍♂️ Q3 (일반 가격 문의): 그럼 내가 추천 받은 메뉴 가격은 얼마야?
분류 결과: price_general
검색 쿼리: 모든 메뉴 가격 목록
🤖 A3: 추천 받은 메뉴의 가격은 다음과 같습니다:

- 바닐라 라떼: ₩6,000
- 카페라떼: ₩5,500
- 티라미수: ₩7,500
- 녹차 라떼: ₩5,800
- 아메리카노: ₩4,500

원하시는 메뉴의 가격을 확인하실 수 있습니다! 추가로 궁금한 점이 있으시면 언제든지 말씀해 주세요.

테스트 종료 (대화 ID: user-cafe-003)
특정 메뉴 가격 문의에 대한 검색 정확도를 높였습니다. 🚀