# 🚀 Streaming in LangGraph

LangGraph can stream output while a graph is still running, either as state updates after each step or as LLM tokens as they are generated. This suits chat interfaces and other real-time applications.

## 🧠 What is Streaming?

Streaming in LangGraph means that instead of waiting for a complete output from a node (e.g., an LLM response), the graph yields outputs as they are generated. This is useful for:

- Improving responsiveness of chatbots or apps.
- Providing partial results quickly.
- Displaying progressive outputs in dashboards or UIs.

### 📦 Use Cases
- Live chat apps
- Data dashboards showing incremental results
- Summarization UIs that build up the summary over time

### 🔁 astream and Stream Outputs in LangGraph

A compiled graph exposes `stream()` and its asynchronous counterpart `astream()`. Both return intermediate outputs while the graph runs instead of a single final result. No special node type is required: ordinary nodes that return a state update are streamed step by step.

### 🧩 What is astream?

`astream()` is a method on the compiled graph object (`app` in the code cells of this document) that returns an asynchronous iterator. Consume it with `async for` inside a coroutine or a notebook cell; use `stream()` for the same behavior in synchronous code.
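
Outside a notebook, an asynchronous iterator has to be consumed inside a coroutine. The sketch below shows that pattern with the standard library only. `fake_astream` is a hypothetical stand-in for `app.astream(...)`, used so the example runs without a model or API key; the chunk contents are invented for illustration.

```python
import asyncio
from typing import AsyncIterator


async def fake_astream(inputs: dict) -> AsyncIterator[dict]:
    """Hypothetical stand-in for app.astream(): yields one chunk per step."""
    for node_name in ("model", "tool_node", "model"):
        # Hand control back to the event loop, as real network I/O would.
        await asyncio.sleep(0)
        yield {node_name: {"messages": [f"output of {node_name}"]}}


async def collect(inputs: dict) -> list[dict]:
    """Consume the async iterator with `async for`, one chunk at a time."""
    chunks = []
    async for chunk in fake_astream(inputs):
        chunks.append(chunk)  # a real app would render or log the chunk here
    return chunks


# In a script, asyncio.run() starts the event loop and runs the coroutine.
chunks = asyncio.run(collect({"messages": ["hi"]}))
print([next(iter(chunk)) for chunk in chunks])
```

In a notebook the event loop is already running, so `await collect(...)` or a top-level `async for` is used instead of `asyncio.run()`.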

## 🔄 How Does Streaming Work?

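LangGraph runs a graph as a sequence of steps. After each step, `stream()` or `astream()` emits a chunk describing the state, in a shape that depends on the chosen `stream_mode`. Token-level output from an LLM call inside a node is available through `stream_mode="messages"` or through `astream_events()`.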

## 📥 Stream Outputs
Each item yielded by `stream()` or `astream()` in the `"values"` and `"updates"` modes:
- Is a dictionary: the full state in `"values"` mode, or `{node_name: update}` in `"updates"` mode.
- Can be printed, logged, or displayed in real time (e.g., in a chat app).

If several nodes run in sequence, you get one item per step as the graph runs.

### 🌊 Stream Modes in LangGraph

The `stream_mode` argument of `app.stream()` and `app.astream()` determines what is emitted during graph execution.

### 🎛️ Available Stream Modes

Stream modes are passed as plain strings. The main ones are:

1. `"values"` 🔄
    - Emits the full state after each step.

2. `"updates"` 🧩
    - Emits only what each node returned, keyed by node name.

3. `"messages"`
    - Emits LLM tokens as `(message_chunk, metadata)` tuples from chat model calls inside nodes.

4. `"custom"`
    - Emits data that a node writes explicitly to the stream.

5. `"debug"`
    - Emits detailed information about each step, intended for debugging.
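
`stream_mode` also accepts a list of mode strings, such as `["values", "updates"]`. In that case each streamed item is a `(mode, chunk)` tuple rather than a bare chunk. The sketch below shows one way to route such tuples. `split_by_mode` is a hypothetical helper, and `sample` is hand-written data standing in for a real stream, so the ordering of items in a real run may differ.

```python
from typing import Any, Iterable


def split_by_mode(stream: Iterable[tuple[str, Any]]) -> dict[str, list[Any]]:
    """Group (mode, chunk) tuples by their mode string."""
    grouped: dict[str, list[Any]] = {}
    for mode, chunk in stream:
        grouped.setdefault(mode, []).append(chunk)
    return grouped


# Hand-written sample in the (mode, chunk) shape; not captured from a real run.
sample = [
    ("values", {"n": 1}),
    ("updates", {"inc": {"n": 2}}),
    ("values", {"n": 2}),
]

grouped = split_by_mode(sample)
print(grouped["updates"])
```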

### 🧪 Stream Output Formats: "values" vs "updates" in LangGraph

When you stream execution using `app.stream()` or `app.astream()`, the `stream_mode` argument controls the shape of each streamed item. The two modes used in this document are:

1. "values" Format 📦

- You receive the full state after each step, starting with the input state.
- Good when you want a complete picture of the state at every step.

2. "updates" Format 🧩

- You receive only the update returned by the node that just ran, wrapped in a dictionary keyed by the node name.

The examples in this document pass `stream_mode` explicitly, so they do not depend on the default.

| Format      | Description                      | Output Contains               |
| ----------- | -------------------------------- | ----------------------------- |
| `"values"`  | Full state after each step       | All keys/values               |
| `"updates"` | Only what the last node returned | `{node_name: {changed keys}}` |
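
To make the difference between the two shapes concrete, here is a plain-Python sketch. `run_steps` and the two lambda nodes are hypothetical and only mimic the output shapes. This is not how LangGraph is implemented, and the merge step simply overwrites keys instead of applying reducers such as `add_messages`.

```python
from typing import Callable, Iterator

Node = Callable[[dict], dict]


def run_steps(nodes: list[tuple[str, Node]], state: dict, stream_mode: str) -> Iterator[dict]:
    """Toy sequential runner that mimics the "values" and "updates" shapes."""
    if stream_mode not in ("values", "updates"):
        raise ValueError(f"unsupported stream_mode: {stream_mode!r}")
    state = dict(state)
    if stream_mode == "values":
        yield dict(state)  # "values" starts with the input state
    for name, node in nodes:
        update = node(state)
        state.update(update)  # simplistic merge: overwrite keys, no reducers
        if stream_mode == "values":
            yield dict(state)  # full state after this step
        else:
            yield {name: update}  # only this node's update, keyed by node name


nodes = [
    ("double", lambda s: {"n": s["n"] * 2}),
    ("label", lambda s: {"label": f"n={s['n']}"}),
]

values_chunks = list(run_steps(nodes, {"n": 2}, "values"))
updates_chunks = list(run_steps(nodes, {"n": 2}, "updates"))
print(values_chunks)
print(updates_chunks)
```

The `"values"` run produces three items (input state plus one per node), while the `"updates"` run produces two, one per node.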


In [4]:
from langchain_openai import ChatOpenAI
from langchain_community.tools import TavilySearchResults
from typing import TypedDict, Annotated
from langgraph.graph import START, END, StateGraph, add_messages
from langgraph.prebuilt import ToolNode
from dotenv import load_dotenv
load_dotenv()

True

In [5]:
class AgentState(TypedDict):
    messages : Annotated[list, add_messages]

search_tool = TavilySearchResults()

tools = [search_tool]

In [6]:
llm = ChatOpenAI()
llm_with_tools = llm.bind_tools(tools = tools)

In [7]:
def model(state: AgentState):
    # Call the tool-aware LLM on the conversation so far; add_messages appends the reply.
    return {"messages": [llm_with_tools.invoke(state["messages"])]}


def tools_router(state: AgentState):
    # Route to the tool node if the last message requested tool calls; otherwise stop.
    last_message = state["messages"][-1]

    if hasattr(last_message, "tool_calls") and len(last_message.tool_calls) > 0:
        return "tool_node"
    return END


tool_node = ToolNode(tools=tools)

graph = StateGraph(AgentState)

graph.add_node("model", model)
graph.add_node("tool_node", tool_node)
graph.set_entry_point("model")

# After "model", tools_router picks the next node; tool results go back to "model".
graph.add_conditional_edges("model", tools_router)
graph.add_edge("tool_node", "model")
app = graph.compile()

In [11]:
app.invoke({"messages": ["what is the current weather in delhi?"]})

{'messages': [HumanMessage(content='what is the current weather in delhi?', additional_kwargs={}, response_metadata={}, id='a3bb424c-7405-4520-9a9f-5574d7482b92'),
  AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_DPE2na0aI212qShsb9m9Lrlf', 'function': {'arguments': '{"query":"current weather in Delhi"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}], 'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 21, 'prompt_tokens': 90, 'total_tokens': 111, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-3.5-turbo-0125', 'system_fingerprint': None, 'finish_reason': 'tool_calls', 'logprobs': None}, id='run--38fe78eb-78c1-44d9-9148-0d91a3220874-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'current weather in Delhi'}, 'id': 'call_DPE2na0aI21

In [12]:
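# Named `inputs` to avoid shadowing the built-in `input`.
inputs = {"messages": ["what is the current weather in delhi?"]}

# "values" mode: each event is the full state after a step.
events = app.stream(inputs, stream_mode="values")

for event in events:
    print(event)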

{'messages': [HumanMessage(content='what is the current weather in delhi?', additional_kwargs={}, response_metadata={}, id='7ff48997-9603-4205-9558-d0cf16298755')]}
{'messages': [HumanMessage(content='what is the current weather in delhi?', additional_kwargs={}, response_metadata={}, id='7ff48997-9603-4205-9558-d0cf16298755'), AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_Cy1Z0Id6iDxkDoYBkG1TfYFo', 'function': {'arguments': '{"query":"current weather in Delhi"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}], 'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 21, 'prompt_tokens': 90, 'total_tokens': 111, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-3.5-turbo-0125', 'system_fingerprint': None, 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-

In [14]:
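# Named `inputs` to avoid shadowing the built-in `input`.
inputs = {"messages": ["what is the current weather in delhi?"]}

# "updates" mode: each event is {node_name: update} for the node that just ran.
events = app.stream(inputs, stream_mode="updates")

for event in events:
    print(event)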

{'model': {'messages': [AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_Yrfy2PUtK5orUyLQ8VT4Rgkg', 'function': {'arguments': '{"query":"current weather in Delhi"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}], 'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 21, 'prompt_tokens': 90, 'total_tokens': 111, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-3.5-turbo-0125', 'system_fingerprint': None, 'finish_reason': 'tool_calls', 'logprobs': None}, id='run--2ad063f3-60a2-4fda-ac77-6fbcbf4313b3-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'current weather in Delhi'}, 'id': 'call_Yrfy2PUtK5orUyLQ8VT4Rgkg', 'type': 'tool_call'}], usage_metadata={'input_tokens': 90, 'output_tokens': 21, 'total_tokens': 111, 'input_token_details': {'
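
Raw `"updates"` chunks like the one above are hard to scan. A small helper can reduce each chunk to the node name, the type of the last message, and the number of tool calls it requested. This is a sketch: `summarize_update` is a hypothetical helper, and `FakeAIMessage` is a stand-in class so the example runs without LangChain installed. It assumes each update is a dictionary with a `messages` list, as in the graph defined in this document.

```python
from types import SimpleNamespace


class FakeAIMessage(SimpleNamespace):
    """Stand-in for an AIMessage; only the attributes used here are set."""


def summarize_update(chunk: dict) -> list[tuple[str, str, int]]:
    """Return (node_name, last_message_type, tool_call_count) per node in a chunk."""
    rows = []
    for node_name, update in chunk.items():
        if not isinstance(update, dict):
            continue  # skip nodes that returned nothing usable
        messages = update.get("messages", [])
        if not messages:
            continue
        last = messages[-1]
        # Message types without tool calls simply lack the attribute.
        tool_calls = getattr(last, "tool_calls", None) or []
        rows.append((node_name, type(last).__name__, len(tool_calls)))
    return rows


chunk = {
    "model": {
        "messages": [
            FakeAIMessage(content="", tool_calls=[{"name": "tavily_search_results_json"}])
        ]
    }
}

summary = summarize_update(chunk)
print(summary)
```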

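## astream_events

***astream_events*** is a lower-level streaming interface that emits detailed execution events as the graph runs, including events from the graph itself, from each node, and from the chat models and tools called inside nodes.

🔍 What is astream_events?

``` python
# Top-level `async for` works in a notebook; in a script, wrap it in a coroutine.
input_state = {"messages": ["how are you?"]}

async for event in app.astream_events(input_state, version="v2"):
    print(event)
```

Instead of returning only the state, astream_events yields event dictionaries, which include:
- `event`: the type of event (e.g., on_chain_start, on_chat_model_stream, on_tool_end)
- `name`: the name of the component that produced it (the graph, a node, a model, or a tool)
- `data`: the inputs, outputs, or streamed chunk for that event
- `run_id`, `parent_ids`, `tags`, and `metadata` (for example `langgraph_node` and `langgraph_step`)

### Common event Values:
- "on_chain_start": when the graph or a node starts running

- "on_chain_stream": when the graph or a node emits a chunk

- "on_chain_end": when the graph or a node finishes

- "on_chat_model_start", "on_chat_model_stream", "on_chat_model_end": when a chat model call starts, produces a token chunk, and finishes

- "on_tool_start", "on_tool_end": when a tool call starts and finishes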

| Method             | What it Streams                                                  | Best For                        |
| ------------------ | ---------------------------------------------------------------- | ------------------------------- |
| `astream()`        | State snapshots or per-node updates, depending on `stream_mode`  | UI updates, state management    |
| `astream_events()` | Execution **events** with their inputs and outputs               | Debugging, tracing, custom logs |
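
A common use of the event stream is to pull out only the LLM tokens. Chat model token chunks arrive as `on_chat_model_stream` events, with the message chunk under `event["data"]["chunk"]`. The sketch below filters a list of event dictionaries for those tokens. `iter_tokens` is a hypothetical helper, and the sample events are hand-written with `SimpleNamespace` standing in for real message chunks, so it runs without a model.

```python
from types import SimpleNamespace
from typing import Iterable, Iterator


def iter_tokens(events: Iterable[dict]) -> Iterator[str]:
    """Yield the text of each chat-model token chunk found in a sequence of events."""
    for event in events:
        if event.get("event") != "on_chat_model_stream":
            continue  # ignore chain, tool, and other event types
        chunk = event["data"]["chunk"]
        text = getattr(chunk, "content", "")
        # Tool-call chunks carry empty content; skip them.
        if isinstance(text, str) and text:
            yield text


# Hand-written sample events; real events carry more keys (run_id, tags, metadata).
sample_events = [
    {"event": "on_chain_start", "name": "LangGraph", "data": {"input": {}}},
    {"event": "on_chat_model_stream", "name": "ChatOpenAI", "data": {"chunk": SimpleNamespace(content="Hel")}},
    {"event": "on_chat_model_stream", "name": "ChatOpenAI", "data": {"chunk": SimpleNamespace(content="")}},
    {"event": "on_chat_model_stream", "name": "ChatOpenAI", "data": {"chunk": SimpleNamespace(content="lo")}},
    {"event": "on_chain_end", "name": "LangGraph", "data": {"output": {}}},
]

text = "".join(iter_tokens(sample_events))
print(text)
```

With the real API the same filter would sit inside an `async for` loop over `app.astream_events(...)`, printing each token as it arrives.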


In [17]:
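# Named `inputs` to avoid shadowing the built-in `input`.
inputs = {"messages": ["how are you?"]}
events = app.astream_events(inputs, version="v2")

# Top-level `async for` works in a notebook cell.
async for event in events:
    print(event)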

{'event': 'on_chain_start', 'data': {'input': {'messages': ['how are you?']}}, 'name': 'LangGraph', 'tags': [], 'run_id': '53a4032f-e151-410a-b943-4663d91e31a3', 'metadata': {}, 'parent_ids': []}
{'event': 'on_chain_start', 'data': {'input': {'messages': ['how are you?']}}, 'name': '__start__', 'tags': ['graph:step:0', 'langsmith:hidden'], 'run_id': 'b85d7200-b187-4998-8a6d-7d915d2d8e55', 'metadata': {'langgraph_step': 0, 'langgraph_node': '__start__', 'langgraph_triggers': ['__start__'], 'langgraph_path': ('__pregel_pull', '__start__'), 'langgraph_checkpoint_ns': '__start__:d1e687b1-8136-9144-add2-24c04fbfa520'}, 'parent_ids': ['53a4032f-e151-410a-b943-4663d91e31a3']}
{'event': 'on_chain_start', 'data': {'input': {'messages': ['how are you?']}}, 'name': '_write', 'tags': ['seq:step:1', 'langsmith:hidden', 'langsmith:hidden'], 'run_id': 'f3f86317-5dda-4efa-800b-e02f232c914c', 'metadata': {'langgraph_step': 0, 'langgraph_node': '__start__', 'langgraph_triggers': ['__start__'], 'langgrap