- https://www.youtube.com/watch?v=uZoz3T3Z6-w&t=634s
- https://github.com/Coding-Crashkurse/LangGraph-Tutorial/blob/main/crag.ipynb

# Self-Corrective RAG with LangGraph

### 1. 개요
- 소개: 
    - LangGraph를 사용하여 자가 교정 RAG를 수행하는 방법을 소개합니다.
- 필요성: 
    - 사용자 질문이 벡터 데이터베이스에서 검색되었을 때, 항상 상위 K개의 문서(예: 4개)를 검색하게 됩니다. 하지만 이 문서들이 실제로 질문에 적합한지는 알 수 없습니다. 이를 해결하기 위해 자가 교정 RAG 시스템을 도입합니다.

### 2. 자가 교정 RAG 시스템
- 개념: 
    - 각 문서를 LLM이 이진 점수(예/아니오)로 평가하여 질문에 적합한지 여부를 판단합니다. 이 정보를 바탕으로 질문을 재작성하거나 검색된 문서 수를 줄일 수 있습니다.

In [None]:
from dotenv import load_dotenv

load_dotenv()

In [None]:
# LangSmith 설정

import os

os.environ["LANGCHAIN_TRACING_V2"]="true"
os.environ["LANGCHAIN_ENDPOINT"]="https://api.smith.langchain.com"
os.environ["LANGCHAIN_API_KEY"]="ls__708b8970829247d1a055f33c434aad1d"
os.environ["LANGCHAIN_PROJECT"]="edu-langchain-0326"

### 3. 단계별 구현

LLM 객체 생성

In [None]:
from langchain_openai import AzureOpenAIEmbeddings, AzureChatOpenAI
import os

def Get_LLM():    
    os.environ["AZURE_OPENAI_API_KEY"] = '352a6bee97b5451ab5866993a7ef4ce4'
    os.environ["AZURE_OPENAI_ENDPOINT"] = 'https://aoai-spn-krc.openai.azure.com/'
    model = AzureChatOpenAI(  
      api_version = '2024-02-01',
      azure_deployment = 'gpt-4o-kr-spn',
      temperature = 0.0
    )
    return model

def Get_Embedding():
    os.environ["AZURE_OPENAI_API_KEY"] = '5acedf2738034ef4be0cd6f075a8e4a3'
    os.environ["AZURE_OPENAI_ENDPOINT"] = 'https://aoaibhkim2.openai.azure.com/'
    return AzureOpenAIEmbeddings(
      azure_deployment='txt-embed-ada-002-au',
      openai_api_version='2024-03-01-preview',     
    )

벡터 데이터베이스 생성
- 사용 도구: Chroma를 벡터 저장소로 사용, AzureOpenAI 임베딩을 문서 임베딩에 사용.
- 문서 예시: 가상의 레스토랑 'Bel Vista'에 대한 정보를 포함한 4개의 문서.

In [None]:
from langchain.schema import Document
from langchain_openai import AzureOpenAIEmbeddings, AzureChatOpenAI
from langchain_community.vectorstores import Chroma
import os

docs = [
    Document(
        page_content="벨라비스타는 요리 업계에서 20년 이상의 경력을 보유한 유명한 셰프인 Antonio Rossi가 소유하고 있습니다. 그는 지역 사회에 정통 이탈리아 맛을 선사하기 위해 벨라비스타를 시작했습니다.",
        metadata={"source": "restaurant_info.txt"},
    ),
    Document(
        page_content="벨라비스타는 다양한 예산에 맞는 가격으로 다양한 요리를 제공합니다. 애피타이저 가격은 8달러부터, 메인 코스 가격은 15달러~35달러, 디저트 가격은 6달러~12달러입니다.",
        metadata={"source": "restaurant_info.txt"},
    ),
    Document(
        page_content="벨라비스타는 월요일부터 일요일까지 영업합니다. 평일은 오전 11시부터 오후 10시까지, 주말은 오전 11시부터 오후 11시까지 연장 운영됩니다.",
        metadata={"source": "restaurant_info.txt"},
    ),
    Document(
        page_content="벨라비스타는 점심 메뉴, 저녁 메뉴, 주말 특별 브런치 메뉴 등 다양한 메뉴를 선보이고 있습니다. 점심 메뉴에는 가벼운 이탈리아 요리가 포함되어 있으며, 저녁 메뉴에는 더욱 다양한 전통 요리와 현대 요리가 제공되며, 브런치 메뉴에는 고전적인 아침 식사 항목과 이탈리아 특선 요리가 모두 포함되어 있습니다.",
        metadata={"source": "restaurant_info.txt"},
    ),
]

