In [1]:
import re
def filter_content(text):
    # 过滤掉以 '![]' 开头的行
    # ^ 表示行开头，\Q 和 \E 用来转义 '[]' 中的特殊字符
    text = re.sub(r'\n\n!\[\][^\n]*', '', text)

    # 过滤掉被 <html> 标签包裹的内容
    # 匹配从 <html> 到 </html> 的所有内容，包括多行内容
    text = re.sub(r'<html>[\s\S]*?</html>', '', text)
    return text.strip()

def load_md(file_path, is_omit_ref_apx=True):
    with open(file_path, 'r', encoding='utf-8') as file:
        content = file.read()
    sections = re.split(r'\n(?=#+ )', content)
    meta_information = {}
    title2content = {}
    for idx, sec in enumerate(sections):

        position = sec.find('\n\n')
        if position != -1:
            header = sec[:position]
        else:
            match = re.search(r'^(#{1,6})[\s\S]*?(?=\n)', sec, re.MULTILINE)
            header = match.group()

        if idx == 0:
            meta_information['title'] = header
        else:
            filtered_content = filter_content(sec[position:].strip())
            if len(filtered_content) != 0:
                title2content[header] = filtered_content

        if is_omit_ref_apx:
            if 'conclusion' in header.lower():
                break
    return {"meta_information": meta_information, "content": title2content}

In [2]:
import glob

md_folder_path = "/mnt/d/PycharmCode/LLMscratch/essay_searcher/raw_data/mds"
md_files = glob.glob(f'{md_folder_path}/*.md')
data_list = []
for f in md_files:
    data_list.append(load_md(f))


In [2]:
from pymilvus import MilvusClient,Collection
import numpy as np
from typing import List, Optional, TypedDict
from pymilvus.model.hybrid import BGEM3EmbeddingFunction
from pymilvus import FieldSchema, CollectionSchema,DataType

class chunk(TypedDict):
    paper_name: str
    section: str
    content: str
    embedding: List[float]

class essay_database:
    def __init__(self, uri, db_names, model_path):
        self.client = MilvusClient(uri=uri, db_names=db_names)
        self.embedding_model = BGEM3EmbeddingFunction(
            model_name=model_path,  # Specify the model name
            device='cuda:0',  # Specify the device to use, e.g., 'cpu' or 'cuda:0'
            use_fp16=True  # Specify whether to use fp16. Set to `False` if `device` is `cpu`.
        )
        self.chunk_size = 512

    def add_batch(self, datalist, collection_name):
        chunk_list = []
        content_queue = []
        for data in datalist:
            paper_name = data['meta_information']['title']
            for t, c in data['content'].items():
                truncated_content = self.truncate_chunk(c)
                content_queue.append(truncated_content)
                chunk_list.append({
                    "paper_name": paper_name,
                    "section": t,
                    "content": truncated_content
                })
        docs_embeddings = self.embedding_model.encode_documents(content_queue)['dense']
        for emb, chun in zip(docs_embeddings, chunk_list):
            chun['embedding'] = emb.astype(np.float32)

        self.client.insert(
            collection_name=collection_name,
            data=chunk_list,
        )

    def truncate_chunk(self, chunk_content):
        split_content = chunk_content.split(" ")
        chunk_length = len(split_content)
        if chunk_length <= self.chunk_size:
            return chunk_content
        truncated_content = split_content.copy()
        while chunk_length > 512:
            last_newline_index = chunk_content.rfind('\n')

            # 如果找不到换行符或者字符串已经不能再被有效截断，则直接截断到512
            if last_newline_index <= 0 or last_newline_index >= 512:
                return " ".join(split_content[:512])

            truncated_content = truncated_content[:last_newline_index]

        return truncated_content

    def search(self, query, collection_name, max_results=5):
        if database.client.get_load_state(collection_name="pdf_chunks")['state'] != 3:
            self.load_collection(collection_name)
        query_vector = self.embedding_model.encode_queries([query])['dense']
        query_vector = [arr.astype(np.float32) for arr in query_vector]
        res = self.client.search(
            collection_name=collection_name,
            anns_field="embedding",
            data=query_vector,
            limit=max_results,
            search_params={"metric_type": "IP"},
            output_fields=['paper_name', 'section','content']
        )

        return res
    def load_collection(self, collection_name):
        self.client.load_collection(collection_name=collection_name, replica_number=1)

    def delete(self):
        pass

    def create_collection(self, field_dict: dict, collection_name: str, description: Optional[str], dimension=1024):
        if self.client.has_collection(collection_name=collection_name):
            print(f"The collection {collection_name} has been existed")
            return
        fields_schema = [FieldSchema(name='id', dtype=DataType.INT64, is_primary=True, auto_id=True)]
        for k, v in field_dict.items():
            if v == 'DataType.FLOAT_VECTOR':
                field = FieldSchema(name=k, dtype=DataType.FLOAT_VECTOR, dim=dimension)
            elif v == 'DataType.INT64':
                field = FieldSchema(name=k, dtype=DataType.INT64)
            elif v == 'STRING':
                field = FieldSchema(name=k, dtype=DataType.VARCHAR, max_length=10240)
            else:
                continue
            fields_schema.append(field)

        schema = CollectionSchema(fields=fields_schema)
        self.client.create_collection(
            collection_name=collection_name,
            schema=schema,  # 使用刚才创建的schema
            dimension=dimension,  # The vectors we will use in this demo has 768 dimensions,
        
            descrption=description
        )
        collection = Collection(name=collection_name,using=self.client._using)

        index_params = {
            "index_type": "FLAT",
            "metric_type": "IP",
        }
        collection.create_index(field_name="embedding", index_params=index_params)

