forked from grafana/grafana
-
Notifications
You must be signed in to change notification settings - Fork 0
/
eventpublisher.go
150 lines (130 loc) · 3 KB
/
eventpublisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package eventpublisher
import (
"encoding/json"
"fmt"
"log"
"time"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/events"
"github.com/grafana/grafana/pkg/setting"
"github.com/streadway/amqp"
)
// Package-level publisher state. Init fills in url and exchange from the
// "event_publisher" config section; Setup (and its reconnect goroutine)
// assign conn and channel.
// NOTE(review): channel is reassigned by the reconnect goroutine while
// publish goroutines read it, with no synchronization visible in this file.
var (
// url is the address handed to amqp.Dial.
url string
// exchange is the exchange name that is declared on each new channel and
// used as the publish target.
exchange string
// conn is the current AMQP connection; channels are opened from it.
conn *amqp.Connection
// channel is the current AMQP channel used by publish.
channel *amqp.Channel
)
// getConnection dials the broker at the package-level url and returns the
// resulting connection. On failure the connection result is nil and the dial
// error is returned unchanged.
func getConnection() (*amqp.Connection, error) {
	connection, dialErr := amqp.Dial(url)
	if dialErr == nil {
		return connection, nil
	}
	return nil, dialErr
}
// getChannel opens a new channel on the package-level conn and declares the
// configured exchange on it as a durable topic exchange. It returns the
// channel ready for publishing, or nil and the error from whichever step
// failed.
func getChannel() (*amqp.Channel, error) {
	opened, openErr := conn.Channel()
	if openErr != nil {
		return nil, openErr
	}

	declareErr := opened.ExchangeDeclare(
		exchange, // name
		"topic",  // type
		true,     // durable
		false,    // auto-deleted
		false,    // internal
		false,    // no-wait
		nil,      // arguments
	)
	if declareErr != nil {
		return nil, declareErr
	}

	return opened, nil
}
// Init configures and starts the event publisher.
//
// It reads the "event_publisher" config section and returns immediately when
// the "enabled" key is not true. Otherwise it stores the "rabbitmq_url" and
// "exchange" settings, connects via Setup, and then registers eventListener as
// a wildcard listener on the bus. If the initial connection cannot be
// established the process is terminated through log.Fatalf.
func Init() {
	sec := setting.Cfg.Section("event_publisher")
	if !sec.Key("enabled").MustBool(false) {
		return
	}

	url = sec.Key("rabbitmq_url").String()
	exchange = sec.Key("exchange").String()

	// Connect before subscribing: eventListener publishes on the package-level
	// channel, which is nil until Setup has succeeded.
	if err := Setup(); err != nil {
		// Fatalf (not Fatal) so that %v is actually expanded with the error.
		log.Fatalf("Failed to connect to notification queue: %v", err)
	}

	bus.AddWildcardListener(eventListener)
}
// Every connection should declare the topology they expect
func Setup() error {
c, err := getConnection()
if err != nil {
return err
}
conn = c
ch, err := getChannel()
if err != nil {
return err
}
channel = ch
// listen for close events so we can reconnect.
errChan := channel.NotifyClose(make(chan *amqp.Error))
go func() {
for e := range errChan {
fmt.Println("connection to rabbitmq lost.")
fmt.Println(e)
fmt.Println("attempting to create new rabbitmq channel.")
ch, err := getChannel()
if err == nil {
channel = ch
break
}
//could not create channel, so lets close the connection
// and re-create.
_ = conn.Close()
for err != nil {
time.Sleep(2 * time.Second)
fmt.Println("attempting to reconnect to rabbitmq.")
err = Setup()
}
fmt.Println("Connected to rabbitmq again.")
}
}()
return nil
}
func publish(routingKey string, msgString []byte) {
for {
err := channel.Publish(
exchange, //exchange
routingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: msgString,
},
)
if err == nil {
return
}
// failures are most likely because the connection was lost.
// the connection will be re-established, so just keep
// retrying every 2seconds until we successfully publish.
time.Sleep(2 * time.Second)
fmt.Println("publish failed, retrying.")
}
}
// eventListener converts a bus event to its wire representation, serializes
// it as JSON and hands it to publish under the routing key
// "<priority>.<event type>".
//
// Conversion and marshalling errors are returned to the caller. Publishing
// itself happens in a separate goroutine that keeps retrying until the message
// is sent, so a successful return only means the message was queued for
// sending.
func eventListener(event interface{}) error {
	wire, err := events.ToOnWriteEvent(event)
	if err != nil {
		return err
	}

	payload, err := json.Marshal(wire)
	if err != nil {
		return err
	}

	key := fmt.Sprintf("%s.%s", wire.Priority, wire.EventType)
	go publish(key, payload)

	return nil
}