Skip to content

Commit

Permalink
Reworked management tests
Browse files Browse the repository at this point in the history
  • Loading branch information
eandersson committed Sep 28, 2021
1 parent dbff6c5 commit 3abf0ee
Showing 1 changed file with 16 additions and 15 deletions.
31 changes: 16 additions & 15 deletions amqpstorm/tests/functional/management/test_connection.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import time
import uuid

from amqpstorm import AMQPConnectionError
from amqpstorm import Connection
from amqpstorm.management import ManagementApi
from amqpstorm.tests import HOST
Expand Down Expand Up @@ -37,9 +36,11 @@ def test_api_connection_list(self):
def test_api_connection_client_properties(self):
api = ManagementApi(HTTP_URL, USERNAME, PASSWORD, timeout=1)

client_properties = {'platform': 'Atari', 'license': 'MIT'}
connection = Connection(HOST, USERNAME, PASSWORD, timeout=1,
client_properties=client_properties)
connection = Connection(
HOST, USERNAME, PASSWORD,
client_properties={'platform': 'Atari', 'license': 'MIT'}
)
self.assertTrue(connection.is_open)

connection_found = False
for _ in range(10):
Expand All @@ -51,8 +52,6 @@ def test_api_connection_client_properties(self):
break
time.sleep(1)

connection.close()

self.assertTrue(
connection_found,
'Could not find connection with custom client properties'
Expand All @@ -62,26 +61,28 @@ def test_api_connection_close(self):
connection_id = str(uuid.uuid4())
api = ManagementApi(HTTP_URL, USERNAME, PASSWORD, timeout=1)

client_properties = {'platform': connection_id}
connection = Connection(HOST, USERNAME, PASSWORD, timeout=1,
client_properties=client_properties)
connection = Connection(
HOST, USERNAME, PASSWORD,
client_properties={'platform': connection_id}
)
self.assertTrue(connection.is_open)

connection_found = False
for _ in range(10):
for conn in api.connection.list():
if conn['client_properties']['platform'] != connection_id:
continue
connection_found = True
api.connection.close(conn['name'], reason=connection_id)
self.assertIsNone(
api.connection.close(conn['name'], reason=connection_id)
)
time.sleep(1)
if connection_found:
break
time.sleep(1)

self.assertTrue(
connection_found,
'Could not find connection'
)
self.assertRaisesRegex(
AMQPConnectionError, 'connection closed',
connection.channel, 1
)

self.assertTrue(connection.is_closed)

0 comments on commit 3abf0ee

Please sign in to comment.