# Self - RAG

**Self-RAG**는 검색된 문서와 생성된 응답에 대한 자기 반선 (self-reflection) 및 자기 평가 (self-evaluation)를 포함한 RAG 전략으로, RAG 기반 시스템의 성능 향상에 기여할 수 있습니다. 

#### Self-RAG란?
Self-RAG는 검색된 문서와 생성된 응답 모두에 대해 점검하고 검증하는 추가 단계를 포함하는 RAG 전략입니다. 전통적인 RAG 에서는 검색된 정보를 기반으로 LLM이 답변을 생성하는 것이 주된 과정이었다면, Self-RAG에서는 자체평가를 통해 다음과 같은 사항을 검증합니다. 

1. 검색할 필요성 판단 : 현재 질문에 대해 추가 검색이 필요한지 여부를 판단합니다. 
2. 검색 결과 관련성 평가 : 검색된 문서 조각(청크)이 질문 해결에 도움이 되는지 확인합니다. 
3. 응답 사실성 검증 : 생성된 답변이 제공된 문서 청크에 의해 충분히 뒷받침되는지 평가합니다. 
4. 응답 품질 평가 : 생성된 답변이 실제로 질문을 잘 해결하는지 측정합니다. 

[self-rag](https://arxiv.org/pdf/2310.11511)
----------------------------------------
#### Self-RAG 주요 개념 정리 
1. **Retriever 사용 여부 결정** : 추가 검색을 진행할지, 검색없이 진행할지, 혹은 더 기다려볼것인지 결정합니다.
* 입력 : `x (question)` 또는 `(x(question), y(generation))`
* 출력 : `yes`, `no`, `continue`

2. **관련성 평가 (Retrieval Grader)** : 검색된 문서 청크들이 실제로 질문을 받는데 유용한 정보인지 판별합니다. 
* 입력 : (x(question), d(chunk)) for each `d` in `D`
* 출력 : `relevant` 또는 `irrelevant` 

3. **사실성 검증 (Hallucination Grader)**  : 생성된 응답이 검색 결과에 근거한 사실을 반영하는지, 혹은 환각 (hallucination)이 발생했는지 판단합니다. 
* 입력 : `x(question)`, `d(chunk)`, `y(generation)` for each `d` in `D`
* 출력 : `{fully suppored, partially supported, no support}`

3. **정답 품질 평가 (Answer Grader)** : 생성된 응답이 질문을 어느정도 해결하는지 점수화하여 평가합니다.
* 입력 : `x(question)`, `y(generation)`
* 출력 : `{5, 4, 3, 2, 1}`

----------------------------------------
본 내용에서는 LangGraph를 활용하여 Self-RAG 전략의 일부 아이디어를 구현하는 과정을 다룹니다.  
다음과 같은 단계를 통해 Self-RAG전략을 구축하고 실행하는 방법을 익히게 됩니다.  

* Retriever : 문서를 검색 
* Retrieval Grader : 검색된 문서의 관련성 평가 
* Generate : 질문에 대한 답변 생성 
* Hallucination Grader : 생성된 답변의 사실성 (환각 여부) 검증
* Answer Grader : 답변의 질문에 대한 관련성 평가 
* Question Re-Writer : 쿼리 재작성 
* 그래프 생성 및 실행 : 정의한 노드로 그래프를 빌드하고 실행 


In [None]:
from dotenv import load_dotenv

load_dotenv()

from config.langsmith import langsmith 

langsmith("LangGraph-Self-RAG")


## PDF 기반 Retrieval Chain 생성 

PDF 문서를 기반으로 Retrieval Chain을 생성합니다. 가장 단순한 구조의 Retrieval Chain입니다.  
단, LangGraph에서는 Retriever와 Chain을 따로 생성합니다. 그래야 각 노드별로 세부 처리를 할 수 있습니다.  

In [None]:
from langchain_community.document_loaders import PyPDFLoader, PyMuPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_google_genai import GoogleGenerativeAIEmbeddings, ChatGoogleGenerativeAI

# 단계 1: 문서 로드(Load Documents)
loader = PyMuPDFLoader("data/SPRI_AI_Brief_2023년12월호_F.pdf")
docs = loader.load()
print(f"문서 페이지 수 : {len(docs)}")

print(docs[0].page_content)

# 단계 2: 문서 분할(Split Documents)
text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
split_documents = text_splitter.split_documents(docs)
print(f"분할된 청크의수: {len(split_documents)}")


# 단계 2: 문서 분할(Split Documents)
text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
split_documents = text_splitter.split_documents(docs)
print(f"분할된 청크의수: {len(split_documents)}")

# 단계 3: 임베딩(Embedding) 생성
# embeddings = OpenAIEmbeddings()
embeddings = GoogleGenerativeAIEmbeddings()

# 단계 4: DB 생성(Create DB) 및 저장
# 벡터스토어를 생성합니다.
# FAISS (Facebook AI Similarity Search)는 밀집 벡터의 효율적인 유사도 검색과 클러스터링을 위한 라이브러리
vectorstore = FAISS.from_documents(documents=split_documents, embedding=embeddings)

# 단계 5: 검색기(Retriever) 생성
# 문서에 포함되어 있는 정보를 검색하고 생성합니다.
retriever = vectorstore.as_retriever()

# 검색기에 쿼리를 날려 검색된 chunk 결과를 확인합니다.
# retriever.invoke("삼성전자가 자체 개발한 AI 의 이름은?")

# 단계 6: 프롬프트 생성(Create Prompt)
# 프롬프트를 생성합니다.
prompt = PromptTemplate.from_template(
    """
    You are an assistant for question-answering tasks. 
    Use the following pieces of retrieved context to answer the question. 
    If you don't know the answer, just say that you don't know. 
    Answer in Korean.

    #Context: 
    {context}

    #Question:
    {question}

    #Answer:
    """
)
# 단계 7: 언어모델(LLM) 생성
# 모델(LLM) 을 생성합니다.
llm = ChatGoogleGenerativeAI(model_name="gemini-2.0-flash", temperature=0)

# 단계 8: 체인(Chain) 생성
chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

# 단계 9: 체인 실행(Run Chain)
# 체인을 실행합니다.
# 문서에 대한 질의를 입력하고, 답변을 출력합니다.
question = "삼성전자가 자체 개발한 AI 의 이름은?"
response = chain.invoke(question)
print(response)

## 문서 검색 평가기 (Retrieval Grader)

추후 retrieve 노드에서 문서에 대한 관련성 평가를 진행하기 위해 미리 정의합니다.

In [None]:
import os
from pydantic import BaseModel, Field
from langchain_core.prompts import ChatPromptTemplate 
from langchain_google_genai import ChatGoogleGenerativeAI


class GradeDocuments(BaseModel):
    """ A binary score to determine the relevance of the retrieved documents """
    binary_score :str = Field(
        description = "Documents are relevant to the question, 'yes' or 'no'"
    )

llm = ChatGoogleGenerativeAI(
    model = "gemini-2.0-flash-001",
    temperature = 0,
    api_key = os.getenv("GOOGLE_API_KEY")
)

# 구조화된 출력 생성
structred_llm_grader = llm.with_structured_output(GradeDocuments)

# 시스템 프롬프트 정의 : 검색된 문서가 사용자 질문에 관련이 있는지 평가하는 시스템 역할 정의
system_prompt = """
    You are a grader assessing relevance of a retrieved document to a user question. 
    It does not need to be a stringent test. The goal is to filter out erroneous retrievals.
    If the document contains keyworkds or semantic meaning related to the user question, grade it as relevant. 
    Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question
""" 

# 채팅 프롬프트 템플릿 생성
grade_prompt = ChatPromptTemplate.from_messages([
    ("system", system_prompt),
    ("human", "Retrieved document : \n\n {document} \n\n User question : {question}")
])

# 검색 평가기 생성
retrieval_grader = grade_prompt | structred_llm_grader


## 답변 생성 체인 

Naive RAG 체인을 사용해서 검색된 문서를 기반으로 답변을 생성하는 체인

In [None]:
from langchain_core.output_parsers import StrOutputParser 
from langchian_google_genai import ChatGoogleGenerativeAI 

rag_prompt_system = """ 
    You are an AI assistant that can answer questions about the following context:
    Your task is answering questions based on the context provided. 
    If you cannot find the answer in the context, just say "I cannot find the answer in the context".
    You should answer in Korean. 
    Answer in English for name and technical terms. 
"""

rag_prompt_human = """ 
# question: {question}
# context: {context}
# answer: 
"""

rag_prompt_template = ChatPromptTemplate.from_messages(
    [
        ("system", rag_prompt_system),
        ("human", rag_prompt_human),
    ]
)

llm = ChatGoogleGenerativeAI(
    model="gemini-2.0-flash",
    temperature=0,
    api_key=os.getenv("GOOGLE_API_KEY")
)

rag_chain = rag_prompt_template | llm | StrOutputParser()

def format_docs(docs):
    return "\n\n".join(
        [
            f'<document><content>{doc.page_content}</content><source>{doc.metadata["source"]}</source><page>{doc.metadata["page"]+1}</page></document>'
            for doc in docs
        ]
    )

generation = rag_chain.invoke({"context" : format_docs(docs), "question" : question})
print(generation)

## 답변의 할루시네이션 여부 평가 
`groundedness_grader` 를 생성하고 생성된 답변과 `context`를 기반하여 답변의 할루시네이션 평가를 진행합니다.  
* `yes`인 경우 답변의 할루시네이션이 없음을 의미합니다.
* `no`인 경우 답변이 할루시네이션이라고 간주합니다.

In [None]:
from pydantic import BaseModel, Field 
from langchain_core.prompts import ChatPromptTemplate 
from langchain_google_genai import ChatGoogleGenerativeAI

class GroundednessGrader(BaseModel):
    """ A binary score to determine the groundedness of the generated answer """
    binary_score :str = Field(
        description = "Answer is grounded in the facts, 'yes' or 'no'"
    )

llm = ChatGoogleGenerativeAI(
    model = "gemini-2.0-flash-001",
    temperature = 0,
    api_key = os.getenv("GOOGLE_API_KEY")
)

groundedness_grader = llm.with_structured_output(GroundednessGrader)

system_prompt = """
    You are a grader assessing whether an LLM generation is grounded in / supported by a set of retrieved facts. 
    Give a binary score 'yes' or 'no'. 'yes' means the answer is grounded in / supported by the set of facts.
"""

groundedness_checking_prompt = ChatPromptTemplate.from_messages([
    ("system", system_prompt),
    ("human", "set of facts : \n\n {documents} \n\n LLM generation : {generation}")
])

# 답변의 할루시네이션 평가기 생성
groundedness_grader = groundedness_checking_prompt | groundedness_grader

## 답변 관련성 평가 

In [None]:

class GradeAnswer(BaseModel):
    """A binary score indicating whether the question is addressed."""

    # 답변의 관련성 평가: 'yes' 또는 'no'로 표기(yes: 관련성 있음, no: 관련성 없음)
    binary_score: str = Field(
        description="Answer addresses the question, 'yes' or 'no'"
    )


llm = ChatGoogleGenerativeAI(
    model = "gemini-2.0-flash-001",
    temperature = 0,
    api_key = os.getenv("GOOGLE_API_KEY")
)
# llm 에 GradeAnswer 바인딩
structured_llm_grader = llm.with_structured_output(GradeAnswer)

# 시스템 프롬프트 정의
system = """You are a grader assessing whether an answer addresses / resolves a question \n 
     Give a binary score 'yes' or 'no'. Yes' means that the answer resolves the question."""

# 프롬프트 생성
answer_grader_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "User question: \n\n {question} \n\n LLM generation: {generation}"),
    ]
)

