In [181]:
from __future__ import annotations
from typing import List, Dict
import numpy as np
from heapq import heappop, heappush
import random
import operator
from random import choice, randint, choices, sample
from copy import deepcopy
# Sentinel used for "no edge" / "not reached yet" costs.
# NOTE: binary `-` binds tighter than `<<`, so this is 1 << 30 (1073741824),
# not (1 << 31) - 1.
MAX_COST = 1 << 31 - 1

In [182]:
class gv:
    """Global holder for the problem instance read by Task, Route and Solution.

    Every attribute lives on the class itself and stays ``None`` until
    :meth:`init` is called with the dict describing an instance.
    """

    num_vehicles = None
    num_tasks = None  # taken from info["num_required_edges"]
    task_dict: Dict[int, Task] = None  # task number -> Task
    sp = None  # shortest path
    capacity = None
    total_demand = None

    @staticmethod
    def init(info):
        """Copy the fields of ``info`` that the search needs onto the class.

        :param info: mapping with the keys "num_required_edges", "task_dict",
            "sp", "num_vehicles", "capacity" and "total_demand".
        """
        # (class attribute, key in info) pairs, in the order they are assigned.
        field_sources = (
            ("num_tasks", "num_required_edges"),
            ("task_dict", "task_dict"),
            ("sp", "sp"),
            ("num_vehicles", "num_vehicles"),
            ("capacity", "capacity"),
            ("total_demand", "total_demand"),
        )
        for attr_name, info_key in field_sources:
            setattr(gv, attr_name, info[info_key])


class Task:
    """One direction of a required edge, or the dummy task numbered 0.

    A task and its reverse are paired through their numbers: ``no`` and
    ``no + gv.num_tasks`` are the two directions of the same edge (see
    :attr:`invert_no`).
    """

    # NOTE(review): the original comment here said the count excludes the
    # duplicated (reversed) tasks and task 0 - presumably it refers to
    # gv.num_tasks; confirm.
    def __init__(self, no, s, t, demand):
        """
        :param no: task number (0 is the dummy task)
        :param s: start vertex
        :param t: end vertex
        :param demand: demand consumed when the task is served
        """
        self.no = no
        self.s = s
        self.t = t
        self.demand = demand

    @property
    def st(self):
        """Return the ``(s, t)`` vertex pair, usable as an index into ``gv.sp``."""
        return (self.s, self.t)

    @property
    def invert_no(self):
        """Number of the same edge in the opposite direction (0 maps to 0)."""
        if self.no == 0:
            return 0
        elif self.no <= gv.num_tasks:
            return self.no + gv.num_tasks
        else:
            return self.no - gv.num_tasks

    @property
    def invert_task(self):
        """The opposite-direction Task, looked up in ``gv.task_dict``."""
        return gv.task_dict[self.invert_no]

    @property
    def cost(self):
        """Cost of the task, read from ``gv.sp`` at ``(s, t)``."""
        return gv.sp[self.st]

    def __eq__(self, other: Task):
        """Tasks are equal when their numbers match (direction matters).

        Returns ``NotImplemented`` for non-Task operands so that comparisons
        such as ``task == None`` evaluate to False instead of raising.
        """
        if not isinstance(other, Task):
            return NotImplemented
        return self.no == other.no

    def __hash__(self) -> int:
        """Both directions of an edge share one hash value.

        Equal tasks (same ``no``) therefore always hash alike, as required.
        """
        return self.no + self.invert_no

    def __repr__(self):
        return f"({self.s}, {self.t}): {self.demand}"


