# LLMCompiler

This notebook shows how to implement [LLMCompiler, by Kim et al.](https://arxiv.org/abs/2312.04511) in LangGraph.

LLMCompiler is an agent architecture designed to **speed up** the execution of agentic tasks by eagerly executing tasks within a DAG. It also saves costs on redundant token usage by reducing the number of calls to the LLM. Below is an overview of its computational graph:

![LLMCompiler Graph](./img/llm-compiler.png)

It has 3 main components:

1. Planner: streams a DAG of tasks.
2. Task Fetching Unit: schedules and executes the tasks as soon as they are executable.
3. Joiner: responds to the user or triggers a second plan.


This notebook walks through each component and shows how to wire them together using LangGraph. The end result will leave a trace [like the following](https://smith.langchain.com/public/218c2677-c719-4147-b0e9-7bc3b5bb2623/r).


**First,** install the dependencies, and set up LangSmith for tracing to more easily debug and observe the agent.

In [1]:
%pip install -U --quiet dateparser requests beautifulsoup4 langchainhub langchain_openai langsmith langgraph langchain numexpr langchain_community

Note: you may need to restart the kernel to use updated packages.


In [2]:
import os
import getpass


def _get_pass(var: str):
    if var not in os.environ:
        os.environ[var] = getpass.getpass(f"{var}: ")


# Optional: Debug + trace calls using LangSmith
os.environ["LANGCHAIN_TRACING_V2"] = "True"
os.environ["LANGCHAIN_PROJECT"] = "LLMCompiler"
_get_pass("LANGCHAIN_API_KEY")
_get_pass("OPENAI_API_KEY")

## Part 1: Tools

We'll first define the tools for the agent to use in our demo. We'll give it the classic search engine + calculator combo.

If you don't want to sign up for Tavily, you can replace it with the free [DuckDuckGo](https://python.langchain.com/docs/integrations/tools/ddg).

In [3]:
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults

# Imported from the https://github.com/langchain-ai/langgraph/tree/main/examples/plan-and-execute repo
from math_tools import get_math_tool

_get_pass("TAVILY_API_KEY")

calculate = get_math_tool(ChatOpenAI(model="gpt-4o"))
search = TavilySearchResults(
    max_results=5,
    description='tavily_search_results_json(query="the search query") - a search engine.',
)

tools = [search, calculate]



In [4]:
calculate.invoke(
    {
        "problem": "What's the temp of sf + 5?",
        "context": ["The temperature of sf is 32 degrees"],
    }
)

'37'

In [5]:
from langchain_openai import ChatOpenAI
from youtube_tools import get_youtube_parser_tool  # Adjust the import based on your file structure
from web_parser_tools import get_web_parser_tool
# Initialize the language model
llm = ChatOpenAI(model="gpt-4o")

# Get the youtube_parser tool
youtube_parser_tool = get_youtube_parser_tool(llm)
web_parser_tool = get_web_parser_tool(llm)

tools = [search, calculate] + [youtube_parser_tool, web_parser_tool]

for t in tools:
    print(t)

description='tavily_search_results_json(query="the search query") - a search engine.'
name='math' description='math(problem: str, context: Optional[list[str]]) -> float:\n - Solves the provided math problem.\n - `problem` can be either a simple math problem (e.g. "1 + 3") or a word problem (e.g. "how many apples are there if there are 3 apples and 2 apples").\n - You cannot calculate multiple expressions in one call. For instance, `math(\'1 + 3, 2 + 4\')` does not work. If you need to calculate multiple expressions, you need to call them separately like `math(\'1 + 3\')` and then `math(\'2 + 4\')`\n - Minimize the number of `math` actions as much as possible. For instance, instead of calling 2. math("what is the 10% of $1") and then call 3. math("$1 + $2"), you MUST call 2. math("what is the 110% of $1") instead, which will reduce the number of math actions.\n - You can optionally provide a list of strings as `context` to help the agent solve the problem. If there are multiple contexts

In [6]:
# Define the YouTube video URL
video_url = "https://www.youtube.com/watch?v=dQw4w9WgXcQ"

# Call the tool with the video URL
result = youtube_parser_tool.func(video_url=video_url)

# Print the result
print(result)

DATA MODEL = video_url='https://www.youtube.com/watch?v=dQw4w9WgXcQ' context=None
{'url': 'https://www.youtube.com/watch?v=dQw4w9WgXcQ', 'title': 'Sample Title', 'description': 'Sample description of the video.', 'views': '12345', 'likes': '678', 'dislikes': '90', 'comments': [{'author': 'User1', 'text': 'Sample comment 1'}, {'author': 'User2', 'text': 'Sample comment 2'}]}


In [7]:
# import sys

# sys.modules.pop("get_web_parser_tool")

In [8]:
# Define the request: a question that embeds the website URL
link = "What the president's recent trip as mentioned in https://www.presidentofindia.gov.in/#:~:text=Hon'ble%20President%20of%20India,India%20on%2025%20July%2C%202022."

# Call the tool with the website URL
result = web_parser_tool.func(website_url=link)

# Print the result
print(result)

DATA MODEL = website_url="https://www.presidentofindia.gov.in/#:~:text=Hon'ble%20President%20of%20India,India%20on%2025%20July%2C%202022." context=['recent trip']
  Skip to main content
  FeedbackSitemapSkip to Main ContentScreen Reader AccessA+AA -EnglishHindiSmt. Droupadi MurmuThe President of IndiaSearchHomePress ReleasesSpeechesFormer PresidentsThe President of India, Smt Droupadi Murmu graced the National Launch of ‘Spiritual Empowerment for a Clean and Healthy Society’ organised by Brahma Kumaris in New Delhi on May 27, 2024.The President of India, Smt Droupadi Murmu paid floral tributes to Shri Neelam Sanjiva Reddy, former President of India, on his birth anniversary at Rashtrapati Bhavan on May 19, 2024.The President of India, Smt Droupadi Murmu paid floral tributes to Shri Fakhruddin Ali Ahmed, former President of India on his birth anniversary at Rashtrapati Bhavan on May 13, 2024.The President of India, Smt Droupadi Murmu graced and addressed the 7th convocation of Centr

## Part 2: Planner


Largely adapted from [the original source code](https://github.com/SqueezeAILab/LLMCompiler/blob/main/src/llm_compiler/output_parser.py), the planner accepts the input question and generates a task list to execute.

If it is provided with a previous plan, it is instructed to re-plan, which is useful if, upon completion of the first batch of tasks, the agent must take more actions.

The code below constructs the prompt template for the planner and composes it with the LLM and the output parser, defined in [output_parser.py](./output_parser.py). The output parser processes a task list in the following form:

```plaintext
1. tool_1(arg1="arg1", arg2=3.5, ...)
Thought: I then want to find out Y by using tool_2
2. tool_2(arg1="", arg2="${1}")
3. join()<END_OF_PLAN>
```

The "Thought" lines are optional. The `${#}` placeholders are variables. These are used to route tool (task) outputs to other tools.
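
To make the placeholder mechanism concrete, here is a minimal, dependency-free sketch. It reuses the `$1` / `${1}` pattern that this notebook's `_resolve_arg` helper matches later on; the function names `find_dependencies` and `substitute` are hypothetical and are not part of `output_parser.py`, whose actual parsing logic may differ.

```python
import re

# Matches $1 or ${1}; the capture group is the task index.
PLACEHOLDER = re.compile(r"\$\{?(\d+)\}?")


def find_dependencies(args: dict) -> list:
    """Return the sorted task indices referenced by placeholders in `args`."""
    found = set()
    for value in args.values():
        # Only string arguments are scanned for placeholders in this sketch.
        if isinstance(value, str):
            found.update(int(idx) for idx in PLACEHOLDER.findall(value))
    return sorted(found)


def substitute(value: str, observations: dict) -> str:
    """Replace each placeholder with the stringified output of that task.

    Placeholders whose task has no observation yet are left untouched.
    """
    return PLACEHOLDER.sub(
        lambda m: str(observations.get(int(m.group(1)), m.group(0))), value
    )


plan_args = {"arg1": "", "arg2": "${1}"}
print(find_dependencies(plan_args))
print(substitute("Temperature is $1 degrees", {1: 32}))
```

Leaving unresolved placeholders in place, rather than raising, keeps the sketch close to how `_resolve_arg` falls back to the matched text.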

In [9]:
from typing import Sequence

from langchain_core.language_models import BaseChatModel
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableBranch
from langchain_core.tools import BaseTool
from langchain_core.messages import (
    BaseMessage,
    FunctionMessage,
    HumanMessage,
    SystemMessage,
)

from output_parser import LLMCompilerPlanParser, Task
from langchain import hub
from langchain_openai import ChatOpenAI


prompt = hub.pull("wfh/llm-compiler")
prompt.pretty_print()  # pretty_print() prints directly and returns None


Given a user query, create a plan to solve it with the utmost parallelizability. Each plan should comprise an action from the following {num_tools} types:
{tool_descriptions}
{num_tools}. join(): Collects and combines results from prior actions.

 - An LLM agent is called upon invoking join() to either finalize the user query or wait until the plans are executed.
 - join should always be the last action in the plan, and will be called in two scenarios:
   (a) if the answer can be determined by gathering the outputs from tasks to generate the final response.
   (b) if the answer cannot be determined in the planning phase before you execute the plans. Guidelines:
 - Each action described above contains input/output types and description.
    - You must strictly adhere to the input and output types for each action.
    - The action descriptions contain the guidelines. You MUST strictly follow those guidelines when you use the actions.
 -

In [10]:
def create_planner(
    llm: BaseChatModel, tools: Sequence[BaseTool], base_prompt: ChatPromptTemplate
):
    tool_descriptions = "\n".join(
        f"{i+1}. {tool.description}\n"
        for i, tool in enumerate(
            tools
        )  # +1 offsets the 0-based index so that tools are numbered from 1.
    )
    planner_prompt = base_prompt.partial(
        replan="",
        num_tools=len(tools)
        + 1,  # Add one because we're adding the join() tool at the end.
        tool_descriptions=tool_descriptions,
    )
    replanner_prompt = base_prompt.partial(
        replan=' - You are given "Previous Plan" which is the plan that the previous agent created along with the execution results '
        "(given as Observation) of each plan and a general thought (given as Thought) about the executed results. "
        'You MUST use these information to create the next plan under "Current Plan".\n'
        ' - When starting the Current Plan, you should start with "Thought" that outlines the strategy for the next plan.\n'
        " - In the Current Plan, you should NEVER repeat the actions that are already executed in the Previous Plan.\n"
        " - You must continue the task index from the end of the previous one. Do not repeat task indices.",
        num_tools=len(tools) + 1,
        tool_descriptions=tool_descriptions,
    )

    def should_replan(state: list):
        # Context is passed as a system message
        return isinstance(state[-1], SystemMessage)

    def wrap_messages(state: list):
        return {"messages": state}

    def wrap_and_get_last_index(state: list):
        next_task = 0
        for message in state[::-1]:
            if isinstance(message, FunctionMessage):
                next_task = message.additional_kwargs["idx"] + 1
                break
        state[-1].content = state[-1].content + f" - Begin counting at : {next_task}"
        return {"messages": state}

    return (
        RunnableBranch(
            (should_replan, wrap_and_get_last_index | replanner_prompt),
            wrap_messages | planner_prompt,
        )
        | llm
        | LLMCompilerPlanParser(tools=tools)
    )

In [11]:
llm = ChatOpenAI(model="gpt-4o")
# This is the primary "agent" in our application
planner = create_planner(llm, tools, prompt)

In [12]:
planner.invoke([HumanMessage("What is given in www.botgems.com?")])

[{'idx': 1,
  'tool': StructuredTool(name='web_parser', description='web_parser(website_url: str, context: Optional[list[str]]) -> dict:\n - Extracts all the text content from the provided URL.\n - `website_url` should be a valid website URL.\n - You can optionally provide a list of strings as `context` to help the agent extract specific information.\n - The returned output contains website text content.\n - Minimize the number of `web_parser` actions as much as possible.', args_schema=<class 'pydantic.v1.main.web_parserSchema'>, func=<function get_web_parser_tool.<locals>.parse_website at 0x744df1d88700>),
  'args': {'website_url': 'https://www.botgems.com'},
  'dependencies': [],
  'thought': None},
 {'idx': 2, 'tool': 'join', 'args': (), 'dependencies': [1], 'thought': None}]

In [13]:
example_question = "What's the temperature in SF raised to the 3rd power?"

for task in planner.stream([HumanMessage(content=example_question)]):
    print(task["tool"], task["args"])
    print("---")

description='tavily_search_results_json(query="the search query") - a search engine.' {'query': 'current temperature in San Francisco'}
---
name='math' description='math(problem: str, context: Optional[list[str]]) -> float:\n - Solves the provided math problem.\n - `problem` can be either a simple math problem (e.g. "1 + 3") or a word problem (e.g. "how many apples are there if there are 3 apples and 2 apples").\n - You cannot calculate multiple expressions in one call. For instance, `math(\'1 + 3, 2 + 4\')` does not work. If you need to calculate multiple expressions, you need to call them separately like `math(\'1 + 3\')` and then `math(\'2 + 4\')`\n - Minimize the number of `math` actions as much as possible. For instance, instead of calling 2. math("what is the 10% of $1") and then call 3. math("$1 + $2"), you MUST call 2. math("what is the 110% of $1") instead, which will reduce the number of math actions.\n - You can optionally provide a list of strings as `context` to help the a

In [14]:
example_question = "What is the description of this video https://www.youtube.com/watch?v=dQw4w9WgXcQ by Krish naik on datascience?"

for task in planner.stream([HumanMessage(content=example_question)]):
    print(task["tool"], task["args"])
    print("---")

name='youtube_parser' description='youtube_parser(video_url: str, context: Optional[list[str]]) -> dict:\n - Extracts metadata and comments from the provided YouTube video URL.\n - `video_url` should be a valid YouTube video URL.\n - You can optionally provide a list of strings as `context` to help the agent extract specific information.\n - The returned dictionary contains video title, description, views, likes, dislikes, and comments.\n - Minimize the number of `youtube_parser` actions as much as possible.' args_schema=<class 'pydantic.v1.main.youtube_parserSchema'> func=<function get_youtube_parser_tool.<locals>.parse_youtube_video at 0x744df1d885e0> {'video_url': 'https://www.youtube.com/watch?v=dQw4w9WgXcQ', 'context': ['Krish naik', 'datascience']}
---
join ()
---


In [15]:
example_question = "What is the description of the latest youtube video by Krish naik on datascience?"

for task in planner.stream([HumanMessage(content=example_question)]):
    print(task["tool"], task["args"])
    print("---")

description='tavily_search_results_json(query="the search query") - a search engine.' {'query': 'latest youtube video by Krish naik on datascience'}
---
name='youtube_parser' description='youtube_parser(video_url: str, context: Optional[list[str]]) -> dict:\n - Extracts metadata and comments from the provided YouTube video URL.\n - `video_url` should be a valid YouTube video URL.\n - You can optionally provide a list of strings as `context` to help the agent extract specific information.\n - The returned dictionary contains video title, description, views, likes, dislikes, and comments.\n - Minimize the number of `youtube_parser` actions as much as possible.' args_schema=<class 'pydantic.v1.main.youtube_parserSchema'> func=<function get_youtube_parser_tool.<locals>.parse_youtube_video at 0x744df1d885e0> {'video_url': '$1'}
---
join ()
---


## 3. Task Fetching Unit

This component schedules the tasks. It receives a stream of tasks, each in the following format:

```typescript
{
    tool: BaseTool,
    dependencies: number[],
}
```


The basic idea is to begin executing tools as soon as their dependencies are met. This is done through multi-threading. We will combine the task fetching unit and executor below:

![diagram](./img/diagram.png)
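
Before the full implementation, the core idea can be sketched with toy tasks: each task waits in its own thread until all of its dependencies have produced an observation, then runs. The task dictionaries and the names `run_when_ready` and `run_plan` are hypothetical. The sketch also assumes an acyclic plan with fewer blocked tasks than worker threads, since a waiting task occupies a thread.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait

# Hypothetical toy plan: task 3 needs the outputs of tasks 1 and 2.
toy_tasks = [
    {"idx": 1, "dependencies": [], "run": lambda obs: 2},
    {"idx": 2, "dependencies": [], "run": lambda obs: 3},
    {"idx": 3, "dependencies": [1, 2], "run": lambda obs: obs[1] + obs[2]},
]


def run_when_ready(task, observations, retry_after=0.01):
    """Poll until every dependency has an observation, then run the task."""
    while any(dep not in observations for dep in task["dependencies"]):
        time.sleep(retry_after)
    observations[task["idx"]] = task["run"](observations)


def run_plan(tasks):
    """Submit every task to a thread pool and collect the observations."""
    observations = {}  # shared across threads; each task writes only its own key
    with ThreadPoolExecutor() as executor:
        futures = [executor.submit(run_when_ready, t, observations) for t in tasks]
        wait(futures)
    return observations


print(run_plan(toy_tasks))
```

Polling with a short sleep is the simplest way to wait on dependencies; an event-based design would avoid the polling delay at the cost of more bookkeeping.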

In [16]:
from typing import Any, Union, Iterable, List, Tuple, Dict
from typing_extensions import TypedDict
import re

from langchain_core.runnables import (
    chain as as_runnable,
)

from concurrent.futures import ThreadPoolExecutor, wait
import time


def _get_observations(messages: List[BaseMessage]) -> Dict[int, Any]:
    # Get all previous tool responses
    results = {}
    for message in messages[::-1]:
        if isinstance(message, FunctionMessage):
            results[int(message.additional_kwargs["idx"])] = message.content
    return results


class SchedulerInput(TypedDict):
    messages: List[BaseMessage]
    tasks: Iterable[Task]


def _execute_task(task, observations, config):
    tool_to_use = task["tool"]
    if isinstance(tool_to_use, str):
        return tool_to_use
    args = task["args"]
    try:
        if isinstance(args, str):
            resolved_args = _resolve_arg(args, observations)
        elif isinstance(args, dict):
            resolved_args = {
                key: _resolve_arg(val, observations) for key, val in args.items()
            }
        else:
            # This will likely fail
            resolved_args = args
    except Exception as e:
        return (
            f"ERROR(Failed to call {tool_to_use.name} with args {args}.)"
            f" Args could not be resolved. Error: {repr(e)}"
        )
    try:
        return tool_to_use.invoke(resolved_args, config)
    except Exception as e:
        return (
            f"ERROR(Failed to call {tool_to_use.name} with args {args}."
            + f" Args resolved to {resolved_args}. Error: {repr(e)})"
        )


def _resolve_arg(arg: Union[str, Any], observations: Dict[int, Any]):
    # $1 or ${1} -> 1
    ID_PATTERN = r"\$\{?(\d+)\}?"

    def replace_match(match):
        # If the string is ${123}, match.group(0) is ${123}, and match.group(1) is 123.

        # Return the match group, in this case the index, from the string. This is the index
        # number we get back.
        idx = int(match.group(1))
        return str(observations.get(idx, match.group(0)))

    # For dependencies on other tasks
    if isinstance(arg, str):
        return re.sub(ID_PATTERN, replace_match, arg)
    elif isinstance(arg, list):
        return [_resolve_arg(a, observations) for a in arg]
    else:
        return str(arg)


@as_runnable
def schedule_task(task_inputs, config):
    task: Task = task_inputs["task"]
    observations: Dict[int, Any] = task_inputs["observations"]
    try:
        observation = _execute_task(task, observations, config)
    except Exception:
        import traceback

        # format_exc() returns the traceback of the exception being handled
        observation = traceback.format_exc()
    observations[task["idx"]] = observation


def schedule_pending_task(
    task: Task, observations: Dict[int, Any], retry_after: float = 0.2
):
    while True:
        deps = task["dependencies"]
        if deps and (any([dep not in observations for dep in deps])):
            # Dependencies not yet satisfied
            time.sleep(retry_after)
            continue
        schedule_task.invoke({"task": task, "observations": observations})
        break


@as_runnable
def schedule_tasks(scheduler_input: SchedulerInput) -> List[FunctionMessage]:
    """Group the tasks into a DAG schedule."""
    # For streaming, we are making a few simplifying assumptions:
    # 1. The LLM does not create cyclic dependencies
    # 2. The LLM does not generate tasks that depend on later tasks
    # If these assumptions stop holding, you can either
    # do a proper topological sort (non-streaming)
    # or use a more complicated data structure
    tasks = scheduler_input["tasks"]
    args_for_tasks = {}
    messages = scheduler_input["messages"]
    # If we are re-planning, we may have calls that depend on previous
    # plans. Start with those.
    observations = _get_observations(messages)
    task_names = {}
    originals = set(observations)
    # ^^ We assume each task inserts a different key above to
    # avoid race conditions...
    futures = []
    retry_after = 0.25  # Retry every quarter second
    with ThreadPoolExecutor() as executor:
        for task in tasks:
            deps = task["dependencies"]
            task_names[task["idx"]] = (
                task["tool"] if isinstance(task["tool"], str) else task["tool"].name
            )
            args_for_tasks[task["idx"]] = task["args"]
            if (
                # Depends on other tasks
                deps
                and (any([dep not in observations for dep in deps]))
            ):
                futures.append(
                    executor.submit(
                        schedule_pending_task, task, observations, retry_after
                    )
                )
            else:
                # No deps or all deps satisfied
                # can schedule now
                schedule_task.invoke(dict(task=task, observations=observations))
                # futures.append(executor.submit(schedule_task.invoke dict(task=task, observations=observations)))

        # All tasks have been submitted or enqueued
        # Wait for them to complete
        wait(futures)
    # Convert observations to new tool messages to add to the state
    new_observations = {
        k: (task_names[k], args_for_tasks[k], observations[k])
        for k in sorted(observations.keys() - originals)
    }
    tool_messages = [
        FunctionMessage(
            name=name, content=str(obs), additional_kwargs={"idx": k, "args": task_args}
        )
        for k, (name, task_args, obs) in new_observations.items()
    ]
    return tool_messages

In [17]:
import itertools


@as_runnable
def plan_and_schedule(messages: List[BaseMessage], config):
    tasks = planner.stream(messages, config)
    # Begin executing the planner immediately
    try:
        tasks = itertools.chain([next(tasks)], tasks)
    except StopIteration:
        # Handle the case where tasks is empty.
        tasks = iter([])
    scheduled_tasks = schedule_tasks.invoke(
        {
            "messages": messages,
            "tasks": tasks,
        },
        config,
    )
    return scheduled_tasks

## Final node to check answer's recency

In [18]:
import sys
if sys.modules.get("check_answer_recency"):
    sys.modules.pop("check_answer_recency")
from check_answer_recency import answer_recent_node

#### Example Plan

We still haven't introduced any cycles in our computation graph, so this is all easily expressed in LCEL.

In [19]:
example_question

'What is the description of the latest youtube video by Krish naik on datascience?'

In [20]:
tool_messages = plan_and_schedule.invoke([HumanMessage(content=example_question)])

DATA MODEL = video_url='https://www.youtube.com/watch?v=LZzq1zSL1bs' context=None


In [21]:
tool_messages

[FunctionMessage(content="[{'url': 'https://github.com/krishnaik06/The-Grand-Complete-Data-Science-Materials', 'content': 'You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session. You switched accounts on another tab or window.'}, {'url': 'https://www.youtube.com/watch?v=LZzq1zSL1bs', 'content': 'Statistics is the discipline that concerns the collection, organization, analysis, interpretation, and presentation of data. In applying statistics to a scie...'}, {'url': 'https://www.youtube.com/watch?v=JxgmHe2NyeY', 'content': 'All the materials are available in the below linkhttps://github.com/krishnaik06/The-Grand-Complete-Data-Science-Materials/tree/mainVisit https://krishnaik.in...'}, {'url': 'https://www.youtube.com/user/krishnaik06', 'content': 'I have delivered over 30 tech talks on data science, machine learning, and AI at various meet-ups, technical institutions, and community-arranged forums.

## 4. "Joiner" 

So now we have the planning and initial execution done. We need a component to process these outputs and either:

1. Respond with the correct answer.
2. Loop with a new plan.

The paper refers to this as the "joiner". It's another LLM call. We are using function calling to improve parsing reliability.

In [22]:
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain.chains.openai_functions import create_structured_output_runnable
from langchain_core.messages import AIMessage


class FinalResponse(BaseModel):
    """The final response/answer."""

    response: str


class Replan(BaseModel):
    feedback: str = Field(
        description="Analysis of the previous attempts and recommendations on what needs to be fixed."
    )


class JoinOutputs(BaseModel):
    """Decide whether to replan or whether you can return the final response."""

    thought: str = Field(
        description="The chain of thought reasoning for the selected action"
    )
    action: Union[FinalResponse, Replan]


joiner_prompt = hub.pull("wfh/llm-compiler-joiner").partial(
    examples=""
)  # You can optionally add examples
llm = ChatOpenAI(model="gpt-4o")

runnable = create_structured_output_runnable(JoinOutputs, llm, joiner_prompt)

We will select only the most recent messages in the state, and format the output to be more useful for
the planner, should the agent need to loop.

In [23]:
def _parse_joiner_output(decision: JoinOutputs) -> List[BaseMessage]:
    response = [AIMessage(content=f"Thought: {decision.thought}")]
    if isinstance(decision.action, Replan):
        return response + [
            SystemMessage(
                content=f"Context from last attempt: {decision.action.feedback}"
            )
        ]
    else:
        return response + [AIMessage(content=decision.action.response)]


def select_recent_messages(messages: list) -> dict:
    selected = []
    for msg in messages[::-1]:
        selected.append(msg)
        if isinstance(msg, HumanMessage):
            break
    return {"messages": selected[::-1]}


joiner = select_recent_messages | runnable | _parse_joiner_output

In [24]:
tool_messages

[FunctionMessage(content="[{'url': 'https://github.com/krishnaik06/The-Grand-Complete-Data-Science-Materials', 'content': 'You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session. You switched accounts on another tab or window.'}, {'url': 'https://www.youtube.com/watch?v=LZzq1zSL1bs', 'content': 'Statistics is the discipline that concerns the collection, organization, analysis, interpretation, and presentation of data. In applying statistics to a scie...'}, {'url': 'https://www.youtube.com/watch?v=JxgmHe2NyeY', 'content': 'All the materials are available in the below linkhttps://github.com/krishnaik06/The-Grand-Complete-Data-Science-Materials/tree/mainVisit https://krishnaik.in...'}, {'url': 'https://www.youtube.com/user/krishnaik06', 'content': 'I have delivered over 30 tech talks on data science, machine learning, and AI at various meet-ups, technical institutions, and community-arranged forums.

In [25]:
input_messages = [HumanMessage(content=example_question)] + tool_messages

In [26]:
joiner.invoke(input_messages)

[AIMessage(content='Thought: The information about the latest YouTube video description by Krish Naik on data science is not explicitly present in the search results. I need to replan to gather this specific information.'),
 SystemMessage(content='Context from last attempt: Need to specifically find the latest YouTube video by Krish Naik related to data science and get its description.')]

## 5. Compose using LangGraph

We'll define the agent as a stateful graph, with the main nodes being:

1. Plan and execute (the DAG from the first step above)
2. Join: determine if we should finish or replan
3. Recontextualize: update the graph state based on the output from the joiner
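
The control flow of such a graph can be sketched without LangGraph: nodes append messages to a shared list, and a routing function picks the next node or stops. Everything here is a hypothetical stand-in (the toy nodes, `run_graph`, and the `(role, content)` tuples used in place of message objects), and the sketch omits the recency-check node that the graph in this notebook also includes.

```python
def run_graph(nodes, route, entry, state, max_steps=10):
    """Run nodes in a loop, appending each node's output to the shared state."""
    current = entry
    for _ in range(max_steps):
        state = state + nodes[current](state)
        current = route(current, state)
        if current is None:  # None plays the role of END in this sketch
            return state
    raise RuntimeError("step limit reached")


# Hypothetical stand-ins: the joiner asks for one re-plan, then answers.
def toy_plan_and_schedule(state):
    return [("function", f"observation {len(state)}")]


def toy_join(state):
    n_observations = sum(1 for role, _ in state if role == "function")
    if n_observations < 2:
        return [("system", "Context from last attempt: need more data")]
    return [("ai", "final answer")]


def toy_route(current, state):
    if current == "plan_and_schedule":
        return "join"
    # After join: a system message means re-plan, an AI message means stop.
    return "plan_and_schedule" if state[-1][0] == "system" else None


final_state = run_graph(
    {"plan_and_schedule": toy_plan_and_schedule, "join": toy_join},
    toy_route,
    "plan_and_schedule",
    [("human", "question")],
)
print([role for role, _ in final_state])
```

The `max_steps` guard plays a role similar to the `recursion_limit` passed to `chain.stream` in the multi-hop example: it bounds the number of plan and join iterations.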

In [27]:
import sys
if sys.modules.get("check_answer_recency"):
    sys.modules.pop("check_answer_recency")
from check_answer_recency import answer_recent_node

In [28]:
from langgraph.graph import MessageGraph, END

graph_builder = MessageGraph()

# 1.  Define vertices
# We defined plan_and_schedule above already
# Assign each node to a state variable to update
graph_builder.add_node("plan_and_schedule", plan_and_schedule)
graph_builder.add_node("join", joiner)
graph_builder.add_node("answer_recent", answer_recent_node)


## Define edges
graph_builder.add_edge("plan_and_schedule", "join")

### This condition determines looping logic


def should_continue(state: List[BaseMessage]):
    print("> should_continue  STATE =", state)
    if isinstance(state[-1], AIMessage) or "END" in state[-1].content:
        return "answer_recent"
    return "plan_and_schedule"


graph_builder.add_conditional_edges(
    "join",
    # Next, we pass in the function that will determine which node is called next.
    should_continue,
)

def should_replan_recency(state: List[BaseMessage]):
    print("> should_replan_recency", state)
    # import pdb; pdb.set_trace()
    if isinstance(state[-1], AIMessage) or "END" in state[-1].content:
        return END
    return "plan_and_schedule"

graph_builder.add_conditional_edges(
    "answer_recent",
    should_replan_recency
)

graph_builder.set_entry_point("plan_and_schedule")
chain = graph_builder.compile()

#### Simple question

Let's ask a simple question of the agent.

In [35]:
for step in chain.stream([HumanMessage(content="What's given in www.botgems.com?")]):
    print(step)
    print("---")
else:
    print(step["answer_recent"].content)

DATA MODEL = website_url='https://www.botgems.com' context=None
{'plan_and_schedule': [FunctionMessage(content='    BotGemsGet Started@FreeHomePricingAbout UsServicesContact UsGet Started@Free\n      BotGemsGet Started@Free\n        BotGems\n          \n          BotGems\n        Get Started@Free\n          Get Started\n          @Free\n          Home\n            Home\n          Pricing\n            Pricing\n          About Us\n            About Us\n          Services\n            Services\n          Contact Us\n            Contact Us\n          Get Started@Free\n            Get Started@Free\n              Get Started\n              @Free\n  Boost your performance2Xwith ourIntelligent Whatsapp AI ChatBotGet Started@Free\n    Boost your performance2Xwith ourIntelligent Whatsapp AI ChatBotGet Started@Free\n      Boost your performance2Xwith ourIntelligent Whatsapp AI ChatBot\n        Boost your performance2Xwith our\n          2X\n        Intelligent Whatsapp AI ChatBot\n      Get Start

In [57]:
step

{'answer_recent': AIMessage(content='The website www.botgems.com offers services related to AI-driven solutions and streamlined automation, specifically focusing on WhatsApp chatbots. It provides guides on getting started, pricing information, and contact details for support or customized solutions. You can also learn how to utilize features within the app and find useful links such as terms of service and privacy policy.', id='8d9501a2-b114-4ec1-bb95-2c6fba0b3b01')}

In [34]:
for step in chain.stream([HumanMessage(content="What's the GDP of New York?")]):
    print(step)
    print("---")
else:
    print(step["answer_recent"].content)

{'plan_and_schedule': [FunctionMessage(content='[{\'url\': \'https://fred.stlouisfed.org/series/NYNGSP\', \'content\': \'Graph and download economic data for Gross Domestic Product: All Industry Total in New York (NYNGSP) from 1997 to 2023 about GSP, NY, industry, GDP, and USA.\'}, {\'url\': \'https://en.wikipedia.org/wiki/Economy_of_New_York_(state)\', \'content\': "36,000 farms occupy 7.6\\xa0million acres or about 25 percent of the state\'s land area, to produce a variety of food products.[22] Here are some of the items in which New York ranks high nationally:\\nNew York is an agricultural leader and is one of the top five states for agricultural products, including dairy, cattle, apples, cabbages, potatoes, beets, viticulture, onions, maple syrup and many others.[23] The state is the second largest producer of cabbage in the U.S.[22] In April 2021, GlobalFoundries, a company specializing in the semiconductor industry, moved its headquarters from Silicon Valley, California to its mo

In [33]:
# Final answer
print(step)

{'join': [AIMessage(content='Thought: The GDP of New York for the 3rd quarter of 2023 is provided as $2.0 trillion.', id='29427046-04c4-4b75-baf3-b68422b51a08'), AIMessage(content='As of the 3rd quarter of 2023, the GDP of New York was $2.0 trillion.', id='0d087fc0-d26d-444b-a01f-3a2127d19834')]}


#### Multi-hop question

This question requires that the agent perform multiple searches.

In [33]:
steps = chain.stream(
    [
        HumanMessage(
            content="What's the oldest parrot alive, and how much longer is that than the average?"
        )
    ],
    {
        "recursion_limit": 100,
    },
)
for step in steps:
    print(step)
    print("---")
else:
    print(step["answer_recent"].content)

{'plan_and_schedule': [FunctionMessage(content='[{\'url\': \'https://www.guinnessworldrecords.com/world-records/442525-oldest-parrot-ever\', \'content\': "Oldest parrot ever. The oldest parrot ever is Cookie, a Major Mitchell\'s cockatoo (Cacatua leadbeateri) who was at least 82 years and 88 days old when he passed away on 27 August 2016. Cookie\'s exact age was unknown when he arrived at Brookfield Zoo in May 1934. His arrival was documented in a ledger dated May 1934, when he was estimated to ..."}, {\'url\': \'https://en.wikipedia.org/wiki/Cookie_(cockatoo)\', \'content\': \'He was one of the longest-lived birds on record[4] and was recognised by the Guinness World Records as the oldest living parrot in the world.[5]\\nThe next-oldest pink cockatoo to be found in a zoological setting was a 31-year-old female bird located at Paradise Wildlife Sanctuary, England.[3] Information published by the World Parrot Trust states longevity for Cookie\\\'s species in captivity is on average 40

In [32]:
steps = chain.stream(
    [
        HumanMessage(
            content="What's the recent youtube video posted by krish naik on datascience?"
        )
    ],
    {
        "recursion_limit": 100,
    },
)
for step in steps:
    print(step)
    print("---")
else:
    print(step["answer_recent"].content)

{'plan_and_schedule': [FunctionMessage(content='[{\'url\': \'https://github.com/krishnaik06/The-Grand-Complete-Data-Science-Materials\', \'content\': \'You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session. You switched accounts on another tab or window.\'}, {\'url\': \'https://www.youtube.com/user/krishnaik06\', \'content\': \'This is my YouTube channel where I explain various topics on machine learning, deep learning, and AI with many real-world problem scenarios. ... I have delivered over 30 tech talks on data science ...\'}, {\'url\': \'https://www.youtube.com/watch?v=vxZEUK2EutU\', \'content\': \'We are excited to welcome our guest speaker, Krish Naik for the lecture series.Topic : End to end lifecycle for dataDate : 9 February 2021Time : 11:00 a.m.A ...\'}, {\'url\': \'https://www.youtube.com/playlist?list=PLZoTAELRMXVOnN_g96ayzXX5i7RRO0QhL\', \'content\': \'Share your videos with friends,

In [30]:
steps = chain.stream(
    [
        HumanMessage(
            content="Give a list of atleast 5 recent trips of our president Murmu in 2024?"
        )
    ],
    {
        "recursion_limit": 100,
    },
)
for step in steps:
    print(step)
    print("---")
else:
    print(step["answer_recent"].content)

{'plan_and_schedule': [FunctionMessage(content='[{\'url\': \'https://thenewsmill.com/2024/03/president-murmu-successfully-wraps-up-her-3-day-state-visit-to-mauritius-receives-warm-send-off-from-pm-jugnauth/\', \'content\': "President Murmu also visited the Intercontinental Slavery Museum in Port Louis, Mauritius before concluding the trip. Meanwhile, on Tuesday, President Droupadi Murmu declared India\'s advancement across all domains, emphasizing the nation\'s dynamic and progressive trajectory. She said that the \'New Bharat\' is on the brink of joining the ..."}, {\'url\': \'https://www.dailypioneer.com/2024/state-editions/president-murmu-to-go-on-5-day-himachal-visit-beginning-may-4.html\', \'content\': \'President Droupadi Murmu will arrive in Shimla on May 4 for a five-day visit of Himachal Pradesh and will stay at the Rashtrapati Nivas, a presidential retreat, near Chharabra, about 14 km from ...\'}, {\'url\': \'https://www.outlookindia.com/national/parliament-budget-session-202

In [31]:
print(step["answer_recent"].content)

Here are three recent trips made by President Droupadi Murmu in 2024:

1. **State Visit to Mauritius**: From March 11 to March 13, 2024, President Murmu visited Mauritius as the Chief Guest for the National Day celebrations.

2. **Visit to Himachal Pradesh**: From May 4, 2024, President Murmu embarked on a five-day visit to Himachal Pradesh.

3. **Visit to Uttarakhand**: From April 23 to April 24, 2024, President Murmu visited Uttarakhand and attended the convocation of AIIMS Rishikesh and the Ganga Aarti at Rishikesh.

I could not find additional trips to make a total of five. You may need to consult further sources for a complete list.


In [29]:
steps = chain.stream(
    [
        HumanMessage(
            content="Where is PM Modi next rally?"
        )
    ],
    {
        "recursion_limit": 100,
    },
)
for step in steps:
    print(step)
    print("---")
else:
    print(step["answer_recent"].content)

{'plan_and_schedule': [FunctionMessage(content='[{\'url\': \'https://www.wionews.com/india-election-2024/lok-sabha-election-2024-live-pm-modi-to-address-a-poll-rally-in-delhi-724091\', \'content\': "Lok Sabha Elections 2024 Updates: In the latest news from the elections, Prime Minister Narendra Modi will be addressing a public meeting in New Delhi on Wednesday (May 22). PM Modi Rally In Delhi Dwarka Today Meanwhile, India is gearing up for the sixth phase of the general election on May 25, during which the national capital\'s seven parliamentary seats will be up for voting. So far, polling ..."}, {\'url\': \'https://www.cnn.com/2024/05/01/india/india-narendra-modi-rally-election-intl-hnk/index.html\', \'content\': "Modi supporters take selfies in Aligarh, India, on April 22, 2024. John Mees/CNN. And Modi\'s remarks did little to shake the faith of his devoted followers in Aligarh. Lawyer Gaurav Mahajan says ..."}, {\'url\': \'https://www.hindustantimes.com/india-news/lok-sabha-election

In [38]:
steps = chain.stream(
    [
        HumanMessage(
            content="What happened in recent cricket match IPL and when is the next match?"
        )
    ],
    {
        "recursion_limit": 100,
    },
)
for step in steps:
    print(step)
    print("---")
else:
    print(step["answer_recent"].content)

{'plan_and_schedule': [FunctionMessage(content='[{\'url\': \'https://m.cricbuzz.com/live-cricket-scores/91740/kkr-vs-srh-final-ipl-2024\', \'content\': "Kolkata Knight Riders, on the other hand, were relentless. In fact, that\'s how they\'ve been playing for a while. Didn\'t lose a single match this month, finished at the top of the table, beat SRH convincingly in Ahmedabad in Qualifier 1 and gave them a bigger thrashing at Chepauk in the final."}, {\'url\': \'https://www.iplt20.com/matches/results\', \'content\': \'MATCHES. Explore IPL 2024 match results on the official website. Stay updated with real-time cricket scores and detailed match outcomes.\'}, {\'url\': \'https://www.iplt20.com/video/52200/m08-srh-vs-mi--match-highlights\', \'content\': "Ajinkya Rahane\\nRelated Videos\\n27 Mar, 2024|\\n20k\\n03:02 mins\\n27 Mar, 2024|\\n4.7k\\n02:04 mins\\n27 Mar, 2024|\\n12.7k\\n01:23 mins\\n27 Mar, 2024|\\n7.2k\\n00:19 mins\\n27 Mar, 2024|\\n2.8k\\n00:29 mins\\n27 Mar, 2024|\\n1.4k\\n00:2

In [40]:
steps = chain.stream(
    [
        HumanMessage(
            content="When is next IPL?"
        )
    ],
    {
        "recursion_limit": 100,
    },
)
for step in steps:
    print(step)
    print("---")
else:
    print(step["answer_recent"].content)

{'plan_and_schedule': [FunctionMessage(content='[{\'url\': \'https://www.cricbuzz.com/cricket-series/5945/indian-premier-league-2023/matches\', \'content\': \'Narendra Modi Stadium, Ahmedabad. Chennai Super Kings won by 5 wkts (DLS method) 7:00 AM. 02:00 PM GMT / 07:30 PM LOCAL. Indian Premier League 2023 Schedule, Match Timings, Venue Details, Upcoming ...\'}, {\'url\': \'https://www.espncricinfo.com/series/ipl-2021-1249214/match-schedule-fixtures-and-results\', \'content\': "Kolkata Knight Riders\\nDelhi Capitals\\nDC won by 7 wickets (with 21 balls remaining)\\nPunjab Kings\\nRoyal Challengers Bangalore\\nPBKS won by 34 runs\\nChennai Super Kings\\nMumbai Indians\\nMI won by 4 wickets (with 0 balls remaining)\\nRajasthan Royals\\nSunrisers Hyderabad\\nRR won by 55 runs\\nPunjab Kings\\nDelhi Capitals\\nDC won by 7 wickets (with 14 balls remaining)\\nChennai Super Kings\\nMumbai Indians\\nCSK won by 20 runs\\nRoyal Challengers Bangalore\\nKolkata Knight Riders\\nKKR won by 9 wickets 

In [41]:
steps = chain.stream(
    [
        HumanMessage(
            content="Is IPL over?"
        )
    ],
    {
        "recursion_limit": 100,
    },
)
for step in steps:
    print(step)
    print("---")
else:
    print(step["answer_recent"].content)

{'plan_and_schedule': [FunctionMessage(content='[{\'url\': \'https://sports.ndtv.com/ipl-2024/how-does-ipls-rs-20-crore-prize-money-for-winners-compare-to-psl-bbl-and-sa20-5764866\', \'content\': \'The Indian Premier League (IPL) 2024 season is over and Kolkata Knight Riders emerged as the winners following an impressive campaign. The Shreyas Iyer-led side topped the league stage with 19 ...\'}, {\'url\': \'https://www.iplt20.com/#!\', \'content\': \'Follow the latest updates, scores, teams, and exclusive content of IPL 2024, the official site for real-time cricket. Watch highlights, player profiles, and more.\'}, {\'url\': \'https://www.bbc.com/sport/cricket/articles/cd114e2l99qo\', \'content\': "Kolkata Knight Riders\' previous IPL titles came in 2012 and 2014 Indian Premier League final, Chennai Sunrisers Hyderabad 113 all out (18.3 overs): Cummins 24 (19); Russell 3-19, Starc 2-14 ..."}, {\'url\': \'https://www.espncricinfo.com/story/ipl-2024-faqs-everything-you-need-to-know-about-

#### Multi-step math

In [40]:
for step in chain.stream(
    [
        HumanMessage(
            content="What's ((3*(4+5)/0.5)+3245) + 8? What's 32/4.23? What's the sum of those two values?"
        )
    ]
):
    print(step)

{'plan_and_schedule': [FunctionMessage(content='3307.0', additional_kwargs={'idx': 1, 'args': {'problem': '3*(4+5)/0.5 + 3245 + 8'}}, name='math', id='ea6b72b7-0e21-4ae1-bcec-288be7061d83'), FunctionMessage(content='7.565011820330969', additional_kwargs={'idx': 2, 'args': {'problem': '32/4.23'}}, name='math', id='ef240d3c-baee-4125-8f63-df462ea746b5'), FunctionMessage(content='3314.565011820331', additional_kwargs={'idx': 3, 'args': {'problem': 'sum of $1 and $2', 'context': ['$1', '$2']}}, name='math', id='cf7ff256-3012-4c60-b7b6-7451ff2cbd15'), FunctionMessage(content='join', additional_kwargs={'idx': 4, 'args': ()}, name='join', id='41e4745e-20f3-4227-8519-e69fe5e857df')]}
> should_continue  STATE = [HumanMessage(content="What's ((3*(4+5)/0.5)+3245) + 8? What's 32/4.23? What's the sum of those two values?", id='c0d5d967-cef5-4c82-be3d-4fe9cc347229'), FunctionMessage(content='3307.0', additional_kwargs={'idx': 1, 'args': {'problem': '3*(4+5)/0.5 + 3245 + 8'}}, name='math', id='ea6b72

In [45]:
# Final answer
print(step["join"][-1].content)

The value of ((3*(4+5)/0.5)+3245) + 8 is 3307.0. The value of 32/4.23 is 7.565011820330969. The sum of those two values is 3314.565011820331.
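
In the trace above, the third task refers to the outputs of the first two as `$1` and `$2` (`'problem': 'sum of $1 and $2'`). The sketch below shows one way such placeholders might be resolved before a tool is called. The regex, the `resolve` helper, and the `observations` mapping are illustrative assumptions, not necessarily what the scheduler defined earlier in this notebook does.

```python
import re
from typing import Dict

# Matches $1, ${1}, $12, ... (placeholder syntax assumed from the trace above).
ID_PATTERN = re.compile(r"\$\{?(\d+)\}?")


def resolve(text: str, observations: Dict[int, str]) -> str:
    """Replace $N placeholders with the recorded output of task N.

    Unknown indices are left untouched so a missing dependency stays visible.
    """

    def _sub(match: re.Match) -> str:
        idx = int(match.group(1))
        return str(observations.get(idx, match.group(0)))

    return ID_PATTERN.sub(_sub, text)


# Outputs of tasks 1 and 2, copied from the trace above.
observations = {1: "3307.0", 2: "7.565011820330969"}
print(resolve("sum of $1 and $2", observations))
# sum of 3307.0 and 7.565011820330969
```

Because the substitution is purely textual, it depends on the planner emitting placeholders in exactly this form.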


## Conclusion

Congrats on building your first LLMCompiler agent! I'll leave you with some known limitations of the implementation above:

1. The planner's output parsing is fragile if your function requires more than one or two arguments. We could make it more robust by using streaming tool calling.
2. Variable substitution is fragile in the example above. It could be made more robust by using a fine-tuned model and a stricter syntax (e.g., one defined with Lark or a tool-calling schema).
3. The state can grow quite long if you require multiple re-planning runs. To handle this, you could add a message compressor once the state exceeds a certain token limit.
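
As a rough illustration of the third point, a message compressor could keep the original question plus as many of the most recent messages as fit in a token budget. The sketch below makes several assumptions: `Msg` is a stand-in for a LangChain message, `approx_tokens` uses a crude four-characters-per-token estimate rather than a real tokenizer, and `compress` is a hypothetical helper name.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Msg:
    """Stand-in for a LangChain message; only `.content` is used here."""

    content: str


def approx_tokens(msg: Msg) -> int:
    # Crude assumption: about 4 characters per token.
    return max(1, len(msg.content) // 4)


def compress(messages: List[Msg], max_tokens: int) -> List[Msg]:
    """Keep the first message plus the newest messages that fit in max_tokens."""
    if not messages:
        return []
    head, tail = messages[0], messages[1:]
    budget = max_tokens - approx_tokens(head)
    kept: List[Msg] = []
    # Walk backwards from the newest message until the budget runs out.
    for msg in reversed(tail):
        cost = approx_tokens(msg)
        if cost > budget:
            break
        kept.append(msg)
        budget -= cost
    return [head] + kept[::-1]


history = [Msg("q" * 40), Msg("a" * 400), Msg("b" * 40), Msg("c" * 40)]
print([m.content[0] for m in compress(history, max_tokens=30)])
```

Dropping older tool outputs outright is the simplest policy. Summarizing them with an LLM call would preserve more context, at the cost of an extra request.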