# 답변 평가기 생성
answer_grader = answer_grader_prompt | structured_llm_grader

## 질문 재작성기 (Question Rewriter)

In [None]:

# 시스템 프롬프트 정의
# 입력 질문을 벡터스토어 검색에 최적화된 형태로 변환하는 시스템 역할 정의
system = """You a question re-writer that converts an input question to a better version that is optimized \n 
     for vectorstore retrieval. Look at the input and try to reason about the underlying semantic intent / meaning."""

# 시스템 메시지와 초기 질문을 포함한 프롬프트 템플릿 생성
re_write_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        (
            "human",
            "Here is the initial question: \n\n {question} \n Formulate an improved question.",
        ),
    ]
)

# 질문 재작성기 생성
question_rewriter = re_write_prompt | llm | StrOutputParser()

## 상태 정의

* `question`: 사용자가 입력한 질문
* `generation`: 생성된 응답
* `documents`: 검색된 문서 목록

In [None]:
from typing import List
from typing_extensions import TypedDict, Annotated

# 그래프의 상태를 나타내는 클래스 정의
class GraphState(TypedDict):
    # 질문을 나타내는 문자열
    question: Annotated[str, "Question"]
    # LLM에 의해 생성된 응답을 나타내는 문자열
    generation: Annotated[str, "LLM Generation"]
    # 문서의 목록을 나타내는 문자열 리스트
    documents: Annotated[List[str], "Retrieved Documents"]

