### 🔧 环境配置和检查

#### 概述

本教程需要特定的环境配置以确保最佳学习体验。以下配置将帮助您：

- 使用统一的conda环境：激活统一的学习环境
- 通过国内镜像源快速安装依赖：配置pip使用清华镜像源
- 加速模型下载：设置HuggingFace镜像代理
- 检查系统配置：检查硬件和软件配置

#### 配置

- **所需环境及其依赖已经部署好**
- 在`Notebook`右上角选择`jupyter内核`为`python(flyai_agent_in_action)`，即可执行下方代码

In [None]:
%%script bash

# 1. 激活 conda 环境 (仅对当前单元格有效)
eval "$(conda shell.bash hook)"
conda activate flyai_agent_in_action

echo "========================================="
echo "== Conda 环境检查报告 (仅针对当前 Bash 子进程) =="
echo "========================================="

# 2. 检查当前激活的环境
CURRENT_ENV_NAME=$(basename $CONDA_PREFIX)

if [ "$CURRENT_ENV_NAME" = "flyai_agent_in_action" ]; then
    echo "✅ 当前单元格已成功激活到 flyai_agent_in_action 环境。"
    echo "✅ 正在使用的环境路径: $CONDA_PREFIX"
    echo ""
    echo "💡 提示: 后续的 Python 单元格将使用 Notebook 当前选择的 Jupyter 内核。"
    echo "   如果需要后续单元格也使用此环境，请执行以下操作:"
    echo "   1. 检查 Notebook 右上角是否已选择 'python(flyai_agent_in_action)'。"
else
    echo "❌ 激活失败或环境名称不匹配。当前环境: $CURRENT_ENV_NAME"
    echo ""
    echo "⚠️ 严重提示: 建议将 Notebook 的 Jupyter **内核 (Kernel)** 切换为 'python(flyai_agent_in_action)'。"
    echo "   (通常位于 Notebook 右上角或 '内核' 菜单中)"
    echo ""
    echo "📚 备用方法 (不推荐): 如果无法切换内核，则必须在**每个**代码单元格的头部重复以下命令:"
    echo ""
    echo "%%script bash"
    echo "# 必须在每个单元格都执行"
    echo "eval \"\$(conda shell.bash hook)\""
    echo "conda activate flyai_agent_in_action"
fi

echo "=========================================" 

In [None]:
# 2. 设置pip 为清华源
%pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple
%pip config list


In [None]:
# 3. 设置HuggingFace代理
%env HF_ENDPOINT=https://hf-mirror.com
# 验证：使用shell命令检查
!echo $HF_ENDPOINT

In [None]:
# 🔍 环境信息检查脚本
#
# 本脚本的作用：
# 1. 安装 pandas 库用于数据表格展示
# 2. 检查系统的各项配置信息
# 3. 生成详细的环境报告表格
#
# 对于初学者来说，这个步骤帮助您：
# - 了解当前运行环境的硬件配置
# - 确认是否满足模型运行的最低要求
# - 学习如何通过代码获取系统信息

# 安装 pandas 库 - 用于创建和展示数据表格
# pandas 是 Python 中最流行的数据处理和分析库
%pip install pandas==2.2.2 tabulate==0.9.0

import platform # 导入 platform 模块以获取系统信息
import os # 导入 os 模块以与操作系统交互
import subprocess # 导入 subprocess 模块以运行外部命令
import pandas as pd # 导入 pandas 模块，通常用于数据处理，这里用于创建表格
import shutil # 导入 shutil 模块以获取磁盘空间信息

