Skip to content
This repository has been archived by the owner on Apr 16, 2022. It is now read-only.

Commit

Permalink
more consistent test results
Browse files Browse the repository at this point in the history
  • Loading branch information
haliphax committed Jan 19, 2016
1 parent 0d9316f commit 5197bf7
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 33 deletions.
12 changes: 7 additions & 5 deletions rodario/actors/clusterproxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,15 @@ def _handler(self, message):
# throw its value in the associated response queue
data = pickle.loads(message['data'])
queue = data[0]
self._response_queues[queue].put(data[1])

if data[1] is not False:
self._response_queues[queue].put(data[1])

self._response_counter[queue] -= 1

if self._response_counter[queue] <= 0:
del self._response_counter[queue]
self._response_queues.pop(queue, None)
del self._response_queues[queue]

def _proxy(self, method_name, *args, **kwargs):
"""
Expand All @@ -125,11 +128,10 @@ def _proxy(self, method_name, *args, **kwargs):

# create a unique response queue for retrieving the return value async
queue = str(uuid4())
data = (self.proxyid, queue, method_name, args, kwargs,)
# fire off the method call to the original Actors over pubsub
count = self._redis.publish('cluster:%s' % self.channel,
pickle.dumps((self.proxyid, queue,
method_name, args,
kwargs,)))
pickle.dumps(data))

if count == 0:
raise EmptyClusterException()
Expand Down
48 changes: 23 additions & 25 deletions rodario/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,19 @@ def __call__(self, *args, **kwargs):
for callee in self.bindings:
result = callee(self._instance, *args, **kwargs)

if result != None:
return result
else:
return self._func(self._instance, *args, **kwargs)
if result is not None:
print 'Returning %r' % result
return result

result = self._func(self._instance, *args, **kwargs)
print 'Actual return %r' % result
return result

def blocking(func):
"""
Block the thread and return the proxied method call's result.
Block the thread and return the proxied method call's result. This is
sloppy and needs some work - the actual blocking is done within the Actor
class after checking the method's decorations.
:param instancemethod func: The function to wrap
:rtype: :class:`rodario.decorators.DecoratedMethod`
Expand All @@ -68,18 +72,17 @@ def blocking(func):

def singular(func):
"""
First-come, first-served cluster channel call.
First-come, first-served cluster channel call. Unlike the @blocking
decorator, this is self-contained. All of the work happens right here.
Needs some work - should accept parameters for context and expiry.
:param instancemethod func: The function to wrap
:param int expiry: The TTL (in seconds) for the lock
:param str context: The context for the lock name. If None, uses the
'global.lock' context.
:rtype: :class:`rodario.decorators.DecoratedMethod`
"""

from time import time

expiry = 10
expiry = 2
context = None

# pylint: disable=W0613,W0212
Expand All @@ -88,31 +91,26 @@ def call_singular(self, *args, **kwargs):
Call the method if we can get a lock.
:rtype: mixed
:returns: The function result if a lock is acquired; otherwise, None.
:returns: The function result if a lock is acquired; otherwise, False.
"""

current = time()
lock_expires = current + expiry + 1
lock_context = 'global.lock' if not context else context
lock_name = '%s:%s' % (lock_context, func.__name__)
have_lock = True

# try to get lock; if we fail, do sanity check on lock
if not self._redis.setnx(lock_name, lock_expires):
have_lock = False

# see if current lock is expired; if so, take it
if (current >= float(self._redis.get(lock_name))
and current >= float(self._redis.getset(lock_name,
lock_expires))):
have_lock = True

if have_lock:
# we have the lock; give it a TTL and call our function
self._redis.expire(lock_name, expiry)
return func(self, *args, **kwargs)

return None
if current < float(self._redis.get(lock_name)):
# lock is not expired
return False
elif current < float(self._redis.getset(lock_name, lock_expires)):
# somebody else beat us to it
return False

# we have the lock; give it a TTL and pass through
self._redis.expire(lock_name, expiry)

# if it's already a DecoratedMethod, just add to it
if isinstance(func, DecoratedMethod):
Expand Down
6 changes: 3 additions & 3 deletions rodario/test_decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def test(self):
def test_singular(self):
""" Simple singular cluster method. """

return 2
return 3

@singular
@blocking
Expand Down Expand Up @@ -96,7 +96,7 @@ def testSingularMethod(self):
# first value is number of workers in cluster pool
self.assertEqual(2, future.get())
# second value is the actual function call result
self.assertEqual(2, future.get(timeout=10))
self.assertEqual(3, future.get(timeout=3))
self._redis.delete('global.lock:test_singular')
self.actor.part('decorators_test')
self.costar.part('decorators_test')
Expand All @@ -112,7 +112,7 @@ def testLockExpiry(self):
# first value is number of workers in cluster pool
self.assertEqual(2, future.get())
# second value is the actual function call result
self.assertEqual(3, future.get(timeout=10))
self.assertEqual(3, future.get(timeout=3))
self.actor.part('decorators_test')
self.costar.part('decorators_test')

Expand Down

0 comments on commit 5197bf7

Please sign in to comment.