# 数据采集网关（RTU）阿里云业务中间层设计

## 1.目的

梳理软件设计，便于程序理解和维护。

## 2.范围 

  适用于数据采集网关（RTU）及同类产品。  

## 3.背景

数据采集网关（RTU）的北向（对平台）业务任务存在于南向（对子设备）采集任务和北向链路任务中间，是平台业务在数据采集网关（RTU）上的实现。以独立任务（线程）的方式运行。

北向业务任务运行时，首先要对业务平台进行初始化。代码如下：

In [None]:
/*****************************************************************************
* Function     : PlatformInit()
* Description  : 业务平台初始化
* Input        : platform_id:业务平台编号
* Output       : None
* Return       : None
* Note(s)      : None
* Contributor  : 2020年12月3日        Andre
*****************************************************************************/
static PLATFORM* PlatformInit(PLATFORM_ID_E platform_id)
{
    PLATFORM *p_platform = NULL;
    
    p_platform = fibo_malloc(sizeof(PLATFORM));
    if(p_platform == NULL)
    {
        Log_e("fibo_malloc fail");
        return NULL;
    }
    
    if (platform_id >= PLATFORM_ID_MAX)
    {
        Log_e("platform_id %d is invalid", platform_id);
        return NULL;
    }

    p_platform->id = platform_id;
    switch(p_platform->id)
    {
        case LSYUN_PAAS:
            p_platform->FunProtocolDeal = LsyunPaasDeal;
            break;
        case ALIYUN:
            p_platform->FunProtocolDeal = AliyunDeal;
            break;
        default:
            break;
    }

    return p_platform;
}

其中，ALIYUN是针对阿里云物模型数据在数据采集网关（RTU）上的实现，AliyunDeal是阿里云协议解析处理函数。

## 4.总体设计

阿里云物模型是物理空间中的实体（如传感器、车载装置、楼宇、工厂等）在云端的数字化表示，从属性、服务和事件三个维度，分别描述了该实体是什么、能做什么、可以对外提供哪些信息。

阿里云业务实现的核心思想是通过配置文件读取阿里云业务相关数据（比如测点标识符），然后把采集到的数据转换成物模型所规定的属性、服务和事件，最后上报。

阿里云业务中间层总体设计示意图如下：

![jupyter](./picture/阿里云业务示意图.bmp)

## 5.详细设计

### 5.1 属性（property）上报

属性上报由北向（阿里云）业务任务定时触发。其中，首次上报要对齐上一个整点时间。之后按上报周期，定时上报数据。

为防止队列数据过大，北向（阿里云）业务任务发出的消息只包含时间戳，device_id和测点个数。数据在调用ParsePeriodData函数组包的时候拼接到报文中。

属性上报部分代码如下：

In [None]:
                // 周期数据上报
                start_time = LS_utc_time(NULL);
                // 首次上报，要对齐上一个整点时间
                if(first_upload_flag == 0)
                {
                    start_time /= met_app_para.upload_period;
                    start_time *= met_app_para.upload_period;
                    first_upload_flag = 1;
                }
                // 取整点时间上报
                if(start_time > last_time + met_app_para.upload_period)
                {
                    start_time /= met_app_para.upload_period;
                    start_time *= met_app_para.upload_period;
                    Log_w("Send PeriodData start");
                    // DIDO一直上报属性
                    SendPeriodDatatoLink(start_time, aliyun_parameter->device.ied.device_code, aliyun_parameter->code_chain_num);
                    for(i=0;i<north_met_num_of_meter-1;i++)
                    {
                        // 断连从设备不上报属性
                        if(g_last_device_communicate_state[(aliyun_parameter+1+i)->device.ied.device_code] == COMMUNICATION_CONNECT)
                            SendPeriodDatatoLink(start_time, (aliyun_parameter+1+i)->device.ied.device_code, (aliyun_parameter+1+i)->code_chain_num);                       
                    }
                    last_time = start_time;
                }

