-
Notifications
You must be signed in to change notification settings - Fork 1
/
scheduler.go
108 lines (81 loc) · 2.67 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package main
import (
"time"
"github.com/sirupsen/logrus"
"github.com/michaelklishin/rabbit-hole"
"github.com/maikelh/rabbitmq-metrics-exporter/exporters"
"fmt"
"github.com/spf13/viper"
"github.com/maikelh/rabbitmq-metrics-exporter/structs"
)
type SchedulerInterface interface {
Start()
}
type Scheduler struct {
ticker *time.Ticker
rabbit *rabbithole.Client
exporter exporters.Exporter
}
func (s *Scheduler) Start() error {
var rabbitmqUrl = fmt.Sprintf("http://%s:%s", viper.GetString("rabbitmq.host"), viper.GetString("RabbitMQ.port"))
logrus.Info("Connecting to rabbitmq", rabbitmqUrl)
client, err := rabbithole.NewClient(rabbitmqUrl, viper.GetString("RabbitMQ.user"), viper.GetString("RabbitMQ.password"))
if err != nil {
return err
}
s.rabbit = client
export, err := exporters.CreateExporter(viper.GetString("exporter.type"))
if err != nil {
return err
}
s.exporter = export
s.ticker = time.NewTicker(time.Second * 5)
for {
select {
case tickTime := <-s.ticker.C:
logrus.Info("Tick received")
s.tickHandler(tickTime)
}
}
}
func (s *Scheduler) tickHandler(time time.Time) {
// For now we only handle queues, other info can come later
queues, err := s.getQueueInformation()
if err != nil {
logrus.Error(err)
return
}
err = s.exporter.UpdateQueues(queues, viper.GetString("rabbitmq.host"), "/", time)
if err != nil {
logrus.Error(err)
}
}
func (s *Scheduler) getQueueInformation() ([]structs.Queue, error) {
rabbitQueues, err := s.rabbit.ListQueues()
if err != nil {
return nil, err
}
var queues []structs.Queue
for _, rabbitQueue := range rabbitQueues {
var queue = structs.Queue{}
queue.Name = rabbitQueue.Name
queue.Node = rabbitQueue.Node
queue.Vhost = rabbitQueue.Vhost
queue.AutoDelete = rabbitQueue.AutoDelete
queue.Durable = rabbitQueue.Durable
queue.MessagesTotal = int64(rabbitQueue.Messages)
queue.MessagesReady = int64(rabbitQueue.MessagesReady)
queue.MessagesUnacknowledged = int64(rabbitQueue.MessagesUnacknowledged)
queue.MessageBytes = int64(rabbitQueue.MessagesBytes)
queue.MessageBytesReady = int64(rabbitQueue.MessagesBytes)
queue.MessagesRAM = int64(rabbitQueue.MessagesRAM)
queue.MessagesPersistent = int64(rabbitQueue.MessagesPersistent)
queue.RateDelivered = int64(rabbitQueue.MessageStats.DeliverDetails.Rate)
queue.RateDeliveredGet = int64(rabbitQueue.MessageStats.DeliverGetDetails.Rate)
queue.RateDeliveredNoAck = int64(rabbitQueue.MessageStats.DeliverNoAckDetails.Rate)
queue.RatePublished = int64(rabbitQueue.MessageStats.PublishDetails.Rate)
queue.RateRedelivered = int64(rabbitQueue.MessageStats.RedeliverDetails.Rate)
queues = append(queues, queue)
}
return queues, nil
}