Skip to content
Permalink
Branch: master
Find file Copy path
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
305 lines (240 sloc) 12.4 KB

服务端检测设备是否在线 {#concept_wzg_gk2_ggb .concept}

基于MQTT接入的设备靠心跳保活,但心跳是周期性的、且自动收发和超时重连,这些特性给主动检测设备端是否在线带来了一定难度。服务端虽提供了GetDeviceStatus和BatchGetDeviceState接口查询设备状态,但是调用API获取设备状态是基于会话实现的,会话保活是建立在心跳基础上的。本文提供通过使用RRPC来判定设备是否在线的原理、流程、实现方式。

原理 {#section_uwv_322_ggb .section}

如果设备可以接收服务端下发的消息并作出响应,那么该设备通信是没问题的,并且设备一定在线。

消息收发是物联网平台的核心能力。因此,这种判定方法不会因为物联网平台架构升级或业务变动而变化,也不会因为设备使用的客户端不同而不同。是服务端检测设备是否在线最通用的一种原理。

物联网平台提供的RRPC能力就是该原理的一种特殊实现。

流程 {#section_f4q_jcf_ggb .section}

  1. 设备端订阅RRPC请求的Topic =/sys/yourProductKey/yourDeviceName/rrpc/request/+

    格式固定。

  2. 服务端调用RRpc接口发送指令{"id":123,"version":"1.0","time":1234567890123}

    消息内容可自定义,但建议使用此格式。

    参数说明:

    字段 类型 说明
    id Object 用于验证收发的消息是否是同一个,请自行业务层保证唯一
    version String 版本号固定1.0
    time Long 发送消息的时间戳,可以计算消息来回的延时,评估当前的通信质量
  3. 设备端接收指令并响应RRPC请求。{"id":123,"version":"1.0","time":1234567890123}

    离线判定逻辑

    • 严格的:发送消息后,5秒内没有收到消息算失败,出现1次失败,判定为离线
    • 普通的:发送消息后,5秒内没有收到消息算失败,连续2次失败,判定为离线
    • 宽松的:发送消息后,5秒内没有收到消息算失败,连续3次失败,判定为离线 说明: 您可以根据自己的情况,自定义离线判定逻辑。

实现 {#section_dm5_ng2_ggb .section}

为方便体验,本例基于设备端Java SDK Demo服务端Java SDK Demo开发。

说明: 您可以根据自己的喜好,选择不同的设备端SDK服务端SDK进行开发。

分别下载Demo工程,服务端添加CheckDeviceStatusOnServer类,设备端添加Device类,填写AccessKey信息和设备三元组信息。

设备端代码如下:

import java.io.UnsupportedEncodingException;

import com.aliyun.alink.dm.api.DeviceInfo;
import com.aliyun.alink.dm.api.InitResult;
import com.aliyun.alink.linkkit.api.ILinkKitConnectListener;
import com.aliyun.alink.linkkit.api.IoTMqttClientConfig;
import com.aliyun.alink.linkkit.api.LinkKit;
import com.aliyun.alink.linkkit.api.LinkKitInitParams;
import com.aliyun.alink.linksdk.cmp.connect.channel.MqttPublishRequest;
import com.aliyun.alink.linksdk.cmp.connect.channel.MqttSubscribeRequest;
import com.aliyun.alink.linksdk.cmp.core.base.AMessage;
import com.aliyun.alink.linksdk.cmp.core.base.ARequest;
import com.aliyun.alink.linksdk.cmp.core.base.AResponse;
import com.aliyun.alink.linksdk.cmp.core.base.ConnectState;
import com.aliyun.alink.linksdk.cmp.core.listener.IConnectNotifyListener;
import com.aliyun.alink.linksdk.cmp.core.listener.IConnectSendListener;
import com.aliyun.alink.linksdk.cmp.core.listener.IConnectSubscribeListener;
import com.aliyun.alink.linksdk.tools.AError;

public class Device {

    // ===================需要用户填写的参数,开始===========================
    // 产品productKey,设备证书参数之一
    private static String productKey = "";
    // 设备名字deviceName,设备证书参数之一
    private static String deviceName = "";
    // 设备密钥deviceSecret,设备证书参数之一
    private static String deviceSecret = "";
    // 消息通信的Topic,无需创建和定义,直接使用即可
    private static String rrpcTopic = "/sys/" + productKey + "/" + deviceName + "/rrpc/request/+";
    // ===================需要用户填写的参数,结束===========================

    public static void main(String[] args) throws InterruptedException {

        Device device = new Device();

        // 初始化
        device.init(productKey, deviceName, deviceSecret);

        // 下行数据监听
        device.registerNotifyListener();

        // 订阅Topic
        device.subscribe(rrpcTopic);
    }

    /**
     * 初始化
     * 
     * @param pk productKey
     * @param dn devcieName
     * @param ds deviceSecret
     * @throws InterruptedException
     */
    public void init(String pk, String dn, String ds) throws InterruptedException {

        LinkKitInitParams params = new LinkKitInitParams();

        // 设置 MQTT 初始化参数
        IoTMqttClientConfig config = new IoTMqttClientConfig();
        config.productKey = pk;
        config.deviceName = dn;
        config.deviceSecret = ds;
        params.mqttClientConfig = config;

        // 设置初始化设备证书信息,用户传入
        DeviceInfo deviceInfo = new DeviceInfo();
        deviceInfo.productKey = pk;
        deviceInfo.deviceName = dn;
        deviceInfo.deviceSecret = ds;

        params.deviceInfo = deviceInfo;

        LinkKit.getInstance().init(params, new ILinkKitConnectListener() {
            public void onError(AError aError) {
                System.out.println("init failed !! code=" + aError.getCode() + ",msg=" + aError.getMsg() + ",subCode="
                        + aError.getSubCode() + ",subMsg=" + aError.getSubMsg());
            }

            public void onInitDone(InitResult initResult) {
                System.out.println("init success !!");
            }
        });

        // 确保初始化成功后才执行后面的步骤,可以根据实际情况适当延长这里的延时
        Thread.sleep(2000);
    }

    /**
     * 监听下行数据
     */
    public void registerNotifyListener() {
        LinkKit.getInstance().registerOnNotifyListener(new IConnectNotifyListener() {
            @Override
            public boolean shouldHandle(String connectId, String topic) {
                // 只处理特定Topic的消息
                if (topic.contains("/rrpc/request/")) {
                    return true;
                } else {
                    return false;
                }
            }

            @Override
            public void onNotify(String connectId, String topic, AMessage aMessage) {
                // 接收RRPC请求并回复RRPC响应
                try {
                    String response = topic.replace("/request/", "/response/");
                    publish(response, new String((byte[]) aMessage.getData(), "UTF-8"));
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void onConnectStateChange(String connectId, ConnectState connectState) {
            }
        });
    }

    /**
     * 发布消息
     * 
     * @param topic 发送消息的Topic
     * @param payload 发送的消息内容
     */
    public void publish(String topic, String payload) {
        MqttPublishRequest request = new MqttPublishRequest();
        request.topic = topic;
        request.payloadObj = payload;
        request.qos = 0;
        LinkKit.getInstance().getMqttClient().publish(request, new IConnectSendListener() {
            @Override
            public void onResponse(ARequest aRequest, AResponse aResponse) {
            }

            @Override
            public void onFailure(ARequest aRequest, AError aError) {
            }
        });
    }

    /**
     * 订阅消息
     * 
     * @param topic 订阅消息的Topic
     */
    public void subscribe(String topic) {
        MqttSubscribeRequest request = new MqttSubscribeRequest();
        request.topic = topic;
        LinkKit.getInstance().getMqttClient().subscribe(request, new IConnectSubscribeListener() {
            @Override
            public void onSuccess() {
            }

            @Override
            public void onFailure(AError aError) {
            }
        });
    }

}

服务端代码如下:

import java.io.UnsupportedEncodingException;

import org.apache.commons.codec.binary.Base64;

import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.exceptions.ServerException;
import com.aliyuncs.iot.model.v20170420.RRpcRequest;
import com.aliyuncs.iot.model.v20170420.RRpcResponse;
import com.aliyuncs.profile.DefaultProfile;
import com.aliyuncs.profile.IClientProfile;

public class CheckDeviceStatusOnServer extends BaseTest {

    // ===================需要用户填写的参数,开始===========================
    // 用户账号AccessKey
    private static String accessKeyID = "";
    // 用户账号AccesseKeySecret
    private static String accessKeySecret = "";
    // 产品productKey,设备证书参数之一
    private static String productKey = "";
    // 设备名字deviceName,设备证书参数之一
    private static String deviceName = "";
    // ===================需要用户填写的参数,结束===========================

    public static void main(String[] args) throws ServerException, ClientException, UnsupportedEncodingException {

        // -------------------------------------------------------------------
        // 要发送的消息,可以自定义,建议使用当前格式
        // -------------------------------------------------------------------
        // Field   | Tyep   | Desc
        // -------------------------------------------------------------------
        // id      | Object | 用于验证收发的消息是否是同一个,请自行业务层保证唯一
        // -------------------------------------------------------------------
        // version | String | 版本号固定1.0
        // -------------------------------------------------------------------
        // time    | Long   | 发送消息的时间戳,可以计算消息来回的延时,评估当前的通信质量
        // -------------------------------------------------------------------
        String payload = "{\"id\":123, \"version\":\"1.0\",\"time\":" + System.currentTimeMillis() + "}";

        // 构建RRPC请求
        RRpcRequest request = new RRpcRequest();
        request.setProductKey(productKey);
        request.setDeviceName(deviceName);
        request.setRequestBase64Byte(Base64.encodeBase64String(payload.getBytes()));
        request.setTimeout(5000);

        // 获取服务端请求客户端
        DefaultAcsClient client = getClient();

        // 发起RRPC请求
        RRpcResponse response = (RRpcResponse) client.getAcsResponse(request);

        // RRPC响应处理
        // 这个不能看response.getSuccess(),这个仅表明RRPC请求发送成功,不代表设备接收成功和响应成功
        // 需要根据RrpcCode来判定,参考文档https://help.aliyun.com/document_detail/69797.html
        if (response != null && "SUCCESS".equals(response.getRrpcCode())) {
            if (payload.equals(new String(Base64.decodeBase64(response.getPayloadBase64Byte()), "UTF-8"))) {
                System.out.println("Device is online");
            } else {
                System.out.println("Device is offline1");
            }
        } else {
            System.out.println("Device is offline");
        }
    }

    public static DefaultAcsClient getClient() {

        DefaultAcsClient client = null;

        try {
            IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", accessKeyID, accessKeySecret);
            DefaultProfile.addEndpoint("cn-shanghai", "cn-shanghai", "Iot", "iot.cn-shanghai.aliyuncs.com");
            client = new DefaultAcsClient(profile);
        } catch (Exception e) {
            System.out.println("init client failed !! exception:" + e.getMessage());
        }

        return client;
    }
}

说明: 检测流程在服务端主动触发。

You can’t perform that action at this time.