## Tiny-RAG
- 向量化模块
- 文档加载和切分模块
- 数据库
- 向量检索
- 大模型模块

In [2]:
!pip install PyPDF2
!pip install html2text
!pip install tiktoken
!pip install modelscope

Collecting PyPDF2
  Downloading pypdf2-3.0.1-py3-none-any.whl.metadata (6.8 kB)
Downloading pypdf2-3.0.1-py3-none-any.whl (232 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m232.6/232.6 kB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: PyPDF2
Successfully installed PyPDF2-3.0.1
Collecting html2text
  Downloading html2text-2024.2.26.tar.gz (56 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m56.5/56.5 kB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hBuilding wheels for collected packages: html2text
  Building wheel for html2text (setup.py) ... [?25ldone
[?25h  Created wheel for html2text: filename=html2text-2024.2.26-py3-none-any.whl size=33110 sha256=bf1d7c0ea61159e2207096a399f14d2af8f9e6c0b46f2c209382a1ceb2742d2d
  Stored in directory: /root/.cache/pip/wheels/f3/96/6d/a7eba8f80d31cbd188a2787b81514d82fc5ae6943c44777659
Successfully built htm

In [10]:
!pip install openai

Collecting openai
  Downloading openai-1.38.0-py3-none-any.whl.metadata (22 kB)
Downloading openai-1.38.0-py3-none-any.whl (335 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m335.9/335.9 kB[0m [31m6.1 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: openai
Successfully installed openai-1.38.0


In [13]:
!pip install dashscope zhipuai

Collecting dashscope
  Downloading dashscope-1.20.3-py3-none-any.whl.metadata (6.6 kB)
Collecting zhipuai
  Downloading zhipuai-2.1.4.20230731-py3-none-any.whl.metadata (9.8 kB)
Downloading dashscope-1.20.3-py3-none-any.whl (1.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.3/1.3 MB[0m [31m16.1 MB/s[0m eta [36m0:00:00[0m00:01[0m0:01[0m
[?25hDownloading zhipuai-2.1.4.20230731-py3-none-any.whl (89 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m90.0/90.0 kB[0m [31m6.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: zhipuai, dashscope
Successfully installed dashscope-1.20.3 zhipuai-2.1.4.20230731


## 1.Embedding

In [15]:
%%writefile .env
OPENAI_API_KEY='sk-7INmRu6iHEope6cJCa462b608aFb4eE281Df64F4E32e95F9'
OPENAI_BASE_URL='https://threefive.gpt7.link/v1'  # 有些国内的代理商需要加v1，有些不需要

ZHIPUAI_API_KEY='68604cc3e382eba3aba03daca6f4f717.fteCwuYKtXWpexN6'
DASHSCOPE_API_KEY='sk-d4c3f773af5d442c97eb7d9e40479c55'

Overwriting .env


In [16]:
import os
from copy import copy
from typing import Dict, List, Optional, Tuple, Union
import numpy as np

os.environ['CURL_CA_BUNDLE'] = ''
from dotenv import load_dotenv, find_dotenv
_ = load_dotenv(find_dotenv())


class BaseEmbeddings:
    """
    Base class for embeddings
    """
    def __init__(self, path: str, is_api: bool) -> None:
        self.path = path
        self.is_api = is_api
    
    def get_embedding(self, text: str, model: str) -> List[float]:
        raise NotImplementedError
    
    @classmethod
    def cosine_similarity(cls, vector1: List[float], vector2: List[float]) -> float:
        """
        calculate cosine similarity between two vectors
        """
        dot_product = np.dot(vector1, vector2)
        magnitude = np.linalg.norm(vector1) * np.linalg.norm(vector2)
        if not magnitude:
            return 0
        return dot_product / magnitude
    

class OpenAIEmbedding(BaseEmbeddings):
    """
    class for OpenAI embeddings
    """
    def __init__(self, path: str = '', is_api: bool = True) -> None:
        super().__init__(path, is_api)
        if self.is_api:
            from openai import OpenAI
            self.client = OpenAI()
            self.client.api_key = os.getenv("OPENAI_API_KEY")
            self.client.base_url = os.getenv("OPENAI_BASE_URL")
    
    def get_embedding(self, text: str, model: str = "text-embedding-3-large") -> List[float]:
        if self.is_api:
            text = text.replace("\n", " ")
            return self.client.embeddings.create(input=[text], model=model).data[0].embedding
        else:
            raise NotImplementedError

class JinaEmbedding(BaseEmbeddings):
    """
    class for Jina embeddings
    """
    def __init__(self, path: str = 'jinaai/jina-embeddings-v2-base-zh', is_api: bool = False) -> None:
        super().__init__(path, is_api)
        self._model = self.load_model()
        
    def get_embedding(self, text: str) -> List[float]:
        return self._model.encode([text])[0].tolist()
    
    def load_model(self):
        import torch
        from transformers import AutoModel
        if torch.cuda.is_available():
            device = torch.device("cuda")
        else:
            device = torch.device("cpu")
        model = AutoModel.from_pretrained(self.path, trust_remote_code=True).to(device)
        return model

class ZhipuEmbedding(BaseEmbeddings):
    """
    class for Zhipu embeddings
    """
    def __init__(self, path: str = '', is_api: bool = True) -> None:
        super().__init__(path, is_api)
        if self.is_api:
            from zhipuai import ZhipuAI
            self.client = ZhipuAI(api_key=os.getenv("ZHIPUAI_API_KEY")) 
    
    def get_embedding(self, text: str) -> List[float]:
        response = self.client.embeddings.create(
        model="embedding-2",
        input=text,
        )
        return response.data[0].embedding

class DashscopeEmbedding(BaseEmbeddings):
    """
    class for Dashscope embeddings
    """
    def __init__(self, path: str = '', is_api: bool = True) -> None:
        super().__init__(path, is_api)
        if self.is_api:
            import dashscope
            dashscope.api_key = os.getenv("DASHSCOPE_API_KEY")
            self.client = dashscope.TextEmbedding

    def get_embedding(self, text: str, model: str='text-embedding-v1') -> List[float]:
        response = self.client.call(
            model=model,
            input=text
        )
        return response.output['embeddings'][0]['embedding']

## 2.文档加载和切分

In [4]:

import os
from typing import Dict, List, Optional, Tuple, Union

import PyPDF2
import markdown
import html2text
import json
from tqdm import tqdm
import tiktoken
from bs4 import BeautifulSoup
import re

enc = tiktoken.get_encoding("cl100k_base")


class ReadFiles:
    """
    class to read files
    """

    def __init__(self, path: str) -> None:
        self._path = path
        self.file_list = self.get_files()

    def get_files(self):
        # args：dir_path，目标文件夹路径
        file_list = []
        for filepath, dirnames, filenames in os.walk(self._path):
            # os.walk 函数将递归遍历指定文件夹
            for filename in filenames:
                # 通过后缀名判断文件类型是否满足要求
                if filename.endswith(".md"):
                    # 如果满足要求，将其绝对路径加入到结果列表
                    file_list.append(os.path.join(filepath, filename))
                elif filename.endswith(".txt"):
                    file_list.append(os.path.join(filepath, filename))
                elif filename.endswith(".pdf"):
                    file_list.append(os.path.join(filepath, filename))
        return file_list

    def get_content(self, max_token_len: int = 600, cover_content: int = 150):
        docs = []
        # 读取文件内容
        for file in self.file_list:
            content = self.read_file_content(file)
            chunk_content = self.get_chunk(
                content, max_token_len=max_token_len, cover_content=cover_content)
            docs.extend(chunk_content)
        return docs

    @classmethod
    def get_chunk(cls, text: str, max_token_len: int = 600, cover_content: int = 150):
        chunk_text = []

        curr_len = 0
        curr_chunk = ''

        token_len = max_token_len - cover_content
        lines = text.splitlines()  # 假设以换行符分割文本为行

        for line in lines:
            line = line.replace(' ', '')
            line_len = len(enc.encode(line))
            if line_len > max_token_len:
                # 如果单行长度就超过限制，则将其分割成多个块
                num_chunks = (line_len + token_len - 1) // token_len
                for i in range(num_chunks):
                    start = i * token_len
                    end = start + token_len
                    # 避免跨单词分割
                    while not line[start:end].rstrip().isspace():
                        start += 1
                        end += 1
                        if start >= line_len:
                            break
                    curr_chunk = curr_chunk[-cover_content:] + line[start:end]
                    chunk_text.append(curr_chunk)
                # 处理最后一个块
                start = (num_chunks - 1) * token_len
                curr_chunk = curr_chunk[-cover_content:] + line[start:end]
                chunk_text.append(curr_chunk)
                
            if curr_len + line_len <= token_len:
                curr_chunk += line
                curr_chunk += '\n'
                curr_len += line_len
                curr_len += 1
            else:
                chunk_text.append(curr_chunk)
                curr_chunk = curr_chunk[-cover_content:]+line
                curr_len = line_len + cover_content

        if curr_chunk:
            chunk_text.append(curr_chunk)

        return chunk_text

    @classmethod
    def read_file_content(cls, file_path: str):
        # 根据文件扩展名选择读取方法
        if file_path.endswith('.pdf'):
            return cls.read_pdf(file_path)
        elif file_path.endswith('.md'):
            return cls.read_markdown(file_path)
        elif file_path.endswith('.txt'):
            return cls.read_text(file_path)
        else:
            raise ValueError("Unsupported file type")

    @classmethod
    def read_pdf(cls, file_path: str):
        # 读取PDF文件
        with open(file_path, 'rb') as file:
            reader = PyPDF2.PdfReader(file)
            text = ""
            for page_num in range(len(reader.pages)):
                text += reader.pages[page_num].extract_text()
            return text

    @classmethod
    def read_markdown(cls, file_path: str):
        # 读取Markdown文件
        with open(file_path, 'r', encoding='utf-8') as file:
            md_text = file.read()
            html_text = markdown.markdown(md_text)
            # 使用BeautifulSoup从HTML中提取纯文本
            soup = BeautifulSoup(html_text, 'html.parser')
            plain_text = soup.get_text()
            # 使用正则表达式移除网址链接
            text = re.sub(r'http\S+', '', plain_text) 
            return text

    @classmethod
    def read_text(cls, file_path: str):
        # 读取文本文件
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read()


class Documents:
    """
        获取已分好类的json格式文档
    """
    def __init__(self, path: str = '') -> None:
        self.path = path
    
    def get_content(self):
        with open(self.path, mode='r', encoding='utf-8') as f:
            content = json.load(f)
        return content


## 3.数据库 && 向量检索

In [17]:
import os
from typing import Dict, List, Optional, Tuple, Union
import json
import numpy as np
from tqdm import tqdm


class VectorStore:
    def __init__(self, document: List[str] = ['']) -> None:
        self.document = document

    def get_vector(self, EmbeddingModel: BaseEmbeddings) -> List[List[float]]:
        
        self.vectors = []
        for doc in tqdm(self.document, desc="Calculating embeddings"):
            self.vectors.append(EmbeddingModel.get_embedding(doc))
        return self.vectors

    def persist(self, path: str = 'storage'):
        if not os.path.exists(path):
            os.makedirs(path)
        with open(f"{path}/doecment.json", 'w', encoding='utf-8') as f:
            json.dump(self.document, f, ensure_ascii=False)
        if self.vectors:
            with open(f"{path}/vectors.json", 'w', encoding='utf-8') as f:
                json.dump(self.vectors, f)

    def load_vector(self, path: str = 'storage'):
        with open(f"{path}/vectors.json", 'r', encoding='utf-8') as f:
            self.vectors = json.load(f)
        with open(f"{path}/doecment.json", 'r', encoding='utf-8') as f:
            self.document = json.load(f)

    def get_similarity(self, vector1: List[float], vector2: List[float]) -> float:
        return BaseEmbeddings.cosine_similarity(vector1, vector2)

    def query(self, query: str, EmbeddingModel: BaseEmbeddings, k: int = 1) -> List[str]:
        query_vector = EmbeddingModel.get_embedding(query)
        result = np.array([self.get_similarity(query_vector, vector)
                          for vector in self.vectors])
        return np.array(self.document)[result.argsort()[-k:][::-1]].tolist()


## 大模型模块

In [6]:
import os
from typing import Dict, List, Optional, Tuple, Union

PROMPT_TEMPLATE = dict(
    RAG_PROMPT_TEMPALTE="""使用以上下文来回答用户的问题。如果你不知道答案，就说你不知道。总是使用中文回答。
        问题: {question}
        可参考的上下文：
        ···
        {context}
        ···
        如果给定的上下文无法让你做出回答，请回答数据库中没有这个内容，你不知道。
        有用的回答:""",
    InternLM_PROMPT_TEMPALTE="""先对上下文进行内容总结,再使用上下文来回答用户的问题。如果你不知道答案，就说你不知道。总是使用中文回答。
        问题: {question}
        可参考的上下文：
        ···
        {context}
        ···
        如果给定的上下文无法让你做出回答，请回答数据库中没有这个内容，你不知道。
        有用的回答:"""
)


class BaseModel:
    def __init__(self, path: str = '') -> None:
        self.path = path

    def chat(self, prompt: str, history: List[dict], content: str) -> str:
        pass

    def load_model(self):
        pass

class OpenAIChat(BaseModel):
    def __init__(self, path: str = '', model: str = "gpt-3.5-turbo-1106") -> None:
        super().__init__(path)
        self.model = model

    def chat(self, prompt: str, history: List[dict], content: str) -> str:
        from openai import OpenAI
        client = OpenAI()
        client.api_key = os.getenv("OPENAI_API_KEY")   
        client.base_url = os.getenv("OPENAI_BASE_URL")
        history.append({'role': 'user', 'content': PROMPT_TEMPLATE['RAG_PROMPT_TEMPALTE'].format(question=prompt, context=content)})
        response = client.chat.completions.create(
            model=self.model,
            messages=history,
            max_tokens=150,
            temperature=0.1
        )
        return response.choices[0].message.content

class InternLMChat(BaseModel):
    def __init__(self, path: str = '') -> None:
        super().__init__(path)
        self.load_model()

    def chat(self, prompt: str, history: List = [], content: str='') -> str:
        prompt = PROMPT_TEMPLATE['InternLM_PROMPT_TEMPALTE'].format(question=prompt, context=content)
        response, history = self.model.chat(self.tokenizer, prompt, history)
        return response


    def load_model(self):
        import torch
        from transformers import AutoTokenizer, AutoModelForCausalLM
        self.tokenizer = AutoTokenizer.from_pretrained(self.path, trust_remote_code=True)
        self.model = AutoModelForCausalLM.from_pretrained(self.path, torch_dtype=torch.float16, trust_remote_code=True).cuda()

class DashscopeChat(BaseModel):
    def __init__(self, path: str = '', model: str = "qwen-turbo") -> None:
        super().__init__(path)
        self.model = model

    def chat(self, prompt: str, history: List[Dict], content: str) -> str:
        import dashscope
        dashscope.api_key = os.getenv("DASHSCOPE_API_KEY")
        history.append({'role': 'user', 'content': PROMPT_TEMPLATE['RAG_PROMPT_TEMPALTE'].format(question=prompt, context=content)})
        response = dashscope.Generation.call(
            model=self.model,
            messages=history,
            result_format='message',
            max_tokens=150,
            temperature=0.1
        )
        return response.output.choices[0].message.content
    

class ZhipuChat(BaseModel):
    def __init__(self, path: str = '', model: str = "glm-4") -> None:
        super().__init__(path)
        from zhipuai import ZhipuAI
        self.client = ZhipuAI(api_key=os.getenv("ZHIPUAI_API_KEY"))
        self.model = model

    def chat(self, prompt: str, history: List[Dict], content: str) -> str:
        history.append({'role': 'user', 'content': PROMPT_TEMPLATE['RAG_PROMPT_TEMPALTE'].format(question=prompt, context=content)})
        response = self.client.chat.completions.create(
            model=self.model,
            messages=history,
            max_tokens=150,
            temperature=0.1
        )
        return response.choices[0].message

## test

In [18]:
!git clone https://github.com/datawhalechina/llm-cookbook

Cloning into 'llm-cookbook'...
remote: Enumerating objects: 4251, done.[K
remote: Counting objects: 100% (406/406), done.[K
remote: Compressing objects: 100% (214/214), done.[K
remote: Total 4251 (delta 253), reused 289 (delta 190), pack-reused 3845[K
Receiving objects: 100% (4251/4251), 229.69 MiB | 38.82 MiB/s, done.
Resolving deltas: 100% (2391/2391), done.
Updating files: 100% (461/461), done.


In [None]:
import torch
from modelscope import snapshot_download, AutoModel, AutoTokenizer
import os
download=False
if download:
    model_dir = snapshot_download('Shanghai_AI_Laboratory/internlm2-chat-7b', cache_dir='./', revision='master')
    model_dir = snapshot_download('jinaai/jina-embeddings-v2-base-zh', cache_dir='./', revision='master')

## 使用Dashscope

In [19]:
# 建立向量数据库
docs = ReadFiles('/kaggle/working/llm-cookbook/content/必修一-Prompt Engineering For Developers').get_content(max_token_len=600, cover_content=150) # 获得data目录下的所有文件内容并分割
vector = VectorStore(docs)
embedding = DashscopeEmbedding(path='') # 创建EmbeddingModel
vector.get_vector(EmbeddingModel=embedding)
vector.persist(path='storage') # 将向量和文档内容保存到storage目录下，下次再用就可以直接加载本地的数据库

Calculating embeddings: 100%|██████████| 11/11 [00:10<00:00,  1.08it/s]


In [24]:
vector = VectorStore()

vector.load_vector('./storage') # 加载本地的数据库

embedding = DashscopeEmbedding(path='')

question = '提示工程是什么？'

contents = vector.query(question, EmbeddingModel=embedding, k=3)

for content in contents:
    print(content)

面向开发者的提示工程
本教程为吴恩达《ChatGPTPromptEngineeringforDevelopers》课程中文版，主要内容为指导开发者如何构建Prompt并基于OpenAIAPI构建新的、基于LLM的应用，包括：

书写Prompt的原则
文本总结（如总结用户评论）；
文本推断（如情感分类、主题提取）；
文本转换（如翻译、自动纠错）；
扩展（如书写邮件）

目录：

简介Introduction@邹雨衡
Prompt的构建原则Guidelines@邹雨衡
如何迭代优化PromptItrative@邹雨衡
文本总结Summarizing@玉琳
文本推断@长琴
文本转换Transforming@玉琳
文本扩展Expand@邹雨衡
聊天机器人@长琴
总结@长琴

附1使用ChatGLM学习@宋志学

第一章简介
作者吴恩达教授
欢迎来到本课程，我们将为开发人员介绍ChatGPT提示词工程（PromptEngineering）。本课程由IsaFulford教授和我一起授课。Isa是OpenAI的技术团队成员，曾开发过受欢迎的ChatGPT检索插件，并且在教授LLM（LargeLanguageModel,大语言模型）技术在产品中的应用方面做出了很大贡献。她还参与编写了教授人们使用Prompt的OpenAIcookbook。
互联网上有很多有关提示词（Prompt,本教程中将保留该术语）的材料，例如《30promptseveryonehastoknow》之类的文章。这些文章主要集中在ChatGPT的Web界面上，许多人在使用它执行特定的、通常是一次性的任务。但是，我认为对于开发人员，LLM的更强大功能是能通过API调用，从而快速构建软件应用程序。我认为这方面还没有得到充分的重视。实际上，我们在DeepLearning.AI的姊妹公司AIFund的团队一直在与许多初创公司合作，将这些技术应用于诸多应用程序上。很兴奋能看到LLMAPI能够让开发人员非常快速地构建应用程序。

恭喜您完成了这门短期课程。
总的来说，在这门课程中，我们学习了关于Prompt的两个关键原则：

编写清晰具体的指令；
如果适当的话，给模型一些思考时间。

您还学习了迭代式Prompt开发的方法，并了解了如何找到适合您应用程序的Prompt的过程是非常关键的。
我们还介绍了许多大型语言模型的功能，

In [25]:
## 使用RAG
model = DashscopeChat(path='')
print(model.chat(question, [], '\n\n\n'.join(contents)))

提示工程（Prompt Engineering）是指为大型语言模型（LLM）设计和构建指令（Prompt）的过程，以引导模型生成所需的结果。在面向开发者的提示工程中，重点在于指导开发者如何使用OpenAI API构建基于LLM的应用程序。这包括遵循书写Prompt的原则，以及对文本进行总结、推断、转换和扩展等操作。通过迭代优化Prompt，开发者可以提高模型的响应质量和相关性。此外，提示工程还涉及构建聊天机器人，使模型能够与用户进行自然对话。总之，提示工程旨在最大化LLM的能力，使其能够满足各种应用场景的需求。


In [26]:
## 不使用RAG
model = DashscopeChat(path='')
print(model.chat(question, [], ''))

提示工程是一种设计和实施过程，旨在创建、管理和优化用于生成提示（指导或建议）的系统。这些提示通常用于帮助用户在特定任务或决策过程中更有效地工作。提示工程涉及多个领域，包括人机交互、认知科学、软件工程和设计。通过理解用户需求、任务复杂性以及可用资源，提示工程师可以设计出既有效又易于使用的提示系统，以提高工作效率和用户体验。
