## Multi-Agent Work Allocation System using Strands

**Architecture Overview**

1. **Agent 1 - AddWorkAgent**: Accepts work request → inserts into database → returns work_id
2. **Agent 2 - WorkAnalyzerAgent**: Analyzes work_id → determines required specialty/role
3. **Agent 3 - ResourceFinderAgent**: Finds resources matching required specialty
4. **Agent 4 - AvailabilityCheckerAgent**: Scores candidates based on multiple factors
5. **Agent 5 - AssignmentAgent**: Updates database + generates LLM explanation

In [None]:
# Cell 1: Imports and Environment Setup
import os
import pandas as pd
from sqlalchemy import create_engine, text
from datetime import datetime, time
from typing import Dict, Optional, List
from contextlib import contextmanager
import json

# Strands imports for multiagent system
from strands import Agent, tool
from strands.models import BedrockModel
from strands.multiagent.graph import GraphBuilder
from strands.models.gemini import GeminiModel

from dotenv import load_dotenv
load_dotenv()

True

In [None]:
# Cell 2: Database Manager (Connection Pooling)

class DatabaseManager:
    """Singleton database connection manager with connection pooling"""
    _engine = None

    @classmethod
    def get_engine(cls):
        if cls._engine is None:
            DB_USER = os.getenv('DB_USER', 'work_user')
            DB_PW = os.getenv('DB_PASSWORD', 'work_password')
            DB_NAME = os.getenv('DB_NAME', 'work_allocation')
            DB_HOST = os.getenv('DB_HOST', 'localhost')
            DB_PORT = os.getenv('DB_PORT', '3307')

            cls._engine = create_engine(
                f"mysql+pymysql://{DB_USER}:{DB_PW}@{DB_HOST}:{DB_PORT}/{DB_NAME}",
                pool_pre_ping=True,
                pool_size=5,
                max_overflow=10
            )
            print("Database engine created")
        return cls._engine
    
    @classmethod
    @contextmanager
    def transaction(cls):
        engine = cls.get_engine()
        conn = engine.connect()
        trans = conn.begin()
        try:
            yield conn
            trans.commit()
        except Exception as e:
            trans.rollback()
            print(f"Transaction rolled back: {e}")
            raise
        finally:
            conn.close()

In [None]:
# Cell 3: Database Helper Functions

def run_sql(query: str, params: Optional[Dict] = None):
    """Execute SQL query with proper error handling"""
    try:
        engine = DatabaseManager.get_engine()
        with engine.begin() as conn:
            result = conn.execute(text(query), params or {})
            if result.returns_rows:
                return pd.DataFrame(result.fetchall(), columns=result.keys())
            return result.rowcount
    except Exception as e:
        print(f"SQL execution error: {e}")
        raise

