Skip to content

Commit

Permalink
refactor export into exporter, counters, and connectors (prometheus n…
Browse files Browse the repository at this point in the history
…ot working, influx partially)
  • Loading branch information
cha87de committed Jan 15, 2019
1 parent 1c78b6b commit 2b5ab4e
Show file tree
Hide file tree
Showing 20 changed files with 504 additions and 174 deletions.
23 changes: 23 additions & 0 deletions connectors/connectors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package connectors

import (
"omi-gitlab.e-technik.uni-ulm.de/bwnetflow/kafka/consumer_dashboard/connectors/influx"
"omi-gitlab.e-technik.uni-ulm.de/bwnetflow/kafka/consumer_dashboard/connectors/prometheus"
)

// NewInfluxConnector instantiates a new InfluxConnector
func NewInfluxConnector(url string, username string, password string, exportFreq int) influx.Connector {
return influx.Connector{
URL: url,
Username: username,
Password: password,
ExportFreq: exportFreq,
}
}

// NewPrometheusConnector instantiates a new PrometheusConnector
func NewPrometheusConnector(addr string) prometheus.Connector {
return prometheus.Connector{
Addr: addr,
}
}
83 changes: 83 additions & 0 deletions connectors/influx/connector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package influx

import (
"fmt"
"time"

client "github.com/influxdata/influxdb1-client/v2"
"omi-gitlab.e-technik.uni-ulm.de/bwnetflow/kafka/consumer_dashboard/counters"
)

// Connector provides export features to Influx
type Connector struct {
URL string
Username string
Password string
ExportFreq int
influxClient client.Client
running bool
}

// Initialize established a connection to Influx DB
func (connector *Connector) Initialize() {
var err error
connector.influxClient, err = client.NewHTTPClient(client.HTTPConfig{
Addr: connector.URL,
Username: connector.Username,
Password: connector.Password,
})
if err != nil {
fmt.Println("Error creating InfluxDB Client: ", err.Error())
return
}

connector.running = true
go connector.startPushCycle()

}

// Close closes the connection to influx
func (connector *Connector) Close() {
connector.running = false
connector.influxClient.Close()
}

func (connector *Connector) startPushCycle() {
start := time.Now()
for connector.running {
nextRun := start.Add(time.Duration(connector.ExportFreq) * time.Second)
time.Sleep(nextRun.Sub(time.Now()))

connector.push()
start = time.Now()
}
}

func (connector *Connector) push() {
fmt.Print("Push stuff to Influx")

// Create a new point batch
bp, err := client.NewBatchPoints(client.BatchPointsConfig{
Database: "bwnetflow",
Precision: "s",
})
if err != nil {
fmt.Println("Error creating NewBatchPoints: ", err.Error())
}

// Transform counter values to influx points
pts := transformCounter(counters.Msgcount)
bp.AddPoints(pts)
pts = transformCounter(counters.FlowNumber)
bp.AddPoints(pts)
pts = transformCounter(counters.FlowBytes)
bp.AddPoints(pts)
pts = transformCounter(counters.FlowPackets)
bp.AddPoints(pts)

// Write the batch
err = connector.influxClient.Write(bp)
if err != nil {
fmt.Println("Error writing to influx: ", err.Error())
}
}
51 changes: 51 additions & 0 deletions connectors/influx/example.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package influx

import (
"fmt"
"time"

"github.com/influxdata/influxdb1-client/v2"
)

func influxTest() {

fmt.Println("Let's start to connect to influx")
// Make client
c, err := client.NewHTTPClient(client.HTTPConfig{
Addr: "http://localhost:8086",
Username: "admin",
Password: "secret",
})
if err != nil {
fmt.Println("Error creating InfluxDB Client: ", err.Error())
}
defer c.Close()

// Create a new point batch
bp, err := client.NewBatchPoints(client.BatchPointsConfig{
Database: "bwnetflow",
Precision: "s",
})
if err != nil {
fmt.Println("Error creating NewBatchPoints: ", err.Error())
}

// Create a point and add to batch
tags := map[string]string{"cpu": "cpu-total"}
fields := map[string]interface{}{
"idle": 10.1,
"system": 53.3,
"user": 46.6,
}
pt, err := client.NewPoint("cpu_usage", tags, fields, time.Now())
if err != nil {
fmt.Println("Error: ", err.Error())
}
bp.AddPoint(pt)

// Write the batch
err = c.Write(bp)
if err != nil {
fmt.Println("Error writing to influx: ", err.Error())
}
}
34 changes: 34 additions & 0 deletions connectors/influx/transform.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package influx

import (
"fmt"
"time"

client "github.com/influxdata/influxdb1-client/v2"
"omi-gitlab.e-technik.uni-ulm.de/bwnetflow/kafka/consumer_dashboard/counters"
)

