## Agent Supervisor

The [previous example](multi-agent-collaboration.ipynb) routed messages automatically based on the output of the initial researcher agent.

We can also choose to use an LLM to orchestrate the different agents.

Below, we will create an agent group, with an agent supervisor to help delegate tasks.

![diagram](./img/supervisor-diagram.png)

To simplify the code in each agent node, we will use the AgentExecutor class from LangChain. This and other "advanced agent" notebooks are designed to show how you can implement certain design patterns in LangGraph. If the pattern suits your needs, we recommend combining it with some of the other fundamental patterns described elsewhere in the docs for best performance.

Before we build, let's configure our environment:

In [None]:
%%capture --no-stderr
%pip install -U langgraph langchain langchain_openai langchain_experimental langsmith pandas

In [2]:
import getpass
import os


def _set_if_undefined(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"Please provide your {var}")


_set_if_undefined("OPENAI_API_KEY")
_set_if_undefined("LANGCHAIN_API_KEY")
_set_if_undefined("TAVILY_API_KEY")
_set_if_undefined("EXA_API_KEY")
# Optional, add tracing in LangSmith
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "Email Sending Test"

## Create tools

For this example, you will make an agent to do web research with a search engine, and one agent to create plots. Define the tools they'll use below:

In [3]:
from typing import Annotated

from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_experimental.tools import PythonREPLTool

tavily_tool = TavilySearchResults(max_results=5)

# This executes code locally, which can be unsafe
python_repl_tool = PythonREPLTool()

In [4]:
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.tools import BaseTool
from typing import Annotated, Type, List
from pydantic import EmailStr
import resend

class MailArgs(BaseModel):
    """
    Input for the email sender tool
    """
    subject: str = Field(..., description="The subject of the email")
    content: str = Field(..., description="The content of the email in HTML format")
    recipients: List[str] = Field(..., description="The list of recipients")

    
class EmailSender(BaseTool):
    """
    A tool for sending emails to a list of recipients.
    """
    name: str = "EmailSender"
    description: str = (
        "A tool for sending emails to a list of recipients."
        )
    args_schema: Type[BaseModel] = MailArgs

    def _run(
            self,
            subject: str,
            content: str,
            recipients: List[str]
            ):
        """
        Use the tool
        """
        try:
            resend.api_key = "re_DMzZbNtC_EALxDZQYCGM7f46wnegaRdbV"
            params: resend.Emails.SendParams = {
                "from": "Acme <onboarding@resend.dev>",
                "to": recipients,
                "subject": subject,
                "html": content,
            }
            resend.Emails.send(params)
            return "Email sent successfully"
        except Exception as e:
            return repr(e)
        
        


email_sender_tool = EmailSender()



In [5]:
# from langchain_core.tools import BaseTool
# from exa_py import Exa
# import os
# from datetime import datetime, timedelta

# class SearchAndContents(BaseTool):
#     name: str = "Search and Contents Tool"
#     description: str = (
#         "Searches the web based on a search query for the latest results. Results are only from the last week. Uses the Exa API. This also returns the contents of the search results."
#     )

#     def _run(self, search_query: str) -> str:

#         exa = Exa(api_key=os.getenv("EXA_API_KEY"))

#         one_week_ago = datetime.now() - timedelta(days=7)
#         date_cutoff = one_week_ago.strftime("%Y-%m-%d")

#         search_results = exa.search_and_contents(
#             query=search_query,
#             use_autoprompt=True,
#             start_published_date=date_cutoff,
#             text={"include_html_tags": False, "max_characters": 8000},
#         )

#         return search_results


# class FindSimilar(BaseTool):
#     name: str = "Find Similar Tool"
#     description: str = (
#         "Searches for similar articles to a given article using the Exa API. Takes in a URL of the article"
#     )

#     def _run(self, article_url: str) -> str:

#         one_week_ago = datetime.now() - timedelta(days=7)
#         date_cutoff = one_week_ago.strftime("%Y-%m-%d")

#         exa = Exa(api_key=os.getenv("EXA_API_KEY"))

#         search_results = exa.find_similar(
#             url=article_url, start_published_date=date_cutoff
#         )

#         return search_results


