In [1]:
from typing import List, Dict, Tuple
from copy import deepcopy
import pandas as pd
from Depot import Depot
from Vehicle import Vehicle

In [2]:
class DepotFile:
    def __init__(self, BASE_DIR) -> None:
        self.distance_to_other_depots = f"{BASE_DIR}/c_ij.csv" # Matrix
        self.time_to_other_depots = f"{BASE_DIR}/t_ij.csv" # Matrix
        self.demand = f"{BASE_DIR}/d_i.csv"
        self.earilest_time_can_be_delivered = f"{BASE_DIR}/e_i.csv"
        self.latest_time_must_be_delivered = f"{BASE_DIR}/l_i.csv"
        
class VehicleFile:
    def __init__(self, BASE_DIR) -> None:
        self.capacity = f"{BASE_DIR}/Q_k.csv"
        self.fuel_fee = f"{BASE_DIR}/B.csv"
        self.fuel_efficiency = f"{BASE_DIR}/a_k.csv"
        self.fixed_cost = f"{BASE_DIR}/fc_k.csv"
        self.depots_delivery_status = f"{BASE_DIR}/a_ik.csv" # Matrix

In [3]:
class DepotBuilder:
    def __init__(self, file_name:DepotFile) -> None:
        self.depot_distance = pd.read_csv(file_name.distance_to_other_depots, index_col=0)
        self.depot_time = pd.read_csv(file_name.time_to_other_depots, index_col=0)
        self.depot_demand = pd.read_csv(file_name.demand, index_col=0)
        self.depot_earilest_time_can_be_delivered = pd.read_csv(file_name.earilest_time_can_be_delivered)
        self.depot_latest_time_must_be_delivered = pd.read_csv(file_name.latest_time_must_be_delivered)
        
        self._number_of_depots = len(self.depot_demand)
        
        self._depots = self.build_depots()
        
    def build_depots(self) -> Dict[int,Depot]:
        '''
        depot key is 0-based.
        '''
        depots = {}
        for idx in range(self._number_of_depots):
            depot_name = idx
            depot_demand = dict(self.depot_demand.iloc[idx, :])
            depot_earilest_time_can_be_delivered = self.depot_earilest_time_can_be_delivered["earilest_time_can_be_delivered"][idx]
            depot_latest_time_must_be_delivered = self.depot_latest_time_must_be_delivered["latest_time_must_be_delivered"][idx]
            depot_distance = list(self.depot_distance.iloc[idx,:])
            depot_time = list(self.depot_time.iloc[idx,:])
            
            created_depot = Depot(depot_demand, 
                                  depot_earilest_time_can_be_delivered, 
                                  depot_latest_time_must_be_delivered,
                                  depot_distance,
                                  depot_time,
                                  depot_name)
            depots[depot_name] = created_depot
        
        return depots
    
    @property
    def all_depot_names(self) -> List[int]:
        return [name for name in self._depots.keys()]
    
    def __getitem__(self, depot_idx:int) -> Depot:
        '''
        __getitem__ method is served as accessor of all depots by specifying certain key of self._depots 
        '''
        if depot_idx not in self._depots:
            raise ValueError(f"'depot_idx' must be one of the following: {list(self._depots.keys())}")
            
        return self._depots[depot_idx]
    
    
    def __repr__(self) -> str:
        return "".join([repr(depot) 
                        for depot in 
                        self._depots.values()])

