-
Notifications
You must be signed in to change notification settings - Fork 0
/
emitter.go
103 lines (84 loc) · 2.62 KB
/
emitter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package amqp
import (
"encoding/json"
"fmt"
"os"
"time"
amqphelper "github.com/martin-helmich/cloudnativego-backend/src/lib/helper/amqp"
"github.com/martin-helmich/cloudnativego-backend/src/lib/msgqueue"
"github.com/streadway/amqp"
)
type amqpEventEmitter struct {
connection *amqp.Connection
exchange string
events chan *emittedEvent
}
type emittedEvent struct {
event msgqueue.Event
errorChan chan error
}
// NewAMQPEventEmitterFromEnvironment will create a new event emitter from
// the configured environment variables. Important variables are:
//
// - AMQP_URL; the URL of the AMQP broker to connect to
// - AMQP_EXCHANGE; the name of the exchange to bind to
//
// For missing environment variables, this function will assume sane defaults.
func NewAMQPEventEmitterFromEnvironment() (msgqueue.EventEmitter, error) {
var url string
var exchange string
if url = os.Getenv("AMQP_URL"); url == "" {
url = "amqp://localhost:5672"
}
if exchange = os.Getenv("AMQP_EXCHANGE"); exchange == "" {
exchange = "example"
}
conn := <-amqphelper.RetryConnect(url, 5*time.Second)
return NewAMQPEventEmitter(conn, exchange)
}
// NewAMQPEventEmitter creates a new event emitter.
// It will need an AMQP connection passed as parameter and use this connection
// to create its own channel (note: AMQP channels are not thread-safe, so just
// accepting the connection as a parameter and then creating our own private
// channel is the safest way to ensure this).
func NewAMQPEventEmitter(conn *amqp.Connection, exchange string) (msgqueue.EventEmitter, error) {
emitter := amqpEventEmitter{
connection: conn,
exchange: exchange,
}
err := emitter.setup()
if err != nil {
return nil, err
}
return &emitter, nil
}
func (a *amqpEventEmitter) setup() error {
channel, err := a.connection.Channel()
if err != nil {
return err
}
defer channel.Close()
// Normally, all(many) of these options should be configurable.
// For our example, it'll probably do.
err = channel.ExchangeDeclare(a.exchange, "topic", true, false, false, false, nil)
return err
}
func (a *amqpEventEmitter) Emit(event msgqueue.Event) error {
channel, err := a.connection.Channel()
if err != nil {
return err
}
defer channel.Close()
// TODO: Alternatives to JSON? Msgpack or Protobuf, maybe?
jsonBody, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("could not JSON-serialize event: %s", err)
}
msg := amqp.Publishing{
Headers: amqp.Table{"x-event-name": event.EventName()},
ContentType: "application/json",
Body: jsonBody,
}
err = channel.Publish(a.exchange, event.EventName(), false, false, msg)
return err
}