In [1]:
# Define the operation type (e.g., "Assembly", "Inspection", etc.)
class OperationType:
    """A named kind of operation (e.g. "Assembly") and the robots listed for it."""

    def __init__(self, name, capable_robots=None):
        # A missing or empty argument yields a fresh empty list; a non-empty
        # list is stored as passed (it is not copied).
        self.capable_robots = capable_robots if capable_robots else []
        self.name = name  # e.g., "Assembly"

    def __repr__(self):
        return "OperationType({})".format(self.name)

# A processing step is a single stage in a product's manufacturing process.
class ProcessingStep:
    """A single stage in a product's manufacturing process."""

    def __init__(self, step_id, operation_type, base_time):
        self.step_id = step_id                # The order of the step
        self.operation_type = operation_type  # The type of operation required
        self.base_time = base_time            # Base processing time for this step

    def get_processing_time(self, robot):
        """Return the effective time this step takes on ``robot``.

        The base time is divided by the robot's speed coefficient for this
        step's operation type. A robot that has no coefficient for the
        operation falls back to a coefficient of 1.0.

        Args:
            robot: object exposing a ``speed_coefficient`` mapping from
                operation type name to speed factor.

        Returns:
            ``base_time / speed``.

        Raises:
            ValueError: if the coefficient is zero or negative, which would
                otherwise divide by zero or produce a negative duration.
        """
        op_name = self.operation_type.name
        speed = robot.speed_coefficient.get(op_name, 1.0)
        if speed <= 0:
            raise ValueError(
                f"Speed coefficient for {op_name!r} on {robot!r} must be positive, got {speed!r}"
            )
        return self.base_time / speed

    def __repr__(self):
        return f"ProcessingStep(id={self.step_id}, op={self.operation_type.name}, base_time={self.base_time})"

# A product defines the overall "recipe" for manufacturing an item.
class Product:
    """The overall "recipe" for manufacturing an item."""

    def __init__(self, product_id, name, processing_steps):
        # processing_steps: ProcessingStep objects, in the order they run.
        self.processing_steps = processing_steps
        self.name = name
        self.product_id = product_id

    def __repr__(self):
        return "Product({})".format(self.name)

# A job is an instance of a product that must be processed.
class Job:
    """An instance of a product that is worked through its steps in order."""

    def __init__(self, job_id, product):
        self.job_id = job_id
        self.product = product
        self.current_step_index = 0  # Start at the first processing step

    @property
    def is_complete(self):
        """True once every processing step of the product has been completed."""
        return self.current_step_index >= len(self.product.processing_steps)

    def next_step(self):
        """Return the next pending processing step, or None when all are done."""
        if self.is_complete:
            return None  # All steps completed
        return self.product.processing_steps[self.current_step_index]

    def complete_step(self, robot):
        """Process the next pending step on ``robot`` and advance the job.

        Prints a progress line, or a "fully completed" notice when no step is
        left.

        Returns:
            The processing time of the step just completed, or None when the
            job had no remaining step (the job is left unchanged).
        """
        step = self.next_step()
        if step is None:
            print(f"Job {self.job_id} is fully completed.")
            return None
        proc_time = step.get_processing_time(robot)
        print(f"Job {self.job_id}: Processing step {step.step_id} ({step.operation_type.name}) on {robot.name} takes {proc_time:.2f} time units.")
        self.current_step_index += 1
        return proc_time

    def __repr__(self):
        # A finished job has no current step; do not report a step number
        # one past the end of the recipe.
        current = "completed" if self.is_complete else self.current_step_index + 1
        return f"Job(id={self.job_id}, product={self.product.name}, current_step={current})"

# A Robot represents an agent that can process operations.
# Each robot has a speed coefficient for each operation type it can perform.
class Robot:
    """An agent that processes operations at a per-operation-type speed."""

    def __init__(self, name, speed_coefficient):
        # speed_coefficient maps an operation type name to a speed factor,
        # e.g. {"Assembly": 1.5}.
        self.speed_coefficient = speed_coefficient
        self.name = name

    def can_process(self, operation_type):
        """Return True when a speed coefficient exists for ``operation_type``."""
        wanted = operation_type.name
        return wanted in self.speed_coefficient

    def __repr__(self):
        return "Robot({})".format(self.name)

