In [None]:
!python -m ensurepip --upgrade

In [None]:
!pip install paho-mqtt

# 1.使用请求/响应模式发送更透明的消息
请求/响应消息模式是一种以异步方式跟踪对客户端请求响应的方法。这是一种在MQTTv5中实现的机制，允许发布者为要为特定消息发送的响应指定主题。因此，当订阅者收到请求时，它也会收到发送回复的主题。它还支持相关数据字段，允许跟踪数据包，例如请求或设备识别参数。

例如，带有连接门锁的智能家居应用程序可以从请求/响应模式中受益。假设用户正在通过移动应用程序与门锁交互，该应用程序发送MQTT消息以打开/关闭锁。应用程序和门锁之间交换的任何消息都必须得到确认，并可追溯数据包是否已送达。此外，门锁命令需要与上下文一起传递，例如请求者用户身份。

![image](AWS-iot-mqttv5-diagram.png)

1.移动应用程序的MQTT客户端订阅了响应主题。然后，将锁定请求包发布到home07/main_door/lock主题，预期响应主题为home07/main_door/status，相关数据对象包含请求者user_profile_id和request_id。

2.当门锁在home07/main_door/lock上收到锁请求时，它会处理MQTT数据包，包括响应主题和相关数据。

3.门锁做出决定，并通过传递相关数据发布主题来做出回应。

4.订阅者函数在home07/main_door/status上接收响应，并使用相关数据记录该决定。请求者可以使用user_profile_id和request_id采取进一步操作。

In [None]:
from paho.mqtt.packettypes import PacketTypes
import ssl
import time
from paho.mqtt.properties import Properties
import paho.mqtt.client as mqtt
import logging
import json

logging.basicConfig(level=logging.DEBUG)
certificates_path = 'certificates'
endpoint = 'a2u3inau7j0faa-ats.iot.ap-northeast-1.amazonaws.com'
# endpoint=  'a2jtec7plm36gl.ats.iot.cn-north-1.amazonaws.com.cn'

certs = {
    "cafile": certificates_path+"/AmazonRootCA1.pem",
    "certfile": certificates_path+"/app-certificate.pem.crt",
    "keyfile": certificates_path+"/app-private.pem.key",
}


def on_subscribe(mqttc, userdata, mid, granted_qos, properties=None):
    # Publish a plain text payload to topic "home07/main_door/lock"
    properties = Properties(PacketTypes.PUBLISH)
    pub_topic = "home07/main_door/lock"

    command_parameters = {
        "user_profile_id": 4,
        "request_id": "eb1bd30a-c7e6-42a4-9e00-d5baee89f65c"
    }

    properties.CorrelationData = json.dumps(command_parameters).encode('utf-8')
    properties.ResponseTopic = "home07/main_door/status"
    payload = "LOCK"
    mqttc.publish(pub_topic, payload, qos=0, properties=properties)
    time.sleep(1)


def on_connect(mqttc, userdata, flags, reasonCode, properties=None):
    mqttc.subscribe('home07/main_door/status', qos=0)


def on_message(mqttc, userdata, message):
    raw_payload = str(message.payload.decode("utf-8"))
    logging.debug(f"Received a message on topic: '{message.topic}', payload: '{raw_payload}'")

    if message.topic == "home07/main_door/status":
        logging.debug(f"Main door status: '{raw_payload}'' with parameters: '{str(message.properties.CorrelationData)}'")

mqttc = mqtt.Client("MobileApp", protocol=mqtt.MQTTv5)
logger = logging.getLogger(__name__)
mqttc.enable_logger(logger)

mqttc.on_message = on_message
mqttc.on_subscribe = on_subscribe
mqttc.on_connect = on_connect

mqttc.tls_set(certs["cafile"],
              certfile=certs["certfile"],
              keyfile=certs["keyfile"],
              cert_reqs=ssl.CERT_REQUIRED,
              tls_version=ssl.PROTOCOL_TLSv1_2,
              ciphers=None)

mqttc.connect(endpoint, 8883)
mqttc.loop_forever()

# 2.具有用户属性功能的更灵活的设备消息传递

用户属性功能允许连接的设备或订阅者应用程序通过将自定义键值对附加到MQTT数据包（包括发布和连接）来传递自定义信息。该功能与HTTP标头提供了类似的功能，只要标头中不超过8KB的总大小，就可以使用。

