In [2]:
import os
from openai import OpenAI
import matplotlib.pyplot as plt
import numpy as np
from typing import Dict, List, Optional, Tuple, Union

import PyPDF2
import markdown
import html2text
import json
from tqdm import tqdm
import tiktoken
import re
from bs4 import BeautifulSoup
from IPython.display import display, Code, Markdown


In [2]:

api_key = 'your api'
base_url = "your url"  


In [3]:
# 实例化客户端
client = OpenAI(api_key=api_key, base_url = base_url)

In [4]:
# 临时设置环境变量
os.environ["OPENAI_API_KEY"] = api_key
os.environ["OPENAI_BASE_URL"] = base_url

# 1.RAG技术原理

### 1.1 RAG技术必要性介绍

LLM生成语言时，产生幻觉或不准确的结果

 - 信息误导：过时或不准确
 - 知识更新之后：无法访问新的技术
 - 推理能力不足：LLM在面对复杂推理任务时，难以提供高效且准确的结果
 
 ### 1.2 RAG系统的核心组件说明

 1. **向量化模块：**用于将文档片段转化为向量表示，以便后续检索
 2. **文档加载与切分模块：**负责加载文档并将其切分为若干易于处理的文档片段
 3. **数据库模块：**用于存储文档片段及其对应的向量表示
 4. **检索模块：**根据用户输入的查询，从数据库中检索最相关的文档片段
 5. **生成模块：**将检索到的文档片段与用户输入的查询结合，生成最终的回答

 ### 1.3 RAG系统的基本流程：

 - **索引：**将文本库分割成焦段的Chunk，并通过编码器构建向量库索引
 - **检索：**根据问题和chunks的相似度检索相关文档片段
 - **生成：**一检索到的上下文为条件，生成问题的回答

# 2. OpenAI第三档embedding模型介绍与调用方法

### 2.1 OpenAI Embedding模型介绍

将文本字符串表示为向量（浮点数列表），通过计算向量之间的距离来衡量文本之间的相关性，向量距离越小，表示文本之间的相关性越高；反之越低。常见embedding应用包括：

 - 搜索：根据文本查询的相关性对结果进行排序
 - 聚类：根据文本相似性将其分组
 - 推荐：根据相关文本字符串推荐项目
 - 异常检测：识别与其他内容相关性较低的异常点
 - 多样性测量：分析相似性分布
 - 分类：将文本字符串根据其最相似的标签进行分类

OpenAI最新的embedding模型是text-embedding-3-small和text-embedding-3-large向量长度分别为1536和3072的向量。用户可以设置维度参数来介绍向量的维度，不损失其表达概念的能力   

### 2.2OpenAI Embedding模型获取方法（付费）：
要获取文本的Embedding向量，可以将文本字符串发送到OpenAI的EmbeedingAPI断点，并指定所使用的模型

这里用阿里云百炼embedding的模型替代

In [5]:
# 调用embedding API 获取文本的向量表示
response = client.embeddings.create(
    input = "测试文本",
    model = "text-embedding-v3"
)

In [None]:
print(response.data[0].embedding)

In [None]:
len(response.data[0].embedding)

余弦相似度与效果介绍

$$
\text { Cosine Similarity }(\vec{a}, \vec{b})=\frac{\vec{a} \cdot \vec{b}}{\|\vec{a}\|\|\vec{b}\|}
$$

In [8]:
def consine_sim(v1, v2):
    dot_product = np.dot(v1, v2)
    magnitude = np.linalg.norm(v1) * np.linalg.norm(v2)
    if not magnitude:
        return 0
    return dot_product / magnitude

In [9]:
text1 = '我喜欢吃苹果'
text2 = '苹果是我最喜欢吃的水果'
text3 = '我喜欢用苹果手机'

In [10]:
vector1 = client.embeddings.create(
    input = text1,
    model = "text-embedding-v3"
).data[0].embedding

