## LCEL - 使用 LangChain 进行流式处理



In [1]:
# 环境变量设置
import os
os.environ["OPENAI_API_KEY"] = "sk-xxx"
os.environ["OPENAI_API_BASE"] = "https://api.chatanywhere.tech/v1"

使用流式处理可以增强用户体验，LCEL中的组成模块LLMs、解析器、提示、检索器和代理都实现了 LangChain Runnable 接口。

在之前的小节中有提到过，今天深入其使用技巧

- sync stream 和 async astream：流式处理的默认实现，从链中流式传输最终输出。

- async astream_events 和 async astream_log：这些方法提供了一种从链中流式传输中间步骤和最终输出的方式。


### 使用Stream

所有Runnable对象都实现了一个名为stream的同步方法和一个名为astream的异步变体。

这些方法旨在以块的形式流式传输最终输出，只要可用就会产生每个块。

只有在程序中的所有步骤都知道如何处理输入流时，即一次处理一个输入块，并生成相应的输出块时，才能进行流式处理。

这种处理的复杂程度可以有所不同，从像发出由 LLM 生成的令牌这样的简单任务，到在整个 JSON 完成之前流式传输 JSON 结果的更具挑战性的任务。

#### LLMs 和 Chat Models

In [2]:
# 使用人类论示例，但您可以使用您喜欢的聊天模型！
from langchain_openai import ChatOpenAI

model = ChatOpenAI()

chunks = []
async for chunk in model.astream("你好。告诉我一些关于你自己的事情"):
    chunks.append(chunk)
    print(chunk.content, end="|", flush=True)

|你|好|！|我|是|一个|语|言|模|型|人|工|智|能|助|手|，|专|注|于|提|供|信息|和|回|答|问题|。|我|能|够|帮|助|你|解|决|问题|，|提|供|建|议|和|支|持|。|有|什|么|我|可以|帮|助|你|的|吗|？||

第一个消息块

In [6]:
chunks[0]

AIMessageChunk(content='', id='run-facbeed8-35f7-47f2-89df-3f28a875054e')

消息块是可以添加的 -- 可以简单地将它们相加以获得到目前为止响应的状态！

In [4]:
chunks[0] + chunks[1] + chunks[2] + chunks[3] + chunks[4]

AIMessageChunk(content='你好！我', id='run-facbeed8-35f7-47f2-89df-3f28a875054e')

#### Chain

下面是一个使用LCEL的简单例子，通过将不同组件的链接在一起的声明方式，可以自动实现stream和astream等功能

因为这些模块都实现了标准的Runnable接口

In [7]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_template("告诉我一个关于{topic}的笑话")
parser = StrOutputParser()
chain = prompt | model | parser

async for chunk in chain.astream({"topic": "鹦鹉"}):
    print(chunk, end="|", flush=True)

|为|什|么|鹦|鹉|喜|欢|去|电|影|院|？

|因|为|它|们|喜|欢|重|复|别|人|说|的|台|词|！||

如果您想要在生成时从输出中流式传输 JSON，解析器需要操作输入流

In [22]:
from langchain_core.output_parsers import JsonOutputParser

chain = (
    model | JsonOutputParser()
)  # 由于 Langchain 旧版本中的一个错误，JsonOutputParser 未能从某些模型中流式传输结果
async for text in chain.astream(
    '以 JSON 格式输出法国、西班牙和日本的国家及其人口的列表。使用一个带有“countries”外键的字典，其中包含一个国家列表。每个国家应该有“name”和“population”关键字。'
):
    print(text, flush=True)

{}
{'countries': []}
{'countries': [{}]}
{'countries': [{'name': ''}]}
{'countries': [{'name': 'France'}]}
{'countries': [{'name': 'France', 'population': 670}]}
{'countries': [{'name': 'France', 'population': 670760}]}
{'countries': [{'name': 'France', 'population': 67076000}]}
{'countries': [{'name': 'France', 'population': 67076000}, {}]}
{'countries': [{'name': 'France', 'population': 67076000}, {'name': ''}]}
{'countries': [{'name': 'France', 'population': 67076000}, {'name': 'Spain'}]}
{'countries': [{'name': 'France', 'population': 67076000}, {'name': 'Spain', 'population': 467}]}
{'countries': [{'name': 'France', 'population': 67076000}, {'name': 'Spain', 'population': 467330}]}
{'countries': [{'name': 'France', 'population': 67076000}, {'name': 'Spain', 'population': 46733038}]}
{'countries': [{'name': 'France', 'population': 67076000}, {'name': 'Spain', 'population': 46733038}, {}]}
{'countries': [{'name': 'France', 'population': 67076000}, {'name': 'Spain', 'population': 467

这是JsonOutputParser单独处理接受的输入和输出，接受字符串，输出json

In [25]:
res = json_string = '''
{
    "countries": [
        {"name": "France", "population": 67081000},
        {"name": "Spain", "population": 47329000},
        {"name": "Japan", "population": 126860000}
    ]
}
'''
JsonOutputParser().invoke(res)

{'countries': [{'name': 'France', 'population': 67081000},
  {'name': 'Spain', 'population': 47329000},
  {'name': 'Japan', 'population': 126860000}]}

**在这基础上增加处理函数提取国家信息**

In [20]:
from langchain_core.output_parsers import (
    JsonOutputParser,
)


# 一个操作最终输入而不是输入流的函数
def _extract_country_names(inputs):
    """一个不操作输入流并且会中断流式传输的函数。"""
    if not isinstance(inputs, dict):
        return ""

    if "countries" not in inputs:
        return ""

    countries = inputs["countries"]

    if not isinstance(countries, list):
        return ""

    country_names = [
        country.get("name") for country in countries if isinstance(country, dict)
    ]
    return country_names


chain = model | JsonOutputParser() | _extract_country_names

async for text in chain.astream(
    '以 JSON 格式输出法国、西班牙和日本的国家及其人口的列表。使用一个带有“countries”外键的字典，其中包含一个国家列表。每个国家应该有“name”和“population”关键字。'
):
    print(text, end="|", flush=True)

['法国', '西班牙', '日本']|