-
Notifications
You must be signed in to change notification settings - Fork 3
/
kafka.go
52 lines (45 loc) · 1.57 KB
/
kafka.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package metrics
import (
"github.com/go-kit/kit/metrics/prometheus"
stdprom "github.com/prometheus/client_golang/prometheus"
)
// KafkaMetrics bundles the go-kit Prometheus collectors used to report Kafka
// activity. NewKafkaMetrics populates every field with a collector that has a
// single "topic" label.
type KafkaMetrics struct {
	// KafkaWriterOffset is the counter updated by IncreaseWriterOffset.
	KafkaWriterOffset *prometheus.Counter

	// KafkaReaderOffset is the counter updated by IncreaseReaderOffset.
	KafkaReaderOffset *prometheus.Counter

	// KafkaCurrentTopic is the gauge updated by SwitchTopic: 1 for the topic
	// switched to, 0 for the topic switched away from.
	KafkaCurrentTopic *prometheus.Gauge

	// KafkaLatency is the histogram fed by ObserveLatency.
	KafkaLatency *prometheus.Histogram
}
// NewKafkaMetrics creates the Kafka collectors (two counters, one gauge and
// one histogram), each labelled by "topic", and returns them wrapped in a
// KafkaMetrics. The histogram buckets come from getLoadTimeBucket.
//
// NOTE(review): go-kit's New*From helpers are documented as registering the
// collector with the default Prometheus registry, so calling this function
// more than once in a process presumably panics on duplicate registration —
// confirm before constructing it in more than one place.
func NewKafkaMetrics() *KafkaMetrics {
	writerOffset := prometheus.NewCounterFrom(
		stdprom.CounterOpts{
			Name: "kafka_writer_offset",
			Help: "Kafka writer offset",
		},
		[]string{"topic"},
	)

	readerOffset := prometheus.NewCounterFrom(
		stdprom.CounterOpts{
			Name: "kafka_reader_offset",
			Help: "Kafka reader offset",
		},
		[]string{"topic"},
	)

	currentTopic := prometheus.NewGaugeFrom(
		stdprom.GaugeOpts{
			Name: "kafka_current_topic",
			Help: "Kafka current topic",
		},
		[]string{"topic"},
	)

	latencyOpts := stdprom.HistogramOpts{
		Name:    "kafka_latency",
		Help:    "Kafka latency",
		Buckets: getLoadTimeBucket(),
	}
	latency := prometheus.NewHistogramFrom(latencyOpts, []string{"topic"})

	return &KafkaMetrics{
		KafkaWriterOffset: writerOffset,
		KafkaReaderOffset: readerOffset,
		KafkaCurrentTopic: currentTopic,
		KafkaLatency:      latency,
	}
}
// IncreaseWriterOffset adds offset to the writer-offset counter for topic.
//
// Negative values are ignored: a Prometheus counter can only grow, and the
// underlying Add panics when handed a negative delta, which would otherwise
// let a bad offset take the whole process down from a metrics call.
//
// NOTE(review): the value is added to the counter, not assigned, so callers
// are presumably expected to pass a delta rather than an absolute Kafka
// offset — confirm against the call sites.
func (m *KafkaMetrics) IncreaseWriterOffset(topic string, offset int64) {
	if offset < 0 {
		return
	}
	m.KafkaWriterOffset.With("topic", topic).Add(float64(offset))
}
// IncreaseReaderOffset adds offset to the reader-offset counter for topic.
//
// Negative values are ignored: a Prometheus counter can only grow, and the
// underlying Add panics when handed a negative delta, which would otherwise
// let a bad offset take the whole process down from a metrics call.
//
// NOTE(review): the value is added to the counter, not assigned, so callers
// are presumably expected to pass a delta rather than an absolute Kafka
// offset — confirm against the call sites.
func (m *KafkaMetrics) IncreaseReaderOffset(topic string, offset int64) {
	if offset < 0 {
		return
	}
	m.KafkaReaderOffset.With("topic", topic).Add(float64(offset))
}
// SwitchTopic records a change of the active topic on the current-topic
// gauge: the series labelled with origin is set to 0 and the series labelled
// with target is set to 1.
//
// When origin and target are the same topic the clear step is skipped, so
// the active topic is never reported as 0 between the two writes. The final
// gauge state is the same either way.
func (m *KafkaMetrics) SwitchTopic(origin, target string) {
	if origin != target {
		m.KafkaCurrentTopic.With("topic", origin).Set(0)
	}
	m.KafkaCurrentTopic.With("topic", target).Set(1)
}
// ObserveLatency records one latency sample in the latency histogram under
// the given topic label.
//
// NOTE(review): the unit of latency is not visible here; it presumably has to
// match the buckets returned by getLoadTimeBucket — confirm.
func (m *KafkaMetrics) ObserveLatency(topic string, latency float64) {
	histogram := m.KafkaLatency.With("topic", topic)
	histogram.Observe(latency)
}