class Route:
    """An ordered task list with its cached travel cost and spare capacity.

    The cost helpers index ``gv.sp`` with ``(from_vertex, to_vertex)`` and
    read the neighbouring tasks, so they expect the position they touch to
    have the neighbours they access (see each method).
    """

    def __init__(self, tasks=None, cost=0, remain_cap=None):
        """
        :param tasks: initial tasks; deep-copied so the caller's list is never
            shared. ``None`` (the default) gives an empty route.
        :param cost: cost already accumulated by ``tasks``
        :param remain_cap: capacity still available on this route; defaults
            to ``gv.capacity``
        """
        self.tasks: List[Task] = deepcopy(tasks) if tasks is not None else []
        self.cost = cost
        self.remain_cap = remain_cap if remain_cap is not None else gv.capacity

    def append_cost(self, task: Task):
        """Extra cost of serving ``task`` right after the current last task."""
        return gv.sp[self.tasks[-1].t, task.s] + task.cost

    def append_task(self, task: Task):
        """Append ``task``, update cost and capacity, return the added cost."""
        ac = self.append_cost(task)
        self.cost += ac
        self.remain_cap -= task.demand
        # Append last: append_cost() reads tasks[-1] as the predecessor.
        self.tasks.append(task)
        return ac

    def insert_cost(self, idx, task: Task):
        """Extra cost of placing ``task`` between tasks[idx - 1] and tasks[idx].

        ``idx`` is expected to be at least 1 and at most ``len(tasks) - 1``;
        position 0 would wrap around to the last task.
        """
        t1, t2 = self.tasks[idx - 1], self.tasks[idx]
        return gv.sp[t1.t, task.s] + task.cost + gv.sp[task.t, t2.s] - gv.sp[t1.t, t2.s]

    def insert_task(self, idx, task: Task):
        """Insert ``task`` at ``idx``, update cost and capacity, return the added cost."""
        ic = self.insert_cost(idx, task)
        self.cost += ic
        self.remain_cap -= task.demand
        self.tasks.insert(idx, task)
        return ic

    def remove_cost(self, idx):
        """Cost saved by deleting tasks[idx]; needs a task on both sides of ``idx``."""
        t1 = self.tasks[idx - 1]
        t2 = self.tasks[idx]
        t3 = self.tasks[idx + 1]
        return gv.sp[t1.t, t2.s] + t2.cost + gv.sp[t2.t, t3.s] - gv.sp[t1.t, t3.s]

    def remove_task(self, task: Task=None, idx=None):
        """Remove a task given the task, its index, or both; return the saved cost.

        When only ``task`` is given its position is searched for; when only
        ``idx`` is given the task is read from that position.
        """
        assert not (task is None and idx is None)
        if idx is None:
            idx = self.tasks.index(task)
        elif task is None:
            task = self.tasks[idx]

        rc = self.remove_cost(idx)
        self.cost -= rc
        self.remain_cap += task.demand
        self.tasks.pop(idx)
        return rc

    def addable(self, task: Task):
        """True when the remaining capacity covers ``task.demand``."""
        return self.remain_cap - task.demand >= 0

    def __getitem__(self, idx):
        return self.tasks[idx]

    def __repr__(self):
        return str(self.__dict__)

    def __eq__(self, other: Route):
        """Routes are equal when their task lists are equal.

        Returns ``NotImplemented`` for non-Route operands so that such
        comparisons evaluate to False instead of raising AttributeError.
        """
        if not isinstance(other, Route):
            return NotImplemented
        return operator.eq(self.tasks, other.tasks)

    # Routes are mutable and compared by content, so they are not hashable.
    __hash__ = None


