From 86abb49ab2d2d299afb453e514a30b4a183c0854 Mon Sep 17 00:00:00 2001 From: Yaron Schneider Date: Fri, 2 Feb 2024 10:29:31 -0800 Subject: [PATCH] Enable configurable concurrency for pulsar (#3330) Signed-off-by: yaron2 --- pubsub/pulsar/metadata.go | 1 + pubsub/pulsar/metadata.yaml | 8 +++++++- pubsub/pulsar/pulsar.go | 5 ++++- pubsub/pulsar/pulsar_test.go | 2 ++ 4 files changed, 14 insertions(+), 2 deletions(-) diff --git a/pubsub/pulsar/metadata.go b/pubsub/pulsar/metadata.go index 66fce89875..3533c6de37 100644 --- a/pubsub/pulsar/metadata.go +++ b/pubsub/pulsar/metadata.go @@ -35,6 +35,7 @@ type pulsarMetadata struct { PublicKey string `mapstructure:"publicKey"` PrivateKey string `mapstructure:"privateKey"` Keys string `mapstructure:"keys"` + MaxConcurrentHandlers uint `mapstructure:"maxConcurrentHandlers"` Token string `mapstructure:"token"` oauth2.ClientCredentialsMetadata `mapstructure:",squash"` diff --git a/pubsub/pulsar/metadata.yaml b/pubsub/pulsar/metadata.yaml index 62b80714d3..e81d6f75f8 100644 --- a/pubsub/pulsar/metadata.yaml +++ b/pubsub/pulsar/metadata.yaml @@ -170,4 +170,10 @@ metadata: {"name": "ID","type": "int"}, {"name": "Name","type": "string"} ] - } \ No newline at end of file + } + - name: maxConcurrentHandlers + type: number + description: | + Sets the maximum number of concurrent messages sent to the application. Default is 100. + default: '"100"' + example: '"100"' \ No newline at end of file diff --git a/pubsub/pulsar/pulsar.go b/pubsub/pulsar/pulsar.go index d0db224500..7a77cea601 100644 --- a/pubsub/pulsar/pulsar.go +++ b/pubsub/pulsar/pulsar.go @@ -78,6 +78,8 @@ const ( defaultMaxBatchSize = 128 * 1024 // defaultRedeliveryDelay init default for redelivery delay. defaultRedeliveryDelay = 30 * time.Second + // defaultConcurrency controls the number of concurrent messages sent to the app. + defaultConcurrency = 100 subscribeTypeKey = "subscribeType" @@ -122,6 +124,7 @@ func parsePulsarMetadata(meta pubsub.Metadata) (*pulsarMetadata, error) { BatchingMaxMessages: defaultMaxMessages, BatchingMaxSize: defaultMaxBatchSize, RedeliveryDelay: defaultRedeliveryDelay, + MaxConcurrentHandlers: defaultConcurrency, } if err := kitmd.DecodeMetadata(meta.Properties, &m); err != nil { @@ -392,7 +395,7 @@ func (p *Pulsar) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, han return errors.New("component is closed") } - channel := make(chan pulsar.ConsumerMessage, 100) + channel := make(chan pulsar.ConsumerMessage, p.metadata.MaxConcurrentHandlers) topic := p.formatTopic(req.Topic) diff --git a/pubsub/pulsar/pulsar_test.go b/pubsub/pulsar/pulsar_test.go index 9dada153a0..82b8157cd1 100644 --- a/pubsub/pulsar/pulsar_test.go +++ b/pubsub/pulsar/pulsar_test.go @@ -33,6 +33,7 @@ func TestParsePulsarMetadata(t *testing.T) { "batchingMaxPublishDelay": "5s", "batchingMaxSize": "100", "batchingMaxMessages": "200", + "maxConcurrentHandlers": "333", } meta, err := parsePulsarMetadata(m) @@ -45,6 +46,7 @@ func TestParsePulsarMetadata(t *testing.T) { assert.Equal(t, 5*time.Second, meta.BatchingMaxPublishDelay) assert.Equal(t, uint(100), meta.BatchingMaxSize) assert.Equal(t, uint(200), meta.BatchingMaxMessages) + assert.Equal(t, uint(333), meta.MaxConcurrentHandlers) assert.Empty(t, meta.internalTopicSchemas) }