## 1. 什么是 LangChain 的"链"？

简单理解：链就是把多个处理步骤连接起来，数据从第一步流向最后一步，就像工厂的流水线一样。

用生活中的例子来理解
想象你要做一杯咖啡：

磨豆子 → 2. 冲泡 → 3. 加糖 → 4. 装杯
在 LangChain 中，链就是这样的流水线：

准备提示词 → 2. 调用LLM → 3. 解析结果 → 4. 返回数据

链还能支持 **异步调用**，**流式处理**，**批量处理**，**错误处理和重试** 等高级功能。

## 2. LCEL 语法

### 2.1 Python 魔术方法基础

在 Python 中，当你使用 | 操作符时，实际上调用的是对象的魔术方法：

```python
# 当你写 a | b 时，Python 实际执行：
result = a.__or__(b)

# 如果 a 没有 __or__ 方法，Python 会尝试：
result = b.__ror__(a)  # ror = reverse or
```

**LangChain 中的实现**

LangChain 的所有 Runnable 对象（包括 prompt、model、parser）都实现了这些魔术方法：

```python
class BaseRunnable:
    def __or__(self, other):
        """实现 self | other"""
        return RunnableSequence(first=self, last=other)
    
    def __ror__(self, other):
        """实现 other | self（当 other 没有 __or__ 时）"""
        return RunnableSequence(first=other, last=self)
```

**实际效果**


```python
# 这三种写法是等价的：
chain1 = prompt | model | parser
chain2 = prompt.__or__(model).__or__(parser)
chain3 = RunnableSequence(first=prompt, last=RunnableSequence(first=model, last=parser))
```


In [None]:

# prompt.__or__(model) 的伪代码实现

class PromptTemplate(BaseRunnable):
    def __init__(self, template, input_variables):
        self.template = template
        self.input_variables = input_variables
    
    def __or__(self, other):
        """
        实现 prompt | model 的核心逻辑
        """
        
        return RunnableSequence(first=self, last=other)
    
    def invoke(self, inputs):
        # 简单的字符串格式化
        return self.template.format(**inputs)

class RunnableSequence(BaseRunnable):
    def __init__(self, first, last):
        self.first = first
        self.last = last
        
    def invoke(self, inputs):
        """
        执行序列链的核心逻辑
        """
        
        # 步骤1：执行第一个组件
        intermediate_result = self.first.invoke(inputs)
        
        # 步骤2：将第一个组件的输出作为第二个组件的输入
        final_result = self.last.invoke(intermediate_result)
        
        return final_result
    
    def __or__(self, other):
        """
        实现 (prompt | model) | parser 的逻辑
        """
        
        return RunnableSequence(first=self, last=other)
    

# 使用示例的内部执行流程
prompt = PromptTemplate("分析：{text}", ["text"])
model = SomeLLM()

# 当执行 prompt | model 时：
chain = prompt.__or__(model)  # 返回 RunnableSequence(first=prompt, last=model)

# 当执行 chain.invoke({"text": "项目进度"}) 时：
# 1. 调用 RunnableSequence.invoke({"text": "项目进度"})
# 2. intermediate_result = prompt.invoke({"text": "项目进度"})  # 返回 "分析：项目进度"
# 3. final_result = model.invoke("分析：项目进度")  # 返回 LLM 的响应
# 4. 返回 final_result

In [None]:
# 完整的三步链：prompt | model | parser
def create_three_step_chain():
    prompt = PromptTemplate("分析：{text}", ["text"])
    model = SomeLLM()
    parser = SomeParser()
    
    # 第一步：prompt | model
    step1_chain = prompt.__or__(model)
    
    # 第二步：(prompt | model) | parser
    final_chain = step1_chain.__or__(parser)
    
    # 等价于：RunnableSequence(first=step1_chain, last=parser)
    
    return final_chain

# 执行时的内部流程
def execute_chain(inputs):
    # inputs = {"text": "项目进度"}
    
    # 第一层 RunnableSequence.invoke()
    # first = RunnableSequence(prompt, model)
    # last = parser
    
    # 执行 first.invoke(inputs)
    # 这会触发第二层 RunnableSequence.invoke()
    # first = prompt, last = model
    
    # 执行 prompt.invoke({"text": "项目进度"})
    prompt_result = "分析：项目进度"
    
    # 执行 model.invoke("分析：项目进度")
    model_result = "这是一个关于项目进度的分析..."
    
    # 第一层继续执行 last.invoke(model_result)
    # 执行 parser.invoke("这是一个关于项目进度的分析...")
    final_result = {"analysis": "项目进度良好"}
    
    return final_result