### 5.2 事件（event）上报

根据应用场景，数据采集网关定义了从设备断链、数据预警和数据变化上送三种通用事件。定义如下：

In [None]:
//alink协议EVENT报警码
#define ERROR_CODE_SLAVE_OFFLINE    0x01        //从设备断链
#define ERROR_CODE_DATA_ALARM       0x02        //数据预警
#define ERROR_CODE_BURST            0x03        //数据变化上送

其中，从设备断连与连接情况有关，数据预警和数据变化上送与数据有关。

#### 5.2.1 从设备断链事件

当南向采集任务发现从设备断连/恢复时，会发送一条消息到北向业务任务。北向业务任务收到此消息后，把消息转发给北向链路任务。北向链路再调用ParseDeviceAlarm函数组包从设备断链事件并发送给阿里云平台。

#### 5.2.2 数据预警和数据变化上送事件

当南向采集任务发现数据发生变化时，会发送一条消息到北向业务任务。北向业务任务收到此消息后，要根据新老数据（老数据指上次产生事件对应数据）判断是否发生数据预警或数据变化上送事件。对应代码如下：

In [None]:
/*****************************************************************************
* Function     : EventUploadJudge()
* Description  : 判断变化上送和数据告警数据是否需要上报
* Input        : event_data:    alink事件结构体
* Output       : reason:        error_code指针
* Return       : 0      需要上报
                 -1     不需要上报
* Note(s)      : None
* Contributor  : 2021年3月20日        Andre
*****************************************************************************/
int EventUploadJudge(ALINK_EVENT_t *event_data, uint8_t *reason)
{
    int i, j, k, ret;
    north_slave_code_chain_t *temp;
    float value_difference = 0;
    
    if(event_data == NULL)
    {
        Log_e("event_data is null, convert fail");
        return -1;
    }

    for(i=0;i<event_data->param_amount;i++)
    {
        //边缘计算
        ret = EdgeComputing(&event_data->param[i]);
        if(ret < 0)
        {
            Log_e("EdgeComputing error");
            return -1;              
        }
        //搜索device_code
        for(k=0;k<north_met_num_of_meter;k++)
        {
            if((aliyun_parameter + k)->device.ied.device_code == event_data->device_code)
                break;
        }
        //找不到返回失败 
        if(k >= north_met_num_of_meter)
            return -1;
        
        //搜索serial_number
        if((aliyun_parameter + k)->north_slave_code_chain == NULL)
            return -1;
        temp = (aliyun_parameter + k)->north_slave_code_chain;
        for(j=0;j<(aliyun_parameter + k)->code_chain_num;j++)
        {
            if(temp->serial_number == event_data->param[i].serial_number)
            {
                // 是变化上送还是数据预警,不可同时属于二者
                Log_d("temp->serial_number = %d", temp->serial_number);             
                Log_d("temp->last_update_value = %.2f, event_data->param[%d].value = %.2f", temp->last_update_value, i, event_data->param[i].value);
                Log_d("temp->burst_flag = %d, temp->deadzone = %.2f", temp->parameter.burst_flag, temp->parameter.deadzone);
                Log_d("temp->alarm_flag = %d, temp->alarm_ceiling = %.2f, temp->alarm_floor = %.2f", temp->parameter.alarm_flag, temp->parameter.alarm_ceiling, temp->parameter.alarm_floor);
                // 判断是否为变化上送
                if(temp->parameter.burst_flag == 1)
                {
                    // 比大小
                    if(event_data->param[i].value >= temp->last_update_value)
                        value_difference = event_data->param[i].value - temp->last_update_value;
                    else
                        value_difference = temp->last_update_value - event_data->param[i].value;
                    // 如果超过死区
                    if(value_difference > temp->parameter.deadzone)
                    {
                        *reason = BURST_UPLOAD;
                        temp->last_update_value = event_data->param[i].value;
                        return 0;
                    }
                    // 如果不超过死区
                    else
                        return -1;                      
                }
                // 判断是否为数据预警
                else if(temp->parameter.alarm_flag == 1)
                {
                    // 如果上次超限
                    if((temp->last_update_value > temp->parameter.alarm_ceiling) || (temp->last_update_value < temp->parameter.alarm_floor))
                    {
                        // 如果本次恢复
                        if((event_data->param[i].value <= temp->parameter.alarm_ceiling) && (event_data->param[i].value >= temp->parameter.alarm_floor))
                        {
                            *reason = DATA_ALARM_RECOVER;
                            temp->last_update_value = event_data->param[i].value;
                            return 0;
                        }
                        // 如果本次未恢复
                        else
                            return -1;                  
                    }
                    // 如果上次未超限
                    {
                        // 如果本次超限
                        if((event_data->param[i].value > temp->parameter.alarm_ceiling) || (event_data->param[i].value < temp->parameter.alarm_floor))
                        {
                            *reason = DATA_ALARM_OCCUR;
                            temp->last_update_value = event_data->param[i].value;
                            return 0;
                        }
                        // 如果本次未超限
                        else
                            return -1;                  
                    }
                }
                else
                {
                    Log_e("EventUploadJudge error");
                    return -1;                              
                }
                break;
            }
            temp = temp->p_next;
        }
        //找不到返回失败 
        if(j >= (aliyun_parameter + k)->code_chain_num)
            return -1;
    }

    return 0;
}

