Streaming: receiving an LLM response chunk by chunk

In [1]:
from dotenv import load_dotenv
from langchain_groq import ChatGroq

# Load environment variables from a local .env file so the API key is not
# hard-coded in the notebook (ChatGroq reads its key from the environment).
load_dotenv()

# Groq-hosted model name, reused by later cells.
model = "llama3-8b-8192"

# temperature=0.0 requests the least random sampling the API offers.
llm = ChatGroq(model=model, temperature=0.0)

In [2]:
# A plain (non-streaming) call returns one complete AIMessage.
llm_out = llm.invoke(input="Hi")
llm_out

AIMessage(content="Hi! It's nice to meet you. Is there something I can help you with or would you like to chat?", additional_kwargs={}, response_metadata={'token_usage': {'completion_tokens': 25, 'prompt_tokens': 11, 'total_tokens': 36, 'completion_time': 0.021893305, 'prompt_time': 0.001891923, 'queue_time': 0.265354295, 'total_time': 0.023785228}, 'model_name': 'llama3-8b-8192', 'system_fingerprint': 'fp_343314801a', 'service_tier': 'on_demand', 'finish_reason': 'stop', 'logprobs': None}, id='run--a5fd348a-f2ca-4cf7-8489-d0c0a480aabc-0', usage_metadata={'input_tokens': 11, 'output_tokens': 25, 'total_tokens': 36})

In [3]:
tokens = []
async for token in llm.astream("What is AI?"):
    tokens.append(token)
    print(token.content, end = "|", flush = True)

|Art|ificial| Intelligence| (|AI|)| refers| to| the| development| of| computer| systems| that| can| perform| tasks| that| would| typically| require| human| intelligence|,| such| as|:

|1|.| Learning|:| AI| systems| can| learn| from| data| and| improve| their| performance| over| time|.
|2|.| Problem|-solving|:| AI| systems| can| analyze| data| and| make| decisions| or| take| actions| based| on| that| data|.
|3|.| Reason|ing|:| AI| systems| can| draw| conclusions| and| make| predictions| based| on| the| data| they| have| been| trained| on|.
|4|.| Perception|:| AI| systems| can| interpret| and| understand| data| from| sensors|,| such| as| images|,| speech|,| or| text|.

|AI| is| a| broad| field| that| encompasses| many| sub|fields|,| including|:

|1|.| Machine| Learning| (|ML|):| a| type| of| AI| that| enables| machines| to| learn| from| data| without| being| explicitly| programmed|.
|2|.| Deep| Learning| (|DL|):| a| type| of| ML| that| uses| neural| networks| to| analyze| data|.
|3|.| Na

In [4]:
# In the run shown, the first streamed chunk carries no text (content='').
tokens[0]

AIMessageChunk(content='', additional_kwargs={}, response_metadata={}, id='run--ffac469c-3944-4ca2-825a-a28bc9b859a5')

In [5]:
# Subsequent chunks hold successive fragments of the reply text.
tokens[1]

AIMessageChunk(content='Art', additional_kwargs={}, response_metadata={}, id='run--ffac469c-3944-4ca2-825a-a28bc9b859a5')

In [6]:
# Adding AIMessageChunks concatenates their content in operand order.
tokens[0] + tokens[1] + tokens[2] + tokens[3]

AIMessageChunk(content='Artificial Intelligence', additional_kwargs={}, response_metadata={}, id='run--ffac469c-3944-4ca2-825a-a28bc9b859a5')

In [7]:
# Chunk addition is order-sensitive: reversing the operands scrambles the text.
tokens[3] + tokens[2] + tokens[1] + tokens[0]

AIMessageChunk(content=' IntelligenceificialArt', additional_kwargs={}, response_metadata={}, id='run--ffac469c-3944-4ca2-825a-a28bc9b859a5')

Streaming with agents: relaying tool-call chunks through a callback queue

In [8]:
from langchain_core.tools import tool

@tool
def add(x: float, y: float) -> float:
    """Add 'x' and 'y'."""
    # The docstring above doubles as the tool description the model sees,
    # so it is kept verbatim; implementation notes go in comments.
    total = x + y
    return total

