# Work Allocation System

### Dependencies

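```bash
# sqlite3 ships with the Python standard library and cannot be installed with pip.
pip install pandas boto3 botocore
# Amazon Bedrock is called through boto3; the Strands SDK is published as strands-agents.
pip install strands-agents strands-agents-tools
```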

### Reading the data

In [2]:
import pandas as pd

# Preview the work_type -> specialty lookup table: column names first, then the first rows.
spec = pd.read_csv("data/specialty_mapping.csv")
print(spec.columns)
spec.head()


Index(['work_type', 'required_specialty', 'alternate_specialty'], dtype='object')


Unnamed: 0,work_type,required_specialty,alternate_specialty
0,CT_Scan_Brain,Neurologist,General_Radiologist
1,CT_Scan_Chest,General_Radiologist,
2,MRI_Brain,Neurologist,General_Radiologist
3,MRI_Cardiac,Cardiologist,General_Radiologist
4,X_Ray_Chest,General_Radiologist,


In [3]:
# Preview the resource calendar: column names first, then the first rows.
# NOTE(review): this rebinds `spec`, which held the specialty mapping in the
# previous cell; a dedicated name would make re-running cells out of order safer.
spec = pd.read_csv("data/resource_calendar.csv")
print(spec.columns)
spec.head()


Index(['calendar_id', 'resource_id', 'date', 'available_from', 'available_to',
       'current_workload'],
      dtype='object')


Unnamed: 0,calendar_id,resource_id,date,available_from,available_to,current_workload
0,C001,R001,2024-11-10,07:00:00,19:00:00,5
1,C002,R001,2024-11-11,09:00:00,17:00:00,2
2,C003,R001,2024-11-12,08:00:00,13:00:00,4
3,C004,R002,2024-11-10,13:00:00,18:00:00,2
4,C005,R002,2024-11-11,08:00:00,13:00:00,8


In [4]:
# Preview the resources table: column names first, then the first rows.
# NOTE(review): `spec` is reused here for a different table than the one it
# was first bound to; a dedicated name would avoid confusion.
spec = pd.read_csv("data/resources.csv")
print(spec.columns)
spec.head()


Index(['resource_id', 'name', 'specialty', 'skill_level',
       'total_cases_handled'],
      dtype='object')


Unnamed: 0,resource_id,name,specialty,skill_level,total_cases_handled
0,R001,Dr. John Smith,General_Radiologist,4,178
1,R002,Dr. Emily Brown,General_Radiologist,3,100
2,R003,Dr. Michael Davis,General_Radiologist,3,172
3,R004,Dr. Alex Johnson,General_Radiologist,4,217
4,R005,Dr. Sarah Chen,Neurologist,5,345


In [5]:
# Preview the work-request queue: column names first, then the first rows.
# NOTE(review): `spec` is reused here for a different table than the one it
# was first bound to; a dedicated name would avoid confusion.
spec = pd.read_csv("data/work_requests.csv")
print(spec.columns)
spec.head()


Index(['work_id', 'work_type', 'description', 'priority', 'timestamp',
       'status', 'assigned_to'],
      dtype='object')


Unnamed: 0,work_id,work_type,description,priority,timestamp,status,assigned_to
0,W001,CT_Scan_Chest,Lung nodule - URGENT,4,2024-11-12 07:17:00,pending,
1,W002,MRI_Brain,Acute stroke evaluation - URGENT,4,2024-11-12 03:47:00,completed,R011
2,W003,CT_Scan_Chest,Cancer staging,1,2024-11-10 13:13:00,pending,
3,W004,CT_Scan_Brain,Head trauma assessment,3,2024-11-12 04:34:00,pending,
4,W005,Mammography,Annual screening,3,2024-11-10 08:48:00,assigned,R015


### Database Schema 

In [6]:
import sqlite3

In [7]:
# Load the three source CSVs that back the SQLite tables.
work = pd.read_csv("data/work_requests.csv")
res = pd.read_csv("data/resources.csv")
cal = pd.read_csv("data/resource_calendar.csv")

# (Re)create the SQLite database file used by every agent below.
conn = sqlite3.connect("service_ops.db")
try:
    # if_exists="replace" drops and rebuilds each table, so re-running this
    # cell resets the database to the CSV contents.
    work.to_sql("work_requests", conn, if_exists="replace", index=False)
    res.to_sql("resources", conn, if_exists="replace", index=False)
    cal.to_sql("resource_calendar", conn, if_exists="replace", index=False)
finally:
    # Release the file handle even when one of the writes fails.
    conn.close()


In [8]:
from strands import Agent, tool

print("done")

done


### Configuration of AWS Bedrock

In [None]:
import boto3
import json

import os

# Credentials are taken from the environment instead of being written into the
# notebook. Unset or empty variables become None, in which case boto3 falls
# back to its default credential chain (shared config, SSO, instance role, ...).
ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID") or None
SECRET_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY") or None
# Temporary credentials are only valid together with their session token.
SESSION_TOKEN = os.environ.get("AWS_SESSION_TOKEN") or None

client = boto3.client(
    "bedrock-runtime",
    region_name="us-west-2",
    aws_access_key_id=ACCESS_KEY,
    aws_secret_access_key=SECRET_KEY,
    aws_session_token=SESSION_TOKEN,
)


In [10]:
# Smoke test: send one short message through the Bedrock client and print the
# decoded JSON reply.
payload = {
    "anthropic_version": "bedrock-2023-05-31",
    "max_tokens": 20,
    "messages": [
        {"role": "user", "content": "Hello from Python overriding credentials!"},
    ],
}

response = client.invoke_model(
    modelId="anthropic.claude-3-haiku-20240307-v1:0",
    body=json.dumps(payload),
)

# The response body is a stream; read it once and decode it as JSON.
raw_reply = response["body"].read()
print(json.loads(raw_reply))


{'id': 'msg_bdrk_01UxQzZWyCySBCKVCzWvz4Ch', 'type': 'message', 'role': 'assistant', 'model': 'claude-3-haiku-20240307', 'content': [{'type': 'text', 'text': 'Hello! I\'m afraid I don\'t have any information about "Python overriding credentials". I'}], 'stop_reason': 'max_tokens', 'stop_sequence': None, 'usage': {'input_tokens': 15, 'output_tokens': 20}}


In [11]:
def sql_query(self, query: str):
    """Run one SQL statement against ``service_ops.db`` and return its result set.

    Args:
        self: Not used by the body; retained so existing call sites keep working.
        query: SQL text, executed exactly as given (no parameter binding).

    Returns:
        dict: ``{"columns": [...], "rows": [...]}`` where ``columns`` is the list
        of result column names and ``rows`` is the list of fetched tuples. Both
        are empty for statements that produce no result set.
    """
    conn = sqlite3.connect("service_ops.db")
    try:
        cursor = conn.cursor()
        cursor.execute(query)
        rows = cursor.fetchall()
        # cursor.description is None when the statement returns no columns
        # (e.g. DDL); treat that as an empty column list instead of crashing.
        columns = [col[0] for col in cursor.description or ()]
    finally:
        # Always release the connection, including when execute() raises.
        conn.close()
    return {"columns": columns, "rows": rows}

In [12]:
from strands import tool
import sqlite3
import pandas as pd
import json

### 1) AddWorkAgent
**Purpose:**  
Registers a new radiology work request into the database.

**Input:**  
- work_type  
- description  
- priority  

**Output:**  
- work_id (auto-generated)  


In [13]:
class AddWorkAgent:
    """Registers new work requests in the ``work_requests`` table of ``service_ops.db``."""

    @tool
    def sql_query(self, query: str):
        """Run one SQL statement against service_ops.db and return its result set.

        Args:
            query: SQL text, executed exactly as given (no parameter binding).

        Returns:
            dict with "columns" (list of column names) and "rows" (list of
            tuples); both are empty for statements that return no result set.
        """
        conn = sqlite3.connect("service_ops.db")
        try:
            cursor = conn.cursor()
            cursor.execute(query)
            rows = cursor.fetchall()
            # description is None for statements without a result set.
            columns = [col[0] for col in cursor.description or ()]
        finally:
            conn.close()
        return {"columns": columns, "rows": rows}

    @tool
    def add_work(self, work_type: str, description: str, priority: int):
        """Insert a new pending work request and return its generated ID.

        The new ID is ``W`` followed by the next number after the highest
        existing ``W<number>`` ID, zero-padded to at least three digits.

        Args:
            work_type: Work type label stored on the request.
            description: Free-text description stored on the request.
            priority: Priority value stored on the request.

        Returns:
            dict: ``{"work_id": <new id>}``.
        """
        conn = sqlite3.connect("service_ops.db")
        try:
            cur = conn.cursor()

            # Read existing IDs on the same connection that performs the
            # insert, so ID generation and insert share one database session.
            cur.execute("SELECT work_id FROM work_requests WHERE work_id IS NOT NULL")

            used_numbers = []
            for (existing_id,) in cur.fetchall():
                text = str(existing_id)
                suffix = text[1:]
                # Ignore IDs that do not follow the W<digits> pattern instead
                # of failing on int() or on non-string values.
                if text.startswith("W") and suffix.isdecimal():
                    used_numbers.append(int(suffix))

            new_id = f"W{max(used_numbers, default=0) + 1:03d}"

            cur.execute("""
                INSERT INTO work_requests (work_id, work_type, description, priority, timestamp, status)
                VALUES (?, ?, ?, ?, datetime('now'), 'pending')
            """, (new_id, work_type, description, priority))
            conn.commit()
        finally:
            conn.close()

        return {"work_id": new_id}


### 2) WorkAnalyzerAgent
**Purpose:**  
Determines which radiology specialty is required for the given work_type.

**Uses:**  
- `SPECIALTY_MAP` derived from `specialty_mapping.csv`  

**Output:**  
- work_type  
- priority  
- required specialty  

In [14]:
import pandas as pd

CSV_PATH = "data/specialty_mapping.csv"

def load_specialty_map(csv_path: str = CSV_PATH) -> dict:
    """
    Read the specialty-mapping CSV and flatten it to work_type → required_specialty.

    Args:
        csv_path: Location of the CSV file; defaults to CSV_PATH.

    Returns:
        dict keyed by work_type, e.g.
            {"CT_Scan_Brain": "Neurologist", "X_Ray_Bone": "Musculoskeletal_Specialist"}
        When a work_type appears more than once, the last row wins.

    Raises:
        ValueError: if either the 'work_type' or 'required_specialty' column is missing.
    """
    mapping_df = pd.read_csv(csv_path)

    # Both lookup columns are mandatory; stop early if either one is absent.
    mandatory_columns = ("work_type", "required_specialty")
    if any(name not in mapping_df.columns for name in mandatory_columns):
        raise ValueError("CSV must contain 'work_type' and 'required_specialty' columns.")

    # Pair the two columns row by row.
    return {
        work_type: specialty
        for work_type, specialty in zip(
            mapping_df["work_type"], mapping_df["required_specialty"]
        )
    }


# Load flat map
SPECIALTY_MAP = load_specialty_map()

SPECIALTY_MAP 


{'CT_Scan_Brain': 'Neurologist',
 'CT_Scan_Chest': 'General_Radiologist',
 'MRI_Brain': 'Neurologist',
 'MRI_Cardiac': 'Cardiologist',
 'X_Ray_Chest': 'General_Radiologist',
 'X_Ray_Bone': 'Musculoskeletal_Specialist',
 'Ultrasound_Abdomen': 'General_Radiologist',
 'Mammography': 'Breast_Imaging_Specialist'}

In [16]:
class WorkAnalyzerAgent:
    """Looks up a work request and resolves the specialty it requires."""

    @tool
    def sql_query(self, query: str):
        """Run one SQL statement against service_ops.db and return its result set.

        Args:
            query: SQL text, executed exactly as given (no parameter binding).

        Returns:
            dict with "columns" (list of column names) and "rows" (list of
            tuples); both are empty for statements that return no result set.
        """
        conn = sqlite3.connect("service_ops.db")
        try:
            cursor = conn.cursor()
            cursor.execute(query)
            rows = cursor.fetchall()
            # description is None for statements without a result set.
            columns = [col[0] for col in cursor.description or ()]
        finally:
            conn.close()
        return {"columns": columns, "rows": rows}

    @tool
    def analyze_work(self, work_id: str):
        """Return the work type, priority and required specialty for a work request.

        Args:
            work_id: ID of the row to look up in ``work_requests``.

        Returns:
            dict with keys "work_id", "work_type", "priority" and "specialty".
            The specialty comes from SPECIALTY_MAP and falls back to
            "General_Radiologist" for work types the map does not contain.

        Raises:
            IndexError: if no work request has the given ID (same exception
                type as before, now with an explanatory message).
        """
        conn = sqlite3.connect("service_ops.db")
        try:
            # Bind work_id as a parameter instead of formatting it into the
            # SQL text, so quotes in the value cannot alter the statement.
            row = conn.execute(
                "SELECT work_type, priority FROM work_requests WHERE work_id = ?",
                (work_id,),
            ).fetchone()
        finally:
            conn.close()

        if row is None:
            raise IndexError(f"No work request found with work_id {work_id!r}")

        work_type, priority = row

        # Use the dictionary loaded in this notebook
        specialty = SPECIALTY_MAP.get(work_type, "General_Radiologist")

        return {
            "work_id": work_id,
            "work_type": work_type,
            "priority": priority,
            "specialty": specialty
        }


### 3) ResourceFinderAgent
**Purpose:**  
Fetches all radiologists who match the required specialty.

**Output:**  
- A list of radiologists with matching specialty  



In [17]:
class ResourceFinderAgent:
    """Fetches the rows of the ``resources`` table that match a specialty."""

    @tool
    def sql_query(self, query: str):
        """Run one SQL statement against service_ops.db and return its result set.

        Args:
            query: SQL text, executed exactly as given (no parameter binding).

        Returns:
            dict with "columns" (list of column names) and "rows" (list of
            tuples); both are empty for statements that return no result set.
        """
        conn = sqlite3.connect("service_ops.db")
        try:
            cursor = conn.cursor()
            cursor.execute(query)
            rows = cursor.fetchall()
            # description is None for statements without a result set.
            columns = [col[0] for col in cursor.description or ()]
        finally:
            conn.close()
        return {"columns": columns, "rows": rows}

    @tool
    def find_resources(self, specialty: str):
        """Return every resource whose specialty equals the given value.

        Args:
            specialty: Value compared against the ``specialty`` column.

        Returns:
            dict with "specialty" (the input value) and "resources" (a list of
            one dict per matching row, keyed by column name; empty when
            nothing matches).
        """
        conn = sqlite3.connect("service_ops.db")
        try:
            # Bind the specialty as a parameter instead of formatting it into
            # the SQL text, so quotes in the value cannot alter the statement.
            cursor = conn.execute(
                "SELECT * FROM resources WHERE specialty = ?",
                (specialty,),
            )
            rows = cursor.fetchall()
            columns = [col[0] for col in cursor.description]
        finally:
            conn.close()

        df = pd.DataFrame(rows, columns=columns)

        return {
            "specialty": specialty,
            "resources": df.to_dict(orient="records")
        }


### 4) AvailabilityCheckerAgent
**Purpose:**  
Scores radiologists using a weighted formula incorporating:
- Specialty match  
- Workload  
- Availability  
- Case priority  

Outputs the **best-scoring radiologist**.

**Scoring function**  
The `compute_score()` function evaluates each radiologist with a weighted scoring model that balances **clinical expertise** (skill level), **experience** (cases handled), **workload**, **availability**, and **case urgency** (priority).  
The goal is to select the radiologist who is **most capable** and **most available** for the requested study.


In [None]:
class AvailabilityCheckerAgent:
    """Scores candidate resources and picks the highest-scoring one."""

    # Weights of the scoring model, named so they can be tuned in one place.
    BASE_SCORE = 1
    SKILL_WEIGHT = 2
    EXPERIENCE_WEIGHT = 0.05
    MAX_WORKLOAD = 10
    WORKLOAD_PENALTY_WEIGHT = 1.2
    PRIORITY_BONUS = {5: 10, 4: 5}

    @tool
    def sql_query(self, query: str):
        """Run one SQL statement against service_ops.db and return its result set.

        Args:
            query: SQL text, executed exactly as given (no parameter binding).

        Returns:
            dict with "columns" (list of column names) and "rows" (list of
            tuples); both are empty for statements that return no result set.
        """
        conn = sqlite3.connect("service_ops.db")
        try:
            cursor = conn.cursor()
            cursor.execute(query)
            rows = cursor.fetchall()
            # description is None for statements without a result set.
            columns = [col[0] for col in cursor.description or ()]
        finally:
            conn.close()
        return {"columns": columns, "rows": rows}

    def compute_score(self, r, cal, priority):
        """Compute the weighted score of one resource for one request.

        Args:
            r: Resource record; "skill_level" and "total_cases_handled" are read.
            cal: Calendar record; "current_workload" is read.
            priority: Request priority; 5 adds 10 points, 4 adds 5, anything else 0.

        Returns:
            The score: base 1 + skill*2 + cases*0.05 + max(0, 10 - workload)
            - workload*1.2 + priority bonus.
        """
        skill = r["skill_level"] * self.SKILL_WEIGHT
        exp = r["total_cases_handled"] * self.EXPERIENCE_WEIGHT

        wl = cal["current_workload"]
        # Workload counts twice on purpose: it shrinks the availability term
        # and is also subtracted as a penalty.
        availability = max(0, self.MAX_WORKLOAD - wl)
        workload_penalty = wl * self.WORKLOAD_PENALTY_WEIGHT

        priority_bonus = self.PRIORITY_BONUS.get(priority, 0)

        return (
            self.BASE_SCORE
            + skill
            + exp
            + availability
            - workload_penalty
            + priority_bonus
        )

    @tool
    def score_resources(self, resources: list, work_type: str, priority: int):
        """
        Scores each radiologist using:
        - Skill level (weighted ×2)
        - Experience (cases × 0.05)
        - Availability (inverse workload)
        - Workload penalty
        - Priority bonus

        Args:
            resources: List of resource dicts; each needs "resource_id",
                "skill_level" and "total_cases_handled".
            work_type: Accepted for interface compatibility; not used in scoring.
            priority: Request priority, forwarded to compute_score().

        Returns:
            dict: {"best_resource": <copy of the top-scoring resource dict>}
            with "current_workload", "availability" and "score" added. The
            input dicts are left unmodified.

        Raises:
            IndexError: if no resource could be scored (empty input, or none
                of the resources has a calendar entry).
        """
        cal_df = pd.read_csv("data/resource_calendar.csv")

        scored = []
        for r in resources:
            entries = cal_df[cal_df.resource_id == r["resource_id"]]
            if entries.empty:
                # Without a calendar row there is no workload or availability
                # to score, so the resource is not a candidate.
                continue

            # NOTE(review): the first calendar row in file order is used,
            # whatever its date — confirm this is the intended day.
            cal = entries.iloc[0].to_dict()

            # Work on a copy so the caller's resource dicts are not mutated.
            candidate = dict(r)
            candidate["current_workload"] = cal["current_workload"]
            candidate["availability"] = f"{cal['available_from']} - {cal['available_to']}"
            candidate["score"] = self.compute_score(candidate, cal, priority)

            scored.append(candidate)

        if not scored:
            raise IndexError("No resource with a calendar entry is available to score")

        # max() returns the first of equally scored candidates, matching a
        # stable descending sort followed by taking the first element.
        best = max(scored, key=lambda c: c["score"])

        return {"best_resource": best}
    


### 5) AssignmentAgent
**Purpose:**  
Assigns the selected radiologist in the database and generates a **2–3 sentence explanation** using an LLM (Claude 3 Haiku).

**Output:**  
- work_id  
- assigned radiologist  
- short clinical justification  

In [None]:
class AssignmentAgent:
    """Persists an assignment and asks Bedrock for a short justification."""

    def __init__(self):
        """
        Initialize Bedrock client.

        No explicit keys are passed, so boto3 resolves credentials through its
        default chain.
        """
        self.client = boto3.client(
            "bedrock-runtime",
            region_name="us-west-2"
        )

    @tool
    def assign_and_explain(self, work_id: str, best_resource: dict, work_type: str, priority: int):
        """
        Assigns a radiologist in the database and generates
        a professional explanation using Bedrock (Claude 3 Haiku).

        Args:
            work_id: ID of the work request to update.
            best_resource: Resource dict; "resource_id", "name", "skill_level",
                "total_cases_handled", "availability" and "current_workload"
                are read, plus "specialty" when present.
            work_type: Work type, included in the prompt.
            priority: Priority, included in the prompt.

        Returns:
            dict with "work_id", "assigned_to", "radiologist_name" and
            "explanation".

        Raises:
            ValueError: if no work request has the given ID, so nothing was
                assigned.
        """

        # --- Update DB ---
        conn = sqlite3.connect("service_ops.db")
        try:
            cur = conn.cursor()

            cur.execute(
                "UPDATE work_requests SET assigned_to=?, status='assigned' WHERE work_id=?",
                (best_resource["resource_id"], work_id)
            )
            updated_rows = cur.rowcount

            conn.commit()
        finally:
            conn.close()

        # An UPDATE that matches nothing succeeds silently; do not report an
        # assignment that was never stored.
        if updated_rows == 0:
            raise ValueError(f"No work request found with work_id {work_id!r}; nothing was assigned")

        # --- Build LLM prompt ---
        # The specialty line gives the model the fact the instructions ask it
        # to reference; without it the model has to guess the specialty.
        prompt = f"""
        You are generating a short clinical justification for a radiology assignment.

        Write **exactly 2–3 sentences** explaining why this radiologist is the best match.
        Make the tone professional, concise, and decision-focused. 
        Reference their specialty, case volume, availability, and workload clearly.
        Do NOT exceed 3 sentences.

        Radiologist: {best_resource['name']}
        Specialty: {best_resource.get('specialty', 'Not specified')}
        Skill Level: {best_resource['skill_level']}
        Cases Handled: {best_resource['total_cases_handled']}
        Availability: {best_resource['availability']}
        Current Workload: {best_resource['current_workload']}
        Work Type: {work_type}
        Priority: {priority}

        Output only the justification paragraph.
        """

        body = {
            "anthropic_version": "bedrock-2023-05-31",
            "messages": [{"role": "user", "content": prompt}],
            "max_tokens": 150
        }

        # --- Call AWS Bedrock ---
        response = self.client.invoke_model(
            modelId="anthropic.claude-3-haiku-20240307-v1:0",
            body=json.dumps(body)
        )

        explanation = json.loads(response["body"].read())["content"][0]["text"]

        return {
            "work_id": work_id,
            "assigned_to": best_resource["resource_id"],
            "radiologist_name": best_resource["name"],
            "explanation": explanation.strip()
        }


### Multi-Agent Workflow Diagram

```mermaid
flowchart LR
    A(Add Work Request) --> B(Analyze Work Type)
    B --> C(Find Matching Radiologists)
    C --> D(Score Availability & Workload)
    D --> E(Assign Best Radiologist)
    E --> F(Generate Explanation)
```


In [29]:
"""
Sequential Multi-Agent Pipeline for Clovertex Radiology Work Allocation.

This module orchestrates the 5 agents in the workflow:

    AddWork → Analyze → Finder → Availability → Assignment

All agents are defined inside the notebook, so no package imports are used.
The workflow is executed manually in a sequential, deterministic manner.
"""


# -------------------------------
# IMPORT AGENTS FROM NOTEBOOK CELLS
# -------------------------------
# These must be defined in earlier cells in the notebook:
# AddWorkAgent
# WorkAnalyzerAgent
# ResourceFinderAgent
# AvailabilityCheckerAgent
# AssignmentAgent


# BUILD PIPELINE
def build_pipeline():
    """
    Instantiate the five agents and return them keyed by pipeline stage.

    Returns:
        dict with keys "add", "analyze", "finder", "availability" and
        "assign", each mapped to a fresh instance of the matching agent class.
    """
    # Stage name → agent class, listed in execution order.
    stage_classes = (
        ("add", AddWorkAgent),
        ("analyze", WorkAnalyzerAgent),
        ("finder", ResourceFinderAgent),
        ("availability", AvailabilityCheckerAgent),
        ("assign", AssignmentAgent),
    )

    return {stage: agent_cls() for stage, agent_cls in stage_classes}

# RUN WORKFLOW
def run(task_input: dict):
    """
    Executes the full multi-agent workflow.

    task_input example:
        {
            "work_type": "MRI_Brain",
            "description": "Urgent stroke evaluation",
            "priority": 5
        }

    Returns:
        On success, the dict produced by the assignment step.
        When no resource matches the required specialty, a dict with
        "status": "failed", the "work_id" of the request that was created
        (it stays in the database) and a "reason".
    """

    pipeline = build_pipeline()

    # 1. Add Work
    created = pipeline["add"].add_work(
        task_input["work_type"],
        task_input["description"],
        task_input["priority"]
    )
    work_id = created["work_id"]

    # 2. Analyze Work
    analysis = pipeline["analyze"].analyze_work(work_id)

    # 3. Find Resources
    candidates = pipeline["finder"].find_resources(analysis["specialty"])

    # If no radiologists exist → fail gracefully. The request was already
    # inserted in step 1, so report its ID to let the caller follow up on it.
    if not candidates["resources"]:
        return {
            "status": "failed",
            "work_id": work_id,
            "reason": f"No radiologists available for specialty '{analysis['specialty']}'"
        }

    # 4. Score Resources
    scoring = pipeline["availability"].score_resources(
        resources=candidates["resources"],
        work_type=analysis["work_type"],
        priority=analysis["priority"]
    )

    # 5. Assign & Explain
    return pipeline["assign"].assign_and_explain(
        work_id=work_id,
        best_resource=scoring["best_resource"],
        work_type=analysis["work_type"],
        priority=analysis["priority"]
    )


# -------------------------------
# MANUAL TEST
# -------------------------------
if __name__ == "__main__":
    # Push one critical MRI request through the whole pipeline and print
    # the resulting assignment.
    example = dict(
        work_type="MRI_Brain",
        description="Stroke protocol - CRITICAL",
        priority=5,
    )

    result = run(example)

    print("\n--- Multi-Agent Workflow Result ---")
    print(result)
    
    



--- Multi-Agent Workflow Result ---
{'work_id': 'W031', 'assigned_to': 'R005', 'radiologist_name': 'Dr. Sarah Chen', 'explanation': 'Dr. Sarah Chen, with her extensive experience of 345 cases and a skill level of 5, is the best radiologist to handle this MRI Brain case. Her availability from 09:00:00 to 17:00:00 and a current workload of 3 ensure she can provide timely and high-quality interpretation of the scans, making her an excellent choice for this priority 5 assignment.'}


In [None]:

# Scenario 1 – Urgent Case (Priority 5)
# A critical brain MRI: exercises the highest-priority path of the workflow.
scenario_1 = dict(
    work_type="MRI_Brain",
    description="CRITICAL — suspected stroke",
    priority=5,
)
print("\n--- Scenario 1: Urgent Case (Priority 5) ---")
output_1 = run(scenario_1)
print(output_1)





--- Scenario 1: Urgent Case (Priority 5) ---
{'work_id': 'W024', 'assigned_to': 'R005', 'radiologist_name': 'Dr. Sarah Chen', 'explanation': "Dr. Sarah Chen is an experienced radiologist with a specialty in MRI brain imaging. With a skill level of 5 and a case volume of 345, she has the necessary expertise to efficiently and accurately interpret the requested MRI study. Dr. Chen's availability during the requested time frame and her current workload of 3 cases indicate that she is well-positioned to prioritize and complete this assignment in a timely manner."}


In [23]:
# Scenario 2 – Routine Case (Priority 2)
# A routine chest X-ray: exercises the path with no priority bonus.
scenario_2 = dict(
    work_type="X_Ray_Chest",
    description="Routine outpatient chest X-ray",
    priority=2,
)
print("\n--- Scenario 2: Routine Case (Priority 2) ---")
output_2 = run(scenario_2)
print(output_2)




--- Scenario 2: Routine Case (Priority 2) ---
{'work_id': 'W025', 'assigned_to': 'R002', 'radiologist_name': 'Dr. Emily Brown', 'explanation': 'Dr. Emily Brown is a highly skilled radiologist with extensive experience in chest X-ray interpretation, having handled 100 cases to date. Her availability during the requested time slot and current low workload of 2 cases make her an excellent choice to prioritize this time-sensitive X-ray assignment.'}


In [24]:

# Scenario 3 – Specialized Case (Priority 3)
# A cardiac MRI: exercises a work type mapped to a non-general specialty.
scenario_3 = dict(
    work_type="MRI_Cardiac",
    description="Cardiac fibrosis assessment",
    priority=3,
)
print("\n--- Scenario 3: Specialized Case (Priority 3) ---")
output_3 = run(scenario_3)
print(output_3)


--- Scenario 3: Specialized Case (Priority 3) ---
{'work_id': 'W026', 'assigned_to': 'R010', 'radiologist_name': 'Dr. Lisa Anderson', 'explanation': 'Dr. Lisa Anderson, with a skill level of 3 and a high case volume of 100, is the best match for this radiological assignment. Her availability during the requested time frame and her specialized expertise in MRI Cardiac procedures make her an excellent choice to handle this priority-3 case efficiently and effectively.'}
