In [6]:
import json
import os
from typing import List, Dict, Any
from langchain.evaluation import load_evaluator, EvaluatorType
import langchain
from langchain_community.cache import InMemoryCache
langchain.llm_cache = InMemoryCache()
from langchain_openai import ChatOpenAI
import os
from dotenv import load_dotenv

# 加载环境变量
load_dotenv()

True

In [2]:
from bge_RAG import RAG
rag = RAG()
rag.add()

successful load embedding model
successful load rerank model
~ 正在读取文本
~ 正在添加进知识库


BM25 Embedding:   0%|          | 0/302 [00:00<?, ?step/s]Building prefix dict from the default dictionary ...
Loading model from cache C:\Users\20753\AppData\Local\Temp\jieba.cache
Loading model cost 0.757 seconds.
Prefix dict has been built successfully.
BM25 Embedding: 100%|██████████| 302/302 [00:01<00:00, 234.05step/s]
Annoy Embedding: 100%|██████████| 302/302 [00:10<00:00, 29.60step/s]

~ 添加成功





In [12]:
class RAGEvaluator:
    def __init__(self, api_key: str = None, criteria: Dict = None):
        """初始化RAG评估器"""
        # 设置OpenAI API密钥
        self.llm = ChatOpenAI(model=os.environ["Model"], temperature=float(os.environ['temperature']))
            
        # 加载评估器
        self.accuracy_evaluator = load_evaluator(EvaluatorType.QA, llm=self.llm)
        
        # 自定义评估标准
        if criteria:
            self.criteria_evaluator = load_evaluator(
                EvaluatorType.LABELED_CRITERIA, 
                criteria=criteria,
                llm=self.llm
            )
        else:
            # 默认评估标准
            self.criteria_evaluator = load_evaluator(
                EvaluatorType.LABELED_CRITERIA,
                criteria={
                    "正确性": "答案是否正确且事实一致",
                    "完整性": "答案是否包含所有必要信息",
                    "清晰度": "答案是否清晰易懂",
                    "简洁性": "答案是否简洁明了，避免冗余"
                },
                llm=self.llm
            )
            
        # 初始化LLM（用于评估）
        

    def evaluate_answers(self, result_path: str, llm_rag_func) -> List[Dict]:
        """
        评估RAG系统的回答质量
        
        参数:
            result_path: 结果JSON文件路径
            llm_rag_func: RAG系统的调用函数，输入问题，返回回答
            
        返回:
            包含评估结果的列表
        """
        # 读取结果文件
        with open(result_path, 'r', encoding='utf-8') as f:
            results = json.load(f)
            
        evaluation_results = []
        from tqdm import tqdm
        results = results[:10]
        for item in tqdm(results, desc="Evaluating"):
            question = item["question"]
            references = [item[f"answer_{i+1}"] for i in range(5) if f"answer_{i+1}" in item]
            
            # 调用RAG系统获取回答
            llm_answer = llm_rag_func(question)
            
            # 评估回答
            eval_result = self._evaluate_single_answer(question, llm_answer, references)
            evaluation_results.append({
                "question": question,
                "predicted_answer": llm_answer,
                "evaluation": eval_result
            })
            
        return evaluation_results

    def _evaluate_single_answer(self, question: str, predicted_answer: str, references: List[str]) -> Dict:
        """评估单个回答"""
        # 1. 计算准确性评分
        accuracy_result = self.accuracy_evaluator.evaluate_strings(
            prediction=predicted_answer,
            reference=references[0],  # 使用第一个参考答案
            input=question
        )
        
        # 2. 计算自定义标准评分
        criteria_result = self.criteria_evaluator.evaluate_strings(
            prediction=predicted_answer,
            reference=references[0],  # 使用第一个参考答案
            input=question
        )
        
        # 3. 计算与其他参考答案的相似度
        similarity_scores = []
        for ref in references:
            if ref.strip():  # 跳过空的参考答案
                similarity = self._calculate_similarity(predicted_answer, ref)
                similarity_scores.append(similarity)
        
        # 4. 合并评估结果
        return {
            "accuracy": accuracy_result.get("value", "UNKNOWN"),
            "accuracy_score": accuracy_result.get("score", None),
            "accuracy_reasoning": accuracy_result.get("reasoning", ""),
            
            "criteria": criteria_result,

            "avg_similarity": sum(similarity_scores) / len(similarity_scores) if similarity_scores else 0,
            "num_references": len(references)
        }


    def _calculate_similarity(self, text1: str, text2: str) -> float:
        """计算两个文本的相似度（简化版）"""
        # 在实际应用中，可以使用更复杂的相似度计算方法
        # 这里使用简单的字符匹配率作为示例
        common_chars = set(text1) & set(text2)
        total_chars = set(text1 + text2)
        return len(common_chars) / len(total_chars) if total_chars else 0

    def generate_report(self, evaluation_results: List[Dict], output_path: str = "evaluation_report.json") -> None:
        """生成评估报告"""
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(evaluation_results, f, ensure_ascii=False, indent=2)
            
        # 计算总体指标
        total_items = len(evaluation_results)
        correct_items = sum(1 for r in evaluation_results if r["evaluation"]["accuracy"].lower() == "correct")
        
        print(f"评估完成！共评估 {total_items} 个问题")
        print(f"准确率: {correct_items/total_items:.2%}")
        print(f"评估报告已保存至: {output_path}")

