/
config_consumer.go
148 lines (137 loc) · 4.5 KB
/
config_consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package kafka
import (
"errors"
"fmt"
"time"
"github.com/Shopify/sarama"
)
// Consumer config constants
const (
OffsetNewest = sarama.OffsetNewest
OffsetOldest = sarama.OffsetOldest
)
var (
defaultMessageConsumeTimeout = 10 * time.Second
defaultNumWorkers = 1
defaultBatchSize = 1
defaultBatchWaitTime = 200 * time.Millisecond
defaultMinRetryPeriod = 200 * time.Millisecond
defaultMaxRetryPeriod = 32 * time.Second
defaultConsumerMinBrokersHealthy = 1
)
// ConsumerGroupConfig exposes the configurable parameters for a consumer group
// to overwrite default config values and any other defult config values set by dp-kafka.
// Any value that is not provided will use the default Sarama config value, or the default dp-kafka value.
// The only 3 compulsory values are:
// - Topic
// - GroupName
// - BrokerAddrs
type ConsumerGroupConfig struct {
// Sarama config overrides
KafkaVersion *string
KeepAlive *time.Duration
RetryBackoff *time.Duration
RetryBackoffFunc *func(retries int) time.Duration
Offset *int64
SecurityConfig *SecurityConfig
MessageConsumeTimeout *time.Duration
// dp-kafka specific config overrides
NumWorkers *int
BatchSize *int
BatchWaitTime *time.Duration
MinRetryPeriod *time.Duration
MaxRetryPeriod *time.Duration
MinBrokersHealthy *int
Topic string
GroupName string
BrokerAddrs []string
}
// Get creates a default sarama config for a consumer-group and overwrites any values provided in cgConfig.
// If any required value is not provided or any override is invalid, an error will be returned
func (c *ConsumerGroupConfig) Get() (*sarama.Config, error) {
// Get default Sarama config and apply overrides
cfg := sarama.NewConfig()
cfg.Consumer.MaxWaitTime = 50 * time.Millisecond
cfg.Consumer.Offsets.Initial = OffsetOldest
cfg.Consumer.Return.Errors = true
cfg.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
cfg.Consumer.Group.Session.Timeout = defaultMessageConsumeTimeout
if c.MessageConsumeTimeout != nil {
cfg.Consumer.Group.Session.Timeout = *c.MessageConsumeTimeout
}
if c.KafkaVersion != nil {
var err error
if cfg.Version, err = sarama.ParseKafkaVersion(*c.KafkaVersion); err != nil {
return nil, fmt.Errorf("error parsing kafka version: %w", err)
}
}
if c.KeepAlive != nil {
cfg.Net.KeepAlive = *c.KeepAlive
}
if c.RetryBackoff != nil {
cfg.Consumer.Retry.Backoff = *c.RetryBackoff
}
if c.RetryBackoffFunc != nil {
cfg.Consumer.Retry.BackoffFunc = *c.RetryBackoffFunc
}
if c.Offset != nil {
if *c.Offset != OffsetNewest && *c.Offset != OffsetOldest {
return nil, errors.New("offset value incorrect")
}
cfg.Consumer.Offsets.Initial = *c.Offset
}
if err := addAnyTLS(c.SecurityConfig, cfg); err != nil {
return nil, fmt.Errorf("error adding tls: %w", err)
}
// Override any other optional value
if c.NumWorkers == nil {
c.NumWorkers = &defaultNumWorkers
}
if c.BatchSize == nil {
c.BatchSize = &defaultBatchSize
}
if c.BatchWaitTime == nil {
c.BatchWaitTime = &defaultBatchWaitTime
}
if c.MinRetryPeriod == nil {
c.MinRetryPeriod = &defaultMinRetryPeriod
}
if c.MaxRetryPeriod == nil {
c.MaxRetryPeriod = &defaultMaxRetryPeriod
}
if c.MinBrokersHealthy == nil {
c.MinBrokersHealthy = &defaultConsumerMinBrokersHealthy
}
if err := c.Validate(); err != nil {
return nil, fmt.Errorf("validation error: %w", err)
}
return cfg, nil
}
// Validate that compulsory values are provided in config
func (c *ConsumerGroupConfig) Validate() error {
if c.Topic == "" {
return errors.New("topic is compulsory but was not provided in config")
}
if c.GroupName == "" {
return errors.New("groupName is compulsory but was not provided in config")
}
if len(c.BrokerAddrs) == 0 {
return errors.New("brokerAddrs is compulsory but was not provided in config")
}
if *c.MinRetryPeriod <= 0 {
return errors.New("minRetryPeriod must be greater than zero")
}
if *c.MaxRetryPeriod <= 0 {
return errors.New("maxRetryPeriod must be greater than zero")
}
if *c.MinRetryPeriod > *c.MaxRetryPeriod {
return errors.New("minRetryPeriod must be smaller or equal to maxRetryPeriod")
}
if *c.MinBrokersHealthy <= 0 {
return errors.New("minBrokersHealthy must be greater than zero")
}
if *c.MinBrokersHealthy > len(c.BrokerAddrs) {
return errors.New("minBrokersHealthy must be smaller or equal to the total number of brokers provided in brokerAddrs")
}
return nil
}