-
Notifications
You must be signed in to change notification settings - Fork 14
/
config.go
104 lines (89 loc) · 3.22 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package config
import (
"math"
"strconv"
"time"
"github.com/Trendyol/go-dcp/config"
"github.com/Trendyol/go-dcp/helpers"
"github.com/segmentio/kafka-go"
)
// Kafka holds the Kafka producer side of the connector configuration.
// Zero-valued fields are filled with defaults by Connector.ApplyDefaults.
type Kafka struct {
	// ProducerBatchBytes is the byte-size limit of a producer batch.
	// Declared as `any` so YAML can supply either an int or a string
	// with a size suffix (resolved via helpers.ResolveUnionIntOrStringValue,
	// e.g. "10mb" — see ApplyDefaults).
	ProducerBatchBytes any `yaml:"producerBatchBytes"`
	// CollectionTopicMapping maps a Couchbase collection name to the
	// Kafka topic its events are published to — presumably keyed by
	// collection; verify against the producer that consumes it.
	CollectionTopicMapping map[string]string `yaml:"collectionTopicMapping"`
	// InterCAPath is the path to an intermediate CA certificate file.
	InterCAPath string `yaml:"interCAPath"`
	// ScramUsername / ScramPassword are SCRAM auth credentials.
	ScramUsername string `yaml:"scramUsername"`
	ScramPassword string `yaml:"scramPassword"`
	// RootCAPath is the path to the root CA certificate file.
	RootCAPath string `yaml:"rootCAPath"`
	// ClientID identifies this producer to the brokers.
	ClientID string `yaml:"clientID"`
	// Balancer selects the partition balancer by name; see GetBalancer
	// for the accepted values ("" defaults to Hash).
	Balancer string `yaml:"balancer"`
	// Brokers is the list of Kafka broker addresses to connect to.
	Brokers []string `yaml:"brokers"`
	// MetadataTopics limits metadata fetches to these topics.
	MetadataTopics []string `yaml:"metadataTopics"`
	// ProducerMaxAttempts caps delivery retries (default: math.MaxInt).
	ProducerMaxAttempts int `yaml:"producerMaxAttempts"`
	// ProducerBatchTimeout is how long a batch may wait before being
	// flushed (default: 1ns, i.e. effectively immediate).
	ProducerBatchTimeout time.Duration `yaml:"producerBatchTimeout"`
	// ReadTimeout / WriteTimeout are socket deadlines (default: 30s each).
	ReadTimeout  time.Duration `yaml:"readTimeout"`
	WriteTimeout time.Duration `yaml:"writeTimeout"`
	// RequiredAcks is the producer acks setting (default: 1).
	RequiredAcks int `yaml:"requiredAcks"`
	// ProducerBatchSize is the max message count per batch (default: 2000).
	ProducerBatchSize int `yaml:"producerBatchSize"`
	// MetadataTTL is how long cached cluster metadata stays valid
	// (default: 60s).
	MetadataTTL time.Duration `yaml:"metadataTTL"`
	// ProducerBatchTickerDuration is the periodic flush interval
	// (default: 10s).
	ProducerBatchTickerDuration time.Duration `yaml:"producerBatchTickerDuration"`
	// Compression is the codec code, validated to 0..4 by GetCompression.
	Compression int8 `yaml:"compression"`
	// SecureConnection enables TLS when true.
	SecureConnection bool `yaml:"secureConnection"`
	// AllowAutoTopicCreation lets the broker auto-create missing topics.
	AllowAutoTopicCreation bool `yaml:"allowAutoTopicCreation"`
}
// GetBalancer resolves the configured balancer name to a fresh kafka-go
// Balancer instance. An empty name selects the Hash balancer; any name
// outside the supported set panics (configuration error, fail fast).
func (k *Kafka) GetBalancer() kafka.Balancer {
	// Normalize the default up front so the switch has one case per name.
	name := k.Balancer
	if name == "" {
		name = "Hash"
	}

	switch name {
	case "Hash":
		return &kafka.Hash{}
	case "LeastBytes":
		return &kafka.LeastBytes{}
	case "RoundRobin":
		return &kafka.RoundRobin{}
	case "ReferenceHash":
		return &kafka.ReferenceHash{}
	case "CRC32Balancer":
		return kafka.CRC32Balancer{}
	case "Murmur2Balancer":
		return kafka.Murmur2Balancer{}
	}
	panic("invalid kafka balancer method, given: " + k.Balancer)
}
// GetCompression validates and returns the configured compression codec
// code. Accepted values are 0 through 4 inclusive (kafka-go codec codes —
// confirm mapping against kafka.Compression constants); anything else panics.
func (k *Kafka) GetCompression() int8 {
	if 0 <= k.Compression && k.Compression <= 4 {
		return k.Compression
	}
	panic("invalid kafka compression method, given: " + strconv.Itoa(int(k.Compression)))
}
// Connector is the top-level configuration for the connector: the Kafka
// sink settings plus the go-dcp source settings, which are inlined at the
// same YAML level (yaml ",inline" / mapstructure ",squash").
type Connector struct {
	Kafka Kafka `yaml:"kafka" mapstructure:"kafka"`
	Dcp config.Dcp `yaml:",inline" mapstructure:",squash"`
}
// ApplyDefaults fills every Kafka setting still at its zero value with the
// connector's default. Call after unmarshalling the YAML config and before
// building the producer.
//
// NOTE(review): zero value doubles as "unset" here, so e.g. an explicit
// requiredAcks: 0 in YAML is indistinguishable from omitting it and gets
// overwritten with 1 — confirm that is intended.
func (c *Connector) ApplyDefaults() {
	kc := &c.Kafka

	// Socket deadlines.
	if kc.ReadTimeout == 0 {
		kc.ReadTimeout = 30 * time.Second
	}
	if kc.WriteTimeout == 0 {
		kc.WriteTimeout = 30 * time.Second
	}

	// Batching: periodic flush every 10s, up to 2000 messages or 10mb
	// per batch, with an effectively-immediate batch timeout.
	if kc.ProducerBatchTickerDuration == 0 {
		kc.ProducerBatchTickerDuration = 10 * time.Second
	}
	if kc.ProducerBatchSize == 0 {
		kc.ProducerBatchSize = 2000
	}
	if kc.ProducerBatchBytes == nil {
		kc.ProducerBatchBytes = helpers.ResolveUnionIntOrStringValue("10mb")
	}
	if kc.ProducerBatchTimeout == 0 {
		kc.ProducerBatchTimeout = time.Nanosecond
	}

	// Delivery semantics: leader ack, retry indefinitely.
	if kc.RequiredAcks == 0 {
		kc.RequiredAcks = 1
	}
	if kc.ProducerMaxAttempts == 0 {
		kc.ProducerMaxAttempts = math.MaxInt
	}

	// Cluster metadata cache lifetime.
	if kc.MetadataTTL == 0 {
		kc.MetadataTTL = 60 * time.Second
	}
}