How to stream runnables

In [2]:
# aync stream（同步流） 和 async astream（异步流）：流式传输的默认实现，用于流式传输链中的最终输出。
# async astream_events 和 async astream_log: 提供一种从链中传输中间步骤和最终输出的方法。
# 使应用程序响应更快的关键策略是显示中间进度；即逐个标记地流式传输模型标记的输出。

In [3]:
import os
from langchain_openai import ChatOpenAI

os.environ["http_proxy"] = "http://localhost:7890"
os.environ["https_proxy"] = "http://localhost:7890"

openai_api_key=os.environ["OPENAI_API_KEY"]
# print(openai_api_key)

os.environ["OPENAI_API_BASE"] ="https://api.zhizengzeng.com/v1/"

model = ChatOpenAI(model="gpt-3.5-turbo-0125")

sync stream API(同步流)

In [4]:
###  stream()
chunk = []
for chunk in model.stream("what color is the sky?"):
    chunk.append = chunk
    print(chunk.content,end = '|', flush=True)          # flush=True 是指在打印内容后立即刷新输出缓冲区。

|The| color| of| the| sky| can| vary| depending| on| the| time| of| day| and| weather| conditions|.| During| the| day|,| the| sky| is| typically| blue|,| while| at| sunrise| or| sunset| it| can| appear| orange|,| pink|,| or| purple|.| At| night|,| the| sky| is| usually| dark| blue| or| black|.||

async astream API(异步流)

In [5]:
### Astream()
chunks = []
async for chunk in model.astream("what color is the sky?"):
    chunks.append(chunk)
    print(chunk.content, end="|", flush=True)

|The| color| of| the| sky| can| vary| depending| on| the| time| of| day| and| weather| conditions|.| During| the| day|,| the| sky| is| typically| blue|,| but| it| can| also| appear| gray|,| orange|,| pink|,| or| red| during| sunrise| and| sunset|.| At| night|,| the| sky| is| usually| dark| blue| or| black|.||

In [6]:
### 打印消息块
chunks[0]

AIMessageChunk(content='', id='run-629bcffe-31bb-4ada-adf3-52b397da9914')

In [7]:
### 叠加消息块
chunks[0]+chunks[1]+chunks[2]+chunks[3]+chunks[4]

AIMessageChunk(content='The color of the', id='run-629bcffe-31bb-4ada-adf3-52b397da9914')

Chains

In [8]:
# 使用 StrOutputParser 来解析模型的输出
# 从 AIMessageChunk 中提取内容字段，提供模型返回的token
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
parser = StrOutputParser()
chain = prompt | model | parser

async for chunk in chain.astream({"topic": "parrot"}):
    print(chunk, end="|", flush=True)

|Why| did| the| par|rot| wear| a| rain|coat|?| Because| he| wanted| to| be| poly|-"|par|rot|"-|ic|!||

Working with Input Streams

In [9]:
### JsonOutputParser()转换
#  parser（解析器）对输入流进行操作，将部分 JSON“自动完成”为有效状态。

from langchain_core.output_parsers import JsonOutputParser

chain = (
    model | JsonOutputParser()
)     # Due to a bug in older versions of Langchain, JsonOutputParser did not stream results from some models

async for text in chain.astream("output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`"):

    print(text,flush=True)

