# RAG

![img\RAG.png](img/RAG.png)

## Overview
* documents_loader
* splitter
* vector_store
* retriever

In [2]:
import os
os.environ["LANGCHAIN_TRACING_V2"] = 'true'
os.environ["LANGCHAIN_API_KEY"] = ""
os.environ["OPENAI_API_KEY"] = ""
os.environ["http_proxy"] = "http://127.0.0.1:7890"
os.environ["https_proxy"] = "http://127.0.0.1:7890"

In [22]:
import bs4
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.vectorstores import FAISS
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
doc = WebBaseLoader("https://lilianweng.github.io/posts/2021-09-25-train-large/", 
                    bs_kwargs = dict(parse_only=bs4.SoupStrainer(class_=("post-content", "post-title", "post-header"))))
docs = doc.load()

spliter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
doc_splitted = spliter.split_documents(docs)
vector_store = FAISS.from_documents(doc_splitted, embedding=OpenAIEmbeddings())
retriever = vector_store.as_retriever()
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)

In [28]:
results = retriever.invoke("What is DP?")
results

[Document(metadata={'source': 'https://lilianweng.github.io/posts/2021-09-25-train-large/'}, page_content='Data Parallelism#\nThe most naive way for Data parallelism (DP)  is to copy the same model weights into multiple workers and assign a fraction of data to each worker to be processed at the same time.\nNaive DP cannot work well if the model size is larger than a single GPU node’s memory. Methods like GeePS (Cui et al. 2016) offload temporarily unused parameters back to CPU to work with limited GPU memory when the model is too big to fit into one machine. The data swapping transfer should happen at the backend and not interfere with training computation.\nAt the end of each minibatch, workers need to synchronize gradients or weights to avoid staleness. There are two main synchronization approaches and both have clear pros & cons.'),
 Document(metadata={'source': 'https://lilianweng.github.io/posts/2021-09-25-train-large/'}, page_content='Fig. 1. Pseudo code for Pytorch DDP. (Image s

In [29]:
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

In [26]:
from langchain_core.prompts import PromptTemplate

prompt = PromptTemplate.from_template(
    template="""You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know. Use three sentences maximum and keep the answer concise.

Question: {question} 

Context: {context} 

Answer:""",

)

In [31]:
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
chain = ({'context': retriever|format_docs, "question":RunnablePassthrough()}
         | prompt
         | llm
         | StrOutputParser()         
         )

In [32]:
chain.invoke("What is DP?")

"DP stands for Data Parallelism, which involves copying the same model weights into multiple workers and assigning a fraction of data to each worker to be processed simultaneously. Naive DP may not work well if the model size exceeds a single GPU node's memory, leading to methods like GeePS that offload unused parameters back to the CPU. Workers in DP need to synchronize gradients or weights at the end of each minibatch to prevent staleness."

## Query Transformations

In [33]:
import bs4
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.vectorstores import FAISS
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings   
doc = WebBaseLoader("https://lilianweng.github.io/posts/2021-09-25-train-large/",
                    bs_kwargs=dict(parse_only=bs4.SoupStrainer(class_=("post-content", "post-title", "post-header"))))
docs = doc.load()
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
doc_splitted = text_splitter.split_documents(docs)
vector_store = FAISS.from_documents(doc_splitted, embedding=OpenAIEmbeddings())
retriever = vector_store.as_retriever()


### Multi Query
核心解决用户询问模糊的问题，通过生成多个prompts解决该问题

![Multi Query](img/multi-query.png)

In [51]:
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
template = """
        You are an AI language model assistant. Your task is to generate five 
different versions of the given user question to retrieve relevant documents from a vector 
database. By generating multiple perspectives on the user question, your goal is to help
the user overcome some of the limitations of the distance-based similarity search. 
Provide these alternative questions separated by newlines. Original question: {question}

"""
prompts_generate_query = PromptTemplate.from_template(
    template = template,
)
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.2)
generate_chains = prompts_generate_query|llm

In [35]:
results = generate_chains.invoke("What is DP?") # 回复的是AIMessage

In [38]:
from langchain_core.output_parsers import StrOutputParser
outputparser = StrOutputParser() 
resultsstring = outputparser.invoke(results)# AIMessage对象转为字符串
resultsstring

'1. Can you explain the concept of DP?\n2. How would you define DP?\n3. What are the key characteristics of DP?\n4. Could you provide an overview of DP?\n5. In what context is DP commonly used?'

In [39]:
resultsstring.split("\n")

['1. Can you explain the concept of DP?',
 '2. How would you define DP?',
 '3. What are the key characteristics of DP?',
 '4. Could you provide an overview of DP?',
 '5. In what context is DP commonly used?']

In [43]:
# retriever.map() Return a new Runnable that maps a list of inputs to a list of outputs, 从接受单个input变成接受多个input
retriever_map = retriever.map()
results_all = retriever_map.invoke(resultsstring) # 为列表的列表 [[], [], []]最里面元素为Document对象

In [48]:
from langchain_core.load import loads, dumps
# dumps 返回一个对象的json字符串表示 loads 从 JSON 字符串恢复 LangChain 类。相当于加载（json.loads（文本））
def get_unique_union(results_all: list[list]):
    flattened_docs = [dumps(doc)for results in results_all for doc in results ]
    unique_docs = list(set(flattened_docs))
    return [loads(doc) for doc in unique_docs]

In [49]:
results = get_unique_union(results_all) # 去重
results

[Document(metadata={'source': 'https://lilianweng.github.io/posts/2021-09-25-train-large/'}, page_content='Data Parallelism#\nThe most naive way for Data parallelism (DP)  is to copy the same model weights into multiple workers and assign a fraction of data to each worker to be processed at the same time.\nNaive DP cannot work well if the model size is larger than a single GPU node’s memory. Methods like GeePS (Cui et al. 2016) offload temporarily unused parameters back to CPU to work with limited GPU memory when the model is too big to fit into one machine. The data swapping transfer should happen at the backend and not interfere with training computation.\nAt the end of each minibatch, workers need to synchronize gradients or weights to avoid staleness. There are two main synchronization approaches and both have clear pros & cons.'),
 Document(metadata={'source': 'https://lilianweng.github.io/posts/2021-09-25-train-large/'}, page_content='$n$ feed-forward networks as experts $\\{E_i\

In [50]:
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

In [52]:
# 故而整个检索chain如下
retriever = (
        prompts_generate_query
        | llm 
        | StrOutputParser()
        | (lambda x: x.split('\n'))
        | retriever.map()
        | get_unique_union
        | format_docs
)

In [53]:
retriever.invoke("What is DP?")

'Data Parallelism#\nThe most naive way for Data parallelism (DP)  is to copy the same model weights into multiple workers and assign a fraction of data to each worker to be processed at the same time.\nNaive DP cannot work well if the model size is larger than a single GPU node’s memory. Methods like GeePS (Cui et al. 2016) offload temporarily unused parameters back to CPU to work with limited GPU memory when the model is too big to fit into one machine. The data swapping transfer should happen at the backend and not interfere with training computation.\nAt the end of each minibatch, workers need to synchronize gradients or weights to avoid staleness. There are two main synchronization approaches and both have clear pros & cons.\n\nThe switch transformer paper summarized different data and model parallelism strategies for training large models with a nice illustration:\n\nBulk synchronous parallels (BSP): Workers sync data at the end of every minibatch. It prevents model weights stalen

## RAG-Fusion
![img\RAG-Fusion.png](img/RAG-Fusion.png)

当启用多个query进行询问时，受到llm输入窗口的限制，不可能将所有的相关文本都输入进去(即使在去重的情况下)，所以使用得分的方式进行选取
$$\text{scors} = \sum \frac{1}{rank+k}$$
k是防止分母为0

In [63]:
from langchain_core.prompts import PromptTemplate
template = """You are a helpful assistant that generates multiple search queries based on a single input query. \n
Generate multiple search queries related to: {question} \n
Output (4 queries):"""
query_template = PromptTemplate.from_template(template)
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.2)
retriever = vector_store.as_retriever()

In [98]:
from langchain_core.load import dumps, loads
def reciprocal_rank_fusion(results_all: list[str], k=60):
    scores_all = {}
    for results in results_all:
            for rank, result in enumerate(results):
                result = dumps(result)
                if result not in scores_all:
                    scores_all[result] = 0
                scores_all[result] += 1/(rank+k)
    ranked_results = [ (loads(result), score) 
        for result, score in sorted(scores_all.items(), key=lambda x: x[0], reverse=True)
    ]
    return ranked_results

query_chain = query_template|llm|StrOutputParser()|(lambda x: x.split('\n'))|retriever.map()|reciprocal_rank_fusion

In [99]:
results = query_chain.invoke("What is DP?")
results

[(Document(metadata={'source': 'https://lilianweng.github.io/posts/2021-09-25-train-large/'}, page_content='where each entry $A[i,j]$ in $A \\in \\mathbb{R}^{e \\times n}$ marks whether the $i$-the expert selects the $j$-th token. Solving this is non-trivial. The paper used Dykstra’s algorithm that runs a sequence of multiple iterative computation steps. Capped expert choice results in a slight decrease in the fine-tuning performance in the experiments.\nThe parameter $k$ is determined by $k=nc/e$, where $n$ is the total number of tokens in one batch and $c$ is a capacity factor indicating the average number of experts used by one token. The paper used $c=2$ in most experiments, but EC with $c=1$ still outperforms the top-1 token choice gating. Interestingly, $c=0.5$ only marginally hurts the training performance.\nOne big drawback of EC is that it does not work when the batch size is too small, neither for auto-regressive text generation, because it needs to know the future tokens to 

### Decomposition
将所提的问题进行解耦分成几步，然后针对几个子问题进行解答

In [100]:
from langchain_core.prompts import ChatPromptTemplate
# Decomposition 生成子问题
template = """You are a helpful assistant that generates multiple sub-questions related to an input question. \n
The goal is to break down the input into a set of sub-problems / sub-questions that can be answers in isolation. \n
Generate multiple search queries related to: {question} \n
Output (3 queries):"""
prompt_decomposition = ChatPromptTemplate.from_template(template)

In [101]:
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.2)
chain = prompt_decomposition|llm|StrOutputParser()|(lambda x: x.split('\n'))
question = chain.invoke("How DP works?")

In [102]:
question  # 但是最大的问题是提出的问题错了 DP是Data Parallelism不是动态规划

['1. What are the key principles behind dynamic programming?',
 '2. Can you provide examples of problems that can be solved using dynamic programming?',
 '3. How does dynamic programming differ from other problem-solving techniques like greedy algorithms or divide and conquer?']

#### Answer recursively
根据得到问题后，如何处理问题的答案分为两种  


![img\Answer recursively.png](img/Answer_recursively.png)


一种是进行迭代 获得结果后将结果送入下一个问题中促进回答

In [103]:
# Prompt
template = """Here is the question you need to answer:

\n --- \n {question} \n --- \n

Here is any available background question + answer pairs:

\n --- \n {q_a_pairs} \n --- \n

Here is additional context relevant to the question: 

\n --- \n {context} \n --- \n

Use the above context and any background question + answer pairs to answer the question: \n {question}
"""

decomposition_prompt = ChatPromptTemplate.from_template(template)

In [105]:
from operator import itemgetter
from langchain_core.output_parsers import StrOutputParser
def format_qa_pair(question, answer):
    formatted = ""
    formatted = f"Question:{question}\nAnswer:{answer}\n\n"
    return formatted.strip()  # strip() 方法用于移除字符串头尾指定的字符（默认为空格或换行符）或字符序列。

llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.2)
q_a_pairs = ""
# 通过prompt循环调用
for q in question:
    rag_chain = (
        {"context": itemgetter("question")|retriever,
         'question': itemgetter("question"),
         'q_a_pairs': itemgetter("q_a_pairs")}
         | decomposition_prompt
         | llm
         | StrOutputParser()
    )
    answer = rag_chain.invoke({"question": q, "q_a_pairs": q_a_pairs})
    q_a_pair = format_qa_pair(q, answer)
    q_a_pairs = q_a_pairs +  '\n---\n' + q_a_pair

In [106]:
answer

'Dynamic programming differs from other problem-solving techniques like greedy algorithms and divide and conquer in several key ways:\n\n1. **Optimality**: Dynamic programming guarantees finding the optimal solution to a problem by breaking it down into subproblems and storing the results of these subproblems for future use. Greedy algorithms, on the other hand, make decisions based on the current best choice without considering the overall optimal solution. Divide and conquer also breaks down the problem into smaller subproblems but may not always guarantee the optimal solution.\n\n2. **Overlapping subproblems**: Dynamic programming identifies and solves overlapping subproblems only once, storing their solutions for future use. Greedy algorithms and divide and conquer may not efficiently handle overlapping subproblems, leading to redundant calculations and potentially slower performance.\n\n3. **Memoization**: Dynamic programming uses memoization to store the results of subproblems an

#### Answer individually 

![img\Answer_individually.png](img/Answer_individually.png)


单独问每个问题，结合所有的回答最终进行总回答

In [107]:
from langchain_core.prompts import ChatPromptTemplate
prompt = ChatPromptTemplate.from_template(
    """You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know. Use three sentences maximum and keep the answer concise.

Question: {question} 

Context: {context} 

Answer:"""
)
sub_q_generator = prompt_decomposition|llm|StrOutputParser()|(lambda x: x.split('\n'))
def retrieve_and_rag(question, prompt_rag, sub_q_generator):
    question_all = sub_q_generator.invoke(question)
    rag_r = []
    for q in question_all:
        retrieval_answer = retriever.get_relevant_documents(q)
        answer = (prompt|llm|StrOutputParser()).invoke({"question": q, "context": retrieval_answer})
        rag_r.append(answer)
    return rag_r, question_all

rag_r, question_all = retrieve_and_rag("How Data parallelism works?", decomposition_prompt, sub_q_generator)

  warn_deprecated(


In [108]:
rag_r, question_all

(['The different types of data parallelism include Bulk Synchronous Parallelism (BSP) and Asynchronous Parallel (ASP). BSP involves syncing data at the end of every minibatch to prevent model weights staleness, while ASP processes data asynchronously without waiting, potentially leading to stale weights. Another approach is to synchronize gradients globally once every x iterations, known as "gradient accumulation" in Distribution Data Parallel (DDP).',
  'Data parallelism improves performance in parallel computing by copying the same model weights into multiple workers and assigning a fraction of data to each worker to be processed simultaneously. This approach helps distribute the workload efficiently across multiple workers, increasing overall processing speed. Synchronization of gradients or weights at the end of each minibatch is essential to avoid staleness and ensure effective training.',
  'Some common programming models used for implementing data parallelism include Data Parall

In [110]:
def format_qa_pair(question, answer):
    formatted = ""
    for index, (question, answer) in enumerate(zip(question, answer)):
        formatted += f"Question{index}:{question}\nAnswer:{answer}\n\n"
    return formatted.strip()

context = format_qa_pair(question_all, rag_r)
template = """Here is a set of Q+A pairs:

{context}

Use these to synthesize an answer to the question: {question}
"""

prompt = ChatPromptTemplate.from_template(template)

final_rag_chain = (
    prompt
    | llm
    | StrOutputParser()
)

final_rag_chain.invoke({"context":context,"question":"How Data parallelism works?"})

'Data parallelism works by distributing the same model weights to multiple workers and assigning a portion of the data to each worker for simultaneous processing. This approach helps improve performance in parallel computing by dividing the workload efficiently across multiple workers, increasing overall processing speed. Synchronization of gradients or weights at the end of each minibatch is crucial to prevent staleness and ensure effective training. Common programming models used for implementing data parallelism include Bulk Synchronous Parallelism (BSP), Asynchronous Parallel (ASP), Data Parallelism, Model Parallelism, and Pipeline Parallelism. Each of these models offers different strategies for distributing workloads and optimizing performance in parallel computing environments.'

### Step Back

问题太过于细节 通过将问题step—back 问一些原始的问题，检索到宽泛的上下文   
有点像纯track了 

### HyDE

![img\HyDE.png](img/HyDE.png)

单纯的问题和文本具有gap 故而让llm根据问题生成一部分的上下文进行增强检索

In [111]:
from langchain_core.prompts import ChatPromptTemplate
context_prompt = ChatPromptTemplate.from_template(
    """Please write a scientific paper passage to answer the question
Question: {question}
Passage:"""
)
prompt_hyde = context_prompt|llm|StrOutputParser()|retriever
results_retrieval = prompt_hyde.invoke("What is Data parallelism (DP)?")

In [112]:
results_retrieval

[Document(metadata={'source': 'https://lilianweng.github.io/posts/2021-09-25-train-large/'}, page_content='Data Parallelism#\nThe most naive way for Data parallelism (DP)  is to copy the same model weights into multiple workers and assign a fraction of data to each worker to be processed at the same time.\nNaive DP cannot work well if the model size is larger than a single GPU node’s memory. Methods like GeePS (Cui et al. 2016) offload temporarily unused parameters back to CPU to work with limited GPU memory when the model is too big to fit into one machine. The data swapping transfer should happen at the backend and not interfere with training computation.\nAt the end of each minibatch, workers need to synchronize gradients or weights to avoid staleness. There are two main synchronization approaches and both have clear pros & cons.'),
 Document(metadata={'source': 'https://lilianweng.github.io/posts/2021-09-25-train-large/'}, page_content='Training Parallelism#\nThe main bottleneck fo

In [114]:
prompt = ChatPromptTemplate.from_template(
    """Answer the following question based on this context:

{context}

Question: {question}"""
)
answer_chain = (prompt|llm|StrOutputParser()).invoke({"context": results_retrieval, "question": "What is Data parallelism (DP)?"})
answer_chain

'Data parallelism (DP) is a method where the same model weights are copied into multiple workers, and each worker is assigned a fraction of data to be processed simultaneously.'

## Routing
在有多个数据库的情况下，将分解的问题分配到不同的源数据库中，提高检索的效率   
Given a question about LangChain usage, we'd want to infer which language the the question was referring to and query the appropriate docs. Query routing is the process of classifying which index or subset of indexes a query should be performed on.

### Logical and Semantic routing
![img\semantic_routing.png](img/semantic_routing.png)

使用输出解析器，让LLM输出为固定的种类

In [117]:
from typing import Literal 
# Literal 是 Python 中的 typing 模块中的一个类型注解，用来定义一个函数参数、变量或者返回值只能取一组固定的字面值（字面量）之一。
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.2)

class RouteQuery(BaseModel):
    """Route a user query to the most relevant datasource"""
    datasource: Literal["python_docs", "js_docs", "golang_docs"] = Field(
        ..., 
        description="Given a user question choose which datasource would be most relevant for answering their question",)
system = """You are an expert at routing a user question to the appropriate data source.

Based on the programming language the question is referring to, route it to the relevant data source."""
structured_llm = llm.with_structured_output(RouteQuery)
prompt = ChatPromptTemplate.from_messages(
    [("system", system),
     ("human", "{question}")]
)

chain = prompt|structured_llm

LLM的结构化输出

![img\structed_output.png](img/structed_output.png)

In [118]:
question = """Why doesn't the following code work:

from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_messages(["human", "speak in {language}"])
prompt.invoke("french")
"""
results = chain.invoke(question)
results, results.datasource

(RouteQuery(datasource='python_docs'), 'python_docs')

In [119]:
# 对于一些函数想要连接到链中 使用RunnableLambda 
# lambda本身就可以连接到链中
from langchain_core.runnables import RunnableLambda

def choose_route(results):
    if "python_docs" in results.datasource.lower():
        return "chain for python_docs"
    elif "js_docs" in results.datasource.lower():
        return "chain for python_docs"
    elif "golang_docs" in results.datasource.lower():
        return "chain for python_docs"

chain_all = chain|RunnableLambda(choose_route)
chain_all.invoke(question)

'chain for python_docs'

### semantic routing
当有多个prompt模板时候，可以使用embedding model编码prompt，让question选择最近似的

![img\semantic_routing2.png](img/semantic_routing2.png)

In [120]:
from langchain_openai import OpenAIEmbeddings
from langchain.utils.math import cosine_similarity


# Two prompts
physics_template = """You are a very smart physics professor. \
You are great at answering questions about physics in a concise and easy to understand manner. \
When you don't know the answer to a question you admit that you don't know.

Here is a question:
{query}"""

math_template = """You are a very good mathematician. You are great at answering math questions. \
You are so good because you are able to break down hard problems into their component parts, \
answer the component parts, and then put them together to answer the broader question.

Here is a question:
{query}"""

embedding = OpenAIEmbeddings()
prompt_template = [physics_template, math_template]
prompt_embeddings = embedding.embed_documents(prompt_template)

In [122]:
type(prompt_embeddings),len(prompt_embeddings), len(prompt_embeddings[0])

(list, 2, 1536)

In [125]:
input = {"query": "What's a black hole"}
query_embedding = embedding.embed_query(input["query"])
# cosine_similarity需要参数为List[List[float]] 传入的是一个[List[List[float]]
similarity = cosine_similarity([query_embedding], prompt_embeddings) # 返回[List[List[float]]
similarity

array([[0.7529026 , 0.71731764]])

In [128]:
def prompt_router(input):
    query_embedding = embedding.embed_query(input["query"])
    similarity = cosine_similarity([query_embedding], prompt_embeddings)
    most_similar = prompt_template[similarity.argmax()]
    print("Using MATH" if most_similar == math_template else "Using PHYSICS")
    return PromptTemplate.from_template(most_similar)

In [129]:
from langchain_core.runnables import RunnableLambda
chain = ( 
        {"query": RunnablePassthrough()}
        | RunnableLambda(prompt_router)
        | ChatOpenAI(model="gpt-3.5-turbo", temperature=0.2)
        | StrOutputParser()
        )
chain.invoke("What's a black hole")

Using PHYSICS


'A black hole is a region in space where the gravitational pull is so strong that nothing, not even light, can escape from it. This happens when a massive star collapses in on itself, creating a singularity with infinite density at its center. The boundary surrounding the singularity is called the event horizon, beyond which nothing can escape. Black holes can come in different sizes, from stellar-mass black holes to supermassive black holes found at the centers of galaxies.'

### Query Construaction

需要vectorstores包含metadata
通过源数据可以进行筛选

In [132]:
from langchain_community.document_loaders import YoutubeLoader

docs = YoutubeLoader.from_youtube_url(
    "https://www.youtube.com/watch?v=pbAd8O1Lvm4", add_video_info=True
).load()

docs[0].metadata

{'source': 'pbAd8O1Lvm4',
 'title': 'Self-reflective RAG with LangGraph: Self-RAG and CRAG',
 'description': 'Unknown',
 'view_count': 21844,
 'thumbnail_url': 'https://i.ytimg.com/vi/pbAd8O1Lvm4/hq720.jpg',
 'publish_date': '2024-02-07 00:00:00',
 'length': 1058,
 'author': 'LangChain'}

In [136]:
from typing import Optional
from langchain_core.pydantic_v1 import BaseModel, Field
import datetime
# 构建一个BaseModel 给一些属性进行检索
class TutorialSearch(BaseModel):
    """Search over a database of tutorial videos about a software library."""

    content_search: str = Field(
        ...,
        description="Similarity search query applied to video transcripts.",
    )
    title_search: str = Field(
        ...,
        description=(
            "Alternate version of the content search query to apply to video titles. "
            "Should be succinct and only include key words that could be in a video "
            "title."
        ),
    )
    min_view_count: Optional[int] = Field(
        None,
        description="Minimum view count filter, inclusive. Only use if explicitly specified.",
    )
    max_view_count: Optional[int] = Field(
        None,
        description="Maximum view count filter, exclusive. Only use if explicitly specified.",
    )
    earliest_publish_date: Optional[datetime.date] = Field(
        None,
        description="Earliest publish date filter, inclusive. Only use if explicitly specified.",
    )
    latest_publish_date: Optional[datetime.date] = Field(
        None,
        description="Latest publish date filter, exclusive. Only use if explicitly specified.",
    )
    min_length_sec: Optional[int] = Field(
        None,
        description="Minimum video length in seconds, inclusive. Only use if explicitly specified.",
    )
    max_length_sec: Optional[int] = Field(
        None,
        description="Maximum video length in seconds, exclusive. Only use if explicitly specified.",
    )

    def pretty_print(self) -> None:
        for field in self.__fields__:
            if getattr(self, field) is not None and getattr(self, field) != getattr(
                self.__fields__[field], "default", None
            ):
                print(f"{field}: {getattr(self, field)}")

In [137]:
system = """You are an expert at converting user questions into database queries. \
You have access to a database of tutorial videos about a software library for building LLM-powered applications. \
Given a question, return a database query optimized to retrieve the most relevant results.

If there are acronyms or words you are not familiar with, do not try to rephrase them."""
prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "{question}"),
    ]
)
llm = ChatOpenAI(model="gpt-3.5-turbo-0125", temperature=0)
structured_llm = llm.with_structured_output(TutorialSearch)
query_analyzer = prompt | structured_llm

In [138]:
query_analyzer.invoke({"question": "rag from scratch"}).pretty_print()

content_search: rag from scratch
title_search: rag
min_length_sec: 0
max_length_sec: 600


## Indexing
将相关文档存储到Datastore

### Multi-representation Indexing

![img\multi_rep_indexing.png](img/multi_rep_indexing.png)  


有多个文档，虽然可以textsplitter 但是可以使用GPT-4进行summary后embedding,当近似度最大时，选取对应整个文档

In [4]:
from langchain_community.document_loaders import WebBaseLoader

loader = WebBaseLoader("https://lilianweng.github.io/posts/2023-06-23-agent/")
docs = loader.load()

loader = WebBaseLoader("https://lilianweng.github.io/posts/2024-02-05-human-data-quality/")
docs.extend(loader.load())

USER_AGENT environment variable not set, consider setting it to identify your requests.


In [5]:
docs

[Document(metadata={'source': 'https://lilianweng.github.io/posts/2023-06-23-agent/', 'title': "LLM Powered Autonomous Agents | Lil'Log", 'description': 'Building agents with LLM (large language model) as its core controller is a cool concept. Several proof-of-concepts demos, such as AutoGPT, GPT-Engineer and BabyAGI, serve as inspiring examples. The potentiality of LLM extends beyond generating well-written copies, stories, essays and programs; it can be framed as a powerful general problem solver.\nAgent System Overview In a LLM-powered autonomous agent system, LLM functions as the agent’s brain, complemented by several key components:', 'language': 'en'}, page_content='\n\n\n\n\n\nLLM Powered Autonomous Agents | Lil\'Log\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\nLil\'Log\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\nPosts\n\n\n\n\nArchive\n\n\n\n\nSearch\n\n\n\n\nTags\n\n\n\n\nFAQ\n\n\n\n\nemojisearch.app\n\n\n\n\n\n\n\n\n\n      LLM Powered Auton

In [7]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
chain = (
        {"doc": lambda x: x.page_content}
         | ChatPromptTemplate.from_template("Summarize the following document:\n\n{doc}")
         | ChatOpenAI(model="gpt-3.5-turbo", temperature=0.2)
         | StrOutputParser()
            ) 
 # 'max_concurrency' for controlling how much work to do in parallel
summaries = chain.batch(docs,{"max_concurrency": 5})

In [8]:
summaries

['The document discusses the concept of building autonomous agents powered by Large Language Models (LLMs). It covers key components of such agents, including planning, memory, and tool use, along with case studies and proof-of-concept examples. Challenges such as finite context length, planning difficulties, and reliability of natural language interfaces are also highlighted. The document provides references to related research and projects in the field.',
 'The document discusses the importance of high-quality human data for training deep learning models. It covers topics such as the role of human raters in data quality, the wisdom of the crowd in data annotation, methods for measuring rater agreement, and two paradigms for data annotation. It also explores techniques for identifying mislabeled data points during model training, such as influence functions, tracking prediction changes during training, and noisy cross-validation. The document provides citations to relevant studies and

In [16]:
from langchain_community.vectorstores import FAISS
from langchain_core.stores import InMemoryByteStore
from langchain.retrievers.multi_vector import MultiVectorRetriever
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
id_key = "doc_id"
doc_ids = [str(i) for i,_ in enumerate(docs)]
summary_docs = [
    Document(page_content=s, metadata={id_key: doc_ids[i]})
    for i, s in enumerate(summaries)
]

vector_store = FAISS.from_documents(summary_docs, embedding=OpenAIEmbeddings())

In [22]:
retriever = vector_store.as_retriever(search_kwargs={"k":1})
retriever.get_relevant_documents("What is DP?")

[Document(metadata={'doc_id': '1'}, page_content='The document discusses the importance of high-quality human data for training deep learning models. It covers topics such as the role of human raters in data quality, the wisdom of the crowd in data annotation, methods for measuring rater agreement, and two paradigms for data annotation. It also explores techniques for identifying mislabeled data points during model training, such as influence functions, tracking prediction changes during training, and noisy cross-validation. The document provides citations to relevant studies and research in the field of data quality and model training.')]

### RAPTOR


![img\RAPTOR.png](img/RAPTOR.png)  



通过文本进行分层，有些问题需要详细的信息，有些问题需要广泛的信息

code: https://github.com/langchain-ai/langchain/blob/master/cookbook/RAPTOR.ipynb   
video: https://www.youtube.com/watch?v=jbGchdTL7d0

In [None]:
pass # todo

### ColBERT
ColBERT generates a contextually influenced vector for each token in the passages. 

ColBERT similarly generates vectors for each token in the query.

Then, the score of each document is the sum of the maximum similarity of each query embedding to any of the document embeddings:

In [1]:
pass # todo

## RAG from Scratch: Retrieval

### Re-ranking
![img\re-ranking.png](img/re-ranking.png)

In [2]:
import os
os.environ["LANGCHAIN_TRACING_V2"] = 'true'
os.environ["LANGCHAIN_API_KEY"] = ""
os.environ["OPENAI_API_KEY"] = ""
os.environ["http_proxy"] = "http://127.0.0.1:7890"
os.environ["https_proxy"] = "http://127.0.0.1:7890"

In [4]:
import bs4
from langchain_community.document_loaders import WebBaseLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
loader = WebBaseLoader(
    web_path="https://lilianweng.github.io/posts/2023-06-23-agent/",
    bs_kwargs=dict(
        parse_only=bs4.SoupStrainer(class_=("post-content", "post-title", "post-header")),
    ),
    )
docs = loader.load()

text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
doc_splitted = text_splitter.split_documents(docs)
vector_store = FAISS.from_documents(doc_splitted, embedding=OpenAIEmbeddings())
retriever = vector_store.as_retriever()

In [5]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.2)
template = ChatPromptTemplate.from_template(
    """
    You are a helpful assistant that generates multiple search queries based on a single input query. \n
Generate multiple search queries related to: {question} \n
Output (4 queries):
    """
)
generate_quers = (
    template
    | llm
    | StrOutputParser()
    | (lambda x: x.split('\n'))
)

在query_Transformers中根据出现的评论进行赋分, reciprocal_rank_fusion 就是一种rerank技术

### Cohere Re-rank
![img\Cohere_ReRank.png](img/Cohere_ReRank.png)   
Blog: https://cohere.com/blog/rerank  
在Rerank阶段,LLMs计算一个分数，该分数为query喝每一个最初搜索结果的相似性得分
> 感觉是就是一阶段通过语义或者是单词进行搜索，二阶段整体语言再rank

In [14]:
from langchain_cohere import CohereRerank
from langchain.retrievers import ContextualCompressionRetriever
# ContextualCompressionRetriever 
# Retriever that wraps a base retriever and compresses the results.
os.environ["COHERE_API_KEY"] = ""
retriever = vector_store.as_retriever(search_kwargs={"k": 10})

compressor = CohereRerank(model="rerank-english-v2.0")
compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor, base_retriever=retriever)
compressed_docs = compression_retriever.get_relevant_documents("What is task decomposition for LLM agents?")


  warn_deprecated(


In [15]:
compressed_docs

[Document(metadata={'source': 'https://lilianweng.github.io/posts/2023-06-23-agent/', 'relevance_score': 0.9972744}, page_content='Fig. 1. Overview of a LLM-powered autonomous agent system.\nComponent One: Planning#\nA complicated task usually involves many steps. An agent needs to know what they are and plan ahead.\nTask Decomposition#\nChain of thought (CoT; Wei et al. 2022) has become a standard prompting technique for enhancing model performance on complex tasks. The model is instructed to “think step by step” to utilize more test-time computation to decompose hard tasks into smaller and simpler steps. CoT transforms big tasks into multiple manageable tasks and shed lights into an interpretation of the model’s thinking process.'),
 Document(metadata={'source': 'https://lilianweng.github.io/posts/2023-06-23-agent/', 'relevance_score': 0.95622724}, page_content='LLM Powered Autonomous Agents\n    \nDate: June 23, 2023  |  Estimated Reading Time: 31 min  |  Author: Lilian Weng\n\n\nBu

### self-RAG

![img\self_rag.png](img/self_rag.png)   
* 检索节点
  * 从文档中检索除文档块
  * 输出:yes no continue
* Grade 评分
  * 评估文档块中是否提供了解决问题的有用信息
  * relevant irrelevant
* 生成 
  * 根据生成的内容是否和文档块相关 
  * fully supporrted partially supported no support
* 答案评估
  * generation是否对question进行回复
  * 1,2,3,4,5

In [16]:
import os
os.environ["LANGCHAIN_TRACING_V2"] = 'true'
os.environ["LANGCHAIN_API_KEY"] = ""
os.environ["OPENAI_API_KEY"] = ""
os.environ["http_proxy"] = "http://127.0.0.1:7890"
os.environ["https_proxy"] = "http://127.0.0.1:7890"

In [17]:
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings

urls = [
    "https://lilianweng.github.io/posts/2023-06-23-agent/",
    "https://lilianweng.github.io/posts/2023-03-15-prompt-engineering/",
    "https://lilianweng.github.io/posts/2023-10-25-adv-attack-llm/",
]
docs = [WebBaseLoader(url,bs_kwargs=dict(
        parse_only=bs4.SoupStrainer(class_=("post-content", "post-title", "post-header")),
    ),).load() for url in urls]
docs_list = [sublist for doc in docs for sublist in doc ]
text_splitter = RecursiveCharacterTextSplitter(chunk_size=250, chunk_overlap=0)
doc_splitted = text_splitter.split_documents(docs_list)
vector_store = FAISS.from_documents(doc_splitted, embedding=OpenAIEmbeddings())
retriever = vector_store.as_retriever()

以下构建图的不同部分

In [18]:
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.prompts import ChatPromptTemplate

# LLMs想要结构化输出使用BaseModel修饰的类
class GradeDocumnet(BaseModel):
    binary_score: str = Field(description="Documents are relevant to the question, 'yes' or 'no'")
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.2)
structured_llm = llm.with_structured_output(GradeDocumnet)
system = """You are a grader assessing relevance of a retrieved document to a user question. \n 
    It does not need to be a stringent test. The goal is to filter out erroneous retrievals. \n
    If the document contains keyword(s) or semantic meaning related to the user question, grade it as relevant. \n
    Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question."""

grade_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "Retrieved document: \n\n {document} \n\n User question: {question}"),
    ]
)
retrieval_grader = grade_prompt | structured_llm
question = "agent memory"
docs = retriever.get_relevant_documents(question)
doc_text = docs[1].page_content
print(retrieval_grader.invoke({"document": doc_text, "question": question}))


binary_score='yes'


In [20]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
prompt = """
You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know. Use three sentences maximum and keep the answer concise.

Question: {question} 

Context: {context} 

Answer:
"""
prompt_template = ChatPromptTemplate.from_template(prompt)
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.2)
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

rag_chain = (
    prompt_template
    | llm
    | StrOutputParser()
)
generation = rag_chain.invoke({"question": "agent memory", "context": format_docs(docs)})

In [21]:
generation

"Agent memory refers to the long-term memory capability of retaining and recalling infinite information over extended periods. This is often achieved through an external vector store and fast retrieval. The memory stream module records a comprehensive list of the agent's experiences in natural language."

In [22]:
## Hallucination Grader 幻觉评分

class GradeHallucination(BaseModel):
    binary_score: str = Field(description="Documents are relevant to the question, 'yes' or 'no'")

llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.2)
structured_llm_grader = llm.with_structured_output(GradeHallucination)
system = """
    you are a grader assessing whether an LLM generation is grounded in / supported by a set of retrieved facts. \n 
    Give a binary score 'yes' or 'no'. 'Yes' means that the answer is grounded in / supported by the set of facts.
"""
hallucination_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "Set of facts: \n\n {documents} \n\n LLM generation: {generation}"),
    ]
)
hallucination_grader = hallucination_prompt | structured_llm_grader
hallucination_grader.invoke({"documents": docs, "generation": generation})

