diff --git a/samples/README.md b/samples/README.md index 7454d0ba..a622478d 100644 --- a/samples/README.md +++ b/samples/README.md @@ -1,16 +1,23 @@ # Sample apps for the AWS IoT Device SDK v2 for Python -* [MQTT5 PubSub](#mqtt5-pubsub) -* [PubSub](#pubsub) -* [Basic Connect](#basic-connect) -* [Websocket Connect](#websocket-connect) -* [PKCS#11 Connect](#pkcs11-connect) -* [Windows Certificate Connect](#windows-certificate-connect) -* [Custom Authorizer Connect](#custom-authorizer-connect) -* [Shadow](#shadow) -* [Jobs](#jobs) -* [Fleet Provisioning](#fleet-provisioning) -* [Greengrass Discovery](#greengrass-discovery) +- [Sample apps for the AWS IoT Device SDK v2 for Python](#sample-apps-for-the-aws-iot-device-sdk-v2-for-python) + - [Build instructions](#build-instructions) + - [MQTT5 PubSub](#mqtt5-pubsub) + - [MQTT5 Request/Response](#mqtt5-requestresponse) + - [PubSub](#pubsub) + - [Basic Connect](#basic-connect) + - [Websocket Connect](#websocket-connect) + - [PKCS#11 Connect](#pkcs11-connect) + - [Windows Certificate Connect](#windows-certificate-connect) + - [Custom Authorizer Connect](#custom-authorizer-connect) + - [Shadow](#shadow) + - [Jobs](#jobs) + - [Fleet Provisioning](#fleet-provisioning) + - [Fleet Provisioning Detailed Instructions](#fleet-provisioning-detailed-instructions) + - [AWS Resource Setup](#aws-resource-setup) + - [Running the sample and provisioning using a certificate-key set from a provisioning claim](#running-the-sample-and-provisioning-using-a-certificate-key-set-from-a-provisioning-claim) + - [Run the sample using the certificate signing request workflow](#run-the-sample-using-the-certificate-signing-request-workflow) + - [Greengrass Discovery](#greengrass-discovery) ## Build instructions @@ -88,6 +95,71 @@ Run the sample like this: python3 mqtt5_pubsub.py --endpoint --ca_file --cert --key ``` +## MQTT5 Request/Response +This sample uses the +[Message Broker](https://docs.aws.amazon.com/iot/latest/developerguide/iot-message-broker.html) +for AWS IoT to send and receive messages +through an MQTT5 connection. + +MQTT5 introduces additional features and enhancements that improve the development experience with MQTT. You can read more about MQTT5 in the Python V2 SDK by checking out the [MQTT5 user guide](../documents/MQTT5.md). This sample demonstrates how to use the request/response pattern that is enabled by MQTT5. + +WARNING: This sample subscribes to both the request and response topics, but this is only for demonstration purposes. Ideally, another device would subscribe to the request topic and publish to the response topic (once received) but for simplicity, this sample acts as both devices. + +Note: MQTT5 support is currently in **developer preview**. We encourage feedback at all times, but feedback during the preview window is especially valuable in shaping the final product. During the preview period we may make backwards-incompatible changes to the public API, but in general, this is something we will try our best to avoid. + +On startup, the device connects to [AWS IoT Core](https://docs.aws.amazon.com/iot/latest/developerguide/iot-message-broker.html), subscribes to a request and response topic, and begins publishing messages to the request topic. The device will then receive messages back from the message broker and publish messages, along with the correlation data, to the response topic set in the received message's properties when it was published to the request topic. Finally, the device will start to receive messages from the response topic. Status updates will be continually printed to the console. + +Source: `samples/mqtt5_request_response.py` + +Your AWS IoT Core Thing's [Policy](https://docs.aws.amazon.com/iot/latest/developerguide/iot-policies.html) must provide privileges for this sample to connect, subscribe, publish, and receive. Make sure your policy allows a client ID of `test-*` to connect or use `--client_id ` to send the client ID your policy supports. + +
+(see sample policy) +
+{
+  "Version": "2012-10-17",
+  "Statement": [
+    {
+      "Effect": "Allow",
+      "Action": [
+        "iot:Publish",
+        "iot:Receive"
+      ],
+      "Resource": [
+        "arn:aws:iot:region:account:topic/command/control/light-1/switch",
+        "arn:aws:iot:region:account:topic/command/control/light-1/status"
+      ]
+    },
+    {
+      "Effect": "Allow",
+      "Action": [
+        "iot:Subscribe"
+      ],
+      "Resource": [
+        "arn:aws:iot:region:account:topicfilter/command/control/light-1/switch",
+        "arn:aws:iot:region:account:topicfilter/command/control/light-1/status"
+      ]
+    },
+    {
+      "Effect": "Allow",
+      "Action": [
+        "iot:Connect"
+      ],
+      "Resource": [
+        "arn:aws:iot:region:account:client/test-*"
+      ]
+    }
+  ]
+}
+
+
+ +Run the sample like this: +``` sh +# For Windows: replace 'python3' with 'python' +python3 mqtt5_request_response.py --endpoint --ca_file --cert --key +``` + ## PubSub This sample uses the diff --git a/samples/mqtt5_request_response.py b/samples/mqtt5_request_response.py new file mode 100644 index 00000000..fd40b151 --- /dev/null +++ b/samples/mqtt5_request_response.py @@ -0,0 +1,200 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0. + +import command_line_utils +from awscrt import mqtt5, exceptions +from uuid import uuid4 +import threading +from concurrent.futures import Future +import time, json + +TIMEOUT = 100 +request_topic_filter = "command/control/light-1/switch" +response_topic_filter = "command/control/light-1/status" +light_status = "OFF" + + +# Parse arguments +cmdUtils = command_line_utils.CommandLineUtils("PubSub - Send and receive messages through an MQTT5 connection.") +cmdUtils.add_common_mqtt5_commands() +cmdUtils.add_common_topic_message_commands() +cmdUtils.add_common_proxy_commands() +cmdUtils.add_common_logging_commands() +cmdUtils.register_command("key", "", "Path to your key in PEM format.", True, str) +cmdUtils.register_command("cert", "", "Path to your client certificate in PEM format.", True, str) +cmdUtils.register_command( + "port", + "", + "Connection port. AWS IoT supports 433 and 8883 (optional, default=auto).", + type=int) +cmdUtils.register_command( + "client_id", + "", + "Client ID to use for MQTT5 connection (optional, default=None).", + default="test-" + str(uuid4())) +cmdUtils.register_command( + "count", + "", + "The number of messages to send (optional, default='10').", + default=10, + type=int) +cmdUtils.register_command("is_ci", "", "If present the sample will run in CI mode (optional, default='None')") +# Needs to be called so the command utils parse the commands +cmdUtils.get_args() + +response_count = 0 +received_all_event = threading.Event() +future_stopped = Future() +future_connection_success = Future() +is_ci = cmdUtils.get_command("is_ci", None) != None + +# Callback when any publish is received +def on_publish_received(publish_packet_data): + publish_packet = publish_packet_data.publish_packet + assert isinstance(publish_packet, mqtt5.PublishPacket) + + global response_count + response_count += 1 + + if publish_packet.topic == request_topic_filter: + + print("Received request from {} with correlation data {}".format(publish_packet.topic, publish_packet.correlation_data)) + + global light_status + if light_status == "ON": + light_status = "OFF" + else: + light_status = "ON" + + print("Publishing light status {} to response topic '{}'\n".format(light_status, publish_packet.response_topic)) + + client.publish(mqtt5.PublishPacket( + topic=publish_packet.response_topic, + qos=mqtt5.QoS.AT_LEAST_ONCE, + correlation_data=publish_packet.correlation_data, + payload=light_status + )) + elif publish_packet.topic == response_topic_filter: + print("Received light status {} from response topic {} with correlation data {}\n".format(str(publish_packet.payload), publish_packet.topic, publish_packet.correlation_data)) + + if response_count == cmdUtils.get_command("count"): + received_all_event.set() + +# Callback for the lifecycle event Stopped +def on_lifecycle_stopped(lifecycle_stopped_data: mqtt5.LifecycleStoppedData): + print("Lifecycle Stopped") + global future_stopped + future_stopped.set_result(lifecycle_stopped_data) + + +# Callback for the lifecycle event Connection Success +def on_lifecycle_connection_success(lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData): + print("Lifecycle Connection Success") + global future_connection_success + future_connection_success.set_result(lifecycle_connect_success_data) + + +# Callback for the lifecycle event Connection Failure +def on_lifecycle_connection_failure(lifecycle_connection_failure: mqtt5.LifecycleConnectFailureData): + print("Lifecycle Connection Failure") + print("Connection failed with exception:{}".format(lifecycle_connection_failure.exception)) + + +if __name__ == '__main__': + print("\nStarting MQTT5 PubSub Sample\n") + message_count = cmdUtils.get_command("count") + + client = cmdUtils.build_mqtt5_client( + on_publish_received=on_publish_received, + on_lifecycle_stopped=on_lifecycle_stopped, + on_lifecycle_connection_success=on_lifecycle_connection_success, + on_lifecycle_connection_failure=on_lifecycle_connection_failure) + print("MQTT5 Client Created") + + if is_ci == False: + print("Connecting to {} with client ID '{}'...".format( + cmdUtils.get_command(cmdUtils.m_cmd_endpoint), cmdUtils.get_command("client_id"))) + else: + print("Connecting to endpoint with client ID") + + client.start() + + lifecycle_connect_success_data = future_connection_success.result(TIMEOUT) + connack_packet = lifecycle_connect_success_data.connack_packet + negotiated_settings = lifecycle_connect_success_data.negotiated_settings + if is_ci == False: + print("Connected to endpoint:'{}' with Client ID:'{}' with reason_code:{}".format( + cmdUtils.get_command(cmdUtils.m_cmd_endpoint), + connack_packet.assigned_client_identifier, + repr(connack_packet.reason_code))) + + # Subscribing to the response topic + print("Subscribing to response topic '{}'...".format(response_topic_filter)) + subscribe_future = client.subscribe(subscribe_packet=mqtt5.SubscribePacket( + subscriptions=[mqtt5.Subscription( + topic_filter=response_topic_filter, + qos=mqtt5.QoS.AT_LEAST_ONCE)] + )) + suback = subscribe_future.result(TIMEOUT) + print("Subscribed to response topic with {} \n".format(suback.reason_codes)) + + #Subcribing to the request topic is for sample purposes only. Ideally, this subcription would take place on another device. + print("Subscribing to request topic '{}'...".format(request_topic_filter)) + subscribe_future = client.subscribe(subscribe_packet=mqtt5.SubscribePacket( + subscriptions=[mqtt5.Subscription( + topic_filter=request_topic_filter, + qos=mqtt5.QoS.AT_LEAST_ONCE)] + )) + suback = subscribe_future.result(TIMEOUT) + print("Subscribed to request topic with {} \n".format(suback.reason_codes)) + + # Publish message to server desired number of times. + # This step loops forever if count was set to 0. + if message_count == 0: + print("Sending messages until program killed") + else: + print("Sending {} message(s)".format(message_count)) + + publish_count = 1 + while (publish_count <= message_count) or (message_count == 0): + correlation_data = { + "light_id": 4, + "correlation_id": str(uuid4()) + } + print("Publishing request to request topic {} with correlation data {}".format(request_topic_filter, str(correlation_data))) + publish_future = client.publish(mqtt5.PublishPacket( + topic=request_topic_filter, + qos=mqtt5.QoS.AT_LEAST_ONCE, + correlation_data=json.dumps(correlation_data), + response_topic=response_topic_filter + )) + + publish_completion_data = publish_future.result(TIMEOUT) + print("PubAck to request topic received with {}\n".format(repr(publish_completion_data.puback.reason_code))) + + time.sleep(1) + publish_count += 1 + + received_all_event.wait(TIMEOUT) + print("{} Response(s) received.".format(response_count)) + + # Unsubscribe from response topic + print("Unsubscribing from response topic '{}'".format(response_topic_filter)) + unsubscribe_future = client.unsubscribe(unsubscribe_packet=mqtt5.UnsubscribePacket( + topic_filters=[response_topic_filter])) + unsuback = unsubscribe_future.result(TIMEOUT) + print("Unsubscribed with {}".format(unsuback.reason_codes)) + + + # Unsubscribe from request topic + print("Unsubscribing from request topic '{}'".format(request_topic_filter)) + unsubscribe_future = client.unsubscribe(unsubscribe_packet=mqtt5.UnsubscribePacket( + topic_filters=[request_topic_filter])) + unsuback = unsubscribe_future.result(TIMEOUT) + print("Unsubscribed with {}".format(unsuback.reason_codes)) + + print("Stopping Client") + client.stop() + + future_stopped.result(TIMEOUT) + print("Client Stopped!")