Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to publish data via different query #93

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 68 additions & 7 deletions pkg/mqtt/client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package mqtt

import (
"encoding/json"
"errors"
"fmt"
"math/rand"
"path"
@@ -15,6 +17,7 @@ type Client interface {
GetTopic(string) (*Topic, bool)
IsConnected() bool
Subscribe(string) *Topic
Publish(string, map[string]any, string) (json.RawMessage, error)
Unsubscribe(string)
Dispose()
}
@@ -71,12 +74,13 @@ func (c *client) IsConnected() bool {
return c.client.IsConnectionOpen()
}

func (c *client) HandleMessage(_ paho.Client, msg paho.Message) {
func (c *client) HandleMessage(topic string, payload []byte) {
message := Message{
Timestamp: time.Now(),
Value: msg.Payload(),
Value: payload,
}
c.topics.AddMessage(msg.Topic(), message)

c.topics.AddMessage(topic, message)
}

func (c *client) GetTopic(reqPath string) (*Topic, bool) {
@@ -104,9 +108,16 @@ func (c *client) Subscribe(reqPath string) *Topic {
return t
}

log.DefaultLogger.Debug("Subscribing to MQTT topic", "topic", t.Path)
if token := c.client.Subscribe(t.Path, 0, c.HandleMessage); token.Wait() && token.Error() != nil {
log.DefaultLogger.Error("Error subscribing to MQTT topic", "topic", t.Path, "error", token.Error())
log.DefaultLogger.Debug("Subscribing to MQTT topic", "topic", topicPath)

topic := resolveTopic(t.Path)

if token := c.client.Subscribe(topic, 0, func(_ paho.Client, m paho.Message) {
// by wrapping HandleMessage we can directly get the correct topicPath for the incoming topic
// and don't need to regex it against + and #.
c.HandleMessage(topicPath, []byte(m.Payload()))
}); token.Wait() && token.Error() != nil {
log.DefaultLogger.Error("Error subscribing to MQTT topic", "topic", topicPath, "error", token.Error())
}
c.topics.Store(t)
return t
@@ -126,11 +137,61 @@ func (c *client) Unsubscribe(reqPath string) {
}

log.DefaultLogger.Debug("Unsubscribing from MQTT topic", "topic", t.Path)
if token := c.client.Unsubscribe(t.Path); token.Wait() && token.Error() != nil {

topic := resolveTopic(t.Path)
if token := c.client.Unsubscribe(topic); token.Wait() && token.Error() != nil {
log.DefaultLogger.Error("Error unsubscribing from MQTT topic", "topic", t.Path, "error", token.Error())
}
}

func (c *client) Publish(topic string, payload map[string]any, responseTopic string) (json.RawMessage, error) {
var response json.RawMessage
var err error
done := make(chan struct{}, 1)

if responseTopic != "" {
tokenSub := c.client.Subscribe(responseTopic, 2, func(c paho.Client, m paho.Message) {
response = m.Payload()
done <- struct{}{}
})

if !tokenSub.WaitTimeout(time.Second) && tokenSub.Error() != nil {
err = errors.Join(err, tokenSub.Error())
return response, err
}

defer c.client.Unsubscribe(responseTopic)
} else {
done <- struct{}{}
}

data, errMarshal := json.Marshal(&payload)
if errMarshal != nil {
err = errors.Join(err, errMarshal)
return response, err
}

token := c.client.Publish(topic, 2, false, data)

if token.Error() != nil {
err = errors.Join(err, token.Error())
return response, err
}

if !token.WaitTimeout(time.Second) {
err = errors.Join(err, errors.New("publish timeout"))
return response, err
}

select {
case <-done:
case <-time.After(time.Second):
err = errors.Join(err, errors.New("subscribe timeout"))
}

return response, err
}

func (c *client) Dispose() {
log.DefaultLogger.Info("MQTT Disconnecting")
c.client.Disconnect(250)
18 changes: 14 additions & 4 deletions pkg/mqtt/topic.go
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@ package mqtt

import (
"path"
"strings"
"sync"
"time"

@@ -15,10 +16,12 @@ type Message struct {

// Topic represents a MQTT topic.
type Topic struct {
Path string `json:"topic"`
Interval time.Duration
Messages []Message
framer *framer
Path string `json:"topic"`
Payload map[string]any `json:"payload,omitempty"`
ResponsePath string `json:"response,omitempty"`
Interval time.Duration
Messages []Message
framer *framer
}

// Key returns the key for the topic.
@@ -97,3 +100,10 @@ func (tm *TopicMap) Store(t *Topic) {
func (tm *TopicMap) Delete(key string) {
tm.Map.Delete(key)
}

// replace all __PLUS__ with + and one __HASH__ with #
// Question: Why does grafana not allow + and # in query?
func resolveTopic(topic string) string {
resolvedTopic := strings.ReplaceAll(topic, "__PLUS__", "+")
return strings.Replace(resolvedTopic, "__HASH__", "#", -1)
}
8 changes: 6 additions & 2 deletions pkg/plugin/datasource_test.go
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@ package plugin_test

import (
"context"
"encoding/json"
"testing"

"github.com/grafana/grafana-plugin-sdk-go/backend"
@@ -60,5 +61,8 @@ func (c *fakeMQTTClient) IsSubscribed(_ string) bool {
}

func (c *fakeMQTTClient) Subscribe(_ string) *mqtt.Topic { return nil }
func (c *fakeMQTTClient) Unsubscribe(_ string) {}
func (c *fakeMQTTClient) Dispose() {}
func (c *fakeMQTTClient) Publish(string, map[string]any, string) (json.RawMessage, error) {
return json.RawMessage{}, nil
}
func (c *fakeMQTTClient) Unsubscribe(_ string) {}
func (c *fakeMQTTClient) Dispose() {}
23 changes: 17 additions & 6 deletions pkg/plugin/query.go
Original file line number Diff line number Diff line change
@@ -38,13 +38,24 @@ func (ds *MQTTDatasource) query(query backend.DataQuery) backend.DataResponse {
return response
}

t.Interval = query.Interval
// Subscribe
if len(t.Payload) == 0 {
t.Interval = query.Interval

frame := data.NewFrame("")
frame.SetMeta(&data.FrameMeta{
Channel: path.Join(ds.channelPrefix, t.Key()),
})
frame := data.NewFrame("")
frame.SetMeta(&data.FrameMeta{
Channel: path.Join(ds.channelPrefix, t.Key()),
})

response.Frames = append(response.Frames, frame)
response.Frames = append(response.Frames, frame)
return response
}

// Publish
resp, err := ds.Client.Publish(t.Path, t.Payload, t.ResponsePath)

field := data.NewField("Body", data.Labels{}, []json.RawMessage{resp})
response.Frames = append(response.Frames, data.NewFrame("Response", field))
response.Error = err
return response
}
16 changes: 14 additions & 2 deletions src/datasource.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,21 @@
import { DataSourceInstanceSettings } from '@grafana/data';
import { DataSourceWithBackend } from '@grafana/runtime';
import { DataSourceInstanceSettings, ScopedVars } from '@grafana/data';
import { DataSourceWithBackend, getTemplateSrv } from '@grafana/runtime';
import { MqttDataSourceOptions, MqttQuery } from './types';

export class DataSource extends DataSourceWithBackend<MqttQuery, MqttDataSourceOptions> {
constructor(instanceSettings: DataSourceInstanceSettings<MqttDataSourceOptions>) {
super(instanceSettings);
}

applyTemplateVariables(query: MqttQuery, scopedVars: ScopedVars): Record<string, any> {
let resolvedTopic = getTemplateSrv().replace(query.topic, scopedVars);
resolvedTopic = resolvedTopic.replace(/\+/gi, '__PLUS__');
resolvedTopic = resolvedTopic.replace(/\#/gi, '__HASH__');
const resolvedQuery: MqttQuery = {
...query,
topic: resolvedTopic,
};

return resolvedQuery;
}
}