# Advanced RAG


In [None]:
# model_id 설정
%store -r model_id
%store -r index_name

## 사전 준비

필요한 패키지를 설치합니다. 

In [None]:
%pip install -U --quiet opensearch-py requests
%pip install -q boto3
%pip install -q requests
%pip install -q requests-aws4auth
%pip install -q opensearch-py
%pip install -q tqdm
%pip install -q boto3
%pip install -q langgraph
%pip install -q tavily-python
%pip install -q langchain_community

CloudFormation Stack으로부터 필요한 정보를 가져옵니다.

In [None]:
def get_cfn_outputs(stackname, cfn):
    outputs = {}
    for output in cfn.describe_stacks(StackName=stackname)["Stacks"][0]["Outputs"]:
        outputs[output["OutputKey"]] = output["OutputValue"]
    return outputs

위의 정보를 바탕으로 인증 정보를 가져옵니다

In [None]:
import boto3, json

# region_name = "us-west-2"
session = boto3.Session()
region_name = session.region_name

cfn = boto3.client("cloudformation", region_name)
kms = boto3.client("secretsmanager", region_name)

stackname = "opensearch-workshop"
cfn_outputs = get_cfn_outputs(stackname, cfn)

aos_credentials = json.loads(
    kms.get_secret_value(SecretId=cfn_outputs["OpenSearchSecret"])["SecretString"]
)

aos_host = cfn_outputs["OpenSearchDomainEndpoint"]

OpenSearch Cluster에 접속하고 클라이언트를 생성합니다.

In [None]:
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth

auth = (aos_credentials["username"], aos_credentials["password"])

aos_client = OpenSearch(
    hosts=[{"host": aos_host, "port": 443}],
    http_auth=auth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
)

In [None]:
import requests

# search_model = {"query": {"match": {"name": "OpenSearch-Cohere"}}, "size": 10}

# response = requests.get(
#     "https://" + aos_host + "/_plugins/_ml/models/_search", auth=auth, json=search_model
# )
# model_info = json.loads(response.text)
# model_id = model_info["hits"]["hits"][0]["_id"]

연결이 잘 되었는지 확인하기 위해 인덱스의 문서 수를 count합니다.

In [None]:
count = aos_client.count(index=index_name)
print(count)

## RAG을 위한 LLM 및 Retriever 생성

anthropic.claude-3-sonnet-20240229-v1:0 모델을 활용하여 생성 모델을 초기화합니다.

In [None]:
from langchain.memory import ConversationBufferWindowMemory

from langchain.chains.question_answering import load_qa_chain
from langchain_aws import ChatBedrock
from langchain_core.messages import HumanMessage

model_kwargs = {  # anthropic
    "anthropic_version": "bedrock-2023-05-31",
    "max_tokens": 2048,
    "temperature": 0,
}

llm = ChatBedrock(
    model_id="anthropic.claude-3-sonnet-20240229-v1:0",  # 파운데이션 모델 지정
    model_kwargs=model_kwargs,
    region_name=region_name,
)  # Claude 속성 구성

In [None]:
from langchain_core.messages import HumanMessage

# Test Bedrock
query_text = "어벤져스와 비슷한 영화를 추천해주세요"
messages = [HumanMessage(content=query_text)]
llm.invoke(messages)

OpenSearch 벡터 저장소로부터 필요한 정보를 가져오는 Retriever를 생성합니다. 이전 단계에서 활용한 Hybrid Search Retriever를 재사용합니다.

In [None]:
# Retriever 생성
from langchain.callbacks.manager import CallbackManagerForRetrieverRun
from langchain.schema import BaseRetriever
from typing import Any, List
from langchain.schema import Document