# class GetContents(BaseTool):
#     name: str = "Get Contents Tool"
#     description: str = "Gets the contents of a specific article using the Exa API. Takes in the ID of the article in a list, like this: ['https://www.cnbc.com/2024/04/18/my-news-story']."
    
#     def _run(self, article_ids: str) -> str:

#         exa = Exa(api_key=os.getenv("EXA_API_KEY"))

#         contents = exa.get_contents(article_ids)
#         return contents

# search = SearchAndContents()
# find_similar = FindSimilar()
# get_contents = GetContents()

In [7]:
from exa_py import Exa
from langchain_core.tools import tool

exa = Exa(api_key=os.environ["EXA_API_KEY"])


@tool
def search(query: str, include_domains=None, start_published_date=None):
    """Search for a webpage based on the query.
    Set the optional include_domains (list[str]) parameter to restrict the search to a list of domains.
    Set the optional start_published_date (str) parameter to restrict the search to documents published after the date (YYYY-MM-DD).
    """
    return exa.search_and_contents(
        f"{query}",
        use_autoprompt=True,
        num_results=5,
        include_domains=include_domains,
        start_published_date=start_published_date,
    )


@tool
def find_similar(url: str):
    """Search for webpages similar to a given URL.
    The url passed in should be a URL returned from `search`.
    """
    return exa.find_similar_and_contents(url, num_results=5)


@tool
def get_contents(ids: list[str]):
    """Get the contents of a webpage.
    The ids passed in should be a list of ids returned from `search`.
    """
    return exa.get_contents(ids)


exa_tools = [search, get_contents, find_similar]

## Helper Utilities

Define a helper function below, which make it easier to add new agent worker nodes.

In [8]:
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_core.messages import BaseMessage, HumanMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI


def create_agent(llm: ChatOpenAI, tools: list, system_prompt: str):
    # Each worker node will be given a name and some tools.
    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                system_prompt,
            ),
            MessagesPlaceholder(variable_name="messages"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ]
    )
    agent = create_openai_tools_agent(llm, tools, prompt)
    executor = AgentExecutor(agent=agent, tools=tools)
    return executor

We can also define a function that we will use to be the nodes in the graph - it takes care of converting the agent response to a human message. This is important because that is how we will add it the global state of the graph

In [9]:
def agent_node(state, agent, name):
    result = agent.invoke(state)
    return {"messages": [HumanMessage(content=result["output"], name=name)]}

### Create Agent Supervisor

It will use function calling to choose the next worker node OR finish processing.

In [10]:
from langchain_core.output_parsers.openai_functions import JsonOutputFunctionsParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

members = ["Researcher", "ContentDesigner", "ContentDistributor"]
system_prompt = (
    "You are a supervisor tasked with managing a conversation between the"
    " following workers:  {members}. Given the following user request,"
    " respond with the worker to act next. Each worker will perform a"
    " task and respond with their results and status. When finished,"
    " respond with FINISH."
)
# Our team supervisor is an LLM node. It just picks the next agent to process
# and decides when the work is completed
options = ["FINISH"] + members
# Using openai function calling can make output parsing easier for us
function_def = {
    "name": "route",
    "description": "Select the next role.",
    "parameters": {
        "title": "routeSchema",
        "type": "object",
        "properties": {
            "next": {
                "title": "Next",
                "anyOf": [
                    {"enum": options},
                ],
            }
        },
        "required": ["next"],
    },
}
prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt),
        MessagesPlaceholder(variable_name="messages"),
        (
            "system",
            "Given the conversation above, who should act next?"
            " Or should we FINISH? Select one of: {options}",
        ),
    ]
).partial(options=str(options), members=", ".join(members))

llm = ChatOpenAI(model="gpt-4o")

supervisor_chain = (
    prompt
    | llm.bind_functions(functions=[function_def], function_call="route")
    | JsonOutputFunctionsParser()
)

## Construct Graph

We're ready to start building the graph. Below, define the state and worker nodes using the function we just defined.

In [11]:
from langchain.tools import tool

