Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions common/component/azure/servicebus/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type Metadata struct {
PublishMaxRetries int `mapstructure:"publishMaxRetries"`
PublishInitialRetryIntervalInMs int `mapstructure:"publishInitialRetryIntervalInMs"`
NamespaceName string `mapstructure:"namespaceName"` // Only for Azure AD
EnableInOrderMessageDelivery bool `mapstructure:"enableInOrderMessageDelivery"`

/** For bindings only **/
QueueName string `mapstructure:"queueName" mdonly:"bindings"` // Only queues
Expand Down
24 changes: 24 additions & 0 deletions common/component/azure/servicebus/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,4 +542,28 @@ func TestParseServiceBusMetadata(t *testing.T) {
parseErr3 := addMetadataToMessage(&msg3, metadata3)
require.Error(t, parseErr3)
})

t.Run("test enableInOrderMessageDelivery", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeProperties["enableInOrderMessageDelivery"] = "true"

// act.
m, err := ParseMetadata(fakeProperties, nil, MetadataModeTopics)

// assert.
require.NoError(t, err)
assert.True(t, m.EnableInOrderMessageDelivery)
})

t.Run("test enableInOrderMessageDelivery default", func(t *testing.T) {
fakeProperties := getFakeProperties()
delete(fakeProperties, "enableInOrderMessageDelivery")

// act.
m, err := ParseMetadata(fakeProperties, nil, MetadataModeTopics)

// assert.
require.NoError(t, err)
assert.False(t, m.EnableInOrderMessageDelivery)
})
}
67 changes: 37 additions & 30 deletions common/component/azure/servicebus/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,30 +53,32 @@ type HandlerFn func(ctx context.Context, msgs []*azservicebus.ReceivedMessage) (

// Subscription is an object that manages a subscription to an Azure Service Bus receiver, for a topic or queue.
type Subscription struct {
entity string
mu sync.RWMutex
activeMessages map[int64]*azservicebus.ReceivedMessage
activeOperationsChan chan struct{}
requireSessions bool
sessionIdleTimeout time.Duration
timeout time.Duration
lockRenewalInterval time.Duration
maxBulkSubCount int
retriableErrLimiter ratelimit.Limiter
handleChan chan struct{}
logger logger.Logger
entity string
mu sync.RWMutex
activeMessages map[int64]*azservicebus.ReceivedMessage
activeOperationsChan chan struct{}
requireSessions bool
sessionIdleTimeout time.Duration
timeout time.Duration
lockRenewalInterval time.Duration
maxBulkSubCount int
retriableErrLimiter ratelimit.Limiter
handleChan chan struct{}
enableInOrderMessageDelivery bool
logger logger.Logger
}

type SubscriptionOptions struct {
MaxActiveMessages int
TimeoutInSec int
MaxBulkSubCount *int
MaxRetriableEPS int
MaxConcurrentHandlers int
Entity string
LockRenewalInSec int
RequireSessions bool
SessionIdleTimeout time.Duration
MaxActiveMessages int
TimeoutInSec int
MaxBulkSubCount *int
MaxRetriableEPS int
MaxConcurrentHandlers int
Entity string
LockRenewalInSec int
RequireSessions bool
SessionIdleTimeout time.Duration
EnableInOrderMessageDelivery bool
}

// NewBulkSubscription returns a new Subscription object.
Expand All @@ -98,14 +100,15 @@ func NewSubscription(opts SubscriptionOptions, logger logger.Logger) *Subscripti
}

