# 指派问题简易环境



In [20]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from heapq import heappush, heappop

In [49]:
# Notebook display setting: with ast_node_interactivity set to 'all', IPython
# displays the value of every top-level expression in a cell rather than only
# the last one (see the IPython InteractiveShell configuration docs).
from IPython.core.interactiveshell import InteractiveShell 
InteractiveShell.ast_node_interactivity = 'all'

In [73]:
# Environment parameters
N_QC = 3       # number of quay cranes
N_ARMG = 2     # number of yard cranes
N_VEHICLE = 3  # number of transport vehicles

# Distance matrix with N_QC + N_ARMG + 1 rows and columns; the last row/column
# is the vehicles' starting position.
DIST_MAT = np.array([
    [0, 1, 2, 4, 6, 3],
    [1, 0, 1, 4, 4, 3],
    [2, 1, 0, 6, 4, 3],
    [4, 4, 6, 0, 1, 3],
    [6, 4, 4, 1, 0, 3],
    [3, 3, 3, 3, 3, 0],
])
# Sanity check: the matrix must be square and cover every crane plus the origin.
assert DIST_MAT.shape == (N_QC + N_ARMG + 1,) * 2

# Quay crane timing
QC_PREPARE_TIME = 30    # preparation time before a quay crane loads/unloads a container
QC_LIFT_TIME = 10       # quay crane lifting time

# Yard crane timing
ARMG_PREPARE_TIME = 20  # preparation time before a yard crane loads/unloads a container
ARMG_LIFT_TIME = 6      # yard crane lifting time

MIN_IDLE_TIME = 5       # minimum time a vehicle stays idle

In [75]:
class Simulation:
    """
    Compute when each piece of equipment finishes its tasks.

    The environment relies on this simulation to advance the execution of the
    task queue.

    Location indices follow ``DIST_MAT``: ``0 .. N_QC - 1`` are quay cranes,
    ``N_QC .. N_QC + N_ARMG - 1`` are yard cranes, and the last index
    (``N_QC + N_ARMG``) is the vehicles' starting position.

    Attributes:
        equip_state: maps an equipment index to the list of finish times of
            the lifts scheduled on it, in scheduling order.
        vehicle_position: current location index of every vehicle.
        future_events: heap of ``(time, i_vehicle)`` pairs giving the time at
            which each vehicle next becomes available.
        now: time of the event most recently popped by ``step``.
    """

    # Travel time per unit of distance in DIST_MAT.
    TRAVEL_TIME_PER_UNIT = 10

    def __init__(self):
        self.equip_state = {i: [] for i in range(N_QC + N_ARMG)}
        # Every vehicle starts at the origin, the last DIST_MAT index.
        self.vehicle_position = [N_QC + N_ARMG] * N_VEHICLE

        # Already ordered by (time, vehicle), hence a valid heap.
        self.future_events = [(0, i) for i in range(N_VEHICLE)]
        self.now = 0

    def current_state(self):
        """Return a dict view of the simulation state.

        The containers in the returned dict are the live internal objects,
        not copies.
        """
        return {
            "equip_state": self.equip_state,
            "vehicle_position": self.vehicle_position,
            "future_events": self.future_events,
            "now": self.now
        }

    def peek_future(self):
        """Return the earliest ``(time, i_vehicle)`` event without removing it."""
        return self.future_events[0]

    def predict_travel_time(self, i_vehicle, i_equip):
        """Return the travel time of vehicle ``i_vehicle`` to location ``i_equip``.

        ``i_vehicle`` is a vehicle index (not a location); the vehicle's
        current location is looked up in ``vehicle_position``.
        """
        vehicle_pos = self.vehicle_position[i_vehicle]
        return DIST_MAT[vehicle_pos, i_equip] * self.TRAVEL_TIME_PER_UNIT

    def get_equip_state(self, i_equip):
        """Return the finish time of the last lift on ``i_equip`` (0 if none)."""
        state = self.equip_state[i_equip]
        return state[-1] if state else 0

    def do_idle(self, i_vehicle):
        """Send vehicle ``i_vehicle`` back to the origin.

        Returns the time at which the vehicle is available again: ``now`` plus
        the travel time to the origin, but never less than ``MIN_IDLE_TIME``.
        """
        origin = len(DIST_MAT) - 1
        # Pass the vehicle index itself: predict_travel_time resolves the
        # vehicle's location internally. Passing the location here would index
        # vehicle_position with a location index.
        recycle_time = self.predict_travel_time(i_vehicle, origin)
        idle_time = max(recycle_time, MIN_IDLE_TIME)
        self.vehicle_position[i_vehicle] = origin
        return self.now + idle_time

    def _visit(self, i_equip, i_vehicle, depart_time):
        """Drive ``i_vehicle`` to ``i_equip`` and perform one lift there.

        The lift starts once the vehicle has arrived and the equipment has
        finished its previous lift plus its preparation time. Records the
        finish time on the equipment, moves the vehicle, and returns the
        finish time.
        """
        is_quay_crane = i_equip < N_QC
        prepare_time = QC_PREPARE_TIME if is_quay_crane else ARMG_PREPARE_TIME
        lift_time = QC_LIFT_TIME if is_quay_crane else ARMG_LIFT_TIME

        arrival_time = depart_time + self.predict_travel_time(i_vehicle, i_equip)
        # Read the equipment state at visit time so that an earlier leg of the
        # same task on the same equipment is taken into account.
        last_time = self.get_equip_state(i_equip)
        finish_time = max(last_time + prepare_time, arrival_time) + lift_time

        self.equip_state[i_equip].append(finish_time)
        self.vehicle_position[i_vehicle] = i_equip
        return finish_time

    def do_task(self, task, i_vehicle, phase=1):
        """Execute ``task`` with vehicle ``i_vehicle`` and return its finish time.

        Args:
            task: pair ``(first_equip, second_equip)`` of equipment indices.
            i_vehicle: index of the vehicle carrying out the task.
            phase: ``0`` performs only the visit to ``task[0]``; any other
                value (default ``1``) performs the visit to ``task[0]``
                followed by the visit to ``task[1]``.
        """
        first_finish = self._visit(task[0], i_vehicle, self.now)
        if phase == 0:
            return first_finish
        return self._visit(task[1], i_vehicle, first_finish)

    def step(self, task):
        """Advance to the next event and assign ``task`` to the freed vehicle.

        Pops the earliest ``(time, i_vehicle)`` event, sets ``now`` to its
        time, then either idles the vehicle (``task is None``) or executes the
        task, and finally schedules the vehicle's next availability.
        """
        self.now, i_vehicle = heappop(self.future_events)
        if task is None:
            next_time = self.do_idle(i_vehicle)
        else:
            next_time = self.do_task(task, i_vehicle)
        heappush(self.future_events, (next_time, i_vehicle))

