### 1.向量化

In [93]:
from copy import copy
from typing import Dict, List, Optional, Tuple, Union
import numpy as np
import os

os.environ['CURL_CA_BUNDLE'] = ''
from dotenv import load_dotenv, find_dotenv
_ = load_dotenv(find_dotenv())

class BaseEmbeddings:
    """
    Base class for embeddings
    """
    def __init__(self, path:str, is_api:bool) -> None:
        self.path = path
        self.is_api = is_api

    def get_embedding(self, text:str, model:str) -> List[float]:
        raise NotImplementedError
    
    @classmethod
    def cosine_similarity(cls, vectors1:List[float], vectors2:List[float]) -> float:
        """
        Calculate cosine similarity between two vectors
        """
        dot_product = np.dot(vectors1, vectors2)
        magnitude = np.linalg.norm(vectors1) * np.linalg.norm(vectors2)
        if not magnitude:
            return 0
        return dot_product / magnitude

In [94]:
class OpenAIEmbeddings(BaseEmbeddings):
    """
    class for OpenAI embeddings
    """
    def __init__(self, path:str='', is_api:bool=True) -> None:
        super().__init__(path, is_api)
        if self.is_api:
            from openai import OpenAI
            self.client = OpenAI()
            self.client.api_key = os.getenv('OPENAI_API_KEY')
            self.client.base_url = os.getenv('OPENAI_BASE_URL')
    
    def get_embedding(self, text: str, model: str = "text-embedding-3-large") -> List[float]:
        if self.is_api:
            text = text.replace("\n", " ")
            return self.client.embeddings.create(input=[text], model=model).data[0].embedding
        else:
            raise NotImplementedError


In [95]:
from typing import List


class JinaEmbedding(BaseEmbeddings):
    """
    class for Jina embeddings
    """
    def __init__(self, path: str = 'jinaai/jina-embeddings-v2-base-zh', is_api: bool = False) -> None:
        super().__init__(path, is_api)
        self._model = self.load_model()
        
    def get_embedding(self, text: str) -> List[float]:
        return self._model.encode([text])[0].tolist()
    
    def load_model(self):
        import torch
        from transformers import AutoModel
        if torch.cuda.is_available():
            device = torch.device("cuda")
        else:
            device = torch.device("cpu")
        model = AutoModel.from_pretrained(self.path, trust_remote_code=True).to(device)
        return model

class ZhipuEmbedding(BaseEmbeddings):
    """
    class for Zhipu embedding
    """
    def __init__(self, path:str='', is_api:bool=True):
        super().__init__(path, is_api)
        if self.is_api:
            from zhipuai import ZhipuAI
            self.client = ZhipuAI(api_key = os.getenv("ZHIPUAI_API_KEY"))
    
    def get_embedding(self, text: str) -> List[float]:
        response = self.client.embeddings.create(
            model="embedding-2",
            input = text,
        )
        return response.data[0].embedding

class DashscopeEmbedding(BaseEmbeddings):
    """
    class for Dashscope embeddings
    """
    def __init__(self, path: str = '', is_api: bool = True) -> None:
        super().__init__(path, is_api)
        if self.is_api:
            import dashscope
            dashscope.api_key = os.getenv("DASHSCOPE_API_KEY")
            self.client = dashscope.TextEmbedding

    def get_embedding(self, text: str, model: str='text-embedding-v1') -> List[float]:
        response = self.client.call(
            model=model,
            input=text
        )
        return response.output['embeddings'][0]['embedding']

### 2.文档加载及切分

In [96]:
# !pip install PyPDF2
# !pip install html2text
# !pip install pymupdf
# !pip install "unstructured[md]"

In [97]:
import PyPDF2
import markdown
import html2text
import json
from tqdm import tqdm
import tiktoken
from bs4 import BeautifulSoup
import re
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_community.document_loaders.markdown import UnstructuredMarkdownLoader

In [210]:
enc = tiktoken.get_encoding("cl100k_base")