vector2 = client.embeddings.create(
    input = text2,
    model = "text-embedding-v3"
).data[0].embedding

vector3 = client.embeddings.create(
    input = text3,
    model = "text-embedding-v3"
).data[0].embedding

In [None]:
consine_sim(vector1, vector2)

In [None]:
consine_sim(vector1, vector3)

In [None]:
consine_sim(vector2, vector3)

根据实验，embedding模型可以根据句意相关的embedding的处理

In [14]:
class BaseEmbeddings:
    """
    向量化的基类，用于将文本转化为向量表示，不同的子类可以实现不同的向量获取方式
    """

    def __init__(self, path: str, is_api: bool) -> None:
        """
        初始化基类

        参数：
        path(str) - 如果是本地模型， path 表示模型路径；如果是api模式，则path可以为空
        is_api(bool) - 是否使用api模式
        """
        self.path = path
        self.is_api = is_api

    def get_embedding(self, text:str, model: str) -> List[float]:
        """
        抽象方法：用于获取文本的向量表示

        参数：

        text(str) - 待获取向量表示的文本
        model(str) - 模型名称

        返回值：

        List[float] - 文本的向量表示
        """
        raise NotImplementedError("子类必须实现该方法")
        
    @classmethod
    def cosine_similarity(cls, vector1: List[float], vector2: List[float]) -> float:
        """
        计算两个向量之间的余弦相似度，用于衡量他们的相似程度

        参数：
        vector1（list[float]）- 第一个向量
        vector2（list[float]）- 第二个向量
        
        返回：
        float - 预选相似度值，范围从 -1 到 1，越接近 1 表示向量越相似
        """
        dot_product = np.dot(vector1, vector2)
        magnitude = np.linalg.norm(vector1) * np.linalg.norm(vector2)
        if not magnitude:
            return 0
        return dot_product / magnitude

In [15]:
class OpenAIEmbedding(BaseEmbeddings):
    """
    使用 OpenAI 的 Embedding API 来获取文本向量的类，继承自BaseEmbeddings
    """

    def __init__(self, path: str = '', is_api: bool = True) -> None:
        """
        初始化类，设置 OpenAI API 客户端，如果使用的是 API 调用

        参数：
        path(str) - 如果是本地模型， path 表示模型路径；如果是api模式，则path可以为空
        is_api(bool) - 是否使用api模式
        """
        super().__init__(path, is_api)
        if self.is_api:
            # 初始化 OpenAI API 客户端
            self.client = OpenAI()
            self.client.api_key = os.getenv('OPENAI_API_KEY')
            self.client.base_url = os.getenv('OPENAI_BASE_URL')

    def get_embedding(self, text: str, model: str = 'text-embedding-v3') -> List[float]:
        """
        使用 Open API 获取文本的向量表示

        参数：
        text(str) - 待获取向量表示的文本
        model(str) - 模型名称

        返回值：
        List[float] - 文本的向量表示
        """
        if self.is_api:
            text = text.replace('\n', ' ')
            return self.client.embeddings.create(input=[text], model=model).data[0].embedding
        else:
            raise NotImplementedError("本地模型尚未实现")

In [None]:
embedding_model = OpenAIEmbedding()
text = "这是一个实例文本，用于演示 OpenAI Embedding 的使用。"
embedding_vector = embedding_model.get_embedding(text)
print("文本的向量表示为：", embedding_vector)

In [None]:
vector1 = embedding_model.get_embedding(text1)
vector2 = embedding_model.get_embedding(text2)
similarity = embedding_model.cosine_similarity(vector1, vector2)
print(f"两段文本的余弦相似度为：{similarity}")

# 3.文档加载与切分模块创建

### 3.1 文档格式处理函数

展示支持多种格式的简单实现：

In [18]:
def read_file_content(cls, file_path: str):
    # 根据文件扩展名选择读取方式
    if file_path.endswith('.pdf'):
        return cls.read_pdf(file_path)
    elif file_path.endswith('.md'):
        return cls.read_markdown(file_path)
    elif file_path.endswith('.txt'):
        return cls.read_text(file_path)
    else:
        raise ValueError('Unsupported file type')

