/
publisher.go
157 lines (136 loc) · 4.5 KB
/
publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package simpleamqp
import (
"bytes"
"compress/gzip"
"log"
"strconv"
"time"
"github.com/streadway/amqp"
)
const (
	// timeToWaitForChannel caps how long queueMessageToPublish may block
	// while trying to write to publisher.outputMessages before it gives up.
	timeToWaitForChannel = time.Second * 5

	// COMPRESS_HEADER is the header key that, when set to the boolean true,
	// makes compress gzip the message body.
	COMPRESS_HEADER = "compress"
)
// messageToPublish is the unit queued on AmqpPublisher.outputMessages and
// later handed to the broker by publish.
type messageToPublish struct {
// routingKey is the AMQP routing key the message is published with.
routingKey string
// message is the body sent to the broker; Publish and PublishWithTTL pass it
// through compress first when the caller supplied headers.
message []byte
// expiration is copied into amqp.Publishing.Expiration. PublishWithTTL sets
// it to the decimal string of the TTL; Publish leaves it empty.
expiration string
// headers is copied into amqp.Publishing.Headers; nil when the caller
// supplied no headers.
headers map[string]interface{}
}
// AMQPPublisher represents an AMQP publisher that can publish messages with
// or without a TTL. *AmqpPublisher provides both methods.
type AMQPPublisher interface {
// Publish takes, as implemented by AmqpPublisher, the routing key, the
// message body and optional header maps.
Publish(string, []byte, ...map[string]interface{})
// PublishWithTTL takes, as implemented by AmqpPublisher, the routing key,
// the message body, the TTL and optional header maps.
PublishWithTTL(string, []byte, int, ...map[string]interface{})
}
// AmqpPublisher holds the broker URI, the exchange name and the channel
// through which messages are submitted to be published to RabbitMQ.
type AmqpPublisher struct {
// brokerURI is handed to setup each time publishLoop (re)connects.
brokerURI string
// exchange is declared by publishLoop after connecting and is the target of
// every publish call.
exchange string
// outputMessages carries messages from queueMessageToPublish to publishLoop.
outputMessages chan messageToPublish
}
// NewAmqpPublisher builds an AmqpPublisher for the given broker URI and
// exchange and starts its background publishing goroutine.
//
// The goroutine runs publishLoop forever: whenever the loop returns, the
// returned error is logged and, after sleeping timeToReconnect, the loop is
// started again. Messages reach the goroutine through a channel buffered to
// 1024 entries.
func NewAmqpPublisher(brokerURI, exchange string) *AmqpPublisher {
	p := &AmqpPublisher{
		brokerURI:      brokerURI,
		exchange:       exchange,
		outputMessages: make(chan messageToPublish, 1024),
	}

	go func(p *AmqpPublisher) {
		for {
			reason := p.publishLoop()
			log.Println("[simpleamqp] Waiting", timeToReconnect, "to reconnect due ", reason)
			time.Sleep(timeToReconnect)
		}
	}(p)

	return p
}
// Publish queues message for publication on the publisher's exchange using
// routingKey. It does not wait for the broker; the message is handed to
// queueMessageToPublish.
//
// headers is optional and only its first element is used. When present it is
// attached to the message and the body is passed through compress, which
// gzips it if the COMPRESS_HEADER entry is true. If compression fails, the
// error is logged and the message is dropped: queueing it anyway would send
// an empty body that is still flagged as compressed.
func (publisher *AmqpPublisher) Publish(routingKey string, message []byte, headers ...map[string]interface{}) {
	outgoing := messageToPublish{routingKey: routingKey, message: message}
	if len(headers) > 0 {
		body, err := compress(message, headers[0])
		if err != nil {
			log.Println("[simpleamqp] Error compressing message", err)
			return
		}
		outgoing.message = body
		outgoing.headers = headers[0]
	}
	publisher.queueMessageToPublish(outgoing)
}
// PublishWithTTL queues message for publication like Publish, but also sets
// a per-message expiration. The call itself does not wait for the TTL.
//
// ttl is converted to its decimal string and sent as the AMQP expiration
// property (RabbitMQ reads that value as milliseconds; confirm for the broker
// in use).
//
// headers is optional and only its first element is used. When present it is
// attached to the message and the body is passed through compress, which
// gzips it if the COMPRESS_HEADER entry is true. If compression fails, the
// error is logged and the message is dropped: queueing it anyway would send
// an empty body that is still flagged as compressed.
func (publisher *AmqpPublisher) PublishWithTTL(routingKey string, message []byte, ttl int, headers ...map[string]interface{}) {
	outgoing := messageToPublish{
		routingKey: routingKey,
		message:    message,
		expiration: strconv.Itoa(ttl),
	}
	if len(headers) > 0 {
		body, err := compress(message, headers[0])
		if err != nil {
			log.Println("[simpleamqp] Error compressing message", err)
			return
		}
		outgoing.message = body
		outgoing.headers = headers[0]
	}
	publisher.queueMessageToPublish(outgoing)
}
// queueMessageToPublish hands the message to the background publish loop.
//
// It tries to write to publisher.outputMessages and returns as soon as the
// send succeeds. If the channel stays full for timeToWaitForChannel, the
// message is logged and discarded rather than blocking the caller any longer.
func (publisher *AmqpPublisher) queueMessageToPublish(messageToPublish messageToPublish) {
	giveUp := time.NewTimer(timeToWaitForChannel)
	defer giveUp.Stop()

	select {
	case <-giveUp.C:
		log.Println("[simpleamqp] Publish channel full", messageToPublish)
	case publisher.outputMessages <- messageToPublish:
		// Queued; nothing else to do.
	}
}
// publish sends a single queued message to the publisher's exchange over the
// given AMQP channel and returns whatever error channel.Publish reports.
//
// The message is sent with content type "application/json", transient
// delivery mode, and the headers, body and expiration carried by
// messageToPublish.
func (publisher *AmqpPublisher) publish(channel *amqp.Channel, messageToPublish messageToPublish) error {
	// Both delivery flags are off for every message.
	const (
		mandatory = false
		immediate = false
	)

	outgoing := amqp.Publishing{
		Headers:         messageToPublish.headers,
		ContentType:     "application/json",
		ContentEncoding: "",
		Body:            messageToPublish.message,
		DeliveryMode:    amqp.Transient,
		Priority:        0,
		Expiration:      messageToPublish.expiration,
	}

	return channel.Publish(publisher.exchange, messageToPublish.routingKey, mandatory, immediate, outgoing)
}
// publishLoop connects to the broker, declares the publisher's exchange and
// then publishes every message received on publisher.outputMessages.
//
// It only returns on failure: when setup, exchangeDeclare or a publish call
// reports an error, that error is returned. The connection and channel opened
// by setup are closed on return. A message whose publish call fails has
// already been taken off the queue and is not re-queued.
func (publisher *AmqpPublisher) publishLoop() error {
	conn, ch, err := setup(publisher.brokerURI)
	if err != nil {
		return err
	}
	// Deferred calls run in reverse order: the channel is closed before the
	// connection.
	defer conn.Close()
	defer ch.Close()

	if err := exchangeDeclare(ch, publisher.exchange); err != nil {
		return err
	}

	for {
		next := <-publisher.outputMessages
		if err := publisher.publish(ch, next); err != nil {
			return err
		}
		log.Println("[simpleamqp] Published", next.routingKey, string(next.message))
	}
}
// compress returns the gzip-compressed form of input when headers carries
// COMPRESS_HEADER set to the boolean true; for any other value, a missing key
// or a nil map, input is returned unchanged.
//
// A non-nil error means writing to or closing the gzip writer failed; the
// returned slice is nil in that case.
func compress(input []byte, headers map[string]interface{}) ([]byte, error) {
	if headers[COMPRESS_HEADER] != true {
		return input, nil
	}

	var gzipped bytes.Buffer
	zw := gzip.NewWriter(&gzipped)
	if _, err := zw.Write(input); err != nil {
		return nil, err
	}
	// Close flushes the remaining compressed data and the gzip footer.
	if err := zw.Close(); err != nil {
		return nil, err
	}
	return gzipped.Bytes(), nil
}