-
Notifications
You must be signed in to change notification settings - Fork 13
/
sasl.go
105 lines (92 loc) · 3.45 KB
/
sasl.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package kafka
import (
"errors"
"github.com/Shopify/sarama"
)
type SASLAuth struct {
SASLVersion *int `toml:"sasl_version"`
SASLUsername string `toml:"sasl_username"`
SASLPassword string `toml:"sasl_password"`
SASLMechanism string `toml:"sasl_mechanism"`
SASLGSSAPIServiceName string `toml:"sasl_gssapi_service_name"` // GSSAPI config
SASLGSSAPIAuthType string `toml:"sasl_gssapi_auth_type"` // GSSAPI config
SASLGSSAPIKerberosConfigPath string `toml:"sasl_gssapi_kerberos_config_path"` // GSSAPI config
SASLGSSAPIKeyTabPath string `toml:"sasl_gssapi_key_tab_path"` // GSSAPI config
SASLGSSAPIRealm string `toml:"sasl_gssapi_realm"` // GSSAPI config
SASLAccessToken string `toml:"sasl_access_token"` // OAUTHBEARER config. experimental. undoubtedly this is not good enough.
SASLGSSAPIDisablePAFXFAST bool `toml:"sasl_gssapi_disable_pafxfast"` // GSSAPI config
}
// SetSASLConfig configures SASL for kafka (sarama)
func (k *SASLAuth) SetSASLConfig(config *sarama.Config) error {
config.Net.SASL.User = k.SASLUsername
config.Net.SASL.Password = k.SASLPassword
if k.SASLMechanism != "" {
config.Net.SASL.Mechanism = sarama.SASLMechanism(k.SASLMechanism)
switch config.Net.SASL.Mechanism {
case sarama.SASLTypeSCRAMSHA256:
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA256}
}
case sarama.SASLTypeSCRAMSHA512:
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
}
case sarama.SASLTypeOAuth:
config.Net.SASL.TokenProvider = k // use self as token provider.
case sarama.SASLTypeGSSAPI:
config.Net.SASL.GSSAPI.ServiceName = k.SASLGSSAPIServiceName
config.Net.SASL.GSSAPI.AuthType = gssapiAuthType(k.SASLGSSAPIAuthType)
config.Net.SASL.GSSAPI.Username = k.SASLUsername
config.Net.SASL.GSSAPI.Password = k.SASLPassword
config.Net.SASL.GSSAPI.DisablePAFXFAST = k.SASLGSSAPIDisablePAFXFAST
config.Net.SASL.GSSAPI.KerberosConfigPath = k.SASLGSSAPIKerberosConfigPath
config.Net.SASL.GSSAPI.KeyTabPath = k.SASLGSSAPIKeyTabPath
config.Net.SASL.GSSAPI.Realm = k.SASLGSSAPIRealm
case sarama.SASLTypePlaintext:
// nothing.
default:
}
}
if k.SASLUsername != "" || k.SASLMechanism != "" {
config.Net.SASL.Enable = true
version, err := SASLVersion(config.Version, k.SASLVersion)
if err != nil {
return err
}
config.Net.SASL.Version = version
}
return nil
}
// Token does nothing smart, it just grabs a hard-coded token from config.
func (k *SASLAuth) Token() (*sarama.AccessToken, error) {
return &sarama.AccessToken{
Token: k.SASLAccessToken,
Extensions: map[string]string{},
}, nil
}
func SASLVersion(kafkaVersion sarama.KafkaVersion, saslVersion *int) (int16, error) {
if saslVersion == nil {
if kafkaVersion.IsAtLeast(sarama.V1_0_0_0) {
return sarama.SASLHandshakeV1, nil
}
return sarama.SASLHandshakeV0, nil
}
switch *saslVersion {
case 0:
return sarama.SASLHandshakeV0, nil
case 1:
return sarama.SASLHandshakeV1, nil
default:
return 0, errors.New("invalid SASL version")
}
}
func gssapiAuthType(authType string) int {
switch authType {
case "KRB5_USER_AUTH":
return sarama.KRB5_USER_AUTH
case "KRB5_KEYTAB_AUTH":
return sarama.KRB5_KEYTAB_AUTH
default:
return 0
}
}