# Streaming With Langchain
LangChain is one of the most popular open source libraries for AI Engineers. It's goal is to abstract away the complexity in building AI software, provide easy-to-use building blocks, and make it easier when switching between AI service providers.

In this example, we will introduce LangChain's async streaming, allowing us to receive and view the tokens as they are generated by Ollama LLM. The use of streaming is typical in conversational interfaces and can provide a more natural experience for users.

In [3]:
import os
from langchain_ollama import ChatOllama
llm = ChatOllama(
    model = "llama3.2",
    temperatur=0.0,
    streaming=True
)

In [4]:
llm_out = llm.invoke("Hello there")
llm_out

AIMessage(content="Hello! It's nice to meet you. Is there something I can help you with or would you like to chat?", additional_kwargs={}, response_metadata={'model': 'llama3.2', 'created_at': '2025-05-15T05:36:27.5757625Z', 'done': True, 'done_reason': 'stop', 'total_duration': 1797605200, 'load_duration': 342883100, 'prompt_eval_count': 27, 'prompt_eval_duration': 611695400, 'eval_count': 25, 'eval_duration': 839109300, 'model_name': 'llama3.2'}, id='run--edd8fc65-1796-4c58-be8f-cdfce30977a1-0', usage_metadata={'input_tokens': 27, 'output_tokens': 25, 'total_tokens': 52})

# Streaming with astream
We will start by creating a aysnc stream from our LLM. We do this within an async for loop, allowing us to iterate through the chunks of data and use them as soon as the async astream method returns the tokens to us. By adding a pipe character | we can see the individual tokens that are generated. We set flush equal to True as this forces immediate output to the console, resulting in smoother streaming.

In [3]:
tokens = []
async for token in llm.astream("What is NLP?"):
    tokens.append(token)
    print(token.content, end="|", flush=True)

N|LP| stands| for| Natural| Language| Processing|,| which| is| a| sub|field| of| artificial| intelligence| (|AI|)| that| deals| with| the| interaction| between| computers| and| humans| in| natural| language|.| It| involves| the| use| of| algorithms|,| statistical| models|,| and| machine| learning| techniques| to| process|,| understand|,| and| generate| human| language|.

|The| primary| goals| of| N|LP| are|:

|1|.| **|Text| Analysis|**:| To| extract| meaningful| information| from| text| data|,| such| as| sentiment| analysis|,| named| entity| recognition|,| and| topic| modeling|.
|2|.| **|Language| Understanding|**:| To| comprehend| the| meaning| of| natural| language| inputs|,| including| syntax|,| semantics|,| and| prag|m|atics|.
|3|.| **|Language| Generation|**:| To| generate| human|-like| text| or| speech| that| is| coherent|,| gramm|atically| correct|,| and| context|ually| relevant|.

|Some| common| N|LP| applications| include|:

|1|.| Sent|iment| analysis|:| Analy|zing| customer| 

In [4]:
tokens[0]

AIMessageChunk(content='N', additional_kwargs={}, response_metadata={}, id='run--9f22dba6-1d03-4100-a9f5-1ba08e918ef3')

In [5]:
tokens[1]

AIMessageChunk(content='LP', additional_kwargs={}, response_metadata={}, id='run--9f22dba6-1d03-4100-a9f5-1ba08e918ef3')

In [6]:
tokens[0] + tokens[1] + tokens[2] + tokens[3] + tokens[4]

AIMessageChunk(content='NLP stands for Natural', additional_kwargs={}, response_metadata={}, id='run--9f22dba6-1d03-4100-a9f5-1ba08e918ef3')

A word of caution, there is nothing preventing you from merging tokens in the incorrect order, so be cautious to not output any token omelettes:

In [8]:
tokens[4] + tokens[3] + tokens[2] + tokens[1] + tokens[0]

AIMessageChunk(content=' Natural for standsLPN', additional_kwargs={}, response_metadata={}, id='run--9f22dba6-1d03-4100-a9f5-1ba08e918ef3')

# Streaming with Agents
Streaming with agents, particularly the custom agent executor, is a little more complex. Let's begin by constructor a simple agent executor matching what we built in the Agent Executor chapter.

To construct the agent executor we need:

- Tools
- ChatPromptTemplate
- Our LLM (already defined with llm)
- An agent
- Finally, the agent executor

Let's start defining each.

# Tools
Now we will define a few tools to be used by an async agent executor. Our goal for tool-use in regards to streaming are:

- The tool-use steps will be streamed in one big chunk, ie we do not return the tool use information token-by-token but instead it streams message-by-message.

- The final LLM output will be streamed token-by-token as we saw above.

