Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sparkpluggw key value engine #6

Merged
merged 12 commits into from
Apr 8, 2020
127 changes: 95 additions & 32 deletions exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
oslog "log"
"os"
"strings"
"sync"
"time"

Expand All @@ -30,7 +31,7 @@ const (
SPReincarnationSuccess string = "sp_reincarnation_success_count"
SPReincarnationDelay string = "sp_reincarnation_delayed_count"

NewMetricString string = "Creating new SP metric %s\n"
NewMetricString string = "Creating new SP metric %s\n"

SPReincarnateTimer uint32 = 900
SPReincarnateRetry uint32 = 60
Expand Down Expand Up @@ -58,13 +59,18 @@ const (
PBPropertySetList uint32 = 21
)

type prometheusmetric struct {
prommetric *prometheus.GaugeVec
promlabel []string
}

type spplugExporter struct {
client mqtt.Client
versionDesc *prometheus.Desc
connectDesc *prometheus.Desc

// Holds the mertrics collected
metrics map[string]*prometheus.GaugeVec
metrics map[string][]prometheusmetric
counterMetrics map[string]*prometheus.CounterVec
}

Expand Down Expand Up @@ -109,7 +115,7 @@ func initSparkPlugExporter(e **spplugExporter) {
prometheus.BuildFQName(progname, "mqtt", "connected"),
"Is the exporter connected to mqtt broker", nil, nil),
}

(*e).client = mqtt.NewClient(options)