# --- Example Usage ---
if __name__ == "__main__":
    # Operation types, each with the names of the robots listed as capable.
    op_assembly = OperationType("Assembly", ["Robot1", "Robot2"])
    op_inspection = OperationType("Inspection", ["Robot3"])
    op_packaging = OperationType("Packaging", ["Robot4", "Robot5"])
    op_welding = OperationType("Welding", ["Robot6"])

    # Recipes: ProcessingStep(step_id, operation_type, base_time).
    # Product_A: Assembly (30), Inspection (20), Packaging (15)
    steps_A = [
        ProcessingStep(1, op_assembly, 30),
        ProcessingStep(2, op_inspection, 20),
        ProcessingStep(3, op_packaging, 15),
    ]
    # Product_B: Assembly (25), Packaging (10)
    steps_B = [
        ProcessingStep(1, op_assembly, 25),
        ProcessingStep(2, op_packaging, 10),
    ]
    # Product_C: Welding (40), Assembly (30), Inspection (15), Packaging (20)
    steps_C = [
        ProcessingStep(1, op_welding, 40),
        ProcessingStep(2, op_assembly, 30),
        ProcessingStep(3, op_inspection, 15),
        ProcessingStep(4, op_packaging, 20),
    ]

    # Three different products, one per recipe.
    product_A = Product(101, "WidgetA", steps_A)
    product_B = Product(102, "WidgetB", steps_B)
    product_C = Product(103, "WidgetC", steps_C)

    # One job per product.
    job1 = Job(1, product_A)
    job2 = Job(2, product_B)
    job3 = Job(3, product_C)

    # Robots, each with a speed coefficient for a single operation type.
    robot1 = Robot("Robot1", {"Assembly": 1.0})
    robot2 = Robot("Robot2", {"Assembly": 1.5})
    robot3 = Robot("Robot3", {"Inspection": 0.8})
    robot4 = Robot("Robot4", {"Packaging": 1.2})
    robot5 = Robot("Robot5", {"Packaging": 1.0})
    robot6 = Robot("Robot6", {"Welding": 1.3})

    # Fixed assignment of one robot per operation type name.
    # robot1 and robot5 are defined above but not assigned here.
    chosen_robot_for_op = {
        "Assembly": robot2,
        "Inspection": robot3,
        "Packaging": robot4,
        "Welding": robot6,
    }

    def process_job(job):
        """Run ``job`` step by step on the assigned robots.

        Stops early, with a message, at the first step whose operation has no
        assigned robot or whose assigned robot cannot process it. Prints a
        completion line when no step is left afterwards.
        """
        while True:
            pending = job.next_step()
            if pending is None:
                break
            op_kind = pending.operation_type
            assigned = chosen_robot_for_op.get(op_kind.name)
            if not (assigned and assigned.can_process(op_kind)):
                print(f"No available robot can process {op_kind.name} for Job {job.job_id}.")
                break
            job.complete_step(assigned)
        if job.next_step() is None:
            print(f"Job {job.job_id} completed all processing steps.\n")

    # Process each job in turn.
    for job in (job1, job2, job3):
        process_job(job)


Job 1: Processing step 1 (Assembly) on Robot2 takes 20.00 time units.
Job 1: Processing step 2 (Inspection) on Robot3 takes 25.00 time units.
Job 1: Processing step 3 (Packaging) on Robot4 takes 12.50 time units.
Job 1 completed all processing steps.

Job 2: Processing step 1 (Assembly) on Robot2 takes 16.67 time units.
Job 2: Processing step 2 (Packaging) on Robot4 takes 8.33 time units.
Job 2 completed all processing steps.

