# LangGraph Agents RAG - vetexAI

In [None]:
#%pip install -U langchain-core langchain-experimental langchain_community tiktoken chromadb tavily-python fastembed 

In [None]:
#%pip install --upgrade langchainhub langgraph langchain langchain-openai langchain-google-vertexai

In [None]:
#%pip install -U beautifulsoup4

In [1]:
from dotenv import load_dotenv
load_dotenv()
import os
HF_TOKEN = os.getenv("HF_TOKEN")

In [2]:
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings.fastembed import FastEmbedEmbeddings

embed_model = FastEmbedEmbeddings(model_name="intfloat/multilingual-e5-large")

urls = [
    "https://www.seoulsbdc.or.kr/bs/BS_VIEW.do?currentPage=1&boardCd=B061&infoSeq=132&bbs_Viewcnt=0&rnoIndex=undefined&searchType=title&searchStr=&viewcnt=10",
    "https://www.seoulsbdc.or.kr/bs/BS_VIEW.do?currentPage=1&boardCd=B061&infoSeq=132&bbs_Viewcnt=0&rnoIndex=undefined&searchType=title&searchStr=&viewcnt=10",
    "https://www.seoulsbdc.or.kr/bs/BS_VIEW.do?currentPage=1&boardCd=B061&infoSeq=109&bbs_Viewcnt=0&rnoIndex=undefined&searchType=title&searchStr=&viewcnt=10",
    "https://www.seoulsbdc.or.kr/bs/BS_VIEW.do?currentPage=1&boardCd=B061&infoSeq=109&bbs_Viewcnt=0&rnoIndex=undefined&searchType=title&searchStr=&viewcnt=10",
    "https://www.seoulsbdc.or.kr/bs/BS_VIEW.do?currentPage=1&boardCd=B061&infoSeq=102&bbs_Viewcnt=0&rnoIndex=undefined&searchType=title&searchStr=&viewcnt=10",
]

docs = [WebBaseLoader(url).load() for url in urls]
docs_list = [item for sublist in docs for item in sublist]
print(f"len of documents :{len(docs_list)}")

text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=512, chunk_overlap=0
)
doc_splits = text_splitter.split_documents(docs_list)
print(f"length of document chunks generated :{len(doc_splits)}")

vectorstore = Chroma.from_documents(documents=doc_splits,
                                    embedding=embed_model,
                                    collection_name="local-seoul-promptone")

retriever = vectorstore.as_retriever(search_kwargs={"k":2})

USER_AGENT environment variable not set, consider setting it to identify your requests.
  from .autonotebook import tqdm as notebook_tqdm
Fetching 6 files: 100%|██████████| 6/6 [00:00<00:00, 43996.20it/s]


len of documents :5
length of document chunks generated :38


In [3]:
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "./prompthon-prd-19-33d473e1eeb0.json"
from langchain_google_vertexai import VertexAI

llm = VertexAI(model_name="gemini-1.5-pro-001", temperature=0.1)

In [4]:
import time
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.output_parsers import StrOutputParser
from langchain_google_vertexai import VertexAI
import os

# Vertex AI 설정
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "./prompthon-prd-19-33d473e1eeb0.json"
llm = VertexAI(model_name="gemini-1.5-pro-001", temperature=0.1)

# 프롬프트 설정
# web인지 RAG인지 판단하는 부분 삭제
# 차라리 질문을 다듬은 슈퍼에이전트로 들어가는게 맞음
prompt = PromptTemplate(
    template="""You are a route assistant for information about Seoul, responsible for selecting the appropriate data source for user questions.
Route questions to either 'web_search' or 'vectorstore' based on which is more appropriate to find the answer.
If the question is specific, detailed, or factual, prefer 'vectorstore'. If the question is general, broad, or likely to require the latest information, prefer 'web_search'.
Return the result as a JSON with a single key 'datasource' without any preamble or explanation.

Example questions and their appropriate data sources:
1. Question: What are the required documents for small business support applications in Seoul?
   Datasource: vectorstore
2. Question: What is the current weather in Seoul?
   Datasource: web_search

Question to route: {question}"""
,
    input_variables=["question"],
)