class Solution:
    """A collection of routes together with their cached total cost."""

    def __init__(self, routes=None, cost=0):
        """
        :param routes: initial routes; deep-copied so this solution never
            shares Route objects (or the list) with the caller. ``None``
            (the default) gives an empty solution.
        :param cost: total cost already accumulated by ``routes``
        """
        self.routes: List[Route] = deepcopy(routes) if routes is not None else []
        self.cost = cost

    def add_route(self, route: Route):
        """Append ``route`` (not copied) and add its cost to the total."""
        self.routes.append(route)
        self.cost += route.cost

    def calc_cost(self):
        """Recompute the total cost from the routes, store it and return it."""
        self.cost = sum(route.cost for route in self.routes)
        return self.cost

    @staticmethod
    def _edge_id(task):
        """Direction-independent id of a task: the smaller of its two numbers (0 for the dummy)."""
        return min(task.no, task.invert_no)

    @classmethod
    def _locate_edge(cls, route, edge):
        """Index in ``route.tasks`` of the task serving ``edge`` in either direction, else None."""
        for idx, task in enumerate(route.tasks):
            if cls._edge_id(task) == edge:
                return idx
        return None

    def feasible(self):
        """True when the number of distinct served edges equals ``gv.num_tasks``.

        The two directions of an edge count as the same task, and the dummy
        task (number 0) is ignored. Capacity is not checked here.
        """
        served = {self._edge_id(task) for route in self.routes for task in route.tasks}
        served.discard(0)
        return len(served) == gv.num_tasks

    @classmethod
    def crossover(cls, s1, s2):
        """Build a child by replacing one random route of ``s1`` with one of ``s2``.

        Steps:
          1. Pick route ``a`` of ``s1`` and route ``b`` of ``s2`` at random.
          2. The child starts as copies of every route of ``s1`` except ``a``,
             plus a copy of ``s2``'s route ``b``.
          3. An edge brought in by the new route that a kept route already
             serves (in either direction) is deleted from whichever of the two
             positions saves more cost.
          4. An edge that was only served by the dropped route is re-inserted
             at the cheapest position, over both directions and every route
             with enough spare capacity. An edge that fits nowhere is left
             unserved.

        Neither parent is modified.

        :return: a new Solution with its cost recomputed
        """
        s1r, s2r = s1.routes, s2.routes
        a, b = randint(0, len(s1r) - 1), randint(0, len(s2r) - 1)

        # Copies, so that edits below never leak into the parents.
        s0r = deepcopy(s1r[:a] + s1r[a + 1:])
        new_r = deepcopy(s2r[b])

        # Compare by undirected edge id: the parents may serve the same edge
        # in opposite directions, which directed task numbers would miss.
        old_edges = {cls._edge_id(task): task for task in s1r[a].tasks}
        new_edges = {cls._edge_id(task) for task in new_r.tasks}
        old_edges.pop(0, None)
        new_edges.discard(0)

        # Step 3: drop one copy of every edge served by both new_r and a kept route.
        for edge in new_edges.difference(old_edges):
            i2 = cls._locate_edge(new_r, edge)
            for route in s0r:
                i1 = cls._locate_edge(route, edge)
                if i1 is None:
                    continue
                if route.remove_cost(i1) > new_r.remove_cost(i2):
                    # Deleting it from the kept route saves more.
                    route.remove_task(idx=i1)
                else:
                    new_r.remove_task(idx=i2)
                break

        s0r.append(new_r)

        # Step 4: re-insert edges that only the dropped route served.
        for edge, task in old_edges.items():
            if edge in new_edges:
                continue
            min_cost = MAX_COST
            best = None  # (route, index, directed task)
            for candidate in (task, task.invert_task):
                for route in s0r:
                    if not route.addable(candidate):
                        continue
                    # Start at 1 (never before the first task) and include the
                    # last index so a route holding only two tasks still has a slot.
                    for idx in range(1, len(route.tasks)):
                        cost = route.insert_cost(idx, candidate)
                        if cost < min_cost:
                            min_cost = cost
                            best = (route, idx, candidate)
            if best is not None:
                insert_route, insert_idx, candidate = best
                insert_route.insert_task(insert_idx, candidate)

        new_solu = cls(s0r)
        new_solu.calc_cost()
        return new_solu

    @classmethod
    def gene_solu(cls):
        """Build one random solution from the instance stored in ``gv``.

        At most ``gv.num_vehicles`` routes are opened. Each starts with the
        dummy task, repeatedly appends a randomly chosen task that still fits
        the remaining capacity, and is closed with the dummy task. Choosing a
        task retires both of its directions.
        """
        task_dict = gv.task_dict
        unserved_no = list(range(1, gv.num_tasks * 2 + 1))
        solu = cls()
        dummy_task = task_dict[0]

        for _ in range(gv.num_vehicles):
            if not unserved_no:
                break

            route = Route(tasks=[dummy_task], remain_cap=gv.capacity)
            while True:
                valid_tasks = [
                    task_dict[no] for no in unserved_no
                    if route.addable(task_dict[no])
                ]
                if not valid_tasks:
                    break

                task = choice(valid_tasks)
                route.append_task(task)
                unserved_no.remove(task.no)
                unserved_no.remove(task.invert_no)

            route.append_task(dummy_task)
            solu.add_route(route)

        return solu

    @classmethod
    def init_pop(cls, pop_size):
        """Return ``pop_size`` independently generated random solutions."""
        return [cls.gene_solu() for _ in range(pop_size)]

    def __repr__(self):
        header = "solution cost:{}, {}\n".format(self.cost, "feasible" if self.feasible() else "infeasible")
        return header + "".join(str(route) + "\n" for route in self.routes)

    def __getitem__(self, idx):
        return self.routes[idx]

    def __eq__(self, other) -> bool:
        """Solutions are equal when they hold equal routes, in any order.

        Routes define no ordering of their own, so they are sorted by their
        sequence of task numbers before the pairwise comparison.
        """
        if not isinstance(other, Solution):
            return NotImplemented

        def task_numbers(route):
            return [task.no for task in route.tasks]

        r1 = sorted(self.routes, key=task_numbers)
        r2 = sorted(other.routes, key=task_numbers)
        return operator.eq(r1, r2)

    # Solutions are mutable and compared by content, so they are not hashable.
    __hash__ = None



