Skip to content

Commit

Permalink
Merge pull request #21 from NoteGio/feature/configurable-flush
Browse files Browse the repository at this point in the history
ethdb/cdc: Make kafka flush frequency configurable
  • Loading branch information
AusIV committed Oct 10, 2020
2 parents 3448391 + 157b150 commit 6b05247
Showing 1 changed file with 4 additions and 0 deletions.
4 changes: 4 additions & 0 deletions ethdb/cdc/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ func ParseKafkaURL(brokerURL string) ([]string, *sarama.Config) {
config.Producer.Retry.Max = 10000000
}

if val, err := strconv.Atoi(parsedURL.Query().Get("queue.buffering.max.ms")); err == nil {
config.Producer.Flush.Frequency = time.Duration(val) * time.Millisecond
}

if val, err := strconv.Atoi(parsedURL.Query().Get("retry.backoff.ms")); err == nil {
config.Producer.Retry.Backoff = time.Duration(val) * time.Millisecond
}
Expand Down

0 comments on commit 6b05247

Please sign in to comment.