### 移动端应用获得临时预置Claim证书
#### 首先配置Keycloak客户端，需要server_url，client_id，realm_name，client_secret_key

In [301]:
from keycloak import KeycloakOpenID

# Configure client.
# The client secret is a credential, so it is read from the KEYCLOAK_CLIENT_SECRET
# environment variable (or prompted for interactively) instead of being stored in
# the notebook itself.
import getpass
import os

keycloak_openid = KeycloakOpenID(
    server_url="https://keycloak.aws.comdaze.com/auth/",
    client_id="iotfleetclient",
    realm_name="iotfleet",
    client_secret_key=(
        os.environ.get("KEYCLOAK_CLIENT_SECRET")
        or getpass.getpass("Keycloak client secret: ")
    ),
)


#### 登录keycloak，获取Token，为了和AWS交换凭证，需要提取access_token

In [302]:
# The login credentials are read from the environment (falling back to an
# interactive prompt for the password) so that no password is stored in the
# notebook. The username defaults to the demo account.
import getpass
import os

keycloak_username = os.environ.get("KEYCLOAK_USERNAME", "user1")
keycloak_password = os.environ.get("KEYCLOAK_PASSWORD") or getpass.getpass(
    "Keycloak password for {}: ".format(keycloak_username))

# Log in with username and password; only the access token is used by the
# following cells.
token = keycloak_openid.token(keycloak_username, keycloak_password)
access_token = token['access_token']

#### 连接Amazon Cognito身份池（Identity Pool）

In [303]:
import boto3

IdentityPoolId = 'cn-north-1:f815bc17-e94c-47df-953b-840c67c27eb4'
AccountId = '456370280007'

# The identity pool ID is written as "<region>:<uuid>". Pin the client to that
# region so the calls below do not depend on the locally configured default region.
cognito_identity_client = boto3.client(
    'cognito-identity',
    region_name=IdentityPoolId.split(':', 1)[0],
)
# Explicit credentials could be passed to boto3.client() as well if needed:
#    aws_access_key_id=ACCESS_KEY,
#    aws_secret_access_key=SECRET_KEY,
#    aws_session_token=SESSION_TOKEN



#### 调用get_id方法获得IdentityId

In [304]:
# Ask the identity pool for an identity ID. The Logins map is keyed by the
# Keycloak realm and carries the access token issued by Keycloak.
IdentityId_response = cognito_identity_client.get_id(
    IdentityPoolId=IdentityPoolId,
    AccountId=AccountId,
    Logins={'keycloak.aws.comdaze.com/auth/realms/iotfleet': access_token},
)

In [305]:
# Only the identity ID itself is needed from the get_id response.
IdentityId = IdentityId_response['IdentityId']

#### 获得AWS临时凭证：'AccessKeyId'，'SecretKey'，'SessionToken'

In [306]:
# Request credentials for the identity obtained above, presenting the same
# Keycloak access token under the same provider key.
credentials_response = cognito_identity_client.get_credentials_for_identity(
    Logins={'keycloak.aws.comdaze.com/auth/realms/iotfleet': access_token},
    IdentityId=IdentityId,
)

In [307]:
# Unpack the three credential fields from the 'Credentials' entry of the response.
ACCESS_KEY, SECRET_KEY, SESSION_TOKEN = (
    credentials_response['Credentials'][field]
    for field in ('AccessKeyId', 'SecretKey', 'SessionToken')
)


#### 连接AWS IoT调用create_provisioning_claim方法，申请临时证书，并且保存在手机存储里

In [308]:

import boto3

# IoT client that authenticates with the credentials extracted from the
# get_credentials_for_identity response.
iotclient = boto3.client(
    service_name='iot',
    aws_session_token=SESSION_TOKEN,
    aws_secret_access_key=SECRET_KEY,
    aws_access_key_id=ACCESS_KEY,
)

In [309]:
# Request a provisioning claim for the named template; the response is handed
# to cert_set_result() in a later cell.
provisioning_claim_response = iotclient.create_provisioning_claim(templateName='TrustedUserProvisioningTemplate')

#### 以下这个函数用于保存临时预置（Claim）证书

In [None]:
import json
import os
import re
import sys

