-
Notifications
You must be signed in to change notification settings - Fork 0
/
listener.go
152 lines (125 loc) · 4.08 KB
/
listener.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package amqp
import (
"fmt"
"os"
"time"
amqphelper "github.com/martin-helmich/cloudnativego-backend/src/lib/helper/amqp"
"github.com/martin-helmich/cloudnativego-backend/src/lib/msgqueue"
"github.com/streadway/amqp"
)
// eventNameHeader is the name of the AMQP message header from which Listen
// reads the event name of an incoming delivery.
const eventNameHeader = "x-event-name"
// amqpEventListener is the AMQP-backed listener returned by
// NewAMQPEventListener as a msgqueue.EventListener. It keeps the broker
// connection, the exchange and queue names, and the mapper used to decode
// message bodies into events.
type amqpEventListener struct {
// connection is the AMQP connection from which setup and Listen open their
// channels.
connection *amqp.Connection
// exchange is the name of the exchange declared in setup and used as the
// binding source in Listen.
exchange string
// queue is the name of the queue declared in setup and bound and consumed
// in Listen.
queue string
// mapper converts an event name and a raw message body into a
// msgqueue.Event.
mapper msgqueue.EventMapper
}
// NewAMQPEventListenerFromEnvironment builds an event listener whose settings
// are read from environment variables:
//
//   - AMQP_URL: URL of the AMQP broker to connect to (default "amqp://localhost:5672")
//   - AMQP_EXCHANGE: name of the exchange to bind to (default "example")
//   - AMQP_QUEUE: name of the queue to bind and subscribe to (default "example")
//
// A variable that is unset or empty falls back to its default. The call blocks
// until amqphelper.RetryConnect delivers a connection on its result channel,
// and then delegates to NewAMQPEventListener.
func NewAMQPEventListenerFromEnvironment() (msgqueue.EventListener, error) {
	// envOr returns the value of the environment variable key, or fallback
	// when the variable is unset or empty.
	envOr := func(key, fallback string) string {
		if value := os.Getenv(key); value != "" {
			return value
		}
		return fallback
	}

	brokerURL := envOr("AMQP_URL", "amqp://localhost:5672")
	exchangeName := envOr("AMQP_EXCHANGE", "example")
	queueName := envOr("AMQP_QUEUE", "example")

	// NOTE(review): the 5-second argument is presumably the retry interval of
	// RetryConnect; confirm against the helper package.
	connection := <-amqphelper.RetryConnect(brokerURL, 5*time.Second)

	return NewAMQPEventListener(connection, exchangeName, queueName)
}
// NewAMQPEventListener creates an event listener on top of an existing AMQP
// connection, using the given exchange and queue names.
//
// Only the connection is taken as a parameter; the listener opens its own
// private channels from it. AMQP channels are not thread-safe, so not sharing
// a channel with the caller is the safest arrangement.
//
// The exchange and queue are declared before the listener is returned. If that
// fails, the error is returned and the listener is nil.
func NewAMQPEventListener(conn *amqp.Connection, exchange string, queue string) (msgqueue.EventListener, error) {
	l := &amqpEventListener{
		connection: conn,
		exchange:   exchange,
		queue:      queue,
		mapper:     msgqueue.NewEventMapper(),
	}

	if err := l.setup(); err != nil {
		return nil, err
	}

	return l, nil
}
// setup declares the durable topic exchange and the durable queue this
// listener works with. It opens a temporary channel for the declarations and
// closes it again before returning.
//
// Errors from opening the channel or declaring the exchange are returned
// unchanged; a queue declaration failure is returned with the queue name added
// to the message.
func (l *amqpEventListener) setup() error {
	ch, err := l.connection.Channel()
	if err != nil {
		return err
	}
	defer ch.Close()

	if err := ch.ExchangeDeclare(l.exchange, "topic", true, false, false, false, nil); err != nil {
		return err
	}

	if _, err := ch.QueueDeclare(l.queue, true, false, false, false, nil); err != nil {
		return fmt.Errorf("could not declare queue %s: %s", l.queue, err)
	}

	return nil
}
// Listen configures the event listener to listen for a set of events that are
// specified by name as parameter. Each name is used as the binding key between
// the listener's queue and its exchange.
//
// It returns two channels and an error:
//
//   - the first channel delivers successfully decoded events;
//   - the second channel delivers one error for every message that could not
//     be decoded (missing or non-string event name header, or a body the
//     mapper rejects);
//   - the error is non-nil when the AMQP channel could not be opened, a
//     binding could not be created, or consuming could not be started. In that
//     case both channels are nil and the AMQP channel opened by this call has
//     been closed again.
//
// Both returned channels are unbuffered and fed by a single goroutine, so the
// caller has to keep receiving from both of them; otherwise message processing
// stalls. Neither channel is closed when the delivery stream ends.
func (l *amqpEventListener) Listen(eventNames ...string) (<-chan msgqueue.Event, <-chan error, error) {
	channel, err := l.connection.Channel()
	if err != nil {
		return nil, nil, err
	}

	// Create binding between queue and exchange for each listened event type
	for _, event := range eventNames {
		if err := channel.QueueBind(l.queue, event, l.exchange, false, nil); err != nil {
			// Nothing will ever consume from this channel; close it so it
			// does not stay open on the shared connection. The close error is
			// deliberately dropped in favour of the more useful bind error.
			channel.Close()
			return nil, nil, fmt.Errorf("could not bind event %s to queue %s: %s", event, l.queue, err)
		}
	}

	// autoAck is false: every delivery is acknowledged or rejected explicitly
	// in the goroutine below.
	msgs, err := channel.Consume(l.queue, "", false, false, false, false, nil)
	if err != nil {
		// Same reasoning as above: do not leak the channel on failure.
		channel.Close()
		return nil, nil, fmt.Errorf("could not consume queue: %s", err)
	}

	events := make(chan msgqueue.Event)
	// Named errs rather than errors so the standard package name is not shadowed.
	errs := make(chan error)

	go func() {
		// The loop ends when the delivery stream is closed.
		for msg := range msgs {
			rawEventName, ok := msg.Headers[eventNameHeader]
			if !ok {
				errs <- fmt.Errorf("message did not contain %s header", eventNameHeader)
				// Reject this single message without requeueing it.
				msg.Nack(false, false)
				continue
			}

			eventName, ok := rawEventName.(string)
			if !ok {
				errs <- fmt.Errorf("header %s did not contain string", eventNameHeader)
				msg.Nack(false, false)
				continue
			}

			event, err := l.mapper.MapEvent(eventName, msg.Body)
			if err != nil {
				errs <- fmt.Errorf("could not unmarshal event %s: %s", eventName, err)
				msg.Nack(false, false)
				continue
			}

			// Acknowledge only after the event has been handed to the receiver.
			events <- event
			msg.Ack(false)
		}
	}()

	return events, errs, nil
}
// Mapper returns the event mapper that this listener uses in Listen to decode
// incoming message bodies into events.
func (l *amqpEventListener) Mapper() msgqueue.EventMapper {
return l.mapper
}