-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.go
105 lines (88 loc) · 2.2 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
// version: 0.0.1
// Package broker is a wrapper library for managing the creation of exchanges and consumers.
// RabbitMQ is the only broker type supported.
package broker
import (
"fmt"
)
// delay is the reconnect delay, in seconds. The code that consumes it is not
// visible in this part of the file.
var delay = 5 // reconnect delay: 5 seconds
// Broker describes one message broker: the endpoint string produced by
// NewBroker, the broker type, and a registry of connections.
type Broker struct {
// Endpoint is the broker address as assembled by NewBroker.
Endpoint string
Type string // only rabbitmq supported
// connections maps a string key to a *Connection.
// NOTE(review): presumably keyed by connection kind (e.g. ConsumerConnection) — confirm against GetConnection.
connections map[string]*Connection
}
// EndpointOptions holds the optional parts (scheme, credentials, port) that
// NewBroker combines with the bare endpoint to form the full broker address.
type EndpointOptions struct {
// Protocol is the URI scheme; NewBroker uses "amqp" when it is empty.
Protocol string
// Username and Password are inserted into the address as given.
Username string
Password string
// Port is the broker port; NewBroker uses "5671" when it is empty.
Port string
}
// NewBroker returns a Broker for the given endpoint.
//
// With no options (or a nil first option) the endpoint is stored exactly as
// passed in. When options are supplied, only the first entry is used and the
// endpoint is expanded to
//
//	<protocol>://<username>:<password>@<endpoint>:<port>/
//
// where an empty Protocol defaults to "amqp" and an empty Port defaults to
// "5671". The caller's EndpointOptions value is never modified.
//
// The returned Broker always has Type "rabbitmq" and an empty, non-nil
// connection registry.
func NewBroker(endpoint string, opts ...*EndpointOptions) *Broker {
	// A nil entry is treated the same as "no options" instead of panicking.
	if len(opts) != 0 && opts[0] != nil {
		// Work on a copy so that filling in defaults does not leak back into
		// the struct owned by the caller.
		options := *opts[0]

		if options.Protocol == "" {
			options.Protocol = "amqp"
		}
		if options.Port == "" {
			// NOTE(review): 5671 is conventionally the AMQP-over-TLS (amqps)
			// port, while plain amqp normally listens on 5672 — confirm this
			// default is intended. Kept as-is for backward compatibility.
			options.Port = "5671"
		}

		// NOTE(review): Username and Password are inserted verbatim; values
		// containing URI-reserved characters (e.g. '@', '/', '#') must be
		// percent-encoded by the caller.
		endpoint = fmt.Sprintf("%s://%s:%s@%s:%s/",
			options.Protocol, options.Username, options.Password, endpoint, options.Port)
	}

	return &Broker{
		Endpoint:    endpoint,
		Type:        "rabbitmq",
		connections: map[string]*Connection{},
	}
}
// QueueDeclareAndBind declares a queue and binds it to an exchange; it does
// not start consuming.
//
// It obtains the consumer connection via GetConnection(ConsumerConnection),
// adds a new channel on it, declares queueName on that channel and binds the
// resulting queue to exchange using routeKey. The queue is declared durable,
// not auto-deleted, and without no-wait; it is exclusive only when queueName
// is empty.
//
// On success it returns the name reported by the queue declaration. On any
// failure it returns an empty string and the underlying error unchanged.
func (b *Broker) QueueDeclareAndBind(exchange, routeKey, queueName string) (string, error) {
	consumerConn, err := b.GetConnection(ConsumerConnection)
	if err != nil {
		return "", err
	}

	// Each call works on its own channel of the consumer connection.
	channel, err := consumerConn.AddChannel()
	if err != nil {
		return "", err
	}

	// An unnamed queue is treated as private to this service, so it is
	// declared exclusive.
	exclusive := queueName == ""

	queue, err := channel.QueueDeclare(queueName, true, false, exclusive, false, nil)
	if err != nil {
		return "", err
	}

	// Bind using the declared queue's name, not the requested one.
	if err := channel.QueueBind(queue.Name, routeKey, exchange, false, nil); err != nil {
		return "", err
	}

	return queue.Name, nil
}