# Implementing LLMCompiler using LangGraph
By Kim et al. [🔗](https://arxiv.org/abs/2312.04511)

LLMCompiler is an agent architecture designed to reduce the latency of agentic tasks through fast, parallel tool execution.

It has 3 main components:

1. Planner: generate a DAG of tasks.
2. Task Fetching Unit: schedules and executes the tasks
3. Joiner: Responds to the user or triggers a second plan.

# Part 1: Planner

#### Output Parser

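Parses task lists in the following form (each action is a numbered JSON object mapping a tool name to its arguments):
```plaintext
1. {"tool_1": {"arg1": "value", "arg2": 3.5}}
Thought: I then want to find out Y by using tool_2
2. {"tool_2": {"query": "", "context": "${1}"}}
3. {"join": null}<END_OF_PLAN>
```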

The "Thought" lines are optional. Placeholders such as `$1` or `${1}` are variables that refer to the output of task 1; they route tool (task) outputs into the inputs of later tools.

In [1]:
import ast
import json
import re
from typing import Any, Optional, Sequence, Union

from langchain.agents.agent import AgentOutputParser
from langchain.schema import OutputParserException
from langchain_core.tools import BaseTool

# Matches the text of a "Thought: ..." line (not referenced by the parser code here).
THOUGHT_PATTERN = r"Thought: ([^\n]*)"
# Earlier regex-based action pattern, kept for reference only (not used):
# ACTION_PATTERN = r"\n*(\d+)\. (.*?})(\s*#\w+\n)?"
# Task-output references: $1 or ${1} -> captures "1"
ID_PATTERN = r"\$\{?(\d+)\}?"
# Marker the planner is told to emit after its final join action.
END_OF_PLAN = "<END_OF_PLAN>"


class ActionParserFSM:
    """Character-level state machine that extracts numbered JSON actions.

    Recognizes actions of the form ``<index>. {"<tool_name>": <args>}``,
    optionally followed by trailing text on the same line (for example
    ``<END_OF_PLAN>``), and yields
    ``{"task_index": int, "tool_name": str, "args": ...}`` dicts in order.
    Other text, such as ``Thought: ...`` lines, is skipped.

    States:
        START   -- scanning for the first digit of a task index.
        NUMBER  -- reading the task index up to the ``.`` separator.
        ACTION  -- reading the JSON object; braces inside JSON string values
                   are ignored when tracking nesting depth.
        COMMENT -- skipping trailing text until the end of the line.
    """

    def __init__(self):
        self.reset()

    def reset(self):
        """Clear all per-action state and go back to START."""
        self.state = "START"
        self.task_index = ""
        self.action = ""
        self.comment = ""
        self.bracket_count = 0
        # JSON string tracking so "{" / "}" inside string values are not counted.
        self.in_string = False
        self.escaped = False
        self.actions = []

    def parse(self, text: str):
        """Yield every action found in ``text``.

        Raises:
            ValueError: (including ``json.JSONDecodeError``) if a numbered
                action's payload is not a non-empty JSON object.
        """
        for char in text:
            action = self.process_char(char)
            if action:
                yield action
        # Flush an action that ends without a trailing newline, then leave the
        # instance clean so it can be reused.
        action = self.save_action()
        self.reset()
        if action:
            yield action

    def process_char(self, char: str) -> Optional[dict]:
        """Advance the state machine by one character.

        Returns the completed action dict when ``char`` ends an action line,
        otherwise ``None``.
        """
        action = None
        if self.state == "START":
            if char.isdigit():
                self.state = "NUMBER"
                self.task_index += char
            elif char == "\n":
                self.reset()
        elif self.state == "NUMBER":
            if char == ".":
                self.state = "ACTION"
            elif char.isdigit():
                self.task_index += char
            else:
                self.reset()
        elif self.state == "ACTION":
            if self.bracket_count == 0 and char != "{" and not char.isspace():
                # Only whitespace may precede the opening brace. Anything else
                # means "<digits>." was not an action (e.g. "2.5" or "2023."
                # inside a Thought line). Restart and re-scan this character,
                # since it may begin a real task index on the next line.
                self.reset()
                return self.process_char(char)
            self._consume_action_char(char)
        elif self.state == "COMMENT":
            if char == "\n":
                action = self.save_action()
                self.reset()
            else:
                self.comment += char
        return action

    def _consume_action_char(self, char: str) -> None:
        """Append ``char`` to the JSON payload and track brace depth."""
        self.action += char
        if self.in_string:
            if self.escaped:
                self.escaped = False
            elif char == "\\":
                self.escaped = True
            elif char == '"':
                self.in_string = False
        elif char == '"':
            self.in_string = True
        elif char == "{":
            self.bracket_count += 1
        elif char == "}":
            self.bracket_count -= 1
            if self.bracket_count == 0:
                self.state = "COMMENT"

    def save_action(self) -> Optional[dict]:
        """Build the action dict from the buffered index and JSON payload.

        Returns ``None`` when no numbered action with a payload is buffered.

        Raises:
            ValueError: if the payload is not valid JSON or is not a
                non-empty JSON object.
        """
        payload = self.action.strip()
        if not (self.task_index and payload):
            return None
        parsed_action = json.loads(payload)
        if not isinstance(parsed_action, dict) or not parsed_action:
            # Guard explicitly: next(iter({})) would raise StopIteration, which
            # Python turns into RuntimeError inside the ``parse`` generator.
            raise ValueError(
                f"Task {self.task_index}: expected a JSON object mapping a tool "
                f"name to its args, got {payload!r}"
            )
        tool_name, args = next(iter(parsed_action.items()))
        return {
            "task_index": int(self.task_index),
            "tool_name": tool_name,
            "args": args,
        }


class LLMCompilerPlanParser(AgentOutputParser, extra="allow"):
    """Planning output parser.

    Turns the planner LLM's text into a mapping of task index -> task dict
    (``tool``, ``args``, ``dependencies``) using ``ActionParserFSM`` and
    ``instantiate_task``. Parsing stops at the first ``join`` action.
    """

    def __init__(self, tools: Sequence[BaseTool], **kwargs):
        super().__init__(**kwargs)
        # Not a declared field; presumably why the class allows extra attributes.
        self.tools = tools

    def parse(self, text: str) -> dict[int, dict]:
        """Parse ``text`` into ``{task_index: task_dict}``.

        Raises:
            OutputParserException: (from ``instantiate_task``) if an action
                names an unknown tool.
            ValueError: if an action's JSON payload cannot be parsed.
        """
        parser = ActionParserFSM()
        graph_dict: dict[int, dict] = {}
        for parsed in parser.parse(text):
            idx = int(parsed["task_index"])
            task = instantiate_task(
                tools=self.tools,
                idx=idx,
                tool_name=parsed["tool_name"],
                args=parsed["args"],
            )
            graph_dict[idx] = task
            # join is the terminal action; anything after it is ignored.
            if task["tool"] == "join":
                break

        return graph_dict


### Helper functions


def default_dependency_rule(idx, args: str):
    """Return True when ``args`` references task ``idx`` as ``$idx`` or ``${idx}``."""
    return any(int(ref) == idx for ref in re.findall(ID_PATTERN, args))


def _get_dependencies_from_graph(
    idx: int, tool_name: str, args: Sequence[Any]
) -> dict[str, list[str]]:
    """Get dependencies from a graph."""
    if tool_name == "join":
        return list(range(1, idx))
    return [i for i in range(1, idx) if default_dependency_rule(i, str(args))]


def instantiate_task(
    tools: Sequence[BaseTool],
    idx: int,
    tool_name: str,
    args: Union[dict, str, bool, None],
) -> dict:
    """Bundle a parsed action into a task dict.

    Args:
        tools: available tools, matched by ``tool.name``.
        idx: the task's index in the plan.
        tool_name: tool to call, or ``"join"`` for the terminal join step.
        args: the action's arguments as parsed from JSON.

    Returns:
        ``{"tool": <tool or "join">, "args": args, "dependencies": [...]}``.

    Raises:
        OutputParserException: if ``tool_name`` is neither ``"join"`` nor the
            name of one of ``tools``.
    """
    dependencies = _get_dependencies_from_graph(idx, tool_name, args)
    if tool_name == "join":
        # join has no tool object; it is represented by the plain string.
        tool = "join"
    else:
        tool = next((candidate for candidate in tools if candidate.name == tool_name), None)
        if tool is None:
            raise OutputParserException(f"Tool {tool_name} not found.")
    return dict(
        tool=tool,
        args=args,
        dependencies=dependencies,
    )

#### Planner Code

This takes the input and outputs a plan.

In [2]:
import asyncio
import json
import re
from typing import Any, Optional, Sequence, Union
from uuid import UUID

from langchain.callbacks.base import AsyncCallbackHandler, Callbacks
from langchain.chat_models.base import BaseChatModel
from langchain.schema import LLMResult
from langchain.schema.messages import HumanMessage, SystemMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel
from langchain_core.runnables import RunnableBranch
from langchain_core.tools import BaseTool

END_OF_PLAN = "<END_OF_PLAN>"

# Action keywords the joiner may answer with.
JOINER_FINISH = "Finish"
JOINER_REPLAN = "Replan"


# Description of the implicit join() action, listed after the real tools.
# Ends with a newline so the "Guidelines:" header that follows it in the
# planner prompt starts on its own line.
JOIN_DESCRIPTION = (
    "join():\n"
    " - Collects and combines results from prior actions.\n"
    " - A LLM agent is called upon invoking join to either finalize the user query or wait until the plans are executed.\n"
    " - join should always be the last action in the plan, and will be called in two scenarios:\n"
    "   (a) if the answer can be determined by gathering the outputs from tasks to generate the final response.\n"
    "   (b) if the answer cannot be determined in the planning phase before you execute the plans.\n"
)

# Planner system prompt. Template variables: num_tools, tool_descriptions,
# num_toolsp1 (escaped as {{num_toolsp1}} inside the f-string), replan, examples.
planner_prompt_tmpl_str = (
    "Given a user query, create a plan to solve it with the utmost parallelizability. "
    "Each plan should comprise an action from the following {num_tools} types:\n"
    "{tool_descriptions}"
    f"\n{{num_toolsp1}}. {JOIN_DESCRIPTION}"
    "Guidelines:\n"
    " - Each action described above contains input/output types and description.\n"
    "    - You must strictly adhere to the input and output types for each action.\n"
    "    - The action descriptions contain the guidelines. You MUST strictly follow those guidelines when you use the actions.\n"
    " - Each action in the plan should strictly be one of the above types. Follow the Python conventions for each action.\n"
    " - Pass arguments by keyword ONLY\n"
    " - Each action MUST have a unique ID, which is strictly increasing.\n"
    " - Inputs for actions can either be constants or outputs from preceding actions. "
    "In the latter case, use the format $id to denote the ID of the previous action whose output will be the input.\n"
    f" - Always call join as the last action in the plan. Say '{END_OF_PLAN}' after you call join\n"
    " - Ensure the plan maximizes parallelizability.\n"
    " - Only use the provided action types. If a query cannot be addressed using these, invoke the join action for the next steps.\n"
    " - Never explain the plan with comments (e.g. #).\n"
    " - Never introduce new actions other than the ones provided.\n\n"
    "{replan}"
    "{examples}"
)


def _generate_planner_prompt(
    tools: Sequence[BaseTool],
    example_prompt: str = "",
):
    """Build the planner chat prompt for ``tools``.

    The system message is ``planner_prompt_tmpl_str`` with the numbered tool
    descriptions, the tool counts and (optionally) few-shot examples filled
    in. The caller must still supply ``replan``, ``context`` and ``input``.

    Args:
        tools: tools to describe, numbered from 1; ``join`` is numbered
            ``len(tools) + 1``.
        example_prompt: few-shot examples appended after a
            "Here are some examples:" header; omitted when empty.

    Returns:
        A partially-filled ``ChatPromptTemplate``.
    """
    tool_descriptions = "\n".join(
        f"{i}. {tool.description}" for i, tool in enumerate(tools, start=1)
    )
    examples = (
        ("Here are some examples:\n\n" + example_prompt) if example_prompt else ""
    )
    planner_prompt_template = ChatPromptTemplate.from_messages(
        [("system", planner_prompt_tmpl_str), ("user", "Question: {input}{context}")]
    ).partial(
        tool_descriptions=tool_descriptions,
        examples=examples,
        num_tools=len(tools),
        num_toolsp1=len(tools) + 1,
    )

    return planner_prompt_template


def create_planner(
    llm: BaseChatModel,
    example_prompt: str,
    example_prompt_replan: str,
    tools: Sequence[BaseTool],
    stop: Optional[list[str]] = None,
):
    """Build the planner runnable: prompt -> LLM -> ``LLMCompilerPlanParser``.

    Args:
        llm: chat model that writes the plan.
        example_prompt: few-shot examples for the initial plan.
        example_prompt_replan: few-shot examples for replanning; when empty,
            ``example_prompt`` is reused.
        tools: tools the planner may call (``join`` is added implicitly).
        stop: optional stop sequences bound to the LLM.

    Returns:
        A runnable that takes a dict with ``"input"`` (plus ``"context"`` when
        ``"replan"`` is truthy) and returns the parsed index -> task mapping.
    """
    og_planner_prompt = _generate_planner_prompt(tools, example_prompt).partial(
        replan="",
        context="",
    )
    # The replanner gets its own examples when provided.
    replanner_prompt = _generate_planner_prompt(
        tools, example_prompt_replan or example_prompt
    ).partial(
        replan=' - You are given "Previous Plan" which is the plan that the previous agent created along with the execution results '
        "(given as Observation) of each plan and a general thought (given as Thought) about the executed results. "
        'You MUST use these information to create the next plan under "Current Plan".\n'
        ' - When starting the Current Plan, you should start with "Thought" that outlines the strategy for the next plan.\n'
        " - In the Current Plan, you should NEVER repeat the actions that are already executed in the Previous Plan.\n"
    )
    bound_llm = llm.bind(stop=stop)
    return (
        RunnableBranch(
            # Use the replanning prompt only when the input asks for it.
            ((lambda x: x.get("replan")), replanner_prompt),
            og_planner_prompt,
        )
        | bound_llm
        | LLMCompilerPlanParser(tools=tools)
    )

#### Example usage

Here's an example usage of the planner module

In [3]:
from langchain.tools import tool
from langchain_openai import ChatOpenAI


@tool
def get_user_id(first_name: str, last_name: Optional[str] = None):
    """Query the user IDs of everyone with the provided name."""
    # Demo stub: always returns the fixed ID 4. The docstring above becomes the
    # tool description that the planner prompt lists, so edit it with care.
    return 4


@tool
def get_scores(class_name: str, user_id: int):
    """Query the class registry for grades of the provided user ID."""
    # Demo stub: always returns "A+" regardless of class or user. The docstring
    # above becomes the tool description that the planner prompt lists.
    return "A+"


# Few-shot examples in the format ActionParserFSM parses: one numbered JSON
# object per action, mapping a tool name to its keyword args. "$1" refers to
# task 1's output, and each plan ends with a join followed by END_OF_PLAN.
examples = (
    "Question: What's the user ID for Johnny Drop Tables?\n"
    '1. {"get_user_id": {"first_name": "Johnny", "last_name":"Drop Tables"}}\n'
    f'2. {{"join": null}}{END_OF_PLAN}\n'
    "###\n"
    "\n"
    "Question: What was Eric Zhang's score in Calc?\n"
    '1. {"get_user_id": {"first_name": "Eric", "last_name":"Zhang"}}\n'
    '2. {"get_scores": {"class_name": "calc", "user_id": "$1"}}\n'
    f'3. {{"join": null}}{END_OF_PLAN}\n'
    "###\n"
    "\n"
)

# Demo planner over the two stub tools; no replanning examples are given here.
planner = create_planner(
    ChatOpenAI(model="gpt-3.5-turbo"),
    example_prompt=examples,
    example_prompt_replan="",
    tools=[get_user_id, get_scores],
)

In [4]:
# Run only the planner; `tasks` maps each task index to a dict with the
# resolved tool, its args and its dependency indices.
tasks = planner.invoke(
    {"input": "What are the Calc BC grades for Sam and Will Van Damm?"}
)
tasks

{1: {'tool': StructuredTool(name='get_user_id', description='get_user_id(first_name: str, last_name: Optional[str] = None) - Query the user IDs of everyone with the provided name.', args_schema=<class 'pydantic.main.get_user_idSchemaSchema'>, func=<function get_user_id at 0x1242af240>),
  'args': {'first_name': 'Sam', 'last_name': 'Van Damm'},
  'dependencies': []},
 2: {'tool': StructuredTool(name='get_user_id', description='get_user_id(first_name: str, last_name: Optional[str] = None) - Query the user IDs of everyone with the provided name.', args_schema=<class 'pydantic.main.get_user_idSchemaSchema'>, func=<function get_user_id at 0x1242af240>),
  'args': {'first_name': 'Will', 'last_name': 'Van Damm'},
  'dependencies': []},
 3: {'tool': StructuredTool(name='get_scores', description='get_scores(class_name: str, user_id: int) - Query the class registry for grades of the provided user ID.', args_schema=<class 'pydantic.main.get_scoresSchemaSchema'>, func=<function get_scores at 0x124

## 2. Task Fetching Unit

This component schedules the tasks. In the paper, it's kept separate from the "executor", but here we execute the tasks within the same DAG.

The basic idea is that, given a mapping from task index to dicts of the form:

```typescript
{
    tool: BaseTool,
    args: any,
    dependencies: number[],
}
```

1. Create a topological sort of the tasks
2. Execute them on the previous step's output, ensuring to perform variable substitution where appropriate

In [5]:
import logging

from langchain_core.agents import AgentFinish
from langchain_core.runnables import (
    RunnableLambda,
    RunnableParallel,
    RunnablePassthrough,
)
from langgraph.graph import END, Graph

logging.basicConfig(level=logging.INFO)
# Module-level logger rather than the root logger, so records are attributable
# to this module; they still propagate to the handler basicConfig installs.
logger = logging.getLogger(__name__)


def _sort_tasks(data):
    sorted_tasks = []
    while data:
        no_deps = {k: v for k, v in data.items() if not v["dependencies"]}
        if not no_deps:
            raise ValueError("We seem to have run into a circular dependency.")

        sorted_tasks.append(no_deps)
        data = {
            k: {
                **v,
                "dependencies": [d for d in v["dependencies"] if d not in no_deps],
            }
            for k, v in data.items()
            if k not in no_deps
        }
    return sorted_tasks


def _resolve_arg(x: dict, arg):
    return x[f"task_{arg[1:]}"] if isinstance(arg, str) and arg.startswith("$") else arg


def _execute_task(task, x, config):
    """Invoke one planned task's tool with ``$N`` references resolved from ``x``.

    Args:
        task: task dict with ``"tool"`` (a tool object) and ``"args"``.
        x: results accumulated so far, keyed ``"task_<idx>"``.
        config: runnable config forwarded to ``tool.invoke``.

    Returns:
        Whatever the tool returns.
    """
    tool_to_use = task["tool"]
    args = task["args"]
    if isinstance(args, dict):
        resolved_args = {key: _resolve_arg(x, val) for key, val in args.items()}
    else:
        if not isinstance(args, str):
            # Unexpected shape: warn, but still pass the value through to the
            # tool instead of failing with an unbound variable.
            logger.warning("Unsupported arg type: %s", args)
        resolved_args = _resolve_arg(x, args)
    return tool_to_use.invoke(resolved_args, config)


def _make_task_runnable(idx, task):
    """Wrap one planned task in a RunnableLambda (named ``task_<idx>``) bound to that task."""

    def run_task(x, config):
        return _execute_task(task, x, config)

    return RunnableLambda(run_task).with_config(run_name=f"task_{idx}")


def construct_dag(tasks):
    """Compile planned tasks into a runnable that executes them layer by layer.

    Tasks are grouped with ``_sort_tasks``. Tasks in one group run in parallel,
    and their results are accumulated under ``"task_<idx>"`` keys. A group
    that consists only of ``join`` wraps the accumulated results as
    ``{"join": ...}``. The original ``tasks`` mapping is attached under
    ``"tasks"``.

    Returns:
        The composed runnable, or ``None`` if ``tasks`` is empty.
    """
    chain = None
    for group_idx, task_group in enumerate(_sort_tasks(tasks)):
        first_task = next(iter(task_group.values()))
        if len(task_group) == 1 and first_task["tool"] == "join":
            # TODO: actually join the values
            step = lambda x: {"join": x}
        else:
            # Cascade all results forward: the first group starts the result
            # dict, and later groups add their outputs to it.
            constructor = (
                RunnableParallel if chain is None else RunnablePassthrough.assign
            )
            # Build each runnable via a factory so it captures its own task. A
            # lambda written directly in this comprehension would late-bind
            # ``task``, making every parallel branch run the group's last task.
            step = constructor(
                **{
                    f"task_{idx}": _make_task_runnable(idx, task)
                    for idx, task in task_group.items()
                }
            ).with_config(run_name=f"TaskGroup{group_idx}")
        chain = step if chain is None else chain | step

    if chain is not None:
        return chain | RunnablePassthrough.assign(tasks=lambda _: tasks)
    return chain

In [6]:
# Build the executable DAG for the planned tasks and print its structure.
graph = construct_dag(tasks)
graph.get_graph().print_ascii()

                                                      +------------------------------+                                          
                                                      | Parallel<task_1,task_2>Input |                                          
                                                      +------------------------------+                                          
                                                      *****                       *****                                         
                                                  ****                                 ****                                     
                                               ***                                         ***                                  
                      +---------------------------------------+               +---------------------------------------+         
                      | Lambda(lambda x, config: _execute_... |               | Lambda(lambda x, 

### Example

We still haven't introduced any cycles in our computation graph, so this is all easily expressed in LCEL.

In [7]:
# Planner output (index -> task dict) is handed to construct_dag, which builds
# the DAG. NOTE(review): this relies on LangChain invoking the Runnable that
# construct_dag returns with the same input — confirm for your version.
chain = planner | construct_dag

In [15]:
# End-to-end plan + execute on a sample question; "join" holds the results.
# NOTE(review): the recorded KeyError expects 'scratchpad', which only the
# joiner prompt uses. This suggests `chain` was rebound by an out-of-order cell
# run — re-run the cell defining `chain = planner | construct_dag` first.
example_question = "Did Aliya get a better score than Roger in Geology?"
task_results = chain.invoke({"input": example_question})
task_results["join"]

INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"


KeyError: "Input to ChatPromptTemplate is missing variables {'input'}.  Expected: ['input', 'scratchpad'] Received: ['join', 'tasks', 'scratchpad']"

## Agent Logic

So now we have the planning and initial execution done. We need a component to process these outputs and either:
1. Respond with the correct answer.
2. Loop with a new plan.

The paper calls this the "joiner".

In [9]:
from langchain_core.output_parsers import StrOutputParser
from typing_extensions import TypedDict


def format_task(task, idx):
    """Render one planned task as ``"<idx>. {<tool name>: <args>}"``.

    The join step is stored as the plain string ``"join"`` rather than a tool
    object, so string tools are used verbatim; otherwise ``tool.name`` is used.
    """
    thought = ""  # TODO: Pass through CoT
    tool = task["tool"]
    label = tool if isinstance(tool, str) else tool.name
    return "{}{}. {{{}: {}}}".format(thought, idx, label, task["args"])


def format_tasks(executor_output: dict):
    """Render an executed plan and its results for the joiner's scratchpad.

    ``executor_output`` must provide ``"tasks"`` (index -> task dict) and
    ``"join"`` (result name -> value).
    """
    plan_lines = [
        format_task(task, idx) for idx, task in executor_output["tasks"].items()
    ]
    result_lines = [
        f"{name}: {value}" for name, value in executor_output["join"].items()
    ]
    return (
        "Original Plan:\n"
        + "\n".join(plan_lines)
        + "\nExecuted plan results:\n"
        + "\n".join(result_lines)
    )


def _parse_joiner_output(raw_answer: str) -> dict:
    """Parse the joiner LLM's reply into its thought and Finish/Replan decision.

    The reply is expected to contain a ``Thought: ...`` line and an
    ``Action: Finish(<answer>)`` or ``Action: Replan(<context>)`` line;
    surrounding whitespace is ignored. The argument is the text between the
    "(" that follows the action keyword and the last ")" on the line, so
    parentheses inside the argument are kept.

    Returns:
        ``{"thought": ..., "context": ...}`` for a replan, otherwise
        ``{"thought": ..., "answer": ...}``. Missing parts default to "".
    """
    thought, answer, is_replan = "", "", False  # default values
    for line in raw_answer.split("\n"):
        line = line.strip()
        if line.startswith("Action:"):
            action = line[len("Action:"):].strip()
            replan_at = action.find(JOINER_REPLAN)
            finish_at = action.find(JOINER_FINISH)
            # It is a replan only if "Replan" is the action keyword, i.e. it
            # comes before any "Finish". A Finish answer that merely mentions
            # "Replan" stays a Finish.
            is_replan = replan_at != -1 and (finish_at == -1 or replan_at < finish_at)
            keyword_at = replan_at if is_replan else finish_at
            open_at = action.find("(", max(keyword_at, 0))
            close_at = action.rfind(")")
            if open_at == -1:
                answer = ""
            elif close_at > open_at:
                answer = action[open_at + 1 : close_at]
            else:
                # Unterminated argument: keep everything after "(".
                answer = action[open_at + 1 :]
        elif line.startswith("Thought:"):
            thought = line[len("Thought:"):].strip()
    if is_replan:
        return {"thought": thought, "context": answer}
    return {"thought": thought, "answer": answer}

In [17]:
def create_joiner(prompt, llm):
    """Build the joiner runnable.

    It merges the executed plan (``x["plan"]``) with the user question
    (``x["input"]``) and renders the plan and its results into
    ``scratchpad`` via ``format_tasks``. It then asks ``llm``, using
    ``prompt`` as the system message, and parses the reply with
    ``_parse_joiner_output``.
    """
    joiner_prompt = ChatPromptTemplate.from_messages(
        [("system", prompt), ("user", "{input}")]
    )
    add_scratchpad = RunnablePassthrough.assign(scratchpad=format_tasks)
    return (
        (lambda x: {**x["plan"], "input": x["input"]})
        | add_scratchpad
        | joiner_prompt
        | llm
        | StrOutputParser()
        | _parse_joiner_output
    )

In [18]:
# Joiner system prompt. Its only template variable is {scratchpad}; the last
# example ends with "\n###\n" so it does not run into the scratchpad header.
system_prompt = (
    "Solve a question answering task with interleaving Observation, Thought, and Action steps. Here are some guidelines:\n"
    " - In the Assistant Scratchpad, you will be given results of a plan you have executed to answer the user's question.\n"
    " - Thought needs to reason about the question based on the Observations in 1-2 sentences.\n"
    " - There are cases where the Observations are unclear or irrelevant (in the case the task execution was unsuccessful or subpar). Only heed the relevant ones\n"
    " Respond in the following format:\n\n"
    "Thought: <reason about the task results and whether you have sufficient information to answer the question>\n"
    "Action: <action to take>\n"
    "Available actions:\n"
    f" (1) {JOINER_FINISH}(the final answer to return to the user): returns the answer and finishes the task.\n"
    f" (2) {JOINER_REPLAN}(the reasoning and information to provide to make a better next plan): instructs why we must replan\n\n"
    " Examples:\n"
    "Question: How many users are currently using the new product?\n"
    "...task returns the number 32,000\n"
    "Thought: I find no issue with the original plan, and the results satisfy everything in the user question.\n"
    f"Action: {JOINER_FINISH}(32,000 users currently use the new product)\n###\n"
    "Question: Are the gophers beating the rabbits??\n"
    "...task returns a score of 7 for rabbits but no other value...\n"
    "Thought: I need the gophers' score to make a final decision.\n"
    f"Action: {JOINER_REPLAN}(The rabbits have a score of 7, but I need the gophers' score.)\n###\n"
    "Assistant Scratchpad:\n{scratchpad}"
)

In [19]:
# Run the joiner on the executed plan from the previous example. It returns
# {"thought", "answer"} on Finish or {"thought", "context"} on Replan.
joiner = create_joiner(system_prompt, ChatOpenAI(model="gpt-3.5-turbo"))
joiner.invoke({"plan": task_results, "input": example_question})

INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"


{'thought': 'Based on the executed plan, both Aliya and Roger received an A+ score in Geology. Therefore, Aliya did not get a better score than Roger in Geology.',
 'answer': 'No, Aliya did not get a better score than Roger in Geology. They both received an A+.'}

### Construct Agent

Now we have all the required pieces!

In [25]:
from langgraph.graph import END, Graph

workflow = Graph()

# 1.  Define vertices

planner = create_planner(
    llm=ChatOpenAI(model="gpt-3.5-turbo"),
    example_prompt=examples,
    # TODO: You can update and optimize the replanner prompt to
    # better critique the original plan
    example_prompt_replan=examples,
    tools=[get_user_id, get_scores],
)
# Adds the executed DAG's output under "plan", keeping incoming keys such as "input".
plan_and_execute = RunnablePassthrough.assign(plan=planner | construct_dag)
joiner = create_joiner(system_prompt, ChatOpenAI(model="gpt-3.5-turbo"))
# Adds the joiner's parsed decision under "joined_output".
joiner_node = RunnablePassthrough.assign(joined_output=joiner)
# NOTE(review): the planner switches to its replanning prompt only when its
# input has a truthy "replan" key, and that prompt also needs "context".
# Nothing here sets "replan" or forwards the joiner's "context", so a loop back
# to plan_and_execute reuses the original planner prompt. TODO: wire these through.


workflow.add_node("plan_and_execute", plan_and_execute)
workflow.add_node("join", joiner_node)
# Final answer when the joiner finished; otherwise the whole state is returned.
workflow.add_node("end", lambda x: x["joined_output"].get("answer", x))

## Define edges

workflow.add_edge("plan_and_execute", "join")

### This condition determines looping logic


def should_continue(joiner_output):
    """Choose the edge to take after the joiner.

    Returns ``"continue"`` (replan) when the joiner's parsed output carries a
    non-empty ``"context"``, and ``"end"`` otherwise.
    """
    wants_replan = bool(joiner_output["joined_output"].get("context"))
    return "continue" if wants_replan else "end"


# After "join", should_continue either loops back to planning or stops at "end".
workflow.add_conditional_edges(
    start_key="join",
    # Next, we pass in the function that will determine which node is called next.
    condition=should_continue,
    conditional_edge_mapping={
        # If it generates context, we must replan
        "continue": "plan_and_execute",
        # Otherwise we finish.
        "end": "end",
    },
)
workflow.set_entry_point("plan_and_execute")
workflow.set_finish_point("end")
chain = workflow.compile()

In [28]:
# chain.invoke({"input": "Did Aliya get a better score than Roger in Geology?"})

In [29]:
# Try something to trigger replanning
# NOTE(review): in the recorded run this fails on the replan path: a prompt
# received only the joiner's {'thought', 'context'} without 'input'. The loop
# back to plan_and_execute needs to carry 'input' (and the replan context). TODO.
chain.invoke(
    {
        "input": "Did the sum of Aliya and Roger's grades in Calc map out to less than"
        " the total grade for the class?"
    }
)

INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"


KeyError: "Input to ChatPromptTemplate is missing variables {'input'}.  Expected: ['input'] Received: ['thought', 'context']"