## Load Env Vars

In [1]:
from dotenv import load_dotenv
import os


# Load the environment variables from the .env file
load_dotenv()


os.environ["LANGCHAIN_PROJECT"] = "Multi-Agents-Supervisor-Architecture"


## Helper Utilities

In [2]:
from typing import Annotated
from langchain_core.messages import BaseMessage, HumanMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import AzureChatOpenAI


def create_agent(llm: AzureChatOpenAI, system_prompt: str):
    
    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                system_prompt,
            ),
            MessagesPlaceholder(variable_name="messages"),
        ]
    )
    return prompt | llm


In [3]:
def agent_node(state, agent, name):
    
    result = agent.invoke(state)
    
    if name == "Publisher":
        return {
            "messages": [HumanMessage(content=result.content, name=name)],
            "final_result": result.content
        }
    return {
        "messages": [HumanMessage(content=result.content, name=name)]
    }

### Create Agent Supervisor


In [4]:
from langchain_core.output_parsers.openai_functions import JsonOutputFunctionsParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder


deployment_name = 'gpt-4o-mini'


members = ["TestWriter", "TestReviewer", "CodeWriter", "CodeReviewer", "Publisher"]

system_prompt = (
    " You are a supervisor, you are asked to manage a conversation between the"
    " following workers:  {members}. Given the following user request,"
    " respond with the worker to act next."
    " Every work done must be reviewed by its respective prefix, if any."
    " Each worker will perform a task and respond with their results and status."
    " Before finishing, publish the answer and after that respond with FINISH."
)

options = ["FINISH"] + members

function_def = {
    "name": "route",
    "description": "Select the next role.",
    "parameters": {
        "title": "routeSchema",
        "type": "object",
        "properties": {
            "next": {
                "title": "Next",
                "anyOf": [
                    {"enum": options},
                ],
            }
        },
        "required": ["next"],
    },
}

prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt),
        MessagesPlaceholder(variable_name="messages"),
        (
            "system",
            " Given the conversation above, who should act next?"
            " Or should we FINISH? Select one of: {options}",
        ),
    ]
).partial(options=str(options), members=", ".join(members))


llm = AzureChatOpenAI(
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    azure_deployment=deployment_name,
    openai_api_version=os.environ["AZURE_OPENAI_API_VERSION"],
)

supervisor_chain = (
    prompt
    | llm.bind_functions(functions=[function_def], function_call="route")
    | JsonOutputFunctionsParser()
)

### Construct Graph


In [None]:
import functools
import operator
from typing import Sequence, TypedDict

from langgraph.graph import END, StateGraph, START


code_language = "Python"

class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]
    next: str
    final_result: str


test_scenario_writer_agent = create_agent(llm, 
                                            """
                                            You must provide a set of coherent and well defined test scenarios for another agent to write codes,
                                            or if is the case, improve existant test scenarios following given instructions.
                                            You must not provide codes or other functions that are not you primary objective.
                                            """
                                        )

test_scenario_reviewer_agent = create_agent(llm, 
                                            """
                                            You must review a set of test scenarios, 
                                            and if the test scenarios needs enhancement you must provide instructions on how to improve. 
                                            You must not provide codes or other functions that are not you primary objective.
                                            """
                                        )


test_code_writer_agent = create_agent(llm, f"""You must write {code_language} code for test scenarios""")

test_code_reviewer_agent = create_agent(llm, 
                                        """
                                        You must review {code_language} code, 
                                        and if the code needs enhancement you must provide instructions on how to improve. 
                                        You must not provide codes or other functions that are not you primary objective.
                                        """
                                    )

publisher_agent = create_agent(llm, 
                                """
                                You must format the final response in markdown for the user. 
                                The response must have the final version of the test scenarios and the final version of the code.
                                """
                                )

test_scenario_writer_node = functools.partial(agent_node, agent=test_scenario_writer_agent, name="TestWriter")
test_scenario_reviewer_node = functools.partial(agent_node, agent=test_scenario_reviewer_agent, name="TestReviewer")
test_code_writer_node = functools.partial(agent_node, agent=test_code_writer_agent, name="CodeWriter")
test_code_reviewer_node = functools.partial(agent_node, agent=test_code_reviewer_agent, name="CodeReviewer")
publisher_node = functools.partial(agent_node, agent=publisher_agent, name="Publisher")


workflow = StateGraph(AgentState)
workflow.add_node("TestWriter", test_scenario_writer_node)
workflow.add_node("TestReviewer", test_scenario_reviewer_node)
workflow.add_node("CodeWriter", test_code_writer_node)
workflow.add_node("CodeReviewer", test_code_reviewer_node)
workflow.add_node("Publisher", publisher_node)
workflow.add_node("supervisor", supervisor_chain)


In [6]:
for member in members:
    workflow.add_edge(member, "supervisor")

conditional_map = {k: k for k in members}
conditional_map["FINISH"] = END
workflow.add_conditional_edges("supervisor", lambda x: x["next"], conditional_map)

workflow.add_edge(START, "supervisor")

graph = workflow.compile()

In [None]:
for k,v in conditional_map.items():
    print(f"{k} -> {v}")

In [None]:
from IPython.display import Image, display

try:
    display(Image(graph.get_graph(xray=True).draw_mermaid_png()))
except Exception:
    pass

In [None]:
objective = """
    As a user, I want to securely log in to the application using my email and password, so I can access my personalized dashboard and features. 
    The system should also support role-based access control to restrict access to specific areas of the application based on user roles (e.g., admin, user, guest).
"""

final_result = ""

for s in graph.stream(
    {
        "messages": [
            HumanMessage(
                content=f" I would like for you to write 3 test scenarios, for this this objective : {objective}"
                f" for each test scenario I would like a {code_language} code script."
                " Once you code it up, finish."
            )
        ]
    }
):
    if "__end__" not in s:
        print("#"*50)
        for k, v in s.items():
            print(f"{k} : \n")
            if "messages" in v: 
                print(" - messages :", v.get("messages")[-1].content)
            if "next" in v: 
                print(" - next : ", v.get("next"))
            if "final_result" in v:
                final_result = v.get("final_result")
        print()

## Display Final Results of the Publisher

In [None]:
print(final_result)