In [1]:
from data import load_data
problem = load_data(r"data\100customers\c101.txt", 8, 2, 3)

In [2]:
for customer in problem.customer_list:
    print(customer)

Customer(x=40, y=50, demand=0, time=0.0, service=0.0)
Customer(x=45, y=68, demand=10, time=11.4, service=0.075)
Customer(x=45, y=70, demand=30, time=10.3125, service=0.075)
Customer(x=42, y=66, demand=10, time=0.8125, service=0.075)
Customer(x=42, y=68, demand=10, time=9.0875, service=0.075)
Customer(x=42, y=65, demand=10, time=0.1875, service=0.075)
Customer(x=40, y=69, demand=20, time=7.7625, service=0.075)
Customer(x=40, y=66, demand=20, time=2.125, service=0.075)
Customer(x=38, y=68, demand=20, time=3.1875, service=0.075)


In [27]:
from problem import Problem
from population import Individual
def extract_routes(chromosome, problem: Problem):
    truck_routes = []
    drone_routes = []
    tmp_truck_route = []
    tmp_drone_route = []

    k = problem.number_customer + 1
    d = problem.number_customer + problem.number_of_trucks

    for i in range(len(chromosome[0])):
        if chromosome[0][i] >= k and chromosome[0][i] < d:
            truck_routes.append(tmp_truck_route)
            tmp_truck_route = []
            continue
        elif (
            chromosome[0][i] >= d
            and chromosome[0][i] < d + problem.number_of_drones - 1
        ):
            drone_routes.append(tmp_drone_route)
            tmp_drone_route = []
            continue

        if chromosome[1][i] == 0:
            tmp_truck_route.append(chromosome[0][i])
        elif chromosome[1][i] == 1:
            tmp_drone_route.append(chromosome[0][i])

    truck_routes.append(tmp_truck_route)
    drone_routes.append(tmp_drone_route)
    return truck_routes, drone_routes

def repair_capacity(chromosome, problem: Problem):
    for i in range(len(chromosome[0])):
        if chromosome[1][i] == 1:
            if problem.customer_list[chromosome[0][i]].quantity > problem.drone_capacity:
                chromosome[1][i] = 0
    return chromosome


def find_nearest_customer(customer_drone_list, customer_truck_list, distance_drone_matrix):
    nearest_dict = {}

    for customer in customer_drone_list:
        distances = [
            (other, distance_drone_matrix[customer][other])
            for other in customer_truck_list if other != customer
        ]
        distances.append((0, distance_drone_matrix[0][customer]))
        distances.sort(key=lambda x: x[1])
        nearest_dict[customer] = [other for other, _ in distances]

    return nearest_dict


def update_nearest_dict(nearest_dict, customer_remove, distance_drone_matrix):
    if customer_remove in nearest_dict:
        del nearest_dict[customer_remove]

    for key, value in nearest_dict.items():
        dist_remove = distance_drone_matrix[key][customer_remove]

        inserted = False
        new_list = []
        for other in value:
            dist_other = distance_drone_matrix[key][other]
            if not inserted and dist_remove < dist_other:
                new_list.append(customer_remove)
                inserted = True
            new_list.append(other)
        if not inserted:
            new_list.append(customer_remove)

        nearest_dict[key] = new_list

    return nearest_dict


import numpy as np
class Customer:
    def __init__(self, arrive_time, quantity, service_time, x, y):
        self.arrive_time = arrive_time
        self.quantity = quantity
        self.service_time = service_time
        self.x = x
        self.y = y
    def __repr__(self):
        return f"Customer(x={self.x}, y={self.y}, demand={self.quantity}, time={self.arrive_time}, service={self.service_time})"


class Truck_Solution:
    def __init__(self, assigned_customers, recived_truck, recived_drone, arrive_time, leave_time):
        self.assigned_customers = assigned_customers
        self.recived_truck = recived_truck
        self.recived_drone = recived_drone
        self.arrive_time = arrive_time
        self.leave_time = leave_time

class Drone_Trip:
    def __init__(self, assigned_customers, recived_drone, arrive_time, leave_time):
        self.assigned_customers = assigned_customers
        self.recived_drone = recived_drone
        self.arrive_time = arrive_time
        self.leave_time = leave_time

class Drone_Solution:
    def __init__(self, num_of_trips, trip_list):
        self.num_of_trips = num_of_trips
        self.trip_list = trip_list

        
