In [1]:
import numpy as np
import random
from itertools import permutations
from typing import List, Tuple, Optional
import time

In [2]:
class CapabilityProfile:
    def __init__(self,
                 robot_id: str,
                 mobility_type: str,
                 max_speed: float,
                 payload_capacity: float,
                 reach: float,
                 battery_life: float,
                 size: Tuple[float, float, float],
                 environmental_resistance: List[str],
                 sensors: List[str],
                 sensor_range: float,
                 manipulators: List[str],
                 communication_protocols: List[str],
                 processing_power: float,
                 autonomy_level: str,
                 special_functions: List[str],
                 safety_features: List[str],
                 adaptability: bool,
                 location: Tuple[float, float, float],
                 preferred_tasks: List[str],
                 current_task: Optional["TaskDescription"],
                 remaining_distance: float,
                 time_on_task: float):
        self.robot_id = robot_id
        self.mobility_type = mobility_type
        self.max_speed = max_speed
        self.payload_capacity = payload_capacity
        self.reach = reach
        self.battery_life = battery_life
        self.size = size  # (length, width, height)
        self.environmental_resistance = environmental_resistance
        self.sensors = sensors
        self.sensor_range = sensor_range
        self.manipulators = manipulators
        self.communication_protocols = communication_protocols
        self.processing_power = processing_power
        self.autonomy_level = autonomy_level
        self.special_functions = special_functions
        self.safety_features = safety_features
        self.adaptability = adaptability
        self.location = location  # (x, y, z) coordinates
        self.preferred_tasks = preferred_tasks
        self.current_task = current_task  # The task the robot is currently assigned to
        self.remaining_distance = remaining_distance  # Distance left to the current task location
        self.time_on_task = time_on_task  # Time spent on the current task

    def __repr__(self):
        return (f"CapabilityProfile(robot_id={self.robot_id}, mobility_type={self.mobility_type}, "
                f"max_speed={self.max_speed}, payload_capacity={self.payload_capacity}, "
                f"reach={self.reach}, battery_life={self.battery_life}, size={self.size}, "
                f"environmental_resistance={self.environmental_resistance}, sensors={self.sensors}, "
                f"sensor_range={self.sensor_range}, manipulators={self.manipulators}, communication_protocols={self.communication_protocols}, "
                f"processing_power={self.processing_power}, autonomy_level={self.autonomy_level}, "
                f"special_functions={self.special_functions}, safety_features={self.safety_features}, "
                f"adaptability={self.adaptability}, location={self.location}, preferred_tasks={self.preferred_tasks}, current_task={self.current_task}, remaining_distance={self.remaining_distance}, time_on_task={self.time_on_task})")


class TaskDescription:
    def __init__(self,
                 task_id: str,
                 task_type: str,
                 objective: str,
                 priority_level: str,
                 reward: float,
                 difficulty: float,
                 location: Tuple[float, float, float],
                 navigation_constraints: Optional[List[str]],
                 required_capabilities: List[str],
                 time_window: Optional[Tuple[str, str]],
                 duration: float,
                 environmental_conditions: Optional[List[str]],
                 dependencies: Optional[List[str]],
                 tools_needed: Optional[List[str]],
                 communication_requirements: Optional[List[str]],
                 safety_protocols: Optional[List[str]],
                 performance_metrics: List[str],
                 success_criteria: str,
                 assigned_robot: Optional["CapabilityProfile"],
                 time_to_complete: float):
        self.task_id = task_id
        self.task_type = task_type
        self.objective = objective
        self.priority_level = priority_level
        self.reward = reward
        self.difficulty = difficulty
        self.location = location  # (x, y, z) coordinates
        self.navigation_constraints = navigation_constraints
        self.required_capabilities = required_capabilities
        self.time_window = time_window  # (start_time, end_time) or None
        self.duration = duration
        self.environmental_conditions = environmental_conditions
        self.dependencies = dependencies
        self.tools_needed = tools_needed
        self.communication_requirements = communication_requirements
        self.safety_protocols = safety_protocols
        self.performance_metrics = performance_metrics
        self.success_criteria = success_criteria
        self.assigned_robot = assigned_robot  # The robot currently assigned to this task
        self.time_to_complete = time_to_complete  # Time required to complete the task

    def __repr__(self):
        return (f"TaskDescription(task_id={self.task_id}, task_type={self.task_type}, "
                f"objective={self.objective}, priority_level={self.priority_level}, "
                f"location={self.location}, reward={self.reward}, difficulty={self.difficulty}, navigation_constraints={self.navigation_constraints}, "
                f"required_capabilities={self.required_capabilities}, time_window={self.time_window}, "
                f"duration={self.duration}, environmental_conditions={self.environmental_conditions}, "
                f"dependencies={self.dependencies}, tools_needed={self.tools_needed}, "
                f"communication_requirements={self.communication_requirements}, "
                f"safety_protocols={self.safety_protocols}, performance_metrics={self.performance_metrics}, "
                f"success_criteria={self.success_criteria}, assigned_robot={self.assigned_robot}, time_to_complete={self.time_to_complete})")

In [3]:
def generate_random_robot_profile(robot_id: str) -> CapabilityProfile:
    mobility_types = ["wheeled", "tracked", "legged", "aerial", "hovering", "climbing"]
    environmental_resistances = ["weatherproof", "waterproof", "dustproof", "heat-resistant", "cold-resistant", "shock-resistant"]
    sensors = ["camera", "microphone", "LiDAR", "GPS", "ultrasonic", "temperature sensor", "infrared", "proximity sensor", "magnetometer"]
    manipulators = ["gripper", "drill", "welding tool"]
    communication_protocols = ["Wi-Fi", "Bluetooth", "4G", "5G", "Radio"]
    special_functions = ["object recognition", "speech output", "facial recognition", "object tracking", "gesture recognition"]
    safety_features = ["emergency stop", "collision avoidance", "overheat protection", "fall detection", "obstacle detection", "speed reduction in crowded areas"]
    task_preferences = ["delivery", "inspection", "cleaning", "monitoring", "maintenance", "assembly", "surveying", "data collection", "assistance"]
    
    return CapabilityProfile(
        robot_id=robot_id,
        mobility_type=random.choice(mobility_types),
        max_speed=random.uniform(0.5, 15.0),
        payload_capacity=random.uniform(0.0, 50.0),
        reach=random.uniform(0.0, 10.0),
        battery_life=random.uniform(5.0, 500.0),
        size=(random.uniform(0.5, 5.0), random.uniform(0.5, 5.0), random.uniform(0.5, 5.0)),
        environmental_resistance=random.sample(environmental_resistances, k=random.randint(0, len(environmental_resistances))),
        sensors=random.sample(sensors, k=random.randint(1, len(sensors))),
        sensor_range=random.uniform(1.0, 50.0),
        manipulators=random.sample(manipulators, k=random.randint(0, len(manipulators))),
        communication_protocols=random.sample(communication_protocols, k=random.randint(1, len(communication_protocols))),
        processing_power=random.uniform(1.0, 10.0),
        autonomy_level=random.choice(["teleoperated", "semi-autonomous", "fully autonomous"]),
        special_functions=random.sample(special_functions, k=random.randint(0, len(special_functions))),
        safety_features=random.sample(safety_features, k=random.randint(0, len(safety_features))),
        adaptability=bool(random.getrandbits(1)),
        location=(random.uniform(0.0, 100.0), random.uniform(0.0, 100.0), 0.0),
        preferred_tasks=random.sample(task_preferences, k=random.randint(0, len(task_preferences))),
        current_task = None,
        remaining_distance = 0.0,
        time_on_task = 0
    )