벡터 저장소 생성 및 검색기 변환
- from_documents 클래스 메서드를 사용하여 벡터 저장소를 생성하고, 문서와 임베딩 함수를 전달.
- 표준화된 인터페이스를 제공하여 문서 검색을 수행.

In [None]:
embedding_function = Get_Embedding()

db = Chroma.from_documents(docs, embedding_function)
retriever = db.as_retriever()

에이전트 상태 관리:
- 속성: 질문, 점수(예/아니오), LLM 출력, 문서, 주제 적합 여부를 저장.
- 조건: 질문이 주제와 관련 없는 경우, 검색을 수행하지 않고 사용자에게 알림.

In [None]:
from typing_extensions import TypedDict

class AgentState(TypedDict):
    question: str # 질문
    grades: list[str] # 점수 (yes/no)
    llm_output: str # LLM 출력
    documents: list[str] # 문서
    on_topic: bool # 주제 적합 여부

문서 검색 함수:
- 상태에서 질문을 추출하고 검색기의 get_relevant_documents 메서드에 전달.
- 문서의 페이지 내용을 추출하여 LLM이 평가하도록 함.

In [None]:
def retrieve_docs(state: AgentState):
    print(f"NODE: retrieve_docs START!")
    question = state["question"]
    print(f"NODE: retrieve_docs QUESTION : {question}")
    documents = retriever.get_relevant_documents(query=question)
    state["documents"] = [doc.page_content for doc in documents]
    print(f"NODE: retrieve_docs DOCUMENT : {state["documents"]}")
    return state

초기 질문 분류기:
- 목적: 질문이 레스토랑과 관련이 있는지 여부를 판단.
- 방법: 커스텀 출력 클래스 생성, LLM에게 질문이 주제와 관련이 있는지 여부를 평가하도록 함.

In [None]:
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.prompts import ChatPromptTemplate

# 질문 관련성 판단 결과 출력 클래스
class GradeQuestion(BaseModel): 
    """벨라비스타 레스토랑과 관련된 질문인지 확인하는 이진 점수"""

    score: str = Field(
        description="질문은 벨라비스타 레스토랑에 관한 것입니다. -> 'Yes' 또는 'No'"
    )

# 질문 관련성 판단 함수
def question_classifier(state: AgentState):
    print(f"NODE: question_classifier START!")

    question = state["question"]

    system = """귀하는 검색된 문서와 사용자 질문의 관련성을 평가하는 채점자입니다. \n
        질문이 다음 주제 중 하나에 관한 경우에만 대답하십시오:
        1. 벨라비스타 소유자(Antonio Rossi)에 대한 정보.
        2. 벨라비스타의 요리 가격.
        3. 벨라비스타 영업시간
        4. 벨라비스타에서 이용 가능한 메뉴입니다.

        이러한 주제에 관한 질문인 것인지 ('yes' 또는 'no')
        """

    grade_prompt = ChatPromptTemplate.from_messages(
        [
            ("system", system),
            ("human", "User question: {question}"),
        ]
    )

    llm = Get_LLM()
    structured_llm = llm.with_structured_output(GradeQuestion)
    grader_llm = grade_prompt | structured_llm
    result = grader_llm.invoke({"question": question})
    print(f"NODE: question_classifier RESULT : {result}")
    state["on_topic"] = result.score
    return state

질문 관련성 분류 라우터

In [None]:
# 질문 관련성 결과 반환 함수
def on_topic_router(state: AgentState):
    print(f"NODE: on_topic_router START!")
    on_topic = state["on_topic"]
    print(f"NODE: on_topic_router ON TOPIC : {on_topic}")
    if on_topic.lower() == "yes":
        return "on_topic"
    return "off_topic"