class Problem:
    def __init__(self, number_customer, customer_list, number_of_trucks, number_of_drones, distance_matrix_truck,
                distance_matrix_drone, truck_capacity, drone_capacity, drone_energy,
                speed_of_truck, speed_of_drone, launch_time, land_time,
                energy_consumption_rate, weight_of_drone,
                cost_truck_unit, cost_drone_unit, fix_truck_cost, fix_drone_cost):
        self.number_customer = number_customer
        self.customer_list = customer_list
        self.number_of_trucks = number_of_trucks
        self.number_of_drones = number_of_drones
        self.distance_matrix_truck = distance_matrix_truck
        self.distance_matrix_drone = distance_matrix_drone
        self.truck_capacity = truck_capacity
        self.drone_capacity = drone_capacity
        self.drone_energy = drone_energy
        self.speed_of_truck = speed_of_truck
        self.speed_of_drone = speed_of_drone
        self.launch_time = launch_time
        self.land_time = land_time
        self.energy_consumption_rate = energy_consumption_rate
        self.weight_of_drone = weight_of_drone
        self.cost_truck_unit = cost_truck_unit
        self.cost_drone_unit = cost_drone_unit
        self.fix_truck_cost = fix_truck_cost
        self.fix_drone_cost = fix_drone_cost

    def check_capacity_truck_contraint(self, truck_solution: Truck_Solution):
        customer_length = len(truck_solution.assigned_customers)
        if customer_length == 0:
            return True
        total_goods = 0
        for i in range(customer_length):
            total_goods = total_goods + truck_solution.recived_truck[i] + truck_solution.recived_drone[i]
        if total_goods > self.truck_capacity:
            return False
        return True
    
    def check_capacity_drone_trip_constraint(self, drone_trip: Drone_Trip):
        if len(drone_trip.assigned_customers) == 0:
            return True
        total_goods = 0
        for i in range(len(drone_trip.assigned_customers)):
            total_goods = total_goods + drone_trip.recived_drone[i]
        if total_goods > self.drone_capacity:
            return False
        return True
    
    def check_capacity_drone_constraint(self, drone_solution: Drone_Solution):
        for i in range(drone_solution.num_of_trips):
            if self.check_capacity_drone_trip_constraint(drone_solution.trip_list[i]) == False:
                return False
        return True
    

    def check_energy_drone(self, drone_trip: Drone_Trip):
        number_point = len(drone_trip.assigned_customers)
        if number_point == 0:
            return True
        total_energy = 0
        total_goods = 0
        for i in range(number_point - 1):
            u, u_next = drone_trip.assigned_customers[i], drone_trip.assigned_customers[i+1]
            total_energy = total_energy + self.energy_consumption_rate*(self.weight_of_drone + total_goods)*(self.distance_matrix_drone[u][u_next]/self.speed_of_drone + self.launch_time)
            if drone_trip.recived_drone[i] != 0:
                wait_time = max(self.customer_list[u_next].arrive_time - drone_trip.arrive_time[i], 0)
                total_energy = total_energy + self.energy_consumption_rate*(self.weight_of_drone + total_goods)*(self.land_time + wait_time)
                total_goods = total_goods + drone_trip.recived_drone[i]
                total_energy = total_energy + self.energy_consumption_rate*(self.weight_of_drone + total_goods)*self.customer_list[u_next].service_time
            else:
                wait_time = max(0, drone_trip.leave_time[i+1] - drone_trip.arrive_time[i+1])
                total_energy = total_energy + self.energy_consumption_rate*(self.weight_of_drone + total_goods)*wait_time
        if total_energy > self.drone_energy:
            return False
        else:
            return True
        
    def cal_truck_cost(self, truck_solution: Truck_Solution):
        if len(truck_solution.assigned_customers) == 0:
            return 0
        else:
            total_cost =self.fix_truck_cost + self.distance_matrix_truck[0][truck_solution.assigned_customers[0]]*self.cost_truck_unit
            for i in range(len(truck_solution.assigned_customers)-1):
                total_cost = total_cost + self.distance_matrix_truck[truck_solution.assigned_customers[i]][truck_solution.assigned_customers[i+1]]*self.cost_truck_unit
            return total_cost
        
    def cal_drone_trip_cost(self, drone_trip: Drone_Trip):
        num_point = len(drone_trip)
        if num_point == 0:
            return 0
        else:
            total_cost = 0
            for i in range(num_point - 1):
                total_cost = total_cost + self.distance_matrix_drone[drone_trip.assigned_customers[i]][drone_trip.assigned_customers[i+1]]*self.cost_drone_unit
            return total_cost
        
    def cal_drone_cost(self, drone_solution: Drone_Solution):
        if drone_solution.num_of_trips == 0:
            return 0
        total_cost =  self.fix_drone_cost
        for trip in drone_solution.trip_list:
            total_cost = total_cost + self.cal_drone_trip_cost(trip)
        return total_cost
    
    def cal_total_cost(self, truck_solution_list, drone_solution_list):
        total_cost = 0
        for truck_solution in truck_solution_list:
            total_cost = total_cost + self.cal_truck_cost(truck_solution)
        for drone_solution in drone_solution_list:
            total_cost = total_cost + self.cal_drone_cost(drone_solution)
        return total_cost
    

    def customer_wait_max(self, truck_solution_list, drone_solution_list):
        customer_wait_time = np.zeros(len(self.customer_list))
        for truck_solution in truck_solution_list:
            num_point = len(truck_solution.assigned_customers)
            if num_point == 0:
                continue
            else:
                for i in range(num_point):
                    if truck_solution.recived_truck[i] == 0:
                        continue
                    else:
                        customer = truck_solution.assigned_customers[i]
                        customer_wait_time[customer] = max(truck_solution.arrive_time[i] - self.customer_list[customer].arrive_time, 0)
        
        for drone_solution in drone_solution_list:
            for drone_trip in drone_solution:
                num_point = len(drone_trip.assigned_customers)
                if num_point == 0:
                    continue
                else:
                    for i in range(num_point):
                        if drone_trip.recived_drone[i] == 0:
                            continue
                        else:
                            customer = drone_trip.assigned_customers[i]
                            customer_wait_time[customer] = max(drone_trip.arrive_time[i] - self.customer_list[customer].arrive_time, 0)
        return np.max(customer_wait_time)
    
    def cal_distance_truck(self, truck_solution: Truck_Solution):
        total_distance = 0
        num_point = len(truck_solution)
        if num_point == 0:
            return total_distance
        else:
            total_distance = total_distance + self.distance_matrix_truck[0][truck_solution.assigned_customers[0]] + self.distance_matrix_truck[truck_solution.assigned_customers[-1]][0]
            for i in range(num_point):
                u, u_next = truck_solution.assigned_customers[i], truck_solution.assigned_customers[i+1]
                total_distance = total_distance + self.distance_matrix_truck[u][u_next]
            return total_distance
        
    def cal_truck_fairness(self, truck_soltuon_list):
        num_truck = len(truck_soltuon_list)
        distance_list = np.zeros(num_truck)
        for i in range(num_truck):
            distance_list = self.cal_distance_truck(truck_soltuon_list[i])
        return np.std(distance_list)


