In [94]:
import re
from typing import Annotated, Literal, List, Optional
from typing_extensions import TypedDict
import os 

# LangGraph
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_core.messages import AnyMessage, HumanMessage, AIMessage

# Document 폴백 (langchain.schema.Document가 없을 때 대비)
try:
    from langchain.schema import Document  # type: ignore
except Exception:
    from dataclasses import dataclass
    @dataclass
    class Document:
        page_content: str
        metadata: dict

# === 간단한 벡터스토어 대체 (키워드 기반 '의미론적' 검색) ===
class SimpleKeywordVectorStore:
    """토큰 겹침(자카드 유사도)으로 흉내내는 경량 검색기."""
    def __init__(self):
        self._docs: List[Document] = []
        self._tokens: List[set] = []

    @staticmethod
    def _tokenize(text: str) -> set:
        # 한글/숫자/통화기호 정도만 남기고 토큰화
        text = re.sub(r"[^가-힣a-zA-Z0-9₩,.\s]", " ", text)
        toks = set(t.strip() for t in re.split(r"\s+", text) if t.strip())
        return toks

    def add_documents(self, docs: List[Document]) -> None:
        for d in docs:
            self._docs.append(d)
            self._tokens.append(self._tokenize(d.page_content + " " + " ".join(f"{k}:{v}" for k, v in d.metadata.items())))

    def similarity_search(self, query: str, k: int = 4) -> List[Document]:
        if not self._docs:
            return []
        q = self._tokenize(query)
        scores = []
        for i, toks in enumerate(self._tokens):
            inter = len(q & toks)
            union = len(q | toks) if q or toks else 1
            score = inter / union 
            scores.append((score, i))
        scores.sort(reverse=True, key=lambda x: x[0])
        # top = [self._docs[i] for s, i in scores[:k] if s > 0]
        # 🚨🚨🚨 (s > 0 필터 제거) 🚨🚨🚨
        top = [self._docs[i] for s, i in scores[:k]]            
        return top

# === 메뉴 정보 추출 ===
def extract_menu_info(doc: Document) -> dict:
    """Vector DB 문서에서 구조화된 메뉴 정보 추출"""
    content = doc.page_content
    menu_name = doc.metadata.get('menu_name', 'Unknown')

    # '가격: ₩[숫자]' 패턴을 찾음
    price_match = re.search(r'가격:\s*(₩[\d,]+)', content)
    # '설명: [내용]' 패턴을 찾음 (줄바꿈 포함)
    description_match = re.search(r'설명:\s*(.+?)(?:\n\d|\Z)', content, re.DOTALL) 

    return {
        "name": menu_name,
        "price": price_match.group(1).strip() if price_match else "가격 정보 없음",
        "description": description_match.group(1).strip() if description_match else "설명 없음"
    }

# === 문의 유형 분류 ===
def classify_user_query(user_message: str) -> Literal["가격 문의", "추천 요청", "메뉴 문의", "기타"]:
    msg = user_message.strip()
    if "가격" in msg or "얼마" in msg or "비싸" in msg or "가격표" in msg:
        return "가격 문의"
    if "추천" in msg or "뭐가 좋아" in msg or "인기" in msg or "베스트" in msg:
        return "추천 요청"
    if "메뉴" in msg or "있나요" in msg or "파나요" in msg or "설명" in msg:
        return "메뉴 문의"
    return "기타"

# === 상태 정의 ===
class CafeState(TypedDict, total=False):
    messages: Annotated[List[AnyMessage], add_messages]
    route: Literal["가격 문의", "추천 요청", "메뉴 문의", "기타"]
    last_query: str
    search_results: List[dict] 

# 🚨🚨🚨 파일 읽기 함수 추가 🚨🚨🚨
def read_menu_data_from_file(file_path: str) -> Optional[str]:
    """지정된 파일 경로에서 텍스트 데이터를 읽어옵니다."""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read()
    except FileNotFoundError:
        print(f"오류: 파일을 찾을 수 없습니다. 경로를 확인해 주세요: {file_path}")
        return None
    except Exception as e:
        print(f"파일 읽기 중 오류 발생: {e}")
        return None

