In [1]:
import numpy as np

In [2]:
# I want a file converter that will take in VRPlib formats, and give me a usable dataset.
#I will take inspiration from the ACO one.
#First the ACO sets up a "Node" class. Like a set of parameters
class Node:
    def __init__(self, id:  int, x: float, y: float, demand: float, ready_time: float, due_time: float, service_time: float):
        super()
        self.id = id

        if id == 0:
            self.is_depot = True
        else:
            self.is_depot = False

        self.x = x
        self.y = y
        self.demand = demand
        self.ready_time = ready_time
        self.due_time = due_time
        self.service_time = service_time
#Now that I have these data categories, I want to make a function that will take the file, and distribute the data
#into these categories.
#ACO alg makes a big class that takes in a file, distributes the variables, sets up pheremone, dists etc, 
#so will forego pheremone here
class VrpGaGraph:
    def __init__(self, file_path): #rho = 0.1 maybe, or some equivalent for GA
        super()
        #maybe we should include cost_mat in the below. depends where we want to incorporate this
        self.node_num, self.nodes, self.node_dist_mat, self.vehicle_num, self.vehicle_capacity \
            = self.create_from_file(file_path)
        #self.rho = rho
        self.nnh_travel_path, _ = self.nearest_neighbor_heuristic() #maybe different for this (CW?)
        self.heuristic_info_mat = 1 / self.node_dist_mat #maybe not needed, related to fitness
    #now for the file reader
    def create_from_file(self, file_path):
        node_list = []
        with open(file_path, 'rt') as f:
            count = 1
            for line in f:
                if count == 5:
                    vehicle_num, vehicle_capacity = line.split()
                    vehicle_num = int(vehicle_num)
                    vehicle_capacity = int(vehicle_capacity)
                elif count >= 10:
                    node_list.append(line.split())
                count += 1
        node_num = len(node_list)
        nodes = list(Node(int(item[0]), float(item[1]), float(item[2]), float(item[3]), float(item[4]), float(item[5]), float(item[6])) for item in node_list)
        #so the Node function is used here, line by line, to give us a list of lists for node
        #ok now this part is very nice, as we are essentially defining the cost matrix!!!(prob will return cost matrix)
        for i in range(node_num):
            node_a = nodes[i]
            node_dist_mat[i][i] = 1e-8
            for j in range(i+1, node_num):
                node_b = nodes[j]
                node_dist_mat[i][j] = VrptwGraph.calculate_dist(node_a, node_b) #will be just input from data instead
                node_dist_mat[j][i] = node_dist_mat[i][j]

        return node_num, nodes, node_dist_mat, vehicle_num, vehicle_capacity
    #we are using non-Euclidean distances from a scraped matrix, so forego the calculate_dist function
    #forego pheremone updater. For NN heuristic, what is the point of this? Is it the first route? Maybe we can just use this
    #seems that we append zeros to the route.
    #ok now we make a function for calculating the next nearest (least costly) index
    def _cal_nearest_next_index(self, index_to_visit, current_index, current_load, current_time):
        nearest_ind = None
        nearest_distance = None

        for next_index in index_to_visit:
            if current_load + self.nodes[next_index].demand > self.vehicle_capacity:
                continue

            dist = self.node_dist_mat[current_index][next_index]
            wait_time = max(self.nodes[next_index].ready_time - current_time - dist, 0)
            service_time = self.nodes[next_index].service_time
            if current_time + dist + wait_time + service_time + self.node_dist_mat[next_index][0] > self.nodes[0].due_time:
                continue

            if current_time + dist > self.nodes[next_index].due_time:
                continue
