Skip to content

Commit

Permalink
implement prometheus exporter; fix bug in label hash
Browse files Browse the repository at this point in the history
  • Loading branch information
cha87de committed Jan 31, 2019
1 parent c6f2fab commit 6a95fc1
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 44 deletions.
48 changes: 40 additions & 8 deletions connectors/prometheus/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,58 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"omi-gitlab.e-technik.uni-ulm.de/bwnetflow/kafka/consumer_dashboard/counters"
)

// Connector provides export features to Prometheus
type Connector struct {
Addr string
}

// TODO needs to be adjusted to new counters
// cf. https://godoc.org/github.com/prometheus/client_golang/prometheus#example-Collector

// Initialize Prometheus Exporter, listen on addr with path /metrics and /flowdata
func (connector *Connector) Initialize() {
//prometheus.MustRegister(counters.Msgcount, counters.KafkaOffsets)

flowReg := prometheus.NewRegistry()
//flowReg.MustRegister(counters.FlowNumber, counters.FlowBytes, counters.FlowPackets, counters.HostBytes, counters.HostConnections)
// default flow counters
flowLabels := []string{
"ipversion",
"application",
"protoname",
"direction",
"cid",
"peer",
}
flowNumber := NewFacadeCollector(counters.FlowNumber, flowLabels)
flowBytes := NewFacadeCollector(counters.FlowBytes, flowLabels)
flowPackets := NewFacadeCollector(counters.FlowPackets, flowLabels)

http.Handle("/metrics", promhttp.Handler()) // TODO: this should be enabled regardless of the CLI opt
http.Handle("/flowdata", promhttp.HandlerFor(flowReg, promhttp.HandlerOpts{}))
// top host flow counters
flowHostLabels := []string{
"cid",
"ipSrc",
"ipDst",
"peer",
}
hostBytes := NewFacadeCollector(counters.HostBytes, flowHostLabels)
hostConnections := NewFacadeCollector(counters.HostConnections, flowHostLabels)

flowReg := prometheus.NewPedanticRegistry()
flowReg.MustRegister(flowNumber, flowBytes, flowPackets, hostBytes, hostConnections)

// kafka meta counters
metaLabels := []string{
"topic",
"partition",
}
msgcount := NewFacadeCollector(counters.Msgcount, []string{})
KafkaOffsets := NewFacadeCollector(counters.KafkaOffsets, metaLabels)

metaReg := prometheus.NewPedanticRegistry()
metaReg.MustRegister(msgcount, KafkaOffsets)

// register paths and start http server
http.Handle("/metrics", promhttp.Handler())
http.Handle("/metadata", promhttp.HandlerFor(metaReg, promhttp.HandlerOpts{}))
http.Handle("/flowdata", promhttp.HandlerFor(flowReg, promhttp.HandlerOpts{}))
go func() {
http.ListenAndServe(connector.Addr, nil)
}()
Expand Down
59 changes: 59 additions & 0 deletions connectors/prometheus/facadecollector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package prometheus

import (
"github.com/prometheus/client_golang/prometheus"
"omi-gitlab.e-technik.uni-ulm.de/bwnetflow/kafka/consumer_dashboard/counters"
)

// implemented according to https://godoc.org/github.com/prometheus/client_golang/prometheus#example-Collector

// NewFacadeCollector returns a new instance of FacadeCollector
func NewFacadeCollector(counter counters.Counter, labels []string) FacadeCollector {
return FacadeCollector{
counter: counter,
labels: labels,
desc: prometheus.NewDesc(
counter.Name,
"consumer_dashboard export of "+counter.Name,
labels,
nil,
),
}
}

// FacadeCollector implements the Collector interface.
type FacadeCollector struct {
counter counters.Counter
labels []string
desc *prometheus.Desc
}

// Describe returns the prometheus metric description
func (collector FacadeCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- collector.desc
}

// Collect returns the counter values and labels from the internal counting system
func (collector FacadeCollector) Collect(ch chan<- prometheus.Metric) {
collector.counter.Access.Lock()
defer collector.counter.Access.Unlock()

for _, item := range collector.counter.Fields {
value := float64(item.Value)
labels := make([]string, len(item.Label.Fields))
for i, labelName := range collector.labels {
label, ok := item.Label.Fields[labelName]
if ok {
labels[i] = label
}
}

ch <- prometheus.MustNewConstMetric(
collector.desc,
prometheus.CounterValue,
value,
labels...,
)
}

}
14 changes: 13 additions & 1 deletion counters/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package counters
import (
"bytes"
"hash/fnv"
"sort"
"sync"
)

Expand All @@ -24,9 +25,20 @@ type Label struct {
Fields map[string]string
}

func (label *Label) Hash() uint32 {
return label.hash()
}

func (label *Label) hash() uint32 {
var buffer bytes.Buffer
for k, v := range label.Fields {
keys := make([]string, 0, len(label.Fields))
for k := range label.Fields {
keys = append(keys, k)
}
sort.Strings(keys)

for _, k := range keys {
v := label.Fields[k]
buffer.WriteString(k + ":" + v + ";")
}
h := fnv.New32a()
Expand Down
28 changes: 28 additions & 0 deletions counters/types_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package counters

import (
"testing"

. "github.com/smartystreets/goconvey/convey"
)

func TestLabelHash(t *testing.T) {
labelMap1 := NewLabel(map[string]string{
"topic": "flow-messages-enriched",
"partition": "0",
})
labelMap2 := NewLabel(map[string]string{
"topic": "flow-messages-enriched",
"partition": "0",
})
labelMap3 := NewLabel(map[string]string{
"partition": "0",
"topic": "flow-messages-enriched",
})

Convey("Should hash labels correctly", t, func() {
So(labelMap1.hash(), ShouldEqual, uint32(2844051426))
So(labelMap2.hash(), ShouldEqual, uint32(2844051426))
So(labelMap3.hash(), ShouldEqual, uint32(2844051426))
})
}
24 changes: 11 additions & 13 deletions exporter/increment.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,17 @@ func (exporter *Exporter) Increment(flow *flow.FlowMessage) {
application = appGuess2
}

labels := counters.Label{
Fields: map[string]string{
// "src_port": fmt.Sprint(srcPort),
// "dst_port": fmt.Sprint(dstPort),
"ipversion": flow.GetIPversion().String(),
"application": application,
"protoname": fmt.Sprint(flow.GetProtoName()),
"direction": fmt.Sprint(flow.GetDirection()),
"cid": fmt.Sprint(flow.GetCid()),
"peer": flow.GetPeer(),
// "remotecountry": flow.GetRemoteCountry(),
},
}
labels := counters.NewLabel(map[string]string{
// "src_port": fmt.Sprint(srcPort),
// "dst_port": fmt.Sprint(dstPort),
"ipversion": flow.GetIPversion().String(),
"application": application,
"protoname": fmt.Sprint(flow.GetProtoName()),
"direction": fmt.Sprint(flow.GetDirection()),
"cid": fmt.Sprint(flow.GetCid()),
"peer": flow.GetPeer(),
// "remotecountry": flow.GetRemoteCountry(),
})

counters.Msgcount.Add(counters.NewEmptyLabel(), 1)

Expand Down
10 changes: 4 additions & 6 deletions exporter/incrementCtrl.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,9 @@ import (

// IncrementCtrl updates the kafka offset counters
func (exporter *Exporter) IncrementCtrl(topic string, partition int32, offset int64) {
labels := counters.Label{
Fields: map[string]string{
"topic": topic,
"partition": fmt.Sprint(partition),
},
}
labels := counters.NewLabel(map[string]string{
"topic": topic,
"partition": fmt.Sprint(partition),
})
counters.KafkaOffsets.Add(labels, uint64(offset))
}
28 changes: 12 additions & 16 deletions exporter/tophost.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@ const (

// TopHost updates one entry for Top Hosts
func (exporter *Exporter) TopHost(topHostType TopHostType, cid uint32, ipSrc string, ipDst string, peer string, value uint64) {
labels := counters.Label{
Fields: map[string]string{
"cid": fmt.Sprintf("%d", cid),
"ipSrc": ipSrc,
"ipDst": ipDst,
"peer": peer,
},
}
labels := counters.NewLabel(map[string]string{
"cid": fmt.Sprintf("%d", cid),
"ipSrc": ipSrc,
"ipDst": ipDst,
"peer": peer,
})

var counterVec counters.Counter
if topHostType == TopHostTypeBytes {
Expand All @@ -40,14 +38,12 @@ func (exporter *Exporter) TopHost(topHostType TopHostType, cid uint32, ipSrc str

// RemoveTopHost removes the host from the counter vector
func (exporter *Exporter) RemoveTopHost(topHostType TopHostType, cid uint32, ipSrc string, ipDst string, peer string) {
labels := counters.Label{
Fields: map[string]string{
"cid": fmt.Sprintf("%d", cid),
"ipSrc": ipSrc,
"ipDst": ipDst,
"peer": peer,
},
}
labels := counters.NewLabel(map[string]string{
"cid": fmt.Sprintf("%d", cid),
"ipSrc": ipSrc,
"ipDst": ipDst,
"peer": peer,
})
var counterVec counters.Counter
if topHostType == TopHostTypeBytes {
counterVec = counters.HostBytes
Expand Down

0 comments on commit 6a95fc1

Please sign in to comment.