-
-
Notifications
You must be signed in to change notification settings - Fork 79
/
config.go
305 lines (277 loc) · 8.86 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
package config
import (
"fmt"
"github.com/grepplabs/kafka-proxy/pkg/libs/util"
"github.com/pkg/errors"
"net"
"net/url"
"strings"
"time"
)
const defaultClientID = "kafka-proxy"
var (
// Version is the current version of the app, generated at build time
Version = "unknown"
)
type NetAddressMappingFunc func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error)
type ListenerConfig struct {
BrokerAddress string
ListenerAddress string
AdvertisedAddress string
}
type Config struct {
Http struct {
ListenAddress string
MetricsPath string
HealthPath string
Disable bool
}
Debug struct {
ListenAddress string
DebugPath string
Enabled bool
}
Log struct {
Format string
Level string
}
Proxy struct {
DefaultListenerIP string
BootstrapServers []ListenerConfig
ExternalServers []ListenerConfig
DisableDynamicListeners bool
RequestBufferSize int
ResponseBufferSize int
ListenerReadBufferSize int // SO_RCVBUF
ListenerWriteBufferSize int // SO_SNDBUF
ListenerKeepAlive time.Duration
TLS struct {
Enable bool
ListenerCertFile string
ListenerKeyFile string
ListenerKeyPassword string
CAChainCertFile string
ListenerCipherSuites []string
ListenerCurvePreferences []string
}
}
Auth struct {
Local struct {
Enable bool
Command string
Mechanism string
Parameters []string
LogLevel string
Timeout time.Duration
}
Gateway struct {
Client struct {
Enable bool
Method string
Magic uint64
Command string
Parameters []string
LogLevel string
Timeout time.Duration
}
Server struct {
Enable bool
Method string
Magic uint64
Command string
Parameters []string
LogLevel string
Timeout time.Duration
}
}
}
Kafka struct {
ClientID string
MaxOpenRequests int
ForbiddenApiKeys []int
DialTimeout time.Duration // How long to wait for the initial connection.
WriteTimeout time.Duration // How long to wait for a request.
ReadTimeout time.Duration // How long to wait for a response.
KeepAlive time.Duration
ConnectionReadBufferSize int // SO_RCVBUF
ConnectionWriteBufferSize int // SO_SNDBUF
TLS struct {
Enable bool
InsecureSkipVerify bool
ClientCertFile string
ClientKeyFile string
ClientKeyPassword string
CAChainCertFile string
}
SASL struct {
Enable bool
Username string
Password string
JaasConfigFile string
}
}
ForwardProxy struct {
Url string
Scheme string
Address string
Username string
Password string
}
}
func (c *Config) InitBootstrapServers(bootstrapServersMapping []string) (err error) {
c.Proxy.BootstrapServers, err = getListenerConfigs(bootstrapServersMapping)
return err
}
func (c *Config) InitExternalServers(externalServersMapping []string) (err error) {
c.Proxy.ExternalServers, err = getListenerConfigs(externalServersMapping)
return err
}
func (c *Config) InitSASLCredentials() (err error) {
if c.Kafka.SASL.JaasConfigFile != "" {
credentials, err := NewJaasCredentialFromFile(c.Kafka.SASL.JaasConfigFile)
if err != nil {
return err
}
c.Kafka.SASL.Username = credentials.Username
c.Kafka.SASL.Password = credentials.Password
}
return nil
}
func getListenerConfigs(serversMapping []string) ([]ListenerConfig, error) {
listenerConfigs := make([]ListenerConfig, 0)
if serversMapping != nil {
for _, v := range serversMapping {
pair := strings.Split(v, ",")
if len(pair) != 2 && len(pair) != 3 {
return nil, errors.New("server-mapping must be in form 'remotehost:remoteport,localhost:localport(,advhost:advport)'")
}
remoteHost, remotePort, err := util.SplitHostPort(pair[0])
if err != nil {
return nil, err
}
localHost, localPort, err := util.SplitHostPort(pair[1])
if err != nil {
return nil, err
}
advertisedHost, advertisedPort := localHost, localPort
if len(pair) == 3 {
advertisedHost, advertisedPort, err = util.SplitHostPort(pair[2])
if err != nil {
return nil, err
}
}
listenerConfig := ListenerConfig{
BrokerAddress: net.JoinHostPort(remoteHost, fmt.Sprint(remotePort)),
ListenerAddress: net.JoinHostPort(localHost, fmt.Sprint(localPort)),
AdvertisedAddress: net.JoinHostPort(advertisedHost, fmt.Sprint(advertisedPort))}
listenerConfigs = append(listenerConfigs, listenerConfig)
}
}
return listenerConfigs, nil
}
func NewConfig() *Config {
c := &Config{}
c.Kafka.ClientID = defaultClientID
c.Kafka.MaxOpenRequests = 256
c.Kafka.DialTimeout = 15 * time.Second
c.Kafka.ReadTimeout = 30 * time.Second
c.Kafka.WriteTimeout = 30 * time.Second
c.Kafka.KeepAlive = 60 * time.Second
c.Kafka.ForbiddenApiKeys = make([]int, 0)
c.Http.MetricsPath = "/metrics"
c.Http.HealthPath = "/health"
c.Proxy.DefaultListenerIP = "127.0.0.1"
c.Proxy.DisableDynamicListeners = false
c.Proxy.RequestBufferSize = 4096
c.Proxy.ResponseBufferSize = 4096
c.Proxy.ListenerKeepAlive = 60 * time.Second
return c
}
func (c *Config) Validate() error {
if c.Kafka.SASL.Enable && (c.Kafka.SASL.Username == "" || c.Kafka.SASL.Password == "") {
return errors.New("SASL.Username and SASL.Password are required when SASL is enabled")
}
if c.Kafka.KeepAlive < 0 {
return errors.New("KeepAlive must be greater or equal 0")
}
if c.Kafka.DialTimeout < 0 {
return errors.New("DialTimeout must be greater or equal 0")
}
if c.Kafka.ReadTimeout < 0 {
return errors.New("ReadTimeout must be greater or equal 0")
}
if c.Kafka.WriteTimeout < 0 {
return errors.New("WriteTimeout must be greater or equal 0")
}
if c.Kafka.MaxOpenRequests < 1 {
return errors.New("MaxOpenRequests must be greater than 0")
}
// proxy
if c.Proxy.BootstrapServers == nil || len(c.Proxy.BootstrapServers) == 0 {
return errors.New("list of bootstrap-server-mapping must not be empty")
}
if c.Proxy.DefaultListenerIP == "" {
return errors.New("DefaultListenerIP must not be empty")
}
if net.ParseIP(c.Proxy.DefaultListenerIP) == nil {
return errors.New("DefaultListerIP is not a valid IP")
}
if c.Proxy.RequestBufferSize < 1 {
return errors.New("RequestBufferSize must be greater than 0")
}
if c.Proxy.ResponseBufferSize < 1 {
return errors.New("ResponseBufferSize must be greater than 0")
}
if c.Proxy.ListenerKeepAlive < 0 {
return errors.New("ListenerKeepAlive must be greater or equal 0")
}
if c.Proxy.TLS.Enable && (c.Proxy.TLS.ListenerKeyFile == "" || c.Proxy.TLS.ListenerCertFile == "") {
return errors.New("ListenerKeyFile and ListenerCertFile are required when Proxy TLS is enabled")
}
if c.Auth.Local.Enable && c.Auth.Local.Command == "" {
return errors.New("Command is required when Auth.Local.Enable is enabled")
}
if c.Auth.Local.Enable && (c.Auth.Local.Mechanism != "PLAIN" && c.Auth.Local.Mechanism != "OAUTHBEARER") {
return errors.New("Mechanism PLAIN or OAUTHBEARER is required when Auth.Local.Enable is enabled")
}
if c.Auth.Local.Enable && c.Auth.Local.Timeout <= 0 {
return errors.New("Auth.Local.Timeout must be greater than 0")
}
if c.Auth.Gateway.Client.Enable && (c.Auth.Gateway.Client.Command == "" || c.Auth.Gateway.Client.Method == "" || c.Auth.Gateway.Client.Magic == 0) {
return errors.New("Command, Method and Magic are required when Auth.Gateway.Client.Enable is enabled")
}
if c.Auth.Gateway.Client.Enable && c.Auth.Gateway.Client.Timeout <= 0 {
return errors.New("Auth.Gateway.Client.Timeout must be greater than 0")
}
if c.Auth.Gateway.Server.Enable && (c.Auth.Gateway.Server.Command == "" || c.Auth.Gateway.Server.Method == "" || c.Auth.Gateway.Server.Magic == 0) {
return errors.New("Command, Method and Magic are required when Auth.Gateway.Server.Enable is enabled")
}
if c.Auth.Gateway.Server.Enable && c.Auth.Gateway.Server.Timeout <= 0 {
return errors.New("Auth.Gateway.Server.Timeout must be greater than 0")
}
// http://username:password@hostname:port or socks5://username:password@hostname:port
if c.ForwardProxy.Url != "" {
var proxyUrl *url.URL
var err error
if proxyUrl, err = url.Parse(c.ForwardProxy.Url); err != nil {
return err
}
if proxyUrl.Port() == "" {
return errors.New("Port part of ForwardProxy.Url must not be empty")
}
c.ForwardProxy.Address = proxyUrl.Host
if proxyUrl.Scheme != "http" && proxyUrl.Scheme != "socks5" {
return errors.New("ForwardProxy.Url Scheme must be http or socks5")
}
c.ForwardProxy.Scheme = proxyUrl.Scheme
if proxyUrl.User != nil {
password, _ := proxyUrl.User.Password()
if proxyUrl.User.Username() == "" || password == "" {
return errors.New("Both ForwardProxy Url Username and Password must be provided")
}
c.ForwardProxy.Username = proxyUrl.User.Username()
c.ForwardProxy.Password = password
}
}
return nil
}