class OpenSearchHybridSearchRetriever(BaseRetriever):
    os_client: Any
    index_name: str
    model_id: str
    keyword_weight = 0.3
    semantic_weight = 0.7
    k = 10
    minimum_should_match = 0
    filter = []

    def _reset_search_params(
        self,
    ):

        self.k = 10
        self.minimum_should_match = 0
        self.filter = []
        self.keyword_weight = keyword_weight
        self.semantic_weight = semantic_weight

    def _get_relevant_documents(
        self, query_text: str, *, run_manager: CallbackManagerForRetrieverRun
    ) -> List[Document]:
        query = {
            "size": 10,
            "_source": {"exclude": ["text", "vector_field"]},
            "query": {
                "hybrid": {
                    "queries": [
                        {
                            "multi_match": {
                                "query": query_text,
                                "fields": ["title", "plot", "genre", "main_act", "supp_act"],
                            }
                        },
                        {
                            "neural": {
                                "vector_field": {
                                    "query_text": query_text,
                                    "model_id": model_id,
                                    "k": 30,
                                }
                            }
                        },
                    ]
                }
            },
            "search_pipeline": {
                "description": "Post processor for hybrid search",
                "phase_results_processors": [
                    {
                        "normalization-processor": {
                            "normalization": {"technique": "min_max"},
                            "combination": {
                                "technique": "arithmetic_mean",
                                "parameters": {
                                    "weights": [self.keyword_weight, self.semantic_weight]
                                },
                            },
                        }
                    }
                ],
            },
        }
        res = self.os_client.search(index=index_name, body=query)

        query_result = []

        for hit in res["hits"]["hits"]:
            metadata = {"score": hit["_score"], "id": hit["_id"]}

            content = {
                "제목": hit["_source"]["title"],
                "장르": hit["_source"]["genre"],
                "평점": hit["_source"]["rating"],
                "줄거리": hit["_source"]["plot"],
                "주연": hit["_source"]["main_act"],
                "조연": hit["_source"]["supp_act"],
            }

            doc = Document(page_content=json.dumps(content, ensure_ascii=False), metadata=metadata)
            query_result.append(doc)
        return query_result

In [None]:
# Add to vectorDB
retriever = OpenSearchHybridSearchRetriever(
    os_client=aos_client, index_name=index_name, model_id=model_id
)

In [None]:
docs = retriever.invoke(query_text)

# docs

for doc in docs:
    d = json.loads(doc.page_content)
    print(json.dumps(d, indent=2, ensure_ascii=False))
    print()

## Self RAG

Self-RAG은 자기 평가을 통해 정보를 검색하고 생성하는 새로운 Advanced RAG 기법입니다.

1. **자기 평가 메커니즘**: Self-RAG는 '평가 토큰'이라는 특별한 토큰을 사용하여 모델이 생성한 텍스트의 품질을 자체적으로 평가합니다.

2. **적응적 검색**: 모델은 필요에 따라 문서를 검색하고, 생성된 내용과 검색된 문서를 평가합니다.

3. **사용자 맞춤화**: 평가 토큰 예측을 통해 검색 빈도를 조정하고 사용자 선호에 맞게 모델 동작을 맞춤화할 수 있습니다.

4. **품질 및 사실성 향상**: Self-RAG는 대규모 언어 모델의 출력 품질과 사실성을 향상시키는 것을 목표로 합니다.

Self-RAG는 다양한 작업에서 뛰어난 성능을 보여줍니다:

- 개방형 질문-응답, 추론, 사실 검증 작업 등에서 최신 언어 모델과 검색 기반 모델을 능가하는 성능을 보입니다.
- 특히 장문 생성에서 사실 정확성과 인용 정확도를 크게 향상시킵니다. 

검색된 문서의 관련성을 평가하는 Retrieval Grader를 구현합니다. LangChain Hub에서 가져온 프롬프트와 구조화된 출력을 지원하는 LLM을 결합하여, 주어진 질문에 대해 검색된 문서가 관련 있는지 여부를 'yes' 또는 'no'로 평가하는 grader를 생성합니다.

In [None]:
### Retrieval Grader

from langchain import hub
from langchain_core.pydantic_v1 import BaseModel, Field