GradeHallucination(binary_score='yes')

In [29]:
## Answer Grader 结果评分
# data model
class GradeAnswer(BaseModel):
    binary_score: int = Field(description="Score from 0 to 5, 5 being the best answer")
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.2)
structured_llm = llm.with_structured_output(GradeAnswer)
system = """
    You are a grader assessing the quality of an answer generated by an LLM. \n 
    Give a score from 0 to 5, 5 being the best answer.
"""
answer_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "User question: \n\n {question} \n\n LLM generation: {answer}"),
    ]
)
answer_grader = answer_prompt | structured_llm
grade = answer_grader.invoke({"question": question,"answer": generation})
grade.binary_score, type(grade.binary_score), int(grade.binary_score)

(5, int, 5)

In [26]:
## Question Re-writer
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.2)
system = """You a question re-writer that converts an input question to a better version that is optimized \n 
     for vectorstore retrieval. Look at the input and try to reason about the underlying sematic intent / meaning."""
re_write_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "Here is the initial question: \n\n {question} \n Formulate an improved question."),
    ]
)
question_rewriter = re_write_prompt | llm | StrOutputParser()
question_rewriter.invoke({"question": question})

"What is the role of memory in an agent's functioning?"

构建graph

In [28]:
from typing_extensions import TypedDict

