-
Notifications
You must be signed in to change notification settings - Fork 798
/
azuresb.go
553 lines (495 loc) · 17.1 KB
/
azuresb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
// Copyright 2018 The Go Cloud Development Kit Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package azuresb provides an implementation of pubsub using Azure Service
// Bus Topic and Subscription.
// See https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-messaging-overview for an overview.
//
// # URLs
//
// For pubsub.OpenTopic and pubsub.OpenSubscription, azuresb registers
// for the scheme "azuresb".
// The default URL opener will use a Service Bus Connection String based on
// the environment variable "SERVICEBUS_CONNECTION_STRING".
// To customize the URL opener, or for more details on the URL format,
// see URLOpener.
// See https://gocloud.dev/concepts/urls/ for background information.
//
// # Message Delivery Semantics
//
// Azure ServiceBus supports at-least-once semantics in the default Peek-Lock
// mode; messages will be redelivered if they are not Acked, or if they are
// explicitly Nacked.
//
// ServiceBus also supports a Receive-Delete mode, which essentially auto-acks a
// message when it is delivered, resulting in at-most-once semantics. Set
// SubscriberOptions.ReceiveAndDelete to true to tell azuresb.Subscription that
// you've enabled Receive-Delete mode. When enabled, pubsub.Message.Ack is a
// no-op, pubsub.Message.Nackable will return false, and pubsub.Message.Nack
// will panic.
//
// See https://godoc.org/gocloud.dev/pubsub#hdr-At_most_once_and_At_least_once_Delivery
// for more background.
//
// # As
//
// azuresb exposes the following types for As:
// - Topic: *servicebus.Topic
// - Subscription: *servicebus.Subscription
// - Message.BeforeSend: *servicebus.Message
// - Message.AfterSend: None
// - Message: *servicebus.Message
// - Error: common.Retryable, *amqp.Error, *amqp.DetachError
package azuresb // import "gocloud.dev/pubsub/azuresb"
import (
"context"
"errors"
"fmt"
"net/url"
"os"
"path"
"strings"
"sync"
"time"
common "github.com/Azure/azure-amqp-common-go/v3"
servicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
"github.com/Azure/go-amqp"
"gocloud.dev/gcerrors"
"gocloud.dev/pubsub"
"gocloud.dev/pubsub/batcher"
"gocloud.dev/pubsub/driver"
)
const (
listenerTimeout = 2 * time.Second
)
var sendBatcherOpts = &batcher.Options{
MaxBatchSize: 1, // SendBatch only supports one message at a time
MaxHandlers: 100, // max concurrency for sends
}
var recvBatcherOpts = &batcher.Options{
MaxBatchSize: 50,
MaxHandlers: 100, // max concurrency for reads
}
var ackBatcherOpts = &batcher.Options{
MaxBatchSize: 1,
MaxHandlers: 100, // max concurrency for acks
}
func init() {
o := new(defaultOpener)
pubsub.DefaultURLMux().RegisterTopic(Scheme, o)
pubsub.DefaultURLMux().RegisterSubscription(Scheme, o)
}
// defaultURLOpener creates an URLOpener with ConnectionString initialized from
// the environment variable SERVICEBUS_CONNECTION_STRING.
type defaultOpener struct {
init sync.Once
opener *URLOpener
err error
}
func (o *defaultOpener) defaultOpener() (*URLOpener, error) {
o.init.Do(func() {
cs := os.Getenv("SERVICEBUS_CONNECTION_STRING")
if cs == "" {
o.err = errors.New("SERVICEBUS_CONNECTION_STRING environment variable not set")
return
}
o.opener = &URLOpener{ConnectionString: cs}
})
return o.opener, o.err
}
func (o *defaultOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic, error) {
opener, err := o.defaultOpener()
if err != nil {
return nil, fmt.Errorf("open topic %v: %v", u, err)
}
return opener.OpenTopicURL(ctx, u)
}
func (o *defaultOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error) {
opener, err := o.defaultOpener()
if err != nil {
return nil, fmt.Errorf("open subscription %v: %v", u, err)
}
return opener.OpenSubscriptionURL(ctx, u)
}
// Scheme is the URL scheme azuresb registers its URLOpeners under on pubsub.DefaultMux.
const Scheme = "azuresb"
// URLOpener opens Azure Service Bus URLs like "azuresb://mytopic" for
// topics or "azuresb://mytopic?subscription=mysubscription" for subscriptions.
//
// - The URL's host+path is used as the topic name.
// - For subscriptions, the subscription name must be provided in the
// "subscription" query parameter.
//
// No other query parameters are supported.
type URLOpener struct {
// ConnectionString is the Service Bus connection string (required).
// https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues
ConnectionString string
// ClientOptions are options when creating the Client.
ServiceBusClientOptions *servicebus.ClientOptions
// Options passed when creating the ServiceBus Topic/Subscription.
ServiceBusSenderOptions *servicebus.NewSenderOptions
ServiceBusReceiverOptions *servicebus.ReceiverOptions
// TopicOptions specifies the options to pass to OpenTopic.
TopicOptions TopicOptions
// SubscriptionOptions specifies the options to pass to OpenSubscription.
SubscriptionOptions SubscriptionOptions
}
func (o *URLOpener) sbClient(kind string, u *url.URL) (*servicebus.Client, error) {
if o.ConnectionString == "" {
return nil, fmt.Errorf("open %s %v: ConnectionString is required", kind, u)
}
client, err := NewClientFromConnectionString(o.ConnectionString, o.ServiceBusClientOptions)
if err != nil {
return nil, fmt.Errorf("open %s %v: invalid connection string %q: %v", kind, u, o.ConnectionString, err)
}
return client, nil
}
// OpenTopicURL opens a pubsub.Topic based on u.
func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic, error) {
sbClient, err := o.sbClient("topic", u)
if err != nil {
return nil, err
}
for param := range u.Query() {
return nil, fmt.Errorf("open topic %v: invalid query parameter %q", u, param)
}
topicName := path.Join(u.Host, u.Path)
sbSender, err := NewSender(sbClient, topicName, o.ServiceBusSenderOptions)
if err != nil {
return nil, fmt.Errorf("open topic %v: couldn't open topic %q: %v", u, topicName, err)
}
return OpenTopic(ctx, sbSender, &o.TopicOptions)
}
// OpenSubscriptionURL opens a pubsub.Subscription based on u.
func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error) {
sbClient, err := o.sbClient("subscription", u)
if err != nil {
return nil, err
}
topicName := path.Join(u.Host, u.Path)
q := u.Query()
subName := q.Get("subscription")
q.Del("subscription")
if subName == "" {
return nil, fmt.Errorf("open subscription %v: missing required query parameter subscription", u)
}
for param := range q {
return nil, fmt.Errorf("open subscription %v: invalid query parameter %q", u, param)
}
sbReceiver, err := NewReceiver(sbClient, topicName, subName, o.ServiceBusReceiverOptions)
if err != nil {
return nil, fmt.Errorf("open subscription %v: couldn't open subscription %q: %v", u, subName, err)
}
return OpenSubscription(ctx, sbClient, sbReceiver, &o.SubscriptionOptions)
}
type topic struct {
sbSender *servicebus.Sender
}
// TopicOptions provides configuration options for an Azure SB Topic.
type TopicOptions struct {
// BatcherOptions adds constraints to the default batching done for sends.
BatcherOptions batcher.Options
}
// NewClientFromConnectionString returns a *servicebus.Client from a Service Bus connection string.
// https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues
func NewClientFromConnectionString(connectionString string, opts *servicebus.ClientOptions) (*servicebus.Client, error) {
return servicebus.NewClientFromConnectionString(connectionString, opts)
}
// NewSender returns a *servicebus.Sender associated with a Service Bus Client.
func NewSender(sbClient *servicebus.Client, topicName string, opts *servicebus.NewSenderOptions) (*servicebus.Sender, error) {
return sbClient.NewSender(topicName, opts)
}
// NewReceiver returns a *servicebus.Receiver associated with a Service Bus Topic.
func NewReceiver(sbClient *servicebus.Client, topicName, subscriptionName string, opts *servicebus.ReceiverOptions) (*servicebus.Receiver, error) {
return sbClient.NewReceiverForSubscription(topicName, subscriptionName, opts)
}
// OpenTopic initializes a pubsub Topic on a given Service Bus Sender.
func OpenTopic(ctx context.Context, sbSender *servicebus.Sender, opts *TopicOptions) (*pubsub.Topic, error) {
t, err := openTopic(ctx, sbSender, opts)
if err != nil {
return nil, err
}
if opts == nil {
opts = &TopicOptions{}
}
bo := sendBatcherOpts.NewMergedOptions(&opts.BatcherOptions)
return pubsub.NewTopic(t, bo), nil
}
// openTopic returns the driver for OpenTopic. This function exists so the test
// harness can get the driver interface implementation if it needs to.
func openTopic(ctx context.Context, sbSender *servicebus.Sender, _ *TopicOptions) (driver.Topic, error) {
if sbSender == nil {
return nil, errors.New("azuresb: OpenTopic requires a Service Bus Sender")
}
return &topic{sbSender: sbSender}, nil
}
// SendBatch implements driver.Topic.SendBatch.
func (t *topic) SendBatch(ctx context.Context, dms []*driver.Message) error {
if len(dms) != 1 {
panic("azuresb.SendBatch should only get one message at a time")
}
dm := dms[0]
sbms := &servicebus.Message{Body: dm.Body}
if len(dm.Metadata) > 0 {
sbms.ApplicationProperties = map[string]interface{}{}
for k, v := range dm.Metadata {
sbms.ApplicationProperties[k] = v
}
}
if dm.BeforeSend != nil {
asFunc := func(i interface{}) bool {
if p, ok := i.(**servicebus.Message); ok {
*p = sbms
return true
}
return false
}
if err := dm.BeforeSend(asFunc); err != nil {
return err
}
}
err := t.sbSender.SendMessage(ctx, sbms, nil)
if err != nil {
return err
}
if dm.AfterSend != nil {
asFunc := func(i interface{}) bool { return false }
if err := dm.AfterSend(asFunc); err != nil {
return err
}
}
return nil
}
func (t *topic) IsRetryable(err error) bool {
_, retryable := errorCode(err)
return retryable
}
func (t *topic) As(i interface{}) bool {
p, ok := i.(**servicebus.Sender)
if !ok {
return false
}
*p = t.sbSender
return true
}
// ErrorAs implements driver.Topic.ErrorAs
func (*topic) ErrorAs(err error, i interface{}) bool {
return errorAs(err, i)
}
func errorAs(err error, i interface{}) bool {
switch v := err.(type) {
case *amqp.DetachError:
if p, ok := i.(**amqp.DetachError); ok {
*p = v
return true
}
case *amqp.Error:
if p, ok := i.(**amqp.Error); ok {
*p = v
return true
}
case common.Retryable:
if p, ok := i.(*common.Retryable); ok {
*p = v
return true
}
}
return false
}
func (*topic) ErrorCode(err error) gcerrors.ErrorCode {
code, _ := errorCode(err)
return code
}
// Close implements driver.Topic.Close.
func (*topic) Close() error { return nil }
type subscription struct {
sbReceiver *servicebus.Receiver
opts *SubscriptionOptions
}
// SubscriptionOptions will contain configuration for subscriptions.
type SubscriptionOptions struct {
// If false, the serviceBus.Subscription MUST be in the default Peek-Lock mode.
// If true, the serviceBus.Subscription MUST be in Receive-and-Delete mode.
// When true: pubsub.Message.Ack will be a no-op, pubsub.Message.Nackable
// will return true, and pubsub.Message.Nack will panic.
ReceiveAndDelete bool
// ReceiveBatcherOptions adds constraints to the default batching done for receives.
ReceiveBatcherOptions batcher.Options
// AckBatcherOptions adds constraints to the default batching done for acks.
// Only used when ReceiveAndDelete is false.
AckBatcherOptions batcher.Options
}
// OpenSubscription initializes a pubsub Subscription on a given Service Bus Subscription and its parent Service Bus Topic.
func OpenSubscription(ctx context.Context, sbClient *servicebus.Client, sbReceiver *servicebus.Receiver, opts *SubscriptionOptions) (*pubsub.Subscription, error) {
ds, err := openSubscription(ctx, sbClient, sbReceiver, opts)
if err != nil {
return nil, err
}
if opts == nil {
opts = &SubscriptionOptions{}
}
rbo := recvBatcherOpts.NewMergedOptions(&opts.ReceiveBatcherOptions)
abo := ackBatcherOpts.NewMergedOptions(&opts.AckBatcherOptions)
return pubsub.NewSubscription(ds, rbo, abo), nil
}
// openSubscription returns a driver.Subscription.
func openSubscription(ctx context.Context, sbClient *servicebus.Client, sbReceiver *servicebus.Receiver, opts *SubscriptionOptions) (driver.Subscription, error) {
if sbClient == nil {
return nil, errors.New("azuresb: OpenSubscription requires a Service Bus Client")
}
if sbReceiver == nil {
return nil, errors.New("azuresb: OpenSubscription requires a Service Bus Receiver")
}
if opts == nil {
opts = &SubscriptionOptions{}
}
return &subscription{sbReceiver: sbReceiver, opts: opts}, nil
}
// IsRetryable implements driver.Subscription.IsRetryable.
func (s *subscription) IsRetryable(err error) bool {
_, retryable := errorCode(err)
return retryable
}
// As implements driver.Subscription.As.
func (s *subscription) As(i interface{}) bool {
p, ok := i.(**servicebus.Receiver)
if !ok {
return false
}
*p = s.sbReceiver
return true
}
// ErrorAs implements driver.Subscription.ErrorAs
func (s *subscription) ErrorAs(err error, i interface{}) bool {
return errorAs(err, i)
}
func (s *subscription) ErrorCode(err error) gcerrors.ErrorCode {
code, _ := errorCode(err)
return code
}
// ReceiveBatch implements driver.Subscription.ReceiveBatch.
func (s *subscription) ReceiveBatch(ctx context.Context, maxMessages int) ([]*driver.Message, error) {
// ReceiveMessages will block until rctx is Done; we want to return after
// a reasonably short delay even if there are no messages. So, create a
// sub context for the RPC.
rctx, cancel := context.WithTimeout(ctx, listenerTimeout)
defer cancel()
var messages []*driver.Message
sbmsgs, err := s.sbReceiver.ReceiveMessages(rctx, maxMessages, nil)
for _, sbmsg := range sbmsgs {
metadata := map[string]string{}
for key, value := range sbmsg.ApplicationProperties {
if strVal, ok := value.(string); ok {
metadata[key] = strVal
}
}
messages = append(messages, &driver.Message{
LoggableID: sbmsg.MessageID,
Body: sbmsg.Body,
Metadata: metadata,
AckID: sbmsg,
AsFunc: messageAsFunc(sbmsg),
})
}
// Mask rctx timeouts, they are expected if no messages are available.
if err == rctx.Err() {
err = nil
}
return messages, err
}
func messageAsFunc(sbmsg *servicebus.ReceivedMessage) func(interface{}) bool {
return func(i interface{}) bool {
p, ok := i.(**servicebus.ReceivedMessage)
if !ok {
return false
}
*p = sbmsg
return true
}
}
// SendAcks implements driver.Subscription.SendAcks.
func (s *subscription) SendAcks(ctx context.Context, ids []driver.AckID) error {
if s.opts.ReceiveAndDelete {
// Ack is a no-op in Receive-and-Delete mode.
return nil
}
var err error
for _, id := range ids {
oneErr := s.sbReceiver.CompleteMessage(ctx, id.(*servicebus.ReceivedMessage), nil)
if oneErr != nil {
err = oneErr
}
}
return err
}
// CanNack implements driver.CanNack.
func (s *subscription) CanNack() bool {
if s == nil {
return false
}
return !s.opts.ReceiveAndDelete
}
// SendNacks implements driver.Subscription.SendNacks.
func (s *subscription) SendNacks(ctx context.Context, ids []driver.AckID) error {
if !s.CanNack() {
panic("unreachable")
}
var err error
for _, id := range ids {
oneErr := s.sbReceiver.AbandonMessage(ctx, id.(*servicebus.ReceivedMessage), nil)
if oneErr != nil {
err = oneErr
}
}
return err
}
// errorCode returns an error code and whether err is retryable.
func errorCode(err error) (gcerrors.ErrorCode, bool) {
// Unfortunately Azure sometimes returns common.Retryable or even
// errors.errorString, which don't expose anything other than the error
// string :-(.
if strings.Contains(err.Error(), "status code 404") {
return gcerrors.NotFound, false
}
var cond amqp.ErrorCondition
var aderr *amqp.DetachError
var aerr *amqp.Error
if errors.As(err, &aderr) {
if aderr.RemoteError == nil {
return gcerrors.NotFound, false
}
cond = aderr.RemoteError.Condition
} else if errors.As(err, &aerr) {
cond = aerr.Condition
}
switch cond {
case amqp.ErrorNotFound:
return gcerrors.NotFound, false
case amqp.ErrorPreconditionFailed:
return gcerrors.FailedPrecondition, false
case amqp.ErrorInternalError:
return gcerrors.Internal, true
case amqp.ErrorNotImplemented:
return gcerrors.Unimplemented, false
case amqp.ErrorUnauthorizedAccess, amqp.ErrorNotAllowed:
return gcerrors.PermissionDenied, false
case amqp.ErrorResourceLimitExceeded:
return gcerrors.ResourceExhausted, true
case amqp.ErrorInvalidField:
return gcerrors.InvalidArgument, false
}
return gcerrors.Unknown, true
}
// Close implements driver.Subscription.Close.
func (*subscription) Close() error { return nil }