## 3 其他问题

为什么需要 __ror__？


In [None]:
# 假设你有一个自定义函数想要加入链中
def custom_preprocessor(text):
    return text.upper()

# 如果 custom_preprocessor 没有 __or__ 方法
# 但 prompt 有 __ror__ 方法，这样就能工作：
chain = custom_preprocessor | prompt | model

# Python 会调用：
# prompt.__ror__(custom_preprocessor)


## 3. Runnable 对象接口

Runnable 对象是一个可以被调用、异步执行、批处理、流式处理、并行处理的工作单元，通过schema属性、run方法定义。


1. Runnable 核心执行方法:

基础执行方法

```python
# 同步执行
result = runnable.invoke(input)

# 异步执行
result = await runnable.ainvoke(input)

# 批量执行
results = runnable.batch([input1, input2, input3])
results = await runnable.abatch([input1, input2, input3])

# 流式执行
for chunk in runnable.stream(input):
    print(chunk)

async for chunk in runnable.astream(input):
    print(chunk)
```

输入输出类型

```python
# Runnable 是泛型，定义输入输出类型
class MyRunnable(Runnable[Dict, str]):  # 输入Dict，输出str
    def invoke(self, input: Dict) -> str:
        return f"处理结果: {input}"
```

2. 链式组合操作
管道操作符 |

```python
# 串行组合
chain = prompt | model | parser

# 等价于
chain = RunnableSequence(first=prompt, last=RunnableSequence(first=model, last=parser))

### 3.1 schema 属性的作用

#### 每个 Runnable 都有这两个属性
runnable.input_schema   # 定义期望的输入格式

runnable.output_schema  # 定义输出的数据格式

#### 主要用途
- 类型检查: 验证输入数据是否符合预期格式
- 文档生成: 自动生成API文档
- IDE支持: 提供代码补全和类型提示
- 调试帮助: 快速定位数据格式问题

### 3.2 Schema 的数据结构
Schema 通常是 JSON Schema 格式或 Pydantic 模型：

```python
# JSON Schema 格式示例
{
    "type": "object",
    "properties": {
        "text": {"type": "string"},
        "temperature": {"type": "number", "minimum": 0, "maximum": 2}
    },
    "required": ["text"]
}

# Pydantic 模型格式
class InputModel(BaseModel):
    text: str
    temperature: float = 0.7
```
### 3.3 实际例子 PromptTemplate 的 Schema


In [1]:
from langchain_core.prompts import PromptTemplate

prompt = PromptTemplate(
    input_variables=["name", "age"],
    template="你好，我是{name}，今年{age}岁"
)

# 查看输入 schema
print("输入 Schema:")
print(prompt.input_schema.schema())

# 查看输出 schema
print("输出 Schema:")
print(prompt.output_schema.schema())



输入 Schema:
{'properties': {'age': {'title': 'Age', 'type': 'string'}, 'name': {'title': 'Name', 'type': 'string'}}, 'required': ['age', 'name'], 'title': 'PromptInput', 'type': 'object'}
输出 Schema:


/var/folders/hn/5h0gmdv550d1_jd5nssyp2p40000gn/T/ipykernel_12398/841185670.py:10: PydanticDeprecatedSince20: The `schema` method is deprecated; use `model_json_schema` instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.11/migration/
  print(prompt.input_schema.schema())


**练习**
1. 掌握 LLM 的 Schema
2. 掌握链式组合的 Schema
3. 并行组合的 Schema （后面讲并行）

### 常见的应用场景

```python
# API 文档生成
def generate_api_docs(runnable):
    """根据 schema 生成 API 文档"""
    input_schema = runnable.input_schema.schema()
    output_schema = runnable.output_schema.schema()
    
    docs = f"""
    API 接口文档:
    
    输入格式:
    {json.dumps(input_schema, indent=2, ensure_ascii=False)}
    
    输出格式:
    {json.dumps(output_schema, indent=2, ensure_ascii=False)}
    """
    return docs

