## LangChain Streaming - Basic

In [1]:
from langchain_openai import ChatOpenAI

In [7]:
# Chat model used throughout: gpt-4o-mini, temperature 0, token streaming enabled.
llm: ChatOpenAI = ChatOpenAI(model="gpt-4o-mini", temperature=0.0, streaming=True)

In [8]:
# Plain (non-streamed) call; the walrus keeps the reply in `llm_out` and displays it.
(llm_out := llm.invoke("Hello there"))

AIMessage(content='Hello! How can I assist you today?', additional_kwargs={}, response_metadata={'finish_reason': 'stop', 'model_name': 'gpt-4o-mini-2024-07-18', 'system_fingerprint': 'fp_34a54ae93c', 'service_tier': 'default'}, id='run--31a9345b-71ba-4cb6-8eff-a8abcdc86c0b-0')

### Streaming with astream

In [9]:
tokens = []
async for token in llm.astream("What is NLP?"):
    tokens.append(token)
    print(token.content, end="|", flush=True)
    

|N|LP| stands| for| Natural| Language| Processing|,| which| is| a| sub|field| of| artificial| intelligence| (|AI|)| and| computer| science| focused| on| the| interaction| between| computers| and| humans| through| natural| language|.| The| goal| of| NLP| is| to| enable| computers| to| understand|,| interpret|,| generate|,| and| respond| to| human| language| in| a| way| that| is| both| meaningful| and| useful|.

|Key| components| of| NLP| include|:

|1|.| **|Text| Processing|**|:| In|vol|ves| cleaning| and| preparing| text| data| for| analysis|,| including| token|ization|,| stemming|,| le|mmat|ization|,| and| removing| stop| words|.

|2|.| **|Syntax| and| Parsing|**|:| An|aly|zing| the| grammatical| structure| of| sentences| to| understand| relationships| between| words|,| including| part|-of|-s|peech| tagging| and| dependency| parsing|.

|3|.| **|Sem|antics|**|:| Understanding| the| meaning| of| words| and| sentences|,| including| word| sense| dis|ambigu|ation| and| semantic| role| labe

In [14]:
# Inspect a single streamed chunk: an AIMessageChunk carrying one token of text.
tokens[1]

AIMessageChunk(content='N', additional_kwargs={}, response_metadata={}, id='run--1f2142b0-5ef6-48db-8304-06efe40bdae1')

In [13]:
# The next chunk shares the same message id as the previous one.
tokens[2]

AIMessageChunk(content='LP', additional_kwargs={}, response_metadata={}, id='run--1f2142b0-5ef6-48db-8304-06efe40bdae1')

In [15]:
# Chunks support `+`, which merges their content. Start from chunk 1 (chunk 0 has
# empty content) and fold in chunks 2..8, left to right.
sum(tokens[2:9], start=tokens[1])

AIMessageChunk(content='NLP stands for Natural Language Processing,', additional_kwargs={}, response_metadata={}, id='run--1f2142b0-5ef6-48db-8304-06efe40bdae1')

### Streaming with Agents

Streaming with agents is a little more complex.
To construct the agent executor we need:
- Tools
- ChatPromptTemplate
- Our LLM
- An Agent
- Finally, the agent executor

In [17]:
# TOOLS

from langchain_core.tools import tool

# Arithmetic tool exposed to the LLM via @tool. The docstring and the parameter
# names (x, y) are what the model sees, so keep them stable.
@tool
def add(x: float, y: float) -> float:
    """Add 'x' and 'y'."""
    return x + y

# Arithmetic tool exposed to the LLM via @tool; docstring and parameter names
# are part of what the model sees.
@tool
def multiply(x: float, y: float) -> float:
    """Multiply 'x' and 'y'."""
    return x * y

# Arithmetic tool exposed to the LLM via @tool. Note: Python's `**` returns a
# complex number for a negative base with a fractional exponent.
@tool
def exponentiate(x: float, y: float) -> float:
    """Raise 'x' to the power of 'y'."""
    return x ** y

# Arithmetic tool exposed to the LLM via @tool. Argument order matters: the
# result is y - x, matching the docstring "Subtract 'x' from 'y'".
@tool
def subtract(x: float, y: float) -> float:
    """Subtract 'x' from 'y'."""
    return y - x

@tool
def final_answer(answer: str, tools_used: list[str]) -> dict:
    """Use this tool to provide a final answer to the user.
    The answer should be in natural language as this will be provided
    to the user directly. The tools_used must include a list of tool
    names that were used within the `scratchpad`. You MUST use this tool
    to conclude the interaction.
    """
    # The docstring above is addressed to the model (LangChain's @tool uses it as
    # the tool description), so edit it with care. Returns a dict, not a str.
    return {"answer": answer, "tools_used": tools_used}

In [19]:
# Every tool the agent may call; this list is what gets passed to `bind_tools`.
tools = [
    add,
    subtract,
    multiply,
    exponentiate,
    final_answer,
]

In [21]:
# ChatPromptTemplate

from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