def cert_set_result(body, path, filename):
    """Save the certificate and key pair found in ``body`` to files on disk.

    Args:
        body: Mapping with a 'certificatePem' entry and, optionally, a
            'keyPair' mapping holding 'PublicKey' and/or 'PrivateKey'.
        path: Directory to write into. It is created when missing; an empty
            string means the current directory.
        filename: Base name of the output files. The files written are
            '<filename>.cert.pem', '<filename>.public.key' and
            '<filename>.private.key'. Empty or missing values are skipped.

    Raises:
        KeyError: If ``body`` has no 'certificatePem' entry.
    """
    # Honour the caller's path/filename instead of overriding them with constants.
    # os.makedirs('') would raise, so an empty path is simply left alone.
    if path:
        os.makedirs(path, exist_ok=True)

    def _save(suffix, text):
        """Write ``text`` to '<path>/<filename><suffix>' unless it is empty."""
        if not text:
            return
        target = os.path.join(path, filename + suffix)
        with open(target, 'w') as out_file:
            # Turn escaped line breaks (a backslash followed by 'n') into real
            # newlines; text that already has real newlines is left unchanged.
            out_file.write(text.replace('\\n', '\n'))

    _save(".cert.pem", body['certificatePem'])

    # The key pair is optional (best effort): a missing 'keyPair' or a missing
    # individual key is skipped without affecting the other files.
    try:
        key_pair = body['keyPair'] or {}
    except KeyError:
        key_pair = {}
    _save(".public.key", key_pair.get('PublicKey'))
    _save(".private.key", key_pair.get('PrivateKey'))

    print("Success!")


In [310]:
# Write the provisioning-claim response out using the 'provision' base name.
cert_set_result(body=provisioning_claim_response, path='./', filename='provision')

Success!


### 设备端利用手机推送的临时证书，向AWS IoT Core申请永久证书

#### 以下这个函数用于保存永久证书

In [295]:
import json
import os
import re
import sys

def long_term_cert_set_result(body, path, filename):
    """Cut the certificate and private key out of a response string and save them.

    The certificate is the text from "-----BEGIN CERTIFICATE-----" up to the
    following "', private_key"; the key is the text from
    "-----BEGIN RSA PRIVATE KEY-----" up to the following "')". Escaped line
    breaks (a backslash followed by 'n') are converted to real newlines.

    Args:
        body: String to search. The caller in this notebook passes ``str()`` of
            the CreateKeysAndCertificate response.
        path: Directory to write into. It is created when missing; an empty
            string means the current directory.
        filename: Base name of the output files: '<filename>.cert.pem' and
            '<filename>.private.key'.

    Raises:
        ValueError: If a begin marker, or the end marker after it, is not found.
    """
    if path:
        os.makedirs(path, exist_ok=True)

    def _extract(begin_marker, end_marker):
        """Return the text from ``begin_marker`` up to the next ``end_marker``."""
        start = body.index(begin_marker)
        # Look for the terminator only after the begin marker: an earlier
        # occurrence (e.g. a "')" in a preceding field) would otherwise give an
        # empty slice and the file would silently not be written.
        end = body.index(end_marker, start)
        return body[start:end].replace('\\n', '\n')

    sections = (
        (".cert.pem", "-----BEGIN CERTIFICATE-----", "', private_key"),
        (".private.key", "-----BEGIN RSA PRIVATE KEY-----", "')"),
    )
    for suffix, begin_marker, end_marker in sections:
        text = _extract(begin_marker, end_marker)
        with open(os.path.join(path, filename + suffix), 'w') as out_file:
            out_file.write(text)

    print("Cert and Key Save Success!")

#### 以下为主程序运行流程所需的辅助函数

In [297]:
from awscrt import auth, http, io, mqtt
from awsiot import iotidentity
from awsiot import mqtt_connection_builder
from concurrent.futures import Future
import sys
import threading
import time
import traceback
from uuid import uuid4
import json

class LockedData:
    """Holds the disconnect flag together with the lock used when touching it."""

    def __init__(self):
        # False until a disconnect has been requested.
        self.disconnect_called = False
        self.lock = threading.Lock()


locked_data = LockedData()

