In [None]:
# config.py
import os
import logging
import warnings

# 屏蔽不必要的警告
warnings.filterwarnings("ignore")

# 日志配置
logging.basicConfig(
    level=logging.ERROR,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

# 环境变量设置
os.environ["TOKENIZERS_PARALLELISM"] = "false"


In [None]:
# document_loader.py
import os
import logging
import torch
import chardet
from langchain.chains import RetrievalQA
from langchain_community.document_loaders import TextLoader, PyPDFLoader, Docx2txtLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS

device = "cuda" if torch.cuda.is_available() else "cpu"
def detect_encoding(file_path, max_size=1024*1024):
    """安全检测文本文件编码"""
    try:
        with open(file_path, 'rb') as f:
            raw_data = b''
            chunk_size = 4096
            while len(raw_data) < max_size:
                chunk = f.read(chunk_size)
                if not chunk:
                    break
                raw_data += chunk
            return chardet.detect(raw_data)['encoding']
    except Exception as e:
        logging.error(f"编码检测失败: {str(e)}")
        return 'utf-8'

def load_document(qa_system, file_path):
    """文档加载处理"""
    print(f"📄  读取文件：{file_path}")
    try:
        if not os.path.isfile(file_path):
            raise ValueError("路径不是文件")

        # 编码检测和备选编码列表
        encoding = detect_encoding(file_path)
        retry_encodings = [encoding, 'utf-8', 'gbk']

        loader = None
        if file_path.lower().endswith('.pdf'):
            loader = PyPDFLoader(file_path)
        elif file_path.lower().endswith(('.docx', '.doc')):
            loader = Docx2txtLoader(file_path)
        else:
            for enc in retry_encodings:
                try:
                    loader = TextLoader(file_path, encoding=enc)
                    loader.load()  # 测试加载
                    break
                except UnicodeDecodeError:
                    continue

        documents = loader.load()

        # 使用递归文本分割器对文档进行切分
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            separators=["\n\n", "\n", "。", "！", "？", "；", "……", "…", "　"]
        )
        docs = text_splitter.split_documents(documents)

        print(f"✅ 成功加载 {len(docs)} 个文本块")
        if docs:
            print(f"📝 首文本块示例：{docs[0].page_content[:200]}...")
            #print(f"📝 文本：{docs}")

        # 构造向量数据库
        embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-mpnet-base-v2",
            model_kwargs={"device": device},
            encode_kwargs={"batch_size": 32}
        )

        qa_system.vector_db = FAISS.from_documents(
            docs,
            embeddings
        )

        # 初始化问答链
        qa_system.qa_chain = RetrievalQA.from_chain_type(
            llm=qa_system.llm_registry[qa_system.current_model],
            chain_type="stuff",
            retriever=qa_system.vector_db.as_retriever(search_kwargs={"k": 3}),
            return_source_documents=True
        )
        return True
    except PermissionError as pe:
        logging.error(f"❌ 权限拒绝: {str(pe)}")
        return False
    except Exception as e:
        logging.error(f"❌ 文档加载异常: {str(e)}")
        return False


In [None]:
# models.py
import os
import time
import torch
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationChain
from langchain.chains import RetrievalQA
from langchain_community.llms import Ollama

class DocumentQASystem:
    def __init__(self):
        self.llm_registry = self._init_models()
        self.current_model = "mistral"
        self.memory = ConversationBufferMemory()
        self.vector_db = None
        self.qa_chain = None
        self.conversation_chain = None

    def _init_models(self):
        """初始化模型实例并缓存"""
        return {
            "mistral": Ollama(
                model="mistral",
                temperature=0.7,
                num_ctx=4096,
            ),
            "qwen": Ollama(
                model="qwen2.5:7b",
                temperature=0.5,
                num_ctx=2048,
            ),
            "deepseek": Ollama(
                model="deepseek-r1:7b",
                temperature=0.5,
                num_ctx=2048,
            ),
        }

    def _release_model_resources(self):
        """彻底释放模型资源"""
        for model in self.llm_registry.values():
            if hasattr(model, 'client'):
                del model.client
        if torch.cuda.is_available():
            torch.cuda.empty_cache()


In [None]:
# check_env.py
import sys, torch, numpy
from langchain_community.embeddings import HuggingFaceEmbeddings

print("[系统信息]")
print(f"Python版本: {sys.version}")
print(f"numpy版本: {numpy.__version__}")
print(f"PyTorch版本: {torch.__version__}")
print(f"CUDA可用: {torch.cuda.is_available()}")
print(f"所用设备：{torch.cuda.get_device_name(0)}")

print("\n[关键功能验证]")

try:
    emb = HuggingFaceEmbeddings(model_name="sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2")
    print("✅ 嵌入模型初始化成功")
    test_text = "测试中文向量化"
    embedding = emb.embed_query(test_text)
    print(f"向量维度: {len(embedding)}")
except Exception as e:
    print(f"❌ 初始化失败: {str(e)}")

In [None]:
# utils.py
import pandas as pd
from datetime import datetime
import logging
import os

DEFAULT_PATH = "./compare_output"
def print_help():
    print("""
🔧 系统指令手册：
    /switch [mistral|qwen|deepseek]  - 切换大语言模型
    /compare [问题]         - 对比模型性能
    /help                  - 显示帮助信息
    /upload                - 上传文档进入问答模式
    exit/quit              - 退出系统
""")