def generate_random_task_description(task_id: str) -> TaskDescription:
    task_types = ["delivery", "inspection", "cleaning", "monitoring", "maintenance", "assembly", "surveying", "data collection", "assistance"]
    priorities = ["low", "medium", "high", "urgent"]
    navigation_constraints = ["elevator", "stairs", "shelves", "no loud noises allowed", "narrow spaces", "low ceilings", "uneven floors", "low visibility", "slippery", "crowded", "loose debris", "no-fly zone", "windy", "dense obstructions", "smooth surfaces"]
    environmental_conditions = ["weatherproof", "waterproof", "dustproof", "heat-resistant", "cold-resistant", "shock-resistant"]
    performance_metrics = ["time taken", "accuracy", "energy consumption", "safety compliance", "completion rate"]
    requirements = [
        "payload capacity >= 5.0", 
        "payload capacity >= 1.0", 
        "payload capacity >= 10.0", 
        "payload capacity >= 20.0", 
        "reach >= 1.5", 
        "reach >= 3.0"
    ]
    tools = ["gripper", "camera", "microphone", "temperature sensor", "drill", "welding tool"]
    communication_types = ["Wi-Fi", "Bluetooth", "4G", "5G", "Radio"]
    safety_types = ["emergency stop", "collision avoidance", "overheat protection", "fall detection", "obstacle detection", "speed reduction in crowded areas"]
    duration_time = random.uniform(10.0, 300.0)
    
    return TaskDescription(
        task_id=task_id,
        task_type=random.choice(task_types),
        objective=f"Perform {random.choice(task_types)} task",
        priority_level=random.choice(priorities),
        reward=random.uniform(1, 10),
        difficulty=random.uniform(1, 10),
        location=(random.uniform(0.0, 100.0), random.uniform(0.0, 100.0), 0.0),
        navigation_constraints=random.sample(navigation_constraints, k=random.randint(0, len(navigation_constraints))),
        required_capabilities=random.sample(requirements, k=random.randint(0, len(requirements))),
        time_window=(f"{random.randint(8, 17)}:00", f"{random.randint(18, 23)}:00"),
        duration=duration_time,
        environmental_conditions=random.sample(environmental_conditions, k=random.randint(0, len(environmental_conditions))),
        dependencies=None,
        tools_needed=random.sample(tools, k=random.randint(0, len(tools))),
        communication_requirements=random.sample(communication_types, k=random.randint(1, len(communication_types))),
        safety_protocols=random.sample(safety_types, k=random.randint(0, len(safety_types))),
        performance_metrics=random.sample(performance_metrics, k=random.randint(1, len(performance_metrics))),
        success_criteria="Task completed within time window",
        assigned_robot = None,
        time_to_complete = duration_time
    )

In [4]:
def navigation_suitability(robot_mobility_type: str, robot_size: Tuple[float, float, float], task_constraints: List[str]) -> float:
    """
    Evaluates the suitability of a robot for navigating a task environment based on mobility type, size, and navigation constraints.
    
    Parameters:
    - robot_mobility_type: The mobility type of the robot (e.g., "wheeled", "tracked", "legged", "aerial", "hovering", "climbing").
    - robot_size: A tuple representing the robot's dimensions (length, width, height).
    - task_constraints: A list of navigation constraints for the task environment.
    
    Returns:
    - A float score representing the suitability for navigation. Returns 0 if there is a critical mismatch that prevents navigation.
    """

    # Initialize the score
    score = 0.0

    # Define size thresholds for narrow spaces, low ceilings, etc.
    narrow_space_threshold = 1.0  # Width limit for narrow spaces
    low_ceiling_threshold = 1.5   # Height limit for low ceilings

    # Handle each task constraint based on mobility type
    for constraint in task_constraints:
        # Constraint: Elevator access
        if constraint == "elevator":
            if robot_mobility_type in ["wheeled", "tracked", "legged"]:
                score += 1.0

        # Constraint: Stairs
        elif constraint == "stairs":
            if robot_mobility_type in ["legged", "aerial", "climbing"]:
                score += 1.0

        # Constraint: Shelves
        elif constraint == "shelves":
            if robot_size[2] < low_ceiling_threshold or robot_mobility_type in ["aerial", "climbing", "hovering"]:
                score += 1.0  # Only smaller robots can access shelves effectively

        # Constraint: No loud noises allowed
        elif constraint == "no loud noises allowed":
            if robot_mobility_type in ["legged", "hovering"]:
                score += 1.0  # Quieter mobility types

        # Constraint: Narrow spaces
        elif constraint == "narrow spaces":
            if robot_size[1] <= narrow_space_threshold:
                score += 1.0
            else:
                return 0.0  # Larger robots cannot pass through narrow spaces

        # Constraint: Low ceilings
        elif constraint == "low ceilings":
            if robot_size[2] <= low_ceiling_threshold:
                score += 1.0
            else:
                return 0.0  # Tall robots cannot navigate in areas with low ceilings

        # Constraint: Uneven floors
        elif constraint == "uneven floors":
            if robot_mobility_type in ["tracked", "legged"]:
                score += 1.0  # These types handle uneven floors well

        # Constraint: Low visibility
        elif constraint == "low visibility":
            if robot_mobility_type in ["wheeled", "tracked", "legged"]:
                score += 1.0  # Infrared or LiDAR-equipped robots are suitable

        # Constraint: Slippery surfaces
        elif constraint == "slippery":
            if robot_mobility_type in ["tracked", "hovering", "aerial"]:
                score += 1.0  # Hovering and tracked types handle slippery surfaces better
            elif robot_mobility_type in ["wheeled", "legged"]:
                return 0.0  # Wheeled and legged robots are unsuitable on slippery floors

        # Constraint: Crowded environments
        elif constraint == "crowded":
            if robot_size[0] <= 1.0 and robot_size[1] <= 1.0:
                score += 1.0  # Smaller robots are more suitable in crowded environments
            else:
                score += 0.5  # Larger robots get a lower score

        # Constraint: Loose debris
        elif constraint == "loose debris":
            if robot_mobility_type in ["aerial", "hovering"]:
                score += 1.0  # Aerial and hovering robots handle debris better
            elif robot_mobility_type == "legged":
                return 0.0  # Legged robots are unsuitable

        # Constraint: No-fly zone
        elif constraint == "no-fly zone":
            if robot_mobility_type == "aerial":
                return 0.0  # Aerial robots cannot navigate in no-fly zones
            else:
                score += 0.5  # Other mobility types are unaffected

        # Constraint: Windy conditions
        elif constraint == "windy":
            if robot_mobility_type == "aerial":
                return 0.0  # Aerial robots struggle in windy conditions
            else:
                score += 0.5  # All other types are more stable in wind

        # Constraint: Dense obstructions (e.g., tree branches, hanging cables)
        elif constraint == "dense obstructions":
            if robot_mobility_type in ["aerial", "legged"]:
                return 0.0  # Aerial and legged robots are unsuitable in dense obstruction areas
            else:
                score += 0.5  # Other types may navigate dense areas on the ground

        # Constraint: Smooth floors
        elif constraint == "smooth surfaces":
            if robot_mobility_type == "climbing":
                return 0.0  # Climbing robots are less suited for smooth surfaces

    # Final suitability score (0 if any constraint returns 0)
    return score if score > 0 else 0.0


def evaluate_suitability_loose(robot: CapabilityProfile, task: TaskDescription) -> float:
    """
    Evaluates the suitability of a robot for a given task.
    A higher score indicates better suitability.
    
    Parameters:
    - robot: The CapabilityProfile of the robot.
    - task: The TaskDescription of the task.
    
    Returns:
    - A float score representing the suitability of the robot for the task. A score of 0 indicates the robot cannot perform the task.
    """
    score = 0.0
    
#     print(task.required_capabilities, robot.payload_capacity)
    # Check if robot meets the minimum requirements
    if any(req for req in task.required_capabilities if "payload capacity" in req and robot.payload_capacity < float(req.split(">= ")[-1])):
        score += 0.0  # Suitability is zero if the robot doesn't meet minimum requirements
    else:
        score += 1.0  # Add score if payload meets or exceeds requirements
    
#     print(task.tools_needed, robot.sensors+robot.manipulators)
    # Check if the robot has the necessary tools for the task
    if task.tools_needed and not all(item in robot.sensors+robot.manipulators for item in task.tools_needed):
        score += 0.0  # Suitability is zero if the robot lacks necessary tools
    else:
        score += 1.0  # Add score if robot has necessary tools
    
#     print(task.communication_requirements, robot.communication_protocols)
    # Check if the robot can communicate as required by the task
    if task.communication_requirements and not all(protocol in robot.communication_protocols for protocol in task.communication_requirements):
        score += 0.0  # Suitability is zero if the robot lacks required communication protocols
    else:
        score += 1.0  # Add score if robot has communication requirements
    
#     print(task.safety_protocols, robot.safety_features)
    # Check if the robot can safely perform the task
    if task.safety_protocols and not all(safety in robot.safety_features for safety in task.safety_protocols):
        score += 0.0  # Suitability is zero if the robot lacks required safety features
    else:
        score += 1.0  # Add score if robot meets safety requirements
    
#     print(task.environmental_conditions, robot.environmental_resistance)
    # Environmental compatibility: Can the robot operate in the task’s conditions?
    if task.environmental_conditions and not all(condition in robot.environmental_resistance for condition in task.environmental_conditions):
        score += 0.0  # Suitability is zero if the robot can't operate in required environmental conditions
    else:
        score += 1.0  # Add score if robot has required environmental resistances
    
#     print(task.required_capabilities, robot.reach)
    # Check if the robot meets reach requirements
    if any(req for req in task.required_capabilities if "reach" in req and robot.reach < float(req.split(">= ")[-1])):
        score += 0.0  # Suitability is zero if the robot cannot reach the task area as required
    else:
        score += 1.0  # Add score if reach meets or exceeds requirements
    
#     print(task.navigation_constraints, robot.mobility_type, robot.size)
    # Check navigation constraints based on mobility type and robot size
    navigation_match = navigation_suitability(robot.mobility_type, robot.size, task.navigation_constraints)
    if navigation_match == 0:
        score += 0.0
    else:
        score += navigation_match

    distance_to_task = ((robot.location[0] - task.location[0]) ** 2 + (robot.location[1] - task.location[1]) ** 2) ** 0.5