class ReadFiles:
    """
    class to read files
    """

    def __init__(self, path: str) -> None:
        self._path = path
        self.file_list = self.get_files()

    def get_files(self):
        # args：dir_path，目标文件夹路径
        file_list = []
        for filepath, dirnames, filenames in os.walk(self._path):
            # os.walk 函数将递归遍历指定文件夹
            for filename in filenames:
                # 通过后缀名判断文件类型是否满足要求
                if filename.endswith(".md"):
                    # 如果满足要求，将其绝对路径加入到结果列表
                    file_list.append(os.path.join(filepath, filename))
                elif filename.endswith(".txt"):
                    file_list.append(os.path.join(filepath, filename))
                elif filename.endswith(".pdf"):
                    file_list.append(os.path.join(filepath, filename))
        return file_list

    def get_content(self, max_token_len: int = 600, cover_content: int = 150):
        print(f"该 目录下的文件数 {len(self.file_list)} ，文件列表为{self.file_list}")
        docs = []
        # 读取文件内容
        for file in self.file_list:
            content = self.read_file_content(file)
            
            chunk_content = self.get_chunk(
                content, max_token_len=max_token_len, cover_content=cover_content)
            # print(f'chunk_content：{chunk_content}')
            docs.extend(chunk_content)
            print(f"该 PDF 一共包含 {len(content)} 字, chunk数目{len(chunk_content)}")
            # print(f'chunk长度{len(chunk_content)},加上该文件后的长度{len(docs)},docs:{docs}')
            # docs.append(chunk_content)
        return docs


    @classmethod
    def read_file_content(cls, file_path: str):
        # 根据文件扩展名选择读取方法
        if file_path.endswith('.pdf'):
            return cls.read_pdf(file_path)
        elif file_path.endswith('.md'):
            return cls.read_markdown(file_path)
        elif file_path.endswith('.txt'):
            return cls.read_text(file_path)
        else:
            raise ValueError("Unsupported file type")

    # @classmethod
    # def read_pdf(cls, file_path: str):
    #     # 读取PDF文件
    #     with open(file_path, 'rb') as file:
    #         reader = PyPDF2.PdfReader(file)
    #         text = ""
    #         for page_num in range(len(reader.pages)):
    #             text += reader.pages[page_num].extract_text()
    #         return text
    @classmethod
    def read_pdf(cls, file_path: str):
        # 读取PDF文件
        loader = PyMuPDFLoader(file_path)
        pdf_pages = loader.load()
        text = ""
        for page_num in range(len(pdf_pages)):
            pdf_page = pdf_pages[page_num]
            # 处理每一页的文本
            import re
            pattern = re.compile(r'[^\u4e00-\u9fff](\n)[^\u4e00-\u9fff]', re.DOTALL)
            pdf_page.page_content = re.sub(pattern, lambda match: match.group(0).replace('\n', ''), pdf_page.page_content)
            pdf_page.page_content = pdf_page.page_content.replace('•', '')
            pdf_page.page_content = pdf_page.page_content.replace(' ', '')
            text += pdf_page.page_content
        return text
    


    # @classmethod
    # def read_markdown(cls, file_path: str):
    #     # 读取Markdown文件
    #     with open(file_path, 'r', encoding='utf-8') as file:
    #         md_text = file.read()
    #         html_text = markdown.markdown(md_text)
    #         # 使用BeautifulSoup从HTML中提取纯文本
    #         soup = BeautifulSoup(html_text, 'html.parser')
    #         plain_text = soup.get_text()
    #         # 使用正则表达式移除网址链接
    #         text = re.sub(r'http\S+', '', plain_text) 
    #         return text
    
    @classmethod
    def read_markdown(cls, file_path: str):
        # 读取Markdown文件
        loader = UnstructuredMarkdownLoader(file_path)
        md_pages = loader.load()
        text = ""
        for page_num in range(len(md_pages)):
            md_page = md_pages[page_num]
            md_page.page_content = md_page.page_content.replace('\n\n', '\n')
            text += md_page.page_content
        return text
        

    @classmethod
    def read_text(cls, file_path: str):
        # 读取文本文件
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read()
    
    @classmethod
    def get_chunk(cls, text: str, max_token_len: int = 600, cover_content: int = 150):
        chunk_text = []

        curr_len = 0
        curr_chunk = ''

        token_len = max_token_len - cover_content
        lines = text.splitlines()  # 假设以换行符分割文本为行

        for line in lines:
            line = line.replace(' ', '')
            line_len = len(enc.encode(line))
            if line_len > max_token_len:
                # 如果单行长度就超过限制，则将其分割成多个块
                num_chunks = (line_len + token_len - 1) // token_len
                for i in range(num_chunks):
                    start = i * token_len
                    end = start + token_len
                    # 避免跨单词分割
                    while not line[start:end].rstrip().isspace():
                        start += 1
                        end += 1
                        if start >= line_len:
                            break
                    curr_chunk = curr_chunk[-cover_content:] + line[start:end]
                    chunk_text.append(curr_chunk)
                # 处理最后一个块
                start = (num_chunks - 1) * token_len
                curr_chunk = curr_chunk[-cover_content:] + line[start:end]
                chunk_text.append(curr_chunk)
                
            if curr_len + line_len <= token_len:
                curr_chunk += line
                curr_chunk += '\n'
                curr_len += line_len
                curr_len += 1
            else:
                chunk_text.append(curr_chunk)
                curr_chunk = curr_chunk[-cover_content:]+line
                curr_len = line_len + cover_content

        if curr_chunk:
            chunk_text.append(curr_chunk)

        return chunk_text


