/
strategies.py
675 lines (564 loc) · 24.3 KB
/
strategies.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
# coding=utf-8
#
# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Most of this work is copyright (C) 2013-2019 David R. MacIver
# (david@drmaciver.com), but it contains contributions by others. See
# CONTRIBUTING.rst for a full list of people who may hold copyright, and
# consult the git log if you need to determine who owns an individual
# contribution.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.
#
# END HEADER
from __future__ import absolute_import, division, print_function
from collections import defaultdict
import hypothesis.internal.conjecture.utils as cu
from hypothesis._settings import Phase
from hypothesis.control import _current_build_context, assume
from hypothesis.errors import (
HypothesisException,
NoExamples,
NoSuchExample,
Unsatisfiable,
UnsatisfiedAssumption,
)
from hypothesis.internal.compat import bit_length, hrange
from hypothesis.internal.conjecture.utils import (
LABEL_MASK,
calc_label_from_cls,
calc_label_from_name,
combine_labels,
)
from hypothesis.internal.coverage import check_function
from hypothesis.internal.lazyformat import lazyformat
from hypothesis.internal.reflection import get_pretty_function_description
from hypothesis.internal.validation import check_type
from hypothesis.utils.conventions import UniqueIdentifier
# Typing support is optional: when any of the imports below fails we fall back
# to plain stand-ins so the rest of the module still loads.
try:
    from random import Random  # noqa
    from typing import List, Callable, TypeVar, Generic, Optional  # noqa

    # Type variable for the values a strategy produces.
    Ex = TypeVar("Ex", covariant=True)
    # Generic result type used in the type comments of map() and flatmap().
    T = TypeVar("T")

    from hypothesis.internal.conjecture.data import ConjectureData  # noqa
except ImportError:  # pragma: no cover
    # With these stand-ins ``Generic[Ex]`` evaluates to ``object``, so
    # ``class SearchStrategy(Generic[Ex])`` becomes a plain object subclass.
    Ex = "key"  # type: ignore
    Generic = {Ex: object}  # type: ignore

# Sentinel marking a value whose computation is currently in progress. It is
# compared by identity in ``SearchStrategy.recursive_property`` and in the
# ``SearchStrategy.label`` property to detect re-entrant evaluation.
calculating = UniqueIdentifier("calculating")

# Label passed to ``data.start_example`` for every attempt made by
# ``MappedSearchStrategy.do_draw``.
MAPPED_SEARCH_STRATEGY_DO_DRAW_LABEL = calc_label_from_name(
    "another attempted draw in MappedSearchStrategy"
)
def one_of_strategies(xs):
    """Build a single union strategy from an iterable of strategies.

    The iterable is materialised once into a tuple, so generators are
    accepted. Raises ValueError when it yields nothing; otherwise the tuple
    is wrapped in a OneOfStrategy.
    """
    candidates = tuple(xs)
    if len(candidates) == 0:
        raise ValueError("Cannot join an empty list of strategies")
    return OneOfStrategy(candidates)