# Data model
class GradeDocuments(BaseModel):
    """Binary score for relevance check on retrieved documents."""

    binary_score: str = Field(description="Documents are relevant to the question, 'yes' or 'no'")


# https://smith.langchain.com/hub/efriis/self-rag-retrieval-grader
grade_prompt = hub.pull("efriis/self-rag-retrieval-grader")

# LLM with function call
structured_llm_grader = llm.with_structured_output(GradeDocuments)

retrieval_grader = grade_prompt | structured_llm_grader

In [None]:
# Test the retrieval grader
docs = retriever.invoke(query_text)
doc_txt = docs[0].page_content
print(doc_txt)
print(retrieval_grader.invoke({"question": query_text, "document": doc_txt}))
print(retrieval_grader.invoke({"question": "슈퍼히어로가 나오는 영화", "document": doc_txt}))

In [None]:
### Generate

from langchain import hub
from langchain_core.output_parsers import StrOutputParser

# Prompt
prompt = hub.pull("rlm/rag-prompt")

# Chain
rag_chain = prompt | llm | StrOutputParser()

# Run
generation = rag_chain.invoke({"context": docs, "question": query_text})
print(generation)

생성된 답변에서 환각(hallucination) 여부를 평가하는 Hallucination Grader를 구현합니다. LangChain Hub에서 가져온 프롬프트와 구조화된 출력을 지원하는 LLM을 결합하여, 주어진 문서들을 기반으로 생성된 답변이 사실에 근거하는지 여부를 'yes' 또는 'no'로 평가하는 grader를 생성하고 실행합니다.

In [None]:
### Hallucination Grader


# Data model
class GradeHallucinations(BaseModel):
    """Binary score for hallucination present in generation answer."""

    binary_score: str = Field(description="Answer is grounded in the facts, 'yes' or 'no'")


# LLM with function call
structured_llm_grader = llm.with_structured_output(GradeHallucinations)

# https://smith.langchain.com/hub/efriis/self-rag-hallucination-grader
hallucination_prompt = hub.pull("efriis/self-rag-hallucination-grader")

hallucination_grader = hallucination_prompt | structured_llm_grader
print(generation)
hallucination_grader.invoke({"documents": docs, "generation": generation})

생성된 답변이 질문을 적절히 다루는지 평가하는 Answer Grader를 구현합니다.

In [None]:
### Answer Grader


# Data model
class GradeAnswer(BaseModel):
    """Binary score to assess answer addresses question."""

    binary_score: str = Field(description="Answer addresses the question, 'yes' or 'no'")


structured_llm_grader = llm.with_structured_output(GradeAnswer)

# Prompt
answer_prompt = hub.pull("efriis/self-rag-answer-grader")

answer_grader = answer_prompt | structured_llm_grader
print(query_text)
print(generation)
answer_grader.invoke({"question": query_text, "generation": generation})

### LangGraph를 활용하여 Self-RAG 구현하기

이번 과정에서는 Self-RAG 워크플로우를 구현하기 위해 LangGraph를 사용합니다. LangGraph란 LangChain의 확장 라이브러리로 복잡한 Multi-Agent 시스템을 구축하기 위한 도구입니다. 

`GraphState` 클래스는 그래프의 상태를 나타내는 데 사용됩니다. 예를 들어, LangGraph나 유사한 프레임워크에서 그래프 기반 워크플로우의 각 노드 간에 전달되는 상태 정보를 타입 안전하게 정의하고 관리하는 데 활용될 수 있습니다.

In [None]:
from typing import List

from typing_extensions import TypedDict


class GraphState(TypedDict):
    """
    Represents the state of our graph.

    Attributes:
        question: question
        generation: LLM generation
        documents: list of documents
    """

    question: str
    generation: str
    documents: List[str]

Self-RAG를 구현할 Graph에서 각 노드가 사용할 모듈을 정의합니다. 

