Permalink
Browse files

Merge branch '2.7'

Conflicts:
	CHANGES.txt
	README.rst
	billiard/__init__.py
	billiard/pool.py
	billiard/util.py
  • Loading branch information...
ask committed Apr 17, 2013
2 parents 56978d6 + b1ddce7 commit 401ec585183a2ff75e8e1f32556abbf2a71fe649
View
@@ -17,3 +17,8 @@ devdatabase.db
bundle_version.gen
celeryd.log
celeryd.pid
+nosetests.xml
+coverage.xml
+cover/
+*.so
+.tox/
View
@@ -0,0 +1,7 @@
+language: python
+python:
+ - 2.6
+ - 2.7
+install:
+ - pip install --use-mirrors tox
+script: TOXENV=py$(echo $TRAVIS_PYTHON_VERSION | tr -d .) tox
View
@@ -5,6 +5,16 @@
- No longer compatible with Python 2.5
+2.7.3.28 - 2013-04-16
+---------------------
+
+- Pool: Fixed regression that disabled the deadlock
+ fix in 2.7.3.24
+
+- Pool: RestartFreqExceeded could be raised prematurely.
+
+- Process: Include pid in startup and process INFO logs.
+
2.7.3.27 - 2013-04-12
---------------------
View
@@ -5,3 +5,5 @@ include Makefile
recursive-include Lib *.py
recursive-include Modules *.c *.h
recursive-include Doc *.rst *.py
+recursive-include funtests *.py
+recursive-include requirements *.txt
View
@@ -39,7 +39,7 @@ def reset_signals(handler=_shutdown_cleanup):
try:
signum = getattr(signal, sig)
current = signal.getsignal(signum)
- if current and current != signal.SIG_IGN:
+ if current is not None and current != signal.SIG_IGN:
signal.signal(signum, handler)
except (OSError, AttributeError, ValueError, RuntimeError):
pass
@@ -52,17 +52,22 @@ def __init__(self, maxR, maxT):
self.maxR, self.maxT = maxR, maxT
self.R, self.T = 0, None
def step(self, now=None):
    """Register one restart attempt.

    Raises ``self.RestartFreqExceeded`` when more than ``maxR``
    restarts have been recorded inside a ``maxT``-second window.

    :param now: optional timestamp used instead of ``time()`` —
        injectable so tests can simulate the passage of time.
    """
    now = time() if now is None else now
    R = self.R
    if self.T and now - self.T >= self.maxT:
        # maxT passed, reset counter and time passed.
        self.T, self.R = now, 0
    elif self.maxR and self.R >= self.maxR:
        # verify that R has a value as the result handler
        # resets this when a job is accepted. If a job is accepted
        # the startup probably went fine (startup restart burst
        # protection)
        if self.R:  # pragma: no cover
            self.R = 0  # reset in case someone catches the error
            raise self.RestartFreqExceeded("%r in %rs" % (R, self.maxT))
    # first run sets T
    if self.T is None:
        self.T = now
    self.R += 1