class SearchStrategy(Generic[Ex]):
    """A SearchStrategy is an object that knows how to explore data of a given
    type.

    Except where noted otherwise, methods on this class are not part of
    the public API and their behaviour may change significantly between
    minor version releases. They will generally be stable between patch
    releases.
    """

    supports_find = True
    # Class-level default; validate() sets this to True on the instance once
    # validation has been started and resets it to False if validation raises.
    validate_called = False
    # Cache for the ``label`` property. The double underscore triggers name
    # mangling, so this is stored as ``_SearchStrategy__label``.
    __label = None

    def recursive_property(name, default):
        """Handle properties which may be mutually recursive among a set of
        strategies.

        This is a plain helper function evaluated while the class body runs
        (it takes no ``self``); it returns a ``property`` object whose getter
        is named ``name``.

        These are essentially lazily cached properties, with the ability to set
        an override: If the property has not been explicitly set, we calculate
        it on first access and memoize the result for later.

        The problem is that for properties that depend on each other, a naive
        calculation strategy may hit infinite recursion. Consider for example
        the property is_empty. A strategy defined as x = st.deferred(lambda: x)
        is certainly empty (in order to draw a value from x we would have to
        draw a value from x, for which we would have to draw a value from x,
        ...), but in order to calculate it the naive approach would end up
        calling x.is_empty in order to calculate x.is_empty in order to etc.

        The solution is one of fixed point calculation. We start with a default
        value that is the value of the property in the absence of evidence to
        the contrary, and then update the values of the property for all
        dependent strategies until we reach a fixed point.

        The approach taken roughly follows that in section 4.2 of Adams,
        Michael D., Celeste Hollenbeck, and Matthew Might. "On the complexity
        and performance of parsing with derivatives." ACM SIGPLAN Notices 51.6
        (2016): 224-236.

        For a property called ``name`` three attribute names are derived:
        ``cached_<name>`` (memoized result), ``calc_<name>`` (the method that
        computes it, called with a ``recur`` callback) and ``force_<name>``
        (an explicit override that takes precedence over the cache).
        """
        cache_key = "cached_" + name
        calculation = "calc_" + name
        force_key = "force_" + name

        def forced_value(target):
            # Prefer an explicit override, then the memoized value. If neither
            # attribute exists the AttributeError from the second getattr
            # propagates to the caller, which treats it as "not yet known".
            try:
                return getattr(target, force_key)
            except AttributeError:
                return getattr(target, cache_key)

        def accept(self):
            try:
                return forced_value(self)
            except AttributeError:
                pass

            # Working values for every strategy visited during this
            # calculation; written to the strategies' caches at the end.
            mapping = {}
            # One-element list so the nested function can mutate the flag
            # (the file supports Python 2, which has no ``nonlocal``).
            hit_recursion = [False]

            # For a first pass we do a direct recursive calculation of the
            # property, but we block recursively visiting a value in the
            # computation of its property: When that happens, we simply
            # note that it happened and return the default value.
            def recur(strat):
                try:
                    return forced_value(strat)
                except AttributeError:
                    pass
                try:
                    result = mapping[strat]
                    if result is calculating:
                        hit_recursion[0] = True
                        return default
                    else:
                        return result
                except KeyError:
                    # Mark as in-progress before descending so that a cycle
                    # back to ``strat`` is detected by the branch above.
                    mapping[strat] = calculating
                    mapping[strat] = getattr(strat, calculation)(recur)
                    return mapping[strat]

            recur(self)

            # If we hit self-recursion in the computation of any strategy
            # value, our mapping at the end is imprecise - it may or may
            # not have the right values in it. We now need to proceed with
            # a more careful fixed point calculation to get the exact
            # values. Hopefully our mapping is still pretty good and it
            # won't take a large number of updates to reach a fixed point.
            if hit_recursion[0]:
                needs_update = set(mapping)

                # We track which strategies use which in the course of
                # calculating their property value. If A ever uses B in
                # the course of calculating its value, then whenever the
                # value of B changes we might need to update the value of
                # A.
                listeners = defaultdict(set)
            else:
                # Falsy, so the fixed point loop below is skipped entirely
                # (``listeners`` is intentionally not defined in this case).
                needs_update = None

            def recur2(strat):
                # Builds the callback handed to ``strat``'s calc method during
                # the fixed point phase; it records ``strat`` as a listener of
                # every strategy whose value it reads.
                def recur_inner(other):
                    try:
                        return forced_value(other)
                    except AttributeError:
                        pass
                    listeners[other].add(strat)
                    try:
                        return mapping[other]
                    except KeyError:
                        # First time we see ``other``: assume the default and
                        # schedule it for calculation in the next round.
                        needs_update.add(other)
                        mapping[other] = default
                        return default

                return recur_inner

            count = 0
            seen = set()
            while needs_update:
                count += 1
                # If we seem to be taking a really long time to stabilize we
                # start tracking seen values to attempt to detect an infinite
                # loop. This should be impossible, and most code will never
                # hit the count, but having an assertion for it means that
                # testing is easier to debug and we don't just have a hung
                # test.
                # Note: This is actually covered, by test_very_deep_deferral
                # in tests/cover/test_deferred_strategies.py. Unfortunately it
                # runs into a coverage bug. See
                # https://bitbucket.org/ned/coveragepy/issues/605/
                # for details.
                if count > 50:  # pragma: no cover
                    key = frozenset(mapping.items())
                    assert key not in seen, (key, name)
                    seen.add(key)
                to_update = needs_update
                needs_update = set()
                for strat in to_update:
                    new_value = getattr(strat, calculation)(recur2(strat))
                    if new_value != mapping[strat]:
                        # The value changed, so everything that read it must
                        # be recalculated in the next round.
                        needs_update.update(listeners[strat])
                        mapping[strat] = new_value

            # We now have a complete and accurate calculation of the
            # property values for everything we have seen in the course of
            # running this calculation. We simultaneously update all of
            # them (not just the strategy we started out with).
            for k, v in mapping.items():
                setattr(k, cache_key, v)
            return getattr(self, cache_key)

        accept.__name__ = name
        return property(accept)

    # Returns True if this strategy can never draw a value and will always
    # result in the data being marked invalid.
    # The fact that this returns False does not guarantee that a valid value
    # can be drawn - this is not intended to be perfect, and is primarily
    # intended to be an optimisation for some cases.
    is_empty = recursive_property("is_empty", True)

    # Returns True if values from this strategy can safely be reused without
    # this causing unexpected behaviour.
    has_reusable_values = recursive_property("has_reusable_values", True)

    # Whether this strategy is suitable for holding onto in a cache.
    is_cacheable = recursive_property("is_cacheable", True)

    def calc_is_cacheable(self, recur):
        """Compute ``is_cacheable``; the base implementation returns True.

        ``recur`` is the callback supplied by ``recursive_property`` for
        reading the same property on other strategies; it is unused here.
        """
        return True

    def calc_is_empty(self, recur):
        """Compute ``is_empty``; the base implementation returns False.

        ``recur`` is the callback supplied by ``recursive_property``; it is
        unused here.
        """
        # Note: It is correct and significant that the default return value
        # from calc_is_empty is False despite the default value for is_empty
        # being true. The reason for this is that strategies should be treated
        # as empty absent evidence to the contrary, but most basic strategies
        # are trivially non-empty and it would be annoying to have to override
        # this method to show that.
        return False

    def calc_has_reusable_values(self, recur):
        """Compute ``has_reusable_values``; the base implementation returns
        False.

        ``recur`` is the callback supplied by ``recursive_property``; it is
        unused here.
        """
        return False

    def example(self, random=None):
        # type: (Random) -> Ex
        """Provide an example of the sort of value that this strategy
        generates. This is biased to be slightly simpler than is typical for
        values from this strategy, for clarity purposes.

        This method shouldn't be taken too seriously. It's here for interactive
        exploration of the API, not for any sort of real testing.

        ``random`` is forwarded to ``hypothesis.find``.

        Raises HypothesisException when called while a build context is
        active (inside a strategy definition or a test function), and
        NoExamples when ``find`` fails without having produced any value.

        This method is part of the public API.
        """
        context = _current_build_context.value
        if context is not None:
            # A build context is active. ``data.depth > 0`` selects the
            # "inside a strategy definition" message; otherwise the "inside a
            # test function" message is used.
            if context.data is not None and context.data.depth > 0:
                raise HypothesisException(
                    "Using example() inside a strategy definition is a bad "
                    "idea. Instead consider using hypothesis.strategies.builds() "
                    "or @hypothesis.strategies.composite to define your strategy."
                    " See https://hypothesis.readthedocs.io/en/latest/data.html"
                    "#hypothesis.strategies.builds or "
                    "https://hypothesis.readthedocs.io/en/latest/data.html"
                    "#composite-strategies for more details."
                )
            else:
                raise HypothesisException(
                    "Using example() inside a test function is a bad "
                    "idea. Instead consider using hypothesis.strategies.data() "
                    "to draw more examples during testing. See "
                    "https://hypothesis.readthedocs.io/en/latest/data.html"
                    "#drawing-interactively-in-tests for more details."
                )

        # Imported inside the method rather than at module level.
        # NOTE(review): presumably to avoid a circular import - confirm.
        from hypothesis import find, settings, Verbosity

        # Conjecture will always try the zero example first. This would result
        # in us producing the same example each time, which is boring, so we
        # deliberately skip the first example it feeds us.
        first = []  # type: list

        def condition(x):
            # Reject the very first value (remembering it in ``first``) and
            # accept every value after that.
            if first:
                return True
            else:
                first.append(x)
                return False

        try:
            return find(
                self,
                condition,
                random=random,
                settings=settings(
                    database=None,
                    verbosity=Verbosity.quiet,
                    # Every phase except shrinking.
                    phases=tuple(set(Phase) - {Phase.shrink}),
                ),
            )
        except (NoSuchExample, Unsatisfiable):
            # This can happen when a strategy has only one example. e.g.
            # st.just(x). In that case we wanted the first example after all.
            if first:
                return first[0]
            raise NoExamples(u"Could not find any valid examples in 100 tries")

    def map(self, pack):
        # type: (Callable[[Ex], T]) -> SearchStrategy[T]
        """Returns a new strategy that generates values by generating a value
        from this strategy and then calling pack() on the result, giving that.

        This method is part of the public API.
        """
        return MappedSearchStrategy(pack=pack, strategy=self)

    def flatmap(self, expand):
        # type: (Callable[[Ex], SearchStrategy[T]]) -> SearchStrategy[T]
        """Returns a new strategy that generates values by generating a value
        from this strategy, say x, then generating a value from
        strategy(expand(x))

        This method is part of the public API.
        """
        # FlatMapStrategy lives in another module and is imported on use.
        from hypothesis.searchstrategy.flatmapped import FlatMapStrategy

        return FlatMapStrategy(expand=expand, strategy=self)

    def filter(self, condition):
        # type: (Callable[[Ex], bool]) -> SearchStrategy[Ex]
        """Returns a new strategy that generates values from this strategy
        which satisfy the provided condition. Note that if the condition is too
        hard to satisfy this might result in your tests failing with
        Unsatisfiable.

        This method is part of the public API.
        """
        return FilteredStrategy(conditions=(condition,), strategy=self)

    def do_filtered_draw(self, data, filter_strategy):
        """Draw a value for ``filter_strategy``, which wraps this strategy.

        Returns whatever ``filter_strategy.default_do_filtered_draw(data)``
        returns.
        """
        # Hook for strategies that want to override the behaviour of
        # FilteredStrategy. Most strategies don't, so by default we delegate
        # straight back to the default filtered-draw implementation.
        return filter_strategy.default_do_filtered_draw(data)

    @property
    def branches(self):
        # type: () -> List[SearchStrategy[Ex]]
        """List of alternative strategies this one is made of.

        The base implementation is the single-element list ``[self]``.
        """
        return [self]

    def __or__(self, other):
        """Return a strategy which produces values by randomly drawing from one
        of this strategy or the other strategy.

        Raises ValueError if ``other`` is not a SearchStrategy.

        This method is part of the public API.
        """
        if not isinstance(other, SearchStrategy):
            raise ValueError("Cannot | a SearchStrategy with %r" % (other,))
        return one_of_strategies((self, other))

    def validate(self):
        # type: () -> None
        """Throw an exception if the strategy is not valid.

        This can happen due to lazy construction

        Does nothing when ``validate_called`` is already set on this strategy.
        """
        if self.validate_called:
            return
        try:
            # Set the flag before running do_validate(), so a nested call to
            # validate() on this same strategy returns immediately.
            self.validate_called = True
            self.do_validate()
            # Accessed only for their side effect: evaluating these
            # recursive properties computes and caches them.
            self.is_empty
            self.has_reusable_values
        except Exception:
            # Validation failed, so allow it to be attempted again later.
            self.validate_called = False
            raise

    # Per-class cache of labels, keyed by the class object and filled in by
    # ``class_label``.
    LABELS = {}  # type: dict

    @property
    def class_label(self):
        """Label computed from this strategy's class.

        The result of ``calc_label_from_cls`` is cached in ``LABELS``.
        """
        cls = self.__class__
        try:
            return cls.LABELS[cls]
        except KeyError:
            pass
        result = calc_label_from_cls(cls)
        cls.LABELS[cls] = result
        return result

    @property
    def label(self):
        """Label for this strategy, computed once via ``calc_label``.

        Returns 0 if accessed again while that computation is still running.
        """
        if self.__label is calculating:
            # Re-entrant access during calc_label(): return a placeholder.
            return 0
        if self.__label is None:
            self.__label = calculating
            self.__label = self.calc_label()
        return self.__label

    def calc_label(self):
        """Compute this strategy's label; the base implementation returns
        ``class_label``."""
        return self.class_label

    def do_validate(self):
        """Validation hook called by ``validate``; the base implementation
        does nothing."""
        pass

    def do_draw(self, data):
        # type: (ConjectureData) -> Ex
        """Draw a value using ``data``.

        The base implementation always raises NotImplementedError naming the
        concrete class.
        """
        raise NotImplementedError("%s.do_draw" % (type(self).__name__,))

    def __init__(self):
        """Initialise the strategy; the base class keeps no per-instance
        state here."""
        pass
