-
Notifications
You must be signed in to change notification settings - Fork 914
/
amqp.go
129 lines (110 loc) · 3.83 KB
/
amqp.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package common
import (
"crypto/tls"
"fmt"
"github.com/streadway/amqp"
)
// AMQPConnector groups the helpers in this file that open, configure, inspect
// and close AMQP connections, channels and queues. The struct has no fields,
// so a value carries no state of its own.
type AMQPConnector struct{}
// Connect performs the full broker setup in one call: it opens a connection
// and channel (via Open), optionally declares the exchange, optionally
// declares and binds the queue, and finally switches the channel into
// publisher-confirm mode.
//
// The exchange is declared only when exchange is non-empty, and the queue is
// declared and bound only when queueName is non-empty.
//
// On success it returns the connection, the channel, the declared queue (the
// zero amqp.Queue when no queue was requested), a channel of publish
// confirmations and a channel of connection close notifications.
//
// If Open fails, the connection and channel results are nil. If any later
// step fails, the already opened connection and channel are still returned
// together with the error.
func (ac *AMQPConnector) Connect(url string, tlsConfig *tls.Config, exchange, exchangeType, queueName string, queueDurable, queueDelete bool, queueBindingKey string, exchangeDeclareArgs, queueDeclareArgs, queueBindingArgs amqp.Table) (*amqp.Connection, *amqp.Channel, amqp.Queue, <-chan amqp.Confirmation, <-chan *amqp.Error, error) {
	// Fixed flags used for the declarations below.
	const (
		exchangeDurable    = true
		exchangeAutoDelete = false
		exchangeInternal   = false
		queueExclusive     = false
		noWait             = false
	)

	// Zero value handed back whenever no queue has been declared.
	var noQueue amqp.Queue

	connection, ch, openErr := ac.Open(url, tlsConfig)
	if openErr != nil {
		return nil, nil, noQueue, nil, nil, openErr
	}

	if len(exchange) > 0 {
		exchangeErr := ch.ExchangeDeclare(
			exchange,
			exchangeType,
			exchangeDurable,
			exchangeAutoDelete,
			exchangeInternal,
			noWait,
			exchangeDeclareArgs,
		)
		if exchangeErr != nil {
			return connection, ch, noQueue, nil, nil, fmt.Errorf("Exchange declare error: %s", exchangeErr)
		}
	}

	declared := noQueue
	if len(queueName) > 0 {
		q, declareErr := ch.QueueDeclare(
			queueName,
			queueDurable,
			queueDelete,
			queueExclusive,
			noWait,
			queueDeclareArgs,
		)
		if declareErr != nil {
			return connection, ch, noQueue, nil, nil, fmt.Errorf("Queue declare error: %s", declareErr)
		}
		declared = q

		// Bind using the name reported by the declaration, not the requested one.
		bindErr := ch.QueueBind(
			declared.Name,
			queueBindingKey,
			exchange,
			noWait,
			queueBindingArgs,
		)
		if bindErr != nil {
			return connection, ch, declared, nil, nil, fmt.Errorf("Queue bind error: %s", bindErr)
		}
	}

	// Put the channel into confirm mode so publishes are acknowledged.
	if confirmErr := ch.Confirm(noWait); confirmErr != nil {
		return connection, ch, declared, nil, nil, fmt.Errorf("Channel could not be put into confirm mode: %s", confirmErr)
	}

	// Register the listeners in the same order as before: publish
	// confirmations first, then connection close notifications.
	confirmations := ch.NotifyPublish(make(chan amqp.Confirmation, 1))
	closures := connection.NotifyClose(make(chan *amqp.Error, 1))

	return connection, ch, declared, confirmations, closures, nil
}
// DeleteQueue removes the queue called queueName using the given channel.
// The delete is unconditional: the ifUnused and ifEmpty flags are both off,
// and the call does not use noWait. The count of purged messages returned by
// the client library is discarded; only the error is reported.
func (ac *AMQPConnector) DeleteQueue(channel *amqp.Channel, queueName string) error {
	const (
		ifUnused = false
		ifEmpty  = false
		noWait   = false
	)

	if _, deleteErr := channel.QueueDelete(queueName, ifUnused, ifEmpty, noWait); deleteErr != nil {
		return deleteErr
	}
	return nil
}
// InspectQueue looks up the queue called queueName on the given channel and
// returns a pointer to the amqp.Queue value reported by QueueInspect. When
// the lookup fails it returns nil and an error prefixed with
// "Queue inspect error".
func (*AMQPConnector) InspectQueue(channel *amqp.Channel, queueName string) (*amqp.Queue, error) {
	state, inspectErr := channel.QueueInspect(queueName)
	if inspectErr == nil {
		return &state, nil
	}
	return nil, fmt.Errorf("Queue inspect error: %s", inspectErr)
}
// Open dials the broker at url and opens a channel on the new connection.
//
// From amqp docs: DialTLS will use the provided tls.Config when it encounters
// an amqps:// scheme and will dial a plain connection when it encounters an
// amqp:// scheme.
//
// On success it returns the connection and the channel. On failure both are
// nil and the error is prefixed with "Dial error" or "Open channel error"
// depending on the step that failed. If the channel cannot be opened, the
// freshly dialed connection is closed before returning so that it is not
// leaked.
func (ac *AMQPConnector) Open(url string, tlsConfig *tls.Config) (*amqp.Connection, *amqp.Channel, error) {
	// Connect
	conn, err := amqp.DialTLS(url, tlsConfig)
	if err != nil {
		return nil, nil, fmt.Errorf("Dial error: %s", err)
	}

	// Open a channel
	channel, err := conn.Channel()
	if err != nil {
		// The caller only receives nil for the connection, so nobody else can
		// close it. The close error is deliberately dropped: the channel
		// failure is the one worth reporting.
		_ = conn.Close()
		return nil, nil, fmt.Errorf("Open channel error: %s", err)
	}

	return conn, channel, nil
}
// Close shuts down the channel and then the connection. Either argument may
// be nil, in which case that step is skipped.
//
// The connection is closed even if closing the channel fails, so a channel
// error can no longer leave the connection open. The returned error is the
// first failure encountered: "Close channel error: ..." if the channel close
// failed, otherwise "Close connection error: ..." if the connection close
// failed, otherwise nil.
func (ac *AMQPConnector) Close(channel *amqp.Channel, conn *amqp.Connection) error {
	var firstErr error

	if channel != nil {
		if err := channel.Close(); err != nil {
			// Remember the failure but keep going so the connection still
			// gets a chance to be released.
			firstErr = fmt.Errorf("Close channel error: %s", err)
		}
	}

	if conn != nil {
		if err := conn.Close(); err != nil && firstErr == nil {
			firstErr = fmt.Errorf("Close connection error: %s", err)
		}
	}

	return firstErr
}