@tool
def load_html_template():
    """Use this tool to load the newsletter template"""
    try:
        with open("../config/newsletter_template.html", 'r') as file:
            html_template = file.read()
    except FileNotFoundError:
        raise FileNotFoundError("The HTML template file could not be found. Please ensure the file exists at 'src/research_crew/config/research_template.html'.")
    except Exception as e:
        raise Exception(f"An error occurred while reading the HTML template file: {e}")
    return html_template


In [12]:
content_designer_prompt = """
You are a content designer. Your task is to convert the research output and convert it into html content that can be sent to the user. 
Please design the newsletter as outlined by the template.

Here's the template:
{newsletter_template}
"""

In [13]:
import functools
import operator
from typing import Sequence, TypedDict

from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

from langgraph.graph import END, StateGraph


# The agent state is the input to each node in the graph
class AgentState(TypedDict):
    # The annotation tells the graph that new messages will always
    # be added to the current states
    messages: Annotated[Sequence[BaseMessage], operator.add]
    # The 'next' field indicates where to route to next
    next: str


research_agent = create_agent(llm, exa_tools, "You are a web researcher.")
research_node = functools.partial(agent_node, agent=research_agent, name="Researcher")

content_designer_agent = create_agent(llm, [load_html_template], "You are a content designer. Your task is to convert the research output and convert it into html content that can be sent to the user. Always use the load_html_template tool to get the html template.")
content_designer_node = functools.partial(agent_node, agent=content_designer_agent, name="ContentDesigner")

# NOTE: THIS PERFORMS ARBITRARY CODE EXECUTION. PROCEED WITH CAUTION
content_distributor_agent = create_agent(
    llm,
    [email_sender_tool],
    "You are a mail distributor. Please send an email as described by the user to the provided email address.",
)
content_distributor_node = functools.partial(agent_node, agent=content_distributor_agent, name="ContentDistributor")

workflow = StateGraph(AgentState)
workflow.add_node("Researcher", research_node)
workflow.add_node("ContentDesigner", content_designer_node)
workflow.add_node("ContentDistributor", content_distributor_node)
workflow.add_node("supervisor", supervisor_chain)

Now connect all the edges in the graph.

In [14]:
for member in members:
    # We want our workers to ALWAYS "report back" to the supervisor when done
    workflow.add_edge(member, "supervisor")
# The supervisor populates the "next" field in the graph state
# which routes to a node or finishes
conditional_map = {k: k for k in members}
conditional_map["FINISH"] = END
workflow.add_conditional_edges("supervisor", lambda x: x["next"], conditional_map)
# Finally, add entrypoint
workflow.set_entry_point("supervisor")

graph = workflow.compile()

## Invoke the team

With the graph created, we can now invoke it and see how it performs!

In [15]:
for s in graph.stream(
    {
        "messages": [
            HumanMessage(content="Do a thorough research on the latest news on the USA stock market and send the results to ibrahim.aka.ajax@gmail.com")
        ]
    }
):
    if "__end__" not in s:
        print(s)
        print("----")

{'supervisor': {'next': 'Researcher'}}
----
{'Researcher': {'messages': [HumanMessage(content="Here is a summary of the latest news on the USA stock market:\n\n1. **Stock Market Today: US Stocks Edge Higher Amid Rate-Cut Optimism After New Inflation Data**\n   - **Source**: [Business Insider](https://www.businessinsider.in/stock-market/news/stock-market-today-us-stocks-edge-higher-amid-rate-cut-optimism-after-new-inflation-data/articleshow/108950851.cms)\n   - **Date**: April 1, 2024\n   - **Author**: Matthew Fox\n   - **Summary**: US stocks edged higher as investors reacted to new PCE inflation data. The year-over-year PCE price index rose 2.5% in February, aligning with expectations, leading to optimism about potential interest rate cuts from the Federal Reserve in June. The probability of a rate cut rose to 60%. Major indices like the S&P 500, Dow Jones, and Nasdaq showed varied performances. Retail investors are increasingly investing in riskier ETFs, and meme stocks are gaining po

In [None]:
for s in graph.stream(
    {"messages": [HumanMessage(content="Send a simple eamil to ibrahim.aka.ajax@gmail.com with a summary of the results from the Euros 2024 being held in Germany.'")]},
    {"recursion_limit": 100},
):
    if "__end__" not in s:
        print(s)
        print("----")