log.Debugf("Initializing Exporter Metrics and Data\n")
Expand All @@ -131,8 +137,11 @@ func (e *spplugExporter) Describe(ch chan<- *prometheus.Desc) {
for _, m := range e.counterMetrics {
m.Describe(ch)
}
for _, m := range e.metrics {
m.Describe(ch)
for k := range e.metrics {
for _, ma := range e.metrics[k] {
ma.prommetric.Describe(ch)
}

}
}

Expand Down Expand Up @@ -160,16 +169,19 @@ func (e *spplugExporter) Collect(ch chan<- prometheus.Metric) {
m.Collect(ch)
}

for _, m := range e.metrics {
m.Collect(ch)
for k := range e.metrics {
for _, ma := range e.metrics[k] {
ma.prommetric.Collect(ch)
}

}
}

func (e *spplugExporter) receiveMessage() func(mqtt.Client, mqtt.Message) {
return func(c mqtt.Client, m mqtt.Message) {
mutex.Lock()
defer mutex.Unlock()

var newMetric prometheusmetric
var pbMsg pb.Payload
var eventString string

Expand All @@ -180,7 +192,7 @@ func (e *spplugExporter) receiveMessage() func(mqtt.Client, mqtt.Message) {
}

topic := m.Topic()
log.Infof("Received message: %s\n", topic)
log.Debugf("Received message: %s\n", topic)
log.Debugf("%s\n", pbMsg.String())

// Get the labels and value for the labels from the topic and constants
Expand All @@ -195,45 +207,95 @@ func (e *spplugExporter) receiveMessage() func(mqtt.Client, mqtt.Message) {
siteLabelValues["sp_group_id"],
siteLabelValues["sp_edge_node_id"])

// Sparkplug messages contain multiple metrics within them
// traverse them and process them

metricList := pbMsg.GetMetrics()
log.Debugf("Received message in processMetric: %s\n", metricList)

for _, metric := range metricList {
metricName, err := getMetricName(metric)

if err != nil {
metricLabels := siteLabels
metricLabelValues := cloneLabelSet(siteLabelValues)

newLabelname, metricName, err := getMetricName(metric)

if newLabelname != nil {
for list := 0; list < len(newLabelname); list++ {
parts := strings.Split(newLabelname[list], ":")

metricLabels = append(metricLabels, parts[0])
metricLabelValues[parts[0]] = string(parts[1])
}
}

if err != nil {
if metricName != "Device Control/Rebirth" {
log.Errorf("Error: %s %s %v \n", siteLabelValues["sp_edge_node_id"], metricName, err)
log.Errorf("Error: %s %s %v \n", siteLabelValues["sp_edge_node_id"], metricName, err)
e.counterMetrics[SPPushInvalidMetric].With(siteLabelValues).Inc()
}

continue
}

if _, ok := e.metrics[metricName]; !ok {
// if metricName is not within the e.metrics OR
// if metricLabels (note you will need a function to compare maps) is not within e.metrics[metricName], then you need to create a new metric
_, metricNameExists := e.metrics[metricName]
var labelIndex int
var labelSetExists bool

eventString = "Creating metric"
// If the metric name exists in the map, that means that we
// have seen this metric before. However due to the customized
// label support, it's possible that we can use an existing metric
// or need to create a new one.

if metricNameExists {
// Each entry under a metricName contains a set of label names
// and a pointer to the metric. This is what makes a time
// series unique, so if the label names we received are not
// contained in the list, we need to create a new entry

labelSetExists, labelIndex = compareLabelSet(e.metrics[metricName],
metricLabels)

// If the labels are not there, we create a new metric.
// If they are we update an existing metric
if !labelSetExists {

eventString = "Creating new timeseries for existing metric"
newMetric.promlabel = append(newMetric.promlabel,
metricLabels...)

newMetric.prommetric = createNewMetric(metricName,
metricLabels)

labelIndex = len(e.metrics[metricName])

e.metrics[metricName] = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: metricName,
Help: "Metric pushed via MQTT",
},
siteLabels,
)
e.metrics[metricName] = append(e.metrics[metricName],
newMetric)
} else {
eventString = "Updating metric"
e.metrics[metricName][labelIndex].promlabel = metricLabels
}
} else {
eventString = "Updating metric"
eventString = "Creating metric"
newMetric.promlabel = append(newMetric.promlabel,
metricLabels...)
newMetric.prommetric = createNewMetric(metricName,
metricLabels)
labelIndex = 0
e.metrics[metricName] = append(e.metrics[metricName],
newMetric)
}

if metricVal, err := convertMetricToFloat(metric); err != nil {
log.Debugf("Error %v converting data type for metric %s\n",
err, metricName)
} else {
log.Debugf("%s %s : %g\n", eventString, metricName, metricVal)
e.metrics[metricName].With(siteLabelValues).Set(metricVal)
e.metrics[SPLastTimePushedMetric].With(siteLabelValues).SetToCurrentTime()
log.Infof("%s: name (%s) value (%g) labels: (%s)\n",
eventString, metricName, metricVal, metricLabelValues)

log.Debugf("metriclabels: (%s) siteLabelValues: (%s)\n",
metricLabels, siteLabelValues)

e.metrics[metricName][labelIndex].prommetric.With(metricLabelValues).Set(metricVal)
e.metrics[SPLastTimePushedMetric][0].prommetric.With(siteLabelValues).SetToCurrentTime()
e.counterMetrics[SPPushTotalMetric].With(siteLabelValues).Inc()
}
}
Expand Down Expand Up @@ -309,7 +371,7 @@ func (e *spplugExporter) reincarnate(namespace string, group string,

func (e *spplugExporter) initializeMetricsAndData() {

e.metrics = make(map[string]*prometheus.GaugeVec)
e.metrics = make(map[string][]prometheusmetric)
e.counterMetrics = make(map[string]*prometheus.CounterVec)

edgeNodeList = make(map[string]bool)
Expand All @@ -329,14 +391,15 @@ func (e *spplugExporter) initializeMetricsAndData() {
)

log.Debugf(NewMetricString, SPLastTimePushedMetric)

e.metrics[SPLastTimePushedMetric] = prometheus.NewGaugeVec(
var test prometheusmetric
test.prommetric = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: SPLastTimePushedMetric,
Help: fmt.Sprintf("Last time a metric was pushed to a MQTT topic"),
},
siteLabels,
)
e.metrics[SPLastTimePushedMetric] = append(e.metrics[SPLastTimePushedMetric], test)

log.Debugf(NewMetricString, SPPushInvalidMetric)

Expand Down
Loading