# 如何流式传输可运行文件 How to stream runnables

流式传输对于使基于 LLM 的应用程序对最终用户具有响应能力至关重要。

重要的 LangChain 原语（如聊天模型、输出解析器、提示、检索器和代理）实现了 LangChain Runnable 接口。

此接口提供了两种流式传输内容的通用方法：

 - 同步流和异步流：流式传输的默认实现，可从链中流式传输最终输出。
 - 异步 astream_events 和异步 astream_log：它们提供了一种从链中流式传输中间步骤和最终输出的方法。
   
让我们看看这两种方法，并尝试了解如何使用它们。

## 使用 Stream
所有 Runnable 对象都实现了同步方法 stream 和异步变体 astream。

这些方法旨在将最终输出以块的形式流式传输，并在每个块可用时立即生成。

只有当程序中的所有步骤都知道如何处理输入流时，才能进行流式传输；即一次处理一个输入块，并生成相应的输出块。

此处理的复杂性各不相同，从简单的任务（如发出 LLM 生成的令牌）到更具挑战性的任务（如在整个 JSON 完成之前流式传输 JSON 结果的部分）。

## LLM 和聊天模型
大型语言模型及其聊天变体是基于 LLM 的应用程序的主要瓶颈。

大型语言模型可能需要几秒钟才能生成对查询的完整响应。这比应用程序对最终用户的响应速度约 200-300 毫秒的阈值要慢得多。

使应用程序感觉响应更快的关键策略是显示中间进度；即逐个标记地流式传输模型的输出。

我们将展示使用聊天模型进行流式传输的示例。从以下选项中选择一个：

In [3]:
import os
from dotenv import load_dotenv,find_dotenv

_ = load_dotenv(find_dotenv())

In [8]:
from langchain_openai import ChatOpenAI

model = ChatOpenAI(
    base_url="http://api.baichuan-ai.com/v1",
    api_key=os.environ["BAICHUAN_API_KEY"],
    model="Baichuan4",
)

In [10]:
chunks = []
for chunk in model.stream("天空是什么颜色?"):
    chunks.append(chunk)
    print(chunk.content, end="|", flush=True)

天空|的颜色取决于多种因素|，包括天气、|时间和大气中的粒子|。在晴朗的日子|，天空呈现蓝色|，这是因为大气散|射阳光中的短|波长光（|蓝光）比长|波长光（|红光）更多。|这种现象被称为瑞利|散射。
|
然而，天空|的颜色可以变化：|

1.| 黎明和黄昏|时，天空可能|呈现红色或橙色|。这是由于太阳|在地平线附近|时，光线通过|大气层的路径更长|，导致更多的蓝光|被散射掉|，而红光和|橙光则较为|突出。

|2. 在阴天|或有雾的时候，|天空可能显得灰|蒙蒙的，因为|云层或雾气|中的水滴和小颗粒|散射了所有|波长的光，|导致光线均匀地|散射，呈现出|灰色。

|3. 在日落|之后或日出之前|，天空可能呈现|深蓝色甚至接近|黑色，特别是在没有|月亮和较少城市|灯光的情况下。
|
4. 在|极高的大气层|中，如平|流层或中间|层，天空可能会|呈现出不同的颜色，|例如在极光的|条件下，天空可能会|显现出绿色、|红色、紫色等|色彩。

|总的来说，天空的颜色|是一个复杂的现象，|受到多种自然条件|的影响。|

或者，如果您在异步环境中工作，您可以考虑使用异步 astream API：

In [11]:
chunks = []
async for chunk in model.astream("天空是什么颜色?"):
    chunks.append(chunk)
    print(chunk.content, end="|", flush=True)

天空|的颜色取决于多种因素|，包括时间、|天气和观察位置|。在晴朗的日子|，天空呈现出蔚|蓝色，这是因为大气|散射阳光中|较短波长的蓝光|。然而，在|日落或日出时|，天空可能呈现出|橙色、粉红色或|红色，这是由于|太阳光线穿过|大气层的路径更长|，导致较长波|长的光被散|射。此外，|阴天或多云时|，天空可能呈现出|灰白色。总之|，天空的颜色可以|变化很大，但|通常以蓝色为主|。|

In [12]:
chunks[0]

AIMessageChunk(content='天空', id='run-780aa557-b206-4564-8c52-e5a8577b6066')

## 链 Chains
几乎所有 LLM 应用程序都涉及比调用语言模型更多的步骤。

让我们使用结合了提示、模型和解析器的 LangChain 表达语言 (LCEL) 构建一个简单的链，并验证流式传输是否有效。

我们将使用 StrOutputParser 来解析模型的输出。这是一个简单的解析器，它从 AIMessageChunk 中提取内容字段，为我们提供模型返回的令牌。

In [15]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_template("给我讲一个关于{topic}的笑话")
parser = StrOutputParser()
chain = prompt | model | parser

async for chunk in chain.astream({"topic": "鹦鹉"}):
    print(chunk, end="|", flush=True)

好的|，这里有一个关于|鹦鹉的笑话：|

为什么鹦鹉|不会飞过大海|？

因为它|怕水！（|注：这是一个文字|游戏，“飞|过大海”和|“喝水”在|中文里发音相近|。）|

## 使用输入流
如果您想在输出生成时流式传输 JSON，该怎么办？

如果您依赖 json.loads 来解析部分 JSON，则解析将失败，因为部分 JSON 不是有效的 JSON。

