Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ __pycache__
dist/
docs/_build
pip-log.txt
.tox
35 changes: 25 additions & 10 deletions rules/predicates.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import inspect
import operator
import threading
from functools import partial, update_wrapper

Expand Down Expand Up @@ -144,35 +145,52 @@ def test(self, obj=NO_VALUE, target=NO_VALUE):
args = tuple(arg for arg in (obj, target) if arg is not NO_VALUE)
_context.stack.append(Context(args))
try:
return self._apply(True, *args)
return self._apply(*args)
finally:
_context.stack.pop()

def _compare_predicates(self, other, op, args):
try:
self_result = self._apply(*args)
except SkipPredicate:
try:
return other._apply(*args)
except SkipPredicate:
return False
else:
try:
other_result = other._apply(*args)
except SkipPredicate:
return self_result
else:
return op(self_result, other_result)


def __and__(self, other):
def AND(*args):
return self._apply(True, *args) and other._apply(True, *args)
return self._compare_predicates(other, operator.and_, args)
return type(self)(AND, '(%s & %s)' % (self.name, other.name))

def __or__(self, other):
def OR(*args):
return self._apply(True, *args) or other._apply(True, *args)
return self._compare_predicates(other, operator.or_, args)
return type(self)(OR, '(%s | %s)' % (self.name, other.name))

def __xor__(self, other):
def XOR(*args):
return self._apply(True, *args) ^ other._apply(True, *args)
return self._compare_predicates(other, operator.xor, args)
return type(self)(XOR, '(%s ^ %s)' % (self.name, other.name))

def __invert__(self):
def INVERT(*args):
return not self._apply(False, *args)
return not self._apply(*args)
if self.name.startswith('~'):
name = self.name[1:]
else:
name = '~' + self.name
return type(self)(INVERT, name)

def _apply(self, default, *args):
def _apply(self, *args):
# Internal method that is used to invoke the predicate with the
# proper number of positional arguments, inside the current
# invocation context.
Expand All @@ -184,10 +202,7 @@ def _apply(self, default, *args):
callargs = args[:self.num_args]
if self.bind:
callargs = (self,) + callargs
try:
return bool(self.fn(*callargs))
except SkipPredicate:
return default
return bool(self.fn(*callargs))


def predicate(fn=None, name=None, **options):
Expand Down
6 changes: 5 additions & 1 deletion tests/testsuite/test_predicates.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,11 +307,15 @@ def requires_one_arg(a):

# because requires_two_args is called with only one argument
# its result is not taken into account, only the result of the
# other predicates matters.
# other predicate matters.
assert (requires_two_args & requires_one_arg).test(True)
assert not (requires_two_args & requires_one_arg).test(False)
assert (~requires_two_args & requires_one_arg).test(True)
assert not (~requires_two_args & requires_one_arg).test(False)
assert not (~requires_two_args | requires_one_arg).test(False)
assert not (requires_two_args | requires_one_arg).test(False)
assert (~requires_two_args | requires_one_arg).test(True)
assert (requires_two_args | requires_one_arg).test(True)


def test_invocation_context():
Expand Down