-
Notifications
You must be signed in to change notification settings - Fork 0
/
listener.go
134 lines (115 loc) · 3.85 KB
/
listener.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package amqp
import (
"fmt"
"github.com/bogdanrat/web-server/service/queue"
"github.com/streadway/amqp"
"log"
)
type amqpEventListener struct {
connection *amqp.Connection
exchange string
queue string
mapper queue.EventMapper
}
func NewListener(conn *amqp.Connection, exchangeName string, queueName string) (queue.EventListener, error) {
listener := &amqpEventListener{
connection: conn,
exchange: exchangeName,
queue: queueName,
}
mapper, err := queue.NewEventMapper(queue.StaticMapper)
if err != nil {
return nil, err
}
listener.mapper = mapper
if err := listener.setup(); err != nil {
return nil, err
}
return listener, nil
}
func (l *amqpEventListener) Listen(eventNames ...string) (<-chan queue.Event, <-chan error, error) {
channel, err := l.connection.Channel()
if err != nil {
return nil, nil, err
}
for _, eventName := range eventNames {
// QueueBind binds an exchange to a queue so that publishing to the exchange will be routed to the queue
// when the publishing routing key matches the binding routing key
if err := channel.QueueBind(l.queue, eventName, l.exchange, false, nil); err != nil {
return nil, nil, err
}
}
// Consume() immediately starts delivering queued messages.
messages, err := channel.Consume(
l.queue,
"", // consumer: when empty, a unique identifier will be automatically generated
false, // autoAck: when true, received messages will be acknowledged automatically; when false, use Ack() method
false, // exclusive: when true, this consumer will be the only one allowed to consume this queue
false, // noLocal: this consumer should not be delivered messages that were published on the same channel
false, // noWait: instructs the library not to wait for confirmation from the broker
nil,
)
if err != nil {
return nil, nil, err
}
events := make(chan queue.Event)
errors := make(chan error)
go func() {
for message := range messages {
// use the x-event-name header to map message back to their respective struct types
rawEventName, ok := message.Headers[queue.EventNameHeader]
if !ok {
errors <- fmt.Errorf("message did not contain %s header", queue.EventNameHeader)
// Nack() negatively acknowledge the delivery of message(s)
// This method must not be used to select or requeue messages the client wishes not to handle,
// rather it is to inform the server that the client is incapable of handling this message at this time.
// When requeue is true, request the server to deliver this message to a different consumer.
// If it is not possible or requeue is false, the message will be dropped or delivered to a server configured dead-letter queue.
if err := message.Nack(false, false); err != nil {
log.Printf("Error Nack: %s\n", err)
}
continue
}
eventName, ok := rawEventName.(string)
if !ok {
errors <- fmt.Errorf("header %s did not contain string value", queue.EventNameHeader)
if err := message.Nack(false, false); err != nil {
log.Printf("Error Nack: %s\n", err)
}
continue
}
event, err := l.mapper.MapEvent(eventName, message.Body)
if err != nil {
errors <- fmt.Errorf("could not unmarshal event %s: %s", eventName, err)
if err := message.Nack(false, false); err != nil {
log.Printf("Error Nack: %s\n", err)
}
}
events <- event
err = message.Ack(false)
if err != nil {
errors <- fmt.Errorf("could not acknowledge message: %s", err)
}
}
}()
return events, errors, nil
}
func (l *amqpEventListener) setup() error {
channel, err := l.connection.Channel()
if err != nil {
return err
}
defer channel.Close()
// QueueDeclare declares a queue to hold messages and deliver to consumers
_, err = channel.QueueDeclare(l.queue,
true,
false,
false,
false,
nil,
)
return err
}
func (l *amqpEventListener) EventMapper() queue.EventMapper {
return l.mapper
}