In [4]:
class VehicleBuilder:
    def __init__(self, file_name:VehicleFile) -> None:
        self.vehicle_fuel_fee = pd.read_csv(file_name.fuel_fee)
        self.vehicle_fuel_efficiency = pd.read_csv(file_name.fuel_efficiency)
        self.vehicle_fixed_cost = pd.read_csv(file_name.fixed_cost)
        self.vehicle_capacity = pd.read_csv(file_name.capacity, index_col=0)
        self.vehicle_depots_delivery_status = pd.read_csv(file_name.depots_delivery_status, index_col=0)
        
        self._number_of_vehicles = len(self.vehicle_capacity)
        
        self._vehicles = self._build_vehicles()
        
    def _build_vehicles(self) -> Dict[int,Depot]:
        '''
        car key is 0-based.
        '''
        vehicles = {}
        for idx in range(self._number_of_vehicles):
            vehicle_name = idx
            vehicle_capacity = dict(self.vehicle_capacity.iloc[idx, :])
            vehicle_fuel_fee = self.vehicle_fuel_fee["fuel_fee"][idx]
            vehicle_fuel_efficiency = self.vehicle_fuel_efficiency["fuel_efficiency"][idx]
            vehicle_fixed_cost = self.vehicle_fixed_cost["fixed_cost"][idx]
            vehicle_depots_delivery_status = list(self.vehicle_depots_delivery_status.iloc[idx,:])
            
            created_vehicle = Vehicle(vehicle_capacity, 
                                      vehicle_fuel_fee, 
                                      vehicle_fuel_efficiency, 
                                      vehicle_fixed_cost,
                                      vehicle_depots_delivery_status,
                                      vehicle_name)

            vehicles[vehicle_name] = created_vehicle
        
        return vehicles
    
    @property
    def all_vehicle_names(self) -> List[int]:
        return [name for name in self._vehicles.keys()]
    
    
    def __getitem__(self, vehicle_idx:int) -> Vehicle:
        '''
        __getitem__ method is served as accessor of all depots by specifying certain key of self._depots 
        '''
        if vehicle_idx not in self._vehicles:
            raise ValueError(f"'vehicle_idx' must be one of the following: {list(self._vehicles.keys())}")
            
        return self._vehicles[vehicle_idx]
    
    
    def __repr__(self) -> str:
        return "".join([repr(vehicle) 
                        for vehicle in 
                        self._vehicles.values()])


In [5]:
depot_file_name = DepotFile("dataset/9_3cars/")

In [6]:
depots= DepotBuilder(depot_file_name)

In [7]:
depots

Depot Name: 0
Demand: {'a': 0, 'b': 0}
Earilest Time Can Be Delivered: Starting Time after 0 Mins
Latest Time Must Be Delivered: Starting Time after 480 Mins
Distance to Other Depots: {0: 0, 1: 20886, 2: 19588, 3: 27410, 4: 12719, 5: 24283, 6: 15750, 7: 21450, 8: 11568}
Delivery Time to Other Depots: {0: 0.0, 1: 1376.0, 2: 1343.0, 3: 1755.0, 4: 1237.0, 5: 1885.0, 6: 1029.0, 7: 1575.0, 8: 1455.0}
------------------------------------------------------------
Depot Name: 1
Demand: {'a': 17, 'b': 15}
Earilest Time Can Be Delivered: Starting Time after 0 Mins
Latest Time Must Be Delivered: Starting Time after 300 Mins
Distance to Other Depots: {0: 21806, 1: 0, 2: 7840, 3: 5720, 4: 11730, 5: 8391, 6: 8983, 7: 41731, 8: 35455}
Delivery Time to Other Depots: {0: 1392.0, 1: 0.0, 2: 949.9999998, 3: 750.0, 4: 1347.0, 5: 942.0, 6: 874.0000002, 7: 2474.0, 8: 2486.0}
------------------------------------------------------------
Depot Name: 2
Demand: {'a': 30, 'b': 28}
Earilest Time Can Be Delivered: S

In [8]:
vehicle_file_name = VehicleFile("dataset/9_3cars/")

In [9]:
vehicles = VehicleBuilder(vehicle_file_name)

In [10]:
vehicles

