Pool: Support running without TimeoutHandler thread.
- The with_*_thread arguments have also been replaced with
  a single `threads=True` argument.

- Two new pool callbacks:

    - ``on_timeout_set(job, soft, hard)``

        Applied when a task is executed with a timeout.

    - ``on_timeout_cancel(job)``

        Applied when a timeout is cancelled (the job completed).
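
- A minimal usage sketch of the new arguments (illustrative only: the
  callback bodies and the task function are assumptions, while the
  argument names come from this change):

      from billiard.pool import Pool

      def double(x):
          return x * 2

      def on_timeout_set(job, soft, hard):
          # Applied when a task is executed with a time limit.
          print('timeouts set for %r: soft=%r hard=%r' % (job, soft, hard))

      def on_timeout_cancel(job):
          # Applied when the time limit is cancelled (the job completed).
          print('timeout cancelled for %r' % (job, ))

      pool = Pool(processes=2, soft_timeout=5, timeout=10,
                  on_timeout_set=on_timeout_set,
                  on_timeout_cancel=on_timeout_cancel)
      print(pool.apply_async(double, (21, )).get())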
ask committed May 22, 2012
1 parent 42fbb7a commit e3a1f96
Showing 1 changed file with 123 additions and 85 deletions.
208 changes: 123 additions & 85 deletions billiard/pool.py
@@ -118,7 +118,7 @@ def safe_apply_callback(fun, *args):


def stop_if_not_current(thread, timeout=None):
if thread is not threading.current_thread():
if thread is not threading.currentThread():
thread.stop(timeout)


@@ -444,67 +444,65 @@ def __init__(self, processes, cache, t_soft, t_hard):
self.cache = cache
self.t_soft = t_soft
self.t_hard = t_hard
self._it = None
super(TimeoutHandler, self).__init__()

def body(self):
processes = self.processes
def _process_by_pid(self, pid):
for index, process in enumerate(self.processes):
if process.pid == pid:
return process, index
return None, None

def on_soft_timeout(self, job):
debug('soft time limit exceeded for %r', job)
process, _index = self._process_by_pid(job._worker_pid)
if not process:
return

# Run timeout callback
if job._timeout_callback is not None:
job._timeout_callback(soft=True, timeout=job._soft_timeout)

try:
os.kill(job._worker_pid, SIG_SOFT_TIMEOUT)
except OSError, exc:
if exc.errno != errno.ESRCH:
raise

def on_hard_timeout(self, job):
if job.ready():
return
debug('hard time limit exceeded for %r', job)
# Remove from cache and set return value to an exception
try:
raise TimeLimitExceeded(job._timeout)
except TimeLimitExceeded:
job._set(job._job, (False, ExceptionInfo()))
else: # pragma: no cover
pass

# Remove from _pool
process, _index = self._process_by_pid(job._worker_pid)

# Run timeout callback
if job._timeout_callback is not None:
job._timeout_callback(soft=False, timeout=job._timeout)
if process:
process.terminate()

def handle_timeouts(self):
cache = self.cache
t_hard, t_soft = self.t_hard, self.t_soft
dirty = set()

def _process_by_pid(pid):
for index, process in enumerate(processes):
if process.pid == pid:
return process, index
return None, None
on_soft_timeout = self.on_soft_timeout
on_hard_timeout = self.on_hard_timeout

def _timed_out(start, timeout):
if not start or not timeout:
return False
if time.time() >= start + timeout:
return True

def _on_soft_timeout(job, i, soft_timeout):
debug('soft time limit exceeded for %i', i)
process, _index = _process_by_pid(job._worker_pid)
if not process:
return

# Run timeout callback
if job._timeout_callback is not None:
job._timeout_callback(soft=True, timeout=soft_timeout)

try:
os.kill(job._worker_pid, SIG_SOFT_TIMEOUT)
except OSError, exc:
if exc.errno == errno.ESRCH:
pass
else:
raise

dirty.add(i)

def _on_hard_timeout(job, i, hard_timeout):
if job.ready():
return
debug('hard time limit exceeded for %i', i)
# Remove from cache and set return value to an exception
try:
raise TimeLimitExceeded(hard_timeout)
except TimeLimitExceeded:
job._set(i, (False, ExceptionInfo()))
else: # pragma: no cover
pass

# Remove from _pool
process, _index = _process_by_pid(job._worker_pid)

# Run timeout callback
if job._timeout_callback is not None:
job._timeout_callback(soft=False, timeout=hard_timeout)
if process:
process.terminate()

# Inner-loop
while self._state == RUN:

@@ -521,19 +519,34 @@ def _on_hard_timeout(job, i, hard_timeout):
if hard_timeout is None:
hard_timeout = t_hard
if _timed_out(ack_time, hard_timeout):
_on_hard_timeout(job, i, hard_timeout)
on_hard_timeout(job)
elif i not in dirty and _timed_out(ack_time, soft_timeout):
_on_soft_timeout(job, i, soft_timeout)

time.sleep(1.0) # Don't waste CPU cycles.
on_soft_timeout(job)
dirty.add(i)
yield

def body(self):
while self._state == RUN:
try:
for _ in self.handle_timeouts():
time.sleep(1.0) # don't spin
except CoroStop:
break
debug('timeout handler exiting')

def handle_event(self, *args):
if self._it is None:
self._it = self.handle_timeouts()
try:
self._it.next()
except StopIteration:
self._it = None


class ResultHandler(PoolThread):

def __init__(self, outqueue, get, cache, poll,
join_exited_workers, putlock, restart_state):
join_exited_workers, putlock, restart_state, check_timeouts):
self.outqueue = outqueue
self.get = get
self.cache = cache
Expand All @@ -543,11 +556,12 @@ def __init__(self, outqueue, get, cache, poll,
self.restart_state = restart_state
self._it = None
self._shutdown_complete = False
self.check_timeouts = check_timeouts
super(ResultHandler, self).__init__()

def on_stop_not_started(self):
# used when pool started without result handler thread.
self.finish_at_shutdown()
self.finish_at_shutdown(handle_timeouts=True)

def _process_result(self, timeout=1.0):
cache = self.cache
@@ -621,15 +635,16 @@ def handle_event(self, fileno, event):
def body(self):
debug('result handler starting')
try:
while 1:
for _ in self._process_result(1.0): # blocking
pass
except CoroStop:
return
while self._state == RUN:
try:
for _ in self._process_result(1.0): # blocking
pass
except CoroStop:
break
finally:
self.finish_at_shutdown()

def finish_at_shutdown(self):
def finish_at_shutdown(self, handle_timeouts=False):
self._shutdown_complete = True
get = self.get
outqueue = self.outqueue
@@ -638,6 +653,7 @@ def finish_at_shutdown(self):
join_exited_workers = self.join_exited_workers
putlock = self.putlock
restart_state = self.restart_state
check_timeouts = self.check_timeouts

def on_ack(job, i, time_accepted, pid):
try:
@@ -671,6 +687,7 @@ def on_state_change(task):

time_terminate = None
while cache and self._state != TERMINATE:
check_timeouts()
try:
ready, task = poll(1.0)
except (IOError, EOFError), exc:
@@ -731,9 +748,9 @@ def __init__(self, processes=None, initializer=None, initargs=(),
max_restarts=None, max_restart_freq=1,
on_process_up=None,
on_process_down=None,
with_task_thread=True,
with_result_thread=True,
with_supervisor_thread=True,
on_timeout_set=None,
on_timeout_cancel=None,
threads=True,
semaphore=None):
self._setup_queues()
self._taskqueue = Queue.Queue()
@@ -749,9 +766,9 @@ def __init__(self, processes=None, initializer=None, initargs=(),
self.restart_state = restart_state(max_restarts, max_restart_freq or 1)
self.on_process_up = on_process_up
self.on_process_down = on_process_down
self.with_task_thread = with_task_thread
self.with_result_thread = with_result_thread
self.with_supervisor_thread = with_supervisor_thread
self.on_timeout_set = on_timeout_set
self.on_timeout_cancel = on_timeout_cancel
self.threads = threads
self.readers = {}

if soft_timeout and SIG_SOFT_TIMEOUT is None:
@@ -776,7 +793,7 @@ def __init__(self, processes=None, initializer=None, initargs=(),
self._create_worker_process()

self._worker_handler = self.Supervisor(self)
if with_supervisor_thread:
if threads:
self._worker_handler.start()
else:
self.readers.update(
@@ -787,23 +804,34 @@ def __init__(self, processes=None, initializer=None, initargs=(),
self._quick_put,
self._outqueue,
self._pool)
if with_task_thread:
if threads:
self._task_handler.start()

# Thread killing timedout jobs.
self._timeout_handler = None
self._timeout_handler = self.TimeoutHandler(
self._pool, self._cache,
self.soft_timeout, self.timeout)
self._timeout_handler_mutex = threading.Lock()
self._timeout_handler_started = False
if self.timeout is not None or self.soft_timeout is not None:
self._start_timeout_handler()

# If running without threads, we need to check for timeouts
# while waiting for unfinished work at shutdown.
check_timeouts = None
if not threads:
check_timeouts = self._timeout_handler.handle_event

# Thread processing results in the outqueue.
self._result_handler = self.ResultHandler(self._outqueue,
self._quick_get, self._cache,
self._poll_result,
self._join_exited_workers,
self._putlock,
self.restart_state)
if with_result_thread:
self.restart_state,
check_timeouts)

if threads:
self._result_handler.start()
else:
self.readers[self._outqueue._reader] = \
@@ -888,7 +916,7 @@ def _join_exited_workers(self, shutdown=False):
for worker in cleaned:
self._putlock.release()
return exitcodes.values()
return False
return []

def shrink(self, n=1):
for i, worker in enumerate(self._iterinactive()):
@@ -905,7 +933,7 @@ def grow(self, n=1):
#assert len(self._pool) == self._processes
self._processes += 1
if self._putlock:
self.putlock.grow()
self._putlock.grow()

def _iterinactive(self):
for worker in self._pool:
@@ -927,7 +955,7 @@ def _repopulate_pool(self, exitcodes):
if self._state != RUN:
return
try:
if exitcodes[i] != EX_OK:
if exitcodes and exitcodes[i] != EX_OK:
self.restart_state.step()
except IndexError:
self.restart_state.step()
@@ -967,12 +995,11 @@ def _poll_result(timeout):
def _start_timeout_handler(self):
# ensure more than one thread does not start the timeout handler
# thread at once.
with self._timeout_handler_mutex:
if self._timeout_handler is None:
self._timeout_handler = self.TimeoutHandler(
self._pool, self._cache,
self.soft_timeout, self.timeout)
self._timeout_handler.start()
if self.threads:
with self._timeout_handler_mutex:
if not self._timeout_handler_started:
self._timeout_handler_started = True
self._timeout_handler.start()

def apply(self, func, args=(), kwds={}):
'''
@@ -1074,6 +1101,8 @@ def apply_async(self, func, args=(), kwds={},
'''
if self._state != RUN:
return
soft_timeout = soft_timeout or self.soft_timeout
timeout = timeout or self.timeout
lost_worker_timeout = lost_worker_timeout or self.lost_worker_timeout
if soft_timeout and SIG_SOFT_TIMEOUT is None:
warnings.warn(UserWarning("Soft timeouts are not supported: "
@@ -1085,11 +1114,13 @@ def apply_async(self, func, args=(), kwds={},
result = ApplyResult(self._cache, callback,
accept_callback, timeout_callback,
error_callback, soft_timeout, timeout,
lost_worker_timeout)
lost_worker_timeout,
on_timeout_set=self.on_timeout_set,
on_timeout_cancel=self.on_timeout_cancel)
if timeout or soft_timeout:
# start the timeout handler thread when required.
self._start_timeout_handler()
if self.with_task_thread:
if self.threads:
self._taskqueue.put(([(result._job, None,
func, args, kwds)], None))
else:
@@ -1243,7 +1274,8 @@ class ApplyResult(object):

def __init__(self, cache, callback, accept_callback=None,
timeout_callback=None, error_callback=None, soft_timeout=None,
timeout=None, lost_worker_timeout=LOST_WORKER_TIMEOUT):
timeout=None, lost_worker_timeout=LOST_WORKER_TIMEOUT,
on_timeout_set=None, on_timeout_cancel=None):
self._mutex = threading.Lock()
self._cond = threading.Condition(threading.Lock())
self._job = job_counter.next()
@@ -1256,6 +1288,8 @@ def __init__(self, cache, callback, accept_callback=None,
self._timeout = timeout
self._soft_timeout = soft_timeout
self._lost_worker_timeout = lost_worker_timeout
self._on_timeout_set = on_timeout_set
self._on_timeout_cancel = on_timeout_cancel

self._accepted = False
self._worker_pid = None
@@ -1291,6 +1325,8 @@ def get(self, timeout=None):

def _set(self, i, obj):
with self._mutex:
if self._on_timeout_cancel:
self._on_timeout_cancel(self)
self._success, self._value = obj
with self._cond:
self._ready = True
@@ -1313,6 +1349,8 @@ def _ack(self, i, time_accepted, pid):
self._worker_pid = pid
if self._ready:
self._cache.pop(self._job, None)
if self._on_timeout_set:
self._on_timeout_set(self, self._soft_timeout, self._timeout)
if self._accept_callback:
safe_apply_callback(
self._accept_callback, pid, time_accepted)
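
For reference, a sketch of how an embedding program might drive the pool when
it is constructed with ``threads=False``. It is based on the hooks visible in
the diff above (``pool.readers``, ``ResultHandler.handle_event``,
``TimeoutHandler.handle_event``); the polling loop, and the assumption that
each entry in ``pool.readers`` maps a reader to a ``handle_event``-style
callback, are illustrative rather than code from this commit.

    import select

    from billiard.pool import Pool

    def double(x):
        return x * 2

    pool = Pool(processes=2, threads=False, soft_timeout=5, timeout=60)
    result = pool.apply_async(double, (21, ))

    while not result.ready():
        # With threads=False the pool registers its handlers in pool.readers
        # instead of starting threads, so the caller polls the readers itself.
        readable, _, _ = select.select(list(pool.readers), [], [], 1.0)
        for reader in readable:
            # Assumed handler signature (fileno, event), as in
            # ResultHandler.handle_event.
            pool.readers[reader](reader, None)
        # There is no TimeoutHandler thread either: handle_event() advances
        # the handle_timeouts() generator one step to check for expired jobs.
        pool._timeout_handler.handle_event()

    print(result.get())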