# Function for gracefully quitting this sample
def exit(msg_or_exception):
    """Report why the sample is stopping and start the MQTT disconnect once.

    Args:
        msg_or_exception: An Exception, whose traceback is printed, or any
            other value, which is printed as the exit reason.
    """
    if isinstance(msg_or_exception, Exception):
        print("Exiting Sample due to exception.")
        # Use the traceback stored on the exception itself, so the report is
        # right even when this is not called from inside an ``except`` block.
        traceback.print_exception(
            msg_or_exception.__class__, msg_or_exception, msg_or_exception.__traceback__)
    else:
        print("Exiting Sample:", msg_or_exception)

    with locked_data.lock:
        if locked_data.disconnect_called:
            return
        print("Disconnecting...")
        locked_data.disconnect_called = True
        if mqtt_connection is None:
            # No connection is stored in the module-level name, so there is
            # nothing to disconnect; calling .disconnect() on None would raise.
            # Mark the sample as finished directly.
            is_sample_done.set()
            return
        future = mqtt_connection.disconnect()
        future.add_done_callback(on_disconnected)

def on_disconnected(disconnect_future):
    # type: (Future) -> None
    """Done-callback for the disconnect future: log it and flag the sample as done."""
    print("Disconnected.")
    # Set the event that marks the end of the sample.
    is_sample_done.set()

def on_publish_register_thing(future):
    # type: (Future) -> None
    """Done-callback for the RegisterThing publish: log success, or exit on failure."""
    try:
        # result() re-raises whatever made the publish fail.
        future.result()
    except Exception as e:
        print("Failed to publish RegisterThing request.")
        exit(e)
    else:
        print("Published RegisterThing request..")

def on_publish_create_keys_and_certificate(future):
    # type: (Future) -> None
    """Done-callback for the CreateKeysAndCertificate publish: log success, or exit on failure."""
    try:
        # result() re-raises whatever made the publish fail.
        future.result()
    except Exception as e:
        print("Failed to publish CreateKeysAndCertificate request.")
        exit(e)
    else:
        print("Published CreateKeysAndCertificate request..")

def on_publish_create_certificate_from_csr(future):
    # type: (Future) -> None
    """Done-callback for the CreateCertificateFromCsr publish: log success, or exit on failure."""
    try:
        # result() re-raises whatever made the publish fail.
        future.result()
    except Exception as e:
        print("Failed to publish CreateCertificateFromCsr request.")
        exit(e)
    else:
        print("Published CreateCertificateFromCsr request..")

def createkeysandcertificate_execution_accepted(response):
    # type: (iotidentity.CreateKeysAndCertificateResponse) -> None
    """Keep the accepted CreateKeysAndCertificate response in a module global and log it."""
    global createKeysAndCertificateResponse
    try:
        createKeysAndCertificateResponse = response
        print("Received a new message {}".format(response))
    except Exception as e:
        exit(e)

def createkeysandcertificate_execution_rejected(rejected):
    # type: (iotidentity.RejectedError) -> None
    """Pass the details of a rejected CreateKeysAndCertificate request to exit()."""
    template = "CreateKeysAndCertificate Request rejected with code:'{}' message:'{}' statuscode:'{}'"
    details = (rejected.error_code, rejected.error_message, rejected.status_code)
    exit(template.format(*details))

def createcertificatefromcsr_execution_accepted(response):
    # type: (iotidentity.CreateCertificateFromCsrResponse) -> None
    """Keep the accepted CreateCertificateFromCsr response and its ownership token in module globals."""
    global createCertificateFromCsrResponse, certificateOwnershipToken
    try:
        createCertificateFromCsrResponse = response
        print("Received a new message {}".format(response))
        certificateOwnershipToken = response.certificate_ownership_token
    except Exception as e:
        exit(e)

def createcertificatefromcsr_execution_rejected(rejected):
    # type: (iotidentity.RejectedError) -> None
    """Pass the details of a rejected CreateCertificateFromCsr request to exit()."""
    template = "CreateCertificateFromCsr Request rejected with code:'{}' message:'{}' statuscode:'{}'"
    details = (rejected.error_code, rejected.error_message, rejected.status_code)
    exit(template.format(*details))

def registerthing_execution_accepted(response):
    # type: (iotidentity.RegisterThingResponse) -> None
    """Keep the accepted RegisterThing response in a module global and log it."""
    global registerThingResponse
    try:
        registerThingResponse = response
        print("Received a new message {} ".format(response))
    except Exception as e:
        exit(e)