from typing import List
# TypingDict 用于定义字典类型 让键值对的类型固定
class GraphState(TypedDict):
    """
    对于图的表示
    Attributes:
        question: str: 问题
        generation: str: 生成的答案
        documents: list[Document]: 文档
    """
    question: str
    generation: str
    documents: list[Document]

In [34]:
def retrieve(state):
    """检索相关文档"""
    print("Retrieving documents...")
    question = state["question"]
    docs = retriever.get_relevant_documents(question)
    return {"documents": docs, "question": question}

def generate(state):
    """生成回答"""
    print("Generating answer...")
    question = state["question"]
    docs = state["documents"]
    generation = rag_chain.invoke({"question": question, "context": format_docs(docs)})
    return {"documents": docs, "question": question, "generation": generation}

def grade_documents(state):
    """判别检索的文档是否和问题相关"""
    print("Grading documents...")
    question = state["question"]
    documents = state["documents"]
    filtered_docs = []
    for doc in documents:
        print(doc.page_content)
        score = retrieval_grader.invoke({"document": doc.page_content, "question": question})
        grade = score.binary_score
        if grade == "yes":
            print("Document relevant")
            filtered_docs.append(doc)
        else:
            print("Document not relevant")
            continue
    return {"documents": filtered_docs, "question": question}

