In [None]:
import getpass
import os


def _set_env(key: str):
    """Make sure ``os.environ[key]`` holds a non-empty value.

    If the variable is missing *or* set to an empty string, the user is
    prompted for it with ``getpass`` (input is not echoed) and the entered
    value is stored in the process environment. An existing non-empty value
    is left untouched.

    Args:
        key: Name of the environment variable to populate.
    """
    # An empty string is not a usable credential, so treat it like "unset".
    if not os.environ.get(key):
        os.environ[key] = getpass.getpass(f"{key}:")

# API keys are requested interactively when they are not already available in
# the environment. Do not assign keys (or empty placeholders) in the notebook:
# a hard-coded value bypasses the prompt and ends up in version control.
_set_env("OPENAI_API_KEY")
_set_env("TAVILY_API_KEY")

# Name of the local Ollama chat model used by the grader and the generator.
local_llm = "qwen2:latest"
# Label of the model under evaluation; embedded in the metadata string below.
model_tested = "qwen2:latest"
metadata = f"CRAG, {model_tested}"

In [3]:
### Index
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import WebBaseLoader, DirectoryLoader, TextLoader
from langchain_community.vectorstores import Chroma
from langchain_ollama import OllamaEmbeddings  

finreport_dir = "./FinRep"

# Load the local report text file. The encoding is pinned to UTF-8 so the
# Chinese text is decoded the same way on every platform instead of relying
# on the OS default encoding.
loader = DirectoryLoader(
    finreport_dir,
    glob="report.txt",
    loader_cls=TextLoader,
    loader_kwargs={"encoding": "utf-8"},
)
docs = loader.load()
if not docs:
    # A glob that matches nothing yields an empty list; fail here with a clear
    # message rather than later while building the vector store.
    raise FileNotFoundError(f"No report.txt found under {finreport_dir!r}")

# Split the report into chunks of at most 250 tokens (tiktoken count), no overlap.
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=250, chunk_overlap=0
)
doc_splits = text_splitter.split_documents(docs)

# Embed with the local Ollama model nomic-embed-text:latest
embedding_model = OllamaEmbeddings(model="nomic-embed-text:latest")

# Add to vectorDB
vectorstore = Chroma.from_documents(
    documents=doc_splits,
    collection_name="rag-chroma",
    embedding=embedding_model,
)
retriever = vectorstore.as_retriever()

USER_AGENT environment variable not set, consider setting it to identify your requests.


In [18]:
### Retrieval Grader

from langchain.prompts import PromptTemplate
from langchain_community.chat_models import ChatOllama
from langchain_core.output_parsers import JsonOutputParser

# LLM: format="json" asks Ollama for JSON output, temperature=0 for stable grades
llm = ChatOllama(model=local_llm, format="json", temperature=0)

# Prompt
# The rubric and the requested answer use a single vocabulary ('yes' / 'no').
# Mixing a 1/0 rubric and a "explain step by step" request with a yes/no JSON
# answer lets the model reply with 0/1, which downstream code expecting
# 'yes'/'no' cannot interpret.
prompt = PromptTemplate(
    template="""You are a teacher grading a quiz. You will be given: 
    1/ a QUESTION
    2/ A FACT provided by the student

    You are grading RELEVANCE RECALL:
    A score of 'yes' means that ANY of the statements in the FACT are relevant to the QUESTION. 
    A score of 'no' means that NONE of the statements in the FACT are relevant to the QUESTION. 

    Question: {question} \n
    Fact: \n\n {documents} \n\n

    Give a binary score 'yes' or 'no' to indicate whether the fact is relevant to the question. \n
    Provide the binary score as a JSON with a single key 'score' and no preamble or explanation.
    """,
    input_variables=["question", "documents"],
)

# Chain: prompt -> JSON-mode LLM -> parsed Python object
retrieval_grader = prompt | llm | JsonOutputParser()

# Smoke test: grade the second retrieved chunk for a sample question
question = "药明生物发展策略"
docs = retriever.invoke(question)
doc_txt = docs[1].page_content
print(docs)
print(retrieval_grader.invoke({"question": question, "documents": doc_txt}))