# def find_routes(chromosome, problem: Problem):
#     '''
#     trả về các các route cho từng truck và drone, mỗi truck đi tối đa 1 route, drone có thể nhiều trip và mỗi trip có thể giao cho nhiều khách hàng, miễn là đảm bảo ràng buộc energey và capacity, giả sử mỗi lần drone land sẽ được thay pin ngay lập tức"

#     sử dụng hàm extract_routes để biết được tập khách hàng phân cho truck và drone, sau đó đi tìm các điểm launch và land của drone,
    

#     '''


# def repair_based_nearest(chromosome, customer_drone_list, customer_truck_list, problem: Problem):
#     nearest_dict = find_nearest_customer(customer_drone_list, customer_truck_list, problem.distance_matrix_truck)
#     for i in range(len(chromosome[0])):
#         if chromosome[1][i] == 1:
#             customer = chromosome[0][i]
#             min_distance = 



In [23]:
def find_routes(chromosome, problem: Problem):
    """
    Finds the routes for each truck and trips for each drone based on the
    given chromosome and problem parameters, while checking all constraints.
    """
    # Step 1: Repair the chromosome based on drone capacity
    chromosome = repair_capacity(chromosome, problem)
    
    # Step 2: Extract initial routes after repair
    truck_routes, drone_routes = extract_routes(chromosome, problem)
    
    # Step 3: Process Truck Routes
    truck_solutions = []
    # Simplified truck route creation. In a full model, this would
    # involve calculating arrival/leave times and validating capacity.
    for route in truck_routes:
        if route:
            full_route = [0] + route + [0]
            recived_truck = [problem.customer_list[c].quantity for c in route]
            recived_drone = [0] * len(route)
            arrive_time = [0] * len(full_route)
            leave_time = [0] * len(full_route)
            
            # Additional validation and time calculation would be here.
            truck_solutions.append(
                Truck_Solution(
                    assigned_customers=full_route,
                    recived_truck=recived_truck,
                    recived_drone=recived_drone,
                    arrive_time=arrive_time,
                    leave_time=leave_time
                )
            )
        else:
            truck_solutions.append(Truck_Solution([], [], [], [], []))

    # Step 4: Process Drone Trips with full validation
    drone_solutions = []
    # This is a more detailed approach for forming drone trips.
    for route in drone_routes:
        trip_list = []
        if route:
            # Sort drone customers by some heuristic, e.g., distance to depot or nearest truck customer
            # For simplicity, we'll process in the given order.
            
            truck_customers = [c for r in truck_routes for c in r]
            
            for customer in route:
                # Find the best launch point for this customer
                best_trip = None
                
                # Consider Depot as a potential launch/land point
                potential_launch_points = [0] + truck_customers
                
                for launch_point in potential_launch_points:
                    # A drone trip must have a land point
                    # Consider all other truck customers as potential land points
                    potential_land_points = [0] + [c for c in truck_customers if c != launch_point]
                    
                    for land_point in potential_land_points:
                        # Calculate the trip (launch -> customer -> land)
                        trip_customers = [launch_point, customer, land_point]
                        recived_drone = [0, problem.customer_list[customer].quantity, 0]
                        # Placeholder times - real calculation needed
                        arrive_time = [0] * len(trip_customers)
                        leave_time = [0] * len(trip_customers)

                        current_trip = Drone_Trip(trip_customers, recived_drone, arrive_time, leave_time)
                        
                        # Check all constraints
                        if problem.check_capacity_drone_trip_constraint(current_trip) and \
                           problem.check_energy_drone(current_trip):
                            # This is a valid trip. Calculate cost and find the best one.
                            # For simplicity, we'll just take the first valid one.
                            best_trip = current_trip
                            break  # Found a valid land point, break inner loop
                    if best_trip:
                        break # Found a valid launch point, break outer loop

                if best_trip:
                    trip_list.append(best_trip)
                else:
                    # If no valid trip can be formed for this customer,
                    # re-assign to truck. This is a crucial repair step.
                    # This logic needs to be robust, potentially modifying the original chromosome.
                    # For this example, we'll simulate adding it to the first truck route.
                    truck_routes[0].append(customer)
        
        drone_solutions.append(
            Drone_Solution(
                num_of_trips=len(trip_list),
                trip_list=trip_list
            )
        )
    
    # After potentially re-assigning customers, re-build truck solutions
    # to include the newly assigned customers.
    # This part would need to be re-run or integrated properly.
    
    return truck_solutions, drone_solutions

