From 15e121638b61dd5d2b05931572372c8312ebd12f Mon Sep 17 00:00:00 2001 From: Marc-Philippe Fuller Date: Tue, 25 Apr 2023 17:09:16 -0700 Subject: [PATCH] refactor!: Make multi-level topics the only implementation (#579) BREAKING CHANGE: Removed single level topics, multi level is used by default closes: #551 Signed-off-by: Marc-Philippe Fuller --- cmd/res/configuration.yaml | 11 ++---- cmd/res/devices/mqtt.test.device.yaml | 5 +-- internal/driver/config.go | 5 ++- internal/driver/driver.go | 30 ++++------------ internal/driver/incominglistener.go | 51 ++++++--------------------- internal/driver/responselistener.go | 30 ++++------------ 6 files changed, 29 insertions(+), 103 deletions(-) diff --git a/cmd/res/configuration.yaml b/cmd/res/configuration.yaml index 23a712de..36656fe9 100644 --- a/cmd/res/configuration.yaml +++ b/cmd/res/configuration.yaml @@ -49,15 +49,8 @@ MQTTBrokerInfo: AuthMode: "none" CredentialsName: "credentials" - # Comment out/remove when using multi-level topics - IncomingTopic: "DataTopic" - ResponseTopic: "ResponseTopic" - UseTopicLevels: false - -# Uncomment to use multi-level topics -# IncomingTopic: "incoming/data/#" -# ResponseTopic: "command/response/#" -# UseTopicLevels: true + IncomingTopic: "incoming/data/#" + ResponseTopic: "command/response/#" Writable: # ResponseFetchInterval specifies the retry interval(milliseconds) to fetch the command response from the MQTT broker diff --git a/cmd/res/devices/mqtt.test.device.yaml b/cmd/res/devices/mqtt.test.device.yaml index 0d9d2be9..e5ede48b 100644 --- a/cmd/res/devices/mqtt.test.device.yaml +++ b/cmd/res/devices/mqtt.test.device.yaml @@ -6,10 +6,7 @@ deviceList: labels: [ "MQTT", "test" ] protocols: mqtt: - # Comment out/remove below to use multi-level topics - CommandTopic: "CommandTopic" - # Uncomment below to use multi-level topics - # CommandTopic: "command/MQTT-test-device" + CommandTopic: "command/MQTT-test-device" # autoEvents: # interval: "20s" # onChange: false diff --git a/internal/driver/config.go b/internal/driver/config.go index 931a649b..ccf4f443 100644 --- a/internal/driver/config.go +++ b/internal/driver/config.go @@ -51,9 +51,8 @@ type MQTTBrokerInfo struct { AuthMode string CredentialsName string - IncomingTopic string - ResponseTopic string - UseTopicLevels bool + IncomingTopic string + ResponseTopic string Writable WritableInfo } diff --git a/internal/driver/driver.go b/internal/driver/driver.go index 1bada85f..bc9efcbf 100644 --- a/internal/driver/driver.go +++ b/internal/driver/driver.go @@ -114,7 +114,7 @@ func (d *Driver) HandleReadCommands(deviceName string, protocols map[string]mode } func (d *Driver) handleReadCommandRequest(req sdkModel.CommandRequest, topic string) (*sdkModel.CommandValue, error) { - var result = &sdkModel.CommandValue{} + var result *sdkModel.CommandValue var err error var qos = byte(0) var retained = false @@ -125,20 +125,8 @@ func (d *Driver) handleReadCommandRequest(req sdkModel.CommandRequest, topic str var cmd = req.DeviceResourceName var payload []byte - if d.serviceConfig.MQTTBrokerInfo.UseTopicLevels { - topic = fmt.Sprintf("%s/%s/%s/%s", topic, cmd, method, cmdUuid) - // will publish empty payload - } else { - data := make(map[string]interface{}) - data["uuid"] = cmdUuid - data["method"] = method - data["cmd"] = cmd - - payload, err = json.Marshal(data) - if err != nil { - return result, err - } - } + topic = fmt.Sprintf("%s/%s/%s/%s", topic, cmd, method, cmdUuid) + // will publish empty payload driver.mqttClient.Publish(topic, qos, retained, payload) @@ -204,15 +192,9 @@ func (d *Driver) handleWriteCommandRequest(req sdkModel.CommandRequest, topic st if err != nil { return errors.NewCommonEdgeXWrapper(err) } - if d.serviceConfig.MQTTBrokerInfo.UseTopicLevels { - topic = fmt.Sprintf("%s/%s/%s/%s", topic, cmd, method, cmdUuid) - data[cmd] = commandValue - } else { - data["uuid"] = cmdUuid - data["method"] = method - data["cmd"] = cmd - data[cmd] = commandValue - } + + topic = fmt.Sprintf("%s/%s/%s/%s", topic, cmd, method, cmdUuid) + data[cmd] = commandValue payload, err = json.Marshal(data) if err != nil { diff --git a/internal/driver/incominglistener.go b/internal/driver/incominglistener.go index f0372443..7cd61510 100644 --- a/internal/driver/incominglistener.go +++ b/internal/driver/incominglistener.go @@ -7,8 +7,6 @@ package driver import ( - "encoding/json" - "fmt" "strings" mqtt "github.com/eclipse/paho.mqtt.golang" @@ -25,45 +23,18 @@ func (d *Driver) onIncomingDataReceived(client mqtt.Client, message mqtt.Message var resourceName string var reading interface{} - if d.serviceConfig.MQTTBrokerInfo.UseTopicLevels { - incomingTopic := message.Topic() - subscribedTopic := d.serviceConfig.MQTTBrokerInfo.IncomingTopic - subscribedTopic = strings.Replace(subscribedTopic, "#", "", -1) - incomingTopic = strings.Replace(incomingTopic, subscribedTopic, "", -1) - metaData := strings.Split(incomingTopic, "/") - if len(metaData) != 2 { - driver.Logger.Errorf("[Incoming listener] Incoming reading ignored, incoming topic data should have format ...//: `%s`", incomingTopic) - return - } - deviceName = metaData[0] - resourceName = metaData[1] - reading = string(message.Payload()) - } else { - var data map[string]interface{} - err := json.Unmarshal(message.Payload(), &data) - if err != nil { - driver.Logger.Errorf("Error unmarshaling payload: %s", err) - return - } - nameVal, ok := data[name] - if !ok { - driver.Logger.Errorf("[Incoming listener] Incoming reading ignored, reading data `%v` should contain the field `%s` to indicate the device name", data, name) - return - } - cmdVal, ok := data[cmd] - if !ok { - driver.Logger.Errorf("[Incoming listener] Incoming reading ignored, reading data `%v` should contain the field `%s` to indicate the device resource name", data, cmd) - return - } - deviceName = fmt.Sprintf("%s", nameVal) - resourceName = fmt.Sprintf("%s", cmdVal) - - reading, ok = data[resourceName] - if !ok { - driver.Logger.Errorf("[Incoming listener] Incoming reading ignored, reading data `%v` should contain the field `%s` with the actual reading value", data, resourceName) - return - } + incomingTopic := message.Topic() + subscribedTopic := d.serviceConfig.MQTTBrokerInfo.IncomingTopic + subscribedTopic = strings.Replace(subscribedTopic, "#", "", -1) + incomingTopic = strings.Replace(incomingTopic, subscribedTopic, "", -1) + metaData := strings.Split(incomingTopic, "/") + if len(metaData) != 2 { + driver.Logger.Errorf("[Incoming listener] Incoming reading ignored, incoming topic data should have format ...//: `%s`", incomingTopic) + return } + deviceName = metaData[0] + resourceName = metaData[1] + reading = string(message.Payload()) deviceObject, ok := d.sdk.DeviceResource(deviceName, resourceName) if !ok { diff --git a/internal/driver/responselistener.go b/internal/driver/responselistener.go index c25ee2af..834ed394 100644 --- a/internal/driver/responselistener.go +++ b/internal/driver/responselistener.go @@ -7,7 +7,6 @@ package driver import ( - "encoding/json" "strings" mqtt "github.com/eclipse/paho.mqtt.golang" @@ -16,31 +15,16 @@ import ( func (d *Driver) onCommandResponseReceived(client mqtt.Client, message mqtt.Message) { var uuid string - if d.serviceConfig.MQTTBrokerInfo.UseTopicLevels { - topic := message.Topic() - metaData := strings.Split(topic, "/") + topic := message.Topic() + metaData := strings.Split(topic, "/") - if len(metaData) == 0 { - driver.Logger.Errorf("[Response listener] Command response ignored. metaData in the message is not sufficient to retrieve UUID: topic=%v msg=%v", message.Topic(), metaData) - return - } else { - uuid = metaData[len(metaData)-1] - } + if len(metaData) == 0 { + driver.Logger.Errorf("[Response listener] Command response ignored. metaData in the message is not sufficient to retrieve UUID: topic=%v msg=%v", message.Topic(), metaData) + return } else { - var response map[string]interface{} - var ok bool - - err := json.Unmarshal(message.Payload(), &response) - if err != nil { - driver.Logger.Errorf("Error unmarshaling payload: %s", err) - return - } - uuid, ok = response["uuid"].(string) - if !ok { - driver.Logger.Errorf("[Response listener] Command response ignored. No UUID found in the message: topic=%v msg=%v", message.Topic(), string(message.Payload())) - return - } + uuid = metaData[len(metaData)-1] } + driver.CommandResponses.Store(uuid, string(message.Payload())) driver.Logger.Debugf("[Response listener] Command response received: topic=%v uuid=%v msg=%v", message.Topic(), uuid, string(message.Payload())) }