This repository has been archived by the owner on Oct 12, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 52
/
subscription_manager.go
586 lines (499 loc) · 21.1 KB
/
subscription_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
package servicebus
import (
"context"
"encoding/xml"
"errors"
"io/ioutil"
"net/http"
"strings"
"time"
"github.com/Azure/go-autorest/autorest/date"
"github.com/Azure/go-autorest/autorest/to"
"github.com/devigned/tab"
"github.com/Azure/azure-service-bus-go/atom"
)
type (
// SubscriptionManager provides CRUD functionality for the Subscriptions (and the Rules of those Subscriptions) of a
// Service Bus Topic. It embeds an entityManager, which performs the requests, and keeps the Topic whose name is
// used to build every resource URI.
SubscriptionManager struct {
*entityManager
Topic *Topic
}
// FilterDescriber can transform itself into a FilterDescription
FilterDescriber interface {
ToFilterDescription() FilterDescription
}
// ActionDescriber can transform itself into an ActionDescription
ActionDescriber interface {
ToActionDescription() ActionDescription
}
// RuleDescription is the content type for Subscription Rule management requests. Filter is always serialized;
// Action and CreatedAt are omitted from the XML when unset.
RuleDescription struct {
XMLName xml.Name `xml:"RuleDescription"`
BaseEntityDescription
CreatedAt *date.Time `xml:"CreatedAt,omitempty"`
Filter FilterDescription `xml:"Filter"`
Action *ActionDescription `xml:"Action,omitempty"`
}
// DefaultRuleDescription is the default rule carried inside a SubscriptionDescription: a filter plus an optional
// rule name.
DefaultRuleDescription struct {
XMLName xml.Name `xml:"DefaultRuleDescription"`
Filter FilterDescription `xml:"Filter"`
Name *string `xml:"Name,omitempty"`
}
// FilterDescription describes a filter which can be applied to a subscription to filter messages from the topic.
//
// Subscribers can define which messages they want to receive from a topic. These messages are specified in the
// form of one or more named subscription rules. Each rule consists of a condition that selects particular messages
// and an action that annotates the selected message. For each matching rule condition, the subscription produces a
// copy of the message, which may be differently annotated for each matching rule.
//
// Each newly created topic subscription has an initial default subscription rule. If you don't explicitly specify a
// filter condition for the rule, the applied filter is the true filter that enables all messages to be selected
// into the subscription. The default rule has no associated annotation action.
FilterDescription struct {
XMLName xml.Name `xml:"Filter"`
CorrelationFilter
Type string `xml:"http://www.w3.org/2001/XMLSchema-instance type,attr"`
SQLExpression *string `xml:"SqlExpression,omitempty"`
CompatibilityLevel int `xml:"CompatibilityLevel,omitempty"`
}
// ActionDescription describes an action upon a message that matches a filter
//
// With SQL filter conditions, you can define an action that can annotate the message by adding, removing, or
// replacing properties and their values. The action uses a SQL-like expression that loosely leans on the SQL
// UPDATE statement syntax. The action is performed on the message after it has been matched and before the message
// is selected into the subscription. The changes to the message properties are private to the message copied into
// the subscription.
ActionDescription struct {
Type string `xml:"http://www.w3.org/2001/XMLSchema-instance type,attr"`
SQLExpression string `xml:"SqlExpression"`
RequiresPreprocessing bool `xml:"RequiresPreprocessing"`
CompatibilityLevel int `xml:"CompatibilityLevel,omitempty"`
}
// RuleEntity is the Azure Service Bus description of a Subscription Rule for management activities. It combines
// the RuleDescription with the Entity (name and ID) taken from the Atom entry.
RuleEntity struct {
*RuleDescription
*Entity
}
// ruleContent is a specialized Rule body for an Atom entry
ruleContent struct {
XMLName xml.Name `xml:"content"`
Type string `xml:"type,attr"`
RuleDescription RuleDescription `xml:"RuleDescription"`
}
// ruleEntry is an Atom entry whose content is a ruleContent. It is marshalled as the body of rule PUT requests and
// is the type rule responses are unmarshalled into.
ruleEntry struct {
*atom.Entry
Content *ruleContent `xml:"content"`
}
// ruleFeed is a specialized Atom feed whose entries are Subscription Rules
ruleFeed struct {
*atom.Feed
Entries []ruleEntry `xml:"entry"`
}
// SubscriptionDescription is the content type for Subscription management requests
SubscriptionDescription struct {
XMLName xml.Name `xml:"SubscriptionDescription"`
BaseEntityDescription
LockDuration *string `xml:"LockDuration,omitempty"` // LockDuration - ISO 8601 timespan duration of a peek-lock; that is, the amount of time that the message is locked for other receivers. The maximum value for LockDuration is 5 minutes; the default value is 1 minute.
RequiresSession *bool `xml:"RequiresSession,omitempty"`
DefaultMessageTimeToLive *string `xml:"DefaultMessageTimeToLive,omitempty"` // DefaultMessageTimeToLive - ISO 8601 default message timespan to live value. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself.
DefaultRuleDescription *DefaultRuleDescription `xml:"DefaultRuleDescription,omitempty"`
DeadLetteringOnMessageExpiration *bool `xml:"DeadLetteringOnMessageExpiration,omitempty"` // DeadLetteringOnMessageExpiration - A value that indicates whether this subscription has dead letter support when a message expires.
DeadLetteringOnFilterEvaluationExceptions *bool `xml:"DeadLetteringOnFilterEvaluationExceptions,omitempty"`
MessageCount *int64 `xml:"MessageCount,omitempty"` // MessageCount - The number of messages in the subscription.
MaxDeliveryCount *int32 `xml:"MaxDeliveryCount,omitempty"` // MaxDeliveryCount - The maximum delivery count. A message is automatically deadlettered after this number of deliveries. default value is 10.
EnableBatchedOperations *bool `xml:"EnableBatchedOperations,omitempty"` // EnableBatchedOperations - Value that indicates whether server-side batched operations are enabled.
Status *EntityStatus `xml:"Status,omitempty"`
CreatedAt *date.Time `xml:"CreatedAt,omitempty"`
UpdatedAt *date.Time `xml:"UpdatedAt,omitempty"`
AccessedAt *date.Time `xml:"AccessedAt,omitempty"`
AutoDeleteOnIdle *string `xml:"AutoDeleteOnIdle,omitempty"`
ForwardTo *string `xml:"ForwardTo,omitempty"` // ForwardTo - absolute URI of the entity to forward messages
ForwardDeadLetteredMessagesTo *string `xml:"ForwardDeadLetteredMessagesTo,omitempty"` // ForwardDeadLetteredMessagesTo - absolute URI of the entity to forward dead letter messages
CountDetails *CountDetails `xml:"CountDetails,omitempty"`
}
// SubscriptionEntity is the Azure Service Bus description of a topic Subscription for management activities. It
// combines the SubscriptionDescription with the Entity (name and ID) taken from the Atom entry.
SubscriptionEntity struct {
*SubscriptionDescription
*Entity
}
// subscriptionFeed is a specialized feed containing Topic Subscriptions
subscriptionFeed struct {
*atom.Feed
Entries []subscriptionEntry `xml:"entry"`
}
// subscriptionEntry is a specialized Atom entry whose content is a Topic Subscription
subscriptionEntry struct {
*atom.Entry
Content *subscriptionContent `xml:"content"`
}
// subscriptionContent is a specialized Subscription body for an Atom entry
subscriptionContent struct {
XMLName xml.Name `xml:"content"`
Type string `xml:"type,attr"`
SubscriptionDescription SubscriptionDescription `xml:"SubscriptionDescription"`
}
// SubscriptionManagementOption represents named options for assisting Subscription creation. Each option mutates
// the SubscriptionDescription it is given and may return an error to reject the configuration.
SubscriptionManagementOption func(*SubscriptionDescription) error
)
// NewSubscriptionManager creates a new SubscriptionManager for this Service Bus Topic.
//
// The manager's entityManager is built from the host URI returned by the Topic's namespace (getHTTPSHostURI) and
// from that namespace's TokenProvider.
func (t *Topic) NewSubscriptionManager() *SubscriptionManager {
	ns := t.namespace
	manager := new(SubscriptionManager)
	manager.entityManager = newEntityManager(ns.getHTTPSHostURI(), ns.TokenProvider)
	manager.Topic = t
	return manager
}
// NewSubscriptionManager creates a new SubscriptionManager for the Topic called topicName in this Service Bus
// Namespace.
//
// The Topic is obtained through ns.NewTopic; when that fails the error is returned unchanged together with a nil
// manager. Otherwise the manager is built by the Topic's own NewSubscriptionManager constructor.
func (ns *Namespace) NewSubscriptionManager(topicName string) (*SubscriptionManager, error) {
	topic, err := ns.NewTopic(topicName)
	if err != nil {
		return nil, err
	}
	return topic.NewSubscriptionManager(), nil
}
// Delete deletes the Subscription called name from the manager's Topic.
//
// The response (if any) is closed before returning, and the error reported by the underlying entityManager.Delete
// call is returned as is.
func (sm *SubscriptionManager) Delete(ctx context.Context, name string) error {
	ctx, span := sm.startSpanFromContext(ctx, "sb.SubscriptionManager.Delete")
	defer span.End()

	resourceURI := sm.getResourceURI(name)
	res, err := sm.entityManager.Delete(ctx, resourceURI)
	// closeRes tolerates a nil response, so it is safe to call regardless of err.
	closeRes(ctx, res)
	return err
}
// Put creates or updates the Subscription called name on the manager's Topic.
//
// Every option in opts is applied, in order, to a fresh SubscriptionDescription; the first option that returns an
// error aborts the call and that error is returned. When the resulting description sets ForwardTo or
// ForwardDeadLetteredMessagesTo, the matching supplemental authorization middleware is attached to the request.
//
// The response body is decoded into the returned SubscriptionEntity. A body that cannot be decoded as an Atom entry
// is converted into an error with formatManagementError.
func (sm *SubscriptionManager) Put(ctx context.Context, name string, opts ...SubscriptionManagementOption) (*SubscriptionEntity, error) {
	ctx, span := sm.startSpanFromContext(ctx, "sb.SubscriptionManager.Put")
	defer span.End()

	description := &SubscriptionDescription{}
	for i := range opts {
		if err := opts[i](description); err != nil {
			return nil, err
		}
	}
	description.ServiceBusSchema = to.StringPtr(serviceBusSchema)

	request := &subscriptionEntry{
		Entry:   &atom.Entry{AtomSchema: atomSchema},
		Content: &subscriptionContent{Type: applicationXML, SubscriptionDescription: *description},
	}

	var middleware []MiddlewareFunc
	if target := description.ForwardTo; target != nil {
		middleware = append(middleware, addSupplementalAuthorization(*target, sm.TokenProvider()))
	}
	if target := description.ForwardDeadLetteredMessagesTo; target != nil {
		middleware = append(middleware, addDeadLetterSupplementalAuthorization(*target, sm.TokenProvider()))
	}

	marshalled, err := xml.Marshal(request)
	if err != nil {
		return nil, err
	}

	// Rewrite the "XMLSchema-instance" namespace prefix found in the marshalled XML to the short "i" prefix.
	longForm := `xmlns:XMLSchema-instance="` + schemaInstance + `" XMLSchema-instance:type`
	shortForm := `xmlns:i="` + schemaInstance + `" i:type`
	payload := xmlDoc([]byte(strings.Replace(string(marshalled), longForm, shortForm, -1)))

	res, err := sm.entityManager.Put(ctx, sm.getResourceURI(name), payload, middleware...)
	// closeRes tolerates a nil response, so the defer can be registered before err is inspected.
	defer closeRes(ctx, res)
	if err != nil {
		return nil, err
	}

	body, err := ioutil.ReadAll(res.Body)
	if err != nil {
		return nil, err
	}

	var created subscriptionEntry
	if err := xml.Unmarshal(body, &created); err != nil {
		return nil, formatManagementError(body)
	}
	return subscriptionEntryToEntity(&created), nil
}
// List fetches all of the Subscriptions of the manager's Topic.
//
// The response body is decoded as an Atom feed; a body that cannot be decoded is converted into an error with
// formatManagementError. The returned slice has one SubscriptionEntity per feed entry, in feed order.
func (sm *SubscriptionManager) List(ctx context.Context) ([]*SubscriptionEntity, error) {
	ctx, span := sm.startSpanFromContext(ctx, "sb.SubscriptionManager.List")
	defer span.End()

	res, err := sm.entityManager.Get(ctx, "/"+sm.Topic.Name+"/subscriptions")
	// closeRes tolerates a nil response, so the defer can be registered before err is inspected.
	defer closeRes(ctx, res)
	if err != nil {
		return nil, err
	}

	body, err := ioutil.ReadAll(res.Body)
	if err != nil {
		return nil, err
	}

	var feed subscriptionFeed
	if err := xml.Unmarshal(body, &feed); err != nil {
		return nil, formatManagementError(body)
	}

	entities := make([]*SubscriptionEntity, 0, len(feed.Entries))
	for i := range feed.Entries {
		entities = append(entities, subscriptionEntryToEntity(&feed.Entries[i]))
	}
	return entities, nil
}
// Get fetches the Subscription called name from the manager's Topic.
//
// ErrNotFound (carrying the request path) is returned when the service answers 404, or when the body cannot be
// decoded as an entry and isEmptyFeed reports it as an empty feed. Any other undecodable body is converted into an
// error with formatManagementError.
func (sm *SubscriptionManager) Get(ctx context.Context, name string) (*SubscriptionEntity, error) {
	ctx, span := sm.startSpanFromContext(ctx, "sb.SubscriptionManager.Get")
	defer span.End()

	res, err := sm.entityManager.Get(ctx, sm.getResourceURI(name))
	// closeRes tolerates a nil response, so the defer can be registered before err is inspected.
	defer closeRes(ctx, res)
	if err != nil {
		return nil, err
	}

	notFound := func() error {
		return ErrNotFound{EntityPath: res.Request.URL.Path}
	}
	if res.StatusCode == http.StatusNotFound {
		return nil, notFound()
	}

	body, err := ioutil.ReadAll(res.Body)
	if err != nil {
		return nil, err
	}

	var found subscriptionEntry
	if err := xml.Unmarshal(body, &found); err == nil {
		return subscriptionEntryToEntity(&found), nil
	}
	// seems the only way to catch 404 is if the feed is empty. If no subscriptions exist, the GET returns 200
	// and an empty feed.
	if isEmptyFeed(body) {
		return nil, notFound()
	}
	return nil, formatManagementError(body)
}
// ListRules returns the filter rules of the Subscription called subscriptionName.
//
// By default when the subscription is created, there exists a single "true" filter which matches all messages.
//
// ErrNotFound (carrying the request path) is returned when the service answers 404. A body that cannot be decoded
// as an Atom feed is converted into an error with formatManagementError.
func (sm *SubscriptionManager) ListRules(ctx context.Context, subscriptionName string) ([]*RuleEntity, error) {
	ctx, span := sm.startSpanFromContext(ctx, "sb.SubscriptionManager.ListRules")
	defer span.End()

	res, err := sm.entityManager.Get(ctx, sm.getRulesResourceURI(subscriptionName))
	// closeRes tolerates a nil response, so the defer can be registered before err is inspected.
	defer closeRes(ctx, res)
	if err != nil {
		return nil, err
	}
	if res.StatusCode == http.StatusNotFound {
		return nil, ErrNotFound{EntityPath: res.Request.URL.Path}
	}

	body, err := ioutil.ReadAll(res.Body)
	if err != nil {
		return nil, err
	}

	var feed ruleFeed
	if err := xml.Unmarshal(body, &feed); err != nil {
		return nil, formatManagementError(body)
	}

	entities := make([]*RuleEntity, 0, len(feed.Entries))
	for i := range feed.Entries {
		entities = append(entities, ruleEntryToEntity(&feed.Entries[i]))
	}
	return entities, nil
}
// PutRuleWithAction creates or updates the rule called ruleName on the Subscription called subscriptionName. The
// rule filters messages from the topic with filter and then applies action to the messages that match.
//
// Both filter and action must be non-nil: their describer methods are called unconditionally.
func (sm *SubscriptionManager) PutRuleWithAction(ctx context.Context, subscriptionName, ruleName string, filter FilterDescriber, action ActionDescriber) (*RuleEntity, error) {
	ctx, span := sm.startSpanFromContext(ctx, "sb.SubscriptionManager.PutRuleWithAction")
	defer span.End()

	actionDescription := action.ToActionDescription()

	rule := new(RuleDescription)
	rule.BaseEntityDescription = BaseEntityDescription{
		ServiceBusSchema:       to.StringPtr(serviceBusSchema),
		InstanceMetadataSchema: to.StringPtr(schemaInstance),
	}
	rule.Filter = filter.ToFilterDescription()
	rule.Action = &actionDescription

	return sm.putRule(ctx, subscriptionName, ruleName, rule)
}
// PutRule creates or updates the rule called ruleName on the Subscription called subscriptionName. The rule filters
// messages from the topic with filter and has no action.
//
// filter must be non-nil: its ToFilterDescription method is called unconditionally.
func (sm *SubscriptionManager) PutRule(ctx context.Context, subscriptionName, ruleName string, filter FilterDescriber) (*RuleEntity, error) {
	ctx, span := sm.startSpanFromContext(ctx, "sb.SubscriptionManager.PutRule")
	defer span.End()

	rule := new(RuleDescription)
	rule.BaseEntityDescription = BaseEntityDescription{
		ServiceBusSchema:       to.StringPtr(serviceBusSchema),
		InstanceMetadataSchema: to.StringPtr(schemaInstance),
	}
	rule.Filter = filter.ToFilterDescription()

	return sm.putRule(ctx, subscriptionName, ruleName, rule)
}
// putRule wraps rd in an Atom entry, PUTs it to the rule resource of the given Subscription and decodes the
// response body into a RuleEntity. A body that cannot be decoded as an Atom entry is converted into an error with
// formatManagementError.
func (sm *SubscriptionManager) putRule(ctx context.Context, subscriptionName, ruleName string, rd *RuleDescription) (*RuleEntity, error) {
	ctx, span := sm.startSpanFromContext(ctx, "sb.SubscriptionManager.putRule")
	defer span.End()

	request := &ruleEntry{
		Entry:   &atom.Entry{AtomSchema: atomSchema},
		Content: &ruleContent{Type: applicationXML, RuleDescription: *rd},
	}
	marshalled, err := xml.Marshal(request)
	if err != nil {
		return nil, err
	}

	// TODO: fix the unmarshal / marshal of xml with this attribute or ask the service to fix it. This is sad, but works.
	// The namespace declaration plus prefixed attribute found in the marshalled XML is collapsed to a bare "i:type".
	longForm := `xmlns:XMLSchema-instance="` + schemaInstance + `" XMLSchema-instance:type`
	payload := xmlDoc([]byte(strings.Replace(string(marshalled), longForm, "i:type", -1)))

	res, err := sm.entityManager.Put(ctx, sm.getRuleResourceURI(subscriptionName, ruleName), payload)
	// closeRes tolerates a nil response, so the defer can be registered before err is inspected.
	defer closeRes(ctx, res)
	if err != nil {
		return nil, err
	}

	body, err := ioutil.ReadAll(res.Body)
	if err != nil {
		return nil, err
	}

	var stored ruleEntry
	if err := xml.Unmarshal(body, &stored); err != nil {
		return nil, formatManagementError(body)
	}
	return ruleEntryToEntity(&stored), nil
}
// DeleteRule deletes the rule called ruleName from the Subscription called subscriptionName.
//
// The response (if any) is closed before returning, and the error reported by the underlying entityManager.Delete
// call is returned as is.
func (sm *SubscriptionManager) DeleteRule(ctx context.Context, subscriptionName, ruleName string) error {
	ctx, span := sm.startSpanFromContext(ctx, "sb.SubscriptionManager.DeleteRule")
	defer span.End()

	resourceURI := sm.getRuleResourceURI(subscriptionName, ruleName)
	res, err := sm.entityManager.Delete(ctx, resourceURI)
	// closeRes tolerates a nil response, so it is safe to call regardless of err.
	closeRes(ctx, res)
	return err
}
// ruleEntryToEntity converts an Atom rule entry into a RuleEntity. The returned RuleDescription points into the
// entry's content (it is not copied); the Entity name and ID are taken from the entry's Title and ID.
func ruleEntryToEntity(entry *ruleEntry) *RuleEntity {
	description := &entry.Content.RuleDescription
	identity := &Entity{Name: entry.Title, ID: entry.ID}
	return &RuleEntity{RuleDescription: description, Entity: identity}
}
// subscriptionEntryToEntity converts an Atom subscription entry into a SubscriptionEntity. The returned
// SubscriptionDescription points into the entry's content (it is not copied); the Entity name and ID are taken from
// the entry's Title and ID.
func subscriptionEntryToEntity(entry *subscriptionEntry) *SubscriptionEntity {
	description := &entry.Content.SubscriptionDescription
	identity := &Entity{Name: entry.Title, ID: entry.ID}
	return &SubscriptionEntity{SubscriptionDescription: description, Entity: identity}
}
// getResourceURI returns the management path of the Subscription called name:
// "/<topic name>/subscriptions/<name>". Neither component is escaped.
func (sm *SubscriptionManager) getResourceURI(name string) string {
	topicName := sm.Topic.Name
	return "/" + topicName + "/subscriptions/" + name
}
// getRulesResourceURI returns the management path of the rule collection of the Subscription called
// subscriptionName: the Subscription's resource URI followed by "/rules".
func (sm *SubscriptionManager) getRulesResourceURI(subscriptionName string) string {
	subscriptionURI := sm.getResourceURI(subscriptionName)
	return subscriptionURI + "/rules"
}
// getRuleResourceURI returns the management path of the rule called ruleName on the Subscription called
// subscriptionName: the Subscription's resource URI followed by "/rules/<ruleName>".
func (sm *SubscriptionManager) getRuleResourceURI(subscriptionName, ruleName string) string {
	subscriptionURI := sm.getResourceURI(subscriptionName)
	return subscriptionURI + "/rules/" + ruleName
}
// SubscriptionWithBatchedOperations returns an option that sets EnableBatchedOperations to true on the
// SubscriptionDescription, i.e. configures the subscription to batch server-side operations. The option never fails.
func SubscriptionWithBatchedOperations() SubscriptionManagementOption {
	enable := func(description *SubscriptionDescription) error {
		description.EnableBatchedOperations = ptrBool(true)
		return nil
	}
	return enable
}
// SubscriptionWithForwardDeadLetteredMessagesTo configures the subscription to automatically forward dead letter
// messages to the specified target entity. The target's URI is read when the option is applied.
//
// The ability to forward dead letter messages to a target requires the connection have management authorization. If
// the connection string or Azure Active Directory identity used does not have management authorization, an unauthorized
// error will be returned on the PUT.
func SubscriptionWithForwardDeadLetteredMessagesTo(target Targetable) SubscriptionManagementOption {
	return func(description *SubscriptionDescription) error {
		deadLetterTarget := new(string)
		*deadLetterTarget = target.TargetURI()
		description.ForwardDeadLetteredMessagesTo = deadLetterTarget
		return nil
	}
}
// SubscriptionWithAutoForward configures the subscription to automatically forward messages to the specified
// target entity. The target's URI is read when the option is applied.
//
// The ability to AutoForward to a target requires the connection have management authorization. If the connection
// string or Azure Active Directory identity used does not have management authorization, an unauthorized error will be
// returned on the PUT.
func SubscriptionWithAutoForward(target Targetable) SubscriptionManagementOption {
	return func(description *SubscriptionDescription) error {
		forwardTarget := new(string)
		*forwardTarget = target.TargetURI()
		description.ForwardTo = forwardTarget
		return nil
	}
}
// SubscriptionWithLockDuration configures the subscription to have a duration of a peek-lock; that is, the amount of
// time that the message is locked for other receivers. The maximum value for LockDuration is 5 minutes; the default
// value is 1 minute.
//
// A nil window selects the 1 minute default. The value is not range-checked here.
func SubscriptionWithLockDuration(window *time.Duration) SubscriptionManagementOption {
	return func(description *SubscriptionDescription) error {
		lock := time.Minute
		if window != nil {
			lock = *window
		}
		description.LockDuration = ptrString(durationTo8601Seconds(lock))
		return nil
	}
}
// SubscriptionWithRequiredSessions returns an option that sets RequiresSession to true on the
// SubscriptionDescription, so that the subscription requires senders and receivers to have sessionIDs. The option
// never fails.
func SubscriptionWithRequiredSessions() SubscriptionManagementOption {
	requireSessions := func(description *SubscriptionDescription) error {
		description.RequiresSession = ptrBool(true)
		return nil
	}
	return requireSessions
}
// SubscriptionWithDeadLetteringOnMessageExpiration returns an option that sets DeadLetteringOnMessageExpiration to
// true on the SubscriptionDescription, so that the Subscription sends expired messages to the dead letter queue.
// The option never fails.
func SubscriptionWithDeadLetteringOnMessageExpiration() SubscriptionManagementOption {
	deadLetterExpired := func(description *SubscriptionDescription) error {
		description.DeadLetteringOnMessageExpiration = ptrBool(true)
		return nil
	}
	return deadLetterExpired
}
// SubscriptionWithAutoDeleteOnIdle configures the subscription to automatically delete after the specified idle
// interval. The minimum duration is 5 minutes.
//
// A nil window leaves AutoDeleteOnIdle untouched and the option succeeds. A window shorter than 5 minutes makes the
// option return an error without modifying the description; a window of exactly 5 minutes is accepted.
func SubscriptionWithAutoDeleteOnIdle(window *time.Duration) SubscriptionManagementOption {
	return func(s *SubscriptionDescription) error {
		if window == nil {
			return nil
		}
		// Exact integer comparison: 5 minutes itself is the smallest accepted value.
		if *window < 5*time.Minute {
			return errors.New("window must be at least 5 minutes")
		}
		s.AutoDeleteOnIdle = ptrString(durationTo8601Seconds(*window))
		return nil
	}
}
// SubscriptionWithMessageTimeToLive configures the subscription to set a time to live on messages. This is the duration
// after which the message expires, starting from when the message is sent to Service Bus. This is the default value
// used when TimeToLive is not set on a message itself. If nil, defaults to 14 days.
func SubscriptionWithMessageTimeToLive(window *time.Duration) SubscriptionManagementOption {
	return func(description *SubscriptionDescription) error {
		ttl := 14 * 24 * time.Hour
		if window != nil {
			ttl = *window
		}
		description.DefaultMessageTimeToLive = ptrString(durationTo8601Seconds(ttl))
		return nil
	}
}
// SubscriptionWithDefaultRuleDescription configures the subscription to set a default rule built from filter and
// named name. The filter is converted with ToFilterDescription when the option is applied, so filter must be
// non-nil. The option never fails.
func SubscriptionWithDefaultRuleDescription(filter FilterDescriber, name string) SubscriptionManagementOption {
	return func(description *SubscriptionDescription) error {
		defaultRule := new(DefaultRuleDescription)
		defaultRule.Filter = filter.ToFilterDescription()
		defaultRule.Name = &name
		description.DefaultRuleDescription = defaultRule
		return nil
	}
}
// closeRes closes the body of res. A nil response is ignored, which lets callers defer closeRes before checking the
// error of the request that produced res. A failure to close is not returned; it is reported through
// tab.For(ctx).Error.
func closeRes(ctx context.Context, res *http.Response) {
	if res != nil {
		if closeErr := res.Body.Close(); closeErr != nil {
			tab.For(ctx).Error(closeErr)
		}
	}
}