# Agent prompt: fixed system instructions, prior conversation turns, the current
# user input, then this turn's tool calls and tool results (the scratchpad).
prompt = ChatPromptTemplate.from_messages([
    ("system", (
        "You're a helpful assistant. When answering a user's question "
        "you should first use one of the tools provided. After using a "
        "tool the tool output will be provided back to you. You MUST "
        "then use the final_answer tool to provide a final answer to the user. "
        "DO NOT use the same tool more than once."
    )),
    # filled from the "chat_history" input (earlier user/assistant messages)
    MessagesPlaceholder(variable_name="chat_history"),
    ("human", "{input}"),
    # AI tool-call messages and tool results produced while answering this input
    MessagesPlaceholder(variable_name="agent_scratchpad"),
])

In [None]:
# AGENT

from langchain_core.runnables.base import RunnableSerializable

# Agent pipeline: map the inputs -> fill the prompt -> LLM bound to `tools`, with
# tool_choice="any" so the model always answers with a tool call. A missing
# "agent_scratchpad" key defaults to an empty list.
agent: RunnableSerializable = (
    dict(
        input=lambda x: x["input"],
        chat_history=lambda x: x["chat_history"],
        agent_scratchpad=lambda x: x.get("agent_scratchpad", []),
    )
    | prompt
    | llm.bind_tools(tools=tools, tool_choice="any")
)

In [23]:
# AGENT EXECUTOR

import json
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage

# Tool name -> underlying Python function, so the executor can run a tool call directly.
name2tool = dict((t.name, t.func) for t in tools)

class CustomAgentExecutor:
    """Minimal tool-calling agent loop.

    Each ``invoke`` asks the tool-bound LLM for a tool call, runs it, feeds the
    result back through the scratchpad, and repeats until the model calls
    ``final_answer``. Completed turns are kept in ``chat_history`` across calls.
    """

    # Previous user/assistant turns. They are fed to the prompt's "chat_history"
    # MessagesPlaceholder, so only message objects may be stored here.
    chat_history: list[BaseMessage]

    def __init__(self, max_iterations: int = 3):
        """
        Args:
            max_iterations: maximum number of agent (LLM) calls per ``invoke``,
                including the call that produces ``final_answer``.
        """
        self.chat_history = []
        self.max_iterations = max_iterations
        self.agent: RunnableSerializable = (
            {
                "input": lambda x: x["input"],
                "chat_history": lambda x: x["chat_history"],
                "agent_scratchpad": lambda x: x.get("agent_scratchpad", [])
            }
            | prompt
            | llm.bind_tools(tools, tool_choice="any")  # we're forcing tool use again
        )

    def invoke(self, input: str) -> dict:
        """Answer ``input`` by running the agent loop.

        Each intermediate tool result is printed as ``"<step>: <result>"``.

        Args:
            input: the user's message.

        Returns:
            The arguments of the model's ``final_answer`` tool call (a dict
            with ``answer`` and ``tools_used``).

        Raises:
            RuntimeError: if ``final_answer`` was not called within
                ``max_iterations`` agent calls; chat history is left unchanged.
        """
        agent_scratchpad = []
        for count in range(self.max_iterations):
            # one agent step; tool_choice="any" means the reply is a tool call
            out = self.agent.invoke({
                "input": input,
                "chat_history": self.chat_history,
                "agent_scratchpad": agent_scratchpad
            })
            # NOTE(review): only the first tool call is handled; if the model
            # emits parallel tool calls, the remaining ones get no tool message.
            tool_call = out.tool_calls[0]
            if tool_call["name"] == "final_answer":
                final_answer = tool_call["args"]
                break
            # keep the AI message so the tool message below can reference its tool_call_id
            agent_scratchpad.append(out)
            # execute the tool and add its output to the agent scratchpad
            tool_out = name2tool[tool_call["name"]](**tool_call["args"])
            action_str = f"The {tool_call['name']} tool returned {tool_out}"
            agent_scratchpad.append({
                "role": "tool",
                "content": action_str,
                "tool_call_id": tool_call["id"]
            })
            # show intermediate steps
            print(f"{count}: {action_str}")
        else:
            # without a final_answer call there is no answer to return or record
            raise RuntimeError(
                f"Agent did not call final_answer within {self.max_iterations} iterations"
            )
        # The answer args are a dict; serialize them so they fit in an AIMessage.
        final_answer_str = json.dumps(final_answer)
        # Record the turn as messages only: chat_history feeds a MessagesPlaceholder,
        # which cannot accept a plain {"input": ..., "output": ...} dict.
        self.chat_history.extend([
            HumanMessage(content=input),
            AIMessage(content=final_answer_str)
        ])
        return final_answer
    
# The step budget is spelled out (3 is also the default) so it is visible here.
agent_executor = CustomAgentExecutor(max_iterations=3)

In [24]:
# Intermediate tool results are printed; the returned final_answer args dict is displayed.
agent_executor.invoke("What is 10+10?")

0: The add tool returned 20


{'answer': '10 + 10 equals 20.', 'tools_used': ['functions.add']}