• retrieve 함수:
  - 주어진 질문에 대해 관련 문서를 검색합니다.
  - 검색된 문서를 상태 딕셔너리에 추가합니다.

• generate 함수:
  - 검색된 문서와 질문을 바탕으로 답변을 생성합니다.
  - RAG(Retrieval-Augmented Generation) 체인을 사용하여 답변을 생성합니다.

• grade_documents 함수:
  - 검색된 문서가 질문과 관련이 있는지 평가합니다.
  - 관련성이 높은 문서만 필터링하여 상태를 업데이트합니다.

• transform_query 함수:
  - 주어진 질문을 더 나은 형태로 변환합니다.
  - 변환된 질문으로 상태를 업데이트합니다.

각 함수는 상태 딕셔너리를 입력으로 받아 처리한 후, 업데이트된 상태를 반환합니다. 이 함수들은 질문 응답 시스템의 파이프라인을 구성하며, 문서 검색, 답변 생성, 문서 관련성 평가, 질문 개선 등의 작업을 수행합니다.

In [None]:
### Nodes


def retrieve(state):
    """
    Retrieve documents

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, documents, that contains retrieved documents
    """
    print("---RETRIEVE---")
    question = state["question"]

    # Retrieval
    documents = retriever.invoke(question)
    return {"documents": documents, "question": question}


def generate(state):
    """
    Generate answer

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, generation, that contains LLM generation
    """
    print("---GENERATE---")
    question = state["question"]
    documents = state["documents"]

    # RAG generation
    generation = rag_chain.invoke({"context": documents, "question": question})
    return {"documents": documents, "question": question, "generation": generation}


def grade_documents(state):
    """
    Determines whether the retrieved documents are relevant to the question.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Updates documents key with only filtered relevant documents
    """

    print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
    question = state["question"]
    documents = state["documents"]

    # Score each doc
    filtered_docs = []
    for d in documents:
        score = retrieval_grader.invoke({"question": question, "document": d.page_content})
        grade = score.binary_score
        if grade == "yes":
            print("---GRADE: DOCUMENT RELEVANT---")
            filtered_docs.append(d)
        else:
            print("---GRADE: DOCUMENT NOT RELEVANT---")
            continue
    return {"documents": filtered_docs, "question": question}


def transform_query(state):
    """
    Transform the query to produce a better question.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Updates question key with a re-phrased question
    """

    print("---TRANSFORM QUERY---")
    question = state["question"]
    documents = state["documents"]

    # Re-write question
    better_question = question_rewriter.invoke({"question": question})
    return {"documents": documents, "question": better_question}

엣지(결정 로직)를 정의하고 있습니다.

• decide_to_generate 함수:
  - 답변을 생성할지 또는 질문을 재생성할지 결정합니다.
  - 필터링된 문서가 없으면 질문을 변환하고, 있으면 답변을 생성합니다.

• grade_generation_v_documents_and_question 함수:
  - 생성된 답변이 문서에 근거하고 있는지, 그리고 질문에 적절히 답변하는지 평가합니다.
  - 평가 결과에 따라 다음 단계를 결정합니다:
    1. 답변이 문서에 근거하고 질문에 적절하면 "useful" 반환
    2. 답변이 문서에 근거하지만 질문에 부적절하면 "not useful" 반환
    3. 답변이 문서에 근거하지 않으면 "not supported" 반환

In [None]:
### Edges


def decide_to_generate(state):
    """
    Determines whether to generate an answer, or re-generate a question.

    Args:
        state (dict): The current graph state

    Returns:
        str: Binary decision for next node to call
    """

    print("---ASSESS GRADED DOCUMENTS---")
    state["question"]
    filtered_documents = state["documents"]

    if not filtered_documents:
        # All documents have been filtered check_relevance
        # We will re-generate a new query
        print("---DECISION: ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION, TRANSFORM QUERY---")
        return "transform_query"
    else:
        # We have relevant documents, so generate answer
        print("---DECISION: GENERATE---")
        return "generate"


