diff --git a/pkg/mqtt/client.go b/pkg/mqtt/client.go index bd36c87..1f15278 100644 --- a/pkg/mqtt/client.go +++ b/pkg/mqtt/client.go @@ -71,12 +71,13 @@ func (c *client) IsConnected() bool { return c.client.IsConnectionOpen() } -func (c *client) HandleMessage(_ paho.Client, msg paho.Message) { +func (c *client) HandleMessage(topic string, payload []byte) { message := Message{ Timestamp: time.Now(), - Value: msg.Payload(), + Value: payload, } - c.topics.AddMessage(msg.Topic(), message) + + c.topics.AddMessage(topic, message) } func (c *client) GetTopic(reqPath string) (*Topic, bool) { @@ -104,9 +105,16 @@ func (c *client) Subscribe(reqPath string) *Topic { return t } - log.DefaultLogger.Debug("Subscribing to MQTT topic", "topic", t.Path) - if token := c.client.Subscribe(t.Path, 0, c.HandleMessage); token.Wait() && token.Error() != nil { - log.DefaultLogger.Error("Error subscribing to MQTT topic", "topic", t.Path, "error", token.Error()) + log.DefaultLogger.Debug("Subscribing to MQTT topic", "topic", topicPath) + + topic := resolveTopic(t.Path) + + if token := c.client.Subscribe(topic, 0, func(_ paho.Client, m paho.Message) { + // by wrapping HandleMessage we can directly get the correct topicPath for the incoming topic + // and don't need to regex it against + and #. + c.HandleMessage(topicPath, []byte(m.Payload())) + }); token.Wait() && token.Error() != nil { + log.DefaultLogger.Error("Error subscribing to MQTT topic", "topic", topicPath, "error", token.Error()) } c.topics.Store(t) return t @@ -126,7 +134,9 @@ func (c *client) Unsubscribe(reqPath string) { } log.DefaultLogger.Debug("Unsubscribing from MQTT topic", "topic", t.Path) - if token := c.client.Unsubscribe(t.Path); token.Wait() && token.Error() != nil { + + topic := resolveTopic(t.Path) + if token := c.client.Unsubscribe(topic); token.Wait() && token.Error() != nil { log.DefaultLogger.Error("Error unsubscribing from MQTT topic", "topic", t.Path, "error", token.Error()) } } diff --git a/pkg/mqtt/topic.go b/pkg/mqtt/topic.go index a254454..8e7db7f 100644 --- a/pkg/mqtt/topic.go +++ b/pkg/mqtt/topic.go @@ -2,6 +2,7 @@ package mqtt import ( "path" + "strings" "sync" "time" @@ -97,3 +98,10 @@ func (tm *TopicMap) Store(t *Topic) { func (tm *TopicMap) Delete(key string) { tm.Map.Delete(key) } + +// replace all __PLUS__ with + and one __HASH__ with # +// Question: Why does grafana not allow + and # in query? +func resolveTopic(topic string) string { + resolvedTopic := strings.ReplaceAll(topic, "__PLUS__", "+") + return strings.Replace(resolvedTopic, "__HASH__", "#", -1) +} diff --git a/src/datasource.ts b/src/datasource.ts index 072fa78..b5d0b65 100644 --- a/src/datasource.ts +++ b/src/datasource.ts @@ -1,9 +1,21 @@ -import { DataSourceInstanceSettings } from '@grafana/data'; -import { DataSourceWithBackend } from '@grafana/runtime'; +import { DataSourceInstanceSettings, ScopedVars } from '@grafana/data'; +import { DataSourceWithBackend, getTemplateSrv } from '@grafana/runtime'; import { MqttDataSourceOptions, MqttQuery } from './types'; export class DataSource extends DataSourceWithBackend { constructor(instanceSettings: DataSourceInstanceSettings) { super(instanceSettings); } + + applyTemplateVariables(query: MqttQuery, scopedVars: ScopedVars): Record { + let resolvedTopic = getTemplateSrv().replace(query.topic, scopedVars); + resolvedTopic = resolvedTopic.replace(/\+/gi, '__PLUS__'); + resolvedTopic = resolvedTopic.replace(/\#/gi, '__HASH__'); + const resolvedQuery: MqttQuery = { + ...query, + topic: resolvedTopic, + }; + + return resolvedQuery; + } }