#     print(robot.sensor_range)
    # Check sensor capabilities for the task
    if robot.sensor_range >= distance_to_task:
        score += 1.0
    elif robot.sensor_range >= distance_to_task/2:
        score += 0.5

    # Battery and distance check: Ensure the robot has sufficient battery to reach and complete the task
#     print(robot.max_speed, robot.battery_life, task.duration, distance_to_task)
    if ((distance_to_task / robot.max_speed)+task.duration) > robot.battery_life:
        score += 0.0  # Suitability is zero if the robot can't complete the task due to distance, speed, or battery limitations

    # Add to score based on proximity (closer robots get higher scores)
    if distance_to_task < 20.0:
        score += 1.0
    elif distance_to_task < 50.0:
        score += 0.5

#     print(task.priority_level, robot.autonomy_level)
    # Check if the robot's autonomy level matches the task's priority level
    if task.priority_level in ["high", "urgent"] and robot.autonomy_level in ["fully autonomous", "teleoperated"]:
        score += 1.0
    elif task.priority_level in ["medium", "low"] and robot.autonomy_level in ["semi-autonomous", "fully autonomous"]:
        score += 0.5

#     print(robot.battery_life, task.duration)
    # Evaluate battery life for task duration
    if robot.battery_life >= 2*((distance_to_task / robot.max_speed)+task.duration):
        score += 1.0
    else:
        score += 0.5

#     print(task.task_type, robot.special_functions)
    task_function_mapping = {
        "delivery": ["object recognition", "speech output", "facial recognition"],
        "inspection": ["object recognition", "object tracking", "gesture recognition"],
        "cleaning": ["object recognition"],
        "monitoring": ["speech output", "object tracking", "facial recognition"],
        "maintenance": ["object recognition", "path planning"],
        "assembly": ["object recognition"],
        "surveying": ["speech output", "facial recognition", "object recognition", "object tracking"],
        "data collection": ["object recognition", "object tracking", "facial recognition", "gesture recognition"],
        "assistance": ["speech output", "facial recognition", "gesture recognition"]
    }

    # Get the relevant functions for this task type
    required_functions = task_function_mapping[task.task_type]

    # Calculate the score based on matches between robot's functions and required functions
    for function in robot.special_functions:
        if function in required_functions:
            score += 1.0  # Increase score for each match
    
#     # Dependencies
#     if task.dependencies:
#         # Assume dependencies are represented as tasks that must be completed first
#         score += 0.5 if all(dep in completed_tasks for dep in task.dependencies) else 0.0
    
#     print(task.difficulty, robot.processing_power)
    # Processing power: Certain tasks may benefit from higher processing power if they are computationally demanding
    if task.difficulty > 7 and robot.processing_power >= 5.0:  # Difficulty > 7 indicates a complex task
        score += 1.0
    elif task.difficulty > 4 and robot.processing_power >= 3.0:
        score += 1.0
    elif task.difficulty > 2 and robot.processing_power >= 1.5:
        score += 0.5

#     print(robot.adaptability)
    # Consider robot's adaptability to changing conditions
    if robot.adaptability:
        score += 0.5
    
#     print(task.task_type, robot.preferred_tasks)
    # Preference matching
    if task.task_type in robot.preferred_tasks:
        score += 1.0

    # Score based on priority, reward, and difficulty
    priority_multiplier = {"low": 0.5, "medium": 1.0, "high": 1.5, "urgent": 2.0}[task.priority_level]
    reward_to_difficulty_ratio = task.reward / task.difficulty
#     print(task.priority_level, task.reward, task.difficulty, priority_multiplier, reward_to_difficulty_ratio)
    score += priority_multiplier * reward_to_difficulty_ratio

    # Return the final suitability score
#     print(score)
    return score

def evaluate_suitability_strict(robot: CapabilityProfile, task: TaskDescription) -> float:
    """
    Evaluates the suitability of a robot for a given task.
    A higher score indicates better suitability.
    
    Parameters:
    - robot: The CapabilityProfile of the robot.
    - task: The TaskDescription of the task.
    
    Returns:
    - A float score representing the suitability of the robot for the task. A score of 0 indicates the robot cannot perform the task.
    """
    score = 0.0
    
#     print(task.required_capabilities, robot.payload_capacity)
    # Check if robot meets the minimum requirements
    if any(req for req in task.required_capabilities if "payload capacity" in req and robot.payload_capacity < float(req.split(">= ")[-1])):
        return 0.0  # Suitability is zero if the robot doesn't meet minimum requirements
    else:
        score += 1.0  # Add score if payload meets or exceeds requirements
    
#     print(task.tools_needed, robot.sensors+robot.manipulators)
    # Check if the robot has the necessary tools for the task
    if task.tools_needed and not all(item in robot.sensors+robot.manipulators for item in task.tools_needed):
        return 0.0  # Suitability is zero if the robot lacks necessary tools
    else:
        score += 1.0  # Add score if robot has necessary tools
    
#     print(task.communication_requirements, robot.communication_protocols)
    # Check if the robot can communicate as required by the task
    if task.communication_requirements and not all(protocol in robot.communication_protocols for protocol in task.communication_requirements):
        return 0.0  # Suitability is zero if the robot lacks required communication protocols
    else:
        score += 1.0  # Add score if robot has communication requirements
    
#     print(task.safety_protocols, robot.safety_features)
    # Check if the robot can safely perform the task
    if task.safety_protocols and not all(safety in robot.safety_features for safety in task.safety_protocols):
        return 0.0  # Suitability is zero if the robot lacks required safety features
    else:
        score += 1.0  # Add score if robot meets safety requirements
    
#     print(task.environmental_conditions, robot.environmental_resistance)
    # Environmental compatibility: Can the robot operate in the task’s conditions?
    if task.environmental_conditions and not all(condition in robot.environmental_resistance for condition in task.environmental_conditions):
        return 0.0  # Suitability is zero if the robot can't operate in required environmental conditions
    else:
        score += 1.0  # Add score if robot has required environmental resistances
    
#     print(task.required_capabilities, robot.reach)
    # Check if the robot meets reach requirements
    if any(req for req in task.required_capabilities if "reach" in req and robot.reach < float(req.split(">= ")[-1])):
        return 0.0  # Suitability is zero if the robot cannot reach the task area as required
    else:
        score += 1.0  # Add score if reach meets or exceeds requirements
    
#     print(task.navigation_constraints, robot.mobility_type, robot.size)
    # Check navigation constraints based on mobility type and robot size
    navigation_match = navigation_suitability(robot.mobility_type, robot.size, task.navigation_constraints)
    if navigation_match == 0:
        return 0.0
    else:
        score += navigation_match

    distance_to_task = ((robot.location[0] - task.location[0]) ** 2 + (robot.location[1] - task.location[1]) ** 2) ** 0.5
#     print(robot.sensor_range)
    # Check sensor capabilities for the task
    if robot.sensor_range >= distance_to_task:
        score += 1.0
    elif robot.sensor_range >= distance_to_task/2:
        score += 0.5

    # Battery and distance check: Ensure the robot has sufficient battery to reach and complete the task
#     print(robot.max_speed, robot.battery_life, task.duration, distance_to_task)
    if ((distance_to_task / robot.max_speed)+task.duration) > robot.battery_life:
        return 0.0  # Suitability is zero if the robot can't complete the task due to distance, speed, or battery limitations

    # Add to score based on proximity (closer robots get higher scores)
    if distance_to_task < 20.0:
        score += 1.0
    elif distance_to_task < 50.0:
        score += 0.5

#     print(task.priority_level, robot.autonomy_level)
    # Check if the robot's autonomy level matches the task's priority level
    if task.priority_level in ["high", "urgent"] and robot.autonomy_level in ["fully autonomous", "teleoperated"]:
        score += 1.0
    elif task.priority_level in ["medium", "low"] and robot.autonomy_level in ["semi-autonomous", "fully autonomous"]:
        score += 0.5

#     print(robot.battery_life, task.duration)
    # Evaluate battery life for task duration
    if robot.battery_life >= 2*((distance_to_task / robot.max_speed)+task.duration):
        score += 1.0
    else:
        score += 0.5

#     print(task.task_type, robot.special_functions)
    task_function_mapping = {
        "delivery": ["object recognition", "speech output", "facial recognition"],
        "inspection": ["object recognition", "object tracking", "gesture recognition"],
        "cleaning": ["object recognition"],
        "monitoring": ["speech output", "object tracking", "facial recognition"],
        "maintenance": ["object recognition", "path planning"],
        "assembly": ["object recognition"],
        "surveying": ["speech output", "facial recognition", "object recognition", "object tracking"],
        "data collection": ["object recognition", "object tracking", "facial recognition", "gesture recognition"],
        "assistance": ["speech output", "facial recognition", "gesture recognition"]
    }

    # Get the relevant functions for this task type
    required_functions = task_function_mapping[task.task_type]

    # Calculate the score based on matches between robot's functions and required functions
    for function in robot.special_functions:
        if function in required_functions:
            score += 1.0  # Increase score for each match
    
