-
Notifications
You must be signed in to change notification settings - Fork 1
/
routing.go
65 lines (60 loc) · 2.49 KB
/
routing.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package rabbitmq
import amqp "github.com/rabbitmq/amqp091-go"
// RoutingTypeSendData groups the options accepted by Connection.RoutingTypeSend.
type RoutingTypeSendData struct {
	// Exchange is forwarded unchanged as SendData.Exchange.
	Exchange string `json:"exchange"`
	// RoutingKey is forwarded unchanged as SendData.RoutingKey.
	RoutingKey string `json:"routing_key"`
	// Publishing is forwarded unchanged as SendData.Publishing.
	Publishing amqp.Publishing `json:"publishing"`
	// ExchangeDeclareArgs is forwarded as SendData.ExchangeDeclareArgs; an
	// empty table is replaced by nil before forwarding.
	ExchangeDeclareArgs amqp.Table `json:"exchange_declare_args"`
	// ExchangeDeclareDurable is forwarded unchanged as
	// SendData.ExchangeDeclareDurable.
	// NOTE(review): the original annotation on this field was "true",
	// presumably the recommended value — confirm with the author.
	ExchangeDeclareDurable bool `json:"exchange_declare_durable"`
}
// RoutingTypeReceiveData groups the options accepted by
// Connection.RoutingTypeReceive.
type RoutingTypeReceiveData struct {
	// Exchange is forwarded unchanged as ReceiveData.Exchange.
	Exchange string `json:"exchange"`
	// RoutingKey is forwarded unchanged as ReceiveData.RoutingKey.
	RoutingKey string `json:"routing_key"`
	// AutoAck is forwarded unchanged as ReceiveData.AutoAck.
	AutoAck bool `json:"auto_ack"`
	// ExchangeDeclareArgs is forwarded as ReceiveData.ExchangeDeclareArgs; an
	// empty table is replaced by nil before forwarding.
	ExchangeDeclareArgs amqp.Table `json:"exchange_declare_args"`
	// QueueDeclareArgs is forwarded as ReceiveData.QueueDeclareArgs; an empty
	// table is replaced by nil before forwarding.
	QueueDeclareArgs amqp.Table `json:"queue_declare_args"`
	// QueueBindArgs is forwarded as ReceiveData.QueueBindArgs; an empty table
	// is replaced by nil before forwarding.
	QueueBindArgs amqp.Table `json:"queue_bind_args"`
	// QueueDeclareDurable is forwarded unchanged as
	// ReceiveData.QueueDeclareDurable.
	QueueDeclareDurable bool `json:"queue_declare_durable"`
	// ExchangeDeclareDurable is forwarded unchanged as
	// ReceiveData.ExchangeDeclareDurable.
	// NOTE(review): the original annotation on this field was "true",
	// presumably the recommended value — confirm with the author.
	ExchangeDeclareDurable bool `json:"exchange_declare_durable"`
	// Exclusive is not read by RoutingTypeReceive, which always passes
	// Exclusive: true to Receive regardless of this value.
	// NOTE(review): the original annotation on this field was "true";
	// confirm whether ignoring the field is intentional.
	Exclusive bool `json:"exclusive"`
}
// RoutingTypeSend delegates to conn.Send with the exchange type fixed to
// amqp.ExchangeDirect. All other options are copied from data, except that an
// empty ExchangeDeclareArgs table is passed on as nil. The error returned is
// whatever conn.Send returns.
func (conn Connection) RoutingTypeSend(data RoutingTypeSendData) (err error) {
	// Normalize an empty (possibly non-nil) table to a nil amqp.Table.
	declareArgs := data.ExchangeDeclareArgs
	if len(declareArgs) == 0 {
		declareArgs = nil
	}

	request := SendData{
		Type:                   amqp.ExchangeDirect,
		Exchange:               data.Exchange,
		RoutingKey:             data.RoutingKey,
		Publishing:             data.Publishing,
		ExchangeDeclareDurable: data.ExchangeDeclareDurable,
		ExchangeDeclareArgs:    declareArgs,
	}
	return conn.Send(request)
}
// RoutingTypeReceive delegates to conn.Receive with the exchange type fixed
// to amqp.ExchangeDirect and Exclusive fixed to true. The remaining options
// are copied from data, except that empty argument tables are passed on as
// nil. The three results are returned exactly as conn.Receive produces them.
//
// Note that data.Exclusive is not consulted: Exclusive is always true.
func (conn Connection) RoutingTypeReceive(data RoutingTypeReceiveData) (messages <-chan amqp.Delivery, ch *amqp.Channel, err error) {
	// orNil maps an empty (possibly non-nil) table to a nil amqp.Table and
	// returns any populated table untouched.
	orNil := func(args amqp.Table) amqp.Table {
		if len(args) == 0 {
			return nil
		}
		return args
	}

	request := ReceiveData{
		Type:                   amqp.ExchangeDirect,
		Exchange:               data.Exchange,
		RoutingKey:             data.RoutingKey,
		AutoAck:                data.AutoAck,
		Exclusive:              true,
		ExchangeDeclareDurable: data.ExchangeDeclareDurable,
		QueueDeclareDurable:    data.QueueDeclareDurable,
		ExchangeDeclareArgs:    orNil(data.ExchangeDeclareArgs),
		QueueDeclareArgs:       orNil(data.QueueDeclareArgs),
		QueueBindArgs:          orNil(data.QueueBindArgs),
	}
	return conn.Receive(request)
}