Skip to content

Commit

Permalink
Fix more KillableThread related race conditions. (#782)
Browse files Browse the repository at this point in the history
These conditions include:
* Killing a thread before the _thread_proc function runs, which prevents the
  killed state from being handled properly.
* Killing a thread after the _thread_proc function ran, which prevents proper
  cleanup.  This can also cause errors with garbage collection.
* Killing threads while logging occurs can cause deadlocks if the log handler
  lock is left in a bad state.
* Killing threads while in logging.getLogger can cause deadlocks if logging's
  module-level RLock is left in a bad state.
  • Loading branch information
arsharma1 committed May 14, 2018
1 parent 05ff8e6 commit 0723912
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 33 deletions.
14 changes: 10 additions & 4 deletions openhtf/core/phase_descriptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import openhtf.plugs
from openhtf.util import data
from openhtf.util import logs
from openhtf.util import threads

import six

Expand Down Expand Up @@ -234,10 +235,15 @@ def __call__(self, test_state):
len(arg_info.args) > len(kwargs)):
# Underlying function has room for test_api as an arg. If it doesn't
# expect it but we miscounted args, we'll get another error farther down.
# Update test_state's logger so that it's a phase-specific one.
test_state.logger = logging.getLogger(
'.'.join((logs.get_record_logger_for(test_state.execution_uid).name,
'phase', self.name)))

# The logging module has a module _lock instance that is a threading.RLock
# instance; it can cause deadlocks in Python 2.7 when a KillableThread is
# killed while its release method is running.
with threads.safe_lock_release_context(logging._lock): # pylint: disable=protected-access
# Update test_state's logger so that it is a phase-specific one.
test_state.logger = logging.getLogger(
'.'.join((logs.get_record_logger_for(test_state.execution_uid).name,
'phase', self.name)))
return self.func(
test_state if self.options.requires_state else test_state.test_api,
**kwargs)
Expand Down
24 changes: 22 additions & 2 deletions openhtf/util/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ def MyPhase(test, helper):
--test-record-verbosity flag to select a different level of chatter.
"""

import argparse
import collections
import datetime
import logging
Expand All @@ -71,6 +70,7 @@ def MyPhase(test, helper):
from openhtf.util import argv
from openhtf.util import console_output
from openhtf.util import functions
from openhtf.util import threads
import six


Expand Down Expand Up @@ -105,6 +105,7 @@ def get_record_logger_for(test_uid):


def initialize_record_logger(test_uid, test_record, notify_update):
"""Initialize a record logger, which outputs to a test record."""
htf_logger = logging.getLogger(LOGGER_PREFIX)
# Avoid duplicate logging if other loggers are configured.
htf_logger.propagate = False
Expand Down Expand Up @@ -160,6 +161,7 @@ def filter(self, record):

class TestUidFilter(logging.Filter):
"""Only allow logs to pass whose logger source matches the given uid."""

def __init__(self, test_uid):
super(TestUidFilter, self).__init__()
self.test_uid = test_uid
Expand All @@ -171,6 +173,16 @@ def filter(self, record):
return not match or (match.group('test_uid') == self.test_uid)


class KillableThreadSafeStreamHandler(logging.StreamHandler):
  """StreamHandler whose handle() runs inside a safe lock-release context."""

  def handle(self, record):
    """Handle `record` while guarding this handler's lock.

    Args:
      record: logging.LogRecord, passed through to StreamHandler.handle.

    Returns:
      Whatever logging.StreamHandler.handle returns for the record.
    """
    # Every logging.Handler carries a `lock` attribute that is a
    # threading.RLock.  On Python 2.7, killing a KillableThread while that
    # lock's release method is running can leave it in a state that deadlocks,
    # so the whole handle() call is wrapped in the release-safety context.
    parent = super(KillableThreadSafeStreamHandler, self)
    with threads.safe_lock_release_context(self.lock):
      return parent.handle(record)


class RecordHandler(logging.Handler):
"""A handler to save logs to an HTF TestRecord."""

Expand All @@ -184,6 +196,13 @@ def __init__(self, test_uid, test_record, notify_update):
self.addFilter(MAC_FILTER)
self.addFilter(TestUidFilter(test_uid))

def handle(self, record):
  """Handle `record` while guarding this handler's lock.

  Args:
    record: logging.LogRecord, passed through to logging.Handler.handle.

  Returns:
    Whatever the parent class's handle() returns for the record.
  """
  # The handler's `lock` attribute is a threading.RLock; on Python 2.7 it can
  # deadlock if a KillableThread is killed while the lock's release method is
  # running, so the parent handle() is run under the release-safety context.
  release_guard = threads.safe_lock_release_context(self.lock)
  with release_guard:
    return super(RecordHandler, self).handle(record)

def emit(self, record):
"""Save a logging.LogRecord to our test record.
Expand All @@ -209,6 +228,7 @@ def emit(self, record):

