-
-
Notifications
You must be signed in to change notification settings - Fork 126
/
config.go
131 lines (115 loc) · 3.51 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package kafka
import (
"crypto/tls"
"crypto/x509"
"encoding/pem"
"fmt"
"io/ioutil"
"log"
"time"
"github.com/Shopify/sarama"
)
type Config struct {
BootstrapServers *[]string
Timeout int
CACert string
ClientCert string
ClientCertKey string
TLSEnabled bool
SkipTLSVerify bool
SASLUsername string
SASLPassword string
SASLMechanism string
}
func (c *Config) newKafkaConfig() (*sarama.Config, error) {
kafkaConfig := sarama.NewConfig()
kafkaConfig.Version = sarama.V2_1_0_0
kafkaConfig.ClientID = "terraform-provider-kafka"
kafkaConfig.Admin.Timeout = time.Duration(c.Timeout) * time.Second
if c.saslEnabled() {
switch c.SASLMechanism {
case "scram-sha512":
kafkaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
kafkaConfig.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
case "scram-sha256":
kafkaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA256} }
kafkaConfig.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256)
case "plain":
default:
log.Fatalf("[ERROR] Invalid sasl mechanism \"%s\": can only be \"scram-sha256\", \"scram-sha512\" or \"plain\"", c.SASLMechanism)
}
kafkaConfig.Net.SASL.Enable = true
kafkaConfig.Net.SASL.Password = c.SASLPassword
kafkaConfig.Net.SASL.User = c.SASLUsername
kafkaConfig.Net.SASL.Handshake = true
} else {
log.Println("[WARN] No SASL for you")
}
if c.TLSEnabled {
tlsConfig, err := newTLSConfig(
c.ClientCert,
c.ClientCertKey,
c.CACert)
if err != nil {
return kafkaConfig, err
}
kafkaConfig.Net.TLS.Enable = true
kafkaConfig.Net.TLS.Config = tlsConfig
kafkaConfig.Net.TLS.Config.InsecureSkipVerify = c.SkipTLSVerify
}
return kafkaConfig, nil
}
func (c *Config) saslEnabled() bool {
return c.SASLUsername != "" || c.SASLPassword != ""
}
func NewTLSConfig(clientCert, clientKey, caCert string) (*tls.Config, error) {
return newTLSConfig(clientCert, clientKey, caCert)
}
func newTLSConfig(clientCert, clientKey, caCert string) (*tls.Config, error) {
tlsConfig := tls.Config{}
// Load client cert
if clientCert != "" && clientKey != "" {
cert, err := tls.X509KeyPair([]byte(clientCert), []byte(clientKey))
if err != nil {
// try from file
cert, err = tls.LoadX509KeyPair(clientCert, clientKey)
if err != nil {
log.Printf("[ERROR] Error creating client pair \ncert:\n%s\n key\n%s\n", clientCert, clientKey)
return &tlsConfig, err
}
}
tlsConfig.Certificates = []tls.Certificate{cert}
} else {
log.Println("[WARN] skipping TLS client config")
}
if caCert == "" {
log.Println("[WARN] no CA file set skipping")
return &tlsConfig, nil
}
caCertPool, _ := x509.SystemCertPool()
if caCertPool == nil {
caCertPool = x509.NewCertPool()
}
caPEM, _ := pem.Decode([]byte(caCert))
log.Println("[INFO] adding rootybou")
if caPEM == nil {
log.Println("[WARN] no caPem, checking from file")
// try as file
caCert, err := ioutil.ReadFile(caCert)
if err != nil {
log.Println("[ERROR] unable to read CA")
return &tlsConfig, err
}
log.Println("[WARN] Adding pem from file")
caCertPool.AppendCertsFromPEM(caCert)
} else {
ok := caCertPool.AppendCertsFromPEM([]byte(caCert))
fmt.Printf("set cert pool %v", ok)
if !ok {
return &tlsConfig, fmt.Errorf("Couldn't add the caPem")
}
}
tlsConfig.RootCAs = caCertPool
tlsConfig.BuildNameToCertificate()
return &tlsConfig, nil
}