In [1]:
%load_ext autoreload
%autoreload 2

# [Streaming](https://python.langchain.com/docs/how_to/streaming/)

In [2]:
def load_env_to_dict(file_path):
    """Parse a dotenv-style file into a dictionary.

    Every meaningful line is expected to look like ``KEY=VALUE``. Blank lines
    and lines starting with ``#`` are ignored. Lines that contain no ``=`` are
    skipped as well instead of aborting the whole load with a ``ValueError``.

    Args:
        file_path: Path of the file to read.

    Returns:
        dict: Keys mapped to values, both stripped of surrounding whitespace.
        Only the first ``=`` splits a line, so values may contain ``=``.
        When a key appears more than once, the last occurrence wins.

    Raises:
        OSError: If the file cannot be opened.
    """
    env_dict = {}
    # Explicit encoding so parsing does not depend on the platform's locale.
    with open(file_path, "r", encoding="utf-8") as file:
        for line in file:
            # Remove whitespace and ignore comments or empty lines
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            # Split on the first "=" only; `separator` is empty when the
            # line has no "=" at all.
            key, separator, value = line.partition("=")
            if not separator:
                # Not a KEY=VALUE pair: skip it rather than crash.
                continue
            env_dict[key.strip()] = value.strip()
    return env_dict

In [3]:
import os

# Location of the dotenv-style secrets file. Set AI_PLATFORM_ENV_FILE to point
# the notebook at a different file; the original machine-specific path is kept
# as the fallback so existing setups behave exactly as before.
file_path = os.environ.get(
    "AI_PLATFORM_ENV_FILE",
    "/mnt/Exdisk/git-cuongpiger/secret/work/vngcloud/ai-platform/env",
)
env_variables = load_env_to_dict(file_path)

In [4]:
import os

# Fall back to the key loaded from the env file only when no non-empty
# Hugging Face token is already present in the process environment.
if os.environ.get("HUGGINGFACEHUB_API_TOKEN", "") == "":
    os.environ["HUGGINGFACEHUB_API_TOKEN"] = env_variables["HUGGINGFACE_API_KEY"]

# The Vertex AI credential path is always (re)assigned, even if the variable
# was already set.
os.environ.update(
    {
        "GOOGLE_APPLICATION_CREDENTIALS": "/mnt/Exdisk/git-cuongpiger/secret/work/vngcloud/ai-platform/vertex-ai-credential.json"
    }
)

In [5]:
from langchain_google_vertexai import ChatVertexAI

In [6]:
model = ChatVertexAI(model="gemini-1.5-flash")

In [7]:
chunks = []
for chunk in model.stream("Viết cho tôi một câu tỏ tình ngọt ngào"):
    chunks.append(chunk)
    print(chunk.content, end="|", flush=True)

Em| biết không, em giống như ánh nắng ban mai vậy, mang đến cho anh| một ngày mới rạng rỡ và đầy hy vọng. Anh muốn được ở bên em,| cùng em chia sẻ từng khoảnh khắc vui buồn, để mỗi ngày đều là một ngày đẹp trời. Em có muốn cùng anh tạo nên những kỷ niệm đẹp| như ánh nắng ban mai ấy không? 
|

In [8]:
chunks = []
async for chunk in model.astream("Viết cho tôi một câu tỏ tình ngọt ngào"):
    chunks.append(chunk)
    print(chunk.content, end="|", flush=True)

Em| à, em có biết không, mỗi khi em cười, cả thế giới như| bừng sáng. Nụ cười của em là nắng ấm, là gió mát, là| tất cả những điều đẹp đẽ nhất em từng mang đến cho anh. Anh muốn được ở bên em, cùng em chia sẻ những khoảnh khắc vui buồn,| cùng em vẽ lên những giấc mơ đẹp. Em có muốn trao cho anh cơ hội được yêu thương em trọn đời không? 💖 
|

In [9]:
chunks[0]

AIMessageChunk(content='Em', additional_kwargs={}, response_metadata={'safety_ratings': []}, id='run-d023633f-cb33-4751-a056-d018b4b0b7a6')

In [10]:
from functools import reduce
from operator import add

# Message chunks can be merged with `+`. Folding over the whole list avoids an
# IndexError when the model streams fewer than five chunks, and avoids
# silently dropping content when it streams more.
reduce(add, chunks)