def grade_generation_v_documents_and_question(state):
    """
    Determines whether the generation is grounded in the document and answers question.

    Args:
        state (dict): The current graph state

    Returns:
        str: Decision for next node to call
    """

    print("---CHECK HALLUCINATIONS---")
    question = state["question"]
    documents = state["documents"]
    generation = state["generation"]

    score = hallucination_grader.invoke({"documents": documents, "generation": generation})
    grade = score.binary_score

    # Check hallucination
    if grade == "yes":
        print("---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---")
        # Check question-answering
        print("---GRADE GENERATION vs QUESTION---")
        score = answer_grader.invoke({"question": question, "generation": generation})
        grade = score.binary_score
        if grade == "yes":
            print("---DECISION: GENERATION ADDRESSES QUESTION---")
            return "useful"
        else:
            print("---DECISION: GENERATION DOES NOT ADDRESS QUESTION---")
            return "not useful"
    else:
        pprint("---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS, RE-TRY---")
        return "not supported"

위의 노드 및 엣지 모듈을 사용하여 LangGraph 워크플로우를 정의합니다.

• StateGraph 객체 생성:
  - GraphState를 사용하여 워크플로우의 상태를 관리합니다.

• 노드 추가:
  - retrieve: 문서 검색
  - grade_documents: 문서 평가
  - generate: 답변 생성
  - transform_query: 질문 변환

• 엣지 추가:
  - START에서 retrieve로 시작합니다.
  - retrieve에서 grade_documents로 이동합니다.
  - grade_documents에서 조건부로 transform_query 또는 generate로 이동합니다.
  - transform_query에서 다시 retrieve로 돌아갑니다.
  - generate에서 조건부로 다음 단계를 결정합니다:
    1. "not supported": 다시 generate로
    2. "useful": 워크플로우 종료 (END)
    3. "not useful": transform_query로

• 워크플로우 컴파일:
  - 정의된 그래프를 실행 가능한 애플리케이션으로 컴파일합니다.

In [None]:
from langgraph.graph import END, StateGraph, START

workflow = StateGraph(GraphState)

# Define the nodes
workflow.add_node("retrieve", retrieve)  # retrieve
workflow.add_node("grade_documents", grade_documents)  # grade documents
workflow.add_node("generate", generate)  # generatae
workflow.add_node("transform_query", transform_query)  # transform_query

# Build graph
workflow.add_edge(START, "retrieve")
workflow.add_edge("retrieve", "grade_documents")
workflow.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {
        "transform_query": "transform_query",
        "generate": "generate",
    },
)
workflow.add_edge("transform_query", "retrieve")
workflow.add_conditional_edges(
    "generate",
    grade_generation_v_documents_and_question,
    {
        "not supported": "generate",
        "useful": END,
        "not useful": "transform_query",
    },
)

# Compile
app = workflow.compile()

Self-RAG을 테스트해봅니다.

In [None]:
from pprint import pprint

# Run
inputs = {"question": query_text}
for output in app.stream(inputs):
    for key, value in output.items():
        # Node
        pprint(f"Node '{key}':")
    pprint("\n---\n")

# Final generation
pprint(value["generation"])

In [None]:
inputs = {"question": "외계인과 싸우는 영화 추천해줘"}
for output in app.stream(inputs):
    for key, value in output.items():
        # Node
        pprint(f"Node '{key}':")
    pprint("\n---\n")

# Final generation
pprint(value["generation"])

## CRAG(Corrective RAG)

CRAG이란 자체 평가를 통해 실시간으로 질문을 재작성하고 답변을 보정하는 Advanced RAG 기법입니다. CRAG은 다음과 같은 장점이 있습니다.

- CRAG는 기존 RAG 시스템에 비해 더 정확하고 관련성 높은 정보를 제공합니다.
- 플러그 앤 플레이 방식으로 다양한 RAG 시스템에 쉽게 적용할 수 있습니다.
- Self-RAG에 비해 더 경량화되어 있어 효율적입니다.

