Skip to content

Commit

Permalink
Clean up threading book-keeping at fork when monkey-patched
Browse files Browse the repository at this point in the history
Previously, if we patched threading then forked (or, in some cases, used
the subprocess module), Python would log an ignored exception like

   Exception ignored in: <function _after_fork at 0x7f16493489d8>
   Traceback (most recent call last):
     File "/usr/lib/python3.7/threading.py", line 1335, in _after_fork
       assert len(_active) == 1
   AssertionError:

This comes down to threading in Python 3.7+ having an import side-effect
of registering an at-fork callback. When we re-import threading to patch
it, the old (but still registered) callback still points to the old
thread-tracking dict, rather than the new dict that's actually doing the
tracking.

Now, register our own at_fork hook that will fix up the dict reference
before threading's _at_fork runs and put it back afterwards.

Closes #592
  • Loading branch information
tipabu committed May 9, 2020
1 parent 88164c9 commit 3df6c27
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 0 deletions.
20 changes: 20 additions & 0 deletions eventlet/patcher.py
Expand Up @@ -312,6 +312,26 @@ def monkey_patch(**on):
for attr_name in deleted:
if hasattr(orig_mod, attr_name):
delattr(orig_mod, attr_name)

if name == 'threading' and sys.version_info >= (3, 7):
def fix_threading_active(
_os=original('os'),
_global_dict=original('threading').current_thread.__globals__,
_patched=orig_mod
):
_prefork_active = [None]

def before_fork():
_prefork_active[0] = _global_dict['_active']
_global_dict['_active'] = _patched._active

def after_fork():
_global_dict['_active'] = _prefork_active[0]

_os.register_at_fork(
before=before_fork,
after_in_parent=after_fork)
fix_threading_active()
finally:
imp.release_lock()

Expand Down
52 changes: 52 additions & 0 deletions tests/patcher_test.py
Expand Up @@ -294,6 +294,58 @@ def test_subprocess_after_monkey_patch():
assert output.rstrip() == b'pass'


def test_fork_after_monkey_patch():
code = '''\
import eventlet
import eventlet.patcher
eventlet.monkey_patch()
import os
import sys
import threading
_threading = eventlet.patcher.original('threading')
import eventlet.green.threading
def target():
eventlet.sleep(0.1)
def check(n, mod, tag):
assert len(mod._active) == n, 'Expected {} {} threads, got {}'.format(n, tag, mod._active)
threads = [
threading.Thread(target=target, name='patched'),
_threading.Thread(target=target, name='original-1'),
_threading.Thread(target=target, name='original-2'),
eventlet.green.threading.Thread(target=target, name='green-1'),
eventlet.green.threading.Thread(target=target, name='green-2'),
eventlet.green.threading.Thread(target=target, name='green-3'),
]
for t in threads: t.start()
check(2, threading, 'pre-fork patched')
check(3, _threading, 'pre-fork original')
check(4, eventlet.green.threading, 'pre-fork green')
if os.fork() == 0:
# Inside the child, we should only have a main thread,
# but old pythons make it difficult to ensure
if sys.version_info >= (3, 7):
check(1, threading, 'child post-fork patched')
check(1, _threading, 'child post-fork original')
check(1, eventlet.green.threading, 'child post-fork green')
exit()
else:
os.wait()
check(2, threading, 'post-fork patched')
check(3, _threading, 'post-fork original')
check(4, eventlet.green.threading, 'post-fork green')
for t in threads: t.join()
check(1, threading, 'post-join patched')
check(1, _threading, 'post-join original')
check(1, eventlet.green.threading, 'post-join green')
print('pass')
'''
output = tests.run_python(
path=None,
args=['-c', code],
expect_pass=True,
)


class Threading(ProcessBase):
def test_orig_thread(self):
new_mod = """import eventlet
Expand Down

0 comments on commit 3df6c27

Please sign in to comment.