Capacity: {'a': 150, 'b': 130}
Fuel Fee: $34
Fuel Efficiency: 0.1 l/km
Fixed Cost: $1000
Depots Delivery Status: {0: 1, 1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 1, 8: 1}
------------------------------------------------------------
Capacity: {'a': 180, 'b': 160}
Fuel Fee: $34
Fuel Efficiency: 0.15 l/km
Fixed Cost: $1200
Depots Delivery Status: {0: 1, 1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 1, 8: 1}
------------------------------------------------------------
Capacity: {'a': 290, 'b': 270}
Fuel Fee: $34
Fuel Efficiency: 0.2 l/km
Fixed Cost: $1500
Depots Delivery Status: {0: 1, 1: 0, 2: 0, 3: 1, 4: 1, 5: 1, 6: 1, 7: 1, 8: 1}
------------------------------------------------------------

In [11]:
class BuilderFactory():
    def __init__(self, BASE_DIR:str="dataset/9_3cars/"):
        depot_files = DepotFile(BASE_DIR)
        vehicle_files = VehicleFile(BASE_DIR)
        
        self.depots = DepotBuilder(depot_files)
        self.vehicles = VehicleBuilder(vehicle_files)

In [12]:
class RouteResourceCalculator(BuilderFactory):
    def __init__(self) -> None:
        '''
        RouteResourceCalculator responsible for calculating the resources needed for "A GIVEN ROUTE"
        -------------------------------------------------------------------------------------------
        Params:
        depot_builder: storing information for 'EACH' depot
        vehicle: storing information for a 'GIVEN' vehicle
        route: storing a path that the input vehicle needs to go through. 
        '''
        super().__init__()
        
    def _calculate_demand(self, route:List[int]) -> Dict[str,int]:
        total_demand = {}
        for depot_id in route:
            for product in self.depots[depot_id].demand.keys():
                if product not in total_demand:
                    total_demand[product] = 0
                total_demand[product] +=  self.depots[depot_id].demand[product]
        return total_demand
    
    def _calculate_distance(self, route:List[int]) -> int:
        '''
        Unit: km
        '''
        total_distance = 0
        for idx in range(len(route) - 1):
            start_depot = route[idx]
            end_depot = route[idx + 1]
            
            distance = self.depots[start_depot].get_distance_to_depot(end_depot)
            total_distance += distance
        
        return total_distance
    
    
    def _calculate_time(self,vehicle_idx:int,route:List[int]) -> List[int]:
        '''
        Unit: minute
        Returns total delivery time and total service time as a list for a given route.
        '''
        delivery_time = 0
        service_time = 0
        for idx in range(len(route) - 1):
            start_depot = route[idx]
            end_depot = route[idx + 1]
            
            delivery_time += self.depots[start_depot].get_delivery_time_to_depot(end_depot)
            service_time += self.vehicles[vehicle_idx].shipement_discharging_time
            
        return [delivery_time, service_time]
    
    
    def _calculate_driver_cost(self, hourly_wage:int, time_on_duty_in_minute:int) -> int:
        '''
        Params:
        total_time_on_duty: minute
        '''
        
        return (time_on_duty_in_minute / 60) * hourly_wage
    
    def calculate_route_resources(self, vehicle_idx:int, route:List[int]) -> 'Dict[str, int|Dict[str, int]]':
        '''
        This method is a public API expected to expose to users.
        Functionality:
            Calculate total resources needed for 'a given route with a given vehicle'
        '''
        
        if len(route) == 0:
            return
            
        
        
        route_demand = self._calculate_demand(route)
        route_distance = self._calculate_distance(route)
        route_delivery_time, route_service_time = self._calculate_time(vehicle_idx, route)
        
        vehicle_capacity = self.vehicles[vehicle_idx].capacity
        vehicle_fixed_cost = self.vehicles[vehicle_idx].fixed_cost
        
        time_on_duty_in_minute = route_delivery_time + route_service_time
        driver_cost = self._calculate_driver_cost(168, time_on_duty_in_minute)
        
        
        return [route_demand, # 路徑需求
                route_distance, #路徑距離
                route_delivery_time, #路徑所需運送時間
                route_service_time, #總卸貨時間
                vehicle_capacity, #載具總運載量
                vehicle_fixed_cost,#載具固定成本
                driver_cost] #司機成本
    

