Skip to content

Commit

Permalink
Merge #115806
Browse files Browse the repository at this point in the history
115806: changefeedccl: support azure-event-hub:// for azure kafka streaming r=jayshrivastava a=wenyihu6

**changefeedccl: support azure-event-hub:// for azure kafka streaming**
Previously, users had to navigate the complexities of obtaining azure event
hub's kafka endpoint and the corresponding sasl_user and sasl_password for azure
streaming.

This patch improves it to support a new scheme azure-event-hub:// with the
following syntax:

```
azure-event-hub://<NamespaceName>.servicebus.windows.net?shared_access_key_name=<KeyName>&shared_access_key=<KeyValue>
```

azure-event-hub:// can now be used to connect to kafka hosted on Azure event
hubs. The sinkURL must include mandatory parameters shared_access_key_name and
shared_access_key.   By default and by requirements, the options
"tls_enabled=true," "sasl_handshake=true," "sasl_enabled=true," and
"sasl_mechanism=PLAIN" are applied, as they are the only supported options.
Other parameters such as "topic_name", and "topic_prefix" are also supported.

Resolves: #103901, #110558

Release note (enterprise change): Changefeeds now support a new scheme
azure-event-hub:// for kafka data streaming to azure event hubs. The sinkURL
must include mandatory parameters shared_access_key_name and shared_access_key.
By default and by requirements, the options "tls_enabled=true,"
"sasl_handshake=true," "sasl_enabled=true," and "sasl_mechanism=PLAIN" are
applied, as they are the only supported options. Other parameters such as
"topic_name", and "topic_prefix" are also supported.

An example URI is:
```
azure-event-hub://myeventhubs.servicebus.windows.net:9093?shared_access_key_name=abc&shared_access_key=123
```
---
**changefeedccl: redact shared_access_key from SHOW JOBS**

Now that we support azure-event-hub:// scheme, this patch redacts
shared_access_key info for azure event hub from SHOW JOBS output.
See also: #103901
Epic: None
Release note: None

---
**roachtest/cdc: add roachtest for azure-event-hub://**

This patch adds a roachtest for kafka streaming using azure-event-hub://.


Co-authored-by: Wenyi Hu <wenyi@cockroachlabs.com>
  • Loading branch information
