In [1]:
from dotenv import load_dotenv
load_dotenv()

True

In [2]:
from langchain_ollama import OllamaEmbeddings
from langchain_qdrant import QdrantVectorStore, FastEmbedSparse, RetrievalMode
from qdrant_client import QdrantClient, models

client = QdrantClient(host="localhost", port=6333)

dense_embedding = OllamaEmbeddings(model="bge-m3")
sparse_embedding = FastEmbedSparse(model_name="Qdrant/bm25")

qdrant = QdrantVectorStore(
    client=client,
    collection_name="LangChain_LangGraph_QA",
    embedding = dense_embedding,
    sparse_embedding = sparse_embedding,
    retrieval_mode = RetrievalMode.HYBRID,
    vector_name= "dense",
    sparse_vector_name = "sparse",
)

In [3]:
retriever = qdrant.as_retriever(search_kwargs={"k": 5})
print(retriever.invoke("self-rag")[0].page_content)

# Self RAG

Self-RAG is a strategy for RAG that incorporates self-reflection / self-grading on retrieved documents and generations. 

[Paper](https://arxiv.org/abs/2310.11511)

![Screenshot 2024-04-01 at 12.41.50 PM.png](attachment:15cba0ab-a549-4909-8373-fb761e384eff.png)

# Environment 

```python
%pip install -qU langchain-pinecone langchain-openai langchainhub langgraph
```

### Tracing

Use [LangSmith](https://docs.smith.langchain.com/) for tracing (shown at bottom)

```python
import os

os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com"
os.environ["LANGCHAIN_API_KEY"] = "<your-api-key>"
```

```python
import os

os.environ["LANGCHAIN_PROJECT"] = "pinecone-devconnect"
```

## Retriever
 
Let's use Pinecone's sample movies database

```python
from langchain_openai import OpenAIEmbeddings
from langchain_pinecone import PineconeVectorStore

# use pinecone movies database

# Add to vectorDB
vectorstore = PineconeVectorStore

## RAG 체인 구축

여기서는 Naive 형태의 Chain 을 구성하도록 한다.

단, Reranker 를 사용하여 검색 정확도를 향상시키도록 한다. (`Jina-reranker` 를 사용하지만, 다른 reranker 를 사용하거나, 빼도 좋다.)

In [4]:
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
from langchain.retrievers import ContextualCompressionRetriever
from langchain_community.document_compressors import JinaRerank
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.prompts import ChatMessagePromptTemplate

from langchain import hub
from operator import itemgetter


# retriever 생성
code_retriever = qdrant.as_retriever(search_kwargs={"k": 20})

# JinaRerank 설정
compressor = JinaRerank(model="jina-reranker-v2-base-multilingual", top_n=8)
compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor, base_retriever=code_retriever
)

# prompt 설정
system_prompt = """
You are an CODE Copilot Assistant. 
You must use the following pieces of retrieved source code or documentation to answer the question. 
You are given question related to RAG(Retrieval Augmented Generation) source code and documentation.
If you don't know the answer, just say that you don't know. Answer in Korean.

When answering questions, follow these guidelines:

1. Use only the information provided in the context. 
2. Include as many example code snippets as possible.
3. Writing a full code snippet is highly recommended.
4. Do not introduce external information or make assumptions beyond what is explicitly stated in the context.
5. The context contain sources at the topic of each individual document.
6. Include these sources your answer next to any relevant statements. For example, for source # 1 use [1]. 
7. List your sources in order at the bottom of your answer. [1] Source 1, [2] Source 2, etc
8. If the source is: <source>assistant/docs/llama3_1.md" page="7"</source>' then just list: 
        
[1] llama3_1.md
        
And skip the addition of the brackets as well as the Document source preamble in your citation.

----

### Sources

In the Sources section:
- Include all sources used in your answer
- Provide full links to relevant websites or document names
- Separate each source by a newline. Use two spaces at the end of each line to create a newline in Markdown.
- It will look like:

**Sources**
- [1] Link or Document name
- [2] Link or Document name

Be sure to combine sources. For example this is not correct:

- [3] https://ai.meta.com/blog/meta-llama-3-1/
- [4] https://ai.meta.com/blog/meta-llama-3-1/

There should be no redundant sources. It should simply be:

- [3] https://ai.meta.com/blog/meta-llama-3-1/

-----

### Retrieved Context

Here is the context that you can use to answer the question:

{context}

----

### Question

Here is user's question:

{question}

----

Final review:
- Ensure the report follows the required structure
- Check that all guidelines have been followed
- Check if a full code snippet is included in your answer if applicable.
- Your response should be written in Korean
- Using many example code snippets would be rewarded by the user
- Think step by step.

----

Your answer to the question with the source:
"""

rag_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            system_prompt
        ),
        ("human", "{question}")
    ]
)

llm = ChatOpenAI(model_name="gpt-4.1-mini", temperature=0)

rag_chain = (
    {
        "question": itemgetter("question"),
        "context": itemgetter("context"),
    }
    | rag_prompt
    | llm
    | StrOutputParser()
)

In [5]:
# response = rag_chain.invoke({"question": "Adaptive RAG", "context": compression_retriever.invoke("Adaptive RAG")})
response = rag_chain.invoke({"question": "Adaptive RAG", "context": retriever.invoke("Adaptive RAG")})

In [6]:
print(response)

Adaptive RAG는 RAG(Retrieval Augmented Generation) 전략 중 하나로, 쿼리 분석(query analysis)과 능동적(self-corrective) RAG를 결합한 방식입니다. 이 전략은 쿼리의 특성에 따라 적절한 처리 경로를 선택하여 더 효율적이고 정확한 답변을 생성하는 것을 목표로 합니다.

### Adaptive RAG 주요 개념
- 쿼리 분석을 통해 질문 유형을 파악하고, 이에 따라 다음 경로 중 하나로 라우팅합니다:
  - Retrieval 없이 LLM이 직접 답변하는 경우 (No Retrieval)
  - 단일 검색 후 답변 생성 (Single-shot RAG)
  - 반복적 검색 및 답변 생성 (Iterative RAG)
  - 웹 검색(Web-search)을 통한 보완
- 예를 들어, 최신 정보가 필요한 질문은 웹 검색으로, 인덱스 내 정보가 충분한 질문은 RAG로 처리합니다.

### Adaptive RAG 구현 예시 (LangGraph 기반)

```python
# 필요한 패키지 설치
! pip install --quiet langchain langchain_cohere langchain-openai tiktoken langchainhub chromadb langgraph

import os
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_cohere import CohereEmbeddings
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.vectorstores import Chroma

# API 키 설정
os.environ["COHERE_API_KEY"] = "<your-cohere-api-key>"
os.environ["OPENAI_API_KEY"] = "<your-openai-api-key>"

## LangGraph 체인 구축을 위한 노드 정의

### Routing

사용자의 질문에 대한 routing 을 수행합니다.

질문이 vector 검색이 필요한지 여부를 판단하여 라우팅하며, 최대한 보수적으로 판단하도록 프롬프트를 설정하였습니다.

In [7]:
from typing import Literal

from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field

from langchain_core.prompts import ChatPromptTemplate

class RouteQuery(BaseModel):
    
    binary_score: Literal["yes", "no"] = Field(
        ...,
        description="Given a user question, determine if it needs to be retrieved from vectorstore or not. Return 'yes' if it needs to be retrieved from vectorstore, otherwise return 'no'.",
    )

llm = ChatOpenAI(model="gpt-4.1-mini", temperature=0)

structured_llm_router = llm.with_structured_output(RouteQuery)

system = """
You are an expert at routing a user question. 
The vectorstore contains documents retrieved using LangChain / LangGraph.
The vectorstore contains documents related to RAG(Retrieval Augmented Generation) source code and documentation.
Return 'yes' if the question is related to the source code or documentation, otherwise return 'no'.
If you can't determine if the question is related to the source code or documentation, return 'yes'.
If you don't know the answer, return 'yes'.
"""

route_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "{question}")
    ]
)