# 🚨🚨🚨 데이터 파싱 함수 🚨🚨🚨
def load_menu_data(data_string: str) -> List[Document]:
    """텍스트 파일 내용을 파싱하여 Document 객체 리스트로 반환"""
    docs = []
    # 메뉴 번호와 이름 추출 (예: '1. 아메리카노')
    menu_blocks = re.split(r'\n(?=\d+\.\s)', data_string.strip())
    
    for block in menu_blocks:
        if not block.strip():
            continue
            
        name_match = re.match(r'^\d+\.\s*(.+)', block)
        if not name_match:
            continue
            
        menu_name = name_match.group(1).strip()
        
        # 메뉴 이름이 포함된 첫 줄을 제거하고 나머지 내용을 Document content로 사용
        content = re.sub(r'^\d+\.\s*.+\n', '', block, 1).strip()
        content = content.replace('\t', '').replace('• ', '').replace(' ', '')

        docs.append(
            Document(
                page_content=content, 
                metadata={"menu_name": menu_name}
            )
        )
    return docs

# 🚨🚨🚨 데이터 소스 (벡터스토어) 준비 🚨🚨🚨
MENU_FILE_PATH = '/Applications/SK-Shieldus-Rookies_Cloud_Study/mylangchain/mylangchain-app/src/mylangchain_app/Practice/data/cafe_menu_data.txt' # ⭐⭐ 파일 경로 설정 ⭐⭐

# 1. 파일에서 데이터 읽기
menu_data_text = read_menu_data_from_file(MENU_FILE_PATH)

MENU_DB = SimpleKeywordVectorStore()

if menu_data_text:
    # 2. 읽은 데이터를 Document로 파싱
    menu_documents = load_menu_data(menu_data_text)
    # 3. 벡터스토어에 추가
    MENU_DB.add_documents(menu_documents)
    print(f"✅ {len(menu_documents)}개의 메뉴 데이터를 로드했습니다.")
else:
    print("❌ 데이터 로드 실패. 챗봇이 작동하지 않을 수 있습니다.")


# === 노드 구현  ===
def node_classify(state: CafeState) -> CafeState:
    """마지막 사용자 메시지로 문의 유형을 결정하고 route에 기록."""
    last_user = None
    for m in reversed(state["messages"]):
        if isinstance(m, HumanMessage):
            last_user = m
            break
    user_text = (last_user.content if last_user else "").strip()
    route = classify_user_query(user_text) if user_text else "기타"
    state["route"] = route
    state["last_query"] = user_text
    return state

def node_search(state: CafeState) -> CafeState:
    """문의 유형별 검색 전략 수행 후, 중간 결과(구조화) 메시지로 저장."""
    route = state.get("route", "기타")
    user_message = state.get("last_query", "")

    all_menu_names = " ".join([d.metadata['menu_name'] for d in MENU_DB._docs]) 

    if route == "가격 문의":
        docs = MENU_DB.similarity_search(f"{user_message} 메뉴 가격 {all_menu_names}", k=10) 
        
    elif route == "추천 요청":
        enhanced_query = user_message + " " + all_menu_names
        docs = MENU_DB.similarity_search(enhanced_query, k=3)
        if not docs:
            docs = MENU_DB.similarity_search(f"인기 메뉴 {all_menu_names}", k=3)
            
    elif route == "메뉴 문의":
        docs = MENU_DB.similarity_search(user_message, k=4)
    else:
        docs = MENU_DB.similarity_search(user_message, k=3)

    if not docs:
        state["messages"] += [AIMessage(content="관련 정보를 찾지 못했습니다.")]
        state["search_results"] = [] 
        return state

    # infos = [extract_menu_info(d) for d in docs[:5]]
     # 🚨🚨🚨 docs[:5]를 docs[:] 또는 docs로 변경 🚨🚨🚨
    infos = [extract_menu_info(d) for d in docs] # docs 전체를 사용하도록 수정
    state["search_results"] = infos  
    return state


