加载文档

In [None]:
import os
import glob
import json
import logging
import fitz  # PyMuPDF, 用于PDF文本提取
from dotenv import load_dotenv  # 用于加载.env环境变量
from langchain.vectorstores import Chroma  # 向量数据库
from langchain.embeddings import SentenceTransformerEmbeddings  # 向量化
from langchain.text_splitter import CharacterTextSplitter  # 文本分片
from langchain.llms import OpenAI  # OpenAI LLM调用

# 加载本地.env文件中的环境变量（如API KEY）
load_dotenv()

# 读取OpenAI密钥
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# 设置使用的模型（OpenRouter上的GPT-4o-mini）
OPENAI_MODEL = "gpt-4o-mini"

# 配置日志，方便调试
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def load_pdf_from_folder(folder_path):
    """
    Load all the PDF files in the specified folder, 
    returning the full text content of each file (string list).
    """
    all_documents = []
    # Iterate over all pdf files
    for filename in glob.glob(f"Local_knowledge_base/*.pdf"):
        document_text = ""
        # Open the PDF and iterate over each page, accumulating the text
        with fitz.open(filename) as doc:
            for page_num in range(doc.page_count):
                page = doc.load_page(page_num)
                document_text += page.get_text("text")
        all_documents.append(document_text)
    return all_documents


# 调用函数并打印
all_documents = load_pdf_from_folder("Local_knowledge_base")  # 传入PDF所在文件夹路径
print("Loaded documents:", len(all_documents))
print("First document text:", all_documents[0][:100])  # 打印第一个文档的前100个字符
print("Second document text:", all_documents[1][:100])  # 打印第二个文档的前100个字符

Loaded documents: 4
First document text: 1
CSRF
1. 原理
2. 与XSS区别
3. 常见场景
4. 常见漏洞点
5. 漏洞危害
6. CSRF Poc 构造
7. 漏洞审计
8. 漏洞修复
9. Webgoat
跨站请求伪造（Cro
First document text: 1
XXE 漏洞
1. xml基本介绍
1.1. 什么是xml
1.2. xml 内容示例
1.2.1. DTD 约束
1.2.2. 内部实体 Internal Entity
1.2.3. 外部实体 


分片

In [32]:
from langchain.schema import Document
from langchain.text_splitter import CharacterTextSplitter

def split_documents(documents, chunk_size=1000, overlap=100):
    """
    对每个文档做分片，chunk_size控制单片长度，overlap保证上下文连贯
    输入: 字符串list，输出: 分片list
    """
    # 将字符串列表转换为Document对象列表
    doc_objects = [Document(page_content=doc) for doc in documents]
    
    # 创建text_splitter对象
    text_splitter = CharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=overlap)
    
    # 分片并返回结果
    return text_splitter.split_documents(doc_objects)

# 使用示例
split_docs = split_documents(all_documents, chunk_size=1000, overlap=100)
print("Number of document chunks:", len(split_docs))
# 使用示例
print("First chunk text:", split_docs[0].page_content[:100])  # 打印第一个分片的前100个字符
print("Second chunk text:", split_docs[1].page_content[:100])  # 打印第二个分片的前100个字符




Number of document chunks: 15
First chunk text: 1
CSRF
1. 原理
2. 与XSS区别
3. 常见场景
4. 常见漏洞点
5. 漏洞危害
6. CSRF Poc 构造
7. 漏洞审计
8. 漏洞修复
9. Webgoat
跨站请求伪造（Cro
Second chunk text: 一次完整的 CSRF 攻击需要具备以下两个条件：
用户已经登录某站点，并且在浏览器中存储了登录后的 Cookie 信息。
在不注销某站点的情况下，去访问攻击者构造的站点。
例：
网站管理员添加用户的 


索引

In [33]:
from langchain.vectorstores import Chroma
from langchain.embeddings import SentenceTransformerEmbeddings
from langchain.text_splitter import CharacterTextSplitter
from langchain.schema import Document

def build_index(documents):
    """
    Convert the sharded documents into vectors and index them
    """
    embeddings = SentenceTransformerEmbeddings(model_name="shibing624/text2vec-base-chinese")
    
    # 将文档列表转换为Document对象（确保doc是字符串类型）
    doc_objects = [Document(page_content=str(doc)) for doc in documents]
    
    # 使用Chroma创建向量索引
    index = Chroma.from_documents(doc_objects, embeddings)
    
    return index

# 使用示例：假设split_docs包含分片后的文档列表
index = build_index(split_docs)

# 打印索引信息（获取索引中文档数量）
print("Index built with", index._collection.count(), "document chunks.")

INFO:sentence_transformers.SentenceTransformer:Use pytorch device_name: mps
INFO:sentence_transformers.SentenceTransformer:Load pretrained SentenceTransformer: shibing624/text2vec-base-chinese
  self.ids_to_tokens = collections.OrderedDict([(ids, tok) for tok, ids in self.vocab.items()])


Index built with 57 document chunks.


召回

In [34]:
def recall_documents(query, index, k=5):
    """
    A similarity retrieval is performed and the k most similar documents to the query are returned
    """
    return index.similarity_search(query, k=k)

