<a href="https://colab.research.google.com/github/J-Princess/ML-AI-DS-Projects/blob/main/mas.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Multi-agent System Notebook

This workbook shows the development of a simple Large Language Model (LLM) multi-agent system that will enable it to complete a given task with minimal intervention.

In [None]:
!pip install langchain-google-genai
!pip install langgraph

In [None]:
from langchain_core.messages import HumanMessage, SystemMessage
# from langchain_core.tools import tool
from langchain_core.tools import StructuredTool

from langgraph.graph import MessagesState
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.types import Send

from langchain_google_genai import ChatGoogleGenerativeAI

from typing import Annotated, List, Literal
from typing_extensions import TypedDict

from pydantic import BaseModel, Field

from IPython.display import Image, display

import os, operator

In [None]:
# Configure API Keys to use
API_KEY = ''


os.environ['GOOGLE_API_KEY'] = API_KEY

In [None]:
# Configure the LLM
llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash")


### Case Study: Maintenance of City Road Infrastructures

The development of an LLM MAS for the automation of the process to determine when the Clifton Bridge requires maintenance.

In [None]:
# Set the path to the report files
docs_path = "./docs/"
def read_docs():
    docs = []
    for file in os.listdir(docs_path):

        if file.endswith('.md'):
            with open(os.path.join
            (docs_path, file)) as f:
                docs.append((file, f.read()))
    return docs


docs = read_docs()

In [None]:
# Define State And Required Types
class ClassifiedCondition(BaseModel):
    condition_details: str = Field(
        description="Condition Details of the bridge classified by engineering perspective.",
    )
    perspective: str = Field(
        description="The specialist linked to the perspective of bridge condition, just a single word e.g. drainage, structural",
    )
    section: str = Field(
        description="The section or component the condition refers to, e.g. main deck, span 1, etc ",
    )

class ClassifiedConditionList(BaseModel):
    classified_condition_list: List[ClassifiedCondition] = Field(
        description="Condition Details of the bridge classified by engineering perspective.",
    )

class ConditionState(BaseModel):
    condition:str = Field(
        description="Summary condition of the bridge by engineering perspective.",
    )
    perspective: str = Field(
        description="Engineering perspective.",
    )

class Condition(BaseModel):
    summary_condition:str = Field(
        description="Summary condition of the bridge by engineering perspective.",
    )
    perspective: str = Field(
        description="Engineering perspective.",
    )
    rating_condition: str = Field(
        description="Rating of condition in scale 0 to 2.",
    )


class State(TypedDict):
    docs: str
    task: str
    insights: str
    assessment: str
    result: str
    classified_condition_list:ClassifiedConditionList
    condition:Annotated[list,operator.add]

In [None]:
# Document Screener Agent
def doc_screening_officer(state: State) :
    """Extract key insights from documents for the task """

    base_prompt = f"Concise but complete answer, no introduction. Extract key insights from: {state['docs']} to {state['task']}"

    if state.get("feedback"):
        msg = llm.invoke(
            f"{base_prompt} and taking into account the feedback: {state['feedback']}"
        )
    else:
        msg = llm.invoke(base_prompt)

    return { "insights": msg.content }


task_prompt = "Determine maintenance timeframe for the bridge based on the condition of surface and drainage at March 2025"
state = {"docs": docs, "task": task_prompt}  # Initialize state with 'docs' and 'task'

# First, generate the insights using doc_screening_officer
insights = doc_screening_officer(state)

# Update the state with the insights, preserving the 'task' key
state.update(insights)

# Now, you can call screening_checker with the updated state
result = screening_checker(state)['result']

print(result)

In [None]:
insight_requirements="concise, relevant, well supported and organised per perspective of condition of the bridge e.g.drainage, structural"

prompt_feedback = "Assessment that insights extracted for the task are "

class Feedback(BaseModel):
    result: Literal["Strong", "Weak"] = Field(
        description=f"{prompt_feedback} {insight_requirements}",
    )
    rationale: str = Field(
        description="The rationale for the assessment",
    )

evaluator = llm.with_structured_output(Feedback)

In [None]:
def screening_checker(state:State):
    """Evaluate the insights extracted from the documents for the task"""

    assessment = evaluator.invoke(f"Check the insights: {state['insights']} are {insight_requirements} for {state['task']} and concisely explain why")

    return { "result": assessment.result, "rationale": assessment.rationale }