例如，您可以将用户属性功能用于多供应商传感器部署用例。假设一个带有部署在工业或智能家居应用程序中的不同供应商的多个传感器的案例。在这些情况下，单个传感器可以使用用户属性中指定的各种编码发送数据。根据用户属性值，消息的订阅者可以采取特定措施来处理它们。

In [None]:
from paho.mqtt.packettypes import PacketTypes
import ssl
import time
from paho.mqtt.properties import Properties
import paho.mqtt.client as mqtt
import logging
import base64

# 此示例脚本显示了不同品牌的三个传感器，使用不同的数据编码发布到他们的主题。订阅者通过评估其Content-Type用户属性值来处理原始传感器值和base64编码的传感器值。

def on_subscribe(mqttc, userdata, mid, granted_qos, properties=None):
    # Publish a plain text payload to topic "sensor01"
    properties = Properties(PacketTypes.PUBLISH)
    properties.UserProperty = [("Content-Type", "text/plain"), ("Hardware-Revision", "Xiaomi-rev1.17c")]
    pub_topic = "sensors/gateway01/sensor01"
    payload = "23.4"
    mqttc.publish(pub_topic, payload, qos=0, properties=properties)
    
    time.sleep(1)
    
    # Publish a base64 encoded payload  to topic "sensor02"
    properties = Properties(PacketTypes.PUBLISH)
    properties.UserProperty = [("Content-Type", "base64"), ("Hardware-Manufacturer", "Huawei-rev8.2")]
    pub_topic = "sensors/gateway01/sensor02"
    payload_encoded = base64.b64encode(b"23.7")
    mqttc.publish(pub_topic, payload_encoded, qos=0, properties=properties)
    
    time.sleep(1)
    
    # Publish payload without user properties to topic "sensor03"
    pub_topic = "sensors/gateway01/sensor03"
    payload = "24.4"
    mqttc.publish(pub_topic, payload, qos=0)

def on_connect(mqttc, userdata, flags, reasonCode, properties=None):
    mqttc.subscribe('sensors/gateway01/#', qos=0)

def on_message(mqttc, userdata, message):
    logging.debug(f"Received a message on topic: '{message.topic}'")
    raw_payload = str(message.payload.decode("utf-8"))
    
    if hasattr(message.properties, 'UserProperty'):
        logging.debug(f"Message has user properties: {message.properties.UserProperty}")
        if "Content-Type" in dict(message.properties.UserProperty):
            message_content_type = dict(message.properties.UserProperty)["Content-Type"]
            logging.debug(f"Received message with Content-Type: '{message_content_type}'")
            if message_content_type == "base64":
                decoded_payload = base64.b64decode(raw_payload).decode("utf-8")
                logging.debug(f"Raw payload: '{raw_payload}', Decoded base64 payload: '{decoded_payload}'")
            elif message_content_type == "text/plain":
                logging.debug(f"Plain text payload: '{raw_payload}'")
            else:
                logging.debug(f"Content-Type unknown, raw payload: '{raw_payload}'")
        else:
            logging.debug(f"No Content-Type specified, raw payload: '{raw_payload}'")
    else:
        logging.debug(f"No User Property specified, raw payload: '{raw_payload}'")

mqttc = mqtt.Client("TestThing02", protocol=mqtt.MQTTv5)
logger = logging.getLogger(__name__)
mqttc.enable_logger(logger)
    
mqttc.on_message = on_message
mqttc.on_subscribe = on_subscribe
mqttc.on_connect = on_connect

mqttc.tls_set(certs["cafile"],
                   certfile=certs["certfile"],
                   keyfile=certs["keyfile"],
                   cert_reqs=ssl.CERT_REQUIRED,
                   tls_version=ssl.PROTOCOL_TLSv1_2,
                   ciphers=None)

mqttc.connect(endpoint, 8883)
mqttc.loop_forever()

# 3.使用主题别名功能更有效地使用设备带宽

蜂窝物联网设备和传感器使用移动网络与其后端服务通信。由于其计量数据服务，这些设备大多被设计为在尽可能低的带宽下运行。假设蜂窝网络连接的传感器设备设计用于在农田上运行，那么它们将以低数据通信和长电池续航能力运行。此外，较大的数据包通常会导致更多的功耗。考虑到这些传感器只发布几个字节的传感器值，长长的MQTT主题成为设备消息传递的开销。

