# Обработка потоковой передачи токенов

:::note

С этим руководством будет проще работать, если ознакомиться с разделами:

- [Chat models](/docs/concepts/#chat-models)
- [LangChain Expression Language](/docs/concepts/#langchain-expression-language)
- [Output parsers](/docs/concepts/#output-parsers)

:::

Потоковая передача токенов нужна для создания отзывчивых интерфейсов приложений, работающих с LLM.

В основе основных примитивов библиотеки GigaChain, таких как [чат-модели](/docs/concepts/#chat-models), [парсер выходных данных](/docs/concepts/#output-parsers), [промпты](/docs/concepts/#prompt-templates), [ретриверы](/docs/concepts/#retrievers) и [агенты](/docs/concepts/#agents) лежит [Runnable-интерфейс](/docs/expression_language/interface).

Этот интерфейс поддерживает два основных подхода к работе с потоковой передачей токенов:

* Обработка с помощью синхронного и асинхронного методов `stream` и `astream`. Методы позволяют обрабатывать итоговый результат работы цепочки и используются в качестве основного метода работы с потоковой передачей.
* Обработка с помощью асинхронных методов `astream_events` и `astream_log`. С помощью этих методов можно обрабатывать промежуточные результаты работы цепочки наряду с итоговыми.

## Работа с методами stream и astream

Методы обработки потоковой передачи `stream` (синхронный) и `astream` (асинхронный) доступны для всех Runnable-объектов.
Методы возвращают фрагменты итогового результата работы цепочки, как только они становятся доступны.

Обработка потоковой передачи токенов возможна только если компоненты программы могут обрабатывать поток входных данных.
То есть, обрабатывать входные фрагменты один за другим и возвращать результат обработки каждого из фрагментов.

Сложность обработки потока данных зависит от задач, которые вам нужно решить.
Это может быть как обычная обработка токенов, которые генерирует модель, так и более сложные задачи по обработке частей JSON-файла до получения итогового файла.

<!--
### LLMs and Chat Models

Large language models and their chat variants are the primary bottleneck in LLM based apps.

Large language models can take **several seconds** to generate a complete response to a query. This is far slower than the **~200-300 ms** threshold at which an application feels responsive to an end user.

The key strategy to make the application feel more responsive is to show intermediate progress; viz., to stream the output from the model **token by token**.
-->

Для демонстрации работы с потоковой передачей токенов истользуется модель GigaChat, интеграция с которой входит в основную билиотеку GigaChain.

In [None]:
pip install -qU gigachain

Пример работы синхронного API `stream`.

In [3]:
from langchain_gigachat.chat_models.gigachat import GigaChat

model = GigaChat(
    credentials="авторизационные_данные",
    verify_ssl_certs=False,
    scope="GIGACHAT_API_PERS",
)

chunks = []
async for chunk in model.astream("Привет. Расскажи побольше о себе"):
    chunks.append(chunk)
    print(chunk.content, end="|", flush=True)

Привет! Я - GigaChat, виртуальный помощник, созданный компанией Сбер. Я могу отвечать на вопросы, давать информацию и помогать в решении задач. Я обучен на основе больших объемов данных и искусственного интеллекта, чтобы предоставлять точные и полезные ответы. Если у вас есть какие-либо| вопросы| или| нужна| помощь|,| я| буду| ра|д| помочь| вам|.||

:::note

Первый фрагмент ответа GigaChat в потоком режиме всегда самый большой.

:::

Если вы работаете в асинхронной среде, попробуйсте использовать асинхронный API `astream`:

In [None]:
chunks = []
async for chunk in model.astream("Привет. Расскажи побольше о себе"):
    chunks.append(chunk)
    print(chunk.content, end="|", flush=True)

Посмотрим на один из фрагментов.

In [4]:
chunks[1]

AIMessageChunk(content=' вопросы', id='run-0f017856-0fdb-4a42-b766-b87cab567eed')

Фрагмент содержит `AIMessageChunk` — часть сообщения `AIMessage`.

Фрагменты созданы таким образом, что их можно добавлять друг к другу, чтобы восстановить исходное сообщение.

In [5]:
chunks[0] + chunks[1] + chunks[2] + chunks[3] + chunks[4]

AIMessageChunk(content='Привет! Я - GigaChat, виртуальный помощник, созданный компанией Сбер. Я могу отвечать на вопросы, давать информацию и помогать в решении задач. Я обучен на основе больших объемов данных и искусственного интеллекта, чтобы предоставлять точные и полезные ответы. Если у вас есть какие-либо вопросы или нужна помощь', id='run-0f017856-0fdb-4a42-b766-b87cab567eed')

### Работа с цепочками

Как правило, запрос к модели это лишь одна из частей приложения.

Ниже работа потоковой передачи показана на примере цепочки созданной с помощью LangChain Expression Language (LCEL).
Цепочка состоит из промпта, модели и парсера.

Для примера использован парсер `StrOutputParser`, который извлекает поле `content` из сообщения `AIMessageChunk` и возвращает токен, полученный от модели.

:::note

LCEL — декларативный язык для соединения примитивов GigaChain в цепочки.
Методы `stream` и `astream` по умолчанию доступны в LCEL-цепочках, что позволяет обрабатывать итоговый результат генерации модели.

Кроме этих методов цепочки реализуют весь стандартный Runnable-интерфейс.

:::

In [6]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_template("расскажи шутку о {topic}")
parser = StrOutputParser()
chain = prompt | model | parser

async for chunk in chain.astream({"topic": "попугай"}):
    print(chunk, end="|", flush=True)

Here|'s| a| joke| about| a| par|rot|:|

A man| goes| to| a| pet| shop| to| buy| a| par|rot|.| The| shop| owner| shows| him| two| stunning| pa|rr|ots| with| beautiful| pl|um|age|.|

"|There|'s| a| talking| par|rot| an|d a| non|-|talking| par|rot|,"| the| owner| says|.| "|The| talking| par|rot| costs| $|100|,| an|d the| non|-|talking| par|rot| is| $|20|."|

The| man| says|,| "|I|'ll| take| the| non|-|talking| par|rot| at| $|20|."|

He| pays| an|d leaves| with| the| par|rot|.| As| he|'s| walking| down| the| street|,| the| par|rot| looks| up| at| him| an|d says|,| "|You| know|,| you| really| are| a| stupi|d man|!"|

The| man| is| stun|ne|d an|d looks| at| the| par|rot| in| dis|bel|ief|.| The| par|rot| continues|,| "|Yes|,| you| got| r|ippe|d off| big| time|!| I| can| talk| just| as| well| as| that| other| par|rot|,| an|d you| only| pai|d $|20| |for| me|!"|

Парсер `parser` обрабатывает фрагменты один за другим по мере их поступления от модели.
Такое повередие при работе с потоковой передачей данных характерно для многих примитивов LCEL, что заметно упрощает разработку приложений.

Вы можете создать собственные функции, которые будут возвращать генераторы, [способные работать с потоками](/docs/how_to/functions#streaming).

Тем не менее, некоторые Runnable, например [шаблоны промптов](/docs/how_to#prompt-templates) или [чат-модели](/docs/how_to#chat-models), не умеют обрабатывать отдельные фрагменты и, вместо этого, объединяют все предыдущие этапы работы цепочки.
Такое поведение ведет к прерыванию обработки потока данных.

:::note

Язык LCEL позволяет разделять создание цепочки и режим ее работы (синхронный или асинхронный, пакетный или потоковый и так далее).
Если в этом нет необходимости, вы также можете использовать при программировании стандартный подход: вызывать `invoke`, `batch` или `stream` для каждого отдельного компонента, сохранять результаты в переменных и использовать их в дальнейшем.

:::

### Обработка потока входных данных

Предположим, вам нужно транслировать JSON по ходу генерации.

Если использовать для разбора частичного JSON-объекта метод `json.loads`, будет возникать ошибка, потому что частичный JSON не является валидным.

Для решения этой задачи парсер должен работать с входным потоком и пытаться «автоматически завершать» частичный JSON до пригнодного к использованию состояния.

In [7]:
from langchain_core.output_parsers import JsonOutputParser

chain = (
    model | JsonOutputParser()
)  # Баг в ранних версиях GigaChain приводил к тому, что JsonOutputParser не транслировал токены, полученные от некоторых моделей
async for text in chain.astream(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`"
):
    print(text, flush=True)

{}
{'countries': []}
{'countries': [{}]}
{'countries': [{'name': ''}]}
{'countries': [{'name': 'France'}]}
{'countries': [{'name': 'France', 'population': 67}]}
{'countries': [{'name': 'France', 'population': 67413}]}
{'countries': [{'name': 'France', 'population': 67413000}]}
{'countries': [{'name': 'France', 'population': 67413000}, {}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': ''}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain'}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 4735156

Попробуем сломать потоковую передачу.
Для этого добавим в конце предыдущего примера функцию, которая извлекает название страны из итогового JSON.

:::warning

Любой этап цепочки, который работает с объединенными входными данными, а не с потоком данных, может сломать потоковую передачу с помощью методов `stream` или `astream`.

:::

<!--
:::{.callout-tip}
Later, we will discuss the `astream_events` API which streams results from intermediate steps. This API will stream results from intermediate steps even if the chain contains steps that only operate on **finalized inputs**.
:::
-->

In [8]:
from langchain_core.output_parsers import (
    JsonOutputParser,
)


# Функция обрабатывает объединенные входные данные, а не поток input_stream
def _extract_country_names(inputs):
    """Функция, которая не работает с потоком данных и ломает потоковую передачу."""
    if not isinstance(inputs, dict):
        return ""

    if "countries" not in inputs:
        return ""

    countries = inputs["countries"]

    if not isinstance(countries, list):
        return ""

    country_names = [
        country.get("name") for country in countries if isinstance(country, dict)
    ]
    return country_names


chain = model | JsonOutputParser() | _extract_country_names

async for text in chain.astream(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`"
):
    print(text, end="|", flush=True)

['France', 'Spain', 'Japan']|

#### Функции-генераторы

Исправить сделанные выше изменения можно с помощью функции-генератора, которая работает с потоком входных данных.

:::tip

Функция-генератор возвращает `yield` и позволяет создавать код, который работает с потоками входных данных.

:::

In [9]:
from langchain_core.output_parsers import JsonOutputParser


async def _extract_country_names_streaming(input_stream):
    """Функция, которая работает с потоком входных данных."""
    country_names_so_far = set()

    async for input in input_stream:
        if not isinstance(input, dict):
            continue

        if "countries" not in input:
            continue

        countries = input["countries"]

        if not isinstance(countries, list):
            continue

        for country in countries:
            name = country.get("name")
            if not name:
                continue
            if name not in country_names_so_far:
                yield name
                country_names_so_far.add(name)


chain = model | JsonOutputParser() | _extract_country_names_streaming

async for text in chain.astream(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
):
    print(text, end="|", flush=True)

France|Spain|Japan|

:::note

Цепочка из примера выше может возвращать части названия страны.
Это связанно с работой автодополнения данных JSON.

Для этого примера это непринципиально, так как его цель продемонстрировать подходы в работе с потоковой передачей данных.

:::

### Компоненты без потоковой передачи

Некоторые компоненты GigaChain, например, ретриверы, не поддерживают работу с потоком токенов.

Ниже показано, что случится если вызвать метод `stream` в таких компонентах.

In [10]:
from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import OpenAIEmbeddings

template = """Answer the question based only on the following context:
{context}

Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)

vectorstore = FAISS.from_texts(
    ["harrison worked at kensho", "harrison likes spicy food"],
    embedding=OpenAIEmbeddings(),
)
retriever = vectorstore.as_retriever()

chunks = [chunk for chunk in retriever.stream("where did harrison work?")]
chunks

[[Document(page_content='harrison worked at kensho'),
  Document(page_content='harrison likes spicy food')]]

Метод просто возвращает вывод компонента целиком.
Это нормальное поведение для компонентов, которые не поддерживают работу с потоком данных.

:::note

В большинстве случаев LCEL-цепочка, некоторые компоненты которой не поддерживают потоковую передачу, все равно будет работать с потоком данных.
В таких случаях потоковая передача возобновляется после последнего компонента, не поддерживающего потоковый вывод данных.

:::

In [11]:
retrieval_chain = (
    {
        "context": retriever.with_config(run_name="Docs"),
        "question": RunnablePassthrough(),
    }
    | prompt
    | model
    | StrOutputParser()
)

In [12]:
for chunk in retrieval_chain.stream(
    "Where did harrison work? " "Write 3 made up sentences about this place."
):
    print(chunk, end="|", flush=True)

Base|d on| the| given| context|,| Harrison| worke|d at| K|ens|ho|.|

Here| are| |3| |made| up| sentences| about| this| place|:|

1|.| K|ens|ho| was| a| cutting|-|edge| technology| company| known| for| its| innovative| solutions| in| artificial| intelligence| an|d data| analytics|.|

2|.| The| modern| office| space| at| K|ens|ho| feature|d open| floor| plans|,| collaborative| work|sp|aces|,| an|d a| vib|rant| atmosphere| that| fos|tere|d creativity| an|d team|work|.|

3|.| With| its| prime| location| in| the| heart| of| the| city|,| K|ens|ho| attracte|d top| talent| from| aroun|d the| worl|d,| creating| a| diverse| an|d dynamic| work| environment|.|

## Потоковая передача событий

Работа с потоком событий была добавлена в версии библиотеки langchain-core **0.1.14**.

Сейчас доступна бета-версия API, которая может меняться.

In [None]:
import langchain_core

langchain_core.__version__

Для корректной работы API `astream_events`:

* Используйте `async`, где это возможно, например, при вызове инструментов.
* Добавляйте обратные вызовы при определении пользовательских функций или Runnable.
* Если вы испрользуете Runnable без LCEL, вызывайте метод `.astream()` при работе с моделью, вместо `.ainvoke`. Это позволит запустить потоковую генерацию принудительно.

### Описание событий

В таблице ниже представлены некоторые события, которые могут быть сгенерированы различными объектами Runnable.

:::note

При потоковой передаче потока входных данных в Runnable-объект, они не будут доступны до тех пор, пока поток не будет потреблен полностью.
Это означает, что входные данные будут доступны на соответствующем событии `end`, а не на событии `start`.

:::

| событие                  | имя             | кусок                           | вход                                         | выход                                          |
|--------------------------|-----------------|---------------------------------|----------------------------------------------|------------------------------------------------|
| on_chat_model_start      | [имя модели]    |                                 | {"сообщения": [[SystemMessage, HumanMessage]]} |                                                |
| on_chat_model_stream     | [имя модели]    | AIMessageChunk(content="hello") |                                              |                                                |
| on_chat_model_end        | [имя модели]    |                                 | {"сообщения": [[SystemMessage, HumanMessage]]} | {"генерации": [...], "выход llm": None, ...}   |
| on_llm_start             | [имя модели]    |                                 | {'вход': 'привет'}                           |                                                |
| on_llm_stream            | [имя модели]    | 'Привет'                        |                                              |                                                |
| on_llm_end               | [имя модели]    |                                 | 'Привет, человек!'                           |
| on_chain_start           | format_docs     |                                 |                                              |                                                |
| on_chain_stream          | format_docs     | "привет мир!, прощай мир!"      |                                              |                                                |
| on_chain_end             | format_docs     |                                 | [Документ(...)]                              | "привет мир!, прощай мир!"                      |
| on_tool_start            | some_tool       |                                 | {"x": 1, "y": "2"}                           |                                                |
| on_tool_stream           | some_tool       | {"x": 1, "y": "2"}              |                                              |                                                |
| on_tool_end              | some_tool       |                                 |                                              | {"x": 1, "y": "2"}                             |
| on_retriever_start       | [имя ретривера] |                                 | {"запрос": "привет"}                         |                                                |
| on_retriever_chunk       | [имя ретривера] | {документы: [...]}              |                                              |                                                |
| on_retriever_end         | [имя ретривера] |                                 | {"запрос": "привет"}                         | {документы: [...]}                             |
| on_prompt_start          | [имя шаблона]   |                                 | {"вопрос": "привет"}                         |                                                |
| on_prompt_end            | [имя шаблона]   |                                 | {"вопрос": "привет"}                         | ChatPromptValue(сообщения: [SystemMessage, ...])|

### Чат-модель

Посмотрим какие события возвращает чат-модель.

In [14]:
events = []
async for event in model.astream_events("hello", version="v2"):
    events.append(event)

  warn_beta(


::: note

Параметр версии используется потому, что API находится в бета-версии.
Он позволит минимизировать проблемы, которые могут возникнуть при изменении API.

:::

Рассмотрим несоклько начальных (`start`) и конечных (`end`) событий.

In [15]:
events[:3]

[{'event': 'on_chat_model_start',
  'data': {'input': 'hello'},
  'name': 'ChatAnthropic',
  'tags': [],
  'run_id': 'a81e4c0f-fc36-4d33-93bc-1ac25b9bb2c3',
  'metadata': {}},
 {'event': 'on_chat_model_stream',
  'data': {'chunk': AIMessageChunk(content='Hello', id='run-a81e4c0f-fc36-4d33-93bc-1ac25b9bb2c3')},
  'run_id': 'a81e4c0f-fc36-4d33-93bc-1ac25b9bb2c3',
  'name': 'ChatAnthropic',
  'tags': [],
  'metadata': {}},
 {'event': 'on_chat_model_stream',
  'data': {'chunk': AIMessageChunk(content='!', id='run-a81e4c0f-fc36-4d33-93bc-1ac25b9bb2c3')},
  'run_id': 'a81e4c0f-fc36-4d33-93bc-1ac25b9bb2c3',
  'name': 'ChatAnthropic',
  'tags': [],
  'metadata': {}}]

In [16]:
events[-2:]

[{'event': 'on_chat_model_stream',
  'data': {'chunk': AIMessageChunk(content='?', id='run-a81e4c0f-fc36-4d33-93bc-1ac25b9bb2c3')},
  'run_id': 'a81e4c0f-fc36-4d33-93bc-1ac25b9bb2c3',
  'name': 'ChatAnthropic',
  'tags': [],
  'metadata': {}},
 {'event': 'on_chat_model_end',
  'data': {'output': AIMessageChunk(content='Hello! How can I assist you today?', id='run-a81e4c0f-fc36-4d33-93bc-1ac25b9bb2c3')},
  'run_id': 'a81e4c0f-fc36-4d33-93bc-1ac25b9bb2c3',
  'name': 'ChatAnthropic',
  'tags': [],
  'metadata': {}}]

### Работа с цепочкой

Для демонстрации работы API событий обратимся к примеру цепочки, работавшему с потоковой передачей JSON.

In [17]:
chain = (
    model | JsonOutputParser()
)  # Due to a bug in older versions of Langchain, JsonOutputParser did not stream results from some models

events = [
    event
    async for event in chain.astream_events(
        "output a list of the countries france, spain and japan and their populations in JSON format. "
        'Use a dict with an outer key of "countries" which contains a list of countries. '
        "Each country should have the key `name` and `population`",
        version="v2",
    )
]

Можно заметить, что вместо двух начальных событий мы получили три.

Три начальных события соответствую компонентам:

1. Цепочка (модель + парсер)
2. Модель.
3. Парсер.

In [18]:
events[:3]

[{'event': 'on_chain_start',
  'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'},
  'name': 'RunnableSequence',
  'tags': [],
  'run_id': '4765006b-16e2-4b1d-a523-edd9fd64cb92',
  'metadata': {}},
 {'event': 'on_chat_model_start',
  'data': {'input': {'messages': [[HumanMessage(content='output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`')]]}},
  'name': 'ChatAnthropic',
  'tags': ['seq:step:1'],
  'run_id': '0320c234-7b52-4a14-ae4e-5f100949e589',
  'metadata': {}},
 {'event': 'on_chat_model_stream',
  'data': {'chunk': AIMessageChunk(content='{', id='run-0320c234-7b52-4a14-ae4e-5f100949e589')},
  'run_id': '0320c

Используем представленный API для получения событий от модели и парсера.
Не будем обращать внимания на начальные и конечные события, а также на события цепочки.

In [19]:
num_events = 0

async for event in chain.astream_events(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
    version="v2",
):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        print(
            f"Chat model chunk: {repr(event['data']['chunk'].content)}",
            flush=True,
        )
    if kind == "on_parser_stream":
        print(f"Parser chunk: {event['data']['chunk']}", flush=True)
    num_events += 1
    if num_events > 30:
        # Truncate the output
        print("...")
        break

Chat model chunk: '{'
Parser chunk: {}
Chat model chunk: '\n  '
Chat model chunk: '"'
Chat model chunk: 'countries'
Chat model chunk: '":'
Chat model chunk: ' ['
Parser chunk: {'countries': []}
Chat model chunk: '\n    '
Chat model chunk: '{'
Parser chunk: {'countries': [{}]}
Chat model chunk: '\n      '
Chat model chunk: '"'
Chat model chunk: 'name'
Chat model chunk: '":'
Chat model chunk: ' "'
Parser chunk: {'countries': [{'name': ''}]}
Chat model chunk: 'France'
Parser chunk: {'countries': [{'name': 'France'}]}
Chat model chunk: '",'
Chat model chunk: '\n      '
Chat model chunk: '"'
Chat model chunk: 'population'
...


Мы можем получать события обоих компонентов одновременно, потому что каждый из них поддерживает работу с потоком данных.

### Фильтрация событий

API транслирует очень много событий.
События можно отфильтровать по названию (`name`), тегам (`tags`) или типу (`type`) компонента.

#### По названию

In [20]:
chain = model.with_config({"run_name": "model"}) | JsonOutputParser().with_config(
    {"run_name": "my_parser"}
)

max_events = 0
async for event in chain.astream_events(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
    version="v2",
    include_names=["my_parser"],
):
    print(event)
    max_events += 1
    if max_events > 10:
        # Truncate output
        print("...")
        break

{'event': 'on_parser_start', 'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}, 'name': 'my_parser', 'tags': ['seq:step:2'], 'run_id': 'e058d750-f2c2-40f6-aa61-10f84cd671a9', 'metadata': {}}
{'event': 'on_parser_stream', 'data': {'chunk': {}}, 'run_id': 'e058d750-f2c2-40f6-aa61-10f84cd671a9', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}}
{'event': 'on_parser_stream', 'data': {'chunk': {'countries': []}}, 'run_id': 'e058d750-f2c2-40f6-aa61-10f84cd671a9', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}}
{'event': 'on_parser_stream', 'data': {'chunk': {'countries': [{}]}}, 'run_id': 'e058d750-f2c2-40f6-aa61-10f84cd671a9', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}}
{'event': 'on_parser_stream', 'data': {'chunk': {'countries': [{'name': ''}]}}, 'run

#### По типу

In [21]:
chain = model.with_config({"run_name": "model"}) | JsonOutputParser().with_config(
    {"run_name": "my_parser"}
)

max_events = 0
async for event in chain.astream_events(
    'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
    version="v2",
    include_types=["chat_model"],
):
    print(event)
    max_events += 1
    if max_events > 10:
        # Truncate output
        print("...")
        break

{'event': 'on_chat_model_start', 'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}, 'name': 'model', 'tags': ['seq:step:1'], 'run_id': 'db246792-2a91-4eb3-a14b-29658947065d', 'metadata': {}}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='{', id='run-db246792-2a91-4eb3-a14b-29658947065d')}, 'run_id': 'db246792-2a91-4eb3-a14b-29658947065d', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {}}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='\n  ', id='run-db246792-2a91-4eb3-a14b-29658947065d')}, 'run_id': 'db246792-2a91-4eb3-a14b-29658947065d', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {}}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='"', id='run-db246792-2a91-4eb3-a14b-29658947065d')}, 'run

#### По тегам

:::caution

Теги наследуются дочерними компонентами, поэтому фильтровать по тегам следует с осторожностью.

:::

In [22]:
chain = (model | JsonOutputParser()).with_config({"tags": ["my_chain"]})

max_events = 0
async for event in chain.astream_events(
    'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
    version="v2",
    include_tags=["my_chain"],
):
    print(event)
    max_events += 1
    if max_events > 10:
        # Truncate output
        print("...")
        break

{'event': 'on_chain_start', 'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}, 'name': 'RunnableSequence', 'tags': ['my_chain'], 'run_id': 'fd68dd64-7a4d-4bdb-a0c2-ee592db0d024', 'metadata': {}}
{'event': 'on_chat_model_start', 'data': {'input': {'messages': [[HumanMessage(content='output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`')]]}}, 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'run_id': 'efd3c8af-4be5-4f6c-9327-e3f9865dd1cd', 'metadata': {}}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='{', id='run-efd3c8af-4be5-4f6c-9327-e3f9865dd1cd')}, 'run_id': 'efd3c8af-4be

### Компоненты без потоковой передачи

Метод `astream_events` позволяет получить поток событий от промежуточных шагов, поддерживающих работу с потоком данных.

In [23]:
# Функция, которая не поддерживает работу с потоком данных.
def _extract_country_names(inputs):
    """A function that does not operates on input streams and breaks streaming."""
    if not isinstance(inputs, dict):
        return ""

    if "countries" not in inputs:
        return ""

    countries = inputs["countries"]

    if not isinstance(countries, list):
        return ""

    country_names = [
        country.get("name") for country in countries if isinstance(country, dict)
    ]
    return country_names


chain = (
    model | JsonOutputParser() | _extract_country_names
)  # This parser only works with OpenAI right now

Метод `astream` API не будет работать корректно.

In [24]:
async for chunk in chain.astream(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
):
    print(chunk, flush=True)

['France', 'Spain', 'Japan']


Убедимся, что метод `astream_events` по-прежнему предоставляет доступ к потоку данных от модели и парсера.

In [25]:
num_events = 0

async for event in chain.astream_events(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
    version="v2",
):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        print(
            f"Chat model chunk: {repr(event['data']['chunk'].content)}",
            flush=True,
        )
    if kind == "on_parser_stream":
        print(f"Parser chunk: {event['data']['chunk']}", flush=True)
    num_events += 1
    if num_events > 30:
        # Truncate the output
        print("...")
        break

Chat model chunk: '{'
Parser chunk: {}
Chat model chunk: '\n  '
Chat model chunk: '"'
Chat model chunk: 'countries'
Chat model chunk: '":'
Chat model chunk: ' ['
Parser chunk: {'countries': []}
Chat model chunk: '\n    '
Chat model chunk: '{'
Parser chunk: {'countries': [{}]}
Chat model chunk: '\n      '
Chat model chunk: '"'
Chat model chunk: 'name'
Chat model chunk: '":'
Chat model chunk: ' "'
Parser chunk: {'countries': [{'name': ''}]}
Chat model chunk: 'France'
Parser chunk: {'countries': [{'name': 'France'}]}
Chat model chunk: '",'
Chat model chunk: '\n      '
Chat model chunk: '"'
Chat model chunk: 'population'
Chat model chunk: '":'
Chat model chunk: ' '
Chat model chunk: '67'
Parser chunk: {'countries': [{'name': 'France', 'population': 67}]}
...


### Добавление обратных вызовов

:::caution

При добавлении в свои инструменты вызываемых (`invoke`) Runnable, вам нужно предусмотреть наличие обратных вызовов для Runnable.
В противном случае события не будут генерироваться.

:::

:::note

При использовании RunnableLambdas или декоратора `@chain` обратные вызовы добавляются автоматически.

:::

In [26]:
from langchain_core.runnables import RunnableLambda
from langchain_core.tools import tool


def reverse_word(word: str):
    return word[::-1]


reverse_word = RunnableLambda(reverse_word)


@tool
def bad_tool(word: str):
    """Custom tool that doesn't propagate callbacks."""
    return reverse_word.invoke(word)


async for event in bad_tool.astream_events("hello", version="v2"):
    print(event)

{'event': 'on_tool_start', 'data': {'input': 'hello'}, 'name': 'bad_tool', 'tags': [], 'run_id': 'ea900472-a8f7-425d-b627-facdef936ee8', 'metadata': {}}
{'event': 'on_chain_start', 'data': {'input': 'hello'}, 'name': 'reverse_word', 'tags': [], 'run_id': '77b01284-0515-48f4-8d7c-eb27c1882f86', 'metadata': {}}
{'event': 'on_chain_end', 'data': {'output': 'olleh', 'input': 'hello'}, 'run_id': '77b01284-0515-48f4-8d7c-eb27c1882f86', 'name': 'reverse_word', 'tags': [], 'metadata': {}}
{'event': 'on_tool_end', 'data': {'output': 'olleh'}, 'run_id': 'ea900472-a8f7-425d-b627-facdef936ee8', 'name': 'bad_tool', 'tags': [], 'metadata': {}}


Ниже пример того как правильно добавить обратные вызовы.
При запуске вы должны получить события от Runnable `reverse_word`.

In [27]:
@tool
def correct_tool(word: str, callbacks):
    """A tool that correctly propagates callbacks."""
    return reverse_word.invoke(word, {"callbacks": callbacks})


async for event in correct_tool.astream_events("hello", version="v2"):
    print(event)

{'event': 'on_tool_start', 'data': {'input': 'hello'}, 'name': 'correct_tool', 'tags': [], 'run_id': 'd5ea83b9-9278-49cc-9f1d-aa302d671040', 'metadata': {}}
{'event': 'on_chain_start', 'data': {'input': 'hello'}, 'name': 'reverse_word', 'tags': [], 'run_id': '44dafbf4-2f87-412b-ae0e-9f71713810df', 'metadata': {}}
{'event': 'on_chain_end', 'data': {'output': 'olleh', 'input': 'hello'}, 'run_id': '44dafbf4-2f87-412b-ae0e-9f71713810df', 'name': 'reverse_word', 'tags': [], 'metadata': {}}
{'event': 'on_tool_end', 'data': {'output': 'olleh'}, 'run_id': 'd5ea83b9-9278-49cc-9f1d-aa302d671040', 'name': 'correct_tool', 'tags': [], 'metadata': {}}


Пример автоматчиеского добавления обратных вызовов при использовании Runnable Lambdas.

In [28]:
from langchain_core.runnables import RunnableLambda


async def reverse_and_double(word: str):
    return await reverse_word.ainvoke(word) * 2


reverse_and_double = RunnableLambda(reverse_and_double)

await reverse_and_double.ainvoke("1234")

async for event in reverse_and_double.astream_events("1234", version="v2"):
    print(event)

{'event': 'on_chain_start', 'data': {'input': '1234'}, 'name': 'reverse_and_double', 'tags': [], 'run_id': '03b0e6a1-3e60-42fc-8373-1e7829198d80', 'metadata': {}}
{'event': 'on_chain_start', 'data': {'input': '1234'}, 'name': 'reverse_word', 'tags': [], 'run_id': '5cf26fc8-840b-4642-98ed-623dda28707a', 'metadata': {}}
{'event': 'on_chain_end', 'data': {'output': '4321', 'input': '1234'}, 'run_id': '5cf26fc8-840b-4642-98ed-623dda28707a', 'name': 'reverse_word', 'tags': [], 'metadata': {}}
{'event': 'on_chain_stream', 'data': {'chunk': '43214321'}, 'run_id': '03b0e6a1-3e60-42fc-8373-1e7829198d80', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}}
{'event': 'on_chain_end', 'data': {'output': '43214321'}, 'run_id': '03b0e6a1-3e60-42fc-8373-1e7829198d80', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}}


Пример автоматчиеского добавления обратных вызовов при использовании декоратора `@chain`.

In [29]:
from langchain_core.runnables import chain


@chain
async def reverse_and_double(word: str):
    return await reverse_word.ainvoke(word) * 2


await reverse_and_double.ainvoke("1234")

async for event in reverse_and_double.astream_events("1234", version="v2"):
    print(event)

{'event': 'on_chain_start', 'data': {'input': '1234'}, 'name': 'reverse_and_double', 'tags': [], 'run_id': '1bfcaedc-f4aa-4d8e-beee-9bba6ef17008', 'metadata': {}}
{'event': 'on_chain_start', 'data': {'input': '1234'}, 'name': 'reverse_word', 'tags': [], 'run_id': '64fc99f0-5d7d-442b-b4f5-4537129f67d1', 'metadata': {}}
{'event': 'on_chain_end', 'data': {'output': '4321', 'input': '1234'}, 'run_id': '64fc99f0-5d7d-442b-b4f5-4537129f67d1', 'name': 'reverse_word', 'tags': [], 'metadata': {}}
{'event': 'on_chain_stream', 'data': {'chunk': '43214321'}, 'run_id': '1bfcaedc-f4aa-4d8e-beee-9bba6ef17008', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}}
{'event': 'on_chain_end', 'data': {'output': '43214321'}, 'run_id': '1bfcaedc-f4aa-4d8e-beee-9bba6ef17008', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}}


## Смотрите также

* [Langchain Expression Language](/docs/concepts/#langchain-expression-language/).