Skip to content

Latest commit

 

History

History
759 lines (572 loc) · 50 KB

File metadata and controls

759 lines (572 loc) · 50 KB

六、使用基于云的实时 MQTT 提供程序和 Python 监控冲浪比赛

在本章中,我们将编写 Python 代码,将基于 PubNub 云的实时 MQTT 提供程序与 Mosquitto MQTT 服务器结合使用,以监控冲浪比赛。我们将通过分析需求从头开始构建一个解决方案,并编写 Python 代码,在连接到冲浪板中多个传感器的防水物联网板上运行。我们将定义主题和命令,并将使用基于云的 MQTT 服务器和前面章节中使用的 Mosquitto MQTT 服务器。我们将介绍以下内容:

  • 了解需求
  • 定义主题和有效负载
  • 编写冲浪板传感器仿真器
  • 配置 PubNub MQTT 接口
  • 将从传感器检索到的数据发布到基于云的 MQTT 服务器
  • 使用多个 MQTT 服务器
  • 使用干舷构建基于 web 的仪表板

了解需求

许多正在接受冲浪比赛培训的冲浪者希望我们利用连接到冲浪板中多个传感器的物联网板提供的数据构建基于网络的实时仪表板。每个物联网板将提供以下数据:

  • 状态:每个冲浪者潜水服中嵌入的许多可穿戴无线传感器和冲浪板中包含的其他传感器将提供数据,物联网板将执行实时分析以指示冲浪者的状态
  • 速度:传感器将测量冲浪板的速度,单位为英里/小时英里/小时
  • 高度:传感器将以英尺为单位测量冲浪板的高度
  • 水温:位于冲浪板鳍上的传感器将测量水温,单位为华氏度

第三方软件正在 IoT 板上运行,我们无法更改发布不同主题数据的代码。我们可以提供必要的证书来配置与 Mosquitto MQTT 服务器的安全连接,并指定其主机名和协议。此外,我们可以配置一个名称来标识冲浪板,并确定发布数据的主题。

定义主题和有效负载

IoT 板使用以下主题名称发布特定冲浪板的数据,其中surfboardname必须替换为指定给冲浪板的唯一名称:

| 变量 | 主题名称 | | 地位 | surfboards/surfboardname/status | | 速度(英里/小时) | surfboards/surfboardname/speedmph | | 高度(英尺) | surfboards/surfboardname/altitudefeet | | 水温(华氏度) | surfboards/surfboardname/temperaturef |

例如,如果我们指定surfboard01作为冲浪板的名称,则希望接收冲浪板实际速度的客户端必须订阅surfboards/surfboard01/speedmph主题。

物联网板及其连接的传感器能够区分冲浪者及其冲浪板的以下五种可能状态:

| 状态键 | 意思是 | | 0 | 闲置的 | | 1 | 划桨 | | 2 | 骑 | | 3 | 骑行结束 | | 4 | 精疲力尽的 |

IoT 板发布状态键列中指定的整数值,该列指示冲浪者及其冲浪板的当前状态。例如,当一名冲浪者在冲浪时,冲浪板将发布2surfboards/surfboard01/status主题。

董事会将在前面解释的速度、高度和水温主题中公布浮点值。在这种情况下,IoT 板将只发布整数或浮点值作为 MQTT 消息的有效负载。有效负载将不是 JSON,如我们前面的示例所示。有效载荷不包括有关测量单位的任何附加信息。此信息包含在主题名称中。

物联网委员会将每秒公布上述主题中的数据。

In the previous examples, we designed our solution from scratch. In this case, we have to interact with an IoT board that is already running code that we cannot change. Imagine that we have to start working on the solution without the IoT board; therefore, we will develop a surfboard sensor emulator in Python that will provide us with data so that we can receive the published data and develop the desired dashboard. In real-life projects, this is a very common scenario.

正如我们在前几章中了解到的,MQTT 已成为物联网项目中非常流行的协议,其中许多传感器必须发布数据。由于这种日益流行的趋势,许多基于云的消息传递基础设施都包含了 MQTT 接口或网桥。例如,PubNub 数据流网络提供了一个可伸缩的 MQTT 接口。我们可以利用到目前为止关于 MQTT 的所有知识来使用这种基于云的数据流网络。您可以在 PubNub 的网页上阅读更多关于 PubNub 的信息:http://www.pubnub.com

Python 程序将通过订阅四个主题来收集 IoT 板发布的数据,代码将每秒为冲浪者及其冲浪板建立一个完整的状态。然后,代码将构建一个包含状态、速度、海拔高度和水温的 JSON 消息,并将其发布到 MQTT PubNub 接口的主题中。

In our example, we will take advantage of the free services offered by PubNub and its MQTT interface. We won't use some advanced features and additional services that might empower our IoT project connectivity requirements, but also require a paid subscription.

我们将利用 freeboard.io 可视化从传感器收集并发布到多个仪表的 PubNub MQTT 接口的数据,并使仪表板可用于世界各地的不同计算机和设备。io 允许我们通过选择数据源和拖放可定制的小部件来构建仪表板。freeboard.io 将自己定义为一种基于云的服务,使我们能够可视化物联网。您可以在 freeboard.io 的网页中阅读有关 freeboard.io 的更多信息 http://freeboard.io

In our example, we will take advantage of the free services offered by freeboard.io and we won't use some advanced features that provide privacy for our dashboards, but also require a paid subscription. Our dashboard will be available to anyone that has the unique URL for it because we are not working with private dashboards.

以下几行显示了提供冲浪者及其冲浪板状态的消息的有效载荷示例:

{ 
    "Status": "Riding",  
    "Speed MPH": 15.0,  
    "Altitude Feet": 3.0,  
    "Water Temperature F": 56.0 
}

io 允许我们轻松地选择 PubNub MQTT 接口中接收的 JSON 消息的每个键作为仪表板的数据源。通过这种方式,我们将轻松构建一个基于 web 的仪表板,以仪表的形式向我们提供状态、速度、高度和水温值。