In [None]:
# Orchestrator Agent
splitter =  llm.with_structured_output(ClassifiedConditionList)


def orchestrator(state: State):
    """Orchestrator that organises the insights related to the condition of the bridge by perspective and section"""

    orchestrator_prompt = f"Extract condition details per section of the bridge from the insights {state['insights']} and organise into unique engineering perspectives. Multiple condition details can be assigned to the same perspective.  There is  a specialist type of engineer related to each perspective. "
    result = splitter.invoke(orchestrator_prompt)

    return {"classified_condition_list":result.classified_condition_list}

In [None]:
# Decision Points

def route_task(state: State):
    """route the task to the appropriate function"""
    result = state.get("result")
    output = "Rejected + Feedback"
    if result == "Strong":
        output =  "Accepted"

    return output

In [None]:
# Assign Specialist

def assign_specialists(state: State):
    """Assign a specialist to each perspective of assessment"""

    # group classified_condition_list by perspective
    grouped_classified_condition_list = {}

    stated = orchestrator(state)
    for condition in stated['classified_condition_list']:
        # Use square brackets to access or set the value associated with a key
        if condition.perspective not in grouped_classified_condition_list:
            grouped_classified_condition_list[condition.perspective] = []  # Initialize list if perspective is not in the dictionary
        grouped_classified_condition_list[condition.perspective].append(condition)

    # once grouped, we need to assign a specialist to each perspective
    # e.g. drainage and  surface

    # Note we use the Send function to call the "assess_condition"
    # function in parallel for as many perspectives as we have

    return [Send("assess_condition",
                 {"condition": grouped_classified_condition_list[s], "perspective":s}) for s in grouped_classified_condition_list.keys()]

In [None]:
# Assess Condition Agent
def assess_condition(state: ConditionState):
    """Assess the condition of the asset from each perspective"""

    condition_details = state["condition"]
    condition_details = "\n".join([f"- {cd}" for cd in condition_details])

    response = llm.with_structured_output(Condition).invoke(
        [
            SystemMessage(
                content=f"Assess the condition of the entire bridge from condition_details: {condition_details}."
            ),
            HumanMessage(
                content=f" From the perspective of {state['perspective']}"
            ),
        ]
    )

    return {"condition": [response]}

In [None]:
# Synthesiser Agent
def synthesiser(state: State):
    """Synthesise condition of the bridge. It calls the LLM to determine the maintenance timeframe given the state of the bridge"""

    # List of condition of the bridge by perspective
    condition = state["condition"]

    # Concatenate the condition of the bridge by perspective

    output = ""

    for perspective, assessment_result in condition.items():
        output += f"{perspective}: {assessment_result['summary']}. "

    prompt = f"Given evidence of drainage and surface, determine maintenance timeframe given the state: {output}"

    result = llm.invoke(prompt)

    return result

In [None]:
# Workflow graph definition
mas_workflow = StateGraph(State)

mas_workflow.add_node("doc_screening_officer", doc_screening_officer)
mas_workflow.add_node("screening_checker", screening_checker)
mas_workflow.add_node("orchestrator", orchestrator)
mas_workflow.add_node("assess_condition", assess_condition)
mas_workflow.add_node("synthesiser", synthesiser)

mas_workflow.add_edge(START, "doc_screening_officer")
mas_workflow.add_edge("doc_screening_officer", "screening_checker")
# mas_workflow.add_edge("screening_checker", "orchestrator")

mas_workflow.add_conditional_edges("orchestrator", assign_specialists, ["assess_condition"])
# mas_workflow.add_edge("assign_specialists", "assess_condition")
mas_workflow.add_edge("assess_condition", "synthesiser")
mas_workflow.add_edge("synthesiser", END)

mas_workflow.add_conditional_edges("screening_checker", route_task,
                                   {
                                     "Accepted": "orchestrator",
                                      "Rejected + Feedback": "doc_screening_officer"
                                   },
                                  )


# Compile the workflow
compiled_wf = mas_worflow.compile()


In [None]:
#Display the multi-agent workflow
try:
    display(Image(compiled_wf.get_graph().draw_mermaid_png()))
except Exception:
    # This requires some extra dependencies and is optional
    pass

In [None]:
# Run the workflow

task_prompt = "Determine maintenance timeframe for the bridge based on the condition of surface and drainage at March 2025"

response = compiled_wf.invoke({"docs": docs,"task": task_prompt})

print(response["condition"])