## 노드 정의
* retrieve: 문서 검색
* grade_documents: 문서 평가
* generate: 답변 생성
* transform_query: 질문 재작성

In [None]:
# 문서 검색
def retrieve(state):
    print("==== [RETRIEVE] ====")
    question = state["question"]

    # 검색 수행
    documents = retriever.invoke(question)
    return {"documents": documents}


# 답변 생성
def generate(state):
    print("==== [GENERATE] ====")
    question = state["question"]
    documents = state["documents"]

    # RAG 생성
    generation = rag_chain.invoke({"context": documents, "question": question})
    return {"generation": generation}


# 검색된 문서의 관련성 평가
def grade_documents(state):
    print("==== [GRADE DOCUMENTS] ====")
    question = state["question"]
    documents = state["documents"]

    # 각 문서 점수 평가
    filtered_docs = []
    for d in documents:
        score = retrieval_grader.invoke(
            {"question": question, "document": d.page_content}
        )
        grade = score.binary_score
        if grade == "yes":
            print("==== GRADE: DOCUMENT RELEVANT ====")
            filtered_docs.append(d)
        else:
            print("==== GRADE: DOCUMENT NOT RELEVANT ====")
            continue
    return {"documents": filtered_docs}


# 질문 변환
def transform_query(state):
    print("==== [TRANSFORM QUERY] ====")
    question = state["question"]

    # 질문 재작성
    better_question = question_rewriter.invoke({"question": question})
    return {"question": better_question}