# 3.2 文档切分函数

按照token长度进行切分，设置一个最大Token长度，然后按照这个长度进行切分。在这个过程我们也会保证每个片段之间有一定的重叠，避免重要信息被切掉。

In [19]:
def get_chunk(cls, text:str, max_token_len:int = 600, cover_content:int=150):
    chunk_text = []
    curr_len = 0
    curr_chunk = ''
    lines = text.split('\n')

    for line in lines:
        line = line.replace(' ', '')
        line_len = len(enc.encode(line))
        if line_len > max_token_len:
            print('warning line_len = ', line_len)
        if curr_len + line_len <= max_token_len:
            curr_chunl += line + '\n'
            curr_len += line_len + 1
        else:
            chunk_text.append(curr_chunk)
            curr_chunk = curr_chunk[-cover_content:] + line
            curr_len = len(enc.encode(curr_chunk))

    if curr_chunk:
        chunk_text.append(curr_chunk)
    
    return chunk_text

In [None]:
# 需要梯子
enc = tiktoken.get_encoding('cl100k_base')
len(enc.encode('你好啊,好久不见！'))

In [21]:
class ReadFiles:
    """
    读取文件类，用于从指定路径读取支持的文件类型，并进行切分
    当前支持类型：pdf, md, txt
    """

    def __init__(self, path:str) -> None:
        """
        初始化函数，设定要读取的文件路径，并获取该路径下所有符合要求的文件。
        ：param path：文件夹路径
        """
        self._path = path
        self.file_list = self.get_files() 

    def get_files(self):
        """
        遍历指定文件夹，获取支持的文件类型列表（txt, md, pdf）
        return 文件路径列表
        """
        file_list = []
        for filepath, dirnames, filenames in os.walk(self._path):
            for filename in filenames:
                if filename.endswith('.md'):
                    file_list.append(os.path.join(filepath, filename))
                elif filename.endswith('.pdf'):
                    file_list.append(os.path.join(filepath, filename))
                elif filename.endswith('.txt'):
                    file_list.append(os.path.join(filepath, filename))
        return file_list
    
    def get_content(self, max_token_len:int = 600, cover_content: int = 150):
        """
        读取文件内容并进行分割，将长文本切分为多个块
        param max_token_len: 每个文档片段最大 Token 长度
        prarm cover_content: 每个文档片段之间的 Token 长度
        return： 切分后的文档片段列表
        """
        docs = []
        for file in self.file_list:
            content = self.read_file_content(file)
            chunk_content = self.get_chunk(content, max_token_len=max_token_len, cover_content=cover_content)
            docs.extend(chunk_content)
        return docs

    @classmethod
    def get_chunk(cls, text:str, max_token_len:int = 600, cover_content:int=150):
        chunk_text = []
        curr_len = 0
        curr_chunk = ''
        token_len = max_token_len - cover_content
        lines = text.splitlines()

        for line in lines:
            line = line.replace(' ', '')
            line_len = len(enc.encode(line))
            if line_len > max_token_len:
                # 如果但凡长度超过限制，将其分给为多个片段
                num_chunks = (line_len + token_len - 1) // token_len
                for i in range(num_chunks):
                    start = i * token_len
                    end = start + token_len
                    # 防止跨单词分割
                    while not line[start:end].rstrip().isspace():
                        start += 1
                        end += 1
                        if start >= line_len:
                            break
                    curr_chunk = curr_chunk[-cover_content:] + line[start:end]
                    chunk_text.append(curr_chunk)
                start = (num_chunks - 1) * token_len
                curr_chunk = curr_chunk[-cover_content:] + line[start:end]
                chunk_text.append(curr_chunk)
            elif curr_len + line_len <= max_token_len:
                # 当前片段长度未超过限制时，继续累加
                curr_chunk += line + '\n'
                curr_len += line_len + 1
            else:    
                chunk_text.append(curr_chunk)
                curr_chunk = curr_chunk[-cover_content:] + line
                curr_len = len(enc.encode(curr_chunk))

        if curr_chunk:
            chunk_text.append(curr_chunk)
        
        return chunk_text
    
    @classmethod
    def read_file_content(cls, file_path:str):
        """
        读取文件内容，根据文件类型选择不同的读取方式
        param file_path: 文件路径
        return： 文件内容
        """

        if file_path.endswith('.pdf'):
            return cls.read_pdf(file_path)
        elif file_path.endswith('.md'):
            return cls.read_markdown(file_path)
        elif file_path.endswith('.txt'):
            return cls.read_text(file_path)
        else:
            raise ValueError('Unsupported file type')
        
    @classmethod
    def read_pdf(cls, file_path:str):
        """
        读取pdf文件内容
        param file_path: 文件路径
        return： 文件内容
        """
        with open(file_path, 'rb') as f:
            reader = PyPDF2.PdfReader(f)
            text = ''
            for page_num in range(len(reader.pages)):
                text += reader.pages[page_num].extract_text()
            return text

    @classmethod
    def read_markdown(cls, file_path:str):
        """
        读取markdown文件内容
        param file_path: 文件路径
        return： 文件内容
        """
        with open(file_path, 'r', encoding='utf-8') as f:
           md_text = f.read()
           html_text = markdown.markdown(md_text)
           # 使用BeautifulSoup（html_text, 'html.parser'）
           soup = BeautifulSoup(html_text, 'html.parser')
           plain_text = soup.get_text()
           # 使用正则表达式一次网址链接
           text = re.sub(r'http\S+', '', plain_text)
           return text
    
    @classmethod
    def read_text(cls, file_path:str):
        """
        读取txt文件内容
        param file_path: 文件路径
        return： 文件内容
        """
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read()