아래 과정을 수행하기 위해서는 Tavily API Key를 발급받아야 합니다. https://app.tavily.com/home에 Sign Up하고 API를 복사하여 아래와 같이 환경변수에 등록합니다.

In [2]:
from langchain_community.tools.tavily_search import TavilySearchResults
import os

os.environ["TAVILY_API_KEY"] = "your tavily API key"

web_search_tool = TavilySearchResults()

문서 검색 시스템의 관련성 평가 도구를 생성합니다.

1. `GradeDocuments` 클래스로 이진 점수(yes/no) 모델을 정의합니다.

2. LLM을 사용해 구조화된 출력을 생성하는 그레이더를 설정합니다.

3. 관련성 평가를 위한 프롬프트를 ChatPromptTemplate로 정의합니다.

4. 프롬프트와 LLM 그레이더를 결합해 `retrieval_grader`를 생성합니다.

5. 주어진 한국어 질문에 대해 문서를 검색하고, 두 번째 문서의 관련성을 평가합니다.

In [None]:
### Retrieval Grader

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field


# Data model
class GradeDocuments(BaseModel):
    """Binary score for relevance check on retrieved documents."""

    binary_score: str = Field(description="Documents are relevant to the question, 'yes' or 'no'")


structured_llm_grader = llm.with_structured_output(GradeDocuments)

# Prompt
system = """You are a grader assessing relevance of a retrieved document to a user question. \n 
    If the document contains keyword(s) or semantic meaning related to the question, grade it as relevant. \n
    Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question."""
grade_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "Retrieved document: \n\n {document} \n\n User question: {question}"),
    ]
)

retrieval_grader = grade_prompt | structured_llm_grader
question = "우주에서 외계인과 싸우는 이야기"
docs = retriever.get_relevant_documents(question)

print(docs[1].page_content)
doc_txt = docs[1].page_content
print(retrieval_grader.invoke({"question": question, "document": doc_txt}))

영화 관련 질문을 개선하는 질문 재작성기를 생성합니다.

1. 시스템 프롬프트를 정의하여 입력 질문의 의미를 파악하고 영화 검색에 최적화된 버전으로 재작성하도록 지시합니다.

2. `ChatPromptTemplate`을 사용해 시스템 메시지와 사용자 메시지를 포함한 프롬프트를 구성합니다.

3. `question_rewriter`를 생성합니다:
   - 재작성 프롬프트
   - LLM(대규모 언어 모델)
   - `StrOutputParser()`를 연결하여 문자열 출력을 생성

4. `invoke` 메서드로 주어진 질문을 재작성합니다.

In [None]:
### Question Re-writer

# Prompt
system = """You a question re-writer about movies that converts an input question to a better version that is optimized \n 
     for movie search. Look at the input and try to reason about the underlying semantic intent / meaning. 
     The response should be one simple rewritten question that captures the essence of the original question.
     """
re_write_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        (
            "human",
            "Here is the initial question: \n\n {question} \n Formulate an improved question.",
        ),
    ]
)

question_rewriter = re_write_prompt | llm | StrOutputParser()
question_rewriter.invoke({"question": question})

Self-RAG와 마찬가지로 GraphState를 정의합니다. 여기서는 웹 검색을 위한 state인 `web_search`가 추가되었습니다.

In [None]:
from typing import List

from typing_extensions import TypedDict


class GraphState(TypedDict):
    """
    Represents the state of our graph.

    Attributes:
        question: question
        generation: LLM generation
        web_search: whether to add search
        documents: list of documents
    """

    question: str
    generation: str
    web_search: str
    documents: List[str]

Self-RAG와 동일하게 Graph의 각 노드가 사용할 모듈을 정의합니다. 여기서는 생성을 보강하기 위한 웹 검색의 도구로 web_search 모듈이 추가됩니다.

In [None]:
from langchain.schema import Document