# 输入验证
def validate_input(runnable, input_data):
    """验证输入数据是否符合 schema"""
    try:
        # 使用 schema 验证输入
        validated = runnable.input_schema(**input_data)
        return True, validated
    except Exception as e:
        return False, str(e)

# 使用示例
is_valid, result = validate_input(analyzer, {
    "name": "张三",
    "age": 30,
    "email": "zhangsan@example.com"
})

# 类型安全的调用
def safe_invoke(runnable, input_data):
    """类型安全的调用方法"""
    # 验证输入
    is_valid, validated_input = validate_input(runnable, input_data)
    if not is_valid:
        raise ValueError(f"输入验证失败: {validated_input}")
    
    # 执行调用
    result = runnable.invoke(validated_input)
    
    # 可以进一步验证输出格式
    return result

# 调试
# 快速查看 schema
def inspect_runnable(runnable):
    print(f"组件类型: {type(runnable).__name__}")
    print(f"输入类型: {runnable.input_schema}")
    print(f"输出类型: {runnable.output_schema}")
    
    # 如果是链，递归查看每个组件
    if hasattr(runnable, 'steps'):
        for i, step in enumerate(runnable.steps):
            print(f"步骤 {i+1}: {type(step).__name__}")

# 测试数据生成
def generate_test_data(schema):
    """根据 schema 生成测试数据"""
    # 这里可以实现根据 schema 自动生成测试数据的逻辑
    pass

```

### ainvoke 异步方法

#### 1. ainvoke() 方法的基本原理
为什么需要 ainvoke()？

```python
# 同步调用 - 会阻塞当前线程
result = runnable.invoke(input)  # 等待几秒钟才返回

# 异步调用 - 不阻塞，可以并发执行其他任务
result = await runnable.ainvoke(input)  # 非阻塞执行
```

ainvoke() 的声明结构
```python
class BaseRunnable:
    async def ainvoke(self, input, config=None):
        """异步版本的 invoke 方法"""
        # 如果子类没有实现异步版本，就用线程池执行同步版本
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None,  # 使用默认线程池
            self.invoke,  # 要执行的同步函数
            input,  # 传递给 invoke 的参数
            config
        )
```

#### 2. 执行流程详解

步骤1：获取事件循环  

loop = asyncio.get_running_loop()

作用：

- 获取当前正在运行的异步事件循环
- 事件循环是异步程序的核心，负责调度和执行异步任务
- 如果没有运行中的事件循环，会抛出 RuntimeError

步骤2：使用线程池执行器

await loop.run_in_executor(None, self.invoke, input, config)

参数解释：

- None：使用默认的 ThreadPoolExecutor
- self.invoke：要在线程池中执行的同步函数
- input, config：传递给函数的参数

步骤3：线程池执行

```python
# 内部执行过程（简化版）
def run_in_executor(executor, func, *args):
    # 1. 将同步函数提交到线程池
    future = executor.submit(func, *args)
    
    # 2. 将线程池的 Future 转换为 asyncio.Future
    asyncio_future = asyncio.wrap_future(future)
    
    # 3. 返回可等待的 Future 对象
    return asyncio_future
