/
prometheus.go
65 lines (53 loc) · 1.76 KB
/
prometheus.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package nsqmiddleware
import (
"fmt"
"time"
"github.com/nsqio/go-nsq"
"github.com/prometheus/client_golang/prometheus"
)
// promMessageName is the metric name of the counter of consumed messages.
const promMessageName = "nsqm_consumer_messages_total"

// promDurationName is the metric name of the histogram of message
// processing durations, expressed in milliseconds.
const promDurationName = "nsqm_consumer_duration_milliseconds"
// promMessage counts consumed messages. It is created and registered in init.
var promMessage *prometheus.CounterVec

// promLatency records how long message handling took. It is created and
// registered in init.
var promLatency *prometheus.HistogramVec
// init creates the package-level message counter and duration histogram and
// registers each of them through prometheus.MustRegister, which panics when
// registration fails.
func init() {
	counterOpts := prometheus.CounterOpts{
		Name: promMessageName,
		Help: "How many NSQ messages processed, partitioned by topic, channel, attempts and status.",
	}
	promMessage = prometheus.NewCounterVec(
		counterOpts,
		[]string{"topic", "channel", "attempts", "status"},
	)
	prometheus.MustRegister(promMessage)

	histogramOpts := prometheus.HistogramOpts{
		Name: promDurationName,
		Help: "How long it took to consume the message, partitioned by topic, channel, attempts and status.",
		// Upper bounds in milliseconds, the unit HandleMessage observes.
		Buckets: []float64{300, 1000, 2500, 5000},
	}
	promLatency = prometheus.NewHistogramVec(
		histogramOpts,
		[]string{"topic", "channel", "attempts", "status"},
	)
	prometheus.MustRegister(promLatency)
}
// Prometheus is a handler that exposes Prometheus metrics
// for the number of messages and the processing duration,
// partitioned by topic, channel, attempts and status.
//
// The type carries no fields; its HandleMessage method records into the
// package-level promMessage and promLatency collectors.
type Prometheus struct{}
// NewPrometheus allocates a Prometheus middleware and returns a pointer to it.
func NewPrometheus() *Prometheus {
	return new(Prometheus)
}
// HandleMessage passes message to next and records the outcome in the
// package-level collectors.
//
// It increments promMessage once and observes on promLatency the time spent
// in next, in milliseconds. Both series are labelled with topic, channel, the
// message's attempt count and a status of "ok" or "error", depending on
// whether next returned an error. The error from next is returned unchanged.
func (p Prometheus) HandleMessage(topic, channel string, message *nsq.Message, next nsq.HandlerFunc) error {
	start := time.Now()
	err := next(message)
	elapsed := time.Since(start)

	status := "ok"
	if err != nil {
		status = "error"
	}
	// Format the attempt count once; both series share the same label values.
	attempts := fmt.Sprint(message.Attempts)

	// Record inline instead of through `go` statements: a go statement
	// evaluates the label lookup and arguments in the calling goroutine
	// anyway, so a goroutine per metric only deferred the final update while
	// adding scheduling overhead for every message.
	promMessage.WithLabelValues(topic, channel, attempts, status).Inc()
	promLatency.WithLabelValues(topic, channel, attempts, status).Observe(float64(elapsed) / float64(time.Millisecond))

	return err
}