@tool
def multiply(x: float, y: float) -> float:
    """Multiply 'x' and 'y'."""
    # Docstring is the tool description; keep it unchanged.
    product = x * y
    return product

@tool
def exponentiate(x: float, y: float) -> float:
    """Raise 'x' to the power of 'y'."""
    # Two-argument pow() is equivalent to the ** operator.
    return pow(x, y)

@tool
def subtract(x: float, y: float) -> float:
    """Subtract 'x' from 'y'."""
    # Operand order is y - x ("subtract x from y"), not x - y.
    difference = y - x
    return difference

@tool
def final_answer(answer: str, tools_used: list[str]) -> dict:
    """Use this tool to provide a final answer to the user.
    The answer should be in natural language as this will be provided
    to the user directly. The tools_used must include a list of tool
    names that were used within the `scratchpad`. You MUST use this tool
    to conclude the interaction.
    """
    # The result is a dict, not a str: the agent executors read
    # tool_out["answer"] from it. (The docstring above is the tool
    # description sent to the model, so it is left verbatim.)
    return {"answer": answer, "tools_used": tools_used}

In [9]:
# Every tool the agent may call; final_answer gives it a way to finish.
tools = [
    add,
    multiply,
    exponentiate,
    subtract,
    final_answer,
]

In [10]:
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You're a helpful assistant. When answering a user's question "
            "you should first use one of the tools provided. After using a "
            "tool the tool output will be provided back to you. You MUST "
            "then use the final_answer tool to provide a final answer to the user. "
            "DO NOT use the same tool more than once.",
        ),
        # earlier conversation turns
        MessagesPlaceholder(variable_name="chat_history"),
        ("human", "{input}"),
        # tool calls and tool results produced during the current turn
        MessagesPlaceholder(variable_name="agent_scratchpad"),
    ]
)

In [11]:
from langchain_core.runnables.base import RunnableSerializable

# NOTE(review): this rebinds `tools` from an earlier cell with the same
# tools in a different order.
tools = [add, subtract, multiply, exponentiate, final_answer]

# Agent runnable: route the input dict onto the prompt variables, render
# the prompt, then call the LLM with tool use forced (tool_choice="any").
agent: RunnableSerializable = (
    {
        "input": lambda inputs: inputs["input"],
        "chat_history": lambda inputs: inputs["chat_history"],
        # the scratchpad may be absent on the first step
        "agent_scratchpad": lambda inputs: inputs.get("agent_scratchpad", []),
    }
    | prompt
    | llm.bind_tools(tools, tool_choice="any")
)

In [12]:
import json
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage


# Map each tool's name to its underlying Python function, so a tool call
# emitted by the model can be executed by name.
name2tool = {t.name: t.func for t in tools}

class CustomAgentExecutor:
    """Synchronous tool-calling agent loop.

    Each iteration asks the tool-forced agent for one tool call, executes it
    via ``name2tool`` and feeds the result back through the scratchpad. The
    loop stops when the model calls ``final_answer`` or after
    ``max_iterations`` executed tool calls.
    """

    chat_history: list[BaseMessage]

    def __init__(self, max_iterations: int = 3):
        self.chat_history = []
        self.max_iterations = max_iterations
        self.agent: RunnableSerializable = (
            {
                "input": lambda x: x["input"],
                "chat_history": lambda x: x["chat_history"],
                "agent_scratchpad": lambda x: x.get("agent_scratchpad", [])
            }
            | prompt
            | llm.bind_tools(tools, tool_choice="any")  # we're forcing tool use again
        )

    def invoke(self, input: str) -> dict:
        """Run the agent loop for one user question.

        Args:
            input: the user's question.

        Returns:
            The arguments of the last tool call the model made - normally the
            ``final_answer`` call, i.e. ``{"answer": ..., "tools_used": [...]}``.
        """
        count = 0
        agent_scratchpad = []
        while count < self.max_iterations:
            # invoke a step for the agent to generate a tool call
            out = self.agent.invoke({
                "input": input,
                "chat_history": self.chat_history,
                "agent_scratchpad": agent_scratchpad
            })
            call = out.tool_calls[0]
            # if the tool call is the final answer tool, we stop
            if call["name"] == "final_answer":
                break
            agent_scratchpad.append(out)  # add tool call to scratchpad
            # otherwise we execute the tool and add its output to the agent scratchpad
            tool_out = name2tool[call["name"]](**call["args"])
            action_str = f"The {call['name']} tool returned {tool_out}"
            agent_scratchpad.append({
                "role": "tool",
                "content": action_str,
                "tool_call_id": call["id"]
            })
            # add a print so we can see intermediate steps
            print(f"{count}: {action_str}")
            count += 1
        # NOTE(review): if max_iterations is reached without a final_answer
        # call, these are the arguments of the last regular tool call; with
        # max_iterations < 1 `out` is never assigned.
        final_answer = out.tool_calls[0]["args"]
        # this is a dictionary, so we convert it to a string for compatibility with
        # the chat history
        final_answer_str = json.dumps(final_answer)
        # chat_history is rendered through MessagesPlaceholder("chat_history")
        # on the next call, so it must hold only messages; a plain
        # {"input": ..., "output": ...} dict is not a valid message.
        self.chat_history.extend([
            HumanMessage(content=input),
            AIMessage(content=final_answer_str)
        ])
        # return the final answer in dict form
        return final_answer