#     # Dependencies
#     if task.dependencies:
#         # Assume dependencies are represented as tasks that must be completed first
#         score += 0.5 if all(dep in completed_tasks for dep in task.dependencies) else 0.0
    
#     print(task.difficulty, robot.processing_power)
    # Processing power: Certain tasks may benefit from higher processing power if they are computationally demanding
    if task.difficulty > 7 and robot.processing_power >= 5.0:  # Difficulty > 7 indicates a complex task
        score += 1.0
    elif task.difficulty > 4 and robot.processing_power >= 3.0:
        score += 1.0
    elif task.difficulty > 2 and robot.processing_power >= 1.5:
        score += 0.5

#     print(robot.adaptability)
    # Consider robot's adaptability to changing conditions
    if robot.adaptability:
        score += 0.5
    
#     print(task.task_type, robot.preferred_tasks)
    # Preference matching
    if task.task_type in robot.preferred_tasks:
        score += 1.0

    # Score based on priority, reward, and difficulty
    priority_multiplier = {"low": 0.5, "medium": 1.0, "high": 1.5, "urgent": 2.0}[task.priority_level]
    reward_to_difficulty_ratio = task.reward / task.difficulty
#     print(task.priority_level, task.reward, task.difficulty, priority_multiplier, reward_to_difficulty_ratio)
    score += priority_multiplier * reward_to_difficulty_ratio

    # Return the final suitability score
#     print(score)
    return score

def evaluate_suitability_distance(robot: CapabilityProfile, task: TaskDescription) -> float:
    """
    Evaluates the suitability of a robot for a given task.
    A higher score indicates better suitability.
    
    Parameters:
    - robot: The CapabilityProfile of the robot.
    - task: The TaskDescription of the task.
    
    Returns:
    - A float score representing the suitability of the robot for the task. A score of 0 indicates the robot cannot perform the task.
    """
    score = 0.0
    
#     print(task.required_capabilities, robot.payload_capacity)
    # Check if robot meets the minimum requirements
    if any(req for req in task.required_capabilities if "payload capacity" in req and robot.payload_capacity < float(req.split(">= ")[-1])):
        score += 0.0  # Suitability is zero if the robot doesn't meet minimum requirements
    else:
        score += 1.0  # Add score if payload meets or exceeds requirements
    
#     print(task.tools_needed, robot.sensors+robot.manipulators)
    # Check if the robot has the necessary tools for the task
    if task.tools_needed and not all(item in robot.sensors+robot.manipulators for item in task.tools_needed):
        score += 0.0  # Suitability is zero if the robot lacks necessary tools
    else:
        score += 1.0  # Add score if robot has necessary tools
    
#     print(task.communication_requirements, robot.communication_protocols)
    # Check if the robot can communicate as required by the task
    if task.communication_requirements and not all(protocol in robot.communication_protocols for protocol in task.communication_requirements):
        score += 0.0  # Suitability is zero if the robot lacks required communication protocols
    else:
        score += 1.0  # Add score if robot has communication requirements
    
#     print(task.safety_protocols, robot.safety_features)
    # Check if the robot can safely perform the task
    if task.safety_protocols and not all(safety in robot.safety_features for safety in task.safety_protocols):
        score += 0.0  # Suitability is zero if the robot lacks required safety features
    else:
        score += 1.0  # Add score if robot meets safety requirements
    
#     print(task.environmental_conditions, robot.environmental_resistance)
    # Environmental compatibility: Can the robot operate in the task’s conditions?
    if task.environmental_conditions and not all(condition in robot.environmental_resistance for condition in task.environmental_conditions):
        score += 0.0  # Suitability is zero if the robot can't operate in required environmental conditions
    else:
        score += 1.0  # Add score if robot has required environmental resistances
    
#     print(task.required_capabilities, robot.reach)
    # Check if the robot meets reach requirements
    if any(req for req in task.required_capabilities if "reach" in req and robot.reach < float(req.split(">= ")[-1])):
        score += 0.0  # Suitability is zero if the robot cannot reach the task area as required
    else:
        score += 1.0  # Add score if reach meets or exceeds requirements
    
#     print(task.navigation_constraints, robot.mobility_type, robot.size)
    # Check navigation constraints based on mobility type and robot size
    navigation_match = navigation_suitability(robot.mobility_type, robot.size, task.navigation_constraints)
    if navigation_match == 0:
        score += 0.0
    else:
        score += navigation_match

    distance_to_task = ((robot.location[0] - task.location[0]) ** 2 + (robot.location[1] - task.location[1]) ** 2) ** 0.5
#     print(robot.sensor_range)
    # Check sensor capabilities for the task
    if robot.sensor_range >= distance_to_task:
        score += 1.0
    elif robot.sensor_range >= distance_to_task/2:
        score += 0.5

    # Battery and distance check: Ensure the robot has sufficient battery to reach and complete the task
#     print(robot.max_speed, robot.battery_life, task.duration, distance_to_task)
    if ((distance_to_task / robot.max_speed)+task.duration) > robot.battery_life:
        score += 0.0  # Suitability is zero if the robot can't complete the task due to distance, speed, or battery limitations

    # Add to score based on proximity (closer robots get higher scores)
#     if distance_to_task < 20.0:
#         score += 1.0
#     elif distance_to_task < 50.0:
#         score += 0.5

#     print(task.priority_level, robot.autonomy_level)
    # Check if the robot's autonomy level matches the task's priority level
    if task.priority_level in ["high", "urgent"] and robot.autonomy_level in ["fully autonomous", "teleoperated"]:
        score += 1.0
    elif task.priority_level in ["medium", "low"] and robot.autonomy_level in ["semi-autonomous", "fully autonomous"]:
        score += 0.5

#     print(robot.battery_life, task.duration)
    # Evaluate battery life for task duration
    if robot.battery_life >= 2*((distance_to_task / robot.max_speed)+task.duration):
        score += 1.0
    else:
        score += 0.5

#     print(task.task_type, robot.special_functions)
    task_function_mapping = {
        "delivery": ["object recognition", "speech output", "facial recognition"],
        "inspection": ["object recognition", "object tracking", "gesture recognition"],
        "cleaning": ["object recognition"],
        "monitoring": ["speech output", "object tracking", "facial recognition"],
        "maintenance": ["object recognition", "path planning"],
        "assembly": ["object recognition"],
        "surveying": ["speech output", "facial recognition", "object recognition", "object tracking"],
        "data collection": ["object recognition", "object tracking", "facial recognition", "gesture recognition"],
        "assistance": ["speech output", "facial recognition", "gesture recognition"]
    }

    # Get the relevant functions for this task type
    required_functions = task_function_mapping[task.task_type]

    # Calculate the score based on matches between robot's functions and required functions
    for function in robot.special_functions:
        if function in required_functions:
            score += 1.0  # Increase score for each match
    
#     # Dependencies
#     if task.dependencies:
#         # Assume dependencies are represented as tasks that must be completed first
#         score += 0.5 if all(dep in completed_tasks for dep in task.dependencies) else 0.0
    
#     print(task.difficulty, robot.processing_power)
    # Processing power: Certain tasks may benefit from higher processing power if they are computationally demanding
    if task.difficulty > 7 and robot.processing_power >= 5.0:  # Difficulty > 7 indicates a complex task
        score += 1.0
    elif task.difficulty > 4 and robot.processing_power >= 3.0:
        score += 1.0
    elif task.difficulty > 2 and robot.processing_power >= 1.5:
        score += 0.5

#     print(robot.adaptability)
    # Consider robot's adaptability to changing conditions
    if robot.adaptability:
        score += 0.5
    
#     print(task.task_type, robot.preferred_tasks)
    # Preference matching
    if task.task_type in robot.preferred_tasks:
        score += 1.0

    # Score based on priority, reward, and difficulty
    priority_multiplier = {"low": 0.5, "medium": 1.0, "high": 1.5, "urgent": 2.0}[task.priority_level]
    reward_to_difficulty_ratio = task.reward / task.difficulty
#     print(task.priority_level, task.reward, task.difficulty, priority_multiplier, reward_to_difficulty_ratio)
    score += priority_multiplier * reward_to_difficulty_ratio

    # Weight score by distance to task
    score = score / distance_to_task
    
    # Return the final suitability score
#     print(score)
    return score

def evaluate_suitability_priority(robot: CapabilityProfile, task: TaskDescription) -> float:
    """
    Evaluates the suitability of a robot for a given task.
    A higher score indicates better suitability.
    
    Parameters:
    - robot: The CapabilityProfile of the robot.
    - task: The TaskDescription of the task.
    
    Returns:
    - A float score representing the suitability of the robot for the task. A score of 0 indicates the robot cannot perform the task.
    """
    score = 0.0
    