def initialize_database(reset=True):
    """Initialize database schema and load CSV data"""
    print("Initializing database from CSV files")
    engine = DatabaseManager.get_engine()

    if reset:
        with engine.begin() as conn:
            conn.execute(text("SET FOREIGN_KEY_CHECKS=0;"))
            tables = ['assignment_log', 'work_requests', 'resource_calendar', 'specialty_mapping', 'resources']
            for table in tables:
                conn.execute(text(f"DROP TABLE IF EXISTS {table};"))
            conn.execute(text("SET FOREIGN_KEY_CHECKS=1;"))

        # Create schema
        with engine.begin() as conn:
            # Resources table
            conn.execute(text("""
                CREATE TABLE resources (
                    resource_id VARCHAR(10) PRIMARY KEY,
                    name VARCHAR(100) NOT NULL,
                    specialty VARCHAR(50) NOT NULL,
                    skill_level INT NOT NULL CHECK (skill_level BETWEEN 1 AND 5),
                    total_cases_handled INT DEFAULT 0
                )
            """))
            
            # Resource calendar table
            conn.execute(text("""
                CREATE TABLE resource_calendar (
                    calendar_id VARCHAR(10) PRIMARY KEY,
                    resource_id VARCHAR(10) NOT NULL,
                    date DATE NOT NULL,
                    available_from TIME NOT NULL,
                    available_to TIME NOT NULL,
                    current_workload INT DEFAULT 0,
                    FOREIGN KEY (resource_id) REFERENCES resources(resource_id)
                )
            """))
            
            # Work requests table
            conn.execute(text("""
                CREATE TABLE work_requests (
                    work_id VARCHAR(10) PRIMARY KEY,
                    work_type VARCHAR(50) NOT NULL,
                    description TEXT,
                    priority INT NOT NULL CHECK (priority BETWEEN 1 AND 5),
                    timestamp DATETIME NOT NULL,
                    status VARCHAR(20) DEFAULT 'pending',
                    assigned_to VARCHAR(10),
                    FOREIGN KEY (assigned_to) REFERENCES resources(resource_id)
                )
            """))
            
            # Specialty mapping table
            conn.execute(text("""
                CREATE TABLE specialty_mapping (
                    work_type VARCHAR(50) PRIMARY KEY,
                    required_specialty VARCHAR(50) NOT NULL,
                    alternate_specialty VARCHAR(50)
                )
            """))
            
            # Assignment log table
            conn.execute(text("""
                CREATE TABLE assignment_log (
                    log_id INT AUTO_INCREMENT PRIMARY KEY,
                    work_id VARCHAR(10) NOT NULL,
                    resource_id VARCHAR(10) NOT NULL,
                    assignment_timestamp DATETIME NOT NULL,
                    match_score FLOAT,
                    reasoning TEXT,
                    FOREIGN KEY (work_id) REFERENCES work_requests(work_id),
                    FOREIGN KEY (resource_id) REFERENCES resources(resource_id)
                )
            """))

        # Load CSV data

        # Resources
        resources_df = pd.read_csv('data/resources.csv')
        resources_df.to_sql('resources', engine, if_exists='append', index=False)
        
        # Resource Calendar
        calendar_df = pd.read_csv('data/resource_calendar.csv')
        calendar_df.to_sql('resource_calendar', engine, if_exists='append', index=False)
        
        # Specialty Mapping
        mapping_df = pd.read_csv('data/specialty_mapping.csv')
        mapping_df['alternate_specialty'] = mapping_df['alternate_specialty'].where(
            pd.notna(mapping_df['alternate_specialty']), None
        )
        mapping_df.to_sql('specialty_mapping', engine, if_exists='append', index=False)

        # Work Requests
        work_requests_df = pd.read_csv('data/work_requests.csv')
        work_requests_df['timestamp'] = pd.to_datetime(work_requests_df['timestamp'])
        work_requests_df.to_sql('work_requests', engine, if_exists='append', index=False)
                
        print("Database initialization complete")

In [None]:
# Cell 4: Tool Functions for Agents

# Agent 1 Tools
@tool
def add_work_tool(work_type: str, description: str, priority: int, timestamp: str) -> Dict:
    """Add work request to database - O(1) operation"""
    try:
        # Generate new work_id
        count_result = run_sql("SELECT COUNT(*) as cnt FROM work_requests")
        work_id = f"W{int(count_result['cnt'].iloc[0]) + 1:03d}"  # W001 format (3 digits)
        
        # Insert work request
        run_sql(
            """INSERT INTO work_requests (work_id, work_type, description, priority, timestamp, status)
               VALUES (:work_id, :work_type, :description, :priority, :timestamp, 'pending')""",
            {
                'work_id': work_id,
                'work_type': work_type,
                'description': description,
                'priority': priority,
                'timestamp': timestamp
            }
        )
        return {'work_id': work_id, 'status': 'success'}
    except Exception as e:
        return {'error': str(e)}