```

#### 3. 完整的执行示例

In [4]:
import asyncio
from concurrent.futures import ThreadPoolExecutor

class SimpleRunnable:
    def invoke(self, input):
        """同步方法 - 模拟耗时操作"""
        import time
        print(f"开始处理: {input}")
        time.sleep(2)  # 模拟耗时操作
        print(f"处理完成: {input}")
        return f"结果: {input}"
    
    async def ainvoke(self, input):
        """异步方法 - 使用线程池执行同步方法"""
        print(f"异步调用开始: {input}")
        
        # 步骤1: 获取事件循环
        loop = asyncio.get_running_loop()
        print(f"获取到事件循环: {loop}")
        
        # 步骤2: 在线程池中执行同步方法
        print(f"提交到线程池执行...")
        result = await loop.run_in_executor(
            None,           # 默认线程池
            self.invoke,    # 同步方法
            input          # 参数
        )
        
        print(f"异步调用完成: {input}")
        return result
    

# 使用示例
async def demo():
    runnable = SimpleRunnable()
    
    # 并发执行多个异步调用
    tasks = [
        runnable.ainvoke("任务1"),
        runnable.ainvoke("任务2"), 
        runnable.ainvoke("任务3")
    ]
    
    # 等待所有任务完成
    results = await asyncio.gather(*tasks)
    print(f"所有结果: {results}")


# 运行演示
# asyncio.run(demo())
# 运行演示 - Jupyter环境中直接使用await
await demo()
    

异步调用开始: 任务1
获取到事件循环: <_UnixSelectorEventLoop running=True closed=False debug=False>
提交到线程池执行...
异步调用开始: 任务2
获取到事件循环: <_UnixSelectorEventLoop running=True closed=False debug=False>
提交到线程池执行...
异步调用开始: 任务3
获取到事件循环: <_UnixSelectorEventLoop running=True closed=False debug=False>
提交到线程池执行...
开始处理: 任务1
开始处理: 任务2
开始处理: 任务3
处理完成: 任务2处理完成: 任务3

异步调用完成: 任务3
异步调用完成: 任务2
处理完成: 任务1
异步调用完成: 任务1
所有结果: ['结果: 任务1', '结果: 任务2', '结果: 任务3']


#### 4. 为什么使用 run_in_executor？

问题：同步代码在异步环境中的困境

```python
# 错误的做法 - 会阻塞整个事件循环
async def bad_example():
    result1 = some_sync_function()  # 阻塞2秒
    result2 = another_sync_function()  # 又阻塞2秒
    # 总共需要4秒，无法并发
```
解决方案：线程池执行
```python
# 正确的做法 - 使用线程池
async def good_example():
    loop = asyncio.get_running_loop()
    
    # 并发执行，总共只需要2秒
    task1 = loop.run_in_executor(None, some_sync_function)
    task2 = loop.run_in_executor(None, another_sync_function)
    
    result1, result2 = await asyncio.gather(task1, task2)
```
#### 5. 事件循环的工作原理
```python
# 事件循环的简化模型
class EventLoop:
    def __init__(self):
        self.tasks = []
        self.thread_pool = ThreadPoolExecutor()
    
    def run_in_executor(self, executor, func, *args):
        """在线程池中执行同步函数"""
        # 1. 提交到线程池
        future = self.thread_pool.submit(func, *args)
        
        # 2. 创建异步 Future
        async_future = asyncio.Future()
        
        # 3. 当线程池任务完成时，设置异步 Future 的结果
        def on_done(thread_future):
            try:
                result = thread_future.result()
                async_future.set_result(result)
            except Exception as e:
                async_future.set_exception(e)
        
        future.add_done_callback(on_done)
        return async_future
```


#### 6. 何时使用异步

```python
# LLM 调用通常是网络请求，天然适合异步
class AsyncLLM:
    async def ainvoke(self, prompt):
        # 直接使用异步 HTTP 客户端
        async with aiohttp.ClientSession() as session:
            response = await session.post(api_url, json={"prompt": prompt})
            return await response.json()

# 但某些组件可能只有同步实现
class SyncProcessor:
    def invoke(self, input):
        # 复杂的同步处理逻辑
        return process_data(input)
    
    async def ainvoke(self, input):
        # 使用线程池包装同步方法
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self.invoke, input)
```

## stream方法
1. stream 方法返回迭代器：使用 yield 逐步返回结果
2. 实时显示：用 print(chunk, end="", flush=True) 实现逐字显示效果
3. 与 invoke 对比：invoke 一次性返回，stream 分步返回


```python
"""
Stream 方法
"""
from langchain_core.runnables import Runnable
from typing import Iterator
import time

class SimpleStreamRunnable(Runnable[str, str]):
    """简单的流式输出演示"""
    
    def invoke(self, input: str) -> str:
        """普通调用 - 一次性返回完整结果"""
        return f"完整处理结果: {input}"
    
    def stream(self, input: str) -> Iterator[str]:
        """流式调用 - 逐步返回部分结果"""
        words = f"逐步处理结果: {input}".split()
        
        for word in words:
            time.sleep(0.5)  # 模拟处理延迟
            yield word + " "  # 逐个返回单词