def transform_query(state):
    """转换问题"""
    print("Transforming query...")
    question = state["question"]
    new_question = question_rewriter.invoke({"question": question})
    return {"documents": state["documents"], "question": new_question}

def decide_to_generate(state):
    """判断是否生成答案"""
    print("Deciding to generate...")
    filtered_documents = state["documents"]
    if not filtered_documents:
        # 所有检索的文档都不好 重修将问题转换为新问题 再次检索
        print("No relevant documents. Transforming query...")
        return "transform_query"
    else:
        print("Generating answer...")
        return "generate"
    
def grade_generation_v_documents_and_question(state):
    """评分生成的答案"""
    print("Grading generation...")
    documents = state["documents"]
    question = state["question"]
    generation = state["generation"]

    score = hallucination_grader.invoke({"documents": documents, "generation": generation})
    grade = score.binary_score
    if grade == "yes":
        print("---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---")
        print("Grading answer...")
        score = answer_grader.invoke({"question": question, "answer": generation})
        if score.binary_score < 3:
           print(f"{score.binary_score} is a low score. generation does not address question...")
           return "not useful"
        else:
            print(f"{score.binary_score} is a high score. generation is good...")
            return "useful"
    else:
        print("Generation not grounded in facts")
        return "not supported"

In [35]:
## 构建图
from langgraph.graph import StateGraph, END
workflow = StateGraph(GraphState)
workflow.add_node("retrieve", retrieve)
workflow.add_node("grade_documents", grade_documents)
workflow.add_node("generate", generate)
workflow.add_node("transform_query", transform_query)

workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "grade_documents")
workflow.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {
        "generate": "generate",
        "transform_query": "transform_query",
    },
)
workflow.add_edge("transform_query", "retrieve")
workflow.add_conditional_edges(
    "generate",
    grade_generation_v_documents_and_question,
    {   
        "not supported": "transform_query",
        "useful": END,
        "not useful": "transform_query",
    },
)
app = workflow.compile()

In [36]:
from pprint import pprint
inputs = {"question": "Explain how the different types of agent memory work?"}
for output in app.stream(inputs):
    for key, value in output.items():
        # Node
        pprint(f"Node '{key}':")
        # Optional: print full state at each node
        # pprint.pprint(value["keys"], indent=2, width=80, depth=None)
    pprint("\n---\n")

# Final generation
pprint(value["generation"])

Retrieving documents...
"Node 'retrieve':"
'\n---\n'
Grading documents...
Long-term memory: This provides the agent with the capability to retain and recall (infinite) information over extended periods, often by leveraging an external vector store and fast retrieval.
Document relevant
The design of generative agents combines LLM with memory, planning and reflection mechanisms to enable agents to behave conditioned on past experience, as well as to interact with other agents.
Document not relevant
Component Two: Memory#
(Big thank you to ChatGPT for helping me draft this section. I’ve learned a lot about the human brain and data structure for fast MIPS in my conversations with ChatGPT.)
Types of Memory#
Document not relevant
Long-term memory as the external vector store that the agent can attend to at query time, accessible via fast retrieval.
Document not relevant
Deciding to generate...
Generating answer...
"Node 'grade_documents':"
'\n---\n'
Generating answer...
Grading generation...

In [40]:
app.get_graph().print_ascii()

                         +-----------+                    
                         | __start__ |                    
                         +-----------+                    
                               *                          
                               *                          
                               *                          
                         +----------+                     
                         | retrieve |*                    
                         +----------+ ****                
                        **                ****            
                      **                      ****        
                    **                            ****    
        +-----------------+                           *** 
        | grade_documents |                             * 
        +-----------------+                             * 
           ...          ..                              * 
          .               ..                            

### Agentic_RAG
Agent为使用工具   
故而Agentic_RAG为 Retrieal Agent(an LLM access to a retriever tool)

In [4]:
import os
os.environ["LANGCHAIN_TRACING_V2"] = 'true'
os.environ["LANGCHAIN_API_KEY"] = ""
os.environ["OPENAI_API_KEY"] = ""
os.environ["http_proxy"] = "http://127.0.0.1:7890"
os.environ["https_proxy"] = "http://127.0.0.1:7890"

In [5]:
import bs4
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
urls = [
    "https://lilianweng.github.io/posts/2023-06-23-agent/",
    "https://lilianweng.github.io/posts/2023-03-15-prompt-engineering/",
    "https://lilianweng.github.io/posts/2023-10-25-adv-attack-llm/",
]
docs = [WebBaseLoader(url,bs_kwargs=dict(
        parse_only=bs4.SoupStrainer(class_=("post-content", "post-title", "post-header")),
    ),).load() for url in urls]
docs_list = [sublist for doc in docs for sublist in doc]
text_splitter = RecursiveCharacterTextSplitter(chunk_size=100, chunk_overlap=50) # 切的太小，运行时间长
doc_splitter = text_splitter.split_documents(docs_list)
vector_store = FAISS.from_documents(doc_splitter, embedding=OpenAIEmbeddings())
retreiver = vector_store.as_retriever()


In [56]:
# 创建一个检索工具
from langchain.tools.retriever import create_retriever_tool
from langgraph.prebuilt import ToolExecutor # Executes a tool invocation.
from langgraph.prebuilt import ToolNode
tool = create_retriever_tool(
    retreiver,  # retriever
    "retrieve_blog_posts", # name
    "Search and return information about Lilian Weng blog posts on LLM agents, prompt engineering, and adversarial attacks on LLMs.", # description
)
tools = [tool]
ToolNode = ToolNode(tools)
tool_executor = ToolExecutor(tools) 

  tool_executor = ToolExecutor(tools)


接下来构建一个langgraph

![img\RAG_agent.png](img/RAG_agent.png)

In [57]:
# 创建一个状态 我理解的状态其实就是想要保存到变量
# 该状态一般继承了TypedDict
import operator
from typing import TypedDict, Annotated, Sequence
from langchain_core.messages import BaseMessage
class GraphState(TypedDict):
    # Annotated允许为类型提示添加元数据 即Seuqence[BaseMessage]类型的messages操作为add
    # Sequence 代表了一个泛型的、有序的集合。
    messages: Annotated[Sequence[BaseMessage], operator.add]

In [64]:

from langchain.pydantic_v1 import BaseModel, Field
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.utils.function_calling import format_tool_to_openai_function
from langchain_core.utils.function_calling import convert_to_openai_tool
from langchain.output_parsers.openai_tools import PydanticToolsParser
from langgraph.prebuilt import ToolInvocation
from langchain_core.messages import FunctionMessage, HumanMessage, AIMessage
from langchain_core.output_parsers import StrOutputParser
import json
# 构建判别节点
def should_retrieve(stat):
    """判断是否需要检索来处理问题"""
    print("--decide to retrieve--")
    messages = stat["messages"]
    last = messages[-1]
    if "function_call" not in last.additional_kwargs:
        print("--decision: do not retrieve--")
        return "end"
    else:
        print("--decision: retrieve--")
        return "continue"

def grade_documents(state):
    """对检索的文档进行评估 是否和问题有关"""
    print("--check relevance--")

    class grade(BaseModel):
        binary_score: str=Field(description="Documents are relevant to the question, 'yes' or 'no'")
    
    model = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.2)
    grade_tool_llm = convert_to_openai_tool(grade)
    llm_with_tool = model.bind(
        tools = [grade_tool_llm],
        tool_choice={"type": "function", "function": {"name": "grade"}})
    # 以上使用convert_to_openai_tool 等同于使用 with_structured_output
    parser = PydanticToolsParser(tools=[grade]) # Parse tools from OpenAI response.
    prompt = PromptTemplate(
        template="""You are a grader assessing relevance of a retrieved document to a user question. \n 
        Here is the retrieved document: \n\n {context} \n\n
        Here is the user question: {question} \n
        If the document contains keyword(s) or semantic meaning related to the user question, grade it as relevant. \n
        Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question.""",
        input_variables=["context", "question"],
    )

    chain = prompt | llm_with_tool | parser
    messages = state["messages"]
    last_message = messages[-1]
    question = messages[0].content
    docs = last_message.content
    score = chain.invoke({"context": docs, "question": question})
    grade = score[0].binary_score
    if grade == "yes":
        print("Document relevant")
        return "yes"
    else:
        print("Document not relevant")
        return "no"

# 构建节点
def agent(state):
    """基于当前的状态生成响应， 使用检索还是结束"""
    print("--call agent--")
    messages = state["messages"]
    model = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.2, streaming=True)
    functions = [format_tool_to_openai_function(t) for t in tools]
    model = model.bind_functions(functions)
    responses = model.invoke(messages)
    return {"messages": [responses]}


def retrieve(state):
    """使用工具去检索"""
    print("--executing retrieve--")
    messages = state["messages"]
    last_message = messages[-1]
    action = ToolInvocation(
        tool = last_message.additional_kwargs["function_call"]["name"],
        tool_input = 
        json.loads(last_message.additional_kwargs["function_call"]["arguments"]),
    )
    responese = tool_executor.invoke(action)
#     message_with_single_tool_call = AIMessage(
#     content="",
#     tool_calls=[
#         {
#             "name": last_message.additional_kwargs["function_call"]["name"],
#             "args": last_message.additional_kwargs["function_call"]['arguments'],
#             "id": last_message['id'],
#             "type": "function_call",
#         }
#     ],
# )
#     responese = ToolNode.invoke({"messages": [message_with_single_tool_call]})
    function_message = FunctionMessage(content=str(responese), name=last_message.additional_kwargs["function_call"]["name"])
    return {"messages": [function_message]}

def rewrite(state):
    """
    重写问题来产生更好的问题
    """
    print("--transforming query--")
    messages = state["messages"]
    question = messages[0].content
    msg = [HumanMessage(
        content=f""" \n 
    Look at the input and try to reason about the underlying semantic intent / meaning. \n 
    Here is the initial question:
    \n ------- \n
    {question} 
    \n ------- \n
    Formulate an improved question: """,
    )]
    model = ChatOpenAI(temperature=0.2, model="gpt-3.5-turbo")
    response = model.invoke(msg)
    return {"messages": [response]}

def generate(state):
    """生成答案"""
    print("--generating answer--")
    messages = state["messages"]
    question = messages[0].content
    last_message = messages[-1]
    docs = last_message.content
    chat_template = PromptTemplate.from_template(
    template="""You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know. Use three sentences maximum and keep the answer concise.

    Question: {question} 

    Context: {context} 

    Answer:""",

    )
    llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.2)
    def format_docs(docs):
        return "\n\n".join(doc.page_content for doc in docs)
    
    rag_chain = (
        chat_template
        | llm
        | StrOutputParser()
    )
    response = rag_chain.invoke({"question": question, "context": format_docs(docs)})
    return {"messages": [response]}
    

In [65]:
from langgraph.graph import StateGraph, END