# Agent 2 Tools
@tool
def analyze_work_tool(work_id: str) -> Dict:
    """Analyze work request and determine required specialty - O(1) operation"""
    try:
        # Get work details
        work_df = run_sql(
            "SELECT work_type, priority FROM work_requests WHERE work_id = :work_id",
            {'work_id': work_id}
        )
        if work_df.empty:
            return {'error': 'Work request not found'}
        
        work_type = work_df.iloc[0]['work_type']
        priority = int(work_df.iloc[0]['priority'])
        
        # Get required specialty
        specialty_df = run_sql(
            "SELECT required_specialty, alternate_specialty FROM specialty_mapping WHERE work_type = :work_type",
            {'work_type': work_type}
        )
        if specialty_df.empty:
            return {'error': 'No specialty mapping found'}
        
        return {
            'required_specialty': specialty_df.iloc[0]['required_specialty'],
            'alternate_specialty': specialty_df.iloc[0]['alternate_specialty'],
            'priority': priority,
            'work_type': work_type,
            'work_id': work_id
        }
    except Exception as e:
        return {'error': str(e)}

# Agent 3 Tools
@tool
def find_resources_tool(work_id: str, required_specialty: str, alternate_specialty: str, timestamp: str) -> Dict:
    """Find resources matching required specialty - O(n) where n is matching resources"""
    try:
        # Parse date from timestamp
        work_date = datetime.strptime(timestamp.split()[0], '%Y-%m-%d').date()
        
        # Find resources with matching specialty
        query = """
        SELECT r.resource_id, r.name, r.specialty, r.skill_level, r.total_cases_handled,
               rc.available_from, rc.available_to, rc.current_workload
        FROM resources r
        INNER JOIN resource_calendar rc ON r.resource_id = rc.resource_id
        WHERE (r.specialty = :required_specialty OR r.specialty = :alternate_specialty)
        AND rc.date = :work_date
        """
        
        resources_df = run_sql(query, {
            'required_specialty': required_specialty,
            'alternate_specialty': alternate_specialty if alternate_specialty else required_specialty,
            'work_date': work_date
        })
        
        if resources_df.empty:
            return {'candidates': [], 'count': 0}
        
        candidates = resources_df.to_dict('records')
        
        return {
            'work_id': work_id,
            'candidates': candidates,
            'count': len(candidates),
            'timestamp': timestamp
        }
    except Exception as e:
        return {'error': str(e)}