craig[bot] and wenyihu6 committed Mar 2, 2024
2 parents e21529a + 7555372 commit 791969f
Show file tree
Hide file tree
Showing 8 changed files with 187 additions and 19 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func init() {
changefeedbase.SinkParamCACert,
changefeedbase.SinkParamClientCert,
changefeedbase.SinkParamConfluentAPISecret,
changefeedbase.SinkParamAzureAccessKey,
})
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,7 @@ func changefeedJobDescription(
changefeedbase.SinkParamCACert,
changefeedbase.SinkParamClientCert,
changefeedbase.SinkParamConfluentAPISecret,
changefeedbase.SinkParamAzureAccessKey,
})
if err != nil {
return "", err
Expand Down
4 changes: 4 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,10 @@ const (
SinkParamConfluentAPIKey = `api_key`
SinkParamConfluentAPISecret = `api_secret`

SinkSchemeAzureKafka = `azure-event-hub`
SinkParamAzureAccessKeyName = `shared_access_key_name`
SinkParamAzureAccessKey = `shared_access_key`

RegistryParamCACert = `ca_cert`
RegistryParamClientCert = `client_cert`
RegistryParamClientKey = `client_key`
Expand Down
15 changes: 9 additions & 6 deletions pkg/ccl/changefeedccl/show_changefeed_jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,19 +159,22 @@ func TestShowChangefeedJobsRedacted(t *testing.T) {
name: "ca_cert",
uri: fmt.Sprintf("kafka://nope?ca_cert=%s&tls_enabled=true", certSecret),
},
{
name: "shared_access_key",
uri: fmt.Sprintf("azure-event-hub://nope?shared_access_key=%s&shared_access_key_name=plain", apiSecret),
},
} {
t.Run(tc.name, func(t *testing.T) {
createStmt := fmt.Sprintf(`CREATE CHANGEFEED FOR TABLE foo INTO '%s'`, tc.uri)
var jobID jobspb.JobID
sqlDB.QueryRow(t, createStmt).Scan(&jobID)
var sinkURI, description string
sqlDB.QueryRow(t, "SELECT sink_uri, description from [SHOW CHANGEFEED JOB $1]", jobID).Scan(&sinkURI, &description)
expectedSinkURI := strings.Replace(tc.uri, apiSecret, "redacted", 1)
expectedSinkURI = strings.Replace(expectedSinkURI, certSecret, "redacted", 1)
expectedDescription := strings.Replace(createStmt, apiSecret, "redacted", 1)
expectedDescription = strings.Replace(expectedDescription, certSecret, "redacted", 1)
require.Equal(t, sinkURI, expectedSinkURI)
require.Equal(t, description, expectedDescription)
replacer := strings.NewReplacer(apiSecret, "redacted", certSecret, "redacted")
expectedSinkURI := replacer.Replace(tc.uri)
expectedDescription := replacer.Replace(createStmt)
require.Equal(t, expectedSinkURI, sinkURI)
require.Equal(t, expectedDescription, description)
})
}

Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/sink_external_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ var supportedExternalConnectionTypes = map[string]connectionpb.ConnectionProvide
changefeedbase.SinkSchemeWebhookHTTP: connectionpb.ConnectionProvider_webhookhttp,
changefeedbase.SinkSchemeWebhookHTTPS: connectionpb.ConnectionProvider_webhookhttps,
changefeedbase.SinkSchemeConfluentKafka: connectionpb.ConnectionProvider_kafka,
changefeedbase.SinkSchemeAzureKafka: connectionpb.ConnectionProvider_kafka,
// TODO (zinger): Not including SinkSchemeExperimentalSQL for now because A: it's undocumented
// and B, in tests it leaks a *gosql.DB and I can't figure out why.
}
Expand Down
49 changes: 48 additions & 1 deletion pkg/ccl/changefeedccl/sink_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ import (

func isKafkaSink(u *url.URL) bool {
switch u.Scheme {
case changefeedbase.SinkSchemeConfluentKafka, changefeedbase.SinkSchemeKafka:
case changefeedbase.SinkSchemeConfluentKafka, changefeedbase.SinkSchemeAzureKafka,
changefeedbase.SinkSchemeKafka:
return true
default:
return false
Expand Down Expand Up @@ -894,6 +895,8 @@ func buildDialConfig(u sinkURL) (kafkaDialConfig, error) {
switch u.Scheme {
case changefeedbase.SinkSchemeConfluentKafka:
return buildConfluentKafkaConfig(u)
case changefeedbase.SinkSchemeAzureKafka:
return buildAzureKafkaConfig(u)
default:
return buildDefaultKafkaConfig(u)
}
Expand Down Expand Up @@ -1128,6 +1131,50 @@ func buildConfluentKafkaConfig(u sinkURL) (kafkaDialConfig, error) {
return dialConfig, nil
}

// buildAzureKafkaConfig parses the given sinkURL and constructs its
// correponding kafkaDialConfig for streaming to Azure Event Hub kafka protocol.
// Additionally, it validates options based on the given sinkURL and returns an
// error for unsupported or missing options. The sinkURL must include mandatory
// parameters shared_access_key_name and shared_access_key. Default options,
// including "tls_enabled=true," "sasl_handshake=true," "sasl_enabled=true," and
// "sasl_mechanism=PLAIN," are automatically applied, as they are the only
// supported values.
//
// See
// https://learn.microsoft.com/en-us/azure/event-hubs/azure-event-hubs-kafka-overview
// on how to connect to azure event hub kafka protocol.
func buildAzureKafkaConfig(u sinkURL) (dialConfig kafkaDialConfig, _ error) {
hostName := u.Hostname()
// saslUser="$ConnectionString"
// saslPassword="Endpoint=sb://<NamespaceName>.servicebus.windows.net/;SharedAccessKeyName=<KeyName>;SharedAccessKey=<KeyValue>;
sharedAccessKeyName := u.consumeParam(changefeedbase.SinkParamAzureAccessKeyName)
if sharedAccessKeyName == `` {
return kafkaDialConfig{},
newMissingParameterError(u.Scheme /*scheme*/, changefeedbase.SinkParamAzureAccessKeyName /*param*/)
}
sharedAccessKey := u.consumeParam(changefeedbase.SinkParamAzureAccessKey)
if sharedAccessKey == `` {
return kafkaDialConfig{},
newMissingParameterError(u.Scheme /*scheme*/, changefeedbase.SinkParamAzureAccessKey /*param*/)
}

dialConfig.saslUser = "$ConnectionString"
dialConfig.saslPassword = fmt.Sprintf(
"Endpoint=sb://%s/;SharedAccessKeyName=%s;SharedAccessKey=%s",
hostName, sharedAccessKeyName, sharedAccessKey)
dialConfig, err := setDefaultParametersForConfluentAndAzure(&u, dialConfig)
if err != nil {
return kafkaDialConfig{}, err
}

remaining := u.remainingQueryParams()
if len(remaining) > 0 {
return kafkaDialConfig{}, newInvalidParameterError(u.Scheme, /*scheme*/
fmt.Sprintf("%v", remaining) /*invalidParams*/)
}
return dialConfig, nil
}

func buildKafkaConfig(
ctx context.Context,
u sinkURL,
Expand Down
110 changes: 110 additions & 0 deletions pkg/ccl/changefeedccl/sink_kafka_connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ package changefeedccl
import (
"context"
"fmt"
"net/url"
"strings"
"testing"

"github.com/IBM/sarama"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
Expand All @@ -23,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

// externalConnectionKafkaSink is a wrapper sink that asserts the underlying
Expand Down Expand Up @@ -249,6 +252,47 @@ func TestChangefeedExternalConnections(t *testing.T) {
uri: "confluent-cloud://nope?api_key=fee&api_secret=bar&ca_cert=abcd",
expectedError: "invalid query parameters",
},
// azure-event-hub scheme tests
{
name: "requires parameter SharedAccessKeyName",
uri: "azure-event-hub://nope?",
expectedError: "requires parameter shared_access_key_name",
},
{
name: "requires parameter SharedAccessKey",
uri: "azure-event-hub://nope?shared_access_key_name=saspolicytpcc",
expectedError: "requires parameter shared_access_key",
},
{
name: "requires sasl_enabled=true",
uri: "azure-event-hub://nope?shared_access_key_name=saspolicytpcc&shared_access_key=123&sasl_enabled=false",
expectedError: "unsupported value false for parameter sasl_enabled, please use true",
},
{
name: "requires parameter sasl_mechanism=PLAIN",
uri: "azure-event-hub://nope?shared_access_key_name=saspolicytpcc&shared_access_key=123&sasl_mechanism=OAUTHBEARER",
expectedError: "unsupported value OAUTHBEARER for parameter sasl_mechanism, please use PLAIN",
},
{
name: "requires parameter sasl_handshake=true",
uri: "azure-event-hub://nope?shared_access_key_name=saspolicytpcc&shared_access_key=123&sasl_handshake=false",
expectedError: "unsupported value false for parameter sasl_handshake, please use true",
},
{
name: "requires parameter tls_enabled=true",
uri: "azure-event-hub://nope?shared_access_key_name=saspolicytpcc&shared_access_key=123&tls_enabled=false",
expectedError: "unsupported value false for parameter tls_enabled, please use true",
},
{
name: "invalid query parameters",
uri: "azure-event-hub://nope?shared_access_key_name=saspolicytpcc&shared_access_key=123&ca_cert=abcd",
expectedError: "invalid query parameters",
},
{
name: "test error with entity_path",
uri: "azure-event-hub://nope?shared_access_key_name=saspolicytpcc&shared_access_key=123&entity_path=history",
expectedError: "invalid query parameters",
},
} {
t.Run(tc.name, func(t *testing.T) {
sqlDB.ExpectErr(
Expand All @@ -269,6 +313,11 @@ func TestChangefeedExternalConnections(t *testing.T) {
sqlDB.Exec(t, `CREATE EXTERNAL CONNECTION confluent2 AS 'confluent-cloud://nope?api_key=fee&api_secret=bar&`+
`sasl_mechanism=PLAIN&tls_enabled=true&topic_prefix=foo&sasl_enabled=true&sasl_handshake=true&`+
`insecure_tls_skip_verify=true'`)
sqlDB.Exec(t, `CREATE EXTERNAL CONNECTION azure1 AS 'azure-event-hub://nope?shared_access_key_name=fee&shared_access_key=123&topic_prefix=foo'`)
sqlDB.Exec(t, `CREATE EXTERNAL CONNECTION azure2 AS 'azure-event-hub://nope?shared_access_key_name=fee&shared_access_key=123&topic_prefix=foo&`+
`sasl_mechanism=PLAIN&tls_enabled=true&sasl_enabled=true&sasl_handshake=true'`)
sqlDB.Exec(t, `CREATE EXTERNAL CONNECTION azure3 AS 'azure-event-hub://nope?shared_access_key_name=fee&shared_access_key=123&topic_name=foo&`+
`sasl_mechanism=PLAIN&tls_enabled=true&sasl_enabled=true&sasl_handshake=true'`)

sqlDB.Exec(t, `CREATE CHANGEFEED FOR foo INTO 'external://nope'`)
sqlDB.Exec(t, `CREATE CHANGEFEED FOR foo INTO 'external://nope-with-params'`)
Expand All @@ -287,3 +336,64 @@ func TestChangefeedExternalConnections(t *testing.T) {
)
})
}

// TestBuildAzureKafkaConfig verifies that buildAzureKafkaConfig correctly
// parses the parameter and constructs kafka dial config correctly for azure
// data streaming.
func TestBuildAzureKafkaConfig(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

makeKafkaDialConfig := func(hostName string, decodedAccessKeyName string, decodedAccessKey string) kafkaDialConfig {
return kafkaDialConfig{
saslUser: "$ConnectionString",
saslPassword: fmt.Sprintf(
"Endpoint=sb://%s/;SharedAccessKeyName=%s;SharedAccessKey=%s",
hostName, decodedAccessKeyName, decodedAccessKey),
saslEnabled: true,
tlsEnabled: true,
saslHandshake: true,
saslMechanism: sarama.SASLTypePlaintext,
}
}

for _, tc := range []struct {
name string
uri string
expectedKafkaDialConfig kafkaDialConfig
}{
{
name: "test basic key/password with sasl_mechanism",
uri: "azure-event-hub://myeventhubs.servicebus.windows.net:9093?shared_access_key_name=saspolicytpcc&shared_access_key=123&sasl_mechanism=PLAIN",
expectedKafkaDialConfig: makeKafkaDialConfig("myeventhubs.servicebus.windows.net", "saspolicytpcc", "123"),
},
{
name: "test basic key/password with sasl_enabled",
uri: "azure-event-hub://myeventhubs.servicebus.windows.net:9093?shared_access_key_name=saspolicytpcc&shared_access_key=123&sasl_enabled=true",
expectedKafkaDialConfig: makeKafkaDialConfig("myeventhubs.servicebus.windows.net", "saspolicytpcc", "123"),
},
{
name: "test basic key/password with tls_enabled",
uri: "azure-event-hub://myeventhubs.servicebus.windows.net:9093?shared_access_key_name=saspolicytpcc&shared_access_key=123&tls_enabled=true",
expectedKafkaDialConfig: makeKafkaDialConfig("myeventhubs.servicebus.windows.net", "saspolicytpcc", "123"),
},
{
name: "test basic key/password with sasl_handshake",
uri: "azure-event-hub://myeventhubs.servicebus.windows.net:9093?shared_access_key_name=saspolicytpcc&shared_access_key=123&sasl_enabled=true&tls_enabled=true&sasl_handshake=true",
expectedKafkaDialConfig: makeKafkaDialConfig("myeventhubs.servicebus.windows.net", "saspolicytpcc", "123"),
},
{
name: "test more complex key/password with saspolicyhistory policy",
uri: "azure-event-hub://myeventhubs.servicebus.windows.net:9093?shared_access_key_name=saspolicyhistory&shared_access_key=q%2BSecretRedacted%3D",
expectedKafkaDialConfig: makeKafkaDialConfig("myeventhubs.servicebus.windows.net", "saspolicyhistory", "q+SecretRedacted="),
},
} {
t.Run(tc.name, func(t *testing.T) {
oldUri, err := url.Parse(tc.uri)
require.NoError(t, err)
actualConfig, expectedError := buildDialConfig(sinkURL{URL: oldUri})
require.NoError(t, expectedError)
require.Equal(t, tc.expectedKafkaDialConfig, actualConfig)
})
}
}
25 changes: 13 additions & 12 deletions pkg/cmd/roachtest/tests/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ func (ct *cdcTester) startStatsCollection() func() {
}

type AuthorizationRuleKeys struct {
PrimaryConnectionString string `json:"primaryConnectionString"`
KeyName string `json:"keyName"`
PrimaryKey string `json:"primaryKey"`
}

func (ct *cdcTester) startCRDBChaos() {
Expand Down Expand Up @@ -265,13 +266,13 @@ func (ct *cdcTester) setupSink(args feedArgs) string {
if err := kafka.installAzureCli(ct.ctx); err != nil {
kafka.t.Fatal(err)
}
connectionString, err := kafka.getConnectionString(ct.ctx)
accessKeyName, accessKey, err := kafka.getAzureEventHubAccess(ct.ctx)
if err != nil {
kafka.t.Fatal(err)
}
sinkURI = fmt.Sprintf(
`kafka://cdc-roachtest.servicebus.windows.net:9093?tls_enabled=true&sasl_enabled=true&sasl_user=$ConnectionString&sasl_password=%s&sasl_mechanism=PLAIN&topic_name=testing`,
url.QueryEscape(connectionString),
`azure-event-hub://cdc-roachtest.servicebus.windows.net:9093?shared_access_key_name=%s&shared_access_key=%s&topic_name=testing`,
url.QueryEscape(accessKeyName), url.QueryEscape(accessKey),
)
default:
ct.t.Fatalf("unknown sink provided: %s", args.sinkType)
Expand Down Expand Up @@ -2295,10 +2296,10 @@ func (k kafkaManager) installAzureCli(ctx context.Context) error {
})
}

// getConnectionString retrieves the Azure Event Hub connection string for the
// cdc-roachtest event hub set up in the CRL Azure account for roachtest
// getAzureEventHubAccess retrieves the Azure Event Hub access key name and key
// for the cdc-roachtest event hub set up in the CRL Azure account for roachtest
// testing.
func (k kafkaManager) getConnectionString(ctx context.Context) (string, error) {
func (k kafkaManager) getAzureEventHubAccess(ctx context.Context) (string, string, error) {
// The necessary credential env vars have been added to TeamCity agents by
// dev-inf. Note that running this test on roachprod would not work due to
// lacking the required credentials env vars set up.
Expand All @@ -2312,29 +2313,29 @@ func (k kafkaManager) getConnectionString(ctx context.Context) (string, error) {
cmdStr := fmt.Sprintf("az login --service-principal -t %s -u %s -p=%s", azureTenantID, azureClientID, azureClientSecret)
_, err := k.c.RunWithDetailsSingleNode(ctx, k.t.L(), option.WithNodes(k.kafkaSinkNode), cmdStr)
if err != nil {
return "", errors.Wrap(err, "error running `az login`")
return "", "", errors.Wrap(err, "error running `az login`")
}

cmdStr = fmt.Sprintf("az account set --subscription %s", azureSubscriptionID)
_, err = k.c.RunWithDetailsSingleNode(ctx, k.t.L(), option.WithNodes(k.kafkaSinkNode), cmdStr)
if err != nil {
return "", errors.Wrap(err, "error running `az account set` command")
return "", "", errors.Wrap(err, "error running `az account set` command")
}

cmdStr = "az eventhubs namespace authorization-rule keys list --name cdc-roachtest-auth-rule " +
"--namespace-name cdc-roachtest --resource-group e2e-infra-event-hub-rg"
results, err := k.c.RunWithDetailsSingleNode(ctx, k.t.L(), option.WithNodes(k.kafkaSinkNode), cmdStr)
if err != nil {
return "", errors.Wrap(err, "error running `az eventhubs` command")
return "", "", errors.Wrap(err, "error running `az eventhubs` command")
}

var keys AuthorizationRuleKeys
err = json.Unmarshal([]byte(results.Stdout), &keys)
if err != nil {
return "", errors.Wrap(err, "error unmarshalling az eventhubs keys")
return "", "", errors.Wrap(err, "error unmarshalling az eventhubs keys")
}

return keys.PrimaryConnectionString, nil
return keys.KeyName, keys.PrimaryKey, nil
}

func (k kafkaManager) runWithRetry(ctx context.Context, cmd string) {
Expand Down

0 comments on commit 791969f

Please sign in to comment.