主题别名功能允许MQTT客户端为主题分配数字别名，然后在发布进一步消息时引用别名。这允许通过用单个数字引用主题而不是主题本身来减少传输的MQTT数据包大小。

传感器值示例：23.2

MQTT主题示例（83字节）：sensors/field/field001/equipments/a804e598-ee90-4f89-9cde-458f8fe9b980/temperature



In [None]:
mqttc = mqtt.Client("TestThing03", protocol=mqtt.MQTTv5)
logger = logging.getLogger(__name__)
mqttc.enable_logger(logger)


mqttc.tls_set(certs["cafile"],
              certfile=certs["certfile"],
              keyfile=certs["keyfile"],
              cert_reqs=ssl.CERT_REQUIRED,
              tls_version=ssl.PROTOCOL_TLSv1_2,
              ciphers=None)

mqttc.connect(endpoint, 8883)

properties = Properties(PacketTypes.PUBLISH)
properties.TopicAlias = 1

topic = "sensors/field/field001/equipments/a804e598-ee90-4f89-9cde-458f8fe9b980/temperature"

payload = "23.4"
mqttc.publish(topic, payload, qos=0, properties=properties)

time.sleep(1)

payload = "25.5"
mqttc.publish('', payload, qos=0, properties=properties)

time.sleep(1)

payload = "22.2"
mqttc.publish('', payload, qos=0, properties=properties)

# 4.使用消息过期、会话过期和Clean Start功能更好地控制设备行为

MQTTv5有一组会话和消息过期参数，可以更好地控制设备行为。通过新的会话和消息过期参数，代理提供并强制进行更好的会话控制，而不是依赖于客户端的实现。

* 会话过期功能允许您定义固定的间隔，之后代理删除特定客户端的会话信息。
* 消息过期功能定义了一个设置的间隔，该间隔用于为当前未连接的任何匹配订阅者存储已发布的消息。当一起使用时，会话过期间隔会覆盖消息过期。此外，消息过期间隔覆盖任何AWS IoT Core消息保留间隔。查看AWS IoT Core消息代理和协议限制和配额页面以获取限制。
* Clean Start是一个标志，可以与会话到期间隔一起设置。在数据包中设置此标志表示会话应该在不使用现有会话的情况下开始。

对于连接模式不规则的设备来说，车联网是一个很好的例子，在恢复连接时需要弹性。带有移动应用程序的互联汽车用例，可以与汽车系统（如空调和门锁）进行交互，可以展示这些功能。这可能是使用远程命令远程解锁/锁定门以进行送货服务或汽车共享的情况。移动应用程序发布的这些远程命令需要在特定时间窗口内处理。您可以指定消息过期间隔，该间隔表明，如果汽车在短时间内（即发送后10秒内）没有收到命令，则消息必须过期。您可以为时间紧迫程度较低的远程命令指定第二种类型的消息，例如控制空调系统。在这种情况下，您可以设置远程命令以打开AC，消息过期2分钟。

In [None]:
from paho.mqtt.packettypes import PacketTypes
import ssl
import time
from paho.mqtt.properties import Properties
import paho.mqtt.client as mqtt
import logging
import json

certificates_path = 'certificates'
#endpoint = 'a2u3inau7j0faa-ats.iot.ap-northeast-1.amazonaws.com'
endpoint=  'a2jtec7plm36gl.ats.iot.cn-north-1.amazonaws.com.cn'

certs = {
    "cafile": certificates_path+"/AmazonRootCA1.pem",
    "certfile": certificates_path+"/app-certificate.pem.crt",
    "keyfile": certificates_path+"/app-private.pem.key",
}


logging.basicConfig(level=logging.DEBUG)

def pub_client_on_connect(mqtt_sub, user_data, flags, reason_code, properties=None):
    properties=Properties(PacketTypes.PUBLISH)
    properties.MessageExpiryInterval = 10
    payload = "UNLOCK"
    topic = "vehicle/driver_door/lock"
    mqtt_pub.publish(topic, payload, qos=1, properties=properties)
    
    time.sleep(1)
    
    properties.MessageExpiryInterval = 120
    payload = "PRE_HEAT"
    topic = "vehicle/air_conditioner/set"
    mqtt_pub.publish(topic, payload, qos=1, properties=properties)


