From 952a5661c83e15505e2f6dc31e538efd1b6e57db Mon Sep 17 00:00:00 2001 From: Niklas Ciecior Date: Thu, 18 Jan 2024 13:22:48 +0100 Subject: [PATCH 1/3] Add variable support --- src/datasource.ts | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/datasource.ts b/src/datasource.ts index 072fa78..4da9e87 100644 --- a/src/datasource.ts +++ b/src/datasource.ts @@ -1,9 +1,18 @@ -import { DataSourceInstanceSettings } from '@grafana/data'; -import { DataSourceWithBackend } from '@grafana/runtime'; +import { DataSourceInstanceSettings, ScopedVars } from '@grafana/data'; +import { DataSourceWithBackend, getTemplateSrv } from '@grafana/runtime'; import { MqttDataSourceOptions, MqttQuery } from './types'; export class DataSource extends DataSourceWithBackend { constructor(instanceSettings: DataSourceInstanceSettings) { super(instanceSettings); } + + applyTemplateVariables(query: MqttQuery, scopedVars: ScopedVars): Record { + const resolvedQuery: MqttQuery = { + ...query, + topic: getTemplateSrv().replace(query.topic, scopedVars), + }; + + return resolvedQuery; + } } From 932b03bfc6acc217f907e62e3883d6d315b717f9 Mon Sep 17 00:00:00 2001 From: Niklas Ciecior Date: Wed, 24 Jan 2024 11:38:22 +0100 Subject: [PATCH 2/3] Add support for + and # --- pkg/mqtt/client.go | 24 +++++++++++++++++------- pkg/mqtt/topic.go | 8 ++++++++ 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/pkg/mqtt/client.go b/pkg/mqtt/client.go index bd36c87..1f15278 100644 --- a/pkg/mqtt/client.go +++ b/pkg/mqtt/client.go @@ -71,12 +71,13 @@ func (c *client) IsConnected() bool { return c.client.IsConnectionOpen() } -func (c *client) HandleMessage(_ paho.Client, msg paho.Message) { +func (c *client) HandleMessage(topic string, payload []byte) { message := Message{ Timestamp: time.Now(), - Value: msg.Payload(), + Value: payload, } - c.topics.AddMessage(msg.Topic(), message) + + c.topics.AddMessage(topic, message) } func (c *client) GetTopic(reqPath string) (*Topic, bool) { @@ -104,9 +105,16 @@ func (c *client) Subscribe(reqPath string) *Topic { return t } - log.DefaultLogger.Debug("Subscribing to MQTT topic", "topic", t.Path) - if token := c.client.Subscribe(t.Path, 0, c.HandleMessage); token.Wait() && token.Error() != nil { - log.DefaultLogger.Error("Error subscribing to MQTT topic", "topic", t.Path, "error", token.Error()) + log.DefaultLogger.Debug("Subscribing to MQTT topic", "topic", topicPath) + + topic := resolveTopic(t.Path) + + if token := c.client.Subscribe(topic, 0, func(_ paho.Client, m paho.Message) { + // by wrapping HandleMessage we can directly get the correct topicPath for the incoming topic + // and don't need to regex it against + and #. + c.HandleMessage(topicPath, []byte(m.Payload())) + }); token.Wait() && token.Error() != nil { + log.DefaultLogger.Error("Error subscribing to MQTT topic", "topic", topicPath, "error", token.Error()) } c.topics.Store(t) return t @@ -126,7 +134,9 @@ func (c *client) Unsubscribe(reqPath string) { } log.DefaultLogger.Debug("Unsubscribing from MQTT topic", "topic", t.Path) - if token := c.client.Unsubscribe(t.Path); token.Wait() && token.Error() != nil { + + topic := resolveTopic(t.Path) + if token := c.client.Unsubscribe(topic); token.Wait() && token.Error() != nil { log.DefaultLogger.Error("Error unsubscribing from MQTT topic", "topic", t.Path, "error", token.Error()) } } diff --git a/pkg/mqtt/topic.go b/pkg/mqtt/topic.go index a254454..8e7db7f 100644 --- a/pkg/mqtt/topic.go +++ b/pkg/mqtt/topic.go @@ -2,6 +2,7 @@ package mqtt import ( "path" + "strings" "sync" "time" @@ -97,3 +98,10 @@ func (tm *TopicMap) Store(t *Topic) { func (tm *TopicMap) Delete(key string) { tm.Map.Delete(key) } + +// replace all __PLUS__ with + and one __HASH__ with # +// Question: Why does grafana not allow + and # in query? +func resolveTopic(topic string) string { + resolvedTopic := strings.ReplaceAll(topic, "__PLUS__", "+") + return strings.Replace(resolvedTopic, "__HASH__", "#", -1) +} From cd1728c22473cd2e9914dc2a09a9d3c821e4dc3b Mon Sep 17 00:00:00 2001 From: Niklas Ciecior Date: Wed, 24 Jan 2024 17:27:43 +0100 Subject: [PATCH 3/3] Add support for + and # in the query editor --- src/datasource.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/datasource.ts b/src/datasource.ts index 4da9e87..b5d0b65 100644 --- a/src/datasource.ts +++ b/src/datasource.ts @@ -8,9 +8,12 @@ export class DataSource extends DataSourceWithBackend { + let resolvedTopic = getTemplateSrv().replace(query.topic, scopedVars); + resolvedTopic = resolvedTopic.replace(/\+/gi, '__PLUS__'); + resolvedTopic = resolvedTopic.replace(/\#/gi, '__HASH__'); const resolvedQuery: MqttQuery = { ...query, - topic: getTemplateSrv().replace(query.topic, scopedVars), + topic: resolvedTopic, }; return resolvedQuery;