AIMessageChunk(content='Em à, em có biết không, mỗi khi em cười, cả thế giới như bừng sáng. Nụ cười của em là nắng ấm, là gió mát, là tất cả những điều đẹp đẽ nhất em từng mang đến cho anh. Anh muốn được ở bên em, cùng em chia sẻ những khoảnh khắc vui buồn, cùng em vẽ lên những giấc mơ đẹp. Em có muốn trao cho anh cơ hội được yêu thương em trọn đời không? 💖 \n', additional_kwargs={}, response_metadata={'safety_ratings': [{'category': 'HARM_CATEGORY_HATE_SPEECH', 'probability_label': 'NEGLIGIBLE', 'probability_score': 0.08632375299930573, 'blocked': False, 'severity': 'HARM_SEVERITY_NEGLIGIBLE', 'severity_score': 0.08509916067123413}, {'category': 'HARM_CATEGORY_DANGEROUS_CONTENT', 'probability_label': 'NEGLIGIBLE', 'probability_score': 0.014957079663872719, 'blocked': False, 'severity': 'HARM_SEVERITY_NEGLIGIBLE', 'severity_score': 0.04742598906159401}, {'category': 'HARM_CATEGORY_HARASSMENT', 'probability_label': 'NEGLIGIBLE', 'probability_score': 0.07585824280977249, 'blocked': False

In [11]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

In [12]:
prompt = ChatPromptTemplate.from_template("Kể cho tôi một câu chuyện cười về {topic}")
parser = StrOutputParser()
chain = prompt | model | parser

In [13]:
async for chunk in chain.astream({"topic": "gia đình"}):
    print(chunk, end="|", flush=True)

Một| ông bố hỏi con gái 5 tuổi: "Con gái yêu quý, con| có yêu bố không?".

Cô bé đáp: "Có chứ bố!".

"|Thế con yêu bố nhiều hơn mẹ hả?".

Cô bé suy nghĩ một chút rồi trả lời: "Con yêu mẹ nhiều hơn bố!".

Ông bố| hơi buồn, hỏi: "Tại sao con lại yêu mẹ nhiều hơn bố?".

Cô bé cười híp mắt: "Bởi vì mẹ luôn cho con| những gì con muốn, còn bố thì lại luôn nói: "Con gái yêu quý của bố, đợi đến khi con lớn lên rồi bố sẽ mua cho con!"." 
|

In [14]:
from langchain_core.output_parsers import JsonOutputParser

In [15]:
chain = (
    model | JsonOutputParser()
)  # Due to a bug in older versions of Langchain, JsonOutputParser did not stream results from some models

In [16]:
async for text in chain.astream(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`"
):
    print(text, flush=True)

{}
{'countries': [{'name': 'France'}]}
{'countries': [{'name': 'France', 'population': 67.39}]}
{'countries': [{'name': 'France', 'population': 67.39}, {'name': 'Spain', 'population': 47.35}, {}]}
{'countries': [{'name': 'France', 'population': 67.39}, {'name': 'Spain', 'population': 47.35}, {'name': 'Japan', 'population': 125.8}]}


In [17]:
from langchain_core.output_parsers import (
    JsonOutputParser,
)

In [18]:
# A function that operates on finalized inputs
# rather than on an input_stream
def _extract_country_names(inputs):
    """A function that does not operates on input streams and breaks streaming."""
    if not isinstance(inputs, dict):
        return ""

    if "countries" not in inputs:
        return ""

    countries = inputs["countries"]

    if not isinstance(countries, list):
        return ""

    country_names = [
        country.get("name") for country in countries if isinstance(country, dict)
    ]
    return country_names

In [19]:
chain = model | JsonOutputParser() | _extract_country_names

In [20]:
async for text in chain.astream(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`"
):
    print(text, end="|", flush=True)

['France', 'Spain', 'Japan']|

In [21]:
from langchain_core.output_parsers import JsonOutputParser

In [22]:
async def _extract_country_names_streaming(input_stream):
    """A function that operates on input streams."""
    country_names_so_far = set()

    async for input in input_stream:
        if not isinstance(input, dict):
            continue

        if "countries" not in input:
            continue

        countries = input["countries"]

        if not isinstance(countries, list):
            continue

        for country in countries:
            name = country.get("name")
            if not name:
                continue
            if name not in country_names_so_far:
                yield name
                country_names_so_far.add(name)

In [23]:
chain = model | JsonOutputParser() | _extract_country_names_streaming

In [24]:
async for text in chain.astream(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
):
    print(text, end="|", flush=True)

France|Spain|Japan|

# Non-streaming components
- Some built-in components, such as retrievers, do not support streaming. What happens if we try to `stream` them?

In [25]:
from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_google_vertexai import ChatVertexAI, VertexAIEmbeddings

In [26]:
# Vietnamese prompt: "Answer the question below, using only the information
# below", followed by the retrieved context and the question itself.
# Fixes the typo "cầu" (bridge) -> "câu hỏi" (question).
template = """Trả lời câu hỏi dưới và chỉ dựa trên những thông tin dưới đây:
{context}

Câu hỏi: {question}
"""

In [27]:
prompt = ChatPromptTemplate.from_template(template)

In [28]:
vectorstore = FAISS.from_texts(
    ["Cường là một kỹ sư phần mềm", "Cường hiện đang làm việc tại VngCloud ở quận 7"],
    embedding=VertexAIEmbeddings(model_name="text-embedding-004"),
)

In [29]:
retriever = vectorstore.as_retriever()

In [30]:
# Materialise everything the retriever's `stream` yields, then display it.
chunks = list(retriever.stream("Cường làm ở đâu"))
chunks

[[Document(id='725812a7-af5f-43da-bb51-731837b032f9', metadata={}, page_content='Cường là một kỹ sư phần mềm'),
  Document(id='40c8a5e1-7071-4811-9e97-25db71a166d7', metadata={}, page_content='Cường hiện đang làm việc tại VngCloud ở quận 7')]]

- `stream` simply yielded the component's final result as a single chunk.
- Not all components have to implement streaming. In some cases streaming is either unnecessary, difficult or just doesn't make sense.

In [31]:
retrieval_chain = (
    {
        "context": retriever.with_config(run_name="Docs"),
        "question": RunnablePassthrough(),
    }
    | prompt
    | model
    | StrOutputParser()
)

In [32]:
# The question and the instruction are sent as one string. They were two
# adjacent literals with no separator, which Python joined into
# "...ở đâuViết 3 câu..."; "? " now separates the two sentences.
for chunk in retrieval_chain.stream(
    "Cường làm ở đâu? Viết 3 câu về nơi làm việc của Cường"
):
    print(chunk, end="|", flush=True)

C|ường| làm việc tại VngCloud. 
Công ty của Cường nằm ở quận |7.
Cường là một kỹ sư phần mềm làm việc tại VngCloud| ở quận 7. 
|

- Now that we've seen how `stream` and `astream` work, let's venture into the world of streaming events.

## Using Stream Events
- For the `astream_events` API to work properly:
  - Use `async` throughout the code to the extent possible (e.g., async tools etc).
  - Propagate callbacks if defining custom functions / runnables.
  - Whenever using runnables without LCEL, make sure to call `.astream()` on LLMs rather than `.ainvoke` to force the LLM to stream tokens.

### Chat Model

In [33]:
events = []
async for event in model.astream_events("hello", version="v2"):
    events.append(event)

In [34]:
events[:3]

[{'event': 'on_chat_model_start',
  'data': {'input': 'hello'},
  'name': 'ChatVertexAI',
  'tags': [],
  'run_id': 'c215d4c1-1fb6-452d-af93-c87952b1fa9f',
  'metadata': {'ls_provider': 'google_vertexai',
   'ls_model_name': 'gemini-1.5-flash',
   'ls_model_type': 'chat',
   'ls_temperature': None},
  'parent_ids': []},
 {'event': 'on_chat_model_stream',
  'run_id': 'c215d4c1-1fb6-452d-af93-c87952b1fa9f',
  'name': 'ChatVertexAI',
  'tags': [],
  'metadata': {'ls_provider': 'google_vertexai',
   'ls_model_name': 'gemini-1.5-flash',
   'ls_model_type': 'chat',
   'ls_temperature': None},
  'data': {'chunk': AIMessageChunk(content='Hello', additional_kwargs={}, response_metadata={'safety_ratings': [{'category': 'HARM_CATEGORY_HATE_SPEECH', 'probability_label': 'NEGLIGIBLE', 'probability_score': 0.037353515625, 'blocked': False, 'severity': 'HARM_SEVERITY_NEGLIGIBLE', 'severity_score': 0.0966796875}, {'category': 'HARM_CATEGORY_DANGEROUS_CONTENT', 'probability_label': 'NEGLIGIBLE', 'proba

### Chain

In [35]:
chain = model | JsonOutputParser()

In [36]:
events = [
    event
    async for event in chain.astream_events(
        "output a list of the countries france, spain and japan and their populations in JSON format. "
        'Use a dict with an outer key of "countries" which contains a list of countries. '
        "Each country should have the key `name` and `population`",
        version="v2",
    )
]

In [37]:
events[:3]

[{'event': 'on_chain_start',
  'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'},
  'name': 'RunnableSequence',
  'tags': [],
  'run_id': 'd555f631-7041-4432-a5d8-f16e3b4c2764',
  'metadata': {},
  'parent_ids': []},
 {'event': 'on_chat_model_start',
  'data': {'input': {'messages': [[HumanMessage(content='output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`', additional_kwargs={}, response_metadata={})]]}},
  'name': 'ChatVertexAI',
  'tags': ['seq:step:1'],
  'run_id': 'f7485c30-5e80-4ab5-bbb6-10213affb536',
  'metadata': {'ls_provider': 'google_vertexai',
   'ls_model_name': 'gemini-1.5-flash',
   'ls_model_type

In [38]:
num_events = 0

async for event in chain.astream_events(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
    version="v2",
):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        print(
            f"Chat model chunk: {repr(event['data']['chunk'].content)}",
            flush=True,
        )
    if kind == "on_parser_stream":
        print(f"Parser chunk: {event['data']['chunk']}", flush=True)
    num_events += 1
    if num_events > 30:
        # Truncate the output
        print("...")
        break

Chat model chunk: '```'
Chat model chunk: 'json\n{'
Parser chunk: {}
Chat model chunk: '\n  "countries": [\n    {\n      "name": "France'
Parser chunk: {'countries': [{'name': 'France'}]}
Chat model chunk: '",\n      "population": 67.39 \n    },'
Parser chunk: {'countries': [{'name': 'France', 'population': 67.39}]}
Chat model chunk: '\n    {\n      "name": "Spain",\n      "population": 47.35 \n    },\n    {\n      "name'
Parser chunk: {'countries': [{'name': 'France', 'population': 67.39}, {'name': 'Spain', 'population': 47.35}, {}]}
Chat model chunk: '": "Japan",\n      "population": 125.8 \n    }\n  ]\n}\n``` \n'
Parser chunk: {'countries': [{'name': 'France', 'population': 67.39}, {'name': 'Spain', 'population': 47.35}, {'name': 'Japan', 'population': 125.8}]}


In [39]:
events[:3]

[{'event': 'on_chain_start',
  'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'},
  'name': 'RunnableSequence',
  'tags': [],
  'run_id': 'd555f631-7041-4432-a5d8-f16e3b4c2764',
  'metadata': {},
  'parent_ids': []},
 {'event': 'on_chat_model_start',
  'data': {'input': {'messages': [[HumanMessage(content='output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`', additional_kwargs={}, response_metadata={})]]}},
  'name': 'ChatVertexAI',
  'tags': ['seq:step:1'],
  'run_id': 'f7485c30-5e80-4ab5-bbb6-10213affb536',
  'metadata': {'ls_provider': 'google_vertexai',
   'ls_model_name': 'gemini-1.5-flash',
   'ls_model_type

In [40]:
num_events = 0

async for event in chain.astream_events(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
    version="v2",
):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        print(
            f"Chat model chunk: {repr(event['data']['chunk'].content)}",
            flush=True,
        )
    if kind == "on_parser_stream":
        print(f"Parser chunk: {event['data']['chunk']}", flush=True)
    num_events += 1
    if num_events > 30:
        # Truncate the output
        print("...")
        break

Chat model chunk: '```'
Chat model chunk: 'json\n{'
Parser chunk: {}
Chat model chunk: '\n  "countries": [\n    {\n      "name": "France'
Parser chunk: {'countries': [{'name': 'France'}]}
Chat model chunk: '",\n      "population": 67.39 \n    },'
Parser chunk: {'countries': [{'name': 'France', 'population': 67.39}]}
Chat model chunk: '\n    {\n      "name": "Spain",\n      "population": 47.35\n    },\n    {\n      "name":'
Parser chunk: {'countries': [{'name': 'France', 'population': 67.39}, {'name': 'Spain', 'population': 47.35}, {}]}
Chat model chunk: ' "Japan",\n      "population": 125.8\n    }\n  ]\n}\n```\n'
Parser chunk: {'countries': [{'name': 'France', 'population': 67.39}, {'name': 'Spain', 'population': 47.35}, {'name': 'Japan', 'population': 125.8}]}


### Filtering Events
- Because this API produces so many events, it is useful to be able to filter on events.
- You can filter by either component `name`, component `tags` or component `type`.



**By name**

In [41]:
chain = model.with_config({"run_name": "model"}) | JsonOutputParser().with_config(
    {"run_name": "my_parser"}
)

In [42]:
max_events = 0
async for event in chain.astream_events(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
    version="v2",
    include_names=["my_parser"],
):
    print(event)
    max_events += 1
    if max_events > 10:
        # Truncate output
        print("...")
        break

{'event': 'on_parser_start', 'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}, 'name': 'my_parser', 'tags': ['seq:step:2'], 'run_id': '9b37e141-8522-4a7c-b29d-af771ed0e93e', 'metadata': {}, 'parent_ids': ['1374f409-f1a3-44e2-b44f-71966c45f13f']}
{'event': 'on_parser_stream', 'run_id': '9b37e141-8522-4a7c-b29d-af771ed0e93e', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {}}, 'parent_ids': ['1374f409-f1a3-44e2-b44f-71966c45f13f']}
{'event': 'on_parser_stream', 'run_id': '9b37e141-8522-4a7c-b29d-af771ed0e93e', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France'}]}}, 'parent_ids': ['1374f409-f1a3-44e2-b44f-71966c45f13f']}
{'event': 'on_parser_stream', 'run_id': '9b37e141-8522-4a7c-b29d-af771ed0e93e',

**By Type**

In [43]:
chain = model.with_config({"run_name": "model"}) | JsonOutputParser().with_config(
    {"run_name": "my_parser"}
)

In [44]:
max_events = 0
async for event in chain.astream_events(
    'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
    version="v2",
    include_types=["chat_model"],
):
    print(event)
    max_events += 1
    if max_events > 10:
        # Truncate output
        print("...")
        break

{'event': 'on_chat_model_start', 'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}, 'name': 'model', 'tags': ['seq:step:1'], 'run_id': 'dc599d81-5c98-444f-a588-fea0566e4adb', 'metadata': {'ls_provider': 'google_vertexai', 'ls_model_name': 'gemini-1.5-flash', 'ls_model_type': 'chat', 'ls_temperature': None}, 'parent_ids': ['15ffffb9-285b-4224-bfc1-d0c5ba04a606']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='```', additional_kwargs={}, response_metadata={'safety_ratings': [{'category': 'HARM_CATEGORY_HATE_SPEECH', 'probability_label': 'NEGLIGIBLE', 'probability_score': 0.055908203125, 'blocked': False, 'severity': 'HARM_SEVERITY_NEGLIGIBLE', 'severity_score': 0.1416015625}, {'category': 'HARM_CATEGORY_DANGEROUS_CONTENT', 'probability_label': 'NEGLIGIBLE', 'pro

**By Tags**
> **CAUTION**:
> - Tags are inherited by child components of a given runnable.
> - If you're using tags to filter, make sure that this is what you want.

In [45]:
chain = (model | JsonOutputParser()).with_config({"tags": ["my_chain"]})

In [46]:
max_events = 0
async for event in chain.astream_events(
    'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
    version="v2",
    include_tags=["my_chain"],
):
    print(event)
    max_events += 1
    if max_events > 10:
        # Truncate output
        print("...")
        break

{'event': 'on_chain_start', 'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}, 'name': 'RunnableSequence', 'tags': ['my_chain'], 'run_id': '18f5067e-1bd0-442b-b25a-39be2f60cb1e', 'metadata': {}, 'parent_ids': []}
{'event': 'on_chat_model_start', 'data': {'input': {'messages': [[HumanMessage(content='output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`', additional_kwargs={}, response_metadata={})]]}}, 'name': 'ChatVertexAI', 'tags': ['seq:step:1', 'my_chain'], 'run_id': 'b691435c-bb10-42bd-add2-00a300bd845a', 'metadata': {'ls_provider': 'google_vertexai', 'ls_model_name': 'gemini-1.5-flash', 'ls_model_type': 'chat'

### Non-streaming components
- Remember how some components don't stream well because they don't operate on input streams?
- While such components can break streaming of the final output when using `astream`, `astream_events` will still yield streaming events from intermediate steps that support streaming!

In [47]:
# Function that does not support streaming.
# It operates on the finalizes inputs rather than
# operating on the input stream.
def _extract_country_names(inputs):
    """A function that does not operates on input streams and breaks streaming."""
    if not isinstance(inputs, dict):
        return ""

    if "countries" not in inputs:
        return ""

    countries = inputs["countries"]

    if not isinstance(countries, list):
        return ""

    country_names = [
        country.get("name") for country in countries if isinstance(country, dict)
    ]
    return country_names

In [48]:
# The last step is a plain (non-generator) function: it works on the finalized
# parser output rather than on the parser's stream of partial results.
chain = (
    model | JsonOutputParser() | _extract_country_names
)

- As expected, the `astream` API does not stream incrementally here, because `_extract_country_names` doesn't operate on streams: the whole result arrives as a single chunk.

In [49]:
async for chunk in chain.astream(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
):
    print(chunk, flush=True)

['France', 'Spain', 'Japan']


- Now, let's confirm that with astream_events we're still seeing streaming output from the model and the parser.