In [None]:
# 질문 관련성 없음 처리 함수
def off_topic_response(state: AgentState):
    print(f"NODE: off_topic_response START!")
    state["llm_output"] = "그 질문에는 답변할 수 없습니다!"
    return state

문서 평가기:
- 목적: 검색된 문서가 질문과 관련이 있는지 여부를 평가.
- 방법: 커스텀 팬데틱 클래스 생성, LLM이 각 문서를 평가하도록 함.

In [None]:
# 질문과 검색 결과의 관련성 평가 결과 출력 클래스
class GradeDocuments(BaseModel):
    """검색된 문서의 관련성을 확인하기 위한 이진 점수"""

    score: str = Field(
        description="문서는 질문과 관련이 있습니다, 'Yes' or 'No'"
    )

# 질문과 검색 결과의 관련성 평가 함수
def document_grader(state: AgentState):
    print(f"NODE: document_grader START!")
    docs = state["documents"]
    print(f"NODE: document_grader DOCS : {docs}")
    question = state["question"]
    print(f"NODE: document_grader QUESTION : {question}")

    # 시스템 프롬프트:
    # 귀하는 검색된 문서와 사용자 질문의 관련성을 평가하는 채점자입니다.
    # 문서에 질문과 관련된 키워드나 의미론적 의미가 포함되어 있는 경우 관련성 등급을 매깁니다.
    # 문서가 질문과 관련이 있는지 여부를 나타내기 위해 이진 점수 '예' 또는 '아니요' 점수를 부여합니다.

    system = """You are a grader assessing relevance of a retrieved document to a user question. \n
        If the document contains keyword(s) or semantic meaning related to the question, grade it as relevant. \n
        Give a binary score 'Yes' or 'No' score to indicate whether the document is relevant to the question."""

    grade_prompt = ChatPromptTemplate.from_messages(
        [
            ("system", system),
            (
                "human",
                "Retrieved document: \n\n {document} \n\n User question: {question}",
            ),
        ]
    )

    llm = Get_LLM()
    structured_llm = llm.with_structured_output(GradeDocuments)
    grader_llm = grade_prompt | structured_llm
    scores = []
    for doc in docs:
        result = grader_llm.invoke({"document": doc, "question": question})
        scores.append(result.score)
    state["grades"] = scores
    print(f"NODE: document_grader GRADES : {scores}")
    return state

생성 라우터:
- 모든 문서가 '예'인 경우 최종 답변 생성.
- '아니오'인 문서가 있는 경우 질문을 재작성.

In [None]:
# 생성 라우터 함수
def gen_router(state: AgentState):
    print(f"NODE: gen_router START!")
    grades = state["grades"]
    print(f"NODE: gen_router GRADES : {grades}")
    if any(grade.lower() == "yes" for grade in grades): # 최종 답변 생성
        filtered_grades = [grade for grade in grades if grade.lower() == "yes"]
        print(f"NODE: gen_router RETURN : generate")
        return "generate"
    else:
        print(f"NODE: gen_router RETURN : rewrite_query")
        return "rewrite_query" # 질문 재 생성
    
# def gen_router(state: AgentState):
#     grades = state["grades"]
#     print("Document Grades:", grades)
#     if all(grade.lower() == "yes" for grade in grades):
#         return "generate"
#     else:
#         return "rewrite_query"

질문 재 작성기:
- 질문을 웹 검색에 최적화된 버전으로 변환.

In [None]:
from langchain_core.output_parsers import StrOutputParser