In [3]:
uri='http://localhost:19530'
db_name='essay_seacher_pdfs'
model_path = "/mnt/d/PycharmCode/LLMscratch/essay_searcher/embedding_models/BAAI/bge-m3"

In [4]:
database = essay_database(uri, db_name, model_path)

I0000 00:00:1741240316.217593    9758 fork_posix.cc:77] Other threads are currently calling into gRPC, skipping fork() handlers


In [7]:
query = "基于双语字典的匹配算法构造SFT数据集"

In [39]:
query_vector = database.embedding_model.encode_queries([query])['dense']


In [9]:
res = database.search(query,'pdf_chunks')

In [14]:
def result_paser(retri_res, retain_fileds):
    if len(retri_res) == 1:
        retri_res = retri_res[0]
    res = []
    for data in retri_res:
        d_dict = {}
        for field in retain_fileds:
            d_dict[field] = data['entity'][field]
        res.append(d_dict)
    return res
            

In [16]:
doc = result_paser(res, ['paper_name', 'section', 'content'])

In [17]:
for d in doc:
    print(d['paper_name'])

# LexMatcher: Dictionary-centric Data Collection for LLM-based Machine Translation  
# X-ALMA: PLUG & PLAY MODULES AND ADAPTIVE REJECTION FOR QUALITY TRANSLATION AT SCALE  
# LexMatcher: Dictionary-centric Data Collection for LLM-based Machine Translation  
# LexMatcher: Dictionary-centric Data Collection for LLM-based Machine Translation  
# X-ALMA: PLUG & PLAY MODULES AND ADAPTIVE REJECTION FOR QUALITY TRANSLATION AT SCALE  


In [5]:
field_dict = {
    'paper_name': 'STRING',
    'section': 'STRING',
    'content': 'STRING',
    'embedding': 'DataType.FLOAT_VECTOR'
}

database.create_collection(
    field_dict=field_dict,
    collection_name='pdf_chunks',
    description = "Knowledge base",
    dimension=1024  # The vectors we will use in this demo has 384 dimensions
)

In [7]:
database.add_batch(data_list,'pdf_chunks')

You're using a XLMRobertaTokenizerFast tokenizer. Please note that with a fast tokenizer, using the `__call__` method is faster than using a method to encode the text followed by a call to the `pad` method to get a padded encoding.
2025-03-05 22:13:45,514 [ERROR][handler]: RPC error: [insert_rows], <ParamError: (code=1, message=invalid input for float32 vector. Expected an np.ndarray with dtype=float32)>, <Time:{'RPC start': '2025-03-05 22:13:45.464742', 'RPC error': '2025-03-05 22:13:45.512550'}> (decorators.py:140)


ParamError: <ParamError: (code=1, message=invalid input for float32 vector. Expected an np.ndarray with dtype=float32)>

In [33]:
"我是你爹，你是我儿子".find("儿子")

8

In [22]:
from typing import Annotated, TypedDict
import operator
from langgraph.types import Send
from langgraph.graph import END, START, StateGraph

