Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeedccl: support azure-event-hub:// for azure kafka streaming #115806

Merged
merged 3 commits into from
Mar 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func init() {
changefeedbase.SinkParamCACert,
changefeedbase.SinkParamClientCert,
changefeedbase.SinkParamConfluentAPISecret,
changefeedbase.SinkParamAzureAccessKey,
})
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,7 @@ func changefeedJobDescription(
changefeedbase.SinkParamCACert,
changefeedbase.SinkParamClientCert,
changefeedbase.SinkParamConfluentAPISecret,
changefeedbase.SinkParamAzureAccessKey,
})
if err != nil {
return "", err
Expand Down
4 changes: 4 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,10 @@ const (
SinkParamConfluentAPIKey = `api_key`
SinkParamConfluentAPISecret = `api_secret`

SinkSchemeAzureKafka = `azure-event-hub`
SinkParamAzureAccessKeyName = `shared_access_key_name`
SinkParamAzureAccessKey = `shared_access_key`

RegistryParamCACert = `ca_cert`
RegistryParamClientCert = `client_cert`
RegistryParamClientKey = `client_key`
Expand Down
15 changes: 9 additions & 6 deletions pkg/ccl/changefeedccl/show_changefeed_jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,19 +159,22 @@ func TestShowChangefeedJobsRedacted(t *testing.T) {
name: "ca_cert",
uri: fmt.Sprintf("kafka://nope?ca_cert=%s&tls_enabled=true", certSecret),
},
{
name: "shared_access_key",
uri: fmt.Sprintf("azure-event-hub://nope?shared_access_key=%s&shared_access_key_name=plain", apiSecret),
},
} {
t.Run(tc.name, func(t *testing.T) {
createStmt := fmt.Sprintf(`CREATE CHANGEFEED FOR TABLE foo INTO '%s'`, tc.uri)
var jobID jobspb.JobID
sqlDB.QueryRow(t, createStmt).Scan(&jobID)
var sinkURI, description string
sqlDB.QueryRow(t, "SELECT sink_uri, description from [SHOW CHANGEFEED JOB $1]", jobID).Scan(&sinkURI, &description)
expectedSinkURI := strings.Replace(tc.uri, apiSecret, "redacted", 1)
expectedSinkURI = strings.Replace(expectedSinkURI, certSecret, "redacted", 1)
expectedDescription := strings.Replace(createStmt, apiSecret, "redacted", 1)
expectedDescription = strings.Replace(expectedDescription, certSecret, "redacted", 1)
require.Equal(t, sinkURI, expectedSinkURI)
require.Equal(t, description, expectedDescription)
replacer := strings.NewReplacer(apiSecret, "redacted", certSecret, "redacted")
expectedSinkURI := replacer.Replace(tc.uri)
expectedDescription := replacer.Replace(createStmt)
require.Equal(t, expectedSinkURI, sinkURI)
require.Equal(t, expectedDescription, description)
})
}

Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/sink_external_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ var supportedExternalConnectionTypes = map[string]connectionpb.ConnectionProvide
changefeedbase.SinkSchemeWebhookHTTP: connectionpb.ConnectionProvider_webhookhttp,
changefeedbase.SinkSchemeWebhookHTTPS: connectionpb.ConnectionProvider_webhookhttps,
changefeedbase.SinkSchemeConfluentKafka: connectionpb.ConnectionProvider_kafka,
changefeedbase.SinkSchemeAzureKafka: connectionpb.ConnectionProvider_kafka,
// TODO (zinger): Not including SinkSchemeExperimentalSQL for now because A: it's undocumented
// and B, in tests it leaks a *gosql.DB and I can't figure out why.
}
Expand Down
49 changes: 48 additions & 1 deletion pkg/ccl/changefeedccl/sink_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ import (

func isKafkaSink(u *url.URL) bool {
switch u.Scheme {
case changefeedbase.SinkSchemeConfluentKafka, changefeedbase.SinkSchemeKafka:
case changefeedbase.SinkSchemeConfluentKafka, changefeedbase.SinkSchemeAzureKafka,
changefeedbase.SinkSchemeKafka:
return true
default:
return false
Expand Down Expand Up @@ -894,6 +895,8 @@ func buildDialConfig(u sinkURL) (kafkaDialConfig, error) {
switch u.Scheme {
case changefeedbase.SinkSchemeConfluentKafka:
return buildConfluentKafkaConfig(u)
case changefeedbase.SinkSchemeAzureKafka:
return buildAzureKafkaConfig(u)
default:
return buildDefaultKafkaConfig(u)
}
Expand Down Expand Up @@ -1128,6 +1131,50 @@ func buildConfluentKafkaConfig(u sinkURL) (kafkaDialConfig, error) {
return dialConfig, nil
}

// buildAzureKafkaConfig turns an azure-event-hub:// sinkURL into the
// corresponding kafkaDialConfig for streaming to Azure Event Hubs over the
// kafka protocol.
//
// The sinkURL must carry both shared_access_key_name and shared_access_key; a
// missing-parameter error is returned when either one comes back empty. The
// two values are combined with the URL's host name into the credentials:
//
//	saslUser="$ConnectionString"
//	saslPassword="Endpoint=sb://<NamespaceName>.servicebus.windows.net/;SharedAccessKeyName=<KeyName>;SharedAccessKey=<KeyValue>"
//
// Default options, including "tls_enabled=true", "sasl_handshake=true",
// "sasl_enabled=true" and "sasl_mechanism=PLAIN", are automatically applied,
// as they are the only supported values. Any query parameter that is still
// unconsumed afterwards results in an invalid-parameter error.
//
// See
// https://learn.microsoft.com/en-us/azure/event-hubs/azure-event-hubs-kafka-overview
// on how to connect to azure event hub kafka protocol.
func buildAzureKafkaConfig(u sinkURL) (dialConfig kafkaDialConfig, _ error) {
	const passwordFormat = "Endpoint=sb://%s/;SharedAccessKeyName=%s;SharedAccessKey=%s"

	host := u.Hostname()

	// Both credentials are mandatory; fail on the first one that is missing.
	keyName := u.consumeParam(changefeedbase.SinkParamAzureAccessKeyName)
	if keyName == `` {
		return kafkaDialConfig{}, newMissingParameterError(
			u.Scheme /*scheme*/, changefeedbase.SinkParamAzureAccessKeyName /*param*/)
	}
	key := u.consumeParam(changefeedbase.SinkParamAzureAccessKey)
	if key == `` {
		return kafkaDialConfig{}, newMissingParameterError(
			u.Scheme /*scheme*/, changefeedbase.SinkParamAzureAccessKey /*param*/)
	}

	dialConfig.saslUser = "$ConnectionString"
	dialConfig.saslPassword = fmt.Sprintf(passwordFormat, host, keyName, key)

	// Hand the partially filled config to the helper shared with the confluent
	// scheme so the remaining options are filled in and validated.
	var err error
	if dialConfig, err = setDefaultParametersForConfluentAndAzure(&u, dialConfig); err != nil {
		return kafkaDialConfig{}, err
	}

	// Whatever is still left in the query string was not understood.
	if leftover := u.remainingQueryParams(); len(leftover) > 0 {
		return kafkaDialConfig{}, newInvalidParameterError(
			u.Scheme /*scheme*/, fmt.Sprintf("%v", leftover) /*invalidParams*/)
	}
	return dialConfig, nil
}

func buildKafkaConfig(
ctx context.Context,
u sinkURL,
Expand Down
110 changes: 110 additions & 0 deletions pkg/ccl/changefeedccl/sink_kafka_connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ package changefeedccl
import (
"context"
"fmt"
"net/url"
"strings"
"testing"

"github.com/IBM/sarama"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
Expand All @@ -23,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

// externalConnectionKafkaSink is a wrapper sink that asserts the underlying
Expand Down Expand Up @@ -249,6 +252,47 @@ func TestChangefeedExternalConnections(t *testing.T) {
uri: "confluent-cloud://nope?api_key=fee&api_secret=bar&ca_cert=abcd",
expectedError: "invalid query parameters",
},
// azure-event-hub scheme tests
{
name: "requires parameter SharedAccessKeyName",
uri: "azure-event-hub://nope?",
expectedError: "requires parameter shared_access_key_name",
},
{
name: "requires parameter SharedAccessKey",
uri: "azure-event-hub://nope?shared_access_key_name=saspolicytpcc",
expectedError: "requires parameter shared_access_key",
},
{
name: "requires sasl_enabled=true",
uri: "azure-event-hub://nope?shared_access_key_name=saspolicytpcc&shared_access_key=123&sasl_enabled=false",
expectedError: "unsupported value false for parameter sasl_enabled, please use true",
},
{
name: "requires parameter sasl_mechanism=PLAIN",
uri: "azure-event-hub://nope?shared_access_key_name=saspolicytpcc&shared_access_key=123&sasl_mechanism=OAUTHBEARER",
expectedError: "unsupported value OAUTHBEARER for parameter sasl_mechanism, please use PLAIN",
},
{
name: "requires parameter sasl_handshake=true",
uri: "azure-event-hub://nope?shared_access_key_name=saspolicytpcc&shared_access_key=123&sasl_handshake=false",
expectedError: "unsupported value false for parameter sasl_handshake, please use true",
},
{
name: "requires parameter tls_enabled=true",
uri: "azure-event-hub://nope?shared_access_key_name=saspolicytpcc&shared_access_key=123&tls_enabled=false",
expectedError: "unsupported value false for parameter tls_enabled, please use true",
},
{
name: "invalid query parameters",
uri: "azure-event-hub://nope?shared_access_key_name=saspolicytpcc&shared_access_key=123&ca_cert=abcd",
expectedError: "invalid query parameters",
},
{
name: "test error with entity_path",
uri: "azure-event-hub://nope?shared_access_key_name=saspolicytpcc&shared_access_key=123&entity_path=history",
expectedError: "invalid query parameters",
},
} {
t.Run(tc.name, func(t *testing.T) {
sqlDB.ExpectErr(
Expand All @@ -269,6 +313,11 @@ func TestChangefeedExternalConnections(t *testing.T) {
sqlDB.Exec(t, `CREATE EXTERNAL CONNECTION confluent2 AS 'confluent-cloud://nope?api_key=fee&api_secret=bar&`+
`sasl_mechanism=PLAIN&tls_enabled=true&topic_prefix=foo&sasl_enabled=true&sasl_handshake=true&`+
`insecure_tls_skip_verify=true'`)
sqlDB.Exec(t, `CREATE EXTERNAL CONNECTION azure1 AS 'azure-event-hub://nope?shared_access_key_name=fee&shared_access_key=123&topic_prefix=foo'`)
sqlDB.Exec(t, `CREATE EXTERNAL CONNECTION azure2 AS 'azure-event-hub://nope?shared_access_key_name=fee&shared_access_key=123&topic_prefix=foo&`+
`sasl_mechanism=PLAIN&tls_enabled=true&sasl_enabled=true&sasl_handshake=true'`)
sqlDB.Exec(t, `CREATE EXTERNAL CONNECTION azure3 AS 'azure-event-hub://nope?shared_access_key_name=fee&shared_access_key=123&topic_name=foo&`+
`sasl_mechanism=PLAIN&tls_enabled=true&sasl_enabled=true&sasl_handshake=true'`)

sqlDB.Exec(t, `CREATE CHANGEFEED FOR foo INTO 'external://nope'`)
sqlDB.Exec(t, `CREATE CHANGEFEED FOR foo INTO 'external://nope-with-params'`)
Expand All @@ -287,3 +336,64 @@ func TestChangefeedExternalConnections(t *testing.T) {
)
})
}

// TestBuildAzureKafkaConfig feeds azure-event-hub:// URIs through
// buildDialConfig and checks that each one parses without error and yields the
// expected kafka dial config for azure data streaming.
func TestBuildAzureKafkaConfig(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	// wantConfig builds the dial config expected for the given host name and
	// already URL-decoded access key name and access key.
	wantConfig := func(host string, keyName string, key string) kafkaDialConfig {
		password := fmt.Sprintf(
			"Endpoint=sb://%s/;SharedAccessKeyName=%s;SharedAccessKey=%s",
			host, keyName, key)
		return kafkaDialConfig{
			saslUser:      "$ConnectionString",
			saslPassword:  password,
			saslEnabled:   true,
			tlsEnabled:    true,
			saslHandshake: true,
			saslMechanism: sarama.SASLTypePlaintext,
		}
	}

	type testCase struct {
		name string
		uri  string
		want kafkaDialConfig
	}
	testCases := []testCase{
		{
			name: "test basic key/password with sasl_mechanism",
			uri:  "azure-event-hub://myeventhubs.servicebus.windows.net:9093?shared_access_key_name=saspolicytpcc&shared_access_key=123&sasl_mechanism=PLAIN",
			want: wantConfig("myeventhubs.servicebus.windows.net", "saspolicytpcc", "123"),
		},
		{
			name: "test basic key/password with sasl_enabled",
			uri:  "azure-event-hub://myeventhubs.servicebus.windows.net:9093?shared_access_key_name=saspolicytpcc&shared_access_key=123&sasl_enabled=true",
			want: wantConfig("myeventhubs.servicebus.windows.net", "saspolicytpcc", "123"),
		},
		{
			name: "test basic key/password with tls_enabled",
			uri:  "azure-event-hub://myeventhubs.servicebus.windows.net:9093?shared_access_key_name=saspolicytpcc&shared_access_key=123&tls_enabled=true",
			want: wantConfig("myeventhubs.servicebus.windows.net", "saspolicytpcc", "123"),
		},
		{
			name: "test basic key/password with sasl_handshake",
			uri:  "azure-event-hub://myeventhubs.servicebus.windows.net:9093?shared_access_key_name=saspolicytpcc&shared_access_key=123&sasl_enabled=true&tls_enabled=true&sasl_handshake=true",
			want: wantConfig("myeventhubs.servicebus.windows.net", "saspolicytpcc", "123"),
		},
		{
			// The access key is percent-encoded in the URI and must come out
			// decoded in the SASL password.
			name: "test more complex key/password with saspolicyhistory policy",
			uri:  "azure-event-hub://myeventhubs.servicebus.windows.net:9093?shared_access_key_name=saspolicyhistory&shared_access_key=q%2BSecretRedacted%3D",
			want: wantConfig("myeventhubs.servicebus.windows.net", "saspolicyhistory", "q+SecretRedacted="),
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			parsed, err := url.Parse(tc.uri)
			require.NoError(t, err)

			got, err := buildDialConfig(sinkURL{URL: parsed})
			require.NoError(t, err)
			require.Equal(t, tc.want, got)
		})
	}
}
25 changes: 13 additions & 12 deletions pkg/cmd/roachtest/tests/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ func (ct *cdcTester) startStatsCollection() func() {
}

// AuthorizationRuleKeys receives the fields decoded from the JSON printed by
// `az eventhubs namespace authorization-rule keys list`. Each field is bound
// to its JSON key through the struct tag.
type AuthorizationRuleKeys struct {
PrimaryConnectionString string `json:"primaryConnectionString"`
KeyName string `json:"keyName"`
PrimaryKey string `json:"primaryKey"`
}

func (ct *cdcTester) startCRDBChaos() {
Expand Down Expand Up @@ -265,13 +266,13 @@ func (ct *cdcTester) setupSink(args feedArgs) string {
if err := kafka.installAzureCli(ct.ctx); err != nil {
kafka.t.Fatal(err)
}
connectionString, err := kafka.getConnectionString(ct.ctx)
accessKeyName, accessKey, err := kafka.getAzureEventHubAccess(ct.ctx)
if err != nil {
kafka.t.Fatal(err)
}
sinkURI = fmt.Sprintf(
`kafka://cdc-roachtest.servicebus.windows.net:9093?tls_enabled=true&sasl_enabled=true&sasl_user=$ConnectionString&sasl_password=%s&sasl_mechanism=PLAIN&topic_name=testing`,
url.QueryEscape(connectionString),
`azure-event-hub://cdc-roachtest.servicebus.windows.net:9093?shared_access_key_name=%s&shared_access_key=%s&topic_name=testing`,
url.QueryEscape(accessKeyName), url.QueryEscape(accessKey),
)
default:
ct.t.Fatalf("unknown sink provided: %s", args.sinkType)
Expand Down Expand Up @@ -2295,10 +2296,10 @@ func (k kafkaManager) installAzureCli(ctx context.Context) error {
})
}

// getConnectionString retrieves the Azure Event Hub connection string for the
// cdc-roachtest event hub set up in the CRL Azure account for roachtest
// getAzureEventHubAccess retrieves the Azure Event Hub access key name and key
// for the cdc-roachtest event hub set up in the CRL Azure account for roachtest
// testing.
func (k kafkaManager) getConnectionString(ctx context.Context) (string, error) {
func (k kafkaManager) getAzureEventHubAccess(ctx context.Context) (string, string, error) {
// The necessary credential env vars have been added to TeamCity agents by
// dev-inf. Note that running this test on roachprod would not work due to
// lacking the required credentials env vars set up.
Expand All @@ -2312,29 +2313,29 @@ func (k kafkaManager) getConnectionString(ctx context.Context) (string, error) {
cmdStr := fmt.Sprintf("az login --service-principal -t %s -u %s -p=%s", azureTenantID, azureClientID, azureClientSecret)
_, err := k.c.RunWithDetailsSingleNode(ctx, k.t.L(), option.WithNodes(k.kafkaSinkNode), cmdStr)
if err != nil {
return "", errors.Wrap(err, "error running `az login`")
return "", "", errors.Wrap(err, "error running `az login`")
}

cmdStr = fmt.Sprintf("az account set --subscription %s", azureSubscriptionID)
_, err = k.c.RunWithDetailsSingleNode(ctx, k.t.L(), option.WithNodes(k.kafkaSinkNode), cmdStr)
if err != nil {
return "", errors.Wrap(err, "error running `az account set` command")
return "", "", errors.Wrap(err, "error running `az account set` command")
}

cmdStr = "az eventhubs namespace authorization-rule keys list --name cdc-roachtest-auth-rule " +
"--namespace-name cdc-roachtest --resource-group e2e-infra-event-hub-rg"
results, err := k.c.RunWithDetailsSingleNode(ctx, k.t.L(), option.WithNodes(k.kafkaSinkNode), cmdStr)
if err != nil {
return "", errors.Wrap(err, "error running `az eventhubs` command")
return "", "", errors.Wrap(err, "error running `az eventhubs` command")
}

var keys AuthorizationRuleKeys
err = json.Unmarshal([]byte(results.Stdout), &keys)
if err != nil {
return "", errors.Wrap(err, "error unmarshalling az eventhubs keys")
return "", "", errors.Wrap(err, "error unmarshalling az eventhubs keys")
}

return keys.PrimaryConnectionString, nil
return keys.KeyName, keys.PrimaryKey, nil
}

func (k kafkaManager) runWithRetry(ctx context.Context, cmd string) {
Expand Down