def registerthing_execution_rejected(rejected):
    # type: (iotidentity.RejectedError) -> None
    """Pass the details of a rejected RegisterThing request to exit()."""
    template = "RegisterThing Request rejected with code:'{}' message:'{}' statuscode:'{}'"
    details = (rejected.error_code, rejected.error_message, rejected.status_code)
    exit(template.format(*details))

# Callback when connection is accidentally lost.
def on_connection_interrupted(connection, error, **kwargs):
    """Log the error reported for the interrupted connection."""
    message = "Connection interrupted. error: {}".format(error)
    print(message)


# Callback when an interrupted connection is re-established.
def on_connection_resumed(connection, return_code, session_present, **kwargs):
    """Log the resume and, when the session was not kept, resubscribe to existing topics."""
    print("Connection resumed. return_code: {} session_present: {}".format(return_code, session_present))

    accepted = return_code == mqtt.ConnectReturnCode.ACCEPTED
    if not accepted or session_present:
        return

    print("Session did not persist. Resubscribing to existing topics...")
    resubscribe_future, _ = connection.resubscribe_existing_topics()

    # Cannot synchronously wait for resubscribe result because we're on the connection's event-loop thread,
    # evaluate result with a callback instead.
    resubscribe_future.add_done_callback(on_resubscribe_complete)

def on_resubscribe_complete(resubscribe_future):
    """Log the resubscribe results and exit the process on the first topic without a QoS."""
    outcome = resubscribe_future.result()
    print("Resubscribe results: {}".format(outcome))

    for topic_name, granted_qos in outcome['topics']:
        if granted_qos is not None:
            continue
        # A QoS of None is treated as the server having rejected the topic.
        sys.exit("Server rejected resubscribe to topic: {}".format(topic_name))

def waitForCreateKeysAndCertificateResponse():
    """Poll once per second, at most 10 times, until the CreateKeysAndCertificate response global is set."""
    for _attempt in range(10):
        if createKeysAndCertificateResponse is not None:
            break
        print('Waiting... CreateKeysAndCertificateResponse: ' + json.dumps(createKeysAndCertificateResponse))
        time.sleep(1)

def waitForCreateCertificateFromCsrResponse():
    """Poll once per second, at most 10 times, until the CreateCertificateFromCsr response global is set."""
    for _attempt in range(10):
        if createCertificateFromCsrResponse is not None:
            break
        print('Waiting...CreateCertificateFromCsrResponse: ' + json.dumps(createCertificateFromCsrResponse))
        time.sleep(1)

def waitForRegisterThingResponse():
    """Poll once per second, at most 20 times, until the RegisterThing response global is set."""
    for _attempt in range(20):
        if registerThingResponse is not None:
            break
        print('Waiting... RegisterThingResponse: ' + json.dumps(registerThingResponse))
        time.sleep(1)




#### 以下为Fleet Provisioning的主程序

In [298]:

# Event set by on_disconnected() once the disconnect has completed.
is_sample_done = threading.Event()

# Initialise awscrt logging to stderr at the NoLogs level.
verbosity = io.LogLevel.NoLogs.name
io.init_logging(getattr(io.LogLevel, verbosity), 'stderr')

# Module-level connection handles, unset to begin with.
mqtt_connection = None
identity_client = None

# Most recent responses; None until the matching "accepted" callback stores one.
createKeysAndCertificateResponse = None
createCertificateFromCsrResponse = None
registerThingResponse = None