总之,我们的解决方案将由以下两个 Python 程序组成:

  • 冲浪板传感器仿真器:该程序将与我们的 Mosquito MQTT 服务器建立安全连接,并每秒将从CSV(缩写为逗号分隔值)文件读取的状态、速度、高度和水温值发布到相应的主题。这个程序的工作原理就像我们有一个现实生活中的冲浪者,穿着潜水服和冲浪板传感器在海浪中冲浪并发布数据。

  • 冲浪板监视器:该程序将与我们的 Mosquito MQTT 服务器建立安全连接,并订阅冲浪板传感器模拟器发布状态、速度、高度和水温值的主题。冲浪板监视器程序还将与 PubNub MQTT 接口建立连接。程序将每秒向 PubNub MQTT 接口发布一条消息,其中包含确定冲浪者及其冲浪板状态的键值对。

编写冲浪板传感器仿真器

首先,我们将创建一个 CSV 文件,其中包含许多状态、速度(以英里/小时为单位)、高度(以英尺为单位)和温度(以华氏度为单位)值(以逗号分隔)。文件中的每一行将表示冲浪板传感器模拟器将发布到相应主题的一组值。在这种情况下,使用随机值并不方便,因为我们希望模拟冲浪者及其冲浪板的真实场景。

现在,我们将在主虚拟环境文件夹中创建一个名为surfboard_sensors_data.csv的新文件。以下几行代码定义了从冲浪者及其冲浪板的短冲浪会话中检索到的数据。

从左到右用逗号分隔的值如下:速度(以英里/小时为单位)、高度(以英尺为单位)和温度(以华氏度为单位)。首先,冲浪者处于空闲状态,他在划桨时提高速度,在冲浪时达到速度最大值,最后在他的状态设置为“骑行完成”时降低速度。样本的代码文件包含在mqtt_python_gaston_hillar_06_01文件夹中,在surfboard_sensors_data.csv文件中:

0, 1, 2, 58 
0, 1.1, 2, 58 
1, 2, 3, 57 
1, 3, 3, 57 
1, 3, 3, 57 
1, 3, 3, 57 
1, 4, 4, 57 
1, 5, 5, 57 
2, 8, 5, 57 
2, 10, 4, 57 
2, 12, 4, 56 
2, 15, 3, 56 
2, 15, 3, 56 
2, 15, 3, 56 
2, 15, 3, 56 
2, 15, 3, 56 
2, 12, 3, 56 
3, 3, 3, 55 
3, 2, 3, 55 
3, 1, 3, 55 
3, 0, 3, 55 

现在,我们将在主虚拟环境文件夹中创建一个名为surfboard_config.py的新 Python 文件。以下几行显示了此文件的代码,其中定义了许多配置值,这些值将用于配置冲浪板传感器模拟器将向其发布从传感器检索到的值的主题。冲浪板监视器也需要这些主题来订阅它们,因此在特定的 Python 脚本中包含所有配置值是很方便的。样本的代码文件包含在mqtt_python_gaston_hillar_06_01文件夹中的surfboard_config.py文件中:

surfboard_name = "surfboard01" 
topic_format = "surfboards/{}/{}" 
status_topic = topic_format.format( 
    surfboard_name,  
    "status") 
speed_mph_topic = topic_format.format( 
    surfboard_name,  
    "speedmph") 
altitude_feet_topic = topic_format.format( 
    surfboard_name,  
    "altitudefeet") 
water_temperature_f_topic = topic_format.format( 
    surfboard_name,  
    "temperaturef")

该代码定义冲浪板名称并将其存储在surfboard_name变量中。topic_format变量包含一个字符串,可以轻松构建具有公共前缀的不同主题。下表根据已定义的名为surfboard01的冲浪板,总结了定义每个传感器主题名称的四个变量的字符串值:

| 变量 | | | status_topic | surfboards/surfboard01/status | | speed_mph_topic | surfboards/surfboard01/speedmph | | altitude_feet_topic | surfboards/surfboard01/altitudefeet | | temperature_f_topic | surfboards/surfboard01/temperaturef |

现在,我们将在主虚拟环境文件夹中创建一个名为surfboard_sensors_emulator.py的新 Python 文件。以下几行显示了此文件的代码,它与我们的 MOSQUITO MQTT 服务器建立连接,读取先前创建的surfboard_sensors_data.csvCSV 文件,并将从该文件读取的值连续发布到先前枚举的主题。样本的代码文件包含在mqtt_python_gaston_hillar_06_01文件夹中的surfboard_sensors_emulator.py文件中:

from config import * 
from surfboard_config import * 
import paho.mqtt.client as mqtt 
import time 
import csv 

def on_connect(client, userdata, flags, rc): 
    print("Result from connect: {}".format( 
        mqtt.connack_string(rc))) 
    # Check whether the result form connect is the CONNACK_ACCEPTED connack code 
    if rc != mqtt.CONNACK_ACCEPTED: 
        raise IOError("I couldn't establish a connection with the MQTT server") 

def publish_value(client, topic, value): 
    result = client.publish(topic=topic, 
        payload=value, 
        qos=0) 
    return result 

if __name__ == "__main__": 
    client = mqtt.Client(protocol=mqtt.MQTTv311) 
    client.on_connect = on_connect 
    client.tls_set(ca_certs = ca_certificate, 
        certfile=client_certificate, 
        keyfile=client_key) 
    client.connect(host=mqtt_server_host, 
        port=mqtt_server_port, 
        keepalive=mqtt_keepalive) 
    client.loop_start() 
    publish_debug_message = "{}: {}" 
    try: 
        while True: 
            with open('surfboard_sensors_data.csv') as csvfile: 
                reader=csv.reader(csvfile) 
                for row in reader: 
                    status_value = int(row[0]) 
                    speed_mph_value = float(row[1]) 
                    altitude_feet_value = float(row[2]) 
                    water_temperature_f_value = float(row[3]) 
                    print(publish_debug_message.format( 
                        status_topic, 
                        status_value)) 
                    print(publish_debug_message.format( 
                        speed_mph_topic,  
                        speed_mph_value)) 
                    print(publish_debug_message.format( 
                        altitude_feet_topic,  
                        altitude_feet_value)) 
                    print(publish_debug_message.format( 
                        water_temperature_f_topic,  
                        water_temperature_f_value)) 
                    publish_value(client,  
                        status_topic,  
                        status_value) 
                    publish_value(client,  
                        speed_mph_topic,  
                        speed_mph_value) 
                    publish_value(client,  
                        altitude_feet_topic,  
                        altitude_feet_value) 
                    publish_value(client, 
                        water_temperature_f_topic,  
                        water_temperature_f_value)                    time.sleep(1) 
    except KeyboardInterrupt: 
        print("I'll disconnect from the MQTT server") 
        client.disconnect() 
        client.loop_stop() 

