changefeedccl: support azure-event-hub:// for azure kafka streaming
Previously, users had to navigate the complexities of obtaining the Azure Event
Hubs Kafka endpoint and the corresponding sasl_user and sasl_password for Azure
streaming.

This patch adds support for a new scheme azure-event-hub:// with the following
syntax:

```
azure-event-hub://<NamespaceName>.servicebus.windows.net?shared_access_key_name=<KeyName>&shared_access_key=<KeyValue>
```

azure-event-hub:// can now be used to connect to Kafka hosted on Azure Event
Hubs. The sink URI must include the mandatory parameters shared_access_key_name
and shared_access_key. The options "tls_enabled=true", "sasl_handshake=true",
"sasl_enabled=true", and "sasl_mechanism=PLAIN" are applied by default and are
required, as they are the only supported values. Other parameters such as
"insecure_tls_skip_verify", "topic_name", and "topic_prefix" are also supported.

Fixes: #103901

Release note (enterprise change): Changefeeds now support a new scheme
azure-event-hub:// for Kafka data streaming to Azure Event Hubs. The sink URI
must include the mandatory parameters shared_access_key_name and
shared_access_key. The options "tls_enabled=true", "sasl_handshake=true",
"sasl_enabled=true", and "sasl_mechanism=PLAIN" are applied by default and are
required, as they are the only supported values. Other parameters such as
"insecure_tls_skip_verify", "topic_name", and "topic_prefix" are also supported.

An example URI is:
```
azure-event-hub://myventhubs.servicebus.windows.net:9093?shared_access_key_name=abc&shared_access_key=123
```
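
For comparison, the same connection can still be expressed with the existing
kafka:// scheme. A roughly equivalent URI, modeled on the test cases in this
patch (placeholder namespace and key shown; note that the sasl_password value
must be URL-encoded), is:
```
kafka://<NamespaceName>.servicebus.windows.net:9093?tls_enabled=true&sasl_enabled=true&sasl_mechanism=PLAIN&sasl_user=$ConnectionString&sasl_password=Endpoint%3Dsb%3A%2F%2F<NamespaceName>.servicebus.windows.net%2F%3BSharedAccessKeyName%3D<KeyName>%3BSharedAccessKey%3D<KeyValue>
```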
wenyihu6 committed Dec 8, 2023
1 parent 6c3d18d commit 4b91a03
Showing 4 changed files with 246 additions and 76 deletions.
4 changes: 4 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/options.go
@@ -221,6 +221,10 @@ const (
SinkParamConfluentAPIKey = `api_key`
SinkParamConfluentAPISecret = `api_secret`

SinkSchemeAzureKafka = `azure-event-hub`
SinkParamAzureAccessKeyName = `shared_access_key_name`
SinkParamAzureAccessKey = `shared_access_key`

RegistryParamCACert = `ca_cert`
RegistryParamClientCert = `client_cert`
RegistryParamClientKey = `client_key`
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/sink_external_connection.go
@@ -100,6 +100,7 @@ var supportedExternalConnectionTypes = map[string]connectionpb.ConnectionProvide
changefeedbase.SinkSchemeWebhookHTTP: connectionpb.ConnectionProvider_webhookhttp,
changefeedbase.SinkSchemeWebhookHTTPS: connectionpb.ConnectionProvider_webhookhttps,
changefeedbase.SinkSchemeConfluentKafka: connectionpb.ConnectionProvider_kafka,
changefeedbase.SinkSchemeAzureKafka: connectionpb.ConnectionProvider_kafka,
// TODO (zinger): Not including SinkSchemeExperimentalSQL for now because A: it's undocumented
// and B, in tests it leaks a *gosql.DB and I can't figure out why.
}
231 changes: 155 additions & 76 deletions pkg/ccl/changefeedccl/sink_kafka.go
@@ -42,7 +42,8 @@ import (

func isKafkaSink(u *url.URL) bool {
switch u.Scheme {
case changefeedbase.SinkSchemeConfluentKafka, changefeedbase.SinkSchemeKafka:
case changefeedbase.SinkSchemeConfluentKafka, changefeedbase.SinkSchemeAzureKafka,
changefeedbase.SinkSchemeKafka:
return true
default:
return false
@@ -877,19 +878,9 @@ type kafkaDialConfig struct {
saslGrantType string
}

// TODO: refactor this function by splitting it up.
// There is a large number of security-related params and it's hard to tell what
// combinations work, which combinations work but should not work, which
// combinations should not be passed together but still work etc.
// It makes sense to split based on SASL mechanism and separate the TLS
// related params (which I believe are common to all the SASL schemes). It would
// also make sense to update the docs to reflect these cases
// rather than having a huge table of auth params like we have now.
func buildDialConfig(u sinkURL) (kafkaDialConfig, error) {
if u.Scheme == changefeedbase.SinkSchemeConfluentKafka {
return buildConfluentKafkaConfig(u)
}

// buildDefaultKafkaConfig parses the given sinkURL and constructs its
// corresponding dialConfig for kafka.
func buildDefaultKafkaConfig(u sinkURL) (kafkaDialConfig, error) {
dialConfig := kafkaDialConfig{}

if _, err := u.consumeBool(changefeedbase.SinkParamTLSEnabled, &dialConfig.tlsEnabled); err != nil {
@@ -990,75 +981,63 @@ func buildDialConfig(u sinkURL) (kafkaDialConfig, error) {
return dialConfig, nil
}

// buildConfluentKafkaConfig constructs a simple dial config which is supported
// by kafka on confluent cloud. The dial config should have `api_key` and
// `api_secret`. It automatically sets should also have sasl_enabled=true,
// sasl_mechanism=PLAIN, tls_enabled=true, and sasl_handshake=true.
//
// See https://docs.confluent.io/platform/current/security/security_tutorial.html#overview
// information on how to connect to confluent cloud kafka. There are also
// instructions when you go to the `Clients` page of the cluster in confluent
// cloud.
func buildConfluentKafkaConfig(u sinkURL) (kafkaDialConfig, error) {
newMissingParameterError := func(param string) error {
return errors.Newf("scheme %s requires parameter %s", changefeedbase.SinkSchemeConfluentKafka, param)
}
newRequiredValueError := func(param string, unsupportedValue, allowedValue string) error {
return errors.Newf("unsupported value %s for parameter %s, please use %s instead", unsupportedValue,
param, allowedValue)
}
// newMissingParameterError returns an error message for missing parameters in
// sinkURL.
func newMissingParameterError(scheme string, param string) error {
return errors.Newf("scheme %s requires parameter %s", scheme, param)
}

// Check for api_key and api_secret.
dialConfig := kafkaDialConfig{}
if dialConfig.saslUser = u.consumeParam(changefeedbase.SinkParamConfluentAPIKey); dialConfig.saslUser == `` {
return kafkaDialConfig{}, newMissingParameterError(changefeedbase.SinkParamConfluentAPIKey)
}
if dialConfig.saslPassword = u.consumeParam(changefeedbase.SinkParamConfluentAPISecret); dialConfig.saslPassword == `` {
return kafkaDialConfig{}, newMissingParameterError(changefeedbase.SinkParamConfluentAPISecret)
}
// newRequiredValueError returns an error message for unsupported parameter
// values in the sinkURL.
func newRequiredValueError(param string, unsupportedValue, allowedValue string) error {
return errors.Newf("unsupported value %s for parameter %s, please use %s instead",
unsupportedValue, param, allowedValue)
}

// If sasl_enabled is specified, it must be set to true.
if wasSet, err := u.consumeBool(changefeedbase.SinkParamSASLEnabled, &dialConfig.saslEnabled); err != nil {
return kafkaDialConfig{}, err
} else if wasSet && !dialConfig.saslEnabled {
return kafkaDialConfig{}, newRequiredValueError(changefeedbase.SinkParamSASLEnabled, "false",
"true")
}
// If sasl_mechanism is specified, it must be set to PLAIN.
if dialConfig.saslMechanism = u.consumeParam(changefeedbase.SinkParamSASLMechanism); dialConfig.saslMechanism != `` &&
dialConfig.saslMechanism != sarama.SASLTypePlaintext {
return kafkaDialConfig{}, newRequiredValueError(changefeedbase.SinkParamSASLMechanism, dialConfig.saslMechanism,
sarama.SASLTypePlaintext)
}
// If tls_enabled is specified, it must be set to true.
if wasSet, err := u.consumeBool(changefeedbase.SinkParamTLSEnabled, &dialConfig.tlsEnabled); err != nil {
return kafkaDialConfig{}, err
} else if wasSet && !dialConfig.tlsEnabled {
return kafkaDialConfig{}, newRequiredValueError(changefeedbase.SinkParamTLSEnabled, "false", "true")
}
// If sasl_handshake is specified, it must be set to true.
if wasSet, err := u.consumeBool(changefeedbase.SinkParamSASLHandshake, &dialConfig.saslHandshake); err != nil {
return kafkaDialConfig{}, err
} else if wasSet && !dialConfig.saslHandshake {
return kafkaDialConfig{}, newRequiredValueError(changefeedbase.SinkParamSASLHandshake, "false", "true")
// validateAndConsumeParamsIfSet consumes the parameters listed in
// paramsWithAcceptedValues from the given sinkURL and returns an error if any
// of them is set to a value other than its accepted one.
func validateAndConsumeParamsIfSet(u *sinkURL, paramsWithAcceptedValues map[string]string) error {
for param, allowedValue := range paramsWithAcceptedValues {
if v := u.consumeParam(param); v != "" && v != allowedValue {
return newRequiredValueError(param, v, allowedValue)
}
}
return nil
}

if _, err := u.consumeBool(changefeedbase.SinkParamSkipTLSVerify, &dialConfig.tlsSkipVerify); err != nil {
return kafkaDialConfig{}, err
// validateAndConsumeBoolParamsIfSet mirrors validateAndConsumeParamsIfSet but
// specifically for boolean parameters.
func validateAndConsumeBoolParamsIfSet(u *sinkURL, paramsWithAcceptedValues map[string]bool) error {
for param, allowedBoolValue := range paramsWithAcceptedValues {
var dest bool
wasSet, err := u.consumeBool(param, &dest)
if err != nil {
return err
}
if wasSet && dest != allowedBoolValue {
return errors.Newf("unsupported value %t for parameter %s, please use %t instead", dest,
param, allowedBoolValue)
}
}
return nil
}

dialConfig.saslEnabled = true
dialConfig.saslMechanism = sarama.SASLTypePlaintext
dialConfig.tlsEnabled = true
dialConfig.saslHandshake = true

remaining := u.remainingQueryParams()
if len(remaining) > 0 {
return kafkaDialConfig{}, errors.Newf("invalid query parameters for scheme %s", remaining, changefeedbase.SinkParamConfluentAPISecret)
// TODO(cdc): refactor this function by splitting it up. There is a large number
// of security-related params and it's hard to tell what combinations work,
// which combinations work but should not work, which combinations should not be
// passed together but still work etc. It makes sense to split based on SASL
// mechanism and separate the TLS related params (which I believe are common to
// all the SASL schemes). It would also make sense to update the docs to reflect
// these cases rather than having a huge table of auth params like we have now.
func buildDialConfig(u sinkURL) (kafkaDialConfig, error) {
switch u.Scheme {
case changefeedbase.SinkSchemeConfluentKafka:
return buildConfluentKafkaConfig(u)
case changefeedbase.SinkSchemeAzureKafka:
return buildAzureKafkaConfig(u)
default:
return buildDefaultKafkaConfig(u)
}

// Ignore all other configurations.
return dialConfig, nil
}

func buildKafkaConfig(
@@ -1144,6 +1123,106 @@ func buildKafkaConfig(
return config, nil
}

// extendKafkaConfigWithDefault populates the given kafkaDialConfig with other
// parameters from the sinkURL. Additionally, it validates options based on the
// given sinkURL and returns an error for unsupported values.
func extendKafkaConfigWithDefault(u *sinkURL, dialConfig kafkaDialConfig) (kafkaDialConfig, error) {
// Check required values for parameters.
boolParamsWithRequiredValues := map[string]bool{
changefeedbase.SinkParamSASLEnabled: true,
changefeedbase.SinkParamTLSEnabled: true,
changefeedbase.SinkParamSASLHandshake: true,
}
if err := validateAndConsumeBoolParamsIfSet(u, boolParamsWithRequiredValues); err != nil {
return kafkaDialConfig{}, err
}
stringParamsWithRequiredValues := map[string]string{
changefeedbase.SinkParamSASLMechanism: sarama.SASLTypePlaintext,
}
if err := validateAndConsumeParamsIfSet(u, stringParamsWithRequiredValues); err != nil {
return kafkaDialConfig{}, err
}

// Set values.
dialConfig.tlsEnabled = true
dialConfig.saslHandshake = true
dialConfig.saslEnabled = true
dialConfig.saslMechanism = sarama.SASLTypePlaintext

if _, err := u.consumeBool(changefeedbase.SinkParamSkipTLSVerify, &dialConfig.tlsSkipVerify); err != nil {
return kafkaDialConfig{}, err
}

remaining := u.remainingQueryParams()
if len(remaining) > 0 {
return kafkaDialConfig{}, errors.Newf("invalid query parameters for scheme %s", remaining, changefeedbase.SinkParamConfluentAPISecret)
}

// Ignore all other configurations.
return dialConfig, nil
}

// buildAzureKafkaConfig parses the given sinkURL and constructs its
// corresponding kafkaDialConfig for streaming to Azure Event Hubs via the Kafka
// protocol. Additionally, it validates options based on the given sinkURL and
// returns an error for unsupported or missing options. The sinkURL must include
// the mandatory parameters shared_access_key_name and shared_access_key. The
// default options "tls_enabled=true", "sasl_handshake=true",
// "sasl_enabled=true", and "sasl_mechanism=PLAIN" are automatically applied, as
// they are the only supported values.
//
// See
// https://learn.microsoft.com/en-us/azure/event-hubs/azure-event-hubs-kafka-overview
// for details on how to connect to Azure Event Hubs via the Kafka protocol.
func buildAzureKafkaConfig(u sinkURL) (dialConfig kafkaDialConfig, _ error) {
hostName := u.Hostname()
// saslUser="$ConnectionString"
// saslPassword="Endpoint=sb://<NamespaceName>.servicebus.windows.net/;SharedAccessKeyName=<KeyName>;SharedAccessKey=<KeyValue>;
sharedAccessKeyName := u.consumeParam(changefeedbase.SinkParamAzureAccessKeyName)
if sharedAccessKeyName == `` {
return kafkaDialConfig{}, newMissingParameterError(u.Scheme, changefeedbase.SinkParamAzureAccessKeyName)
}
sharedAccessKey := u.consumeParam(changefeedbase.SinkParamAzureAccessKey)
if sharedAccessKey == `` {
return kafkaDialConfig{}, newMissingParameterError(u.Scheme, changefeedbase.SinkParamAzureAccessKey)
}

dialConfig.saslUser = "$ConnectionString"
dialConfig.saslPassword = fmt.Sprintf(
"Endpoint=sb://%s/;SharedAccessKeyName=%s;SharedAccessKey=%s",
hostName, sharedAccessKeyName, sharedAccessKey)
dc, err := extendKafkaConfigWithDefault(&u, dialConfig)
if err != nil {
return kafkaDialConfig{}, err
}
return dc, nil
}

// buildConfluentKafkaConfig constructs a simple dial config which is supported
// by kafka on Confluent Cloud. The sinkURL must include `api_key` and
// `api_secret`. It automatically sets sasl_enabled=true, sasl_mechanism=PLAIN,
// tls_enabled=true, and sasl_handshake=true.
//
// See https://docs.confluent.io/platform/current/security/security_tutorial.html#overview
// for information on how to connect to Confluent Cloud kafka. There are also
// instructions on the `Clients` page of the cluster in Confluent Cloud.
func buildConfluentKafkaConfig(u sinkURL) (dialConfig kafkaDialConfig, _ error) {
// Check for api_key and api_secret.
if dialConfig.saslUser = u.consumeParam(changefeedbase.SinkParamConfluentAPIKey); dialConfig.saslUser == "" {
return kafkaDialConfig{}, newMissingParameterError(u.Scheme, changefeedbase.SinkParamConfluentAPIKey)
}
if dialConfig.saslPassword = u.consumeParam(changefeedbase.SinkParamConfluentAPISecret); dialConfig.saslPassword == "" {
return kafkaDialConfig{}, newMissingParameterError(u.Scheme, changefeedbase.SinkParamConfluentAPISecret)
}

dc, err := extendKafkaConfigWithDefault(&u, dialConfig)
if err != nil {
return kafkaDialConfig{}, err
}
return dc, nil
}

func makeKafkaSink(
ctx context.Context,
u sinkURL,
86 changes: 86 additions & 0 deletions pkg/ccl/changefeedccl/sink_kafka_connection_test.go
@@ -11,6 +11,7 @@ package changefeedccl
import (
"context"
"fmt"
"net/url"
"strings"
"testing"

@@ -23,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

// externalConnectionKafkaSink is a wrapper sink that asserts the underlying
@@ -249,6 +251,47 @@ func TestChangefeedExternalConnections(t *testing.T) {
uri: "confluent-cloud://nope?api_key=fee&api_secret=bar&ca_cert=abcd",
expectedError: "invalid query parameters",
},
// azure-event-hub scheme tests
{
name: "requires parameter SharedAccessKeyName",
uri: "azure-event-hub://nope?",
expectedError: "requires parameter shared_access_key_name",
},
{
name: "requires parameter SharedAccessKey",
uri: "azure-event-hub://nope?shared_access_key_name=saspolicytpcc",
expectedError: "requires parameter shared_access_key",
},
{
name: "requires sasl_enabled=true",
uri: "azure-event-hub://nope?shared_access_key_name=saspolicytpcc&shared_access_key=123&sasl_enabled=false",
expectedError: "unsupported value false for parameter sasl_enabled, please use true",
},
{
name: "requires parameter sasl_mechanism=PLAIN",
uri: "azure-event-hub://nope?shared_access_key_name=saspolicytpcc&shared_access_key=123&sasl_mechanism=OAUTHBEARER",
expectedError: "unsupported value OAUTHBEARER for parameter sasl_mechanism, please use PLAIN",
},
{
name: "requires parameter sasl_handshake=true",
uri: "azure-event-hub://nope?shared_access_key_name=saspolicytpcc&shared_access_key=123&sasl_handshake=false",
expectedError: "unsupported value false for parameter sasl_handshake, please use true",
},
{
name: "requires parameter tls_enabled=true",
uri: "azure-event-hub://nope?shared_access_key_name=saspolicytpcc&shared_access_key=123&tls_enabled=false",
expectedError: "unsupported value false for parameter tls_enabled, please use true",
},
{
name: "invalid query parameters",
uri: "azure-event-hub://nope?shared_access_key_name=saspolicytpcc&shared_access_key=123&ca_cert=abcd",
expectedError: "invalid query parameters",
},
{
name: "test error with entity_path",
uri: "azure-event-hub://nope?shared_access_key_name=saspolicytpcc&shared_access_key=123&entity_path=history",
expectedError: "invalid query parameters",
},
} {
t.Run(tc.name, func(t *testing.T) {
sqlDB.ExpectErr(
@@ -287,3 +330,46 @@ func TestChangefeedExternalConnections(t *testing.T) {
)
})
}

// TestBuildAzureKafkaConfig verifies that buildAzureKafkaConfig correctly
// parses the parameters and constructs the kafka dial config for azure data
// streaming.
func TestBuildAzureKafkaConfig(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

for _, tc := range []struct {
name string
oldUri string
newUri string
}{
{
name: "test basic key/password",
oldUri: "kafka://wenyieventhubs.servicebus.windows.net:9093?tls_enabled=true&sasl_enabled=true&sasl_user=$ConnectionString&sasl_password=Endpoint%3Dsb%3A%2F%2Fwenyieventhubs.servicebus.windows.net%2F%3BSharedAccessKeyName%3Dsaspolicytpcc%3BSharedAccessKey%3D123&sasl_mechanism=PLAIN",
newUri: "azure-event-hub://wenyieventhubs.servicebus.windows.net:9093?shared_access_key_name=saspolicytpcc&shared_access_key=123&sasl_mechanism=PLAIN",
},
{
name: "test more complex key/password",
oldUri: "kafka://wenyieventhubs.servicebus.windows.net:9093?tls_enabled=true&sasl_enabled=true&sasl_user=$ConnectionString&sasl_password=Endpoint%3Dsb%3A%2F%2Fwenyieventhubs.servicebus.windows.net%2F%3BSharedAccessKeyName%3DRootManageSharedAccessKey%3BSharedAccessKey%3DBPkhDFhzc9v9z9grIx73en31n3tpIj%2BAEhBHykhg&sasl_mechanism=PLAIN",
newUri: "azure-event-hub://wenyieventhubs.servicebus.windows.net:9093?shared_access_key_name=RootManageSharedAccessKey&shared_access_key=BPkhDFhzc9v9z9grIx73en31n3tpIj%2BAEhBHykhg",
},
{
name: "test more complex key/password",
oldUri: "kafka://wenyieventhubs.servicebus.windows.net:9093?tls_enabled=true&sasl_enabled=true&sasl_user=$ConnectionString&sasl_password=Endpoint%3Dsb%3A%2F%2Fwenyieventhubs.servicebus.windows.net%2F%3BSharedAccessKeyName%3Dsaspolicyhistory%3BSharedAccessKey%3D7LDWg1Cv5wiNsXXDqXFQG77LldyV97Pdq%2BAEhE2eJyE%3D&sasl_mechanism=PLAIN",
newUri: "azure-event-hub://wenyieventhubs.servicebus.windows.net:9093?shared_access_key_name=saspolicyhistory&shared_access_key=7LDWg1Cv5wiNsXXDqXFQG77LldyV97Pdq%2BAEhE2eJyE%3D",
},
} {
t.Run(tc.name, func(t *testing.T) {
oldUri, err := url.Parse(tc.oldUri)
require.NoError(t, err)
newUri, err := url.Parse(tc.newUri)
require.NoError(t, err)
// oldUri uses kafka:// (buildDefaultKafkaConfig), and newUri uses
// azure-event-hub:// (buildAzureKafkaConfig).
expectedConfig, expectedError := buildDialConfig(sinkURL{URL: oldUri})
actualConfig, actualError := buildDialConfig(sinkURL{URL: newUri})
require.Equal(t, expectedError, actualError)
require.Equal(t, expectedConfig, actualConfig)
})
}
}