View
@@ -807,8 +807,6 @@ def __init__(self, processes=None, initializer=None, initargs=(),
self._initializer = initializer
self._initargs = initargs
self.lost_worker_timeout = lost_worker_timeout or LOST_WORKER_TIMEOUT
- self.max_restarts = max_restarts or round(processes * 100)
- self.restart_state = restart_state(max_restarts, max_restart_freq or 1)
self.on_process_up = on_process_up
self.on_process_down = on_process_down
self.on_timeout_set = on_timeout_set
@@ -833,6 +831,8 @@ def __init__(self, processes=None, initializer=None, initargs=(),
except NotImplementedError:
processes = 1
self._processes = processes
+ self.max_restarts = max_restarts or round(processes * 100)
+ self.restart_state = restart_state(max_restarts, max_restart_freq or 1)
if initializer is not None and \
not isinstance(initializer, collections.Callable):
@@ -976,8 +976,6 @@ def _join_exited_workers(self, shutdown=False):
exitcodes[worker_pid])
break
for worker in values(cleaned):
- if self._putlock is not None:
- self._putlock.release()
if self.on_process_down:
self.on_process_down(worker)
return list(exitcodes.values())
@@ -1043,7 +1041,11 @@ def did_start_ok(self):
def _maintain_pool(self):
    """Clean up any exited workers and start replacements for them.

    Also releases one ``_putlock`` token per reaped worker so that
    producers blocked on the pool's inbound semaphore can proceed.
    NOTE(review): this release was moved here from
    ``_join_exited_workers`` in the same change — presumably so it
    happens only after replacements were started; confirm against
    the pool's locking protocol.
    """
    joined = self._join_exited_workers()
    self._repopulate_pool(joined)
    # One release per exited worker; the index itself is unused.
    for _ in joined:
        if self._putlock is not None:
            self._putlock.release()
def maintain_pool(self, *args, **kwargs):
if self._worker_handler._state == RUN and self._state == RUN:
View
@@ -46,9 +46,10 @@ def current_process():
def _cleanup():
    """Reap child processes that have finished.

    ``_current_process`` may already be ``None`` during interpreter
    shutdown, so guard before touching it.
    """
    proc = _current_process
    if proc is not None:
        for child in list(proc._children):
            if child._popen.poll() is not None:
                proc._children.discard(child)
def active_children(_cleanup=_cleanup):
@@ -60,7 +61,9 @@ def active_children(_cleanup=_cleanup):
except TypeError:
# called after gc collect so _cleanup does not exist anymore
return []
- return list(_current_process._children)
+ if _current_process is not None:
+ return list(_current_process._children)
+ return []
class Process(object):
@@ -250,7 +253,7 @@ def _bootstrap(self):
# delay finalization of the old process object until after
# _run_after_forkers() is executed
del old_process
- util.info('child process calling self.run()')
+ util.info('child process %s calling self.run()', self.pid)
try:
self.run()
exitcode = 0
@@ -272,7 +275,8 @@ def _bootstrap(self):
sys.stderr.write('Process %s:\n' % self.name)
traceback.print_exc()
finally:
- util.info('process exiting with exitcode %d', exitcode)
+ util.info('process %s exiting with exitcode %d',
+ self.pid, exitcode)
sys.stdout.flush()
sys.stderr.flush()
return exitcode
View
@@ -0,0 +1,18 @@
+from __future__ import absolute_import
+
+import atexit
+
+
def teardown():
    """Remove multiprocessing's atexit handler.

    Workaround for a multiprocessing bug where logging is attempted
    after globals have already been collected at interpreter shutdown.
    """
    cancelled = set()
    try:
        import multiprocessing.util
        cancelled.add(multiprocessing.util._exit_function)
    except (AttributeError, ImportError):
        # module unavailable or handler not defined: nothing to cancel
        pass

    # Rewrite the handler list in place, dropping the cancelled entries.
    # NOTE(review): ``atexit._exithandlers`` is a CPython 2
    # implementation detail — confirm target interpreter.
    remaining = [
        entry for entry in atexit._exithandlers
        if entry[0] not in cancelled
    ]
    atexit._exithandlers[:] = remaining
View
@@ -0,0 +1,85 @@
+from __future__ import absolute_import
+
+import sys
+
+
class WarningMessage(object):

    """Holds the result of a single showwarning() call."""

    # Attribute names mirroring the showwarning() signature.
    _WARNING_DETAILS = ('message', 'category', 'filename', 'lineno', 'file',
                        'line')

    def __init__(self, message, category, filename, lineno, file=None,
                 line=None):
        # Snapshot every argument onto an attribute of the same name.
        captured = locals()
        for name in self._WARNING_DETAILS:
            setattr(self, name, captured[name])
        # Remember the category's name; ``category`` may be None.
        if category:
            self._category_name = category.__name__ or None
        else:
            self._category_name = None

    def __str__(self):
        return ('{message : %r, category : %r, filename : %r, lineno : %s, '
                'line : %r}' % (self.message, self._category_name,
                                self.filename, self.lineno, self.line))
+
+
class catch_warnings(object):

    """A context manager that copies and restores the warnings filter upon
    exiting the context.

    The 'record' argument specifies whether warnings should be captured by a
    custom implementation of warnings.showwarning() and be appended to a list
    returned by the context manager. Otherwise None is returned by the context
    manager. The objects appended to the list are arguments whose attributes
    mirror the arguments to showwarning().

    The 'module' argument is to specify an alternative module to the module
    named 'warnings' and imported under that name. This argument is only
    useful when testing the warnings module itself.

    """

    def __init__(self, record=False, module=None):
        """Specify whether to record warnings and if an alternative module
        should be used other than sys.modules['warnings'].

        For compatibility with Python 3.0, please consider all arguments to be
        keyword-only.

        """
        self._record = record
        # Fall back to the real warnings module when none is supplied.
        self._module = sys.modules['warnings'] if module is None else module
        self._entered = False

    def __repr__(self):
        args = []
        if self._record:
            args.append('record=True')
        if self._module is not sys.modules['warnings']:
            args.append('module=%r' % self._module)
        return '%s(%s)' % (type(self).__name__, ', '.join(args))

    def __enter__(self):
        if self._entered:
            raise RuntimeError('Cannot enter %r twice' % self)
        self._entered = True
        # Save the filter list and give the module a private copy.
        self._filters = self._module.filters
        self._module.filters = self._filters[:]
        self._showwarning = self._module.showwarning
        if not self._record:
            return None
        log = []

        def showwarning(*args, **kwargs):
            log.append(WarningMessage(*args, **kwargs))

        self._module.showwarning = showwarning
        return log

    def __exit__(self, *exc_info):
        if not self._entered:
            raise RuntimeError('Cannot exit %r without entering first' % self)
        # Restore the saved filter list and showwarning hook.
        self._module.filters = self._filters
        self._module.showwarning = self._showwarning
@@ -0,0 +1,98 @@
+from __future__ import absolute_import
+from __future__ import with_statement
+
+import os
+import signal
+
+from contextlib import contextmanager
+from mock import call, patch, Mock
+from time import time
+
+from billiard.common import (
+ _shutdown_cleanup,
+ reset_signals,
+ restart_state,
+)
+
+from .utils import Case
+
+
def signo(name):
    """Return the signal number for the signal called *name*."""
    return getattr(signal, name)
+
+
@contextmanager
def termsigs(*sigs):
    """Temporarily replace ``billiard.common.TERMSIGS`` with *sigs*."""
    from billiard import common
    saved = common.TERMSIGS
    common.TERMSIGS = sigs
    try:
        yield
    finally:
        common.TERMSIGS = saved
+
+
class test_reset_signals(Case):
    """Tests for billiard.common.reset_signals and _shutdown_cleanup."""

    def test_shutdown_handler(self):
        # _shutdown_cleanup must exit with a WTERMSIG-encoded status.
        with patch('sys.exit') as exit:
            _shutdown_cleanup(15, Mock())
            self.assertTrue(exit.called)
            self.assertEqual(os.WTERMSIG(exit.call_args[0][0]), 15)

    def test_does_not_reset_ignored_signal(self, sigs=['SIGTERM']):
        with self.assert_context(sigs, signal.SIG_IGN) as (_, SET):
            self.assertFalse(SET.called)

    def test_does_not_reset_if_current_is_None(self, sigs=['SIGTERM']):
        with self.assert_context(sigs, None) as (_, SET):
            self.assertFalse(SET.called)

    def test_resets_for_SIG_DFL(self, sigs=['SIGTERM', 'SIGINT', 'SIGUSR1']):
        with self.assert_context(sigs, signal.SIG_DFL) as (_, SET):
            SET.assert_has_calls([
                call(signo(sig), _shutdown_cleanup) for sig in sigs
            ])

    def test_resets_for_obj(self, sigs=['SIGTERM', 'SIGINT', 'SIGUSR1']):
        with self.assert_context(sigs, object()) as (_, SET):
            SET.assert_has_calls([
                call(signo(sig), _shutdown_cleanup) for sig in sigs
            ])

    def test_handles_errors(self, sigs=['SIGTERM']):
        # every error type swallowed by reset_signals must be tolerated
        for exc in (OSError(), AttributeError(),
                    ValueError(), RuntimeError()):
            with self.assert_context(sigs, signal.SIG_DFL, exc) as (_, SET):
                self.assertTrue(SET.called)

    @contextmanager
    def assert_context(self, sigs, get_returns=None, set_effect=None):
        # Patch getsignal/signal, run reset_signals over *sigs*, and
        # verify every signal was queried; yields the two mocks.
        with termsigs(*sigs):
            with patch('signal.getsignal') as GET:
                with patch('signal.signal') as SET:
                    GET.return_value = get_returns
                    SET.side_effect = set_effect
                    reset_signals()
                    GET.assert_has_calls([
                        call(signo(sig)) for sig in sigs
                    ])
                    yield GET, SET
+
+
class test_restart_state(Case):
    """Tests for billiard.common.restart_state."""

    def test_raises(self):
        s = restart_state(100, 1)  # max 100 restarts in 1 second.
        s.R = 99
        s.step()
        with self.assertRaises(s.RestartFreqExceeded):
            s.step()

    def test_time_passed_resets_counter(self):
        s = restart_state(100, 10)
        s.R, s.T = 100, time()
        with self.assertRaises(s.RestartFreqExceeded):
            s.step()
        # once maxT has elapsed the counter starts over
        s.R, s.T = 100, time()
        s.step(time() + 20)
        self.assertEqual(s.R, 1)
@@ -0,0 +1,12 @@
+from __future__ import absolute_import
+
+import billiard
+
+from .utils import Case
+
+
class test_billiard(Case):
    """Package-level sanity checks."""

    def test_has_version(self):
        # the package must expose a non-empty version string
        self.assertTrue(billiard.__version__)
        self.assertIsInstance(billiard.__version__, str)
Oops, something went wrong.

0 comments on commit 401ec58

Please sign in to comment.