In [13]:
# 初始化评估器
evaluator = RAGEvaluator()

In [14]:
import langchain.cache
from langchain_core.runnables import RunnableSequence, RunnableMap, RunnableLambda
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI

# 定义 RAG 工作流
def create_rag_workflow():
    # 定义 LLM
    llm = ChatOpenAI(model=os.environ["Model"], temperature=int(os.environ['temperature']))

    # 定义输出解析器
    output_parser = StrOutputParser()

    # 定义 RAG 请求逻辑
    def rag_request_logic(inputs):
        user_input = inputs['user_input']
        content = rag.req(user_input, top_k=5)
        return {"Unstructured": content, "user_input": user_input}

    # 定义 Prompt 格式化逻辑
    def format_prompt(inputs):
        prompt = '''你是一个智能问答系统, 你会完全按照用户的要求进行返回.你的输出应该在描述清楚的情况下尽可能简短!不需要Markdown格式, 不需要打招呼等礼貌用语!
        ---需求
        你会根据知识库的查询内容(Unstructured documents) 和对话历史信息进行智能回答.
        - Unstructured documents:
        {Unstructured}
        --- 输入:
        {user_input}
        --- 输出:'''
        return prompt.format(**inputs)

    # 构建工作流
    workflow = RunnableLambda(rag_request_logic)|\
        RunnableLambda(format_prompt)|\
        llm | output_parser
        
    
    return workflow

# 创建工作流
workflow = create_rag_workflow()

# 执行工作流
user_input = '自动模式下，中央显示屏是如何切换日间和夜间模式的？'
result = workflow.invoke({"user_input": user_input})
print(result)

自动模式下中央显示屏有两种切换方式：1.日出到日落：白天日间模式，晚上夜间模式；2.自定时段：按设置时间段切换。


In [15]:
# 模拟RAG系统
def mock_rag_function(question: str) -> str:
    # 这里应该调用你的RAG系统
    # 为了演示，我们简单返回一个预设答案
    return workflow.invoke({"user_input": question})

# 评估
results = evaluator.evaluate_answers(
    result_path="data/result.json",
    llm_rag_func=mock_rag_function
)

Evaluating: 100%|██████████| 10/10 [02:28<00:00, 14.90s/it]


In [16]:
# 生成报告
evaluator.generate_report(results)

评估完成！共评估 10 个问题
准确率: 80.00%
评估报告已保存至: evaluation_report.json


根据知识库内容，关闭座椅加热功能可通过以下两种方式实现：

**方法一：通过中央显示屏**
1. 点击中央显示屏中的「座舱体验-座椅」，进入座椅设置界面。
2. 选择驾驶员或副驾驶员侧座椅加热控制界面。
3. 点击加热强度控制开关，循环切换至「关」状态。

**方法二：通过Lynk&Co App**
1. 登录Lynk&Co App，找到前排座椅加热图标。
2. 直接点击图标即可关闭加热功能（图标状态会同步显示开关状态）。

**注意事项：**
- 加热功能关闭后，座椅温度会逐渐恢复正常。
- 若乘客存在身体感知障碍（如病人、残疾人等），建议全程禁用该功能以确保安全。

两种方式均可即时生效，您可根据当前使用场景选择最便捷的操作方式。
