pulling apart multiprocessing queue internals to fix the leftover FDs issue #15

Merged: 6 commits, Dec 12, 2013. Diff below shows changes from 2 commits.
tests/closes_fds_test.py (69 changes: 55 additions & 14 deletions)
@@ -5,11 +5,14 @@
from __future__ import absolute_import
from __future__ import print_function

+import mock
import os
import stat
+import testify as T
import vimap.pool
import vimap.queue_manager
import vimap.worker_process
-import testify as T
+from collections import namedtuple


# decrypt POSIX stuff
@@ -23,30 +26,39 @@
    'socket': stat.S_ISSOCK}


+FDInfo = namedtuple("FDInfo", ["modes", "symlink"])


def fd_type_if_open(fd_number):
    """For a given open file descriptor, return a list of human-readable
    strings describing the file type.
    """
    fd_stat = os.fstat(fd_number)
-    return [
-        k for k, v in readable_mode_strings.items()
-        if v(fd_stat.st_mode)]
+    return FDInfo(
+        modes=[k for k, v in readable_mode_strings.items() if v(fd_stat.st_mode)],
+        symlink=os.readlink("/proc/{0}/fd/{1}".format(os.getpid(), fd_number)))
Review comment (Contributor):

    This doesn't look portable (I know this won't work on OS X, for example).

Reply (Owner, author):

    Fixed in the latest version -- I re-added the older version, which scans for FDs (currently from 3 to 30; tested manually by changing "/proc" to "/proc2").
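The scan-based fallback the author describes is not part of this two-commit view; a minimal sketch of the idea (hypothetical helper name, reusing this file's readable_mode_strings dict) might look like:

import errno
import os

def get_open_fds_by_scanning(max_fd=30):
    """Probe FDs 3..max_fd with fstat() instead of reading /proc,
    so the check also works on OS X."""
    fds = {}
    for fd in xrange(3, max_fd + 1):
        try:
            mode = os.fstat(fd).st_mode
        except OSError as e:
            if e.errno == errno.EBADF:
                continue  # fd is not open; skip it
            raise
        fds[fd] = [k for k, v in readable_mode_strings.items() if v(mode)]
    return fds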


-def get_open_fds():
+def get_open_fds(retries=3):
    """
    Returns a map,

-        fd (int) --> modes (list of human-readable strings)
+        fd (int) --> FDInfo
    """
    unix_fd_dir = "/proc/{0}/fd".format(os.getpid())
    fds = [(int(i), os.path.join(unix_fd_dir, i)) for i in os.listdir(unix_fd_dir)]
-    # NOTE: Sometimes, an FD is used to list the above directory. Hence, we should
-    # re-check whether the FD still exists (via os.path.exists)
-    fds = [i for (i, path) in fds if (i >= 3 and os.path.exists(path))]
-    return dict(filter(
-        lambda (k, v): v is not None,
-        ((i, fd_type_if_open(i)) for i in fds)))
+    try:
+        # NOTE: Sometimes, an FD is used to list the above directory. Hence, we should
+        # re-check whether the FD still exists (via os.path.exists)
+        fds = [i for (i, path) in fds if (i >= 3 and os.path.exists(path))]
+        return dict(filter(
+            lambda (k, v): v is not None,
+            ((i, fd_type_if_open(i)) for i in fds)))
+    except OSError:
+        if retries == 0:
+            raise
+        return get_open_fds(retries - 1)


def difference_open_fds(before, after):
@@ -58,7 +70,8 @@ def difference_open_fds(before, after):
# "a - b" for dicts -- remove anything in 'a' that has a key in b
dict_diff = lambda a, b: dict((k, a[k]) for k in (frozenset(a) - frozenset(b)))
for k in (frozenset(after) & frozenset(before)):
assert before[k] == after[k], "Changing FD types aren't supported!"
if before[k] != after[k]:
print("WARNING: FD {0} changed from {1} to {2}".format(k, before[k], after[k]))
return {
'closed': dict_diff(before, after),
'opened': dict_diff(after, before)}
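Together, these helpers give the tests their core pattern: snapshot open FDs, run the code under test, and diff the two snapshots. A minimal usage sketch (not part of the diff):

import multiprocessing

before = get_open_fds()
queue = multiprocessing.Queue()  # any operation that might open FDs
after = get_open_fds()

diff = difference_open_fds(before, after)
# diff['opened'] maps each newly-opened FD to its FDInfo, e.g.
#     {5: FDInfo(modes=['fifo'], symlink='pipe:[91234]')}
# diff['closed'] maps FDs that disappeared between the two snapshots.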
@@ -88,7 +101,31 @@ def basic_worker(xs):
        yield x + 1


+def repeat(times):
+    """Repeats a test to help catch flakiness."""
+    def fcn_helper(fcn):
+        return lambda *args, **kwargs: [fcn(*args, **kwargs) for _ in xrange(times)]
+    return fcn_helper


class TestBasicMapDoesntLeaveAroundFDs(T.TestCase):
+    @T.setup_teardown
+    def instrument_queue_initiation(self):
+        old_init = vimap.queue_manager.VimapQueueManager.__init__
+        def instrumented_init(*args, **kwargs):
+            self.before_queue_manager_init = get_open_fds()
+            old_init(*args, **kwargs)
+            self.after_queue_manager_init = get_open_fds()
+            self.queue_fds = difference_open_fds(
+                self.before_queue_manager_init,
+                self.after_queue_manager_init)['opened']
+        with mock.patch.object(
+                vimap.queue_manager.VimapQueueManager,
+                '__init__',
+                instrumented_init):
+            yield

+    @repeat(30)
    def test_all_fds_cleaned_up(self):
        initial_open_fds = get_open_fds()
        pool = vimap.pool.fork_identical(basic_worker, num_workers=1)
@@ -101,12 +138,16 @@ def test_all_fds_cleaned_up(self):
        # T.assert_equal(after_fork['closed'], [])
        T.assert_gte(len(after_fork['opened']), 2)  # should have at least 2 open fds
        # All opened files should be FIFOs
-        T.assert_equal(all(typ == ['fifo'] for typ in after_fork['opened'].values()), True)
+        T.assert_equal(all(info.modes == ['fifo'] for info in after_fork['opened'].values()), True)

        after_cleanup = difference_open_fds(after_fork_open_fds, after_finish_open_fds)
        T.assert_gte(len(after_cleanup['closed']), 2)

        left_around = difference_open_fds(initial_open_fds, after_finish_open_fds)
+        if len(left_around['opened']) != 0:
+            queue_fds_left_around = dict(
+                item for item in self.queue_fds.items() if item[0] in left_around['opened'])
+            print("Queue FDs left around: {0}".format(queue_fds_left_around))
        T.assert_equal(len(left_around['opened']), 0)


vimap/queue_manager.py (37 changes: 23 additions & 14 deletions)
@@ -21,17 +21,6 @@
_MAX_IN_FLIGHT = 100


-# NOTE(gatoatigrado|2013-11-01) Queue feeder threads will send an,
-#
-#     IOError: [Errno 32] Broken pipe
-#
-# in the _send() method of multiprocessing if the queue is closed too soon.
-# We throw in some hacks to sleep for 10ms, which seems to effectively avoid
-# this flake. It's not fun. I'm going to write a multiprocessing.Queue
-# replacement/alternative hopefully-soon.
-_AVOID_SEND_FLAKINESS = True


class VimapQueueManager(object):
    '''Args: Sequence of vimap workers.'''
    queue_class = multiprocessing.queues.Queue
@@ -68,16 +57,33 @@ def close(self):
        the corresponding attribute so any future attempted accesses will
        fail.
        """
-        if _AVOID_SEND_FLAKINESS and (self.queue_class is multiprocessing.queues.Queue):
-            # multiprocessing's queue probably has a bug in its shutdown routine
-            time.sleep(0.01)
+        finalize_methods = []
+
+        def _wait_close(queue_name, pipe_name, pipe):
+            """Works around bugs (or misuse?) in multiprocessing.queue by waiting for
+            a queue's internal pipes to actually be closed.
+
+            See https://github.com/gatoatigrado/vimap/issues/14 for more information.
+            """
+            if not pipe.closed:
+                if self.debug:
+                    print("Force-closing {0} pipe for queue {1}".format(pipe_name, queue_name))
+                pipe.close()
+
        def _close_queue(name, queue):
            if self.debug:
                print("Main thread queue manager: Closing and joining {0} queue".format(name))
            queue.close()
            queue.join_thread()
+            # NOTE: If we're using a different queue (e.g. our mock SerialQueue),
+            # or using a different implementation of Python, don't do this fragile
+            # mock.
+            if hasattr(queue, '_reader') and hasattr(queue, '_writer'):
+                reader_pipe, writer_pipe = queue._reader, queue._writer
+                finalize_methods.append(lambda: _wait_close(name, 'reader', reader_pipe))
+                finalize_methods.append(lambda: _wait_close(name, 'writer', writer_pipe))

        _close_queue('input', self.input_queue)
        del self.input_queue  # Make future accesses fail

@@ -87,6 +93,9 @@ def _close_queue(name, queue):
        _close_queue('output', self.output_queue)
        del self.output_queue  # Make future accesses fail

+        for finalize_method in finalize_methods:
+            finalize_method()

    def add_output_hook(self, hook):
        '''Add a function which will be executed immediately when output is
        taken off of the queue. The only current use case is to react to
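For context on _wait_close: in CPython 2.x, multiprocessing.Queue is backed by a pipe whose two Connection objects are stored as the private attributes _reader and _writer, and close() plus join_thread() do not necessarily close both ends right away; the FDs can linger until the queue object is garbage-collected. A minimal sketch of the behavior being worked around (details vary by Python version; _reader/_writer are implementation internals):

import multiprocessing

q = multiprocessing.Queue()
q.put("x")
q.get()          # ensure the feeder thread has started and flushed
q.close()        # no more puts; tell the feeder thread to finish
q.join_thread()  # wait for buffered items to reach the pipe

# Even at this point one or both pipe ends may still report open; the FDs
# are only guaranteed to go away once the queue object itself is finalized.
print(q._reader.closed, q._writer.closed)

# The force-close in _wait_close above is the corresponding fix:
for pipe in (q._reader, q._writer):
    if not pipe.closed:
        pipe.close()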
vimap/real_worker_routine.py (42 changes: 23 additions & 19 deletions)
@@ -54,33 +54,37 @@ def worker_input_generator(self):
                file=sys.stderr)
            raise

-    def explicitly_close_queues(self):
-        '''Explicitly join queues, so that we'll get "stuck" in something that's
-        more easily debugged than multiprocessing.
-
-        NOTE: It's tempting to call self.output_queue.cancel_join_thread(),
-        but this seems to leave us in a bad state in practice (reproducible
-        via existing tests).
-        '''
-        self.input_queue.close()
-        self.output_queue.close()
+    def safe_close_queue(self, name, queue):
+        self.debug("Closing queue {0}", name)
+        queue.close()
        try:
-            self.debug("Joining input queue")
-            self.input_queue.join_thread()
-            self.debug("...done")
+            self.debug("Joining thread for queue {0}", name)

            try:
                self.debug(
-                    "Joining output queue (size {size}, full: {full})",
-                    size=self.output_queue.qsize(),
-                    full=self.output_queue.full())
+                    "Joining queue {name} (size {size}, full: {full})",
+                    name=name,
+                    size=queue.qsize(),
+                    full=queue.full())
            except NotImplementedError:
                pass  # Mac OS X doesn't implement qsize()
-            self.output_queue.join_thread()
-            self.debug("...done")
+            queue.join_thread()
        # threads might have already been closed
        except AssertionError as e:
-            self.debug("Couldn't join threads; error {0}", e)
+            self.debug("Couldn't join queue {0}; error {1}", name, e)
+        else:
+            self.debug("Done closing {0}, no exceptions.", name)
+
+    def explicitly_close_queues(self):
+        '''Explicitly join queues, so that we'll get "stuck" in something that's
+        more easily debugged than multiprocessing.
+
+        NOTE: It's tempting to call self.output_queue.cancel_join_thread(),
+        but this seems to leave us in a bad state in practice (reproducible
+        via existing tests).
+        '''
+        self.safe_close_queue('input', self.input_queue)
+        self.safe_close_queue('output', self.output_queue)

    def run(self, input_queue, output_queue):
        '''
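As background for safe_close_queue's defensive except AssertionError: multiprocessing.Queue.join_thread() asserts that close() has already been called, so the ordering of the two calls matters. A small standalone sketch of that contract (standard-library behavior, not vimap code):

import multiprocessing

q = multiprocessing.Queue()
q.put("done")

try:
    q.join_thread()  # wrong order: close() has not been called yet
except AssertionError:
    pass             # the case safe_close_queue tolerates

q.close()        # mark the queue closed; the feeder thread will exit
q.join_thread()  # now legal: blocks until buffered data is flushed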