forked from lovoo/goka
/
config.go
33 lines (27 loc) · 1 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package kafka
import (
"github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster"
)
// NewConfig returns a fresh (bsm) sarama-cluster configuration with this
// package's default settings applied for the Kafka version, the consumer,
// the producer, and the consumer group.
func NewConfig() *cluster.Config {
	cfg := cluster.NewConfig()
	cfg.Version = sarama.V0_10_1_0

	// Consumer group defaults.
	cfg.Group.Return.Notifications = true

	// Producer defaults.
	producer := &cfg.Producer
	producer.RequiredAcks = sarama.WaitForLocal
	producer.Compression = sarama.CompressionSnappy
	producer.Retry.Max = defaultProducerMaxRetries
	producer.Flush.Bytes = defaultFlushBytes
	producer.Flush.Frequency = defaultFlushFrequency
	producer.Return.Errors = true
	producer.Return.Successes = true

	// Consumer defaults.
	consumer := &cfg.Consumer
	consumer.MaxProcessingTime = defaultMaxProcessingTime
	consumer.Return.Errors = true
	// The initial offset set here applies to streams only; tables are
	// always consumed from OffsetOldest.
	consumer.Offsets.Initial = sarama.OffsetNewest

	return cfg
}