Skip to content

Commit

Permalink
Fixing glance-api hangs in the qpid notifier
Browse files Browse the repository at this point in the history
Glance-api was able to hang in qpid notifier under heavy image creation load.

The ``thread`` and ``select`` modules used by the python-qpid for managing
the AMQP connection. When the eventlet was not able to switch between threads
because leaded to hang and/or pipe(2) leaking issues.

* Monkey patching the ``select`` and ``thread`` modules to be  eventlet friendly
  in order to avoid hanging issues.

* The reference to the connection object in the QpidStrategy
  was replaceable by a concurrent thread, which could cause various issues.
  Using just local variables for storing connection object in order to avoid
  concurrent unsafe manipulation.

Fixing bug 1229042

Change-Id: I8fa8c4f36892b96d406216cb3c64854a94ca9df7
  • Loading branch information
afazekas committed Sep 26, 2013
1 parent 8c31de4 commit 2e7aa76
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 32 deletions.
5 changes: 3 additions & 2 deletions glance/cmd/api.py
Expand Up @@ -26,8 +26,9 @@
import os
import sys

# Monkey patch socket and time
eventlet.patcher.monkey_patch(all=False, socket=True, time=True)
# Monkey patch socket, time, select, threads
eventlet.patcher.monkey_patch(all=False, socket=True, time=True,
select=True, thread=True)

# If ../glance/__init__.py exists, add ../ to Python search path, so that
# it will override what happens to be installed in /usr/(local/)lib/python...
Expand Down
40 changes: 20 additions & 20 deletions glance/notifier/notify_qpid.py
Expand Up @@ -83,30 +83,31 @@ class QpidStrategy(strategy.Strategy):
def _open_connection(self):
"""Initialize the Qpid notification strategy."""
broker = CONF.qpid_hostname + ":" + CONF.qpid_port
self.connection = qpid.messaging.Connection(broker)
self.connection.username = CONF.qpid_username
self.connection.password = CONF.qpid_password
self.connection.sasl_mechanisms = CONF.qpid_sasl_mechanisms
connection = qpid.messaging.Connection(broker)
connection.username = CONF.qpid_username
connection.password = CONF.qpid_password
connection.sasl_mechanisms = CONF.qpid_sasl_mechanisms
# Hard code this option as enabled so that reconnect logic isn't needed
# in this file at all.
self.connection.reconnect = True
connection.reconnect = True
if CONF.qpid_reconnect_timeout:
self.connection.reconnect_timeout = CONF.qpid_reconnect_timeout
connection.reconnect_timeout = CONF.qpid_reconnect_timeout
if CONF.qpid_reconnect_limit:
self.connection.reconnect_limit = CONF.qpid_reconnect_limit
connection.reconnect_limit = CONF.qpid_reconnect_limit
if CONF.qpid_reconnect_interval_max:
self.connection.reconnect_interval_max = (
connection.reconnect_interval_max = (
CONF.qpid_reconnect_interval_max)
if CONF.qpid_reconnect_interval_min:
self.connection.reconnect_interval_min = (
connection.reconnect_interval_min = (
CONF.qpid_reconnect_interval_min)
if CONF.qpid_reconnect_interval:
self.connection.reconnect_interval = CONF.qpid_reconnect_interval
self.connection.heartbeat = CONF.qpid_heartbeat
self.connection.protocol = CONF.qpid_protocol
self.connection.tcp_nodelay = CONF.qpid_tcp_nodelay
self.connection.open()
connection.reconnect_interval = CONF.qpid_reconnect_interval
connection.heartbeat = CONF.qpid_heartbeat
connection.protocol = CONF.qpid_protocol
connection.tcp_nodelay = CONF.qpid_tcp_nodelay
connection.open()
LOG.info(_('Connected to AMQP server on %s') % broker)
return connection

def _send(self, priority, msg):
addr_opts = {
Expand All @@ -124,11 +125,10 @@ def _send(self, priority, msg):
topic = "%s.%s" % (CONF.qpid_notification_topic, priority)
address = "%s/%s ; %s" % (CONF.qpid_notification_exchange, topic,
json.dumps(addr_opts))

connection = None
try:
self.connection = None
self._open_connection()
session = self.connection.session()
connection = self._open_connection()
session = connection.session()
sender = session.sender(address)
qpid_msg = qpid.messaging.Message(content=msg)
sender.send(qpid_msg)
Expand All @@ -138,8 +138,8 @@ def _send(self, priority, msg):
'Message: %(msg)s') % details)
raise
finally:
if self.connection and self.connection.opened():
self.connection.close()
if connection and connection.opened():
connection.close()

def warn(self, msg):
self._send('warn', msg)
Expand Down
31 changes: 21 additions & 10 deletions glance/tests/unit/test_notifier.py
Expand Up @@ -348,7 +348,7 @@ def reset_qpid(self):
qpid.messaging.Sender = self.orig_sender
qpid.messaging.Receiver = self.orig_receiver

def _test_notify(self, priority, exception=False, opened=True):
def _test_notify(self, priority, exception=False, exception_send=False):
test_msg = {'a': 'b'}

self.mock_connection = self.mocker.CreateMock(self.orig_connection)
Expand All @@ -358,7 +358,7 @@ def _test_notify(self, priority, exception=False, opened=True):
self.mock_connection.username = ""
if exception:
self.mock_connection.open().AndRaise(
Exception('Test Exception'))
Exception('Test open Exception'))
else:
self.mock_connection.open()
self.mock_connection.session().AndReturn(self.mock_session)
Expand All @@ -368,27 +368,32 @@ def _test_notify(self, priority, exception=False, opened=True):
'"create": "always"}' % priority)
self.mock_session.sender(expected_address).AndReturn(
self.mock_sender)
self.mock_sender.send(mox.IgnoreArg())
self.mock_connection.opened().AndReturn(opened)
if opened:
if exception_send:
self.mock_sender.send(mox.IgnoreArg()).AndRaise(
Exception('Test send Exception'))
# NOTE(afazekas): the opened and close call is expected
# in this case, but not expected if the open fails
else:
self.mock_sender.send(mox.IgnoreArg())
self.mock_connection.opened().AndReturn(True)
self.mock_connection.close()

self.mocker.ReplayAll()

self.config(notifier_strategy="qpid")
notifier = self.notify_qpid.QpidStrategy()
if priority == 'info':
if exception:
if exception or exception_send:
self.assertRaises(Exception, notifier.info, test_msg)
else:
notifier.info(test_msg)
elif priority == 'warn':
if exception:
if exception or exception_send:
self.assertRaises(Exception, notifier.warn, test_msg)
else:
notifier.warn(test_msg)
elif priority == 'error':
if exception:
if exception or exception_send:
self.assertRaises(Exception, notifier.error, test_msg)
else:
notifier.error(test_msg)
Expand All @@ -407,8 +412,14 @@ def test_error(self):
def test_exception_open_successful(self):
self._test_notify('info', exception=True)

def test_exception_open_failed(self):
self._test_notify('info', exception=True, opened=False)
def test_info_fail(self):
self._test_notify('info', exception_send=True)

def test_warn_fail(self):
self._test_notify('warn', exception_send=True)

def test_error_fail(self):
self._test_notify('error', exception_send=True)


class TestRabbitContentType(utils.BaseTestCase):
Expand Down

0 comments on commit 2e7aa76

Please sign in to comment.