[Document(metadata={'source': 'FinRep/report.txt'}, page_content='①生产研发'), Document(metadata={'source': 'FinRep/report.txt'}, page_content='为辅助生殖技术提供资助。'), Document(metadata={'source': 'FinRep/report.txt'}, page_content='豪华的医生团队阵容。'), Document(metadata={'source': 'FinRep/report.txt'}, page_content='降的趋势是明确的。')]
{'score': 0}


In [19]:
### Generate

from langchain_core.output_parsers import StrOutputParser

# LLM used for free-form answers (no JSON mode), deterministic decoding
llm = ChatOllama(model=local_llm, temperature=0)

# Prompt that asks for a short answer grounded in the supplied documents
prompt = PromptTemplate(
    input_variables=["question", "documents"],
    template="""You are an assistant for question-answering tasks. 

    Use the following documents to answer the question. 

    If you don't know the answer, just say that you don't know. 

    Use three sentences maximum and keep the answer concise:
    Question: {question} 
    Documents: {documents} 
    Answer: 
    """,
)

# Chain: prompt -> LLM -> plain string
rag_chain = prompt | llm | StrOutputParser()

# Run once on the question/documents left over from the grader cell
generation = rag_chain.invoke({"question": question, "documents": docs})
print(generation)

药明生物的发展策略主要集中在生产研发领域，提供辅助生殖技术资助，并且拥有豪华的医生团队阵容。然而，报告中没有明确提及其发展趋势。


In [6]:
### Search

from langchain_community.tools.tavily_search import TavilySearchResults

# The tool's result limit is the `max_results` field; `k` is not one of its
# documented parameters, so the intended limit of 3 is passed as max_results.
web_search_tool = TavilySearchResults(max_results=3)

In [8]:
from typing import List
from typing_extensions import TypedDict
from IPython.display import Image, display
from langchain.schema import Document
from langgraph.graph import START, END, StateGraph


class GraphState(TypedDict):
    """
    Represents the state of our graph.

    Attributes:
        question: question
        generation: LLM generation
        search: whether to add search ("Yes" / "No", set by grade_documents)
        documents: list of documents
        steps: names of the processing steps executed so far, in order
    """

    question: str
    generation: str
    search: str
    # NOTE(review): annotated as List[str], but the node functions in this
    # notebook store Document objects here (they read `.page_content`);
    # consider List[Document] — confirm before changing.
    documents: List[str]
    steps: List[str]


def retrieve(state):
    """
    Retrieve documents for the current question.

    Args:
        state (dict): The current graph state. Must contain "question";
            "steps" is optional and treated as empty when missing.

    Returns:
        dict: State update with "documents" (the result of
        ``retriever.invoke(question)``), the unchanged "question", and a new
        "steps" list ending in "retrieve_documents".
    """
    question = state["question"]
    documents = retriever.invoke(question)
    # Build a fresh list rather than appending to the list owned by the
    # incoming state, so the node only communicates through its return value.
    steps = [*(state.get("steps") or []), "retrieve_documents"]
    return {"documents": documents, "question": question, "steps": steps}


def generate(state):
    """
    Generate an answer from the question and the documents in the state.

    Args:
        state (dict): The current graph state. Must contain "question" and
            "documents"; "steps" is optional and treated as empty when missing.

    Returns:
        dict: State update with the unchanged "documents" and "question", the
        "generation" returned by ``rag_chain.invoke``, and a new "steps" list
        ending in "generate_answer".
    """
    question = state["question"]
    documents = state["documents"]
    generation = rag_chain.invoke({"documents": documents, "question": question})
    # Build a fresh list rather than appending to the list owned by the
    # incoming state, so the node only communicates through its return value.
    steps = [*(state.get("steps") or []), "generate_answer"]
    return {
        "documents": documents,
        "question": question,
        "generation": generation,
        "steps": steps,
    }


def _grade_is_relevant(score):
    """Interpret one grader result as a boolean; accepts yes/true/1 in any case."""
    raw = score.get("score") if isinstance(score, dict) else score
    if isinstance(raw, (bool, int, float)):
        return raw > 0
    return str(raw).strip().lower() in {"yes", "y", "true", "1"}


