Run tests by executing make test
.
Add a cache to a function such that multiple calls with the same args will return cached results. Supports an optional cache timeout which will flush items from the cache after a set interval for recomputation.
from wraptor.decorators import memoize
@memoize()
def foo(bar, baz):
print(bar, baz)
foo(1, 2)
# prints (1, 2)
foo(3, 4)
# prints (3, 4)
foo(1, 2)
# no-op
Supports timeouts!
@memoize(timeout=.5)
def foo(bar, baz):
print(bar, baz)
foo(1, 2)
# prints (1, 2)
foo(1, 2)
# no-op
import time
time.sleep(2)
foo(1, 2)
# prints (1, 2)
Supports attaching to an instance method!
class foo(object):
@memoize(instance_method=True)
def bar(self, a, b):
return random()
f = foo()
f2 = foo()
# they don't share a cache!
f.bar(1,2) != f2.bar(1,2)
Throttle a function to firing at most 1 time per interval. The function is fired on the forward edge (meaning it will fire the first time you call it).
from wraptor.decorators import throttle
import time
@throttle(.5)
def foo(bar, baz):
print(bar, baz)
foo(1, 2)
# prints (1, 2)
foo(3, 4)
# no-op
time.sleep(1)
foo(5, 6)
# prints (1, 2)
Supports attaching to an instance method!
arr = []
class foo(object):
@throttle(1, instance_method=True)
def bar(self):
arr.append(1)
x = foo()
x2 = foo()
x.bar()
x2.bar()
# they don't share the same throttle!
assert arr == [1, 1]
By default throttle passes through the return value of the wrapped function if it was called, or None if the def was not called. Use return_throttle_result=True to instead return a bool:
@throttle(1, return_throttle_result=True)
def foo():
pass
assert foo() is True, 'True means "called"'
assert foo() is False, 'False means "not called" (throttled)'
Timeout uses signal under the hood to allow you to add timeouts to any function. The only caveat is that signal.alarm can only be used in the main thread of execution (so multi-threading programs can't use this decorator in sub-threads).
The timeout value must be a positive integer.
from wraptor.decorators import timeout, TimeoutException
import time
@timeout(1)
def heavy_workload():
# simulate heavy work
time.sleep(10)
try:
heavy_workload()
except TimeoutException:
print('workload timed out')
You can also catch the timeout exception from inside the function:
@timeout(1)
def heavy_workload():
try:
# simulate heavy work
time.sleep(10)
except TimeoutException:
print('workload timed out')
exception_catcher is a helpful method for dealing with threads that may raise an exception. It is especially useful for testing.
from wraptor.decorators import exception_catcher
@exception_catcher
def work():
raise Exception()
t = threading.Thread(target=work)
t.start()
t.join()
try:
work.check()
except Exception as e:
print e
Throttle a with statement to executing its body at most 1 time per interval. The body is fired on the forward edge (meaning it will fire the first time you call it).
from wraptor.context import throttle
import time
throttler = throttle(seconds=3)
def foo():
with throttler:
print 'bar'
foo()
# prints bar
sleep(2)
foo()
# does nothing
sleep(2)
foo()
# prints bar
Execute a with block based on the results of a predicate.
from wraptor.context import maybe
def foo(cond):
with maybe(lambda: cond == 5):
print 'bar'
foo(5)
# prints bar
foo(3)
# does nothing
Time a block of code.
from wraptor.context import timer
def foo(cond):
with timer('my slow method') as t:
expensive_stuff()
print t
foo()
# prints "my slow method took 435.694 ms"