Skip to content

Commit

Permalink
kafka: refactoring configuration, adding SASL struct
Browse files Browse the repository at this point in the history
This change refactors the Kafka configuration, moving all the parameters required
by SASL into a substructure.
  • Loading branch information
sravotto committed May 2, 2024
1 parent ab4b08b commit 5e0e2ba
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 65 deletions.
73 changes: 38 additions & 35 deletions internal/source/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,34 +45,25 @@ type EagerConfig Config
// Config contains the configuration necessary for creating a
// replication connection. ServerID and SourceConn are mandatory.
type Config struct {
ConveyorConfig conveyor.Config
DLQ dlq.Config
Script script.Config
Sequencer sequencer.Config
Staging sinkprod.StagingConfig
Target sinkprod.TargetConfig
TargetSchema ident.Schema
TLS secure.Config
ConveyorConfig conveyor.Config

BatchSize int // How many messages to accumulate before committing to the target
Brokers []string // The address of the Kafka brokers
Group string // the Kafka consumer group id.
MaxTimestamp string // Only accept messages at or older than this timestamp
MinTimestamp string // Only accept messages at or newer than this timestamp
ResolvedInterval time.Duration // Minimal duration between resolved timestamps.
SASL SASLConfig // SASL parameters
Strategy string // Kafka consumer group re-balance strategy
Topics []string // The list of topics that the consumer should use.

// SASL parameters
saslClientID string
saslClientSecret string
saslGrantType string
saslMechanism string
saslScopes []string
saslTokenURL string
saslUser string
saslPassword string

// The following are computed.

// The kafka connector configuration.
Expand All @@ -81,6 +72,18 @@ type Config struct {
timeRange hlc.Range
}

// SASLConfig defines the SASL parameters.
type SASLConfig struct {
ClientID string
ClientSecret string
GrantType string
Mechanism string
Password string
Scopes []string
TokenURL string
User string
}

// Bind adds flags to the set. It delegates to the embedded Config.Bind.
func (c *Config) Bind(f *pflag.FlagSet) {
c.DLQ.Bind(f)
Expand Down Expand Up @@ -114,14 +117,14 @@ Please see the CREATE CHANGEFEED documentation for details.
f.StringArrayVar(&c.Topics, "topic", nil, "the topic(s) that the consumer should use")

// SASL configuration
f.StringVar(&c.saslClientID, "saslClientId", "", "client ID for OAuth authentication from a third-party provider")
f.StringVar(&c.saslClientSecret, "saslClientSecret", "", "Client secret for OAuth authentication from a third-party provider")
f.StringVar(&c.saslGrantType, "saslGrantType", "", "Override the default OAuth client credentials grant type for other implementations")
f.StringVar(&c.saslMechanism, "saslMechanism", "", "Can be set to OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512, or PLAIN")
f.StringArrayVar(&c.saslScopes, "saslScope", nil, "Scopes that the OAuth token should have access for.")
f.StringVar(&c.saslTokenURL, "saslTokenURL", "", "Client token URL for OAuth authentication from a third-party provider")
f.StringVar(&c.saslUser, "saslUser", "", "SASL username")
f.StringVar(&c.saslPassword, "saslPassword", "", "SASL password")
f.StringVar(&c.SASL.ClientID, "ClientId", "", "client ID for OAuth authentication from a third-party provider")
f.StringVar(&c.SASL.ClientSecret, "ClientSecret", "", "Client secret for OAuth authentication from a third-party provider")
f.StringVar(&c.SASL.GrantType, "GrantType", "", "Override the default OAuth client credentials grant type for other implementations")
f.StringVar(&c.SASL.Mechanism, "Mechanism", "", "Can be set to OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512, or PLAIN")
f.StringArrayVar(&c.SASL.Scopes, "Scope", nil, "Scopes that the OAuth token should have access for.")
f.StringVar(&c.SASL.TokenURL, "TokenURL", "", "Client token URL for OAuth authentication from a third-party provider")
f.StringVar(&c.SASL.User, "User", "", "SASL username")
f.StringVar(&c.SASL.Password, "Password", "", "SASL password")
}

// Preflight updates the configuration with sane defaults or returns an
Expand Down Expand Up @@ -200,10 +203,10 @@ func (c *Config) preflight(ctx context.Context) error {
}
sc.Net.TLS.Config = c.TLS.AsTLSConfig()
sc.Net.TLS.Enable = sc.Net.TLS.Config != nil
// if saslMechanism is not null, then authentication is done via SASL.
if c.saslMechanism != "" {
// if Mechanism is not null, then authentication is done via SASL.
if c.SASL.Mechanism != "" {
sc.Net.SASL.Enable = true
switch c.saslMechanism {
switch c.SASL.Mechanism {
case sarama.SASLTypeSCRAMSHA512:
sc.Net.SASL.SCRAMClientGeneratorFunc = sha512ClientGenerator
case sarama.SASLTypeSCRAMSHA256:
Expand All @@ -215,10 +218,10 @@ func (c *Config) preflight(ctx context.Context) error {
return err
}
}
sc.Net.SASL.Mechanism = sarama.SASLMechanism(c.saslMechanism)
sc.Net.SASL.User = c.saslUser
sc.Net.SASL.Password = c.saslPassword
log.Infof("Using SASL %s", c.saslMechanism)
sc.Net.SASL.Mechanism = sarama.SASLMechanism(c.SASL.Mechanism)
sc.Net.SASL.User = c.SASL.User
sc.Net.SASL.Password = c.SASL.Password
log.Infof("Using SASL %s", c.SASL.Mechanism)
}
sc.Consumer.Offsets.Initial = sarama.OffsetOldest
c.saramaConfig = sc
Expand All @@ -230,33 +233,33 @@ func (c *Config) newTokenProvider(ctx context.Context) (sarama.AccessTokenProvid
// clientcredentials library as defined by the spec, however non-compliant
// auth server implementations may want a custom type
var endpointParams url.Values
if c.saslGrantType != `` {
endpointParams = url.Values{"grant_type": {c.saslGrantType}}
if c.SASL.GrantType != `` {
endpointParams = url.Values{"grant_type": {c.SASL.GrantType}}
}
if c.saslTokenURL == "" {
if c.SASL.TokenURL == "" {
return nil, errors.New("OAUTH2 requires a token URL")

}
tokenURL, err := url.Parse(c.saslTokenURL)
tokenURL, err := url.Parse(c.SASL.TokenURL)
if err != nil {
return nil, errors.Wrap(err, "malformed token url")
}
if c.saslClientID == "" {
if c.SASL.ClientID == "" {
return nil, errors.New("OAUTH2 requires a client id")

}
if c.saslClientSecret == "" {
if c.SASL.ClientSecret == "" {
return nil, errors.New("OAUTH2 requires a client secret")
}
// the clientcredentials.Config's TokenSource method creates an
// oauth2.TokenSource implementation which returns tokens for the given
// endpoint, returning the same cached result until its expiration has been
// reached, and then once expired re-requesting a new token from the endpoint.
cfg := clientcredentials.Config{
ClientID: c.saslClientID,
ClientSecret: c.saslClientSecret,
ClientID: c.SASL.ClientID,
ClientSecret: c.SASL.ClientSecret,
TokenURL: tokenURL.String(),
Scopes: c.saslScopes,
Scopes: c.SASL.Scopes,
EndpointParams: endpointParams,
}
return &tokenProvider{
Expand Down
70 changes: 43 additions & 27 deletions internal/source/kafka/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,9 @@ func TestPreflight(t *testing.T) {
Topics: []string{"mytopic"},
Strategy: "sticky",
TLS: tlsConfig,
saslMechanism: sarama.SASLTypePlaintext,
SASL: SASLConfig{
Mechanism: sarama.SASLTypePlaintext,
},
},
wantErr: "invalid configuration (Net.SASL.User must not be empty when SASL is enabled)",
},
Expand All @@ -189,9 +191,11 @@ func TestPreflight(t *testing.T) {
Topics: []string{"mytopic"},
Strategy: "sticky",
TLS: tlsConfig,
saslMechanism: sarama.SASLTypePlaintext,
saslUser: "user",
saslPassword: "pass",
SASL: SASLConfig{
Mechanism: sarama.SASLTypePlaintext,
User: "user",
Password: "pass",
},
},
strategy: []sarama.BalanceStrategy{sarama.NewBalanceStrategySticky()},
timeRange: maxRange,
Expand All @@ -208,9 +212,11 @@ func TestPreflight(t *testing.T) {
Topics: []string{"mytopic"},
Strategy: "sticky",
TLS: tlsConfig,
saslMechanism: sarama.SASLTypeSCRAMSHA256,
saslUser: "user",
saslPassword: "pass",
SASL: SASLConfig{
Mechanism: sarama.SASLTypeSCRAMSHA256,
User: "user",
Password: "pass",
},
},
strategy: []sarama.BalanceStrategy{sarama.NewBalanceStrategySticky()},
timeRange: maxRange,
Expand All @@ -227,9 +233,11 @@ func TestPreflight(t *testing.T) {
Topics: []string{"mytopic"},
Strategy: "sticky",
TLS: tlsConfig,
saslMechanism: sarama.SASLTypeSCRAMSHA512,
saslUser: "user",
saslPassword: "pass",
SASL: SASLConfig{
Mechanism: sarama.SASLTypeSCRAMSHA512,
User: "user",
Password: "pass",
},
},
strategy: []sarama.BalanceStrategy{sarama.NewBalanceStrategySticky()},
timeRange: maxRange,
Expand All @@ -246,10 +254,12 @@ func TestPreflight(t *testing.T) {
Topics: []string{"mytopic"},
Strategy: "sticky",
TLS: tlsConfig,
saslMechanism: sarama.SASLTypeOAuth,
saslTokenURL: "http://example.com",
saslClientID: "myclient",
saslClientSecret: "mysecret",
SASL: SASLConfig{
Mechanism: sarama.SASLTypeOAuth,
TokenURL: "http://example.com",
ClientID: "myclient",
ClientSecret: "mysecret",
},
},
strategy: []sarama.BalanceStrategy{sarama.NewBalanceStrategySticky()},
timeRange: maxRange,
Expand All @@ -266,39 +276,45 @@ func TestPreflight(t *testing.T) {
Topics: []string{"mytopic"},
Strategy: "sticky",
TLS: tlsConfig,
saslMechanism: sarama.SASLTypeOAuth,
saslClientID: "myclient",
saslClientSecret: "mysecret",
SASL: SASLConfig{
Mechanism: sarama.SASLTypeOAuth,
ClientID: "myclient",
ClientSecret: "mysecret",
},
},
wantErr: "OAUTH2 requires a token URL",
},
{
name: "oath2 no client",
name: "oath2 no client secret",
in: &Config{
Group: "mygroup",
Brokers: []string{"mybroker"},
ResolvedInterval: time.Second,
Topics: []string{"mytopic"},
Strategy: "sticky",
TLS: tlsConfig,
saslMechanism: sarama.SASLTypeOAuth,
saslTokenURL: "http://example.com",
saslClientID: "myclient",
SASL: SASLConfig{
Mechanism: sarama.SASLTypeOAuth,
TokenURL: "http://example.com",
ClientID: "myclient",
},
},
wantErr: "OAUTH2 requires a client secret",
},
{
name: "oath2",
name: "oath2 no client id",
in: &Config{
Group: "mygroup",
Brokers: []string{"mybroker"},
ResolvedInterval: time.Second,
Topics: []string{"mytopic"},
Strategy: "sticky",
TLS: tlsConfig,
saslMechanism: sarama.SASLTypeOAuth,
saslTokenURL: "http://example.com",
saslClientSecret: "mysecret",
SASL: SASLConfig{
Mechanism: sarama.SASLTypeOAuth,
TokenURL: "http://example.com",
ClientSecret: "mysecret",
},
},
wantErr: "OAUTH2 requires a client id",
},
Expand Down Expand Up @@ -337,8 +353,8 @@ func TestPreflight(t *testing.T) {
case sarama.SASLTypeOAuth:
a.IsType(&tokenProvider{},
config.saramaConfig.Net.SASL.TokenProvider)
a.NotEmpty(config.saslTokenURL)
_, err := url.Parse(config.saslTokenURL)
a.NotEmpty(config.SASL.TokenURL)
_, err := url.Parse(config.SASL.TokenURL)
a.NoError(err)
a.Empty(config.saramaConfig.Net.SASL.SCRAMClientGeneratorFunc)
default:
Expand Down
8 changes: 5 additions & 3 deletions internal/source/kafka/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,11 @@ func TestConn(t *testing.T) {
ResolvedInterval: time.Second,
Strategy: "sticky",
Topics: []string{"my-topic"},
saslMechanism: sarama.SASLTypePlaintext,
saslUser: "user",
saslPassword: "test",
SASL: SASLConfig{
Mechanism: sarama.SASLTypePlaintext,
User: "user",
Password: "test",
},
}
err := config.preflight(ctx)
a.NoError(err)
Expand Down

0 comments on commit 5e0e2ba

Please sign in to comment.