### 🔧 环境配置和检查

#### 概述

本教程需要特定的环境配置以确保最佳学习体验。以下配置将帮助你：

- 使用统一的conda环境：激活统一的学习环境
- 通过国内镜像源快速安装依赖：配置pip使用清华镜像源
- 加速模型下载：设置HuggingFace镜像代理
- 检查系统配置：检查硬件和软件配置

#### 配置

- **所需环境及其依赖已经部署好**
- 在`Notebook`右上角选择`jupyter内核`为`python(agent101)`，即可执行下方代码

In [1]:
%%script bash

# 1. 激活 conda 环境 (仅对当前单元格有效)
eval "$(conda shell.bash hook)"
conda activate agent101

echo "========================================="
echo "== Conda 环境检查报告 (仅针对当前 Bash 子进程) =="
echo "========================================="

# 2. 检查当前激活的环境
CURRENT_ENV_NAME=$(basename $CONDA_PREFIX)

if [ "$CURRENT_ENV_NAME" = "agent101" ]; then
    echo "✅ 当前单元格已成功激活到 agent101 环境。"
    echo "✅ 正在使用的环境路径: $CONDA_PREFIX"
    echo ""
    echo "💡 提示: 后续的Python单元格将使用Notebook当前选择的Jupyter内核。"
    echo "   如果需要后续单元格也使用此环境，请执行以下操作:"
    echo "   1. 检查 Notebook 右上角是否已选择 'python(agent101)'。"
else
    echo "❌ 激活失败或环境名称不匹配。当前环境: $CURRENT_ENV_NAME"
    echo ""
    echo "⚠️ 严重提示: 建议将 Notebook 的 Jupyter **内核 (Kernel)** 切换为 'python(agent101)'。"
    echo "   (通常位于 Notebook 右上角或 '内核' 菜单中)"
    echo ""
    echo "📚 备用方法 (不推荐): 如果无法切换内核，则必须在**每个**代码单元格的头部重复以下命令:"
    echo ""
    echo "%%script bash"
    echo "# 必须在每个单元格都执行"
    echo "eval \"\$(conda shell.bash hook)\""
    echo "conda activate agent101"
fi

echo "=========================================" 

== Conda 环境检查报告 (仅针对当前 Bash 子进��) ==
✅ 当前单元格已成功激活到 agent101 环境。
✅ 正在使用的环境路径: /root/miniconda3/envs/agent101

💡 提示: 后续的Python单元格将使用Notebook当前选择的Jupyter��核。
   如果需要后续单元格也使用此环境，请执行以下操作:
   1. 检查 Notebook 右上角是否已选择 'python(agent101)'。


In [2]:
# 2. 设置pip 为清华源
%pip config list -v set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple
%pip config list -v list