def grade_documents(state):
    """
    Determines whether the retrieved documents are relevant to the question.

    Each document's ``page_content`` is sent to ``retrieval_grader``. Documents
    graded as relevant are kept; as soon as any document is graded as not
    relevant, the "search" flag is set to "Yes".

    Args:
        state (dict): The current graph state. Must contain "question" and
            "documents"; "steps" is optional and treated as empty when missing.

    Returns:
        dict: State update with "documents" reduced to the relevant ones, the
        unchanged "question", "search" ("Yes" or "No"), and a new "steps" list
        ending in "grade_document_retrieval".
    """

    question = state["question"]
    documents = state["documents"]
    # Build a fresh list rather than appending to the list owned by the
    # incoming state.
    steps = [*(state.get("steps") or []), "grade_document_retrieval"]
    filtered_docs = []
    search = "No"
    for d in documents:
        score = retrieval_grader.invoke(
            {"question": question, "documents": d.page_content}
        )
        # The grader is an LLM: it may answer 'yes', 'Yes', 1 or true, or omit
        # the key entirely. Normalise instead of comparing to the exact
        # string "yes" so a relevant document is not discarded over formatting.
        if _grade_is_relevant(score):
            filtered_docs.append(d)
        else:
            search = "Yes"
    return {
        "documents": filtered_docs,
        "question": question,
        "search": search,
        "steps": steps,
    }


def web_search(state):
    """
    Run a web search for the question and append the hits to the documents.

    Args:
        state (dict): The current graph state. Must contain "question";
            "documents" and "steps" are optional and treated as empty when
            missing.

    Returns:
        dict: State update with "documents" (existing documents followed by
        one Document per usable web result, with the result URL in
        ``metadata["url"]``), the unchanged "question", and a new "steps"
        list ending in "web_search".
    """

    question = state["question"]
    # Copy so the list held by the incoming state is not extended in place.
    documents = list(state.get("documents") or [])
    steps = [*(state.get("steps") or []), "web_search"]
    web_results = web_search_tool.invoke({"query": question})
    if isinstance(web_results, list):
        documents.extend(
            Document(page_content=d["content"], metadata={"url": d.get("url")})
            for d in web_results
            if isinstance(d, dict) and "content" in d
        )
    else:
        # The search tool can hand back a non-list value (e.g. an error
        # message string); iterating it would index characters and crash.
        print(f"web_search: unexpected search response, skipping: {web_results!r}")
    return {"documents": documents, "question": question, "steps": steps}


def decide_to_generate(state):
    """
    Choose the branch to follow after grading, based on the "search" flag.

    Args:
        state (dict): The current graph state; its "search" entry is read.

    Returns:
        str: "search" when the flag equals "Yes", otherwise "generate"
    """
    return "search" if state["search"] == "Yes" else "generate"


# Graph
workflow = StateGraph(GraphState)

# Register every node under its name
for node_name, node_fn in (
    ("retrieve", retrieve),
    ("grade_documents", grade_documents),
    ("generate", generate),
    ("web_search", web_search),
):
    workflow.add_node(node_name, node_fn)

# Wire the nodes: retrieve -> grade -> (web_search ->) generate -> END
workflow.add_edge(START, "retrieve")
workflow.add_edge("retrieve", "grade_documents")
workflow.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {"search": "web_search", "generate": "generate"},
)
workflow.add_edge("web_search", "generate")
workflow.add_edge("generate", END)

custom_graph = workflow.compile()

#display(Image(custom_graph.get_graph(xray=True).draw_mermaid_png()))

In [9]:
import uuid


def predict_custom_agent_local_answer(example: dict):
    """Run the compiled graph on one example and return its answer and steps.

    Args:
        example: Mapping whose "input" entry is the question text.

    Returns:
        dict with "response" (the graph's "generation") and "steps" (the
        graph's "steps").
    """
    # A fresh random thread id is passed in the run configuration on every call.
    run_config = {"configurable": {"thread_id": str(uuid.uuid4())}}
    initial_state = {"question": example["input"], "steps": []}
    final_state = custom_graph.invoke(initial_state, run_config)
    return {
        "response": final_state["generation"],
        "steps": final_state["steps"],
    }


# Single-question smoke test of the compiled graph; the cell displays the answer.
example = dict(input="药明成立时间")
response = predict_custom_agent_local_answer(example)
response["response"]

'药明康德成立于2000年12月。'

In [None]:
import os
import glob


