Skip to content

Commit

Permalink
Merge pull request #15 from gatoatigrado/ntung-closes-fds-fix-2
Browse files Browse the repository at this point in the history
pulling apart multiprocessing queue internals to fix the leftover FDs issue
  • Loading branch information
gatoatigrado committed Dec 12, 2013
2 parents 50c6ef3 + 46b3b16 commit e2cbb8c
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 53 deletions.
123 changes: 103 additions & 20 deletions tests/closes_fds_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,18 @@
from __future__ import absolute_import
from __future__ import print_function

import errno
import logging
import mock
import os
import os.path
import resource
import stat
import testify as T
import vimap.pool
import vimap.queue_manager
import vimap.worker_process
import testify as T
from collections import namedtuple


# decrypt POSIX stuff
Expand All @@ -23,30 +30,75 @@
'socket': stat.S_ISSOCK}


# Information about one open file descriptor:
#   modes   -- list of human-readable file-type strings (e.g. ['fifo'])
#   symlink -- target of /proc/<pid>/fd/<n> on Linux, or None elsewhere
FDInfo = namedtuple("FDInfo", ["modes", "symlink"])


def current_proc_fd_dir(*subpaths):
    """Return a path under /proc/<pid>/fd for the current process.

    With no arguments, returns the fd directory itself; extra path
    components are joined onto it (e.g. current_proc_fd_dir("3")).
    """
    # A `def` instead of a name-bound lambda (PEP 8 E731): same behavior,
    # but introspectable and properly named in tracebacks.
    return os.path.join("/proc", str(os.getpid()), "fd", *subpaths)


def fd_type_if_open(fd_number):
    """For a given open file descriptor, return information about that file descriptor.

    'modes' are a list of human-readable strings describing the file type;
    'symlink' is the target of the file descriptor (often a pipe name)

    :param fd_number: integer file descriptor; must currently be open
        (os.fstat raises OSError otherwise)
    :returns: FDInfo(modes=..., symlink=...)
    """
    fd_stat = os.fstat(fd_number)
    modes = [k for k, v in readable_mode_strings.items() if v(fd_stat.st_mode)]
    # /proc/<pid>/fd only exists on Linux; elsewhere we can't resolve the
    # descriptor's target, so report symlink=None.
    if os.path.isdir(current_proc_fd_dir()):
        return FDInfo(
            modes=modes,
            symlink=os.readlink(current_proc_fd_dir(str(fd_number))))
    else:
        return FDInfo(modes=modes, symlink=None)


def list_fds_linux():
    """A method to list open FDs that uses /proc/{pid}/fd."""
    numbered_paths = []
    for entry in os.listdir(current_proc_fd_dir()):
        numbered_paths.append((int(entry), current_proc_fd_dir(entry)))
    # NOTE: Sometimes, an FD is used to list the above directory. Hence, we should
    # re-check whether the FD still exists (via os.path.exists)
    return [fd for fd, path in numbered_paths
            if fd >= 3 and os.path.exists(path)]


def list_fds_other():
    """A method to list open FDs that doesn't need /proc/{pid}.

    Probes every descriptor in [3, max_fds_soft) with fstat and yields the
    ones that are open. The soft RLIMIT_NOFILE bounds the scan; an invalid
    or enormous limit falls back to 4096.
    """
    max_fds_soft, _ = resource.getrlimit(resource.RLIMIT_NOFILE)
    if max_fds_soft == resource.RLIM_INFINITY or not (3 < max_fds_soft < 4096):
        logging.warning(
            "max_fds_soft invalid ({0}), assuming 4096 is a sufficient upper bound"
            .format(max_fds_soft))
        max_fds_soft = 4096

    # The first three FDs are stdin, stdout, and stderr. We're interested in
    # everything after.
    # `range` (not Py2-only `xrange`): identical iteration, Py2/Py3 compatible.
    for i in range(3, max_fds_soft):
        try:
            os.fstat(i)  # raises OSError(EBADF) when fd i is not open
            yield i
        except OSError as e:
            if e.errno != errno.EBADF:
                raise