workflow = StateGraph(GraphState)
workflow.add_node("agent", agent)
workflow.add_node("retrieve", retrieve)
workflow.add_node("rewrite", rewrite)
workflow.add_node("generate", generate)

workflow.set_entry_point("agent")

workflow.add_conditional_edges(
    "agent",
    should_retrieve,
    {
        "continue": "retrieve",
        "end": END,
    },
)
workflow.add_conditional_edges(
    "retrieve",
    grade_documents,
    {
        "yes": "generate",
        "no": "rewrite",
    },
)
workflow.add_edge("generate", END)
workflow.add_edge("rewrite", "agent")
app = workflow.compile()

In [66]:
from pprint import pprint
from langchain_core.messages import HumanMessage

input = {
    "messages":[
        HumanMessage(content="What does Lilian Weng say about the types of agent memory?")
    ]
}
for output in app.stream(input):
    for key, value in output.items():
        pprint(f"Output from node '{key}':")
        pprint("---")
        pprint(value, indent=2, width=80, depth=None)
    pprint("\n---\n")

--call agent--
--decide to retrieve--
--decision: retrieve--
"Output from node 'agent':"
'---'
{ 'messages': [ AIMessage(content='', additional_kwargs={'function_call': {'arguments': '{"query":"types of agent memory"}', 'name': 'retrieve_blog_posts'}}, response_metadata={'finish_reason': 'function_call', 'model_name': 'gpt-3.5-turbo-0125'}, id='run-3eed99ec-8245-46c8-be1f-131c5debf2ce-0')]}
'\n---\n'
--executing retrieve--


  action = ToolInvocation(


--check relevance--
Document not relevant
"Output from node 'retrieve':"
'---'
{ 'messages': [ FunctionMessage(content='Types of Memory#\n\ninformation. There are several types of memory in human brains.\n\n2. Long Term memory management.\n3. GPT-3.5 powered Agents for delegation of simple tasks.\n\nLong-term memory: This provides the agent with the capability to retain and recall (infinite)', name='retrieve_blog_posts')]}
'\n---\n'
--transforming query--
"Output from node 'rewrite':"
'---'
{ 'messages': [ AIMessage(content='What are the different types of agent memory discussed by Lilian Weng?', response_metadata={'token_usage': {'completion_tokens': 15, 'prompt_tokens': 62, 'total_tokens': 77}, 'model_name': 'gpt-3.5-turbo-0125', 'system_fingerprint': None, 'finish_reason': 'stop', 'logprobs': None}, id='run-97994b92-2eed-4789-a98f-71fd72fbce4b-0', usage_metadata={'input_tokens': 62, 'output_tokens': 15, 'total_tokens': 77})]}
'\n---\n'
--call agent--
--decide to retrieve--
--decisio

In [67]:
app.get_graph().print_ascii()

                    +-----------+                
                    | __start__ |                
                    +-----------+                
                           *                     
                           *                     
                           *                     
                      +-------+                  
                      | agent |.                 
                   ...+-------+ ....             
                ...        *        ....         
            ....           *            ...      
          ..               *               ....  
+----------+               *                   ..
| retrieve |               *                    .
+----------+....           *                    .
      .         ...        *                    .
      .            ....    *                    .
      .                ..  *                    .
+----------+          +---------+              ..
| generate |          | rewrite |          ....  


### Adaptive RAG
核心技术有两点: 
* query analysis： query的构建分析 https://blog.langchain.dev/query-construction/
* self-corrective RAG: 对自我进行西征 所谓的修正 即使对context的管理 和回答的质量

对于query analysis: 1. no retrieval 2. single-shot RAG 3. iterative RAG   
对于route: 1. Web search 和实事相关 2. self-corrective RAG 和文档相关

![img\adaptive_RAG.png](img/adaptive_RAG.png)


In [68]:
os.environ["LANGCHAIN_TRACING_V2"] = 'true'
os.environ["LANGCHAIN_API_KEY"] = ""
os.environ["OPENAI_API_KEY"] = ""
os.environ["COHERE_API_KEY"] = ""
os.environ["TAVILY_API_KEY"] = ""
os.environ["http_proxy"] = "http://127.0.0.1:7890"
os.environ["https_proxy"] = "http://127.0.0.1:7890"


In [69]:
import bs4
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.vectorstores import FAISS
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings

embed = OpenAIEmbeddings()

urls = [
    "https://lilianweng.github.io/posts/2023-06-23-agent/",
    "https://lilianweng.github.io/posts/2023-03-15-prompt-engineering/",
    "https://lilianweng.github.io/posts/2023-10-25-adv-attack-llm/",
]
docs = [WebBaseLoader(url,bs_kwargs=dict(
        parse_only=bs4.SoupStrainer(class_=("post-content", "post-title", "post-header")),)).load() 
        for url in urls]
docs_list = [sublist for doc in docs for sublist in doc]
text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=0)
doc = text_splitter.split_documents(docs_list)
vector_store = FAISS.from_documents(doc, embedding=embed)
retriever = vector_store.as_retriever()

In [70]:
### router 根据问题选择不同的处理方式

from typing import Literal
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_openai import ChatOpenAI

class RouteQuery(BaseModel):
    """根据语义将问题分配给最相关的数据源"""
    route: Literal["vectorstore", "web_search"] = Field(
        ...,
        description="Given a user question choose to route it to web search or a vectorstore.",
    )

llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.2)
structured_llm_router = llm.with_structured_output(RouteQuery)

system = """You are an expert at routing a user question to a vectorstore or web search.
The vectorstore contains documents related to agents, prompt engineering, and adversarial attacks.
Use the vectorstore for questions on these topics. Otherwise, use web-search."""

route_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "{question}"),
    ]
)
question_router = route_prompt | structured_llm_router
print(question_router.invoke({"question": "Who will the Bears draft first in the NFL draft?"}))
print(question_router.invoke({"question": "What are the types of agent memory?"}))

route='web_search'
route='vectorstore'


In [71]:
### Retrieval Grader 检索评分
class GradeDocuments(BaseModel):
    """Binary score for relevance check on retrieved documents. """
    binary_score: str = Field(description="Documents are relevant to the question, 'yes' or 'no'")

llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.2)
structured_llm_grader = llm.with_structured_output(GradeDocuments)
system = """You are a grader assessing relevance of a retrieved document to a user question. \n 
    If the document contains keyword(s) or semantic meaning related to the user question, grade it as relevant. \n
    It does not need to be a stringent test. The goal is to filter out erroneous retrievals. \n
    Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question."""

grade_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "Retrieved document: \n\n {document} \n\n User question: {question}"),
    ]
)

retrieval_grader = grade_prompt | structured_llm_grader
question = "agent memory"
docs = retriever.get_relevant_documents(question)
doc_text = docs[1].page_content
print(retrieval_grader.invoke({"document": doc_text, "question": question}))

binary_score='yes'


In [72]:
### 生成 根据检索生成
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
prompt = ChatPromptTemplate.from_template(
    """You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know. Use three sentences maximum and keep the answer concise.

Question: {question} 

Context: {context} 

Answer:"""
)
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.2)
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)
rag_chain = prompt | llm | StrOutputParser()

generation = rag_chain.invoke({"question": "agent memory", "context": format_docs(docs)})
print(generation)

Generative agents combine LLM with memory, planning, and reflection mechanisms to behave based on past experience and interact with other agents. The memory stream records agents' experiences in natural language, with each element being an observation or event provided by the agent. The retrieval model surfaces context based on relevance, recency, and importance to inform the agent's behavior.


In [73]:
### Hallucination Grader  幻觉评分 来根据相关文档评估回复是否满意
class GradeHallucination(BaseModel):
    """Binary score for hallucination check on generated text. """
    binary_score: str = Field(description="Documents are relevant to the question, 'yes' or 'no'")

llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.2)
structured_llm = llm.with_structured_output(GradeHallucination)

system = """You are a grader assessing whether an LLM generation is grounded in / supported by a set of retrieved facts. \n 
     Give a binary score 'yes' or 'no'. 'Yes' means that the answer is grounded in / supported by the set of facts."""
hallucination_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "Set of facts: \n\n {documents} \n\n LLM generation: {generation}"),
    ]
)
hallucination_grader = hallucination_prompt | structured_llm
hallucination_grader.invoke({"documents": docs, "generation": generation})

GradeHallucination(binary_score='yes')

In [77]:
### Answer Grader 结果评分 对结果是否满意
class GradeAnswer(BaseModel):
    """Score for grading the quality of an answer. """
    binary_score: int = Field(description="Score from 0 to 5, 5 being the best answer")

llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.2)
structured_llm = llm.with_structured_output(GradeAnswer)
system = """You are a grader assessing the quality of an answer generated by an LLM. \n 
    Give a score from 0 to 5, 5 being the best answer."""
answer_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "User question: \n\n {question} \n\n LLM generation: {answer}"),
    ]
)
answer_grader = answer_prompt | structured_llm
grade = answer_grader.invoke({"question": question,"answer": generation})
grade.binary_score, type(grade.binary_score), int(grade.binary_score)

(4, int, 4)

In [78]:
### Question Re-writer 问题重写 
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.2)
sysytem = """You a question re-writer that converts an input question to a better version that is optimized \n 
     for vectorstore retrieval. Look at the input and try to reason about the underlying sematic intent / meaning."""
re_write_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "Here is the initial question: \n\n {question} \n Formulate an improved question."),
    ]
)
question_rewriter = re_write_prompt | llm | StrOutputParser()
question_rewriter.invoke({"question": question})

'"Can you provide an overview of the different types of memory used by agents in artificial intelligence systems and discuss their importance in enhancing the performance of AI applications?"'

In [79]:
### WebSearch Tool 网络搜索工具
from langchain_community.tools.tavily_search import TavilySearchResults
web_search_tool = TavilySearchResults(k=3)

In [89]:
### 构建图
from typing import TypedDict, List
from langchain_core.documents import Document
class GraphState(TypedDict):
    """State for the graph."""
    question: str
    generation: str
    documents: List[Document]

In [106]:
# 构建节点
# 图中的节点还是判断等都是函数，函数的输入为state,输出为state
def retrieve(state):
    """检索相关文档"""
    print("Retrieving documents...")
    question = state["question"]
    docs = retriever.get_relevant_documents(question)
    return {"documents": docs, "question": question}