def export_to_excel(results, query):
    try:
        safe_query = query.replace("\n", " ")
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"model_comparison_{timestamp}.xlsx"

        # 组装数据
        data = []
        for model_name, result in results.items():
            row = {
                "Model": model_name.upper(),
                "Query": safe_query,
                "Latency": result["latency"],
                "Response Preview": result["response"]
            }
            data.append(row)

        # 构造 DataFrame
        df = pd.DataFrame(data)
        df = df[["Model", "Query", "Latency", "Response Preview"]]

        filename = os.path.join(DEFAULT_PATH, filename)

        # 导出 Excel 文件
        df.to_excel(filename, index=False, engine="openpyxl")
        return filename
    except Exception as e:
        logging.error(f"导出失败: {str(e)}")
        return None

In [None]:
# main.py
import os
import time
import torch
import logging

from langchain.chains import RetrievalQA

from Core.models import DocumentQASystem
from Core.document_loader import load_document
from Utils.utils import print_help, export_to_excel

def process_command(command: str, qa_system: DocumentQASystem) -> bool:
    """处理系统命令"""
    command = command.strip().lower()

    if command.startswith("/switch"):
        model_name = command.split()[-1] if len(command.split()) > 1 else ""
        if model_name in qa_system.llm_registry:
            qa_system.current_model = model_name
            qa_system._release_model_resources()
            qa_system.conversation_chain = None
            print(f"已切换到 {model_name.upper()} 模型")
            return True
        else:
            print(f"❌ 无效模型，可用选项：{list(qa_system.llm_registry.keys())}")
            return False

    # main.py 中的 process_command 函数部分
    elif command.startswith("/compare"):
        query_part = command[8:].strip()
        if not query_part:
            print("请输入多行查询（输入空行结束）：")
            lines = []
            while True:
                line = input().rstrip()
                if line == "":
                    break
                lines.append(line)
            query = "\n".join(lines)
        else:
            query = query_part

        if not query:
            print("请提供查询内容，格式：/compare [查询内容]")
            return False

        print(f"\n正在比较各模型的响应...")
        results = {}
        for name, model in qa_system.llm_registry.items():
            try:
                start_time = time.time()
                if qa_system.qa_chain:  # 检查是否有文档上传
                    # 使用 qa_chain 生成基于文档的回答
                    qa_system.qa_chain = RetrievalQA.from_chain_type(
                        llm=model,
                        chain_type="stuff",
                        retriever=qa_system.vector_db.as_retriever(search_kwargs={"k": 3}),
                        return_source_documents=True
                    )
                    result = qa_system.qa_chain({"query": query})
                    response = f"{result['result']}\n📚 来源：{result['source_documents'][0].metadata['source']}"
                else:
                    # 没有文档上传，直接调用模型
                    response = model.invoke(query)
                end_time = time.time()
                latency = end_time - start_time
                results[name] = {
                    "response": response,
                    "latency": f"{latency:.2f}s"
                }
                # 释放资源
                del response
                qa_system._release_model_resources()
            except Exception as e:
                print(f"{name.upper()} 调用出错：{str(e)}")

        print("\n比较结果：")
        for model_name, data in results.items():
            print(f"{model_name.upper()}:")
            print(f"延迟：{data['latency']}")
            print(f"响应预览：{data['response'][:200]}...\n")

        export_file = export_to_excel(results, query)
        if export_file:
            print(f"\n已导出结果至：{os.path.abspath(export_file)}")
        else:
            print("❌ \n导出结果失败")
        return True

    elif command == "/help":
        print_help()
        return True

    elif command == "/upload":
        file_path = input("📂 请输入文件路径：").strip()
        if not os.path.exists(file_path):
            print("❌ 文件不存在")
            return False
        if load_document(qa_system, file_path):
            print("📄  文档加载成功")
            return True
        print("❌ 文档加载失败")
        return False

    else:
        print("❌ 未知命令，输入/help查看帮助")
        return False

def process_query(query: str, qa_system: DocumentQASystem, current_model: str) -> None:
    """处理用户查询"""
    try:
        if qa_system.qa_chain:
            result = qa_system.qa_chain({"query": query})
            response = f"{result['result']}\n来源：{result['source_documents'][0].metadata['source']}"
        else:
            if not qa_system.conversation_chain:
                from langchain.chains import ConversationChain
                qa_system.conversation_chain = ConversationChain(
                    llm=qa_system.llm_registry[qa_system.current_model],
                    memory=qa_system.memory
                )
            response = qa_system.conversation_chain.predict(input=query)

        print(f"\n{current_model.upper()}:", response)

    except Exception as e:
        error_msg = f"❌ 处理错误：{str(e)}"
        logging.error(error_msg)
        print(error_msg)

def main():
    qa_system = DocumentQASystem()
    print_help()
    current_model = qa_system.current_model

    while True:
        try:
            # 初始化输入收集
            user_input = []
            print("\nYou: (输入内容，连按两次回车提交)")

            # 多行输入循环
            while True:
                line = input().strip()

                # 退出指令处理
                if line.lower() in ["exit", "quit"]:
                    print("👋  再见！")
                    return

                # 命令立即执行
                if line.startswith("/"):
                    process_command(line, qa_system)
                    current_model = qa_system.current_model
                    break

                # 空行表示提交输入
                if not line:
                    if user_input:
                        full_query = "\n".join(user_input)
                        print("🤖 Model思考中......")
                        process_query(full_query, qa_system, current_model)
                    user_input = []
                    break

                user_input.append(line)

        except KeyboardInterrupt:
            print("\n输入中断，输入 exit 退出程序")
        except Exception as e:
            logging.error(f"系统错误：{str(e)}")
            print("❌ 发生意外错误，请重新尝试")

if __name__ == "__main__":
    main()