# 定义状态
class OverallState(TypedDict):
    subjects: list[str]
    jokes: Annotated[list[str], operator.add]  # 记录所有的笑话
    error: str  # 存储错误信息（如果没有相关笑话）

# 发送多个请求的函数
def front(state: OverallState):
    print('1')
    print(state)
    return state

def continue_to_jokes(state: OverallState):
    print("2")
    print(state)
    return [Send("generate_joke", {"subject": s}) for s in state['subjects']]

# 生成笑话的函数
def generate_joke(state: OverallState):
    print("3")
    print(state)
    return {"jokes": [f"Joke about {state['subject']}"]}  # 这里返回的是单个字符串的列表

# 判断所有请求是否返回了有意义的结果
def check_results(state: OverallState):
    print("4")
    print(state)
    if "jokes" in state and state["jokes"]:  
        return state  # 保持 state 不变，流程正常结束
    return {"error": "No relevant jokes found."}  # 返回完整的 state 结构

# 构建 LangGraph
builder = StateGraph(OverallState)
# 添加节点
builder.add_node("front", front)
builder.add_node("generate_joke", generate_joke)
builder.add_node("check_results", check_results)  # **添加 check_results 节点**
builder.add_conditional_edges('front', continue_to_jokes)  # 发送多个请求
builder.add_edge(START, 'front')  # 发送多个请求
builder.add_edge("generate_joke", "check_results")  # 连接 generate_joke -> check_results
builder.add_edge("check_results", END)  # 结束流程

# 编译并执行
graph = builder.compile()

# 运行示例
result = graph.invoke({"subjects": ["cats", "dogs"]})
print(result)

1
{'subjects': ['cats', 'dogs'], 'jokes': []}
2
{'subjects': ['cats', 'dogs'], 'jokes': []}
3
{'subject': 'cats'}
3
{'subject': 'dogs'}
4
{'subjects': ['cats', 'dogs'], 'jokes': ['Joke about cats', 'Joke about dogs']}
{'subjects': ['cats', 'dogs'], 'jokes': ['Joke about cats', 'Joke about dogs', 'Joke about cats', 'Joke about dogs']}


In [31]:
from typing import Annotated 
import operator 
class OverallState(TypedDict):    
    subjects: list[str]     
    jokes: Annotated[list[str], operator.add]
    error: int
from langgraph.types import Send
from langgraph.graph import END, START

def front(state: OverallState):
    return state

def continue_to_jokes(state: OverallState):     
    return [Send("generate_joke", {"subject": s}) for s in state['subjects']]

def generate_joke(state: OverallState):
    return {"jokes": [f"Joke about {state['subject']}"]}  # 这里返回的是单个字符串的列表

def check_results(state: OverallState):
    print(state)
    if "jokes" in state and state["jokes"]:  
        return {"error":3}  # 保持 state 不变，流程正常结束
    return {"error": "No relevant jokes found."}  # 返回完整的 state 结构

from langgraph.graph import StateGraph
builder = StateGraph(OverallState)
builder.add_node("front", front)
builder.add_node("generate_joke", generate_joke)
builder.add_node("check", check_results)
builder.add_conditional_edges('front', continue_to_jokes)
builder.add_edge(START,'front')
builder.add_edge("generate_joke", 'check')
builder.add_edge('check', END)
graph = builder.compile()
# Invoking with two subjects results in a generated joke for each
graph.invoke({"subjects": ["cats", "dogs"]})



{'subjects': ['cats', 'dogs'], 'jokes': ['Joke about cats', 'Joke about dogs']}


{'subjects': ['cats', 'dogs'],
 'jokes': ['Joke about cats', 'Joke about dogs'],
 'error': 3}

In [8]:
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_openai import ChatOpenAI
from langchain import hub
import asyncio
from langgraph.prebuilt import create_react_agent

#创建TavilySearchResults工具
# tools = [TavilySearchResults(max_results=1)]

#从Langchain Hub加载模版
prompt = hub.pull("wfh/react-agent-executor")



In [12]:
prompt.pretty_print()


You are a helpful assistant.


[33;1m[1;3m{{messages}}[0m


In [18]:
prompt.invoke(
    [{"role":"user","content":'heihei'}]
)

ChatPromptValue(messages=[SystemMessage(content='You are a helpful assistant.', additional_kwargs={}, response_metadata={}), HumanMessage(content='heihei', additional_kwargs={}, response_metadata={})])

In [19]:
"""{1}"""

'{1}'