/
option.go
424 lines (373 loc) · 12.5 KB
/
option.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
package streamconfig
import (
"fmt"
"io"
"time"
"github.com/blendle/go-streamprocessor/v3/stream"
"github.com/blendle/go-streamprocessor/v3/streamconfig/inmemconfig"
"github.com/blendle/go-streamprocessor/v3/streamconfig/kafkaconfig"
"github.com/blendle/go-streamprocessor/v3/streamconfig/pubsubconfig"
"github.com/blendle/go-streamprocessor/v3/streamconfig/standardstreamconfig"
"github.com/confluentinc/confluent-kafka-go/kafka"
uuid "github.com/satori/go.uuid"
"go.uber.org/zap"
)
// An Option configures a Consumer and/or Producer.
//
// Implementations receive both configurations and mutate them in place; a
// single Option may touch the consumer, the producer, or both.
type Option interface {
	apply(*Consumer, *Producer)
}
// optionFunc wraps a plain function so it satisfies the Option interface.
type optionFunc func(*Consumer, *Producer)
// apply invokes f with guaranteed non-nil configurations. If either argument
// is nil, a throwaway copy of the package-level defaults (including the
// per-implementation defaults) is substituted, so f never needs to nil-check
// its arguments.
func (f optionFunc) apply(c *Consumer, p *Producer) {
	if c == nil {
		cc := ConsumerDefaults
		cc.Inmem = inmemconfig.ConsumerDefaults
		cc.Kafka = kafkaconfig.ConsumerDefaults
		cc.Pubsub = pubsubconfig.ConsumerDefaults
		cc.Standardstream = standardstreamconfig.ConsumerDefaults
		c = &cc
	}

	if p == nil {
		pp := ProducerDefaults
		pp.Inmem = inmemconfig.ProducerDefaults
		pp.Kafka = kafkaconfig.ProducerDefaults
		pp.Pubsub = pubsubconfig.ProducerDefaults
		pp.Standardstream = standardstreamconfig.ProducerDefaults
		p = &pp
	}

	f(c, p)
}
// ConsumerOptions is a convenience accessor to manually set consumer options.
// The supplied callback receives only the consumer configuration; the
// producer is left untouched.
func ConsumerOptions(fn func(c *Consumer)) Option {
	return optionFunc(func(c *Consumer, _ *Producer) { fn(c) })
}
// ProducerOptions is a convenience accessor to manually set producer options.
// The supplied callback receives only the producer configuration; the
// consumer is left untouched.
func ProducerOptions(fn func(p *Producer)) Option {
	return optionFunc(func(_ *Consumer, p *Producer) { fn(p) })
}
// DisableEnvironmentConfig prevents the consumer and producer from being
// configured through environment variables. By default, environment
// variable-based configuration is allowed.
func DisableEnvironmentConfig() Option {
	return optionFunc(func(c *Consumer, p *Producer) {
		c.AllowEnvironmentBasedConfiguration = false
		p.AllowEnvironmentBasedConfiguration = false
	})
}
// ManualErrorHandling stops the consumer and producer from automatically
// handling stream errors. With this option set, the application itself must
// listen to, and act on, the `Errors()` channel.
func ManualErrorHandling() Option {
	return optionFunc(func(c *Consumer, p *Producer) {
		c.HandleErrors = false
		p.HandleErrors = false
	})
}
// ManualInterruptHandling stops the consumer and producer from automatically
// handling interrupt signals. With this option set, the application itself
// must catch Unix interrupt signals and close the consumer or producer when
// required.
func ManualInterruptHandling() Option {
	return optionFunc(func(c *Consumer, p *Producer) {
		c.HandleInterrupt = false
		p.HandleInterrupt = false
	})
}
// Logger sets the logger used by both the consumer and the producer.
func Logger(logger *zap.Logger) Option {
	return optionFunc(func(c *Consumer, p *Producer) {
		c.Logger = logger
		p.Logger = logger
	})
}
// Name sets the name of both the consumer and the producer.
func Name(name string) Option {
	return optionFunc(func(c *Consumer, p *Producer) {
		c.Name = name
		p.Name = name
	})
}
// InmemListen configures the inmem consumer to keep listening for new
// messages in the configured store, instead of consuming once and stopping.
//
// This option has no effect when applied to a producer.
func InmemListen() Option {
	return optionFunc(func(c *Consumer, _ *Producer) {
		c.Inmem.ConsumeOnce = false
	})
}
// InmemStore sets the message store used by the inmem consumer and producer.
func InmemStore(store stream.Store) Option {
	return optionFunc(func(c *Consumer, p *Producer) {
		c.Inmem.Store = store
		p.Inmem.Store = store
	})
}
// KafkaBroker appends a broker address to the list of configured Kafka
// brokers for both the consumer and the producer.
func KafkaBroker(addr string) Option {
	return optionFunc(func(c *Consumer, p *Producer) {
		c.Kafka.Brokers = append(c.Kafka.Brokers, addr)
		p.Kafka.Brokers = append(p.Kafka.Brokers, addr)
	})
}
// KafkaCommitInterval sets the consumer's CommitInterval.
//
// This option has no effect when applied to a producer.
func KafkaCommitInterval(interval time.Duration) Option {
	return optionFunc(func(c *Consumer, _ *Producer) {
		c.Kafka.CommitInterval = interval
	})
}
// KafkaCompressionCodec sets the compression codec for the produced messages.
//
// This option has no effect when applied to a consumer.
func KafkaCompressionCodec(codec kafkaconfig.Compression) Option {
	return optionFunc(func(_ *Consumer, p *Producer) {
		p.Kafka.CompressionCodec = codec
	})
}
// KafkaDebug enables all Kafka debugging output for both the consumer and
// the producer.
func KafkaDebug() Option {
	return optionFunc(func(c *Consumer, p *Producer) {
		c.Kafka.Debug.All = true
		p.Kafka.Debug.All = true
	})
}
// KafkaGroupID sets the consumer-group ID for the consumer.
//
// This option has no effect when applied to a producer.
func KafkaGroupID(id string) Option {
	return optionFunc(func(c *Consumer, _ *Producer) {
		c.Kafka.GroupID = id
	})
}
// KafkaGroupIDRandom assigns a randomly generated group ID to the consumer.
// Use this for one-off consumers that should not share their state in a
// consumer group.
//
// This option has no effect when applied to a producer.
func KafkaGroupIDRandom() Option {
	return optionFunc(func(c *Consumer, _ *Producer) {
		id := uuid.Must(uuid.NewV4())
		c.Kafka.GroupID = fmt.Sprintf("processor-%s", id)
	})
}
// KafkaHandleTransientErrors passes _all_ errors to the errors channel,
// including "transient" ones the consumer or producer could eventually
// resolve on their own, by clearing the ignore list.
func KafkaHandleTransientErrors() Option {
	return optionFunc(func(c *Consumer, p *Producer) {
		c.Kafka.IgnoreErrors = make([]kafka.ErrorCode, 0)
		p.Kafka.IgnoreErrors = make([]kafka.ErrorCode, 0)
	})
}
// KafkaHeartbeatInterval sets the HeartbeatInterval of both the consumer and
// the producer.
func KafkaHeartbeatInterval(interval time.Duration) Option {
	return optionFunc(func(c *Consumer, p *Producer) {
		c.Kafka.HeartbeatInterval = interval
		p.Kafka.HeartbeatInterval = interval
	})
}
// KafkaID sets the ID of both the consumer and the producer.
func KafkaID(id string) Option {
	return optionFunc(func(c *Consumer, p *Producer) {
		c.Kafka.ID = id
		p.Kafka.ID = id
	})
}
// KafkaMaxDeliveryRetries sets the producer's MaxDeliveryRetries.
//
// This option has no effect when applied to a consumer.
func KafkaMaxDeliveryRetries(retries int) Option {
	return optionFunc(func(_ *Consumer, p *Producer) {
		p.Kafka.MaxDeliveryRetries = retries
	})
}
// KafkaMaxInFlightRequests sets the maximum allowed in-flight requests for
// both the consumer and the producer.
func KafkaMaxInFlightRequests(max int) Option {
	return optionFunc(func(c *Consumer, p *Producer) {
		c.Kafka.MaxInFlightRequests = max
		p.Kafka.MaxInFlightRequests = max
	})
}
// KafkaMaxPollInterval sets the consumer's MaxPollInterval, the maximum
// allowed poll timeout.
//
// This option has no effect when applied to a producer.
func KafkaMaxPollInterval(interval time.Duration) Option {
	return optionFunc(func(c *Consumer, _ *Producer) {
		c.Kafka.MaxPollInterval = interval
	})
}
// KafkaMaxQueueBufferDuration sets the producer's MaxQueueBufferDuration.
//
// This option has no effect when applied to a consumer.
func KafkaMaxQueueBufferDuration(duration time.Duration) Option {
	return optionFunc(func(_ *Consumer, p *Producer) {
		p.Kafka.MaxQueueBufferDuration = duration
	})
}
// KafkaMaxQueueSizeKBytes sets the producer's MaxQueueSizeKBytes.
//
// This option has no effect when applied to a consumer.
func KafkaMaxQueueSizeKBytes(size int) Option {
	return optionFunc(func(_ *Consumer, p *Producer) {
		p.Kafka.MaxQueueSizeKBytes = size
	})
}
// KafkaMaxQueueSizeMessages sets the producer's MaxQueueSizeMessages.
//
// This option has no effect when applied to a consumer.
func KafkaMaxQueueSizeMessages(size int) Option {
	return optionFunc(func(_ *Consumer, p *Producer) {
		p.Kafka.MaxQueueSizeMessages = size
	})
}
// KafkaOffsetHead sets the consumer's OffsetDefault to a positive offset,
// counted from the head of the partition.
//
// This option has no effect when applied to a producer.
func KafkaOffsetHead(i uint32) Option {
	return optionFunc(func(c *Consumer, _ *Producer) {
		offset := int64(i)
		c.Kafka.OffsetDefault = &offset
	})
}
// KafkaOffsetInitial sets the consumer's OffsetInitial.
//
// This option has no effect when applied to a producer.
func KafkaOffsetInitial(offset kafkaconfig.Offset) Option {
	return optionFunc(func(c *Consumer, _ *Producer) {
		c.Kafka.OffsetInitial = offset
	})
}
// KafkaOffsetTail sets the consumer's OffsetDefault to a negative offset,
// counted back from the tail of the partition.
//
// This option has no effect when applied to a producer.
func KafkaOffsetTail(i uint32) Option {
	return optionFunc(func(c *Consumer, _ *Producer) {
		offset := -int64(i)
		c.Kafka.OffsetDefault = &offset
	})
}
// KafkaOrderedDelivery guarantees ordered delivery of produced messages by
// limiting the producer's `MaxInFlightRequests` to `1`.
//
// see: https://git.io/vpgiV
// see: https://git.io/vpgDg
//
// This option has no effect when applied to a consumer.
func KafkaOrderedDelivery() Option {
	return optionFunc(func(_ *Consumer, p *Producer) {
		p.Kafka.MaxInFlightRequests = 1
	})
}
// KafkaRequireNoAck configures the producer to not wait for any broker acks.
//
// This option has no effect when applied to a consumer.
func KafkaRequireNoAck() Option {
	return optionFunc(func(_ *Consumer, p *Producer) {
		p.Kafka.RequiredAcks = kafkaconfig.AckNone
	})
}
// KafkaRequireLeaderAck configures the producer to wait for a single ack by
// the Kafka cluster leader broker.
//
// This option has no effect when applied to a consumer.
func KafkaRequireLeaderAck() Option {
	return optionFunc(func(_ *Consumer, p *Producer) {
		p.Kafka.RequiredAcks = kafkaconfig.AckLeader
	})
}
// KafkaRequireAllAck configures the producer to wait for acks from all
// brokers available in the Kafka cluster.
//
// This option has no effect when applied to a consumer.
func KafkaRequireAllAck() Option {
	return optionFunc(func(_ *Consumer, p *Producer) {
		p.Kafka.RequiredAcks = kafkaconfig.AckAll
	})
}
// KafkaRetryBackoff configures the backoff the producer waits before
// retrying after a connection failure. See `KafkaMaxDeliveryRetries` to
// configure how many retries are attempted before returning an error.
//
// This option has no effect when applied to a consumer.
func KafkaRetryBackoff(backoff time.Duration) Option {
	return optionFunc(func(_ *Consumer, p *Producer) {
		p.Kafka.RetryBackoff = backoff
	})
}
// KafkaSecurityProtocol configures both the consumer and the producer to use
// the specified security protocol.
func KafkaSecurityProtocol(protocol kafkaconfig.Protocol) Option {
	return optionFunc(func(c *Consumer, p *Producer) {
		c.Kafka.SecurityProtocol = protocol
		p.Kafka.SecurityProtocol = protocol
	})
}
// KafkaSessionTimeout configures both the consumer and the producer to use
// the specified session timeout.
func KafkaSessionTimeout(timeout time.Duration) Option {
	return optionFunc(func(c *Consumer, p *Producer) {
		c.Kafka.SessionTimeout = timeout
		p.Kafka.SessionTimeout = timeout
	})
}
// KafkaSSL applies the given SSL settings to both the consumer and the
// producer.
func KafkaSSL(capath, certpath, crlpath, keypassword, keypath, keystorepassword, keystorepath string) Option {
	return optionFunc(func(c *Consumer, p *Producer) {
		// Producer SSL settings.
		p.Kafka.SSL.CAPath = capath
		p.Kafka.SSL.CertPath = certpath
		p.Kafka.SSL.CRLPath = crlpath
		p.Kafka.SSL.KeyPassword = keypassword
		p.Kafka.SSL.KeyPath = keypath
		p.Kafka.SSL.KeystorePassword = keystorepassword
		p.Kafka.SSL.KeystorePath = keystorepath

		// Consumer SSL settings (kept field-by-field, mirroring the producer).
		c.Kafka.SSL.CAPath = capath
		c.Kafka.SSL.CertPath = certpath
		c.Kafka.SSL.CRLPath = crlpath
		c.Kafka.SSL.KeyPassword = keypassword
		c.Kafka.SSL.KeyPath = keypath
		c.Kafka.SSL.KeystorePassword = keystorepassword
		c.Kafka.SSL.KeystorePath = keystorepath
	})
}
// KafkaStatisticsInterval configures both the consumer and the producer to
// use the specified statistics interval.
func KafkaStatisticsInterval(interval time.Duration) Option {
	return optionFunc(func(c *Consumer, p *Producer) {
		c.Kafka.StatisticsInterval = interval
		p.Kafka.StatisticsInterval = interval
	})
}
// KafkaTopic configures the consumer or producer to use the specified topic.
// For the consumer, this option can be passed multiple times to consume from
// more than one topic. For the producer, the last usage of this option wins
// and determines the topic produced to.
func KafkaTopic(topic string) Option {
	return optionFunc(func(c *Consumer, p *Producer) {
		// Consumers accumulate topics; producers have exactly one.
		c.Kafka.Topics = append(c.Kafka.Topics, topic)
		p.Kafka.Topic = topic
	})
}
// StandardstreamWriter sets the writer used as the message stream to write
// to.
//
// This option has no effect when applied to a consumer.
func StandardstreamWriter(writer io.Writer) Option {
	return optionFunc(func(_ *Consumer, p *Producer) {
		p.Standardstream.Writer = writer
	})
}
// StandardstreamReader sets the reader used as the message stream from which
// to read.
//
// This option has no effect when applied to a producer.
func StandardstreamReader(r io.ReadCloser) Option {
	return optionFunc(func(c *Consumer, _ *Producer) {
		c.Standardstream.Reader = r
	})
}