Skip to content
Permalink
Browse files

issue #535: wire mitogen.os_fork into Broker and Pool.

  • Loading branch information...
dw committed Feb 17, 2019
1 parent c1d73e1 commit 06e52ca89f48d257059b1889bb1029b84500fa4d
Showing with 29 additions and 6 deletions.
  1. +12 −0 mitogen/core.py
  2. +8 −0 mitogen/service.py
  3. +9 −6 tests/responder_test.py
@@ -3348,6 +3348,17 @@ def _setup_stdio(self):
# Reopen with line buffering.
sys.stdout = os.fdopen(1, 'w', 1)

def _py24_25_compat(self):
"""
Python 2.4/2.5 have grave difficulties with threads/fork. We
mandatorily quiesce all running threads during fork using a
monkey-patch there.
"""
if sys.version_info < (2, 6):
# import_module() is used to avoid dep scanner.
os_fork = import_module('mitogen.os_fork')
mitogen.os_fork._notice_broker_or_pool(self.broker)

def main(self):
self._setup_master()
try:
@@ -3375,6 +3386,7 @@ def main(self):
socket.gethostname())
_v and LOG.debug('Recovered sys.executable: %r', sys.executable)

self._py24_25_compat()
self.dispatcher.run()
_v and LOG.debug('ExternalContext.main() normal exit')
except KeyboardInterrupt:
@@ -474,6 +474,7 @@ def __init__(self, router, services, size=1, overwrite=False):

for service in services:
self.add(service)
self._py_24_25_compat()
self._threads = []
for x in range(size):
name = 'mitogen.service.Pool.%x.worker-%d' % (id(self), x,)
@@ -487,6 +488,13 @@ def __init__(self, router, services, size=1, overwrite=False):

LOG.debug('%r: initialized', self)

def _py_24_25_compat(self):
if sys.version_info < (2, 6):
# import_module() is used to avoid dep scanner sending mitogen.fork
# to all mitogen.service importers.
os_fork = mitogen.core.import_module('mitogen.os_fork')
os_fork._notice_broker_or_pool(self)

@property
def size(self):
return len(self._threads)
@@ -74,8 +74,9 @@ def test_plain_old_module(self):
context = self.router.local()

self.assertEquals(256, context.call(plain_old_module.pow, 2, 8))
self.assertEquals(1, self.router.responder.get_module_count)
self.assertEquals(1, self.router.responder.good_load_module_count)
os_fork = int(sys.version_info < (2, 6)) # mitogen.os_fork
self.assertEquals(1+os_fork, self.router.responder.get_module_count)
self.assertEquals(1+os_fork, self.router.responder.good_load_module_count)
self.assertLess(300, self.router.responder.good_load_module_size)

def test_simple_pkg(self):
@@ -84,8 +85,9 @@ def test_simple_pkg(self):
context = self.router.local()
self.assertEquals(3,
context.call(simple_pkg.a.subtract_one_add_two, 2))
self.assertEquals(2, self.router.responder.get_module_count)
self.assertEquals(3, self.router.responder.good_load_module_count)
os_fork = int(sys.version_info < (2, 6)) # mitogen.os_fork
self.assertEquals(2+os_fork, self.router.responder.get_module_count)
self.assertEquals(3+os_fork, self.router.responder.good_load_module_count)
self.assertEquals(0, self.router.responder.bad_load_module_count)
self.assertLess(450, self.router.responder.good_load_module_size)

@@ -185,9 +187,10 @@ def test_stats(self):
c1 = self.router.local()
c2 = self.router.local(via=c1)

os_fork = int(sys.version_info < (2, 6))
self.assertEquals(256, c2.call(plain_old_module.pow, 2, 8))
self.assertEquals(2, self.router.responder.get_module_count)
self.assertEquals(2, self.router.responder.good_load_module_count)
self.assertEquals(2+os_fork, self.router.responder.get_module_count)
self.assertEquals(2+os_fork, self.router.responder.good_load_module_count)
self.assertLess(10000, self.router.responder.good_load_module_size)
self.assertGreater(40000, self.router.responder.good_load_module_size)

0 comments on commit 06e52ca

Please sign in to comment.
You can’t perform that action at this time.