/
rabbit_conf.go
87 lines (70 loc) · 2.2 KB
/
rabbit_conf.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package images
import (
"errors"
"github.com/streadway/amqp"
)
const (
exchangeKind = "direct"
exchangeDurable = true
exchangeAutoDelete = false
exchangeInternal = false
exchangeNoWait = false
queueDurable = true
queueAutoDelete = false
queueExclusive = false
queueNoWait = false
publishMandatory = false
publishImmediate = false
prefetchCount = 1
prefetchSize = 0
prefetchGlobal = false
consumeAutoAck = false
consumeExclusive = false
consumeNoLocal = false
consumeNoWait = false
ExchangeName = "images"
ResizeQueueName = "resize_queue"
ResizeConsumerTag = "resize_consumer"
ResizeWorkers = 10
ResizeBindingKey = "resize_image_key"
CreateQueueName = "create_queue"
CreateConsumerTag = "create_consumer"
CreateWorkers = 5
CreateBindingKey = "create_image_key"
UploadHotelImageQueue = "upload_hotel_image_queue"
UploadHotelImageConsumerTag = "upload_hotel_image_consumer_tag"
UploadHotelImageWorkers = 10
UploadHotelImageBindingKey = "upload_hotel_image_binding_key"
)
// Initialize consumers
func (c *ConsumerImpl) Initialize() error {
if err := c.Dial(); err != nil {
return errors.Join(err, errors.New("images.ConsumerDial"))
}
updateImageChan, err := c.CreateExchangeAndQueue(ExchangeName, UploadHotelImageQueue, UploadHotelImageBindingKey)
if err != nil {
return errors.Join(err, errors.New("images.CreateExchangeAndQueue"))
}
c.channels = append(c.channels, updateImageChan)
resizeChan, err := c.CreateExchangeAndQueue(ExchangeName, ResizeQueueName, ResizeBindingKey)
if err != nil {
return errors.Join(err, errors.New("images.CreateExchangeAndQueue"))
}
c.channels = append(c.channels, resizeChan)
createImgChan, err := c.CreateExchangeAndQueue(ExchangeName, CreateQueueName, CreateBindingKey)
if err != nil {
return errors.Join(err, errors.New("images.CreateExchangeAndQueue"))
}
c.channels = append(c.channels, createImgChan)
return nil
}
// CloseChannels close active channels
func (c *ConsumerImpl) CloseChannels() {
for _, channel := range c.channels {
go func(ch *amqp.Channel) {
if err := ch.Close(); err != nil {
c.logger.Errorf("CloseChannels ch.Close error: %v", err)
}
}(channel)
}
}