class CliFormatter(logging.Formatter):
"""Formats log messages for printing to the CLI."""

def format(self, record):
"""Format the record as tersely as possible but preserve info."""
super(CliFormatter, self).format(record)
Expand Down Expand Up @@ -246,7 +266,7 @@ def configure_cli_logging():
elif CLI_LOGGING_VERBOSITY > 2:
logging_level = logging.NOTSET

cli_handler = logging.StreamHandler(stream=sys.stdout)
cli_handler = KillableThreadSafeStreamHandler(stream=sys.stdout)
cli_handler.setFormatter(CliFormatter())
cli_handler.setLevel(logging_level)
cli_handler.addFilter(MAC_FILTER)
Expand Down
154 changes: 127 additions & 27 deletions openhtf/util/threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,89 @@

"""Thread library defining a few helpers."""

import contextlib
import ctypes
import functools
import logging
import sys
import threading

import six
try:
from six.moves import _thread
except ImportError:
from six.moves import _dummy_thread as _thread


_LOG = logging.getLogger(__name__)


class ThreadTerminationError(SystemExit):
  """Subclass of SystemExit that is specific to thread termination.

  Because it derives from SystemExit rather than Exception, a plain
  `except Exception` handler does not catch it.
  """


def safe_lock_release_context(rlock):
  """Return a context manager that guards `rlock` against thread kills.

  Args:
    rlock: threading.RLock, the lock to protect while the context is active.

  Returns:
    On Python 2, the context manager from _safe_lock_release_py2(rlock); on
    any other version, the do-nothing one from _placeholder_release_py3().
  """
  if not six.PY2:
    # Python3 has a C-implementation of RLock, which doesn't have the thread
    # termination issues, so no cleanup logic is required there.
    return _placeholder_release_py3()
  return _safe_lock_release_py2(rlock)


@contextlib.contextmanager
def _placeholder_release_py3():
yield


# pylint: disable=protected-access
@contextlib.contextmanager
def _safe_lock_release_py2(rlock):
  """Ensure that a threading.RLock is fully released for Python 2.

  The RLock release code is:
    https://github.com/python/cpython/blob/2.7/Lib/threading.py#L187

  The RLock object's release method does not release all of its state if an
  exception is raised in the middle of its operation.  There are three pieces
  of internal state that must be cleaned up:
  - owning thread ident, an integer.
  - entry count, an integer that counts how many times the current owner has
    locked the RLock.
  - internal lock, a threading.Lock instance that handles blocking.

  The repair logic only runs when a ThreadTerminationError propagates out of
  the managed block, and that exception is always re-raised afterwards.

  Args:
    rlock: threading.RLock, lock to fully release.

  Yields:
    None.

  Raises:
    ThreadTerminationError: re-raised after the lock state has been inspected
      and, if necessary, repaired.
  """
  assert isinstance(rlock, threading._RLock)
  ident = _thread.get_ident()
  # Snapshot the entry count held by this thread before the managed block
  # runs; zero means this thread did not own the lock on entry.
  expected_count = 0
  if rlock._RLock__owner == ident:
    expected_count = rlock._RLock__count
  try:
    yield
  except ThreadTerminationError:
    # Check if the current thread still owns the lock by checking if we can
    # acquire the underlying lock (acquire(0) is a non-blocking attempt).
    if rlock._RLock__block.acquire(0):
      # Lock is clean, so unlock and we are done.
      rlock._RLock__block.release()
    elif rlock._RLock__owner == ident and expected_count > 0:
      # The lock is still held up the stack, so make sure the count is accurate.
      if rlock._RLock__count != expected_count:
        rlock._RLock__count = expected_count
    elif rlock._RLock__owner == ident or rlock._RLock__owner is None:
      # The internal lock is still acquired, but either this thread or no thread
      # owns it, which means it needs to be hard reset.
      rlock._RLock__owner = None
      rlock._RLock__count = 0
      rlock._RLock__block.release()
    # Any other owner means a different thread holds the lock; leave it alone.
    raise
# pylint: enable=protected-access


def loop(_=None, force=False): # pylint: disable=invalid-name
"""Causes a function to loop indefinitely."""
if not force:
Expand All @@ -47,17 +118,39 @@ def _proc(*args, **kwargs):
return real_loop


