Permalink
Comparing changes
Open a pull request
- 3 commits
- 4 files changed
- 0 commit comments
- 1 contributor
Unified
Split
Showing
with
29 additions
and 21 deletions.
- +26 −1 celery/app/base.py
- +1 −0 celery/bin/multi.py
- +0 −9 celery/tests/backends/test_backends.py
- +2 −11 celery/tests/bin/test_multi.py
| @@ -58,6 +58,8 @@ | ||
| Please set this variable and make it point to | ||
| a configuration module.""" | ||
| _after_fork_registered = False | ||
| def app_has_custom(app, attr): | ||
| return mro_lookup(app.__class__, attr, stop=(Celery, object), | ||
| @@ -70,6 +72,29 @@ def _unpickle_appattr(reverse_name, args): | ||
| return get_current_app()._rgetattr(reverse_name)(*args) | ||
| def _global_after_fork(): | ||
| # Previously every app would call: | ||
| # `register_after_fork(app, app._after_fork)` | ||
| # but this created a leak as `register_after_fork` stores concrete object | ||
| # references and once registered an object cannot be removed without | ||
| # touching and iterating over the private afterfork registry list. | ||
| # | ||
| # See Issue #1949 | ||
| from celery import _state | ||
| from multiprocessing.util import info | ||
| for app in _state.apps: | ||
| try: | ||
| app._after_fork() | ||
| except Exception as exc: | ||
| info('after forker raised exception: %r' % (exc, ), exc_info=1) | ||
| def _ensure_after_fork(): | ||
| global _after_fork_registered | ||
| _after_fork_registered = True | ||
| register_after_fork(_global_after_fork, _global_after_fork) | ||
| class Celery(object): | ||
| #: This is deprecated, use :meth:`reduce_keys` instead | ||
| Pickler = AppPickler | ||
| @@ -590,7 +615,7 @@ def TaskSetResult(self): # XXX compat | ||
| @property | ||
| def pool(self): | ||
| if self._pool is None: | ||
| register_after_fork(self, self._after_fork) | ||
| _ensure_after_fork() | ||
| limit = self.conf.BROKER_POOL_LIMIT | ||
| self._pool = self.connection().Pool(limit=limit) | ||
| return self._pool | ||
| @@ -509,6 +509,7 @@ def multi_args(p, cmd='celery worker', append='', prefix='', suffix=''): | ||
| expand = partial( | ||
| node_format, nodename=nodename, N=shortname, d=hostname, | ||
| h=nodename, | ||
| ) | ||
| argv = ([expand(cmd)] + | ||
| [format_opt(opt, expand(value)) | ||
| @@ -19,15 +19,6 @@ def test_get_backend_aliases(self): | ||
| expect_cls, | ||
| ) | ||
| def test_get_backend_cache(self): | ||
| backends.get_backend_cls.clear() | ||
| hits = backends.get_backend_cls.hits | ||
| misses = backends.get_backend_cls.misses | ||
| self.assertTrue(backends.get_backend_cls('amqp', self.app.loader)) | ||
| self.assertEqual(backends.get_backend_cls.misses, misses + 1) | ||
| self.assertTrue(backends.get_backend_cls('amqp', self.app.loader)) | ||
| self.assertEqual(backends.get_backend_cls.hits, hits + 1) | ||
| def test_unknown_backend(self): | ||
| with self.assertRaises(ImportError): | ||
| backends.get_backend_cls('fasodaopjeqijwqe', self.app.loader) | ||
| @@ -8,7 +8,6 @@ | ||
| main, | ||
| MultiTool, | ||
| findsig, | ||
| abbreviations, | ||
| parse_ns_range, | ||
| format_opt, | ||
| quote, | ||
| @@ -30,14 +29,6 @@ def test_findsig(self): | ||
| self.assertEqual(findsig(['-s']), signal.SIGTERM) | ||
| self.assertEqual(findsig(['-log']), signal.SIGTERM) | ||
| def test_abbreviations(self): | ||
| expander = abbreviations({'%s': 'START', | ||
| '%x': 'STOP'}) | ||
| self.assertEqual(expander('foo%s'), 'fooSTART') | ||
| self.assertEqual(expander('foo%x'), 'fooSTOP') | ||
| self.assertEqual(expander('foo%y'), 'foo%y') | ||
| self.assertIsNone(expander(None)) | ||
| def test_parse_ns_range(self): | ||
| self.assertEqual(parse_ns_range('1-3', True), ['1', '2', '3']) | ||
| self.assertEqual(parse_ns_range('1-3', False), ['1-3']) | ||
| @@ -78,6 +69,7 @@ class test_multi_args(AppCase): | ||
| @patch('socket.gethostname') | ||
| def test_parse(self, gethostname): | ||
| gethostname.return_value = 'example.com' | ||
| p = NamespacedOptionParser([ | ||
| '-c:jerry,elaine', '5', | ||
| '--loglevel:kramer=DEBUG', | ||
| @@ -120,12 +112,11 @@ def assert_line_in(name, args): | ||
| ) | ||
| expand = names[0][2] | ||
| self.assertEqual(expand('%h'), '*P*jerry@*S*') | ||
| self.assertEqual(expand('%n'), 'jerry') | ||
| self.assertEqual(expand('%n'), '*P*jerry') | ||
| names2 = list(multi_args(p, cmd='COMMAND', append='', | ||
| prefix='*P*', suffix='*S*')) | ||
| self.assertEqual(names2[0][1][-1], '-- .disable_rate_limits=1') | ||
| gethostname.return_value = 'example.com' | ||
| p2 = NamespacedOptionParser(['10', '-c:1', '5']) | ||
| names3 = list(multi_args(p2, cmd='COMMAND')) | ||
| self.assertEqual(len(names3), 10) | ||