def generate(state):
    """生成回答"""
    print("Generating answer...")
    question = state["question"]
    docs = state["documents"]
    print(docs)
    print(type(docs))
    generation = rag_chain.invoke({"question": question, "context": format_docs(docs)})
    return {"documents": docs, "question": question, "generation": generation}

def grade_documents(state):
    """判别检索的文档是否和问题相关"""
    print("Grading documents...")
    question = state["question"]
    documents = state["documents"]
    filtered_docs = []
    for doc in documents:
        print(doc.page_content)
        score = retrieval_grader.invoke({"document": doc.page_content, "question": question})
        grade = score.binary_score
        if grade == "yes":
            print("Document relevant")
            filtered_docs.append(doc)
        else:
            print("Document not relevant")
            continue
    return {"documents": filtered_docs, "question": question}

def transform_query(state):
    """转换问题,当没有相关文档时使用"""
    print("Transforming query...")
    question = state["question"]
    new_question = question_rewriter.invoke({"question": question})
    return {"documents": state["documents"], "question": new_question}

def web_search(state):
    """使用网络搜索工具"""
    print("Searching the web...")
    question = state["question"]
    docs = web_search_tool.invoke({"query": question})
    web_results = "\n".join([doc['content'] for doc in docs])
    web_results = Document(page_content=web_results)
    return {"documents": [web_results], "question": question}

### 构建边

def route_question(state):
    """route: 根据问题分配不同的处理方式"""
    print("Routing question...")
    question = state["question"]
    source = question_router.invoke({"question": question})
    if source.route == 'web_search':
        print("Routing to web search...")
        return "web_search"
    elif source.route == 'vectorstore':
        print("Routing to vectorstore...")
        return "retrieve"

def decide_to_generate(state):
    """判断是否生成答案"""
    print("Deciding to generate...")
    filtered_documents = state["documents"]
    if not filtered_documents:
        # 所有检索的文档都不好 重修将问题转换为新问题 再次检索
        print("No relevant documents. Transforming query...")
        return "transform_query"
    else:
        print("Generating answer...")
        return "generate"

def grade_generate_v_documents_and_question(state):
    """生成回答，并根据回答，首先进行幻觉评分，然后进行结果评分"""
    print("Grading generation...")
    documents = state["documents"]
    question = state["question"]
    generation = state["generation"]

    score = hallucination_grader.invoke({"documents": documents, "generation": generation})
    grade = score.binary_score
    if grade == "yes":
        print("---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---")
        print("Grading answer...")
        score = answer_grader.invoke({"question": question, "answer": generation})
        if score.binary_score < 3:
           print(f"{score.binary_score} is a low score. generation does not address question...")
           return "not useful"
        else:
            print(f"{score.binary_score} is a high score. generation is good...")
            return "useful"
    else:
        print("Generation not grounded in facts")
        return "not supported"

In [107]:
from langgraph.graph import StateGraph, END
adaptive_workflow = StateGraph(GraphState)

# 定义节点
adaptive_workflow.add_node("web_search", web_search)
adaptive_workflow.add_node("retrieve", retrieve)
adaptive_workflow.add_node("grade_documents", grade_documents)
adaptive_workflow.add_node("generate", generate)
adaptive_workflow.add_node("transform_query", transform_query)

# 定义图
adaptive_workflow.set_conditional_entry_point(
    route_question,
    {
        "web_search": "web_search",
        "retrieve": "retrieve",
    },
)
adaptive_workflow.add_edge("web_search", "generate")
adaptive_workflow.add_edge("retrieve", "grade_documents")
adaptive_workflow.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {
        "generate": "generate",
        "transform_query": "transform_query",
    },
)
adaptive_workflow.add_conditional_edges(
    "generate",
    grade_generate_v_documents_and_question,
    {   
        "not supported": "generate",
        "useful": END,
        "not useful": "transform_query",
    },
)
app = adaptive_workflow.compile()

In [108]:
from pprint import pprint
inputs = {"question": "What player at the Bears expected to draft first in the 2024 NFL draft?"}
for output in app.stream(inputs):
    for key, value in output.items():
        # Node
        pprint(f"Node '{key}':")
        # Optional: print full state at each node
        # pprint.pprint(value["keys"], indent=2, width=80, depth=None)
    pprint("\n---\n")
pprint(value["generation"])

Routing question...
Routing to web search...
Searching the web...
"Node 'web_search':"
'\n---\n'
Generating answer...
[Document(page_content="The Bears on Thursday, as expected, selected USC quarterback Caleb Williams with the No. 1 pick in the 2024 NFL Draft. Williams was widely considered the top prospect in a draft class loaded with talented quarterbacks.\nHere is how Ryan Poles and Matt Eberflus did with their selections for the Chicago Bears 2024 NFL Draft: Round 1 • Pick 1 (1) • QB Caleb Williams USC. Round 1 • Pick 9 (9) • WR Rome Odunze ...\nChicago Bears Draft Overview. 2023 record: 7-10. Fourth in NFC North; missed playoffs. Bears 2024 draft picks (4): Round 1, pick 1 (from the Panthers in the D.J. Moore/2023 No. 1 overall pick ...\nThe Chicago Bears made waves during the first round of the 2024 NFL draft, where they added two blue chip players to the roster with their top-10 selections — quarterback Caleb Williams (No. 1 ...\nApr 24, 2024. The Beast, Dane Brugler's expansive

In [112]:
inputs = {"question": "What are the types of agent memory?"}
for output in app.stream(inputs):
    for key, value in output.items():
        # Node
        pprint(f"Node '{key}':")
        # Optional: print full state at each node
        # pprint(value["keys"], indent=2, width=80, depth=None)
    pprint("\n---\n")

# Final generation
pprint(value ["generation"])

Routing question...
Routing to vectorstore...
Retrieving documents...
"Node 'retrieve':"
'\n---\n'
Grading documents...
The design of generative agents combines LLM with memory, planning and reflection mechanisms to enable agents to behave conditioned on past experience, as well as to interact with other agents.
Document relevant
Short-term memory: I would consider all the in-context learning (See Prompt Engineering) as utilizing short-term memory of the model to learn.
Long-term memory: This provides the agent with the capability to retain and recall (infinite) information over extended periods, often by leveraging an external vector store and fast retrieval.


Tool use
Document relevant
Planning

Subgoal and decomposition: The agent breaks down large tasks into smaller, manageable subgoals, enabling efficient handling of complex tasks.
Reflection and refinement: The agent can do self-criticism and self-reflection over past actions, learn from mistakes and refine them for future steps

In [129]:
app.get_graph()

Graph(nodes={'__start__': Node(id='__start__', name='__start__', data=<class 'pydantic.v1.main.LangGraphInput'>, metadata=None), 'web_search': Node(id='web_search', name='web_search', data=web_search(recurse=True), metadata=None), 'retrieve': Node(id='retrieve', name='retrieve', data=retrieve(recurse=True), metadata=None), 'grade_documents': Node(id='grade_documents', name='grade_documents', data=grade_documents(recurse=True), metadata=None), 'generate': Node(id='generate', name='generate', data=generate(recurse=True), metadata=None), 'transform_query': Node(id='transform_query', name='transform_query', data=transform_query(recurse=True), metadata=None), 'route_question': Node(id='route_question', name='route_question', data=route_question(recurse=True), metadata=None), '__end__': Node(id='__end__', name='__end__', data=<class 'pydantic.v1.main.LangGraphOutput'>, metadata=None)}, edges=[Edge(source='__start__', target='route_question', data=None, conditional=False), Edge(source='retrie

### Corrective RAG(CRAG)

![img\CRAG.png](img/CRAG.png)

* 文档相关性检查：在生成答案之前，首先会检查检索到的文档是否至少有一个文档达到了相关性阈值。如果有文档满足该条件，系统会继续进行答案生成。

* 知识精炼：在生成之前，会对检索到的文档进行知识精炼，将文档划分为多个“知识片段”。这些片段代表文档中的不同知识点或信息单元。

* 自我评分与过滤：系统会对每个知识片段进行评分，过滤掉那些与查询无关的片段。只有通过评分的相关片段才会被用于答案生成。

* 补充信息检索：如果所有文档的相关性评分都低于阈值，或者评分器无法确定文档的相关性，系统将主动寻求额外的数据源进行补充检索，通常是通过网络搜索来获取更多相关信息。

该部分不详细解释了，感觉就是之前如果没有合适的结果，重新构建query 现在是拓展相关知识，上网搜索 核心没有改变

### RAG Agent
将以下技术合并在一起 构建了一个RAG agent model
* Routing: Adaptive RAG, Route questions to different retrieval approaches
* Fall back: Corrective RAG 如果docs和query不相关 使用 web search
* Self-correction: self-RAG 检查回答有没有幻觉和是否解决了问题

![img\RAG_Agent_all.png](img/RAG_Agent_all.png)

In [153]:
import bs4
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.vectorstores import FAISS
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings

urls = [
    "https://lilianweng.github.io/posts/2023-06-23-agent/",
    "https://lilianweng.github.io/posts/2023-03-15-prompt-engineering/",
    "https://lilianweng.github.io/posts/2023-10-25-adv-attack-llm/",
]
docs = [WebBaseLoader(url,bs_kwargs=dict(
        parse_only=bs4.SoupStrainer(class_=("post-content", "post-title", "post-header")),)).load() 
        for url in urls]
docs_list = [sublist for doc in docs for sublist in doc]
text_splitter = RecursiveCharacterTextSplitter(chunk_size=250, chunk_overlap=0)
doc = text_splitter.split_documents(docs_list)
embed = OpenAIEmbeddings()
vector_store = FAISS.from_documents(doc, embedding=embed)
retriever = vector_store.as_retriever()

In [154]:
from langchain.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import JsonOutputParser

llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.2)
prompt = ChatPromptTemplate.from_messages(
    [
        ("system", """You are a grader assessing relevance of a retrieved document to a user question. If the document contains keywords related to the user question, grade it as relevant. It does not need to be a stringent test. The goal is to filter out erroneous retrievals. \n
    Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question. \n
    Provide the binary score as a JSON with a single key 'score' and no premable or explaination."""),
        ("human", "Here is the retrieved document: \n\n {document} \n\n Here is the user question: {question} \n assistant:")
    ]
)
retrieval_grader = prompt | llm | JsonOutputParser()
question = "agent memory"
docs = retriever.invoke(question)
doc_text = docs[1].page_content
print(retrieval_grader.invoke({"document": doc_text, "question": question}))

{'score': 'yes'}


In [155]:
### 生成
from langchain.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
prompt = ChatPromptTemplate.from_messages(
    [
        ('system', """You are an assistant for question-answering tasks. 
    Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know. 
    Use three sentences maximum and keep the answer concise"""),
        ('human',"""Question: {question} 
    Context: {context} 
    Answer: """)
    ]
)
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.2)
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)
rag_chain = prompt | llm | StrOutputParser()

question = "agent memory"
docs = retriever.invoke(question)
generation = rag_chain.invoke({"question": question, "context": format_docs(docs)})
print(generation)

Long-term memory provides the agent with the ability to retain and recall vast amounts of information over extended periods by using an external vector store for fast retrieval. The memory stream is an external database that records a detailed list of the agent's experiences in natural language. This long-term memory serves as an external vector store that the agent can access quickly during query time.


In [156]:
### Hallucination Grader 幻觉评分
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.2)
prompt = ChatPromptTemplate.from_messages(
    [
        ("system", """You are a grader assessing whether 
    an answer is grounded in / supported by a set of facts. Give a binary score 'yes' or 'no' score to indicate 
    whether the answer is grounded in / supported by a set of facts. Provide the binary score as a JSON with a 
    single key 'score' and no preamble or explanation."""),
        ("human", """Here are the facts:
    \n ------- \n
    {documents} 
    \n ------- \n
    Here is the answer: {generation}\n assistant:""")
    ]
)
hallucination_grader = prompt | llm | JsonOutputParser()
hallucination_grader.invoke({"documents": docs, "generation": generation})

{'score': 'yes'}

In [157]:
### Answer Grader 
### 不需要with_structured_output 在prompt中使用system给予对应的提示
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.2)
prompt = ChatPromptTemplate.from_messages(
    [
        ("system", """You are a grader assessing whether an 
    answer is useful to resolve a question. Give a binary score 'yes' or 'no' to indicate whether the answer is 
    useful to resolve a question. Provide the binary score as a JSON with a single key 'score' and no preamble or explanation."""),
        ("human", """Here is the answer:
    \n ------- \n
    {generation} 
    \n ------- \n
    Here is the question: {question}  \n assistant:""")
    ]
)
answer_grader = prompt | llm | JsonOutputParser()
answer_grader.invoke({"question": question,"generation": generation})


