forked from elireisman/sarama-easy
/
config.go
175 lines (145 loc) · 5.58 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
package kafka
import (
"crypto/tls"
"crypto/x509"
"io/ioutil"
"time"
"github.com/Shopify/sarama"
from_env "github.com/kelseyhightower/envconfig"
"github.com/pkg/errors"
)
// simple Kafka config abstraction; can be populated from env vars
// via FromEnv() or fields can applied to CLI flags by the caller.
type Config struct {
Brokers string `envconfig:"KAFKA_BROKERS"`
Version string `envconfig:"KAFKA_VERSION"`
Verbose bool `envconfig:"KAFKA_VERBOSE"`
ClientID string `envconfig:"KAFKA_CLIENT_ID"`
Topics string `envconfig:"KAFKA_TOPICS"`
TLSEnabled bool `envconfig:"KAFKA_TLS_ENABLED"`
TLSKey string `envconfig:"KAFKA_TLS_KEY"`
TLSCert string `envconfig:"KAFKA_TLS_CERT"`
CACerts string `envconfig:"KAFKA_CA_CERTS"`
// Consumer specific parameters
Group string `envconfig:"KAFKA_GROUP"`
RebalanceStrategy string `envconfig:"KAFKA_REBALANCE_STRATEGY"`
RebalanceTimeout time.Duration `envconfig:"KAFKA_REBALANCE_TIMEOUT"`
InitOffsets string `envconfig:"KAFKA_INIT_OFFSETS"`
CommitInterval time.Duration `envconfig:"KAFKA_COMMIT_INTERVAL"`
// Producer specific parameters
FlushInterval time.Duration `envconfig:"KAFKA_FLUSH_INTERVAL"`
// Schema Registry server
SchemaRegistryServers string `envconfig:"KAFKA_SCHEMA_REGISTRY_SERVERS"`
IsolationLevel string `envconfig:"KAFKA_ISOLATION_LEVEL"`
}
// returns a new kafka.Config with reasonable defaults for some values
func NewKafkaConfig() Config {
return Config{
Brokers: "localhost:9092",
Version: "1.1.0",
Group: "default-group",
ClientID: "sarama-easy",
RebalanceStrategy: "roundrobin",
RebalanceTimeout: 1 * time.Minute,
InitOffsets: "latest",
CommitInterval: 10 * time.Second,
FlushInterval: 1 * time.Second,
}
}
// hydrate kafka.Config using environment variables
func FromEnv() (Config, error) {
var conf Config
err := from_env.Process("", &conf)
return conf, err
}
const errorQueueSize = 32
// apply env config properties to a Sarama consumer config
func configureConsumer(envConf Config) (*sarama.Config, error) {
saramaConf := sarama.NewConfig()
// Kafka broker version is mandatory for API compatability
version, err := sarama.ParseKafkaVersion(envConf.Version)
if err != nil {
return nil, errors.Wrapf(err, "error parsing Kafka version: %v", envConf.Version)
}
saramaConf.Version = version
saramaConf.ClientID = envConf.ClientID
saramaConf.Consumer.Return.Errors = true
saramaConf.Consumer.Offsets.CommitInterval = envConf.CommitInterval
saramaConf.Consumer.Group.Rebalance.Timeout = envConf.RebalanceTimeout
saramaConf.Consumer.Group.Rebalance.Retry.Max = 6
saramaConf.Consumer.Group.Rebalance.Retry.Backoff = 2 * time.Second
if err := configureTLS(envConf, saramaConf); err != nil {
return nil, err
}
// configure group rebalance strategy
switch envConf.RebalanceStrategy {
case "roundrobin":
saramaConf.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
case "range":
saramaConf.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
default:
return nil, errors.Errorf("unrecognized consumer group partition strategy: %s", envConf.RebalanceStrategy)
}
// configure group rebalance strategy
switch envConf.IsolationLevel {
case "ReadUncommitted":
saramaConf.Consumer.IsolationLevel = sarama.ReadUncommitted
case "ReadCommitted":
saramaConf.Consumer.IsolationLevel = sarama.ReadCommitted
default:
saramaConf.Consumer.IsolationLevel = sarama.ReadUncommitted
}
// conf init offsets default: only honored if brokers on Kafka side have no pre-stored offsets for group
switch envConf.InitOffsets {
case "earliest":
saramaConf.Consumer.Offsets.Initial = sarama.OffsetOldest
case "latest":
saramaConf.Consumer.Offsets.Initial = sarama.OffsetNewest
default:
return nil, errors.Errorf("failed to parse Kafka initial offset from service saramaConf: %s", envConf.InitOffsets)
}
return saramaConf, nil
}
// apply env config properties into a Sarama producer config
func configureProducer(envConf Config) (*sarama.Config, error) {
saramaConf := sarama.NewConfig()
version, err := sarama.ParseKafkaVersion(envConf.Version)
if err != nil {
return nil, errors.Wrapf(err, "error parsing Kafka version: %v", envConf.Version)
}
if err := configureTLS(envConf, saramaConf); err != nil {
return nil, err
}
// Produce side configs (TODO: tune and customize more settings if needed)
saramaConf.Version = version
saramaConf.ClientID = envConf.ClientID
saramaConf.Producer.RequiredAcks = sarama.WaitForLocal // Only wait for the leader to ack
saramaConf.Producer.Compression = sarama.CompressionSnappy // Compress messages
saramaConf.Producer.Flush.Frequency = envConf.FlushInterval
saramaConf.Producer.Return.Successes = false
saramaConf.Producer.Return.Errors = true
return saramaConf, nil
}
// side effect TLS setup into Sarama config if env config specifies to do so
func configureTLS(envConf Config, saramaConf *sarama.Config) error {
// configure TLS
if envConf.TLSEnabled {
cert, err := tls.LoadX509KeyPair(envConf.TLSCert, envConf.TLSKey)
if err != nil {
return errors.Wrapf(err, "failed to load TLS cert(%s) and key(%s)", envConf.TLSCert, envConf.TLSKey)
}
ca, err := ioutil.ReadFile(envConf.CACerts)
if err != nil {
return errors.Wrapf(err, "failed to load CA cert bundle at: %s", envConf.CACerts)
}
pool := x509.NewCertPool()
pool.AppendCertsFromPEM(ca)
tlsCfg := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: pool,
}
saramaConf.Net.TLS.Enable = true
saramaConf.Net.TLS.Config = tlsCfg
}
return nil
}