question_router = route_prompt | structured_llm_router

In [8]:
question_router.invoke({"question": "MemorySaver 에 대해 설명하세요."})

RouteQuery(binary_score='yes')

In [9]:
question_router.invoke({"question": "대한민국의 수도는?"})

RouteQuery(binary_score='no')

## Query Rewrite

In [10]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4.1-mini")

system = """
You a question re-writer that converts an input question to a better version that is optimized for CODE SEARCH(github repository).

Look at the input and try to reason about the underlying semantic intent / meaning.

Base Code Repository: 

https://github.com/langchain-ai/langgraph

Output should be in English."""

re_write_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "Here is the initial question: \n\n {question} \n Formulate an improved question."),
    ]
)

question_rewriter = re_write_prompt | llm | StrOutputParser()

In [11]:
question_rewriter.invoke("Hierarchical RAG")

'How can I implement Hierarchical Retrieval-Augmented Generation (RAG) using the LangGraph framework?'

## 평가

- 검색된 문서의 관련성 평가
- 답변의 환각 여부 평가
- 답변의 질문에 대한 관련성 평가

### 검색된 문서의 관련성 평가

In [12]:
from pydantic import BaseModel, Field

class GradeDocuments(BaseModel):
    """Binary score for relevance check on retrieved documents."""
    
    binary_score: str = Field(
        description="Documents are relevant to the question, 'yes' or 'no'"
    )
    
    