In [81]:
# Demo run. Each task is a pair of equipment indices: the vehicle first visits
# task[0], then task[1] (see Simulation.do_task).
tasks = [
    (0, 3),
    (4, 0),
    (3, 1),
    (3, 2)
]
simulation = Simulation()
for task in tasks:
    print(f"Simulation current state: {simulation.current_state()}.")
    # Earliest pending event, as a (time, vehicle index) pair.
    future = simulation.peek_future()
    print(f"Vehicle {future[1]} is available at {future[0]}s.")
    # Assign the task to the vehicle that becomes available first.
    simulation.step(task)
    print(f"Task {task} is done.")
print(f"Simulation current state: {simulation.current_state()}.")
future = simulation.peek_future()
print(f"Vehicle {future[1]} back to origin at {future[0]}s.")
# No task left: passing None makes the next available vehicle idle and
# return to the starting position.
simulation.step(None)
print(f"Simulation current state: {simulation.current_state()}.")

Simulation current state: {'equip_state': {0: [], 1: [], 2: [], 3: [], 4: []}, 'vehicle_position': [5, 5, 5], 'future_events': [(0, 0), (0, 1), (0, 2)], 'now': 0}.
Vehicle 0 is available at 0s.
Task (0, 3) is done.
Simulation current state: {'equip_state': {0: [40], 1: [], 2: [], 3: [86], 4: []}, 'vehicle_position': [3, 5, 5], 'future_events': [(0, 1), (0, 2), (86, 0)], 'now': 0}.
Vehicle 1 is available at 0s.
Task (4, 0) is done.
Simulation current state: {'equip_state': {0: [40, 106], 1: [], 2: [], 3: [86], 4: [36]}, 'vehicle_position': [3, 0, 5], 'future_events': [(0, 2), (86, 0), (106, 1)], 'now': 0}.
Vehicle 2 is available at 0s.
Task (3, 1) is done.
Simulation current state: {'equip_state': {0: [40, 106], 1: [162], 2: [], 3: [86, 112], 4: [36]}, 'vehicle_position': [3, 0, 1], 'future_events': [(86, 0), (106, 1), (162, 2)], 'now': 0}.
Vehicle 0 is available at 86s.
Task (3, 2) is done.
Simulation current state: {'equip_state': {0: [40, 106], 1: [162], 2: [208], 3: [86, 112, 138], 

In [38]:
class Environment:
    """Skeleton of the assignment-problem environment built around a task list.

    ``reset`` and ``step`` are placeholders that currently do nothing.
    """

    def __init__(self, tasks):
        """Store ``tasks``, set up the spaces, and reset the environment."""
        self.tasks = tasks
        n_tasks = len(tasks)
        # One entry per task index.
        self.action_space = np.arange(n_tasks)
        # No observation layout yet: an empty array.
        self.observation_space = np.array([])

        self.reset()

    def reset(self):
        """Placeholder: does nothing and returns None."""
        return None

    def step(self):
        """Placeholder: does nothing and returns None."""
        return None