In [1]:
import os
os.environ['OPENAI_API_KEY'] = "EXAMPLE"

# Streaming
##### langchain interface 에서 스트리밍을 사용하는데 두 가지 일반적인 접근 방식을 제공한다.


1.   sync stream 및 async astream : 체인의 최종 출력을 스트리밍하는 스트리밍의 기본 구현
2.   async astream_events 및 async : 체인의 중간 단계와 최종 출력을 모두 스트리밍 하는 방법(astream_log)



## 스트림 사용
##### 모든 Runnable 개체는 stream 이라는 동기화 메서드와 astream이라는 비동기 변형을 구현한다.
##### 최종 출력을 청크로 스트리밍하여 각 청크가 사용 가능하게 즉시 생성되도록 설계되었다.
##### 스트리밍은 프로그램의 모든 단계가 입력 스트림을 처리하는 알고 있는 경우에만 가능하다. 즉, 입력 청크를 한 번에 하나씩 처리하고 해당 출력 청크를 생성한다.

## LLM 및 채팅
##### LLM의 채팅의 앱 문제점은 주요 병목 현상이다. 사용자가 느끼는 임계값보다 훨씬 느리다.
##### 그래서, 중간 진행 사항을 보여주는 즉, 토큰별로 모델 토큰의 출력을 스트리밍 한다.

In [None]:
!pip install --upgrade --quiet langchain-core langchain-community langchain-openai

In [None]:
!pip install faiss-gpu

In [3]:
from langchain_openai import ChatOpenAI

model = ChatOpenAI()

chunks = []
async for chunk in model.astream("안녕. 너가 누구인지 나에게 소개해줘"):
    chunks.append(chunk)
    print(chunk.content, end="|", flush=True)

|안|녕|하세요|!| 저|는| 인|공|지|능| 챗|봇|입니다|.| 궁|금|한| 것|이| 있|으면| 언|제|든|지| 물|어|보|세요|.| 함|께| 대|화| 나|누|는| 것|을| 즐|기|겠|습니다|.||

In [5]:
chunks[0]

AIMessageChunk(content='')

In [6]:
chunks[0] + chunks[1] + chunks[2] + chunks[3] + chunks[4]

AIMessageChunk(content='안녕하세요!')

In [8]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_template("{주제}에 대해서 농담 하나 해줘")
parser = StrOutputParser()
chain = prompt | model | parser

async for chunk in chain.astream({"주제": "당근"}):
  print(chunk, end="|", flush=True)

|당|근|이| 산|책|을| 하|다|가| 미|쳐|서| 당|근| 차|를| 탔|다|고| 하|더|라|!||

## 입력
##### 생성되는 출력에서 JSON을 스트리밍하자 : JsonOutputParser

In [10]:
from langchain_core.output_parsers import JsonOutputParser

chain = (
    model | JsonOutputParser()
)
async for text in chain.astream(
    'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'
):
  print(text, flush=True)