llm = ChatOpenAI(model="gpt-4.1-mini", temperature=0)

structured_llm_grader = llm.with_structured_output(GradeDocuments)

system = """You are a grader assessing relevance of a retrieved document to a user question. \n 
    If the document contains keyword(s) or semantic meaning related to the user question, grade it as relevant. \n
    It does not need to be a stringent test. The goal is to filter out erroneous retrievals. \n
    Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question."""

grade_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "Retrieved document: \n\n {document} \n\n User question: {question}")
    ]
)

retrieval_grader = grade_prompt | structured_llm_grader

In [13]:
# question = "self-rag"
# retrieval_grader.invoke({"question": question, "document": retriever.invoke(question)[0].page_content})

In [14]:
# from pydantic import BaseModel, Field
# from langchain_openai import ChatOpenAI
# from langchain_core.prompts import ChatPromptTemplate


# # 문서 평가를 위한 데이터 모델 정의
# class GradeDocuments(BaseModel):
#     """Binary score for relevance check on retrieved documents."""

#     binary_score: str = Field(
#         description="Documents are relevant to the question, 'yes' or 'no'"
#     )


# # LLM 초기화 및 함수 호출을 통한 구조화된 출력 생성
# structured_llm_grader = llm.with_structured_output(GradeDocuments)

# # 시스템 메시지와 사용자 질문을 포함한 프롬프트 템플릿 생성
# system = """You are a grader assessing relevance of a retrieved document to a user question. \n 
#     If the document contains keyword(s) or semantic meaning related to the user question, grade it as relevant. \n
#     It does not need to be a stringent test. The goal is to filter out erroneous retrievals. \n
#     Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question."""

# grade_prompt = ChatPromptTemplate.from_messages(
#     [
#         ("system", system),
#         ("human", "Retrieved document: \n\n {document} \n\n User question: {question}"),
#     ]
# )

# # 문서 검색결과 평가기 생성
# retrieval_grader = grade_prompt | structured_llm_grader

In [15]:
# question = "self-rag"
# retrieval_grader.invoke({"question": question, "document": compression_retriever.invoke(question)[0].page_content})

# question = "self-rag"
# retrieval_grader.invoke({"question": question, "document": retriever.invoke(question)[0].page_content})

### 답변의 환각 여부 평가

`groundedness_checker`: 검색과 답변을 비교

`relevant_answer_checker`: 질문과 답변을 비교

In [16]:
class AnswerGroundedness(BaseModel):
    """Binary score for answer groundedness"""
    
    binary_score: str = Field(
        description="Answer is groundedness in the facts(given context), 'yes' or 'no'"
    )
    
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)    

structured_llm_grader = llm.with_structured_output(AnswerGroundedness)

