/
base.py
1489 lines (1185 loc) · 48.9 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Copyright 2021 The PyGlove Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Common interfaces for evolutionary algorithms."""
import abc
import collections
import random
import threading
import typing
from typing import Any, Callable, Iterable, List, Text, Optional, Tuple, Type
import pyglove.core as pg
from pyglove.ext import scalars
# We disable implicit str concat as it is commonly used class schema docstr.
# pylint: disable=implicit-str-concat
#
# Common keys used in DNA meta-data for evolution.
#
# Proposal ID is an 1-based integer that is incremented for every proposal.
DNA_METADATA_PROPOSAL_ID = 'proposal_id'
# Feedback sequence number is an 1-based integer that is incremented for
# every feedback.
DNA_METADATA_FEEDBACK_SEQUENCE_NUMBER = 'feedback_sequence_number'
# Fitness for a DNA, can be a float number for single-objective or
# a tuple of float numbers for multiple objectives.
DNA_METADATA_FITNESS = 'reward'
# Whether a DNA belongs to the initial population.
DNA_METADATA_INITIAL_POPULATION = 'initial_population'
# DNA generation ID, which is incremented for each reproduction.
DNA_METADATA_GENERATION_ID = 'generation_id'
def set_proposal_id(dna: pg.DNA, proposal_id: int) -> pg.DNA:
"""Set the proposal ID to a DNA and return itself."""
return dna.set_metadata(DNA_METADATA_PROPOSAL_ID, proposal_id)
def get_proposal_id(dna: pg.DNA) -> int:
"""Returns the 1-based proposal ID of the DNA."""
return dna.metadata[DNA_METADATA_PROPOSAL_ID]
def set_generation_id(dna: pg.DNA, generation_id: int) -> pg.DNA:
"""Set the generation ID to a DNA and return itself."""
return dna.set_metadata(DNA_METADATA_GENERATION_ID, generation_id)
def get_generation_id(dna: pg.DNA) -> int:
"""Returns the 1-based generation ID."""
return dna.metadata[DNA_METADATA_GENERATION_ID]
def set_feedback_sequence_number(
dna: pg.DNA, sequence_number: int) -> pg.DNA:
"""Set the feedback sequence number to a DNA and return itself."""
return dna.set_metadata(
DNA_METADATA_FEEDBACK_SEQUENCE_NUMBER, sequence_number)
def get_feedback_sequence_number(dna: pg.DNA) -> Optional[int]:
"""Returns the 1-based feedback sequence number of the DNA."""
return dna.metadata.get(DNA_METADATA_FEEDBACK_SEQUENCE_NUMBER)
def set_fitness(dna: pg.DNA,
fitness: typing.Union[float, Tuple[float]]) -> pg.DNA:
"""Set the evaluated fitness to a DNA and return itself."""
return dna.set_metadata(DNA_METADATA_FITNESS, fitness)
def get_fitness(dna: pg.DNA) -> typing.Union[float, Tuple[float]]:
"""Returns the fitness associated with a DNA."""
return dna.metadata[DNA_METADATA_FITNESS]
def _set_initial_population(dna: pg.DNA, is_initial: bool) -> pg.DNA:
"""Set whether the DNA belongs to the initial population."""
return dna.set_metadata(DNA_METADATA_INITIAL_POPULATION, is_initial)
def is_initial_population(dna: pg.DNA) -> bool:
"""Returns True if the DNA belongs to the initial population."""
return dna.metadata[DNA_METADATA_INITIAL_POPULATION]
#
# Interfaces for evolutionary operations.
#
def operation_spec(
input_element_spec: Optional[pg.typing.ValueSpec] = None,
output_element_spec: Optional[pg.typing.ValueSpec] = None
) -> pg.typing.ValueSpec:
"""Returns the value spec (PyGlove typing) for an evolutionary operation.
We use `pg.typing.Callable` instead of `pg.typing.Object(Operation)`
to make it more flexible to plugin lambdas.
Args:
input_element_spec: The value spec for input element.
output_element_spec: The value spec for output element.
Returns:
A value spec for Callable[[List[DNA]], List[DNA]].
"""
if input_element_spec is None:
input_element_spec = pg.typing.Object(pg.DNA)
if output_element_spec is None:
output_element_spec = pg.typing.Object(pg.DNA)
return pg.typing.Callable(
[pg.typing.List(input_element_spec)],
returns=pg.typing.List(output_element_spec))
class Operation(pg.Object):
"""Base class for evolutionary operations.
An evolutionary operation transforms a DNA list into another DNA list.
This abstraction provides the flexibility to describe common evolutionary
operations including but not limited to:
* Selection: Select M parents from a population of size N.
* Recombination: Generate M child candidates from N parents.
* Mutation: Mutate the N child candidates one by one, each can
produce one or K DNA, resulting in N * K child DNA.
Operations are compositional via the following operators::
x >> y: Pipeline x and y by passing the output of x to y as input.
x + y: Concatenate the outputs of x and y, based on the same input.
Resulted list could contain duplicates.
x[m:n:p] Slice x's output from m (inclusive) to n (exclusive) with step p.
x | y: Union the outputs of x and y based on the same input.
Resulted list will not contain duplicated items.
x & y: Intersect the outputs of x and y, based on the same input.
Resulted list will not contain duplicated items.
x - y: Remove elements from x's output which also appears in y's output,
based on the same input.
x ^ y: Keep items that are either in x's output or in y's output, but
not both. Duplicated items in x's output or in y's output will
be preserved.
x * K: Repeat operation x for K times based on the same input,
concatenate their outputs.
x ** K: Pipeline operation x for K times, equivalent to `x >> .. >> x`
(`x` appears K times in the formula).
-x or ~x: Compute the inversion of x's output based on the input.
Equivalent to `Identity() - x`
x.if_true: Conditionally apply x if predicate returns True.
x.if_false: Conditionally apply x if predicate returns False.
Besides, the following operations are commonly used during composition::
Identity() : Returns the input DNA list.
Lambda( : Creates an Operation using lambda.
lambda x: x[1:])
Choice([ : Perform x, y based on probabilities.
(x, prob1),
(y, prob2)])
Conditional( : Perform x if input length > 5, otherwise y.
lambda x: len(x) > 5,
x, y)
UntilChange(x) : Repeat x until its output changes from the input.
ElementWise( : Perform element-wise operation and concatenate.
lambda x: sorted(x))
Flatten() : Flatten nested lists into a flat list.
GlobalStateGetter( : Fetch global state by key 'species'.
'species')
GlobalStateSetter( : Set global state by key 'species' from input.
'species')
The operand in compositional operations supports callable objects in signature
`(List[DNA], int) -> List[DNA]`. For example::
pg.evolution.selectors.Last(50) >> (lambda x, s: x[:2])
"""
def __call__(self,
inputs: List[Any],
global_state: Optional[pg.geno.AttributeDict] = None,
step: int = 0) -> List[Any]:
"""Transform a list of input values to a list of output values.
Args:
inputs: A list of values as inputs.
global_state: An `AttributeDict` object (dictionary that provides
attribute access) as the global state container, which is
readable/writable during the operation.
step: Number of examples historically proposed, which can be used for
determining a cross over schedule.
Returns:
A list of values as output of current operation.
"""
if self.input_element_type is not None:
elem_type = self.input_element_type
for i, elem in enumerate(inputs):
if not isinstance(elem, elem_type):
raise TypeError(
f'The input is expected to be a list of {elem_type!r} '
f'but {elem!r} is encountered at position {i}.')
# NOTE(daiyip): `global_state` will always be provided by `Evolution`.
# For testing purpose, we allow it to be None to make the caller more
# convenient.
if global_state is None:
global_state = pg.geno.AttributeDict()
self._on_input(inputs)
outputs = self._operate(inputs, global_state=global_state, step=step)
if self.output_element_type is not None:
elem_type = self.output_element_type
for i, elem in enumerate(outputs):
if not isinstance(elem, elem_type):
raise TypeError(
f'The output is expected to be a list of {elem_type!r} but '
f'{elem!r} is encountered at position {i}.')
return outputs
def _on_bound(self):
super()._on_bound()
self._operate = make_operation_compatible(self.operation_method)
def _on_input(self, inputs: List[Any]) -> None:
"""Event that is triggered when an input is received.."""
@property
def input_element_type(
self) -> typing.Union[None, Type[Any], Tuple[Type[Any]]]:
"""Retuns the input element type. Subclasses can override."""
return None
@property
def output_element_type(
self) -> typing.Union[None, Type[Any], Tuple[Type[Any]]]:
"""Retuns the output element type. Subclasses can override."""
return None
@property
def operation_method(self) -> Callable[..., List[Any]]:
"""Returns a member method as operation."""
return self.call
def call(self,
inputs: List[Any],
global_state: pg.geno.AttributeDict,
step: int = 0) -> List[Any]:
"""Subclasses should override this method.
The `global_state` and `step` are optional for the subclasses' call
signature.
Args:
inputs: A list of values as inputs.
global_state: An `AttributeDict` object as the global state container,
which is readable/writable during the operation.
step: Number of examples historically proposed, which can be used for
determining a cross over schedule.
Returns:
A list of values as output of current operation.
"""
raise NotImplementedError()
def __rshift__(self, x):
"""The pipeline operator (>>)."""
if x is None:
return self
return Pipeline([self, x])
def __rrshift__(self, x):
"""The rhs pipeline operator (>>)."""
if x is None:
return self
return Pipeline([x, self])
def __getitem__(self, index):
"""The slie operator ([])."""
return Slice(self, index)
def __or__(self, x):
"""The union operator (|)."""
if x is None:
return self
return Union([self, x])
def __ror__(self, x):
"""The union operator (|)."""
if x is None:
return self
return Union([x, self])
def __and__(self, x):
"""The intersection operator (&)."""
if x is None:
return self
return Intersection([self, x])
def __rand__(self, x):
"""The intersection operator (&)."""
if x is None:
return self
return Intersection([x, self])
def __add__(self, x):
"""The concatenate operator (+)."""
if x is None:
return self
return Concatenation([self, x])
def __radd__(self, x):
"""The concatenate operator (+)."""
if x is None:
return self
return Concatenation([x, self])
def __sub__(self, x):
"""The difference operator (-)."""
if x is None:
return self
return Difference([self, x])
def __rsub__(self, x):
"""The difference operator (-)."""
if x is None:
return Inversion(self)
return Difference([x, self])
def __xor__(self, x):
"""The symmetric difference operator (^)."""
if x is None:
return self
return SymmetricDifference([self, x])
def __rxor__(self, x):
"""The symmetric difference operator (^)."""
if x is None:
return self
return SymmetricDifference([x, self])
def __mul__(self, k: typing.Union[int, Callable[[int], int]]):
"""The repeat operator (*)."""
return Repeat(self, k)
def __pow__(self, k: typing.Union[int, Callable[[int], int]]):
"""The power operator (**)."""
return Power(self, k)
def __neg__(self):
"""The negative operator (-)."""
return Inversion(self)
def __invert__(self):
"""The inversion operator (~)."""
return Inversion(self)
def with_prob(self,
probability: typing.Union[float, Callable[[int], float]],
seed: Optional[int] = None):
"""With probability."""
return Choice([(self, probability)], seed=seed)
def if_true(self, predicate: Callable[..., bool]):
"""Conditionally applies current operation when predicate returns True.
Args:
predicate: The predicate that takes the outputs from the previous
operation as input, with optional keyword arguments `global_state` and
`step`. Returns True if current operation needs to be enabled.
Otherwise no operation will be performed.
Returns:
A conditional operation.
"""
return Conditional(predicate, self, None)
def if_false(self, predicate: Callable[..., bool]):
"""Conditionally applies current operation when predicate returns False.
Args:
predicate: The predicate that takes the outputs from the previous
operation as input, with optional keyword arguments `global_state` and
`step`. Returns False if current operation needs to be enabled.
Otherwise no operation will be performed.
Returns:
A conditional operation.
"""
return Conditional(predicate, None, self)
def for_each(self, op):
"""For each element in the output perform the operation."""
return self >> ElementWise(op)
def until_change(self, max_attempts: Optional[int] = None):
"""Ensure current operation will change the inputs."""
return UntilChange(self, max_attempts)
def flatten(self, max_level: Optional[int] = None):
"""Flatten output with a max level."""
return self >> Flatten(max_level)
def global_state(self, key: Text):
"""Returns a global state getter based on the key."""
return GlobalStateGetter(key)
def as_global_state(self, key: Text):
"""Returns a global state setter based on the key."""
return Pipeline([self, GlobalStateSetter(key)])
def set_global_state(self, key: Text, value: Any):
"""Set global state and return current output."""
return self + GlobalStateSetter(key, value)
class Selector(Operation):
"""Base class for selectors."""
@abc.abstractmethod
def select(
self,
inputs: List[Any],
global_state: pg.geno.AttributeDict,
step: int) -> List[Any]:
"""Select a list of outputs from the inputs.
A selector has two use cases:
* Used as parents selector, which selects individuals from the population
as parents for recombination. It will be called before the recombination
step within the :meth:`pyglove.evolution.Evolution.propose` method.
* Used as a population updater, which selects individuals from previous
population as a new population. It will be called everytime the
population is updated, triggered by the
:meth:`pyglove.evolution.Evolution.feedback` method.
Args:
inputs: a list of objects as input.
global_state: An `AttributeDict` object as the global state container,
which is readable/writable during the operation.
step: Number of examples historically proposed, which can be used for
determining a cross over schedule.
"""
@property
def operation_method(self):
return self.select
class DNAOperation(Operation):
"""Operation that takes a list of DNA as both input and output."""
@property
def input_element_type(self) -> Type[pg.DNA]:
"""Retuns the input element type."""
return pg.DNA
@property
def output_element_type(self) -> Type[pg.DNA]:
"""Retuns the output element type."""
return pg.DNA
class Recombinator(DNAOperation):
"""Base class for recombinators."""
# Number of parents allowed for recombination. If None, there is no limit.
NUM_PARENTS = None
@abc.abstractmethod
def recombine(self,
parents: List[pg.DNA],
global_state: pg.geno.AttributeDict,
step: int) -> List[pg.DNA]:
"""Generate a list of child DNA based on the list of parents given.
User should override this method with optional keyword arguments
'global_state' and 'step'.
The parents DNA contains a metadata field 'generation', which is the
generation of the parent DNA. If the Recombinator does not assign this
field for the new child DNA, the child DNA will have the maximum generation
from the parents plus 1.
Args:
parents: Parent trials.
global_state: An `AttributeDict` object as the global state container,
which is readable/writable during the operation.
step: Number of examples historically proposed, which can be used for
determining a cross over schedule.
Returns:
A list of generated child DNA.
"""
def _on_input(self, inputs: List[Any]) -> None:
"""Override to check number of parents."""
if self.NUM_PARENTS is not None and len(inputs) != self.NUM_PARENTS:
raise ValueError(
f'{self.__class__.__name__} supports recombination on exact '
f'{self.NUM_PARENTS} parents. Encountered: {inputs!r}.')
@property
def operation_method(self):
return self.recombine
class Mutator(DNAOperation):
"""Base class for mutators.
A mutator performs a mutation, i.e. a random transformation that converts
a parent DNA to a child DNA. Mutations should reach the full search space
through composition and should prefer local transformations individually.
"""
def _on_bound(self):
super()._on_bound()
self._mutate = make_operation_compatible(self.mutate)
def mutate(
self,
dna: pg.DNA,
global_state: pg.geno.AttributeDict,
step: int = 0) -> typing.Union[pg.DNA, List[pg.DNA]]:
"""Mutates the DNA at a given step.
User should override this method or `mutate_list` method with optional
keyword arguments 'global_state' and 'step'.
Args:
dna: DNA to mutate.
global_state: An `AttributeDict` object as the container of global states.
step: Number of examples historically proposed, which can be used for
determining a mutation schedule.
Returns:
A new DNA or a DNA list as the result of the mutation.
"""
raise NotImplementedError()
def mutate_list(self,
dna_list: List[pg.DNA],
global_state: pg.geno.AttributeDict,
step: int = 0) -> List[pg.DNA]:
"""Mutate the DNA in the input one by one and concatenate their outputs.
User should override this method instead of `mutate` if mutation depends on
the list-wise information. Keyword arguments `global_state` and `step` are
optional when override.
Args:
dna_list: a list of DNA to mutate.
global_state: An `AttributeDict` object as the container of global states.
step: Number of examples historically proposed, which can be used for
determining a mutation schedule.
Returns:
a list of DNA as the result of the mutation.
"""
results = []
for dna in dna_list:
output = self._mutate(dna, global_state=global_state, step=step)
if isinstance(output, list):
results.extend(output)
else:
results.append(output)
return results
@property
def operation_method(self):
return self.mutate_list
@pg.members([
('reproduction', operation_spec(),
'Operation for performing selection and reproduction.'),
('population_init',
pg.typing.Union([
pg.typing.Object(pg.DNAGenerator),
pg.typing.Tuple([pg.typing.Object(pg.DNAGenerator), pg.typing.Int()])
], default=(pg.geno.Random(), 50)),
'A DNAGenerator or a tuple of (initial_population_generator, '
'initial_population_size) as the population initializer for bootstrapping '
'the initial population. If the initial_population_size is not provided, '
'it will delegate all `propose` calls to the initial population generator '
'untill the generator raises a StopIteration error.'),
('population_update', operation_spec().noneable(),
'Operation for population update, which is called every time when the '
'fitness of a proposed DNA is fed back to the algorithm. It passes the '
'previous population with the newly added DNA as the input to this '
'operation, whose return value will be used as the new population. '
'If None, the population will be accumulated without a limit.'),
('multi_objective', pg.typing.Bool(default=False),
'If True, the fitness is a tuple of float numbers as the metrics of '
'multiple objectives. Otherwise the fitness is a float value as the '
'metric for a single objective.')
], init_arg_list=[
'reproduction', 'population_init', 'population_update'
])
class Evolution(pg.DNAGenerator):
"""An evolutionary algorithm based on compositional operations.
Common evolutionary algorithms can be abstracted with 3 operations:
* Select parent(s) from the population.
* Recombine the DNA from the parents into a list of child DNA.
* Mutate each child DNA.
Plus 2 population related operations:
* Initialize the population.
* Update the population when the fitness is determined for the child DNA.
All the operations above except population initialization can be described
by the `Operation` interface, which transforms a list of DNA into another list
of DNA. Moreover, `Operation` allows complex algorithms to be represented as
a composition of elementary operations. For example, Regularized Evolution
can be described as::
pg.evolution.Evolution(
op=(pg.evolution.selectors.Random(10)
>> pg.evolution.selectors.Top(1)
>> pg.evolution.mutators.Uniform()),
population_init=(pg.geno.Random(), 50),
population_update=pg.evolution.selectors.Last(50))
"""
def _setup(self) -> None:
"""Setup the algorithm."""
if isinstance(self.population_init, tuple):
(self._init_population_generator,
self._init_population_size) = self.population_init
else:
self._init_population_generator = self.population_init
self._init_population_size = None
self._init_population_generator.setup(self.dna_spec)
self._reproduction = make_operation_compatible(self.reproduction)
self._population_update = make_operation_compatible(
self.population_update)
# NOTE(daiyip): global state can be accessed as attributes, and new keys
# can be inserted during operations.
self._global_state = pg.geno.AttributeDict(num_generations=0)
self._population_initialized = False
self._population = []
self._pending_proposals = collections.deque()
self._lock = threading.Lock()
@property
def multi_objective(self) -> bool:
"""Returns True if fitness is a tuple of float numbers."""
return self.sym_getattr('multi_objective')
@property
def population(self) -> List[pg.DNA]:
"""Returns current population."""
return self._population
@property
def global_state(self) -> pg.geno.AttributeDict:
"""Returns the global state."""
return self._global_state
@property
def num_generations(self) -> int:
return self._global_state.num_generations
def _propose(self) -> pg.DNA:
"""Implementation of DNA proposal."""
with self._lock:
if not self._pending_proposals:
if self._population_initialized:
# Propose new individuals using evolution.
self._pending_proposals.extend(self._evolve())
else:
# Propose initial population.
try:
dna = self._init_population_generator.propose()
set_proposal_id(dna, self.num_proposals + 1)
set_generation_id(dna, self.num_generations + 1)
_set_initial_population(dna, True)
self._pending_proposals.append(dna)
except StopIteration:
pg.logging.info(
f'Evolution finishes population initialization due to that '
f'the population initializer ('
f'{self._init_population_generator!r}) raises StopIteration '
f'error. (population_size={len(self._population)}')
self._population_initialized = True
self._global_state.num_generations = 1
self._pending_proposals.extend(self._evolve())
return self._pending_proposals.popleft()
def _evolve(self) -> List[pg.DNA]:
"""Performs a single round of evolution process."""
# Step 1: Select parents from the population.
current_step = self.num_proposals
children = self._reproduction(
self._population, global_state=self._global_state, step=current_step)
if not children:
raise ValueError(
'There is no child reproduced, which means that the population '
'(with reward) is empty. This is likely due to that no evaluation is '
'received from existing proposals at this point. Consider to specify '
'the initial population size (through the second item of the '
'`population_init` tuple) or reduce the number of parallel sampling '
'clients.')
for i, child in enumerate(children):
# NOTE(daiyip): If a child's feedback sequence number exists, it's
# an existing DNA from the population, in such case, we should clone
# to avoid the existing population get polluted.
if get_feedback_sequence_number(child) is not None:
child = child.clone(deep=True)
children[i] = child
# Update the 1-based ID and generation information for the DNA.
set_proposal_id(child, current_step + 1 + i)
set_generation_id(child, self.num_generations + 1)
_set_initial_population(child, False)
# Update number of generations in global state.
self._global_state.num_generations += 1
return children
def _feedback(
self, dna: pg.DNA, reward: typing.Union[float, Tuple[float]]) -> None:
"""Feedback a DNA with its reward."""
set_feedback_sequence_number(dna, self._num_feedbacks + 1)
set_fitness(dna, reward)
assert get_fitness(dna) is not None
with self._lock:
# Feedback generation-zero DNA to the population initializer.
if is_initial_population(dna):
self._init_population_generator.feedback(dna, reward)
# We use `num_feedbacks` instead of `num_proposals` to determine
# initial population size to avoid starting the evolution process
# too early without enough feedbacks.
# `self.num_feedbacks` will be incremented after _feedback returns.
# Therefore, we compare it with initial population size - 1.
if (not self._population_initialized
and self._init_population_size is not None
and self.num_feedbacks >= self._init_population_size - 1):
pg.logging.info(
f'Evolution finishes population initialization due to that '
f'the initial population size ({self._init_population_size}) '
f'is reached. (population_size={len(self._population)}')
self._population_initialized = True
self._global_state.num_generations = 1
# Update the population if needed.
self._population.append(dna)
if self._population_update:
self._population = self._population_update(
self._population,
global_state=self._global_state,
step=self.num_feedbacks)
def recover(
self,
history: Iterable[
Tuple[pg.DNA, typing.Union[None, float, Tuple[float]]]]
) -> None:
"""Recover states by replaying the proposal history."""
# Recover the state of the population.
init_population = []
for dna, reward in history:
self._num_proposals += 1
dna.use_spec(self.dna_spec)
if reward is not None:
# NOTE(daiyip): There is a possibility that the client has provided the
# reward to the controller, but the controller process restarted before
# calling the `feedback` method. Such cases can be identified by
# checking the `feedback_sequence_number` metadata.
if get_feedback_sequence_number(dna) is None:
self.feedback(dna, reward)
else:
assert get_fitness(dna) == reward, (dna, reward)
self._population.append(dna)
if self._population_update:
self._population = self._population_update(
self._population,
global_state=self._global_state,
step=self._num_feedbacks)
self._num_feedbacks += 1
if is_initial_population(dna):
init_population.append((dna, reward))
# Recover `self.num_generations`.
generation_id = get_generation_id(dna)
if generation_id > self.num_generations:
self._global_state.num_generations = generation_id
# Recover the state of the population initializer.
if (self._init_population_size is not None
and len(init_population) >= self._init_population_size):
self._population_initialized = True
self._init_population_generator.recover(init_population)
#
# Implementation of compositional operations.
#
class Identity(Operation):
"""Returns the input itself."""
def call(self, inputs: List[Any]) -> List[Any]: # pytype: disable=signature-mismatch
return inputs
@pg.members([
('fn', operation_spec(pg.typing.Any(), pg.typing.Any()),
'A callable object that performs the operation.')
])
class Lambda(Operation):
"""A lambda operation."""
def _on_bound(self):
super()._on_bound()
self._fn = make_operation_compatible(self.fn)
def call(
self,
inputs: List[Any],
global_state: pg.geno.AttributeDict,
step: int = 0) -> List[Any]:
return self._fn(inputs, global_state=global_state, step=step)
@pg.members([
('op', operation_spec(pg.typing.Any(), pg.typing.Any()),
'Operation to repeat if the output is the same as the input.'),
('max_attempts', scalars.scalar_spec(pg.typing.Int(min_value=1)).noneable(),
'Maximum attempts to make if the output is the same as the input.')
])
class UntilChange(Operation):
"""Repeat an opertion until its output is different from the input."""
def _on_bound(self):
super()._on_bound()
self._op = make_operation_compatible(self.op)
def call(self,
inputs: List[Any],
global_state: pg.geno.AttributeDict,
step: int = 0) -> List[Any]:
max_attempts = scalars.scalar_value(self.max_attempts, step)
attempts = 0
while max_attempts is None or attempts < max_attempts:
output = self._op(inputs, global_state=global_state, step=step)
if output != inputs:
break
attempts += 1
return output
@pg.members([
('ops', pg.typing.List(pg.typing.Tuple([
# An operation candidate.
operation_spec(pg.typing.Any(), pg.typing.Any()),
# The probability of applying the operation.
scalars.scalar_spec(pg.typing.Float(min_value=0.0, max_value=1.0))])),
'A list of (operation, probability) tuples, denoting the probability of '
'applying each operation. '
'Example: [(mutator1, 0.25), (mutator2, 0.5)] will '
'result in applying mutator1 to 25% of the DNAs that are mtuated and '
'mutator2 to 50% of the DNAs. The decision to apply each mutator is '
'independent of the application of the others; in particular, both '
'mutator1 and mutator2 will be applied to 12.5% of the examples. The '
'probability can be a callable object that takes an integer step as input '
'and returns a float value as the probability for the step.'),
('limit', scalars.scalar_spec(pg.typing.Int(min_value=0)).noneable(),
'The maximum number of operations that are allowed to perform. '
'If None, there is limit. This is useful when we want to control '
'the max difference between the input and the output.'),
('seed', pg.typing.Int().noneable(), 'Random seed for choosing ops.')
])
class Choice(Operation):
"""Applying a list of operations with probabilities in a pipeline.
Example::
pg.evolution.Choice([
(pg.evolution.mutators.Uniform() ** 3, 0.5)
(pg.evolution.mutators.Swap(), 0.4)
], limit=1)
The code above apply 3 pipelined uniform mutation with probability 0.5, and
apply 1 swap between two multiple choices with probability 0.4. At most 1
operation can be applied.
"""
def _on_bound(self):
super()._on_bound()
self._random = random if self.seed is None else random.Random(self.seed)
self._ops = [(make_operation_compatible(op), prob)
for op, prob in self.ops]
def call(self,
inputs: List[Any],
global_state: pg.geno.AttributeDict,
step: int = 0) -> List[Any]:
num_performed_ops = 0
for op, prob in self._ops:
prob = scalars.scalar_value(prob, step)
if self._random.random() < prob:
inputs = op(inputs, global_state=global_state, step=step)
num_performed_ops += 1
if self.limit is not None and num_performed_ops == self.limit:
break
return inputs
@pg.members([
('predicate', pg.typing.Callable(
[pg.typing.List(pg.typing.Any())], returns=pg.typing.Bool()),
'A callable object that takes the output from previous operation as '
'input, with optional `global_step` and `step` arguments, to tell which '
'branch should be enabled.'),
('true_op', operation_spec(pg.typing.Any(), pg.typing.Any()).noneable(),
'An operation that will be applied when predicate returns True.'),
('false_op', operation_spec(pg.typing.Any(), pg.typing.Any()).noneable(),
'An operation that will be applied when predicate returns False. '
'If None, an empty operation will be applied.')
])
class Conditional(Operation):
"""Conditional operation.
A conditional operation allows sub-operations to be applied only when a
condition is met.
Example::
pg.evolution.Conditional(
lambda x: len(x) > 10,
true_op=Top(10))
which is equivalent to::
Top(10).if_true(lambda x: len(x) > 10)
"""
def _on_bound(self):
super()._on_bound()
self._predicate = make_operation_compatible(self.predicate)
self._true_op = make_operation_compatible(self.true_op)
self._false_op = make_operation_compatible(self.false_op)
def call(self,
inputs: List[Any],
global_state: pg.geno.AttributeDict,
step: int = 0) -> List[Any]:
if self._predicate(inputs, global_state=global_state, step=step):
branch = self._true_op
else:
branch = self._false_op
if branch is not None:
return branch(inputs, global_state=global_state, step=step)
else:
return inputs
@pg.members([
('ops', pg.typing.List(
operation_spec(pg.typing.Any(), pg.typing.Any()), min_size=2),