如果发生数据预警或数据变化上送事件，则把对应事件发送给北向链路任务。北向链路任务收到后组包上报。

### 5.3 服务调用和响应

用户通过阿里云平台发起服务调用时，阿里云会下发报文给数据采集网关。网关的北向链路任务收到报文后调用DeparseWriteData函数拆包，把对应数据以消息的形式发到北向业务任务。

北向业务任务收到消息后，根据下发的设备编号（device_code）查找这台设备对应的总线，再把消息发送到对应总线的南向采集任务执行。对应代码如下：

In [None]:
/*****************************************************************************
* Function     : AliyunDeal()
* Description  : 阿里云协议解析处理
* Input        : data   数据
                 len    数据长度
* Output       : None
* Return       : 0      成功
                 -1     失败
* Note(s)      : None
* Contributor  : 2021年3月19日        Andre
*****************************************************************************/
int AliyunDeal(uint8_t *data, uint16_t len)
{
    uint8_t device_code_temp;
    int32_t i, j, ret = -1;
    POINT_WRITE_t msg = {0};
    
    if((data == NULL) || (len < 10) || (len > MQTT_MBOX_LEN))
    {
        Log_w("data is invalid, AliyunDeal fail");
        return -1;
    }

    // 使用透传模式
    msg.direct_send_flag = 1;
    // 透传数据
    for(i=0;i<len;i++)      
        msg.custom_data[i] = data[i];
    // 透传数据长度
    msg.custom_data_length = len;
    
    g_write_priority_flag = 1;
    // 从拓扑表中查找对应设备在哪个总线
    device_code_temp = (msg.custom_data[1+6]<<8)+msg.custom_data[1+7];
    for(i=0;i<DTU_RS485_NUM;i++)
    {
        for(j=0;j<MAX_METER_NUM;j++)
        {
            if(g_topology[i][j] == device_code_temp)
                break;
        }
    }
    // 没找到就发给总线1,会回复失败
    if(i<DTU_RS485_NUM)
        ret = ls_sys_mbox_post(north_to_south_business_queue[i], &msg,500);//发送消息队列
    else
        ret = ls_sys_mbox_post(north_to_south_business_queue[0], &msg,500);//发送消息队列     

    return ret;
}

执行完毕后，南向采集任务把结果以消息反馈给北向业务任务。北向业务任务再把消息透传给北向链路任务。北向链路任务收到消息后组包，把服务响应发送给阿里云平台。

## 6.不足和改进方法

1.服务调用流程的链路较长，导致服务响应慢。