第 4 章使用 Python 和 MQTT 消息编写控制车辆的代码时,我们在主虚拟环境文件夹中创建了一个名为config.py的 Python 文件。在这个文件中,我们定义了许多配置值,用于建立与 Mosquitto MQTT 服务器的连接。这样,所有配置值都包含在特定的 Python 脚本中。如果您需要对此文件进行更改以配置冲浪板模拟器和未来的冲浪板监视器,请确保查看该章中的说明。

第一行导入我们在config.py文件和之前编码的surfboard_config.py文件中声明的变量。在本例中,我们还导入了csv模块,以使我们能够轻松地读取包含模拟传感器值的 CSV 文件。on_connect函数的代码与我们在前面的示例中使用的代码非常相似。

publish_value函数接收 MQTT 客户机、主题名称以及我们希望在clienttopicvalue参数中发布的值。函数调用client.publish方法,将接收到的值作为有效负载发布到topic参数中接收到的 QoS 级别为 0 的主题名称。

主块使用我们非常熟悉的代码与 Mosquitto MQTT 服务器建立连接。调用client.connect方法后,代码调用client.loop_start方法启动一个新线程,该线程处理 MQTT 网络流量并释放主线程。

然后,代码进入一个连续循环,打开surfboard_sensors_data.csvCSV 文件并创建一个csv.reader,将每行逗号分隔的值读取到row数组中。代码取row[0]中的字符串,表示状态值;将其转换为整数值;并将值保存在status_value局部变量中。下一行检索row[1]row[2]row[3]中的速度、高度和水温字符串。代码将这三个值转换为浮点数,并将它们保存在speed_mph_valuealtitude_feet_valuewater_temperature_f_value局部变量中。

下一行打印调试消息,其中包含从 CSV 文件中读取的每个模拟传感器的值,并为每个值调用前面解释的publish_value函数。对publish_value函数的每次调用都使用surfboard_config.py文件中配置的主题名称的相应变量,因为每个值都发布到不同的主题。

在代码发布四个模拟传感器的值后,它将休眠一秒钟,并对 CSV 文件中的下一行重复该过程。读取最后一行后,代码再次启动循环,直到用户按下Ctrl+C并引发并捕获KeyboardInterrupt异常。在本例中,我们捕获此异常并调用client.disconnectclient.loop_stop方法,以适当地断开与 Mosquitto MQTT 服务器的连接。在前面的例子中,我们并不关心这个异常。

配置 PubNub MQTT 接口

PubNub 要求我们先注册并创建一个具有有效电子邮件和密码的帐户,然后才能在 PubNub 中创建应用程序,这允许我们开始使用其免费服务,包括设备的 PubNub MQTT 接口。我们不需要输入任何信用卡或付款信息。如果您已经在 PubNub 拥有帐户,则可以跳过下一步。

创建帐户后,PubNub 会将您重定向到列出 PubNub 应用程序的管理门户。为了在网络上发送和接收消息,需要生成 PubNub 发布和订阅密钥。点击新建 APP+,在 APP 名称中输入MQTT,点击新建。

新窗格将表示管理门户中的应用程序。以下屏幕截图显示 PubNub 管理门户中的 MQTT 应用程序窗格:

单击 MQTT 窗格,PubNub 将显示为应用程序自动生成的演示密钥集窗格。单击此窗格,PubNub 将显示发布密钥、订阅密钥和密钥。我们必须复制并粘贴每个键,以便在我们的代码中使用它们,这些代码将使用 PubNub MQTT 接口发布消息,并使用 freeboard.io 基于 web 的仪表板订阅消息。下面的屏幕截图显示了键的前缀。请注意,图像中的其余字符已被删除:

要复制密钥,必须单击密钥右侧的眼睛图标,PubNub 将使所有字符可见。

将从传感器检索到的数据发布到基于云的 MQTT 服务器

如果我们用数字显示冲浪者及其冲浪板的状态,就很难理解真实的状态。因此,我们必须将表示状态的整数映射为解释状态的字符串。

现在,我们将在主虚拟环境文件夹中创建一个名为surfboard_status.py的新 Python 文件。以下几行显示了该文件的代码,该文件为不同的状态号定义了常量,并提供了一个字典,该字典将这些带有整数的常量映射到带有状态描述的字符串。样本的代码文件包含在mqtt_python_gaston_hillar_06_01文件夹中的surfboard_status.py文件中:

SURFBOARD_STATUS_IDLE = 0 
SURFBOARD_STATUS_PADDLING = 1 
SURFBOARD_STATUS_RIDING = 2 
SURFBOARD_STATUS_RIDE_FINISHED = 3 
SURFBOARD_STATUS_WIPED_OUT = 4 

SURFBOARD_STATUS_DICTIONARY = { 
    SURFBOARD_STATUS_IDLE: 'Idle', 
    SURFBOARD_STATUS_PADDLING: 'Paddling', 
    SURFBOARD_STATUS_RIDING: 'Riding', 
    SURFBOARD_STATUS_RIDE_FINISHED: 'Ride finished', 
    SURFBOARD_STATUS_WIPED_OUT: 'Wiped out', 
    } 

现在,我们将为冲浪板监视器编写代码。我们将把代码分成许多代码段,以便于理解每个代码段。在主虚拟环境文件夹中创建一个名为surfboard_monitor.py的新 Python 文件。以下几行声明了我们将用于建立与 PubNub MQTT 接口的连接的所有必要导入和变量。不要忘记将分配给pubnub_publish_keypubnub_subscribe_key变量的字符串替换为您从前面解释的 PubNub 密钥生成过程中检索到的值。样本的代码文件包含在mqtt_python_gaston_hillar_06_01文件夹中的surfboard_monitor.py文件中:

from config import * 
from surfboard_status import * 
from surfboard_config import * 
import paho.mqtt.client as mqtt 
import time 
import json 

