Skip to content

Commit

Permalink
Make kombu failures retry on IOError
Browse files Browse the repository at this point in the history
Fixes bug 797770

Unfortunately if rabbit decides protocol negotiation is taking too long,
it will close the socket on us.  This ends up raising IOError with a
'socket closed' message.  This patch will catch IOError and re-try.

Change-Id: I9110c845b71118c0fad760d90e91c585e6db46ed
  • Loading branch information
comstud authored and vishvananda committed Mar 30, 2012
1 parent ada63db commit 0d669f1
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 4 deletions.
4 changes: 2 additions & 2 deletions nova/rpc/impl_kombu.py
Expand Up @@ -445,7 +445,7 @@ def reconnect(self):
try:
self._connect()
return
except self.connection_errors, e:
except (self.connection_errors, IOError), e:
pass
except Exception, e:
# NOTE(comstud): Unfortunately it's possible for amqplib
Expand Down Expand Up @@ -488,7 +488,7 @@ def ensure(self, error_callback, method, *args, **kwargs):
while True:
try:
return method(*args, **kwargs)
except (self.connection_errors, socket.timeout), e:
except (self.connection_errors, socket.timeout, IOError), e:
pass
except Exception, e:
# NOTE(comstud): Unfortunately it's possible for amqplib
Expand Down
17 changes: 15 additions & 2 deletions nova/tests/rpc/test_kombu.py
Expand Up @@ -35,14 +35,15 @@ class MyException(Exception):
pass


def _raise_exc_stub(stubs, times, obj, method, exc_msg):
def _raise_exc_stub(stubs, times, obj, method, exc_msg,
exc_class=MyException):
info = {'called': 0}
orig_method = getattr(obj, method)

def _raise_stub(*args, **kwargs):
info['called'] += 1
if info['called'] <= times:
raise MyException(exc_msg)
raise exc_class(exc_msg)
orig_method(*args, **kwargs)
stubs.Set(obj, method, _raise_stub)
return info
Expand Down Expand Up @@ -213,6 +214,18 @@ def test_declare_consumer_errors_will_reconnect(self):
self.assertEqual(info['called'], 2)
self.assertTrue(isinstance(result, self.rpc.DirectConsumer))

def test_declare_consumer_ioerrors_will_reconnect(self):
"""Test that an IOError exception causes a reconnection"""
info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectConsumer,
'__init__', 'Socket closed', exc_class=IOError)

conn = self.rpc.Connection()
result = conn.declare_consumer(self.rpc.DirectConsumer,
'test_topic', None)

self.assertEqual(info['called'], 3)
self.assertTrue(isinstance(result, self.rpc.DirectConsumer))

def test_publishing_errors_will_reconnect(self):
# Test that any exception with 'timeout' in it causes a
# reconnection when declaring the publisher class and when
Expand Down

0 comments on commit 0d669f1

Please sign in to comment.