Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sparkpluggw key value engine #6

Merged
merged 12 commits into from
Apr 8, 2020
124 changes: 92 additions & 32 deletions exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
oslog "log"
"os"
"strings"
"sync"
"time"

Expand All @@ -30,7 +31,7 @@ const (
SPReincarnationSuccess string = "sp_reincarnation_success_count"
SPReincarnationDelay string = "sp_reincarnation_delayed_count"

NewMetricString string = "Creating new SP metric %s\n"
NewMetricString string = "Creating new SP metric %s\n"

SPReincarnateTimer uint32 = 900
SPReincarnateRetry uint32 = 60
Expand Down Expand Up @@ -58,13 +59,18 @@ const (
PBPropertySetList uint32 = 21
)

type prometheusmetric struct {
prommetric *prometheus.GaugeVec
promlabel []string
}

type spplugExporter struct {
client mqtt.Client
versionDesc *prometheus.Desc
connectDesc *prometheus.Desc

// Holds the mertrics collected
metrics map[string]*prometheus.GaugeVec
metrics map[string][]prometheusmetric
counterMetrics map[string]*prometheus.CounterVec
}

Expand Down Expand Up @@ -109,7 +115,7 @@ func initSparkPlugExporter(e **spplugExporter) {
prometheus.BuildFQName(progname, "mqtt", "connected"),
"Is the exporter connected to mqtt broker", nil, nil),
}

(*e).client = mqtt.NewClient(options)