In [13]:
route_calc = RouteResourceCalculator()

In [14]:
route_calc.calculate_route_resources(1,[0,4,1,3,4,0])

[{'a': 133, 'b': 125}, 55913, 6574.0, 100, {'a': 180, 'b': 160}, 1200, 18687.2]

In [15]:
Solution = Dict[int, List[int]]

In [16]:
class Optimizer(BuilderFactory):
    def __init__(self):
        super().__init__()
        self.resource_calc = RouteResourceCalculator()
        
    def _find_shortage_point_in_route(self, vehicle_idx:int,route:List[int]) -> int:
        '''
        A helper function aiming to find 'shortage point' during delivery, 
        returns 'depot_name' not the index of depot
        '''
        copy_vehicle = deepcopy(self.vehicles[vehicle_idx])
        for depot_name in route:
            current_depot = self.depots[depot_name]
#             print("Current Capacity", copy_vehicle.capacity)
#             print("Demand",current_depot.demand)
            
            # product is a dict
            copy_vehicle.discharge(current_depot.demand)
            if copy_vehicle.is_out_of_stock():
                return depot_name
            
    
    def _find_optimal_replenish_point_in_route(self, vehicle_idx:int,route:List[int]) -> List[int]:
        '''
        Opitmal point is based on distance, 
        thus, using RouteResourceCalculator._calculate_distance to find optimal point
        '''
        shortage_point = self._find_shortage_point_in_route(vehicle_idx,route)
        possible_routes = []
        warehouse_depot = 0
        for idx in range(len(route) - 1): # excluding 0(warehouse)
            route_copy = route.copy()
            current_depot = route[idx]
            previous_depot = route[idx - 1]
            
            
            if (current_depot == 0 or previous_depot == 0):
                 #preventing situation like [0,(0), 1,2(0),0] X
                continue
                           
            route_copy.insert(idx, warehouse_depot)

            total_distance = self.resource_calc._calculate_distance(route_copy)
            
            possible_routes.append(
                (route_copy, total_distance)
            )
            
            if current_depot == shortage_point:
                break
        
        possible_routes.sort(key= lambda route_with_distance: route_with_distance[1])
        
        if len(possible_routes) == 0:
            return route
        
        shortest_route, _ = possible_routes[0]
        
        return shortest_route
    
    def optimize(self, solution:Solution) -> Solution:
        solution = deepcopy(solution)
        for vehicle_idx, route in solution.items():
            solution[vehicle_idx] = self._find_optimal_replenish_point_in_route(vehicle_idx,route)
        return solution
            
shortage_solution2 = {0:[0, 8, 4, 7, 2,5, 1, 0], 1: [0, 6, 3, 0], 2: [0, 7, 5, 0]}


In [17]:
op = Optimizer()

In [18]:
op.optimize(shortage_solution2)

{0: [0, 8, 4, 0, 7, 2, 5, 1, 0], 1: [0, 6, 0, 3, 0], 2: [0, 7, 0, 5, 0]}

In [19]:
class RouteStatusCode:
    FAILED_ROUTE = 0
    SUCCESSFUL_ROUTE = 1
    
    SHORTAGE_ROUTE = 2

