# LangChain 1.0+ Streaming Patterns

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/EnkrateiaLucca/oreilly_live_training_getting_started_with_langchain/blob/main/notebooks/1.3-streaming-patterns.ipynb)

In this notebook, we'll explore LangChain's streaming capabilities and why they matter for building responsive LLM applications.

## Setup

In [None]:
# LangChain 1.0+ Setup
# %pip install -qU langchain>=1.0.0
# %pip install -qU langchain-core>=1.0.0
# %pip install -qU langchain-openai
# %pip install -qU langgraph

In [None]:
import os
import getpass

def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")

_set_env("OPENAI_API_KEY")

## Why Streaming Matters

Streaming is crucial for building responsive LLM applications because:

1. **Low Latency**: Users see results immediately instead of waiting for the full response
2. **Better UX**: Real-time feedback makes the application feel more interactive
3. **Transparency**: Users can see the model "thinking" and generating responses
4. **Early Termination**: Users can stop generation if the response is going off-track

LangChain provides multiple streaming modes to handle different use cases, from simple token streaming to complex agent workflows.

## 1. Basic Invocation vs Streaming

Let's start by comparing `invoke()` (no streaming) with `stream()` (token-by-token streaming).

In [None]:
from langchain_openai import ChatOpenAI

MODEL = "gpt-4o-mini"
llm = ChatOpenAI(model=MODEL, temperature=0.7)

### Non-streaming: invoke()

With `invoke()`, you get the complete response all at once after waiting for the full generation.

In [None]:
prompt = "Write a short poem about streaming data in real-time."

# Non-streaming - wait for complete response
response = llm.invoke(prompt)
print(response.content)

### Streaming: stream()

With `stream()`, you get tokens as they are generated, enabling real-time display.

In [None]:
import sys

# Streaming - display tokens as they arrive
for chunk in llm.stream(prompt):
    print(chunk.content, end="", flush=True)
    sys.stdout.flush()

### Demonstrating the Difference with a Longer Response

The streaming effect is more noticeable with longer outputs.

In [None]:
long_prompt = """Explain the concept of streaming in large language models,
including why it's important for user experience, how it works technically,
and what are some common implementation patterns."""

print("=" * 50)
print("STREAMING RESPONSE:")
print("=" * 50)

for chunk in llm.stream(long_prompt):
    print(chunk.content, end="", flush=True)
    sys.stdout.flush()

print("\n" + "=" * 50)

## 2. Streaming with Chains (LCEL)

LangChain Expression Language (LCEL) chains also support streaming. The streaming flows through the entire chain.

In [None]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# Create a chain: prompt | model | parser
prompt_template = ChatPromptTemplate.from_template(
    "You are a helpful assistant. Answer this question: {question}"
)

output_parser = StrOutputParser()

chain = prompt_template | llm | output_parser

In [None]:
# Stream through the entire chain
question = "What are the main benefits of using streaming in LLM applications?"

for chunk in chain.stream({"question": question}):
    print(chunk, end="", flush=True)
    sys.stdout.flush()

### Streaming with Multiple Output Parsers

You can stream even when using output parsers that transform the data.

In [None]:
# Chain with string output parser
list_prompt = ChatPromptTemplate.from_template(
    "List 5 {topic} in a numbered list format."
)

list_chain = list_prompt | llm | StrOutputParser()

print("Streaming a list:")
print("-" * 40)

for chunk in list_chain.stream({"topic": "benefits of async programming"}):
    print(chunk, end="", flush=True)
    sys.stdout.flush()

## 3. Streaming with Tools and Agents

LangChain 1.0 introduces `create_agent` for building agents. Agents can stream both their reasoning and tool calls.

In [None]:
from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain_core.tools import tool
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

@tool
def get_word_length(word: str) -> int:
    """Returns the length of a word."""
    return len(word)

@tool
def multiply_numbers(a: float, b: float) -> float:
    """Multiply two numbers together."""
    return a * b

tools = [get_word_length, multiply_numbers]

In [None]:
# Create agent prompt
agent_prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant. Use tools when necessary."),
    ("human", "{input}"),
    MessagesPlaceholder("agent_scratchpad"),
])