## 조건부 엣지 정의

`decide_to_generate` 함수는 검색된 문서의 관련성 평가 결과에 따라 답변 생성 여부를 결정합니다.   

`grade_generation_v_documents_and_question` 함수는 생성된 답변의 문서 및 질문과의 관련성 평가 결과에 따라 생성 여부를 결정합니다

In [None]:
# 답변 생성 여부 결정
def decide_to_generate(state):
    print("==== [ASSESS GRADED DOCUMENTS] ====")
    state["question"]
    filtered_documents = state["documents"]

    if not filtered_documents:
        # 모든 문서가 관련성이 없는 경우
        # 새로운 쿼리 생성
        print(
            "==== [DECISION: ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION, TRANSFORM QUERY] ===="
        )
        return "transform_query"
    else:
        # 관련 문서가 있는 경우 답변 생성
        print("==== [DECISION: GENERATE] ====")
        return "generate"


# 생성된 답변의 문서 및 질문과의 관련성 평가
def grade_generation_v_documents_and_question(state):
    print("==== [CHECK HALLUCINATIONS] ====")
    question = state["question"]
    documents = state["documents"]
    generation = state["generation"]

    score = groundedness_grader.invoke(
        {"documents": documents, "generation": generation}
    )
    grade = score.binary_score

    # 환각 여부 확인
    if grade == "yes":
        print("==== [DECISION: GENERATION IS GROUNDED IN DOCUMENTS] ====")
        # 질문 해결 여부 확인
        print("==== [GRADE GENERATION vs QUESTION] ====")
        score = answer_grader.invoke({"question": question, "generation": generation})
        grade = score.binary_score
        if grade == "yes":
            print("==== [DECISION: GENERATION ADDRESSES QUESTION] ====")
            return "relevant"
        else:
            print("==== [DECISION: GENERATION DOES NOT ADDRESS QUESTION] ====")
            return "not relevant"
    else:
        print("==== [DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS, RE-TRY] ====")
        return "hallucination"

## 그래프 생성

In [None]:
from langgraph.graph import END, StateGraph, START 
from langgraph.checkpoint.memory import MemorySaver

In [None]:
# 그래프 상태 초기화
workflow = StateGraph(GraphState)

# 노드 정의
workflow.add_node("retrieve", retrieve)  # retrieve
workflow.add_node("grade_documents", grade_documents)  # grade documents
workflow.add_node("generate", generate)  # generatae
workflow.add_node("transform_query", transform_query)  # transform_query

# 엣지 정의
workflow.add_edge(START, "retrieve")
workflow.add_edge("retrieve", "grade_documents")

# 문서 평가 노드에서 조건부 엣지 추가
workflow.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {
        "transform_query": "transform_query",
        "generate": "generate",
    },
)

# 엣지 정의
workflow.add_edge("transform_query", "retrieve")

# 답변 생성 노드에서 조건부 엣지 추가
workflow.add_conditional_edges(
    "generate",
    grade_generation_v_documents_and_question,
    {
        "hallucination": "generate",
        "relevant": END,
        "not relevant": "transform_query",
    },
)

# 그래프 컴파일
app = workflow.compile(checkpointer=MemorySaver())

In [None]:
from config.graphs import visualize_graph

visualize_graph(app)

## 그래프 실행 

In [None]:
from langchain_core.runnables import RunnableConfig
from langchain_teddynote.messages import stream_graph, invoke_graph, random_uuid

# config 설정(재귀 최대 횟수, thread_id)
config = RunnableConfig(recursion_limit=10, configurable={"thread_id": random_uuid()})

# 질문 입력
inputs = {
    "question": "삼성전자가 개발한 생성형 AI 의 이름은?",
}

# 그래프 실행
invoke_graph(
    app, inputs, config, ["retrieve", "transform_query", "grade_documents", "generate"]
)

In [None]:
from langgraph.errors import GraphRecursionError

# config 설정(재귀 최대 횟수, thread_id)
config = RunnableConfig(recursion_limit=10, configurable={"thread_id": random_uuid()})

# 질문 입력
inputs = {
    "question": "테디노트가 개발한 생성형 AI 의 이름은?",
}

try:
    # 그래프 실행
    stream_graph(
        app,
        inputs,
        config,
        ["retrieve", "transform_query", "grade_documents", "generate"],
    )
except GraphRecursionError as recursion_error:
    print(f"GraphRecursionError: {recursion_error}")