def fleet_provisioning(endpoint, cert, key, root_ca, client_id, use_websocket, signing_region, proxy_host, proxy_port, verbosity, csr, templateName, templateParameters):
    """Connect over MQTT, obtain a certificate and register a thing from a template.

    The flow is: connect, subscribe to the accepted/rejected topics, publish
    either CreateKeysAndCertificate (no CSR) or CreateCertificateFromCsr,
    wait for the response, publish RegisterThing with the returned ownership
    token, wait for that response, then disconnect. In the no-CSR flow the
    received certificate and key are saved as './long-term.*' through
    long_term_cert_set_result(). Errors raised after the connection is made
    are handed to exit().

    Args:
        endpoint: MQTT endpoint to connect to.
        cert: Path of the client certificate (mTLS connection only).
        key: Path of the client private key (mTLS connection only).
        root_ca: Path of the CA file.
        client_id: MQTT client ID.
        use_websocket: When true, connect over websockets with the default AWS
            credential chain; otherwise connect with mTLS using cert/key.
        signing_region: Region used for websocket signing.
        proxy_host: Optional HTTP proxy host; no proxy is used when falsy.
        proxy_port: HTTP proxy port, used only together with proxy_host.
        verbosity: Kept for signature compatibility; not read by this function.
        csr: Path to a CSR file, or None to have keys and certificate created.
        templateName: Name of the provisioning template.
        templateParameters: JSON string with the template parameters.
    """
    # Store the connection objects in the module-level names: exit() disconnects
    # through the module-level mqtt_connection, which would stay None if these
    # were only local variables.
    global mqtt_connection, identity_client
    global createKeysAndCertificateResponse, createCertificateFromCsrResponse, registerThingResponse

    # Start every run from a clean slate, so responses (and the disconnect flag)
    # left over from an earlier run of this cell are never reused.
    createKeysAndCertificateResponse = None
    createCertificateFromCsrResponse = None
    registerThingResponse = None
    is_sample_done.clear()
    with locked_data.lock:
        locked_data.disconnect_called = False

    # Spin up resources
    event_loop_group = io.EventLoopGroup(1)
    host_resolver = io.DefaultHostResolver(event_loop_group)
    client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)

    proxy_options = None
    if proxy_host:
        proxy_options = http.HttpProxyOptions(host_name=proxy_host, port=proxy_port)

    if use_websocket:
        credentials_provider = auth.AwsCredentialsProvider.new_default_chain(client_bootstrap)
        mqtt_connection = mqtt_connection_builder.websockets_with_default_aws_signing(
            endpoint=endpoint,
            client_bootstrap=client_bootstrap,
            region=signing_region,
            credentials_provider=credentials_provider,
            http_proxy_options=proxy_options,
            on_connection_interrupted=on_connection_interrupted,
            on_connection_resumed=on_connection_resumed,
            ca_filepath=root_ca,
            client_id=client_id,
            clean_session=False,
            keep_alive_secs=30)

    else:
        mqtt_connection = mqtt_connection_builder.mtls_from_path(
            endpoint=endpoint,
            cert_filepath=cert,
            pri_key_filepath=key,
            client_bootstrap=client_bootstrap,
            ca_filepath=root_ca,
            client_id=client_id,
            on_connection_interrupted=on_connection_interrupted,
            on_connection_resumed=on_connection_resumed,
            clean_session=False,
            keep_alive_secs=30,
            http_proxy_options=proxy_options)

    print("Connecting to {} with client ID '{}'...".format(
         endpoint, client_id))

    connected_future = mqtt_connection.connect()

    identity_client = iotidentity.IotIdentityClient(mqtt_connection)

    # Wait for connection to be fully established.
    # Note that it's not necessary to wait, commands issued to the
    # mqtt_connection before its fully connected will simply be queued.
    # But this sample waits here so it's obvious when a connection
    # fails or succeeds.
    connected_future.result()
    print("Connected!")

    try:
        # Subscribe to necessary topics.
        # Note that is **is** important to wait for "accepted/rejected" subscriptions
        # to succeed before publishing the corresponding "request".

        # Keys workflow if csr is not provided
        if csr is None:
            createkeysandcertificate_subscription_request = iotidentity.CreateKeysAndCertificateSubscriptionRequest()

            print("Subscribing to CreateKeysAndCertificate Accepted topic...")
            createkeysandcertificate_subscribed_accepted_future, _ = identity_client.subscribe_to_create_keys_and_certificate_accepted(
                request=createkeysandcertificate_subscription_request,
                qos=mqtt.QoS.AT_LEAST_ONCE,
                callback=createkeysandcertificate_execution_accepted)

            # Wait for subscription to succeed
            createkeysandcertificate_subscribed_accepted_future.result()

            print("Subscribing to CreateKeysAndCertificate Rejected topic...")
            createkeysandcertificate_subscribed_rejected_future, _ = identity_client.subscribe_to_create_keys_and_certificate_rejected(
                request=createkeysandcertificate_subscription_request,
                qos=mqtt.QoS.AT_LEAST_ONCE,
                callback=createkeysandcertificate_execution_rejected)

            # Wait for subscription to succeed
            createkeysandcertificate_subscribed_rejected_future.result()
        else:
            createcertificatefromcsr_subscription_request = iotidentity.CreateCertificateFromCsrSubscriptionRequest()

            print("Subscribing to CreateCertificateFromCsr Accepted topic...")
            createcertificatefromcsr_subscribed_accepted_future, _ = identity_client.subscribe_to_create_certificate_from_csr_accepted(
                request=createcertificatefromcsr_subscription_request,
                qos=mqtt.QoS.AT_LEAST_ONCE,
                callback=createcertificatefromcsr_execution_accepted)

            # Wait for subscription to succeed
            createcertificatefromcsr_subscribed_accepted_future.result()

            print("Subscribing to CreateCertificateFromCsr Rejected topic...")
            createcertificatefromcsr_subscribed_rejected_future, _ = identity_client.subscribe_to_create_certificate_from_csr_rejected(
                request=createcertificatefromcsr_subscription_request,
                qos=mqtt.QoS.AT_LEAST_ONCE,
                callback=createcertificatefromcsr_execution_rejected)

            # Wait for subscription to succeed
            createcertificatefromcsr_subscribed_rejected_future.result()

        registerthing_subscription_request = iotidentity.RegisterThingSubscriptionRequest(template_name=templateName)

        print("Subscribing to RegisterThing Accepted topic...")
        registerthing_subscribed_accepted_future, _ = identity_client.subscribe_to_register_thing_accepted(
            request=registerthing_subscription_request,
            qos=mqtt.QoS.AT_LEAST_ONCE,
            callback=registerthing_execution_accepted)

        # Wait for subscription to succeed
        registerthing_subscribed_accepted_future.result()

        print("Subscribing to RegisterThing Rejected topic...")
        registerthing_subscribed_rejected_future, _ = identity_client.subscribe_to_register_thing_rejected(
            request=registerthing_subscription_request,
            qos=mqtt.QoS.AT_LEAST_ONCE,
            callback=registerthing_execution_rejected)
        # Wait for subscription to succeed
        registerthing_subscribed_rejected_future.result()

        if csr is None:
            print("Publishing to CreateKeysAndCertificate...")
            publish_future = identity_client.publish_create_keys_and_certificate(
                request=iotidentity.CreateKeysAndCertificateRequest(), qos=mqtt.QoS.AT_LEAST_ONCE)
            publish_future.add_done_callback(on_publish_create_keys_and_certificate)

            waitForCreateKeysAndCertificateResponse()

            if createKeysAndCertificateResponse is None:
                raise Exception('CreateKeysAndCertificate API did not succeed')

            registerThingRequest = iotidentity.RegisterThingRequest(
                template_name=templateName,
                certificate_ownership_token=createKeysAndCertificateResponse.certificate_ownership_token,
                parameters=json.loads(templateParameters))

            # Persist the newly issued certificate and key, parsed from the
            # response's string form.
            body = str(createKeysAndCertificateResponse)
            long_term_cert_set_result(body, './', 'long-term')
        else:
            print("Publishing to CreateCertificateFromCsr...")
            # Read the CSR with a context manager so the file handle is closed.
            with open(csr, 'r') as csr_file:
                csr_text = csr_file.read()
            publish_future = identity_client.publish_create_certificate_from_csr(
                request=iotidentity.CreateCertificateFromCsrRequest(certificate_signing_request=csr_text),
                qos=mqtt.QoS.AT_LEAST_ONCE)
            publish_future.add_done_callback(on_publish_create_certificate_from_csr)

            waitForCreateCertificateFromCsrResponse()

            if createCertificateFromCsrResponse is None:
                raise Exception('CreateCertificateFromCsr API did not succeed')

            registerThingRequest = iotidentity.RegisterThingRequest(
                template_name=templateName,
                certificate_ownership_token=createCertificateFromCsrResponse.certificate_ownership_token,
                parameters=json.loads(templateParameters))

        print("Publishing to RegisterThing topic...")
        registerthing_publish_future = identity_client.publish_register_thing(registerThingRequest, mqtt.QoS.AT_LEAST_ONCE)
        registerthing_publish_future.add_done_callback(on_publish_register_thing)
        waitForRegisterThingResponse()

        # Report a missing RegisterThing response the same way as the other two
        # requests instead of finishing silently.
        if registerThingResponse is None:
            raise Exception('RegisterThing API did not succeed')

        future = mqtt_connection.disconnect()
        future.add_done_callback(on_disconnected)

    except Exception as e:
        exit(e)

    # Wait for the sample to finish
    # is_sample_done.wait()