# Publish key is the one that usually starts with the "pub-c-" prefix 
# Do not forget to replace the string with your publish key 
pubnub_publish_key = "pub-c-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" 
# Subscribe key is the one that usually starts with the "sub-c" prefix 
# Do not forget to replace the string with your subscribe key 
pubnub_subscribe_key = "sub-c-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" 
pubnub_mqtt_server_host = "mqtt.pndsn.com" 
pubnub_mqtt_server_port = 1883 
pubnub_mqtt_keepalive = 60 
device_id = surfboard_name 
pubnub_topic = surfboard_name 

第一行导入我们在config.py文件以及之前编码的surfboard_config.pysurfboard_status.py文件中声明的变量。然后,代码声明了以下变量,我们将使用这些变量建立与 PubNub MQTT 接口的连接:

  • pubnub_publish_key:此字符串指定 PubNub 发布密钥。
  • pubnub_subscribe_key:此字符串指定 PubNub 订阅密钥。
  • pubnub_mqtt_server_host:此字符串指定 PubNub MQTT 服务器地址。为了使用 PubNub MQTT 接口,我们必须始终与mqtt.pndsn.com主机建立连接。
  • pubnub_mqtt_server_port:此编号指定 PubNub MQTT 服务器端口。在这种情况下,我们将与 PubNub MQTT 服务器建立一个不安全的连接,因此我们将使用端口号1883。在本例中,我们希望保持 PubNub MQTT 接口配置的简单,因此不使用 TLS。
  • pubnub_mqtt_keepalive:此编号指定与 PubNub MQTT 接口的连接的保持活动间隔配置。
  • device_id:此字符串指定创建Surfboard类实例时要使用的设备标识符。代码分配从surfboard_config.py文件导入的surfboard_name值。稍后我们将分析此类的代码。
  • Pubnub_topic:此字符串指定冲浪板监视器将 JSON 有效负载发布到的主题,其中的键值对指定冲浪者及其冲浪板的状态。代码分配从surfboard_config.py文件导入的surfboard_name值。

The surfboard monitor will establish a connection to the mqtt.pndsn.com host on port 1883. So, we have to make sure that our firewall configuration has the appropriate inbound and outbound rules configurations to allow a connection on the specified port.

将以下行添加到主虚拟环境文件夹中的现有surfboard_monitor.py。以下几行声明了Surfboard类。样本的代码文件包含在mqtt_python_gaston_hillar_06_01文件夹中的surfboard_monitor.py文件中:

class Surfboard: 
    active_instance = None 
    def __init__(self, device_id, status,  
        speed_mph, altitude_feet, water_temperature_f): 
        self.device_id = device_id 
        self.status = status 
        self.speed_mph = speed_mph 
        self.altitude_feet = altitude_feet 
        self.water_temperature_f = water_temperature_f 
        self.is_pubnub_connected = False 
        Surfboard.active_instance = self 

    def build_json_message(self): 
        # Build a message with the status for the surfboard 
        message = { 
            "Status": SURFBOARD_STATUS_DICTIONARY[self.status], 
            "Speed MPH": self.speed_mph, 
            "Altitude Feet": self.altitude_feet, 
            "Water Temperature F": self.water_temperature_f,  
        } 
        json_message = json.dumps(message) 
        return json_message

我们必须指定一个device_id以及传感器在device_idstatusspeed_mphaltitude_feetwater_temperature_f所需参数中提供的数据的初始值。构造函数,即__init__方法,将接收到的值保存在具有相同名称的属性中。

代码还将对该实例的引用保存在active_instance类属性中,因为我们必须在许多函数中访问该实例,我们将这些函数指定为两个 MQTT 客户端(PubNub MQTT 客户端和 Mosquitto MQTT 客户端)将触发的不同事件的回调。在代码创建一个Surfboard实例后,我们将使用Surfboard.active_instance类属性访问活动实例。

该类声明了build_json_message方法,该方法为冲浪板构建一条状态消息,并返回 JSON 字符串和组成状态消息的键值对。代码将存储在status属性中的数字映射到字符串中,该字符串使用surfboard_status.py文件中声明的SURFBOARD_STATUS_DICTIONARY来解释状态。代码使用speed_mphaltitude_feetwater_temperature_f属性为其他键提供值。

将以下行添加到主虚拟环境文件夹中的现有surfboard_monitor.py。下面几行声明了我们将用作回调的函数以及这些回调将调用的其他函数。样本的代码文件包含在mqtt_python_gaston_hillar_06_01文件夹中的surfboard_monitor.py文件中:

def on_connect_mosquitto(client, userdata, flags, rc): 
    print("Result from Mosquitto connect: {}".format( 
        mqtt.connack_string(rc))) 
    # Check whether the result form connect is the CONNACK_ACCEPTED connack code 
    if rc == mqtt.CONNACK_ACCEPTED: 
        # Subscribe to a topic filter that provides all the sensors 
        sensors_topic_filter = topic_format.format( 
            surfboard_name, 
            "+") 
        client.subscribe(sensors_topic_filter, qos=0) 

def on_subscribe_mosquitto(client, userdata, mid, granted_qos): 
    print("I've subscribed with QoS: {}".format( 
        granted_qos[0])) 

def print_received_message_mosquitto(msg): 
    print("Message received. Topic: {}. Payload: {}".format( 
        msg.topic,  
        str(msg.payload))) 

def on_status_message_mosquitto(client, userdata, msg): 
    print_received_message_mosquitto(msg) 
    Surfboard.active_instance.status = int(msg.payload) 

def on_speed_mph_message_mosquitto(client, userdata, msg): 
    print_received_message_mosquitto(msg) 
    Surfboard.active_instance.speed_mph = float(msg.payload) 

def on_altitude_feet_message_mosquitto(client, userdata, msg): 
    print_received_message_mosquitto(msg) 
    Surfboard.active_instance.altitude_feet = float(msg.payload) 

def on_water_temperature_f_message_mosquitto(client, userdata, msg): 
    print_received_message_mosquitto(msg) 
    Surfboard.active_instance.water_temperature_f = float(msg.payload) 

def on_connect_pubnub(client, userdata, flags, rc): 
    print("Result from PubNub connect: {}".format( 
        mqtt.connack_string(rc))) 
    # Check whether the result form connect is the CONNACK_ACCEPTED connack code 
    if rc == mqtt.CONNACK_ACCEPTED: 
        Surfboard.active_instance.is_pubnub_connected = True 

def on_disconnect_pubnub(client, userdata, rc): 
    Surfboard.active_instance.is_pubnub_connected = False 
    print("Disconnected from PubNub")