#     print(task.required_capabilities, robot.payload_capacity)
    # Check if robot meets the minimum requirements
    if any(req for req in task.required_capabilities if "payload capacity" in req and robot.payload_capacity < float(req.split(">= ")[-1])):
        score += 0.0  # Suitability is zero if the robot doesn't meet minimum requirements
    else:
        score += 1.0  # Add score if payload meets or exceeds requirements
    
#     print(task.tools_needed, robot.sensors+robot.manipulators)
    # Check if the robot has the necessary tools for the task
    if task.tools_needed and not all(item in robot.sensors+robot.manipulators for item in task.tools_needed):
        score += 0.0  # Suitability is zero if the robot lacks necessary tools
    else:
        score += 1.0  # Add score if robot has necessary tools
    
#     print(task.communication_requirements, robot.communication_protocols)
    # Check if the robot can communicate as required by the task
    if task.communication_requirements and not all(protocol in robot.communication_protocols for protocol in task.communication_requirements):
        score += 0.0  # Suitability is zero if the robot lacks required communication protocols
    else:
        score += 1.0  # Add score if robot has communication requirements
    
#     print(task.safety_protocols, robot.safety_features)
    # Check if the robot can safely perform the task
    if task.safety_protocols and not all(safety in robot.safety_features for safety in task.safety_protocols):
        score += 0.0  # Suitability is zero if the robot lacks required safety features
    else:
        score += 1.0  # Add score if robot meets safety requirements
    
#     print(task.environmental_conditions, robot.environmental_resistance)
    # Environmental compatibility: Can the robot operate in the task’s conditions?
    if task.environmental_conditions and not all(condition in robot.environmental_resistance for condition in task.environmental_conditions):
        score += 0.0  # Suitability is zero if the robot can't operate in required environmental conditions
    else:
        score += 1.0  # Add score if robot has required environmental resistances
    
#     print(task.required_capabilities, robot.reach)
    # Check if the robot meets reach requirements
    if any(req for req in task.required_capabilities if "reach" in req and robot.reach < float(req.split(">= ")[-1])):
        score += 0.0  # Suitability is zero if the robot cannot reach the task area as required
    else:
        score += 1.0  # Add score if reach meets or exceeds requirements
    
#     print(task.navigation_constraints, robot.mobility_type, robot.size)
    # Check navigation constraints based on mobility type and robot size
    navigation_match = navigation_suitability(robot.mobility_type, robot.size, task.navigation_constraints)
    if navigation_match == 0:
        score += 0.0
    else:
        score += navigation_match

    distance_to_task = ((robot.location[0] - task.location[0]) ** 2 + (robot.location[1] - task.location[1]) ** 2) ** 0.5
#     print(robot.sensor_range)
    # Check sensor capabilities for the task
    if robot.sensor_range >= distance_to_task:
        score += 1.0
    elif robot.sensor_range >= distance_to_task/2:
        score += 0.5

    # Battery and distance check: Ensure the robot has sufficient battery to reach and complete the task
#     print(robot.max_speed, robot.battery_life, task.duration, distance_to_task)
    if ((distance_to_task / robot.max_speed)+task.duration) > robot.battery_life:
        score += 0.0  # Suitability is zero if the robot can't complete the task due to distance, speed, or battery limitations

    # Add to score based on proximity (closer robots get higher scores)
    if distance_to_task < 20.0:
        score += 1.0
    elif distance_to_task < 50.0:
        score += 0.5

#     print(task.priority_level, robot.autonomy_level)
    # Check if the robot's autonomy level matches the task's priority level
    if task.priority_level in ["high", "urgent"] and robot.autonomy_level in ["fully autonomous", "teleoperated"]:
        score += 1.0
    elif task.priority_level in ["medium", "low"] and robot.autonomy_level in ["semi-autonomous", "fully autonomous"]:
        score += 0.5

#     print(robot.battery_life, task.duration)
    # Evaluate battery life for task duration
    if robot.battery_life >= 2*((distance_to_task / robot.max_speed)+task.duration):
        score += 1.0
    else:
        score += 0.5

#     print(task.task_type, robot.special_functions)
    task_function_mapping = {
        "delivery": ["object recognition", "speech output", "facial recognition"],
        "inspection": ["object recognition", "object tracking", "gesture recognition"],
        "cleaning": ["object recognition"],
        "monitoring": ["speech output", "object tracking", "facial recognition"],
        "maintenance": ["object recognition", "path planning"],
        "assembly": ["object recognition"],
        "surveying": ["speech output", "facial recognition", "object recognition", "object tracking"],
        "data collection": ["object recognition", "object tracking", "facial recognition", "gesture recognition"],
        "assistance": ["speech output", "facial recognition", "gesture recognition"]
    }

    # Get the relevant functions for this task type
    required_functions = task_function_mapping[task.task_type]

    # Calculate the score based on matches between robot's functions and required functions
    for function in robot.special_functions:
        if function in required_functions:
            score += 1.0  # Increase score for each match
    
#     # Dependencies
#     if task.dependencies:
#         # Assume dependencies are represented as tasks that must be completed first
#         score += 0.5 if all(dep in completed_tasks for dep in task.dependencies) else 0.0
    
#     print(task.difficulty, robot.processing_power)
    # Processing power: Certain tasks may benefit from higher processing power if they are computationally demanding
    if task.difficulty > 7 and robot.processing_power >= 5.0:  # Difficulty > 7 indicates a complex task
        score += 1.0
    elif task.difficulty > 4 and robot.processing_power >= 3.0:
        score += 1.0
    elif task.difficulty > 2 and robot.processing_power >= 1.5:
        score += 0.5

#     print(robot.adaptability)
    # Consider robot's adaptability to changing conditions
    if robot.adaptability:
        score += 0.5
    
#     print(task.task_type, robot.preferred_tasks)
    # Preference matching
    if task.task_type in robot.preferred_tasks:
        score += 1.0

    # Score based on reward and difficulty
    priority_multiplier = {"low": 0.5, "medium": 1.0, "high": 1.5, "urgent": 2.0}[task.priority_level]
    reward_to_difficulty_ratio = task.reward / task.difficulty
#     print(task.priority_level, task.reward, task.difficulty, priority_multiplier, reward_to_difficulty_ratio)
    score += reward_to_difficulty_ratio

    # Weight based on priority
    score = score * priority_multiplier
    
    # Return the final suitability score
#     print(score)
    return score

In [None]:
import openai  # Make sure to install openai library if you're using it

def evaluate_suitability_with_llm(robot: CapabilityProfile, task: TaskDescription) -> float:
    """
    Uses a large language model (LLM) to evaluate the suitability of a robot for a given task.

    Parameters:
    - robot: The CapabilityProfile of the robot.
    - task: The TaskDescription of the task.

    Returns:
    - A float score (0 to 10) representing the suitability of the robot for the task.
    """
    # Construct a detailed prompt describing the scenario
    prompt = f"""
    You are evaluating the suitability of a robot for a specific task in a robotic system.

    Here are the details of the robot:
    - Robot ID: {robot.robot_id}
    - Mobility Type: {robot.mobility_type}
    - Maximum Speed: {robot.max_speed} units/sec
    - Payload Capacity: {robot.payload_capacity} kg
    - Reach: {robot.reach} meters
    - Battery Life: {robot.battery_life} hours
    - Size: {robot.size} (length, width, height in meters)
    - Environmental Resistance: {', '.join(robot.environmental_resistance)}
    - Sensors: {', '.join(robot.sensors)}
    - Sensor Range: {robot.sensor_range} meters
    - Communication Protocols: {', '.join(robot.communication_protocols)}
    - Processing Power: {robot.processing_power} units
    - Autonomy Level: {robot.autonomy_level}
    - Special Functions: {', '.join(robot.special_functions)}
    - Safety Features: {', '.join(robot.safety_features)}
    - Adaptability: {"Yes" if robot.adaptability else "No"}
    - Preferred Task Types: {', '.join(robot.preferred_tasks)}

    Here are the details of the task:
    - Task ID: {task.task_id}
    - Task Type: {task.task_type}
    - Objective: {task.objective}
    - Priority Level: {task.priority_level}
    - Reward: {task.reward} units
    - Difficulty: {task.difficulty} (scale of 1 to 10)
    - Location: {task.location} (x, y, z coordinates)
    - Navigation Constraints: {', '.join(task.navigation_constraints) if task.navigation_constraints else 'None'}
    - Required Capabilities: {', '.join(task.required_capabilities)}
    - Time Window: {task.time_window[0]} to {task.time_window[1]} (start and end times)
    - Duration: {task.duration} seconds
    - Environmental Conditions: {', '.join(task.environmental_conditions) if task.environmental_conditions else 'None'}
    - Tools Needed: {', '.join(task.tools_needed) if task.tools_needed else 'None'}
    - Materials: {', '.join(task.materials) if task.materials else 'None'}
    - Communication Requirements: {', '.join(task.communication_requirements) if task.communication_requirements else 'None'}
    - Safety Protocols: {', '.join(task.safety_protocols) if task.safety_protocols else 'None'}
    - Success Criteria: {task.success_criteria}
    - Time to Complete: {task.time_to_complete} seconds

    Based on the robot's capabilities and the task requirements, please rate the suitability of this robot for this task on a scale of 0 to 10, where 0 means the robot is completely unsuitable and 10 means it is perfectly suited.

    Provide a single number as the output score.
    """

    # Send the prompt to the LLM and parse the response
    try:
        response = openai.Completion.create(
            engine="text-davinci-003",  # Replace with the appropriate model
            prompt=prompt,
            max_tokens=5,
            temperature=0  # Lower temperature for more deterministic responses
        )
        score_text = response.choices[0].text.strip()
        score = float(score_text) if score_text.isnumeric() else 0.0
    except Exception as e:
        print(f"Error querying LLM: {e}")
        score = 0.0  # Default to zero in case of an error

    return score