s := &Subscription{
entity: opts.Entity,
activeMessages: make(map[int64]*azservicebus.ReceivedMessage),
timeout: time.Duration(opts.TimeoutInSec) * time.Second,
lockRenewalInterval: time.Duration(opts.LockRenewalInSec) * time.Second,
sessionIdleTimeout: opts.SessionIdleTimeout,
maxBulkSubCount: *opts.MaxBulkSubCount,
requireSessions: opts.RequireSessions,
logger: logger,
entity: opts.Entity,
activeMessages: make(map[int64]*azservicebus.ReceivedMessage),
timeout: time.Duration(opts.TimeoutInSec) * time.Second,
lockRenewalInterval: time.Duration(opts.LockRenewalInSec) * time.Second,
sessionIdleTimeout: opts.SessionIdleTimeout,
maxBulkSubCount: *opts.MaxBulkSubCount,
requireSessions: opts.RequireSessions,
enableInOrderMessageDelivery: opts.EnableInOrderMessageDelivery,
logger: logger,
// This is a pessimistic estimate of the number of total operations that can be active at any given time.
// In case of a non-bulk subscription, one operation is one message.
activeOperationsChan: make(chan struct{}, opts.MaxActiveMessages/(*opts.MaxBulkSubCount)),
Expand Down Expand Up @@ -274,7 +277,11 @@ func (s *Subscription) ReceiveBlocking(parentCtx context.Context, handler Handle
}

// Handle the messages in background
go s.handleAsync(ctx, msgs, handler, receiver)
if s.enableInOrderMessageDelivery {
s.handleAsync(ctx, msgs, handler, receiver)
} else {
go s.handleAsync(ctx, msgs, handler, receiver)
}
}
}

Expand Down
7 changes: 7 additions & 0 deletions pubsub/azure/servicebus/queues/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,11 @@ metadata:
type: number
example: "1000"
default: "500"
- name: enableInOrderMessageDelivery
type: bool
required: false
default: "false"
example: "true"
description: |
Enable in order processing of messages within a session. When set to true, messages with the same session ID are processed sequentially.