In [22]:
class Documents:
    """
    文档类：用于读取已分好类的json格式文档
    """
    def __init__(self, path:str = '') -> None:
        self.path = path
    
    def get_content(self):
        """
        读取 json 格式的文档内容
        return json 文档的内容
        """
        with open(self.path, 'r', encoding='utf-8') as f:
            content = json.load(f)
        return content

In [None]:
# 初始化 ReadFiles类，指定文件目录路径data
file_reader = ReadFiles('data')

# 获取目录下所有支持的文件类型
file_list = file_reader.get_files()
print("支持的文件类型：", file_list)

In [None]:
document_chunks = file_reader.get_content(max_token_len=600, cover_content=150)
print("分块后的文档内容：", document_chunks)

In [None]:
print(document_chunks[0])
print(document_chunks[1])

# 4. 词向量数据库与向量检索模块

为了构建向量数据库，需要一下关键功能：

1. **持久化存储（persist）：** 将数据库存储到本地，便于此次加载使用
2. **加载数据库（load_vector）：** 从本地文件加载已经存储的向量和文档。
3. **获取向量表示（get_vector）：** 将文本转化为向量表示并存储
4. **检索（query）:** 根据用户的Query，检索数据库中的想啊滚文档片段

In [39]:
class VectorStore:
    def __init__(self, document: List[str] = None) -> None:
        """
        初始化向量存储类，存储文档和对应的向量表示
        param document：文档列表，默认为空
        """
        if document is None:
            documnt = []
        self.document = document # 存储文档内容
        self.vector = [] # 存储文档的向量表示

    def get_vector(self, EmbeddingModel:BaseEmbeddings) -> List[List[float]]:
        """
        使用传入的 Embedding 模型当文档向量化
        param EmbeddingMode： 传入的用于生成向量的模型（需继承 BaseEmbedings 类）
        return 返回文档对应的向量列表 
        """
        # 遍历所有文档，获取每个文档的向量表示
        self.vectors = [EmbeddingModel.get_embedding(doc) for doc in self.document]
        return self.vectors
    
    def persist(self, path:str='storage'):
        """
        将文本和对应的向量表示持久化本地目录中，以便后续加载使用
        param path：存储路径，默认为：storage
        """
        if not os.path.exists(path):
            os.makedirs(path) # 如果路径不存在，创建路径
        # 保存向量为 numpy 文件
        np.save(os.path.join(path, 'vectors.npy'), self.vectors)
        # 将文档内容存储到文本文件中
        with open(os.path.join(path, 'documents.txt'), 'w') as f:
            for doc in self.document:
                f.write(doc + '\n')

    def load_vector(self, path:str='storage'):
        # 加载保存的向量数据
        self.vectors = np.load(os.path.join(path, 'vectors.npy')).tolist()
        # 加载文档内容
        with open(os.path.join(path, 'documents.txt'), 'r') as f:
            self.document = [line.strip() for line in f.readlines()]

    def get_similarity(self, vector1:List[float], vector2:List[float]) -> float:
        """
        计算两个向量的预选相似度
        param vector1: 第一个向量
        param vector2: 第二个向量
        return: 返回两个向量的余弦相似度 -1到1
        """
        dot_product = np.dot(vector1, vector2)
        magnitude = np.linalg.norm(vector1) * np.linalg.norm(vector2)
        return dot_product / magnitude

    def query(self, query:str, EmbeddingModel:BaseEmbeddings, k:int = 1)->List[str]:
        """
        根据用户的查询文本，检索最相关的文档片段
        param query: 用户查询的文本
        param EmbeddingModel: 用于生成查询文本向量的模型
        param k: 返回最相关的 k 个文档片段，默认为1
        return: 返回最相关的 k 个文档片段
        """
        # 将查询文本向量化
        query_vector = EmbeddingModel.get_embedding(query)
        # 计算查询向量与每个文档向量的相似度
        similarities = [self.get_similarity(query_vector, vec) for vec in self.vectors]
        # 获取最相关的 k 个文档片段
        top_k_indices = np.argsort(similarities)[-k:][::-1]
        # 返回对应的文档内容
        return [self.document[idx] for idx in top_k_indices]