# Explicit value equal to the constructor default.
agent_executor = CustomAgentExecutor(max_iterations=3)

In [13]:
# Run one question through the synchronous agent loop.
agent_executor.invoke("What is 10 + 10")

0: The add tool returned 20


BadRequestError: Error code: 400 - {'error': {'message': "Failed to call a function. Please adjust your prompt. See 'failed_generation' for more details.", 'type': 'invalid_request_error', 'code': 'tool_use_failed', 'failed_generation': '<tool-use>{"tool_call":{"id":"final_answer","function":{"name":"final_answer"},"parameters":{"answer":"The answer to 10 + 10 is 20.","tools_used":["add"]}}}</tool-use>'}}

In [15]:
from langchain_core.runnables import ConfigurableField

# Rebuild the LLM with streaming enabled and expose its `callbacks`
# attribute as a configurable field (id "callbacks").
llm = ChatGroq(
    model=model,  # reuse the model name from the first cell instead of repeating it
    temperature=0.0,
    streaming=True,
).configurable_fields(
    callbacks=ConfigurableField(
        id="callbacks",
        name="callbacks",
        description="A list of callbacks to use for streaming",
    )
)

In [16]:
# Rebuild the agent runnable: `llm` was replaced above, and the previously
# built `agent` still holds the old, non-streaming model.
agent: RunnableSerializable = (
    {
        "input": lambda inputs: inputs["input"],
        "chat_history": lambda inputs: inputs["chat_history"],
        "agent_scratchpad": lambda inputs: inputs.get("agent_scratchpad", []),
    }
    | prompt
    | llm.bind_tools(tools, tool_choice="any")
)

In [17]:
import asyncio
from langchain.callbacks.base import AsyncCallbackHandler


class QueueCallbackHandler(AsyncCallbackHandler):
    """Callback handler that puts streamed LLM chunks into an asyncio queue.

    The handler is also an async iterator: ``async for token in handler``
    yields each queued chunk plus ``"<<STEP_END>>"`` markers between LLM
    calls, and stops once ``"<<DONE>>"`` is received.
    """

    def __init__(self, queue: asyncio.Queue):
        self.queue = queue
        # Set once a final_answer tool call has been streamed, so on_llm_end
        # knows the current LLM call is the last one of the agent run.
        self.final_answer_seen = False

    async def __aiter__(self):
        """Yield queued items until the ``"<<DONE>>"`` sentinel arrives."""
        while True:
            # queue.get() suspends until an item is available, so there is
            # no need to poll queue.empty() with sleeps.
            token_or_done = await self.queue.get()

            if token_or_done == "<<DONE>>":
                # this means we're done
                return
            # skip falsy entries (e.g. None put when no chunk was supplied)
            if token_or_done:
                yield token_or_done

    async def on_llm_new_token(self, *args, **kwargs) -> None:
        """Put the new generation chunk in the queue."""
        chunk = kwargs.get("chunk")
        if chunk:
            # check for final_answer tool call
            if tool_calls := chunk.message.additional_kwargs.get("tool_calls"):
                if tool_calls[0]["function"]["name"] == "final_answer":
                    # this will allow the stream to end on the next `on_llm_end` call
                    self.final_answer_seen = True
        self.queue.put_nowait(chunk)

    async def on_llm_end(self, *args, **kwargs) -> None:
        """Mark the end of one LLM call.

        Puts ``"<<DONE>>"`` if a final_answer tool call has been seen,
        otherwise ``"<<STEP_END>>"``.
        """
        # LangChain calls this at the end of every LLM call, not only the
        # last one, so the "done" signal is only sent after final_answer.
        if self.final_answer_seen:
            self.queue.put_nowait("<<DONE>>")
        else:
            self.queue.put_nowait("<<STEP_END>>")

    async def on_llm_error(self, *args, **kwargs) -> None:
        """Stop iteration when an LLM call fails.

        A failed call never reaches final_answer, so without this sentinel a
        consumer iterating the handler would wait forever.
        """
        self.queue.put_nowait("<<DONE>>")

