Skip to content

Commit

Permalink
Fix Channel not closing properly after being closed by server #114
Browse files Browse the repository at this point in the history
  • Loading branch information
eandersson committed Nov 19, 2021
1 parent 57fd73f commit 290982b
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 2 deletions.
5 changes: 5 additions & 0 deletions amqpstorm/channel.py
Expand Up @@ -502,6 +502,11 @@ def _close_channel(self, frame_in):
:param specification.Channel.Close frame_in: Channel Close frame.
:return:
"""
if self._connection.is_open:
try:
self.write_frame(specification.Channel.CloseOk())
except AMQPConnectionError:
pass
self.set_state(self.CLOSED)
self.remove_consumer_tag()
if self._inbound:
Expand Down
5 changes: 3 additions & 2 deletions amqpstorm/io.py
Expand Up @@ -77,14 +77,15 @@ def close(self):
try:
self._running.clear()
self._close_socket()
self.socket = None
self.poller = None
finally:
self._wr_lock.release()
self._rd_lock.release()

if self._inbound_thread:
self._inbound_thread.join(timeout=self._parameters['timeout'])

self.socket = None
self.poller = None
self._inbound_thread = None

def open(self):
Expand Down
30 changes: 30 additions & 0 deletions amqpstorm/tests/functional/test_reliability.py
Expand Up @@ -3,6 +3,7 @@
import threading
import time

from amqpstorm import AMQPChannelError
from amqpstorm import AMQPConnectionError
from amqpstorm import AMQPMessageError
from amqpstorm import Connection
Expand Down Expand Up @@ -186,6 +187,35 @@ def test_functional_ssl_connection_without_ssl(self):
sys.modules['ssl'] = restore_func
imp.reload(compatibility)

@setup(new_connection=False, queue=True)
def test_functional_verify_passive_declare(self):

self.connection = self.connection = Connection(HOST, USERNAME,
PASSWORD)
self.channel = self.connection.channel()
self.assertEqual(int(self.channel), 1)

self.assertRaises(
AMQPChannelError,
self.channel.queue.declare, self.queue_name, passive=True
)

self.channel = self.connection.channel()
self.assertEqual(int(self.channel), 1)

self.assertRaises(
AMQPChannelError,
self.channel.queue.declare, self.queue_name, passive=True
)

self.channel = self.connection.channel()
self.assertEqual(int(self.channel), 1)

self.channel.queue.declare(self.queue_name)

self.channel.close()
self.connection.close()


class PublishAndConsume1kTest(TestFunctionalFramework):
messages_to_send = 1000
Expand Down

0 comments on commit 290982b

Please sign in to comment.