class Documents:
    """
        获取已分好类的json格式文档
    """
    def __init__(self, path: str = '') -> None:
        self.path = path
    
    def get_content(self):
        with open(self.path, mode='r', encoding='utf-8') as f:
            content = json.load(f)
        return content

In [175]:
#测试文件处理

# readfiles = ReadFiles('./data')
# content = readfiles.read_file_content(readfiles.file_list[0])
# docs= readfiles.get_content(content, max_token_len=100, cover_content=20)
# docs= readfiles.get_content(max_token_len=600, cover_content=150)

### 3.数据库 && 向量检索

In [176]:
import os
from typing import Dict,List, Optional, Tuple, Union
import json
import numpy as np
from tqdm import tqdm

class VectorStore:
    def __init__(self, document:List[str]=['']) -> None:
        self.document = document
        self.vectors = []
    
    def get_vector(self, EmbeddingModel: BaseEmbeddings) -> List[List[float]]:
        # 获得文档的向量表示
        for doc in tqdm(self.document, desc="Calculating embeddings"):
            # print(f'emb : {EmbeddingModel.get_embedding(doc)}')
            self.vectors.append(EmbeddingModel.get_embedding(doc))
        return self.vectors

    def persist(self, path: str = 'storage'):
        # 数据库持久化，本地保存
        if not os.path.exists(path):
            os.makedirs(path)
        with open(f"{path}/document.json", 'w', encoding='utf-8') as f:
            json.dump(self.document, f, ensure_ascii=False)
        if self.vectors:
            with open(f"{path}/vectors.json", 'w', encoding='utf-8') as f:
                json.dump(self.vectors, f)

    def load_vector(self, path: str = 'storage'):
        with open(f"{path}/vectors.json", 'r', encoding='utf-8') as f:
            self.vectors = json.load(f)
        with open(f"{path}/document.json", 'r', encoding='utf-8') as f:
            self.document = json.load(f)

    def query(self, query: str, EmbeddingModel: BaseEmbeddings, k: int = 1) -> List[str]:
        # 根据问题检索相关的文档片段
        # print(f'doc:{self.document[:5]}')
        query_vector = EmbeddingModel.get_embedding(query)
        # print(f'query_vector:{query_vector[:3]}, len_vector:{len(self.vectors)}')
        result = np.array([self.get_similarity(query_vector, vector) for vector in self.vectors])
        # print(f'result:{result}')
        return np.array(self.document)[result.argsort()[-k:][::-1]].tolist()

    def get_similarity(self,vector1:List[float], vector2:List[float]) -> float:
        return BaseEmbeddings.cosine_similarity(vector1, vector2)
    