In [18]:
# Fresh queue and handler for this run; `tokens` collects every chunk the
# agent yields.
queue = asyncio.Queue()
streamer = QueueCallbackHandler(queue=queue)
tokens = []

async def stream(query: str):
    """Stream one agent step for ``query``, printing and recording each chunk.

    Chunks are appended to the module-level ``tokens`` list; ``streamer`` also
    receives them through the callback mechanism.
    """
    configured_agent = agent.with_config(callbacks=[streamer])
    agent_inputs = {
        "input": query,
        "chat_history": [],
        "agent_scratchpad": [],
    }
    async for token in configured_agent.astream(agent_inputs):
        tokens.append(token)
        print(token, flush=True)

await stream("what is 10 + 10")

content='' additional_kwargs={} response_metadata={} id='run--a421c2ab-899e-4588-a930-ffaaadae9dae'
content='' additional_kwargs={'tool_calls': [{'index': 0, 'id': 'eeyr2mjvv', 'function': {'arguments': '{"x":10,"y":10}', 'name': 'add'}, 'type': 'function'}]} response_metadata={} id='run--a421c2ab-899e-4588-a930-ffaaadae9dae' tool_calls=[{'name': 'add', 'args': {'x': 10, 'y': 10}, 'id': 'eeyr2mjvv', 'type': 'tool_call'}] tool_call_chunks=[{'name': 'add', 'args': '{"x":10,"y":10}', 'id': 'eeyr2mjvv', 'index': 0, 'type': 'tool_call_chunk'}]
content='' additional_kwargs={} response_metadata={'finish_reason': 'tool_calls', 'model_name': 'llama3-8b-8192', 'system_fingerprint': 'fp_343314801a', 'service_tier': 'on_demand'} id='run--a421c2ab-899e-4588-a930-ffaaadae9dae' usage_metadata={'input_tokens': 1373, 'output_tokens': 68, 'total_tokens': 1441}


In [19]:
# Fold all chunks back into one AIMessageChunk; `+` merges the content,
# tool-call data and metadata of successive chunks.
tk = tokens[0]
for token in tokens[1:]:
    tk = tk + token

tk

AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': 'eeyr2mjvv', 'function': {'arguments': '{"x":10,"y":10}', 'name': 'add'}, 'type': 'function'}]}, response_metadata={'finish_reason': 'tool_calls', 'model_name': 'llama3-8b-8192', 'system_fingerprint': 'fp_343314801a', 'service_tier': 'on_demand'}, id='run--a421c2ab-899e-4588-a930-ffaaadae9dae', tool_calls=[{'name': 'add', 'args': {'x': 10, 'y': 10}, 'id': 'eeyr2mjvv', 'type': 'tool_call'}], usage_metadata={'input_tokens': 1373, 'output_tokens': 68, 'total_tokens': 1441}, tool_call_chunks=[{'name': 'add', 'args': '{"x":10,"y":10}', 'id': 'eeyr2mjvv', 'index': 0, 'type': 'tool_call_chunk'}])

In [20]:
from langchain_core.messages import ToolMessage

