Skip to content

Commit

Permalink
feat: Kafka eventsource supports Sarama config customization (#2161)
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <whynowy@gmail.com>
  • Loading branch information
whynowy committed Sep 10, 2022
1 parent e52ad74 commit 985e748
Show file tree
Hide file tree
Showing 11 changed files with 594 additions and 404 deletions.
19 changes: 19 additions & 0 deletions api/event-source.html

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions api/event-source.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions api/jsonschema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1831,6 +1831,10 @@
"io.argoproj.eventsource.v1alpha1.KafkaEventSource": {
"description": "KafkaEventSource refers to event-source for Kafka related events",
"properties": {
"config": {
"description": "Yaml format Sarama config for Kafka connection. It follows the struct of sarama.Config. See https://github.com/Shopify/sarama/blob/main/config.go e.g.\n\nconsumer:\n fetch:\n min: 1\nnet:\n MaxOpenRequests: 5",
"type": "string"
},
"connectionBackoff": {
"$ref": "#/definitions/io.argoproj.common.Backoff",
"description": "Backoff holds parameters applied to connection."
Expand Down
4 changes: 4 additions & 0 deletions api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 28 additions & 0 deletions common/saramaconfig.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package common

import (
"bytes"
"fmt"

"github.com/Shopify/sarama"
"github.com/spf13/viper"
)

// GetSaramaConfigFromYAMLString parse yaml string to sarama.config.
// Note: All the time.Duration config can not be correctly decoded because it does not implement the decode function.
func GetSaramaConfigFromYAMLString(yaml string) (*sarama.Config, error) {
v := viper.New()
v.SetConfigType("yaml")
if err := v.ReadConfig(bytes.NewBufferString(yaml)); err != nil {
return nil, err
}
cfg := sarama.NewConfig()
cfg.Producer.Return.Successes = true
if err := v.Unmarshal(cfg); err != nil {
return nil, fmt.Errorf("unable to decode into struct, %w", err)
}
if err := cfg.Validate(); err != nil {
return nil, fmt.Errorf("failed validating sarama config, %w", err)
}
return cfg, nil
}
43 changes: 43 additions & 0 deletions common/saramaconfig_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package common

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestGetSaramaConfigFromYAMLString(t *testing.T) {
t.Run("YAML Config", func(t *testing.T) {
var yamlExample = string(`
admin:
retry:
max: 105
producer:
maxMessageBytes: 800
consumer:
fetch:
min: 2
net:
MaxOpenRequests: 5
`)
conf, err := GetSaramaConfigFromYAMLString(yamlExample)
assert.NoError(t, err)
assert.Equal(t, 800, conf.Producer.MaxMessageBytes)
assert.Equal(t, 105, conf.Admin.Retry.Max)
assert.Equal(t, int32(2), conf.Consumer.Fetch.Min)
assert.Equal(t, 5, conf.Net.MaxOpenRequests)
})
t.Run("Empty config", func(t *testing.T) {
conf, err := GetSaramaConfigFromYAMLString("")
assert.NoError(t, err)
assert.Equal(t, 1000000, conf.Producer.MaxMessageBytes)
assert.Equal(t, 5, conf.Admin.Retry.Max)
assert.Equal(t, int32(1), conf.Consumer.Fetch.Min)
assert.Equal(t, 5, conf.Net.MaxOpenRequests)
})

t.Run("NON yaml config", func(t *testing.T) {
_, err := GetSaramaConfigFromYAMLString("welcome")
assert.Error(t, err)
})
}
5 changes: 4 additions & 1 deletion eventsources/sources/kafka/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,10 @@ func (el *EventListener) partitionConsumer(ctx context.Context, log *zap.Sugared
}

func getSaramaConfig(kafkaEventSource *v1alpha1.KafkaEventSource, log *zap.SugaredLogger) (*sarama.Config, error) {
config := sarama.NewConfig()
config, err := common.GetSaramaConfigFromYAMLString(kafkaEventSource.Config)
if err != nil {
return nil, err
}

if kafkaEventSource.Version == "" {
config.Version = sarama.V1_0_0_0
Expand Down
Loading

0 comments on commit 985e748

Please sign in to comment.