forked from influxdata/telegraf
-
Notifications
You must be signed in to change notification settings - Fork 0
/
amqp.go
161 lines (141 loc) · 3.29 KB
/
amqp.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package amqp
import (
"bytes"
"fmt"
"log"
"sync"
"time"
"github.com/influxdb/influxdb/client/v2"
"github.com/influxdb/telegraf/plugins/outputs"
"github.com/streadway/amqp"
)
type AMQP struct {
// AMQP brokers to send metrics to
URL string
// AMQP exchange
Exchange string
// Routing Key Tag
RoutingTag string `toml:"routing_tag"`
// InfluxDB database
Database string
// InfluxDB retention policy
RetentionPolicy string
// InfluxDB precision
Precision string
channel *amqp.Channel
sync.Mutex
headers amqp.Table
}
const (
DefaultRetentionPolicy = "default"
DefaultDatabase = "telegraf"
DefaultPrecision = "s"
)
var sampleConfig = `
# AMQP url
url = "amqp://localhost:5672/influxdb"
# AMQP exchange
exchange = "telegraf"
# Telegraf tag to use as a routing key
# ie, if this tag exists, it's value will be used as the routing key
routing_tag = "host"
# InfluxDB retention policy
#retention_policy = "default"
# InfluxDB database
#database = "telegraf"
# InfluxDB precision
#precision = "s"
`
func (q *AMQP) Connect() error {
q.Lock()
defer q.Unlock()
q.headers = amqp.Table{
"precision": q.Precision,
"database": q.Database,
"retention_policy": q.RetentionPolicy,
}
connection, err := amqp.Dial(q.URL)
if err != nil {
return err
}
channel, err := connection.Channel()
if err != nil {
return fmt.Errorf("Failed to open a channel: %s", err)
}
err = channel.ExchangeDeclare(
q.Exchange, // name
"topic", // type
true, // durable
false, // delete when unused
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
return fmt.Errorf("Failed to declare an exchange: %s", err)
}
q.channel = channel
go func() {
log.Printf("Closing: %s", <-connection.NotifyClose(make(chan *amqp.Error)))
log.Printf("Trying to reconnect")
for err := q.Connect(); err != nil; err = q.Connect() {
log.Println(err)
time.Sleep(10 * time.Second)
}
}()
return nil
}
func (q *AMQP) Close() error {
return q.channel.Close()
}
func (q *AMQP) SampleConfig() string {
return sampleConfig
}
func (q *AMQP) Description() string {
return "Configuration for the AMQP server to send metrics to"
}
func (q *AMQP) Write(points []*client.Point) error {
q.Lock()
defer q.Unlock()
if len(points) == 0 {
return nil
}
var outbuf = make(map[string][][]byte)
for _, p := range points {
// Combine tags from Point and BatchPoints and grab the resulting
// line-protocol output string to write to AMQP
var value, key string
value = p.String()
if q.RoutingTag != "" {
if h, ok := p.Tags()[q.RoutingTag]; ok {
key = h
}
}
outbuf[key] = append(outbuf[key], []byte(value))
}
for key, buf := range outbuf {
err := q.channel.Publish(
q.Exchange, // exchange
key, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
Headers: q.headers,
ContentType: "text/plain",
Body: bytes.Join(buf, []byte("\n")),
})
if err != nil {
return fmt.Errorf("FAILED to send amqp message: %s", err)
}
}
return nil
}
func init() {
outputs.Add("amqp", func() outputs.Output {
return &AMQP{
Database: DefaultDatabase,
Precision: DefaultPrecision,
RetentionPolicy: DefaultRetentionPolicy,
}
})
}