From e8d65d8f4e928d8a78e044a8491f7c9d28eeb6c1 Mon Sep 17 00:00:00 2001 From: Steve Kim Date: Thu, 31 Jul 2025 10:54:37 -0700 Subject: [PATCH 1/3] refactor shared-sub-test --- test/test_mqtt5.py | 173 ++++++++++++++------------------------------- 1 file changed, 52 insertions(+), 121 deletions(-) diff --git a/test/test_mqtt5.py b/test/test_mqtt5.py index b841478e7..133e6f3e5 100644 --- a/test/test_mqtt5.py +++ b/test/test_mqtt5.py @@ -1009,160 +1009,91 @@ def test_operation_sub_unsub(self): client.stop() callbacks.future_stopped.result(TIMEOUT) - total_callbacks = 0 - all_packets_received = Future() - mutex = Lock() - received_subscriptions = [0] * 10 - - def subscriber1_callback(self, publish_received_data: mqtt5.PublishReceivedData): - self.mutex.acquire() - var = publish_received_data.publish_packet.payload - self.received_subscriptions[int(var)] = 1 - self.total_callbacks = self.total_callbacks + 1 - if self.total_callbacks == 10: - self.all_packets_received.set_result(None) - self.mutex.release() - - def subscriber2_callback(self, publish_received_data: mqtt5.PublishReceivedData): - self.mutex.acquire() - var = publish_received_data.publish_packet.payload - self.received_subscriptions[int(var)] = 1 - self.total_callbacks = self.total_callbacks + 1 - if self.total_callbacks == 10: - self.all_packets_received.set_result(None) - self.mutex.release() - def test_operation_shared_subscription(self): input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST") input_cert = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_CERT") input_key = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_KEY") - client_id_subscriber1 = create_client_id() - client_id_subscriber2 = create_client_id() client_id_publisher = create_client_id() - testTopic = "test/MQTT5_Binding_Python_" + client_id_publisher - sharedTopicfilter = "$share/crttest/test/MQTT5_Binding_Python_" + client_id_publisher - tls_ctx_options = io.TlsContextOptions.create_client_with_mtls_from_path( input_cert, input_key ) - # subscriber 1 - connect_subscriber1_options = mqtt5.ConnectPacket(client_id=client_id_subscriber1) - subscriber1_generic_callback = Mqtt5TestCallbacks() - subscriber1_options = mqtt5.ClientOptions( + client_options_subscriber_1 = mqtt5.ClientOptions( host_name=input_host_name, port=8883, tls_ctx=io.ClientTlsContext(tls_ctx_options), - connect_options=connect_subscriber1_options, - on_publish_callback_fn=self.subscriber1_callback, - on_lifecycle_event_stopped_fn=subscriber1_generic_callback.on_lifecycle_stopped, - on_lifecycle_event_attempting_connect_fn=subscriber1_generic_callback.on_lifecycle_attempting_connect, - on_lifecycle_event_connection_success_fn=subscriber1_generic_callback.on_lifecycle_connection_success, - on_lifecycle_event_connection_failure_fn=subscriber1_generic_callback.on_lifecycle_connection_failure - ) - subscriber1_client = mqtt5.Client(client_options=subscriber1_options) - - # subscriber 2 - connect_subscriber2_options = mqtt5.ConnectPacket(client_id=client_id_subscriber2) - subscriber2_generic_callback = Mqtt5TestCallbacks() - subscriber2_options = mqtt5.ClientOptions( + connect_options=mqtt5.ConnectPacket(client_id=create_client_id()) + ) + client_options_subscriber_2 = mqtt5.ClientOptions( host_name=input_host_name, port=8883, tls_ctx=io.ClientTlsContext(tls_ctx_options), - connect_options=connect_subscriber2_options, - on_publish_callback_fn=self.subscriber2_callback, - on_lifecycle_event_stopped_fn=subscriber2_generic_callback.on_lifecycle_stopped, - on_lifecycle_event_attempting_connect_fn=subscriber2_generic_callback.on_lifecycle_attempting_connect, - on_lifecycle_event_connection_success_fn=subscriber2_generic_callback.on_lifecycle_connection_success, - on_lifecycle_event_connection_failure_fn=subscriber2_generic_callback.on_lifecycle_connection_failure + connect_options=mqtt5.ConnectPacket(client_id=create_client_id()) ) - subscriber2_client = mqtt5.Client(client_options=subscriber2_options) - - # publisher - connect_publisher_options = mqtt5.ConnectPacket(client_id=client_id_publisher) - publisher_generic_callback = Mqtt5TestCallbacks() - - publisher_options = mqtt5.ClientOptions( + client_options_publisher = mqtt5.ClientOptions( host_name=input_host_name, port=8883, tls_ctx=io.ClientTlsContext(tls_ctx_options), - connect_options=connect_publisher_options, - on_lifecycle_event_stopped_fn=publisher_generic_callback.on_lifecycle_stopped, - on_lifecycle_event_attempting_connect_fn=publisher_generic_callback.on_lifecycle_attempting_connect, - on_lifecycle_event_connection_success_fn=publisher_generic_callback.on_lifecycle_connection_success, - on_lifecycle_event_connection_failure_fn=publisher_generic_callback.on_lifecycle_connection_failure - ) - publisher_client = mqtt5.Client(client_options=publisher_options) - - print("Connecting all 3 clients\n") - subscriber1_client.start() - subscriber1_generic_callback.future_connection_success.result(TIMEOUT) + connect_options=mqtt5.ConnectPacket(client_id=client_id_publisher) + ) + + callbacks_subscriber_1 = Mqtt5TestCallbacks() + callbacks_subscriber_2 = Mqtt5TestCallbacks() + callbacks_publisher = Mqtt5TestCallbacks() + + client_subscriber_1 = self._create_client( + client_options=client_options_subscriber_1, callbacks=callbacks_subscriber_1) + client_subscriber_2 = self._create_client( + client_options=client_options_subscriber_2, callbacks=callbacks_subscriber_2) + client_publisher = self._create_client( + client_options=client_options_publisher, callbacks=callbacks_publisher) + + client_subscriber_1.start() + callbacks_subscriber_1.future_connection_success.result(TIMEOUT) + client_subscriber_2.start() + callbacks_subscriber_2.future_connection_success.result(TIMEOUT) + client_publisher.start() + callbacks_publisher.future_connection_success.result(TIMEOUT) - subscriber2_client.start() - subscriber2_generic_callback.future_connection_success.result(TIMEOUT) - - publisher_client.start() - publisher_generic_callback.future_connection_success.result(TIMEOUT) - print("All clients connected\n") + topic_filter = "test/MQTT5_Binding_Python_" + client_id_publisher + topic_filter_shared = "$share/crttest/test/MQTT5_Binding_Python_" + client_id_publisher + payload = "Hello Subscriber" - # Subscriber 1 subscriptions = [] - subscriptions.append(mqtt5.Subscription(topic_filter=sharedTopicfilter, qos=mqtt5.QoS.AT_LEAST_ONCE)) - subscribe_packet = mqtt5.SubscribePacket( - subscriptions=subscriptions) - subscribe_future = subscriber1_client.subscribe(subscribe_packet=subscribe_packet) - suback_packet1 = subscribe_future.result(TIMEOUT) - self.assertIsInstance(suback_packet1, mqtt5.SubackPacket) - - # Subscriber 2 - subscriptions2 = [] - subscriptions2.append(mqtt5.Subscription(topic_filter=sharedTopicfilter, qos=mqtt5.QoS.AT_LEAST_ONCE)) - subscribe_packet2 = mqtt5.SubscribePacket( - subscriptions=subscriptions2) - subscribe_future2 = subscriber2_client.subscribe(subscribe_packet=subscribe_packet2) - suback_packet2 = subscribe_future2.result(TIMEOUT) - self.assertIsInstance(suback_packet2, mqtt5.SubackPacket) - + subscriptions.append(mqtt5.Subscription(topic_filter=topic_filter_shared, qos=mqtt5.QoS.AT_LEAST_ONCE)) + subscriber_packet = mqtt5.SubscribePacket(subscriptions=subscriptions) + subscribe_future_1 = client_subscriber_1.subscribe(subscribe_packet=subscriber_packet) + suback_packet_1 = subscribe_future_1.result(TIMEOUT) + self.assertIsInstance(suback_packet_1, mqtt5.SubackPacket) + subscribe_future_2 = client_subscriber_2.subscribe(subscribe_packet=subscriber_packet) + suback_packet_2 = subscribe_future_2.result(TIMEOUT) + self.assertIsInstance(suback_packet_2, mqtt5.SubackPacket) + + # Expected publish count 10 shared between the two clients evenly + callbacks_subscriber_1.on_publish_receive_expected = 5 + callbacks_subscriber_2.on_publish_receive_expected = 5 publishes = 10 for x in range(0, publishes): packet = mqtt5.PublishPacket( - payload=f"{x}", + payload=payload, qos=mqtt5.QoS.AT_LEAST_ONCE, - topic=testTopic + topic=topic_filter ) - publish_future = publisher_client.publish(packet) + publish_future = client_publisher.publish(packet) publish_future.result(TIMEOUT) - self.all_packets_received.result(TIMEOUT) - - topic_filters = [] - topic_filters.append(testTopic) - unsubscribe_packet = mqtt5.UnsubscribePacket(topic_filters=testTopic) - - unsubscribe_future = subscriber1_client.unsubscribe(unsubscribe_packet) - unsuback_packet = unsubscribe_future.result(TIMEOUT) - self.assertIsInstance(unsuback_packet, mqtt5.UnsubackPacket) - - unsubscribe_future = subscriber2_client.unsubscribe(unsubscribe_packet) - unsuback_packet = unsubscribe_future.result(TIMEOUT) - self.assertIsInstance(unsuback_packet, mqtt5.UnsubackPacket) - - self.assertEqual(self.total_callbacks, 10) - - for e in self.received_subscriptions: - self.assertEqual(e, 1) - - subscriber1_client.stop() - subscriber1_generic_callback.future_stopped.result(TIMEOUT) - - subscriber2_client.stop() - subscriber2_generic_callback.future_stopped.result(TIMEOUT) + callbacks_subscriber_1.future_expected_publishes_received.result(TIMEOUT) + callbacks_subscriber_2.future_expected_publishes_received.result(TIMEOUT) - publisher_client.stop() - publisher_generic_callback.future_stopped.result(TIMEOUT) + client_subscriber_1.stop() + callbacks_subscriber_1.future_stopped.result(TIMEOUT) + client_subscriber_2.stop() + callbacks_subscriber_2.future_stopped.result(TIMEOUT) + client_publisher.stop() + callbacks_publisher.future_stopped.result(TIMEOUT) def test_operation_will(self): input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST") From c46ee718560013db4df713aea07c2afe78f716bb Mon Sep 17 00:00:00 2001 From: Steve Kim Date: Thu, 31 Jul 2025 10:58:07 -0700 Subject: [PATCH 2/3] lint --- test/test_mqtt5.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_mqtt5.py b/test/test_mqtt5.py index 133e6f3e5..951099619 100644 --- a/test/test_mqtt5.py +++ b/test/test_mqtt5.py @@ -1050,7 +1050,7 @@ def test_operation_shared_subscription(self): client_options=client_options_subscriber_2, callbacks=callbacks_subscriber_2) client_publisher = self._create_client( client_options=client_options_publisher, callbacks=callbacks_publisher) - + client_subscriber_1.start() callbacks_subscriber_1.future_connection_success.result(TIMEOUT) client_subscriber_2.start() From a54147c52b03913643f805fc0fafee561b471bf2 Mon Sep 17 00:00:00 2001 From: Steve Kim Date: Thu, 31 Jul 2025 11:25:10 -0700 Subject: [PATCH 3/3] remove the shared subscription test. --- test/test_mqtt5.py | 86 ---------------------------------------------- 1 file changed, 86 deletions(-) diff --git a/test/test_mqtt5.py b/test/test_mqtt5.py index 951099619..d52da2abc 100644 --- a/test/test_mqtt5.py +++ b/test/test_mqtt5.py @@ -1009,92 +1009,6 @@ def test_operation_sub_unsub(self): client.stop() callbacks.future_stopped.result(TIMEOUT) - def test_operation_shared_subscription(self): - input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST") - input_cert = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_CERT") - input_key = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_KEY") - - client_id_publisher = create_client_id() - - tls_ctx_options = io.TlsContextOptions.create_client_with_mtls_from_path( - input_cert, - input_key - ) - - client_options_subscriber_1 = mqtt5.ClientOptions( - host_name=input_host_name, - port=8883, - tls_ctx=io.ClientTlsContext(tls_ctx_options), - connect_options=mqtt5.ConnectPacket(client_id=create_client_id()) - ) - client_options_subscriber_2 = mqtt5.ClientOptions( - host_name=input_host_name, - port=8883, - tls_ctx=io.ClientTlsContext(tls_ctx_options), - connect_options=mqtt5.ConnectPacket(client_id=create_client_id()) - ) - client_options_publisher = mqtt5.ClientOptions( - host_name=input_host_name, - port=8883, - tls_ctx=io.ClientTlsContext(tls_ctx_options), - connect_options=mqtt5.ConnectPacket(client_id=client_id_publisher) - ) - - callbacks_subscriber_1 = Mqtt5TestCallbacks() - callbacks_subscriber_2 = Mqtt5TestCallbacks() - callbacks_publisher = Mqtt5TestCallbacks() - - client_subscriber_1 = self._create_client( - client_options=client_options_subscriber_1, callbacks=callbacks_subscriber_1) - client_subscriber_2 = self._create_client( - client_options=client_options_subscriber_2, callbacks=callbacks_subscriber_2) - client_publisher = self._create_client( - client_options=client_options_publisher, callbacks=callbacks_publisher) - - client_subscriber_1.start() - callbacks_subscriber_1.future_connection_success.result(TIMEOUT) - client_subscriber_2.start() - callbacks_subscriber_2.future_connection_success.result(TIMEOUT) - client_publisher.start() - callbacks_publisher.future_connection_success.result(TIMEOUT) - - topic_filter = "test/MQTT5_Binding_Python_" + client_id_publisher - topic_filter_shared = "$share/crttest/test/MQTT5_Binding_Python_" + client_id_publisher - payload = "Hello Subscriber" - - subscriptions = [] - subscriptions.append(mqtt5.Subscription(topic_filter=topic_filter_shared, qos=mqtt5.QoS.AT_LEAST_ONCE)) - subscriber_packet = mqtt5.SubscribePacket(subscriptions=subscriptions) - subscribe_future_1 = client_subscriber_1.subscribe(subscribe_packet=subscriber_packet) - suback_packet_1 = subscribe_future_1.result(TIMEOUT) - self.assertIsInstance(suback_packet_1, mqtt5.SubackPacket) - subscribe_future_2 = client_subscriber_2.subscribe(subscribe_packet=subscriber_packet) - suback_packet_2 = subscribe_future_2.result(TIMEOUT) - self.assertIsInstance(suback_packet_2, mqtt5.SubackPacket) - - # Expected publish count 10 shared between the two clients evenly - callbacks_subscriber_1.on_publish_receive_expected = 5 - callbacks_subscriber_2.on_publish_receive_expected = 5 - publishes = 10 - for x in range(0, publishes): - packet = mqtt5.PublishPacket( - payload=payload, - qos=mqtt5.QoS.AT_LEAST_ONCE, - topic=topic_filter - ) - publish_future = client_publisher.publish(packet) - publish_future.result(TIMEOUT) - - callbacks_subscriber_1.future_expected_publishes_received.result(TIMEOUT) - callbacks_subscriber_2.future_expected_publishes_received.result(TIMEOUT) - - client_subscriber_1.stop() - callbacks_subscriber_1.future_stopped.result(TIMEOUT) - client_subscriber_2.stop() - callbacks_subscriber_2.future_stopped.result(TIMEOUT) - client_publisher.stop() - callbacks_publisher.future_stopped.result(TIMEOUT) - def test_operation_will(self): input_host_name = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_HOST") input_cert = _get_env_variable("AWS_TEST_MQTT5_IOT_CORE_RSA_CERT")