In [None]:
# Expose the LLM's `callbacks` as a configurable field so that a streaming callback handler (defined below) can be supplied per call and receive the generated tokens
from langchain_core.runnables import ConfigurableField

# Same model settings as before, but `callbacks` becomes a configurable field
# (id "callbacks") so callers can supply callback handlers through the config.
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.0, streaming=True).configurable_fields(
    callbacks=ConfigurableField(
        id="callbacks",
        name="callbacks",
        description="A list of callbacks to use for streaming",
    )
)

In [26]:
# Rebuild the agent so it uses the configurable `llm` from the previous cell.
# tool_choice="any" still forces a tool call on every step.
agent: RunnableSerializable = (
    dict(
        input=lambda x: x["input"],
        chat_history=lambda x: x["chat_history"],
        agent_scratchpad=lambda x: x.get("agent_scratchpad", []),
    )
    | prompt
    | llm.bind_tools(tools=tools, tool_choice="any")
)

In [32]:
import asyncio
from langchain.callbacks.base import AsyncCallbackHandler

class QueueCallbackHandler(AsyncCallbackHandler):
    """
    Callback handler that puts streamed LLM chunks into an asyncio queue.

    Iterating the handler with ``async for`` yields queued items until the
    ``"<<DONE>>"`` sentinel arrives. ``"<<STEP_END>>"`` markers, queued at the
    end of each non-final LLM call, are yielded like any other item.
    """

    def __init__(self, queue: asyncio.Queue):
        self.queue = queue
        # set once a chunk for the `final_answer` tool call has been seen
        self.final_answer_seen = False

    async def __aiter__(self):
        while True:
            # Queue.get() suspends until an item is available, so no polling/sleep is needed.
            token_or_done = await self.queue.get()
            if token_or_done == "<<DONE>>":
                # this means we're done
                return
            # skip falsy items (e.g. None queued when no chunk was passed)
            if token_or_done:
                yield token_or_done

    async def on_llm_new_token(self, *args, **kwargs) -> None:
        """
        Put the new chunk (the ``chunk`` keyword argument) in the queue.
        """
        chunk = kwargs.get("chunk")
        if chunk:
            # check for final answer tool call
            if tool_calls := chunk.message.additional_kwargs.get("tool_calls"):
                if tool_calls[0]["function"]["name"] == "final_answer":
                    # this will allow the stream to end on the next 'on_llm_end' call
                    self.final_answer_seen = True
        self.queue.put_nowait(chunk)

    async def on_llm_end(self, *args, **kwargs) -> None:
        """
        Put "<<DONE>>" in the queue after the final_answer call, else "<<STEP_END>>".
        """
        # LangChain calls this at the end of every LLM call, not just the final
        # one, so "done" is only signalled once the final_answer tool call was seen.
        if self.final_answer_seen:
            self.queue.put_nowait("<<DONE>>")
            # reset so the same handler can be reused for another run
            self.final_answer_seen = False
        else:
            self.queue.put_nowait("<<STEP_END>>")

In [35]:
# Shared queue + handler: the LLM callbacks push chunks into `queue` via `streamer`.
queue = asyncio.Queue()
streamer = QueueCallbackHandler(queue=queue)
# NOTE: rebinds the `tokens` name used by the earlier astream cells.
tokens = []

async def stream(query: str):
    """Stream the agent's output for ``query``, printing and recording each chunk.

    ``streamer`` is attached as a callback for the run. Every chunk yielded by
    ``astream`` is appended to the global ``tokens`` list and printed.
    """
    configured_agent = agent.with_config(callbacks=[streamer])
    payload = {"input": query, "chat_history": [], "agent_scratchpad": []}
    async for token in configured_agent.astream(payload):
        tokens.append(token)
        print(token, flush=True)

# Jupyter/IPython allows top-level `await` in a cell; each streamed chunk is printed.
await stream("What is 10+10?")

content='' additional_kwargs={'tool_calls': [{'index': 0, 'id': 'call_mfXCssI9X44wka8Lj6FdTRYI', 'function': {'arguments': '', 'name': 'add'}, 'type': 'function'}]} response_metadata={} id='run--e2b83914-fb44-4239-a23c-4b5678f21a45' tool_calls=[{'name': 'add', 'args': {}, 'id': 'call_mfXCssI9X44wka8Lj6FdTRYI', 'type': 'tool_call'}] tool_call_chunks=[{'name': 'add', 'args': '', 'id': 'call_mfXCssI9X44wka8Lj6FdTRYI', 'index': 0, 'type': 'tool_call_chunk'}]
content='' additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': '{"', 'name': None}, 'type': None}]} response_metadata={} id='run--e2b83914-fb44-4239-a23c-4b5678f21a45' tool_calls=[{'name': '', 'args': {}, 'id': None, 'type': 'tool_call'}] tool_call_chunks=[{'name': None, 'args': '{"', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
content='' additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': 'x', 'name': None}, 'type': None}]} response_metadata={} id='run--e2b83914