# 질문 재 작성 함수
def rewriter(state: AgentState):
    print(f"NODE: rewriter START!")
    question = state["question"]

    # 시스템 프롬프트:
    # 당신은 입력 질문을 최적화된 더 나은 버전으로 변환하는 질문 재작성자입니다.
    # 웹 검색용. 입력을 보고 기본 의미론적 의도/의미에 대해 추론해 보세요.    

    system = """You a question re-writer that converts an input question to a better version that is optimized \n
        for web search. Look at the input and try to reason about the underlying semantic intent / meaning."""
    re_write_prompt = ChatPromptTemplate.from_messages(
        [
            ("system", system),
            (
                "human",
                # 초기 질문은 다음과 같습니다. \n\n {question} \n 개선된 질문을 작성하세요.
                "Here is the initial question: \n\n {question} \n Formulate an improved question.",
            ),
        ]
    )
    llm = Get_LLM()
    question_rewriter = re_write_prompt | llm | StrOutputParser()
    output = question_rewriter.invoke({"question": question})
    print(f"NODE: rewriter QUESTION : {output}")
    state["question"] = output
    return state

최종 답변 생성:
- 재작성된 질문과 문서를 바탕으로 최종 답변을 생성.

In [None]:
from langchain_core.prompts import ChatPromptTemplate
from langchain.schema.output_parser import StrOutputParser

# 최종 답변 생성 함수
def generate_answer(state: AgentState):
    print(f"NODE: generate_answer START!")
    llm = Get_LLM()
    question = state["question"]
    print(f"NODE: generate_answer QUESTION : {question}")
    docs = state["documents"]
    print(f"NODE: generate_answer DOCUMENTS : {docs}")

    template = """Answer the question based only on the following context:
    {context}

    Question: {question}
    """

    prompt = ChatPromptTemplate.from_template(
        template=template,
    )
    chain = prompt | llm | StrOutputParser()
    result = chain.invoke({"question": question, "context": docs})
    state["llm_output"] = result
    print(f"NODE: generate_answer LLM OUTPUT : {result}")
    return state

### 4. 워크플로우 구성

노드:
- 주제 결정: 질문 분류기 사용.
- 문서 검색: 문서 검색 함수 실행.
- 문서 평가: 문서 평가 함수 실행.
- 질문 재작성: 질문 재작성 함수 실행.
- 답변 생성: 최종 답변 생성 함수 실행.

In [None]:
from langgraph.graph import StateGraph, END

workflow = StateGraph(AgentState)

workflow.add_node("topic_decision", question_classifier) # 주제 결정: on_topic_router 사용.
workflow.add_node("retrieve_docs", retrieve_docs) # 문서 검색: 문서 검색 함수 실행
workflow.add_node("document_grader", document_grader) # 문서 평가: 문서 평가 함수 실행.
workflow.add_node("rewrite_query", rewriter) # 질문 재작성: 질문 재작성 함수 실행.
workflow.add_node("generate_answer", generate_answer) # 답변 생성: 최종 답변 생성 함수 실행.

workflow.add_node("off_topic_response", off_topic_response)

에지:
- 주제 결정 결과에 따라 검색 또는 오프 토픽 응답.
- 문서 평가 결과에 따라 최종 답변 생성 또는 질문 재작성.

In [None]:
# 주제 결정 결과에 따라 검색 또는 오프 토픽 응답.
workflow.add_conditional_edges(
    "topic_decision",
    on_topic_router,
    {
        "on_topic": "retrieve_docs",
        "off_topic": "off_topic_response",
    },
)

workflow.add_edge("retrieve_docs", "document_grader")

# 문서 평가 결과에 따라 최종 답변 생성 또는 질문 재작성.
workflow.add_conditional_edges(
    "document_grader",
    gen_router,
    {
        "generate": "generate_answer",
        "rewrite_query": "rewrite_query",
    },
)

workflow.add_edge("rewrite_query", "retrieve_docs")

workflow.add_edge("generate_answer", END)
workflow.add_edge("off_topic_response", END)

workflow.set_entry_point("topic_decision")

app = workflow.compile()

시각화

In [None]:
from IPython.display import Image, display

try:
    display(Image(app.get_graph(xray=True).draw_mermaid_png()))
except:
    pass

In [None]:
result = app.invoke({"question": "날씨가 어때?"})
result["llm_output"]

In [None]:
result = app.invoke({"question": "벨라비스타 주인은 누구야?"})
result["llm_output"]

In [None]:
result = app.invoke({"question": "벨라비스타 메뉴에 자장면도 있어?"})
result["llm_output"]