system = """
You are a grader assessing whether an LLM generation is grounded in / supported by a set of retrieved facts. \n 
Give a binary score 'yes' or 'no'. 'Yes' means that the answer is grounded in / supported by the set of facts.
"""

groundedness_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "Set of facts: \n\n {documents} \n\n LLM generation: {generation}"),
    ]
)

# 답변의 환각 여부 평가기 생성
groundedness_checker = groundedness_prompt | structured_llm_grader


In [17]:
# question = "self-rag"
# groundedness_checker.invoke({"documents": compression_retriever.invoke(question), "generation": rag_chain.invoke({"question": question, "context": compression_retriever.invoke(question)})})

question = "self-rag"
groundedness_checker.invoke({"documents": retriever.invoke(question), "generation": rag_chain.invoke({"question": question, "context": retriever.invoke(question)})})

AnswerGroundedness(binary_score='yes')

In [18]:
class GradeAnswer(BaseModel):
    """Binary scoring to evaluate the appropriateness of answers to questions"""

    binary_score: str = Field(
        description="Indicate 'yes' or 'no' whether the answer solves the question"
    )


# 함수 호출을 통한 LLM 초기화
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
structured_llm_grader = llm.with_structured_output(GradeAnswer)

# 프롬프트 설정
system = """You are a grader assessing whether an answer addresses / resolves a question \n 
     Give a binary score 'yes' or 'no'. Yes' means that the answer resolves the question."""
relevant_answer_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "User question: \n\n {question} \n\n LLM generation: {generation}"),
    ]
)

# 프롬프트 템플릿과 구조화된 LLM 평가기를 결합하여 답변 평가기 생성
relevant_answer_checker = relevant_answer_prompt | structured_llm_grader

In [19]:
# question = "self-rag"
# relevant_answer_checker.invoke({"question": question, 
#                                 "generation": rag_chain.invoke({"question": question, "context": qdrant.invoke(question)[0].page_content})})

question = "self-rag"
relevant_answer_checker.invoke({"question": question, 
                                "generation": rag_chain.invoke({"question": question, "context": retriever.invoke(question)[0].page_content})})

GradeAnswer(binary_score='yes')

## 도구정의

웹서치를 위한 TavilySearch 

DUCKDUCKGO

In [20]:
from langchain_tavily.tavily_search import TavilySearch

tavily_search_tool = TavilySearch(max_result=3)


In [21]:
# tavily_search_tool.invoke("agent2agent")

In [22]:
# from langchain_community.tools.ddg_search import DuckDuckGoSearchRun

# duck = DuckDuckGoSearchRun(max_result=3)
# print(duck.invoke("agent2agent"))

## 상태정의

`AgentState` :  `messages: Annotated[Sequence[BaseMessage], add_messages]`로 정의한 messages가 추가된 형태.

In [23]:
from typing_extensions import TypedDict, Annotated
from langchain_core.documents import Document
from langgraph.prebuilt.chat_agent_executor import AgentState


# class GraphState(TypedDict):
#     question: Annotated[str, "User question"]
#     gneration: Annotated[str, "LLM generated answer"]
#     documents: Annotated[list[Document], "List of documents"]

class GraphState(AgentState):
    question: Annotated[str, "User question"]
    generation: Annotated[str, "LLM generated answer"]
    documents: Annotated[list[Document], "List of documents"]


## 노드 정의

### 질문 라우팅 노드

In [24]:
def route_question_node(state):
    question = state["question"]
    evaluation = question_router.invoke({"question": question})
    
    if evaluation.binary_score == "yes":
        return "query_expansion"
    else: 
        return "general_answer"

### 질문 재작성 노드

In [25]:
def query_rewrite_node(state):
    question = state["question"]
    
    better_question = question_rewriter.invoke({"question": question})
    
    return {"question": better_question}

### 문서 검색 노드

In [26]:
def retrieve_node(state):
    question = state["question"]
    
    # documents = compression_retriever.invoke(question)
    documents = retriever.invoke(question)
    return {"documents": documents}

### 일반 답변 생성 노드