class CustomAgentExecutor:
    """Streaming tool-calling agent loop.

    Each step streams one forced tool call from the LLM through ``streamer``,
    executes the tool via ``name2tool`` and appends both the call and a
    ``ToolMessage`` with its result to the scratchpad. The loop ends when
    the model calls ``final_answer``.
    """

    chat_history: list[BaseMessage]

    def __init__(self, max_iterations: int = 3):
        self.chat_history = []
        self.max_iterations = max_iterations
        self.agent: RunnableSerializable = (
            {
                "input": lambda x: x["input"],
                "chat_history": lambda x: x["chat_history"],
                "agent_scratchpad": lambda x: x.get("agent_scratchpad", [])
            }
            | prompt
            | llm.bind_tools(tools, tool_choice="any")  # we're forcing tool use again
        )

    async def invoke(self, input: str, streamer: QueueCallbackHandler, verbose: bool = False) -> dict:
        """Run the agent on ``input``, streaming every LLM step through ``streamer``.

        Args:
            input: the user's question.
            streamer: callback handler that receives every streamed chunk.
            verbose: if True, print content and tool-call fragments as they stream.

        Returns:
            The arguments of the ``final_answer`` tool call
            (``{"answer": ..., "tools_used": [...]}``).

        Raises:
            RuntimeError: if ``final_answer`` is not called within
                ``max_iterations`` steps.
        """
        count = 0
        agent_scratchpad = []
        tool_name = None  # stays None if the loop body never runs

        # Defined once (not per iteration); it reads agent_scratchpad, which
        # is only ever appended to, so each call sees the latest steps.
        async def stream(query: str) -> AIMessage:
            """Stream one agent step and return it as a single AIMessage."""
            response = self.agent.with_config(
                callbacks=[streamer]
            )
            # accumulates the streamed chunks into one response object
            output = None
            async for token in response.astream({
                "input": query,
                "chat_history": self.chat_history,
                "agent_scratchpad": agent_scratchpad
            }):
                if output is None:
                    output = token
                else:
                    # adding chunks together rebuilds the full response
                    output += token
                if token.content != "":
                    if verbose: print(f"content: {token.content}", flush=True)
                tool_calls = token.additional_kwargs.get("tool_calls")
                if tool_calls:
                    if verbose: print(f"tool_calls: {tool_calls}", flush=True)
                    streamed_name = tool_calls[0]["function"]["name"]
                    if streamed_name:
                        if verbose: print(f"tool_name: {streamed_name}", flush=True)
                    arg = tool_calls[0]["function"]["arguments"]
                    if arg != "":
                        if verbose: print(f"arg: {arg}", flush=True)
            return AIMessage(
                content=output.content,
                tool_calls=output.tool_calls,
                # stored as an extra attribute and read back below as the id
                # for the matching ToolMessage
                tool_call_id=output.tool_calls[0]["id"]
            )

        while count < self.max_iterations:
            tool_call = await stream(query=input)
            # add the tool call to the scratchpad
            agent_scratchpad.append(tool_call)
            # execute the tool and add its output to the agent scratchpad
            tool_name = tool_call.tool_calls[0]["name"]
            tool_args = tool_call.tool_calls[0]["args"]
            tool_call_id = tool_call.tool_call_id
            tool_out = name2tool[tool_name](**tool_args)
            tool_exec = ToolMessage(
                content=f"{tool_out}",
                tool_call_id=tool_call_id
            )
            agent_scratchpad.append(tool_exec)
            count += 1
            # if the tool call is the final answer tool, we stop
            if tool_name == "final_answer":
                break

        if tool_name != "final_answer":
            # The streamer only emits "<<DONE>>" after a final_answer call;
            # send it here so a consumer iterating the streamer does not hang.
            streamer.queue.put_nowait("<<DONE>>")
            raise RuntimeError(
                f"agent did not call final_answer within {self.max_iterations} iterations"
            )
        # add the final output to the chat history, we only add the "answer" field
        answer = tool_out["answer"]
        self.chat_history.extend([
            HumanMessage(content=input),
            AIMessage(content=answer)
        ])
        # return the final answer in dict form
        return tool_args

# Explicit value equal to the constructor default.
agent_executor = CustomAgentExecutor(max_iterations=3)

In [21]:
queue = asyncio.Queue()
streamer = QueueCallbackHandler(queue)