# 使用示例
query = "2024年世界职业院校分为哪些赛道?"
retrieved_docs = recall_documents(query, index, k=5)

# 打印检索到的文档内容
for i, doc in enumerate(retrieved_docs):
    print(f"Document {i+1} content:", doc.page_content[:100])  # 打印每个文档的前100个字符


Document 1 content: page_content='2024 年世界职业院校技能大赛
制度汇编
世界职业院校技能大赛执行委员会（筹）
2024 年9 月
目
录
世界职业院校技能大赛管理规定与办法
组织机构与职能分工....
Document 2 content: page_content='1
SSRF漏洞
1. SSRF漏洞
1.1. 原理
1.2. 漏洞危害
1.3. 容易出现漏洞的地方
2. 漏洞审计点
2.1. URLConnection
2.2. H
Document 3 content: page_content='1
SSRF漏洞
1. SSRF漏洞
1.1. 原理
1.2. 漏洞危害
1.3. 容易出现漏洞的地方
2. 漏洞审计点
2.1. URLConnection
2.2. H
Document 4 content: page_content='1
SSRF漏洞
1. SSRF漏洞
1.1. 原理
1.2. 漏洞危害
1.3. 容易出现漏洞的地方
2. 漏洞审计点
2.1. URLConnection
2.2. H
Document 5 content: page_content='1
SSRF漏洞
1. SSRF漏洞
1.1. 原理
1.2. 漏洞危害
1.3. 容易出现漏洞的地方
2. 漏洞审计点
2.1. URLConnection
2.2. H


重排

In [35]:
import os
from langchain_openai import ChatOpenAI
from dotenv import load_dotenv
from pathlib import Path

# 获取项目根目录（.env 所在目录）
root_dir = Path().resolve().parent

# 加载项目根目录下的 .env 文件中的 OPENROUTER_API_KEY
load_dotenv(dotenv_path=root_dir / ".env")

# 测试输出
api_key = os.getenv("OPENROUTER_API_KEY")
if api_key is None:
    raise ValueError("OPENROUTER_API_KEY not found in .env!")
print("✅ API key loaded:", api_key[:5] + "...")    # 👈 看看是不是 None

✅ API key loaded: sk-or...


In [36]:
from langchain_openai import ChatOpenAI
import os
from dotenv import load_dotenv
from pathlib import Path

# 加载项目根目录下的 .env 文件中的 OPENROUTER_API_KEY
root_dir = Path().resolve().parent
load_dotenv(dotenv_path=root_dir / ".env")
# 从 .env 读取 OPENROUTER_API_KEY，并设置成 OpenAI 兼容变量
os.environ["OPENAI_API_KEY"] = os.getenv("OPENROUTER_API_KEY")
os.environ["OPENAI_API_BASE"] = "https://openrouter.ai/api/v1"

# 获取 OPENROUTER_API_KEY 环境变量
api_key = os.getenv("OPENROUTER_API_KEY")

def rerank_documents(query, retrieved_docs):
    """
    Reorder retrieved documents using LLM or models
    """
    
    # 使用ChatOpenAI进行重排
    llm = ChatOpenAI(openai_api_key=api_key, model="openai/gpt-4o-mini")
    scores = []
    
    # 为每个文档生成相关性分数
    for doc in retrieved_docs:
        prompt = f"Query: {query}\nDocument: {doc.page_content}\n"
        
        # 构建适合ChatOpenAI的消息格式
        messages = [
            {"role": "system", "content": "You are an assistant helping to rank documents based on relevance."},
            {"role": "user", "content": prompt}
        ]
        
        # 获取相关性分数，确保传递正确的消息列表格式
        response = llm.invoke(messages)  # 确保使用 invoke 来调用
        
        # 提取模型生成的内容（直接访问 content 属性）
        score = response.content  # 获取生成的文本
        
        scores.append((score, doc))
    
    # 按照分数对文档进行排序
    sorted_docs = sorted(scores, key=lambda x: x[0], reverse=True)
    
    # 返回排序后的文档
    return [doc[1] for doc in sorted_docs]

# 使用示例：假设你已经调用了 recall_documents 获取了 retrieved_docs
query = "2024年世界职业院校分为哪些赛道?"
retrieved_docs = recall_documents(query, index, k=3)

# 重排检索到的文档
reranked_docs = rerank_documents(query, retrieved_docs)

# 打印重排后的文档内容
for i, doc in enumerate(reranked_docs):
    print(f"Reranked Document {i+1} content:", doc.page_content[:100])  # 打印每个文档的前100个字符


INFO:httpx:HTTP Request: POST https://openrouter.ai/api/v1/chat/completions "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://openrouter.ai/api/v1/chat/completions "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://openrouter.ai/api/v1/chat/completions "HTTP/1.1 200 OK"