class ConstraintChecker(BuilderFactory):
    def __init__(self) -> None:
        super().__init__()
        self.resource_calc = RouteResourceCalculator()
    
    def _is_not_need_to_replenish_during_delivery(self,vehicle_idx:int,route:List[int]) -> bool:
        '''
        A solution checker for checking if a given route should replenish during delivery
        '''
        copy_vehicle = deepcopy(self.vehicles[vehicle_idx])
        for depot_name in route:
            current_depot = self.depots[depot_name]
            # product is a dict
            copy_vehicle.discharge(current_depot.demand)
            if copy_vehicle.is_out_of_stock():
                return False # need to replenish
        
        return True
            
    
    
    def _is_replenish_one_time_can_fullfill_route_demand(self, vehicle_idx:int, route:List[int]) -> bool:
        '''
        A solution checker for checking if driver replenish one time, 
        total capacity can fullfill the total route demand
        '''
        route_demand = self.resource_calc._calculate_demand(route)
        vehicle_capacity = self.vehicles[vehicle_idx].capacity
        
            
        for product, demand_quantity in route_demand.items():
            if vehicle_capacity[product] * 2 < demand_quantity:
                return False # replenish one time can't fullfill route demand
        
        return True
    
    
        
        
        
        
    def route_report(self, solution:Solution) -> RouteStatusCode:
        '''
        A function produce report for solution report
        '''
        VALIDATORS_WITH_STATUS_CODES = (
            (self._is_not_need_to_replenish_during_delivery, RouteStatusCode.SUCCESSFUL_ROUTE),
            (self._is_replenish_one_time_can_fullfill_route_demand, RouteStatusCode.SHORTAGE_ROUTE),
            
            (True, RouteStatusCode.FAILED_ROUTE)
        )
        
        for vehicle_idx, route in solution.items():
            for validator_with_statu_code in VALIDATORS_WITH_STATUS_CODES:
                validator, status_code = validator_with_statu_code
                if validator(vehicle_idx, route):
                    return status_code

In [20]:
class SolutionChromosome:
    pass
class SolutionChromosome(BuilderFactory):
    def __init__(self, solution:Solution, generation:int = 0) -> None:
        self.solution = solution
    
    def mutate(self, mutation_rate) -> SolutionChromosome:
        pass
        
        
        return self
    
    def crossover(self, _other_solution_chromosome:SolutionChromosome) -> List[SolutionChromosome]:
        pass
    
    def __repr__(self) -> str:
        return f"Chromosome: {self.solution}"
    
    def __eq__(self, _other_solution_chromosome:SolutionChromosome) -> bool:
        pass
    
    def __gt__(self, _other_solution_chromosome:SolutionChromosome) -> bool:
        pass
    
    @property
    def fitness(self):
        pass
    
    
    

In [21]:
 # e.g., {0: [], 1: [0, 8, 6, 0], 2: [0, 7, 5, 0], 3: [0, 3, 0], 4: []}
from random import choice
class SolutionGenerator(BuilderFactory):
    def __init__(self, constraint_checker:ConstraintChecker) -> None:
        super().__init__()
        self.checker = constraint_checker
        self.optimizer = Optimizer()
        
        self.all_depot_names =  self.depots.all_depot_names
        print(f"Available Depot Names: {self.all_depot_names}")
        self.all_vehicle_names = self.vehicles.all_vehicle_names
        print(f"Available Vehicle Names: {self.all_vehicle_names}")
        
        
    def _start_from_warehouse_and_go_back_to_warehouse_helper(self, route:List[int]) -> List[int]:
        '''
        A helper function for .generate_initial_raw_solution(), 
        which adds 1 zero at the beginning and at the end of the route, 
        indicating that the route should be starting from warehouse, and going back to warehouse
        '''
        return [0, *route, 0]    

    def _generate_initial_raw_solution(self) -> Solution:
        '''
        .generate_initial_raw_solution generates a raw solution that is subject to further check with constraint checker (i.e., ConstraintChecker)
        
        P.S. This method generates a solution at least doesn't violate the constraint that an undeliverable depot is assigned to a given vehicle.
        
        '''
        #[1,2 ...., n], 0(warehouse) should be excluded
        all_depots_to_be_assigned = [name for name in self.all_depot_names if name != 0]
        
        vehicles_with_assigned_depots =  {vehicle_name:[] 
                                        for vehicle_name in self.all_vehicle_names} # {0:[], 1:[], 2:[] ..., n:[]}
        
        while (len(all_depots_to_be_assigned) > 0): # while there exists any depot in all_depots_to_be_assigned
            selected_vehicle = choice(self.all_vehicle_names)
            selected_depot = all_depots_to_be_assigned.pop()
            if not self.vehicles[selected_vehicle].is_depot_can_be_delivered(selected_depot):
                continue
            
            vehicles_with_assigned_depots[selected_vehicle].append(selected_depot) 
        
        # use helper function i.e., [0, *route, 0]
        for vehicle_name, assigned_depots in vehicles_with_assigned_depots.items():
            if len(assigned_depots) == 0: # if assigned route is a empty list
                continue
                
            vehicles_with_assigned_depots[vehicle_name] = self._start_from_warehouse_and_go_back_to_warehouse_helper(assigned_depots)
            
        return vehicles_with_assigned_depots
    
    
    def generate_valid_solutions(self, number_of_solutions:int) -> List[SolutionChromosome]:
        valid_solutions = []
        while (len(valid_solutions) < number_of_solutions):
            one_solution = self._generate_initial_raw_solution()
            if one_solution in valid_solutions:
                continue
            
            route_report = self.checker.route_report(one_solution)
            if route_report == RouteStatusCode.SUCCESSFUL_ROUTE:
                valid_solutions.append(SolutionChromosome(one_solution))
            elif route_report == RouteStatusCode.SHORTAGE_ROUTE:
                non_shortage_solution = self.optimizer.optimize(one_solution)
                valid_solutions.append(SolutionChromosome(non_shortage_solution))

        return valid_solutions
        
    
    
    

