#### 具有知识库配置和索引功能的Agent

做一个知识库助手非常有意思且有用，如果做得好，是可以让这个助手精准回答任何关于知识库的问题的。
这里主要涉及到的知识就是RAG(检索增强生成)。
一个复杂的高效的RAG包含非常多模块，但是我们可以从一个简单的做起。

简单的，能用的RAG，涉及到以下几个模块：
数据索引：将知识库进行清洗，构建RAG的索引数据，也就是将数据库分块保存到数据库里
检索：找到与用户问题最相关的数据
生成：将找到的数据发给大模型生成回答

在我这个简单的智能体里边，我用以下两个函数提取PDF文本：
- text = self.extract_text_from_pdf(pdf_path)
- paragraphs = self.split_text_into_paragraphs(text)
PDF的文本会被放进一个列表里，然后调用智谱的嵌入模型将这个列表里的每个文本都转换为向量保存起来。（可以放进excel也可以放入数据库）
然后用get_pdf_relevant_paragraphs这个函数找到与用户的问题最相关的两个段落，发给大模型，让大模型根据这两个段落回答用户的问题。

这里我还增加了一个算法就是识别用户的问题是否与知识库相关，如果不相关则不做处理并提醒用户，但是其实可以在不相关的时候直接让大模型回复用户就行了~
这就留给各位进行发挥~


In [None]:
from tools.code_interpreter import *
from tools.json_tool import *
from tools.llm_api import *
from tools.llm_keys import *
import re
import string
import numpy as np
import PyPDF2

