# How to stream LLM tokens from your graph

!!! info "Prerequisites"

    This guide assumes familiarity with the following:
    
    - [Streaming](../../concepts/streaming/)
    - [Chat Models](https://python.langchain.com/docs/concepts/chat_models/)

When building LLM applications with LangGraph, you may want to stream individual tokens from the LLM calls made inside graph nodes. You can do so via `graph.stream(..., stream_mode="messages")`:

```python
from langgraph.graph import StateGraph, MessagesState
from langchain_openai import ChatOpenAI

model = ChatOpenAI()


def call_model(state: MessagesState):
    model.invoke(state["messages"])
    ...


graph = (
    StateGraph(MessagesState)
    .add_node(call_model)
    # ...
    .compile()
)

for msg, metadata in graph.stream(inputs, stream_mode="messages"):
    print(msg)
```

The streamed outputs will be tuples of `(message chunk, metadata)`:

* message chunk is the token streamed by the LLM
* metadata is a dictionary with information about the graph node where the LLM was called as well as the LLM invocation metadata
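As a minimal sketch of how these tuples can be consumed, the helper below keeps only the chunks emitted from a given node by checking the `langgraph_node` metadata key. The `FakeChunk` class, the `chunks_from_node` helper, and the hand-built `fake_stream` list are hypothetical stand-ins for real message chunks and `graph.stream(...)` output, used so the snippet runs without an API key.

```python
from dataclasses import dataclass
from typing import Any, Iterable, Iterator


@dataclass
class FakeChunk:
    """Hypothetical stand-in for a streamed message chunk."""

    content: str


def chunks_from_node(
    stream: Iterable[tuple[Any, dict]], node_name: str
) -> Iterator[Any]:
    """Yield only the message chunks whose metadata names `node_name`."""
    for chunk, metadata in stream:
        if metadata.get("langgraph_node") == node_name:
            yield chunk


# Hand-built example data shaped like `(message chunk, metadata)` tuples.
fake_stream = [
    (FakeChunk("Hello"), {"langgraph_node": "call_model"}),
    (FakeChunk("ignored"), {"langgraph_node": "other_node"}),
    (FakeChunk(" world"), {"langgraph_node": "call_model"}),
]

text = "".join(c.content for c in chunks_from_node(fake_stream, "call_model"))
print(text)
```

With a real graph, the same helper could be applied directly to the iterator returned by `graph.stream(..., stream_mode="messages")`.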

!!! note "Using without LangChain"

    If you need to stream LLM tokens **without using LangChain**, you can use [`stream_mode="custom"`](../streaming/streaming/#stream_modecustom) to stream the outputs from LLM provider clients directly. Check out the [example below](#example-without-langchain) to learn more.

!!! warning "Note on Python < 3.11"
    
    When using Python 3.8, 3.9, or 3.10, make sure you manually pass the `RunnableConfig` through to the chat model when invoking it, like so: `model.ainvoke(..., config)`.
    The stream method collects all events from your nested code using a streaming tracer passed as a callback. In Python 3.11 and above, this is handled automatically via [contextvars](https://docs.python.org/3/library/contextvars.html); prior to 3.11, [asyncio's tasks](https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task) lacked proper `contextvar` support, so the callbacks only propagate if you pass the config through manually. We do this in the `call_model` function below.
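The difference between implicit and explicit propagation can be illustrated with plain `contextvars`. This is a simplified sketch, not LangChain's actual implementation: `current_config`, `fake_model_call`, and `node` are hypothetical names. A config passed explicitly always reaches the callee, whereas the implicit path only works if the context variable is visible where the call runs.

```python
import asyncio
import contextvars
from typing import Optional

# Hypothetical context variable standing in for the ambient run config.
current_config: contextvars.ContextVar[Optional[dict]] = contextvars.ContextVar(
    "current_config", default=None
)


async def fake_model_call(config: Optional[dict] = None) -> list:
    """Return the callbacks this call can see; an explicit config wins."""
    effective = config if config is not None else current_config.get()
    return (effective or {}).get("callbacks", [])


async def node(config: dict) -> tuple:
    # Implicit: relies on the context variable being set in this context.
    implicit = await fake_model_call()
    # Explicit: works regardless of how the context was propagated.
    explicit = await fake_model_call(config)
    return implicit, explicit


config = {"callbacks": ["stream_tracer"]}
# The context variable is deliberately left unset to mimic lost context.
implicit, explicit = asyncio.run(node(config))
print(implicit, explicit)
```

In this sketch the implicit call sees no callbacks, while the explicit call receives the tracer, which mirrors why passing `config` by hand is the safe choice on older Python versions.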

## Setup

First, install the required packages:

```python
# %%capture --no-stderr
# %pip install --quiet -U langgraph langchain_openai
```

Next, set the API key for OpenAI (the LLM provider we will use):

```python
import getpass
import os


def _set_env(var: str):
    # Prompt for the value only if it is not already set in the environment
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")


_set_env("OPENAI_API_KEY")
```

```
OPENAI_API_KEY:  ········
```


<div class="admonition tip">
    <p class="admonition-title">Set up <a href="https://smith.langchain.com">LangSmith</a> for LangGraph development</p>
    <p style="padding-top: 5px;">
        Sign up for LangSmith to quickly spot issues and improve the performance of your LangGraph projects. LangSmith lets you use trace data to debug, test, and monitor your LLM apps built with LangGraph — read more about how to get started <a href="https://docs.smith.langchain.com">here</a>. 
    </p>
</div>




!!! note "Manual Callback Propagation"

    Note that the `call_model` node function below a) accepts a [`RunnableConfig`](https://python.langchain.com/api_reference/core/runnables/langchain_core.runnables.config.RunnableConfig.html#langchain_core.runnables.config.RunnableConfig) as its second argument and b) passes it as the second argument to each `ainvoke(..., config)` call. This is optional for Python >= 3.11.

## Example

```python
from typing import TypedDict

from langchain_core.runnables import RunnableConfig
from langchain_openai import ChatOpenAI
from langgraph.graph import START, StateGraph


# Note: we're adding the tags here to be able to filter the model outputs down the line
joke_model = ChatOpenAI(model="gpt-4o-mini").with_config(tags=["joke"])
poem_model = ChatOpenAI(model="gpt-4o-mini").with_config(tags=["poem"])


class State(TypedDict):
    topic: str
    joke: str
    poem: str


async def call_model(state: State, config: RunnableConfig):
    topic = state["topic"]
    print("Writing joke...")
    # Note: Passing the config through explicitly is required for Python < 3.11,
    # since context var support wasn't added before then:
    # https://docs.python.org/3/library/asyncio-task.html#creating-tasks
    joke_response = await joke_model.ainvoke(
        [{"role": "user", "content": f"Write a joke about {topic}"}],
        config,
    )
    print("\n\nWriting poem...")
    poem_response = await poem_model.ainvoke(
        [{"role": "user", "content": f"Write a short poem about {topic}"}],
        config,
    )
    return {"joke": joke_response.content, "poem": poem_response.content}


graph = StateGraph(State).add_node(call_model).add_edge(START, "call_model").compile()
```

```python
async for msg, metadata in graph.astream(
    {"topic": "cats"},
    stream_mode="messages",
):
    if msg.content:
        print(msg.content, end="|", flush=True)
```

```
Writing joke...
Why| did| the| cat| sit| on| the| computer|?

|Because| it| wanted| to| keep| an| eye| on| the| mouse|!|

Writing poem...
In| sun|lit| patches|,| they| softly| tread|,|  
|Wh|isk|ers| twitch|ing|,| with| grace| they| spread|.|  
|With| eyes| like| lantern|s|,| glowing| bright|,|  
|They| dance| through| shadows|,| a| silent| flight|.|  

|P|aws| like| whispers| on| the| floor|,|  
|Cur|led| up| tight|,| they| dream| and| sn|ore|.|  
|Ch|asing| ph|ant|oms| in| the| night|,|  
|F|eline| secrets|,| hidden| from| sight|.|  

|A| gentle| p|urr|,| a| playful| sw|at|,|  
|In| every| corner|,| a| cozy| spot|.|  
|Maj|estic| hunters|,| soft| as| a| sigh|,|  
|In| the| hearts| of| many|,| forever| they| lie|.|  |
```

```python
metadata
```

```
{'langgraph_step': 1,
 'langgraph_node': 'call_model',
 'langgraph_triggers': ['start:call_model'],
 'langgraph_path': ('__pregel_pull', 'call_model'),
 'langgraph_checkpoint_ns': 'call_model:eeaca45a-85f2-c80f-e985-704a168a5d8c',
 'checkpoint_ns': 'call_model:eeaca45a-85f2-c80f-e985-704a168a5d8c',
 'ls_provider': 'openai',
 'ls_model_name': 'gpt-4o-mini',
 'ls_model_type': 'chat',
 'ls_temperature': 0.7}
```

### Filter to specific LLM invocation

As you can see, we're streaming tokens from all of the LLM invocations. Let's now filter the streamed tokens down to a specific LLM invocation. We will use the `.astream_events()` method for this, and filter events using the tags we added to the models earlier:

```python
async for event in graph.astream_events(
    {"topic": "cats"},
    version="v2",
):
    # filter on the custom tag
    if event["event"] == "on_chat_model_stream" and "joke" in event.get("tags", []):
        data = event["data"]
        if data["chunk"].content:
            print(data["chunk"].content, end="|", flush=True)
```

```
Writing joke...
Why| did| the| cat| sit| on| the| computer|?

|Because| it| wanted| to| keep| an| eye| on| the| mouse|!| 🐱|💻|

Writing poem...
```
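The filter condition in the loop above can be factored into a small, testable helper. The sketch below assumes only the event fields used in that loop (`event`, `tags`, and `data["chunk"].content`); the `tagged_token` function and the hand-built `events` list are hypothetical and exist so the snippet runs without calling a model.

```python
from types import SimpleNamespace
from typing import Optional


def tagged_token(event: dict, tag: str) -> Optional[str]:
    """Return the token text if `event` is a chat model stream event
    carrying `tag` and non-empty content; otherwise return None."""
    if event.get("event") != "on_chat_model_stream":
        return None
    if tag not in event.get("tags", []):
        return None
    content = event["data"]["chunk"].content
    return content or None


def chunk(text: str) -> SimpleNamespace:
    """Hypothetical stand-in for a message chunk with a `content` field."""
    return SimpleNamespace(content=text)


# Hand-built events mimicking the fields read in the loop above.
events = [
    {"event": "on_chain_start", "tags": [], "data": {}},
    {"event": "on_chat_model_stream", "tags": ["joke"], "data": {"chunk": chunk("Why")}},
    {"event": "on_chat_model_stream", "tags": ["poem"], "data": {"chunk": chunk("In")}},
    {"event": "on_chat_model_stream", "tags": ["joke"], "data": {"chunk": chunk("")}},
]

joke_tokens = [t for e in events if (t := tagged_token(e, "joke")) is not None]
print(joke_tokens)
```

Keeping the predicate separate from the streaming loop makes it easy to unit test the filtering logic on recorded events.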


## Example without LangChain

```python
from openai import AsyncOpenAI

openai_client = AsyncOpenAI()
model_name = "gpt-4o-mini"


async def stream_tokens(model_name: str, messages: list[dict]):
    response = await openai_client.chat.completions.create(
        messages=messages, model=model_name, stream=True
    )
    async for chunk in response:
        delta = chunk.choices[0].delta

        # Skip chunks that carry no text
        if delta.content:
            yield {"role": delta.role, "content": delta.content}


async def call_model(state, config, writer):
    topic = state["topic"]
    joke = ""
    poem = ""

    print("Writing joke...")
    async for msg_chunk in stream_tokens(
        model_name, [{"role": "user", "content": f"Write a joke about {topic}"}]
    ):
        joke += msg_chunk["content"]
        # Attach a tag so consumers can tell the two invocations apart
        metadata = {**config["metadata"], "tags": ["joke"]}
        chunk_to_stream = (msg_chunk, metadata)
        writer(chunk_to_stream)

    print("\n\nWriting poem...")
    async for msg_chunk in stream_tokens(
        model_name, [{"role": "user", "content": f"Write a short poem about {topic}"}]
    ):
        poem += msg_chunk["content"]
        metadata = {**config["metadata"], "tags": ["poem"]}
        chunk_to_stream = (msg_chunk, metadata)
        writer(chunk_to_stream)

    return {"joke": joke, "poem": poem}


graph = StateGraph(State).add_node(call_model).add_edge(START, "call_model").compile()
```

!!! note "stream_mode='custom'"

    When streaming LLM tokens without LangChain, we recommend using [`stream_mode="custom"`](../streaming/streaming/#stream-modecustom). This allows you to explicitly control which data from the LLM provider APIs to include in LangGraph streamed outputs, including any additional metadata.
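The writer pattern itself can be sketched without LangGraph or an API key. In the simplified illustration below, `fake_token_source` is a hypothetical stand-in for a provider's streaming response, and a plain list's `append` method plays the role of the writer; this is not how LangGraph implements its stream writer, only a model of the data flow in which the node pushes `(chunk, metadata)` tuples as tokens arrive.

```python
import asyncio
from typing import Any, AsyncIterator, Callable


async def fake_token_source() -> AsyncIterator[str]:
    """Hypothetical stand-in for an LLM provider's streaming response."""
    for token in ["Why", " did", " the", " cat"]:
        yield token


async def node(writer: Callable[[Any], None]) -> str:
    """Forward each token to `writer` as it arrives and return the full text."""
    text = ""
    async for token in fake_token_source():
        text += token
        # Each streamed item pairs the chunk with caller-chosen metadata.
        writer(({"content": token}, {"tags": ["joke"]}))
    return text


async def main() -> tuple:
    received: list = []
    # A plain list's `append` plays the role of the stream writer here.
    result = await node(received.append)
    return received, result


received, result = asyncio.run(main())
print(result)
print([msg["content"] for msg, _ in received])
```

Because the node decides exactly what goes into each tuple, the consumer sees only the fields and metadata the node chose to emit.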

```python
async for msg, metadata in graph.astream(
    {"topic": "cats"},
    stream_mode="custom",
):
    print(msg["content"], end="|", flush=True)
```

```
Writing joke...
Why| did| the| cat| sit| on| the| computer|?

|Because| it| wanted| to| keep| an| eye| on| the| mouse|!|

Writing poem...
In| shadows| soft|,| on| silent| paws|,|  
|A| whisk|ered| muse| with| gentle| claws|,|  
|They| weave| through| dreams| in| moon|lit| grace|,|  
|A| dance| of| warmth| in| a| sun|lit| place|.|  

|With| eyes| like| stars|,| they| peer| so| wise|,|  
|The| world| reflected| in| their| guise|.|  
|From| playful| leaps| to| cozy| curls|,|  
|In| each| sweet| p|urr|,| a| magic| sw|irls|.|  

|Oh|,| feline| friends|,| with| hearts| so| bold|,|  
|In| every| tale|,| your| love| unfolds|.|  
|A| quiet| comfort|,| a| steadfast| glance|,|  
|In| the| company| of| cats|,| we| find| our| trance|.|
```

```python
metadata
```

```
{'langgraph_step': 1,
 'langgraph_node': 'call_model',
 'langgraph_triggers': ['start:call_model'],
 'langgraph_path': ('__pregel_pull', 'call_model'),
 'langgraph_checkpoint_ns': 'call_model:ca83e792-dddc-7f99-c8ff-4e8c166106f6',
 'tags': ['poem']}
```

To filter the stream down to a specific LLM invocation, you can use the streamed metadata:

```python
async for msg, metadata in graph.astream(
    {"topic": "cats"},
    stream_mode="custom",
):
    if "poem" in metadata.get("tags", []):
        print(msg["content"], end="|", flush=True)
```

```
Writing joke...


Writing poem...
In| shadows| sleek|,| with| eyes| ag|low|,|  
|A| whisper| of| grace|,| as| they| softly| flow|,|  
|With| p|itter|-p|atter| on| the| midnight| floor|,|  
|Cur|iosity| blooms|,| they| explore| more| and| more|.|  

|A| stretch| and| a| y|awn|,| in| sun|beam|'s| embrace|,|  
|Ch|asing| the| dust| mot|es| that| dance| with| such| grace|,|  
|Each| p|ounce| a| ballet|,| each| leap| a| delight|,|  
|The| world| is| their| playground| from| morning| to| night|.|  

|F|urred| confid|ants|,| both| sly| and| serene|,|  
|With| silent| mis|chief|,| they| dwell| in| between|,|  
|In| the| heart| of| our| homes|,| they| fro|lic| and| play|,|  
|Oh|,| marvelous| creatures|,| in| every| way|.|
```
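The same metadata can also be used to route tokens into separate buffers instead of discarding the ones you are not printing. The sketch below assumes streamed items shaped like the `(msg, metadata)` tuples written by `call_model` above; the `collect_by_tag` helper and the hand-built `fake_stream` are hypothetical and stand in for the output of `graph.astream(..., stream_mode="custom")`.

```python
from collections import defaultdict
from typing import Iterable


def collect_by_tag(stream: Iterable[tuple[dict, dict]]) -> dict[str, str]:
    """Concatenate streamed `content` strings under each tag in the metadata."""
    buffers: dict[str, str] = defaultdict(str)
    for msg, metadata in stream:
        for tag in metadata.get("tags", []):
            buffers[tag] += msg["content"]
    return dict(buffers)


# Hand-built items shaped like the tuples passed to `writer` above.
fake_stream = [
    ({"role": "assistant", "content": "Why"}, {"tags": ["joke"]}),
    ({"role": "assistant", "content": " not?"}, {"tags": ["joke"]}),
    ({"role": "assistant", "content": "In"}, {"tags": ["poem"]}),
    ({"role": "assistant", "content": " shadows"}, {"tags": ["poem"]}),
]

texts = collect_by_tag(fake_stream)
print(texts)
```

With an async stream, the same accumulation logic would sit inside an `async for` loop rather than a plain `for` loop.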