[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/aurelio-labs/langchain-course/blob/main/chapters/08-streaming.ipynb)

#### LangChain Essentials Course

# Streaming With LangChain

LangChain is one of the most popular open source libraries for AI Engineers. Its goal is to abstract away the complexity in building AI software, provide easy-to-use building blocks, and make it easier to switch between AI service providers.

In this example, we will introduce LangChain's async streaming, allowing us to receive and view the tokens as they are generated by OpenAI's LLM. The use of streaming is typical in conversational interfaces and can provide a more natural experience for users.

In [None]:
!pip install -qU \
  langchain-core==0.3.33 \
  langchain-openai==0.3.3 \
  langchain-community==0.3.16 \
  langsmith==0.3.4

---

> ⚠️ We will be using OpenAI for this example allowing us to run everything via API. If you would like to use Ollama instead, check out the [Ollama LangChain Course](https://github.com/aurelio-labs/langchain-course/tree/main/notebooks/ollama).

---

---

> ⚠️ If using LangSmith, add your API key below:

In [None]:
import os
from getpass import getpass

os.environ["LANGCHAIN_API_KEY"] = os.getenv("LANGCHAIN_API_KEY") or \
    getpass("Enter LangSmith API Key: ")

os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com"
os.environ["LANGCHAIN_PROJECT"] = "aurelioai-langchain-course-streaming-openai"

---

For the LLM, we'll start by initializing our connection to the OpenAI API. We do need an OpenAI API key, which you can get from the [OpenAI platform](https://platform.openai.com/api-keys).

We will use the `gpt-4o-mini` model with a `temperature` of `0.0`:

In [14]:
import os
from getpass import getpass
from langchain_openai import ChatOpenAI

os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY") \
    or getpass("Enter your OpenAI API key: ")

llm = ChatOpenAI(
    model_name="gpt-4o-mini",
    temperature=0.0,
    streaming=True
)

In [2]:
llm_out = llm.invoke("Hello there")
llm_out

AIMessage(content='Hello! How can I assist you today?', additional_kwargs={}, response_metadata={'finish_reason': 'stop', 'model_name': 'gpt-4o-mini-2024-07-18', 'system_fingerprint': 'fp_34a54ae93c'}, id='run-4bce7f92-04dc-44f6-b2ee-3d5012e998c4-0')

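➤ `async` is the Python keyword used to declare asynchronous functions and behavior.
You can use it for two kinds of work:

* Waiting for a slow operation (such as a network request) to finish 👉 use `await`

* Processing a series of asynchronous results (such as streamed output) 👉 use `async for`



##### Asynchronous waiting
👉 You wait for one task to finish and then handle its result. This is asynchronous waiting.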
```py
async def get_data():
    response = await fetch_from_api()
    print(response)
```

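##### Streaming output
👉 You process results while they are still being generated. This is asynchronous iteration (typically over an async generator).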
```py
async for token in stream_tokens():
    print(token)

```


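| Usage | Keyword | Scenario | Analogy |
| ----- | ----------- | -------------- | ------------- |
| Wait for a result | `await` | Calling a function asynchronously | Ordering takeout: you wait until it arrives, then eat |
| Process items as they arrive | `async for` | Asynchronously iterating over a generator or data stream | Gifts in a livestream: handle each one as it comes in |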
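
To make the two patterns in the table concrete, here is a minimal, self-contained sketch. `fetch_from_api` and `stream_tokens` are hypothetical stand-ins (they only sleep briefly to simulate latency); they are not part of LangChain or OpenAI.

```python
import asyncio


async def fetch_from_api() -> str:
    """Hypothetical slow call; the sleep stands in for network latency."""
    await asyncio.sleep(0.01)
    return "full response"


async def stream_tokens():
    """Hypothetical token stream, written as an async generator."""
    for token in ["Hello", " ", "world"]:
        await asyncio.sleep(0.01)
        yield token


async def main() -> tuple[str, list[str]]:
    # `await`: suspend until the single result is ready
    response = await fetch_from_api()
    # `async for`: handle each item as soon as it is produced
    received = []
    async for token in stream_tokens():
        received.append(token)
    return response, received


# In a notebook, use `response, received = await main()` instead,
# because an event loop is already running there.
response, received = asyncio.run(main())
print(response)
print("".join(received))
```

With `await` we get one value after the whole operation has finished, while `async for` hands us the items one by one as they become available.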




## Streaming with `astream`

We will start by creating an async stream from our LLM. We do this within an `async for` loop, allowing us to iterate through the chunks of data and use them as soon as the async `astream` method returns the tokens to us. By adding a pipe character `|` after each token we can see the individual tokens that are generated. We set `flush` equal to `True` as this forces immediate output to the console, resulting in smoother streaming.

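The `astream` method:

* is an asynchronous streaming method
* returns an async iterator
* returns each token as soon as it is generated, without waiting for the full response

| Name | What it is | Usable with `async for`? |
| ----- | ------------------------------------------ | ---------------- |
| Async generator | A function written with `async def` + `yield` | ✅ Yes, the recommended approach |
| Async iterator | Any object that implements `__aiter__()` and `__anext__()` | ✅ Yes, the underlying mechanism |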
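
The difference between the two rows can be illustrated with a small sketch that needs only the standard library. The `Countdown` class and the `countdown` function are hypothetical examples written for this comparison; they produce the same values, once through the protocol methods and once through `yield`.

```python
import asyncio


class Countdown:
    """Async iterator: implements __aiter__ and __anext__ by hand."""

    def __init__(self, start: int):
        self.current = start

    def __aiter__(self):
        return self

    async def __anext__(self) -> int:
        if self.current <= 0:
            raise StopAsyncIteration  # tells `async for` to stop
        await asyncio.sleep(0)  # give control back to the event loop
        self.current -= 1
        return self.current + 1


async def countdown(start: int):
    """Async generator: the same behaviour using `async def` + `yield`."""
    for value in range(start, 0, -1):
        await asyncio.sleep(0)
        yield value


async def main():
    from_iterator = [n async for n in Countdown(3)]
    from_generator = [n async for n in countdown(3)]
    return from_iterator, from_generator


# In a notebook, use `await main()` instead of `asyncio.run(main())`.
from_iterator, from_generator = asyncio.run(main())
print(from_iterator, from_generator)
```

Both objects work with `async for`; the generator form is usually shorter because Python creates `__aiter__` and `__anext__` for us.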


In [3]:
tokens = []
async for token in llm.astream("What is NLP?"):
    tokens.append(token)
    print(token.content, end="|", flush=True)
    # flush=True forces immediate output to the console instead of waiting
    # for the buffer to fill or the program to end

|N|LP| stands| for| Natural| Language| Processing|,| which| is| a| sub|field| of| artificial| intelligence| (|AI|)| and| computational| lingu|istics| focused| on| the| interaction| between| computers| and| human| (|natural|)| languages|.| The| goal| of| NLP| is| to| enable| machines| to| understand|,| interpret|,| generate|,| and| respond| to| human| language| in| a| way| that| is| both| meaningful| and| useful|.

|N|LP| encompasses| a| variety| of| tasks| and| applications|,| including|:

|1|.| **|Text| Analysis|**|:| Understanding| and| extracting| information| from| text|,| such| as| sentiment| analysis|,| topic| modeling|,| and| named| entity| recognition|.

|2|.| **|Machine| Translation|**|:| Automatically| translating| text| from| one| language| to| another|,| as| seen| in| tools| like| Google| Translate|.

|3|.| **|Speech| Recognition|**|:| Con|verting| spoken| language| into| text|,| which| is| used| in| applications| like| virtual| assistants| (|e|.g|.,| Siri|,| Alexa|).

|4|.

Since we appended each token to the `tokens` list, we can also see what is inside each and every token.

In [4]:
tokens[0]

AIMessageChunk(content='', additional_kwargs={}, response_metadata={}, id='run-e0014ed1-ad9a-4b8a-8df4-df2a3e28b45d')

In [9]:
tokens[1]

AIMessageChunk(content='N', additional_kwargs={}, response_metadata={}, id='run-aea9d2a8-6b81-4d1e-aebc-d2cc59ec5ae3')

We can also merge multiple `AIMessageChunk` objects together with the `+` operator, creating a larger set of tokens / chunk:

In [11]:
tokens[0] + tokens[1] + tokens[2] + tokens[3] + tokens[4]

AIMessageChunk(content='NLP', additional_kwargs={}, response_metadata={}, id='run-aea9d2a8-6b81-4d1e-aebc-d2cc59ec5ae3')

A word of caution: nothing prevents you from merging tokens in the wrong order, so take care not to output any token omelettes:

In [12]:
tokens[4] + tokens[3] + tokens[2] + tokens[1] + tokens[0]

AIMessageChunk(content=' for standsLPN', additional_kwargs={}, response_metadata={}, id='run-aea9d2a8-6b81-4d1e-aebc-d2cc59ec5ae3')

## Streaming with Agents

Streaming with agents, particularly the custom agent executor, is a little more complex. Let's begin by constructing a simple agent executor matching what we built in the [Agent Executor](https://github.com/aurelio-labs/langchain-course/blob/main/notebooks/openai/07-custom-agent-executor.ipynb) chapter.

To construct the agent executor we need:

* Tools
* `ChatPromptTemplate`
* Our LLM (already defined with `llm`)
* An agent
* Finally, the agent executor

Let's start defining each.

### Tools

Now we will define a few tools to be used by an async agent executor. Our goals for tool use with regard to streaming are:

* The tool-use steps will be streamed in one big chunk, i.e. we do not return the tool-use information token-by-token; instead it is streamed message-by-message.

* The final LLM output _will_ be streamed token-by-token as we saw above.

For these we need to define a few math tools and our final answer tool.

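This describes a hybrid streaming strategy:

* Tool operations: streamed message by message (structured information)

* Text generation: streamed token by token (natural language)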

In [10]:
from langchain_core.tools import tool

@tool
def add(x: float, y: float) -> float:
    """Add 'x' and 'y'."""
    return x + y

@tool
def multiply(x: float, y: float) -> float:
    """Multiply 'x' and 'y'."""
    return x * y

@tool
def exponentiate(x: float, y: float) -> float:
    """Raise 'x' to the power of 'y'."""
    return x ** y

@tool
def subtract(x: float, y: float) -> float:
    """Subtract 'x' from 'y'."""
    return y - x

@tool
def final_answer(answer: str, tools_used: list[str]) -> dict:
    """Use this tool to provide a final answer to the user.
    The answer should be in natural language as this will be provided
    to the user directly. The tools_used must include a list of tool
    names that were used within the `scratchpad`. You MUST use this tool
    to conclude the interaction.
    """
    return {"answer": answer, "tools_used": tools_used}

We'll need all of our tools in a list when defining our `agent` and `agent_executor`.

In [11]:
tools = [add, multiply, exponentiate, subtract, final_answer]

### `ChatPromptTemplate`

We will create our `ChatPromptTemplate`, using a system message, chat history, user input, and a scratchpad for intermediate steps.

In [12]:
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

prompt = ChatPromptTemplate.from_messages([
    ("system", (
        "You're a helpful assistant. When answering a user's question "
        "you should first use one of the tools provided. After using a "
        "tool the tool output will be provided back to you. You MUST "
        "then use the final_answer tool to provide a final answer to the user. "
        "DO NOT use the same tool more than once."
    )),
    MessagesPlaceholder(variable_name="chat_history"),
    ("human", "{input}"),
    MessagesPlaceholder(variable_name="agent_scratchpad"),
])

### Agent

As before, we will define our `agent` with LCEL.

In [15]:
from langchain_core.runnables.base import RunnableSerializable

tools = [add, subtract, multiply, exponentiate, final_answer]

# define the agent runnable
agent: RunnableSerializable = (
    {
        "input": lambda x: x["input"],
        "chat_history": lambda x: x["chat_history"],
        "agent_scratchpad": lambda x: x.get("agent_scratchpad", [])
    }
    | prompt
    | llm.bind_tools(tools, tool_choice="any")
)

### Agent Executor

Finally, we will create the agent executor.

In [16]:
import json
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage


# create tool name to function mapping
name2tool = {tool.name: tool.func for tool in tools}

class CustomAgentExecutor:
    chat_history: list[BaseMessage]

    def __init__(self, max_iterations: int = 3):
        self.chat_history = []
        self.max_iterations = max_iterations
        self.agent: RunnableSerializable = (
            {
                "input": lambda x: x["input"],
                "chat_history": lambda x: x["chat_history"],
                "agent_scratchpad": lambda x: x.get("agent_scratchpad", [])
            }
            | prompt
            | llm.bind_tools(tools, tool_choice="any")  # we're forcing tool use again
        )

    def invoke(self, input: str) -> dict:
        # invoke the agent but we do this iteratively in a loop until
        # reaching a final answer
        count = 0
        agent_scratchpad = []
        while count < self.max_iterations:
            # invoke a step for the agent to generate a tool call
            out = self.agent.invoke({
                "input": input,
                "chat_history": self.chat_history,
                "agent_scratchpad": agent_scratchpad
            })
            # if the tool call is the final answer tool, we stop
            if out.tool_calls[0]["name"] == "final_answer":
                break
            agent_scratchpad.append(out)  # add tool call to scratchpad
            # otherwise we execute the tool and add its output to the agent scratchpad
            tool_out = name2tool[out.tool_calls[0]["name"]](**out.tool_calls[0]["args"])
            # add the tool output to the agent scratchpad
            action_str = f"The {out.tool_calls[0]['name']} tool returned {tool_out}"
            agent_scratchpad.append({
                "role": "tool",
                "content": action_str,
                "tool_call_id": out.tool_calls[0]["id"]
            })
            print(agent_scratchpad)
            # add a print so we can see intermediate steps
            print(f"{count}: {action_str}")
            count += 1
        # add the final output to the chat history
        final_answer = out.tool_calls[0]["args"]
        # this is a dictionary, so we convert it to a string for compatibility with
        # the chat history
        final_answer_str = json.dumps(final_answer)
        # store the turn as message objects only, which is what the
        # `chat_history` placeholder in the prompt expects
        self.chat_history.extend([
            HumanMessage(content=input),
            AIMessage(content=final_answer_str)
        ])
        # return the final answer in dict form
        return final_answer

agent_executor = CustomAgentExecutor()

Our `agent_executor` is now ready to use. Let's quickly test it before adding streaming.

In [17]:
agent_executor.invoke(input="What is 10 + 10")

[AIMessage(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': 'call_IFEM8lMS6bqIuQWYOAKGxKLP', 'function': {'arguments': '{"x":10,"y":10}', 'name': 'add'}, 'type': 'function'}]}, response_metadata={'finish_reason': 'tool_calls', 'model_name': 'gpt-4o-mini-2024-07-18', 'system_fingerprint': 'fp_34a54ae93c'}, id='run-fbec83a9-e3c3-46e7-afd2-47f0f50511d6-0', tool_calls=[{'name': 'add', 'args': {'x': 10, 'y': 10}, 'id': 'call_IFEM8lMS6bqIuQWYOAKGxKLP', 'type': 'tool_call'}]), {'role': 'tool', 'content': 'The add tool returned 20', 'tool_call_id': 'call_IFEM8lMS6bqIuQWYOAKGxKLP'}]
0: The add tool returned 20


{'answer': '10 + 10 equals 20.', 'tools_used': ['functions.add']}

Let's modify our `agent_executor` to use streaming and parse the streamed output into a format that we can more easily work with.

First, when streaming with our custom agent executor we need to pass our callback handler to the agent on every new invocation. To make this simpler we can make `callbacks` a configurable field, which lets us use the `with_config` method to pass the callback handler to the agent with every invocation.

In [18]:
from langchain_core.runnables import ConfigurableField

llm = ChatOpenAI(
    model_name="gpt-4o-mini",
    temperature=0.0,
    streaming=True
).configurable_fields(
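    # A callback handler is passed in at runtime to process the streamed tokens.
    # 📦 This decouples streaming from callback handling: streaming is always on,
    # but how the stream is handled (printed live, pushed to a frontend, stored
    # in a database) is decided by the callback handler supplied later.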
    callbacks=ConfigurableField(
        id="callbacks",
        name="callbacks",
        description="A list of callbacks to use for streaming",
    )
)

We reinitialize our `agent`; nothing changes here:

In [19]:
# define the agent runnable
agent: RunnableSerializable = (
    {
        "input": lambda x: x["input"],
        "chat_history": lambda x: x["chat_history"],
        "agent_scratchpad": lambda x: x.get("agent_scratchpad", [])
    }
    | prompt
    | llm.bind_tools(tools, tool_choice="any")
)

Now, we will define our _custom_ callback handler. This will be a queue callback handler that will allow us to stream the output of the agent through an `asyncio.Queue` object and yield the tokens as they are generated elsewhere.

In [20]:
import asyncio  # standard-library module for scheduling asynchronous tasks
# async callback base class from LangChain; subclass it to receive streamed tokens
from langchain.callbacks.base import AsyncCallbackHandler


class QueueCallbackHandler(AsyncCallbackHandler):
    """Callback handler that puts tokens into a queue."""

    def __init__(self, queue: asyncio.Queue):
        self.queue = queue
        self.final_answer_seen = False

    async def __aiter__(self):  # async generator, called automatically by `async for`
        while True:
            if self.queue.empty():
                await asyncio.sleep(0.1)
                continue
            token_or_done = await self.queue.get()

            if token_or_done == "<<DONE>>":
                # this means we're done
                return
            if token_or_done:
                yield token_or_done  # pause here until the caller has handled the item
                # once the caller asks for the next item, execution resumes here
                # and moves on to the next iteration of the while loop

    # LangChain calls this callback automatically each time the LLM generates a token
    async def on_llm_new_token(self, *args, **kwargs) -> None:
        """Put new token in the queue."""
        #print(f"on_llm_new_token: {args}, {kwargs}")
        chunk = kwargs.get("chunk")  # object holding the new token and related information
        if chunk:
            # check for final_answer tool call
            if tool_calls := chunk.message.additional_kwargs.get("tool_calls"):
                if tool_calls[0]["function"]["name"] == "final_answer":
                    # this will allow the stream to end on the next `on_llm_end` call
                    self.final_answer_seen = True
        self.queue.put_nowait(kwargs.get("chunk"))
        # put_nowait adds the item immediately without blocking the current coroutine,
        # which suits real-time use; if the queue is full it raises an exception straight
        # away, whereas `await put()` waits until space is available
        return

    async def on_llm_end(self, *args, **kwargs) -> None:
        """Put a "<<DONE>>" or "<<STEP_END>>" marker in the queue."""
        #print(f"on_llm_end: {args}, {kwargs}")
        # this should only be used at the end of our agent execution, however LangChain
        # will call this at the end of every tool call, not just the final tool call
        # so we must only send the "done" signal if we have already seen the final_answer
        # tool call
        if self.final_answer_seen:
            self.queue.put_nowait("<<DONE>>")
        else:
            self.queue.put_nowait("<<STEP_END>>")
        return
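
Stripped of the LangChain specifics, the handler follows a producer/consumer pattern built on `asyncio.Queue` with sentinel markers. The sketch below is a simplified illustration using only the standard library: `producer` is a hypothetical stand-in for the LLM callbacks, and the token values are made up. Unlike the handler above, it waits on `await queue.get()` directly instead of polling `queue.empty()`, which is a design choice of this sketch rather than something taken from the course code.

```python
import asyncio

DONE = "<<DONE>>"
STEP_END = "<<STEP_END>>"


async def producer(queue: asyncio.Queue) -> None:
    """Hypothetical stand-in for the callbacks: two steps, the last is final."""
    steps = [["add", "(10, 10)"], ["final_answer", "20"]]
    for i, step in enumerate(steps):
        for token in step:
            queue.put_nowait(token)  # unbounded queue, so this never raises
            await asyncio.sleep(0)   # let the consumer run
        is_final = i == len(steps) - 1
        # end-of-step marker, or the stop signal after the final step
        queue.put_nowait(DONE if is_final else STEP_END)


async def consume(queue: asyncio.Queue):
    """Async generator that yields queue items until it sees DONE."""
    while True:
        item = await queue.get()  # suspends until an item is available
        if item == DONE:
            return
        yield item


async def main() -> list[str]:
    queue = asyncio.Queue()
    task = asyncio.create_task(producer(queue))
    received = [item async for item in consume(queue)]
    await task
    return received


# In a notebook, use `received = await main()` instead.
received = asyncio.run(main())
print(received)
```

The `<<STEP_END>>` marker passes through to the consumer, while `<<DONE>>` ends the iteration and is never yielded.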

We can see how this works together in our `agent` invocation:

In [21]:
queue = asyncio.Queue()
streamer = QueueCallbackHandler(queue)

tokens = []

async def stream(query: str):
    response = agent.with_config(
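        callbacks=[streamer]  # `callbacks` matches the configurable field id defined above
    )
    # this `async for` does not trigger `__aiter__` (that only happens with
    # `async for token in streamer`); as the LLM generates each token:
    # 1. the on_llm_new_token() callback fires (the token goes into the queue)
    # 2. this `async for` receives the token (and prints it directly)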
    async for token in response.astream({
        "input": query,
        "chat_history": [],
        "agent_scratchpad": []
    }):
        tokens.append(token)
        print(token, flush=True)

await stream("What is 10 + 10")

content='' additional_kwargs={'tool_calls': [{'index': 0, 'id': 'call_EtZELSAEmKqtAOFXMfwgVIGR', 'function': {'arguments': '', 'name': 'add'}, 'type': 'function'}]} response_metadata={} id='run-5375c74f-192a-4cae-b772-b2440a4af95e' tool_calls=[{'name': 'add', 'args': {}, 'id': 'call_EtZELSAEmKqtAOFXMfwgVIGR', 'type': 'tool_call'}] tool_call_chunks=[{'name': 'add', 'args': '', 'id': 'call_EtZELSAEmKqtAOFXMfwgVIGR', 'index': 0, 'type': 'tool_call_chunk'}]
content='' additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': '{"', 'name': None}, 'type': None}]} response_metadata={} id='run-5375c74f-192a-4cae-b772-b2440a4af95e' tool_calls=[{'name': '', 'args': {}, 'id': None, 'type': 'tool_call'}] tool_call_chunks=[{'name': None, 'args': '{"', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
content='' additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': 'x', 'name': None}, 'type': None}]} response_metadata={} id='run-5375c74f-19

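Streaming mainly splits the fields that contain JSON arguments (`arguments` and `args`). The other structured fields either stay unchanged or come out empty because incomplete JSON cannot be parsed.

In essence, anything that has to be built up is split across chunks, and any static metadata stays the same.

For example, a complete arguments string such as `'{"x": 10, "y": 20}'` is delivered in fragments.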
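
A small standard-library sketch shows why the parsed `args` stay empty until the last fragment arrives. The fragment list mirrors the `arguments` values visible in the streamed output of this notebook; `try_parse` is a hypothetical helper written for this illustration, not a LangChain function.

```python
import json

# argument fragments in the order they were streamed for the `add` tool call
fragments = ["", '{"', "x", '":', "10", ',"', "y", '":', "10", "}"]


def try_parse(buffer: str):
    """Return the parsed JSON, or None while the buffer is still incomplete."""
    try:
        return json.loads(buffer)
    except json.JSONDecodeError:
        return None


buffer = ""
parsed_after_each = []
for fragment in fragments:
    buffer += fragment  # build up the arguments string piece by piece
    parsed_after_each.append(try_parse(buffer))

args = parsed_after_each[-1]
print(buffer)
print(args)
```

Every intermediate buffer fails to parse, so only the final, complete string yields a usable dictionary of arguments.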

In [29]:
tk = tokens[0]

for token in tokens[1:]:
    tk += token

tk

AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': 'call_4iifThkIlMBZUX5J2JSnG1m8', 'function': {'arguments': '{"x":10,"y":10}', 'name': 'add'}, 'type': 'function'}]}, response_metadata={'finish_reason': 'tool_calls', 'model_name': 'gpt-4o-mini-2024-07-18', 'system_fingerprint': 'fp_72ed7ab54c'}, id='run-3abebb2f-5890-40b8-bcb3-2a400a30a751', tool_calls=[{'name': 'add', 'args': {'x': 10, 'y': 10}, 'id': 'call_4iifThkIlMBZUX5J2JSnG1m8', 'type': 'tool_call'}], tool_call_chunks=[{'name': 'add', 'args': '{"x":10,"y":10}', 'id': 'call_4iifThkIlMBZUX5J2JSnG1m8', 'index': 0, 'type': 'tool_call_chunk'}])

We can now see that the output is streamed token-by-token. Because the model is streaming a tool call, the `content` field is empty. Instead, the tokens arrive inside the `tool_calls` fields: `id`, `function.name`, and `function.arguments`.

In [None]:
# chunk = {
#     'message': AIMessageChunk(                    # ← the message field holds the actual message
#         content='',
#         additional_kwargs={
#             'tool_calls': [...]
#         },
#         response_metadata={...},
#         id='run-xxx',
#         tool_calls=[...],
#         tool_call_chunks=[...]
#     ),
#     'generation_info': {                          # ← generation-related information
#         'finish_reason': 'tool_calls',
#         'model_name': 'gpt-4o-mini-2024-07-18',
#         'system_fingerprint': 'fp_72ed7ab54c'
#     }
# }

In [22]:
from langchain_core.messages import ToolMessage

class CustomAgentExecutor:
    chat_history: list[BaseMessage]

    def __init__(self, max_iterations: int = 3):
        self.chat_history = []
        self.max_iterations = max_iterations
        self.agent: RunnableSerializable = (
            {
                "input": lambda x: x["input"],
                "chat_history": lambda x: x["chat_history"],
                "agent_scratchpad": lambda x: x.get("agent_scratchpad", [])
            }
            | prompt
            | llm.bind_tools(tools, tool_choice="any")  # we're forcing tool use again
        )

    # verbose=True prints detailed information (debug mode)
    async def invoke(self, input: str, streamer: QueueCallbackHandler, verbose: bool = False) -> dict:
        
        # invoke the agent but we do this iteratively in a loop until
        # reaching a final answer
        count = 0
        agent_scratchpad = []
        while count < self.max_iterations:
            # invoke a step for the agent to generate a tool call
            async def stream(query: str):
                response = self.agent.with_config(
                    callbacks=[streamer]
                )
                # we initialize the output dictionary that we will be populating with
                # our streamed output
                output = None
                # now we begin streaming
                async for token in response.astream({
                    "input": query,
                    "chat_history": self.chat_history,
                    "agent_scratchpad": agent_scratchpad
                }):
                    if output is None:
                        output = token
                    else:
                        # we can just add the tokens together as they are streamed and
                        # we'll have the full response object at the end
                        output += token  # merges the AIMessageChunk objects
                    if token.content != "":
                        # we can capture various parts of the response object
                        if verbose: print(f"content: {token.content}", flush=True)
                    tool_calls = token.additional_kwargs.get("tool_calls")
                    if tool_calls:
                        if verbose: print(f"tool_calls: {tool_calls}", flush=True)
                        tool_name = tool_calls[0]["function"]["name"]
                        if tool_name:
                            if verbose: print(f"tool_name: {tool_name}", flush=True)
                        arg = tool_calls[0]["function"]["arguments"]
                        if arg != "":
                            if verbose: print(f"arg: {arg}", flush=True)
                return AIMessage(
                    content=output.content,
                    tool_calls=output.tool_calls,
                    tool_call_id=output.tool_calls[0]["id"]
                )

            tool_call = await stream(query=input)
            # add initial tool call to scratchpad
            agent_scratchpad.append(tool_call)
            # execute the tool and add its output to the agent scratchpad
            tool_name = tool_call.tool_calls[0]["name"]
            tool_args = tool_call.tool_calls[0]["args"]
            tool_call_id = tool_call.tool_call_id
            tool_out = name2tool[tool_name](**tool_args)
            # add the tool output to the agent scratchpad
            tool_exec = ToolMessage(
                content=f"{tool_out}",
                tool_call_id=tool_call_id
            )
            agent_scratchpad.append(tool_exec)
            count += 1
            # if the tool call is the final answer tool, we stop
            if tool_name == "final_answer":
                break
        # add the final output to the chat history, we only add the "answer" field
        final_answer = tool_out["answer"]
        self.chat_history.extend([
            HumanMessage(content=input),
            AIMessage(content=final_answer)
        ])
        # return the final answer in dict form
        return tool_args

agent_executor = CustomAgentExecutor()

We've added a few `print` statements to help us see what is being output. We activate them by setting `verbose=True`. Let's see what is returned:

In [23]:
queue = asyncio.Queue()
streamer = QueueCallbackHandler(queue)

out = await agent_executor.invoke("What is 10 + 10", streamer, verbose=True)

tool_calls: [{'index': 0, 'id': 'call_EdkHIvqtJ1swTXaU2cHWCDoC', 'function': {'arguments': '', 'name': 'add'}, 'type': 'function'}]
tool_name: add
tool_calls: [{'index': 0, 'id': None, 'function': {'arguments': '{"', 'name': None}, 'type': None}]
arg: {"
tool_calls: [{'index': 0, 'id': None, 'function': {'arguments': 'x', 'name': None}, 'type': None}]
arg: x
tool_calls: [{'index': 0, 'id': None, 'function': {'arguments': '":', 'name': None}, 'type': None}]
arg: ":
tool_calls: [{'index': 0, 'id': None, 'function': {'arguments': '10', 'name': None}, 'type': None}]
arg: 10
tool_calls: [{'index': 0, 'id': None, 'function': {'arguments': ',"', 'name': None}, 'type': None}]
arg: ,"
tool_calls: [{'index': 0, 'id': None, 'function': {'arguments': 'y', 'name': None}, 'type': None}]
arg: y
tool_calls: [{'index': 0, 'id': None, 'function': {'arguments': '":', 'name': None}, 'type': None}]
arg: ":
tool_calls: [{'index': 0, 'id': None, 'function': {'arguments': '10', 'name': None}, 'type': None}]
a

We can see what is being output through the `verbose=True` flag. However, if we do _not_ `print` the output, we will see nothing:

In [24]:
queue = asyncio.Queue()
streamer = QueueCallbackHandler(queue)

out = await agent_executor.invoke("What is 10 + 10", streamer)
out

{'answer': '10 + 10 equals 20.', 'tools_used': ['functions.add']}

In [27]:
agent_executor.chat_history

[HumanMessage(content='What is 10 + 10', additional_kwargs={}, response_metadata={}),
 AIMessage(content='10 + 10 equals 20.', additional_kwargs={}, response_metadata={}),
 HumanMessage(content='What is 10 + 10', additional_kwargs={}, response_metadata={}),
 AIMessage(content='10 + 10 equals 20.', additional_kwargs={}, response_metadata={}),
 HumanMessage(content='What is 10 + 10', additional_kwargs={}, response_metadata={}),
 AIMessage(content='10 + 10 equals 20.', additional_kwargs={}, response_metadata={}),
 HumanMessage(content='What is 10 + 10', additional_kwargs={}, response_metadata={}),
 AIMessage(content='10 + 10 equals 20.', additional_kwargs={}, response_metadata={})]

Although we see nothing, that does not mean nothing is being returned to us - we're just not using our callback handler and `asyncio.Queue`. To use these we create an `asyncio` task, iterate over our `streamer` object (which calls its `__aiter__` method), and await the task, like so:

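Timeline:

* t1: `task = create_task(invoke(...))` ← schedules `invoke` and returns immediately
* t2: `async for token in streamer:` ← starts consuming the queue
* t3: `invoke` generates token1 ← concurrent: `invoke` is working
* t4: `print(token1)` ← concurrent: token1 is displayed
* t5: `invoke` generates token2 ← concurrent: `invoke` is working
* t6: `print(token2)` ← concurrent: token2 is displayed
* ...
* tn: `await task` ← waits for `invoke` to finish completely

`create_task()` does one thing:

👉 It starts an asynchronous task in the background without blocking the current flow.

The main coroutine can then process tokens in real time as the task produces them (through the queue).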


In [25]:
queue = asyncio.Queue()
streamer = QueueCallbackHandler(queue)

# create_task wraps the coroutine in a task so it can run concurrently
# this is the producer: it is scheduled immediately and we do not wait for it to finish
task = asyncio.create_task(agent_executor.invoke("What is 10 + 10", streamer))

# this is the consumer: `async for` calls `streamer.__aiter__()` and starts
# reading from the queue straight away
async for token in streamer:
    print(token, flush=True)

# finally, wait for the task to complete
await task

# the task and the `async for token in streamer` loop run concurrently

message=AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': 'call_7popEQpetW0ZqEdFYU8Qwtoy', 'function': {'arguments': '', 'name': 'add'}, 'type': 'function'}]}, response_metadata={}, id='run-4483f4c2-ad4f-4ccb-b53d-a02d461df11d', tool_calls=[{'name': 'add', 'args': {}, 'id': 'call_7popEQpetW0ZqEdFYU8Qwtoy', 'type': 'tool_call'}], tool_call_chunks=[{'name': 'add', 'args': '', 'id': 'call_7popEQpetW0ZqEdFYU8Qwtoy', 'index': 0, 'type': 'tool_call_chunk'}])
message=AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': '{"', 'name': None}, 'type': None}]}, response_metadata={}, id='run-4483f4c2-ad4f-4ccb-b53d-a02d461df11d', tool_calls=[{'name': '', 'args': {}, 'id': None, 'type': 'tool_call'}], tool_call_chunks=[{'name': None, 'args': '{"', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}])
message=AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'argume

{'answer': '10 + 10 equals 20.', 'tools_used': ['functions.add']}

Although this seems like a lot of work, we're now streaming tokens in a way that allows us to pass these tokens on to other parts of our code - such as through a websocket, streamed API response, or some downstream processing.

Let's try this out. We'll put together some simple post-processing to format the streamed output from our agent more nicely.

In [31]:
queue = asyncio.Queue()
streamer = QueueCallbackHandler(queue)

task = asyncio.create_task(agent_executor.invoke("What is 10 + 10", streamer))

async for token in streamer:
    # first identify if we have a <<STEP_END>> token
    if token == "<<STEP_END>>":
        print("\n", flush=True)
    # we'll first identify if the token is a tool call
    elif tool_calls := token.message.additional_kwargs.get("tool_calls"):
        # if we have a tool call with a tool name, we'll print it
        if tool_name := tool_calls[0]["function"]["name"]:
            print(f"Calling {tool_name}...", flush=True)
        # if we have a tool call with arguments, we print each fragment as it arrives
        if tool_args := tool_calls[0]["function"]["arguments"]:
            print(f"{tool_args}", end="｜", flush=True)

_ = await task  # `_` signals that we do not need the return value

Calling add...
{"｜x｜":｜10｜,"｜y｜":｜10｜}｜

Calling final_answer...
{"｜answer｜":"｜10｜ +｜ ｜10｜ equals｜ ｜20｜.","｜tools｜_used｜":["｜functions｜.add｜"]｜}｜

With that we've produced a nice streaming output within our notebook - which of course can be applied with very similar logic elsewhere, such as within a more polished web app.
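
As a rough idea of how this logic could be reused outside the notebook, the sketch below wraps the same formatting rules in an async generator that yields display strings, which a web handler could then forward to a client. It is only an illustration under simplifying assumptions: `fake_events` is a hypothetical stand-in for `async for token in streamer`, and each event is a plain dictionary with `name` and `arguments` keys rather than the real chunk objects.

```python
import asyncio

STEP_END = "<<STEP_END>>"


async def fake_events():
    """Hypothetical stand-in for iterating over the streamer."""
    events = [
        {"name": "add", "arguments": ""},
        {"name": None, "arguments": '{"x":10,'},
        {"name": None, "arguments": '"y":10}'},
        STEP_END,
        {"name": "final_answer", "arguments": ""},
        {"name": None, "arguments": '{"answer":"20"}'},
    ]
    for event in events:
        await asyncio.sleep(0)
        yield event


async def format_events(events):
    """Turn raw events into display strings, yielding each as it is ready."""
    async for event in events:
        if event == STEP_END:
            yield "\n"  # separate one agent step from the next
            continue
        if event["name"]:
            yield f"Calling {event['name']}...\n"
        if event["arguments"]:
            yield event["arguments"]


async def main() -> list[str]:
    return [text async for text in format_events(fake_events())]


# In a notebook, use `formatted = await main()` instead.
formatted = asyncio.run(main())
print("".join(formatted))
```

Keeping the formatting in its own generator means the consumer (a notebook cell, a websocket handler, or a streamed API response) only has to forward strings.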

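## How `yield` works

`yield` does pause the function, but it does not stop the whole loop. Instead it:

1. Returns a value to the caller

2. Pauses at the `yield` line

3. Waits for the caller to request the next value

4. Resumes execution from the `yield`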


In [4]:
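import asyncio
async def simple_generator():  # this is an async generator function
    print("Generator started")

    yield "first item"  # ← pauses here and returns the value
    print("First item has been handled, continuing")

    yield "second item"  # ← pauses here and returns the value
    print("Second item has been handled, continuing")

    print("Generator finished")

# usage
async def main():
    async for item in simple_generator():  # simple_generator() returns the generator object
        print(f"Received: {item}")
        print("Processing this item...")
        await asyncio.sleep(3)  # simulate processing time
        print("Processing done")

# What happens:
# 1. Execution reaches `yield`, pauses, and hands the value to `item`
# 2. The outer `async for` receives the value and runs its body
# 3. The next iteration resumes the generator where it last paused

# Output order:
# Generator started
# Received: first item
# Processing this item...
# Processing done
# First item has been handled, continuing
# Received: second item
# Processing this item...
# Processing done
# Second item has been handled, continuing
# Generator finished
await main()

Generator started
Received: first item
Processing this item...
Processing done
First item has been handled, continuing
Received: second item
Processing this item...
Processing done
Second item has been handled, continuing
Generator finished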


In [5]:
def fn(*args, **kwargs):
    print(args)    # positional arguments arrive as a tuple
    print(kwargs)  # keyword arguments arrive as a dict

fn(1, 2, 3, name="Alice", age=30)

(1, 2, 3)
{'name': 'Alice', 'age': 30}


The following shows the difference between using `create_task` without `await` and with `await`.

In [29]:
import asyncio
import time

async def slow_task(name):
    print(f"Task {name} started: {time.strftime('%H:%M:%S')}")
    await asyncio.sleep(2)
    print(f"Task {name} finished: {time.strftime('%H:%M:%S')}")
    return f"Result of task {name}"

async def demo_no_await():
    print("=== Demo 1: create_task without await ===")

    # create the task but do not wait for it
    task = asyncio.create_task(slow_task("A"))

    print("Task created, program continues...")

    # do some other work
    for i in range(3):
        print(f"Main program work {i+1}: {time.strftime('%H:%M:%S')}")
        await asyncio.sleep(0.5)

    print("Main program finished, but the task may not be done yet!")
    # note: there is no `await task` here

async def demo_with_await():
    print("\n=== Demo 2: create_task followed by await ===")

    # create the task and wait for it later
    task = asyncio.create_task(slow_task("B"))

    print("Task created, program continues...")

    # do some other work
    for i in range(3):
        print(f"Main program work {i+1}: {time.strftime('%H:%M:%S')}")
        await asyncio.sleep(0.5)

    print("Waiting for the task to finish...")
    result = await task
    print(f"Task finished, result: {result}")

# run the demos
await demo_no_await()
await demo_with_await()

=== Demo 1: create_task without await ===
Task created, program continues...
Main program work 1: 23:52:58
Task A started: 23:52:58
Main program work 2: 23:52:58
Main program work 3: 23:52:59
Main program finished, but the task may not be done yet!

=== Demo 2: create_task followed by await ===
Task created, program continues...
Main program work 1: 23:52:59
Task B started: 23:52:59
Task A finished: 23:53:00
Main program work 2: 23:53:00
Main program work 3: 23:53:00
Waiting for the task to finish...
Task B finished: 23:53:01
Task finished, result: Result of task B