# 使用演示
def demo_stream():
    runnable = SimpleStreamRunnable()
    
    print("=== 普通调用 ===")
    result = runnable.invoke("测试文本")
    print(result)
    
    print("\n=== 流式调用 ===")
    for chunk in runnable.stream("测试文本"):
        print(chunk, end="", flush=True)  # 实时显示，不换行
    
    print("\n\n演示完成")

# 运行演示
demo_stream()
```

**astream 是 stream 的异步版本，astream的默认实现调用了ainvoke**

### batch方法

batch方法用于处理多个输入（批处理）

工程价值：
- 性能优化：批量处理比逐个处理快3-5倍
- 资源管理：分批处理避免内存溢出
- 错误处理：批次失败时自动降级为单个处理
- 监控报告：提供详细的处理统计和分析

适用场景：
- 大量文档批量分析
- 客户反馈批量处理
- 数据清洗和标注
- 内容审核和分类
- 报告自动生成

In [5]:
"""
LangChain 原生 batch 方法演示
"""

from langchain_core.runnables import Runnable
from langchain_core.prompts import PromptTemplate
from langchain_community.llms import Tongyi
from typing import Optional, Dict, Any

class SimpleAnalyzer(Runnable[str, str]):
    """简单分析器"""
    
    def invoke(self, input: str, config: Optional[Dict[str, Any]] = None) -> str:
        """单个分析"""
        return f"分析结果: {input} -> 类型:文档, 重要性:中等"
    
def demo_langchain_batch():
    """LangChain batch 方法演示"""
    
    # 1. 使用自定义 Runnable
    analyzer = SimpleAnalyzer()
    inputs = ["报告A", "合同B", "邮件C"]
    
    print("=== 使用自定义 Runnable ===")
    print(f"输入: {inputs}")
    
    # LangChain 的 batch 方法
    results = analyzer.batch(inputs)
    
    print("批量处理结果:")
    for result in results:
        print(f"  {result}")
    
    # 2. 使用 PromptTemplate
    print("\n=== 使用 PromptTemplate ===")
    
    prompt = PromptTemplate(
        input_variables=["text"],
        template="请分析: {text}"
    )
    
    texts = ["项目进展", "用户反馈", "市场分析"]
    inputs_dict = [{"text": text} for text in texts]
    
    print(f"输入: {texts}")
    
    # PromptTemplate 的 batch 方法
    prompt_results = prompt.batch(inputs_dict)
    
    print("提示词批量生成结果:")
    for result in prompt_results:
        print(f"  {result}")
    
    # 3. 链式 batch
    print("\n=== 链式 batch ===")
    
    # 创建简单链
    chain = prompt | analyzer
    
    # 链的 batch 方法
    chain_results = chain.batch(inputs_dict)
    
    print("链式批量处理结果:")
    for result in chain_results:
        print(f"  {result}")

if __name__ == "__main__":
    demo_langchain_batch()

=== 使用自定义 Runnable ===
输入: ['报告A', '合同B', '邮件C']
批量处理结果:
  分析结果: 报告A -> 类型:文档, 重要性:中等
  分析结果: 合同B -> 类型:文档, 重要性:中等
  分析结果: 邮件C -> 类型:文档, 重要性:中等

=== 使用 PromptTemplate ===
输入: ['项目进展', '用户反馈', '市场分析']
提示词批量生成结果:
  text='请分析: 项目进展'
  text='请分析: 用户反馈'
  text='请分析: 市场分析'

=== 链式 batch ===
链式批量处理结果:
  分析结果: text='请分析: 项目进展' -> 类型:文档, 重要性:中等
  分析结果: text='请分析: 用户反馈' -> 类型:文档, 重要性:中等
  分析结果: text='请分析: 市场分析' -> 类型:文档, 重要性:中等


### 并行组合

```python
from langchain_core.runnables import RunnableParallel

# 并行执行多个分支
parallel = RunnableParallel({
    "summary": summary_chain,
    "keywords": keyword_chain,
    "sentiment": sentiment_chain
})

# 输入会同时发送给所有分支
result = parallel.invoke({"text": "分析这段文本"})
# 输出: {"summary": "...", "keywords": [...], "sentiment": "positive"}
```



### 条件分支

RunnableBranch 是一种重要的路由机制，它根据给定的条件选择不同的处理路径。在运行过程中还能根据输入动态选择不同的执行路径。

```python
from langchain_core.runnables import RunnableBranch