In [29]:
# 测试
documents = [
    "机器学习是人工智能的一个分支。",
    "深度学习是一种特殊的机器学习方法",
    "监督学习是一种训练模型的方法",
    "强化学习是通过奖励和惩罚进行学习。",
    "无监督学习不依赖标签数据"
]

In [40]:
# 创建向量数据库
vector_store = VectorStore(documents)

# 使用 OpenAI Embedding 模型对文档进行向量化
embedding_model = OpenAIEmbedding()

# 获取文档向量并存储
vector_store.get_vector(embedding_model)

# 持久化存储到本地
vector_store.persist('storage')

In [None]:
# 模拟用户查询
query = "什么是深度学习？"
result = vector_store.query(query, embedding_model)

print('检索结果：', result)

In [None]:
loaded_array = np.load('storage/vectors.npy')
print('加载的向量：', loaded_array)

# 5. 大模型问答模块

先创建基类`BaseModel`，然后再以 GPT4o 模型为例，展示如何使用大语言模型里完成这个任务

### 5.1 大模型模块的基类
 - chat：负责处理用户的输入并生成回答
 - load_model：如果是使用本地模型，这个方法负责加载模型。如果使用API模型，可以不用实现这个方法。 

In [43]:
class BaseModel:
    """
    基础模型类，作为所有模型的基类
    包含一些通用的接口，如加载模型、生成回答等
    """
    def __init__(self, path:str='') -> None:
        self.path = path

    def chat(self, prompt:str, history:List[dict], content:str) -> str:
        """
        处理用户的输入并生成回答
        param query: 用户输入的查询
        return: 模型生成的回答
        """
        raise NotImplementedError

    def load_model(self) -> None:
        """
        加载模型
        """
        raise NotImplementedError