#### 4.大模型模块

In [177]:
PROMPT_TEMPLATE = dict(
    RAG_PROMPT_TEMPALTE="""使用以上下文来回答用户的问题。如果你不知道答案，就说你不知道。总是使用中文回答。
        问题: {question}
        可参考的上下文：
        ···
        {context}
        ···
        如果给定的上下文无法让你做出回答，请回答数据库中没有这个内容，你不知道。
        有用的回答:""",
    InternLM_PROMPT_TEMPALTE="""先对上下文进行内容总结,再使用上下文来回答用户的问题。如果你不知道答案，就说你不知道。总是使用中文回答。
        问题: {question}
        可参考的上下文：
        ···
        {context}
        ···
        如果给定的上下文无法让你做出回答，请回答数据库中没有这个内容，你不知道。
        有用的回答:"""
)

In [178]:
class BaseModel:
    def __init__(self, path: str = '') -> None:
        self.path = path

    def chat(self, prompt: str, history: List[dict], content: str) -> str:
        pass

    def load_model(self):
        pass

In [179]:
class OpenAIChat(BaseModel):
    def __init__(self, path: str = '', model: str = "gpt-3.5-turbo-1106") -> None:
        super().__init__(path)
        self.model = model

    def chat(self, prompt: str, history: List[dict], content: str) -> str:
        from openai import OpenAI
        client = OpenAI()
        client.api_key = os.getenv("OPENAI_API_KEY")   
        client.base_url = os.getenv("OPENAI_BASE_URL")
        history.append({'role': 'user', 'content': PROMPT_TEMPLATE['RAG_PROMPT_TEMPALTE'].format(question=prompt, context=content)})
        response = client.chat.completions.create(
            model=self.model,
            messages=history,
            max_tokens=150,
            temperature=0.1
        )
        return response.choices[0].message.content

In [180]:
# !pip install sentencepiece

In [181]:
class InternLMChat(BaseModel):
    def __init__(self, path: str = '') -> None:
        super().__init__(path)
        self.load_model()

    def chat(self, prompt: str, history: List = [], content: str='') -> str:
        prompt = PROMPT_TEMPLATE['InternLM_PROMPT_TEMPALTE'].format(question=prompt, context=content)
        response, history = self.model.chat(self.tokenizer, prompt, history)
        return response


    def load_model(self):
        import torch
        from transformers import AutoTokenizer, AutoModelForCausalLM
        self.tokenizer = AutoTokenizer.from_pretrained(self.path, trust_remote_code=True)
        if torch.cuda.is_available():
            device = torch.device("cuda")
        else:
            device = torch.device("cpu")
        self.model = AutoModelForCausalLM.from_pretrained(self.path, torch_dtype=torch.float16, trust_remote_code=True).to(device)

In [182]:
class DashscopeChat(BaseModel):
    def __init__(self, path: str = '', model: str = "qwen-turbo") -> None:
        super().__init__(path)
        self.model = model

    def chat(self, prompt: str, history: List[Dict], content: str) -> str:
        import dashscope
        dashscope.api_key = os.getenv("DASHSCOPE_API_KEY")
        history.append({'role': 'user', 'content': PROMPT_TEMPLATE['RAG_PROMPT_TEMPALTE'].format(question=prompt, context=content)})
        response = dashscope.Generation.call(
            model=self.model,
            messages=history,
            result_format='message',
            max_tokens=150,
            temperature=0.1
        )
        return response.output.choices[0].message.content