def node_respond(state: CafeState) -> CafeState:
    route = state.get("route", "기타")
    infos: List[dict] = state.get("search_results", [])
    query = state.get("last_query", "")

    if not infos:
        state["messages"] += [AIMessage(content="죄송합니다. 해당 요청을 처리할 수 없습니다. 다른 질문을 해주세요.")]
        return state

    # ⭐⭐ 질문에서 특정 메뉴 이름 찾기 - 전체 메뉴 DB를 대상으로 확인 ⭐⭐
    target_menu_name = None
    # 전체 메뉴 목록을 가져옵니다. (검색 결과에 관계 없이)
    all_known_menus = sorted([d.metadata['menu_name'] for d in MENU_DB._docs], key=len, reverse=True) 
    
    # 질문에 전체 메뉴 중 하나가 포함되어 있는지 확인
    for menu_name in all_known_menus:
        if menu_name in query: 
            target_menu_name = menu_name
            break

    is_specific_query = (target_menu_name is not None)
    top = infos[0] # 기본값은 검색 결과 1위

    if is_specific_query:
        # 특정 메뉴가 언급되었다면, 해당 메뉴 정보를 전체 DB에서 찾아서 top으로 설정
        target_doc = next((d for d in MENU_DB._docs if d.metadata.get('menu_name') == target_menu_name), None)
        if target_doc:
            top = extract_menu_info(target_doc)
        # 검색에 실패했거나 메뉴가 발견되지 않아도 top은 infos[0]으로 유지됨 (예외 처리)
    
    # ⭐⭐ '가격 문의' 시 특정 메뉴 언급 여부에 따라 응답 분기 ⭐⭐
    if route == "가격 문의":
        if is_specific_query:
            # ➡️ 특정 메뉴 가격 요청: 해당 메뉴만 응답 (티라미수, 바닐라 라떼 등)
            msg = f"{top['name']}의 가격은 {top['price']}입니다."
        else:
            # ➡️ 일반 가격 목록 요청: 검색된 상위 메뉴들을 응답 ("메뉴 가격 알려줘" 등)
            price_list = [f"{info['name']}: {info['price']}" for info in infos[:10]]
            
            if price_list:
                 msg = "문의하신 관련 메뉴들의 가격 정보입니다:\n" + "\n".join(price_list)
            else:
                 msg = "메뉴 가격 정보를 찾을 수 없습니다."
            
    elif route == "추천 요청":
        msg = f"추천 메뉴: {top['name']} — {top['description']} ({top['price']})."
        
    elif route == "메뉴 문의":
        msg = f"{top['name']} 설명: {top['description']} (가격: {top['price']})."

    else: # route == "기타"
        msg = f"요청에 맞는 정보를 찾지 못했습니다. 관련 메뉴: {top['name']} — {top['description']} ({top['price']})."

    state["messages"] += [AIMessage(content=msg)]
    return state

# === 그래프 구성 ===
graph = StateGraph(CafeState)
graph.add_node("classify", node_classify)
graph.add_node("search", node_search)
graph.add_node("respond", node_respond)

graph.add_edge(START, "classify")
graph.add_edge("classify", "search")
graph.add_edge("search", "respond")
graph.add_edge("respond", END)

app = graph.compile()

print("\n--- 결과 출력 ---")
state1: CafeState = {
     "messages": [HumanMessage(content="이 카페의 메뉴 가격을 알고 싶어요.")]
    #  "messages": [HumanMessage(content="이 카페에서 제일 인기 많은 메뉴를 추천해주세요")]
    #  "messages": [HumanMessage(content="카라멜 마키아토에 뭐가 들어가는지 설명해 주세요.")]
    # "messages": [HumanMessage(content="티라미수는 가격이 얼마인가요?")]
}
result1 = app.invoke(state1)
for m in result1["messages"]:
    role = "USER" if isinstance(m, HumanMessage) else "ASSISTANT"
    print(f"[{role}] {m.content}")

✅ 10개의 메뉴 데이터를 로드했습니다.

--- 결과 출력 ---
[USER] 이 카페의 메뉴 가격을 알고 싶어요.
[ASSISTANT] 문의하신 관련 메뉴들의 가격 정보입니다:
바닐라 라떼: ₩6,000
녹차 라떼: ₩5,800
아이스 아메리카노: ₩4,500
카페라떼: ₩5,500
카라멜 마키아토: ₩6,500
프라푸치노: ₩7,000
카푸치노: ₩5,000
티라미수: ₩7,500
아메리카노: ₩4,500
콜드브루: ₩5,000


  content = re.sub(r'^\d+\.\s*.+\n', '', block, 1).strip()
