From e260670cda1ad3ca818ad3882fbb30d6d1f11908 Mon Sep 17 00:00:00 2001 From: himanshu Date: Sat, 5 Jan 2019 16:04:55 +0530 Subject: [PATCH 1/4] Tempus-947 Saving device event through mqtt device api. --- .../msg/core/DeviceEventUploadRequest.java | 49 +++++++++++++++++++ .../server/common/msg/session/MsgType.java | 1 + .../server/transport/mqtt/MqttTopics.java | 1 + .../transport/mqtt/MqttTransportHandler.java | 34 +++++++++---- .../mqtt/MqttTransportServerInitializer.java | 21 ++++---- .../transport/mqtt/MqttTransportService.java | 18 ++++--- .../mqtt/adaptors/JsonMqttAdaptor.java | 19 +++++++ 7 files changed, 118 insertions(+), 25 deletions(-) create mode 100644 common/message/src/main/java/com/hashmapinc/server/common/msg/core/DeviceEventUploadRequest.java diff --git a/common/message/src/main/java/com/hashmapinc/server/common/msg/core/DeviceEventUploadRequest.java b/common/message/src/main/java/com/hashmapinc/server/common/msg/core/DeviceEventUploadRequest.java new file mode 100644 index 000000000..5d6a576ec --- /dev/null +++ b/common/message/src/main/java/com/hashmapinc/server/common/msg/core/DeviceEventUploadRequest.java @@ -0,0 +1,49 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * Modifications © 2017-2018 Hashmap, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hashmapinc.server.common.msg.core; + +import com.fasterxml.jackson.databind.JsonNode; +import com.hashmapinc.server.common.msg.session.FromDeviceRequestMsg; +import com.hashmapinc.server.common.msg.session.MsgType; + +public class DeviceEventUploadRequest extends BasicRequest implements FromDeviceRequestMsg { + + private JsonNode eventInfo; + + public DeviceEventUploadRequest() { + this(DEFAULT_REQUEST_ID); + } + + public JsonNode getEventInfo() { + return eventInfo; + } + + public void setEventInfo(JsonNode eventInfo) { + this.eventInfo = eventInfo; + } + + public DeviceEventUploadRequest(Integer requestId) { + super(requestId); + } + + @Override + public MsgType getMsgType() { + return MsgType.POST_DEVICE_EVENT; + } + + +} diff --git a/common/message/src/main/java/com/hashmapinc/server/common/msg/session/MsgType.java b/common/message/src/main/java/com/hashmapinc/server/common/msg/session/MsgType.java index 4a73aecc4..00c4df2e8 100644 --- a/common/message/src/main/java/com/hashmapinc/server/common/msg/session/MsgType.java +++ b/common/message/src/main/java/com/hashmapinc/server/common/msg/session/MsgType.java @@ -25,6 +25,7 @@ public enum MsgType { SPARKPLUG_TELEMETRY_UPDATE_NOTIFICATION, SPARKPLUG_SUBSCRIBE_TERMINATE, SPARKPLUG_DEATH_SUBSCRIBE, + POST_DEVICE_EVENT, SUBSCRIBE_RPC_COMMANDS_REQUEST, UNSUBSCRIBE_RPC_COMMANDS_REQUEST, TO_DEVICE_RPC_REQUEST, TO_DEVICE_RPC_RESPONSE, TO_DEVICE_RPC_RESPONSE_ACK, diff --git a/transport/mqtt/src/main/java/com/hashmapinc/server/transport/mqtt/MqttTopics.java b/transport/mqtt/src/main/java/com/hashmapinc/server/transport/mqtt/MqttTopics.java index 30fd262eb..bac92646b 100644 --- a/transport/mqtt/src/main/java/com/hashmapinc/server/transport/mqtt/MqttTopics.java +++ b/transport/mqtt/src/main/java/com/hashmapinc/server/transport/mqtt/MqttTopics.java @@ -32,6 +32,7 @@ public class MqttTopics { public static final String DEVICE_TELEMETRY_TOPIC = BASE_DEVICE_API_TOPIC + "/telemetry"; public static final String DEVICE_ATTRIBUTES_TOPIC = BASE_DEVICE_API_TOPIC + "/attributes"; public static final String DEVICE_DEPTH_TELEMETRY_TOPIC = BASE_DEVICE_API_TOPIC + "/depth/telemetry"; + public static final String DEVICE_EVENT_TOPIC = BASE_DEVICE_API_TOPIC + "/events"; public static final String BASE_GATEWAY_API_TOPIC = "v1/gateway"; public static final String GATEWAY_CONNECT_TOPIC = BASE_GATEWAY_API_TOPIC + "/connect"; diff --git a/transport/mqtt/src/main/java/com/hashmapinc/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt/src/main/java/com/hashmapinc/server/transport/mqtt/MqttTransportHandler.java index bd50ae731..b9e75a213 100644 --- a/transport/mqtt/src/main/java/com/hashmapinc/server/transport/mqtt/MqttTransportHandler.java +++ b/transport/mqtt/src/main/java/com/hashmapinc/server/transport/mqtt/MqttTransportHandler.java @@ -18,8 +18,10 @@ import com.fasterxml.jackson.databind.JsonNode; import com.hashmapinc.server.common.data.Device; +import com.hashmapinc.server.common.data.Event; import com.hashmapinc.server.common.data.security.DeviceTokenCredentials; import com.hashmapinc.server.common.data.security.DeviceX509Credentials; +import com.hashmapinc.server.common.msg.core.DeviceEventUploadRequest; import com.hashmapinc.server.common.msg.session.AdaptorToSessionActorMsg; import com.hashmapinc.server.common.msg.session.BasicToDeviceActorSessionMsg; import com.hashmapinc.server.common.msg.session.MsgType; @@ -32,26 +34,26 @@ import com.hashmapinc.server.dao.asset.AssetService; import com.hashmapinc.server.dao.attributes.AttributesService; import com.hashmapinc.server.dao.device.DeviceService; +import com.hashmapinc.server.dao.event.EventService; import com.hashmapinc.server.dao.mail.MailService; import com.hashmapinc.server.dao.relation.RelationService; import com.hashmapinc.server.transport.mqtt.adaptors.MqttTransportAdaptor; import com.hashmapinc.server.transport.mqtt.session.DeviceSessionCtx; import com.hashmapinc.server.transport.mqtt.session.GatewaySessionCtx; +import com.hashmapinc.server.transport.mqtt.sparkplug.SparkPlugDecodeService; import com.hashmapinc.server.transport.mqtt.sparkplug.SparkPlugMsgTypes; +import com.hashmapinc.server.transport.mqtt.sparkplug.SparkPlugUtils; +import com.hashmapinc.server.transport.mqtt.sparkplug.data.SparkPlugMetaData; import com.hashmapinc.server.transport.mqtt.util.SslUtil; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.mqtt.*; -import io.netty.handler.codec.mqtt.MqttConnectReturnCode; -import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.ssl.SslHandler; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import lombok.extern.slf4j.Slf4j; import org.springframework.util.StringUtils; -import com.hashmapinc.server.transport.mqtt.sparkplug.data.SparkPlugMetaData; -import com.hashmapinc.server.transport.mqtt.sparkplug.SparkPlugDecodeService; -import com.hashmapinc.server.transport.mqtt.sparkplug.SparkPlugUtils; + import javax.net.ssl.SSLPeerUnverifiedException; import javax.security.cert.X509Certificate; import java.net.InetSocketAddress; @@ -83,6 +85,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private final AssetService assetService; private final SslHandler sslHandler; private final MailService mailService; + private final EventService eventService; private static final String SPARK_PLUG_NAME_SPACE = "spBv1.0"; private volatile boolean connected; private volatile GatewaySessionCtx gatewaySessionCtx; @@ -90,7 +93,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private SparkPlugUtils sparkPlugUtils; public MqttTransportHandler(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, - MqttTransportAdaptor adaptor, SslHandler sslHandler, QuotaService quotaService , AttributesService attributesService ,AssetService assetService, MailService mailService) { + MqttTransportAdaptor adaptor, SslHandler sslHandler, QuotaService quotaService , AttributesService attributesService, + AssetService assetService, EventService eventService, MailService mailService) { this.processor = processor; this.deviceService = deviceService; this.relationService = relationService; @@ -103,6 +107,7 @@ public MqttTransportHandler(SessionMsgProcessor processor, DeviceService deviceS this.attributesService = attributesService; this.assetService = assetService; this.mailService = mailService; + this.eventService = eventService; } @Override @@ -207,11 +212,12 @@ private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage try { if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_TOPIC)) { msg = adaptor.convertToActorMsg(deviceSessionCtx, MsgType.POST_TELEMETRY_REQUEST, mqttMsg); - }else if(topicName.equals(MqttTopics.DEVICE_DEPTH_TELEMETRY_TOPIC)){ + } else if (topicName.equals(MqttTopics.DEVICE_DEPTH_TELEMETRY_TOPIC)){ msg = adaptor.convertToActorMsg(deviceSessionCtx, MsgType.POST_TELEMETRY_REQUEST_DEPTH, mqttMsg); - } - else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_TOPIC)) { + } else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_TOPIC)) { msg = adaptor.convertToActorMsg(deviceSessionCtx, MsgType.POST_ATTRIBUTES_REQUEST, mqttMsg); + } else if (topicName.equals(MqttTopics.DEVICE_EVENT_TOPIC)) { + saveEventForDevice(adaptor.convertToActorMsg(deviceSessionCtx, MsgType.POST_DEVICE_EVENT, mqttMsg)); } else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) { msg = adaptor.convertToActorMsg(deviceSessionCtx, MsgType.GET_ATTRIBUTES_REQUEST, mqttMsg); if (msgId >= 0) { @@ -239,6 +245,16 @@ else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_TOPIC)) { } } + private void saveEventForDevice(AdaptorToSessionActorMsg msg) { + Event event = new Event(); + Device device = deviceSessionCtx.getDevice(); + event.setEntityId(device.getId()); + event.setTenantId(device.getTenantId()); + event.setType("LC_EVENT"); + event.setBody(((DeviceEventUploadRequest)msg.getMsg()).getEventInfo()); + eventService.save(event); + } + private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) { if (!checkConnected(ctx)) { return; diff --git a/transport/mqtt/src/main/java/com/hashmapinc/server/transport/mqtt/MqttTransportServerInitializer.java b/transport/mqtt/src/main/java/com/hashmapinc/server/transport/mqtt/MqttTransportServerInitializer.java index c357da596..a1b14f609 100644 --- a/transport/mqtt/src/main/java/com/hashmapinc/server/transport/mqtt/MqttTransportServerInitializer.java +++ b/transport/mqtt/src/main/java/com/hashmapinc/server/transport/mqtt/MqttTransportServerInitializer.java @@ -16,21 +16,22 @@ */ package com.hashmapinc.server.transport.mqtt; +import com.hashmapinc.server.common.transport.SessionMsgProcessor; +import com.hashmapinc.server.common.transport.auth.DeviceAuthService; +import com.hashmapinc.server.common.transport.quota.QuotaService; import com.hashmapinc.server.dao.asset.AssetService; import com.hashmapinc.server.dao.attributes.AttributesService; +import com.hashmapinc.server.dao.device.DeviceService; +import com.hashmapinc.server.dao.event.EventService; import com.hashmapinc.server.dao.mail.MailService; +import com.hashmapinc.server.dao.relation.RelationService; +import com.hashmapinc.server.transport.mqtt.adaptors.MqttTransportAdaptor; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.mqtt.MqttDecoder; import io.netty.handler.codec.mqtt.MqttEncoder; import io.netty.handler.ssl.SslHandler; -import com.hashmapinc.server.common.transport.SessionMsgProcessor; -import com.hashmapinc.server.common.transport.auth.DeviceAuthService; -import com.hashmapinc.server.common.transport.quota.QuotaService; -import com.hashmapinc.server.dao.device.DeviceService; -import com.hashmapinc.server.dao.relation.RelationService; -import com.hashmapinc.server.transport.mqtt.adaptors.MqttTransportAdaptor; public class MqttTransportServerInitializer extends ChannelInitializer { @@ -47,10 +48,11 @@ public class MqttTransportServerInitializer extends ChannelInitializer Date: Tue, 8 Jan 2019 12:43:54 +0530 Subject: [PATCH 2/4] Mqtt event publish through gateway device. --- .../msg/core/DeviceEventUploadRequest.java | 49 ------------------- .../server/transport/mqtt/MqttTopics.java | 1 + .../transport/mqtt/MqttTransportHandler.java | 38 +++++++++----- .../mqtt/adaptors/JsonMqttAdaptor.java | 16 ------ .../mqtt/session/GatewaySessionCtx.java | 44 +++++++++++++++-- 5 files changed, 67 insertions(+), 81 deletions(-) delete mode 100644 common/message/src/main/java/com/hashmapinc/server/common/msg/core/DeviceEventUploadRequest.java diff --git a/common/message/src/main/java/com/hashmapinc/server/common/msg/core/DeviceEventUploadRequest.java b/common/message/src/main/java/com/hashmapinc/server/common/msg/core/DeviceEventUploadRequest.java deleted file mode 100644 index 5d6a576ec..000000000 --- a/common/message/src/main/java/com/hashmapinc/server/common/msg/core/DeviceEventUploadRequest.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Copyright © 2016-2018 The Thingsboard Authors - * Modifications © 2017-2018 Hashmap, Inc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.hashmapinc.server.common.msg.core; - -import com.fasterxml.jackson.databind.JsonNode; -import com.hashmapinc.server.common.msg.session.FromDeviceRequestMsg; -import com.hashmapinc.server.common.msg.session.MsgType; - -public class DeviceEventUploadRequest extends BasicRequest implements FromDeviceRequestMsg { - - private JsonNode eventInfo; - - public DeviceEventUploadRequest() { - this(DEFAULT_REQUEST_ID); - } - - public JsonNode getEventInfo() { - return eventInfo; - } - - public void setEventInfo(JsonNode eventInfo) { - this.eventInfo = eventInfo; - } - - public DeviceEventUploadRequest(Integer requestId) { - super(requestId); - } - - @Override - public MsgType getMsgType() { - return MsgType.POST_DEVICE_EVENT; - } - - -} diff --git a/transport/mqtt/src/main/java/com/hashmapinc/server/transport/mqtt/MqttTopics.java b/transport/mqtt/src/main/java/com/hashmapinc/server/transport/mqtt/MqttTopics.java index bac92646b..a3b60aa73 100644 --- a/transport/mqtt/src/main/java/com/hashmapinc/server/transport/mqtt/MqttTopics.java +++ b/transport/mqtt/src/main/java/com/hashmapinc/server/transport/mqtt/MqttTopics.java @@ -43,6 +43,7 @@ public class MqttTopics { public static final String GATEWAY_RPC_TOPIC = BASE_GATEWAY_API_TOPIC + "/rpc"; public static final String GATEWAY_ATTRIBUTES_REQUEST_TOPIC = BASE_GATEWAY_API_TOPIC + "/attributes/request"; public static final String GATEWAY_ATTRIBUTES_RESPONSE_TOPIC = BASE_GATEWAY_API_TOPIC + "/attributes/response"; + public static final String GATEWAY_EVENTS_TOPIC = BASE_GATEWAY_API_TOPIC + "/events"; private MqttTopics() { diff --git a/transport/mqtt/src/main/java/com/hashmapinc/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt/src/main/java/com/hashmapinc/server/transport/mqtt/MqttTransportHandler.java index b9e75a213..4f27ddfde 100644 --- a/transport/mqtt/src/main/java/com/hashmapinc/server/transport/mqtt/MqttTransportHandler.java +++ b/transport/mqtt/src/main/java/com/hashmapinc/server/transport/mqtt/MqttTransportHandler.java @@ -17,11 +17,11 @@ package com.hashmapinc.server.transport.mqtt; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.hashmapinc.server.common.data.Device; import com.hashmapinc.server.common.data.Event; import com.hashmapinc.server.common.data.security.DeviceTokenCredentials; import com.hashmapinc.server.common.data.security.DeviceX509Credentials; -import com.hashmapinc.server.common.msg.core.DeviceEventUploadRequest; import com.hashmapinc.server.common.msg.session.AdaptorToSessionActorMsg; import com.hashmapinc.server.common.msg.session.BasicToDeviceActorSessionMsg; import com.hashmapinc.server.common.msg.session.MsgType; @@ -37,6 +37,7 @@ import com.hashmapinc.server.dao.event.EventService; import com.hashmapinc.server.dao.mail.MailService; import com.hashmapinc.server.dao.relation.RelationService; +import com.hashmapinc.server.transport.mqtt.adaptors.JsonMqttAdaptor; import com.hashmapinc.server.transport.mqtt.adaptors.MqttTransportAdaptor; import com.hashmapinc.server.transport.mqtt.session.DeviceSessionCtx; import com.hashmapinc.server.transport.mqtt.session.GatewaySessionCtx; @@ -56,6 +57,7 @@ import javax.net.ssl.SSLPeerUnverifiedException; import javax.security.cert.X509Certificate; +import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; @@ -189,8 +191,10 @@ private void handleMqttPublishMsg(String topicName, int msgId, MqttPublishMessag try { if (topicName.equals(MqttTopics.GATEWAY_TELEMETRY_TOPIC)) { gatewaySessionCtx.onDeviceTelemetry(mqttMsg); - }else if (topicName.equals(MqttTopics.GATEWAY_DEPTH_TELEMETRY_TOPIC)) { + } else if (topicName.equals(MqttTopics.GATEWAY_DEPTH_TELEMETRY_TOPIC)) { gatewaySessionCtx.onDeviceDepthTelemetry(mqttMsg); + } else if (topicName.equals(MqttTopics.GATEWAY_EVENTS_TOPIC)) { + gatewaySessionCtx.onDeviceEventMsg(mqttMsg); } else if (topicName.equals(MqttTopics.GATEWAY_ATTRIBUTES_TOPIC)) { gatewaySessionCtx.onDeviceAttributes(mqttMsg); } else if (topicName.equals(MqttTopics.GATEWAY_ATTRIBUTES_REQUEST_TOPIC)) { @@ -217,7 +221,7 @@ private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage } else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_TOPIC)) { msg = adaptor.convertToActorMsg(deviceSessionCtx, MsgType.POST_ATTRIBUTES_REQUEST, mqttMsg); } else if (topicName.equals(MqttTopics.DEVICE_EVENT_TOPIC)) { - saveEventForDevice(adaptor.convertToActorMsg(deviceSessionCtx, MsgType.POST_DEVICE_EVENT, mqttMsg)); + saveEventForDevice(mqttMsg);//adaptor.convertToActorMsg(deviceSessionCtx, MsgType.POST_DEVICE_EVENT, mqttMsg)); } else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) { msg = adaptor.convertToActorMsg(deviceSessionCtx, MsgType.GET_ATTRIBUTES_REQUEST, mqttMsg); if (msgId >= 0) { @@ -245,14 +249,23 @@ private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage } } - private void saveEventForDevice(AdaptorToSessionActorMsg msg) { - Event event = new Event(); - Device device = deviceSessionCtx.getDevice(); - event.setEntityId(device.getId()); - event.setTenantId(device.getTenantId()); - event.setType("LC_EVENT"); - event.setBody(((DeviceEventUploadRequest)msg.getMsg()).getEventInfo()); - eventService.save(event); + private void saveEventForDevice(MqttPublishMessage inbound) throws AdaptorException { + String payload = JsonMqttAdaptor.validatePayload(deviceSessionCtx.getSessionId(), inbound.payload()); + try { + ObjectMapper mapper = new ObjectMapper(); + JsonNode eventInfo = mapper.readTree(payload); + Event event = new Event(); + Device device = deviceSessionCtx.getDevice(); + event.setEntityId(device.getId()); + event.setTenantId(device.getTenantId()); + event.setType("QUALITY_EVENT"); + event.setBody(eventInfo); + eventService.save(event); + + } catch (IOException ex) { + log.info("Execption occurred : {}", ex); + throw new AdaptorException(ex); + } } private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) { @@ -475,7 +488,8 @@ private void checkGatewaySession(MqttConnectMessage msg) { JsonNode gatewayNode = infoNode.get("gateway"); JsonNode topic = infoNode.get(TOPIC); if (gatewayNode != null && gatewayNode.asBoolean()) { - gatewaySessionCtx = new GatewaySessionCtx(processor, deviceService, authService, relationService, deviceSessionCtx,attributesService,assetService, mailService); + gatewaySessionCtx = new GatewaySessionCtx(processor, deviceService, authService, relationService, eventService, + deviceSessionCtx, attributesService, assetService, mailService); if((msg.payload().willTopic() != null) && msg.payload().willTopic().startsWith(SPARK_PLUG_NAME_SPACE)){ sparkPlugDecodeService = new SparkPlugDecodeService(); } diff --git a/transport/mqtt/src/main/java/com/hashmapinc/server/transport/mqtt/adaptors/JsonMqttAdaptor.java b/transport/mqtt/src/main/java/com/hashmapinc/server/transport/mqtt/adaptors/JsonMqttAdaptor.java index 05b592983..e37ed3fac 100644 --- a/transport/mqtt/src/main/java/com/hashmapinc/server/transport/mqtt/adaptors/JsonMqttAdaptor.java +++ b/transport/mqtt/src/main/java/com/hashmapinc/server/transport/mqtt/adaptors/JsonMqttAdaptor.java @@ -101,9 +101,6 @@ public AdaptorToSessionActorMsg convertToActorMsg(DeviceSessionCtx ctx, MsgType case TO_SERVER_RPC_REQUEST: msg = convertToServerRpcRequest(ctx, (MqttPublishMessage) inbound); break; - case POST_DEVICE_EVENT: - msg = convertToDeviceEventMsg(ctx, (MqttPublishMessage) inbound); - break; default: log.warn("[{}] Unsupported msg type: {}!", ctx.getSessionId(), type); throw new AdaptorException(new IllegalArgumentException("Unsupported msg type: " + type + "!")); @@ -328,19 +325,6 @@ private FromDeviceMsg convertToServerRpcRequest(DeviceSessionCtx ctx, MqttPublis } } - private FromDeviceMsg convertToDeviceEventMsg(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException { - String payload = validatePayload(ctx.getSessionId(), inbound.payload()); - try { - ObjectMapper mapper = new ObjectMapper(); - JsonNode eventInfo = mapper.readTree(payload); - DeviceEventUploadRequest request = new DeviceEventUploadRequest(inbound.variableHeader().packetId()); - request.setEventInfo(eventInfo); - return request; - } catch (IOException ex) { - throw new AdaptorException(ex); - } - } - public static JsonElement validateJsonPayload(SessionId sessionId, ByteBuf payloadData) throws AdaptorException { String payload = validatePayload(sessionId, payloadData); try { diff --git a/transport/mqtt/src/main/java/com/hashmapinc/server/transport/mqtt/session/GatewaySessionCtx.java b/transport/mqtt/src/main/java/com/hashmapinc/server/transport/mqtt/session/GatewaySessionCtx.java index a1e5b88a9..690b09d43 100644 --- a/transport/mqtt/src/main/java/com/hashmapinc/server/transport/mqtt/session/GatewaySessionCtx.java +++ b/transport/mqtt/src/main/java/com/hashmapinc/server/transport/mqtt/session/GatewaySessionCtx.java @@ -24,12 +24,14 @@ import com.google.gson.JsonSyntaxException; import com.hashmapinc.server.common.data.DataConstants; import com.hashmapinc.server.common.data.Device; +import com.hashmapinc.server.common.data.Event; import com.hashmapinc.server.common.data.asset.Asset; import com.hashmapinc.server.common.data.exception.TempusException; import com.hashmapinc.server.common.data.id.SessionId; import com.hashmapinc.server.common.data.id.TenantId; import com.hashmapinc.server.common.data.kv.AttributeKvEntry; import com.hashmapinc.server.common.data.kv.BaseAttributeKvEntry; +import com.hashmapinc.server.common.data.kv.KvEntry; import com.hashmapinc.server.common.data.relation.EntityRelation; import com.hashmapinc.server.common.data.relation.RelationTypeGroup; import com.hashmapinc.server.common.msg.core.*; @@ -37,23 +39,23 @@ import com.hashmapinc.server.common.msg.session.BasicAdaptorToSessionActorMsg; import com.hashmapinc.server.common.msg.session.BasicToDeviceActorSessionMsg; import com.hashmapinc.server.common.msg.session.ctrl.SessionCloseMsg; +import com.hashmapinc.server.common.transport.SessionMsgProcessor; import com.hashmapinc.server.common.transport.adaptor.AdaptorException; import com.hashmapinc.server.common.transport.adaptor.JsonConverter; import com.hashmapinc.server.common.transport.auth.DeviceAuthService; import com.hashmapinc.server.dao.asset.AssetService; import com.hashmapinc.server.dao.attributes.AttributesService; import com.hashmapinc.server.dao.device.DeviceService; +import com.hashmapinc.server.dao.event.EventService; import com.hashmapinc.server.dao.mail.MailService; import com.hashmapinc.server.dao.relation.RelationService; +import com.hashmapinc.server.transport.mqtt.MqttTransportHandler; import com.hashmapinc.server.transport.mqtt.sparkplug.data.SparkPlugDecodedMsg; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage; import lombok.extern.slf4j.Slf4j; import org.springframework.util.StringUtils; -import com.hashmapinc.server.common.transport.SessionMsgProcessor; -import com.hashmapinc.server.transport.mqtt.MqttTransportHandler; -import com.hashmapinc.server.common.data.kv.KvEntry; import java.io.IOException; import java.util.*; @@ -83,12 +85,16 @@ public class GatewaySessionCtx { private final MailService mailService; private final Map devices; private ChannelHandlerContext channel; + private final EventService eventService; - public GatewaySessionCtx(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, DeviceSessionCtx gatewaySessionCtx, AttributesService attributesService , AssetService assetService, MailService mailService) { + public GatewaySessionCtx(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, + RelationService relationService, EventService eventService, DeviceSessionCtx gatewaySessionCtx, + AttributesService attributesService , AssetService assetService, MailService mailService) { this.processor = processor; this.deviceService = deviceService; this.authService = authService; this.relationService = relationService; + this.eventService = eventService; this.gateway = gatewaySessionCtx.getDevice(); this.gatewaySessionId = gatewaySessionCtx.getSessionId(); this.attributesService = attributesService; @@ -238,6 +244,36 @@ public void onDeviceDepthTelemetry(MqttPublishMessage mqttMsg) throws AdaptorExc } } + public void onDeviceEventMsg(MqttPublishMessage mqttMsg) throws AdaptorException { + JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload()); + if (json.isJsonObject()) { + JsonObject jsonObj = json.getAsJsonObject(); + for (Map.Entry deviceEntry : jsonObj.entrySet()) { + Device device = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), deviceEntry.getKey()); + if (device != null) { + saveDeviceEventInfo(device, deviceEntry.getValue().toString()); + } + } + } else { + throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json); + } + } + + public void saveDeviceEventInfo(Device device, String strEventInfo) { + try { + ObjectMapper mapper = new ObjectMapper(); + JsonNode eventInfo = mapper.readTree(strEventInfo); + Event event = new Event(); + event.setEntityId(device.getId()); + event.setTenantId(device.getTenantId()); + event.setType("QUALITY_EVENT"); + event.setBody(eventInfo); + eventService.save(event); + } catch (IOException e) { + log.info("Object mapping execption {}", e); + } + } + public void onDeviceRpcResponse(MqttPublishMessage mqttMsg) throws AdaptorException { JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload()); if (json.isJsonObject()) { From 9e245ca181fd0d84f5f831e79d8906a36980bd1a Mon Sep 17 00:00:00 2001 From: himanshu Date: Tue, 8 Jan 2019 12:54:18 +0530 Subject: [PATCH 3/4] Tempus-947 UI changes for displaying quality issue events on device. --- ui/src/app/common/types.constant.js | 4 ++++ .../event/event-header-quality-issue.tpl.html | 22 +++++++++++++++++++ ui/src/app/event/event-header.directive.js | 4 ++++ ui/src/app/event/event-row-quality.tpl.html | 22 +++++++++++++++++++ ui/src/app/event/event-row.directive.js | 4 ++++ ui/src/app/locale/locale.constant.js | 5 ++++- 6 files changed, 60 insertions(+), 1 deletion(-) create mode 100644 ui/src/app/event/event-header-quality-issue.tpl.html create mode 100644 ui/src/app/event/event-row-quality.tpl.html diff --git a/ui/src/app/common/types.constant.js b/ui/src/app/common/types.constant.js index f7ec42bc7..f8c746286 100644 --- a/ui/src/app/common/types.constant.js +++ b/ui/src/app/common/types.constant.js @@ -398,6 +398,10 @@ export default angular.module('tempus.types', []) stats: { value: "STATS", name: "event.type-stats" + }, + qualityEvent: { + value: "QUALITY_EVENT", + name: "event.type-quality-issue" } }, extensionType: { diff --git a/ui/src/app/event/event-header-quality-issue.tpl.html b/ui/src/app/event/event-header-quality-issue.tpl.html new file mode 100644 index 000000000..ec09f4a6c --- /dev/null +++ b/ui/src/app/event/event-header-quality-issue.tpl.html @@ -0,0 +1,22 @@ + +
event.event-time
+
event.server
+
event.issue
+
event.value
diff --git a/ui/src/app/event/event-header.directive.js b/ui/src/app/event/event-header.directive.js index 086582554..2d351853c 100644 --- a/ui/src/app/event/event-header.directive.js +++ b/ui/src/app/event/event-header.directive.js @@ -19,6 +19,7 @@ import eventHeaderLcEventTemplate from './event-header-lc-event.tpl.html'; import eventHeaderStatsTemplate from './event-header-stats.tpl.html'; import eventHeaderErrorTemplate from './event-header-error.tpl.html'; +import eventHeaderQualityTemplate from './event-header-quality-issue.tpl.html'; /* eslint-enable import/no-unresolved, import/default */ @@ -39,6 +40,9 @@ export default function EventHeaderDirective($compile, $templateCache, types) { case types.eventType.error.value: template = eventHeaderErrorTemplate; break; + case types.eventType.qualityEvent.value: + template = eventHeaderQualityTemplate; + break; } return $templateCache.get(template); } diff --git a/ui/src/app/event/event-row-quality.tpl.html b/ui/src/app/event/event-row-quality.tpl.html new file mode 100644 index 000000000..c1fffe1dd --- /dev/null +++ b/ui/src/app/event/event-row-quality.tpl.html @@ -0,0 +1,22 @@ + +
{{event.createdTime | date : 'yyyy-MM-dd HH:mm:ss'}}
+
{{event.body.server}}
+
{{event.body.issue}}
+
{{event.body.value}}
diff --git a/ui/src/app/event/event-row.directive.js b/ui/src/app/event/event-row.directive.js index cbef7352e..ed61c7a76 100644 --- a/ui/src/app/event/event-row.directive.js +++ b/ui/src/app/event/event-row.directive.js @@ -21,6 +21,7 @@ import eventErrorDialogTemplate from './event-content-dialog.tpl.html'; import eventRowLcEventTemplate from './event-row-lc-event.tpl.html'; import eventRowStatsTemplate from './event-row-stats.tpl.html'; import eventRowErrorTemplate from './event-row-error.tpl.html'; +import eventRowQualityTemplate from './event-row-quality.tpl.html'; /* eslint-enable import/no-unresolved, import/default */ @@ -41,6 +42,9 @@ export default function EventRowDirective($compile, $templateCache, $mdDialog, $ case types.eventType.error.value: template = eventRowErrorTemplate; break; + case types.eventType.qualityEvent.value: + template = eventRowQualityTemplate; + break; } return $templateCache.get(template); } diff --git a/ui/src/app/locale/locale.constant.js b/ui/src/app/locale/locale.constant.js index 442326148..8075ffabe 100644 --- a/ui/src/app/locale/locale.constant.js +++ b/ui/src/app/locale/locale.constant.js @@ -882,6 +882,7 @@ export default angular.module('tempus.locale', []) "type-error": "Error", "type-lc-event": "Lifecycle event", "type-stats": "Statistics", + "type-quality-issue": "Quality Issue", "no-events-prompt": "No events found", "error": "Error", "alarm": "Alarm", @@ -894,7 +895,9 @@ export default angular.module('tempus.locale', []) "success": "Success", "failed": "Failed", "messages-processed": "Messages processed", - "errors-occurred": "Errors occurred" + "errors-occurred": "Errors occurred", + "issue": "Issue", + "value": "Value" }, "extension": { "extensions": "Extensions", From 23cf357c4b8410dae07fa0094d430cc1c985ead4 Mon Sep 17 00:00:00 2001 From: Himanshu Mishra Date: Wed, 16 Jan 2019 15:40:39 +0530 Subject: [PATCH 4/4] Tempus-947 Removed unused imports. --- .../server/transport/mqtt/MqttTransportHandler.java | 2 +- .../transport/mqtt/adaptors/JsonMqttAdaptor.java | 13 +++---------- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/transport/mqtt/src/main/java/com/hashmapinc/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt/src/main/java/com/hashmapinc/server/transport/mqtt/MqttTransportHandler.java index 4f27ddfde..c9e1e80e6 100644 --- a/transport/mqtt/src/main/java/com/hashmapinc/server/transport/mqtt/MqttTransportHandler.java +++ b/transport/mqtt/src/main/java/com/hashmapinc/server/transport/mqtt/MqttTransportHandler.java @@ -221,7 +221,7 @@ private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage } else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_TOPIC)) { msg = adaptor.convertToActorMsg(deviceSessionCtx, MsgType.POST_ATTRIBUTES_REQUEST, mqttMsg); } else if (topicName.equals(MqttTopics.DEVICE_EVENT_TOPIC)) { - saveEventForDevice(mqttMsg);//adaptor.convertToActorMsg(deviceSessionCtx, MsgType.POST_DEVICE_EVENT, mqttMsg)); + saveEventForDevice(mqttMsg); } else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) { msg = adaptor.convertToActorMsg(deviceSessionCtx, MsgType.GET_ATTRIBUTES_REQUEST, mqttMsg); if (msgId >= 0) { diff --git a/transport/mqtt/src/main/java/com/hashmapinc/server/transport/mqtt/adaptors/JsonMqttAdaptor.java b/transport/mqtt/src/main/java/com/hashmapinc/server/transport/mqtt/adaptors/JsonMqttAdaptor.java index e37ed3fac..8afc7ea52 100644 --- a/transport/mqtt/src/main/java/com/hashmapinc/server/transport/mqtt/adaptors/JsonMqttAdaptor.java +++ b/transport/mqtt/src/main/java/com/hashmapinc/server/transport/mqtt/adaptors/JsonMqttAdaptor.java @@ -17,7 +17,6 @@ package com.hashmapinc.server.transport.mqtt.adaptors; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.gson.Gson; import com.google.gson.JsonElement; import com.google.gson.JsonParser; @@ -30,22 +29,16 @@ import com.hashmapinc.server.common.transport.adaptor.AdaptorException; import com.hashmapinc.server.common.transport.adaptor.JsonConverter; import com.hashmapinc.server.transport.mqtt.MqttTopics; +import com.hashmapinc.server.transport.mqtt.MqttTransportHandler; import com.hashmapinc.server.transport.mqtt.session.DeviceSessionCtx; +import com.hashmapinc.server.transport.mqtt.sparkplug.SparkPlugEncodeService; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.UnpooledByteBufAllocator; -import io.netty.handler.codec.mqtt.MqttPublishMessage; -import io.netty.handler.codec.mqtt.MqttMessage; -import io.netty.handler.codec.mqtt.MqttFixedHeader; -import io.netty.handler.codec.mqtt.MqttQoS; -import io.netty.handler.codec.mqtt.MqttMessageType; -import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; +import io.netty.handler.codec.mqtt.*; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; -import com.hashmapinc.server.transport.mqtt.MqttTransportHandler; -import com.hashmapinc.server.transport.mqtt.sparkplug.SparkPlugEncodeService; -import java.io.IOException; import java.nio.charset.Charset; import java.util.*;