In [22]:
checker = ConstraintChecker()
solution_generator = SolutionGenerator(checker)

Available Depot Names: [0, 1, 2, 3, 4, 5, 6, 7, 8]
Available Vehicle Names: [0, 1, 2]


In [24]:
solution_generator.generate_valid_solutions(30)

[Chromosome: {0: [0, 8, 0, 6, 4, 3, 0], 1: [], 2: [0, 7, 0, 5, 0]},
 Chromosome: {0: [0, 6, 5, 1, 0], 1: [0, 8, 3, 2, 0], 2: [0, 7, 4, 0]},
 Chromosome: {0: [0, 6, 2, 0], 1: [0, 5, 1, 0], 2: [0, 8, 7, 4, 3, 0]},
 Chromosome: {0: [0, 8, 0], 1: [0, 7, 2, 0], 2: [0, 6, 5, 4, 3, 0]},
 Chromosome: {0: [0, 8, 6, 0], 1: [0, 2, 1, 0], 2: [0, 7, 5, 4, 3, 0]},
 Chromosome: {0: [0, 8, 7, 3, 1, 0], 1: [0, 6, 4, 2, 0], 2: [0, 5, 0]},
 Chromosome: {0: [0, 6, 0], 1: [0, 8, 5, 4, 3, 2, 0], 2: [0, 7, 0]},
 Chromosome: {0: [0, 2, 0], 1: [0, 7, 6, 3, 1, 0], 2: [0, 8, 5, 4, 0]},
 Chromosome: {0: [0, 8, 1, 0], 1: [0, 7, 6, 5, 3, 2, 0], 2: [0, 4, 0]},
 Chromosome: {0: [0, 8, 7, 0, 6, 3, 0], 1: [0, 5, 4, 0, 1, 0], 2: []},
 Chromosome: {0: [0, 7, 6, 2, 0], 1: [0, 4, 3, 1, 0], 2: [0, 8, 5, 0]},
 Chromosome: {0: [0, 7, 5, 4, 2, 0], 1: [0, 8, 6, 1, 0], 2: [0, 3, 0]},
 Chromosome: {0: [0, 4, 3, 2, 1, 0], 1: [0, 8, 5, 0], 2: [0, 7, 6, 0]},
 Chromosome: {0: [0, 7, 0, 5, 4, 3, 2, 1, 0], 1: [0, 6, 0], 2: [0, 8, 0]},
