-
Notifications
You must be signed in to change notification settings - Fork 0
/
mqtt.go
190 lines (184 loc) · 5.75 KB
/
mqtt.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
package ServiceUserInfoCost
import (
"encoding/json"
"errors"
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
CoreLog "github.com/fotomxq/weeekj_core/v5/core/log"
CoreSQLPages "github.com/fotomxq/weeekj_core/v5/core/sql/pages"
IOTDevice "github.com/fotomxq/weeekj_core/v5/iot/device"
IOTMQTT "github.com/fotomxq/weeekj_core/v5/iot/mqtt"
MapRoom "github.com/fotomxq/weeekj_core/v5/map/room"
ServiceUserInfo "github.com/fotomxq/weeekj_core/v5/service/user_info"
)
// 请求信息档案列表
type subUserInfoCostListData struct {
//配对密钥
Keys IOTDevice.ArgsCheckDeviceKey `json:"keys"`
//分页
Pages CoreSQLPages.ArgsDataList `json:"pages"`
//组织ID
OrgID int64 `db:"org_id" json:"orgID" check:"id" empty:"true"`
//房间ID
RoomID int64 `db:"room_id" json:"roomID" check:"id" empty:"true"`
//信息ID
InfoID int64 `db:"info_id" json:"infoID" check:"id" empty:"true"`
//房间场景值
// 设备和房间绑定关系的mark值
RoomBindMark string `db:"room_bind_mark" json:"roomBindMark" check:"mark" empty:"true"`
//数据类型标识码
// 遥感数据及传感器数据值
SensorMark string `db:"sensor_mark" json:"sensorMark" check:"mark" empty:"true"`
}
func subUserInfoCostList(client mqtt.Client, message mqtt.Message) {
logAppend := "mqtt sub get server user info cost list, "
var resultData subUserInfoCostListData
resultByte := message.Payload()
if err := json.Unmarshal(resultByte, &resultData); err != nil {
CoreLog.MqttError(logAppend, "json, ", err)
return
}
if _, _, b := IOTMQTT.SubBefore(resultData.Keys, resultData.OrgID, logAppend); !b {
return
}
dataList, dataCount, err := GetCostList(&ArgsGetCostList{
Pages: resultData.Pages,
OrgID: resultData.OrgID,
RoomID: resultData.RoomID,
InfoID: resultData.InfoID,
RoomBindMark: resultData.RoomBindMark,
SensorMark: resultData.SensorMark,
})
if err != nil {
CoreLog.MqttError(logAppend, "get list data, ", err)
return
}
if err := pushUserInfoCostList(resultData.Keys.GroupMark, resultData.Keys.Code, dataList, dataCount); err != nil {
CoreLog.MqttError(logAppend, "push data, ", err)
return
}
}
func pushUserInfoCostList(groupMark, deviceCode string, dataList []FieldsCost, dataCount int64) (err error) {
//重组数据
type newDataType struct {
//数据集合
DataList []FieldsCost `json:"dataList"`
//房间信息
Rooms map[int64]string `json:"rooms"`
//信息人员列
InfoNames map[int64]string `json:"infoNames"`
//配置列
ConfigNames map[int64]string `json:"configNames"`
}
var newDataList newDataType
if err == nil {
newDataList.DataList = dataList
var waitRooms, waitInfo, waitConfigs []int64
for _, v := range dataList {
if v.RoomID > 0 {
waitRooms = append(waitRooms, v.RoomID)
}
if v.InfoID > 0 {
waitInfo = append(waitInfo, v.InfoID)
}
if v.ConfigID > 0 {
waitConfigs = append(waitConfigs, v.ConfigID)
}
}
if len(waitRooms) > 0 {
newDataList.Rooms, _ = MapRoom.GetRoomsName(&MapRoom.ArgsGetRooms{
IDs: waitInfo,
HaveRemove: false,
})
}
if len(waitInfo) > 0 {
newDataList.InfoNames, _ = ServiceUserInfo.GetInfoMoreNames(&ServiceUserInfo.ArgsGetInfoMore{
IDs: waitInfo,
HaveRemove: false,
})
}
if len(waitConfigs) > 0 {
newDataList.ConfigNames, _ = GetConfigsName(&ArgsGetConfigs{
IDs: waitConfigs,
HaveRemove: false,
OrgID: -1,
})
}
}
//获取和打包数据
var dataByte []byte
dataByte, err = json.Marshal(newDataList)
if err != nil {
err = errors.New("json error, " + err.Error())
return
}
//推送数据
topic := fmt.Sprint("service/user/info/cost/list/group/", groupMark, "/code/", deviceCode)
if err = IOTMQTT.MQTTClient.PublishWait(topic, 0, false, dataByte); err != nil {
err = errors.New(fmt.Sprint("mqtt push data, ", err))
return
}
return
}
// 请求最新的一条数据
type subUserInfoCostLastData struct {
//配对密钥
Keys IOTDevice.ArgsCheckDeviceKey `json:"keys"`
//组织ID
OrgID int64 `db:"org_id" json:"orgID" check:"id" empty:"true"`
//房间ID
RoomIDs []int64 `db:"room_ids" json:"roomIDs" check:"ids" empty:"true"`
//信息ID
InfoIDs []int64 `db:"info_ids" json:"infoIDs" check:"ids" empty:"true"`
//房间场景值
// 设备和房间绑定关系的mark值
RoomBindMark string `db:"room_bind_mark" json:"roomBindMark" check:"mark" empty:"true"`
//数据类型标识码
// 遥感数据及传感器数据值
SensorMark string `db:"sensor_mark" json:"sensorMark" check:"mark" empty:"true"`
}
func subUserInfoCostLast(client mqtt.Client, message mqtt.Message) {
//前置处理
logAppend := "mqtt sub get server user info cost last, "
var resultData subUserInfoCostLastData
resultByte := message.Payload()
if err := json.Unmarshal(resultByte, &resultData); err != nil {
CoreLog.MqttError(logAppend, "json, ", err)
return
}
if _, _, b := IOTMQTT.SubBefore(resultData.Keys, resultData.OrgID, logAppend); !b {
return
}
//构建数据
var dataList []FieldsCost
for _, v := range resultData.RoomIDs {
data, err := GetCostLast(&ArgsGetCostLast{
OrgID: resultData.OrgID,
RoomID: v,
InfoID: 0,
RoomBindMark: resultData.RoomBindMark,
SensorMark: resultData.SensorMark,
})
if err != nil {
continue
}
dataList = append(dataList, data)
}
for _, v := range resultData.InfoIDs {
data, err := GetCostLast(&ArgsGetCostLast{
OrgID: resultData.OrgID,
RoomID: 0,
InfoID: v,
RoomBindMark: resultData.RoomBindMark,
SensorMark: resultData.SensorMark,
})
if err != nil {
continue
}
dataList = append(dataList, data)
}
if err := pushUserInfoCostList(resultData.Keys.GroupMark, resultData.Keys.Code, dataList, int64(len(dataList))); err != nil {
CoreLog.MqttError(logAppend, "push data, ", err)
return
}
}