In [28]:
truck_solutions, drone_solutions = find_routes(chromosome, problem)

In [29]:
print(truck_solutions)
print(drone_solutions)

[<__main__.Truck_Solution object at 0x000002601226D220>, <__main__.Truck_Solution object at 0x000002601226F560>]
[<__main__.Drone_Solution object at 0x000002601214A1E0>, <__main__.Drone_Solution object at 0x000002601214B260>, <__main__.Drone_Solution object at 0x000002601214B2C0>]


In [33]:
for truck_route in truck_solutions:
    print(truck_route.assigned_customers)
    print(truck_route.recived_truck)
    print(truck_route.recived_drone)
    print(truck_route.arrive_time)
    print(truck_route.leave_time)
    print("**********************")

[0, 1, 3, 4, 0]
[10, 10, 10]
[0, 0, 0]
[0, 0, 0, 0, 0]
[0, 0, 0, 0, 0]
**********************
[0, 6, 2, 0]
[20, 30]
[0, 0]
[0, 0, 0, 0]
[0, 0, 0, 0]
**********************


In [40]:
for drone_trips in drone_solutions:
    for trip in drone_trips.trip_list:
        print(trip.assigned_customers)
        print(trip.recived_drone)
        print(trip.arrive_time)
        print(trip.leave_time)
        print("*********************")

[0, 7, 0]
[0, 20, 0]
[0, 0, 0]
[0, 0, 0]
*********************
[0, 5, 0]
[0, 10, 0]
[0, 0, 0]
[0, 0, 0]
*********************
[0, 8, 0]
[0, 20, 0]
[0, 0, 0]
[0, 0, 0]
*********************


In [22]:
chromosome = [[1, 10, 3, 7, 4, 9, 5, 6, 11, 8, 2], [0,  0, 0, 1, 0, 0, 1, 0,  0, 1, 0]]
truck_routes, drone_trips = extract_routes(chromosome, problem)

print("assigned_truck:", truck_routes)
print("assgined_drone", drone_trips)

