diff --git a/amqpstorm/channel.py b/amqpstorm/channel.py index ed2ec52c..b0d829b3 100644 --- a/amqpstorm/channel.py +++ b/amqpstorm/channel.py @@ -502,6 +502,11 @@ def _close_channel(self, frame_in): :param specification.Channel.Close frame_in: Channel Close frame. :return: """ + if self._connection.is_open: + try: + self.write_frame(specification.Channel.CloseOk()) + except AMQPConnectionError: + pass self.set_state(self.CLOSED) self.remove_consumer_tag() if self._inbound: diff --git a/amqpstorm/io.py b/amqpstorm/io.py index c3513c74..5d44a119 100644 --- a/amqpstorm/io.py +++ b/amqpstorm/io.py @@ -77,14 +77,15 @@ def close(self): try: self._running.clear() self._close_socket() - self.socket = None - self.poller = None finally: self._wr_lock.release() self._rd_lock.release() if self._inbound_thread: self._inbound_thread.join(timeout=self._parameters['timeout']) + + self.socket = None + self.poller = None self._inbound_thread = None def open(self): diff --git a/amqpstorm/tests/functional/test_reliability.py b/amqpstorm/tests/functional/test_reliability.py index 3b87e432..9a3bc5fe 100644 --- a/amqpstorm/tests/functional/test_reliability.py +++ b/amqpstorm/tests/functional/test_reliability.py @@ -3,6 +3,7 @@ import threading import time +from amqpstorm import AMQPChannelError from amqpstorm import AMQPConnectionError from amqpstorm import AMQPMessageError from amqpstorm import Connection @@ -186,6 +187,35 @@ def test_functional_ssl_connection_without_ssl(self): sys.modules['ssl'] = restore_func imp.reload(compatibility) + @setup(new_connection=False, queue=True) + def test_functional_verify_passive_declare(self): + + self.connection = self.connection = Connection(HOST, USERNAME, + PASSWORD) + self.channel = self.connection.channel() + self.assertEqual(int(self.channel), 1) + + self.assertRaises( + AMQPChannelError, + self.channel.queue.declare, self.queue_name, passive=True + ) + + self.channel = self.connection.channel() + self.assertEqual(int(self.channel), 1) + + self.assertRaises( + AMQPChannelError, + self.channel.queue.declare, self.queue_name, passive=True + ) + + self.channel = self.connection.channel() + self.assertEqual(int(self.channel), 1) + + self.channel.queue.declare(self.queue_name) + + self.channel.close() + self.connection.close() + class PublishAndConsume1kTest(TestFunctionalFramework): messages_to_send = 1000