In [1]:
from langchain_ollama  import ChatOllama

model = ChatOllama(model="mistral")

events = []
async for event in model.astream_events("hello"):
    events.append(event)

In [None]:
from langchain_core.output_parsers import JsonOutputParser

chain = model | JsonOutputParser()

query = """
        output a list of the countries france, spain and japan and their populations in JSON format. 
        'Use a dict with an outer key of "countries" which contains a list of countries. '
        Each country should have the key `name` and `population`
        """

# Asynchronously stream events from the chain for the given query
events = [event async for event in chain.astream_events(query)]

# Print the first 3 events to inspect the output
events[:3]

In [None]:
# Initialize a counter for the number of events
num_events = 0

# Asynchronously iterate through the events from the chain
async for event in chain.astream_events(query):
    kind = event["event"]
    # If the event is a chat model stream, print the content of the chunk
    if kind == "on_chat_model_stream":
        print(f"Chat model chunk: {repr(event['data']['chunk'].content)}",
            flush=True,)
    # If the event is a parser stream, print the chunk from the parser
    if kind == "on_parser_stream":
        print(f"Parser chunk: {event['data']['chunk']}", flush=True)
    num_events += 1
    # Truncate the output after 30 events
    if num_events > 30:
        print("...")
        break

In [None]:
# Configure the chain with run names for the model and parser for better tracing
chain = model.with_config({"run_name": "model"}) | JsonOutputParser().with_config(
    {"run_name": "my_parser"}
)

max_events = 0
# Asynchronously stream events, including only "chat_model" type events
async for event in chain.astream_events(query,include_types=["chat_model"],):
    print(event)
    max_events += 1
    # Truncate the output after 10 events
    if max_events > 10:
        print("...")
        break

In [None]:
from langchain_core.runnables import RunnableLambda
from langchain_core.tools import tool

def reverse_word(word: str):
    return word[::-1]

# Create a RunnableLambda from the reverse_word function
reverse_word = RunnableLambda(reverse_word)

@tool
def bad_tool(word: str):
    """
        Custom tool that doesn't propagate callbacks.
    """
    return reverse_word.invoke(word)

# Asynchronously stream events from the bad_tool with the input "hello" and no callbacks
async for event in bad_tool.astream_events("hello"):
    print(event)

In [None]:
@tool
def correct_tool(word: str, callbacks):
    """A tool that correctly propagates callbacks."""
    return reverse_word.invoke(word, {"callbacks": callbacks})

# Asynchronously stream events with callbacks.
async for event in correct_tool.astream_events("hello"):
    print(event)