log.Debugf("Initializing Exporter Metrics and Data\n")
Expand All @@ -131,8 +137,11 @@ func (e *spplugExporter) Describe(ch chan<- *prometheus.Desc) {
for _, m := range e.counterMetrics {
m.Describe(ch)
}
for _, m := range e.metrics {
m.Describe(ch)
for k := range e.metrics {
for _, ma := range e.metrics[k] {
ma.prommetric.Describe(ch)
}

}
}

Expand Down Expand Up @@ -160,16 +169,19 @@ func (e *spplugExporter) Collect(ch chan<- prometheus.Metric) {
m.Collect(ch)
}

for _, m := range e.metrics {
m.Collect(ch)
for k := range e.metrics {
for _, ma := range e.metrics[k] {
ma.prommetric.Collect(ch)
}

}
}

func (e *spplugExporter) receiveMessage() func(mqtt.Client, mqtt.Message) {
return func(c mqtt.Client, m mqtt.Message) {
mutex.Lock()
defer mutex.Unlock()

var newMetric prometheusmetric
var pbMsg pb.Payload
var eventString string

Expand All @@ -180,7 +192,7 @@ func (e *spplugExporter) receiveMessage() func(mqtt.Client, mqtt.Message) {
}

topic := m.Topic()
log.Infof("Received message: %s\n", topic)
log.Debugf("Received message: %s\n", topic)
log.Debugf("%s\n", pbMsg.String())

// Get the labels and value for the labels from the topic and constants
Expand All @@ -195,45 +207,92 @@ func (e *spplugExporter) receiveMessage() func(mqtt.Client, mqtt.Message) {
siteLabelValues["sp_group_id"],
siteLabelValues["sp_edge_node_id"])

// Sparkplug messages contain multiple metrics within them
// traverse them and process them

metricList := pbMsg.GetMetrics()
log.Debugf("Received message in processMetric: %s\n", metricList)

for _, metric := range metricList {
metricName, err := getMetricName(metric)

if err != nil {
metricLabels := siteLabels
metricLabelValues := cloneLabelSet(siteLabelValues)

newLabelname, metricName, err := getMetricName(metric)

if newLabelname != nil {
for list := 0; list < len(newLabelname); list++ {
parts := strings.Split(newLabelname[list], ":")

metricLabels = append(metricLabels, parts[0])
metricLabelValues[parts[0]] = string(parts[1])
}
}

if err != nil {
if metricName != "Device Control/Rebirth" {
log.Errorf("Error: %s %s %v \n", siteLabelValues["sp_edge_node_id"], metricName, err)
log.Errorf("Error: %s %s %v \n", siteLabelValues["sp_edge_node_id"], metricName, err)
e.counterMetrics[SPPushInvalidMetric].With(siteLabelValues).Inc()
}

continue
}

if _, ok := e.metrics[metricName]; !ok {
// if metricName is not within the e.metrics OR
// if metricLabels (note you will need a function to compare maps) is not within e.metrics[metricName], then you need to create a new metric
_, metricNameExists := e.metrics[metricName]
var labelIndex int
var labelSetExists bool

// If the metric name exists in the map, that means that we
// have seen this metric before. However due to the customized
// label support, it's possible that we can use an existing metric
// or need to create a new one.

if metricNameExists {
// Each entry under a metricName contains a set of label names
// and a pointer to the metric. This is what makes a time
// series unique, so if the label names we received are not
// contained in the list, we need to create a new entry

labelSetExists, labelIndex = compareLabelSet(e.metrics[metricName],
metricLabels)

// If the labels are not there, we create a new metric.
// If they are we update an existing metric
if !labelSetExists {

eventString = "Creating metric"
eventString = "Creating new timeseries for existing metric name"
newMetric.promlabel = append(newMetric.promlabel,
metricLabels...)

e.metrics[metricName] = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: metricName,
Help: "Metric pushed via MQTT",
},
siteLabels,
)
newMetric.prommetric = createNewMetric(metricName,
metricLabels)

labelIndex = len(e.metrics[metricName])

e.metrics[metricName] = append(e.metrics[metricName],
newMetric)
} else {
eventString = "Updating metric"
e.metrics[metricName][labelIndex].promlabel = metricLabels
}
} else {
eventString = "Updating metric"
eventString = "Creating new metric name and timeseries"
newMetric.promlabel = append(newMetric.promlabel,
metricLabels...)
newMetric.prommetric = createNewMetric(metricName,
metricLabels)
labelIndex = 0
e.metrics[metricName] = append(e.metrics[metricName],
newMetric)
}

if metricVal, err := convertMetricToFloat(metric); err != nil {
log.Debugf("Error %v converting data type for metric %s\n",
err, metricName)
} else {
log.Debugf("%s %s : %g\n", eventString, metricName, metricVal)
e.metrics[metricName].With(siteLabelValues).Set(metricVal)
e.metrics[SPLastTimePushedMetric].With(siteLabelValues).SetToCurrentTime()
log.Infof("%s: Metric name(%s) Metric value(%g) Applied labels(%s) Applied labels values(%s) siteLabelValues(%s)\n",
eventString, metricName, metricVal, metricLabels, metricLabelValues, siteLabelValues)

e.metrics[metricName][labelIndex].prommetric.With(metricLabelValues).Set(metricVal)
e.metrics[SPLastTimePushedMetric][0].prommetric.With(siteLabelValues).SetToCurrentTime()
e.counterMetrics[SPPushTotalMetric].With(siteLabelValues).Inc()
}
}
Expand Down Expand Up @@ -309,7 +368,7 @@ func (e *spplugExporter) reincarnate(namespace string, group string,

func (e *spplugExporter) initializeMetricsAndData() {

e.metrics = make(map[string]*prometheus.GaugeVec)
e.metrics = make(map[string][]prometheusmetric)
e.counterMetrics = make(map[string]*prometheus.CounterVec)

edgeNodeList = make(map[string]bool)
Expand All @@ -329,14 +388,15 @@ func (e *spplugExporter) initializeMetricsAndData() {
)

log.Debugf(NewMetricString, SPLastTimePushedMetric)

e.metrics[SPLastTimePushedMetric] = prometheus.NewGaugeVec(
var test prometheusmetric
test.prommetric = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: SPLastTimePushedMetric,
Help: fmt.Sprintf("Last time a metric was pushed to a MQTT topic"),
},
siteLabels,
)
e.metrics[SPLastTimePushedMetric] = append(e.metrics[SPLastTimePushedMetric], test)

log.Debugf(NewMetricString, SPPushInvalidMetric)

Expand Down
116 changes: 99 additions & 17 deletions utilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@ import (
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/golang/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
)

// contants for various SP labels and metric names
const (
SPNamespace string = "sp_namespace"
SPGroupID string = "sp_group_id"
SPEdgeNodeID string = "sp_edge_node_id"
SPDeviceID string = "sp_device_id"
SPMQTTTopic string = "sp_mqtt_topic"
SPMQTTServer string = "sp_mqtt_server"
SPNamespace string = "sp_namespace"
SPGroupID string = "sp_group_id"
SPEdgeNodeID string = "sp_edge_node_id"
SPDeviceID string = "sp_device_id"
SPMQTTTopic string = "sp_mqtt_topic"
SPMQTTServer string = "sp_mqtt_server"
)

func sendMQTTMsg(c mqtt.Client, pbMsg *pb.Payload,
Expand All @@ -39,11 +39,77 @@ func sendMQTTMsg(c mqtt.Client, pbMsg *pb.Payload,
return true
}

func cloneLabelSet(labels prometheus.Labels) prometheus.Labels {
newLabels := prometheus.Labels{}

for key, value := range labels {
newLabels[key] = value
}

return newLabels
}

// In order for 2 label sets to match, they have to have the exact same
// number of entries and the exact same entries orthogonal or the order
// that they are stored

func compareLabelSet(metricSet []prometheusmetric,
newLabels []string) (bool, int) {
returnCode := false
returnIndex := 0
tmpIndex := 0
for _, existingMetric := range metricSet {

// Make sure that both label sets have the same number of entries
if len(existingMetric.promlabel) == len(newLabels) {

// Initially we believe all labeles are unverified
// As we verify we decrement, if we end up with something > 0
// we know the set does not match

mismatchedLabels := len(newLabels)

for _, newLabel := range newLabels {
// Compare the current new label to everything in existing
// label set
for _, existingLabel := range existingMetric.promlabel {
if existingLabel == newLabel {
mismatchedLabels--
break
}
}
}

if mismatchedLabels == 0 {
returnCode = true
returnIndex = tmpIndex
}
}

tmpIndex++
}
return returnCode, returnIndex
}

func createNewMetric(metricName string, metricLabels []string) *prometheus.GaugeVec {
var newMetric prometheusmetric

newMetric.prommetric = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: metricName,
Help: "Metric pushed via MQTT",
},
metricLabels,
)
return newMetric.prommetric
}

func prepareLabelsAndValues(topic string) ([]string, prometheus.Labels, bool) {
var labels []string
t := strings.TrimPrefix(topic, *prefix)
t = strings.TrimPrefix(t, "/")
parts := strings.Split(t, "/")

// 6.1.3 covers 9 message types, only process device data
// Sparkplug puts 5 key namespacing elements in the topic name
// these are being parsed and will be added as metric labels
Expand All @@ -60,8 +126,9 @@ func prepareLabelsAndValues(topic string) ([]string, prometheus.Labels, bool) {

/* See the sparkplug definition for the topic construction */
/** Set the Prometheus labels to their corresponding topic part **/

var labels = getLabelSet()
if labels == nil {
labels = getLabelSet()
}

labelValues := prometheus.Labels{}

Expand Down Expand Up @@ -100,8 +167,8 @@ func getNodeLabelSetandValues(namespace string, group string,
nodeID string) ([]string, map[string]string) {
labels := getNodeLabelSet()
labelValues := map[string]string{
SPNamespace: namespace,
SPGroupID: group,
SPNamespace: namespace,
SPGroupID: group,
SPEdgeNodeID: nodeID,
}

Expand All @@ -111,19 +178,34 @@ func getNodeLabelSetandValues(namespace string, group string,
func getNodeLabelSet() []string {
return []string{SPNamespace, SPGroupID, SPEdgeNodeID}
}

func getMetricName(metric *pb.Payload_Metric)(string, error) {
// This function acceptys MQTT metric message,
// extracts out the nested folders(if any), add those folder names in Key value labels
// and return label value sets, metrics wrt to those labelvalues and error(if any)
func getMetricName(metric *pb.Payload_Metric) ([]string, string, error) {
var errUnexpectedType error
var labelvalues []string

metricName := metric.GetName()

if strings.Contains(metricName, "/") == true && metricName != "Device Control/Rebirth" {
parts := strings.Split(metricName, "/")
size := len(parts)
metricName = parts[size-1]
for metlen := 0; metlen <= size-2; metlen++ {
labelvalues = append(labelvalues, parts[metlen])

metricName := model.LabelValue(metric.GetName())
}
log.Debugf("Received message for labelvalues: %s\n", labelvalues)
}
metricNameL := model.LabelValue(metricName)

if model.IsValidMetricName(metricName) == true {
if model.IsValidMetricName(metricNameL) == true {
errUnexpectedType = nil
} else {
errUnexpectedType = errors.New("Non-compliant metric name")
}

return string(metricName), errUnexpectedType
return []string(labelvalues), string(metricNameL), errUnexpectedType
}

func convertMetricToFloat(metric *pb.Payload_Metric) (float64, error) {
Expand Down