# Publisher client
mqtt_pub = mqtt.Client("App", protocol=mqtt.MQTTv5)
logger = logging.getLogger(__name__)
mqtt_pub.enable_logger(logger)

mqtt_pub.on_connect = pub_client_on_connect

mqtt_pub.tls_set(certs["cafile"],
                   certfile=certs["certfile"],
                   keyfile=certs["keyfile"],
                   cert_reqs=ssl.CERT_REQUIRED,
                   tls_version=ssl.PROTOCOL_TLSv1_2,
                   ciphers=None)

mqtt_pub.connect(endpoint, 8883)
mqtt_pub.loop_forever()

# 5.使用原因代码和服务器断开功能增强了设备连接流程

原因代码允许发件人确定发布者和订阅者之间交易中的错误类型（如果有的话）。

服务器断开连接功能是服务器的响应，其中包含连接关闭原因代码。此功能在分析断开/拒绝的原因时很有帮助，您可以将其用于各种调试目的。

一个示例用例可以是与云中运行的各种服务集成的边缘传感器网关。当MQTT客户端断开连接时，它们通常被配置为自动尝试重新连接。使用MQTTv3.1.1，当设备在没有断开连接的原因代码的情况下试图执行未经授权的MQTT操作时，网关的订阅主题和物联网设备策略之间的配置错误导致连接/断开循环。使用MQTTv5，设备知道断开连接的原因，当从服务器断开连接的原因被指定为身份验证时，它不会尝试订阅该主题。设备可以报告问题，并尝试使用原因代码采取适当的补救措施。

In [None]:
from paho.mqtt.packettypes import PacketTypes
import ssl
import time
from paho.mqtt.properties import Properties
import paho.mqtt.client as mqtt
import logging
import base64

certificates_path = 'certificates'
endpoint = 'a2u3inau7j0faa-ats.iot.ap-northeast-1.amazonaws.com'

certs = {
    "cafile": certificates_path+"/AmazonRootCA1.pem",
    "certfile": certificates_path+"/client-cert.pem",
    "keyfile": certificates_path+"/private-key.pem",
}

logging.basicConfig(level=logging.DEBUG)

def on_connect(mqttc, user_data, flags, reason_code, properties=None):
    logging.debug(f"Connected {flags}")
    
    # Publishing 1 message without a topic alias
    topic = "sensors/field/field001/equipments/9e6282ff-c8f0-49cd-b3a0-fa17ad6b84a7/temperature"
    payload = "23.4"
    mqttc.publish(topic, payload, qos=1)
    
    time.sleep(1)
    
    # Publishing 1 message with a topic alias
    properties = Properties(PacketTypes.PUBLISH)
    properties.TopicAlias = 14
    topic = "sensors/field/field001/equipments/46be210d-8a83-4e92-a3fe-4f989704d21e/temperature"
    payload = "26.2"
    mqttc.publish(topic, payload, qos=1, properties=properties)
    
    
def on_disconnect(mqttc, user_data, reason_code, properties=None):
    logging.debug(f"Received Disconnect with reason: {reason_code}")
    if reason_code == 148:
        logging.debug(
            "The disconnect is caused by the topic alias. Logging the issue for further analysis and exiting.")
        exit()
    else:
        logging.debug(
            "The disconnect reason doesn't have a specific action to take.")
    
def on_publish(client,userdata, result,properties=None):
    logging.debug(f"Published {result}")

mqttc = mqtt.Client("TestThing05", protocol=mqtt.MQTTv5)
logger = logging.getLogger(__name__)
mqttc.enable_logger(logger)

mqttc.on_connect = on_connect
mqttc.on_disconnect = on_disconnect

mqttc.tls_set(certs["cafile"],
                   certfile=certs["certfile"],
                   keyfile=certs["keyfile"],
                   cert_reqs=ssl.CERT_REQUIRED,
                   tls_version=ssl.PROTOCOL_TLSv1_2,
                   ciphers=None)

mqttc.connect(endpoint, 8883)

mqttc.loop_forever()