# 根据条件选择不同处理路径
branch = RunnableBranch(
    (lambda x: x["type"] == "question", qa_chain),
    (lambda x: x["type"] == "summary", summary_chain),
    default_chain
)
```


In [10]:
"""
RunnableBranch 条件分支演示
"""
from langchain_core.runnables import RunnableBranch, RunnableLambda
from langchain_core.prompts import PromptTemplate
from langchain_community.llms import Tongyi
from typing import Dict, Any

from dotenv import load_dotenv
load_dotenv()

# === 前面的准备工作 ===

# 1. 创建不同类型的处理链
# 问答链
qa_prompt = PromptTemplate(
    input_variables=["content"],
    template="请回答以下问题：{content}"
)

qa_llm = Tongyi(model_name="qwen-max", temperature=0.1)
qa_chain = qa_prompt | qa_llm

# 摘要链
summary_prompt = PromptTemplate(
    input_variables=["content"], 
    template="请总结以下内容：{content}"
)
summary_llm = Tongyi(model_name="qwen-max", temperature=0.3)
summary_chain = summary_prompt | summary_llm

# 默认处理链
default_prompt = PromptTemplate(
    input_variables=["content"],
    template="请分析以下内容：{content}"
)
default_llm = Tongyi(model_name="qwen-max", temperature=0.5)
default_chain = default_prompt | default_llm

# === 核心代码：创建条件分支 ===
branch = RunnableBranch(
    (lambda x: x["type"] == "question", qa_chain),
    (lambda x: x["type"] == "summary", summary_chain),
    default_chain  # 默认分支
)

# === 后面的使用方法 ===

# 使用示例
def demo_branch():
    """演示条件分支的使用"""
    
    # 测试数据
    test_cases = [
        {
            "type": "question",
            "content": "什么是人工智能？"
        },
        {
            "type": "summary", 
            "content": "人工智能是计算机科学的一个分支，致力于创建能够执行通常需要人类智能的任务的系统。它包括机器学习、深度学习、自然语言处理等多个子领域。"
        },
        {
            "type": "other",
            "content": "今天天气很好"
        }
    ]
    
    print("=== RunnableBranch 条件分支演示 ===")
    
    for i, test_case in enumerate(test_cases, 1):
        print(f"\n--- 测试用例 {i} ---")
        print(f"类型: {test_case['type']}")
        print(f"内容: {test_case['content']}")
        
        # 执行条件分支
        result = branch.invoke(test_case)
        print(f"处理结果: {result}")
        
# 更复杂的条件分支示例
def create_advanced_branch():
    """创建更复杂的条件分支"""
    # 定义更多条件
    advanced_branch = RunnableBranch(
        # 条件1：问题类型
        (lambda x: x["type"] == "question", qa_chain),
        
        # 条件2：摘要类型
        (lambda x: x["type"] == "summary", summary_chain),
        
        # 条件3：长文本（超过100字）
        (lambda x: len(x["content"]) > 100, 
         PromptTemplate(input_variables=["content"], template="这是长文本，请详细分析：{content}") | default_llm),
        
        # 条件4：包含特定关键词
        (lambda x: "紧急" in x["content"], 
         PromptTemplate(input_variables=["content"], template="紧急处理：{content}") | default_llm),
        
        # 默认分支
        default_chain
    )
    
    return advanced_branch


# 异步使用
async def demo_async_branch():
    """异步使用条件分支"""
    test_input = {
        "type": "question",
        "content": "如何使用 LangChain？"
    }
    
    # 异步调用
    result = await branch.ainvoke(test_input)
    print(f"异步结果: {result}")

# 批量处理
def demo_batch_branch():
    """批量处理演示"""
    batch_inputs = [
        {"type": "question", "content": "什么是机器学习？"},
        {"type": "summary", "content": "机器学习是人工智能的一个重要分支..."},
        {"type": "other", "content": "随机内容"}
    ]
    
    # 批量处理
    results = branch.batch(batch_inputs)
    
    for i, result in enumerate(results):
        print(f"批量结果 {i+1}: {result}")


if __name__ == "__main__":
    # 运行演示
    demo_branch()
    
    # 高级分支演示
    advanced_branch = create_advanced_branch()
    
    # 批量处理演示
    demo_batch_branch()


=== RunnableBranch 条件分支演示 ===

--- 测试用例 1 ---
类型: question
内容: 什么是人工智能？
处理结果: 人工智能（Artificial Intelligence，简称 AI）是指由人制造出来的机器所表现出来的智能。它使机器能够模拟、延伸或扩展人类的感知、学习、推理、规划、理解语言、识别图像、解决问题和决策等能力。

人工智能的核心目标是让机器具备类似人类的认知功能。根据其能力和应用范围，人工智能通常分为以下几类：

1. **弱人工智能（Narrow AI）**：专注于完成特定任务的人工智能，例如语音助手（如Siri）、图像识别系统、推荐算法等。这类AI在特定领域表现出色，但不具备通用智能。

2. **强人工智能（General AI）**：指具备与人类相当或超越人类的全面认知能力，能够在各种复杂环境中自主学习和适应。目前仍处于理论和研究阶段。

3. **超级人工智能（Superintelligent AI）**：在所有方面都远超人类智能水平的AI，属于未来可能的发展方向，尚无现实实现。

人工智能的技术基础包括机器学习（尤其是深度学习）、自然语言处理、计算机视觉、知识表示与推理、机器人技术等。

总之，人工智能是一门跨学科的科学技术，致力于让机器“像人一样思考”或“像人一样行动”，正在广泛应用于医疗、交通、金融、教育、娱乐等各个领域，深刻改变着人类社会的生活方式。

--- 测试用例 2 ---
类型: summary
内容: 人工智能是计算机科学的一个分支，致力于创建能够执行通常需要人类智能的任务的系统。它包括机器学习、深度学习、自然语言处理等多个子领域。
处理结果: 人工智能是计算机科学的分支，旨在开发能执行需人类智能任务的系统，涵盖机器学习、深度学习、自然语言处理等多个子领域。

--- 测试用例 3 ---
类型: other
内容: 今天天气很好
处理结果: “今天天气很好”这句话是一个简单的陈述句，表达了说话者对当前天气状况的主观评价。我们可以从多个角度进行分析：

1. **语义分析**：
   - “今天”：表示时间，指说话当天。
   - “天气”：指大气状况，包括温度、湿度、降水、风力、云量等自然现象。
   - “很好”：是评价性词语，表示积极、正面的感受。具体好在哪里（如晴朗、温暖、凉

  return self.__class__(


批量结果 1: 机器学习（Machine Learning）是人工智能的一个分支，它致力于让计算机系统能够从数据中自动学习并改进性能，而无需被明确编程去执行特定任务。

简单来说，机器学习通过设计算法，使计算机能够：

1. **从数据中识别模式**：利用大量历史数据，发现其中的规律和关联。
2. **做出预测或决策**：基于学到的模式，对新的、未见过的数据进行预测或分类。
3. **不断优化自身表现**：随着更多数据的输入和反馈，模型可以持续改进其准确性。

常见的机器学习类型包括：

- **监督学习**：使用带有标签的数据训练模型（如已知图片中的动物是猫还是狗），用于分类或回归任务。
- **无监督学习**：处理没有标签的数据，发现数据中的结构或聚类（如客户分群）。
- **强化学习**：通过试错和奖励机制训练模型在环境中做出最优决策（如游戏AI或机器人控制）。

机器学习广泛应用于图像识别、语音助手、推荐系统、医疗诊断、金融风控等领域，是现代智能技术的核心之一。
批量结果 2: 机器学习是人工智能的一个重要分支，致力于通过数据和算法使计算机系统具备自动学习和改进的能力，而无需显式编程。它通过分析大量数据，识别其中的模式并做出预测或决策，广泛应用于图像识别、自然语言处理、推荐系统等领域。主要学习方式包括监督学习、无监督学习和强化学习。随着数据量的增长和计算能力的提升，机器学习在近年来取得了显著进展，成为推动人工智能发展的重要力量。
批量结果 3: 您提到“请分析以下内容：随机内容”，但尚未提供具体需要分析的文本或信息。为了更好地帮助您，请补充以下任一类型的详细内容：

1. **待分析的文本/数据**（例如：一段文章、对话记录、用户反馈等）
2. **分析目标**（例如：情感倾向、主题归纳、逻辑漏洞、语言风格、数据规律等）
3. **特殊需求**（如需用特定理论框架、技术工具或呈现形式）

示例：
- 若您想分析一段社交媒体评论的情感，可提供具体文字；
- 若需识别数据中的异常模式，请上传表格或描述数据特征。

补充信息后，我将为您生成针对性的深度分析。


### 输入输出处理

RunnableLambda（自定义函数）
```python
from langchain_core.runnables import RunnableLambda