In [5]:
def generate_random_assignments(num_robots: int, num_tasks: int, num_assignments: int = 5) -> List[List[Tuple[Optional[int], Optional[int]]]]:
    """
    Generates a list of random assignments of robots to tasks.
    Each assignment is a list of (robot, task) pairs, where some robots or tasks may be unassigned.
    
    Parameters:
    - num_robots: Number of available robots.
    - num_tasks: Number of tasks to be assigned.
    - num_assignments: Number of different random assignments to generate.
    
    Returns:
    - A list of random assignments, where each assignment is a list of (robot, task) pairs.
    """
    assignments = []

    # Generate random assignments
    for _ in range(num_assignments):
        assignment = []

        # Generate a list of available robots and tasks
        available_robots = list(range(num_robots))
        available_tasks = list(range(num_tasks))
        
        # Shuffle the robots and tasks
        random.shuffle(available_robots)
        random.shuffle(available_tasks)
        
        # Determine the number of pairs to create (limited by the smaller of num_robots or num_tasks)
        num_pairs = min(num_robots, num_tasks)

        # Pair robots and tasks randomly
        assigned_pairs = [(available_robots[i], available_tasks[i]) for i in range(num_pairs)]

        # Determine unassigned robots and tasks
        unassigned_robots = available_robots[num_pairs:] if num_robots > num_pairs else []
        unassigned_tasks = available_tasks[num_pairs:] if num_tasks > num_pairs else []

        assignments.append((assigned_pairs, unassigned_robots, unassigned_tasks))

    return assignments

def generate_all_unique_assignments(num_robots: int, num_tasks: int) -> List[Tuple[List[Tuple[int, int]], List[int], List[int]]]:
    """
    Generates all possible unique assignments of robots to tasks.
    Each assignment is a tuple containing:
      1. A list of (robot, task) pairs.
      2. A list of unassigned robots.
      3. A list of unassigned tasks.
    
    Parameters:
    - num_robots: Number of available robots.
    - num_tasks: Number of tasks to be assigned.
    
    Returns:
    - A list of all possible unique assignments, where each assignment is a tuple (assigned_pairs, unassigned_robots, unassigned_tasks).
    """
    all_assignments = []

    # Determine the number of pairs to create (limited by the smaller of num_robots or num_tasks)
    num_pairs = min(num_robots, num_tasks)

    # Generate all possible combinations of robots for the tasks
    robot_combinations = list(combinations(range(num_robots), num_pairs))
    task_combinations = list(combinations(range(num_tasks), num_pairs))

    # Generate all permutations of those combinations
    for robot_comb in robot_combinations:
        for task_comb in task_combinations:
            # Create all permutations for the selected robot and task pairs
            for permuted_robots in permutations(robot_comb):
                for permuted_tasks in permutations(task_comb):
                    # Pair the robots and tasks
                    assigned_pairs = [(permuted_robots[i], permuted_tasks[i]) for i in range(num_pairs)]

                    # Determine unassigned robots and tasks
                    unassigned_robots = [r for r in range(num_robots) if r not in permuted_robots]
                    unassigned_tasks = [t for t in range(num_tasks) if t not in permuted_tasks]

                    all_assignments.append((assigned_pairs, unassigned_robots, unassigned_tasks))

    return all_assignments

In [6]:
def calculate_total_suitability(assignment: List[Tuple[int, int]], suitability_matrix: List[List[float]]) -> float:
    """
    Calculates the total suitability score for a given assignment.
    
    Parameters:
    - assignment: A list of (robot, task) pairs representing the assignment.
    - suitability_matrix: A 2D list where the element at [i][j] represents the suitability of robot i for task j.
    
    Returns:
    - The total suitability score for the assignment.
    """
    total_suitability = 0.0
    
    # Sum the suitability ratings for each robot-task pair in the assignment
    for robot, task in assignment:
        total_suitability += suitability_matrix[robot][task]
    
    return total_suitability

def rank_assignments_range(assignments: List[List[Tuple[int, int]]], suitability_matrix: List[List[float]]) -> Tuple[List[float], List[int]]:
    """
    Ranks the given assignments based on their total suitability scores.
    
    Parameters:
    - assignments: A list of assignments, where each assignment is a list of (robot, task) pairs.
    - suitability_matrix: A 2D list where the element at [i][j] represents the suitability of robot i for task j.
    
    Returns:
    - A tuple containing:
      1. A list of total suitability scores for each assignment.
      2. A list of indices representing the ranking of the assignments, where the first index corresponds to the
         assignment with the highest total score.
    """
    # Calculate total suitability scores for each assignment
    total_scores = [calculate_total_suitability(assignment[0], suitability_matrix) for assignment in assignments]
    
    # Get the ranking of assignments based on the total scores (higher score is better)
    ranking = sorted(range(len(total_scores)), key=lambda i: total_scores[i], reverse=True)
    
    return total_scores, ranking

def rank_assignments_borda(assignments: List[List[Tuple[int, int]]], suitability_matrix: List[List[float]]) -> Tuple[List[float], List[int]]:
    """
    Ranks each assignment from the perspective of each robot based on its suitability for its task in each assignment,
    and uses the Borda count method to aggregate these rankings.
    
    Parameters:
    - assignments: A list of assignments, where each assignment is a list of (robot, task) pairs.
    - suitability_matrix: A 2D list where the element at [i][j] represents the suitability of robot i for task j.
    
    Returns:
    - A tuple containing:
      1. A list of Borda scores for each assignment.
      2. A list of indices representing the ranking of the assignments based on Borda scores, where the first index corresponds
         to the assignment with the highest total Borda score.
    """
    num_assignments = len(assignments)
    num_robots = len(suitability_matrix)

    # Initialize Borda scores for each assignment
    borda_scores = [0] * num_assignments

    # For each robot, rank the assignments based on its suitability rating
    for robot in range(num_robots):
        # Collect suitability scores for this robot across all assignments
        robot_scores = []
        
        for assignment_index, assignment in enumerate(assignments):
            # Find the task assigned to this robot in the current assignment
            assigned_task = next((task for r, task in assignment[0] if r == robot), None)
            
            # If the robot has no task assigned in this assignment, assign it the lowest score
            if assigned_task is None:
                robot_scores.append((assignment_index, -1))  # Using -1 to rank unassigned lower
            else:
                robot_scores.append((assignment_index, suitability_matrix[robot][assigned_task]))

        # Sort robot's scores for all assignments: unassigned (score -1) at the bottom, then by suitability
        robot_scores.sort(key=lambda x: x[1], reverse=True)

        # Assign Borda points based on the ranking
        for rank, (assignment_index, _) in enumerate(robot_scores):
            borda_points = len(robot_scores) - rank - 1  # Borda count: higher ranks get more points
            borda_scores[assignment_index] += borda_points

    # Rank assignments based on total Borda scores (higher score is better)
    ranked_assignments = sorted(range(num_assignments), key=lambda i: borda_scores[i], reverse=True)

    return borda_scores, ranked_assignments

def rank_assignments_approval(assignments: List[List[Tuple[int, int]]], suitability_matrix: List[List[float]], threshold: float = 10) -> Tuple[List[int], List[int]]:
    """
    Uses approval voting where each robot gives 1 point to assignments where its suitability for its task is above a threshold.
    If a robot's suitability is below the threshold or it is unassigned, it gives 0 points for that assignment.
    
    Parameters:
    - assignments: A list of assignments, where each assignment is a list of (robot, task) pairs.
    - suitability_matrix: A 2D list where the element at [i][j] represents the suitability of robot i for task j.
    - threshold: The suitability threshold above which a robot will approve an assignment (default is 10).
    
    Returns:
    - A tuple containing:
      1. A list of approval scores for each assignment.
      2. A list of indices representing the ranking of the assignments based on approval scores.
    """
    num_assignments = len(assignments)
    num_robots = len(suitability_matrix)

    # Initialize approval scores for each assignment
    approval_scores = [0] * num_assignments

    # For each robot, evaluate each assignment based on the suitability threshold
    for robot in range(num_robots):
        for assignment_index, assignment in enumerate(assignments):
            # Find the task assigned to this robot in the current assignment
            assigned_task = next((task for r, task in assignment[0] if r == robot), None)
            
            # Check if robot's suitability rating for this task is above the threshold
            if assigned_task is not None and suitability_matrix[robot][assigned_task] > threshold:
                approval_scores[assignment_index] += 1  # Robot approves this assignment

    # Rank assignments based on approval scores (higher score is better)
    ranked_assignments = sorted(range(num_assignments), key=lambda i: approval_scores[i], reverse=True)

    return approval_scores, ranked_assignments

