diff --git a/common/component/azure/servicebus/metadata.go b/common/component/azure/servicebus/metadata.go index fb4ce8a20b..49088fdd65 100644 --- a/common/component/azure/servicebus/metadata.go +++ b/common/component/azure/servicebus/metadata.go @@ -48,6 +48,7 @@ type Metadata struct { PublishMaxRetries int `mapstructure:"publishMaxRetries"` PublishInitialRetryIntervalInMs int `mapstructure:"publishInitialRetryIntervalInMs"` NamespaceName string `mapstructure:"namespaceName"` // Only for Azure AD + EnableInOrderMessageDelivery bool `mapstructure:"enableInOrderMessageDelivery"` /** For bindings only **/ QueueName string `mapstructure:"queueName" mdonly:"bindings"` // Only queues diff --git a/common/component/azure/servicebus/metadata_test.go b/common/component/azure/servicebus/metadata_test.go index 01900da5c1..724979a28c 100644 --- a/common/component/azure/servicebus/metadata_test.go +++ b/common/component/azure/servicebus/metadata_test.go @@ -542,4 +542,28 @@ func TestParseServiceBusMetadata(t *testing.T) { parseErr3 := addMetadataToMessage(&msg3, metadata3) require.Error(t, parseErr3) }) + + t.Run("test enableInOrderMessageDelivery", func(t *testing.T) { + fakeProperties := getFakeProperties() + fakeProperties["enableInOrderMessageDelivery"] = "true" + + // act. + m, err := ParseMetadata(fakeProperties, nil, MetadataModeTopics) + + // assert. + require.NoError(t, err) + assert.True(t, m.EnableInOrderMessageDelivery) + }) + + t.Run("test enableInOrderMessageDelivery default", func(t *testing.T) { + fakeProperties := getFakeProperties() + delete(fakeProperties, "enableInOrderMessageDelivery") + + // act. + m, err := ParseMetadata(fakeProperties, nil, MetadataModeTopics) + + // assert. + require.NoError(t, err) + assert.False(t, m.EnableInOrderMessageDelivery) + }) } diff --git a/common/component/azure/servicebus/subscription.go b/common/component/azure/servicebus/subscription.go index ecdf7c00bb..3faf1c10ec 100644 --- a/common/component/azure/servicebus/subscription.go +++ b/common/component/azure/servicebus/subscription.go @@ -53,30 +53,32 @@ type HandlerFn func(ctx context.Context, msgs []*azservicebus.ReceivedMessage) ( // Subscription is an object that manages a subscription to an Azure Service Bus receiver, for a topic or queue. type Subscription struct { - entity string - mu sync.RWMutex - activeMessages map[int64]*azservicebus.ReceivedMessage - activeOperationsChan chan struct{} - requireSessions bool - sessionIdleTimeout time.Duration - timeout time.Duration - lockRenewalInterval time.Duration - maxBulkSubCount int - retriableErrLimiter ratelimit.Limiter - handleChan chan struct{} - logger logger.Logger + entity string + mu sync.RWMutex + activeMessages map[int64]*azservicebus.ReceivedMessage + activeOperationsChan chan struct{} + requireSessions bool + sessionIdleTimeout time.Duration + timeout time.Duration + lockRenewalInterval time.Duration + maxBulkSubCount int + retriableErrLimiter ratelimit.Limiter + handleChan chan struct{} + enableInOrderMessageDelivery bool + logger logger.Logger } type SubscriptionOptions struct { - MaxActiveMessages int - TimeoutInSec int - MaxBulkSubCount *int - MaxRetriableEPS int - MaxConcurrentHandlers int - Entity string - LockRenewalInSec int - RequireSessions bool - SessionIdleTimeout time.Duration + MaxActiveMessages int + TimeoutInSec int + MaxBulkSubCount *int + MaxRetriableEPS int + MaxConcurrentHandlers int + Entity string + LockRenewalInSec int + RequireSessions bool + SessionIdleTimeout time.Duration + EnableInOrderMessageDelivery bool } // NewBulkSubscription returns a new Subscription object. @@ -98,14 +100,15 @@ func NewSubscription(opts SubscriptionOptions, logger logger.Logger) *Subscripti } s := &Subscription{ - entity: opts.Entity, - activeMessages: make(map[int64]*azservicebus.ReceivedMessage), - timeout: time.Duration(opts.TimeoutInSec) * time.Second, - lockRenewalInterval: time.Duration(opts.LockRenewalInSec) * time.Second, - sessionIdleTimeout: opts.SessionIdleTimeout, - maxBulkSubCount: *opts.MaxBulkSubCount, - requireSessions: opts.RequireSessions, - logger: logger, + entity: opts.Entity, + activeMessages: make(map[int64]*azservicebus.ReceivedMessage), + timeout: time.Duration(opts.TimeoutInSec) * time.Second, + lockRenewalInterval: time.Duration(opts.LockRenewalInSec) * time.Second, + sessionIdleTimeout: opts.SessionIdleTimeout, + maxBulkSubCount: *opts.MaxBulkSubCount, + requireSessions: opts.RequireSessions, + enableInOrderMessageDelivery: opts.EnableInOrderMessageDelivery, + logger: logger, // This is a pessimistic estimate of the number of total operations that can be active at any given time. // In case of a non-bulk subscription, one operation is one message. activeOperationsChan: make(chan struct{}, opts.MaxActiveMessages/(*opts.MaxBulkSubCount)), @@ -274,7 +277,11 @@ func (s *Subscription) ReceiveBlocking(parentCtx context.Context, handler Handle } // Handle the messages in background - go s.handleAsync(ctx, msgs, handler, receiver) + if s.enableInOrderMessageDelivery { + s.handleAsync(ctx, msgs, handler, receiver) + } else { + go s.handleAsync(ctx, msgs, handler, receiver) + } } } diff --git a/pubsub/azure/servicebus/queues/metadata.yaml b/pubsub/azure/servicebus/queues/metadata.yaml index b931d95a40..606753d9c3 100644 --- a/pubsub/azure/servicebus/queues/metadata.yaml +++ b/pubsub/azure/servicebus/queues/metadata.yaml @@ -107,4 +107,11 @@ metadata: type: number example: "1000" default: "500" + - name: enableInOrderMessageDelivery + type: bool + required: false + default: "false" + example: "true" + description: | + Enable in order processing of messages within a session. When set to true, messages with the same session ID are processed sequentially. \ No newline at end of file diff --git a/pubsub/azure/servicebus/queues/servicebus.go b/pubsub/azure/servicebus/queues/servicebus.go index 342a4e227e..c7937951d0 100644 --- a/pubsub/azure/servicebus/queues/servicebus.go +++ b/pubsub/azure/servicebus/queues/servicebus.go @@ -88,14 +88,15 @@ func (a *azureServiceBus) Subscribe(ctx context.Context, req pubsub.SubscribeReq sub := impl.NewSubscription( impl.SubscriptionOptions{ - MaxActiveMessages: a.metadata.MaxActiveMessages, - TimeoutInSec: a.metadata.TimeoutInSec, - MaxBulkSubCount: nil, - MaxRetriableEPS: a.metadata.MaxRetriableErrorsPerSec, - MaxConcurrentHandlers: a.metadata.MaxConcurrentHandlers, - Entity: "queue " + req.Topic, - LockRenewalInSec: a.metadata.LockRenewalInSec, - RequireSessions: false, + MaxActiveMessages: a.metadata.MaxActiveMessages, + TimeoutInSec: a.metadata.TimeoutInSec, + MaxBulkSubCount: nil, + MaxRetriableEPS: a.metadata.MaxRetriableErrorsPerSec, + MaxConcurrentHandlers: a.metadata.MaxConcurrentHandlers, + Entity: "queue " + req.Topic, + LockRenewalInSec: a.metadata.LockRenewalInSec, + RequireSessions: false, + EnableInOrderMessageDelivery: a.metadata.EnableInOrderMessageDelivery, }, a.logger, ) @@ -111,14 +112,15 @@ func (a *azureServiceBus) BulkSubscribe(ctx context.Context, req pubsub.Subscrib maxBulkSubCount := commonutils.GetIntValOrDefault(req.BulkSubscribeConfig.MaxMessagesCount, defaultMaxBulkSubCount) sub := impl.NewSubscription( impl.SubscriptionOptions{ - MaxActiveMessages: a.metadata.MaxActiveMessages, - TimeoutInSec: a.metadata.TimeoutInSec, - MaxBulkSubCount: &maxBulkSubCount, - MaxRetriableEPS: a.metadata.MaxRetriableErrorsPerSec, - MaxConcurrentHandlers: a.metadata.MaxConcurrentHandlers, - Entity: "queue " + req.Topic, - LockRenewalInSec: a.metadata.LockRenewalInSec, - RequireSessions: false, + MaxActiveMessages: a.metadata.MaxActiveMessages, + TimeoutInSec: a.metadata.TimeoutInSec, + MaxBulkSubCount: &maxBulkSubCount, + MaxRetriableEPS: a.metadata.MaxRetriableErrorsPerSec, + MaxConcurrentHandlers: a.metadata.MaxConcurrentHandlers, + Entity: "queue " + req.Topic, + LockRenewalInSec: a.metadata.LockRenewalInSec, + RequireSessions: false, + EnableInOrderMessageDelivery: a.metadata.EnableInOrderMessageDelivery, }, a.logger, ) diff --git a/pubsub/azure/servicebus/topics/metadata.yaml b/pubsub/azure/servicebus/topics/metadata.yaml index e54e2b9fe8..259e29995b 100644 --- a/pubsub/azure/servicebus/topics/metadata.yaml +++ b/pubsub/azure/servicebus/topics/metadata.yaml @@ -112,4 +112,11 @@ metadata: type: number example: "1000" default: "500" + - name: enableInOrderMessageDelivery + type: bool + required: false + default: "false" + example: "true" + description: | + Enable in order processing of messages within a session. When set to true, messages with the same session ID are processed sequentially. diff --git a/pubsub/azure/servicebus/topics/servicebus.go b/pubsub/azure/servicebus/topics/servicebus.go index 1f83add614..5356670977 100644 --- a/pubsub/azure/servicebus/topics/servicebus.go +++ b/pubsub/azure/servicebus/topics/servicebus.go @@ -91,15 +91,16 @@ func (a *azureServiceBus) Subscribe(subscribeCtx context.Context, req pubsub.Sub sub := impl.NewSubscription( impl.SubscriptionOptions{ - MaxActiveMessages: a.metadata.MaxActiveMessages, - TimeoutInSec: a.metadata.TimeoutInSec, - MaxBulkSubCount: nil, - MaxRetriableEPS: a.metadata.MaxRetriableErrorsPerSec, - MaxConcurrentHandlers: a.metadata.MaxConcurrentHandlers, - Entity: "topic " + req.Topic, - LockRenewalInSec: a.metadata.LockRenewalInSec, - RequireSessions: requireSessions, - SessionIdleTimeout: sessionIdleTimeout, + MaxActiveMessages: a.metadata.MaxActiveMessages, + TimeoutInSec: a.metadata.TimeoutInSec, + MaxBulkSubCount: nil, + MaxRetriableEPS: a.metadata.MaxRetriableErrorsPerSec, + MaxConcurrentHandlers: a.metadata.MaxConcurrentHandlers, + Entity: "topic " + req.Topic, + LockRenewalInSec: a.metadata.LockRenewalInSec, + RequireSessions: requireSessions, + SessionIdleTimeout: sessionIdleTimeout, + EnableInOrderMessageDelivery: a.metadata.EnableInOrderMessageDelivery, }, a.logger, ) @@ -123,15 +124,16 @@ func (a *azureServiceBus) BulkSubscribe(subscribeCtx context.Context, req pubsub maxBulkSubCount := commonutils.GetIntValOrDefault(req.BulkSubscribeConfig.MaxMessagesCount, defaultMaxBulkSubCount) sub := impl.NewSubscription( impl.SubscriptionOptions{ - MaxActiveMessages: a.metadata.MaxActiveMessages, - TimeoutInSec: a.metadata.TimeoutInSec, - MaxBulkSubCount: &maxBulkSubCount, - MaxRetriableEPS: a.metadata.MaxRetriableErrorsPerSec, - MaxConcurrentHandlers: a.metadata.MaxConcurrentHandlers, - Entity: "topic " + req.Topic, - LockRenewalInSec: a.metadata.LockRenewalInSec, - RequireSessions: requireSessions, - SessionIdleTimeout: sessionIdleTimeout, + MaxActiveMessages: a.metadata.MaxActiveMessages, + TimeoutInSec: a.metadata.TimeoutInSec, + MaxBulkSubCount: &maxBulkSubCount, + MaxRetriableEPS: a.metadata.MaxRetriableErrorsPerSec, + MaxConcurrentHandlers: a.metadata.MaxConcurrentHandlers, + Entity: "topic " + req.Topic, + LockRenewalInSec: a.metadata.LockRenewalInSec, + RequireSessions: requireSessions, + SessionIdleTimeout: sessionIdleTimeout, + EnableInOrderMessageDelivery: a.metadata.EnableInOrderMessageDelivery, }, a.logger, )