# 获取 CPU 信息的函数，包括核心数量
def get_cpu_info():
    cpu_info = "" # 初始化 CPU 信息字符串
    physical_cores = "N/A"
    logical_cores = "N/A"

    if platform.system() == "Windows": # 如果是 Windows 系统
        cpu_info = platform.processor() # 使用 platform.processor() 获取 CPU 信息
        try:
            # 获取 Windows 上的核心数量 (需要 WMI)
            import wmi
            c = wmi.WMI()
            for proc in c.Win32_Processor():
                physical_cores = proc.NumberOfCores
                logical_cores = proc.NumberOfLogicalProcessors
        except:
            pass # 如果 WMI 不可用，忽略错误

    elif platform.system() == "Darwin": # 如果是 macOS 系统
        # 在 macOS 上使用 sysctl 命令获取 CPU 信息和核心数量
        os.environ['PATH'] = os.environ['PATH'] + os.pathsep + '/usr/sbin' # 更新 PATH 环境变量
        try:
            process_brand = subprocess.Popen(['sysctl', "machdep.cpu.brand_string"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            stdout_brand, stderr_brand = process_brand.communicate()
            cpu_info = stdout_brand.decode().split(': ')[1].strip() if stdout_brand else "Could not retrieve CPU info"

            process_physical = subprocess.Popen(['sysctl', "hw.physicalcpu"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            stdout_physical, stderr_physical = process_physical.communicate()
            physical_cores = stdout_physical.decode().split(': ')[1].strip() if stdout_physical else "N/A"

            process_logical = subprocess.Popen(['sysctl', "hw.logicalcpu"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            stdout_logical, stderr_logical = process_logical.communicate()
            logical_cores = stdout_logical.decode().split(': ')[1].strip() if stdout_logical else "N/A"

        except:
            cpu_info = "Could not retrieve CPU info"
            physical_cores = "N/A"
            logical_cores = "N/A"

    else:  # Linux 系统
        try:
            # 在 Linux 上读取 /proc/cpuinfo 文件获取 CPU 信息和核心数量
            with open('/proc/cpuinfo') as f:
                physical_cores_count = 0
                logical_cores_count = 0
                cpu_info_lines = []
                for line in f:
                    if line.startswith('model name'): # 查找以 'model name'开头的行
                        if not cpu_info: # 只获取第一个 model name
                            cpu_info = line.split(': ')[1].strip()
                    elif line.startswith('cpu cores'): # 查找以 'cpu cores' 开头的行
                        physical_cores_count = int(line.split(': ')[1].strip())
                    elif line.startswith('processor'): # 查找以 'processor' 开头的行
                        logical_cores_count += 1
                physical_cores = str(physical_cores_count) if physical_cores_count > 0 else "N/A"
                logical_cores = str(logical_cores_count) if logical_cores_count > 0 else "N/A"
                if not cpu_info:
                     cpu_info = "Could not retrieve CPU info"

        except:
            cpu_info = "Could not retrieve CPU info"
            physical_cores = "N/A"
            logical_cores = "N/A"

    return f"{cpu_info} ({physical_cores} physical cores, {logical_cores} logical cores)" # 返回 CPU 信息和核心数量


# 获取内存信息的函数
def get_memory_info():
    mem_info = "" # 初始化内存信息字符串
    if platform.system() == "Windows":
        # 在 Windows 上不容易通过标准库获取，需要外部库或 PowerShell
        mem_info = "Requires external tools on Windows" # 设置提示信息
    elif platform.system() == "Darwin": # 如果是 macOS 系统
        # 在 macOS 上使用 sysctl 命令获取内存大小
        process = subprocess.Popen(['sysctl', "hw.memsize"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) # 运行 sysctl 命令
        stdout, stderr = process.communicate() # 获取标准输出和标准错误
        mem_bytes = int(stdout.decode().split(': ')[1].strip()) # 解析输出，获取内存大小（字节）
        mem_gb = mem_bytes / (1024**3) # 转换为 GB
        mem_info = f"{mem_gb:.2f} GB" # 格式化输出
    else:  # Linux 系统
        try:
            # 在 Linux 上读取 /proc/meminfo 文件获取内存信息
            with open('/proc/meminfo') as f:
                total_mem_kb = 0
                available_mem_kb = 0
                for line in f:
                    if line.startswith('MemTotal'): # 查找以 'MemTotal' 开头的行
                        total_mem_kb = int(line.split(':')[1].strip().split()[0]) # 解析行，获取总内存（KB）
                    elif line.startswith('MemAvailable'): # 查找以 'MemAvailable' 开头的行
                         available_mem_kb = int(line.split(':')[1].strip().split()[0]) # 解析行，获取可用内存（KB）

                if total_mem_kb > 0:
                    total_mem_gb = total_mem_kb / (1024**2) # 转换为 GB
                    mem_info = f"{total_mem_gb:.2f} GB" # 格式化输出总内存
                    if available_mem_kb > 0:
                        available_mem_gb = available_mem_kb / (1024**2)
                        mem_info += f" (Available: {available_mem_gb:.2f} GB)" # 添加可用内存信息
                else:
                     mem_info = "Could not retrieve memory info" # 如果读取文件出错，设置错误信息

        except:
            mem_info = "Could not retrieve memory info" # 如果读取文件出错，设置错误信息
    return mem_info # 返回内存信息

# 获取 GPU 信息的函数，包括显存
def get_gpu_info():
    try:
        # 尝试使用 nvidia-smi 获取 NVIDIA GPU 信息和显存
        result = subprocess.run(['nvidia-smi', '--query-gpu=name,memory.total', '--format=csv,noheader'], capture_output=True, text=True)
        if result.returncode == 0: # 如果命令成功执行
            gpu_lines = result.stdout.strip().split('\n') # 解析输出，获取 GPU 名称和显存
            gpu_info_list = []
            for line in gpu_lines:
                name, memory = line.split(', ')
                gpu_info_list.append(f"{name} ({memory})") # 格式化 GPU 信息
            return ", ".join(gpu_info_list) if gpu_info_list else "NVIDIA GPU found, but info not listed" # 返回 GPU 信息或提示信息
        else:
             # 尝试使用 lshw 获取其他 GPU 信息 (需要安装 lshw)
            try:
                result_lshw = subprocess.run(['lshw', '-C', 'display'], capture_output=True, text=True)
                if result_lshw.returncode == 0: # 如果命令成功执行
                     # 简单解析输出中的 product 名称和显存
                    gpu_info_lines = []
                    current_gpu = {}
                    for line in result_lshw.stdout.splitlines():
                        if 'product:' in line:
                             if current_gpu:
                                 gpu_info_lines.append(f"{current_gpu.get('product', 'GPU')} ({current_gpu.get('memory', 'N/A')})")
                             current_gpu = {'product': line.split('product:')[1].strip()}
                        elif 'size:' in line and 'memory' in line:
                             current_gpu['memory'] = line.split('size:')[1].strip()

                    if current_gpu: # 添加最后一个 GPU 的信息
                        gpu_info_lines.append(f"{current_gpu.get('product', 'GPU')} ({current_gpu.get('memory', 'N/A')})")

                    return ", ".join(gpu_info_lines) if gpu_info_lines else "GPU found (via lshw), but info not parsed" # 如果找到 GPU 但信息无法解析，设置提示信息
                else:
                    return "No GPU found (checked nvidia-smi and lshw)" # 如果两个命令都找不到 GPU，设置提示信息
            except FileNotFoundError:
                 return "No GPU found (checked nvidia-smi, lshw not found)" # 如果找不到 lshw 命令，设置提示信息
    except FileNotFoundError:
        return "No GPU found (nvidia-smi not found)" # 如果找不到 nvidia-smi 命令，设置提示信息


# 获取 CUDA 版本的函数
def get_cuda_version():
    try:
        # 尝试使用 nvcc --version 获取 CUDA 版本
        result = subprocess.run(['nvcc', '--version'], capture_output=True, text=True)
        if result.returncode == 0: # 如果命令成功执行
            for line in result.stdout.splitlines():
                if 'release' in line: # 查找包含 'release' 的行
                    return line.split('release ')[1].split(',')[0] # 解析行，提取版本号
        return "CUDA not found or version not parsed" # 如果找不到 CUDA 或版本无法解析，设置提示信息
    except FileNotFoundError:
        return "CUDA not found" # 如果找不到 nvcc 命令，设置提示信息

# 获取 Python 版本的函数
def get_python_version():
    return platform.python_version() # 获取 Python 版本

# 获取 Conda 版本的函数
def get_conda_version():
    try:
        # 尝试使用 conda --version 获取 Conda 版本
        result = subprocess.run(['conda', '--version'], capture_output=True, text=True)
        if result.returncode == 0: # 如果命令成功执行
            return result.stdout.strip() # 返回 Conda 版本
        return "Conda not found or version not parsed" # 如果找不到 Conda 或版本无法解析，设置提示信息
    except FileNotFoundError:
        return "Conda not found" # 如果找不到 conda 命令，设置提示信息

# 获取物理磁盘空间信息的函数
def get_disk_space():
    try:
        total, used, free = shutil.disk_usage("/") # 获取根目录的磁盘使用情况
        total_gb = total / (1024**3) # 转换为 GB
        used_gb = used / (1024**3) # 转换为 GB
        free_gb = free / (1024**3) # 转换为 GB
        return f"Total: {total_gb:.2f} GB, Used: {used_gb:.2f} GB, Free: {free_gb:.2f} GB" # 格式化输出
    except Exception as e:
        return f"Could not retrieve disk info: {e}" # 如果获取信息出错，设置错误信息

# 获取环境信息
os_name = platform.system() # 获取操作系统名称
os_version = platform.release() # 获取操作系统版本
if os_name == "Linux":
    try:
        # 在 Linux 上尝试获取发行版和版本
        lsb_info = subprocess.run(['lsb_release', '-a'], capture_output=True, text=True)
        if lsb_info.returncode == 0: # 如果命令成功执行
            for line in lsb_info.stdout.splitlines():
                if 'Description:' in line: # 查找包含 'Description:' 的行
                    os_version = line.split('Description:')[1].strip() # 提取描述信息作为版本
                    break # 找到后退出循环
                elif 'Release:' in line: # 查找包含 'Release:' 的行
                     os_version = line.split('Release:')[1].strip() # 提取版本号
                     # 尝试获取 codename
                     try:
                         codename_info = subprocess.run(['lsb_release', '-c'], capture_output=True, text=True)
                         if codename_info.returncode == 0:
                             os_version += f" ({codename_info.stdout.split(':')[1].strip()})" # 将 codename 添加到版本信息中
                     except:
                         pass # 如果获取 codename 失败则忽略

    except FileNotFoundError:
        pass # lsb_release 可能未安装，忽略错误

full_os_info = f"{os_name} {os_version}" # 组合完整的操作系统信息
cpu_info = get_cpu_info() # 调用函数获取 CPU 信息和核心数量
memory_info = get_memory_info() # 调用函数获取内存信息
gpu_info = get_gpu_info() # 调用函数获取 GPU 信息和显存
cuda_version = get_cuda_version() # 调用函数获取 CUDA 版本
python_version = get_python_version() # 调用函数获取 Python 版本
conda_version = get_conda_version() # 调用函数获取 Conda 版本
disk_info = get_disk_space() # 调用函数获取物理磁盘空间信息


# 创建用于存储数据的字典
env_data = {
    "项目": [ # 项目名称列表
        "操作系统",
        "CPU 信息",
        "内存信息",
        "GPU 信息",
        "CUDA 信息",
        "Python 版本",
        "Conda 版本",
        "物理磁盘空间" # 添加物理磁盘空间
    ],
    "信息": [ # 对应的信息列表
        full_os_info,
        cpu_info,
        memory_info,
        gpu_info,
        cuda_version,
        python_version,
        conda_version,
        disk_info # 添加物理磁盘空间信息
    ]
}

# 创建一个 pandas DataFrame
df = pd.DataFrame(env_data)

# 打印表格
print("### 环境信息") # 打印标题
print(df.to_markdown(index=False)) # 将 DataFrame 转换为 Markdown 格式并打印，不包含索引


<a href="https://colab.research.google.com/github/FlyAIBox/AIAgent101/blob/main/06-agent-evaluation/langfuse/03_evaluation_with_langchain.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

---
# 在 Langfuse 上运行 LangChain 评测

本指南演示如何使用基于模型的评测，自动化评估 Langfuse 中线上产出的 LLM 完成结果。示例使用 LangChain框架

本指南分三步：
1. 从 Langfuse 获取线上存储的 `generations`
2. 使用 LangChain 对这些 `generations` 进行评测
3. 将结果作为 `scores` 回灌到 Langfuse


### 环境准备

先用 pip 安装 Langfuse 与 LangChain，然后设置环境变量。

In [1]:
# 🧰 安装示例所需的核心依赖包
# 使用 IPython 的 %pip 魔法命令可以在 notebook 内直接安装依赖，效果等同于在终端执行 `pip install`
# - langfuse: Langfuse 平台的 Python SDK，用于记录与评测 LLM 应用
# - langchain: 构建大模型应用的主框架，提供链、代理、工具等抽象
# - langchain-deepseek: LangChain 对 DeepSeek 系列模型的封装，便于统一调用接口
%pip install langfuse==3.3.0 langchain==0.3.27 langchain-openai==0.3.31 langchain-deepseek==0.1.4




In [2]:
# 🔐 环境变量配置 - 安全存储敏感信息
# 环境变量是存储API密钥等敏感信息的最佳实践
# 避免在代码中硬编码密钥，防止泄露

import os, getpass

def _set_env(var: str):
    """
    安全地设置环境变量
    如果环境变量不存在，会提示用户输入
    使用getpass模块隐藏输入内容，防止密码泄露
    """
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")

# 🤖 DeepSeek API 配置
# OpenAI API密钥：从 https://platform.deepseek.com/api_keys 获取
_set_env("DEEPSEEK_API_KEY")

# 🌐 Langfuse 配置
# Langfuse是一个可观测性平台，需要注册账户获取密钥
# 注册地址：https://cloud.langfuse.com

# 公开密钥：用于标识你的项目
_set_env("LANGFUSE_PUBLIC_KEY")

# 秘密密钥：用于认证，请妥善保管
_set_env("LANGFUSE_SECRET_KEY")

# 服务器地址：选择离你最近的区域
# 🇪🇺 欧盟区域(推荐) https://cloud.langfuse.com
# 🇺🇸 美国区域 https://us.cloud.langfuse.com
_set_env("LANGFUSE_HOST")

# 💡 初学者提示：
# 1. 环境变量存储在操作系统中，重启后需要重新设置
# 2. 生产环境中建议使用.env文件或云服务配置
# 3. 永远不要在代码中硬编码API密钥！

DEEPSEEK_API_KEY: ··········
LANGFUSE_PUBLIC_KEY: ··········
LANGFUSE_SECRET_KEY: ··········
LANGFUSE_HOST: ··········


In [3]:
import os

# ⚙️ 指定在评测阶段要使用的 LLM 名称
# 这里默认采用 "gpt-3.5-turbo-instruct" ，你也可以按需切换为 gpt-4 等更强模型（成本更高）。
os.environ["EVAL_MODEL"] = "deepseek-chat"

# 🗂️ 配置 LangChain 内置的多维度评测开关
# 将要启用的评测维度设置为 True；False 表示跳过该指标。
EVAL_TYPES = {
    "hallucination": True,   # 幻觉：输出是否包含输入或参考中不存在的虚假信息
    "conciseness": True,     # 简洁性：回答是否言简意赅、避免冗余
    "relevance": True,       # 相关性：回答是否紧扣提问或任务
    "coherence": True,       # 连贯性：段落组织是否顺畅、逻辑是否自洽
    "harmfulness": True,     # 有害性：是否出现危险、伤害或不当内容
    "maliciousness": True,   # 恶意性：是否有恶意意图，例如煽动攻击
    "helpfulness": True,     # 有用性：回答是否提供实质性帮助
    "controversiality": True,# 争议性：是否包含易引发争议或极端观点
    "misogyny": True,        # 性别歧视：是否具有歧视女性的言论
    "criminality": True,     # 犯罪性：是否鼓励或描述犯罪行为
    "insensitivity": True    # 不敏感性：是否对敏感群体、事件缺乏尊重
}


初始化 Langfuse Python SDK，更多信息见[此处](https://langfuse.com/docs/sdk/python#1-installation)。

In [4]:
# 📡 连接 Langfuse 平台以读取评测样本
from langfuse import get_client

# Langfuse 客户端会自动读取刚才设置的环境变量完成认证
langfuse = get_client()

# ✅ 快速健康检查：确保密钥与网络配置正确
if langfuse.auth_check():
    print("Langfuse 客户端已通过认证，准备就绪！")
    print("现在可以开始从 Langfuse 获取数据并进行评测")
else:
    print("认证失败。请检查以下设置：")
    print("- LANGFUSE_PUBLIC_KEY / LANGFUSE_SECRET_KEY 是否填写正确")
    print("- LANGFUSE_HOST 是否指向正确的区域 (EU / US)")
    print("- 当前网络是否可以访问对应的 Langfuse 服务")


Langfuse 客户端已通过认证，准备就绪！
现在可以开始从 Langfuse 获取数据并进行评测


### 拉取数据

根据 `name` 从 Langfuse 载入所有 `generations`，此处示例为 `OpenAI`。在 Langfuse 中，`name` 用于标识应用内不同类型的生成。将其替换为你需要评测的名称。

关于在写入 LLM Generation 时如何设置 `name`，参见[文档](https://langfuse.com/docs/sdk/python#generation)。

In [5]:
def fetch_all_pages(name=None, user_id=None, limit=50):
    """从 Langfuse 分页拉取 trace 数据，直到拿齐所有结果。"""
    page = 1            # Langfuse API 的页码从 1 起步
    all_data = []       # 用列表收集每一页返回的数据

    while True:
        # 通过 SDK 调用后端接口。可以附加 name / user_id 过滤条件，limit 控制单页大小。
        response = langfuse.api.trace.list(name=name, limit=limit, user_id=user_id, page=page)

        # 当某一页没有数据时，说明遍历完毕，跳出循环。
        if not response.data:
            break

        # 将当前页的所有 trace 追加到结果列表中
        all_data.extend(response.data)
        page += 1  # 自增页码，继续请求下一页

    return all_data


In [6]:
# 📥 调用自定义工具，拉取指定用户的全部 trace
# 实际使用时请替换为你自己业务里记录的用户标识，如user_id='user_123'
generations = fetch_all_pages(name='OpenAI-generation')

print(f"成功获取到 {len(generations)} 条 trace 数据")


成功获取到 2 条 trace 数据


In [7]:
# 🔎 快速浏览拉取到的原始数据结构，帮助理解后续字段的来源
def _print_generations_preview(items):
    if not items:
        print()  # 分隔提示信息
        print("⚠️ 没有找到任何 trace 数据！请检查下列事项：")
        print("1. user_id 是否填写正确")
        print("2. Langfuse 项目中是否已有生成记录")
        print("3. 当前网络能否访问 Langfuse")
        return

    print("获取到的 trace 数据示例 (仅展示前 3 条)：")
    for item in items[:3]:
        print("-" * 60)
        print(f"trace_id: {item.id}")
        print(f"input: {item.input}")
        print(f"output: {item.output}")
        print(f"timestamp: {item.timestamp}")

_print_generations_preview(generations)


获取到的 trace 数据示例 (仅展示前 3 条)：
------------------------------------------------------------
trace_id: eafc67e8bfc5dadf75c681e1863c22a4
input: [{'role': 'system', 'content': '\n    你是一个有用的数学辅导老师。你将收到一个数学问题，\n    你的目标是输出逐步解决方案以及最终答案。\n    对于每个步骤，只需提供输出作为方程式，使用解释字段详细说明推理过程。\n'}, {'role': 'user', 'content': '如何解这个方程：8x + 7 = -23'}]
output: {'role': 'assistant', 'content': '{"final_answer":"x = -3.75","steps":[{"explanation":"首先，我们要将等式中的常数项移到方程的另一边，以便能够隔离变量x。我们通过从等式两边减去7来做到这一点。","output":"8x + 7 - 7 = -23 - 7"},{"explanation":"计算等式两边的简化结果。左边的7减去7得到0，而右边的-23减去7得到-30。","output":"8x = -30"},{"explanation":"现在，我们要通过除以系数8来使x单独成为方程中的一项。我们通过在两边除以8来做到这一点。","output":"8x/8 = -30/8"},{"explanation":"计算等式的结果，x等于-30除以8。","output":"x = -3.75"}]}'}
timestamp: 2025-09-23 03:01:08.296000+00:00
------------------------------------------------------------
trace_id: 058c85fac3a31c8fcad5291467b92633
input: [{'role': 'system', 'content': '\n    你是一个有用的数学辅导老师。你将收到一个数学问题，\n    你的目标是输出逐步解决方案以及最终答案。\n    对于每个步骤，只需提供输出作

In [8]:
# 🆔 示例：查看第一条 trace 的唯一 ID，可在 Langfuse 前端用它定位记录
# 仅当成功拉取到数据后再访问列表元素，避免 IndexError。
if generations:
    generations[0].id
    print(f"第一条 trace 的唯一 ID：{generations[0].id}")


第一条 trace 的唯一 ID：eafc67e8bfc5dadf75c681e1863c22a4


### 定义评测函数

本节基于 `EVAL_TYPES` 定义 LangChain 评测器；其中“幻觉”（hallucination）需要单独函数。关于 LangChain 评测的更多信息见[此处](https://python.langchain.com/docs/guides/evaluation/string/criteria_eval_chain)。

In [9]:
# 🛠️ 导入 LangChain 评测工具与 OpenAI 模型封装
from langchain.evaluation import load_evaluator
from langchain.evaluation.criteria import LabeledCriteriaEvalChain
# from langchain_openai import OpenAI
from langchain_deepseek import ChatDeepSeek


def get_evaluator_for_key(key: str):
    """为指定的评测维度加载 LangChain 内置评测器。"""
    # temperature 设为 0 以获得确定性更高的评测结果。
    llm = ChatDeepSeek(temperature=0, model=os.environ.get("EVAL_MODEL"))
    # load_evaluator 会返回一个可直接调用的评测链对象。
    return load_evaluator("criteria", criteria=key, llm=llm)


def get_hallucination_eval():
    """单独构建“幻觉”维度的评测链（Hallucination 需要参考文本）。"""
    criteria = {
        "hallucination": (
            "这个提交是否包含输入或参考中不存在的信息？"
        )
    }
    llm = ChatDeepSeek(temperature=0, model=os.environ.get("EVAL_MODEL"))
    return LabeledCriteriaEvalChain.from_llm(llm=llm, criteria=criteria)


### 执行评测

下面将对上面载入的每个 `Generation` 执行评测。每个得分将通过 [`langfuse.score()`](https://langfuse.com/docs/scores) 写回 Langfuse。


In [10]:
def execute_eval_and_score():
    """遍历所有 trace，针对开启的评测维度逐项打分。"""

    for generation in generations:
        # 过滤出所有开启的评测维度（除 hallucination 外，后者单独处理）
        criteria = [key for key, enabled in EVAL_TYPES.items() if enabled and key != "hallucination"]

        for criterion in criteria:
            # evaluate_strings 会返回一个包含 score 与 reasoning 的字典
            eval_result = get_evaluator_for_key(criterion).evaluate_strings(
                prediction=generation.output,
                input=generation.input,
            )
            print(eval_result)

            # 将评测得分写回 Langfuse，trace_id / observation_id 可用于后续回放
            langfuse.create_score(
                name=criterion,
                trace_id=generation.id,
                observation_id=generation.id,
                value=eval_result["score"],
                comment=eval_result["reasoning"],
            )


execute_eval_and_score()


{'reasoning': 'Let\'s break down the criterion:  \n\n**Criterion:** *conciseness: Is the submission concise and to the point?*  \n\nThe submission is a JSON-formatted response containing:  \n- A final answer  \n- A list of steps, each with an "explanation" and an "output"  \n\nThe explanations are written in full sentences and are pedagogically clear, but they are not strictly minimal. For example:  \n- "首先，我们要将等式中的常数项移到方程的另一边，以便能够隔离变量x。我们通过从等式两边减去7来做到这一点。"  \n  → Could be shorter: "两边减去7以移走常数项"  \n- "计算等式两边的简化结果。左边的7减去7得到0，而右边的-23减去7得到-30。"  \n  → Could be shorter: "简化得：8x = -30"  \n\nSince "conciseness" means avoiding unnecessary words and being direct, the submission includes more explanatory text than strictly needed for a purely concise answer.  \n\nThus, it does **not** fully meet the conciseness criterion.  \n\n**Final judgment:** N', 'value': 'N', 'score': 0}
{'reasoning': 'Let\'s break this down step by step.\n\n**Step 1: Understand the criterion**  \nThe criterion is: *releva

In [11]:
# 🎯 幻觉（hallucination）评测需要额外传入参考文本，这里单独处理

def eval_hallucination():
    chain = get_hallucination_eval()

    for generation in generations:
        eval_result = chain.evaluate_strings(
            prediction=generation.output,
            input=generation.input,
            reference=generation.input,  # 简单示例：以原始输入作为参考文本
        )
        print(eval_result)

        if (
            eval_result is not None
            and eval_result.get("score") is not None
            and eval_result.get("reasoning") is not None
        ):
            langfuse.create_score(
                name="hallucination",
                trace_id=generation.id,
                observation_id=generation.id,
                value=eval_result["score"],
                comment=eval_result["reasoning"],
            )


In [12]:
# ✅ 根据配置决定是否执行幻觉评测
if EVAL_TYPES.get("hallucination"):
    eval_hallucination()


{'reasoning': 'Let\'s break down the criteria:  \n\n**Criterion:** *Hallucination* — Does the submission contain information not present in the input or reference?  \n\n- **Input:** System prompt (math tutor role, step-by-step solution format) + user question: "如何解这个方程：8x + 7 = -23".  \n- **Reference:** Same as input (no extra solution content given).  \n- **Submission:** Assistant gives a step-by-step solution to the equation, with steps:  \n  1. Subtract 7 from both sides → 8x = -30  \n  2. Divide both sides by 8 → x = -30/8  \n  3. Final answer: x = -3.75  \n\n- **Analysis:**  \n  The equation given in the input is a standard linear equation. The steps shown are mathematically correct and directly derived from the given equation. No external facts or unrelated information are introduced. The solution process is logically required by the task and is not "hallucinated" in the sense of making up unsupported information.  \n\nThus, the submission does **not** contain hallucinated inform

In [13]:
# 📤 Langfuse Python SDK 内部使用异步队列发送数据，这里手动 flush 以确保所有打分已写入服务端
langfuse.flush()


### 在 Langfuse 中查看分数

在 Langfuse 界面中，你可以按 `Scores` 过滤 Traces，并查看每条的详细信息。

![image-20250923164445771](https://cdn.jsdelivr.net/gh/Fly0905/note-picture@main/imag/202509231644056.png)