{'score': 'yes'}

In [190]:
### Router
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.2)
prompt = ChatPromptTemplate.from_messages(
    [
        ("system", """You are an expert at routing a user question to a vectorstore or web search. The vectorstore contains documents related to agents, prompt engineering, and adversarial attacks.
Use the vectorstore for questions on these topics. Otherwise, use web-search. Give a binary choice 'web_search' 
    or 'vectorstore' based on the question. Return the a JSON with a single key 'datasource' and 
    no premable or explaination. """),
        ("human", "Question to route: {question}\n assistant:")
    ]
)
question_router = prompt | llm | JsonOutputParser()
question = "llm agent memory"
docs = retriever.get_relevant_documents(question)
doc_text = docs[1].page_content
print(question_router.invoke({"question": question}))

{'datasource': 'vectorstore'}


In [159]:
from langchain_community.tools.tavily_search import TavilySearchResults
web_search_tool = TavilySearchResults(k=3)

In [191]:
### 构建图
from typing import TypedDict, List
from langchain_core.documents import Document
class GraphState(TypedDict):
    question: str
    generation: str
    web_search: str
    documents: List[Document]

def retrieve(state):
    print("Retrieving documents...")
    question = state["question"]
    docs = retriever.invoke(question)
    return {"documents": docs, "question": question}

def generation(state):
    print("Generating answer...")
    question = state["question"]
    docs = state["documents"]
    generation = rag_chain.invoke({"question": question, "context": format_docs(docs)})
    return {"documents": docs, "question": question, "generation": generation}

def grade_documents(state):
    print("Grading documents...")
    question = state["question"]
    print(f'question: {question}')
    documents = state["documents"]
    filtered_docs = []
    web_search = "No"
    for doc in documents:
        print(doc.page_content)
        score = retrieval_grader.invoke({"document": doc.page_content, "question": question})
        grade = score['score']
        if grade.lower() == "yes":
            print("Document relevant")
            filtered_docs.append(doc)
        else:
            print("Document not relevant")
            web_search = "Yes"
            continue
    return {"documents": filtered_docs, "question": question, "web_search": web_search}

def web_search(state):
    print("Searching the web...")
    question = state["question"]
    documents = state["documents"]
    docs = web_search_tool.invoke({"query": question})
    web_results = "\n".join([doc['content'] for doc in docs])
    web_results = Document(page_content=web_results)
    if documents is not None:
        documents.append(web_results)
    else:
        documents = [web_results]
    return {"documents": documents, "question": question}

## 构建边
def route_question(state):
    print("Routing question...")
    question = state["question"]
    source = question_router.invoke({"question": question})
    if source['datasource'] == 'web_search':
        print("Routing to web search...")
        return "websearch"
    elif source['datasource'] == 'vectorstore':
        print("Routing to vectorstore...")
        return "vectorstore"

def decide_to_generate(state):
    print("Assessing graded documents...")
    web_search = state["web_search"]
    if web_search == "Yes":
        print("No relevant documents. Transforming query...")
        return "websearch"
    else:
        print("Generating answer...")
        return "generate"

### 构建条件边
def grade_generate_v_documents_and_question(state):
    print("Checking hallucinations...")
    documents = state["documents"]
    question = state["question"]
    generation = state["generation"]

    score = hallucination_grader.invoke({"documents": documents, "generation": generation})
    grade = score['score']
    if grade == "yes":
        print("--Decision: generation is grounded in documents--")
        print("Grading answer...")
        score = answer_grader.invoke({"question": question, "generation": generation})
        grade = score['score']
        if grade == "yes":
            print(f"--Decision: generation is useful--")
            return "useful"
        else:
            print(f"--Decision: generation is not useful--")
            return "not useful"
    else:
        print("--Decision: generation is not grounded in documents--")
        return "not supported"

In [192]:
from langgraph.graph import StateGraph, END
workflow = StateGraph(GraphState)

workflow.add_node("websearch", web_search)
workflow.add_node("retrieve", retrieve)
workflow.add_node("grade_documents", grade_documents)
workflow.add_node("generate", generation)

workflow.set_conditional_entry_point(
    route_question,
    {
        "websearch": "websearch",
        "vectorstore": "retrieve",
    },
)
workflow.add_edge("retrieve", "grade_documents")
workflow.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {
        "generate": "generate",
        "websearch": "websearch",
    },
)
workflow.add_edge("websearch", "generate")
workflow.add_conditional_edges(
    "generate",
    grade_generate_v_documents_and_question,
    {
        "not supported": "generate",
        "useful": END,
        "not useful": "websearch",
    },
)
app = workflow.compile()

In [193]:
from pprint import pprint
inputs = {"question": "What are the types of agent memory?"}
for output in app.stream(inputs):
    for key, value in output.items():
        pprint(f"Finished running: {key}:")
pprint(value["generation"])

Routing question...
Routing to vectorstore...
Retrieving documents...
'Finished running: retrieve:'
Grading documents...
question: What are the types of agent memory?
Long-term memory: This provides the agent with the capability to retain and recall (infinite) information over extended periods, often by leveraging an external vector store and fast retrieval.
Document relevant
Long-term memory as the external vector store that the agent can attend to at query time, accessible via fast retrieval.
Document relevant
Subcategories include iconic memory (visual), echoic memory (auditory), and haptic memory (touch).
Document not relevant
Component Two: Memory#
(Big thank you to ChatGPT for helping me draft this section. I’ve learned a lot about the human brain and data structure for fast MIPS in my conversations with ChatGPT.)
Types of Memory#
Document not relevant
Assessing graded documents...
No relevant documents. Transforming query...
'Finished running: grade_documents:'
Searching the web

In [194]:
from pprint import pprint
inputs = {"question": "Who are the Bears expected to draft first in the NFL draft?"}
for output in app.stream(inputs):
    for key, value in output.items():
        pprint(f"Finished running: {key}:")
pprint(value["generation"])

Routing question...
Routing to web search...
Searching the web...
'Finished running: websearch:'
Generating answer...
Checking hallucinations...
--Decision: generation is grounded in documents--
Grading answer...
--Decision: generation is useful--
'Finished running: generate:'
('The Bears are expected to draft USC quarterback Caleb Williams first overall '
 'in the NFL draft. The 2024 NFL draft is set to begin on Thursday, April 25, '
 'with the Bears holding the first pick. Caleb Williams, a star quarterback '
 'from USC and the 2022 Heisman Trophy winner, is the likely choice for the '
 "Bears' first selection.")


In [196]:
app.get_graph()

Graph(nodes={'__start__': Node(id='__start__', name='__start__', data=<class 'pydantic.v1.main.LangGraphInput'>, metadata=None), 'websearch': Node(id='websearch', name='websearch', data=websearch(recurse=True), metadata=None), 'retrieve': Node(id='retrieve', name='retrieve', data=retrieve(recurse=True), metadata=None), 'grade_documents': Node(id='grade_documents', name='grade_documents', data=grade_documents(recurse=True), metadata=None), 'generate': Node(id='generate', name='generate', data=generate(recurse=True), metadata=None), '__end__': Node(id='__end__', name='__end__', data=<class 'pydantic.v1.main.LangGraphOutput'>, metadata=None)}, edges=[Edge(source='retrieve', target='grade_documents', data=None, conditional=False), Edge(source='websearch', target='generate', data=None, conditional=False), Edge(source='__start__', target='websearch', data=None, conditional=True), Edge(source='__start__', target='retrieve', data='vectorstore', conditional=True), Edge(source='grade_documents',