# 将普通函数转换为 Runnable，实现与 LCEL 组件兼容
def preprocess(text):
    return text.upper().strip()

preprocessor = RunnableLambda(preprocess)
chain = preprocessor | model
```


### 错误处理和容错
重试机制
```python
from langchain_core.runnables import RunnableRetry

# 自动重试
retry_runnable = RunnableRetry(
    bound=model,
    max_attempts=3,
    wait_exponential_jitter=True
)
```

回退机制
```python
# 主要方法失败时使用备用方法
fallback_chain = primary_model.with_fallbacks([backup_model, simple_template])
```

超时控制
```python
# 设置执行超时
with_timeout = model.with_timeout(30.0)  # 30秒超时
```


"""
企业级 LangChain 应用 - 智能客服系统
兼容当前版本，包含完整的错误处理和容错机制
"""
- p14-kefu

### 5. 配置和定制
运行时配置

```python
# 传递配置参数
config = {"temperature": 0.7, "max_tokens": 100}
result = model.invoke(input, config=config)

# 绑定默认配置
bound_model = model.bind(temperature=0.7)
```


标签和元数据

```python
# 添加标签用于追踪
tagged_chain = chain.with_tags(["production", "v1.0"])

# 添加元数据
chain_with_metadata = chain.with_metadata({"version": "1.0", "author": "team"})
```

### 6. 调试和监控
中间结果获取

```python
# 获取每一步的输出
def debug_step(step_output):
    print(f"步骤输出: {step_output}")
    return step_output

