In [1]:
# Runnable
# class langchain_core.runnables.base.Runnable
# 可被调用, 批处理, 流处理, 变换和组合的工作单元
# 代码上定义为抽象类

In [15]:
# 主要方法: (a代表async)
# invoke/ainvoke: 单输入->输出
# batch/abatch: 多输入->多输出
# stream/astream: 单输入->流式输出 (返回一个生成器, 由生成器来输出结果)
# astream_log: 单输入->流式输出(+中间结果)

# 内置优化:
# Batch: 默认通过线程池来并行调用invoke()方法, 可重写
# Async: 默认通过python的asyncio库来执行异步, 可重写

# 所有方法支持单个配置参数, 该参数可用于配置执行, 为追踪和调试添加标签及元数据
# 该参数称为RunnableConfig, 可定义以下属性
# run_name: 用于设置Runnable的名称, 该名称将用于日志和其他位置以标识运行(不继承)
# run_id: 此调用的唯一标识符, 子调用将获得它们自己的唯一运行 ID.
# tags: (列表)此调用和任何子调用的标签.
# metadata: (字典)此调用和任何子调用的元数据.
# callbacks: 此调用和任何子调用的回调.
# max_concurrency: 要进行的最大并行调用数(例如, 由batch使用)
# recursion_limit: 调用可以递归的最大次数 (例如: 由返回Runnables的Runnables使用)
# configurable: Runnable的可配置属性的运行时值
# 通过input_schema, output_schema和config_schema属性
# 分别暴露input, output,config的概要信息

In [None]:
some_runnable.invoke(
    some_input,
    config={
        "run_name": "my_run",
        "tags": ["tag1", "tag2"],
        "metadata": {"key": "value"}
    }
)

In [None]:
# RunnableConfig的传播
# 许多Runnables由其他Runnables组成, 重要的是RunnableConfig传播到Runnable进行的所有子调用
# 这允许向父Runnable提供运行时配置值, 这些值由所有子调用继承
# 对于不允许继承的Config值, 则不可能设置和传播回调或其他配置值

In [None]:
# max_concurrency参数
# 使用batch或batch_as_completed方法时, 可以通过该参数控制最大并行调用数.

In [None]:
# 创建Runnable的两种方式
# 1. RunnableLambda: 适用于对于不需要流式传输的简单转换
# 2. RunnableGenerator: 适用于需要流式传输的复杂转换
# 不推荐继承Runnables来创建自定义Runnable, 容易出错且更复杂

In [None]:
# 组合Runnables的两种方式
# 1. RunnableSequence. 通过LCEL(自动支持sync, async, batch和streaming)
# 或传入Runnables列表构造执行链
# 2. RunnableParallel, 通过传入字典字面量或字典构造并发执行Runnables.

In [4]:
# 例:

from langchain_core.runnables import RunnableLambda

# 使用LCEL构造RunnableSequence
sequence = RunnableLambda(lambda x: x+1) | RunnableLambda(lambda x: x*2)
print("single: ", sequence.invoke(1))
print("batch: ", sequence.batch([1, 2, 3]))

# 使用字典字面量构造RunnableParallel
parallel = RunnableLambda(lambda x: x+1) | {
    "mul_2": RunnableLambda(lambda x: x*2),
    "mul_5": RunnableLambda(lambda x: x*5)
}
print("parallel: ", parallel.invoke(1))

single:  4
batch:  [4, 6, 8]
parallel:  {'mul_2': 4, 'mul_5': 10}


In [5]:
# Runnable对象(包含Runnable Chain等复合对象) 含有可调用方法
# 以修改Runnable对象行为

In [10]:
# 例:

from langchain_core.runnables import RunnableLambda

import random

def add_one(x: int) -> int:
    return x + 1

def buggy_double(y: int) -> int:
    """Buggy code that will fail 70% of the time"""
    if random.random() > 0.3:
        print("This code failed, and will probably be retried!")
        raise ValueError("Triggered buggy code")
    return y * 2

sequence = (
    RunnableLambda(add_one) |
    RunnableLambda(buggy_double).with_retry(
        stop_after_attempt=10,
        wait_exponential_jitter=False
    )
)

print(sequence.input_schema.model_json_schema())
print(sequence.output_schema.model_json_schema())
print(sequence.invoke(2))

{'title': 'add_one_input', 'type': 'integer'}
{'title': 'buggy_double_output', 'type': 'integer'}
This code failed, and will probably be retried!
This code failed, and will probably be retried!
6


In [None]:
# 回调函数
# 用于构建可观测性, 调试能力和自定义逻辑的核心机制
# 通过注册回调函数, 可实时监控链式调用的执行过程, 捕获中间结果, 修改执行流程或记录关键指标
# Runnable允许在以下时间节点注入自定义逻辑
# 1. 执行前: on_chain_start
# 2. 执行后: on_chain_end
# 3. 输入生成时: on_input_generation
# 4. 输出生成时: on_output_generation
# 5. 错误发生时: on_error

In [None]:
# 回调函数通常包含以下参数
# 1. serialized(Dict): 序列化后的链结构描述
# 2. inputs(Dict): 输入参数(键值对形式)
# 3. **kwargs(Any): 框架保留参数, 用于未来扩展

In [28]:
# 例1: 日志记录与调试

from langchain_core.runnables import RunnableLambda
from langchain_core.callbacks import StdOutCallbackHandler

class DebugCallback(StdOutCallbackHandler):
    def __init__(self):
        super().__init__()
        
    def on_chain_start(self, serialized, inputs, **kwargs):
        if serialized is None:
            print("警告: serialized 参数为None")
            print(f"启动链: 未命名")
        else:
            print(f"启动链: {serialized}")
            
        print(f"输入参数: {inputs}")

    def on_chain_end(self, outputs, **kwargs):
        print(f"输出结果: {outputs}")

# 方法1
# 注册回调到链
# chain = RunnableLambda(lambda x: x*2).with_config(
#     name="DoubleNumberChain",  # 命名链以便回调识别
#     callbacks=[DebugCallback()]  # 绑定回调列表
# )
# 执行调用
# chain.invoke(3)    

# 方法2
chain = RunnableLambda(lambda x: x*2)
chain.invoke(
    input=3,
    config={"callbacks": [DebugCallback()]}
)

警告: serialized 参数为None
启动链: 未命名
输入参数: 3
[32;1m[1;3m[chain/start][0m [1m[chain:RunnableLambda] Entering Chain run with input:
[0m{
  "input": 3
}
输出结果: 6
[36;1m[1;3m[chain/end][0m [1m[chain:RunnableLambda] [1ms] Exiting Chain run with output:
[0m{
  "output": 6
}


6

In [11]:
# 调试与追踪
# 设置全局debug flag以启用chains的调试输出

In [12]:
# 例:

from langchain_core.globals import set_debug
set_debug(True)

In [13]:
# 也可以给chain传递现有的或定制的回调

In [14]:
# 例:

from langchain_core.tracers import ConsoleCallbackHandler

chain.invoke(
    ...,
    config={"callbacks": [ConsoleCallbackHandler()]}
)

NameError: name 'chain' is not defined