Skip to content

Commit

Permalink
refactor!: Make multi-level topics the only implementation (#579)
Browse files Browse the repository at this point in the history
BREAKING CHANGE: Removed single level topics, multi level is used by default

closes: #551

Signed-off-by: Marc-Philippe Fuller <marc-philippe.fuller@intel.com>
  • Loading branch information
marcpfuller committed Apr 26, 2023
1 parent e2c0731 commit 15e1216
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 103 deletions.
11 changes: 2 additions & 9 deletions cmd/res/configuration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,8 @@ MQTTBrokerInfo:
AuthMode: "none"
CredentialsName: "credentials"

# Comment out/remove when using multi-level topics
IncomingTopic: "DataTopic"
ResponseTopic: "ResponseTopic"
UseTopicLevels: false

# Uncomment to use multi-level topics
# IncomingTopic: "incoming/data/#"
# ResponseTopic: "command/response/#"
# UseTopicLevels: true
IncomingTopic: "incoming/data/#"
ResponseTopic: "command/response/#"

Writable:
# ResponseFetchInterval specifies the retry interval(milliseconds) to fetch the command response from the MQTT broker
Expand Down
5 changes: 1 addition & 4 deletions cmd/res/devices/mqtt.test.device.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@ deviceList:
labels: [ "MQTT", "test" ]
protocols:
mqtt:
# Comment out/remove below to use multi-level topics
CommandTopic: "CommandTopic"
# Uncomment below to use multi-level topics
# CommandTopic: "command/MQTT-test-device"
CommandTopic: "command/MQTT-test-device"
# autoEvents:
# interval: "20s"
# onChange: false
Expand Down
5 changes: 2 additions & 3 deletions internal/driver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,8 @@ type MQTTBrokerInfo struct {
AuthMode string
CredentialsName string

IncomingTopic string
ResponseTopic string
UseTopicLevels bool
IncomingTopic string
ResponseTopic string

Writable WritableInfo
}
Expand Down
30 changes: 6 additions & 24 deletions internal/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (d *Driver) HandleReadCommands(deviceName string, protocols map[string]mode
}

func (d *Driver) handleReadCommandRequest(req sdkModel.CommandRequest, topic string) (*sdkModel.CommandValue, error) {
var result = &sdkModel.CommandValue{}
var result *sdkModel.CommandValue
var err error
var qos = byte(0)
var retained = false
Expand All @@ -125,20 +125,8 @@ func (d *Driver) handleReadCommandRequest(req sdkModel.CommandRequest, topic str
var cmd = req.DeviceResourceName
var payload []byte

if d.serviceConfig.MQTTBrokerInfo.UseTopicLevels {
topic = fmt.Sprintf("%s/%s/%s/%s", topic, cmd, method, cmdUuid)
// will publish empty payload
} else {
data := make(map[string]interface{})
data["uuid"] = cmdUuid
data["method"] = method
data["cmd"] = cmd

payload, err = json.Marshal(data)
if err != nil {
return result, err
}
}
topic = fmt.Sprintf("%s/%s/%s/%s", topic, cmd, method, cmdUuid)
// will publish empty payload

driver.mqttClient.Publish(topic, qos, retained, payload)

Expand Down Expand Up @@ -204,15 +192,9 @@ func (d *Driver) handleWriteCommandRequest(req sdkModel.CommandRequest, topic st
if err != nil {
return errors.NewCommonEdgeXWrapper(err)
}
if d.serviceConfig.MQTTBrokerInfo.UseTopicLevels {
topic = fmt.Sprintf("%s/%s/%s/%s", topic, cmd, method, cmdUuid)
data[cmd] = commandValue
} else {
data["uuid"] = cmdUuid
data["method"] = method
data["cmd"] = cmd
data[cmd] = commandValue
}

topic = fmt.Sprintf("%s/%s/%s/%s", topic, cmd, method, cmdUuid)
data[cmd] = commandValue

payload, err = json.Marshal(data)
if err != nil {
Expand Down
51 changes: 11 additions & 40 deletions internal/driver/incominglistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
package driver

import (
"encoding/json"
"fmt"
"strings"

mqtt "github.com/eclipse/paho.mqtt.golang"
Expand All @@ -25,45 +23,18 @@ func (d *Driver) onIncomingDataReceived(client mqtt.Client, message mqtt.Message
var resourceName string
var reading interface{}

if d.serviceConfig.MQTTBrokerInfo.UseTopicLevels {
incomingTopic := message.Topic()
subscribedTopic := d.serviceConfig.MQTTBrokerInfo.IncomingTopic
subscribedTopic = strings.Replace(subscribedTopic, "#", "", -1)
incomingTopic = strings.Replace(incomingTopic, subscribedTopic, "", -1)
metaData := strings.Split(incomingTopic, "/")
if len(metaData) != 2 {
driver.Logger.Errorf("[Incoming listener] Incoming reading ignored, incoming topic data should have format .../<device_name>/<resource_name>: `%s`", incomingTopic)
return
}
deviceName = metaData[0]
resourceName = metaData[1]
reading = string(message.Payload())
} else {
var data map[string]interface{}
err := json.Unmarshal(message.Payload(), &data)
if err != nil {
driver.Logger.Errorf("Error unmarshaling payload: %s", err)
return
}
nameVal, ok := data[name]
if !ok {
driver.Logger.Errorf("[Incoming listener] Incoming reading ignored, reading data `%v` should contain the field `%s` to indicate the device name", data, name)
return
}
cmdVal, ok := data[cmd]
if !ok {
driver.Logger.Errorf("[Incoming listener] Incoming reading ignored, reading data `%v` should contain the field `%s` to indicate the device resource name", data, cmd)
return
}
deviceName = fmt.Sprintf("%s", nameVal)
resourceName = fmt.Sprintf("%s", cmdVal)

reading, ok = data[resourceName]
if !ok {
driver.Logger.Errorf("[Incoming listener] Incoming reading ignored, reading data `%v` should contain the field `%s` with the actual reading value", data, resourceName)
return
}
incomingTopic := message.Topic()
subscribedTopic := d.serviceConfig.MQTTBrokerInfo.IncomingTopic
subscribedTopic = strings.Replace(subscribedTopic, "#", "", -1)
incomingTopic = strings.Replace(incomingTopic, subscribedTopic, "", -1)
metaData := strings.Split(incomingTopic, "/")
if len(metaData) != 2 {
driver.Logger.Errorf("[Incoming listener] Incoming reading ignored, incoming topic data should have format .../<device_name>/<resource_name>: `%s`", incomingTopic)
return
}
deviceName = metaData[0]
resourceName = metaData[1]
reading = string(message.Payload())

deviceObject, ok := d.sdk.DeviceResource(deviceName, resourceName)
if !ok {
Expand Down
30 changes: 7 additions & 23 deletions internal/driver/responselistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
package driver

import (
"encoding/json"
"strings"

mqtt "github.com/eclipse/paho.mqtt.golang"
Expand All @@ -16,31 +15,16 @@ import (
func (d *Driver) onCommandResponseReceived(client mqtt.Client, message mqtt.Message) {
var uuid string

if d.serviceConfig.MQTTBrokerInfo.UseTopicLevels {
topic := message.Topic()
metaData := strings.Split(topic, "/")
topic := message.Topic()
metaData := strings.Split(topic, "/")

if len(metaData) == 0 {
driver.Logger.Errorf("[Response listener] Command response ignored. metaData in the message is not sufficient to retrieve UUID: topic=%v msg=%v", message.Topic(), metaData)
return
} else {
uuid = metaData[len(metaData)-1]
}
if len(metaData) == 0 {
driver.Logger.Errorf("[Response listener] Command response ignored. metaData in the message is not sufficient to retrieve UUID: topic=%v msg=%v", message.Topic(), metaData)
return
} else {
var response map[string]interface{}
var ok bool

err := json.Unmarshal(message.Payload(), &response)
if err != nil {
driver.Logger.Errorf("Error unmarshaling payload: %s", err)
return
}
uuid, ok = response["uuid"].(string)
if !ok {
driver.Logger.Errorf("[Response listener] Command response ignored. No UUID found in the message: topic=%v msg=%v", message.Topic(), string(message.Payload()))
return
}
uuid = metaData[len(metaData)-1]
}

driver.CommandResponses.Store(uuid, string(message.Payload()))
driver.Logger.Debugf("[Response listener] Command response received: topic=%v uuid=%v msg=%v", message.Topic(), uuid, string(message.Payload()))
}

0 comments on commit 15e1216

Please sign in to comment.