#the below line is the crux of the nearest neighbour. if we change it to some other process,
#then we would have the desired checker for other processes
            if nearest_distance is None or self.node_dist_mat[current_index][next_index] < nearest_distance:
                nearest_distance = self.node_dist_mat[current_index][next_index]
                nearest_ind = next_index

        return nearest_ind
    #now we create the NN heuristic
    #we can get rid of time windows if we want. This returns us a single route. It adds vehicles based upon demand.
    def nearest_neighbor_heuristic(self, max_vehicle_num=None):
        index_to_visit = list(range(1, self.node_num))
        current_index = 0
        current_load = 0
        current_time = 0
        travel_distance = 0
        travel_path = [0]

        if max_vehicle_num is None:
            max_vehicle_num = self.node_num

        while len(index_to_visit) > 0 and max_vehicle_num > 0:
            nearest_next_index = self._cal_nearest_next_index(index_to_visit, current_index, current_load, current_time)

            if nearest_next_index is None:
                travel_distance += self.node_dist_mat[current_index][0]

                current_load = 0
                current_time = 0
                travel_path.append(0)
                current_index = 0

                max_vehicle_num -= 1
            else:
                current_load += self.nodes[nearest_next_index].demand

                dist = self.node_dist_mat[current_index][nearest_next_index]
                wait_time = max(self.nodes[nearest_next_index].ready_time - current_time - dist, 0)
                service_time = self.nodes[nearest_next_index].service_time

                current_time += dist + wait_time + service_time
                index_to_visit.remove(nearest_next_index)

                travel_distance += self.node_dist_mat[current_index][nearest_next_index]
                travel_path.append(nearest_next_index)
                current_index = nearest_next_index
                
        travel_distance += self.node_dist_mat[current_index][0]
        travel_path.append(0)

        vehicle_num = travel_path.count(0)-1
        return travel_path, travel_distance, vehicle_num
    
    def InsertionCost(matrix,i,k,j):
        return matrix[i][k] + matrix[k][j] - matrix[i][j]
    
    #now for the random insertion. This creates an entire tour by inserting random nodes into our "tour" and 
    #assessing if the insertion is good. The change I need to make is appending a zero to the tour where the capacity
    #and/or time constraint is violated. Possibly, I just do this posthumously. But ok, some extra care should be taken
    #to not make the tour needlessly long. Start with simple, then maybe incorporate a basic checker.
    def random_insertion(self, matrix):
        n = len(matrix) #I want to change this formulation
        nodes = set(range(0,n))
        #index_to_visit = list(range(1, self.node_num))
        #current_index = 0
        #current_load = 0
        #remaining_nodes = list(set(nodes)-set(tour))
        start_node = random.randint(1,n-1)
        tour =[0,start_node,0]
    
        while len(tour) < n+1:
            best_cost = np.Inf
            tour_set  = set(tour)
            remaining_nodes = nodes - tour_set
            k = random.choice(list(remaining_nodes))

            for i in range(0,len(tour)-1):
                    j=i+1
                    
#we take random k, nodes are enumerated, then with current tour we insert k. Ah, this algorithm builds from the 
#back, so it keeps redefining start and end until no further index is appropriate. So add from back until exceed cost,
#then append zero and continue counting from the back, but subtract the demand until the first zero, and so on...
                    cost = InsertionCost(matrix,tour[i],k,tour[j])
                    if cost < best_cost:
                        best_cost = cost
                        start = i
                        end = j
            tour = tour[:start+1]+[k]+tour[end:] 
        current_load = 0
        index = 1
        while tour[index] != 0:
            if current_load + graph.nodes[tour[index]].demand <= graph.vehicle_capacity:
                current_load += graph.nodes[tour[index]].demand
            else:
                 tour = tour[:index] + [0] + tour[index:]
                 current_load = 0
            index += 1
        return tour
    
    def ClarkeWright(matrix):
        n = len(matrix)
#    print "size", n
        depot = 0
    
        tours = [] 
        for node in range(1,n):
            tours.append([node,depot,node])
#    print "tours", tours  
        savings=[]
    
        for i in range(0,n-1):
            for j in range(i+1,n):
                saving = matrix[0][i] + matrix[0][j] - matrix[i][j]
                savings.append((saving,i,j))
                     
        savings = sorted(savings,reverse=True)           
#    print savings           
        new_tour =[]
        while True:
            for save,i,j in savings:
                if new_tour == []:
                    tour_1 = tours[i-1]
                    tour_2 = tours[j-1]
                    new_tour = tour_1[1:]+tour_2[:-1]
                
#                print new_tour
                elif new_tour[-2] == j and i not in new_tour:
                     new_tour = new_tour[:-1]+tours[i-1][:-1]
#                 print new_tour
                elif new_tour[1] == j and i not in new_tour:
                     new_tour = tours[i-1][1:]+new_tour[1:]         
#                 print new_tour
                elif new_tour[-2] == i and j not in new_tour:
                     new_tour = new_tour[:-1]+tours[j-1][:-1]
#                 print new_tour
                elif new_tour[1] == i and j not in new_tour:
                     new_tour = tours[j-1][1:]+new_tour[1:]