class OneOfStrategy(SearchStrategy):
    """Union of several strategies.

    A draw picks one of the non-empty member branches (by index, via
    ``cu.integer_range``) and then draws from that branch, so a generated
    value may come from any of the strategies supplied at construction.
    """

    def __init__(self, strategies):
        SearchStrategy.__init__(self)
        members = tuple(strategies)
        self.original_strategies = list(members)
        # Flattened, de-duplicated branches; built on first access of
        # ``element_strategies``.
        self.__element_strategies = None
        # Guards ``branches`` against re-entrant evaluation.
        self.__in_branches = False

    def calc_is_empty(self, recur):
        # Empty only if every original member is empty.
        for member in self.original_strategies:
            if not recur(member):
                return False
        return True

    def calc_has_reusable_values(self, recur):
        # Reusable only if every original member has reusable values.
        for member in self.original_strategies:
            if not recur(member):
                return False
        return True

    def calc_is_cacheable(self, recur):
        # Cacheable only if every original member is cacheable.
        for member in self.original_strategies:
            if not recur(member):
                return False
        return True

    @property
    def element_strategies(self):
        """The non-empty branches of all members, flattened and de-duplicated.

        Computed once and cached; the matching branch labels are computed and
        stored at the same time.
        """
        if self.__element_strategies is not None:
            return self.__element_strategies

        # Collect the non-empty branches of every non-empty member.
        candidates = []
        for member in self.original_strategies:
            check_strategy(member)
            if member.is_empty:
                continue
            candidates.extend(
                [branch for branch in member.branches if not branch.is_empty]
            )

        # Drop references to this strategy itself and repeated branches,
        # keeping first-seen order.
        unique = []
        already_added = set()
        for branch in candidates:
            if branch is self or branch in already_added:
                continue
            already_added.add(branch)
            unique.append(branch)

        # One label per branch, mixing this strategy's label, the branch's
        # label and the branch's position.
        shift = bit_length(len(unique))
        labels = [
            (((self.label ^ branch.label) << shift) + position) & LABEL_MASK
            for position, branch in enumerate(unique)
        ]

        self.__element_strategies = unique
        self.__branch_labels = tuple(labels)
        return self.__element_strategies

    @property
    def branch_labels(self):
        """Tuple of labels, one per entry of ``element_strategies``."""
        # Accessing the property makes sure the labels have been computed.
        self.element_strategies
        assert len(self.__branch_labels) == len(self.element_strategies)
        return self.__branch_labels

    def calc_label(self):
        member_labels = [member.label for member in self.original_strategies]
        return combine_labels(self.class_label, *member_labels)

    def do_draw(self, data):
        # type: (ConjectureData) -> Ex
        options = self.element_strategies
        n = len(options)
        assert n > 0
        if n == 1:
            # Only one branch: no index needs to be drawn.
            return data.draw(options[0])
        i = cu.integer_range(data, 0, n - 1)
        return data.draw(options[i], label=self.branch_labels[i])

    def __repr__(self):
        rendered = ", ".join(repr(member) for member in self.original_strategies)
        return "one_of(%s)" % (rendered,)

    def do_validate(self):
        for branch in self.element_strategies:
            branch.validate()

    @property
    def branches(self):
        """Branches of this union.

        Normally ``element_strategies``; if this property is re-entered while
        it is already being evaluated, ``[self]`` is returned instead.
        """
        if self.__in_branches:
            return [self]
        try:
            self.__in_branches = True
            return self.element_strategies
        finally:
            self.__in_branches = False
