Skip to content

Commit

Permalink
Issue #12091: simplify ApplyResult and MapResult with threading.Event
Browse files Browse the repository at this point in the history
Patch by Charles-François Natali
  • Loading branch information
ask committed May 25, 2012
1 parent e3a1f96 commit 244f090
Showing 1 changed file with 11 additions and 20 deletions.
31 changes: 11 additions & 20 deletions billiard/pool.py
Expand Up @@ -1277,10 +1277,9 @@ def __init__(self, cache, callback, accept_callback=None,
timeout=None, lost_worker_timeout=LOST_WORKER_TIMEOUT,
on_timeout_set=None, on_timeout_cancel=None):
self._mutex = threading.Lock()
self._cond = threading.Condition(threading.Lock())
self._event = threading.Event()
self._job = job_counter.next()
self._cache = cache
self._ready = False
self._callback = callback
self._accept_callback = accept_callback
self._error_callback = error_callback
Expand All @@ -1297,26 +1296,24 @@ def __init__(self, cache, callback, accept_callback=None,
cache[self._job] = self

def ready(self):
return self._ready
return self._event.isSet()

def accepted(self):
return self._accepted

def successful(self):
assert self._ready
assert self.ready()
return self._success

def worker_pids(self):
return filter(None, [self._worker_pid])

def wait(self, timeout=None):
with self._cond:
if not self._ready:
self._cond.wait(timeout)
self._event.wait(timeout)

def get(self, timeout=None):
self.wait(timeout)
if not self._ready:
if not self.ready():
raise TimeoutError
if self._success:
return self._value
Expand All @@ -1328,9 +1325,7 @@ def _set(self, i, obj):
if self._on_timeout_cancel:
self._on_timeout_cancel(self)
self._success, self._value = obj
with self._cond:
self._ready = True
self._cond.notify()
self._event.set()
if self._accepted:
self._cache.pop(self._job, None)

Expand All @@ -1347,7 +1342,7 @@ def _ack(self, i, time_accepted, pid):
self._accepted = True
self._time_accepted = time_accepted
self._worker_pid = pid
if self._ready:
if self.ready():
self._cache.pop(self._job, None)
if self._on_timeout_set:
self._on_timeout_set(self, self._soft_timeout, self._timeout)
Expand All @@ -1374,7 +1369,7 @@ def __init__(self, cache, chunksize, length, callback, error_callback):
self._chunksize = chunksize
if chunksize <= 0:
self._number_left = 0
self._ready = True
self._event.set()
else:
self._number_left = length // chunksize + bool(length % chunksize)

Expand All @@ -1388,19 +1383,15 @@ def _set(self, i, success_result):
self._callback(self._value)
if self._accepted:
self._cache.pop(self._job, None)
with self._cond:
self._ready = True
self._cond.notify()
self._event.set()
else:
self._success = False
self._value = result
if self._error_callback:
self._error_callback(self._value)
if self._accepted:
self._cache.pop(self._job, None)
with self._cond:
self._ready = True
self._cond.notify()
self._event.set()

def _ack(self, i, time_accepted, pid):
start = i * self._chunksize
Expand All @@ -1409,7 +1400,7 @@ def _ack(self, i, time_accepted, pid):
self._accepted[j] = True
self._worker_pid[j] = pid
self._time_accepted[j] = time_accepted
if self._ready:
if self.ready():
self._cache.pop(self._job, None)

def accepted(self):
Expand Down

0 comments on commit 244f090

Please sign in to comment.