34 changes: 18 additions & 16 deletions pubsub/azure/servicebus/queues/servicebus.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,15 @@ func (a *azureServiceBus) Subscribe(ctx context.Context, req pubsub.SubscribeReq

sub := impl.NewSubscription(
impl.SubscriptionOptions{
MaxActiveMessages: a.metadata.MaxActiveMessages,
TimeoutInSec: a.metadata.TimeoutInSec,
MaxBulkSubCount: nil,
MaxRetriableEPS: a.metadata.MaxRetriableErrorsPerSec,
MaxConcurrentHandlers: a.metadata.MaxConcurrentHandlers,
Entity: "queue " + req.Topic,
LockRenewalInSec: a.metadata.LockRenewalInSec,
RequireSessions: false,
MaxActiveMessages: a.metadata.MaxActiveMessages,
TimeoutInSec: a.metadata.TimeoutInSec,
MaxBulkSubCount: nil,
MaxRetriableEPS: a.metadata.MaxRetriableErrorsPerSec,
MaxConcurrentHandlers: a.metadata.MaxConcurrentHandlers,
Entity: "queue " + req.Topic,
LockRenewalInSec: a.metadata.LockRenewalInSec,
RequireSessions: false,
EnableInOrderMessageDelivery: a.metadata.EnableInOrderMessageDelivery,
},
a.logger,
)
Expand All @@ -111,14 +112,15 @@ func (a *azureServiceBus) BulkSubscribe(ctx context.Context, req pubsub.Subscrib
maxBulkSubCount := commonutils.GetIntValOrDefault(req.BulkSubscribeConfig.MaxMessagesCount, defaultMaxBulkSubCount)
sub := impl.NewSubscription(
impl.SubscriptionOptions{
MaxActiveMessages: a.metadata.MaxActiveMessages,
TimeoutInSec: a.metadata.TimeoutInSec,
MaxBulkSubCount: &maxBulkSubCount,
MaxRetriableEPS: a.metadata.MaxRetriableErrorsPerSec,
MaxConcurrentHandlers: a.metadata.MaxConcurrentHandlers,
Entity: "queue " + req.Topic,
LockRenewalInSec: a.metadata.LockRenewalInSec,
RequireSessions: false,
MaxActiveMessages: a.metadata.MaxActiveMessages,
TimeoutInSec: a.metadata.TimeoutInSec,
MaxBulkSubCount: &maxBulkSubCount,
MaxRetriableEPS: a.metadata.MaxRetriableErrorsPerSec,
MaxConcurrentHandlers: a.metadata.MaxConcurrentHandlers,
Entity: "queue " + req.Topic,
LockRenewalInSec: a.metadata.LockRenewalInSec,
RequireSessions: false,
EnableInOrderMessageDelivery: a.metadata.EnableInOrderMessageDelivery,
},
a.logger,
)
Expand Down
7 changes: 7 additions & 0 deletions pubsub/azure/servicebus/topics/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,11 @@ metadata:
type: number
example: "1000"
default: "500"
- name: enableInOrderMessageDelivery
type: bool
required: false
default: "false"
example: "true"
description: |
Enable in order processing of messages within a session. When set to true, messages with the same session ID are processed sequentially.

38 changes: 20 additions & 18 deletions pubsub/azure/servicebus/topics/servicebus.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,16 @@ func (a *azureServiceBus) Subscribe(subscribeCtx context.Context, req pubsub.Sub

sub := impl.NewSubscription(
impl.SubscriptionOptions{
MaxActiveMessages: a.metadata.MaxActiveMessages,
TimeoutInSec: a.metadata.TimeoutInSec,
MaxBulkSubCount: nil,
MaxRetriableEPS: a.metadata.MaxRetriableErrorsPerSec,
MaxConcurrentHandlers: a.metadata.MaxConcurrentHandlers,
Entity: "topic " + req.Topic,
LockRenewalInSec: a.metadata.LockRenewalInSec,
RequireSessions: requireSessions,
SessionIdleTimeout: sessionIdleTimeout,
MaxActiveMessages: a.metadata.MaxActiveMessages,
TimeoutInSec: a.metadata.TimeoutInSec,
MaxBulkSubCount: nil,
MaxRetriableEPS: a.metadata.MaxRetriableErrorsPerSec,
MaxConcurrentHandlers: a.metadata.MaxConcurrentHandlers,
Entity: "topic " + req.Topic,
LockRenewalInSec: a.metadata.LockRenewalInSec,
RequireSessions: requireSessions,
SessionIdleTimeout: sessionIdleTimeout,
EnableInOrderMessageDelivery: a.metadata.EnableInOrderMessageDelivery,
},
a.logger,
)
Expand All @@ -123,15 +124,16 @@ func (a *azureServiceBus) BulkSubscribe(subscribeCtx context.Context, req pubsub
maxBulkSubCount := commonutils.GetIntValOrDefault(req.BulkSubscribeConfig.MaxMessagesCount, defaultMaxBulkSubCount)
sub := impl.NewSubscription(
impl.SubscriptionOptions{
MaxActiveMessages: a.metadata.MaxActiveMessages,
TimeoutInSec: a.metadata.TimeoutInSec,
MaxBulkSubCount: &maxBulkSubCount,
MaxRetriableEPS: a.metadata.MaxRetriableErrorsPerSec,
MaxConcurrentHandlers: a.metadata.MaxConcurrentHandlers,
Entity: "topic " + req.Topic,
LockRenewalInSec: a.metadata.LockRenewalInSec,
RequireSessions: requireSessions,
SessionIdleTimeout: sessionIdleTimeout,
MaxActiveMessages: a.metadata.MaxActiveMessages,
TimeoutInSec: a.metadata.TimeoutInSec,
MaxBulkSubCount: &maxBulkSubCount,
MaxRetriableEPS: a.metadata.MaxRetriableErrorsPerSec,
MaxConcurrentHandlers: a.metadata.MaxConcurrentHandlers,
Entity: "topic " + req.Topic,
LockRenewalInSec: a.metadata.LockRenewalInSec,
RequireSessions: requireSessions,
SessionIdleTimeout: sessionIdleTimeout,
EnableInOrderMessageDelivery: a.metadata.EnableInOrderMessageDelivery,
},
a.logger,
)
Expand Down