def retrieve(state):
    """
    Retrieve documents

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, documents, that contains retrieved documents
    """
    print("---RETRIEVE---")
    question = state["question"]

    # Retrieval
    documents = retriever.get_relevant_documents(question)
    return {"documents": documents, "question": question}


def generate(state):
    """
    Generate answer

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, generation, that contains LLM generation
    """
    print("---GENERATE---")
    question = state["question"]
    documents = state["documents"]

    # RAG generation
    generation = rag_chain.invoke({"context": documents, "question": question})
    return {"documents": documents, "question": question, "generation": generation}


def grade_documents(state):
    """
    Determines whether the retrieved documents are relevant to the question.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Updates documents key with only filtered relevant documents
    """

    print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
    question = state["question"]
    documents = state["documents"]

    # Score each doc
    filtered_docs = []
    web_search = "No"
    for d in documents:
        score = retrieval_grader.invoke({"question": question, "document": d.page_content})
        grade = score.binary_score
        if grade == "yes":
            print("---GRADE: DOCUMENT RELEVANT---")
            filtered_docs.append(d)
        else:
            print("---GRADE: DOCUMENT NOT RELEVANT---")
            web_search = "Yes"
            continue
    return {"documents": filtered_docs, "question": question, "web_search": web_search}


def transform_query(state):
    """
    Transform the query to produce a better question.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Updates question key with a re-phrased question with one simple sentence
    """

    print("---TRANSFORM QUERY---")
    question = state["question"]
    documents = state["documents"]

    # Re-write question
    better_question = question_rewriter.invoke({"question": question})
    return {"documents": documents, "question": better_question}


def web_search(state):
    """
    Web search based on the re-phrased question.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Updates documents key with appended web results
    """

    print("---WEB SEARCH---")
    question = state["question"]
    documents = state["documents"]
    print("question:", question)

    # Web search
    docs = web_search_tool.invoke({"query": question})

    web_results = "\n".join([d["content"] for d in docs])
    web_results = Document(page_content=web_results)
    documents.append(web_results)

    return {"documents": documents, "question": question}

LangGraph 워크플로우를 컴파일합니다.

In [None]:
from langgraph.graph import END, StateGraph, START

workflow = StateGraph(GraphState)

# Define the nodes
workflow.add_node("retrieve", retrieve)  # retrieve
workflow.add_node("grade_documents", grade_documents)  # grade documents
workflow.add_node("generate", generate)  # generatae
workflow.add_node("transform_query", transform_query)  # transform_query
workflow.add_node("web_search_node", web_search)  # web search

# Build graph
workflow.add_edge(START, "retrieve")
workflow.add_edge("retrieve", "grade_documents")
workflow.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {
        "transform_query": "transform_query",
        "generate": "generate",
    },
)
workflow.add_edge("transform_query", "web_search_node")
workflow.add_edge("web_search_node", "generate")
workflow.add_edge("generate", END)

# Compile
app = workflow.compile()

CRAG을 테스트합니다.

In [None]:
from pprint import pprint

# Run
inputs = {"question": query_text}
for output in app.stream(inputs):
    for key, value in output.items():
        # Node
        pprint(f"Node '{key}':")
        # Optional: print full state at each node
        # pprint.pprint(value["keys"], indent=2, width=80, depth=None)
    pprint("\n---\n")

# Final generation
pprint(value["generation"])

벡터 저장소에 저장된 정보로 대답할 수 없는 정보의 경우 웹 검색으로 답변을 보강하여 생성하는 것을 확인할 수 있습니다.

In [None]:
from pprint import pprint

# Run
inputs = {"question": "2024년 개봉한 슈퍼 히어로 영화 추천해줘"}
for output in app.stream(inputs):
    for key, value in output.items():
        # Node
        pprint(f"Node '{key}':")
        # Optional: print full state at each node
        # pprint.pprint(value["keys"], indent=2, width=80, depth=None)
    pprint("\n---\n")

# Final generation
pprint(value["generation"])