class RAGAgent:
    def __init__(self, model, zhipu_key, print_ans=True):
        self.model = model
        self.temperature = 0.9
        self.api_key = zhipu_key
        self.print_ans = print_ans
        self.judge_cmd_prompt = self.get_prompt("RAG/judge_cmd_prompt.txt")
        self.answer_questions_prompt = self.get_prompt("RAG/answer_questions_prompt.txt")
        
        # 获取RAG/files文件夹下面的所有文件名称
        self.files = os.listdir("RAG/files")
        self.files = [file for file in self.files if file.endswith(".pdf")]
        
        self.system_prompt = '''
你是我的知识库助理，拥有大量的知识库的信息，你将根据这些信息回答我的问题。

# 当前拥有的知识库文件
{files}
'''
        self.system_prompt = self.system_prompt.format(files=self.files)
        self.conversations = [{"role": "system", "content": self.system_prompt}]
        
    def get_prompt(self, path):
        with open(path, 'r', encoding="utf-8") as file:
            prompt = file.read()
        return prompt

    def get_llm_ans_v1(self, conversations):
        if self.print_ans:
            ans = ""
            for char in get_llm_answer_converse(conversations, self.model, self.temperature):
                ans += char
                print(char, end="", flush=True)
            print("\n")
        else:
            ans = ""
            for char in get_llm_answer(conversations, self.model, self.temperature):
                ans += char
        return ans
    
    def get_llm_ans_converse(self, question):
        self.conversations.append({"role": "user", "content": question})
        if self.print_ans:
            ans = ""
            for char in get_llm_answer_converse(self.conversations, self.model, self.temperature):
                ans += char
                print(char, end="", flush=True)
            print("\n")
        else:
            ans = ""
            for char in get_llm_answer_converse(self.conversations, self.model, self.temperature):
                ans += char
        return ans
    
    # 提取PDF文本
    def extract_text_from_pdf(self, pdf_path):
        with open(pdf_path, 'rb') as file:
            reader = PyPDF2.PdfReader(file)
            text = ""
            for page in reader.pages:
                text += page.extract_text() + "\n"
        
        return self.clean_text(text)

    def clean_text(self, text):
        # 移除多余的空白字符
        text = re.sub(r'\s+', ' ', text)
        
        # 移除特殊字符，但保留标点符号
        text = ''.join([char for char in text if char.isalnum() or char.isspace() or char in string.punctuation])
        
        # 统一标点符号的格式（例如，将中文引号转换为英文引号）
        text = text.replace('"', '"').replace('"', '"')
        text = text.replace(''', "'").replace(''', "'")
        
        # 确保句子之间有正确的空格
        text = re.sub(r'([.!?。！？])\s*', r'\1 ', text)
        
        return text.strip()

    # 文本分段（小于1000字）
    def split_text_into_paragraphs(self, text, max_length=2000):
        # 使用正则表达式查找句子结束的位置（句号、问号或感叹号）
        sentences = re.split(r'(?<=[.?!])\s*', text)
        paragraphs = []
        current_paragraph = ""
        for sentence in sentences:
            # 检查加上当前句子后的长度是否超过限定的最大长度
            if len(current_paragraph + sentence) <= max_length:
                current_paragraph += sentence
            else:
                # 如果当前段落加上这句话会超过1000字，先保存当前段落，然后新起一个段落
                if current_paragraph:
                    paragraphs.append(current_paragraph)
                current_paragraph = sentence
        # 添加最后一个段落，如果它非空
        if current_paragraph:
            paragraphs.append(current_paragraph)
        return paragraphs

    # GLM嵌入模型
    def embedding(self, text):
        client = ZhipuAI(api_key=self.api_key) 
        response = client.embeddings.create(
            model="embedding-2",
            input=text,
        )
        ebd = response.data[0].embedding
        return ebd
    
    # 用户问题向量化
    def query_embedding(self, query):
        return self.embedding(query)

    # 计算两个向量之间的相似性
    def cosine_similarity(self, vec1, vec2):
        dot_product = np.dot(vec1, vec2)
        norm_vec1 = np.linalg.norm(vec1)
        norm_vec2 = np.linalg.norm(vec2)
        return dot_product / (norm_vec1 * norm_vec2)

    # 计算问题向量与每个段落之间的相关性，用列表存储，降序排列
    def compute_similarities(self, paragraph_vectors, question_vector):
        similarities = []
        for index, pvec in enumerate(paragraph_vectors):
            sim = self.cosine_similarity(pvec, question_vector)
            similarities.append((index, sim))
        
        # 按相似度降序排序
        sorted_similarities = sorted(similarities, key=lambda x: x[1], reverse=True)
        return sorted_similarities

    # 找出pdf中与用户问题最相关的两个段落
    def get_pdf_relevant_paragraphs(self, query, paragraphs, pdf_ebd):
        # 量化用户问题
        query_ebd = self.query_embedding(query)

        # 找到与用户问题最相关的pdf段落
        relevant_paragraphs_list = self.compute_similarities(pdf_ebd, query_ebd)
        relevant_paragraphs_list = relevant_paragraphs_list[:2]
        relevant_paragraphs = ""

        print('最相关的两个pdf段落：')
        for i in relevant_paragraphs_list:
            relevant_paragraphs += paragraphs[i[0]]
            print(f"paragrah{i[0]}:\n", paragraphs[i[0]])
            
        # 返回段落
        return relevant_paragraphs

    # 判断问题是否与知识库相关
    def judge_cmd(self, question):
        prompt = self.judge_cmd_prompt
        prompt = prompt.format(question=question)
        conversation_temp = self.conversations.copy()
        conversation_temp.append({"role": "user", "content": prompt})
        ans = self.get_llm_ans_v1(conversation_temp)
        ans = get_json(ans)
        is_related = ans["is_related"]
        print(is_related)
        return is_related
    
    def answer_questions(self, question):
        # 判断问题是否与知识库相关
        is_related = self.judge_cmd(question)
        if not is_related:
            return "这个问题不是关于知识库的问题，我无法回答。"

        # 提取pdf文本
        pdf_path = "RAG/files/Adaptive In-conversation Team Building.pdf"
        print(f"正在处理pdf文件：{pdf_path}")
        text = self.extract_text_from_pdf(pdf_path)
        paragraphs = self.split_text_into_paragraphs(text)

        # 获取pdf段落的向量表示
        print("正在计算pdf段落的向量表示...")
        pdf_ebd = [self.embedding(paragraph) for paragraph in paragraphs]

        # 找出pdf中与用户问题最相关的两个段落
        relevant_paragraphs = self.get_pdf_relevant_paragraphs(question, paragraphs, pdf_ebd)

        prompt = self.answer_questions_prompt
        prompt = prompt.format(question=question, relevant_paragraphs=relevant_paragraphs)
        ans = self.get_llm_ans_converse(prompt)
        self.conversations.append({"role": "assistant", "content": ans})
        return ans
    
    

In [None]:
model = "glm-4"
knowledge_agent = RAGAgent(model, zhipu_key)

question = "我知识库中的论文的主要内容是什么？"
answer = knowledge_agent.answer_questions(question)