def _extract_first_question(lines):
    """Return the text after the first line starting with "问题1：", or None."""
    prefix = "问题1："
    for line in lines:
        if line.startswith(prefix):
            # Strip only the leading marker; the question text itself is kept intact.
            return line[len(prefix):].strip() or None
    return None


def process_query(input_dir, output_dir):
    """
    Answer the first question of each query file and write one answer file per query.

    Files ``query000.txt`` .. ``query149.txt`` are looked up in ``input_dir``.
    For each existing file, the line starting with "问题1：" is used as the
    question, ``predict_custom_agent_local_answer`` is called, and the question
    and answer are written to ``answerNNN.txt`` in ``output_dir``. If answering
    fails, the error is reported and a fixed fallback answer is written.

    :param input_dir: Path of the directory containing the query files.
    :param output_dir: Path of the directory the answer files are written to
        (created if it does not exist).
    """
    # Make sure the output directory exists
    os.makedirs(output_dir, exist_ok=True)

    # Walk query000.txt .. query149.txt
    for i in range(150):
        query_file = os.path.join(input_dir, f"query{i:03d}.txt")
        answer_file = os.path.join(output_dir, f"answer{i:03d}.txt")

        if not os.path.exists(query_file):
            print(f"文件 {query_file} 不存在，跳过处理。")
            continue

        # Read the query file
        with open(query_file, "r", encoding="utf-8") as f:
            lines = f.read().splitlines()

        # Extract question 1
        question = _extract_first_question(lines)
        if not question:
            print(f"文件 {query_file} 中未找到问题1，跳过处理。")
            continue

        # Ask the graph. The fallback answer is replaced only when the call
        # succeeds, so a failure never reuses a previous question's result.
        answer = "抱歉，我无法回答您的问题。"
        try:
            answer = predict_custom_agent_local_answer({"input": question})["response"]
        except Exception as e:
            print(f"处理问题 {query_file} : {question}时发生错误: {e}")

        # Write question and answer to the answer file
        with open(answer_file, "w", encoding="utf-8") as f:
            f.write(f"问题：{question}\n")
            f.write(f"答案：{answer}\n")

        print(f"答案已写入文件 {answer_file}")

# Input directory with the query files and output directory for the answers
input_directory, output_directory = './FinRep/fin_queries', './FinRep/CRAG/1_hop'

# Run the batch
process_query(input_directory, output_directory)

答案已写入文件 ./FinRep/CRAG/1_hop/answer000.txt
答案已写入文件 ./FinRep/CRAG/1_hop/answer001.txt
答案已写入文件 ./FinRep/CRAG/1_hop/answer002.txt
答案已写入文件 ./FinRep/CRAG/1_hop/answer003.txt
答案已写入文件 ./FinRep/CRAG/1_hop/answer004.txt
答案已写入文件 ./FinRep/CRAG/1_hop/answer005.txt
答案已写入文件 ./FinRep/CRAG/1_hop/answer006.txt
答案已写入文件 ./FinRep/CRAG/1_hop/answer007.txt
答案已写入文件 ./FinRep/CRAG/1_hop/answer008.txt
答案已写入文件 ./FinRep/CRAG/1_hop/answer009.txt
答案已写入文件 ./FinRep/CRAG/1_hop/answer010.txt
答案已写入文件 ./FinRep/CRAG/1_hop/answer011.txt
答案已写入文件 ./FinRep/CRAG/1_hop/answer012.txt
答案已写入文件 ./FinRep/CRAG/1_hop/answer013.txt
答案已写入文件 ./FinRep/CRAG/1_hop/answer014.txt
答案已写入文件 ./FinRep/CRAG/1_hop/answer015.txt
答案已写入文件 ./FinRep/CRAG/1_hop/answer016.txt
答案已写入文件 ./FinRep/CRAG/1_hop/answer017.txt
答案已写入文件 ./FinRep/CRAG/1_hop/answer018.txt
答案已写入文件 ./FinRep/CRAG/1_hop/answer019.txt
答案已写入文件 ./FinRep/CRAG/1_hop/answer020.txt
答案已写入文件 ./FinRep/CRAG/1_hop/answer021.txt
答案已写入文件 ./FinRep/CRAG/1_hop/answer022.txt
答案已写入文件 ./FinRep/CRAG/1_hop/answer