In [None]:
import torch
from peft import PeftModel, PeftConfig
from transformers import (
    AutoTokenizer, 
    AutoModelForCausalLM,
    GenerationConfig
)

In [None]:
# ========== Configuration ==========
# Base checkpoint location (MODEL_NAME and BASE_MODEL_PATH hold the same path).
MODEL_NAME = "/model/HuggingFace/deepseek-ai/DeepSeek-R1-Distill-Qwen-14B"
OUTPUT_DIR = "lora_DeepSeek-R1-Distill-Qwen-14B_yaolao"
BASE_MODEL_PATH = "/model/HuggingFace/deepseek-ai/DeepSeek-R1-Distill-Qwen-14B"  # base model path
LORA_MODEL_PATH = "lora_DeepSeek-R1-Distill-Qwen-14B_yaolao_3.0"  # directory holding the saved LoRA weights

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"

In [None]:
# Load the tokenizer and the model
print("正在加载分词器...")
tokenizer = AutoTokenizer.from_pretrained(BASE_MODEL_PATH, trust_remote_code=True)

print("正在加载基础模型...")
base_model = AutoModelForCausalLM.from_pretrained(
    BASE_MODEL_PATH,
    trust_remote_code=True,
    torch_dtype=torch.float16,      # half precision to reduce GPU memory use
    device_map="auto",              # let accelerate decide where each layer lives
    low_cpu_mem_usage=True,
)

print("正在加载LoRA权重...")
model = PeftModel.from_pretrained(base_model, LORA_MODEL_PATH)
# NOTE: the model is deliberately NOT moved with model.to(DEVICE). The base
# model was dispatched with device_map="auto"; forcing every weight onto a
# single device afterwards is redundant on one GPU and conflicts with the
# dispatch placement when the layers are spread over several devices.
model.eval()  # inference mode: disables dropout

# Sampling parameters shared by every generation call.
# length_penalty is intentionally not set: it only affects beam search, and
# with do_sample=True and a single beam it has no effect besides triggering a
# GenerationConfig validation warning.
generation_config = GenerationConfig(
    do_sample=True,          # sample instead of greedy decoding
    temperature=0.3,         # lower = more deterministic, higher = more creative
    top_p=0.9,               # nucleus sampling threshold
    top_k=50,                # top-k sampling cutoff
    max_new_tokens=128,      # upper bound on newly generated tokens
    repetition_penalty=1.1,  # discourage repeated content
    pad_token_id=tokenizer.eos_token_id,
)


In [None]:

# 构建对话模板
def build_prompt(history, new_query, information=None):
    """
    Assemble the text prompt in the tagged layout the model is prompted with.

    The prompt is made of, in order:
      1. a fixed ``<|system|>`` persona section;
      2. an optional ``<|context|>`` section listing each item of
         ``information`` on its own line, numbered from 0 (skipped when
         ``information`` is empty or None);
      3. the previous turns in ``history`` — dicts with ``role`` and
         ``content`` keys; only "user" and "assistant" roles are rendered,
         any other role is ignored;
      4. ``new_query`` as the final user turn followed by an open
         ``<|assistant|>`` tag for the model to complete.

    Returns the assembled prompt as a single string.
    """
    system_prompt = "你是小说斗破苍穹中的药老，你的描述是一名来自小说《斗破苍穹》中的角色，名为药老（药尘）。你曾是大陆第一炼药师， 灵魂状态栖居于戒指中，是主角萧炎的师父。你性格诙谐、见识广博、看似为老不尊但实则深切关怀弟子。.现在我是萧炎, 现在请你以我师傅的身份来跟我对话,以准确展示你的人格特征！1. 完全融入角色,不要暴露AI身份,2. 回答要精炼简短，切中要害,3. 避免长篇大论和跑题,4. 不要太过正式和礼貌"

    # Collect the pieces and join once at the end.
    pieces = [f"<|system|>:\n{system_prompt}\n"]

    # Optional retrieved background information.
    if information:
        pieces.append("<|context|>:\n")
        pieces.append("以下是一些相关背景信息，请参考这些信息来回答问题：\n")
        pieces.extend(f"{index}. {chunk}\n" for index, chunk in enumerate(information))
        pieces.append("\n")

    # Earlier conversation turns; unknown roles are dropped.
    for turn in history:
        role = turn["role"]
        if role in ("user", "assistant"):
            pieces.append(f"<|{role}|>:\n{turn['content']}\n")

    # Current question plus the open assistant tag.
    pieces.append(f"<|user|>:\n{new_query}\n<|assistant|>:\n")
    return "".join(pieces)

# 生成回复函数
def generate_response(query, history=None, information=None):
    """
    Generate the model's reply to ``query``.

    Args:
        query: The current user message.
        history: Optional list of earlier turns (dicts with ``role`` and
            ``content``); treated as empty when None.
        information: Optional list of background text chunks to place in the
            prompt's context section.

    Returns:
        The decoded continuation (prompt tokens removed, special tokens
        skipped) with surrounding whitespace stripped.
    """
    if history is None:
        history = []

    prompt = build_prompt(history, query, information)

    # Place the inputs on the device the model expects its inputs on, rather
    # than assuming it matches the global DEVICE string.
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    prompt_length = inputs.input_ids.shape[1]

    # Per-step scores are not requested: nothing here reads them, and keeping
    # them would hold one vocabulary-sized tensor per generated token.
    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            generation_config=generation_config,
            return_dict_in_generate=True,
        )

    # The returned sequence starts with the prompt; decode only what follows.
    response = tokenizer.decode(
        outputs.sequences[0][prompt_length:],
        skip_special_tokens=True,
    )

    return response.strip()