class ExceptionSafeThread(threading.Thread):
"""A thread object which handles exceptions and logging if an error occurs.
class KillableThread(threading.Thread):
"""A thread object which handles exceptions and is able to be killed.
Note: The reason we don't bother with arguments in thread proc is because this
class is meant to be subclassed. If you were to invoke this with
target=Function then you'd lose the exception handling anyway.
Based on recipe available at http://tomerfiliba.com/recipes/Thread2/
Note: To fully address race conditions involved with the use of
PyThreadState_SetAsyncExc, the GIL must be held from when the thread is
checked to when it's async-raised. In this case, we're not doing that, and
there remains the remote possibility that a thread identifier is reused and we
accidentally kill the wrong thread.
The kill method will only kill a background thread that (1) has not started
yet or (2) is currently running its _thread_proc function. This ensures that
the _thread_exception and _thread_finished methods are not affected, so state
can be properly determined. In addition, this prevents thread termination
during garbage collection.
"""

def __init__(self, *args, **kwargs):
super(KillableThread, self).__init__(*args, **kwargs)
self._running_lock = threading.Lock()
self._killed = threading.Event()

def run(self):
try:
self._thread_proc()
with self._running_lock:
if self._killed.is_set():
raise ThreadTerminationError()
self._thread_proc()
except Exception: # pylint: disable=broad-except
if not self._thread_exception(*sys.exc_info()):
_LOG.critical('Thread raised an exception: %s', self.name)
Expand All @@ -66,6 +159,15 @@ def run(self):
self._thread_finished()
_LOG.debug('Thread finished: %s', self.name)

def _is_thread_proc_running(self):
# Acquire the lock without blocking, though this object is fully implemented
# in C, so we cannot specify keyword arguments.
could_acquire = self._running_lock.acquire(0)
if could_acquire:
self._running_lock.release()
return False
return True

def _thread_proc(self):
"""The method called when executing the thread."""

Expand All @@ -76,31 +178,33 @@ def _thread_exception(self, exc_type, exc_val, exc_tb):
"""The method called if _thread_proc raises an exception.
To suppress the exception, return True from this method.
"""


class KillableThread(ExceptionSafeThread):
"""Killable thread raises an internal exception when killed.
Based on recipe available at http://tomerfiliba.com/recipes/Thread2/
Note: To fully address race conditions involved with the use of
PyThreadState_SetAsyncExc, the GIL must be held from when the thread is
checked to when it's async-raised. In this case, we're not doing that, and
there remains the remote possibility that a thread identifier is reused and we
accidentally kill the wrong thread.
Args:
exc_type: exception class.
exc_val: exception instance of the type exc_type.
exc_tb: traceback object for the current exception instance.
"""
Returns:
True if the exception should be ignored. The default case ignores the
exception raised by the kill functionality.
"""
return exc_type is ThreadTerminationError

def kill(self):
  """Terminates the current thread by raising an error.

  The kill request is always recorded in self._killed first, so that run()
  raises ThreadTerminationError itself if the thread only starts afterwards.
  The asynchronous exception is injected only while the thread is alive and
  its _thread_proc function is running; otherwise the thread is left to exit
  on its own.
  """
  self._killed.set()
  if not self.is_alive():
    # Use the module logger rather than the root-level logging.debug(), which
    # would implicitly call logging.basicConfig() on an unconfigured root.
    _LOG.debug('Cannot kill thread that is no longer running.')
    return
  if not self._is_thread_proc_running():
    _LOG.debug("Thread's _thread_proc function is no longer running, "
               'will not kill; letting thread exit gracefully.')
    return
  self.async_raise(ThreadTerminationError)

def async_raise(self, exc_type):
"""Raise the exception."""
# Should only be called on a started thread so raise otherwise
# Should only be called on a started thread, so raise otherwise.
assert self.ident is not None, 'Only started threads have thread identifier'

# If the thread has died we don't want to raise an exception so log.
Expand All @@ -117,12 +221,8 @@ def async_raise(self, exc_type):
elif result > 1:
# Something bad happened, call with a NULL exception to undo.
ctypes.pythonapi.PyThreadState_SetAsyncExc(self.ident, None)
raise RuntimeError('Error: PyThreadState_SetAsyncExc %s %s (%s) %s',
exc_type, self.name, self.ident, result)

def _thread_exception(self, exc_type, exc_val, exc_tb):
"""Suppress the exception when we're kill()'d."""
return exc_type is ThreadTerminationError
raise RuntimeError('Error: PyThreadState_SetAsyncExc %s %s (%s) %s' % (
exc_type, self.name, self.ident, result))


class NoneByDefaultThreadLocal(threading.local):
Expand Down Expand Up @@ -152,6 +252,6 @@ def synchronized_method(self, *args, **kwargs):
raise RuntimeError('Can\'t synchronize method `%s` of %s without '
'attribute `_lock`.%s' %
(func.__name__, type(self).__name__, hint))
with self._lock:
with self._lock: # pylint: disable=protected-access
return func(self, *args, **kwargs)
return synchronized_method

0 comments on commit 0723912

Please sign in to comment.