def rank_assignments_majority_judgment(assignments: List[List[Tuple[int, int]]], suitability_matrix: List[List[float]]) -> Tuple[List[float], List[int]]:
    """
    Uses majority judgment to rank assignments based on robot suitability for assigned tasks.
    
    Parameters:
    - assignments: A list of assignments, where each assignment is a list of (robot, task) pairs.
    - suitability_matrix: A 2D list where the element at [i][j] represents the suitability of robot i for task j.
    
    Returns:
    - A tuple containing:
      1. A list of median scores for each assignment based on majority judgment.
      2. A list of indices representing the ranking of assignments based on median scores.
    """
    # Define qualitative rating categories mapped to suitability score ranges
    rating_scale = {"Excellent": 3, "Good": 2, "Fair": 1, "Poor": 0}
    rating_thresholds = [(15, "Excellent"), (10, "Good"), (5, "Fair"), (0, "Poor")]

    assignment_ratings = []

    for assignment in assignments:
        ratings = []
        
        for robot, task in assignment[0]:
            # Get the suitability score for the robot-task pair
            suitability_score = suitability_matrix[robot][task]

            # Convert suitability score to qualitative rating
            for threshold, rating in rating_thresholds:
                if suitability_score >= threshold:
                    ratings.append(rating_scale[rating])
                    break
        
        # Calculate the median rating for this assignment
        median_rating = np.median(ratings) if ratings else 0
        assignment_ratings.append(median_rating)

    # Rank assignments based on median ratings
    ranked_assignments = sorted(range(len(assignment_ratings)), key=lambda i: assignment_ratings[i], reverse=True)

    return assignment_ratings, ranked_assignments

def rank_assignments_cumulative_voting(assignments: List[List[Tuple[int, int]]], suitability_matrix: List[List[float]], total_votes: int = 10) -> Tuple[List[float], List[int]]:
    """
    Uses cumulative voting to rank assignments, where each robot distributes a fixed number of votes
    across assignments based on suitability scores.
    
    Parameters:
    - assignments: A list of assignments, where each assignment is a list of (robot, task) pairs.
    - suitability_matrix: A 2D list where the element at [i][j] represents the suitability of robot i for task j.
    - total_votes: The total number of votes each robot has to distribute (default is 10).
    
    Returns:
    - A tuple containing:
      1. A list of cumulative votes for each assignment.
      2. A list of indices representing the ranking of assignments based on cumulative votes.
    """
    num_assignments = len(assignments)
    cumulative_votes = [0] * num_assignments
    if total_votes < num_assignments/2:
        total_votes = num_assignments/2
    
    for robot in range(len(suitability_matrix)):
        # Collect suitability scores for this robot across all assignments
        robot_votes = []
        
        for assignment_index, assignment in enumerate(assignments):
            # Find the task assigned to this robot in the current assignment
            assigned_task = next((task for r, task in assignment[0] if r == robot), None)
            
            # If robot is unassigned in this assignment, give it a suitability score of 0
            suitability_score = suitability_matrix[robot][assigned_task] if assigned_task is not None else 0
            robot_votes.append((assignment_index, suitability_score))
        
        # Sort robot votes based on suitability scores in descending order
        robot_votes.sort(key=lambda x: x[1], reverse=True)

        # Distribute the total votes among assignments proportional to suitability scores
        total_score = sum(score for _, score in robot_votes if score > 0)
        for assignment_index, score in robot_votes:
            if total_score > 0:
                cumulative_votes[assignment_index] += total_votes * (score / total_score)

    # Rank assignments based on cumulative votes (higher score is better)
    ranked_assignments = sorted(range(num_assignments), key=lambda i: cumulative_votes[i], reverse=True)

    return cumulative_votes, ranked_assignments

def rank_assignments_condorcet_method(assignments: List[List[Tuple[int, int]]], suitability_matrix: List[List[float]]) -> Tuple[List[int], List[int]]:
    """
    Uses the Condorcet method to rank assignments by comparing each assignment in a pairwise manner.
    
    Parameters:
    - assignments: A list of assignments, where each assignment is a list of (robot, task) pairs.
    - suitability_matrix: A 2D list where the element at [i][j] represents the suitability of robot i for task j.
    
    Returns:
    - A tuple containing:
      1. A list of pairwise win counts for each assignment.
      2. A list of indices representing the ranking of assignments based on pairwise win counts.
    """
    num_assignments = len(assignments)
    num_robots = len(suitability_matrix)

    # Initialize a matrix to track pairwise wins for each assignment comparison
    pairwise_wins = [0] * num_assignments

    # Compare each pair of assignments
    for i in range(num_assignments):
        for j in range(i + 1, num_assignments):
            # Count the number of robots that prefer assignment i over assignment j and vice versa
            i_wins, j_wins = 0, 0
            
            for robot in range(num_robots):
                # Find the task assigned to the robot in both assignments
                task_i = next((task for r, task in assignments[i][0] if r == robot), None)
                task_j = next((task for r, task in assignments[j][0] if r == robot), None)
                
                # Get suitability scores (or 0 if unassigned)
                suitability_i = suitability_matrix[robot][task_i] if task_i is not None else 0
                suitability_j = suitability_matrix[robot][task_j] if task_j is not None else 0

                # Determine which assignment the robot prefers
                if suitability_i > suitability_j:
                    i_wins += 1
                elif suitability_j > suitability_i:
                    j_wins += 1

            # Update pairwise win counts based on robot preferences
            if i_wins > j_wins:
                pairwise_wins[i] += 1
            elif j_wins > i_wins:
                pairwise_wins[j] += 1

    # Rank assignments based on pairwise win counts
    ranked_assignments = sorted(range(num_assignments), key=lambda k: pairwise_wins[k], reverse=True)

    return pairwise_wins, ranked_assignments

In [12]:
def simulate_time_step(robots: List[CapabilityProfile], tasks: List[TaskDescription], unassigned_robots: List[int], time_step: float = 1.0) -> int:
    """
    Simulates a single time step of the simulation, updating robot positions and task progress.
    
    Parameters:
    - robots: List of all robots.
    - tasks: List of all tasks.
    - time_step: The time increment for the simulation step, in seconds (default is 1.0).
    """
    count = 0
    for robot in robots:
#         print(robot.robot_id)
        if robot.current_task is not None:
            # Get the assigned task
            task = robot.current_task
#             print(task.task_id, robot.remaining_distance, robot.time_on_task, task.time_to_complete)

            # Calculate distance to move in this time step
            distance_to_travel = robot.max_speed * time_step
            if robot.remaining_distance > 0:
                # Move the robot towards the task
                robot.remaining_distance -= distance_to_travel
                if robot.remaining_distance <= 0:
                    # The robot has reached the task
                    robot.remaining_distance = 0
                    robot.time_on_task = 0  # Reset time on task

            # If the robot is at the task, increment time on task
            if robot.remaining_distance == 0:
                robot.time_on_task += time_step
                # Check if the task is completed
                if robot.time_on_task >= task.time_to_complete:
                    # Mark task as completed
#                     print_robots(robots)
#                     print_tasks(tasks)
                    robot.current_task = None
                    unassigned_robots.append(robot.robot_id)
                    print(f"Robot {robot.robot_id} completed task {task.task_id}.")
                    tasks.remove(task)
        else:
            count += 1
    return count

def add_new_tasks(tasks: List[TaskDescription], unassigned_tasks: List[str], task_max_id: int, new_task_count: int) -> int:
    """Adds new tasks to the system."""
    for _ in range(new_task_count):
        task_id = f"T{task_max_id}"
        task_max_id += 1
        new_task = generate_random_task_description(task_id)
        tasks.append(new_task)
        unassigned_tasks.append(task_id)
        print(f"New task {new_task.task_id} added.")
    return task_max_id

def add_new_robots(robots: List[CapabilityProfile], unassigned_robots: List[str], robot_max_id: int, new_robot_count: int) -> int:
    """Adds new robots to the system."""
    for _ in range(new_robot_count):
        robot_id = f"R{robot_max_id}"
        robot_max_id += 1
        new_robot = generate_random_robot_profile(robot_id)
        robots.append(new_robot)
        unassigned_robots.append(robot_id)
        print(f"New robot {new_robot.robot_id} added.")
    return robot_max_id

