Skip to content

Commit

Permalink
Bug fix for #63 (#64)
Browse files Browse the repository at this point in the history
* Properly wait until the inbound queue is empty #63
* Preparing new release
* Remove unnecessary continue
  • Loading branch information
eandersson committed Oct 18, 2018
1 parent bc9af98 commit a96f795
Show file tree
Hide file tree
Showing 9 changed files with 45 additions and 13 deletions.
3 changes: 1 addition & 2 deletions .travis.yml
Expand Up @@ -8,7 +8,6 @@ python:
- 3.5
- 3.6
- 3.7
- nightly
install:
- pip install -r requirements.txt
- pip install -r test-requirements.txt
Expand All @@ -24,4 +23,4 @@ before_script:
after_success:
- bash <(curl -s https://codecov.io/bash)
services:
- docker
- docker
4 changes: 4 additions & 0 deletions CHANGELOG.rst
@@ -1,6 +1,10 @@
Changelog
=========

Version 2.4.3
-------------
- Properly wait until the inbound queue is empty when break_on_empty is set [#63] - Thanks TomGudman.

Version 2.4.2
-------------
- Added support for External Authentication - Thanks Bernd Höhl.
Expand Down
2 changes: 1 addition & 1 deletion LICENSE
@@ -1,6 +1,6 @@
The MIT License (MIT)

Copyright (c) 2014-2017 Erik Olof Gunnar Andersson
Copyright (c) 2014-2018 Erik Olof Gunnar Andersson

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
File renamed without changes.
4 changes: 4 additions & 0 deletions README.rst
Expand Up @@ -20,6 +20,10 @@ Additional documentation is available on `amqpstorm.io <https://www.amqpstorm.io
Changelog
=========

Version 2.4.3
-------------
- Properly wait until the inbound queue is empty when break_on_empty is set [#63] - Thanks TomGudman.

Version 2.4.2
-------------
- Added support for External Authentication - Thanks Bernd Höhl.
Expand Down
2 changes: 1 addition & 1 deletion amqpstorm/__init__.py
@@ -1,5 +1,5 @@
"""AMQPStorm."""
__version__ = '2.4.2' # noqa
__version__ = '2.4.3' # noqa
__author__ = 'eandersson' # noqa

import logging
Expand Down
6 changes: 4 additions & 2 deletions amqpstorm/channel.py
Expand Up @@ -112,9 +112,11 @@ def build_inbound_messages(self, break_on_empty=False, to_tuple=False,
while not self.is_closed:
message = self._build_message(auto_decode=auto_decode)
if not message:
if break_on_empty:
break
self.check_for_errors()
if break_on_empty:
sleep(IDLE_WAIT * 10)
if not self._inbound:
break
sleep(IDLE_WAIT)
continue
if to_tuple:
Expand Down
35 changes: 29 additions & 6 deletions amqpstorm/tests/unit/channel/channel_message_handling_tests.py
Expand Up @@ -197,6 +197,29 @@ def test_channel_build_empty_inbound_messages(self):
else:
self.assertRaises(StopIteration, generator.__next__)

def test_channel_build_no_message_but_inbound_not_empty(self):
channel = Channel(0, FakeConnection(), 360)
channel.set_state(Channel.OPEN)

message = self.message.encode('utf-8')
message_len = len(message)

def add_content():
channel._inbound.append(ContentHeader(body_size=message_len))
channel._inbound.append(ContentBody(value=message))

deliver = specification.Basic.Deliver()
channel._inbound = [deliver]

self.assertTrue(channel._inbound)

threading.Timer(function=add_content, interval=0.2).start()

for msg in channel.build_inbound_messages(break_on_empty=True):
self.assertEqual(msg.body, message.decode('utf-8'))

self.assertFalse(channel._inbound)

def test_channel_build_inbound_messages(self):
channel = Channel(0, FakeConnection(), 360)
channel.set_state(Channel.OPEN)
Expand All @@ -211,8 +234,8 @@ def test_channel_build_inbound_messages(self):
channel._inbound = [deliver, header, body]

messages_consumed = 0
for message in channel.build_inbound_messages(break_on_empty=True):
self.assertIsInstance(message, Message)
for msg in channel.build_inbound_messages(break_on_empty=True):
self.assertIsInstance(msg, Message)
messages_consumed += 1

self.assertEqual(messages_consumed, 1)
Expand All @@ -232,8 +255,8 @@ def test_channel_build_multiple_inbound_messages(self):
deliver, header, body, deliver, header, body]

messages_consumed = 0
for message in channel.build_inbound_messages(break_on_empty=True):
self.assertIsInstance(message, Message)
for msg in channel.build_inbound_messages(break_on_empty=True):
self.assertIsInstance(msg, Message)
messages_consumed += 1

self.assertEqual(messages_consumed, 4)
Expand All @@ -255,8 +278,8 @@ def test_channel_build_large_number_inbound_messages(self):
channel._inbound.append(body)

messages_consumed = 0
for message in channel.build_inbound_messages(break_on_empty=True):
self.assertIsInstance(message, Message)
for msg in channel.build_inbound_messages(break_on_empty=True):
self.assertIsInstance(msg, Message)
messages_consumed += 1

self.assertEqual(messages_consumed, 10000)
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -3,7 +3,7 @@

setup(
name='AMQPStorm',
version='2.4.2',
version='2.4.3',
description='Thread-safe Python RabbitMQ Client & Management library.',
long_description=open('README.rst').read(),
author='Erik Olof Gunnar Andersson',
Expand Down

0 comments on commit a96f795

Please sign in to comment.