# Create agent
llm_with_tools = llm.bind_tools(tools)
agent = create_tool_calling_agent(llm_with_tools, tools, agent_prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

### Basic Agent Streaming

Stream the agent's execution to see its reasoning process.

In [None]:
# Stream agent execution
for chunk in agent_executor.stream(
    {"input": "What is the length of the word 'streaming' multiplied by 3?"}
):
    print(chunk)
    print("-" * 50)

## 4. Advanced Streaming: stream_events()

For more granular control, `stream_events()` (also called `astream_events()` for async) provides detailed event-by-event streaming.

In [None]:
# Stream events for detailed monitoring
async def demonstrate_stream_events():
    """Demonstrate streaming events with astream_events."""
    async for event in chain.astream_events(
        {"question": "What is LangChain?"},
        version="v2"
    ):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            content = event["data"]["chunk"].content
            if content:
                print(content, end="", flush=True)

# Run in notebook
import asyncio
await demonstrate_stream_events()

## 5. Async Streaming with astream()

For production applications, async streaming provides better performance and resource utilization.

In [None]:
# Async streaming example
async def async_stream_example():
    """Demonstrate async streaming."""
    prompt = "Explain the difference between sync and async streaming in 3 sentences."
    
    print("Async streaming:")
    async for chunk in llm.astream(prompt):
        print(chunk.content, end="", flush=True)
    print("\n")

# Execute async function
await async_stream_example()

### Async Chain Streaming

In [None]:
async def async_chain_stream():
    """Stream through an async chain."""
    async for chunk in chain.astream({"question": "What are async operations?"}):
        print(chunk, end="", flush=True)
    print("\n")

await async_chain_stream()

## 6. Production Pattern: FastAPI Streaming Endpoint

Here's a complete example of how to build a streaming endpoint with FastAPI.

In [None]:
# Example FastAPI streaming endpoint (for reference)
example_code = '''
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
import asyncio

app = FastAPI()

# Initialize chain
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)
prompt = ChatPromptTemplate.from_template(
    "You are a helpful assistant. Answer: {question}"
)
chain = prompt | llm | StrOutputParser()

@app.post("/stream")
async def stream_response(question: str):
    """Stream LLM response to client."""
    async def generate():
        async for chunk in chain.astream({"question": question}):
            # Send Server-Sent Events format
            yield f"data: {chunk}\n\n"
            await asyncio.sleep(0.01)  # Small delay for better streaming
    
    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )

# Run with: uvicorn app:app --reload
'''

print(example_code)

## 7. Practical Example: Interactive Streaming Assistant

Let's build a practical streaming assistant that handles multi-turn conversations.

In [None]:
from langchain_core.messages import HumanMessage, AIMessage

class StreamingAssistant:
    """A streaming conversational assistant."""
    
    def __init__(self, model="gpt-4o-mini"):
        self.llm = ChatOpenAI(model=model, temperature=0.7)
        self.history = []
    
    def stream_response(self, user_input: str):
        """Stream a response and maintain conversation history."""
        # Add user message to history
        self.history.append(HumanMessage(content=user_input))
        
        # Stream response
        full_response = ""
        for chunk in self.llm.stream(self.history):
            content = chunk.content
            full_response += content
            print(content, end="", flush=True)
        
        print()  # New line after streaming
        
        # Add AI response to history
        self.history.append(AIMessage(content=full_response))
        
        return full_response

# Demo the assistant
assistant = StreamingAssistant()

print("Assistant: ", end="")
assistant.stream_response("Hi! What is streaming in LLMs?")

print("\nAssistant: ", end="")
assistant.stream_response("Can you give me a practical example?")

## 8. Streaming Best Practices

Key patterns for production streaming applications:

1. **Always use async in production**: `astream()` is more efficient than `stream()`
2. **Handle errors gracefully**: Wrap streams in try-except blocks
3. **Implement timeout mechanisms**: Prevent infinite streaming
4. **Use Server-Sent Events (SSE)**: Standard format for HTTP streaming
5. **Buffer chunks appropriately**: Balance between latency and throughput
6. **Monitor token usage**: Track costs even during streaming
7. **Implement cancellation**: Allow users to stop generation
8. **Test streaming thoroughly**: Ensure chunks are delivered correctly

## Summary

In this notebook, we covered:

1. **Why streaming matters**: Low latency and better UX
2. **Basic streaming**: `invoke()` vs `stream()`
3. **Chain streaming**: Using `stream()` with LCEL chains
4. **Agent streaming**: Streaming tool calls and reasoning
5. **Advanced streaming**: `stream_events()` for fine-grained control
6. **Async streaming**: `astream()` for production use
7. **Production patterns**: FastAPI streaming endpoint example
8. **Best practices**: Guidelines for building robust streaming apps

Streaming is essential for building responsive, production-ready LLM applications. Start with basic `stream()` for prototypes, then move to `astream()` for production deployments.