Skip to content

Commit

Permalink
Enable configurable concurrency for pulsar (#3330)
Browse files Browse the repository at this point in the history
Signed-off-by: yaron2 <schneider.yaron@live.com>
  • Loading branch information
yaron2 committed Feb 2, 2024
1 parent c693061 commit 86abb49
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 2 deletions.
1 change: 1 addition & 0 deletions pubsub/pulsar/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type pulsarMetadata struct {
PublicKey string `mapstructure:"publicKey"`
PrivateKey string `mapstructure:"privateKey"`
Keys string `mapstructure:"keys"`
MaxConcurrentHandlers uint `mapstructure:"maxConcurrentHandlers"`

Token string `mapstructure:"token"`
oauth2.ClientCredentialsMetadata `mapstructure:",squash"`
Expand Down
8 changes: 7 additions & 1 deletion pubsub/pulsar/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -170,4 +170,10 @@ metadata:
{"name": "ID","type": "int"},
{"name": "Name","type": "string"}
]
}
}
- name: maxConcurrentHandlers
type: number
description: |
Sets the maximum number of concurrent messages sent to the application. Default is 100.
default: '"100"'
example: '"100"'
5 changes: 4 additions & 1 deletion pubsub/pulsar/pulsar.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ const (
defaultMaxBatchSize = 128 * 1024
// defaultRedeliveryDelay init default for redelivery delay.
defaultRedeliveryDelay = 30 * time.Second
// defaultConcurrency controls the number of concurrent messages sent to the app.
defaultConcurrency = 100

subscribeTypeKey = "subscribeType"

Expand Down Expand Up @@ -122,6 +124,7 @@ func parsePulsarMetadata(meta pubsub.Metadata) (*pulsarMetadata, error) {
BatchingMaxMessages: defaultMaxMessages,
BatchingMaxSize: defaultMaxBatchSize,
RedeliveryDelay: defaultRedeliveryDelay,
MaxConcurrentHandlers: defaultConcurrency,
}

if err := kitmd.DecodeMetadata(meta.Properties, &m); err != nil {
Expand Down Expand Up @@ -392,7 +395,7 @@ func (p *Pulsar) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, han
return errors.New("component is closed")
}

channel := make(chan pulsar.ConsumerMessage, 100)
channel := make(chan pulsar.ConsumerMessage, p.metadata.MaxConcurrentHandlers)

topic := p.formatTopic(req.Topic)

Expand Down
2 changes: 2 additions & 0 deletions pubsub/pulsar/pulsar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func TestParsePulsarMetadata(t *testing.T) {
"batchingMaxPublishDelay": "5s",
"batchingMaxSize": "100",
"batchingMaxMessages": "200",
"maxConcurrentHandlers": "333",
}
meta, err := parsePulsarMetadata(m)

Expand All @@ -45,6 +46,7 @@ func TestParsePulsarMetadata(t *testing.T) {
assert.Equal(t, 5*time.Second, meta.BatchingMaxPublishDelay)
assert.Equal(t, uint(100), meta.BatchingMaxSize)
assert.Equal(t, uint(200), meta.BatchingMaxMessages)
assert.Equal(t, uint(333), meta.MaxConcurrentHandlers)
assert.Empty(t, meta.internalTopicSchemas)
}

Expand Down

0 comments on commit 86abb49

Please sign in to comment.