func transformCounter(counter counters.Counter) []*client.Point {
pts := make([]*client.Point, 0)
counter.Access.Lock()
for _, items := range counter.Fields {
labels := items.Label
val := items.Value

tags := map[string]string{}
for k, v := range labels.Fields {
tags[k] = v
}
fields := map[string]interface{}{
"count": int64(val),
}
pt, err := client.NewPoint(counter.Name, tags, fields, time.Now())
if err != nil {
fmt.Println("Error: ", err.Error())
}

pts = append(pts, pt)
}
counter.Access.Unlock()
return pts
}
13 changes: 7 additions & 6 deletions prometheus/exporter.go → connectors/prometheus/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,23 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
)

// Exporter provides export features to Prometheus
type Exporter struct {
// Connector provides export features to Prometheus
type Connector struct {
Addr string
}

// Initialize Prometheus Exporter, listen on addr with path /metrics and /flowdata
func (exporter *Exporter) Initialize(addr string) {
prometheus.MustRegister(msgcount, kafkaOffsets)
func (connector *Connector) Initialize() {
//prometheus.MustRegister(counters.Msgcount, counters.KafkaOffsets)

flowReg := prometheus.NewRegistry()
flowReg.MustRegister(flowNumber, flowBytes, flowPackets, hostBytes, hostConnections)
//flowReg.MustRegister(counters.FlowNumber, counters.FlowBytes, counters.FlowPackets, counters.HostBytes, counters.HostConnections)

http.Handle("/metrics", promhttp.Handler())
http.Handle("/flowdata", promhttp.HandlerFor(flowReg, promhttp.HandlerOpts{}))

go func() {
http.ListenAndServe(addr, nil)
http.ListenAndServe(connector.Addr, nil)
}()
log.Println("Enabled Prometheus /metrics and /flowdata endpoints.")
}
63 changes: 63 additions & 0 deletions counters/counters.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package counters

import (
"sync"

"github.com/prometheus/client_golang/prometheus"
)

var ( // Meta Monitoring Data, to be added to default /metrics
Msgcount = Counter{
Fields: make(map[uint32]CounterItems),
Name: "kafka_messages_total",
Access: &sync.Mutex{},
}

Kafkalabels = []string{
"topic",
"partition",
}
KafkaOffsets = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "kafka_offset_current",
Help: "Current Kafka Offset of the consumer",
}, Kafkalabels)
)

var ( // Flow Data, to be exported on /flowdata
FlowNumber = Counter{
Fields: make(map[uint32]CounterItems),
Name: "flow_number_total",
Access: &sync.Mutex{},
}
FlowBytes = Counter{
Fields: make(map[uint32]CounterItems),
Name: "flow_bytes",
Access: &sync.Mutex{},
}
FlowPackets = Counter{
Fields: make(map[uint32]CounterItems),
Name: "flow_packets",
Access: &sync.Mutex{},
}

// TOP HOSTS
Hostlabels = []string{
"cid",
"ipSrc",
"ipDst",
"peer",
}

HostBytes = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "host_bytes",
Help: "Number of Bytes for top N hosts.",
}, Hostlabels)

HostConnections = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "host_connections_total",
Help: "Number of Connections for top N hosts.",
}, Hostlabels)
)
27 changes: 27 additions & 0 deletions counters/test/test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package main

import (
"reflect"

"github.com/prometheus/client_golang/prometheus"
)

func main() {
msgcount := prometheus.NewCounter(
prometheus.CounterOpts{
Name: "kafka_messages_total",
Help: "Number of Kafka messages",
})

msgcount.Inc()
msgcount.Inc()

var x int
x = 44
println(x)

s := reflect.ValueOf(x).Elem()
println(s.String())
//metric := s.FieldByName("valInt").Interface()
//fmt.Println(metric)
}
66 changes: 66 additions & 0 deletions counters/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package counters

import (
"bytes"
"hash/fnv"
"sync"
)

// LABEL

func NewLabel(fields map[string]string) Label {
return Label{
Fields: fields,
}
}

func NewEmptyLabel() Label {
return Label{
Fields: make(map[string]string),
}
}

type Label struct {
Fields map[string]string
}

func (label *Label) hash() uint32 {
var buffer bytes.Buffer
for k, v := range label.Fields {
buffer.WriteString(k + ":" + v + ";")
}
h := fnv.New32a()
h.Write(buffer.Bytes())
return h.Sum32()
}

// COUNTER

type Counter struct {
Fields map[uint32]CounterItems
Name string
Access *sync.Mutex
}

func (counter *Counter) Add(label Label, value uint64) {
counter.Access.Lock()
h := label.hash()
item, ok := counter.Fields[h]
if !ok {
counter.Fields[h] = CounterItems{
Label: label,
Value: uint64(0),
}
item = counter.Fields[h]
}
item.Value += value
counter.Fields[h] = item
counter.Access.Unlock()
}

// COUNTER ITEMS

type CounterItems struct {
Label Label
Value uint64
}
Loading

0 comments on commit 2b5ab4e

Please sign in to comment.