def get_open_fds(retries=3):
    """
    Returns a map,

        fd (int) --> FDInfo

    Uses /proc/<pid>/fd when available (Linux), otherwise falls back to
    probing descriptors directly. If an FD disappears between listing and
    stat-ing (raising OSError), the whole scan is retried up to `retries`
    times before the error propagates.
    """
    if os.path.isdir(current_proc_fd_dir()):
        fds = list_fds_linux()
    else:
        fds = list_fds_other()

    try:
        # Dict comprehension instead of `filter(lambda (k, v): ...)`:
        # tuple-argument lambdas are invalid syntax on Python 3; this form
        # keeps the same drop-None-values semantics on both.
        return {fd: info
                for fd, info in ((i, fd_type_if_open(i)) for i in fds)
                if info is not None}
    except OSError:
        if retries == 0:
            raise
        return get_open_fds(retries - 1)


def difference_open_fds(before, after):
Expand All @@ -58,7 +110,8 @@ def difference_open_fds(before, after):
# "a - b" for dicts -- remove anything in 'a' that has a key in b
dict_diff = lambda a, b: dict((k, a[k]) for k in (frozenset(a) - frozenset(b)))
for k in (frozenset(after) & frozenset(before)):
assert before[k] == after[k], "Changing FD types aren't supported!"
if before[k] != after[k]:
print("WARNING: FD {0} changed from {1} to {2}".format(k, before[k], after[k]))
return {
'closed': dict_diff(before, after),
'opened': dict_diff(after, before)}
Expand Down Expand Up @@ -88,7 +141,31 @@ def basic_worker(xs):
yield x + 1


def repeat(times):
    """Repeats a test to help catch flakiness.

    Decorator factory: the decorated callable is invoked `times` times and
    the list of per-run return values is returned.

    :param times: number of repetitions (int)
    """
    def fcn_helper(fcn):
        # `range` (not Py2-only `xrange`): identical here, Py2/Py3 compatible.
        return lambda *args, **kwargs: [fcn(*args, **kwargs) for _ in range(times)]
    return fcn_helper


class TestBasicMapDoesntLeaveAroundFDs(T.TestCase):
@T.setup_teardown
def instrument_queue_initiation(self):
    """Wrap VimapQueueManager.__init__ for the duration of each test,
    recording which FDs the queue manager opened (self.queue_fds) so the
    test can report exactly which queue FDs leaked.
    """
    original_init = vimap.queue_manager.VimapQueueManager.__init__

    def recording_init(*args, **kwargs):
        self.before_queue_manager_init = get_open_fds()
        original_init(*args, **kwargs)
        self.after_queue_manager_init = get_open_fds()
        fd_diff = difference_open_fds(
            self.before_queue_manager_init,
            self.after_queue_manager_init)
        self.queue_fds = fd_diff['opened']

    with mock.patch.object(
            vimap.queue_manager.VimapQueueManager,
            '__init__',
            recording_init):
        yield

@repeat(30)
def test_all_fds_cleaned_up(self):
initial_open_fds = get_open_fds()
pool = vimap.pool.fork_identical(basic_worker, num_workers=1)
Expand All @@ -101,12 +178,18 @@ def test_all_fds_cleaned_up(self):
# T.assert_equal(after_fork['closed'], [])
T.assert_gte(len(after_fork['opened']), 2) # should have at least 3 open fds
# All opened files should be FIFOs
T.assert_equal(all(typ == ['fifo'] for typ in after_fork['opened'].values()), True)
if not all(info.modes == ['fifo'] for info in after_fork['opened'].values()):
print("Infos: {0}".format(after_fork['opened']))
T.assert_not_reached("Some infos are not FIFOs")

after_cleanup = difference_open_fds(after_fork_open_fds, after_finish_open_fds)
T.assert_gte(len(after_cleanup['closed']), 2)

left_around = difference_open_fds(initial_open_fds, after_finish_open_fds)
if len(left_around['opened']) != 0:
queue_fds_left_around = dict(
item for item in self.queue_fds.items() if item[0] in left_around['opened'])
print("Queue FDs left around: {0}".format(queue_fds_left_around))
T.assert_equal(len(left_around['opened']), 0)


Expand Down
37 changes: 23 additions & 14 deletions vimap/queue_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,6 @@
_MAX_IN_FLIGHT = 100