{}
{'countries': []}
{'countries': [{}]}
{'countries': [{'name': ''}]}
{'countries': [{'name': 'France'}]}
{'countries': [{'name': 'France', 'population': 670}]}
{'countries': [{'name': 'France', 'population': 670760}]}
{'countries': [{'name': 'France', 'population': 67076000}]}
{'countries': [{'name': 'France', 'population': 67076000}, {}]}
{'countries': [{'name': 'France', 'population': 67076000}, {'name': ''}]}
{'countries': [{'name': 'France', 'population': 67076000}, {'name': 'Spain'}]}
{'countries': [{'name': 'France', 'population': 67076000}, {'name': 'Spain', 'population': 467}]}
{'countries': [{'name': 'France', 'population': 67076000}, {'name': 'Spain', 'population': 467330}]}
{'countries': [{'name': 'France', 'population': 67076000}, {'name': 'Spain', 'population': 46733038}]}
{'countries': [{'name': 'France', 'population': 67076000}, {'name': 'Spain', 'population': 46733038}, {}]}
{'countries': [{'name': 'France', 'population': 67076000}, {'name': 'Spain', 'population': 467

In [10]:
### 附加函数

from langchain_core.output_parsers import (JsonOutputParser,)

# A function that operates on finalized inputs
# rather than on an input_stream
def _extract_country_names(inputs):
    """A function that does not operates on input streams and breaks streaming."""
    if not isinstance(inputs, dict):                  # isinstance(input, dict)检查
        return ""

    if "countries" not in inputs:
        return ""

    countries = inputs["countries"]

    if not isinstance(countries,list):
        return ""

    country_names = [
        country.get("name") for country in countries if isinstance(country, dict)
    ]
    return country_names

# 附加提取函数，从最终输出的json中提取国家/地区名称
chain = model | JsonOutputParser() | _extract_country_names

async for text in chain.astream (
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`"
):
    
    print(text, end="|", flush=True)


['France', 'Spain', 'Japan']|

In [11]:
### 修复流式输出 Generator Functions
# 使用可以对输入流进行操作的生成器函数来修复流式传输。

from langchain_core.output_parsers import JsonOutputParser

async def _extract_country_names_streaming(input_stream):
    """A function that operates on input streams."""
    country_names_so_far = set()          # 创建空集合                          

    # 异步迭代处理输入流中的每个输入对象
    async for input in input_stream:
        if not isinstance(input, dict):
            continue

        if "countries" not in input:
            continue

        countries = input["countries"]

        if not isinstance(countries, list):
            continue

        for country in countries:
            name = country.get("name")
            if not name:
                continue
            if name not in country_names_so_far:
                yield name                                # yield 关键字用于定义生成器函数，暂停函数的执行并返回一个值，稍后可以从暂停的地方继续执行。
                country_names_so_far.add(name)

chain = model | JsonOutputParser() | _extract_country_names_streaming

async for text in chain.astream(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
):
    print(text, end="|", flush=True)


France|Spain|Japan|

Non-streaming components (非流式组件)

In [12]:
from langchain_community.vectorstores import FAISS             # 向量搜索库,快速查找最相似的文档
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import OpenAIEmbeddings

In [13]:

### 内置组件流式输出

template = """Answer the question based only on the following context:
{context}

Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)

# 使用 FAISS 进行向量存储：
vectorstore = FAISS.from_texts(
    ["harrison worked at kensho", "harrison likes spicy food"],
    embedding=OpenAIEmbeddings(),
)

# 数据结构转换
retriever = vectorstore.as_retriever()

# 对内置组件（retriver）进行流式传输
chunks = [chunk for chunk in retriever.stream("where did harrison work?")]
chunks

[[Document(page_content='harrison worked at kensho'),
  Document(page_content='harrison likes spicy food')]]

In [14]:

### LCEL链流式传输

retrieval_chain = (
    {
        "context": retriever.with_config(run_name="Docs"),
        # 返回配置过的检索器对象，该对象使用了 “Docs” 的运行配置。
        "question": RunnablePassthrough(),
    }
    | prompt
    | model 
    | StrOutputParser()
)

for chunk in retrieval_chain.stream(
    "Where did harrison work? " "Write 3 made up sentences about this place."
):
    print(chunk, end="|", flush=True)

|H|arrison| worked| at| Kens|ho|,| a| trendy| restaurant| known| for| its| fusion| cuisine|.| The| atmosphere| at| Kens|ho| is| always| bustling| with| energy| and| excitement|.| The| menu| at| Kens|ho| features| a| variety| of| unique| dishes| that| perfectly| blend| different| culinary| traditions|.| Customers| at| Kens|ho| always| leave| satisfied| and| eager| to| come| back| for| more|.||

Using Stream Events

Event Reference（事件参考）
略

Chat Model

In [15]:
### 查看Chat Model 生成事件
events = []
async for event in model.astream_events("hello", version="v2"):
    events.append(event)

  warn_beta(


In [16]:
events[:3]

[{'event': 'on_chat_model_start',
  'data': {'input': 'hello'},
  'name': 'ChatOpenAI',
  'tags': [],
  'run_id': '99132e12-8f8d-4253-938c-caab03655522',
  'metadata': {'ls_provider': 'openai',
   'ls_model_name': 'gpt-3.5-turbo-0125',
   'ls_model_type': 'chat',
   'ls_temperature': 0.7},
  'parent_ids': []},
 {'event': 'on_chat_model_stream',
  'run_id': '99132e12-8f8d-4253-938c-caab03655522',
  'name': 'ChatOpenAI',
  'tags': [],
  'metadata': {'ls_provider': 'openai',
   'ls_model_name': 'gpt-3.5-turbo-0125',
   'ls_model_type': 'chat',
   'ls_temperature': 0.7},
  'data': {'chunk': AIMessageChunk(content='', id='run-99132e12-8f8d-4253-938c-caab03655522')},
  'parent_ids': []},
 {'event': 'on_chat_model_stream',
  'run_id': '99132e12-8f8d-4253-938c-caab03655522',
  'name': 'ChatOpenAI',
  'tags': [],
  'metadata': {'ls_provider': 'openai',
   'ls_model_name': 'gpt-3.5-turbo-0125',
   'ls_model_type': 'chat',
   'ls_temperature': 0.7},
  'data': {'chunk': AIMessageChunk(content='Hel

In [17]:
events[-2:]

[{'event': 'on_chat_model_stream',
  'run_id': '99132e12-8f8d-4253-938c-caab03655522',
  'name': 'ChatOpenAI',
  'tags': [],
  'metadata': {'ls_provider': 'openai',
   'ls_model_name': 'gpt-3.5-turbo-0125',
   'ls_model_type': 'chat',
   'ls_temperature': 0.7},
  'data': {'chunk': AIMessageChunk(content='', response_metadata={'finish_reason': 'stop', 'model_name': 'gpt-3.5-turbo-0125'}, id='run-99132e12-8f8d-4253-938c-caab03655522')},
  'parent_ids': []},
 {'event': 'on_chat_model_end',
  'data': {'output': AIMessageChunk(content='Hello! How can I assist you today?', response_metadata={'finish_reason': 'stop', 'model_name': 'gpt-3.5-turbo-0125'}, id='run-99132e12-8f8d-4253-938c-caab03655522')},
  'run_id': '99132e12-8f8d-4253-938c-caab03655522',
  'name': 'ChatOpenAI',
  'tags': [],
  'metadata': {'ls_provider': 'openai',
   'ls_model_name': 'gpt-3.5-turbo-0125',
   'ls_model_type': 'chat',
   'ls_temperature': 0.7},
  'parent_ids': []}]

Chain

In [18]:
### 解析流式 JSON

chain = (
    model | JsonOutputParser()
)  # Due to a bug in older versions of Langchain, JsonOutputParser did not stream results from some models

events = [
    event
    async for event in chain.astream_events(
        "output a list of the countries france, spain and japan and their populations in JSON format. "
        'Use a dict with an outer key of "countries" which contains a list of countries. '
        "Each country should have the key `name` and `population`",
        version="v2",
    )
]

In [19]:
events[:3]

[{'event': 'on_chain_start',
  'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'},
  'name': 'RunnableSequence',
  'tags': [],
  'run_id': '3bf53f16-dfe1-499b-8d19-29c54d12ac58',
  'metadata': {},
  'parent_ids': []},
 {'event': 'on_chat_model_start',
  'data': {'input': {'messages': [[HumanMessage(content='output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`')]]}},
  'name': 'ChatOpenAI',
  'tags': ['seq:step:1'],
  'run_id': 'd0141f4c-0cc8-4670-8b30-cd0680a4cbe0',
  'metadata': {'ls_provider': 'openai',
   'ls_model_name': 'gpt-3.5-turbo-0125',
   'ls_model_type': 'chat',
   'ls_temperature': 0.7},
  'parent_ids':

In [20]:

### 获取stream events输出
# take output the stream events from the model and the parser

num_events = 0

async for event in chain.astream_events(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
    version="v2",
):
    kind = event["event"]
    
    if kind == "on_chat_model_stream":
        print(
            f"Chat model chunk: {repr(event['data']['chunk'].content)}",
            flush=True,
        )
    if kind == "on_parser_stream":
        print(f"Parser chunk: {event['data']['chunk']}", flush=True)
        
    num_events += 1

    # 截断（Truncate）
    if num_events > 30:
        # Truncate the output
        print("...")
        break

Chat model chunk: ''
Chat model chunk: '{\n'
Parser chunk: {}
Chat model chunk: '   '
Chat model chunk: ' "'
Chat model chunk: 'countries'
Chat model chunk: '":'
Chat model chunk: ' [\n'
Parser chunk: {'countries': []}
Chat model chunk: '       '
Chat model chunk: ' {\n'
Parser chunk: {'countries': [{}]}
Chat model chunk: '           '
Chat model chunk: ' "'
Chat model chunk: 'name'
Chat model chunk: '":'
Chat model chunk: ' "'
Parser chunk: {'countries': [{'name': ''}]}
Chat model chunk: 'France'
Parser chunk: {'countries': [{'name': 'France'}]}
Chat model chunk: '",\n'
Chat model chunk: '           '
Chat model chunk: ' "'
...


Filtering Events

In [21]:
### 通过组件名称(Name)过滤
chain = model.with_config({"run_name": "model"}) | JsonOutputParser().with_config(
    {"run_name": "my_parser"}                    # with_config()配置参数
)

max_events = 0
async for event in chain.astream_events(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
    version="v2",

    # 指定只包括名为 "my_parser" 的处理器生成的事件
    include_names=["my_parser"],
):
    print(event)
    
    max_events += 1
    if max_events > 10:
        # Truncate output
        print("...")
        break

{'event': 'on_parser_start', 'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}, 'name': 'my_parser', 'tags': ['seq:step:2'], 'run_id': '9c115fc6-bd04-4061-9979-adf7510c584a', 'metadata': {}, 'parent_ids': ['8e3a1d60-ee00-4a97-b3ca-5d8b13314f76']}
{'event': 'on_parser_stream', 'run_id': '9c115fc6-bd04-4061-9979-adf7510c584a', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {}}, 'parent_ids': ['8e3a1d60-ee00-4a97-b3ca-5d8b13314f76']}
{'event': 'on_parser_stream', 'run_id': '9c115fc6-bd04-4061-9979-adf7510c584a', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': []}}, 'parent_ids': ['8e3a1d60-ee00-4a97-b3ca-5d8b13314f76']}
{'event': 'on_parser_stream', 'run_id': '9c115fc6-bd04-4061-9979-adf7510c584a', 'name': 'my_parse

In [22]:
### 通过组件类型(Type)过滤
chain = model.with_config({"run_name": "model"}) | JsonOutputParser().with_config(
    {"run_name": "my_parser"}
)

max_events = 0
async for event in chain.astream_events(
    'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
    version="v2",

    # 指定包括"chat_model"特定类型的事件
    include_types=["chat_model"],
):
    print(event)

    max_events += 1
    if max_events > 10:
        # Truncate output
        print("...")
        break

{'event': 'on_chat_model_start', 'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}, 'name': 'model', 'tags': ['seq:step:1'], 'run_id': 'c8dcc258-e9ca-4e63-8603-dc08a4c2d846', 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo-0125', 'ls_model_type': 'chat', 'ls_temperature': 0.7}, 'parent_ids': ['e9bc3aa1-3533-4189-9ca0-914a1168d4b3']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='', id='run-c8dcc258-e9ca-4e63-8603-dc08a4c2d846')}, 'run_id': 'c8dcc258-e9ca-4e63-8603-dc08a4c2d846', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo-0125', 'ls_model_type': 'chat', 'ls_temperature': 0.7}, 'parent_ids': ['e9bc3aa1-3533-4189-9ca0-914a1168d4b3']}
{'event': 'on_chat_model_stream', 'data':

In [23]:
### 通过组件标签（Tag）过滤

chain = (model | JsonOutputParser()).with_config({"tags": ["my_chain"]})

max_events = 0
async for event in chain.astream_events(
    'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
    version="v2",

    # 指定包括"my_chain"特定标签的事件
    include_tags=["my_chain"],
):
    print(event)
    max_events += 1
    if max_events > 10:
        # Truncate output
        print("...")
        break

{'event': 'on_chain_start', 'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}, 'name': 'RunnableSequence', 'tags': ['my_chain'], 'run_id': '467c9991-f791-414a-9b94-f233bbfab9fc', 'metadata': {}, 'parent_ids': []}
{'event': 'on_chat_model_start', 'data': {'input': {'messages': [[HumanMessage(content='output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`')]]}}, 'name': 'ChatOpenAI', 'tags': ['seq:step:1', 'my_chain'], 'run_id': 'e407887d-5355-49be-b3aa-c1f1230ead57', 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo-0125', 'ls_model_type': 'chat', 'ls_temperature': 0.7}, 'parent_ids': ['467c9991-f7

Non-streaming components （非流式组件）

In [24]:
# Function that does not support streaming.
# It operates on the finalizes inputs rather than
# operating on the input stream.

def _extract_country_names(inputs):
    """A function that does not operates on input streams and breaks streaming."""
    if not isinstance(inputs, dict):
        return ""

    if "countries" not in inputs:
        return ""

    countries = inputs["countries"]

    if not isinstance(countries, list):
        return ""

    country_names = [
        country.get("name") for country in countries if isinstance(country, dict)
    ]
    return country_names


chain = (
    model | JsonOutputParser() | _extract_country_names
)     # This parser only works with OpenAI right now

In [25]:
# astream 无法正常工作，因为 _extract_country_names 不适用于streams。

async for chunk in chain.astream(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
):
    print(chunk, flush=True)

['France', 'Spain', 'Japan']


In [26]:
###  astream_events流式输出

num_events = 0

# 通过 astream_events 我们仍然可以看到来自模型和解析器的 streaming output。

async for event in chain.astream_events(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
    version="v2",
):
    
    kind = event["event"]

    if kind == "on_chat_model_stream":
        print(
            f"Chat model chunk: {repr(event['data']['chunk'].content)}",
            flush=True,
        )

    if kind == "on_parser_stream":
        print(f"Parser chunk: {event['data']['chunk']}", flush=True)

    num_events += 1

    if num_events > 30:
        # Truncate the output
        print("...")
        break

Chat model chunk: ''
Chat model chunk: '{\n'
Parser chunk: {}
Chat model chunk: ' '
Chat model chunk: ' "'
Chat model chunk: 'countries'
Chat model chunk: '":'
Chat model chunk: ' [\n'
Parser chunk: {'countries': []}
Chat model chunk: '   '
Chat model chunk: ' {\n'
Parser chunk: {'countries': [{}]}
Chat model chunk: '     '
Chat model chunk: ' "'
Chat model chunk: 'name'
Chat model chunk: '":'
Chat model chunk: ' "'
Parser chunk: {'countries': [{'name': ''}]}
Chat model chunk: 'France'
Parser chunk: {'countries': [{'name': 'France'}]}
Chat model chunk: '",\n'
Chat model chunk: '     '
Chat model chunk: ' "'
Chat model chunk: 'population'
Chat model chunk: '":'
Chat model chunk: ' '
Chat model chunk: '670'
...


Propagating Callbacks（传播回调）

In [27]:
# 如果在工具中使用调用可运行对象，则需要将回调传播到可运行对象；否则，不会生成任何流事件。
# 当使用 RunnableLambdas 或 @chain 装饰器时，回调会在幕后自动传播。

from langchain_core.runnables import RunnableLambda
from langchain_core.tools import tool

# 反转字符串
def reverse_word(word: str):
    return word[::-1]           

# RunnableLambda() 将该函数包装成可运行对象，使其能够在处理流中被调用和处理
reverse_word = RunnableLambda(reverse_word)

{'event': 'on_tool_start', 'data': {'input': 'hello'}, 'name': 'bad_tool', 'tags': [], 'run_id': '06872639-06c9-4e30-8172-fec13dfa5bb7', 'metadata': {}, 'parent_ids': []}
{'event': 'on_chain_start', 'data': {'input': 'hello'}, 'name': 'reverse_word', 'tags': [], 'run_id': '0f492353-c025-41ce-84dd-ae1419fbb61f', 'metadata': {}, 'parent_ids': ['06872639-06c9-4e30-8172-fec13dfa5bb7']}
{'event': 'on_chain_end', 'data': {'output': 'olleh', 'input': 'hello'}, 'run_id': '0f492353-c025-41ce-84dd-ae1419fbb61f', 'name': 'reverse_word', 'tags': [], 'metadata': {}, 'parent_ids': ['06872639-06c9-4e30-8172-fec13dfa5bb7']}
{'event': 'on_tool_end', 'data': {'output': 'olleh'}, 'run_id': '06872639-06c9-4e30-8172-fec13dfa5bb7', 'name': 'bad_tool', 'tags': [], 'metadata': {}, 'parent_ids': []}


In [None]:
### 错误 propagate callbacks（传播回调）
@tool
def bad_tool(word: str):
    """Custom tool that doesn't propagate callbacks."""
    return reverse_word.invoke(word)

async for event in bad_tool.astream_events("hello", version="v2"):
    print(event)

In [28]:
### 正确 propagate callbacks（传播回调）
# 从reverse_word runnable 获取事件。
@tool
def correct_tool(word:str,callbacks):                       # callbacks（回调函数的集合或字典）
    """A tool that correctly propagates callbacks."""
    return reverse_word.invoke(word,{"callbacks":callbacks})

async for event in correct_tool.astream_events("hello", version="v2"):
    print(event)

{'event': 'on_tool_start', 'data': {'input': 'hello'}, 'name': 'correct_tool', 'tags': [], 'run_id': 'cbad434a-3f2f-4087-b5ed-0ed193a44133', 'metadata': {}, 'parent_ids': []}
{'event': 'on_chain_start', 'data': {'input': 'hello'}, 'name': 'reverse_word', 'tags': [], 'run_id': '5e5edbab-cfc6-4324-8b79-779a9432ee87', 'metadata': {}, 'parent_ids': ['cbad434a-3f2f-4087-b5ed-0ed193a44133']}
{'event': 'on_chain_end', 'data': {'output': 'olleh', 'input': 'hello'}, 'run_id': '5e5edbab-cfc6-4324-8b79-779a9432ee87', 'name': 'reverse_word', 'tags': [], 'metadata': {}, 'parent_ids': ['cbad434a-3f2f-4087-b5ed-0ed193a44133']}
{'event': 'on_tool_end', 'data': {'output': 'olleh'}, 'run_id': 'cbad434a-3f2f-4087-b5ed-0ed193a44133', 'name': 'correct_tool', 'tags': [], 'metadata': {}, 'parent_ids': []}


In [29]:
### 调用Runnable
# 从 Runnable Lambda 或 @chains 中调用 Runnable，则callbacks将自动传递

from langchain_core.runnables import RunnableLambda      #RunnableLambda类用于创建可调用的异步对象。

async def reverse_and_double(word: str):
    return await reverse_word.ainvoke(word) * 2

reverse_and_double = RunnableLambda(reverse_and_double)

await reverse_and_double.ainvoke("1234")                 # ainvoke()方法异步调用函数

async for event in reverse_and_double.astream_events("1234", version="v2"):
    print(event)

{'event': 'on_chain_start', 'data': {'input': '1234'}, 'name': 'reverse_and_double', 'tags': [], 'run_id': '270f416e-83e2-4a93-af98-3169097068b9', 'metadata': {}, 'parent_ids': []}
{'event': 'on_chain_stream', 'run_id': '270f416e-83e2-4a93-af98-3169097068b9', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}, 'data': {'chunk': '43214321'}, 'parent_ids': []}
{'event': 'on_chain_end', 'data': {'output': '43214321'}, 'run_id': '270f416e-83e2-4a93-af98-3169097068b9', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}, 'parent_ids': []}


In [30]:
### 使用@chain装饰器：

from langchain_core.runnables import chain

@chain     
# 将 reverse_and_double 函数包装成一个可链式调用的对象，意味着可以在该函数的基础上连续调用其他方法或函数。
async def reverse_and_double(word: str):
    return await reverse_word.ainvoke(word) * 2

await reverse_and_double.ainvoke("1234")

async for event in reverse_and_double.astream_events("1234", version="v2"):
    print(event)

{'event': 'on_chain_start', 'data': {'input': '1234'}, 'name': 'reverse_and_double', 'tags': [], 'run_id': 'ecea49f6-e018-4642-9ad3-8b663f337690', 'metadata': {}, 'parent_ids': []}
{'event': 'on_chain_stream', 'run_id': 'ecea49f6-e018-4642-9ad3-8b663f337690', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}, 'data': {'chunk': '43214321'}, 'parent_ids': []}
{'event': 'on_chain_end', 'data': {'output': '43214321'}, 'run_id': 'ecea49f6-e018-4642-9ad3-8b663f337690', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}, 'parent_ids': []}