Reranked Document 1 content: page_content='2024 年世界职业院校技能大赛
制度汇编
世界职业院校技能大赛执行委员会（筹）
2024 年9 月
目
录
世界职业院校技能大赛管理规定与办法
组织机构与职能分工....
Reranked Document 2 content: page_content='1
SSRF漏洞
1. SSRF漏洞
1.1. 原理
1.2. 漏洞危害
1.3. 容易出现漏洞的地方
2. 漏洞审计点
2.1. URLConnection
2.2. H
Reranked Document 3 content: page_content='1
SSRF漏洞
1. SSRF漏洞
1.1. 原理
1.2. 漏洞危害
1.3. 容易出现漏洞的地方
2. 漏洞审计点
2.1. URLConnection
2.2. H


生成

In [39]:
from langchain_openai import ChatOpenAI
import os
from dotenv import load_dotenv
from pathlib import Path

# 加载项目根目录下的 .env 文件中的 OPENROUTER_API_KEY
root_dir = Path().resolve().parent
load_dotenv(dotenv_path=root_dir / ".env")

# 从 .env 读取 OPENROUTER_API_KEY，并设置成 OpenAI 兼容变量
os.environ["OPENAI_API_KEY"] = os.getenv("OPENROUTER_API_KEY")
os.environ["OPENAI_API_BASE"] = "https://openrouter.ai/api/v1"

# 获取 OPENROUTER_API_KEY 环境变量
api_key = os.getenv("OPENROUTER_API_KEY")

def generate_answer(query, top_docs):
    """
    基于查询和重排后的文档生成最终答案
    """
    # 将文档内容拼接为一个字符串（确保传递给模型的是一个长文本）
    documents_content = "\n".join([doc.page_content for doc in top_docs])
    
    # 格式化prompt，确保传递给模型的是一个字符串
    prompt = f"Answer the following question based on the retrieved documents:\n{documents_content}\n\nQuestion: {query}\nAnswer:"
    
    # 使用ChatOpenAI进行生成
    llm = ChatOpenAI(openai_api_key=api_key, model="openai/gpt-4o-mini")
    
    try:
        # 调用模型生成答案
        response = llm.invoke([{"role": "system", "content": "You are an assistant helping to answer questions based on documents."},
                               {"role": "user", "content": prompt}])
        
        # 获取生成的答案（通常是从 response.content 中提取）
        answer = response.content.strip()  # 提取生成的文本，去除多余的空格
        
        return answer
    except Exception as e:
        print(f"Error generating answer: {e}")
        return None


# 使用示例：假设你已经调用了 recall_documents 获取了 retrieved_docs 和 reranked_docs
query = "2024年世界职业院校分为哪些赛道?"
print("Generating answer for query:", query)

# 调用生成答案的函数
answer = generate_answer(query, reranked_docs)

# 打印生成的答案
if answer:
    print("Generated answer:", answer)
else:
    print("Failed to generate an answer.")


Generating answer for query: 2024年世界职业院校分为哪些赛道?


INFO:httpx:HTTP Request: POST https://openrouter.ai/api/v1/chat/completions "HTTP/1.1 200 OK"


Generated answer: 2024年世界职业院校技能大赛共设置42个赛道。这些赛道涉及不同的专业领域，具体的赛道信息可能会在相关的赛事通知或指南中详细列出。


集成 FastMCP 创建 MCP 服务器

In [38]:
import os
from dotenv import load_dotenv
from pathlib import Path
from fastmcp import FastMCP

# 加载项目根目录下的 .env 文件中的 OPENROUTER_API_KEY
root_dir = Path().resolve().parent
load_dotenv(dotenv_path=root_dir / ".env")

# 获取 OPENROUTER_API_KEY 环境变量
api_key = os.getenv("OPENROUTER_API_KEY")

# 定义知识库路径
knowledge_base_folder = "/Users/queen/Documents/VSCode/llm_retrieval/Local_knowledge_base"

# 创建 FastMCP 实例
mcp = FastMCP("localRetrieval", log_level="ERROR")

@mcp.tool(name="doc_init", description="A server for handling document retrieval and answer generation")
async def doc_init(self, knowledge_base_folder):
    """
    异步文档初始化，构建文档索引
    """
    self.index = build_index(knowledge_base_folder)  # 构建文档索引

@mcp.tool(name="handle_query", description="Handle a query by recalling, reranking documents and generating an answer")
async def handle_query(self, query):
    """
    处理查询，完成文档召回、重排与答案生成的过程
    """
    # Step 1: 召回文档
    retrieved_docs = await recall_documents(query, self.index)  # 异步召回文档
    
    # Step 2: 重排文档
    reranked_docs = await rerank_documents(query, retrieved_docs)  # 异步重排文档
    
    # Step 3: 生成答案
    answer = await generate_answer(query, reranked_docs)  # 异步生成答案
    
    return answer

# 确保 mcp.run() 运行时不与已有事件循环冲突
if __name__ == "__main__":
    import asyncio
    
    # 检查是否已有事件循环在运行
    if not asyncio.get_event_loop().is_running():
        # 如果没有事件循环，直接调用 run()
        mcp.run(transport='stdio')
    else:
        # 如果已经有事件循环运行，FastMCP 自动管理事件循环
        print("Event loop is already running. FastMCP will be managed automatically.")

Event loop is already running. FastMCP will be managed automatically.


  self._handle_deprecated_settings(