Job 3: Processing step 1 (Welding) on Robot6 takes 30.77 time units.
Job 3: Processing step 2 (Assembly) on Robot2 takes 20.00 time units.
Job 3: Processing step 3 (Inspection) on Robot3 takes 18.75 time units.
Job 3: Processing step 4 (Packaging) on Robot4 takes 16.67 time units.
Job 3 completed all processing steps.



In [2]:
# Define the operation type (only three types per the paper)
class OperationType:
    """A named operation type together with the robot names listed for it."""

    def __init__(self, name, capable_robots=None):
        # None or an empty list becomes a new empty list; any non-empty list
        # is kept as the same object the caller passed.
        self.capable_robots = capable_robots if capable_robots else []
        self.name = name  # e.g., "Assembly"

    def __repr__(self):
        return "OperationType({})".format(self.name)

# A processing step is a single operation in the production process.
class ProcessingStep:
    """A single operation in the production process of a product."""

    def __init__(self, step_id, operation_type, base_time):
        self.step_id = step_id                   # Step order within the job
        self.operation_type = operation_type     # The operation type this step requires
        self.base_time = base_time               # Base processing time for this step

    def get_processing_time(self, robot):
        """Return the effective processing time of this step on ``robot``.

        Computed as ``base_time`` divided by the robot's speed coefficient
        for this step's operation type; a robot without a coefficient for
        that operation is treated as having a coefficient of 1.0.

        Args:
            robot: object with a ``speed_coefficient`` dict keyed by
                operation type name.

        Returns:
            ``base_time / speed``.

        Raises:
            ValueError: if the coefficient is zero or negative (this would
                otherwise divide by zero or yield a negative duration).
        """
        op_name = self.operation_type.name
        speed = robot.speed_coefficient.get(op_name, 1.0)
        if speed <= 0:
            raise ValueError(
                f"Speed coefficient for {op_name!r} on {robot!r} must be positive, got {speed!r}"
            )
        return self.base_time / speed

    def __repr__(self):
        return f"ProcessingStep(id={self.step_id}, op={self.operation_type.name}, base_time={self.base_time})"

# A product is defined by a recipe (ordered list of processing steps).
class Product:
    """A product, defined by its recipe: an ordered list of processing steps."""

    def __init__(self, product_id, name, processing_steps):
        # processing_steps: list of ProcessingStep objects.
        self.processing_steps = processing_steps
        self.name = name
        self.product_id = product_id

    def __repr__(self):
        return "Product({})".format(self.name)

# A job is an instance of a product.
class Job:
    """An instance of a product, tracked by the index of its next step."""

    def __init__(self, job_id, product):
        self.job_id = job_id
        self.product = product
        self.current_step_index = 0  # Start at the first step

    @property
    def is_complete(self):
        """True once every processing step of the product has been completed."""
        return self.current_step_index >= len(self.product.processing_steps)

    def next_step(self):
        """Return the next pending processing step, or None when all are done."""
        if self.is_complete:
            return None  # All steps are complete
        return self.product.processing_steps[self.current_step_index]

    def complete_step(self, robot):
        """Process the next pending step on ``robot`` and advance the job.

        Prints a progress line, or a "fully completed" notice when no step is
        left.

        Returns:
            The processing time of the step just completed, or None when the
            job had no remaining step (the job is left unchanged).
        """
        step = self.next_step()
        if step is None:
            print(f"Job {self.job_id} is fully completed.")
            return None
        proc_time = step.get_processing_time(robot)
        print(f"Job {self.job_id}: Step {step.step_id} ({step.operation_type.name}) on {robot.name} takes {proc_time:.2f} time units.")
        self.current_step_index += 1
        return proc_time

    def __repr__(self):
        # A finished job has no current step; do not report a step number
        # one past the end of the recipe.
        current = "completed" if self.is_complete else self.current_step_index + 1
        return f"Job(id={self.job_id}, product={self.product.name}, current_step={current})"

