Skip to content

Commit

Permalink
Merge pull request #115567 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-23.2-115535

release-23.2: changefeedccl: redact user-sensitive info from SHOW JOBS output
  • Loading branch information
wenyihu6 committed Dec 6, 2023
2 parents e28e69d + da52700 commit bccf65d
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 1 deletion.
8 changes: 7 additions & 1 deletion pkg/ccl/changefeedccl/changefeed.go
Expand Up @@ -107,7 +107,13 @@ func init() {
jobspb.ChangefeedDetailsMarshaler = func(m *jobspb.ChangefeedDetails, marshaller *jsonpb.Marshaler) ([]byte, error) {
if protoreflect.ShouldRedact(marshaller) {
var err error
m.SinkURI, err = cloud.SanitizeExternalStorageURI(m.SinkURI, nil)
// Redacts user sensitive information from sinkURI.
m.SinkURI, err = cloud.SanitizeExternalStorageURI(m.SinkURI, []string{
changefeedbase.SinkParamSASLPassword,
changefeedbase.SinkParamCACert,
changefeedbase.SinkParamClientCert,
changefeedbase.SinkParamConfluentAPISecret,
})
if err != nil {
return nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Expand Up @@ -988,10 +988,12 @@ func changefeedJobDescription(
sinkURI string,
opts changefeedbase.StatementOptions,
) (string, error) {
// Redacts user sensitive information from job description.
cleanedSinkURI, err := cloud.SanitizeExternalStorageURI(sinkURI, []string{
changefeedbase.SinkParamSASLPassword,
changefeedbase.SinkParamCACert,
changefeedbase.SinkParamClientCert,
changefeedbase.SinkParamConfluentAPISecret,
})
if err != nil {
return "", err
Expand Down
70 changes: 70 additions & 0 deletions pkg/ccl/changefeedccl/show_changefeed_jobs_test.go
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -115,6 +116,75 @@ func TestShowChangefeedJobsBasic(t *testing.T) {
cdcTest(t, testFn, feedTestOmitSinks("webhook", "sinkless"), feedTestNoExternalConnection)
}

// TestShowChangefeedJobsRedacted verifies that SHOW CHANGEFEED JOB, SHOW
// CHANGEFEED JOBS, and SHOW JOBS redact sensitive information (including keys
// and secrets) for its output. Regression for #113503.
func TestShowChangefeedJobsRedacted(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

s, stopServer := makeServer(t)
defer stopServer()

knobs := s.TestingKnobs.
DistSQL.(*execinfra.TestingKnobs).
Changefeed.(*TestingKnobs)
knobs.WrapSink = func(s Sink, _ jobspb.JobID) Sink {
if _, ok := s.(*externalConnectionKafkaSink); ok {
return s
}
return &externalConnectionKafkaSink{sink: s}
}

sqlDB := sqlutils.MakeSQLRunner(s.DB)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)

const apiSecret = "bar"
const certSecret = "Zm9v"
for _, tc := range []struct {
name string
uri string
expectedSinkURI string
expectedDescription string
}{
{
name: "api_secret",
uri: fmt.Sprintf("confluent-cloud://nope?api_key=fee&api_secret=%s", apiSecret),
},
{
name: "sasl_password",
uri: fmt.Sprintf("kafka://nope/?sasl_enabled=true&sasl_handshake=false&sasl_password=%s&sasl_user=aa", apiSecret),
},
{
name: "ca_cert",
uri: fmt.Sprintf("kafka://nope?ca_cert=%s&tls_enabled=true", certSecret),
},
} {
t.Run(tc.name, func(t *testing.T) {
createStmt := fmt.Sprintf(`CREATE CHANGEFEED FOR TABLE foo INTO '%s'`, tc.uri)
var jobID jobspb.JobID
sqlDB.QueryRow(t, createStmt).Scan(&jobID)
var sinkURI, description string
sqlDB.QueryRow(t, "SELECT sink_uri, description from [SHOW CHANGEFEED JOB $1]", jobID).Scan(&sinkURI, &description)
expectedSinkURI := strings.Replace(tc.uri, apiSecret, "redacted", 1)
expectedSinkURI = strings.Replace(expectedSinkURI, certSecret, "redacted", 1)
expectedDescription := strings.Replace(createStmt, apiSecret, "redacted", 1)
expectedDescription = strings.Replace(expectedDescription, certSecret, "redacted", 1)
require.Equal(t, sinkURI, expectedSinkURI)
require.Equal(t, description, expectedDescription)
})
}

t.Run("jobs", func(t *testing.T) {
queryStr := sqlDB.QueryStr(t, "SELECT description from [SHOW JOBS]")
require.NotContains(t, queryStr, apiSecret)
require.NotContains(t, queryStr, certSecret)
queryStr = sqlDB.QueryStr(t, "SELECT sink_uri, description from [SHOW CHANGEFEED JOBS]")
require.NotContains(t, queryStr, apiSecret)
require.NotContains(t, queryStr, certSecret)
})
}

func TestShowChangefeedJobs(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down

0 comments on commit bccf65d

Please sign in to comment.