start = time.time()
question_router = prompt | llm | JsonOutputParser()

question = "서울 중구에서 1년정도 사업을 하고 있는데 내가 참여할 수 있는 지원사업이 있나요?"
response = question_router.invoke({"question": question})
end = time.time()

print(f"The time required to generate response by Router Chain in seconds: {end - start}")
print(response)


The time required to generate response by Router Chain in seconds: 2.1388232707977295
{'datasource': 'vectorstore'}


In [5]:
prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are an assistant for question-answering tasks. 
    Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know. 
    Use three sentences maximum and keep the answer concise <|eot_id|><|start_header_id|>user<|end_header_id|>
    Question: {question} 
    Context: {context} 
    Answer: <|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["question", "document"],
)

# Post-processing
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

# Chain
start = time.time()
rag_chain = prompt | llm | StrOutputParser()

In [6]:
#
# 지금은 빼도 될 것 같음
prompt = PromptTemplate(
    template=""" You are a grader assessing relevance of a retrieved document to a user question. 
    If the document contains keywords related to the user question, grade it as relevant. 
    It does not need to be a stringent test. The goal is to filter out erroneous retrievals. \n
    Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question. \n
    Provide the binary score as a JSON with a single key 'score' and no premable or explaination.
    
    Here is the retrieved document: \n\n {document} \n\n
    Here is the user question: {question} \n 
    """,
    input_variables=["question", "document"],
)
start = time.time()
retrieval_grader = prompt | llm | JsonOutputParser()
question = "서울 중구에서 1년정도 사업을 하고 있는데 내가 참여할 수 있는 지원사업이 있나요?"
docs = retriever.invoke(question)
doc_txt = docs[0].page_content
print(retrieval_grader.invoke({"question": question, "document": doc_txt}))
end = time.time()
print(f"The time required to generate response by the retrieval grader in seconds:{end - start}")


{'score': 'yes'}
The time required to generate response by the retrieval grader in seconds:1.0276546478271484