In [183]:
class ZhipuChat(BaseModel):
    def __init__(self, path: str = '', model: str = "glm-4") -> None:
        super().__init__(path)
        from zhipuai import ZhipuAI
        self.client = ZhipuAI(api_key=os.getenv("ZHIPUAI_API_KEY"))
        self.model = model

    def chat(self, prompt: str, history: List[Dict], content: str) -> str:
        history.append({'role': 'user', 'content': PROMPT_TEMPLATE['RAG_PROMPT_TEMPALTE'].format(question=prompt, context=content)})
        response = self.client.chat.completions.create(
            model=self.model,
            messages=history,
            max_tokens=150,
            temperature=0.1
        )
        return response.choices[0].message.content

### LLM Tiny-RAG Demo

In [184]:
# !pip install modelscope

In [185]:
#download_model
import torch
from modelscope import snapshot_download, AutoModel, AutoTokenizer

# model_dir = snapshot_download('Shanghai_AI_Laboratory/internlm2-chat-7b', cache_dir='/Users/kangxun/Documents/LLM/model_dir', revision='master')

In [186]:
# emb_model_dir = snapshot_download('jinaai/jina-embeddings-v2-base-zh', cache_dir='/Users/kangxun/Documents/LLM/model_dir', revision='master')

In [187]:
#测试下文本向量化模型
embedding = ZhipuEmbedding()
text_emb = embedding.get_embedding(text="embedding的输入文本")
print(len(text_emb))
print(text_emb[:10])

1024
[0.005395322, 0.07114486, 0.0021059725, 0.030416531, 0.027175419, -0.029336752, -0.0371354, -0.034679066, -0.007935629, 0.07531217]


In [188]:
#测试下离线模型,文本向量化
embedding = JinaEmbedding(path='/Users/kangxun/Documents/LLM/model_dir/jinaai/jina-embeddings-v2-base-zh')
text_emb = embedding.get_embedding(text="embedding的输入文本")
print(len(text_emb))
print(text_emb[:10])

768
[-0.148262158036232, 0.18025441467761993, -0.388839453458786, -0.05320968106389046, 0.021515721455216408, 0.11201328784227371, -0.027237730100750923, 0.04411815479397774, 0.1298605054616928, 0.021183768287301064]


In [189]:
#测试下zhipu chat大模型
llm = ZhipuChat()
output = llm.chat("请你自我介绍一下自己！",[],"")
print(output)

我是一个人工智能助手，专门设计来提供信息和帮助解答问题。根据您提供的上下文，目前没有具体的个人信息可以介绍。如果您有其他问题或需要帮助，请随时告诉我。


In [190]:
#测试下离线大模型chat(transformer版本不兼容)
# llm = InternLMChat(path='/Users/kangxun/Documents/LLM/model_dir/Shanghai_AI_Laboratory/internlm2-chat-7b')
# output = llm.chat("请你自我介绍一下自己！",[],"")

In [215]:
#向量数据库建立
docs = ReadFiles('./data').get_content(max_token_len=600, cover_content=150)
# print(f'docs:{len(docs)}')
vector = VectorStore(docs)
#文本向量化 emb（离线1模型或 zhipu openai）
# embedding = JinaEmbedding(path='/Users/kangxun/Documents/LLM/model_dir/jinaai/jina-embeddings-v2-base-zh')
embedding = ZhipuEmbedding()

vector.get_vector(EmbeddingModel=embedding)
# print(len(vector.vectors))
vector.persist(path='storage')

该 目录下的文件数 4 ，文件列表为['./data/软件专利申请及权利保护.md', './data/pumpkin_book.pdf', './data/Guidelines.md', './data/Introduction.md']
该 PDF 一共包含 7698 字, chunk数目32
该 PDF 一共包含 242230 字, chunk数目752
该 PDF 一共包含 23085 字, chunk数目42
该 PDF 一共包含 1782 字, chunk数目7


Calculating embeddings: 100%|██████████| 833/833 [01:19<00:00, 10.43it/s]