# NOTE(gatoatigrado|2013-11-01) Queue feeder threads will send an,
#
# IOError: [Errno 32] Broken pipe
#
# in the _send() method of multiprocessing if the queue is closed too soon.
# We throw in some hacks to sleep for 10ms, which seems to effectively avoid
# this flake. It's not fun. I'm going to write a multiprocessing.Queue
# replacement/alternative hopefully-soon.
_AVOID_SEND_FLAKINESS = True


class VimapQueueManager(object):
'''Args: Sequence of vimap workers.'''
queue_class = multiprocessing.queues.Queue
Expand Down Expand Up @@ -68,16 +57,33 @@ def close(self):
the corresponding attribute so any future attempted accesses will
fail.
"""
if _AVOID_SEND_FLAKINESS and (self.queue_class is multiprocessing.queues.Queue):
# multiprocessing's queue probably has a bug in its shutdown routine
time.sleep(0.01)
finalize_methods = []

def _wait_close(queue_name, pipe_name, pipe):
"""Works around bugs (or misuse?) in multiprocessing.queue by waiting for
a queue's internal pipes to actually be closed.
See https://github.com/gatoatigrado/vimap/issues/14 for more information.
"""
if not pipe.closed:
if self.debug:
print("Force-closing {0} pipe for queue {1}".format(pipe_name, queue_name))
pipe.close()

def _close_queue(name, queue):
if self.debug:
print("Main thread queue manager: Closing and joining {0} queue".format(name))
queue.close()
queue.join_thread()

# NOTE: If we're using a different queue (e.g. our mock SerialQueue),
# or using a different implementation of Python, don't do this fragile
# mock.
if hasattr(queue, '_reader') and hasattr(queue, '_writer'):
reader_pipe, writer_pipe = queue._reader, queue._writer
finalize_methods.append(lambda: _wait_close(name, 'reader', reader_pipe))
finalize_methods.append(lambda: _wait_close(name, 'writer', writer_pipe))

_close_queue('input', self.input_queue)
del self.input_queue # Make future accesses fail

Expand All @@ -87,6 +93,9 @@ def _close_queue(name, queue):
_close_queue('output', self.output_queue)
del self.output_queue # Make future accesses fail

for finalize_method in finalize_methods:
finalize_method()

def add_output_hook(self, hook):
'''Add a function which will be executed immediately when output is
taken off of the queue. The only current use case is to react to
Expand Down
42 changes: 23 additions & 19 deletions vimap/real_worker_routine.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,33 +55,37 @@ def worker_input_generator(self):
file=sys.stderr)
raise

def explicitly_close_queues(self):
'''Explicitly join queues, so that we'll get "stuck" in something that's
more easily debugged than multiprocessing.
NOTE: It's tempting to call self.output_queue.cancel_join_thread(),
but this seems to leave us in a bad state in practice (reproducible
via existing tests).
'''
self.input_queue.close()
self.output_queue.close()
def safe_close_queue(self, name, queue):
    """Close `queue` and join its feeder thread, tolerating queues whose
    threads have already been shut down.

    :param name: human-readable queue name, used only in debug output
    :param queue: a multiprocessing queue (close/join_thread interface)
    """
    self.debug("Closing queue {0}", name)
    queue.close()
    try:
        self.debug("Joining thread for queue {0}", name)

        try:
            self.debug(
                "Joining queue {name} (size {size}, full: {full})",
                name=name,
                size=queue.qsize(),
                full=queue.full())
        except NotImplementedError:
            pass  # Mac OS X doesn't implement qsize()
        queue.join_thread()
    # threads might have already been closed
    except AssertionError as e:
        self.debug("Couldn't join queue {0}; error {1}", name, e)
    else:
        self.debug("Done closing {0}, no exceptions.", name)

def explicitly_close_queues(self):
    '''Explicitly join queues, so that we'll get "stuck" in something that's
    more easily debugged than multiprocessing.

    NOTE: It's tempting to call self.output_queue.cancel_join_thread(),
    but this seems to leave us in a bad state in practice (reproducible
    via existing tests).
    '''
    # Close both queues the same way; input first, then output.
    for queue_name, queue in (
            ('input', self.input_queue),
            ('output', self.output_queue)):
        self.safe_close_queue(queue_name, queue)

def handle_output(self, output):
"""Makes the imperative calls to put an output item on the output
Expand Down

0 comments on commit e2cbb8c

Please sign in to comment.