#                 print new_tour
#   maybe can have "and doesn't exceed capacity" and and else append zero here. vehicle counter too
            if len(new_tour) >= n+1:break
        current_load = 0
        index = 1
        while new_tour[index] != 0:
            if current_load + graph.nodes[new_tour[index]].demand <= graph.vehicle_capacity:
                current_load += graph.nodes[new_tour[index]].demand
            else:
                 new_tour = new_tour[:index] + [0] + new_tour[index:]
                 current_load = 0
                 index+=1
            index += 1
        return new_tour
        

class PathMessage:
    def __init__(self, path, distance):
        if path is not None:
            self.path = copy.deepcopy(path)
            self.distance = copy.deepcopy(distance)
            self.used_vehicle_num = self.path.count(0) - 1
        else:
            self.path = None
            self.distance = None
            self.used_vehicle_num = None

    def get_path_info(self):
        return self.path, self.distance, self.used_vehicle_num
#finished with the base setup for GA

In [3]:
def CostCalculation(tour,cost_matrix):
#takes in the list version of the gene  
    tour_cost = 0
    for i in range(len(tour)-1):
        node_i = tour[i]
        node_j = tour[i+1]          
        tour_cost = tour_cost + cost_matrix[node_i][node_j]  
    return round(tour_cost,2)

3333333*33336666667

In [4]:
class intra_mutate:
    #this class functions to create mutations within a chromosome, and assess if the mutation is
    #favourable
    #there will be three functions here:
    #SwapTwo, TwoOpt, and Donate (we will see about donate)
    def SwapTwo(tour,i,k):
        first_route = tour[0:i]
        second_route = tour[i:k+1][::-1]
        third_route = tour[k+1:] 
        new_tour = list()
        new_tour = first_route + second_route + third_route
        
        return new_tour
    
    def TwoOpt(tour, cost_matrix):
        tour_vect = vectify(tour)
        best_cost = CostCalculation(tour,cost_matrix)
        improve = 0 #auxiliary variable that stops the algorithm if no improvement is found
        while improve <= 1:
            for index in tour_vect:
                for i in range(0,tour_vect[index]-1):
                    for k in range(i+1,tour_vect[index]):
                        best_cost = CostCalculation([0]+tour_vect[index]+[0],cost_matrix)# the depot (0) is not considered for swapping
                        new_tour = SwapTwo(tour_vect[index],i,k)   
                        new_cost = CostCalculation([0]+new_tour+[0],cost_matrix)
                        if (new_cost < best_cost):
                            new_tour_vect[index] = new_tour
                            best_cost = new_cost
                            improve = 0  
            improve += 1
        return listify(tour_vect)
    #maybe TwoOpt could be improved (aux variable doesn't currently do anything?)
    def donate(tour, cost_matrix):
        tour_vect = vectify(tour)
        best_cost = CostCalculation(tour, cost_matrix)
        #run through this multiple times?
        improve = 0
        while improve <= 1:
            for index in tour_vect:
                for i in range
    #maybe we won't use a donate function, but instead mutate differently: we concurrently use
    #list and vector forms of the route, without the endpoints, perform the mutations on the
    #list, vectify it, and run the check. If it's valid, we run the cost check, and sub in.

SyntaxError: invalid syntax (Temp/ipykernel_3376/3114903357.py, line 40)

In [None]:
#for inserting vans into the list we get (due to capacity)
def vans(tour):
    index = 1
    current_load = 0
    while tour[index] != 0:
        if current_load + graph.nodes[tour[index]].demand <= graph.vehicle_capacity:
            current_load += graph.nodes[tour[index]].demand
        else:
            tour = tour[:index] + [0] + tour[index:]
            current_load = 0
        index += 1
    return tour

#converting "vector" form solution into single list form
def listify(tour_vect):
    tour_list = [0]
    for index in tour_vect:
        tour_list.append(tour_vect[index]+[0])
    return tour_list

#converting list form solution into "vector" form
def vectify(tour):
    tour_vect = [[]]
    index = 1
    temp_list = []
    while index <= len(tour)-1:
        if tour[index] != 0:
            temp_list.append(tour[index])
        else:
            tour_vect.append(temp_list)
            temp_list = []
        index +=1
    return tour_vect

#check if a proposed (vector form) solution is valid 
def valid(tour_vect):
    check = True
    for index in tour_vect:
        if CostCalculation(tour_vect[index],cost_matrix) <= vehicle_capacity:
            continue
        else:
            check = False
    return check