nearest_dict = find_nearest_customer([7,5, 8], [1,2,3,4,6], problem.distance_matrix_drone)
print(nearest_dict)
nearest_dict = update_nearest_dict(nearest_dict, 5, problem.distance_matrix_drone)
print(nearest_dict)

assigned_truck: [[1, 3, 4], [6, 2]]
assgined_drone [[], [7, 5], [8]]
{7: [3, 4, 6, 1, 2, 0], 5: [3, 4, 1, 6, 2, 0], 8: [6, 4, 3, 1, 2, 0]}
{7: [3, 5, 4, 6, 1, 2, 0], 8: [6, 4, 3, 5, 1, 2, 0]}


In [None]:

def repair(individual: Individual, problem: Problem):
    chromosome = individual.chromosome

    if chromosome is None:
        return None

    truck_routes = []
    drone_routes = []
    tmp_truck_route = []
    tmp_drone_route = []

    k = problem.number_customer + 1
    d = problem.number_customer + problem.number_of_trucks

    for i in range(len(chromosome[0])):
        if chromosome[0][i] >= k and chromosome[0][i] < d:
            truck_routes.append(tmp_truck_route)
            tmp_truck_route = []
            continue
        elif (
            chromosome[0][i] >= d
            and chromosome[0][i] < d + problem.number_of_drones - 1
        ):
            drone_routes.append(tmp_drone_route)
            tmp_drone_route = []
            continue

        if chromosome[1][i] == 0:
            tmp_truck_route.append((i, chromosome[0][i]))
        elif chromosome[1][i] == 1:
            tmp_drone_route.append((i, chromosome[0][i]))

    truck_custs = [(cust for cust in route) for route in truck_routes]

    # repair drone_routes:
    #   ensure all customers are feasible for drone capacity
    #   ensure all customers are feasible for drone energy
    for drone_route in drone_routes:
        for chro_id, cust in drone_route:
            # violate capacity constraints
            if not problem.check_drone_capacity([cust]):
                chromosome[1][chro_id] = 0  # 0 for truck   1 for drone

            # violate energy constraints
            lch_cust = min(
                truck_custs,
                key=lambda truck_cust: problem.distance_matrix_drone[truck_cust][cust],
            )
            rem_truck_custs = [cust for cust in truck_custs if cust != lch_cust]
            ld_cust = min(
                rem_truck_custs,
                key=lambda truck_cust: problem.distance_matrix_drone[truck_cust][cust],
            )
            if not problem.check_drone_energy_constraint([lch_cust, cust, ld_cust]):
                chromosome[1][chro_id] = 0

    # repair truck_routes:
    #   ensure all routes are feasible for truck capacity
    pass


the individual can be decoded only when it is feasible
def decode(individual: Individual, problem: Problem):
    chromosome = individual.chromosome
    truck_routes, drone_routes = extract_routes(chromosome, problem)

    truck_custs = [(cust for cust in route) for route in truck_routes]

    # split sequence of drone customers into trips
    # and determine launching node and landing node
    drone_trips = [[]]

    for drone_id, drone_route in enumerate(drone_routes):
        trip = []
        for drone_cust in drone_route:
            if not trip:
                lch_cust = min(
                    truck_custs,
                    key=lambda truck_cust: problem.distance_matrix_drone[truck_cust][
                        drone_cust
                    ],
                )

                rem_truck_custs = [cust for cust in truck_custs if cust != lch_cust]

                pred_ld_cust = min(
                    rem_truck_custs,
                    key=lambda truck_cust: problem.distance_matrix_drone[drone_cust][
                        truck_cust
                    ],
                )

                if problem.check_drone_capacity(
                    [drone_cust]
                ) and problem.check_drone_energy_constraint(
                    [lch_cust, drone_cust, pred_ld_cust]
                ):
                    trip.append(lch_cust)
                    trip.append(drone_cust)

            else:
                pred_ld_cust = min(
                    truck_custs,
                    key=lambda truck_cust: problem.distance_matrix_drone[drone_cust][
                        truck_cust
                    ],
                )
                if problem.check_drone_capacity(
                    trip[1:] + [drone_cust]
                ) and problem.check_drone_energy_constraint(
                    trip + [drone_cust, pred_ld_cust]
                ):
                    trip.append(drone_cust)
                else:
                    rem_truck_custs = [cust for cust in truck_custs if cust != trip[0]]
                    ld_cust = min(
                        rem_truck_custs,
                        key=lambda truck_cust: problem.distance_matrix_drone[trip[-1]][
                            truck_cust
                        ],
                    )

                    trip.append(ld_cust)
                    drone_trips[drone_id].append(trip)
                    trip.clear()

    return truck_routes, drone_trips