diff --git a/build/teamcity/cockroach/nightlies/stress_engflow.sh b/build/teamcity/cockroach/nightlies/stress_engflow.sh index 1d9605236941..339be14c5ee1 100755 --- a/build/teamcity/cockroach/nightlies/stress_engflow.sh +++ b/build/teamcity/cockroach/nightlies/stress_engflow.sh @@ -6,6 +6,4 @@ export EXTRA_TEST_ARGS="--config use_ci_timeouts" THIS_DIR=$(cd "$(dirname "$0")" && pwd) -unset GITHUB_API_TOKEN - $THIS_DIR/stress_engflow_impl.sh diff --git a/pkg/ccl/changefeedccl/changefeed.go b/pkg/ccl/changefeedccl/changefeed.go index 408aeede8ff6..d92ae8f602de 100644 --- a/pkg/ccl/changefeedccl/changefeed.go +++ b/pkg/ccl/changefeedccl/changefeed.go @@ -107,7 +107,13 @@ func init() { jobspb.ChangefeedDetailsMarshaler = func(m *jobspb.ChangefeedDetails, marshaller *jsonpb.Marshaler) ([]byte, error) { if protoreflect.ShouldRedact(marshaller) { var err error - m.SinkURI, err = cloud.SanitizeExternalStorageURI(m.SinkURI, nil) + // Redacts user sensitive information from sinkURI. + m.SinkURI, err = cloud.SanitizeExternalStorageURI(m.SinkURI, []string{ + changefeedbase.SinkParamSASLPassword, + changefeedbase.SinkParamCACert, + changefeedbase.SinkParamClientCert, + changefeedbase.SinkParamConfluentAPISecret, + }) if err != nil { return nil, err } diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index b5b82982179e..79ed25d318fd 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -978,10 +978,12 @@ func changefeedJobDescription( sinkURI string, opts changefeedbase.StatementOptions, ) (string, error) { + // Redacts user sensitive information from job description. cleanedSinkURI, err := cloud.SanitizeExternalStorageURI(sinkURI, []string{ changefeedbase.SinkParamSASLPassword, changefeedbase.SinkParamCACert, changefeedbase.SinkParamClientCert, + changefeedbase.SinkParamConfluentAPISecret, }) if err != nil { return "", err diff --git a/pkg/ccl/changefeedccl/schemafeed/BUILD.bazel b/pkg/ccl/changefeedccl/schemafeed/BUILD.bazel index 4eeaa13b823b..3e7ca39e6b8c 100644 --- a/pkg/ccl/changefeedccl/schemafeed/BUILD.bazel +++ b/pkg/ccl/changefeedccl/schemafeed/BUILD.bazel @@ -43,7 +43,7 @@ go_library( go_test( name = "schemafeed_test", - size = "medium", + size = "large", srcs = [ "helpers_test.go", "main_test.go", diff --git a/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go b/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go index 7c2e944a69c8..44f6a0100295 100644 --- a/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go +++ b/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -115,6 +116,75 @@ func TestShowChangefeedJobsBasic(t *testing.T) { cdcTest(t, testFn, feedTestOmitSinks("webhook", "sinkless"), feedTestNoExternalConnection) } +// TestShowChangefeedJobsRedacted verifies that SHOW CHANGEFEED JOB, SHOW +// CHANGEFEED JOBS, and SHOW JOBS redact sensitive information (including keys +// and secrets) for its output. Regression for #113503. +func TestShowChangefeedJobsRedacted(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + s, stopServer := makeServer(t) + defer stopServer() + + knobs := s.TestingKnobs. + DistSQL.(*execinfra.TestingKnobs). + Changefeed.(*TestingKnobs) + knobs.WrapSink = func(s Sink, _ jobspb.JobID) Sink { + if _, ok := s.(*externalConnectionKafkaSink); ok { + return s + } + return &externalConnectionKafkaSink{sink: s} + } + + sqlDB := sqlutils.MakeSQLRunner(s.DB) + sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) + + const apiSecret = "bar" + const certSecret = "Zm9v" + for _, tc := range []struct { + name string + uri string + expectedSinkURI string + expectedDescription string + }{ + { + name: "api_secret", + uri: fmt.Sprintf("confluent-cloud://nope?api_key=fee&api_secret=%s", apiSecret), + }, + { + name: "sasl_password", + uri: fmt.Sprintf("kafka://nope/?sasl_enabled=true&sasl_handshake=false&sasl_password=%s&sasl_user=aa", apiSecret), + }, + { + name: "ca_cert", + uri: fmt.Sprintf("kafka://nope?ca_cert=%s&tls_enabled=true", certSecret), + }, + } { + t.Run(tc.name, func(t *testing.T) { + createStmt := fmt.Sprintf(`CREATE CHANGEFEED FOR TABLE foo INTO '%s'`, tc.uri) + var jobID jobspb.JobID + sqlDB.QueryRow(t, createStmt).Scan(&jobID) + var sinkURI, description string + sqlDB.QueryRow(t, "SELECT sink_uri, description from [SHOW CHANGEFEED JOB $1]", jobID).Scan(&sinkURI, &description) + expectedSinkURI := strings.Replace(tc.uri, apiSecret, "redacted", 1) + expectedSinkURI = strings.Replace(expectedSinkURI, certSecret, "redacted", 1) + expectedDescription := strings.Replace(createStmt, apiSecret, "redacted", 1) + expectedDescription = strings.Replace(expectedDescription, certSecret, "redacted", 1) + require.Equal(t, sinkURI, expectedSinkURI) + require.Equal(t, description, expectedDescription) + }) + } + + t.Run("jobs", func(t *testing.T) { + queryStr := sqlDB.QueryStr(t, "SELECT description from [SHOW JOBS]") + require.NotContains(t, queryStr, apiSecret) + require.NotContains(t, queryStr, certSecret) + queryStr = sqlDB.QueryStr(t, "SELECT sink_uri, description from [SHOW CHANGEFEED JOBS]") + require.NotContains(t, queryStr, apiSecret) + require.NotContains(t, queryStr, certSecret) + }) +} + func TestShowChangefeedJobs(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t)