-
Notifications
You must be signed in to change notification settings - Fork 39
/
neighbourhood.py
175 lines (144 loc) · 6.15 KB
/
neighbourhood.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING
import numpy as np
if TYPE_CHECKING:
from pyvrp import ProblemData
@dataclass
class NeighbourhoodParams:
    """
    Settings that control how a granular neighbourhood is built.

    Attributes
    ----------
    weight_wait_time
        Penalty weight applied to the minimum wait time component of the
        proximity measure. Large wait times mean two clients are far apart
        in duration/time.
    weight_time_warp
        Penalty weight applied to the minimum time warp component of the
        proximity measure. Large time warp means two clients are far apart
        in duration/time.
    nb_granular
        How many other clients each client's granular neighbourhood
        contains. This controls the overall neighbourhood size.
    symmetric_proximity
        If set, the proximity matrix is symmetrised so edge :math:`(i, j)`
        receives the same weight as :math:`(j, i)`.
    symmetric_neighbours
        If set, the neighbourhood structure itself is symmetrised: whenever
        edge :math:`(i, j)` is included, so is :math:`(j, i)`. This is
        *not* the same thing as ``symmetric_proximity``.

    Raises
    ------
    ValueError
        When ``nb_granular`` is non-positive.
    """

    weight_wait_time: float = 0.2
    weight_time_warp: float = 1.0
    nb_granular: int = 40
    symmetric_proximity: bool = True
    symmetric_neighbours: bool = False

    def __post_init__(self):
        # Validate eagerly so a bad configuration fails at construction time.
        if self.nb_granular < 1:
            raise ValueError("nb_granular <= 0 not understood.")
def compute_neighbours(
    data: ProblemData, params: NeighbourhoodParams | None = None
) -> list[list[int]]:
    """
    Computes neighbours defining the neighbourhood for a problem instance.

    Parameters
    ----------
    data
        ProblemData for which to compute the neighbourhood.
    params
        NeighbourhoodParams that define how the neighbourhood is computed.
        When not provided, a default-constructed instance is used.

    Returns
    -------
    list
        A list of list of integers representing the neighbours for each client.
        The first lists in the lower indices are associated with the depots and
        are all empty.
    """
    # Create the default per call rather than once at definition time: a
    # (mutable) dataclass instance as a default argument is shared across
    # calls, which is a classic Python pitfall (ruff/flake8 B008).
    if params is None:
        params = NeighbourhoodParams()

    proximity = _compute_proximity(
        data,
        params.weight_wait_time,
        params.weight_time_warp,
    )

    if params.symmetric_proximity:
        # Give edge (i, j) the same weight as (j, i).
        proximity = np.minimum(proximity, proximity.T)

    for group in data.groups():
        if group.mutually_exclusive:
            # Clients in mutually exclusive groups cannot neighbour each other,
            # since only one of them can be in the solution at any given time.
            # We use max float, not infty, to ensure these clients are ordered
            # before the depots: we want to avoid same group neighbours, but it
            # is not problematic if we need to have them.
            idcs = np.ix_(group.clients, group.clients)
            proximity[idcs] = np.finfo(np.float64).max

    np.fill_diagonal(proximity, np.inf)  # cannot be in own neighbourhood
    proximity[: data.num_depots, :] = np.inf  # depots have no neighbours
    proximity[:, : data.num_depots] = np.inf  # clients do not neighbour depots

    k = min(params.nb_granular, data.num_clients - 1)  # excl. self
    # Stable sort keeps ties in a deterministic order; drop the depot rows.
    top_k = np.argsort(proximity, axis=1, kind="stable")[data.num_depots :, :k]

    if not params.symmetric_neighbours:
        return [[] for _ in range(data.num_depots)] + top_k.tolist()

    # Construct a symmetric adjacency matrix and return the adjacent clients
    # as the neighbourhood structure.
    adj = np.zeros_like(proximity, dtype=bool)
    rows = np.expand_dims(np.arange(data.num_depots, len(proximity)), axis=1)
    adj[rows, top_k] = True
    adj = adj | adj.transpose()

    return [np.flatnonzero(row).tolist() for row in adj]
def _compute_proximity(
    data: ProblemData, weight_wait_time: float, weight_time_warp: float
) -> np.ndarray[float]:
    """
    Computes proximity for neighborhood. Proximity is based on [1]_, with
    modification for additional VRP variants.

    Parameters
    ----------
    data
        ProblemData for which to compute proximity.
    weight_wait_time
        Penalty weight given to the minimum wait time aspect of the proximity
        calculation.
    weight_time_warp
        Penalty weight given to the minimum time warp aspect of the proximity
        calculation.

    Returns
    -------
    np.ndarray[float]
        An array of size :py:attr:`~pyvrp._pyvrp.ProblemData.num_locations`
        by :py:attr:`~pyvrp._pyvrp.ProblemData.num_locations`.

    References
    ----------
    .. [1] Vidal, T., Crainic, T. G., Gendreau, M., and Prins, C. (2013). A
           hybrid genetic algorithm with adaptive diversity management for a
           large class of vehicle routing problems with time-windows.
           *Computers & Operations Research*, 40(1), 475 - 489.
    """
    # All locations in index order: depots first, then clients. The previous
    # name ("clients") was misleading since depots are included here too.
    locations = [data.location(loc) for loc in range(data.num_locations)]
    early = np.asarray([loc.tw_early for loc in locations])
    late = np.asarray([loc.tw_late for loc in locations])

    # Depot entries stay zero; only clients have service durations and prizes.
    service = np.zeros_like(early)
    service[data.num_depots :] = [c.service_duration for c in data.clients()]

    prize = np.zeros_like(early)
    prize[data.num_depots :] = [client.prize for client in data.clients()]

    # We first determine the elementwise minimum cost across all vehicle types.
    # This is the cheapest way any edge can be traversed.
    distances = data.distance_matrices()
    durations = data.duration_matrices()
    edge_costs = [  # edge costs per vehicle type
        veh_type.unit_distance_cost * distances[veh_type.profile]
        + veh_type.unit_duration_cost * durations[veh_type.profile]
        for veh_type in data.vehicle_types()
    ]

    # Minimum wait time and time warp of visiting j directly after i.
    min_duration = np.minimum.reduce(durations)
    min_wait = early[None, :] - min_duration - service[:, None] - late[:, None]
    min_tw = early[:, None] + service[:, None] + min_duration - late[None, :]

    # Proximity is based on edge costs (and rewards) and penalties for known
    # time-related violations.
    return (
        np.minimum.reduce(edge_costs, dtype=float)
        - prize[None, :]
        + weight_wait_time * np.maximum(min_wait, 0)
        + weight_time_warp * np.maximum(min_tw, 0)
    )