代码声明以下以mosquitto前缀结尾的函数:

  • on_connect_mosquitto:此函数是与 MOSQUITO MQTT 服务器建立成功连接后将执行的回调。代码检查提供 MOSQUITO MQTT 服务器返回的CONNACK代码的rc参数的值。如果此值与mqtt.CONNACK_ACCEPTED匹配,则表示 MOSQUITO MQTT 服务器接受了连接请求,因此代码调用client参数中接收到的 MQTT 客户端的client.subscribe方法,以订阅 QoS 级别为 0 的surfboards/surfboard01/+主题过滤器。通过这种方式,MQTT 客户端将接收发送到surfboards/surfboard01/statussurfboards/surfboard01/speedmphsurfboards/surfboard01/altitudefeetsurfboards/surfboard01/temperaturef主题的消息,其中包含从不同传感器检索到的值。
  • on_subscribe_mosquitto:订阅surfboards/surfboard01/+主题过滤器成功后调用此函数。与前面的示例一样,该函数打印一条消息,指示授予订阅的 QoS 级别。
  • print_received_message_mosquitto:此函数接收msg参数中的mqtt.MQTTMessage实例,并打印此消息的主题和有效负载,以帮助我们了解应用程序中发生的情况。
  • on_status_message_mosquitto:当 MOSQUITO MQTT 服务器向surfboards/surfboard01/status主题发送消息时,将调用此函数。该函数以接收到的mqtt.MQTTMessage实例为参数调用print_received_message_mosquitto函数,并将Surfboard活动实例的status属性的值设置为将接收到的消息的有效负载转换为int
  • on_speed_mph_message_mosquitto:当 MOSQUITO MQTT 服务器向surfboards/surfboard01/speedmph主题发送消息时,将调用此函数。该函数以接收到的mqtt.MQTTMessage实例为参数调用print_received_message_mosquitto函数,并将Surfboard活动实例的speed_mph属性的值设置为将接收到的消息的有效负载转换为float
  • on_altitude_feet_message_mosquitto:当 MOSQUITO MQTT 服务器向surfboards/surfboard01/altitudefeet主题发送消息时,将调用此函数。该函数以接收到的mqtt.MQTTMessage实例为参数调用print_received_message_mosquitto函数,并将Surfboard活动实例的altitude_feet属性的值设置为将接收到的消息的有效负载转换为int
  • on_water_temperature_f_message_mosquitto:当 MOSQUITO MQTT 服务器向surfboards/surfboard01/watertemperaturef主题发送消息时,将调用此函数。该函数以接收到的mqtt.MQTTMessage实例为参数调用print_received_message_mosquitto函数,并将Surfboard活动实例的water_temperature_f属性的值设置为将接收到的消息的有效负载转换为int

In this case, we don't have a single function that works as a callback to process all the incoming messages from the Mosquitto MQTT server. We work with a callback for each specific topic. This way, we don't have to check the topic for the message to determine the code that we have to run.

代码声明以下以pubnub前缀结尾的函数:

  • on_connect_pubnub:此函数是与 PubNub MQTT 服务器建立成功连接后将执行的回调。代码检查提供 PubNub MQTT 服务器返回的CONNACK代码的rc参数的值。如果该值与mqtt.CONNACK_ACCEPTED匹配,则表示 PubNub MQTT 服务器接受了连接请求,因此代码将冲浪板活动实例的is_pubnub_connected属性值设置为True
  • on_disconnect_pubnub:此函数是连接到 PubNub MQTT 服务器的客户端失去连接时将执行的回调。该代码将冲浪板活动实例的is_pubnub_connected属性的值设置为False,并打印一条消息。

使用多个 MQTT 服务器

将以下行添加到主虚拟环境文件夹中的现有surfboard_monitor.py。以下几行声明了主块。样本的代码文件包含在mqtt_python_gaston_hillar_06_01文件夹中的surfboard_monitor.py文件中:

if __name__ == "__main__": 
    surfboard = Surfboard(device_id=device_id, 
        status=SURFBOARD_STATUS_IDLE, 
        speed_mph=0,  
        altitude_feet=0,  
        water_temperature_f=0) 
    pubnub_client_id = "{}/{}/{}".format( 
        pubnub_publish_key, 
        pubnub_subscribe_key, 
        device_id) 
    pubnub_client = mqtt.Client(client_id=pubnub_client_id, 
        protocol=mqtt.MQTTv311) 
    pubnub_client.on_connect = on_connect_pubnub 
    pubnub_client.on_disconnect = on_disconnect_pubnub 
    pubnub_client.connect(host=pubnub_mqtt_server_host, 
        port=pubnub_mqtt_server_port, 
        keepalive=pubnub_mqtt_keepalive) 
    pubnub_client.loop_start() 
    mosquitto_client = mqtt.Client(protocol=mqtt.MQTTv311) 
    mosquitto_client.on_connect = on_connect_mosquitto 
    mosquitto_client.on_subscribe = on_subscribe_mosquitto 
    mosquitto_client.message_callback_add( 
        status_topic, 
        on_status_message_mosquitto) 
    mosquitto_client.message_callback_add( 
        speed_mph_topic, 
        on_speed_mph_message_mosquitto) 
    mosquitto_client.message_callback_add( 
        altitude_feet_topic, 
        on_altitude_feet_message_mosquitto) 
    mosquitto_client.message_callback_add( 
        water_temperature_f_topic, 
        on_water_temperature_f_message_mosquitto) 
    mosquitto_client.tls_set(ca_certs = ca_certificate, 
        certfile=client_certificate, 
        keyfile=client_key) 
    mosquitto_client.connect(host=mqtt_server_host, 
        port=mqtt_server_port, 
        keepalive=mqtt_keepalive) 
    mosquitto_client.loop_start() 
    try: 
        while True: 
            if Surfboard.active_instance.is_pubnub_connected: 
                payload = Surfboard.active_instance.build_json_message() 
                result = pubnub_client.publish(topic=pubnub_topic, 
                    payload=payload, 
                    qos=0) 
                print("Publishing: {}".format(payload)) 
            else: 
                print("Not connected") 
            time.sleep(1) 
    except KeyboardInterrupt: 
        print("I'll disconnect from both Mosquitto and PubNub") 
        pubnub_client.disconnect() 
        pubnub_client.loop_stop() 
        mosquitto_client.disconnect() 
        mosquitto_client.loop_stop() 