For variant 'global', will try loading '/etc/xdg/pip/pip.conf'
For variant 'global', will try loading '/etc/pip.conf'
For variant 'user', will try loading '/root/.pip/pip.conf'
For variant 'user', will try loading '/root/.config/pip/pip.conf'
For variant 'site', will try loading '/root/miniconda3/envs/agent101/pip.conf'
[31mERROR: Got unexpected number of arguments, expected 0. (example: "/root/miniconda3/envs/agent101/bin/python -m pip config list")[0m[31m
[0mNote: you may need to restart the kernel to use updated packages.
For variant 'global', will try loading '/etc/xdg/pip/pip.conf'
For variant 'global', will try loading '/etc/pip.conf'
For variant 'user', will try loading '/root/.pip/pip.conf'
For variant 'user', will try loading '/root/.config/pip/pip.conf'
For variant 'site', will try loading '/root/miniconda3/envs/agent101/pip.conf'
[31mERROR: Got unexpected number of arguments, expected 0. (example: "/root/miniconda3/envs/agent101/bin/python -m pip config list")[0m[31m


In [3]:
# 3. 设置HuggingFace代理
%env HF_ENDPOINT=https://hf-mirror.com
# 验证：使用shell命令检查
!echo $HF_ENDPOINT

env: HF_ENDPOINT=https://hf-mirror.com
https://hf-mirror.com


In [4]:
# 🔍 环境信息检查脚本
#
# 本脚本的作用：
# 1. 安装 pandas 库用于数据表格展示
# 2. 检查系统的各项配置信息
# 3. 生成详细的环境报告表格
#
# 对于初学者来说，这个步骤帮助你：
# - 了解当前运行环境的硬件配置
# - 确认是否满足模型运行的最低要求
# - 学习如何通过代码获取系统信息

# 安装 pandas 库 - 用于创建和展示数据表格
# pandas 是 Python 中最流行的数据处理和分析库
%pip install pandas==2.2.2 tabulate==0.9.0

import platform # 导入 platform 模块以获取系统信息
import os # 导入 os 模块以与操作系统交互
import subprocess # 导入 subprocess 模块以运行外部命令
import pandas as pd # 导入 pandas 模块，通常用于数据处理，这里用于创建表格
import shutil # 导入 shutil 模块以获取磁盘空间信息

# 获取 CPU 信息的函数，包括核心数量
def get_cpu_info():
    cpu_info = "" # 初始化 CPU 信息字符串
    physical_cores = "N/A"
    logical_cores = "N/A"

    if platform.system() == "Windows": # 如果是 Windows 系统
        cpu_info = platform.processor() # 使用 platform.processor() 获取 CPU 信息
        try:
            # 获取 Windows 上的核心数量 (需要 WMI)
            import wmi
            c = wmi.WMI()
            for proc in c.Win32_Processor():
                physical_cores = proc.NumberOfCores
                logical_cores = proc.NumberOfLogicalProcessors
        except:
            pass # 如果 WMI 不可用，忽略错误

    elif platform.system() == "Darwin": # 如果是 macOS 系统
        # 在 macOS 上使用 sysctl 命令获取 CPU 信息和核心数量
        os.environ['PATH'] = os.environ['PATH'] + os.pathsep + '/usr/sbin' # 更新 PATH 环境变量
        try:
            process_brand = subprocess.Popen(['sysctl', "machdep.cpu.brand_string"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            stdout_brand, stderr_brand = process_brand.communicate()
            cpu_info = stdout_brand.decode().split(': ')[1].strip() if stdout_brand else "Could not retrieve CPU info"

            process_physical = subprocess.Popen(['sysctl', "hw.physicalcpu"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            stdout_physical, stderr_physical = process_physical.communicate()
            physical_cores = stdout_physical.decode().split(': ')[1].strip() if stdout_physical else "N/A"

            process_logical = subprocess.Popen(['sysctl', "hw.logicalcpu"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            stdout_logical, stderr_logical = process_logical.communicate()
            logical_cores = stdout_logical.decode().split(': ')[1].strip() if stdout_logical else "N/A"

        except:
            cpu_info = "Could not retrieve CPU info"
            physical_cores = "N/A"
            logical_cores = "N/A"

    else:  # Linux 系统
        try:
            # 在 Linux 上读取 /proc/cpuinfo 文件获取 CPU 信息和核心数量
            with open('/proc/cpuinfo') as f:
                physical_cores_count = 0
                logical_cores_count = 0
                cpu_info_lines = []
                for line in f:
                    if line.startswith('model name'): # 查找以 'model name'开头的行
                        if not cpu_info: # 只获取第一个 model name
                            cpu_info = line.split(': ')[1].strip()
                    elif line.startswith('cpu cores'): # 查找以 'cpu cores' 开头的行
                        physical_cores_count = int(line.split(': ')[1].strip())
                    elif line.startswith('processor'): # 查找以 'processor' 开头的行
                        logical_cores_count += 1
                physical_cores = str(physical_cores_count) if physical_cores_count > 0 else "N/A"
                logical_cores = str(logical_cores_count) if logical_cores_count > 0 else "N/A"
                if not cpu_info:
                     cpu_info = "Could not retrieve CPU info"

        except:
            cpu_info = "Could not retrieve CPU info"
            physical_cores = "N/A"
            logical_cores = "N/A"

    return f"{cpu_info} ({physical_cores} physical cores, {logical_cores} logical cores)" # 返回 CPU 信息和核心数量


# 获取内存信息的函数
def get_memory_info():
    mem_info = "" # 初始化内存信息字符串
    if platform.system() == "Windows":
        # 在 Windows 上不容易通过标准库获取，需要外部库或 PowerShell
        mem_info = "Requires external tools on Windows" # 设置提示信息
    elif platform.system() == "Darwin": # 如果是 macOS 系统
        # 在 macOS 上使用 sysctl 命令获取内存大小
        process = subprocess.Popen(['sysctl', "hw.memsize"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) # 运行 sysctl 命令
        stdout, stderr = process.communicate() # 获取标准输出和标准错误
        mem_bytes = int(stdout.decode().split(': ')[1].strip()) # 解析输出，获取内存大小（字节）
        mem_gb = mem_bytes / (1024**3) # 转换为 GB
        mem_info = f"{mem_gb:.2f} GB" # 格式化输出
    else:  # Linux 系统
        try:
            # 在 Linux 上读取 /proc/meminfo 文件获取内存信息
            with open('/proc/meminfo') as f:
                total_mem_kb = 0
                available_mem_kb = 0
                for line in f:
                    if line.startswith('MemTotal'): # 查找以 'MemTotal' 开头的行
                        total_mem_kb = int(line.split(':')[1].strip().split()[0]) # 解析行，获取总内存（KB）
                    elif line.startswith('MemAvailable'): # 查找以 'MemAvailable' 开头的行
                         available_mem_kb = int(line.split(':')[1].strip().split()[0]) # 解析行，获取可用内存（KB）

                if total_mem_kb > 0:
                    total_mem_gb = total_mem_kb / (1024**2) # 转换为 GB
                    mem_info = f"{total_mem_gb:.2f} GB" # 格式化输出总内存
                    if available_mem_kb > 0:
                        available_mem_gb = available_mem_kb / (1024**2)
                        mem_info += f" (Available: {available_mem_gb:.2f} GB)" # 添加可用内存信息
                else:
                     mem_info = "Could not retrieve memory info" # 如果读取文件出错，设置错误信息

        except:
            mem_info = "Could not retrieve memory info" # 如果读取文件出错，设置错误信息
    return mem_info # 返回内存信息

# 获取 GPU 信息的函数，包括显存
def get_gpu_info():
    try:
        # 尝试使用 nvidia-smi 获取 NVIDIA GPU 信息和显存
        result = subprocess.run(['nvidia-smi', '--query-gpu=name,memory.total', '--format=csv,noheader'], capture_output=True, text=True)
        if result.returncode == 0: # 如果命令成功执行
            gpu_lines = result.stdout.strip().split('\n') # 解析输出，获取 GPU 名称和显存
            gpu_info_list = []
            for line in gpu_lines:
                name, memory = line.split(', ')
                gpu_info_list.append(f"{name} ({memory})") # 格式化 GPU 信息
            return ", ".join(gpu_info_list) if gpu_info_list else "NVIDIA GPU found, but info not listed" # 返回 GPU 信息或提示信息
        else:
             # 尝试使用 lshw 获取其他 GPU 信息 (需要安装 lshw)
            try:
                result_lshw = subprocess.run(['lshw', '-C', 'display'], capture_output=True, text=True)
                if result_lshw.returncode == 0: # 如果命令成功执行
                     # 简单解析输出中的 product 名称和显存
                    gpu_info_lines = []
                    current_gpu = {}
                    for line in result_lshw.stdout.splitlines():
                        if 'product:' in line:
                             if current_gpu:
                                 gpu_info_lines.append(f"{current_gpu.get('product', 'GPU')} ({current_gpu.get('memory', 'N/A')})")
                             current_gpu = {'product': line.split('product:')[1].strip()}
                        elif 'size:' in line and 'memory' in line:
                             current_gpu['memory'] = line.split('size:')[1].strip()

                    if current_gpu: # 添加最后一个 GPU 的信息
                        gpu_info_lines.append(f"{current_gpu.get('product', 'GPU')} ({current_gpu.get('memory', 'N/A')})")

                    return ", ".join(gpu_info_lines) if gpu_info_lines else "GPU found (via lshw), but info not parsed" # 如果找到 GPU 但信息无法解析，设置提示信息
                else:
                    return "No GPU found (checked nvidia-smi and lshw)" # 如果两个命令都找不到 GPU，设置提示信息
            except FileNotFoundError:
                 return "No GPU found (checked nvidia-smi, lshw not found)" # 如果找不到 lshw 命令，设置提示信息
    except FileNotFoundError:
        return "No GPU found (nvidia-smi not found)" # 如果找不到 nvidia-smi 命令，设置提示信息


# 获取 CUDA 版本的函数
def get_cuda_version():
    try:
        # 尝试使用 nvcc --version 获取 CUDA 版本
        result = subprocess.run(['nvcc', '--version'], capture_output=True, text=True)
        if result.returncode == 0: # 如果命令成功执行
            for line in result.stdout.splitlines():
                if 'release' in line: # 查找包含 'release' 的行
                    return line.split('release ')[1].split(',')[0] # 解析行，提取版本号
        return "CUDA not found or version not parsed" # 如果找不到 CUDA 或版本无法解析，设置提示信息
    except FileNotFoundError:
        return "CUDA not found" # 如果找不到 nvcc 命令，设置提示信息

# 获取 Python 版本的函数
def get_python_version():
    return platform.python_version() # 获取 Python 版本

# 获取 Conda 版本的函数
def get_conda_version():
    try:
        # 尝试使用 conda --version 获取 Conda 版本
        result = subprocess.run(['conda', '--version'], capture_output=True, text=True)
        if result.returncode == 0: # 如果命令成功执行
            return result.stdout.strip() # 返回 Conda 版本
        return "Conda not found or version not parsed" # 如果找不到 Conda 或版本无法解析，设置提示信息
    except FileNotFoundError:
        return "Conda not found" # 如果找不到 conda 命令，设置提示信息

# 获取物理磁盘空间信息的函数
def get_disk_space():
    try:
        total, used, free = shutil.disk_usage("/") # 获取根目录的磁盘使用情况
        total_gb = total / (1024**3) # 转换为 GB
        used_gb = used / (1024**3) # 转换为 GB
        free_gb = free / (1024**3) # 转换为 GB
        return f"Total: {total_gb:.2f} GB, Used: {used_gb:.2f} GB, Free: {free_gb:.2f} GB" # 格式化输出
    except Exception as e:
        return f"Could not retrieve disk info: {e}" # 如果获取信息出错，设置错误信息

# 获取环境信息
os_name = platform.system() # 获取操作系统名称
os_version = platform.release() # 获取操作系统版本
if os_name == "Linux":
    try:
        # 在 Linux 上尝试获取发行版和版本
        lsb_info = subprocess.run(['lsb_release', '-a'], capture_output=True, text=True)
        if lsb_info.returncode == 0: # 如果命令成功执行
            for line in lsb_info.stdout.splitlines():
                if 'Description:' in line: # 查找包含 'Description:' 的行
                    os_version = line.split('Description:')[1].strip() # 提取描述信息作为版本
                    break # 找到后退出循环
                elif 'Release:' in line: # 查找包含 'Release:' 的行
                     os_version = line.split('Release:')[1].strip() # 提取版本号
                     # 尝试获取 codename
                     try:
                         codename_info = subprocess.run(['lsb_release', '-c'], capture_output=True, text=True)
                         if codename_info.returncode == 0:
                             os_version += f" ({codename_info.stdout.split(':')[1].strip()})" # 将 codename 添加到版本信息中
                     except:
                         pass # 如果获取 codename 失败则忽略

    except FileNotFoundError:
        pass # lsb_release 可能未安装，忽略错误

full_os_info = f"{os_name} {os_version}" # 组合完整的操作系统信息
cpu_info = get_cpu_info() # 调用函数获取 CPU 信息和核心数量
memory_info = get_memory_info() # 调用函数获取内存信息
gpu_info = get_gpu_info() # 调用函数获取 GPU 信息和显存
cuda_version = get_cuda_version() # 调用函数获取 CUDA 版本
python_version = get_python_version() # 调用函数获取 Python 版本
conda_version = get_conda_version() # 调用函数获取 Conda 版本
disk_info = get_disk_space() # 调用函数获取物理磁盘空间信息


# 创建用于存储数据的字典
env_data = {
    "项目": [ # 项目名称列表
        "操作系统",
        "CPU 信息",
        "内存信息",
        "GPU 信息",
        "CUDA 信息",
        "Python 版本",
        "Conda 版本",
        "物理磁盘空间" # 添加物理磁盘空间
    ],
    "信息": [ # 对应的信息列表
        full_os_info,
        cpu_info,
        memory_info,
        gpu_info,
        cuda_version,
        python_version,
        conda_version,
        disk_info # 添加物理磁盘空间信息
    ]
}

# 创建一个 pandas DataFrame
df = pd.DataFrame(env_data)

# 打印表格
print("### 环境信息") # 打印标题
print(df.to_markdown(index=False)) # 将 DataFrame 转换为 Markdown 格式并打印，不包含索引


[0mNote: you may need to restart the kernel to use updated packages.
### 环境信息
| 项目         | 信息                                                                               |
|:-------------|:-----------------------------------------------------------------------------------|
| 操作系统     | Linux Ubuntu 22.04.4 LTS                                                           |
| CPU 信息     | 11th Gen Intel(R) Core(TM) i5-1135G7 @ 2.40GHz (1 physical cores, 4 logical cores) |
| 内存信息     | 5.75 GB (Available: 3.76 GB)                                                       |
| GPU 信息     | No GPU found (nvidia-smi not found)                                                |
| CUDA 信息    | CUDA not found                                                                     |
| Python 版本  | 3.10.18                                                                            |
| Conda 版本   | conda 24.4.0                                                                       |
| 物理磁盘空间 | Total: 145.49 

# Langfuse集成LangChain

## 📚 前置知识导读

在开始学习 LangChain 与 Langfuse 的集成之前，让我们先了解一些基础概念：

### 🤖 LangChain 是什么？
- **定义**：LangChain 是一个用于构建大语言模型（LLM）应用的开发框架
- **作用**：帮助开发者将 LLM 与外部数据源、工具、数据库等连接起来
- **核心概念**：Chain（链）、Agent（智能体）、Tool（工具）、Memory（记忆）等

### 📊 Langfuse 是什么？
- **定义**：Langfuse 是一个专门为大语言模型应用设计的可观测性（Observability）平台
- **作用**：帮助开发者监控、调试和优化 LLM 应用的性能
- **核心功能**：追踪（Tracing）、评估（Evaluation）、监控（Monitoring）

### 🔗 两者如何协同工作？
```mermaid
graph LR
    A[你的应用] --> B[LangChain]
    B --> C[LLM/工具/数据]
    B --> D[Langfuse]
    D --> E[性能监控]
    D --> F[调试分析]
    D --> G[成本追踪]
```

### 🎯 学习目标
通过本教程，你将学会：
1. 如何在 LangChain 应用中集成 Langfuse
2. 如何追踪和监控 LLM 调用链
3. 如何分析应用性能和成本
4. 如何调试和优化 LLM 应用

---
**💡 提示**：如果你对 LangChain 或 Langfuse 还不太熟悉，建议先阅读官方文档了解基础概念。

# 示例：LangChain 集成

请按照[集成指南](https://langfuse.com/integrations/frameworks/langchain)将该集成添加到你的 LangChain 项目中。该集成也支持 LangChain JS。

## 环境准备

In [5]:
# 📦 安装必要的 Python 包
#
# 📚 包功能说明：
# - langfuse: Langfuse 客户端库，专门用于大语言模型应用的监控、追踪和评估
# - langchain: LangChain 核心框架，提供构建复杂 LLM 应用的基础组件
# - langchain-openai: LangChain 的 OpenAI 官方集成包，支持 GPT 系列模型
# - langchain_community: LangChain 社区维护的扩展组件，包含各种工具和集成

#
# 🔧 安装命令（根据需求选择）：
%pip install langfuse==3.3.0 langchain==0.3.27 langchain-openai==0.3.31 langchain_community==0.3.27


[0mNote: you may need to restart the kernel to use updated packages.


在 Langfuse 控制台的项目设置页获取 API Key，初始化 Langfuse 客户端，并将其设置到环境变量中。

In [6]:
# 🔐 环境变量配置 - 安全存储敏感信息
# 环境变量是存储API密钥等敏感信息的最佳实践
# 避免在代码中硬编码密钥，防止泄露

import os, getpass

def _set_env(var: str):
    """
    安全地设置环境变量
    如果环境变量不存在，会提示用户输入
    使用getpass模块隐藏输入内容，防止密码泄露
    """
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")

# 🤖 OpenAI API 配置
# OpenAI API密钥：从 https://platform.openai.com/api-keys 获取
# 这是调用GPT模型必需的认证信息
_set_env("OPENAI_API_KEY")

# API代理地址：如果你使用第三方代理服务（如国内代理）
# 示例：https://api.apiyi.com/v1
# 如果直接使用OpenAI官方API，可以留空
_set_env("OPENAI_BASE_URL")

# 🌐 Langfuse 配置
# Langfuse是一个可观测性平台，需要注册账户获取密钥
# 注册地址：https://cloud.langfuse.com

# 公开密钥：用于标识你的项目
_set_env("LANGFUSE_PUBLIC_KEY")

# 秘密密钥：用于认证，请妥善保管
_set_env("LANGFUSE_SECRET_KEY")

# 服务器地址：选择离你最近的区域
# 🇪🇺 欧盟区域(推荐) https://cloud.langfuse.com
# 🇺🇸 美国区域（不推荐） https://us.cloud.langfuse.com
_set_env("LANGFUSE_HOST")

# 💡 初学者提示：
# 1. 环境变量存储在操作系统中，重启后需要重新设置
# 2. 生产环境中建议使用.env文件或云服务配置
# 3. 永远不要在代码中硬编码API密钥！


In [7]:
from langfuse.langchain import CallbackHandler

# 初始化 Langfuse 回调处理器
# 这个处理器会自动捕获 LangChain 的执行过程，包括：
# - LLM 调用的输入和输出
# - 执行时间和延迟
# - 错误信息（如果有）
# - 成本信息（token 使用量）
langfuse_handler = CallbackHandler()

# 💡 提示：这个回调处理器将在后续的 LangChain 调用中使用
# 通过 config={"callbacks": [langfuse_handler]} 参数传递

## 示例
### LangChain LCEL

In [8]:
# 🔧 导入 LangChain 核心组件
from operator import itemgetter  # Python 内置函数，用于从字典中提取特定键值对
from langchain_openai import ChatOpenAI  # OpenAI 聊天模型接口，支持 GPT 系列模型
from langchain.prompts import ChatPromptTemplate  # 聊天提示模板，用于格式化对话输入
from langchain.schema import StrOutputParser  # 字符串输出解析器，将模型输出转换为字符串

# 🔄 重新初始化 Langfuse 回调处理器
# 确保追踪功能正常工作，记录整个链式执行过程
langfuse_handler = CallbackHandler()

# 📝 第一个提示模板：人物地理信息查询
# 功能：根据人物姓名查询其来源城市
# 输入变量：{person} - 人物姓名
prompt1 = ChatPromptTemplate.from_template("{person} 来自哪座城市？")

# 📝 第二个提示模板：城市国家映射查询
# 功能：根据城市名称查询所属国家，并指定回答语言
# 输入变量：{city} - 城市名称，{language} - 回答语言
prompt2 = ChatPromptTemplate.from_template(
    "城市 {city} 位于哪个国家？请用 {language} 回答"
)

# 🤖 初始化 OpenAI 聊天模型
# 默认使用 gpt-3.5-turbo 模型，可根据需要调整
model = ChatOpenAI()

# ⛓️ 构建第一个处理链（简单链）
# 数据流：人名 → 提示模板 → 模型推理 → 字符串输出
# 作用：将人物姓名转换为城市名称
chain1 = prompt1 | model | StrOutputParser()

# ⛓️ 构建第二个处理链（复合链）
# 数据流：
# 1. 🔀 并行处理：通过 chain1 获取城市名，通过 itemgetter 提取语言参数
# 2. 📋 数据整合：将城市和语言组合为字典格式
# 3. 📝 提示生成：将数据传递给 prompt2 生成查询提示
# 4. 🤖 模型推理：调用 OpenAI 模型进行推理
# 5. 🔤 结果解析：将模型输出解析为最终字符串
chain2 = (
    {"city": chain1, "language": itemgetter("language")}  # 🔀 并行数据准备
    | prompt2    # 📝 第二阶段提示生成
    | model      # 🤖 模型推理调用
    | StrOutputParser()  # 🔤 输出格式化
)

# 🚀 执行链式调用并启用 Langfuse 追踪
# 示例1：查询苏东坡的地理信息，要求用中文回答
# 预期流程：苏东坡 → 眉山/杭州 → 中国，用中文回答
result = chain2.invoke(
    {"person": "苏东坡", "language": "中文"},
    config={"callbacks": [langfuse_handler]}
)

### LangChain LCEL 的追踪图

![LangChain LCEL 的跟踪图](https://cdn.jsdelivr.net/gh/Fly0905/note-picture@main/imag/202508261521543.png)

[Langfuse 中的示例追踪](https://cloud.langfuse.com/project/cmequpe0j00euad07w6wrvkzg/traces?peek=9ec09bc81b4173edf6e3d11d24d0cf2f&timestamp=2025-08-26T07%3A11%3A54.389Z)

#### LangChain Runnable 方法

Runnable 是可以被调用、批处理、流式处理、转换并进行组合的工作单元。

下面的示例展示了如何在 Langfuse 中使用这些方法：

- invoke/ainvoke：将单个输入转换为输出。
- batch/abatch：高效地将多个输入批量转换为输出。
- stream/astream：在生成过程中以流式方式输出单个输入的结果。

In [9]:
# 🔄 异步调用（Async Invoke）
# 🎯 适用场景：
#   - 需要在异步环境（如 FastAPI、Django Async）中执行单个请求
#   - 与其他异步操作（数据库查询、API调用）并行执行
# 💡 技术优势：
#   - 不会阻塞其他异步操作，提高整体应用吞吐量
#   - 支持 asyncio 事件循环，适合现代异步 Web 应用
await chain2.ainvoke(
    {"person": "biden", "language": "german"},
    config={"callbacks": [langfuse_handler]}
)

# 📦 批处理（Batch）
# 🎯 适用场景：
#   - 需要同时处理多个相似的请求（如批量数据处理）
#   - 数据分析、报告生成等需要并行处理的任务
# 💡 技术优势：
#   - 比逐个调用更高效，可以并行处理多个请求
#   - 减少网络开销，优化资源利用率
batch_results = chain2.batch([
    {"person": "elon musk", "language": "english"},
    {"person": "mark zuckerberg", "language": "english"}
], config={"callbacks": [langfuse_handler]})

# 🔄📦 异步批处理（Async Batch）
# 🎯 适用场景：
#   - 在异步环境中批量处理请求
#   - 高并发场景下的批量数据处理
# 💡 技术优势：
#   - 结合了异步和批处理的优点
#   - 支持更高的并发量，不阻塞主线程
batch_async_results = await chain2.abatch([
    {"person": "jeff bezos", "language": "english"},
    {"person": "tim cook", "language": "english"}
], config={"callbacks": [langfuse_handler]})

# 🌊 流式处理（Stream）
# 🎯 适用场景：
#   - 需要实时显示生成过程的聊天应用
#   - 长文本生成，提升用户体验
#   - 实时翻译、摘要等需要逐步展示结果的应用
# 💡 技术优势：
#   - 用户可以立即看到部分结果，无需等待完整响应
#   - 降低感知延迟，提升用户体验
#   - 支持实时取消和中断操作
print("🌊 流式处理示例：")
for chunk in chain2.stream(
    {"person": "steve jobs", "language": "english"},
    config={"callbacks": [langfuse_handler]}
):
    print(f"📤 流式分片: {chunk}")  # 每个分片会实时输出
    # 在实际应用中，这里可以将 chunk 发送到前端实时显示

# 🔄🌊 异步流式处理（Async Stream）
# 🎯 适用场景：
#   - 在异步 Web 应用中提供流式响应
#   - WebSocket 实时通信
#   - 服务器发送事件（Server-Sent Events）
# 💡 技术优势：
#   - 不阻塞其他异步操作，同时提供实时反馈
#   - 支持异步上下文管理，资源利用更高效
print("\n🔄🌊 异步流式处理示例：")
async for chunk in chain2.astream(
    {"person": "bill gates", "language": "english"},
    config={"callbacks": [langfuse_handler]}
):
    print(f"📤 异步流式分片: {chunk}")  # 异步接收每个分片
    # 在实际应用中，可以通过 WebSocket 或 SSE 发送到客户端


🌊 流式处理示例：
📤 流式分片: 
📤 流式分片: Steve
📤 流式分片:  Jobs
📤 流式分片:  comes
📤 流式分片:  from
📤 流式分片:  San
📤 流式分片:  Francisco
📤 流式分片: ,
📤 流式分片:  California
📤 流式分片: ,
📤 流式分片:  United
📤 流式分片:  States
📤 流式分片: .
📤 流式分片: 

🔄🌊 异步流式处理示例：
📤 异步流式分片: 
📤 异步流式分片: United
📤 异步流式分片:  States
📤 异步流式分片: 


### LangChain LCEL 的跟踪图

![LangChain LCEL 的跟踪图](https://cdn.jsdelivr.net/gh/Fly0905/note-picture@main/imag/202508261521543.png)

[Langfuse 中的示例追踪](https://cloud.langfuse.com/project/cmequpe0j00euad07w6wrvkzg/traces?peek=9ec09bc81b4173edf6e3d11d24d0cf2f&timestamp=2025-08-26T07%3A11%3A54.389Z)

### Langfuse中的LangChain检索式问答跟踪（RetrievalQA）



In [10]:
import os
# 设置 SerpAPI 密钥（用于网络搜索功能）
# 💡 提示：如果不使用搜索功能，可以跳过这个设置
# 获取免费 API 密钥：https://serpapi.com/
_set_env("SERPAPI_API_KEY")

In [11]:
# 安装检索式问答所需的额外依赖包
# unstructured: 用于处理各种文档格式（PDF、Word、HTML等）
# selenium: 用于网页内容抓取和动态页面处理
# langchain-chroma: 向量数据库，用于存储和检索文档嵌入
%pip install unstructured==0.18.13 selenium==4.35.0 langchain-chroma==0.2.5

Collecting unstructured==0.18.13
  Downloading unstructured-0.18.13-py3-none-any.whl.metadata (24 kB)
Collecting selenium==4.35.0
  Downloading selenium-4.35.0-py3-none-any.whl.metadata (7.4 kB)
Collecting langchain-chroma==0.2.5
  Downloading langchain_chroma-0.2.5-py3-none-any.whl.metadata (1.1 kB)
Collecting python-magic (from unstructured==0.18.13)
  Downloading python_magic-0.4.27-py2.py3-none-any.whl.metadata (5.8 kB)
Collecting nltk (from unstructured==0.18.13)
  Using cached nltk-3.9.2-py3-none-any.whl.metadata (3.2 kB)
Collecting emoji (from unstructured==0.18.13)
  Downloading emoji-2.15.0-py3-none-any.whl.metadata (5.7 kB)
Collecting python-iso639 (from unstructured==0.18.13)
  Downloading python_iso639-2025.2.18-py3-none-any.whl.metadata (14 kB)
Collecting langdetect (from unstructured==0.18.13)
  Downloading langdetect-1.0.9.tar.gz (981 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m981.5/981.5 kB[0m [31m49.7 kB/s[0m  [33m0:00:22[0mta [36m0:00:04

In [13]:
# 🧩 导入检索式问答（RAG）所需的核心模块
from langchain_community.document_loaders import TextLoader # 📄 本地文本文件加载器
from langchain_chroma import Chroma  # 🗄️ Chroma 向量数据库，用于存储和检索嵌入向量
from langchain_text_splitters import CharacterTextSplitter  # ✂️ 文本分割器，将长文档切分为小块
from langchain_openai import OpenAIEmbeddings, ChatOpenAI  # 🤖 OpenAI 嵌入模型和聊天模型
from langchain.chains import RetrievalQA  # 🔗 检索式问答链，实现 RAG 功能

# 🔄 初始化 Langfuse 回调处理器
# 用于追踪整个 RAG 流程，包括文档检索和答案生成
langfuse_handler = CallbackHandler()

# 📄 定义要加载的文档 URL 列表
# 💡 示例：美国国情咨文演讲稿（包含丰富的政治、经济、社会话题）
# 这个文档非常适合展示 RAG 系统的检索和问答能力
file_path = "docs/state_of_the_union.txt"

# 📥 步骤1：文档加载
#  📂 使用 TextLoader 加载本地文件
loader = TextLoader(file_path)

# 🤖 步骤2：初始化语言模型
# 选择 ChatOpenAI 的原因：
# - 支持对话格式，更适合问答任务
# - 与现代 OpenAI API 完全兼容
# - 支持系统消息和角色设定
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.1)  # 低温度确保答案稳定性

# 📖 步骤3：文档内容加载
# 从指定 URL 获取文档内容并解析为 Document 对象
documents = loader.load()
print(f"📊 成功加载 {len(documents)} 个文档")

# ✂️ 步骤4：文档分割策略
# 🎯 分割参数说明：
# - chunk_size=1000: 每个文本块包含约1000个字符（平衡检索精度和上下文完整性）
# - chunk_overlap=0: 文本块间无重叠（避免重复信息，但可能丢失跨块语义）
# 💡 优化建议：在生产环境中，可以设置 chunk_overlap=100-200 以保持语义连续性
text_splitter = CharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=0,
    separator="\n"  # 按段落分割，保持语义完整性
)
texts = text_splitter.split_documents(documents)
print(f"📋 文档分割完成，共生成 {len(texts)} 个文本块")
print("🔍 【文档分割预览】")
for i, text in enumerate(texts[:2]):  # 只显示前两个块作为示例
    print(f"  块 {i+1}: {text.page_content[:100]}...")

# 🔢 步骤5：创建文本嵌入向量
# OpenAIEmbeddings 将文本转换为高维向量，用于语义相似性搜索
# 💰 成本提示：嵌入生成会产生 API 调用费用，但通常比 GPT 调用便宜得多
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")  # 使用较新的嵌入模型

# 🗄️ 步骤6：构建向量数据库
# Chroma 数据库的优势：
# - 轻量级，适合快速原型开发
# - 内存存储，启动快速
# - 支持多种相似性搜索算法
print("🔄 正在构建向量数据库...")
docsearch = Chroma.from_documents(
    documents=texts,
    embedding=embeddings,
    collection_name="state_of_union"  # 指定集合名称便于管理
)
print(f"✅ 向量数据库构建完成，包含 {docsearch._collection.count()} 个向量")

# ❓ 步骤7：定义查询问题
# 设计一个开放性问题来测试 RAG 系统的理解和综合能力
query = "美国国情咨文演讲稿的核心主题是什么？请总结主要观点。"

# 🔗 步骤8：构建检索式问答链
# RetrievalQA 工作流程：
# 1. 将用户问题转换为嵌入向量
# 2. 在向量数据库中检索最相关的文档块
# 3. 将检索到的文档作为上下文，结合用户问题生成提示
# 4. 调用语言模型生成最终答案
chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",  # "stuff" 模式：将所有检索到的文档拼接为一个提示
    retriever=docsearch.as_retriever(
        search_type="similarity",  # 使用语义相似性搜索
        search_kwargs={"k": 3}     # 检索最相关的 3 个文档块
    ),
    return_source_documents=True,  # 返回源文档，便于验证答案来源
    verbose=True  # 显示详细的执行过程
)

# 🚀 步骤9：执行 RAG 问答并启用 Langfuse 追踪
# 这将在 Langfuse 控制台中记录：
# - 文档检索过程和结果
# - 提示词生成
# - 模型推理过程
# - 最终答案生成
print("🔍 开始执行检索式问答...")
result = chain.invoke(query, config={"callbacks": [langfuse_handler]})

print(f"\n💡 问题：{query}")
print(f"🎯 答案：{result['result']}")
print(f"\n📚 参考来源（共 {len(result['source_documents'])} 个文档块）：")
for i, doc in enumerate(result['source_documents']):
    print(f"  📄 来源 {i+1}: {doc.page_content[:150]}...")

📊 成功加载 1 个文档
📋 文档分割完成，共生成 41 个文本块
🔍 【文档分割预览】
  块 1: Madam Speaker, Madam Vice President, our First Lady and Second Gentleman. Members of Congress and th...
  块 2: Groups of citizens blocking tanks with their bodies. Everyone from students to retirees teachers tur...
🔄 正在构建向量数据库...
✅ 向量数据库构建完成，包含 41 个向量
🔍 开始执行检索式问答...


[1m> Entering new RetrievalQA chain...[0m

[1m> Finished chain.[0m

💡 问题：美国国情咨文演讲稿的核心主题是什么？请总结主要观点。
🎯 答案：美国国情咨文演讲稿的核心主题是美国人民的强大。演讲中强调了美国人民的坚定意志和团结一致，强调了美国人民在面对挑战时的勇气和决心。演讲还强调了自由和民主的重要性，以及美国人民在保卫这些价值观时所展现的力量。总的来说，演讲强调了美国人民的坚强和团结，以及他们在面对困难时的乐观和决心。

📚 参考来源（共 3 个文档块）：
  📄 来源 1: And built the strongest, freest, and most prosperous nation the world has ever known. 
Now is the hour. 
Our moment of responsibility. 
Our test of re...
  📄 来源 2: And my report is this: the State of the Union is strong—because you, the American people, are strong. 
We are stronger today than we were a year ago. ...
  📄 来源 3: Madam Speaker, Madam Vice President, our First Lady and Second Gentleman

RetrievalQA 追踪图
![image-20250922113340738](https://cdn.jsdelivr.net/gh/Fly0905/note-picture@main/imag/202509221133205.png)
[Langfuse 中的示例追踪](https://cloud.langfuse.com/project/cmequpe0j00euad07w6wrvkzg/traces?peek=c1a402f423a8b848f07461edba960bfa&timestamp=2025-09-22T03%3A26%3A02.600Z)

## 🎉 总结与下一步学习

恭喜！你已经完成了 LangChain 与 Langfuse 集成的基础学习。让我们回顾一下学到的内容：

### 📋 本教程涵盖的内容
- ✅ **环境准备**：安装依赖、配置 API 密钥、初始化回调处理器
- ✅ **基础集成**：在 LangChain 应用中使用 Langfuse 进行追踪
- ✅ **LCEL 示例**：顺序链的构建与执行
- ✅ **Runnable 方法**：同步/异步、批处理、流式处理
- ✅ **检索式问答**：文档加载、向量化、问答链构建

### 🚀 下一步学习建议

#### 1. 深入 LangChain 核心概念
- 学习 [LangChain 官方教程](https://python.langchain.com/docs/tutorials/)
- 掌握 Agent（智能体）和 Tool（工具）的使用
- 了解 Memory（记忆）和 Chain（链）的高级用法

#### 2. 探索 Langfuse 高级功能
- 学习如何创建自定义评估指标
- 掌握成本分析和性能优化
- 了解团队协作和项目管理功能

#### 3. 实战项目练习
- 构建一个带有多步骤推理的聊天机器人
- 创建一个支持文档检索的问答系统
- 开发一个多模态（文本+图像）应用

#### 4. 生产环境部署
- 学习 Docker 容器化部署
- 了解 Kubernetes 集群管理
- 掌握监控和日志管理最佳实践

### 🔗 有用的资源链接

#### 📚 官方文档
- [LangChain 官方文档](https://python.langchain.com/) - 全面的 LangChain 使用指南
- [Langfuse 官方文档](https://langfuse.com/docs) - 详细的 Langfuse 功能说明
- [OpenAI API 文档](https://platform.openai.com/docs) - OpenAI API 使用指南

#### 🛠️ 开源社区
- [LangChain GitHub](https://github.com/langchain-ai/langchain) - 源码和问题讨论
- [Langfuse GitHub](https://github.com/langfuse/langfuse) - 开源版本和贡献指南
- [LangChain 模板库](https://github.com/langchain-ai/langchain/tree/master/templates) - 实用模板集合

#### 🎓 学习资源
- [LangChain 官方教程](https://python.langchain.com/docs/tutorials/) - 从入门到进阶
- [Langfuse 示例库](https://langfuse.com/docs/integrations) - 各种集成示例
- [大模型应用开发最佳实践](https://platform.openai.com/docs/guides/prompt-engineering) - 提示工程指南

### 💡 实践建议

#### 🏗️ 开发阶段
1. **从简单开始**：先构建基础的问答链，验证核心功能后再增加复杂度
2. **模块化设计**：将复杂的应用拆分为独立的组件，便于测试和维护
3. **版本控制**：使用 Git 管理代码，记录每次重要的功能变更
4. **环境隔离**：使用虚拟环境管理依赖，避免版本冲突

#### 📊 监控与优化
1. **全程追踪**：始终使用 Langfuse 追踪应用执行，建立完整的可观测性
2. **性能分析**：定期查看 Langfuse 控制台，分析响应时间和成功率
3. **成本监控**：密切关注 token 使用量和 API 调用成本，优化模型选择
4. **错误处理**：建立完善的错误处理机制，提高应用的稳定性

#### 🔄 迭代改进
1. **数据驱动**：基于 Langfuse 的分析数据，持续优化提示词和参数
2. **A/B 测试**：对比不同版本的性能，选择最优方案
3. **用户反馈**：收集真实用户的使用反馈，指导产品改进方向
4. **定期评估**：建立定期的性能评估机制，确保应用质量

### 🚨 常见问题与解决方案

#### ❓ API 密钥问题
- **问题**：API 调用失败或认证错误
- **解决**：检查环境变量设置，确保密钥正确且有效

#### ❓ 依赖包冲突
- **问题**：安装包时出现版本冲突
- **解决**：使用虚拟环境，或指定具体的包版本

#### ❓ 追踪数据缺失
- **问题**：Langfuse 控制台看不到追踪数据
- **解决**：确保回调处理器正确配置，检查网络连接

#### ❓ 性能问题
- **问题**：应用响应速度慢
- **解决**：优化提示词长度，选择合适的模型，使用缓存机制

---

### 🎯 最后的话

大模型应用开发是一个充满挑战和机遇的领域。通过本教程，你已经掌握了：

- ✅ **基础技能**：LangChain 与 Langfuse 的集成使用
- ✅ **实践经验**：从简单链到复杂应用的构建过程
- ✅ **监控能力**：使用 Langfuse 进行全面的应用监控
- ✅ **优化思路**：基于数据驱动的持续改进方法

记住，**实践是最好的老师**。建议你：
1. 🔨 **动手实践**：基于本教程的示例，构建自己的应用
2. 📖 **持续学习**：关注 LangChain 和 Langfuse 的最新发展
3. 🤝 **社区参与**：加入开发者社区，分享经验和学习心得
4. 🚀 **勇于创新**：探索大模型在你所在领域的应用可能性