class MappedSearchStrategy(SearchStrategy):
    """Strategy whose values are produced by transforming another strategy.

    Each draw takes a value from ``mapped_strategy`` and passes it through
    ``pack``.
    """

    def __init__(self, strategy, pack=None):
        SearchStrategy.__init__(self)
        self.mapped_strategy = strategy
        # A supplied ``pack`` becomes an instance attribute that shadows the
        # ``pack`` method defined on the class.
        if pack is not None:
            self.pack = pack

    def calc_is_empty(self, recur):
        # Empty exactly when the underlying strategy is.
        return recur(self.mapped_strategy)

    def calc_is_cacheable(self, recur):
        # Cacheable exactly when the underlying strategy is.
        return recur(self.mapped_strategy)

    def __repr__(self):
        # The repr is built once and then reused.
        if not hasattr(self, "_cached_repr"):
            pack_description = get_pretty_function_description(self.pack)
            self._cached_repr = "%r.map(%s)" % (
                self.mapped_strategy,
                pack_description,
            )
        return self._cached_repr

    def do_validate(self):
        self.mapped_strategy.validate()

    def pack(self, x):  # type: ignore
        """Convert a value drawn from ``mapped_strategy`` into the value this
        strategy should output. Must be overridden, or supplied to
        ``__init__``; this default always raises NotImplementedError."""
        raise NotImplementedError("%s.pack()" % (self.__class__.__name__))

    def do_draw(self, data):
        # type: (ConjectureData) -> Ex
        # Make up to three attempts; an attempt is abandoned when it raises
        # UnsatisfiedAssumption.
        for _attempt in range(3):
            index_before = data.index
            try:
                data.start_example(MAPPED_SEARCH_STRATEGY_DO_DRAW_LABEL)
                packed = self.pack(data.draw(self.mapped_strategy))
                data.stop_example()
                return packed
            except UnsatisfiedAssumption:
                data.stop_example(discard=True)
                # If the failed attempt did not advance ``data.index``, stop
                # retrying and let the exception propagate.
                if data.index == index_before:
                    raise
        raise UnsatisfiedAssumption()

    @property
    def branches(self):
        # type: () -> List[SearchStrategy[Ex]]
        # Apply the same ``pack`` to each branch of the underlying strategy.
        mapped_branches = []
        for branch in self.mapped_strategy.branches:
            mapped_branches.append(
                MappedSearchStrategy(pack=self.pack, strategy=branch)
            )
        return mapped_branches