# A Robot is an agent with a speed coefficient for each operation type.
class Robot:
    """An agent with a speed coefficient for each operation type it handles."""

    def __init__(self, name, speed_coefficient):
        # speed_coefficient is a dictionary, e.g. {"Assembly": 1.5}.
        self.speed_coefficient = speed_coefficient
        self.name = name

    def can_process(self, operation_type):
        """Return True when this robot has a speed defined for ``operation_type``."""
        wanted = operation_type.name
        return wanted in self.speed_coefficient

    def __repr__(self):
        return "Robot({})".format(self.name)

# --- Example Usage ---
if __name__ == "__main__":
    # The three operation types (as per the paper).
    # NOTE(review): these capable_robots lists do not match the single-skill
    # robots defined further down (e.g. Packaging lists Robot_A/Robot_I, but
    # only Robot_P has a Packaging coefficient) — confirm which is intended.
    # The lists are not read anywhere in this cell.
    op_assembly = OperationType("Assembly", ["Robot_A", "Robot_I"])
    op_inspection = OperationType("Inspection", ["Robot_I", "Robot_P"])
    op_packaging = OperationType("Packaging", ["Robot_A", "Robot_I"])

    # Recipes: ProcessingStep(step_id, operation_type, base_time).
    # Product_A: uses all three operations in order.
    steps_A = [
        ProcessingStep(1, op_assembly, 30),
        ProcessingStep(2, op_inspection, 20),
        ProcessingStep(3, op_packaging, 15),
    ]
    # Product_B: uses Assembly then Packaging.
    steps_B = [
        ProcessingStep(1, op_assembly, 25),
        ProcessingStep(2, op_packaging, 10),
    ]
    # Product_C: uses Inspection then Packaging.
    steps_C = [
        ProcessingStep(1, op_inspection, 18),
        ProcessingStep(2, op_packaging, 12),
    ]

    # Three products built from the three operation types.
    product_A = Product(101, "WidgetA", steps_A)
    product_B = Product(102, "WidgetB", steps_B)
    product_C = Product(103, "WidgetC", steps_C)

    # Jobs (each is an instance of a product).
    job1 = Job(1, product_A)
    job2 = Job(2, product_B)
    job3 = Job(3, product_C)

    # Robots with different speed coefficients; for simplicity, one robot
    # per operation type.
    robot_A = Robot("Robot_A", {"Assembly": 1.5})
    robot_I = Robot("Robot_I", {"Inspection": 1.0})
    robot_P = Robot("Robot_P", {"Packaging": 1.2})

    # Mapping from operation type name to the chosen robot.
    chosen_robot_for_op = {
        "Assembly": robot_A,
        "Inspection": robot_I,
        "Packaging": robot_P,
    }

    def process_job(job):
        """Run ``job`` step by step on the assigned robots.

        Stops early, with a message, at the first step whose operation has no
        assigned robot or whose assigned robot cannot process it. Prints a
        completion line when no step is left afterwards.
        """
        while True:
            pending = job.next_step()
            if pending is None:
                break
            op_kind = pending.operation_type
            assigned = chosen_robot_for_op.get(op_kind.name)
            if not (assigned and assigned.can_process(op_kind)):
                print(f"No available robot can process {op_kind.name} for Job {job.job_id}.")
                break
            job.complete_step(assigned)
        if job.next_step() is None:
            print(f"Job {job.job_id} completed all processing steps.\n")

    # Process each job in turn.
    for job in (job1, job2, job3):
        process_job(job)


Job 1: Step 1 (Assembly) on Robot_A takes 20.00 time units.
Job 1: Step 2 (Inspection) on Robot_I takes 20.00 time units.
Job 1: Step 3 (Packaging) on Robot_P takes 12.50 time units.
Job 1 completed all processing steps.

Job 2: Step 1 (Assembly) on Robot_A takes 16.67 time units.
Job 2: Step 2 (Packaging) on Robot_P takes 8.33 time units.
Job 2 completed all processing steps.

Job 3: Step 1 (Inspection) on Robot_I takes 18.00 time units.
Job 3: Step 2 (Packaging) on Robot_P takes 10.00 time units.
Job 3 completed all processing steps.

