/
config.go
92 lines (80 loc) · 2.66 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package exporter
import (
"crypto/tls"
"crypto/x509"
"fmt"
"os"
"strings"
"github.com/Shopify/sarama"
)
// fillSaslFields applies the SASL-related options in opts to config.
//
// Nothing is touched when opts.UseSASL is false. Otherwise the mechanism name
// is matched case-insensitively against "scram-sha512", "scram-sha256",
// "gssapi" and "plain"; any other value yields an error and leaves config
// unmodified. After the mechanism-specific settings are applied, SASL is
// enabled, the handshake flag is copied from opts, and the user name and
// password are set only when they are non-empty.
func fillSaslFields(config *sarama.Config, opts KafkaOpts) error {
	if !opts.UseSASL {
		return nil
	}

	sasl := &config.Net.SASL

	// Compare in lowercase so that spellings such as "SCRAM-SHA512" are accepted.
	mechanism := strings.ToLower(opts.SaslMechanism)

	switch mechanism {
	case "scram-sha512":
		sasl.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
			return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
		}
		sasl.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)

	case "scram-sha256":
		sasl.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
			return &XDGSCRAMClient{HashGeneratorFcn: SHA256}
		}
		sasl.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256)

	case "gssapi":
		sasl.Mechanism = sarama.SASLMechanism(sarama.SASLTypeGSSAPI)

		krb := &sasl.GSSAPI
		krb.ServiceName = opts.ServiceName
		krb.KerberosConfigPath = opts.KerberosConfigPath
		krb.Realm = opts.Realm
		krb.Username = opts.SaslUsername

		// Anything other than "keytabAuth" is treated as user/password auth.
		switch opts.KerberosAuthType {
		case "keytabAuth":
			krb.AuthType = sarama.KRB5_KEYTAB_AUTH
			krb.KeyTabPath = opts.KeyTabPath
		default:
			krb.AuthType = sarama.KRB5_USER_AUTH
			krb.Password = opts.SaslPassword
		}

		// Only ever switch the flag on; a false option leaves the existing value alone.
		if opts.SaslDisablePAFXFast {
			krb.DisablePAFXFAST = true
		}

	case "plain":
		// No mechanism-specific settings are applied for "plain".

	default:
		return fmt.Errorf(
			`invalid sasl mechanism "%s": can only be "scram-sha256", "scram-sha512", "gssapi" or "plain"`,
			mechanism,
		)
	}

	sasl.Enable = true
	sasl.Handshake = opts.UseSASLHandshake

	// Empty credentials do not overwrite whatever config already holds.
	if opts.SaslUsername != "" {
		sasl.User = opts.SaslUsername
	}
	if opts.SaslPassword != "" {
		sasl.Password = opts.SaslPassword
	}

	return nil
}
// fillTlsFields applies the TLS-related options in opts to config.
//
// Nothing is touched when opts.UseTLS is false. Otherwise TLS is enabled with
// a fresh tls.Config carrying opts.TlsServerName and
// opts.TlsInsecureSkipTLSVerify. When opts.TlsCAFile is set, the PEM
// certificates in that file become the root CA pool. When both
// opts.TlsCertFile and opts.TlsKeyFile are set, that key pair is installed as
// the client certificate.
//
// An error is returned when the CA file cannot be read, when it contains no
// certificate that could be parsed, or when the client key pair cannot be
// loaded. Underlying errors are wrapped with %w, so errors.Is and errors.As
// keep working on the result.
func fillTlsFields(config *sarama.Config, opts KafkaOpts) error {
	if !opts.UseTLS {
		return nil
	}

	tlsConfig := &tls.Config{
		ServerName:         opts.TlsServerName,
		InsecureSkipVerify: opts.TlsInsecureSkipTLSVerify,
	}
	config.Net.TLS.Enable = true
	config.Net.TLS.Config = tlsConfig

	if opts.TlsCAFile != "" {
		pem, err := os.ReadFile(opts.TlsCAFile)
		if err != nil {
			return fmt.Errorf("reading TLS CA file %q: %w", opts.TlsCAFile, err)
		}

		pool := x509.NewCertPool()
		// AppendCertsFromPEM reports whether at least one certificate was
		// parsed. Ignoring a false result would install an empty root pool and
		// surface later as an opaque handshake failure.
		if !pool.AppendCertsFromPEM(pem) {
			return fmt.Errorf("no valid PEM certificate found in TLS CA file %q", opts.TlsCAFile)
		}
		tlsConfig.RootCAs = pool
	}

	// A client certificate is configured only when both halves are supplied.
	if opts.TlsCertFile != "" && opts.TlsKeyFile != "" {
		cert, err := tls.LoadX509KeyPair(opts.TlsCertFile, opts.TlsKeyFile)
		if err != nil {
			return fmt.Errorf("loading TLS client key pair (cert %q, key %q): %w", opts.TlsCertFile, opts.TlsKeyFile, err)
		}
		tlsConfig.Certificates = []tls.Certificate{cert}
	}

	return nil
}