In [27]:
from langchain_google_genai import ChatGoogleGenerativeAI
def general_answer_node(state):
    llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash")
    question = state["question"]
    answer = llm.invoke(question)
    return {"generation": answer}

### RAG 답변 생성 노드

In [28]:
def rag_answer_node(state):
    question = state["question"]
    documents = state["documents"]
    
    answer = rag_chain.invoke({"context": documents, "question": question})
    return {"generation": answer}

### 문서 관련성을 평가 후 필터링

filtered_docs에 관련성 있는 문서 추가

In [29]:
def filtering_documents_node(state):
    question = state["question"]
    documents = state["documents"]
    
    filtered_docs = []
    
    for d in documents:
        score = retrieval_grader.invoke(
            {"question": question, "document": d.page_content}
        )
        grade = score.binary_score
        if grade == "yes":
            filtered_docs.append(d)
        else:
            continue
        
    return {"documents": filtered_docs}

In [30]:
# # 문서의 관련성을 평가 후 필터링
# def filtering_documents_node(state):
#     # 질문과 문서 검색 결과 가져오기
#     question = state["question"]
#     documents = state["documents"]

#     # 각 문서에 대한 관련성 점수 계산
#     filtered_docs = []
#     for d in documents:
#         score = retrieval_grader.invoke(
#             {"question": question, "document": d.page_content}
#         )
#         grade = score.binary_score
#         if grade == "yes":
#             # 관련성이 있는 문서 추가
#             filtered_docs.append(d)
#         else:
#             # 관련성이 없는 문서는 건너뛰기
#             continue

#     return {"documents": filtered_docs}

### 웹 검색 노드
**외부(질문 입력)** 에서는 계속 question을 사용하고, **내부(Tavily 호출)** 에서는 query로 변환해서 넘기는 구조

In [31]:
from langchain_core.documents import Document

def web_search_node(state):
    question = state["question"]
    res = tavily_search_tool.invoke({"query": question})  # dict( results=[...] )
    items = res.get("results", [])                          # list[dict]

    web_results_docs = [
        Document(
            page_content=item.get("content", ""),
            metadata={"source": item.get("url", ""), "title": item.get("title", "")},
        )
        for item in items
        if isinstance(item, dict)
    ]
    
    
    return {"documents": web_results_docs}

### 추가 정보 검색 필요성 여부 평가 노드(웹 검색)

In [32]:
def decide_to_web_search_node(state):
    filtered_docs = state["documents"]
    print(len(filtered_docs))
    
    if len(filtered_docs) < 20:
        return "web_search"
    else:
        return "rag_answer"

### 답변의 환각 여부/관련성 여부 평가 노드

`groundedness_checker`: 검색과 답변을 비교

`relevant_answer_checker`: 질문과 답변을 비교

In [33]:
def answer_groundedness_check(state):
    question = state["question"]
    documents = state["documents"]
    generation = state["generation"]
    
    # Groundedness 평가: 검색과 답변 관련성 비교
    score = groundedness_checker.invoke(
        {"documents": documents, "generation": generation}
    )
    
    grade = score.binary_score
    
    # 만약 검색과 답변이 관련이 있다면,
    # Groundedness 평가 결과에 따른 처리: 질문과 답변 관련성 비교
    if grade == "yes":
        score = relevant_answer_checker.invoke(
            {"question": question, "generation": generation}
        )
        
        grade = score.binary_score
        
        # 관련성 평가 결과에 따른 처리
        if grade == "yes":
            return "relevant"
        else:
            return "not relevant"
        
    else:
        return "not grounded"
    

## 그래프 생성
라우팅 노드는 add_node로 추가하지 말고, `conditional_edge` 로 바로쓰자!

In [34]:
from langgraph.graph import END, StateGraph, START
from langgraph.checkpoint.memory import MemorySaver

# 그래프 상태 초기화
workflow = StateGraph(GraphState)

# 노드 정의
workflow.add_node("query_expand", query_rewrite_node)
workflow.add_node("query_rewrite", query_rewrite_node)
workflow.add_node("retrieve", retrieve_node)
workflow.add_node("web_search", web_search_node)
workflow.add_node("rag_answer", rag_answer_node)
workflow.add_node("general_answer", general_answer_node)
workflow.add_node("grade_documents", filtering_documents_node)