In [183]:
def read_file(path):
    """Parse an instance file and return the problem description as a dict.

    Layout read from the file (0-based line indices): lines 1-6 each carry a
    ``name : integer`` header giving, in order, the number of vertices, the
    depot, the number of required edges, the number of non-required edges,
    the number of vehicles and the capacity. Edge rows start at line 9, one
    per edge, as whitespace-separated integers ``v1 v2 cost demand``.
    Vertices are 1-based in the file and converted to 0-based here.

    Every edge with a non-zero demand yields two tasks, one per direction,
    numbered ``k`` and ``k + num_required_edges``. Task 0 is the dummy task
    located at the depot.

    :param path: path of the instance file
    :return: dict with the keys "num_vertices", "depot", "num_required_edges",
        "num_non_required_edges", "num_vehicles", "capacity", "total_demand",
        "sp" (all-pairs shortest path matrix) and "task_dict"
    """
    def shortest_path(graph):
        """All-pairs shortest path costs: one heap-based Dijkstra run per source."""
        n = graph.shape[0]
        sp = np.zeros_like(graph, dtype=int)
        sp.fill(MAX_COST)
        is_visited = np.zeros((n, ), dtype=bool)

        for src in range(n):
            is_visited.fill(False)
            heap = [(0, src)]  # (distance, vertex)
            sp[src, src] = 0

            while heap:
                cost, s = heappop(heap)
                if is_visited[s]:
                    continue
                is_visited[s] = True

                for d in range(n):
                    # Missing edges hold MAX_COST, so with non-negative costs
                    # they can never beat the current entry.
                    new_cost = cost + graph[s, d]
                    if not is_visited[d] and new_cost < sp[src, d]:
                        sp[src, d] = new_cost
                        heappush(heap, (new_cost, d))

        return sp

    with open(path, "r") as f:
        contents = f.readlines()

    def header_int(line_idx):
        """Integer that follows the colon on header line ``line_idx``."""
        return int(contents[line_idx].split(":")[1])

    num_vertices = header_int(1)
    depot = header_int(2) - 1  # file is 1-based
    num_required_edges = header_int(3)
    num_non_required_edges = header_int(4)
    num_vehicles = header_int(5)
    capacity = header_int(6)

    graph = np.zeros((num_vertices, num_vertices), dtype=int)
    graph.fill(MAX_COST)
    # The dummy task sits at the depot, so routes start and end there even
    # when the depot is not the first vertex.
    task_dict = {0: Task(0, depot, depot, 0)}

    total_demand = 0
    task_no = 1
    for row in contents[9: 9 + num_required_edges + num_non_required_edges]:
        fields = [int(i) for i in row.split()]
        n1, n2 = fields[0] - 1, fields[1] - 1
        edge_cost, demand = fields[2], fields[3]
        graph[n1, n2] = edge_cost
        graph[n2, n1] = edge_cost
        if demand != 0:
            # Register both directions of the required edge.
            task_dict[task_no] = Task(task_no, n1, n2, demand)
            task_dict[task_no + num_required_edges] = Task(task_no + num_required_edges, n2, n1, demand)
            task_no += 1
            total_demand += demand

    info = {
        "num_vertices": num_vertices,
        "depot": depot,
        "num_required_edges": num_required_edges,
        "num_non_required_edges": num_non_required_edges,
        "num_vehicles": num_vehicles,
        "capacity": capacity,
        "total_demand": total_demand,
        "sp": shortest_path(graph),
        "task_dict": task_dict
    }
    return info

In [184]:
def gene_solu():
    """Build one random solution from the instance stored in ``gv``.

    Module-level convenience wrapper: the construction itself lives in
    ``Solution.gene_solu``, so there is a single implementation to maintain.

    :return: a new Solution
    """
    return Solution.gene_solu()

def init_pop(pop_size):
    """Return a list of ``pop_size`` solutions, each from one ``gene_solu()`` call."""
    population = []
    for _ in range(pop_size):
        population.append(gene_solu())
    return population

In [185]:
# def test(seed):
#     info = read_file("sample1.dat")
#     gv.init(info)
#     random.seed(seed)
#     pop = init_pop(20)
#     for solu in pop:
#         print(solu.cost, solu.feasible(), end=" ")
#         # pass
#     print("-")

#     for _ in range(1):
#         co = sample(pop, k=2)
#         solu = Solution.crossover(co[0], co[1])
#         pop.append(solu)
#         pop = sorted(pop, key=lambda x: x.cost)[:20]
#     pop = sorted(pop, key=lambda x: x.cost)[:20]
#     print(pop[0])

#     return pop[0].feasible()


In [190]:
def inversion(route: Route, idx):
    """Return a copy of ``route`` in which the task at ``idx`` is served in reverse.

    The task is removed and its opposite-direction counterpart
    (``task.invert_task``) is inserted at the same position, so the copy's
    cost is updated by the route's own bookkeeping. ``route`` itself is left
    untouched.

    :param route: route to copy
    :param idx: position of the task to flip; it needs a task on both sides
    :return: the modified deep copy
    """
    route = deepcopy(route)
    task = route[idx]
    route.remove_task(task, idx)
    route.insert_task(idx, task.invert_task)
    return route