#### 执行fleet_provisioning函数，申请永久证书并在AWS IoT Core注册设备，然后将永久证书保存到设备磁盘

In [299]:
# Run the provisioning flow over mTLS with the provisioning-claim certificate
# files ('./provision.*'); no CSR is given, so keys and certificate are created.
fleet_provisioning(
    endpoint='a2jtec7plm36gl.ats.iot.cn-north-1.amazonaws.com.cn',
    cert='./provision.cert.pem',
    key='./provision.private.key',
    root_ca='./root.ca.pem',
    client_id="test-" + str(uuid4()),
    use_websocket=False,
    signing_region='cn-north-1',
    proxy_host=None,
    proxy_port=None,
    verbosity=io.LogLevel.NoLogs.name,
    csr=None,
    templateName='TrustedUserProvisioningTemplate',
    templateParameters='{"SerialNumber":"1","DeviceLocation":"Seattle"}',
)

Connecting to a2jtec7plm36gl.ats.iot.cn-north-1.amazonaws.com.cn with client ID 'test-fd1da517-0f29-4be7-ad70-6955dfcaa610'...
Connected!
Subscribing to CreateKeysAndCertificate Accepted topic...
Subscribing to CreateKeysAndCertificate Rejected topic...
Subscribing to RegisterThing Accepted topic...
Subscribing to RegisterThing Rejected topic...
Publishing to CreateKeysAndCertificate...
Waiting... CreateKeysAndCertificateResponse: null
Published CreateKeysAndCertificate request..
Received a new message awsiot.iotidentity.CreateKeysAndCertificateResponse(certificate_id='56a2f4e6f571226b9f22ab9fe049fba185d2d25397fca7bec57879bf9ee827cd', certificate_ownership_token='eyJ2ZXJzaW9uIjoiMjAxOTEwMjMiLCJjaXBoZXIiOiJBZ2JGb1lENlh0VW1xWnl4UDl1Tzh4RDFNdjFmODlzc05zR3NOOUZ6Q2ZjV1lsNkV3Wkl0VnZNdjBoaTM2RE8zS01ZelBlU0dnVGh3WDRhRGM0TkM3M204LzBxSm9sZjJtdkEyT2VEZ2FBeXU0ZU42UXlvMS9JeGYzSmREWFVyVmpTN1pSQjZSNFowcFIzUUh5UlREeHBtYy9MdkJFdk52SktZYnl5WVc3OW5WTUZrUUZoLzVaTmZHaEtLd1NUR0pnT2tCOEVxWi9PNmQ2cElYeFBvdDkx

#### 测试永久证书发布和订阅消息

In [300]:
# Run the pubsub.py script against the same endpoint, passing the long-term
# certificate and private key files ('./long-term.*') as its credentials.
!python3 pubsub.py --endpoint a2jtec7plm36gl.ats.iot.cn-north-1.amazonaws.com.cn --root-ca ./root.ca.pem --key  ./long-term.private.key --cert ./long-term.cert.pem

Connecting to a2jtec7plm36gl.ats.iot.cn-north-1.amazonaws.com.cn with client ID 'test-99a26901-299d-41ec-81d1-4354f7baa7af'...
Connected!
Subscribing to topic 'test/topic'...
Subscribed with QoS.AT_LEAST_ONCE
Sending 10 message(s)
Publishing message to topic 'test/topic': Hello World! [1]
Received message from topic 'test/topic': b'"Hello World! [1]"'
Publishing message to topic 'test/topic': Hello World! [2]
Received message from topic 'test/topic': b'"Hello World! [2]"'
Publishing message to topic 'test/topic': Hello World! [3]
Received message from topic 'test/topic': b'"Hello World! [3]"'
Publishing message to topic 'test/topic': Hello World! [4]
Received message from topic 'test/topic': b'"Hello World! [4]"'
Publishing message to topic 'test/topic': Hello World! [5]
Received message from topic 'test/topic': b'"Hello World! [5]"'
Publishing message to topic 'test/topic': Hello World! [6]
Received message from topic 'test/topic': b'"Hello World! [6]"'
Publishing message to topic 'te