In [7]:
# Prompt
prompt = PromptTemplate(
    template="""<|start_header_id|>system<|end_header_id|> You are a grader assessing whether 
    an answer is grounded in / supported by a set of facts. Give a binary 'yes' or 'no' score to indicate 
    whether the answer is grounded in / supported by a set of facts. Provide the binary score as a JSON with a 
    single key 'score' and no preamble or explanation. <|eot_id|><|start_header_id|>user<|end_header_id|>
    Here are the facts:
    \n ------- \n
    {documents} 
    \n ------- \n
    Here is the answer: {generation} <|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["generation", "documents"],
)
start = time.time()
generation = []
hallucination_grader = prompt | llm | JsonOutputParser()
hallucination_grader_response = hallucination_grader.invoke({"documents": docs, "generation": generation})
end = time.time()
print(f"The time required to generate response by the generation chain in seconds:{end - start}")
print(hallucination_grader_response)


The time required to generate response by the generation chain in seconds:0.9752624034881592
{'score': 'no'}


In [8]:
# Prompt
prompt = PromptTemplate(
    template=""" You are a grader assessing whether an answer is useful to resolve a question. 
    Give a binary score 'yes' or 'no' to indicate whether the answer is useful to resolve a question. 
    Provide the binary score as a JSON with a single key 'score' and no preamble or explanation.
    Here is the answer:
    \n ------- \n
    {generation} 
    \n ------- \n
    Here is the question: {question} """,
    input_variables=["generation", "question"],
)
start = time.time()
answer_grader = prompt | llm | JsonOutputParser()
answer_grader_response = answer_grader.invoke({"question": question,"generation": generation})
end = time.time()
print(f"The time required to generate response by the answer grader in seconds:{end - start}")
print(answer_grader_response)


The time required to generate response by the answer grader in seconds:0.9854419231414795
{'score': 'no'}


In [9]:
import os
from dotenv import load_dotenv
load_dotenv()
tavily_api_key = os.getenv("TAVILY_API_KEY") 

In [10]:
import os
from langchain_community.tools.tavily_search import TavilySearchResults
web_search_tool = TavilySearchResults(k=3)

In [11]:
from typing_extensions import TypedDict
from typing import List

### State

class GraphState(TypedDict):
    question : str
    generation : str
    web_search : str
    documents : List[str]

In [12]:
from langchain.schema import Document
def retrieve(state):
    """
    Retrieve documents from vectorstore

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, documents, that contains retrieved documents
    """
    print("---RETRIEVE---")
    question = state["question"]

    # Retrieval
    documents = retriever.invoke(question)
    return {"documents": documents, "question": question}
#
def generate(state):
    """
    Generate answer using RAG on retrieved documents

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, generation, that contains LLM generation
    """
    print("---GENERATE---")
    question = state["question"]
    documents = state["documents"]
    
    # RAG generation
    generation = rag_chain.invoke({"context": documents, "question": question})
    return {"documents": documents, "question": question, "generation": generation}
#
def grade_documents(state):
    """
    Determines whether the retrieved documents are relevant to the question
    If any document is not relevant, we will set a flag to run web search

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Filtered out irrelevant documents and updated web_search state
    """

    print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
    question = state["question"]
    documents = state["documents"]
    
    # Score each doc
    filtered_docs = []
    web_search = "No"
    for d in documents:
        score = retrieval_grader.invoke({"question": question, "document": d.page_content})
        grade = score['score']
        # Document relevant
        if grade.lower() == "yes":
            print("---GRADE: DOCUMENT RELEVANT---")
            filtered_docs.append(d)
        # Document not relevant
        else:
            print("---GRADE: DOCUMENT NOT RELEVANT---")
            # We do not include the document in filtered_docs
            # We set a flag to indicate that we want to run web search
            web_search = "Yes"
            continue
    return {"documents": filtered_docs, "question": question, "web_search": web_search}
#
def web_search(state):
    """
    Web search based based on the question

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Appended web results to documents
    """

    print("---WEB SEARCH---")
    question = state["question"]
    documents = state["documents"]

    # Web search
    docs = web_search_tool.invoke({"query": question})
    web_results = "\n".join([d["content"] for d in docs])
    web_results = Document(page_content=web_results)
    if documents is not None:
        documents.append(web_results)
    else:
        documents = [web_results]
    return {"documents": documents, "question": question}
#

In [13]:
def route_question(state):
    """
    Route question to web search or RAG.

    Args:
        state (dict): The current graph state

    Returns:
        str: Next node to call
    """

    print("---ROUTE QUESTION---")
    question = state["question"]
    print(question)
    source = question_router.invoke({"question": question})  
    print(source)
    print(source['datasource'])
    if source['datasource'] == 'web_search':
        print("---ROUTE QUESTION TO WEB SEARCH---")
        return "websearch"
    elif source['datasource'] == 'vectorstore':
        print("---ROUTE QUESTION TO RAG---")
        return "vectorstore"

In [14]:
def decide_to_generate(state):
    """
    Determines whether to generate an answer, or add web search

    Args:
        state (dict): The current graph state

    Returns:
        str: Binary decision for next node to call
    """

    print("---ASSESS GRADED DOCUMENTS---")
    question = state["question"]
    web_search = state["web_search"]
    filtered_documents = state["documents"]

    if web_search == "Yes":
        # All documents have been filtered check_relevance
        # We will re-generate a new query
        print("---DECISION: ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION, INCLUDE WEB SEARCH---")
        return "websearch"
    else:
        # We have relevant documents, so generate answer
        print("---DECISION: GENERATE---")
        return "generate"

In [15]:
from pprint import pprint

def grade_generation_v_documents_and_question(state):
    """
    Determines whether the generation is grounded in the document and answers question.

    Args:
        state (dict): The current graph state

    Returns:
        str: Decision for next node to call
    """

    print("---CHECK HALLUCINATIONS---")
    question = state["question"]
    documents = state["documents"]
    generation = state["generation"]

    score = hallucination_grader.invoke({"documents": documents, "generation": generation})
    grade = score['score']

    # Check hallucination
    if grade == "yes":
        print("---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---")
        # Check question-answering
        print("---GRADE GENERATION vs QUESTION---")
        score = answer_grader.invoke({"question": question,"generation": generation})
        grade = score['score']
        if grade == "yes":
            print("---DECISION: GENERATION ADDRESSES QUESTION---")
            return "useful"
        else:
            print("---DECISION: GENERATION DOES NOT ADDRESS QUESTION---")
            return "not useful"
    else:
        pprint("---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS, RE-TRY---")
        return "not supported"

In [16]:
from langgraph.graph import END, StateGraph
workflow = StateGraph(GraphState)

# Define the nodes
workflow.add_node("websearch", web_search) # web search
workflow.add_node("retrieve", retrieve) # retrieve
workflow.add_node("grade_documents", grade_documents) # grade documents
workflow.add_node("generate", generate) # generatae

In [17]:
workflow.set_conditional_entry_point(
    route_question,
    {
        "websearch": "websearch",
        "vectorstore": "retrieve",
    },
)

workflow.add_edge("retrieve", "grade_documents")
workflow.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {
        "websearch": "websearch",
        "generate": "generate",
    },
)
workflow.add_edge("websearch", "generate")
workflow.add_conditional_edges(
    "generate",
    grade_generation_v_documents_and_question,
    {
        "not supported": "generate",
        "useful": END,
        "not useful": "websearch",
    },
)

In [18]:
app = workflow.compile()

In [24]:
from pprint import pprint

inputs = {"question": "서울에서 음식점을 하는데, 1996년도에 시작한 노포집입니다. 이번에 손님들이 지원 받을 수도 있다고 해서 찾아보는데 안보여서요. 어떤 지원을 받을 수 있는지 알려주세요"}

def post_process_output(output):
    for key, value in output.items():
        pprint(f"Finished running: {key}:")
        
        if isinstance(value, dict):
            if "generation" in value:
                generation_text = value["generation"]
            else:
                pprint("No generation or text found in the output")
                continue
            
            # "assistant" 문자열 제거 및 앞뒤 입력 아이디 제거
            clean_text = generation_text.replace("assistant", ""
                                       ).replace("<|begin_of_text|>", ""
                                       ).replace("<|start_header_id|>", ""
                                       ).replace("<|end_header_id|>", ""
                                       ).replace("<|eot_id|>", ""
                                       ).strip()
            pprint(clean_text)
        else:
            pprint("Output format is not a dictionary")

# 출력 처리
for output in app.stream(inputs):
    post_process_output(output)


Retrying langchain_google_vertexai.llms._completion_with_retry.<locals>._completion_with_retry_inner in 4.0 seconds as it raised ServiceUnavailable: 503 Connection reset by peer.


---ROUTE QUESTION---
서울에서 음식점을 하는데, 1996년도에 시작한 노포집입니다. 이번에 손님들이 지원 받을 수도 있다고 해서 찾아보는데 안보여서요. 어떤 지원을 받을 수 있는지 알려주세요
{'datasource': 'vectorstore'}
vectorstore
---ROUTE QUESTION TO RAG---
---RETRIEVE---
'Finished running: retrieve:'
'No generation or text found in the output'
---CHECK DOCUMENT RELEVANCE TO QUESTION---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---ASSESS GRADED DOCUMENTS---
---DECISION: GENERATE---
'Finished running: grade_documents:'
'No generation or text found in the output'
---GENERATE---
---CHECK HALLUCINATIONS---
---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---
---GRADE GENERATION vs QUESTION---
---DECISION: GENERATION ADDRESSES QUESTION---
'Finished running: generate:'
("서울시 종로구에서 30년 이상 운영된 노포의 경우 '종로형 노포' 지원 사업을 통해 최대 300만원의 환경/시설 개선 비용 및 컨설팅을 "
 "지원받을 수 있습니다. 1996년부터 음식점을 운영하셨다면 2023년 기준 27년째이므로, 아쉽지만 아직 '종로형 노포' 지원 자격 "
 '요건인 30년에는 해당되지 않습니다. 따라서 현재로서는 해당 지원 사업의 혜택을 받으실 수 없습니다.')