For these we need to define a few math tools and our final answer tool.

In [10]:
from langchain_core.tools import tool

@tool
def add(x: float, y: float) -> float:
    """Add 'x' and 'y'."""
    return x + y

@tool
def multiply(x: float, y: float) -> float:
    """Multiply 'x' and 'y'."""
    return x * y

@tool
def exponentiate(x: float, y: float) -> float:
    """Raise 'x' to the power of 'y'."""
    return x ** y

@tool
def subtract(x: float, y: float) -> float:
    """Subtract 'x' from 'y'."""
    return y - x

@tool
def final_answer(answer: str, tools_used: list[str]) -> str:
    """Use this tool to provide a final answer to the user.
    The answer should be in natural language as this will be provided
    to the user directly. The tools_used must include a list of tool
    names that were used within the `scratchpad`. You MUST use this tool
    to conclude the interaction.
    """
    return {"answer": answer, "tools_used": tools_used}

In [11]:
tools = [add, multiply, exponentiate, subtract, final_answer]

# ChatPromptTemplate
We will create our ChatPromptTemplate, using a system message, chat history, user input, and a scratchpad for intermediate steps.

In [13]:
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

prompt = ChatPromptTemplate.from_messages([
    ("system", (
        "You're a helpful assistant. When answering a user's question "
        "you should first use one of the tools provided. After using a "
        "tool the tool output will be provided back to you. You MUST "
        "then use the final_answer tool to provide a final answer to the user. "
        "DO NOT use the same tool more than once."
    )),
    MessagesPlaceholder(variable_name="chat_history"),
    ("human", "{input}"),
    MessagesPlaceholder(variable_name="agent_scratchpad"),
])

# Agent
As before, we will define our agent with LCEL.

In [14]:
from langchain_core.runnables.base import RunnableSerializable

tools = [add, subtract, multiply, exponentiate, final_answer]

# define the agent runnable
agent: RunnableSerializable = (
    {
        "input": lambda x: x["input"],
        "chat_history": lambda x: x["chat_history"],
        "agent_scratchpad": lambda x: x.get("agent_scratchpad", [])
    }
    | prompt
    | llm.bind_tools(tools, tool_choice="any")
)

# Agent Executor
Finally, we will create the agent executor.

In [15]:
import json
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage


# create tool name to function mapping
name2tool = {tool.name: tool.func for tool in tools}

class CustomAgentExecutor:
    chat_history: list[BaseMessage]

    def __init__(self, max_iterations: int = 3):
        self.chat_history = []
        self.max_iterations = max_iterations
        self.agent: RunnableSerializable = (
            {
                "input": lambda x: x["input"],
                "chat_history": lambda x: x["chat_history"],
                "agent_scratchpad": lambda x: x.get("agent_scratchpad", [])
            }
            | prompt
            | llm.bind_tools(tools, tool_choice="any")  # we're forcing tool use again
        )

    def invoke(self, input: str) -> dict:
        # invoke the agent but we do this iteratively in a loop until
        # reaching a final answer
        count = 0
        agent_scratchpad = []
        while count < self.max_iterations:
            # invoke a step for the agent to generate a tool call
            out = self.agent.invoke({
                "input": input,
                "chat_history": self.chat_history,
                "agent_scratchpad": agent_scratchpad
            })
            # if the tool call is the final answer tool, we stop
            if out.tool_calls[0]["name"] == "final_answer":
                break
            agent_scratchpad.append(out)  # add tool call to scratchpad
            # otherwise we execute the tool and add it's output to the agent scratchpad
            tool_out = name2tool[out.tool_calls[0]["name"]](**out.tool_calls[0]["args"])
            # add the tool output to the agent scratchpad
            action_str = f"The {out.tool_calls[0]['name']} tool returned {tool_out}"
            agent_scratchpad.append({
                "role": "tool",
                "content": action_str,
                "tool_call_id": out.tool_calls[0]["id"]
            })
            # add a print so we can see intermediate steps
            print(f"{count}: {action_str}")
            count += 1
        # add the final output to the chat history
        final_answer = out.tool_calls[0]["args"]
        # this is a dictionary, so we convert it to a string for compatibility with
        # the chat history
        final_answer_str = json.dumps(final_answer)
        self.chat_history.append({"input": input, "output": final_answer_str})
        self.chat_history.extend([
            HumanMessage(content=input),
            AIMessage(content=final_answer_str)
        ])
        # return the final answer in dict form
        return final_answer

agent_executor = CustomAgentExecutor()

In [None]:
agent_executor.invoke(input="What is 10 + 10")