### 5.2 借助GPT4o模型进行对话

In [62]:
class GPT4oChat(BaseModel):
    """
    GPT4o模型对话类
    """
    def __init__(self, api_key:str=api_key
                 , base_url:str=base_url) -> None:
        super().__init__()
        self.client = OpenAI(api_key=api_key, base_url=base_url)

    def chat(self, prompt:str, history:List=[], content:str='') -> str:
        """
        处理用户的输入并生成回答
        param prompt：用户的提问
        param query: 用户输入的查询（可选）
        param content：可参考的上下文信息（可选）
        return：生成的回答
        """
        # 构建包含问题和上下文的完整提示
        full_prompt = PROMPT_TEMPLATE['GPT4o_PROMPT_TEMPLATE'].format(question=prompt, context=content)

        # 调用 GPT-4o 模型进行推理
        response = self.client.chat.completions.create(
            model='deepseek-r1',
            messages=[
                {"role": "user", "content": full_prompt}
            ]
        )

        # 返回模型生成的第一个答案
        return response.choices[0].message.content



### 5.3 提示模版
为了方便维护和复用提示词，可以使用一个字典来保存不同模型的提示模版

In [63]:
PROMPT_TEMPLATE = dict(
    GPT4o_PROMPT_TEMPLATE = """
下面有一个或许与这个问题相关的参考段落，若你觉得参考段落能和问题相关，这现总结参考段落的内容。
若你觉得参考段落和问题无关，这使用你自己的原始知识来回答用户的问题，并且送死使用中文来回答
问题：{question}
可参考的上下文
···
{context}
···
有用的回答：
"""
)

# 6. 完整流程演示

In [64]:
# 加载并切分文档
docs = ReadFiles('./data').get_content(max_token_len=600, cover_content=150)
vector = VectorStore(docs)

In [None]:
# 使用OpenAI Embedding 模型惊进行向量化
embedding = OpenAIEmbedding()
vector.get_vector(EmbeddingModel=embedding)

In [66]:
# 将向量和文本保存到本地
vector.persist(path = 'storage')
# 用户提出问题
question = 'RAG技术的基本原理是什么？'
# 在数据库中检索最相关的文档片段
content = vector.query(question, EmbeddingModel=embedding, k=1)[0]

In [None]:
content

In [None]:
# 使用 GPT40Chat 模型生成答案
chat = GPT4oChat()
print(chat.chat(question, [], content)) 

In [69]:
def run_mini_rag(question:str, knowledge_base_path:str, k:int=1) -> str:
    """
    运行一个简化版的RAG项目

    param question: 用户提出的问题
    param knowledge_base_path：知识库的路径，包含文档的文件夹路径
    param api_key：OpenAI API密钥，用于调用GPT-4o模型
    param k：返回与文艺最想概念的k个文档片段，默认为1
    return：返回GPT-4o模型生成的回答
    """
    api_key = os.getenv('OPENAI_API_KEY')
    # 1. 加载并切分文档
    docs = ReadFiles(knowledge_base_path).get_content(max_token_len=600, cover_content=150)
    vector = VectorStore(docs)
    # 2. 使用OpenAI Embedding 模型惊进行向量化
    embedding = OpenAIEmbedding()
    vector.get_vector(EmbeddingModel=embedding)
    # 3. 将向量和文本保存到本地（可选）
    vector.persist(path = 'storage')
    # 4. 在数据库中检索最相关的文档片段
    content = vector.query(question, EmbeddingModel=embedding, k=k)[0]
    # 5. 使用GPT-4o模型生成答案
    chat = GPT4oChat(api_key=api_key)
    answer = chat.chat(question, [], content)

    return answer



In [70]:
question = 'RAG技术的基本原理是什么？'
knowledge_base_path = './data'
answer = run_mini_rag(question, knowledge_base_path)

In [None]:
display(Markdown(answer))