# 엣지 추가
workflow.add_conditional_edges(
    START,
    route_question_node, # 
    {
        "query_expansion": "query_expand", # 결과가 query_expansion이면 query_expand로
        "general_answer": "general_answer" # 결과가 general_answer이면, general_answer로
    }
)

workflow.add_edge("query_expand", "retrieve")
workflow.add_edge("retrieve", "grade_documents")

workflow.add_conditional_edges(
    "grade_documents",
    decide_to_web_search_node,
    {
        "web_search": "web_search", # 결과가 web_search이면, web_search로
        "rag_answer": "rag_answer" # 결과가 rag_answer이면, rag_answer로
    }
)

workflow.add_conditional_edges(
    "rag_answer",
    answer_groundedness_check,
    {
        "relevant": END,
        "not relevant": "web_search",
        "not grounded": "query_rewrite"
    }
)

workflow.add_edge("query_rewrite", "rag_answer")

workflow.add_edge("web_search", "rag_answer")

app = workflow.compile(checkpointer=MemorySaver())

In [35]:
# app

## 그래프 실행

In [36]:
from langchain_core.runnables import RunnableConfig
from langchain_teddynote.messages import stream_graph, random_uuid

# config 설정(재귀 최대 횟수, thread_id)
config = RunnableConfig(recursion_limit=20, configurable={"thread_id": random_uuid()})

# 질문 입력
inputs = {
    "question": "Self-RAG 에서 사용되는 관련성 평가 노드 예제를 찾아줘",
}

# 스트리밍 형식으로 그래프 실행
stream_graph(
    app,
    inputs,
    config,
)


🔄 Node: [1;36m__start__[0m 🔄
- - - - - - - - - - - - - - - - - - - - - - - - - 

🔄 Node: [1;36mquery_expand[0m 🔄
- - - - - - - - - - - - - - - - - - - - - - - - - 
Can you provide an example of a relevance evaluation node used in Self-RAG within the langgraph repository?
🔄 Node: [1;36mgrade_documents[0m 🔄
- - - - - - - - - - - - - - - - - - - - - - - - - 
2

🔄 Node: [1;36mrag_answer[0m 🔄
- - - - - - - - - - - - - - - - - - - - - - - - - 
Self-RAG에서 사용하는 relevance evaluation 노드 예시는 다음과 같습니다. 이 노드는 사용자 질문(question)과 검색된 문서(document)의 관련성을 평가하는 역할을 합니다. LangGraph Self-RAG 구현에서 주로 사용되는 평가 템플릿과 함수 구조를 참고할 수 있습니다.

아래는 Self-RAG에서 문서의 관련성을 평가하는 grader 노드 예시 코드입니다.

```python
system = """
You are a grader assessing relevance of a retrieved document to a user question.
print(retrieval_grader.invoke({"question": question, "document": doc_txt}))
"""

def grade_retrieval_relevance(question, doc_txt):
    # 문서가 질문과 관련 있는지 평가
    result = retrieval_grader.invoke({"question": question, "docu

In [37]:
--

SyntaxError: invalid syntax (3659366440.py, line 1)

In [None]:
from langchain_core.runnables import RunnableConfig
from langchain_teddynote.messages import stream_graph, random_uuid


# config 설정(재귀 최대 횟수, thread_id)
config = RunnableConfig(recursion_limit=20, configurable={"thread_id": random_uuid()})

# 질문 입력
question = "대한민국의 수도는?"
inputs = {
    "question": question,
}

# 스트리밍 형식으로 그래프 실행
stream_graph(
    app,
    inputs,
    config,
)


🔄 Node: [1;36m__start__[0m 🔄
- - - - - - - - - - - - - - - - - - - - - - - - - 

🔄 Node: [1;36mgeneral_answer[0m 🔄
- - - - - - - - - - - - - - - - - - - - - - - - - 
대한민국의 수도는 서울입니다.
