Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sparkpluggw key value engine #6

Merged
merged 12 commits into from
Apr 8, 2020
132 changes: 100 additions & 32 deletions exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"os"
"sync"
"time"
"strings"
"reflect"

pb "github.com/IHI-Energy-Storage/sparkpluggw/Sparkplug"
mqtt "github.com/eclipse/paho.mqtt.golang"
Expand Down Expand Up @@ -58,13 +60,18 @@ const (
PBPropertySetList uint32 = 21
)

type prometheusmetric struct {
prommetric *prometheus.GaugeVec
promlabel []string
}

type spplugExporter struct {
client mqtt.Client
versionDesc *prometheus.Desc
connectDesc *prometheus.Desc

// Holds the mertrics collected
metrics map[string]*prometheus.GaugeVec
metrics map[string] []prometheusmetric
counterMetrics map[string]*prometheus.CounterVec
}

Expand Down Expand Up @@ -109,7 +116,7 @@ func initSparkPlugExporter(e **spplugExporter) {
prometheus.BuildFQName(progname, "mqtt", "connected"),
"Is the exporter connected to mqtt broker", nil, nil),
}

(*e).client = mqtt.NewClient(options)

log.Debugf("Initializing Exporter Metrics and Data\n")
Expand All @@ -131,8 +138,11 @@ func (e *spplugExporter) Describe(ch chan<- *prometheus.Desc) {
for _, m := range e.counterMetrics {
m.Describe(ch)
}
for _, m := range e.metrics {
m.Describe(ch)
for k := range e.metrics{
chaets marked this conversation as resolved.
Show resolved Hide resolved
for _, ma:= range e.metrics[k]{
ma.prommetric.Describe(ch)
}

}
}

Expand Down Expand Up @@ -160,16 +170,19 @@ func (e *spplugExporter) Collect(ch chan<- prometheus.Metric) {
m.Collect(ch)
}

for _, m := range e.metrics {
m.Collect(ch)
for k := range e.metrics{
chaets marked this conversation as resolved.
Show resolved Hide resolved
for _, ma:= range e.metrics[k]{
ma.prommetric.Collect(ch)
}

}
}

func (e *spplugExporter) receiveMessage() func(mqtt.Client, mqtt.Message) {
return func(c mqtt.Client, m mqtt.Message) {
mutex.Lock()
defer mutex.Unlock()

var newMetric prometheusmetric
var pbMsg pb.Payload
var eventString string

Expand All @@ -195,13 +208,29 @@ func (e *spplugExporter) receiveMessage() func(mqtt.Client, mqtt.Message) {
siteLabelValues["sp_group_id"],
siteLabelValues["sp_edge_node_id"])

// Sparkplug messages contain multiple metrics within them
// traverse them and process them

metricList := pbMsg.GetMetrics()
log.Debugf("Received message in processMetric: %s\n", metricList)

for _, metric := range metricList {
metricName, err := getMetricName(metric)

metricLabels := siteLabels
metricLabelValues := cloneLabelSet(siteLabelValues)

log.Debugf("Check the data structures: %v : %v",
chaets marked this conversation as resolved.
Show resolved Hide resolved
reflect.ValueOf(metricLabels), reflect.ValueOf(siteLabels))

newLabelname, metricName, err := getMetricName(metric)

// log.Infof("Received message for metric: %s", metricName, newLabelname)

if newLabelname != nil {
for list := 0; list < len(newLabelname); list++ {
parts := strings.Split(newLabelname[list], ":")

metricLabels = append(metricLabels, parts[0])
metricLabelValues[parts[0]] = string(parts[1])
}
}

if err != nil {
if metricName != "Device Control/Rebirth" {
Expand All @@ -212,28 +241,65 @@ func (e *spplugExporter) receiveMessage() func(mqtt.Client, mqtt.Message) {
continue
}

if _, ok := e.metrics[metricName]; !ok {

eventString = "Creating metric"

e.metrics[metricName] = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: metricName,
Help: "Metric pushed via MQTT",
},
siteLabels,
)
} else {
eventString = "Updating metric"
}

// if metricName is not within the e.metrics OR
// if metricLabels (note you will need a function to compare maps) is not within e.metrics[metricName], then you need to create a new metric
_, metricNameExists := e.metrics[metricName]
var labelIndex int
var labelSetExists bool

// If the metric name exists in the map, that means that we
// have seen this metric before. However due to the customized
// label support, it's possible that we can use an existing metric
// or need to create a new one.

if (metricNameExists) {
// Each entry under a metricName contains a set of label names
// and a pointer to the metric. This is what makes a time
// series unique, so if the label names we received are not
// contained in the list, we need to create a new entry

labelSetExists, labelIndex =
chaets marked this conversation as resolved.
Show resolved Hide resolved
compareLabelSet(e.metrics[metricName],
metricLabels)

// If the labels are not there, we create a new metric.
// If they are we update an existing metric
if !labelSetExists {

eventString = "Creating new timeseries for existing metric name"
newMetric.promlabel = append(newMetric.promlabel,
metricLabels...)

newMetric.prommetric = createNewMetric(metricName,
metricLabels)

labelIndex = len(e.metrics[metricName])

e.metrics[metricName] = append(e.metrics[metricName],
newMetric)
}else{
eventString = "Updating metric"
e.metrics[metricName][labelIndex].promlabel = metricLabels
}
} else {
eventString = "Creating new metric name and timeseries"
newMetric.promlabel = append(newMetric.promlabel,
metricLabels...)
newMetric.prommetric = createNewMetric(metricName,
metricLabels)
labelIndex = 0
e.metrics[metricName] = append(e.metrics[metricName],
newMetric)
}
if metricVal, err := convertMetricToFloat(metric); err != nil {
log.Debugf("Error %v converting data type for metric %s\n",
err, metricName)
} else {
log.Debugf("%s %s : %g\n", eventString, metricName, metricVal)
e.metrics[metricName].With(siteLabelValues).Set(metricVal)
e.metrics[SPLastTimePushedMetric].With(siteLabelValues).SetToCurrentTime()
log.Debugf("%s: name(%s) value(%g) labels(%s) values(%s) siteLabelValues(%s)\n",
chaets marked this conversation as resolved.
Show resolved Hide resolved
eventString, metricName, metricVal, metricLabels, metricLabelValues,siteLabelValues)

e.metrics[metricName][labelIndex].prommetric.With(metricLabelValues).Set(metricVal)
e.metrics[SPLastTimePushedMetric][0].prommetric.With(siteLabelValues).SetToCurrentTime()
e.counterMetrics[SPPushTotalMetric].With(siteLabelValues).Inc()
}
}
Expand Down Expand Up @@ -309,7 +375,8 @@ func (e *spplugExporter) reincarnate(namespace string, group string,

func (e *spplugExporter) initializeMetricsAndData() {

e.metrics = make(map[string]*prometheus.GaugeVec)
// e.metrics = make(map[string]*prometheus.GaugeVec)
chaets marked this conversation as resolved.
Show resolved Hide resolved
e.metrics = make(map[string] []prometheusmetric)
e.counterMetrics = make(map[string]*prometheus.CounterVec)

edgeNodeList = make(map[string]bool)
Expand All @@ -329,14 +396,15 @@ func (e *spplugExporter) initializeMetricsAndData() {
)

log.Debugf(NewMetricString, SPLastTimePushedMetric)

e.metrics[SPLastTimePushedMetric] = prometheus.NewGaugeVec(
var test prometheusmetric
test.prommetric= prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: SPLastTimePushedMetric,
Help: fmt.Sprintf("Last time a metric was pushed to a MQTT topic"),
},
siteLabels,
)
e.metrics[SPLastTimePushedMetric] = append(e.metrics[SPLastTimePushedMetric], test)

log.Debugf(NewMetricString, SPPushInvalidMetric)

Expand Down
99 changes: 92 additions & 7 deletions utilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"strings"


pb "github.com/IHI-Energy-Storage/sparkpluggw/Sparkplug"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/golang/protobuf/proto"
Expand All @@ -18,6 +19,7 @@ const (
SPGroupID string = "sp_group_id"
SPEdgeNodeID string = "sp_edge_node_id"
SPDeviceID string = "sp_device_id"
SPkeyID string = "sp_key_id"
chaets marked this conversation as resolved.
Show resolved Hide resolved
SPMQTTTopic string = "sp_mqtt_topic"
SPMQTTServer string = "sp_mqtt_server"
)
Expand All @@ -39,10 +41,79 @@ func sendMQTTMsg(c mqtt.Client, pbMsg *pb.Payload,
return true
}

func cloneLabelSet(labels prometheus.Labels) (prometheus.Labels) {
newLabels := prometheus.Labels{}

for key, value := range labels {
newLabels[key] = value
}

return newLabels
}

// In order for 2 label sets to match, they have to have the exact same
// number of entries and the exact same entries orthogonal or the order
// that they are stored

func compareLabelSet(metricSet []prometheusmetric,
newLabels []string) (bool, int) {
chaets marked this conversation as resolved.
Show resolved Hide resolved
returnCode := false
returnIndex := 0
tmpIndex := 0
for _, existingMetric := range metricSet {

// Make sure that both label sets have the same number of entries
if len(existingMetric.promlabel) == len(newLabels) {

// Initially we believe all labeles are unverified
// As we verify we decrement, if we end up with something > 0
// we know the set does not match

mismatchedLabels := len(newLabels)

for _, newLabel := range newLabels {
// Compare the current new label to everything in existing
// label set
for _, existingLabel := range existingMetric.promlabel{
if existingLabel == newLabel {
mismatchedLabels--
break
}
}
}

log.Debugf("mismatchedLabels: %d tmpIndex: %d\n",mismatchedLabels, tmpIndex)
chaets marked this conversation as resolved.
Show resolved Hide resolved
if mismatchedLabels == 0 {
returnCode = true
returnIndex = tmpIndex
}
}

tmpIndex++
}
return returnCode, returnIndex
}


func createNewMetric(metricName string, metricLabels []string)(*prometheus.GaugeVec) {
var newMetric prometheusmetric

newMetric.prommetric = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: metricName,
Help: "Metric pushed via MQTT",
},
metricLabels,
)
return newMetric.prommetric
}

func prepareLabelsAndValues(topic string) ([]string, prometheus.Labels, bool) {
var labels []string
t := strings.TrimPrefix(topic, *prefix)
t = strings.TrimPrefix(t, "/")
parts := strings.Split(t, "/")
log.Debugf("first test to check the lengh for Metricx: %s: %d\n", parts, len(parts))
chaets marked this conversation as resolved.
Show resolved Hide resolved

// 6.1.3 covers 9 message types, only process device data
// Sparkplug puts 5 key namespacing elements in the topic name
Expand All @@ -60,11 +131,12 @@ func prepareLabelsAndValues(topic string) ([]string, prometheus.Labels, bool) {

/* See the sparkplug definition for the topic construction */
/** Set the Prometheus labels to their corresponding topic part **/

var labels = getLabelSet()
if labels == nil {
labels = getLabelSet()
}

labelValues := prometheus.Labels{}

log.Debugf("prometheus.Labels{}: %s\n", prometheus.Labels{})
chaets marked this conversation as resolved.
Show resolved Hide resolved
// Labels are created from the topic parsing above and compared against
// the set of labels for this metric. If this is a unique set then it will
// be stored and the metric will be treated as unique and new. If the
Expand Down Expand Up @@ -112,18 +184,31 @@ func getNodeLabelSet() []string {
return []string{SPNamespace, SPGroupID, SPEdgeNodeID}
}

func getMetricName(metric *pb.Payload_Metric)(string, error) {
func getMetricName(metric *pb.Payload_Metric)([]string, string, error) {
chaets marked this conversation as resolved.
Show resolved Hide resolved
var errUnexpectedType error
var labelvalues []string

metricName := metric.GetName()

if strings.Contains(metricName, "/") == true && metricName != "Device Control/Rebirth"{
parts := strings.Split(metricName, "/")
size:= len(parts)
metricName = parts[size-1]
for metlen := 0; metlen <= size-2; metlen++ {
labelvalues = append(labelvalues, parts[metlen])

metricName := model.LabelValue(metric.GetName())
}
log.Debugf("Received message for labelvalues: %s\n", labelvalues)
}
metricNameL := model.LabelValue(metricName)

if model.IsValidMetricName(metricName) == true {
if model.IsValidMetricName(metricNameL) == true {
errUnexpectedType = nil
} else {
errUnexpectedType = errors.New("Non-compliant metric name")
}

return string(metricName), errUnexpectedType
return []string(labelvalues), string(metricNameL), errUnexpectedType
}

func convertMetricToFloat(metric *pb.Payload_Metric) (float64, error) {
Expand Down