debug_chain = prompt | RunnableLambda(debug_step) | model
```

性能监控

```python
import time

def timing_wrapper(runnable):
    def timed_invoke(input):
        start = time.time()
        result = runnable.invoke(input)
        print(f"执行时间: {time.time() - start:.2f}秒")
        return result
    return RunnableLambda(timed_invoke)
```

7. 自定义 Runnable
基本实现

```python
from langchain_core.runnables import Runnable

class CustomProcessor(Runnable[str, Dict]):
    def invoke(self, input: str, config=None) -> Dict:
        # 自定义处理逻辑
        return {"processed": input.upper(), "length": len(input)}
    
    async def ainvoke(self, input: str, config=None) -> Dict:
        # 异步版本
        return self.invoke(input, config)
```


完整实现模板
```python
class AdvancedRunnable(Runnable[InputType, OutputType]):
    def __init__(self, param1, param2):
        self.param1 = param1
        self.param2 = param2
    
    def invoke(self, input: InputType, config=None) -> OutputType:
        # 同步执行逻辑
        pass
    
    async def ainvoke(self, input: InputType, config=None) -> OutputType:
        # 异步执行逻辑
        pass
    
    def batch(self, inputs: List[InputType], config=None) -> List[OutputType]:
        # 批量处理逻辑
        return [self.invoke(inp, config) for inp in inputs]
    
    def stream(self, input: InputType, config=None):
        # 流式处理逻辑
        yield from self._stream_generator(input)

```

### 最佳实践

```python
# 1. 明确类型定义
class MyChain(Runnable[Dict[str, str], Dict[str, Any]]):
    pass

# 2. 合理使用并行
parallel_processing = RunnableParallel({
    "fast_analysis": quick_model,
    "detailed_analysis": detailed_model
})

# 3. 添加错误处理
robust_chain = (
    preprocessing |
    model.with_fallbacks([backup_model]) |
    postprocessing
)

# 4. 使用配置管理
configurable_chain = model.configurable_fields(
    temperature=ConfigurableField(id="temp", name="Temperature")
)
```