# Agent 4 Tools
@tool
def score_candidates_tool(work_id: str, candidates: List[Dict], priority: int, required_specialty: str, timestamp: str) -> Dict:
    """Score candidates based on multiple criteria - O(n log n) for sorting"""
    try:
        if not candidates:
            return {'error': 'No candidates to score'}
        
        work_time = datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S').time()
        scored_candidates = []
        
        for candidate in candidates:
            # BASE SCORING (Total = 100 points)
            
            # 1. Role match (40 points)
            role_score = 40 if candidate['specialty'] == required_specialty else 20
            
            # 2. Skill level (20 points)
            skill_score = candidate['skill_level'] * 4
            
            # 3. Experience (15 points)
            experience_score = min(candidate['total_cases_handled'] / 10, 15)
            
            # 4. Availability (15 points)
            available_from = candidate['available_from']
            available_to = candidate['available_to']
            
            # Handle timedelta format
            if hasattr(available_from, 'total_seconds'):
                seconds = int(available_from.total_seconds())
                available_from = time(seconds // 3600, (seconds % 3600) // 60, seconds % 60)
            elif isinstance(available_from, str):
                if 'days' in available_from:
                    time_part = available_from.split()[-1]
                    available_from = datetime.strptime(time_part, '%H:%M:%S').time()
                else:
                    available_from = datetime.strptime(available_from, '%H:%M:%S').time()
            
            if hasattr(available_to, 'total_seconds'):
                seconds = int(available_to.total_seconds())
                available_to = time(seconds // 3600, (seconds % 3600) // 60, seconds % 60)
            elif isinstance(available_to, str):
                if 'days' in available_to:
                    time_part = available_to.split()[-1]
                    available_to = datetime.strptime(time_part, '%H:%M:%S').time()
                else:
                    available_to = datetime.strptime(available_to, '%H:%M:%S').time()
            
            is_available = available_from <= work_time <= available_to
            availability_score = 15 if is_available else 5
            
            # 5. Workload (10 points)
            workload = candidate['current_workload']
            workload_score = max(10 - workload * 2, 0)
            
            # PRIORITY HANDLING
            priority_bonus_applied = False
            if priority >= 4 and workload < 3 and is_available:
                availability_score = min(availability_score + 5, 20)
                workload_score = min(workload_score + 5, 15)
                priority_bonus_applied = True
            
            total_score = role_score + skill_score + experience_score + availability_score + workload_score
            
            scored_candidates.append({
                'resource_id': candidate['resource_id'],
                'name': candidate['name'],
                'specialty': candidate['specialty'],
                'skill_level': candidate['skill_level'],
                'total_cases_handled': candidate['total_cases_handled'],
                'current_workload': candidate['current_workload'],
                'total_score': round(min(total_score, 100), 2),
                'role_score': role_score,
                'skill_score': skill_score,
                'experience_score': round(experience_score, 2),
                'availability_score': availability_score,
                'workload_score': workload_score,
                'priority_bonus': '+5 urgent' if priority_bonus_applied else 'none'
            })
        
        # Sort by total score (descending)
        scored_candidates.sort(key=lambda x: x['total_score'], reverse=True)
        
        # Return best match + 2 runners-up + scoring explanation
        best = scored_candidates[0]
        runners_up = scored_candidates[1:3] if len(scored_candidates) > 1 else []
        
        return {
            'work_id': work_id,
            'match_score': best['total_score'],
            'resource_id': best['resource_id'],
            'best_match': best,
            'runners_up': runners_up,  # For XAI comparison
            'scoring_logic': {
                'total': 100,
                'breakdown': {
                    'role_match': f"{best['role_score']}/40",
                    'skill': f"{best['skill_score']}/20",
                    'experience': f"{best['experience_score']}/15",
                    'availability': f"{best['availability_score']}/15",
                    'workload': f"{best['workload_score']}/10"
                },
                'priority_bonus': best['priority_bonus']
            }
        }
    except Exception as e:
        return {'error': str(e)}

# Agent 5 Tools
@tool
def assign_work_tool(work_id: str, resource_id: str, match_score: float) -> Dict:
    """Update database with assignment + validation - O(1) operation"""
    try:
        # Validate IDs format (W### and R### format)
        if not work_id.startswith('W') or len(work_id) != 4:
            return {'error': f'Invalid work_id format: {work_id}. Must be W### (e.g., W001, W023)'}
        
        if not resource_id.startswith('R') or len(resource_id) != 4:
            return {'error': f'Invalid resource_id format: {resource_id}. Must be R### (e.g., R001, R005)'}
        
        # Check if work_id exists
        work_check = run_sql(
            "SELECT work_id FROM work_requests WHERE work_id = :work_id",
            {'work_id': work_id}
        )
        if work_check.empty:
            return {'error': f'Work ID {work_id} not found in database'}
        
        # Check if resource_id exists
        resource_check = run_sql(
            "SELECT resource_id FROM resources WHERE resource_id = :resource_id",
            {'resource_id': resource_id}
        )
        if resource_check.empty:
            return {'error': f'Resource ID {resource_id} not found in database'}
        
        # Assign work in work_requests table
        run_sql(
            "UPDATE work_requests SET assigned_to = :resource_id, status = 'assigned' WHERE work_id = :work_id",
            {'resource_id': resource_id, 'work_id': work_id}
        )
        
        # Update resource_calendar workload for assigned date and resource
        run_sql(
            "UPDATE resource_calendar SET current_workload = current_workload + 1 WHERE resource_id = :resource_id AND date = (SELECT DATE(timestamp) FROM work_requests WHERE work_id = :work_id)",
            {'resource_id': resource_id, 'work_id': work_id}
        )
        
        return {
            'status': 'success', 
            'work_id': work_id, 
            'resource_id': resource_id,
            'message': f'Successfully assigned {work_id} to {resource_id}',
            'match_score': match_score
        }
    except Exception as e:
        return {'status': 'failure', 'error': str(e)}


@tool
def log_assignment_tool(work_id: str, resource_id: str, match_score: float, reasoning: str) -> Dict:
    """Log assignment to database - O(1) operation"""
    try:
        run_sql(
            """INSERT INTO assignment_log (work_id, resource_id, assignment_timestamp, match_score, reasoning)
               VALUES (:work_id, :resource_id, NOW(), :match_score, :reasoning)""",
            {
                'work_id': work_id,
                'resource_id': resource_id,
                'match_score': match_score,
                'reasoning': reasoning
            }
        )
        return {'status': 'logged'}
    except Exception as e:
        return {'status': 'failure', 'error': str(e)}

In [None]:
# Cell 5: LLM Setup with Bedrock Primary + Gemini Fallback

# LLM providers implemented by Strands Agents SDK
def get_bedrock_model():
    """Get Bedrock model with proper configuration"""
    try:
        return BedrockModel(
            model_id="us.anthropic.claude-3-5-sonnet-20241022-v2:0",
            region_name="us-east-1"
        )
    except Exception as e:
        print(f"Bedrock model initialization failed: {e}")
        return None
    
def get_gemini_model():
    """Get Gemini model with proper configuration"""
    try:
        return GeminiModel(
                    client_args={
                        "api_key": os.getenv('GOOGLE_API_KEY'),
                    },
                    model_id="gemini-2.5-flash"
                )
    except Exception as e:
        print(f"Gemini model initialization failed: {e}")
        return None

# Reasoning Generation Function
def generate_assignment_reasoning_llm(work_data: Dict, resource_data: Dict) -> str:
    """
    Generate human-readable explanation using LLM with few-shot prompting.
    Primary: AWS Bedrock (Claude)
    Fallback: Google Gemini
    """
    # Few-shot examples for better output quality
    few_shot_examples = """
Example 1:
Input: MRI_Brain, Priority 5, Dr. Sarah Chen, Neurology, Skill 5, 150 cases, 2 current workload
Output: Dr. Sarah Chen was assigned to this urgent MRI_Brain case due to her specialized neurology expertise (150+ cases) and immediate availability with only 2 current cases, making her optimal for this Priority 5 request.

Example 2:
Input: CT_Chest, Priority 3, Dr. John Smith, Radiology, Skill 4, 80 cases, 4 current workload
Output: Dr. John Smith was selected for this CT_Chest scan based on his solid radiology background (80 cases) and skill level 4 proficiency, despite a moderate workload of 4 cases.

Example 3:
Input: Ultrasound_Abdomen, Priority 2, Dr. Emily Davis, General Imaging, Skill 3, 45 cases, 1 current workload
Output: Dr. Emily Davis was assigned to this routine Ultrasound_Abdomen procedure given her general imaging qualification, manageable experience (45 cases), and light current workload of only 1 case.
"""
    
    prompt = f"""{few_shot_examples}

Now generate a similar professional explanation for this assignment:
Work Type: {work_data['work_type']}
Priority: {work_data['priority']}
Resource Name: {resource_data['name']}
Specialty: {resource_data['specialty']}
Skill Level: {resource_data['skill_level']}
Cases Handled: {resource_data['total_cases_handled']}
Current Workload: {resource_data['current_workload']}
Match Score: {resource_data['total_score']}/100

Generate a 2-3 sentence professional explanation:"""
    
    # Try Bedrock first
    try:
        bedrock_model = get_bedrock_model()
        if bedrock_model:
            agent = Agent(
                name="ReasoningGenerator",
                model=bedrock_model,
                instructions="You are an expert at explaining resource allocation decisions clearly and professionally."
            )
            result = agent(prompt)
            reasoning = result.output.text.strip()
            if reasoning:
                print("Used Bedrock for reasoning generation")
                return reasoning
    except Exception as e:
        print(f"Bedrock failed: {e}, falling back to Gemini")
    
    # Fallback to Gemini
    try:
        gemini_model = get_gemini_model()
        response = gemini_model.generate_content(prompt)
        reasoning = response.text.strip()
        print("Used Gemini fallback for reasoning generation")
        return reasoning
    except Exception as e:
        print(f"Both LLMs failed: {e}")
        # Fallback to template-based reasoning
        return f"{resource_data['name']} was assigned to this {work_data['work_type']} case (Priority {work_data['priority']}) based on {resource_data['specialty']} expertise, skill level {resource_data['skill_level']}, and {resource_data['total_cases_handled']} cases handled with current workload of {resource_data['current_workload']} cases (Match Score: {resource_data['total_score']}/100)."

In [6]:
# Cell 6: 5 Strands Agents

# Agent 1: AddWorkAgent - Accepts work request and adds to database
add_work_agent = Agent(
    name="AddWorkAgent",
    model=get_gemini_model(),  # Need model for orchestration, but won't use for LLM reasoning
    system_prompt="""You are a work intake specialist. When given work request details:
    1. Use the add_work_tool to insert the work into the database
    2. Return the work_id that was generated
    3. Respond with JSON format: {"work_id": "WR####", "status": "success"}""",
    tools=[add_work_tool]
)

# Agent 2: WorkAnalyzerAgent - Analyzes work request to determine required specialty
work_analyzer_agent = Agent(
    name="WorkAnalyzerAgent",
    model=get_gemini_model(),
    system_prompt="Call analyze_work_tool with the work_id from the scenario input. Return the exact output without modification.",
    tools=[analyze_work_tool]
)

# Agent 3: ResourceFinderAgent - Finds resources based on required specialty
resource_finder_agent = Agent(
    name="ResourceFinderAgent",
    model=get_gemini_model(),
    system_prompt="Call find_resources_tool with work_id, required_specialty, alternate_specialty, timestamp. Return the exact output.",
    tools=[find_resources_tool]
)

# Agent 4: AvailabilityCheckerAgent - Scores candidates based on multiple criteria
availability_checker_agent = Agent(
    name="AvailabilityCheckerAgent",
    model=get_gemini_model(),
    system_prompt="Call score_candidates_tool with the work_id, candidates list, priority, required_specialty, and timestamp. Return the exact output without adding explanations or making any modifications.",
    tools=[score_candidates_tool]
)

# Agent 5: AssignmentAgent - Assigns work to best matched resource and logs the assignment with reasoning (+ updates DB)
assignment_agent = Agent(
    name="AssignmentAgent",
    model=get_gemini_model(),
    system_prompt="""
You assign work to resources.

STEP 1: Use the work_id, resource_id, match_score passed through the scenario input.
CRITICAL: Before proceeding, validate that work_id is in format W###, resource_id is in format R###, match_score is a number between 0 and 100. Donot proceed if formats are incorrect. If invalid, return an error message indicating the issue.

STEP 3: Assign and log:
1. assign_work_tool(work_id, resource_id, match_score)
2. Generate a 2-sentence reasoning using best_match data and scoring_logic.
3. log_assignment_tool(work_id, resource_id, match_score, reasoning)

Return the assignment status.
""",
    tools=[assign_work_tool, log_assignment_tool]
)


In [None]:
# Cell 7: Strands Graph Connecting the 5 Agents
builder = GraphBuilder()

agent1_node = builder.add_node(add_work_agent, node_id="agent1_add_work")
agent2_node = builder.add_node(work_analyzer_agent, node_id="agent2_analyze")
agent3_node = builder.add_node(resource_finder_agent, node_id="agent3_find_resources")
agent4_node = builder.add_node(availability_checker_agent, node_id="agent4_score")
agent5_node = builder.add_node(assignment_agent, node_id="agent5_assign")

builder.add_edge("agent1_add_work", "agent2_analyze")
builder.add_edge("agent2_analyze", "agent3_find_resources")
builder.add_edge("agent3_find_resources", "agent4_score")
builder.add_edge("agent4_score", "agent5_assign")

builder.set_entry_point("agent1_add_work")

builder.set_max_node_executions(10)  # Limited retries to 10
builder.set_execution_timeout(600.0)  # 10 minutes total timeout
builder.set_node_timeout(180.0)  # 3 minutes per node timeout

work_allocation_graph = builder.build()
print('Strands Graph built successfully')
print('Pipeline: Agent1 → Agent2 → Agent3 → Agent4 → Agent5')

Strands Graph built successfully
Pipeline: Agent1 → Agent2 → Agent3 → Agent4 → Agent5


In [None]:
# Cell 8: Initialize Database
initialize_database(reset=False)
print("Database initialized successfully")

Initializing database from CSV files
Database engine created
Database initialization complete
Database initialized successfully


In [None]:
# Cell 9: Scenario 1 - Urgent MRI_Brain Case (Priority 5)
print("\n" + "="*80)
print("SCENARIO 1: Urgent MRI_Brain - Neurology Specialist Required")
print("="*80)

scenario1_input = {
    "work_type": "MRI_Brain",
    "description": "Urgent brain MRI for suspected stroke patient",
    "priority": 5,
    "timestamp": "2024-11-10 09:30:00"
}

# Execute graph
result1 = work_allocation_graph(json.dumps(scenario1_input))

print("\n--- Execution Result ---")
print(f"Status: {result1.status}")

# Display assignment log
log_df = run_sql("SELECT * FROM assignment_log ORDER BY log_id DESC LIMIT 1")
if not log_df.empty:
    print("\n--- Assignment Log ---")
    print(log_df.to_string(index=False))
else:
    print("\n--- No assignments logged yet ---")


SCENARIO 1: Urgent MRI_Brain - Neurology Specialist Required

Tool #1: add_work_tool
{"work_id": "W023", "status": "success"}
Tool #1: analyze_work_tool
{"analyze_work_tool_response": {"output": [{"text": "{\'required_specialty\': \'Neurologist\', \'alternate_specialty\': \'General_Radiologist\', \'priority\': 5, \'work_type\': \'MRI_Brain\', \'work_id\': \'W023\'}"}]}}
Tool #1: find_resources_tool
{"find_resources_tool_response": {"output": [{"text": "{\'work_id\': \'W023\', \'candidates\': [{\'resource_id\': \'R001\', \'name\': \'Dr. John Smith\', \'specialty\': \'General_Radiologist\', \'skill_level\': 4, \'total_cases_handled\': 178, \'available_from\': Timedelta(\'0 days 07:00:00\'), \'available_to\': Timedelta(\'0 days 19:00:00\'), \'current_workload\': 5}, {\'resource_id\': \'R002\', \'name\': \'Dr. Emily Brown\', \'specialty\': \'General_Radiologist\', \'skill_level\': 3, \'total_cases_handled\': 100, \'available_from\': Timedelta(\'0 days 13:00:00\'), \'available_to\': Timede

In [None]:
# Cell 10: Scenario 2 - Routine Ultrasound_Abdomen (Priority 2)
print("\n" + "="*80)
print("SCENARIO 2: Routine Ultrasound_Abdomen - General Imaging Specialist")
print("="*80)

scenario2_input = {
    "work_type": "Ultrasound_Abdomen",
    "description": "Kidney stone suspected, routine ultrasound required",
    "priority": 2,
    "timestamp": "2024-11-11 14:00:00"
}

result2 = work_allocation_graph(json.dumps(scenario2_input))

print("\n--- Execution Result ---")
print(f"Status: {result2.status}")

log_df = run_sql("SELECT * FROM assignment_log ORDER BY log_id DESC LIMIT 1")
print("\n--- Assignment Log ---")
print(log_df.to_string(index=False))


SCENARIO 2: Routine Ultrasound_Abdomen - General Imaging Specialist

Tool #2: add_work_tool
{"work_id": "W024", "status": "success"}
Tool #2: analyze_work_tool
{"analyze_work_tool_response": {"output": [{"text": "{\'required_specialty\': \'General_Radiologist\', \'alternate_specialty\': None, \'priority\': 2, \'work_type\': \'Ultrasound_Abdomen\', \'work_id\': \'W024\'}"}]}}
Tool #2: find_resources_tool

Tool #3: find_resources_tool
{"find_resources_tool_response": {"output": [{"text": "{\'work_id\': \'W024\', \'candidates\': [{\'resource_id\': \'R001\', \'name\': \'Dr. John Smith\', \'specialty\': \'General_Radiologist\', \'skill_level\': 4, \'total_cases_handled\': 178, \'available_from\': Timedelta(\'0 days 09:00:00\'), \'available_to\': Timedelta(\'0 days 17:00:00\'), \'current_workload\': 2}, {\'resource_id\': \'R002\', \'name\': \'Dr. Emily Brown\', \'specialty\': \'General_Radiologist\', \'skill_level\': 3, \'total_cases_handled\': 100, \'available_from\': Timedelta(\'0 days 08

In [None]:
# Cell 11: Scenario 3 - Specialized X_Ray_Bone (Priority 3)
print("\n" + "="*80)
print("SCENARIO 3: Specialized X_Ray_Bone - Joint pain evaluation")
print("="*80)

scenario3_input = {
    "work_type": "X_Ray_Bone",
    "description": "Specialized x-ray for joint pain evaluation",
    "priority": 3,
    "timestamp": "2024-11-12 08:15:00"
}

result3 = work_allocation_graph(json.dumps(scenario3_input))

print("\n--- Execution Result ---")
print(f"Status: {result3.status}")

log_df = run_sql("SELECT * FROM assignment_log ORDER BY log_id DESC LIMIT 1")
print("\n--- Assignment Log ---")
print(log_df.to_string(index=False))


SCENARIO 3: Specialized X_Ray_Bone - Joint pain evaluation

Tool #3: add_work_tool
{"work_id": "W025", "status": "success"}
Tool #3: analyze_work_tool
{"analyze_work_tool_response": {"output": [{"text": "{\'required_specialty\': \'Musculoskeletal_Specialist\', \'alternate_specialty\': \'General_Radiologist\', \'priority\': 3, \'work_type\': \'X_Ray_Bone\', \'work_id\': \'W025\'}"}]}}
Tool #4: find_resources_tool


Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x000002196AF29810>


{"find_resources_tool_response": {"output": [{"text": "{\'work_id\': \'W025\', \'candidates\': [{\'resource_id\': \'R001\', \'name\': \'Dr. John Smith\', \'specialty\': \'General_Radiologist\', \'skill_level\': 4, \'total_cases_handled\': 178, \'available_from\': Timedelta(\'0 days 08:00:00\'), \'available_to\': Timedelta(\'0 days 13:00:00\'), \'current_workload\': 4}, {\'resource_id\': \'R002\', \'name\': \'Dr. Emily Brown\', \'specialty\': \'General_Radiologist\', \'skill_level\': 3, \'total_cases_handled\': 100, \'available_from\': Timedelta(\'0 days 09:00:00\'), \'available_to\': Timedelta(\'0 days 17:00:00\'), \'current_workload\': 8}, {\'resource_id\': \'R003\', \'name\': \'Dr. Michael Davis\', \'specialty\': \'General_Radiologist\', \'skill_level\': 3, \'total_cases_handled\': 172, \'available_from\': Timedelta(\'0 days 13:00:00\'), \'available_to\': Timedelta(\'0 days 18:00:00\'), \'current_workload\': 3}, {\'resource_id\': \'R004\', \'name\': \'Dr. Alex Johnson\', \'specialty\