# ========== 测试函数 ==========
def test_single_question(question, information=None):
    """
    Ask one question and print the question and the model's answer.

    Args:
        question: The user question to send.
        information: Optional list of background text chunks forwarded to
            ``generate_response`` as its ``information`` argument.
    """
    separator = '=' * 50
    print(f"\n{separator}")
    print(f"萧炎（用户）: {question}")
    print(f"{separator}")

    # Must be passed by keyword: the second positional parameter of
    # generate_response is ``history``, not ``information``.
    response = generate_response(question, information=information)
    print(f"药老: {response}")
    print(f"{separator}")

def test_conversation():
    """
    Run an interactive multi-turn chat on stdin/stdout.

    Reads user lines until one of the exit words ('退出', 'exit', 'quit') is
    entered, prints each reply, and keeps only the most recent six history
    messages (three user/assistant exchanges) as context for later turns.
    """
    print("\n开始多轮对话测试（输入'退出'结束）...")

    exit_words = ['退出', 'exit', 'quit']
    history = []

    user_input = input("\n萧炎: ").strip()
    while user_input.lower() not in exit_words:
        response = generate_response(user_input, history)
        print(f"药老: {response}")

        # Record this exchange, then trim to the last three exchanges.
        history.extend([
            {"role": "user", "content": user_input},
            {"role": "assistant", "content": response},
        ])
        if len(history) > 6:
            history = history[-6:]

        user_input = input("\n萧炎: ").strip()

In [None]:
# Retrieval resources: embedding model, reranker model, and the vector store location.
EMBEDDING_MODEL_PATH = "/root/yaolao/Qwen3-Embedding-4B"
CROSSENCODER_MODEL_PATH = "BAAI/bge-reranker-v2-m3"
DB_PATH = "./data.db"

In [None]:
import chromadb

# Open the persistent ChromaDB store located at DB_PATH.
chromadb_client = chromadb.PersistentClient(path=DB_PATH)

# Fetch the "my_collection" collection, creating it if it does not exist yet.
chromadb_collection = chromadb_client.get_or_create_collection(
    name="my_collection",
)

_embedding_model = None  # lazily-loaded sentence embedding model used by retrieve()


def _get_embedding_model():
    """Load the embedding model from EMBEDDING_MODEL_PATH on first use and reuse it afterwards."""
    global _embedding_model
    if _embedding_model is None:
        from sentence_transformers import SentenceTransformer
        _embedding_model = SentenceTransformer(EMBEDDING_MODEL_PATH)
    return _embedding_model


def retrieve(query: str, top_k: int):
    """
    Return the documents stored in the ChromaDB collection that are closest to ``query``.

    The query is embedded with the dedicated embedding model. The global
    ``model`` in this notebook is the LoRA-tuned causal LM and has no
    ``encode`` method, so it cannot be used here.

    Args:
        query: Text to search for.
        top_k: Number of results requested from the collection.

    Returns:
        The list of matching document strings for the query.
    """
    # NOTE(review): assumes the collection was indexed with the same embedding
    # model at EMBEDDING_MODEL_PATH — confirm against the indexing notebook.
    query_embedding = _get_embedding_model().encode(query)
    results = chromadb_collection.query(
        # Convert the numpy vector to a plain list of floats for the query.
        query_embeddings=[query_embedding.tolist()],
        n_results=top_k,
    )
    # One query was sent, so take the first (only) result list.
    return results['documents'][0]

from sentence_transformers import CrossEncoder

_cross_encoder = None  # lazily-loaded reranker shared by all rerank() calls


def rerank(query: str, retrieved_chunks: list[str], top_k: int):
    """
    Reorder ``retrieved_chunks`` by cross-encoder relevance to ``query``.

    Args:
        query: The user question.
        retrieved_chunks: Candidate text chunks to score.
        top_k: Maximum number of chunks to return.

    Returns:
        Up to ``top_k`` chunks sorted from highest to lowest score; an empty
        list when there are no candidates.
    """
    global _cross_encoder

    # Nothing to score: return early without loading the model.
    if not retrieved_chunks:
        return []

    # Load the reranker once instead of re-creating it on every call.
    if _cross_encoder is None:
        _cross_encoder = CrossEncoder(CROSSENCODER_MODEL_PATH)

    pairs = [(query, chunk) for chunk in retrieved_chunks]
    scores = _cross_encoder.predict(pairs)

    ranked = sorted(
        zip(retrieved_chunks, scores),
        key=lambda item: item[1],
        reverse=True,
    )
    return [chunk for chunk, _ in ranked[:top_k]]

In [None]:
# ========== Run the tests ==========
if __name__ == "__main__":
    print("模型加载完成！开始测试...")

    # Preset questions to run through retrieval, reranking and generation.
    test_questions = [
        "我的父亲叫什么名字？",
        "我体内莫名其妙消失的斗之气，是你搞的鬼？",
        "老师，筑基灵液有什么用？",
        "炼药时火候怎么控制？",
    ]

    for question in test_questions:
        retrieve_chunks = retrieve(question, top_k=5)
        information = rerank(question, retrieve_chunks, top_k=3)
        test_single_question(question, information)

    print("\n正在释放显存...")

    # 1. Drop the references to the model and related objects.
    if 'model' in locals():
        del model
    if 'base_model' in locals():
        del base_model
    if 'tokenizer' in locals():
        del tokenizer
    if 'generation_config' in locals():
        del generation_config

    # 2. Collect garbage first so objects kept alive only by reference cycles
    #    are destroyed before the CUDA cache is emptied.
    import gc
    gc.collect()

    # 3. Return the now-unused cached blocks to the GPU.
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    print("显存释放完成！")