首先,主块创建一个Surfboard类的实例,并将其保存在surfboard局部变量中。然后,代码生成与 PubNub MQTT 接口建立连接所需的客户端 ID 字符串,并将其保存在pubnub_client_id局部变量中。PubNub MQTT 接口要求我们使用由以下内容组成的客户端 ID:

publish_key/subscribe_key/device_id 

代码使用pubnub_publish_keypubnub_subscribe_keydevice_id变量的值来构建 PubNub MQTT 接口所需的客户端 ID。然后,代码创建名为pubnub_clientmqtt.Client类(paho.mqtt.client.Client的实例,它表示 PubNub MQTT 接口客户端。我们使用此实例与 PubNub MQTT 服务器通信。

然后,代码将函数分配给属性。下表总结了这些作业:

| 属性 | 分配功能 | | pubnub_client.on_connect | on_connect_pubnub | | pubnub_client.on_disconnect | on_disconnect_pubnub |

然后,代码调用pubnub_client.connect方法并指定hostportkeepalive参数的值。这样,代码要求 MQTT 客户机建立到指定 PubNub MQTT 服务器的连接。调用pubnub_client.connect方法后,代码调用pubnub_client.loop_start方法。此方法启动一个新线程,该线程处理与 PubNub MQTT 接口相关的 MQTT 网络流量,并释放主线程。

然后,主块创建名为mosquitto_clientmqtt.Client类(paho.mqtt.client.Client的另一个实例,该实例表示 MOSQUITO MQTT 服务器客户端。我们使用此实例与本地 Mosquitto MQTT 服务器通信。

然后,代码将函数分配给属性。下表总结了这些作业:

| 属性 | 分配功能 | | mosquitto_client.on_connect | on_connect_mosquitto | | mosquitto_client.on_subscribe | on_subscribe_mosquitto |

请注意,在本例中,代码没有将函数分配给mosquitto_client.on_message。接下来的几行调用mosquitto_client.message_callback_add方法,以指定客户端在收到特定主题中的消息时必须调用的回调函数。下表总结了将根据定义消息到达主题的变量调用的函数:

| 主题变量 | 分配功能 | | status_topic | on_status_message_mosquitto | | speed_mph_topic | on_speed_mph_message_mosquitto | | altitude_feet_topic | on_altitude_feet_message_mosquitto | | water_temperature_f_topic | on_water_temperature_f_message_mosquitto |

每当客户端收到来自任何传感器的消息时,它都会更新Surfboard活动实例的相应属性。这些分配的功能负责更新Surfboard活动实例的状态。

然后,代码调用众所周知的mosquitto_client.tls_setmosquitto_client.connect方法。这样,代码要求 MQTT 客户机建立到指定的 MOSQUITO MQTT 服务器的连接。调用mosquitto_client.connect方法后,代码调用mosquitto_client.loop_start方法。此方法启动一个新线程,该线程处理与 MOSQUITO MQTT 服务器相关的 MQTT 网络流量,并释放主线程。

Notice that we made two calls to loop_start, and therefore we will have two threads processing MQTT network traffic: one for the PubNub MQTT server and the other for the Mosquitto MQTT server.

接下来的几行声明了一个while循环,该循环将一直运行,直到KeyboardInterrupt异常发生。循环检查Surfboard.active_instance.is_pubnub_connected属性的值,以确保与 PubNub MQTT 服务器的连接没有中断。如果连接处于活动状态,代码将调用Surfboard.active_instance.build_json_message方法,根据Surfboard属性的当前值构建 JSON 字符串,只要传感器发送到具有新值的消息,就会更新这些属性。

代码将 JSON 字符串保存在payload局部变量中,并调用pubnub_client.publish方法将payloadJSON 格式的字符串发布到pubnub_topic变量中保存的主题名称中,QoS 级别为 0。这样,消息将由负责处理 PubNub MQTT 客户端的 MQTT 网络事件的线程发布,并且将更新使用 PubNub MQTT 服务器作为数据源的基于 web 的仪表板。下一行打印一条消息,其中包含发布到 PubNub MQTT 服务器的有效负载。

运行多个客户端

现在,我们将运行冲浪板传感器模拟器和我们最近编写的冲浪板监视器。确保在遵循必要的步骤激活我们一直工作的虚拟环境后运行这些 Python 程序。

执行以下行以在要用作 surfboard sensor emulator 并使用 Linux 或 macOS 的 MQTT 客户端的任何计算机或设备上启动 surfboard sensor emulator 示例:

    python3 surfboard_sensors_emulator.py  

在 Windows 中,必须执行以下行:

    python surfboard_sensors_emulator.py

几秒钟后,您将看到以下几行中显示的输出:

 Result from connect: Connection Accepted.
    surfboards/surfboard01/status: 0
    surfboards/surfboard01/speedmph: 1.0
    surfboards/surfboard01/altitudefeet: 2.0
    surfboards/surfboard01/temperaturef: 58.0
    surfboards/surfboard01/status: 0
    surfboards/surfboard01/speedmph: 1.1
    surfboards/surfboard01/altitudefeet: 2.0
    surfboards/surfboard01/temperaturef: 58.0
    surfboards/surfboard01/status: 1
    surfboards/surfboard01/speedmph: 2.0
    surfboards/surfboard01/altitudefeet: 3.0
    surfboards/surfboard01/temperaturef: 57.0

程序将继续将主题的消息发布到 MOSQUITO MQTT 服务器。保持代码在本地计算机或您选择用作本例冲浪板传感器模拟器的物联网板上运行。

然后,在要用作 MQTT 客户机的任何计算机或设备上执行以下行以启动冲浪板监视器示例,该客户机从 Mosquitto MQTT 服务器接收消息,并将消息发布到 PubNub MQTT 服务器,并使用 Linux 或 macOS:

    python3 surfboard_monitor.py  

在 Windows 中,必须执行以下行:

    python surfboard_monitor.py

几秒钟后,您将看到一个输出,其中包含与下一行类似的消息。请注意,这些值会有所不同,因为您开始运行程序的时间会使这些值发生变化:

    Not connected
    Result from Mosquitto connect: Connection Accepted.
    I've subscribed with QoS: 0
    Result from PubNub connect: Connection Accepted.
    Message received. Topic: surfboards/surfboard01/status. Payload: 
    b'3'
    Message received. Topic: surfboards/surfboard01/speedmph. Payload: 
    b'0.0'
    Message received. Topic: surfboards/surfboard01/altitudefeet. 
    Payload: b'3.0'
    Message received. Topic: surfboards/surfboard01/temperaturef. 
    Payload: b'55.0'
    Publishing: {"Status": "Ride finished", "Speed MPH": 0.0, "Altitude 
    Feet": 3.0, "Water Temperature F": 55.0}
    Message received. Topic: surfboards/surfboard01/status. Payload: 
    b'0'
    Message received. Topic: surfboards/surfboard01/speedmph. Payload: 
    b'1.0'
    Message received. Topic: surfboards/surfboard01/altitudefeet. 
    Payload: b'2.0'
    Message received. Topic: surfboards/surfboard01/temperaturef. 
    Payload: b'58.0'
    Publishing: {"Status": "Idle", "Speed MPH": 1.0, "Altitude Feet": 
    2.0, "Water Temperature F": 58.0}
    Message received. Topic: surfboards/surfboard01/status. Payload: 
    b'0'
    Message received. Topic: surfboards/surfboard01/speedmph. Payload: 
    b'1.1'
    Message received. Topic: surfboards/surfboard01/altitudefeet. 
    Payload: b'2.0'
    Message received. Topic: surfboards/surfboard01/temperaturef. 
    Payload: b'58.0'
    Publishing: {"Status": "Idle", "Speed MPH": 1.1, "Altitude Feet": 
    2.0, "Water Temperature F": 58.0}

程序将继续接收来自冲浪板传感器模拟器的消息,并将消息发布到 PubNub MQTT 服务器。保持代码在本地计算机或您选择用作本例冲浪板监视器的物联网板上运行。

下面的屏幕截图显示了在使用 macOS 的计算机上运行的两个终端窗口。左侧的终端显示了作为冲浪板传感器仿真器的 Python 客户端显示的消息,即surfboard_sensors_emulator.py脚本。右侧的终端显示为用作冲浪板监视器的 Python 客户端运行代码的结果,即surfboard_monitor.py脚本:

使用干舷构建基于 web 的仪表板

现在,我们已经准备好使用 PubNub MQTT 服务器作为数据源来构建基于 web 的实时仪表板。如前所述,我们将利用 freeboard.io 在许多仪表中可视化冲浪者和冲浪板数据。

在构建基于 web 的仪表板之前,freeboard.io 要求我们注册并创建一个具有有效电子邮件和密码的帐户。我们不需要输入任何信用卡或付款信息。如果您已经在 freeboard.io 拥有帐户,则可以跳过下一步。

转到http://freeboard.io 在您的 web 浏览器中单击“立即开始”。您也可以直接前往https://freeboard.io/signup 。在“选择用户名”中输入所需用户名,在“输入电子邮件”中输入电子邮件,在“创建密码”中输入所需密码。填写完所有字段后,单击“创建我的帐户”。

创建帐户后,您可以转到http://freeboard.io 在您的 web 浏览器中,单击登录。您可以通过访问来实现相同的目标 https://freeboard.io/login 。然后,输入您的用户名或电子邮件和密码,然后单击登录。干舷将显示您的干舷,也称为仪表板。

在“新建”按钮左侧的“输入名称”文本框中输入Surfboard01,然后单击此按钮。freeboard.io 将显示一个带有许多按钮的空仪表板,允许我们添加窗格和数据源等。以下屏幕截图显示了空的仪表板:

单击“添加以下数据源”,网站将打开“数据源”对话框。在类型下拉列表中选择 PubNub,对话框将显示定义 PubNub 数据源所需的字段。

Notice that it is also possible to use MQTT as a datasource for freeboard.io. However, this would require us to make our Mosquitto MQTT server publicly available. Instead, we take advantage of the PubNub MQTT interface which allows us to make the messages easily available on the PubNub network. However, you can definitely work with an MQTT server as a datasource in your projects that require a dashboard where freeboard.io provides you the with required features.

在名称中输入surfboard01

输入从 PubNub 设置复制的订阅密钥。请记住,订阅密钥通常以sub-c前缀开头。

在频道中输入surfboard01

如果前面的任何值名称错误,则数据源将没有适当的数据。以下屏幕截图显示 PubNub 数据源的配置,其中 subscribe 仅显示sub-c前缀:

单击保存,数据源将显示在下面的数据源列表中。由于冲浪板传感器模拟器和冲浪板监视器正在运行,下面显示的上次更新时间将每秒更改一次。如果时间不是每秒钟改变一次,则意味着数据源的配置错误,或者任何 Python 程序都没有按预期运行。

单击“添加窗格”将新的空窗格添加到仪表板。然后,单击新空窗格右上角的加号(+),干舷将显示小部件对话框。

在类型下拉列表中选择文本,对话框将显示向仪表板内的窗格添加文本小部件所需的字段。在标题中输入Status

单击值文本框右侧的+数据源,选择冲浪板 01,然后选择状态。进行选择后,值文本框datasources ["surfboard01"] ["Status"]中将显示以下文本,如下一个屏幕截图所示:

然后,单击“保存”,干舷将关闭对话框,并将新仪表添加到仪表板中以前创建的窗格中。仪表将显示冲浪板监视器发布到 PubNub MQTT 接口的最新状态值,即代码上次发布的 JSON 数据中的Status键的值。以下屏幕截图显示了显示上次更新时间的 surfboard01 数据源和显示状态最新值的仪表:

单击“添加窗格”向仪表板添加另一个新的空窗格。然后,单击新空窗格右上角的加号(+),干舷将显示小部件对话框。

在类型下拉列表中选择 Gauge,对话框将显示向仪表板内的窗格添加 Gauge 小部件所需的字段。在标题中输入Speed

单击值文本框右侧的+数据源,选择冲浪板 01,然后选择速度 MPH。进行选择后,值文本框中将显示以下文本:datasources ["surfboard01"] ["Speed MPH"]

在单位中输入MPH,在最小值中输入0,在最大值中输入40。然后,单击“保存”,干舷将关闭对话框,并将新仪表添加到仪表板上先前创建的窗格中。仪表将显示冲浪板监视器发布到 PubNub MQTT 接口的最新速度值,即代码上次发布的 JSON 数据中的Speed MPH键的值。

以下屏幕截图显示了 surfboard01 数据源,其中显示了上次更新的时间,添加的仪表显示了最新的速度值(以英里/小时为单位)。

单击“添加窗格”向仪表板添加另一个新的空窗格。然后,单击新空窗格右上角的加号(+),干舷将显示小部件对话框。

在类型下拉列表中选择 Gauge,对话框将显示向仪表板上的窗格添加 Gauge 小部件所需的字段。在标题中输入Altitude

单击值文本框右侧的+数据源,选择冲浪板 01,然后选择高度英尺。进行选择后,值文本框中将显示以下文本:datasources ["surfboard01"] ["Altitude Feet"]

在单位中输入Feet,在最小值中输入0,在最大值中输入30。然后,单击“保存”,干舷将关闭对话框,并将新仪表添加到仪表板上先前创建的窗格中。仪表将显示冲浪板监视器发布到 PubNub MQTT 接口的最新高度值,即代码上次发布的 JSON 数据中的Altitude Feet键的值。

现在,我们将添加最后一个窗格。单击“添加窗格”向仪表板添加另一个新的空窗格。然后,单击新空窗格右上角的加号(+),干舷将显示小部件对话框。

在类型下拉列表中选择 Gauge,对话框将显示向仪表板上的窗格添加 Gauge 小部件所需的字段。在标题中输入Water temperature

单击数值文本框右侧的+数据源,选择冲浪板 01,然后选择水温 F。选择后,数值文本框中将出现以下文本:datasources ["surfboard01"] ["Water Temperature F"]

在单位中输入ºF,在最小值中输入0,在最大值中输入80。然后,单击“保存”,干舷将关闭对话框,并将新仪表添加到仪表板上先前创建的窗格中。仪表将显示冲浪板监视器发布到 PubNub MQTT 接口的水温最新值,即代码上次发布的 JSON 数据中的Water Temperature F键的值。

拖放窗格以定位窗格,其布局如下一个屏幕截图所示。屏幕截图显示了我们用四个窗格和三个仪表板构建的仪表板,当冲浪板监视器将数据发布到 PubNub MQTT 接口时,这些仪表板每秒自动刷新数据。

We can access the recently built dashboard on any device by entering the URL that our web browser displays at the time we are working with the dashboard. The URL is composed of the https://freeboard.io/board/ prefix followed by letters and numbers. For example, if the URL is https://freeboard.io/board/EXAMPLE, we just need to enter it in any web browser running on any device or computer connected to the internet, and we can watch the gauges and they will be refreshed as new data is being published from our surfboard monitor.

PubNub 作为我们的数据源,freeboard.io 作为我们基于 web 的仪表板,使我们能够轻松监控从冲浪者潜水服和冲浪板中的传感器检索到的数据。我们可以在任何提供 web 浏览器的设备上监控数据。这两种基于云的物联网服务的组合只是我们如何在解决方案中轻松将不同服务与 MQTT 组合的一个示例。

测试你的知识

让我们看看您是否能正确回答以下问题:

  1. PubNub MQTT 接口要求我们使用由以下内容组成的客户端 ID:

    1. publish_key/subscribe_key/device_id
    2. device_id/publish_key/subscribe_key
    3. publish_key/device_id
  2. 将消息发布到 PubNub MQTT 接口时:

    1. 它仅在 PubNub MQTT 子网上可用
    2. 它在 PubNub 网络上可用
    3. 它需要特定的有效负载前缀才能在 PubNub 网络上可用
  3. paho.mqtt.client.Client实例的以下哪种方法允许我们指定客户端在收到特定主题的消息时必须调用的回调函数:

    1. message_callback_add
    2. message_arrived_to_topic_callback
    3. message_on_topic

总结

在本章中,我们结合了在前几章中学习的所有知识,构建了一个基于 web 的仪表板,该仪表板具有每秒以仪表为单位显示数据的干舷。我们从头开始构建解决方案。首先,我们分析了需求,了解了嵌入冲浪板的物联网板将如何为我们提供必要的数据。

我们对冲浪板传感器模拟器进行编码,使其与物联网板的工作方式相同。然后,我们配置了 PubNub MQTT 接口,并编写了一个冲浪板监视器,该监视器从冲浪板传感器模拟器收集数据,并将数据发布到基于云的 PubNub MQTT 接口。我们编写了一个 Python 程序,该程序与两个具有两个线程循环接口的 MQTT 客户机一起工作。

最后,我们可以利用发布到 PubNub MQTT 接口的消息在 PubNub 网络上也可用的事实,轻松构建基于 web 的超高仪表板。

我们能够创建能够在最流行和强大的物联网板上运行的代码。我们已经准备好在各种项目中使用 MQTT,使用最流行和通用的编程语言之一:Python 3。

解决

第 1 章:安装 MQTT 3.1.1 MOSQUITO 服务器

| 问题 | 答案 | | Q1 | 2. | | 问题 2 | 1. | | 第三季度 | 3. | | 第四季度 | 3. | | 问题 5 | 2. |

第 2 章:使用命令行和 GUI 工具了解 MQTT 的工作原理

| 问题 | 答案 | | Q1 | 2. | | 问题 2 | 3. | | 第三季度 | 1. | | 第四季度 | 3. | | 问题 5 | 3. |

第 3 章:保护 MQTT 3.1.1 MOSQUITO 服务器

| 问题 | 答案 | | Q1 | 3. | | 问题 2 | 3. | | 第三季度 | 1. | | 第四季度 | 2. | | 问题 5 | 1. |

第 4 章:使用 Python 和 MQTT 消息编写控制车辆的代码

| 问题 | 答案 | | Q1 | 1. | | 问题 2 | 2. | | 第三季度 | 2. | | 第四季度 | 2. | | 问题 5 | 3. |

第 5 章:用 Python 测试和改进我们的车辆控制解决方案

| 问题 | 答案 | | Q1 | 1. | | 问题 2 | 2. | | 第三季度 | 3. | | 第四季度 | 3. | | 问题 5 | 1. |

第 6 章:使用基于云的实时 MQTT 提供商和 Python 监控冲浪比赛

| 问题 | 答案 | | Q1 | 1. | | 问题 2 | 2. | | 第三季度 | 1. |