# How to stream runnables
[How to stream runnables](https://python.langchain.com/docs/how_to/streaming/)
- LangChain primitives such as **chat models, output parsers, prompts, retrievers, and agents** implement the **LangChain Runnable Interface**.
- The interface provides two general approaches to streaming content:
    - sync stream and async astream: the default streaming implementation, which streams the **final output** of a chain.
    - async astream_events and async astream_log: these stream both the **intermediate steps** and the **final output** of a chain.

## chat models

In [2]:
import sys

sys.path.append('/Users/ericyoung/ysx/code/github-study/langChain-rookie')
from env_utils import load_environment_variables

load_environment_variables()

__file__: /Users/ericyoung/ysx/code/github-study/langChain-rookie/env_utils.py
dotenv_path1: /Users/ericyoung/ysx/code/github-study/langChain-rookie/.env
load env ok


### Chat model with tool support

In [3]:
# Ollama
from langchain_ollama import ChatOllama
# llm = ChatOllama(
#     model="qwen2.5:latest",  # gemma3:latest does not support tools
#     temperature=0.7,
#     # other params...
# )

### Chat model without tool support

In [4]:
# Any chat model
from models import MyOpenAIModel

model = MyOpenAIModel()
response = model.generate("你好，介绍一下你自己")  # prompt: "Hello, please introduce yourself"
print(response)

您好！我是一个大型语言模型，由 Google 训练。

我可以：

*   **生成不同类型的文本格式:** 例如诗歌、代码、剧本、音乐作品、电子邮件、信件等。我会尽力满足您的所有要求。
*   **回答您的各种问题:** 无论您是想了解事实、需要一些建议，还是仅仅想闲聊，我都会尽力提供帮助。
*   **进行翻译:** 我可以将文本从一种语言翻译成另一种语言。
*   **总结文本:**  我可以将较长的文章或文档总结成更简洁的版本。

我还在不断学习和改进中，我的知识截止到 2023 年 4 月。

您想和我聊些什么呢？ 或者您有什么具体的问题想要问我吗？ 例如，您可以问：

*   “请写一首关于秋天的诗。”
*   “什么是人工智能？”
*   “如何制作一个简单的蛋糕？”


### ⭐️ Adding bind_tools to a model without tool support via ChatOpenAI (with base_url)
- Supports structured output through the with_structured_output method
- Supports tool use through the bind_tools method
- llm = ChatOpenAI(
                model="gpt-4o",
                temperature=0,
                max_tokens=None,
                timeout=None,
                max_retries=2,
                # api_key="...",
                # base_url="...",
                # organization="...",
                # other params...
            )

In [5]:
from models import get_base_url_model_with_tools
llm = get_base_url_model_with_tools()
print(llm.invoke("你是谁?").content)  # prompt: "Who are you?"

我是Gemma，一个开放权重的AI助手。我是一个由Google DeepMind训练的大型语言模型。

作为开源模型，我可以被广泛使用和修改。 

你可以通过访问我的界面与我互动。



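## Using Stream
- Every Runnable object implements a sync method called stream and an async variant called astream.
- These methods are designed to stream the final output in chunks, yielding each chunk as soon as it is available.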
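
To make "yielding each chunk as soon as it is available" concrete, here is a minimal sketch that uses only the standard library. `fake_stream` and `consume` are hypothetical stand-ins invented for illustration, not LangChain APIs; the consumer loop has the same shape as a loop over a runnable's stream method.

```python
from typing import Iterator, List


def fake_stream(text: str) -> Iterator[str]:
    """Yield the text word by word, the way a streaming model yields chunks."""
    for index, word in enumerate(text.split(" ")):
        # Re-attach the separator so the chunks concatenate back to the input.
        yield word if index == 0 else " " + word


def consume(stream: Iterator[str]) -> str:
    """Print every chunk as it arrives and return the accumulated text."""
    chunks: List[str] = []
    for chunk in stream:
        chunks.append(chunk)
        print(chunk, end="|", flush=True)
    print()
    return "".join(chunks)


full_text = consume(fake_stream("the sky is blue"))
```

Each chunk is printed before the next one is produced, and joining the chunks recovers the complete output.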

### LLMs and Chat Models
- First, we can use the sync stream API:

In [12]:
chunks = []
for chunk in llm.stream("what color is the sky?"):
    chunks.append(chunk)
    print(chunk.content, end="|", flush=True)
complete_str = ''.join([c.content for c in chunks ])
print(complete_str)

This| is| a| classic| question| that| has| surprisingly| complex| answers|!| The| sky| isn|'|t| actually| *|one|*| color|.| Here|’|s| the| breakdown|:|

|*| **|During| the| day| (|when| it|'|s| bright| blue|):|**| The| sky| appears| blue| because| of| a| phenomenon| called| **|Ray|leigh| scattering|**.| Sunlight| is| made| up| of| all| colors|,| and| when| it| enters| Earth|’|s| atmosphere|,| it| bumps| into| air| molecules| (|mostly| nitrogen| and| oxygen|).| Blue| light| has| shorter| wavelengths| and| is| scattered| much| more| than| other| colors| like| red| or| orange|.| This| scattered| blue| light| reaches| our| eyes| from| all| directions|,| making| the| sky| appear| blue|.|

|*| **|At| sunrise| and| sunset|:**| The| sky| appears| reddish| or| orange| because| when| the| sun| is| low| on| the| horizon|,| sunlight| has| to| travel| through| *|much|*| more| of| the| atmosphere|.|  |The| blue| light| gets| scattered| away| almost| completely| before| it| can| reach| us|.| This| le

- If you are working in an async environment, consider using the async astream API:

In [13]:
chunks = []
async for chunk in llm.astream("what color is the sky?"):
    chunks.append(chunk)
    print(chunk.content, end="|", flush=True)
complete_str = ''.join([c.content for c in chunks ])
print(complete_str)

That|'|s| a| fantastic| question|,| and| it|’|s| surprisingly| complex|!| The| sky| isn|’|t| *|always|*| one| single| color|.| Here|’|s| a| breakdown|:|

|*| **|During| the| day| (|usually|):|**| Most| of| the| time|,| the| sky| appears| **|blue|**.| This| is| due| to| a| phenomenon| called| Rayleigh| scattering|.| Sunlight| is| made| up| of| all| colors|,| but| blue| light| has| shorter| wavelengths| and| scat|ters| more| easily| in| the| atmosphere| than| other| colors| like| red| or| orange|.| Think| of| it| like| tiny| air| molecules| bouncing| blue| light| around| in| every| direction|.|

|*| **|Sunrise| &| Sunset|:**| When| the| sun| is| low| on| the| horizon|,| sunlight| has| to| travel| through| *|much|*| more| of| the| atmosphere| to| reach| our| eyes|.|  |This| means| almost| all| of| the| blue| light| gets| scattered| away| before| it| reaches| us|.| The| longer| wavelengths| –| red|,| orange|,| and| yellow| –| are| able| to| pass| through| more| easily|,| creating| those| b
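
The async cell above relies on top-level `async for`, which a notebook allows. In a plain Python script the loop has to live inside a coroutine that is started with `asyncio.run`. The sketch below shows that structure with a hypothetical `fake_astream` async generator standing in for the model's astream method; only the standard library is used.

```python
import asyncio
from typing import AsyncIterator, List


async def fake_astream(text: str) -> AsyncIterator[str]:
    """Hypothetical stand-in for an async model stream."""
    for index, word in enumerate(text.split(" ")):
        # Hand control back to the event loop, as awaiting network I/O would.
        await asyncio.sleep(0)
        yield word if index == 0 else " " + word


async def collect(prompt: str) -> str:
    """Consume the async stream inside a coroutine and join the chunks."""
    chunks: List[str] = []
    async for chunk in fake_astream(prompt):
        chunks.append(chunk)
    return "".join(chunks)


# Outside a notebook there is no running event loop, so start one explicitly.
result = asyncio.run(collect("the sky is blue"))
print(result)
```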

### Chains
- This chain combines a prompt, a model, and a parser, and verifies that streaming works end to end.

In [15]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
parser = StrOutputParser()
chain = prompt | llm | parser

async for chunk in chain.astream({"topic": "parrot"}):
    print(chunk, end="|", flush=True)

Why| did| the| parrot| cross| the| playground|?| |

|To| get| to| the| other| slide|!| 😂| |

|---|

|Would| you| like| to| hear| another| joke|?| 😊||

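- Even though the chain above ends with parser, we still get streaming output.
- The parser operates on each streamed chunk individually.
- Many LCEL primitives also support this kind of transform-style passthrough streaming, which is very convenient when building applications.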
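
The idea of a parser that transforms each chunk as it passes through can be sketched with plain Python generators. This is an illustration of the principle under simplified assumptions, not LangChain's implementation: `model_chunks` and `str_parser` are hypothetical names, and chunks are modelled as small dicts.

```python
from typing import Dict, Iterable, Iterator, List

log: List[str] = []


def model_chunks() -> Iterator[Dict[str, str]]:
    """Hypothetical model stream: each chunk is a dict with a 'content' field."""
    for piece in ["Why", " did", " the", " parrot", "?"]:
        log.append(f"model:{piece}")
        yield {"content": piece}


def str_parser(chunks: Iterable[Dict[str, str]]) -> Iterator[str]:
    """Transform every chunk as it arrives instead of waiting for the full stream."""
    for chunk in chunks:
        log.append(f"parser:{chunk['content']}")
        yield chunk["content"]


pieces = list(str_parser(model_chunks()))
print(pieces)
print(log[:4])
```

The log interleaves model and parser entries, which shows that the downstream step handles a chunk before the upstream step produces the next one.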

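### Working with Input Streams
- What if you want to stream JSON output while it is still being generated?
    - Relying on json.loads to parse the partial JSON fails, because partial JSON is not valid JSON.
- There is a way to do it: **the parser needs to operate on the input stream** and try to "auto-complete" the partial JSON into a valid state.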
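
A rough sketch of what "auto-completing" partial JSON can look like, using only the standard library. This is not the code JsonOutputParser uses; `parse_partial_json` and `_close_open_structures` are hypothetical helpers written for this note. The assumed strategy is to close any open string, object, or array, and to drop trailing characters until the result parses.

```python
import json
from typing import Any, List, Optional


def _close_open_structures(prefix: str) -> str:
    """Append the quote and brackets needed to balance a JSON prefix."""
    closers: List[str] = []
    in_string = False
    escaped = False
    for char in prefix:
        if in_string:
            if escaped:
                escaped = False
            elif char == "\\":
                escaped = True
            elif char == '"':
                in_string = False
        elif char == '"':
            in_string = True
        elif char == "{":
            closers.append("}")
        elif char == "[":
            closers.append("]")
        elif char in "}]" and closers:
            closers.pop()
    return prefix + ('"' if in_string else "") + "".join(reversed(closers))


def parse_partial_json(text: str) -> Optional[Any]:
    """Best-effort parse of a JSON prefix; None if nothing parses yet."""
    # Back off one character at a time to skip dangling keys or commas.
    for end in range(len(text), 0, -1):
        try:
            return json.loads(_close_open_structures(text[:end]))
        except json.JSONDecodeError:
            continue
    return None


tokens = ['{"countries": [', '{"name": "Fra', 'nce", ', '"population": 67', "}]}"]
buffer = ""
snapshots = []
for token in tokens:
    buffer += token
    snapshots.append(parse_partial_json(buffer))
    print(snapshots[-1])
```

Every snapshot is a valid Python object even though the buffer itself is incomplete JSON at most steps. The back-off loop is quadratic in the buffer length, which is acceptable for a sketch but not for large payloads.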

In [18]:
from langchain_core.output_parsers import JsonOutputParser

chain = (
    llm | JsonOutputParser()
)  # Due to a bug in older versions of Langchain, JsonOutputParser did not stream results from some models
async for text in chain.astream(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`" ):
    print(text, flush=True)

{}
{'countries': []}
{'countries': [{}]}
{'countries': [{'name': ''}]}
{'countries': [{'name': 'France'}]}
{'countries': [{'name': 'France', 'population': 6}]}
{'countries': [{'name': 'France', 'population': 67}]}
{'countries': [{'name': 'France', 'population': 678}]}
{'countries': [{'name': 'France', 'population': 6789}]}
{'countries': [{'name': 'France', 'population': 67890}]}
{'countries': [{'name': 'France', 'population': 678905}]}
{'countries': [{'name': 'France', 'population': 6789053}]}
{'countries': [{'name': 'France', 'population': 67890534}]}
{'countries': [{'name': 'France', 'population': 67890534}, {}]}
{'countries': [{'name': 'France', 'population': 67890534}, {'name': ''}]}
{'countries': [{'name': 'France', 'population': 67890534}, {'name': 'Spain'}]}
{'countries': [{'name': 'France', 'population': 67890534}, {'name': 'Spain', 'population': 4}]}
{'countries': [{'name': 'France', 'population': 67890534}, {'name': 'Spain', 'population': 47}]}
{'countries': [{'name': 'France

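- Now, let's break streaming on purpose.
- We reuse the previous example and append an extraction function at the end that pulls the country names out of the finalized JSON.
- Because the extraction function only operates on finalized input, the output is no longer streamed. A generator function (one that uses yield) restores streaming, as the Generator Functions subsection shows.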

In [19]:
from langchain_core.output_parsers import (
    JsonOutputParser,
)


# A function that operates on finalized inputs
# rather than on an input stream
def _extract_country_names(inputs):
    """A function that does not operate on input streams and breaks streaming."""
    if not isinstance(inputs, dict):
        return ""

    if "countries" not in inputs:
        return ""

    countries = inputs["countries"]

    if not isinstance(countries, list):
        return ""

    country_names = [
        country.get("name") for country in countries if isinstance(country, dict)
    ]
    return country_names


chain = llm | JsonOutputParser() | _extract_country_names

async for text in chain.astream(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`"
):
    print(text, end="|", flush=True)

['France', 'Spain', 'Japan']|

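#### ⭐️ Generator Functions
- Use a generator function that can operate on the input stream to **fix streaming**.
- A generator function (a function that uses yield) lets you write code that processes an input stream.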

In [20]:
from langchain_core.output_parsers import JsonOutputParser


async def _extract_country_names_streaming(input_stream):
    """A function that operates on input streams."""
    country_names_so_far = set()

    async for input in input_stream:
        if not isinstance(input, dict):
            continue

        if "countries" not in input:
            continue

        countries = input["countries"]

        if not isinstance(countries, list):
            continue

        for country in countries:
            name = country.get("name")
            if not name:
                continue
            if name not in country_names_so_far:
                yield name
                country_names_so_far.add(name)


chain = llm | JsonOutputParser() | _extract_country_names_streaming

async for text in chain.astream(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
):
    print(text, end="|", flush=True)

France|Spain|Japan|
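
The same generator technique can be tried without a model by feeding it parser snapshots by hand. The sketch below is a synchronous variant of the function above, written for this note; `extract_country_names` is a hypothetical name and the snapshots are copied from the JsonOutputParser output shown earlier. It assumes each name arrives as a whole token, since a name split across chunks would also yield its partial form.

```python
from typing import Any, Iterable, Iterator, Set


def extract_country_names(snapshots: Iterable[Any]) -> Iterator[str]:
    """Yield each country name the first time it shows up in a snapshot."""
    seen: Set[str] = set()
    for snapshot in snapshots:
        if not isinstance(snapshot, dict):
            continue
        countries = snapshot.get("countries")
        if not isinstance(countries, list):
            continue
        for country in countries:
            if not isinstance(country, dict):
                continue
            name = country.get("name")
            # Skip the empty placeholder emitted before the name arrives.
            if name and name not in seen:
                seen.add(name)
                yield name


parser_snapshots = [
    {},
    {"countries": []},
    {"countries": [{}]},
    {"countries": [{"name": ""}]},
    {"countries": [{"name": "France"}]},
    {"countries": [{"name": "France", "population": 67890534}]},
    {"countries": [{"name": "France", "population": 67890534}, {"name": "Spain"}]},
]
names = list(extract_country_names(parser_snapshots))
print(names)
```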

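### Non-streaming components
- Not every component has to implement streaming: in some cases streaming is unnecessary, hard to implement, or simply does not make sense. A retriever, for example, yields its full result as a single chunk.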

In [35]:
from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import OpenAIEmbeddings
from embeddings import LMStudioEmbeddings

template = """Answer the question based only on the following context:
{context}

Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)
embedding = LMStudioEmbeddings()
vectorstore = FAISS.from_texts(["harrison worked at kensho", "harrison likes spicy food"], embedding=embedding)

In [36]:
retriever = vectorstore.as_retriever()

chunks = [chunk for chunk in retriever.stream("where did harrison work?")]
chunks

[[Document(id='4e5c7990-2419-48b9-99a7-3a695020e948', metadata={}, page_content='harrison worked at kensho'),
  Document(id='89c2440e-8b25-4a2a-986e-d6ee832cb35b', metadata={}, page_content='harrison likes spicy food')]]

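- An LCEL chain built with non-streaming components can still stream in many cases; the partial output starts streaming after the last non-streaming step in the chain.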

In [37]:
retrieval_chain = (
    {
        "context": retriever.with_config(run_name="Docs"),
        "question": RunnablePassthrough(),
    }
    | prompt
    | llm
    | StrOutputParser()
)

for chunk in retrieval_chain.stream(
    "Where did harrison work? " "Write 3 made up sentences about this place."
):
    print(chunk, end="|", flush=True)

Based| on| the| provided| documents|,| Harrison| worked| at| K|ens|ho|.| Here| are| three| made|-|up| sentences| about| K|ens|ho|:|

|1|.|  |K|ens|ho|’|s| sleek| offices| offered| stunning| views| of| the| city| skyline|,| perfect| for| brainstorming| innovative| data| solutions|.|
|2|.|  |The| team| at| K|ens|ho| was| known| for| its| collaborative| spirit| and| dedication| to| pushing| the| boundaries| of| artificial| intelligence|.|
|3|.|  |K|ens|ho|'|s| state|-|of|-|the|-|art| servers| hum|med| with| complex| algorithms|,| powering| groundbreaking| research| in| natural| language| processing|.||
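
Why streaming only begins after the last non-streaming step can be seen in a small standard-library sketch. `retrieve`, `generate`, and `pipeline` are hypothetical stand-ins for the retriever, the model, and the chain; they do not call LangChain.

```python
from typing import Iterator, List

log: List[str] = []


def retrieve(query: str) -> List[str]:
    """Non-streaming step: returns every document at once."""
    docs = ["harrison worked at kensho", "harrison likes spicy food"]
    log.append(f"retriever returned {len(docs)} docs")
    return docs


def generate(docs: List[str]) -> Iterator[str]:
    """Streaming step: yields the answer word by word."""
    for word in f"Answer based on {len(docs)} documents".split(" "):
        log.append(f"chunk {word!r}")
        yield word


def pipeline(query: str) -> Iterator[str]:
    docs = retrieve(query)      # blocks until the whole result is ready
    yield from generate(docs)   # from here on, output arrives chunk by chunk


chunks = list(pipeline("where did harrison work?"))
print(log)
```

The first log entry is always the retriever's single result; every chunk entry comes after it.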

## Using Stream Events
- This guide demonstrates the V2 API and requires langchain-core >= 0.2. For the V1 API, which is compatible with older versions of LangChain, see [here](https://python.langchain.com/v0.1/docs/expression_language/streaming/#using-stream-events).

In [38]:
import langchain_core

langchain_core.__version__

'0.3.37'

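- Use async throughout the code wherever possible (for example, async tools).
- When using runnables without LCEL, make sure to call .astream() on LLMs rather than .ainvoke, to force the LLM to stream tokens.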

### Chat Model
- For langchain-core<0.3.37, set the version keyword explicitly (for example, model.astream_events("hello", version="v2")).

In [41]:
events = []
async for event in llm.astream_events("hello", version="v2"):
    events.append(event)

In [42]:
# Let's look at a few of the start events and a few of the end events.
events[:3]

[{'event': 'on_chat_model_start',
  'data': {'input': 'hello'},
  'name': 'ChatOpenAI',
  'tags': [],
  'run_id': 'a91051d7-de44-4746-ab5b-116d678b8df3',
  'metadata': {'ls_provider': 'openai',
   'ls_model_name': 'gemma-3-4b-it',
   'ls_model_type': 'chat',
   'ls_temperature': 0.7,
   'ls_max_tokens': 8064},
  'parent_ids': []},
 {'event': 'on_chat_model_stream',
  'run_id': 'a91051d7-de44-4746-ab5b-116d678b8df3',
  'name': 'ChatOpenAI',
  'tags': [],
  'metadata': {'ls_provider': 'openai',
   'ls_model_name': 'gemma-3-4b-it',
   'ls_model_type': 'chat',
   'ls_temperature': 0.7,
   'ls_max_tokens': 8064},
  'data': {'chunk': AIMessageChunk(content='Hello', additional_kwargs={}, response_metadata={}, id='run-a91051d7-de44-4746-ab5b-116d678b8df3')},
  'parent_ids': []},
 {'event': 'on_chat_model_stream',
  'run_id': 'a91051d7-de44-4746-ab5b-116d678b8df3',
  'name': 'ChatOpenAI',
  'tags': [],
  'metadata': {'ls_provider': 'openai',
   'ls_model_name': 'gemma-3-4b-it',
   'ls_model_typ

In [43]:
events[-2:]

[{'event': 'on_chat_model_stream',
  'run_id': 'a91051d7-de44-4746-ab5b-116d678b8df3',
  'name': 'ChatOpenAI',
  'tags': [],
  'metadata': {'ls_provider': 'openai',
   'ls_model_name': 'gemma-3-4b-it',
   'ls_model_type': 'chat',
   'ls_temperature': 0.7,
   'ls_max_tokens': 8064},
  'data': {'chunk': AIMessageChunk(content='', additional_kwargs={}, response_metadata={'finish_reason': 'stop', 'model_name': 'gemma-3-4b-it', 'system_fingerprint': 'gemma-3-4b-it'}, id='run-a91051d7-de44-4746-ab5b-116d678b8df3')},
  'parent_ids': []},
 {'event': 'on_chat_model_end',
  'data': {'output': AIMessageChunk(content="Hello there! How's your day going so far? Is there anything I can help you with today, or were you just saying hello? 😊 \n\nDo you want to:\n\n*   Chat about something?\n*   Ask me a question?\n*   Play a game?", additional_kwargs={}, response_metadata={'finish_reason': 'stop', 'model_name': 'gemma-3-4b-it', 'system_fingerprint': 'gemma-3-4b-it'}, id='run-a91051d7-de44-4746-ab5b-116d
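
Because the collected events are plain dicts, a quick way to get an overview is to count them by their event key. The helper below is a hypothetical convenience written for this note. The sample events are hand-written stand-ins that reuse the keys visible in the output above; plain strings replace the AIMessageChunk objects that real events carry.

```python
from collections import Counter
from typing import Any, Dict, Iterable


def summarize_events(events: Iterable[Dict[str, Any]]) -> Dict[str, int]:
    """Count how many events of each kind were collected."""
    return dict(Counter(event["event"] for event in events))


# Hand-written stand-ins for the events list collected above.
sample_events = [
    {"event": "on_chat_model_start", "name": "ChatOpenAI", "data": {"input": "hello"}},
    {"event": "on_chat_model_stream", "name": "ChatOpenAI", "data": {"chunk": "Hello"}},
    {"event": "on_chat_model_stream", "name": "ChatOpenAI", "data": {"chunk": " there"}},
    {"event": "on_chat_model_end", "name": "ChatOpenAI", "data": {"output": "Hello there"}},
]

summary = summarize_events(sample_events)
print(summary)
```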

### Chain
- Let's revisit the example chain that parses streaming JSON to explore the streaming events API.

In [45]:
chain = (
    llm | JsonOutputParser()
)  # Due to a bug in older versions of Langchain, JsonOutputParser did not stream results from some models

events = [
    event
    async for event in chain.astream_events(
        "output a list of the countries france, spain and japan and their populations in JSON format. "
        'Use a dict with an outer key of "countries" which contains a list of countries. '
        "Each country should have the key `name` and `population`",
        version="v2"
    )
]

In [46]:
events[:3]

[{'event': 'on_chain_start',
  'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'},
  'name': 'RunnableSequence',
  'tags': [],
  'run_id': '639575d4-b766-4b70-90fc-f8d2060bfc96',
  'metadata': {},
  'parent_ids': []},
 {'event': 'on_chat_model_start',
  'data': {'input': {'messages': [[HumanMessage(content='output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`', additional_kwargs={}, response_metadata={})]]}},
  'name': 'ChatOpenAI',
  'tags': ['seq:step:1'],
  'run_id': '1edff54b-39db-4819-81b7-1802f03f50c2',
  'metadata': {'ls_provider': 'openai',
   'ls_model_name': 'gemma-3-4b-it',
   'ls_model_type': 'chat',
   

- Next, print the stream events from the model and the parser, ignoring start events, end events, and events from the chain itself.

In [48]:
num_events = 0

async for event in chain.astream_events(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
    version="v2"
):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        print(
            f"Chat model chunk: {repr(event['data']['chunk'].content)}",
            flush=True,
        )
    if kind == "on_parser_stream":
        print(f"Parser chunk: {event['data']['chunk']}", flush=True)
    num_events += 1
    if num_events > 30:
        # Truncate the output
        print("...")
        break

Chat model chunk: '```'
Chat model chunk: 'json'
Chat model chunk: '\n'
Chat model chunk: '{'
Parser chunk: {}
Chat model chunk: '\n'
Chat model chunk: '  '
Chat model chunk: '"'
Chat model chunk: 'countries'
Chat model chunk: '":'
Chat model chunk: ' ['
Parser chunk: {'countries': []}
Chat model chunk: '\n'
Chat model chunk: '    '
Chat model chunk: '{'
Parser chunk: {'countries': [{}]}
Chat model chunk: '\n'
Chat model chunk: '      '
Chat model chunk: '"'
Chat model chunk: 'name'
Chat model chunk: '":'
Chat model chunk: ' "'
Parser chunk: {'countries': [{'name': ''}]}
Chat model chunk: 'France'
...


- Because both the model and the parser support streaming, we see streaming events from both components in real time.

### Filtering Events
- This API produces a large number of events, so it is useful to be able to filter them.
- You can filter by component name, component tags, or component type.
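
To clarify what filtering by name, type, or tags means, here is a client-side approximation that operates on a list of event dicts. It is a sketch under two assumptions, not LangChain's filter: an event is kept when it matches any of the supplied criteria, and the component type is derived from the event name (for example on_chat_model_stream gives chat_model). `event_type` and `filter_events` are hypothetical helpers.

```python
from typing import Any, Dict, Iterable, Iterator, Optional, Sequence


def event_type(event_name: str) -> str:
    """'on_chat_model_stream' -> 'chat_model' (assumed naming convention)."""
    return "_".join(event_name.split("_")[1:-1])


def filter_events(
    events: Iterable[Dict[str, Any]],
    include_names: Optional[Sequence[str]] = None,
    include_types: Optional[Sequence[str]] = None,
    include_tags: Optional[Sequence[str]] = None,
) -> Iterator[Dict[str, Any]]:
    """Keep events matching any supplied criterion; keep all if none is given."""
    no_filters = include_names is None and include_types is None and include_tags is None
    for event in events:
        tags = event.get("tags") or []
        if (
            no_filters
            or (include_names is not None and event["name"] in include_names)
            or (include_types is not None and event_type(event["event"]) in include_types)
            or (include_tags is not None and any(tag in include_tags for tag in tags))
        ):
            yield event


# Hand-written sample events using the keys seen in the outputs of this notebook.
sample_events = [
    {"event": "on_chain_start", "name": "RunnableSequence", "tags": ["my_chain"]},
    {"event": "on_chat_model_start", "name": "model", "tags": ["seq:step:1", "my_chain"]},
    {"event": "on_chat_model_stream", "name": "model", "tags": ["seq:step:1", "my_chain"]},
    {"event": "on_parser_start", "name": "my_parser", "tags": ["seq:step:2", "my_chain"]},
    {"event": "on_parser_stream", "name": "my_parser", "tags": ["seq:step:2", "my_chain"]},
]

by_name = [e["event"] for e in filter_events(sample_events, include_names=["my_parser"])]
by_type = [e["event"] for e in filter_events(sample_events, include_types=["chat_model"])]
by_tag = [e["event"] for e in filter_events(sample_events, include_tags=["seq:step:2"])]
print(by_name, by_type, by_tag, sep="\n")
```

Passing the include_names, include_types, or include_tags arguments to astream_events, as the cells in this section do, avoids generating the unwanted events in the first place, so a post-hoc filter like this one is mainly useful for exploring events that have already been collected.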

#### By Name

In [53]:
chain = llm.with_config({"run_name": "model"}) | JsonOutputParser().with_config(
    {"run_name": "my_parser"}
)

max_events = 0
async for event in chain.astream_events(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
    include_names=["my_parser"],
    version="v2"
):
    print(f'==> {max_events+1}: {event}')
    max_events += 1
    if max_events > 10:
        # Truncate output
        print("...")
        break

==> 1: {'event': 'on_parser_start', 'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}, 'name': 'my_parser', 'tags': ['seq:step:2'], 'run_id': '534a4201-b9e8-46e8-a8f8-69fd61453984', 'metadata': {}, 'parent_ids': ['8957b3bf-2e6a-473a-b37f-ce7f7eba2c4d']}
==> 2: {'event': 'on_parser_stream', 'run_id': '534a4201-b9e8-46e8-a8f8-69fd61453984', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {}}, 'parent_ids': ['8957b3bf-2e6a-473a-b37f-ce7f7eba2c4d']}
==> 3: {'event': 'on_parser_stream', 'run_id': '534a4201-b9e8-46e8-a8f8-69fd61453984', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': []}}, 'parent_ids': ['8957b3bf-2e6a-473a-b37f-ce7f7eba2c4d']}
==> 4: {'event': 'on_parser_stream', 'run_id': '534a4201-b9e8-46e8-a8f8-69fd

#### By Type

In [56]:
chain = llm.with_config({"run_name": "model"}) | JsonOutputParser().with_config(
    {"run_name": "my_parser"}
)

max_events = 0
async for event in chain.astream_events(
    'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
    include_types=["chat_model"],
    version="v2"
):
    print(f'==> {max_events+1}: {event}')
    max_events += 1
    if max_events > 10:
        # Truncate output
        print("...")
        break

==> 1: {'event': 'on_chat_model_start', 'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}, 'name': 'model', 'tags': ['seq:step:1'], 'run_id': '847e79fb-db8f-43ea-8bd4-677847cd21d4', 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gemma-3-4b-it', 'ls_model_type': 'chat', 'ls_temperature': 0.7, 'ls_max_tokens': 8064}, 'parent_ids': ['5431be96-7d4c-4682-b872-142f883a191c']}
==> 2: {'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='```', additional_kwargs={}, response_metadata={}, id='run-847e79fb-db8f-43ea-8bd4-677847cd21d4')}, 'run_id': '847e79fb-db8f-43ea-8bd4-677847cd21d4', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gemma-3-4b-it', 'ls_model_type': 'chat', 'ls_temperature': 0.7, 'ls_max_tokens': 8064}, 

#### By Tags
- Tags are inherited by the child components of a given runnable.

In [60]:
chain = (llm | JsonOutputParser()).with_config({"tags": ["my_chain"]})

max_events = 0
async for event in chain.astream_events(
    'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
    include_tags=["my_chain"],
    version="v2"
):
    print(f'==> {max_events+1}: {event}')
    max_events += 1
    if max_events > 10:
        # Truncate output
        print("...")
        break

==> 1: {'event': 'on_chain_start', 'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}, 'name': 'RunnableSequence', 'tags': ['my_chain'], 'run_id': '5f427cde-6fd1-4a76-8b31-0f02c175eb78', 'metadata': {}, 'parent_ids': []}
==> 2: {'event': 'on_chat_model_start', 'data': {'input': {'messages': [[HumanMessage(content='output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`', additional_kwargs={}, response_metadata={})]]}}, 'name': 'ChatOpenAI', 'tags': ['seq:step:1', 'my_chain'], 'run_id': 'ee90f991-4c98-4441-9957-ed0f79ba5d22', 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gemma-3-4b-it', 'ls_model_type': 'chat'

### Non-streaming components
- Such components can break streaming of the final output when using astream, but astream_events still yields streaming events from the intermediate steps that support streaming!

In [61]:
# Function that does not support streaming.
# It operates on the finalized inputs rather than
# operating on the input stream.
def _extract_country_names(inputs):
    """A function that does not operate on input streams and breaks streaming."""
    if not isinstance(inputs, dict):
        return ""

    if "countries" not in inputs:
        return ""

    countries = inputs["countries"]

    if not isinstance(countries, list):
        return ""

    country_names = [
        country.get("name") for country in countries if isinstance(country, dict)
    ]
    return country_names


chain = (
    llm | JsonOutputParser() | _extract_country_names
)  # This parser only works with OpenAI right now

In [63]:
# Non-streaming
async for chunk in chain.astream(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
):
    print(chunk, flush=True)

['France', 'Spain', 'Japan']


In [64]:
# Streaming
num_events = 0

async for event in chain.astream_events(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
    version="v2"
):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        print(
            f"Chat model chunk: {repr(event['data']['chunk'].content)}",
            flush=True,
        )
    if kind == "on_parser_stream":
        print(f"Parser chunk: {event['data']['chunk']}", flush=True)
    num_events += 1
    if num_events > 30:
        # Truncate the output
        print("...")
        break

Chat model chunk: '```'
Chat model chunk: 'json'
Chat model chunk: '\n'
Chat model chunk: '{'
Parser chunk: {}
Chat model chunk: '\n'
Chat model chunk: '  '
Chat model chunk: '"'
Chat model chunk: 'countries'
Chat model chunk: '":'
Chat model chunk: ' ['
Parser chunk: {'countries': []}
Chat model chunk: '\n'
Chat model chunk: '    '
Chat model chunk: '{'
Parser chunk: {'countries': [{}]}
Chat model chunk: '\n'
Chat model chunk: '      '
Chat model chunk: '"'
Chat model chunk: 'name'
Chat model chunk: '":'
Chat model chunk: ' "'
Parser chunk: {'countries': [{'name': ''}]}
Chat model chunk: 'France'
Parser chunk: {'countries': [{'name': 'France'}]}
Chat model chunk: '",'
Chat model chunk: '\n'
...


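### Propagating Callbacks
- If you invoke runnables inside your tools, make sure to propagate callbacks to the runnable; otherwise no stream events are generated for it.
- When using RunnableLambdas or the @chain decorator, callbacks are propagated automatically behind the scenes.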
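
The reason explicit propagation matters can be shown with a toy model that uses plain functions and a list of callbacks. This sketch covers explicit passing only; all names in it are hypothetical, and LangChain may additionally propagate configuration implicitly depending on the version and runtime.

```python
from typing import Callable, List, Optional

Callback = Callable[[str], None]


def reverse_word(word: str, callbacks: Optional[List[Callback]] = None) -> str:
    """Inner step: reports start/end only to the callbacks it was given."""
    for callback in callbacks or []:
        callback("reverse_word:start")
    result = word[::-1]
    for callback in callbacks or []:
        callback("reverse_word:end")
    return result


def tool_without_propagation(word: str, callbacks: List[Callback]) -> str:
    for callback in callbacks:
        callback("tool:start")
    result = reverse_word(word)  # callbacks dropped: the inner step stays invisible
    for callback in callbacks:
        callback("tool:end")
    return result


def tool_with_propagation(word: str, callbacks: List[Callback]) -> str:
    for callback in callbacks:
        callback("tool:start")
    result = reverse_word(word, callbacks)  # callbacks forwarded to the inner step
    for callback in callbacks:
        callback("tool:end")
    return result


seen_without: List[str] = []
tool_without_propagation("hello", [seen_without.append])
seen_with: List[str] = []
tool_with_propagation("hello", [seen_with.append])
print(seen_without)
print(seen_with)
```

Only the second tool surfaces the inner step's events, because it is the only one that hands its callbacks down.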

In [67]:
from langchain_core.runnables import RunnableLambda
from langchain_core.tools import tool


def reverse_word(word: str):
    return word[::-1]

# Callbacks are propagated automatically behind the scenes
reverse_word = RunnableLambda(reverse_word)


@tool
def bad_tool(word: str):
    """Custom tool that doesn't propagate callbacks."""
    return reverse_word.invoke(word)


async for event in bad_tool.astream_events("hello", version="v2"):
    print(event)

{'event': 'on_tool_start', 'data': {'input': 'hello'}, 'name': 'bad_tool', 'tags': [], 'run_id': 'c6359add-5ea5-45dd-9853-b6560897cd24', 'metadata': {}, 'parent_ids': []}
{'event': 'on_chain_start', 'data': {'input': 'hello'}, 'name': 'reverse_word', 'tags': [], 'run_id': '0b07a90d-7170-436e-82c9-3ddf721b3c53', 'metadata': {}, 'parent_ids': ['c6359add-5ea5-45dd-9853-b6560897cd24']}
{'event': 'on_chain_end', 'data': {'output': 'olleh', 'input': 'hello'}, 'run_id': '0b07a90d-7170-436e-82c9-3ddf721b3c53', 'name': 'reverse_word', 'tags': [], 'metadata': {}, 'parent_ids': ['c6359add-5ea5-45dd-9853-b6560897cd24']}
{'event': 'on_tool_end', 'data': {'output': 'olleh'}, 'run_id': 'c6359add-5ea5-45dd-9853-b6560897cd24', 'name': 'bad_tool', 'tags': [], 'metadata': {}, 'parent_ids': []}


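- Next, we re-implement the tool so that it passes callbacks to the reverse_word runnable explicitly. Note that in the run recorded above the reverse_word events already appear for bad_tool, which suggests that the installed langchain-core version (0.3.37) propagated the callbacks implicitly; passing them explicitly does not depend on that behavior.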

In [68]:
@tool
def correct_tool(word: str, callbacks):
    """A tool that correctly propagates callbacks."""
    return reverse_word.invoke(word, {"callbacks": callbacks})  # pass callbacks explicitly


async for event in correct_tool.astream_events("hello", version="v2"):
    print(event)

{'event': 'on_tool_start', 'data': {'input': 'hello'}, 'name': 'correct_tool', 'tags': [], 'run_id': 'bb418a24-1925-49eb-8ea1-38ae482ef7a2', 'metadata': {}, 'parent_ids': []}
{'event': 'on_chain_start', 'data': {'input': 'hello'}, 'name': 'reverse_word', 'tags': [], 'run_id': '9b49525f-50ec-4cce-8b0d-ec5c4af676d8', 'metadata': {}, 'parent_ids': ['bb418a24-1925-49eb-8ea1-38ae482ef7a2']}
{'event': 'on_chain_end', 'data': {'output': 'olleh', 'input': 'hello'}, 'run_id': '9b49525f-50ec-4cce-8b0d-ec5c4af676d8', 'name': 'reverse_word', 'tags': [], 'metadata': {}, 'parent_ids': ['bb418a24-1925-49eb-8ea1-38ae482ef7a2']}
{'event': 'on_tool_end', 'data': {'output': 'olleh'}, 'run_id': 'bb418a24-1925-49eb-8ea1-38ae482ef7a2', 'name': 'correct_tool', 'tags': [], 'metadata': {}, 'parent_ids': []}


- If you invoke runnables from within a RunnableLambda or @chain, callbacks are passed along automatically on your behalf.

In [69]:
from langchain_core.runnables import RunnableLambda


async def reverse_and_double(word: str):
    return await reverse_word.ainvoke(word) * 2


reverse_and_double = RunnableLambda(reverse_and_double)

await reverse_and_double.ainvoke("1234")

async for event in reverse_and_double.astream_events("1234", version="v2"):
    print(event)

{'event': 'on_chain_start', 'data': {'input': '1234'}, 'name': 'reverse_and_double', 'tags': [], 'run_id': '4a049856-e9df-4967-a34f-2342c676d85e', 'metadata': {}, 'parent_ids': []}
{'event': 'on_chain_start', 'data': {'input': '1234'}, 'name': 'reverse_word', 'tags': [], 'run_id': 'f838b6d2-45ce-4efd-8693-82bcf9af1402', 'metadata': {}, 'parent_ids': ['4a049856-e9df-4967-a34f-2342c676d85e']}
{'event': 'on_chain_end', 'data': {'output': '4321', 'input': '1234'}, 'run_id': 'f838b6d2-45ce-4efd-8693-82bcf9af1402', 'name': 'reverse_word', 'tags': [], 'metadata': {}, 'parent_ids': ['4a049856-e9df-4967-a34f-2342c676d85e']}
{'event': 'on_chain_stream', 'run_id': '4a049856-e9df-4967-a34f-2342c676d85e', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}, 'data': {'chunk': '43214321'}, 'parent_ids': []}
{'event': 'on_chain_end', 'data': {'output': '43214321'}, 'run_id': '4a049856-e9df-4967-a34f-2342c676d85e', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}, 'parent_ids': []}


- And with the @chain decorator:

In [71]:
from langchain_core.runnables import chain


@chain
async def reverse_and_double(word: str):
    return await reverse_word.ainvoke(word) * 2


await reverse_and_double.ainvoke("1234")

async for event in reverse_and_double.astream_events("1234", version="v2"):
    print(event)

{'event': 'on_chain_start', 'data': {'input': '1234'}, 'name': 'reverse_and_double', 'tags': [], 'run_id': 'a36829b1-1ab8-4f06-be4a-3de6fbb8bfa0', 'metadata': {}, 'parent_ids': []}
{'event': 'on_chain_start', 'data': {'input': '1234'}, 'name': 'reverse_word', 'tags': [], 'run_id': '8dd948c7-b968-41e5-ae90-3ea4f2888588', 'metadata': {}, 'parent_ids': ['a36829b1-1ab8-4f06-be4a-3de6fbb8bfa0']}
{'event': 'on_chain_end', 'data': {'output': '4321', 'input': '1234'}, 'run_id': '8dd948c7-b968-41e5-ae90-3ea4f2888588', 'name': 'reverse_word', 'tags': [], 'metadata': {}, 'parent_ids': ['a36829b1-1ab8-4f06-be4a-3de6fbb8bfa0']}
{'event': 'on_chain_stream', 'run_id': 'a36829b1-1ab8-4f06-be4a-3de6fbb8bfa0', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}, 'data': {'chunk': '43214321'}, 'parent_ids': []}
{'event': 'on_chain_end', 'data': {'output': '43214321'}, 'run_id': 'a36829b1-1ab8-4f06-be4a-3de6fbb8bfa0', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}, 'parent_ids': []}