out = await agent_executor.invoke("What is 10 + 10", streamer, verbose=True)

tool_calls: [{'index': 0, 'id': 'm0jr407hf', 'function': {'arguments': '{"x":10,"y":10}', 'name': 'add'}, 'type': 'function'}]
tool_name: add
arg: {"x":10,"y":10}


APIError: Failed to call a function. Please adjust your prompt. See 'failed_generation' for more details.

In [22]:
queue = asyncio.Queue()
streamer = QueueCallbackHandler(queue)

out = await agent_executor.invoke("What is 10 + 10", streamer)

In [24]:
queue = asyncio.Queue()
streamer = QueueCallbackHandler(queue)

task = asyncio.create_task(agent_executor.invoke("What is 10 + 10", streamer))

async for token in streamer:
    print(token, flush=True)

await task

Task exception was never retrieved
future: <Task finished name='Task-169' coro=<CustomAgentExecutor.invoke() done, defined at C:\Users\Harivenkat\AppData\Local\Temp\ipykernel_10376\3754874902.py:19> exception=APIError("Failed to call a function. Please adjust your prompt. See 'failed_generation' for more details.")>
Traceback (most recent call last):
  File "C:\Users\Harivenkat\AppData\Local\Temp\ipykernel_10376\3754874902.py", line 63, in invoke
    tool_call = await stream(query=input)
  File "C:\Users\Harivenkat\AppData\Local\Temp\ipykernel_10376\3754874902.py", line 34, in stream
    async for token in response.astream({
  File "d:\Langchain\langchain_env\lib\site-packages\langchain_core\runnables\base.py", line 5624, in astream
    async for item in self.bound.astream(
  File "d:\Langchain\langchain_env\lib\site-packages\langchain_core\runnables\base.py", line 3464, in astream
    async for chunk in self.atransform(input_aiter(), config, **kwargs):
  File "d:\Langchain\langchain_e

message=AIMessageChunk(content='', additional_kwargs={}, response_metadata={}, id='run--13852083-ec1f-455f-b29e-3abccc4b1571')
message=AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': '8ab2d2c5p', 'function': {'arguments': '{"x":10,"y":10}', 'name': 'add'}, 'type': 'function'}]}, response_metadata={}, id='run--13852083-ec1f-455f-b29e-3abccc4b1571', tool_calls=[{'name': 'add', 'args': {'x': 10, 'y': 10}, 'id': '8ab2d2c5p', 'type': 'tool_call'}], tool_call_chunks=[{'name': 'add', 'args': '{"x":10,"y":10}', 'id': '8ab2d2c5p', 'index': 0, 'type': 'tool_call_chunk'}])
generation_info={'finish_reason': 'tool_calls', 'model_name': 'llama3-8b-8192', 'system_fingerprint': 'fp_5b339000ab', 'service_tier': 'on_demand'} message=AIMessageChunk(content='', additional_kwargs={}, response_metadata={'finish_reason': 'tool_calls', 'model_name': 'llama3-8b-8192', 'system_fingerprint': 'fp_5b339000ab', 'service_tier': 'on_demand'}, id='run--13852083-ec1f-455f-b29e-3abccc4b15

{'answer': 'The answer to 10 + 10 is 20.', 'tools_used': ['add']}

In [25]:
queue = asyncio.Queue()
streamer = QueueCallbackHandler(queue)

task = asyncio.create_task(agent_executor.invoke("What is 10 + 10", streamer))

async for token in streamer:
    # first identify if we have a <<STEP_END>> token
    if token == "<<STEP_END>>":
        print("\n", flush=True)
    # we'll first identify if the token is a tool call
    elif tool_calls := token.message.additional_kwargs.get("tool_calls"):
        # if we have a tool call with a tool name, we'll print it
        if tool_name := tool_calls[0]["function"]["name"]:
            print(f"Calling {tool_name}...", flush=True)
        # if we have a tool call with arguments, we ad them to our args string
        if tool_args := tool_calls[0]["function"]["arguments"]:
            print(f"{tool_args}", end="", flush=True)

_ = await task

Calling add...
{"x":10,"y":10}

Calling final_answer...
{"answer":"The answer to 10 + 10 is 20.","tools_used":["add"]}