Skip to content

Commit

Permalink
add prometheus exporter
Browse files Browse the repository at this point in the history
  • Loading branch information
genofire committed Apr 14, 2022
1 parent aa9d94f commit 7f06b5f
Show file tree
Hide file tree
Showing 12 changed files with 596 additions and 17 deletions.
22 changes: 15 additions & 7 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"yanic/respond"
"yanic/runtime"
"yanic/webserver"
"yanic/webserver/prometheus"
)

// serveCmd represents the serve command
Expand All @@ -39,13 +40,6 @@ var serveCmd = &cobra.Command{
}
defer allOutput.Close()

if config.Webserver.Enable {
log.Infof("starting webserver on %s", config.Webserver.Bind)
srv := webserver.New(config.Webserver.Bind, config.Webserver.Webroot)
go webserver.Start(srv)
defer srv.Close()
}

if config.Respondd.Enable {
// Delaying startup to start at a multiple of `duration` since the zero time.
if duration := config.Respondd.Synchronize.Duration; duration > 0 {
Expand All @@ -60,6 +54,20 @@ var serveCmd = &cobra.Command{
defer collector.Close()
}

if config.Webserver.Enable {
log.Infof("starting webserver on %s", config.Webserver.Bind)
srv := webserver.New(config.Webserver)
go webserver.Start(srv)
if config.Webserver.Prometheus.Enable && config.Respondd.Enable {
prometheus.CreateExporter(config.Webserver.Prometheus, srv, collector, nodes)
} else {
log.Panic("to use prometheus exporter, please enable [respondd].")
}
defer srv.Close()
} else if config.Webserver.Prometheus.Enable {
log.Panic("to use prometheus exporter, please enable [webserver].")
}

// Wait for INT/TERM
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
Expand Down
11 changes: 9 additions & 2 deletions config_example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ enable = false
bind = "127.0.0.1:8080"
webroot = "/var/www/html/meshviewer"

[webserver.prometheus]
# need webserver and respondd enabled (the exporter used the first interface
enable = false
# how long to wait till answer are calulated
wait = "1s"
# how old values are allowed
outdated = "2s"



[nodes]
# Cache file
Expand Down Expand Up @@ -181,8 +190,6 @@ path = "/var/www/html/meshviewer/data/raw.json"
# WARNING: if it is not set, it will publish contact information of other persons
no_owner = true



[database]
# this will send delete commands to the database to prune data
# which is older than:
Expand Down
2 changes: 1 addition & 1 deletion output/prometheus-sd/output_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package prometheus_sd

import (
"os"
"net"
"os"
"testing"

"github.com/stretchr/testify/assert"
Expand Down
9 changes: 6 additions & 3 deletions webserver/config.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package webserver

import "yanic/webserver/prometheus"

type Config struct {
Enable bool `toml:"enable"`
Bind string `toml:"bind"`
Webroot string `toml:"webroot"`
Enable bool `toml:"enable"`
Bind string `toml:"bind"`
Webroot string `toml:"webroot"`
Prometheus prometheus.Config `toml:"prometheus"`
}
31 changes: 31 additions & 0 deletions webserver/prometheus/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package prometheus

import (
"net/http"

"yanic/lib/duration"

"yanic/respond"
"yanic/runtime"
)

type Config struct {
Enable bool `toml:"enable"`
Wait duration.Duration `toml:"wait"`
Outdated duration.Duration `toml:"outdated"`
}

func CreateExporter(config Config, srv *http.Server, coll *respond.Collector, nodes *runtime.Nodes) {
mux := http.NewServeMux()
ex := &Exporter{
config: config,
srv: srv,
coll: coll,
nodes: nodes,
}
mux.Handle("/metric", ex)
if srv.Handler != nil {
mux.Handle("/", srv.Handler)
}
srv.Handler = mux
}
124 changes: 124 additions & 0 deletions webserver/prometheus/exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package prometheus

import (
"io"
"net"
"net/http"
"time"

"github.com/bdlm/log"

"yanic/respond"
"yanic/runtime"
)

type Exporter struct {
config Config
srv *http.Server
coll *respond.Collector
nodes *runtime.Nodes
}

func (ex *Exporter) ServeHTTP(res http.ResponseWriter, req *http.Request) {
var ip net.IP
nodeID := ""

queryValues := req.URL.Query()

if nodeIDs := queryValues["node_id"]; len(nodeIDs) > 0 {
nodeID = nodeIDs[0]
node, ok := ex.nodes.List[nodeID]
if !ok || node.Address == nil {
http.Error(res, "not able to get node by cached nodeid", http.StatusNotFound)
return
}
ip = node.Address.IP
if ex.writeNode(res, node, true) {
log.WithFields(map[string]interface{}{
"ip": ip,
"node_id": nodeID,
}).Debug("take node from cache")
return
}
} else if ipstr := queryValues["ip"]; len(ipstr) > 0 {
ip = net.ParseIP(ipstr[0])
if ip == nil {
http.Error(res, "not able to parse ip address", http.StatusBadRequest)
return
}
node_select := ex.nodes.Select(func(n *runtime.Node) bool {
n_addr := n.Address
nodeID = n.Nodeinfo.NodeID
return n_addr != nil && ip.Equal(n_addr.IP)
})
if len(node_select) == 1 {
if ex.writeNode(res, node_select[0], true) {
log.WithFields(map[string]interface{}{
"ip": ip,
"node_id": nodeID,
}).Debug("take node from cache")
return
}
} else if len(node_select) > 1 {
log.Error("strange count of nodes")
}
} else {
http.Error(res, "please request with ?ip= or ?node_id=", http.StatusNotFound)
return
}

// send request
ex.coll.SendPacket(ip)

// wait
log.WithFields(map[string]interface{}{
"ip": ip,
"node_id": nodeID,
}).Debug("waited for")
time.Sleep(ex.config.Wait.Duration)

// result
node, ok := ex.nodes.List[nodeID]
if !ok {
http.Error(res, "not able to fetch this node", http.StatusGatewayTimeout)
return
}
ex.writeNode(res, node, false)
}

func (ex *Exporter) writeNode(res http.ResponseWriter, node *runtime.Node, dry bool) bool {
logger := log.WithField("database", "prometheus")
if nodeinfo := node.Nodeinfo; nodeinfo != nil {
logger = logger.WithField("node_id", nodeinfo.NodeID)
}

if !time.Now().Before(node.Lastseen.GetTime().Add(ex.config.Outdated.Duration)) {
if dry {
return false
}
m := Metric{Labels: MetricLabelsFromNode(node), Name: "yanic_node_up", Value: 0}
str, err := m.String()
if err == nil {
_, err = io.WriteString(res, str+"\n")
}
if err != nil {
logger.Warnf("not able to get metrics from node: %s", err)
http.Error(res, "not able to generate metric from node", http.StatusInternalServerError)
}
return false
}

metrics := MetricsFromNode(ex.nodes, node)
for _, m := range metrics {
str, err := m.String()
if err == nil {
_, err = io.WriteString(res, str+"\n")
}
if err != nil {
logger.Warnf("not able to get metrics from node: %s", err)
http.Error(res, "not able to generate metric from node", http.StatusInternalServerError)
}
}

return true
}
38 changes: 38 additions & 0 deletions webserver/prometheus/metric.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package prometheus

import (
"errors"
"fmt"
"strings"
)

type Metric struct {
Name string
Value interface{}
Labels map[string]interface{}
}

func (m *Metric) String() (string, error) {
if m.Value == nil {
return "", errors.New("no value of metric found")
}
output := m.Name
if len(m.Labels) > 0 {
output += "{"
for label, v := range m.Labels {
switch value := v.(type) {
case string:
output = fmt.Sprintf("%s%s=\"%s\",", output, label, strings.ReplaceAll(value, "\"", "'"))
case float32:
output = fmt.Sprintf("%s%s=\"%.4f\",", output, label, value)
case float64:
output = fmt.Sprintf("%s%s=\"%.4f\",", output, label, value)
default:
output = fmt.Sprintf("%s%s=\"%v\",", output, label, value)
}
}
lastChar := len(output) - 1
output = output[:lastChar] + "}"
}
return fmt.Sprintf("%s %v", output, m.Value), nil
}
73 changes: 73 additions & 0 deletions webserver/prometheus/metric_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package prometheus

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestMetric(t *testing.T) {
assert := assert.New(t)

var tests = []struct {
input Metric
err string
output []string
}{
{
input: Metric{Name: "test1"},
err: "no value of metric found",
},
{
input: Metric{Name: "test2", Value: 3},
output: []string{"test2 3"},
},
{
input: Metric{Name: "test2-obj", Value: 1,
Labels: map[string]interface{}{
"test": []string{"4"},
},
},
output: []string{`test2-obj{test="[4]"} 1`},
},
{
input: Metric{Name: "test3", Value: 3.2,
Labels: map[string]interface{}{
"site_code": "lola",
},
},
output: []string{`test3{site_code="lola"} 3.2`},
},
{
input: Metric{Name: "test4", Value: "0",
Labels: map[string]interface{}{
"frequency": float32(3.2),
},
},
output: []string{`test4{frequency="3.2000"} 0`},
},
{
input: Metric{Name: "test5", Value: 3,
Labels: map[string]interface{}{
"node_id": "lola",
"blub": 3.3423533,
},
},
output: []string{
`test5{blub="3.3424",node_id="lola"} 3`,
`test5{node_id="lola",blub="3.3424"} 3`,
},
},
}

for _, test := range tests {
output, err := test.input.String()

if test.err == "" {
assert.NoError(err)
assert.Contains(test.output, output, "not acceptable output found")
} else {
assert.EqualError(err, test.err)
}
}
}
Loading

0 comments on commit 7f06b5f

Please sign in to comment.