{}
{'countries': []}
{'countries': [{}]}
{'countries': [{'name': ''}]}
{'countries': [{'name': 'France'}]}
{'countries': [{'name': 'France', 'population': 652}]}
{'countries': [{'name': 'France', 'population': 652735}]}
{'countries': [{'name': 'France', 'population': 65273511}]}
{'countries': [{'name': 'France', 'population': 65273511}, {}]}
{'countries': [{'name': 'France', 'population': 65273511}, {'name': ''}]}
{'countries': [{'name': 'France', 'population': 65273511}, {'name': 'Spain'}]}
{'countries': [{'name': 'France', 'population': 65273511}, {'name': 'Spain', 'population': 467}]}
{'countries': [{'name': 'France', 'population': 65273511}, {'name': 'Spain', 'population': 467237}]}
{'countries': [{'name': 'France', 'population': 65273511}, {'name': 'Spain', 'population': 46723749}]}
{'countries': [{'name': 'France', 'population': 65273511}, {'name': 'Spain', 'population': 46723749}, {}]}
{'countries': [{'name': 'France', 'population': 65273511}, {'name': 'Spain', 'population': 467

##### 스트리밍을 중단하고 국가 이름을 출력해주는 추출 함수를 끝에 추가해보자

In [11]:
from langchain_core.output_parsers import (
    JsonOutputParser,
)

# 국가 이름 추출 함수
def _extract_country_names(inputs):
    if not isinstance(inputs, dict):
        return ""

    if "countries" not in inputs:
        return ""

    countries = inputs["countries"]

    if not isinstance(countries, list):
        return ""

    country_names = [
        country.get("name") for country in countries if isinstance(country, dict)
    ]
    return country_names

# 체인을 형성할 때 국가 이름 추출 함수도 같이 형성
chain = model | JsonOutputParser() | _extract_country_names

async for text in chain.astream(
    'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'
):
    print(text, end="|", flush=True)

[None, '', 'France', 'France', 'France', 'France', 'France', None, 'France', '', 'France', 'Spain', 'France', 'Spain', 'France', 'Spain', 'France', 'Spain', 'France', 'Spain', None, 'France', 'Spain', '', 'France', 'Spain', 'Japan', 'France', 'Spain', 'Japan', 'France', 'Spain', 'Japan', 'France', 'Spain', 'Japan']|

##### 생성기 함수(yield)를 사용해서 스트리밍을 수정해보자

In [12]:
from langchain_core.output_parsers import JsonOutputParser

async def _extract_country_names_streaming(input_stream):
    """A function that operates on input streams."""
    country_names_so_far = set()

    async for input in input_stream:
        if not isinstance(input, dict):
            continue

        if "countries" not in input:
            continue

        countries = input["countries"]

        if not isinstance(countries, list):
            continue

        for country in countries:
            name = country.get("name")
            if not name:
                continue
            if name not in country_names_so_far:
                yield name
                country_names_so_far.add(name)


chain = model | JsonOutputParser() | _extract_country_names_streaming

async for text in chain.astream(
    'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'
):
    print(text, end="|", flush=True)

France|Spain|Japan|

### 스트리밍을 하지 않는 경우
##### 리트리버 같은 일부 내장 요소는 streaming이 불필요 할 수 있다.

In [15]:
from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import OpenAIEmbeddings

template = """Answer the question based only on the following context:
{context}

Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)

vectorstore = FAISS.from_texts(
    ["harrison worked at kensho", "harrison likes spicy food"],
    embedding=OpenAIEmbeddings(),
)
retriever = vectorstore.as_retriever()

chunks = [chunk for chunk in retriever.stream("where did harrison work?")]
chunks

[[Document(page_content='harrison worked at kensho'),
  Document(page_content='harrison likes spicy food')]]

In [16]:
retrieval_chain = (
    {
        "context": retriever.with_config(run_name="Docs"),
        "question": RunnablePassthrough(),
    }
    | prompt
    | model
    | StrOutputParser()
)

In [17]:
for chunk in retrieval_chain.stream(
    "Where did harrison work? " "Write 3 made up sentences about this place."
):
    print(chunk, end="|", flush=True)

|H|arrison| worked| at| Kens|ho|,| a| trendy| tech| startup| known| for| its| innovative| projects| and| collaborative| work| environment|.
|At| Kens|ho|,| employees| enjoy| perks| like| cater|ed| lunches|,| team|-building| activities|,| and| a| relaxed| dress| code|.
|The| office| at| Kens|ho| is| modern| and| open|-con|cept|,| with| plenty| of| natural| light| and| comfortable| work|spaces| for| employees| to| thrive| in|.||

## 스트림 이벤트
### 채팅
##### 채팅 모델에서 생성된 이벤트를 살펴보자

In [19]:
events = []
async for event in model.astream_events("hello", version="v1"):
  events.append(event)

In [20]:
events[:3]

[{'event': 'on_chat_model_start',
  'run_id': '78d4817d-3bc3-4dc9-b645-098c03b59232',
  'name': 'ChatOpenAI',
  'tags': [],
  'metadata': {},
  'data': {'input': 'hello'}},
 {'event': 'on_chat_model_stream',
  'run_id': '78d4817d-3bc3-4dc9-b645-098c03b59232',
  'tags': [],
  'metadata': {},
  'name': 'ChatOpenAI',
  'data': {'chunk': AIMessageChunk(content='')}},
 {'event': 'on_chat_model_stream',
  'run_id': '78d4817d-3bc3-4dc9-b645-098c03b59232',
  'tags': [],
  'metadata': {},
  'name': 'ChatOpenAI',
  'data': {'chunk': AIMessageChunk(content='Hello')}}]

In [21]:
events[:2]

[{'event': 'on_chat_model_start',
  'run_id': '78d4817d-3bc3-4dc9-b645-098c03b59232',
  'name': 'ChatOpenAI',
  'tags': [],
  'metadata': {},
  'data': {'input': 'hello'}},
 {'event': 'on_chat_model_stream',
  'run_id': '78d4817d-3bc3-4dc9-b645-098c03b59232',
  'tags': [],
  'metadata': {},
  'name': 'ChatOpenAI',
  'data': {'chunk': AIMessageChunk(content='')}}]

##### 스트리밍 JSON을 구문 분석한 예제 체인을 보면, 2개의 시작 이벤트가 아닌 3개의 다른 시작 이벤트가 있다. (체인, 모델, 파서)

In [22]:
chain = (
    model | JsonOutputParser()
)

events = [
    event
    async for event in chain.astream_events(
        'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
        version="v1",
    )
]

In [24]:
events[:3]

[{'event': 'on_chain_start',
  'run_id': '7c603edd-0bd7-4a34-a327-126e54100e5d',
  'name': 'RunnableSequence',
  'tags': [],
  'metadata': {},
  'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}},
 {'event': 'on_chat_model_start',
  'name': 'ChatOpenAI',
  'run_id': '3d9489bf-cdbb-4701-a5c5-07cd8e5d1908',
  'tags': ['seq:step:1'],
  'metadata': {},
  'data': {'input': {'messages': [[HumanMessage(content='output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`')]]}}},
 {'event': 'on_chat_model_stream',
  'name': 'ChatOpenAI',
  'run_id': '3d9489bf-cdbb-4701-a5c5-07cd8e5d1908',
  'tags': ['seq:step:1'],
  'metadata': {}

##### 해당 api를 사용해서 모델과 파서에서 스트림 이벤트를 출력해보자.

In [25]:
num_events = 0

async for event in chain.astream_events(
    'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
    version="v1",
):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        print(
            f"Chat model chunk: {repr(event['data']['chunk'].content)}",
            flush=True,
        )
    if kind == "on_parser_stream":
        print(f"Parser chunk: {event['data']['chunk']}", flush=True)
    num_events += 1
    if num_events > 30:
        # Truncate the output
        print("...")
        break

Chat model chunk: ''
Chat model chunk: '{\n'
Parser chunk: {}
Chat model chunk: '   '
Chat model chunk: ' "'
Chat model chunk: 'countries'
Chat model chunk: '":'
Chat model chunk: ' [\n'
Parser chunk: {'countries': []}
Chat model chunk: '       '
Chat model chunk: ' {\n'
Parser chunk: {'countries': [{}]}
Chat model chunk: '           '
Chat model chunk: ' "'
Chat model chunk: 'name'
Chat model chunk: '":'
Chat model chunk: ' "'
Parser chunk: {'countries': [{'name': ''}]}
Chat model chunk: 'France'
Parser chunk: {'countries': [{'name': 'France'}]}
Chat model chunk: '",\n'
Chat model chunk: '           '
Chat model chunk: ' "'
...


## 이벤트 필터링
##### 해당 API는 너무 많은 이벤트를 생성하므로, 이벤트를 필터링 할 수 있다.
##### 이벤트는 name, tags, type 별로 필터링할 수 있다.

### 이름으로 필터링
##### 아래 예제는 my_parse 이름으로 필터링

In [26]:
chain = model.with_config({"run_name": "model"}) | JsonOutputParser().with_config(
    {"run_name": "my_parser"}
)

max_events = 0
async for event in chain.astream_events(
    'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
    version="v1",
    include_names=["my_parser"],
):
    print(event)
    max_events += 1
    if max_events > 10:
        # Truncate output
        print("...")
        break

{'event': 'on_parser_start', 'name': 'my_parser', 'run_id': '6aa48690-7afc-42d4-bede-0722da097fe3', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {}}
{'event': 'on_parser_stream', 'name': 'my_parser', 'run_id': '6aa48690-7afc-42d4-bede-0722da097fe3', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {}}}
{'event': 'on_parser_stream', 'name': 'my_parser', 'run_id': '6aa48690-7afc-42d4-bede-0722da097fe3', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': []}}}
{'event': 'on_parser_stream', 'name': 'my_parser', 'run_id': '6aa48690-7afc-42d4-bede-0722da097fe3', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{}]}}}
{'event': 'on_parser_stream', 'name': 'my_parser', 'run_id': '6aa48690-7afc-42d4-bede-0722da097fe3', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': ''}]}}}
{'event': 'on_parser_stream', 'name': 'my_parser', 'run_id': '6aa48690-7afc-42d4-bede-0722da097fe3', 'tags': ['seq:step:2'], 'metadat

### 타입별로 필터링
##### 타입은 chat_model로 필터링

In [27]:
chain = model.with_config({"run_name": "model"}) | JsonOutputParser().with_config(
    {"run_name": "my_parser"}
)

max_events = 0
async for event in chain.astream_events(
    'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
    version="v1",
    include_types=["chat_model"],
):
    print(event)
    max_events += 1
    if max_events > 10:
        # Truncate output
        print("...")
        break

{'event': 'on_chat_model_start', 'name': 'model', 'run_id': '1f4cff4a-8319-405b-a6ea-d3abe20558a1', 'tags': ['seq:step:1'], 'metadata': {}, 'data': {'input': {'messages': [[HumanMessage(content='output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`')]]}}}
{'event': 'on_chat_model_stream', 'name': 'model', 'run_id': '1f4cff4a-8319-405b-a6ea-d3abe20558a1', 'tags': ['seq:step:1'], 'metadata': {}, 'data': {'chunk': AIMessageChunk(content='')}}
{'event': 'on_chat_model_stream', 'name': 'model', 'run_id': '1f4cff4a-8319-405b-a6ea-d3abe20558a1', 'tags': ['seq:step:1'], 'metadata': {}, 'data': {'chunk': AIMessageChunk(content='{\n')}}
{'event': 'on_chat_model_stream', 'name': 'model', 'run_id': '1f4cff4a-8319-405b-a6ea-d3abe20558a1', 'tags': ['seq:step:1'], 'metadata': {}, 'data': {'chunk': AIMessageChunk(content='   ')}

### 태그별로 필터링
##### 태그가 my_chain으로 필터링

In [28]:
chain = (model | JsonOutputParser()).with_config({"tags": ["my_chain"]})

max_events = 0
async for event in chain.astream_events(
    'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
    version="v1",
    include_tags=["my_chain"],
):
    print(event)
    max_events += 1
    if max_events > 10:
        # Truncate output
        print("...")
        break

{'event': 'on_chain_start', 'run_id': 'c6b8b782-4bd0-4b27-b2e0-0029baa9c791', 'name': 'RunnableSequence', 'tags': ['my_chain'], 'metadata': {}, 'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}}
{'event': 'on_chat_model_start', 'name': 'ChatOpenAI', 'run_id': '9c4684fa-3d11-4606-9a5b-68e96f7904f8', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {}, 'data': {'input': {'messages': [[HumanMessage(content='output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`')]]}}}
{'event': 'on_chat_model_stream', 'name': 'ChatOpenAI', 'run_id': '9c4684fa-3d11-4606-9a5b-68e96f7904f8', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {

## 비스트리밍
##### astream을 사용할 때 최종 출력의 스트리밍을 중단할 수는 있지만 astream_events을 지원하는 중간 단계에서는 스트리밍 이벤트를 생성한다.

In [29]:
# 해당 함수는 스트리밍을 지원하지 않는다.
# 최종 입력에 대해서 작동한다.
def _extract_country_names(inputs):
    """A function that does not operates on input streams and breaks streaming."""
    if not isinstance(inputs, dict):
        return ""

    if "countries" not in inputs:
        return ""

    countries = inputs["countries"]

    if not isinstance(countries, list):
        return ""

    country_names = [
        country.get("name") for country in countries if isinstance(country, dict)
    ]
    return country_names

chain = (
    model | JsonOutputParser() | _extract_country_names
)

In [30]:
async for chunk in chain.astream(
    'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
):
    print(chunk, flush=True)

[None, '', 'France', 'France', 'France', 'France', 'France', None, 'France', '', 'France', 'Spain', 'France', 'Spain', 'France', 'Spain', 'France', 'Spain', 'France', 'Spain', None, 'France', 'Spain', '', 'France', 'Spain', 'Japan', 'France', 'Spain', 'Japan', 'France', 'Spain', 'Japan', 'France', 'Spain', 'Japan']


In [31]:
num_events = 0

async for event in chain.astream_events(
    'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
    version="v1",
):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        print(
            f"Chat model chunk: {repr(event['data']['chunk'].content)}",
            flush=True,
        )
    if kind == "on_parser_stream":
        print(f"Parser chunk: {event['data']['chunk']}", flush=True)
    num_events += 1
    if num_events > 30:
        # Truncate the output
        print("...")
        break

Chat model chunk: ''
Chat model chunk: '{\n'
Parser chunk: {}
Chat model chunk: '   '
Chat model chunk: ' "'
Chat model chunk: 'countries'
Chat model chunk: '":'
Chat model chunk: ' [\n'
Parser chunk: {'countries': []}
Chat model chunk: '       '
Chat model chunk: ' {\n'
Parser chunk: {'countries': [{}]}
Chat model chunk: '           '
Chat model chunk: ' "'
Chat model chunk: 'name'
Chat model chunk: '":'
Chat model chunk: ' "'
Parser chunk: {'countries': [{'name': ''}]}
Chat model chunk: 'France'
Parser chunk: {'countries': [{'name': 'France'}]}
Chat model chunk: '",\n'
Chat model chunk: '           '
Chat model chunk: ' "'
Chat model chunk: 'population'
Chat model chunk: '":'
Chat model chunk: ' '
Chat model chunk: '670'
...


##### 중간 단계의 모델과 파서의 스트리밍 출력은 계속 표시된다.

## 콜백 전파
##### 도구 내에서 실행 가능 항목 호출을 사용하는 경우 실행 가능 항목에 콜백을 전파해야 한다. 그렇지 않으면 스트림 이벤트가 생성되지 않는다.
##### RunnableLambdas 또는 @chain 데코레이터를 사용하면 콜백이 자동으로 백그라운드에 전파된다.

In [32]:
from langchain_core.runnables import RunnableLambda
from langchain_core.tools import tool


def reverse_word(word: str):
    return word[::-1]


reverse_word = RunnableLambda(reverse_word)


@tool
def bad_tool(word: str):
    """Custom tool that doesn't propagate callbacks."""
    return reverse_word.invoke(word)


async for event in bad_tool.astream_events("hello", version="v1"):
    print(event)

{'event': 'on_tool_start', 'run_id': 'c1dbdcfa-6957-4dea-856a-9f44a87c0d7d', 'name': 'bad_tool', 'tags': [], 'metadata': {}, 'data': {'input': 'hello'}}
{'event': 'on_tool_stream', 'run_id': 'c1dbdcfa-6957-4dea-856a-9f44a87c0d7d', 'tags': [], 'metadata': {}, 'name': 'bad_tool', 'data': {'chunk': 'olleh'}}
{'event': 'on_tool_end', 'name': 'bad_tool', 'run_id': 'c1dbdcfa-6957-4dea-856a-9f44a87c0d7d', 'tags': [], 'metadata': {}, 'data': {'output': 'olleh'}}


##### reverse word에 대한 스트림 이벤트가 생성되지 않는다.
##### 콜백을 올바르게 재구현하면 아래와 같다.

In [33]:
@tool
def correct_tool(word: str, callbacks):
    """A tool that correctly propagates callbacks."""
    return reverse_word.invoke(word, {"callbacks": callbacks})


async for event in correct_tool.astream_events("hello", version="v1"):
    print(event)

{'event': 'on_tool_start', 'run_id': '7a573992-49d7-45db-92f8-0ee3ae478371', 'name': 'correct_tool', 'tags': [], 'metadata': {}, 'data': {'input': 'hello'}}
{'event': 'on_chain_start', 'name': 'reverse_word', 'run_id': '847af050-8eb1-4fda-9c0e-1ae7563ca2eb', 'tags': [], 'metadata': {}, 'data': {'input': 'hello'}}
{'event': 'on_chain_end', 'name': 'reverse_word', 'run_id': '847af050-8eb1-4fda-9c0e-1ae7563ca2eb', 'tags': [], 'metadata': {}, 'data': {'input': 'hello', 'output': 'olleh'}}
{'event': 'on_tool_stream', 'run_id': '7a573992-49d7-45db-92f8-0ee3ae478371', 'tags': [], 'metadata': {}, 'name': 'correct_tool', 'data': {'chunk': 'olleh'}}
{'event': 'on_tool_end', 'name': 'correct_tool', 'run_id': '7a573992-49d7-45db-92f8-0ee3ae478371', 'tags': [], 'metadata': {}, 'data': {'output': 'olleh'}}


##### Runnable Lambda 또는 @chains 내에서 실행 가능 항목을 호출하는 경우 콜백이 자동으로 전달된다.

In [34]:
from langchain_core.runnables import RunnableLambda


async def reverse_and_double(word: str):
    return await reverse_word.ainvoke(word) * 2


reverse_and_double = RunnableLambda(reverse_and_double)

await reverse_and_double.ainvoke("1234")

async for event in reverse_and_double.astream_events("1234", version="v1"):
    print(event)

{'event': 'on_chain_start', 'run_id': '84c86fa8-7923-4fd8-9041-14fe9e2fc60b', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}, 'data': {'input': '1234'}}
{'event': 'on_chain_stream', 'run_id': '84c86fa8-7923-4fd8-9041-14fe9e2fc60b', 'tags': [], 'metadata': {}, 'name': 'reverse_and_double', 'data': {'chunk': '43214321'}}
{'event': 'on_chain_end', 'name': 'reverse_and_double', 'run_id': '84c86fa8-7923-4fd8-9041-14fe9e2fc60b', 'tags': [], 'metadata': {}, 'data': {'output': '43214321'}}


##### 그리고 @chain 데코레이터를 사용해도 된다.

In [35]:
from langchain_core.runnables import chain


@chain
async def reverse_and_double(word: str):
    return await reverse_word.ainvoke(word) * 2


await reverse_and_double.ainvoke("1234")

async for event in reverse_and_double.astream_events("1234", version="v1"):
    print(event)

{'event': 'on_chain_start', 'run_id': '5f6c1777-f074-4f8f-a1f5-9678da2fcee8', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}, 'data': {'input': '1234'}}
{'event': 'on_chain_stream', 'run_id': '5f6c1777-f074-4f8f-a1f5-9678da2fcee8', 'tags': [], 'metadata': {}, 'name': 'reverse_and_double', 'data': {'chunk': '43214321'}}
{'event': 'on_chain_end', 'name': 'reverse_and_double', 'run_id': '5f6c1777-f074-4f8f-a1f5-9678da2fcee8', 'tags': [], 'metadata': {}, 'data': {'output': '43214321'}}
