In [3]:
import time
from langchain_core.runnables import RunnableLambda, RunnableConfig

# 🚀 自定义回调函数，打印 config 里的 tags 和 metadata
def custom_callback(input_text, config: dict):
    print(f"\n[回调] 执行任务: {input_text}")
    # 使用字典访问方式获取 tags 和 metadata
    print(f"[回调] 任务标签 (tags): {config.get('tags', '未提供')}")
    print(f"[回调] 任务元数据 (metadata): {config.get('metadata', '未提供')}\n")


# 🚀 任务 1：转换为小写
def to_lowercase(text: str, config: RunnableConfig):
    custom_callback(text, config)  # 调用回调函数，打印 config 信息
    print("[Step 1] 转换为小写:", text.lower())
    return text.lower()

# 🚀 任务 2：去掉空格
def remove_spaces(text: str, config: RunnableConfig):
    custom_callback(text, config)
    print("[Step 2] 去掉空格:", text.replace(" ", ""))
    return text.replace(" ", "")

# 🚀 任务 3：反转字符串
def reverse_text(text: str, config: RunnableConfig):
    custom_callback(text, config)
    print("[Step 3] 反转字符串:", text[::-1])
    return text[::-1]

# 🚀 创建 Runnables
step1_runnable = RunnableLambda(to_lowercase)
step2_runnable = RunnableLambda(remove_spaces)
step3_runnable = RunnableLambda(reverse_text)

# 🚀 任务流水线
def text_pipeline(text: str, config: RunnableConfig):
    step1_result = step1_runnable.invoke(text, config=config)
    step2_result = step2_runnable.invoke(step1_result, config=config)
    step3_result = step3_runnable.invoke(step2_result, config=config)
    return step3_result

pipeline_runnable = RunnableLambda(text_pipeline)

# 🚀 定义 RunnableConfig，包含 tags 和 metadata
config = RunnableConfig(
    tags=["text-processing", "pipeline"],  # 任务标签
    metadata={"author": "user", "version": "1.0"}  # 任务额外信息
)

# 🚀 执行任务
result = pipeline_runnable.invoke("Hello World", config=config)

# 🚀 输出最终结果
print("\n最终结果:", result)



[回调] 执行任务: Hello World
[回调] 任务标签 (tags): ['text-processing', 'pipeline']
[回调] 任务元数据 (metadata): {'author': 'user', 'version': '1.0'}

[Step 1] 转换为小写: hello world

[回调] 执行任务: hello world
[回调] 任务标签 (tags): ['text-processing', 'pipeline']
[回调] 任务元数据 (metadata): {'author': 'user', 'version': '1.0'}

[Step 2] 去掉空格: helloworld

[回调] 执行任务: helloworld
[回调] 任务标签 (tags): ['text-processing', 'pipeline']
[回调] 任务元数据 (metadata): {'author': 'user', 'version': '1.0'}

[Step 3] 反转字符串: dlrowolleh

最终结果: dlrowolleh


In [20]:
from langchain_core.runnables import RunnableLambda, RunnableConfig
import time

# 定义一个带有超时控制的任务
def slow_task(input_text, config):
    # 从 "configurable" 子字典中获取 timeout 参数
    timeout = config.get("configurable", {}).get("timeout", 5)
    print('timeout===', timeout)
    start_time = time.time()
    
    # 假设任务执行时间5s
    while time.time() - start_time < 5: 
        time.sleep(1)  # 模拟任务执行中
        print("任务正在执行...")
    
        if time.time() - start_time >= timeout:
            raise TimeoutError("TimeoutError任务超时！")
    
    return input_text.upper()

task_runnable = RunnableLambda(slow_task)

# 设置配置，传递超时参数到 "configurable" 字段
config = RunnableConfig(configurable={"timeout": 2})

try:
    result = task_runnable.invoke("hello", config=config)
    print('result===',result)
except TimeoutError as e:
    print('出错了===',e)


timeout=== 2
任务正在执行...
任务正在执行...
出错了=== TimeoutError任务超时！