您可能会完全不知道该怎么做，并声称无法流式传输 JSON。

好吧，事实证明有一种方法可以做到这一点——解析器需要对输入流进行操作，并尝试将部分 JSON“自动完成”为有效状态。

让我们看看这样的解析器的实际作用，以了解这意味着什么。

In [25]:
from langchain_core.output_parsers import JsonOutputParser

chain =  model | JsonOutputParser() 
async for text in chain.astream(
    "以 JSON 格式输出法国、西班牙和日本的国家及其人口列表。"
    '使用一个带有“countries”外部键的字典，其中包含国家列表。'
    "每个国家都应该有键`name`和`population`"
):
    print(text, flush=True)

{}
{'countries': []}
{'countries': [{}]}
{'countries': [{'name': 'France'}]}
{'countries': [{'name': 'France', 'population': 6706}]}
{'countries': [{'name': 'France', 'population': 67065000}]}
{'countries': [{'name': 'France', 'population': 67065000}, {}]}
{'countries': [{'name': 'France', 'population': 67065000}, {'name': ''}]}
{'countries': [{'name': 'France', 'population': 67065000}, {'name': 'Spain'}]}
{'countries': [{'name': 'France', 'population': 67065000}, {'name': 'Spain', 'population': 467}]}
{'countries': [{'name': 'France', 'population': 67065000}, {'name': 'Spain', 'population': 4673303}]}
{'countries': [{'name': 'France', 'population': 67065000}, {'name': 'Spain', 'population': 46733038}]}
{'countries': [{'name': 'France', 'population': 67065000}, {'name': 'Spain', 'population': 46733038}, {}]}
{'countries': [{'name': 'France', 'population': 67065000}, {'name': 'Spain', 'population': 46733038}, {'name': 'Japan'}]}
{'countries': [{'name': 'France', 'population': 67065000},

In [22]:
from langchain_core.output_parsers import (
    JsonOutputParser,
)

# 提取国家名称
def _extract_country_names(inputs):
    """A function that does not operates on input streams and breaks streaming."""
    if not isinstance(inputs, dict):
        return ""

    if "countries" not in inputs:
        return ""

    countries = inputs["countries"]

    if not isinstance(countries, list):
        return ""

    country_names = [
        country.get("name") for country in countries if isinstance(country, dict)
    ]
    return country_names


chain = model | JsonOutputParser() | _extract_country_names

async for text in chain.astream(
    "以 JSON 格式输出法国、西班牙和日本的国家及其人口列表。"
    '使用一个带有“countries”外部键的字典，其中包含国家列表。'
    "每个国家都应该有键`name`和`population`"
):
    print(text, end="|", flush=True)

['France', 'Spain', 'Japan']|

***生成器函数***  
让我们使用可以对输入流进行操作的生成器函数来修复流式传输。

生成器函数（使用yield的函数）允许编写对输入流进行操作的代码

In [23]:
from langchain_core.output_parsers import JsonOutputParser


async def _extract_country_names_streaming(input_stream):
    """A function that operates on input streams."""
    country_names_so_far = set()

    async for input in input_stream:
        if not isinstance(input, dict):
            continue

        if "countries" not in input:
            continue

        countries = input["countries"]

        if not isinstance(countries, list):
            continue

        for country in countries:
            name = country.get("name")
            if not name:
                continue
            if name not in country_names_so_far:
                yield name
                country_names_so_far.add(name)


chain = model | JsonOutputParser() | _extract_country_names_streaming

async for text in chain.astream(
    "以 JSON 格式输出法国、西班牙和日本的国家及其人口列表。"
    '使用一个带有“countries”外部键的字典，其中包含国家列表。'
    "每个国家都应该有键`name`和`population`"
):
    print(text, end="|", flush=True)

France|Spain|Japan|

## 非流式传输组件
某些内置组件（如 Retrievers）不提供任何流式传输。如果我们尝试对它们进行流式传输，会发生什么情况？

In [30]:
from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough

from langchain_community.embeddings import BaichuanTextEmbeddings

embeddings = BaichuanTextEmbeddings(baichuan_api_key=os.environ["BAICHUAN_API_KEY"])

template = """Answer the question based only on the following context:
{context}

Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)

vectorstore = FAISS.from_texts(
    ["张三曾在食为天工作过","张三喜欢吃辣的食物"],
    embedding=embeddings,
)
retriever = vectorstore.as_retriever()

chunks = [chunk for chunk in retriever.stream("张三喜欢吃什么?")]
chunks

[[Document(page_content='张三喜欢吃辣的食物'), Document(page_content='张三曾在食为天工作过')]]

In [32]:
retrieval_chain = (
    {
        "context": retriever.with_config(run_name="Docs"),
        "question": RunnablePassthrough(),
    }
    | prompt
    | model
    | StrOutputParser()
)

In [33]:
for chunk in retrieval_chain.stream(
    "张三在哪工作? " "写出 3 个关于这个地方的虚构句子。"
):
    print(chunk, end="|", flush=True)

张三|在食为天工作|。以下是三个关于|这个地方的虚构句子|：

1|. 食为|天的招牌菜是|麻辣香锅，|张三每次都会向|客人推荐这道菜。|
2. 张|三在食为天|的工作让他有机会尝试|各种辣味菜肴|，他的味蕾因此|变得更加敏锐。
|3. 在食|为天工作期间，张三|曾帮助餐厅研发|了一款新的辣椒酱|，这款辣椒酱|后来成为了店里的热销|产品。|