def remove_random_robots(robots: List[CapabilityProfile], tasks: List[TaskDescription], unassigned_robots: List[str], unassigned_tasks: List[str], count: int):
    """Randomly removes robots from the system."""
    for _ in range(min(count, len(robots))):
        robot_to_remove = random.choice(robots)
        if robot_to_remove.current_task == None:
            unassigned_robots.remove(robot_to_remove.robot_id)
        else:
            task_id = robot_to_remove.current_task.task_id
            t_index = [task.task_id for task in tasks ].index(task_id)
            tasks[t_index].assigned_robot = None
            unassigned_tasks.append(tasks[t_index].task_id)
        robots.remove(robot_to_remove)
        print(f"Robot {robot_to_remove.robot_id} left the system.")

def reassign_robots_to_tasks(robots: List[CapabilityProfile], tasks: List[TaskDescription], num_candidates: int, voting_method: str, suitability_method: str, unassigned_robots: List[str], unassigned_tasks: List[str]):
    """Tries to assign unassigned robots to unassigned tasks."""
#     print(unassigned_robots, unassigned_tasks)
    urobots = [robot for robot in robots if robot.robot_id in unassigned_robots]
    utasks = [task for task in tasks if task.task_id in unassigned_tasks]
    output, score = main(urobots, utasks, num_candidates, voting_method, suitability_method)
    assigned_pairs = output[0]
    unassigned_robots = [urobots[val].robot_id for val in output[1]]
    unassigned_tasks = [utasks[val].task_id for val in output[2]]
    for pair in assigned_pairs:
        r_index = [robot.robot_id for robot in robots ].index(urobots[pair[0]].robot_id)
        t_index = [task.task_id for task in tasks ].index(utasks[pair[1]].task_id)
        robots[r_index].current_task = tasks[t_index]
        robots[r_index].remaining_distance = ((robots[r_index].location[0] - tasks[t_index].location[0]) ** 2 + (robots[r_index].location[1] - tasks[t_index].location[1]) ** 2) ** 0.5
        tasks[t_index].assigned_robot = robots[r_index]
    return unassigned_robots, unassigned_tasks

In [16]:
def print_robots(robots: List[CapabilityProfile]):
    for robot in robots:
        print(robot.robot_id)
        
def print_tasks(tasks: List[TaskDescription]):
    for task in tasks:
        print(task.task_id)

def main(robots: List[CapabilityProfile], tasks: List[TaskDescription], num_candidates: int, voting_method: str, suitability_method: str)  -> List[Tuple[int, int]]:
#     robots = [generate_random_robot_profile(f"R{i+1}") for i in range(num_robots)]
#     tasks = [generate_random_task_description(f"T{i+1}") for i in range(num_tasks)]
    num_robots = len(robots)
    num_tasks = len(tasks)
    # Display generated robots and tasks
#     print("\nGenerated Robot Profiles:")
#     for robot in robots:
#         print(robot)

#     print("\nGenerated Task Descriptions:")
#     for task in tasks:
#         print(task)

    # Create a 2D matrix to store suitability scores
    suitability_matrix = np.zeros((num_robots, num_tasks))

    # Evaluate suitability of each robot for each task and store it in the matrix
#     print("\nEvaluating Suitability:")
    for i, robot in enumerate(robots):
        for j, task in enumerate(tasks):
            suitability_score = globals()[suitability_method](robot, task)
            suitability_matrix[i][j] = suitability_score
#             print(f"Suitability of {robot.robot_id} for {task.task_id}: {suitability_score:.2f}")

    # Print the suitability matrix
#     print("\nSuitability Matrix:")
#     print(suitability_matrix)
    
    random_assignments = generate_random_assignments(num_robots, num_tasks, num_candidates)
#     print("\nRandom Complete Assignments of Robots to Tasks:")
#     for i, assignment in enumerate(random_assignments):
#         print(f"Assignment {i+1}: {assignment}")
#         total_score = calculate_total_suitability(assignment[0], suitability_matrix)
#         print(f"Total Suitability Score for the Assignment: {total_score:.2f}")

    total_scores, assignment_ranking = globals()[voting_method](random_assignments, suitability_matrix)

#     print("\nTotal Suitability Scores for Each Assignment:")
#     for i, score in enumerate(total_scores):
#         print(f"Assignment {i+1}: {score:.2f}")

#     print("\nRanking of Assignments (from best to worst):")
#     for rank, assignment_index in enumerate(assignment_ranking):
#         print(f"Rank {rank+1}: Assignment {assignment_index+1}")
    
#     print(random_assignments[assignment_ranking[0]])
    return random_assignments[assignment_ranking[0]], total_scores[assignment_ranking[0]]

def main_simulation(num_robots: int, num_tasks: int, num_candidates: int, voting_method: str, suitability_method: str, max_time_steps: int, add_tasks: bool, add_robots: bool, remove_robots: bool, tasks_to_add: int = 1, robots_to_add: int = 1, robots_to_remove: int = 1):
    robots = [generate_random_robot_profile(f"R{i+1}") for i in range(num_robots)]
    tasks = [generate_random_task_description(f"T{i+1}") for i in range(num_tasks)]
    robot_max_id = num_robots+1
    task_max_id = num_tasks+1
    start = time.perf_counter_ns()
    output, score = main(robots,tasks,num_candidates,voting_method,suitability_method)
    end = time.perf_counter_ns()
    length = (end - start) / 1000.0
    print(num_robots, num_tasks, num_candidates, voting_method, suitability_method, score, length)
    assigned_pairs = output[0]
    unassigned_robots = [robots[val].robot_id for val in output[1]]
    unassigned_tasks = [tasks[val].task_id for val in output[2]]
#     print(unassigned_robots, unassigned_tasks, task_max_id)
    for pair in assigned_pairs:
        robots[pair[0]].current_task = tasks[pair[1]]
        robots[pair[0]].remaining_distance = ((robots[pair[0]].location[0] - tasks[pair[1]].location[0]) ** 2 + (robots[pair[0]].location[1] - tasks[pair[1]].location[1]) ** 2) ** 0.5
        tasks[pair[1]].assigned_robot = robots[pair[0]]
    
    for time_step in range(max_time_steps):
        print(f"\n--- Time Step {time_step + 1} ---")

        # Simulate time step
        status = simulate_time_step(robots, tasks, unassigned_robots)
#         print(status, len(robots), unassigned_robots, unassigned_tasks)
        # Periodically add new tasks and robots
        if add_tasks:
            task_max_id = add_new_tasks(tasks, unassigned_tasks, task_max_id, random.randint(0, tasks_to_add))
        if add_robots:
            robot_max_id = add_new_robots(robots, unassigned_robots, robot_max_id, random.randint(0, robots_to_add))

        # Periodically remove robots
        if remove_robots:
            remove_random_robots(robots, tasks, unassigned_robots, unassigned_tasks, random.randint(0, robots_to_remove))

        # Reassign unassigned robots to unassigned tasks
        if len(unassigned_robots) > 0 and len(unassigned_tasks) > 0:
            unassigned_robots, unassigned_tasks = reassign_robots_to_tasks(robots, tasks, num_candidates, voting_method, suitability_method, unassigned_robots, unassigned_tasks)

if __name__ == "__main__":
#     winner = main(100,50)
#     voting_methods = ["rank_assignments_borda", "rank_assignments_approval", "rank_assignments_majority_judgment", "rank_assignments_cumulative_voting", "rank_assignments_condorcet_method", "rank_assignments_range"]
    voting_methods = ["rank_assignments_range"]
    suitability_methods = ["evaluate_suitability_loose", "evaluate_suitability_strict", "evaluate_suitability_distance", "evaluate_suitability_priority"]
    for i in [100]:
        for j in [100]:
            for vm in voting_methods:
                for sm in suitability_methods:
                    for k in range(0,20):
                        main_simulation(i, j, j**2, vm, sm, 0, False, False, False)


100 100 10000 rank_assignments_range evaluate_suitability_loose 995.3326998182771 1963429.8
100 100 10000 rank_assignments_range evaluate_suitability_loose 1007.3797444884118 1687996.6
100 100 10000 rank_assignments_range evaluate_suitability_loose 988.2816131822253 2245757.1
100 100 10000 rank_assignments_range evaluate_suitability_loose 1034.6218711477566 2355871.9
100 100 10000 rank_assignments_range evaluate_suitability_loose 1050.848631267186 2487335.2
100 100 10000 rank_assignments_range evaluate_suitability_loose 1063.3944741153878 2202747.9
100 100 10000 rank_assignments_range evaluate_suitability_loose 1000.987498618428 2513368.1
100 100 10000 rank_assignments_range evaluate_suitability_loose 970.1495169602498 2474621.5
100 100 10000 rank_assignments_range evaluate_suitability_loose 1023.24087499912 2478228.8
100 100 10000 rank_assignments_range evaluate_suitability_loose 1013.719235427019 2441302.2
100 100 10000 rank_assignments_range evaluate_suitability_loose 1066.083878598