-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.go
163 lines (131 loc) · 3.66 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package smplmsg
import (
	"crypto/rand"
	"fmt"
	"io"
	"time"

	"github.com/pkg/errors"
	"github.com/streadway/amqp"
)
// DeliveryMode selects whether published messages survive a broker restart.
// See Transient and Persistent for the available values.
type DeliveryMode uint8

// ContentType is the MIME type declared for the content of the messages.
type ContentType string

const (
	// Transient means higher throughput but messages will not be restored on broker restart.
	Transient DeliveryMode = 1
	// Persistent means lower throughput but messages will be restored on broker restart.
	Persistent DeliveryMode = 2

	// OctetStream is the MIME type for arbitrary binary data. It is the
	// content type a new client starts with.
	OctetStream ContentType = "application/octet-stream"

	// defaultRetryTimeout is the retry timeout a new client starts with.
	defaultRetryTimeout = 5 * time.Second
)
// client is the concrete value that NewSubscriber, NewPublisher and NewPubSub
// return behind their respective interfaces. It is built by newClient.
type client struct {
// uri is the broker address that newClient passed to amqp.Dial.
uri string
// exchange is the exchange name given to newClient, which declares it on the channel.
exchange string
// clientID is the caller-supplied identifier.
// NOTE(review): how it is used is not visible in this part of the file.
clientID string
// errorCh is not initialised by newClient.
// NOTE(review): presumably created and consumed elsewhere — confirm.
errorCh chan error
// endMonitoring is created by newClient and closed by Close.
// NOTE(review): presumably it tells monitorConnection to stop — confirm there.
endMonitoring chan struct{}
// timeout is left at its zero value by newClient; a ClientOption may set it.
timeout time.Duration
// retryTimeout starts as defaultRetryTimeout; a ClientOption may override it.
retryTimeout time.Duration
// amqpCh holds the live connection, channel and close-notification channel.
amqpCh *amqpCh
// contentType starts as OctetStream; a ClientOption may override it.
contentType ContentType
// deliveryMode starts as Transient; a ClientOption may override it.
deliveryMode DeliveryMode
}
// amqpCh bundles the AMQP handles that newClient creates for a client.
type amqpCh struct {
// conn is the broker connection returned by amqp.Dial.
conn *amqp.Connection
// ch is the channel opened on conn.
ch *amqp.Channel
// errCh is the channel registered with conn.NotifyClose.
errCh chan *amqp.Error
}
// NewSubscriber initializes and returns a Client that implements the Subscriber interface.
//
// URI, exchange, clientID and opts are handed unchanged to newClient, which
// connects to the broker and declares the exchange. Any error from that step
// is returned wrapped, together with a nil Subscriber.
func NewSubscriber(URI, exchange, clientID string, opts ...ClientOption) (Subscriber, error) {
	c, err := newClient(URI, exchange, clientID, opts...)
	if err != nil {
		// Return a literal nil so the interface value itself is nil on failure.
		return nil, errors.Wrap(err, "failed to initialize new subscriber")
	}
	return c, nil
}
// NewPublisher initializes and returns a Client that implements the Publisher interface.
//
// URI, exchange, clientID and opts are handed unchanged to newClient, which
// connects to the broker and declares the exchange. Any error from that step
// is returned wrapped, together with a nil Publisher.
func NewPublisher(URI, exchange, clientID string, opts ...ClientOption) (Publisher, error) {
	c, err := newClient(URI, exchange, clientID, opts...)
	if err != nil {
		// Return a literal nil so the interface value itself is nil on failure.
		return nil, errors.Wrap(err, "failed to initialize new publisher")
	}
	return c, nil
}
// NewPubSub initializes and returns a Client that implements the PublisherSubscriber interface.
//
// URI, exchange, clientID and opts are handed unchanged to newClient, which
// connects to the broker and declares the exchange. Any error from that step
// is returned wrapped, together with a nil PublisherSubscriber.
func NewPubSub(URI, exchange, clientID string, opts ...ClientOption) (PublisherSubscriber, error) {
	c, err := newClient(URI, exchange, clientID, opts...)
	if err != nil {
		// Return a literal nil so the interface value itself is nil on failure.
		return nil, errors.Wrap(err, "failed to initialize new pub/sub")
	}
	return c, nil
}
// newClient dials the broker at URI, opens a channel on the connection, builds
// a client with the package defaults (OctetStream, Transient,
// defaultRetryTimeout), applies opts in order, declares exchange on the
// channel and finally starts the monitorConnection goroutine.
//
// If anything fails after the connection has been established, the connection
// is closed before returning so that a failed construction does not leak it.
// The returned *client is nil whenever the error is non-nil.
func newClient(URI, exchange, clientID string, opts ...ClientOption) (*client, error) {
	conn, err := amqp.Dial(URI)
	if err != nil {
		return nil, err
	}

	ch, err := conn.Channel()
	if err != nil {
		// Best-effort cleanup; the Channel error is the one worth reporting.
		_ = conn.Close()
		return nil, err
	}

	c := &client{
		uri:           URI,
		clientID:      clientID,
		exchange:      exchange,
		contentType:   OctetStream,
		deliveryMode:  Transient,
		retryTimeout:  defaultRetryTimeout,
		endMonitoring: make(chan struct{}),
		amqpCh: &amqpCh{
			conn:  conn,
			ch:    ch,
			errCh: conn.NotifyClose(make(chan *amqp.Error)),
		},
	}

	// Options run after the defaults are in place so they can override them.
	for _, opt := range opts {
		opt(c)
	}

	if err := declareExchange(ch, exchange); err != nil {
		// Best-effort cleanup: closing the connection also releases the
		// channel opened on it. The declare error is the one reported.
		_ = conn.Close()
		return nil, errors.Wrap(err, "failed to declare exchange")
	}

	// The monitor is started only once the client is fully usable.
	go c.monitorConnection()

	return c, nil
}
// Close signals the end of monitoring by closing endMonitoring, then closes
// the AMQP channel and the AMQP connection.
//
// The connection is closed even when closing the channel fails, so a channel
// error cannot leave the connection open. If both steps fail, the channel
// error is the one returned.
//
// Close must be called at most once: a second call would close endMonitoring
// again, which panics.
func (c *client) Close() error {
	close(c.endMonitoring)

	// Attempt both closes unconditionally before deciding what to report.
	chErr := c.amqpCh.ch.Close()
	connErr := c.amqpCh.conn.Close()

	if chErr != nil {
		return errors.Wrap(chErr, "failed to close channel")
	}
	if connErr != nil {
		return errors.Wrap(connErr, "failed to close connection")
	}
	return nil
}
// declareExchange declares a durable, auto-deleted "topic" exchange called
// exchange on ch. It waits for the broker's reply (noWait is false) and passes
// no extra arguments. A failure is returned wrapped with context; success
// yields nil.
func declareExchange(ch *amqp.Channel, exchange string) error {
	// Named settings instead of positional literals, in ExchangeDeclare's
	// parameter order.
	const (
		kind       = "topic"
		durable    = true
		autoDelete = true
		internal   = false
		noWait     = false
	)

	if err := ch.ExchangeDeclare(exchange, kind, durable, autoDelete, internal, noWait, nil); err != nil {
		return errors.Wrap(err, "failed to declare exchange")
	}
	return nil
}
// uuid returns a freshly generated random (version 4) UUID in the canonical
// lowercase textual form "xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx".
//
// The 16 bytes come from crypto/rand. They are hex-encoded rather than cast
// to a string directly, so the result is always printable, valid UTF-8 text.
//
// uuid panics if the system's random source cannot be read; the signature has
// no error return to report that through.
func uuid() string {
	var raw [16]byte
	if _, err := io.ReadFull(rand.Reader, raw[:]); err != nil {
		panic(err)
	}

	// Stamp the RFC 4122 version (4, random) and variant (10xx) bits.
	raw[6] = raw[6]&0x0f | 0x40
	raw[8] = raw[8]&0x3f | 0x80

	return fmt.Sprintf("%x-%x-%x-%x-%x", raw[0:4], raw[4:6], raw[6:8], raw[8:10], raw[10:16])
}