In [219]:
# vector = VectorStore()
# vector.load_vector('./storage')
#向量检索
question='如果不保护自己的软件（算法）会怎样'
# question = '软件（算法）专利申请及权利保护是谁写的？'
# question = '南瓜书是谁写的？'
content = vector.query(question, EmbeddingModel=embedding, k=3)
print(content)

#大模型问答(离线模型或 zhipu openai)
# model = ZhipuChat()
# model = OpenAIChat()
# chat = InternLMChat(path='/Users/kangxun/Documents/LLM/model_dir/Shanghai_AI_Laboratory/internlm2-chat-7b')
# print(model.chat(question, [], content))

['恰当的保护除了可以有效地降低自己在未来遭遇诉讼的风险，也让自己可以在未来的“被侵权”中可以提供更多的证据来有效夺回自己的利益，目前在华为也已经出现了售卖专利使用权的方式来获取远超开发乃至申请专利所消耗的资源的盈利——利用每年几万件的专利申请，将自己的知识产权和自己的开发成果牢牢保护在自己的专利墙中。在当下，腾讯等国内领先的科技公司都不再停留在简单的软著保护，而是开始对软件的执行方法乃至算法进行保护，从而将自己的技术紧紧保护在自己的专利墙中。在法律愈来愈完善的情况下，专利的申请与保护也愈来愈重要。19.1.1如果不保护自己的软件（算法）会怎样？\n', '[TOC]\n第十九章软件（算法）专利申请及权利保护\nMarkdownRevision1;\nDate:2019/07/16\nEditor:何建宏\nContact:bonopengate@gmail.com\n19.1为什么需要对软件（算法）进行保护？\n对软件/系统/算法进行保护可以有效地保护在计算机领域中的公司或个人的权益，随着人工智能的兴起，在图像处理、语音处理、文本处理等方向上，公司或个人不断地研发新的系统，探究新的算法，可是随着软件的开发/设计成本逐渐增高，愈来愈多公司或个人开始对对手产品进行模仿，而这个过程中，被模仿的公司或个人也是深受其害。在美国早已有专门设立的对软件的专利保护政策，而国内因为各种因素迟迟未有这方面的实行政策。所以目前软件开发者或者算法设计者只能通过其它的方法来保护自己的权益，保护自己的知识产权。【1】\n', '知识产权和自己的开发成果牢牢保护在自己的专利墙中。在当下，腾讯等国内领先的科技公司都不再停留在简单的软著保护，而是开始对软件的执行方法乃至算法进行保护，从而将自己的技术紧紧保护在自己的专利墙中。在法律愈来愈完善的情况下，专利的申请与保护也愈来愈重要。19.1.1如果不保护自己的软件（算法）会怎样？\n在《中国法院知识产权司法保护状况（2018）》中指出，2018年，人民法院共新收一审、二审、申请再审等各类知识产权案件334951件，审结319651件（含旧存），比2017年分别上升41.19%和41.64%。其中，竞争类一审案件数量(含垄断民事案件)增幅最为显著,同比上升63.04%,达到4146件。其中的新收专利案件为21699，同比上升35.53%。而其中有一个特

In [220]:
model = ZhipuChat()
print(model.chat(question, [], content))

如果不保护自己的软件（算法），可能会面临以下风险和后果：

1. **容易遭受侵权**：没有保护的软件或算法容易被竞争对手模仿或直接使用，导致开发者的权益受损。

2. **难以提供法律证据**：在发生侵权行为时，如果没有相应的专利或著作权等法律保护，开发者将难以提供有效证据来维护自己的权益。

3. **经济损失**：侵权行为可能导致开发者失去潜在的市场份额和收益，同时如果侵权方因此获利，开发者难以追讨相应的经济损失。

4. **法律诉讼风险**：如果软件或算法被他人申请了专利保护，开发者可能面临被诉侵权的风险，这可能导致巨额的赔偿。

5. **技术优势丧失**：软件和
