Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions awsiot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
'mqtt5_client_builder',
]

from awscrt import mqtt
from awscrt import mqtt, mqtt5
from concurrent.futures import Future
import json
from typing import Any, Callable, Dict, Optional, Tuple, TypeVar
Expand All @@ -32,8 +32,13 @@ class MqttServiceClient:
mqtt_connection: MQTT connection to use
"""

def __init__(self, mqtt_connection: mqtt.Connection):
self._mqtt_connection = mqtt_connection # type: mqtt.Connection
def __init__(self, mqtt_connection: mqtt.Connection or mqtt5.Client):
if isinstance(mqtt_connection, mqtt.Connection):
self._mqtt_connection = mqtt_connection # type: mqtt.Connection
elif isinstance(mqtt_connection, mqtt5.Client):
self._mqtt_connection = mqtt_connection.new_connection()
else:
assert("The service client could only take mqtt.Connection and mqtt5.Client as argument")

@property
def mqtt_connection(self) -> mqtt.Connection:
Expand Down
66 changes: 63 additions & 3 deletions samples/fleetprovisioning.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Fleet provisioning
# Fleet provisioning MQTT5

[**Return to main sample list**](./README.md)

Expand Down Expand Up @@ -75,18 +75,78 @@ There are many different ways to run the Fleet Provisioning sample because of ho

``` sh
# For Windows: replace 'python3' with 'python' and '/' with '\'
python3 fleetprovisioning.py --endpoint <endpoint> --cert <file> --key <file> --template_name <name> --template_parameters <parameters>
python3 fleetprovisioning_mqtt5.py --endpoint <endpoint> --cert <file> --key <file> --template_name <name> --template_parameters <parameters>
```

You can also pass a Certificate Authority file (CA) if your certificate and key combination requires it:

``` sh
# For Windows: replace 'python3' with 'python' and '/' with '\'
python3 fleetprovisioning.py --endpoint <endpoint> --cert <file> --key <file> --template_name <name> --template_parameters <parameters> --ca_file <file>
python3 fleetprovisioning_mqtt5.py --endpoint <endpoint> --cert <file> --key <file> --template_name <name> --template_parameters <parameters> --ca_file <file>
```

However, this is just one way using the `CreateKeysAndCertificate` workflow. Below are a detailed list of instructions with the different ways to connect. While the detailed instructions do not show it, you can pass `--ca_file` as needed no matter which way you connect via Fleet Provisioning.

## Service Client Notes
### Difference relative to MQTT311 IoTIdentityClient
The IoTIdentityClient with mqtt5 client is almost identical to the mqtt3 one. The only difference is that you would need setup up a Mqtt5 Client and pass it to the IotIdentityClient.
For how to setup a Mqtt5 Client, please refer to [MQTT5 UserGuide](../documents/MQTT5_Userguide.md) and [MQTT5 PubSub Sample](./mqtt5_pubsub.py)

<table>
<tr>
<th>Create a IoTIdentityClient with Mqtt5</th>
<th>Create a IoTIdentityClient with Mqtt311</th>
</tr>
<tr>
<td>

```python
# Create a Mqtt5 Client
mqtt5_client = mqtt5_client_builder.mtls_from_path(
endpoint,
port,
cert_filepath,
pri_key_filepath,
ca_filepath,
client_id,
clean_session,
keep_alive_secs,
http_proxy_options,
on_lifecycle_connection_success,
on_lifecycle_stopped)

# Create the Identity Client from Mqtt5 Client
identity_client = iotidentity.IotIdentityClient(mqtt5_client)
```

</td>
<td>

```python
# Create a Mqtt311 Connection from the command line data
mqtt_connection = mqtt_connection_builder.mtls_from_path(
endpoint,
port,
cert_filepath,
pri_key_filepath,
ca_filepath,
client_id,
clean_session,
keep_alive_secs,
http_proxy_options)

# Create the Identity Client from Mqtt311 Connection
identity_client = iotidentity.IotIdentityClient(mqtt_connection)
```

</td>
</tr>
</table>

### Mqtt5.QoS v.s. Mqtt3.QoS
As the service client interface is unchanged for both Mqtt3 Connection and Mqtt5 Client,the IotIdentityClient will use Mqtt3.QoS instead of Mqtt5.QoS even with a Mqtt5 Client. You could use mqtt3.QoS.to_mqtt5() and mqtt5.QoS.to_mqtt3() to convert the value.


### Fleet Provisioning Detailed Instructions

#### Aws Resource Setup
Expand Down
83 changes: 38 additions & 45 deletions samples/fleetprovisioning.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0.

from awscrt import mqtt, http
from awsiot import iotidentity, mqtt_connection_builder
from awscrt import mqtt5, http
from awsiot import iotidentity, mqtt5_client_builder
from concurrent.futures import Future
import sys
import threading
Expand Down Expand Up @@ -31,12 +31,13 @@

# Using globals to simplify sample code
is_sample_done = threading.Event()
mqtt_connection = None
mqtt5_client = None
identity_client = None
createKeysAndCertificateResponse = None
createCertificateFromCsrResponse = None
registerThingResponse = None

future_stopped = Future()
future_connection_success = Future()

class LockedData:
def __init__(self):
Expand All @@ -56,18 +57,28 @@ def exit(msg_or_exception):

with locked_data.lock:
if not locked_data.disconnect_called:
print("Disconnecting...")
print("Stop the Client...")
locked_data.disconnect_called = True
future = mqtt_connection.disconnect()
future.add_done_callback(on_disconnected)
mqtt5_client.stop()
future_stopped.result()


def on_disconnected(disconnect_future):
# type: (Future) -> None
print("Disconnected.")
# Callback for the lifecycle event Connection Success
def on_lifecycle_connection_success(lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData):
print("Lifecycle Connection Success")
global future_connection_success
future_connection_success.set_result(lifecycle_connect_success_data)


# Callback for the lifecycle event on Client Stopped
def on_lifecycle_stopped(lifecycle_stopped_data: mqtt5.LifecycleStoppedData):
# type: (Future) -> None
print("Client Stopped.")
global future_stopped
# Signal that sample is finished
is_sample_done.set()
future_stopped.set_result(lifecycle_stopped_data)



def on_publish_register_thing(future):
Expand Down Expand Up @@ -163,24 +174,6 @@ def registerthing_execution_rejected(rejected):
exit("RegisterThing Request rejected with code:'{}' message:'{}' status code:'{}'".format(
rejected.error_code, rejected.error_message, rejected.status_code))

# Callback when connection is accidentally lost.
def on_connection_interrupted(connection, error, **kwargs):
print("Connection interrupted. error: {}".format(error))


# Callback when an interrupted connection is re-established.
def on_connection_resumed(connection, return_code, session_present, **kwargs):
print("Connection resumed. return_code: {} session_present: {}".format(return_code, session_present))

if return_code == mqtt.ConnectReturnCode.ACCEPTED and not session_present:
print("Session did not persist. Resubscribing to existing topics...")
resubscribe_future, _ = connection.resubscribe_existing_topics()

# Cannot synchronously wait for resubscribe result because we're on the connection's event-loop thread,
# evaluate result with a callback instead.
resubscribe_future.add_done_callback(on_resubscribe_complete)


def on_resubscribe_complete(resubscribe_future):
resubscribe_results = resubscribe_future.result()
print("Resubscribe results: {}".format(resubscribe_results))
Expand Down Expand Up @@ -241,34 +234,34 @@ def waitForRegisterThingResponse():
port=cmdData.input_proxy_port)

# Create a MQTT connection from the command line data
mqtt_connection = mqtt_connection_builder.mtls_from_path(
mqtt5_client = mqtt5_client_builder.mtls_from_path(
endpoint=cmdData.input_endpoint,
port=cmdData.input_port,
cert_filepath=cmdData.input_cert,
pri_key_filepath=cmdData.input_key,
ca_filepath=cmdData.input_ca,
on_connection_interrupted=on_connection_interrupted,
on_connection_resumed=on_connection_resumed,
client_id=cmdData.input_clientId,
clean_session=False,
keep_alive_secs=30,
http_proxy_options=proxy_options)
http_proxy_options=proxy_options,
on_lifecycle_connection_success=on_lifecycle_connection_success,
on_lifecycle_stopped=on_lifecycle_stopped)

if not cmdData.input_is_ci:
print(f"Connecting to {cmdData.input_endpoint} with client ID '{cmdData.input_clientId}'...")
else:
print("Connecting to endpoint with client ID")

connected_future = mqtt_connection.connect()
mqtt5_client.start()

identity_client = iotidentity.IotIdentityClient(mqtt_connection)
identity_client = iotidentity.IotIdentityClient(mqtt5_client)

# Wait for connection to be fully established.
# Note that it's not necessary to wait, commands issued to the
# mqtt_connection before its fully connected will simply be queued.
# mqtt5_client before its fully connected will simply be queued.
# But this sample waits here so it's obvious when a connection
# fails or succeeds.
connected_future.result()
future_connection_success.result()
print("Connected!")

try:
Expand All @@ -283,7 +276,7 @@ def waitForRegisterThingResponse():
print("Subscribing to CreateKeysAndCertificate Accepted topic...")
createkeysandcertificate_subscribed_accepted_future, _ = identity_client.subscribe_to_create_keys_and_certificate_accepted(
request=createkeysandcertificate_subscription_request,
qos=mqtt.QoS.AT_LEAST_ONCE,
qos=mqtt5.QoS.AT_LEAST_ONCE,
callback=createkeysandcertificate_execution_accepted)

# Wait for subscription to succeed
Expand All @@ -292,7 +285,7 @@ def waitForRegisterThingResponse():
print("Subscribing to CreateKeysAndCertificate Rejected topic...")
createkeysandcertificate_subscribed_rejected_future, _ = identity_client.subscribe_to_create_keys_and_certificate_rejected(
request=createkeysandcertificate_subscription_request,
qos=mqtt.QoS.AT_LEAST_ONCE,
qos=mqtt5.QoS.AT_LEAST_ONCE,
callback=createkeysandcertificate_execution_rejected)

# Wait for subscription to succeed
Expand All @@ -303,7 +296,7 @@ def waitForRegisterThingResponse():
print("Subscribing to CreateCertificateFromCsr Accepted topic...")
createcertificatefromcsr_subscribed_accepted_future, _ = identity_client.subscribe_to_create_certificate_from_csr_accepted(
request=createcertificatefromcsr_subscription_request,
qos=mqtt.QoS.AT_LEAST_ONCE,
qos=mqtt5.QoS.AT_LEAST_ONCE,
callback=createcertificatefromcsr_execution_accepted)

# Wait for subscription to succeed
Expand All @@ -312,7 +305,7 @@ def waitForRegisterThingResponse():
print("Subscribing to CreateCertificateFromCsr Rejected topic...")
createcertificatefromcsr_subscribed_rejected_future, _ = identity_client.subscribe_to_create_certificate_from_csr_rejected(
request=createcertificatefromcsr_subscription_request,
qos=mqtt.QoS.AT_LEAST_ONCE,
qos=mqtt5.QoS.AT_LEAST_ONCE,
callback=createcertificatefromcsr_execution_rejected)

# Wait for subscription to succeed
Expand All @@ -324,7 +317,7 @@ def waitForRegisterThingResponse():
print("Subscribing to RegisterThing Accepted topic...")
registerthing_subscribed_accepted_future, _ = identity_client.subscribe_to_register_thing_accepted(
request=registerthing_subscription_request,
qos=mqtt.QoS.AT_LEAST_ONCE,
qos=mqtt5.QoS.AT_LEAST_ONCE,
callback=registerthing_execution_accepted)

# Wait for subscription to succeed
Expand All @@ -333,7 +326,7 @@ def waitForRegisterThingResponse():
print("Subscribing to RegisterThing Rejected topic...")
registerthing_subscribed_rejected_future, _ = identity_client.subscribe_to_register_thing_rejected(
request=registerthing_subscription_request,
qos=mqtt.QoS.AT_LEAST_ONCE,
qos=mqtt5.QoS.AT_LEAST_ONCE,
callback=registerthing_execution_rejected)
# Wait for subscription to succeed
registerthing_subscribed_rejected_future.result()
Expand All @@ -343,7 +336,7 @@ def waitForRegisterThingResponse():
if cmdData.input_csr_path is None:
print("Publishing to CreateKeysAndCertificate...")
publish_future = identity_client.publish_create_keys_and_certificate(
request=iotidentity.CreateKeysAndCertificateRequest(), qos=mqtt.QoS.AT_LEAST_ONCE)
request=iotidentity.CreateKeysAndCertificateRequest(), qos=mqtt5.QoS.AT_LEAST_ONCE)
publish_future.add_done_callback(on_publish_create_keys_and_certificate)

waitForCreateKeysAndCertificateResponse()
Expand All @@ -360,7 +353,7 @@ def waitForRegisterThingResponse():
csrPath = open(cmdData.input_csr_path, 'r').read()
publish_future = identity_client.publish_create_certificate_from_csr(
request=iotidentity.CreateCertificateFromCsrRequest(certificate_signing_request=csrPath),
qos=mqtt.QoS.AT_LEAST_ONCE)
qos=mqtt5.QoS.AT_LEAST_ONCE)
publish_future.add_done_callback(on_publish_create_certificate_from_csr)

waitForCreateCertificateFromCsrResponse()
Expand All @@ -375,7 +368,7 @@ def waitForRegisterThingResponse():

print("Publishing to RegisterThing topic...")
registerthing_publish_future = identity_client.publish_register_thing(
registerThingRequest, mqtt.QoS.AT_LEAST_ONCE)
registerThingRequest, mqtt5.QoS.AT_LEAST_ONCE)
registerthing_publish_future.add_done_callback(on_publish_register_thing)

waitForRegisterThingResponse()
Expand Down
Loading