Skip to content

Commit

Permalink
Fixed another unlikely issue related to extreme network conditions
Browse files Browse the repository at this point in the history
The goal here is not to fix issues caused by network corruption,
but rather to make sure that the exceptions raised when
connections do fail are always predictable.

Fail fast and reliably!
  • Loading branch information
eandersson committed May 6, 2016
1 parent 1a1512e commit 36ae359
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 9 deletions.
2 changes: 1 addition & 1 deletion amqpstorm/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ def _handle_amqp_frame(self, data_in):
pass
except pamqp_spec.AMQPFrameError as why:
LOGGER.error('AMQPFrameError: %r', why, exc_info=True)
except UnicodeDecodeError as why:
except (UnicodeDecodeError, ValueError) as why:
LOGGER.error(why, exc_info=True)
self.exceptions.append(AMQPConnectionError(why))
return data_in, None, None
Expand Down
45 changes: 45 additions & 0 deletions amqpstorm/tests/connection_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,20 @@ def throw_error(*_):
pamqp_frame.unmarshal = restore_func

def test_connection_handle_unicode_error(self):
"""This test covers an unlikely issue triggered by network corruption.
pamqp.decode._maybe_utf8 raises:
UnicodeDecodeError: 'utf8' codec can't
decode byte 0xc5 in position 1: invalid continuation byte
The goal here is not to fix issues caused by network corruption,
but rather to make sure that the exceptions raised when
connections do fail are always predictable.
Fail fast and reliably!
:return:
"""
connection = Connection('127.0.0.1', 'guest', 'guest', lazy=True)

def throw_error(_):
Expand All @@ -145,6 +159,37 @@ def throw_error(_):
finally:
pamqp_frame.unmarshal = restore_func

def test_connection_handle_value_error(self):
"""This test covers an unlikely issue triggered by network corruption.
pamqp.decode._embedded_value raises:
ValueError: Unknown type: b'\x13'
The goal here is not to fix issues caused by network corruption,
but rather to make sure that the exceptions raised when
connections do fail are always predictable.
Fail fast and reliably!
:return:
"""
connection = Connection('127.0.0.1', 'guest', 'guest', lazy=True)

def throw_error(_):
raise ValueError("Unknown type: b'\x13'")

restore_func = pamqp_frame.unmarshal
try:
pamqp_frame.unmarshal = throw_error

result = connection._handle_amqp_frame('error')

self.assertEqual(result[0], 'error')
self.assertIsNone(result[1])
self.assertIsNone(result[2])
finally:
pamqp_frame.unmarshal = restore_func

def test_connection_wait_for_connection(self):
connection = Connection('127.0.0.1', 'guest', 'guest', timeout=5,
lazy=True)
Expand Down
8 changes: 4 additions & 4 deletions examples/scalable_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def _update_consumers(self):
def _start_consumer(self, consumer):
"""Start a consumer as a new Thread.
:param consumer:
:param Consumer consumer:
:return:
"""
thread = threading.Thread(target=consumer.start,
Expand All @@ -138,11 +138,11 @@ def _start_consumer(self, consumer):
self._wait_for_consumer_to_start(consumer)

@staticmethod
def _wait_for_consumer_to_start(consumer, timeout=1):
def _wait_for_consumer_to_start(consumer, timeout=30):
"""Wait to make sure the consumer has time to start.
:param consumer:
:param timeout:
:param Consumer consumer:
:param int timeout:
:return:
"""
start_time = time.time()
Expand Down
8 changes: 4 additions & 4 deletions examples/scalable_rpc_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def _update_consumers(self):
def _start_consumer(self, consumer):
"""Start a consumer as a new Thread.
:param consumer:
:param Consumer consumer:
:return:
"""
thread = threading.Thread(target=consumer.start,
Expand All @@ -148,11 +148,11 @@ def _start_consumer(self, consumer):
self._wait_for_consumer_to_start(consumer)

@staticmethod
def _wait_for_consumer_to_start(consumer, timeout=1):
def _wait_for_consumer_to_start(consumer, timeout=30):
"""Wait to make sure the consumer has time to start.
:param consumer:
:param timeout:
:param Consumer consumer:
:param int timeout:
:return:
"""
start_time = time.time()
Expand Down

0 comments on commit 36ae359

Please sign in to comment.