def _solution_signature(solution):
    """Order-independent fingerprint: the sorted tuple of each route's task numbers."""
    return tuple(sorted(tuple(task.no for task in route.tasks) for route in solution.routes))


def single_insert(solu: Solution, best_cost, tabu_list, max_try):
    """Try to improve on ``best_cost`` by moving one random task elsewhere.

    Works on a deep copy of ``solu``. One task is taken out of a randomly
    chosen route; then, up to ``max_try`` times, a random insertion position
    is drawn and the task is tried there in both directions. Directions that
    reproduce a solution already in ``tabu_list`` are skipped and the cheaper
    remaining one is kept. The first attempt whose total cost is below
    ``best_cost`` wins; any other attempt is undone before the next draw.

    :param solu: starting solution (never modified)
    :param best_cost: cost the neighbour has to beat
    :param tabu_list: list of previously accepted solutions; the returned
        solution is appended to it
    :param max_try: maximum number of insertion positions to draw
    :return: the improved copy, or ``None`` when no attempt succeeded or no
        route holds a movable task
    """
    solu = deepcopy(solu)

    # Only routes with something between their first and last task can give
    # one up (assumes each route is bracketed by the dummy task - TODO confirm).
    donors = [i for i, route in enumerate(solu.routes) if len(route.tasks) > 2]
    if not donors:
        return None

    remove_route_idx = choice(donors)
    remove_task_idx = randint(1, len(solu[remove_route_idx].tasks) - 2)
    moved = solu[remove_route_idx][remove_task_idx]
    solu[remove_route_idx].remove_task(idx=remove_task_idx)

    # Fingerprints are compared instead of the Solution objects themselves so
    # the check does not depend on routes being sortable.
    tabu_signatures = {_solution_signature(seen) for seen in tabu_list}

    for _ in range(max_try):
        insert_route_idx = randint(0, len(solu.routes) - 1)
        insert_route = solu[insert_route_idx]
        insert_task_idx = randint(1, len(insert_route.tasks) - 1)

        if (insert_route_idx, insert_task_idx) == (remove_route_idx, remove_task_idx):
            continue  # that is the slot the task was just taken from
        if not insert_route.addable(moved):
            continue  # not enough spare capacity on this route

        # Evaluate both directions at this position, skipping tabu results.
        chosen, chosen_cost = None, None
        for candidate in (moved, moved.invert_task):
            added = insert_route.insert_task(insert_task_idx, candidate)
            is_tabu = _solution_signature(solu) in tabu_signatures
            insert_route.remove_task(idx=insert_task_idx)  # restore
            if is_tabu:
                continue
            if chosen is None or added <= chosen_cost:
                chosen, chosen_cost = candidate, added

        if chosen is None:
            continue  # both directions are tabu here

        insert_route.insert_task(insert_task_idx, chosen)
        if solu.calc_cost() < best_cost:
            tabu_list.append(solu)
            return solu

        # Not an improvement: undo before drawing another position.
        insert_route.remove_task(idx=insert_task_idx)

    return None
    
    


In [191]:
# Generate a population holding a single random solution; the notebook echoes
# its repr below. gene_solu() reads gv, so gv.init(...) must have run first.
init_pop(1)

[solution cost:35, feasible
 {'tasks': [(0, 0): 0, (7, 6): 2, (5, 4): 2, (6, 5): 2, (0, 0): 0], 'cost': 22, 'remain_cap': 0}
 {'tasks': [(0, 0): 0, (3, 2): 3, (2, 1): 3, (0, 0): 0], 'cost': 13, 'remain_cap': 0}]

In [192]:
# Equality check: two solutions built independently from the same task
# sequence (dummy task 0, then tasks 1 and 2) should compare equal.
# Route() falls back to gv.capacity and append_task() reads gv.sp, so gv must
# already be initialised. The routes are not closed with a trailing dummy task.
solu1 = Solution()
route1 = Route([gv.task_dict[0]])
route1.append_task(gv.task_dict[1])
route1.append_task(gv.task_dict[2])
solu1.add_route(route1)

# Second, identical solution built from scratch.
solu2 = Solution()
route2= Route([gv.task_dict[0]])
route2.append_task(gv.task_dict[1])
route2.append_task(gv.task_dict[2])
solu2.add_route(route2)

# Each solution holds a single route here, so no route ordering is involved.
solu1 == solu2

True