# Sentinel returned by ``FilteredStrategy.default_do_filtered_draw`` when none
# of its attempts produced a value passing the filter;
# ``FilteredStrategy.do_draw`` compares against it by identity.
filter_not_satisfied = UniqueIdentifier("filter not satisfied")
class FilteredStrategy(SearchStrategy):
    """Strategy that only yields values of another strategy which satisfy a
    tuple of conditions.

    Filtering an already filtered strategy does not nest: the conditions are
    concatenated and the innermost unfiltered strategy is wrapped directly.
    """

    def __init__(self, strategy, conditions):
        super(FilteredStrategy, self).__init__()
        base = strategy
        all_conditions = conditions
        if isinstance(strategy, FilteredStrategy):
            # Flatten chained filters into a single filter with multiple
            # conditions.
            all_conditions = strategy.flat_conditions + conditions
            base = strategy.filtered_strategy
        self.flat_conditions = all_conditions
        self.filtered_strategy = base

        assert self.flat_conditions
        assert isinstance(self.flat_conditions, tuple)
        assert not isinstance(self.filtered_strategy, FilteredStrategy)

        # Combined predicate, built on first access of ``condition``.
        self.__condition = None

    def calc_is_empty(self, recur):
        # Reported as empty exactly when the wrapped strategy is.
        return recur(self.filtered_strategy)

    def calc_is_cacheable(self, recur):
        # Cacheable exactly when the wrapped strategy is.
        return recur(self.filtered_strategy)

    def __repr__(self):
        # The repr is built once and then reused.
        if not hasattr(self, "_cached_repr"):
            filter_suffix = "".join(
                ".filter(%s)" % get_pretty_function_description(cond)
                for cond in self.flat_conditions
            )
            self._cached_repr = "%r%s" % (self.filtered_strategy, filter_suffix)
        return self._cached_repr

    def do_validate(self):
        self.filtered_strategy.validate()

    @property
    def condition(self):
        """Single predicate equivalent to all of ``flat_conditions``."""
        if self.__condition is not None:
            return self.__condition

        assert self.flat_conditions
        if len(self.flat_conditions) != 1:
            self.__condition = lambda x: all(
                cond(x) for cond in self.flat_conditions
            )
        else:
            # Avoid an extra indirection in the common case of only one
            # condition.
            self.__condition = self.flat_conditions[0]
        return self.__condition

    def do_draw(self, data):
        # type: (ConjectureData) -> Ex
        outcome = self.filtered_strategy.do_filtered_draw(
            data=data, filter_strategy=self
        )
        if outcome is filter_not_satisfied:
            # No acceptable value was found: record why and mark the data
            # invalid.
            data.note_event("Aborted test because unable to satisfy %r" % (self,))
            data.mark_invalid()
            raise AssertionError("Unreachable, for Mypy")  # pragma: no cover
        return outcome

    def note_retried(self, data):
        event = lazyformat("Retried draw from %r to satisfy filter", self)
        data.note_event(event)

    def default_do_filtered_draw(self, data):
        """Try up to three draws from the wrapped strategy.

        Returns the first value accepted by ``condition``, or the
        ``filter_not_satisfied`` sentinel if every attempt is rejected.
        """
        for attempt in hrange(3):
            index_before = data.index
            candidate = data.draw(self.filtered_strategy)
            if self.condition(candidate):
                return candidate
            if attempt == 0:
                # Only the first rejection is reported as an event.
                self.note_retried(data)
            # This is to guard against the case where we consume no data.
            # As long as we consume data, we'll eventually pass or raise.
            # But if we don't this could be an infinite loop.
            assume(data.index > index_before)
        return filter_not_satisfied

    @property
    def branches(self):
        # type: () -> List[SearchStrategy[Ex]]
        # Apply the same conditions to each branch of the wrapped strategy.
        filtered_branches = []
        for branch in self.filtered_strategy.branches:
            filtered_branches.append(
                FilteredStrategy(strategy=branch, conditions=self.flat_conditions)
            )
        return filtered_branches
@check_function
def check_strategy(arg, name=""):
    """Check that ``arg`` is a SearchStrategy.

    Delegates to ``check_type(SearchStrategy, arg, name)``; ``name`` is
    passed through unchanged.
    NOTE(review): presumably ``check_type`` raises when the type does not
    match and uses ``name`` in its message - confirm in
    hypothesis.internal.validation.
    """
    check_type(SearchStrategy, arg, name)