Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeedccl: allow per changefeed kafka quota config #118643

Merged
merged 1 commit into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 4 additions & 2 deletions pkg/ccl/changefeedccl/sink_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ func (j *compressionCodec) UnmarshalText(b []byte) error {
// from sarama.Config. This facilitates users with limited sarama
// configurations.
type saramaConfig struct {
ClientID string `json:",omitempty"`
// These settings mirror ones in sarama config.
// We just tag them w/ JSON annotations.
// Flush describes settings specific to producer flushing.
Expand Down Expand Up @@ -250,7 +251,7 @@ func defaultSaramaConfig() *saramaConfig {
// this workaround is the one that's been running in roachtests and I'd want
// to test this one more before changing it.
config.Flush.MaxMessages = 1000

config.ClientID = "CockroachDB"
return config
}

Expand Down Expand Up @@ -837,6 +838,8 @@ func (c *saramaConfig) Apply(kafka *sarama.Config) error {
kafka.Producer.Flush.Messages = c.Flush.Messages
kafka.Producer.Flush.Frequency = time.Duration(c.Flush.Frequency)
kafka.Producer.Flush.MaxMessages = c.Flush.MaxMessages
kafka.ClientID = c.ClientID

if c.Version != "" {
parsedVersion, err := sarama.ParseKafkaVersion(c.Version)
if err != nil {
Expand Down Expand Up @@ -1092,7 +1095,6 @@ func buildKafkaConfig(
return nil, err
}
config := sarama.NewConfig()
config.ClientID = `CockroachDB`
config.Producer.Return.Successes = true
config.Producer.Partitioner = newChangefeedPartitioner
// Do not fetch metadata for all topics but just for the necessary ones.
Expand Down
24 changes: 24 additions & 0 deletions pkg/ccl/changefeedccl/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,22 @@ func TestSaramaConfigOptionParsing(t *testing.T) {
cfg, err = getSaramaConfig(opts)
require.NoError(t, err)
require.NoError(t, cfg.Validate())

saramaCfg := sarama.NewConfig()
opts = `{"ClientID": "clientID1"}`
cfg, _ = getSaramaConfig(opts)
err = cfg.Apply(saramaCfg)
require.NoError(t, err)
require.NoError(t, cfg.Validate())
require.NoError(t, saramaCfg.Validate())

opts = `{"Flush": {"Messages": 1000, "Frequency": "1s"}, "ClientID": "clientID1"}`
cfg, _ = getSaramaConfig(opts)
err = cfg.Apply(saramaCfg)
require.NoError(t, err)
require.NoError(t, cfg.Validate())
require.NoError(t, saramaCfg.Validate())
require.True(t, cfg.ClientID == "clientID1")
})
t.Run("validate returns error for bad flush configuration", func(t *testing.T) {
opts := changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": 1000}}`)
Expand All @@ -644,6 +660,14 @@ func TestSaramaConfigOptionParsing(t *testing.T) {
cfg, err = getSaramaConfig(opts)
require.NoError(t, err)
require.Error(t, cfg.Validate())

opts = `{"Version": "0.8.2.0", "ClientID": "bad_client_id*"}`
saramaCfg := sarama.NewConfig()
cfg, _ = getSaramaConfig(opts)
err = cfg.Apply(saramaCfg)
require.NoError(t, err)
require.NoError(t, cfg.Validate())
require.Error(t, saramaCfg.Validate())
})
t.Run("apply parses valid version", func(t *testing.T) {
opts := changefeedbase.SinkSpecificJSONConfig(`{"version": "0.8.2.0"}`)
Expand Down