Skip to content

Commit

Permalink
roachtest/cdc: add roachtest for azure-event-hub://
Browse files Browse the repository at this point in the history
This patch changes roachtest kafka-azure for azure event hub using the new
scheme azure-event-hub://.
  • Loading branch information
wenyihu6 committed Mar 1, 2024
1 parent 811ecd6 commit 7555372
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 16 deletions.
7 changes: 3 additions & 4 deletions pkg/ccl/changefeedccl/show_changefeed_jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,9 @@ func TestShowChangefeedJobsRedacted(t *testing.T) {
sqlDB.QueryRow(t, createStmt).Scan(&jobID)
var sinkURI, description string
sqlDB.QueryRow(t, "SELECT sink_uri, description from [SHOW CHANGEFEED JOB $1]", jobID).Scan(&sinkURI, &description)
expectedSinkURI := strings.Replace(tc.uri, apiSecret, "redacted", 1)
expectedSinkURI = strings.Replace(expectedSinkURI, certSecret, "redacted", 1)
expectedDescription := strings.Replace(createStmt, apiSecret, "redacted", 1)
expectedDescription = strings.Replace(expectedDescription, certSecret, "redacted", 1)
replacer := strings.NewReplacer(apiSecret, "redacted", certSecret, "redacted")
expectedSinkURI := replacer.Replace(tc.uri)
expectedDescription := replacer.Replace(createStmt)
require.Equal(t, expectedSinkURI, sinkURI)
require.Equal(t, expectedDescription, description)
})
Expand Down
25 changes: 13 additions & 12 deletions pkg/cmd/roachtest/tests/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ func (ct *cdcTester) startStatsCollection() func() {
}

type AuthorizationRuleKeys struct {
PrimaryConnectionString string `json:"primaryConnectionString"`
KeyName string `json:"keyName"`
PrimaryKey string `json:"primaryKey"`
}

func (ct *cdcTester) startCRDBChaos() {
Expand Down Expand Up @@ -265,13 +266,13 @@ func (ct *cdcTester) setupSink(args feedArgs) string {
if err := kafka.installAzureCli(ct.ctx); err != nil {
kafka.t.Fatal(err)
}
connectionString, err := kafka.getConnectionString(ct.ctx)
accessKeyName, accessKey, err := kafka.getAzureEventHubAccess(ct.ctx)
if err != nil {
kafka.t.Fatal(err)
}
sinkURI = fmt.Sprintf(
`kafka://cdc-roachtest.servicebus.windows.net:9093?tls_enabled=true&sasl_enabled=true&sasl_user=$ConnectionString&sasl_password=%s&sasl_mechanism=PLAIN&topic_name=testing`,
url.QueryEscape(connectionString),
`azure-event-hub://cdc-roachtest.servicebus.windows.net:9093?shared_access_key_name=%s&shared_access_key=%s&topic_name=testing`,
url.QueryEscape(accessKeyName), url.QueryEscape(accessKey),
)
default:
ct.t.Fatalf("unknown sink provided: %s", args.sinkType)
Expand Down Expand Up @@ -2295,10 +2296,10 @@ func (k kafkaManager) installAzureCli(ctx context.Context) error {
})
}

// getConnectionString retrieves the Azure Event Hub connection string for the
// cdc-roachtest event hub set up in the CRL Azure account for roachtest
// getAzureEventHubAccess retrieves the Azure Event Hub access key name and key
// for the cdc-roachtest event hub set up in the CRL Azure account for roachtest
// testing.
func (k kafkaManager) getConnectionString(ctx context.Context) (string, error) {
func (k kafkaManager) getAzureEventHubAccess(ctx context.Context) (string, string, error) {
// The necessary credential env vars have been added to TeamCity agents by
// dev-inf. Note that running this test on roachprod would not work due to
// lacking the required credentials env vars set up.
Expand All @@ -2312,29 +2313,29 @@ func (k kafkaManager) getConnectionString(ctx context.Context) (string, error) {
cmdStr := fmt.Sprintf("az login --service-principal -t %s -u %s -p=%s", azureTenantID, azureClientID, azureClientSecret)
_, err := k.c.RunWithDetailsSingleNode(ctx, k.t.L(), option.WithNodes(k.kafkaSinkNode), cmdStr)
if err != nil {
return "", errors.Wrap(err, "error running `az login`")
return "", "", errors.Wrap(err, "error running `az login`")
}

cmdStr = fmt.Sprintf("az account set --subscription %s", azureSubscriptionID)
_, err = k.c.RunWithDetailsSingleNode(ctx, k.t.L(), option.WithNodes(k.kafkaSinkNode), cmdStr)
if err != nil {
return "", errors.Wrap(err, "error running `az account set` command")
return "", "", errors.Wrap(err, "error running `az account set` command")
}

cmdStr = "az eventhubs namespace authorization-rule keys list --name cdc-roachtest-auth-rule " +
"--namespace-name cdc-roachtest --resource-group e2e-infra-event-hub-rg"
results, err := k.c.RunWithDetailsSingleNode(ctx, k.t.L(), option.WithNodes(k.kafkaSinkNode), cmdStr)
if err != nil {
return "", errors.Wrap(err, "error running `az eventhubs` command")
return "", "", errors.Wrap(err, "error running `az eventhubs` command")
}

var keys AuthorizationRuleKeys
err = json.Unmarshal([]byte(results.Stdout), &keys)
if err != nil {
return "", errors.Wrap(err, "error unmarshalling az eventhubs keys")
return "", "", errors.Wrap(err, "error unmarshalling az eventhubs keys")
}

return keys.PrimaryConnectionString, nil
return keys.KeyName, keys.PrimaryKey, nil
}

func (k kafkaManager) runWithRetry(ctx context.Context, cmd string) {
Expand Down

0 comments on commit 7555372

Please sign in to comment.