-
Notifications
You must be signed in to change notification settings - Fork 39
/
Population.py
202 lines (168 loc) · 6.19 KB
/
Population.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
from __future__ import annotations
from typing import TYPE_CHECKING, Callable, Generator, Optional
from warnings import warn
from pyvrp._pyvrp import PopulationParams, SubPopulation
from pyvrp.exceptions import EmptySolutionWarning
if TYPE_CHECKING:
from pyvrp._pyvrp import CostEvaluator, RandomNumberGenerator, Solution
class Population:
"""
Creates a Population instance.
Parameters
----------
diversity_op
Operator to use to determine pairwise diversity between solutions. Have
a look at :mod:`pyvrp.diversity` for available operators.
params
Population parameters. If not provided, a default will be used.
"""
def __init__(
self,
diversity_op: Callable[[Solution, Solution], float],
params: Optional[PopulationParams] = None,
):
self._op = diversity_op
self._params = params if params is not None else PopulationParams()
self._feas = SubPopulation(diversity_op, self._params)
self._infeas = SubPopulation(diversity_op, self._params)
def __iter__(self) -> Generator[Solution, None, None]:
"""
Iterates over the solutions contained in this population.
"""
for item in self._feas:
yield item.solution
for item in self._infeas:
yield item.solution
def __len__(self) -> int:
"""
Returns the current population size.
"""
return len(self._feas) + len(self._infeas)
def _update_fitness(self, cost_evaluator: CostEvaluator):
"""
Updates the biased fitness values for the subpopulations.
Parameters
----------
cost_evaluator
CostEvaluator to use for computing the fitness.
"""
self._feas.update_fitness(cost_evaluator)
self._infeas.update_fitness(cost_evaluator)
def num_feasible(self) -> int:
"""
Returns the number of feasible solutions in the population.
"""
return len(self._feas)
def num_infeasible(self) -> int:
"""
Returns the number of infeasible solutions in the population.
"""
return len(self._infeas)
def add(self, solution: Solution, cost_evaluator: CostEvaluator):
"""
Adds the given solution to the population. Survivor selection is
automatically triggered when the population reaches its maximum size.
Parameters
----------
solution
Solution to add to the population.
cost_evaluator
CostEvaluator to use to compute the cost.
"""
if solution.num_clients() == 0:
msg = """
An empty solution is being added to the population. This typically
indicates that there is a significant difference between the values
of the prizes and the other objective terms, which hints at a data
problem. Note that not every part of PyVRP can work gracefully with
empty solutions.
"""
warn(msg, EmptySolutionWarning)
# Note: the CostEvaluator is required here since adding a solution
# may trigger a purge which needs to compute the biased fitness which
# requires computing the cost.
if solution.is_feasible():
# Note: the feasible subpopulation actually does not depend
# on the penalty values but we use the same implementation.
self._feas.add(solution, cost_evaluator)
else:
self._infeas.add(solution, cost_evaluator)
def clear(self):
"""
Clears the population by removing all solutions currently in the
population.
"""
self._feas = SubPopulation(self._op, self._params)
self._infeas = SubPopulation(self._op, self._params)
def select(
self,
rng: RandomNumberGenerator,
cost_evaluator: CostEvaluator,
k: int = 2,
) -> tuple[Solution, Solution]:
"""
Selects two (if possible non-identical) parents by tournament, subject
to a diversity restriction.
Parameters
----------
rng
Random number generator.
cost_evaluator
Cost evaluator to use when computing the fitness.
k
The number of solutions to draw for the tournament. Defaults to
two, which results in a binary tournament.
Returns
-------
tuple
A solution pair (parents).
"""
self._update_fitness(cost_evaluator)
first = self._tournament(rng, k)
second = self._tournament(rng, k)
diversity = self._op(first, second)
lb = self._params.lb_diversity
ub = self._params.ub_diversity
tries = 1
while not (lb <= diversity <= ub) and tries <= 10:
tries += 1
second = self._tournament(rng, k)
diversity = self._op(first, second)
return first, second
def tournament(
self,
rng: RandomNumberGenerator,
cost_evaluator: CostEvaluator,
k: int = 2,
) -> Solution:
"""
Selects a solution from this population by k-ary tournament, based
on the (internal) fitness values of the selected solutions.
Parameters
----------
rng
Random number generator.
cost_evaluator
Cost evaluator to use when computing the fitness.
k
The number of solutions to draw for the tournament. Defaults to
two, which results in a binary tournament.
Returns
-------
Solution
The selected solution.
"""
self._update_fitness(cost_evaluator)
return self._tournament(rng, k)
def _tournament(self, rng: RandomNumberGenerator, k: int) -> Solution:
if k <= 0:
raise ValueError(f"Expected k > 0; got k = {k}.")
def select():
num_feas = len(self._feas)
idx = rng.randint(len(self))
if idx < num_feas:
return self._feas[idx]
return self._infeas[idx - num_feas]
items = [select() for _ in range(k)]
fittest = min(items, key=lambda item: item.fitness)
return fittest.solution