Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

streamingccl: redact ssl cert parameters in job description #95835

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 28 additions & 10 deletions pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,20 +382,38 @@ type tlsCerts struct {
rootCertPool *x509.CertPool
}

const (
// sslInlineURLParam is a non-standard connection URL
// parameter. When true, we assume that sslcert, sslkey, and
// sslrootcert contain URL-encoded data rather than paths.
sslInlineURLParam = "sslinline"

sslModeURLParam = "sslmode"
sslCertURLParam = "sslcert"
sslKeyURLParam = "sslkey"
sslRootCertURLParam = "sslrootcert"
)

var RedactableURLParameters = []string{
sslCertURLParam,
sslKeyURLParam,
sslRootCertURLParam,
}

// uriWithInlineTLSCertsRemoved handles the non-standard sslinline
// option. The returned URL can be passed to pgx. The returned
// tlsCerts struct can be used to apply the certificate data to the
// tls.Config produced by pgx.
func uriWithInlineTLSCertsRemoved(remote *url.URL) (*url.URL, *tlsCerts, error) {
if remote.Query().Get("sslinline") != "true" {
if remote.Query().Get(sslInlineURLParam) != "true" {
return remote, nil, nil
}

retURL := *remote
v := retURL.Query()
cert := v.Get("sslcert")
key := v.Get("sslkey")
rootcert := v.Get("sslrootcert")
cert := v.Get(sslCertURLParam)
key := v.Get(sslKeyURLParam)
rootcert := v.Get(sslRootCertURLParam)

if (cert != "" && key == "") || (cert == "" && key != "") {
return nil, nil, errors.New(`both "sslcert" and "sslkey" are required`)
Expand Down Expand Up @@ -430,14 +448,14 @@ func uriWithInlineTLSCertsRemoved(remote *url.URL) (*url.URL, *tlsCerts, error)
// TODO(ssd): This may be a sign that we should implement the
// entire configTLS function from pgx and remove all tls
// options.
if v.Get("sslmode") == "require" && rootcert != "" {
v.Set("sslmode", "verify-ca")
if v.Get(sslModeURLParam) == "require" && rootcert != "" {
v.Set(sslModeURLParam, "verify-ca")
}

v.Del("sslcert")
v.Del("sslkey")
v.Del("sslrootcert")
v.Del("sslinline")
v.Del(sslCertURLParam)
v.Del(sslKeyURLParam)
v.Del(sslRootCertURLParam)
v.Del(sslInlineURLParam)
retURL.RawQuery = v.Encode()
return &retURL, tlsInfo, nil
}
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ go_library(
"//pkg/ccl/streamingccl/replicationutils",
"//pkg/ccl/streamingccl/streamclient",
"//pkg/ccl/utilccl",
"//pkg/cloud",
"//pkg/cloud/externalconn",
"//pkg/cloud/externalconn/connectionpb",
"//pkg/jobs",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -941,7 +941,9 @@ func TestTenantStreamingShowTenant(t *testing.T) {
require.Equal(t, "replicating", status)
require.Equal(t, "none", serviceMode)
require.Equal(t, "source", source)
require.Equal(t, c.SrcURL.String(), sourceUri)
expectedURI, err := redactSourceURI(c.SrcURL.String())
require.NoError(t, err)
require.Equal(t, expectedURI, sourceUri)
require.Equal(t, ingestionJobID, jobId)
require.Less(t, maxReplTime, timeutil.Now())
require.Less(t, protectedTime, timeutil.Now())
Expand Down
6 changes: 6 additions & 0 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ func getReplicationStatsAndStatus(
return nil, jobspb.ReplicationError.String(),
errors.Newf("job with id %d is not a stream ingestion job", job.ID())
}

details.StreamAddress, err = redactSourceURI(details.StreamAddress)
if err != nil {
return nil, jobspb.ReplicationError.String(), err
}

stats, err := replicationutils.GetStreamIngestionStatsNoHeartbeat(ctx, details, job.Progress())
if err != nil {
return nil, jobspb.ReplicationError.String(), err
Expand Down
22 changes: 19 additions & 3 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
Expand All @@ -35,10 +36,25 @@ import (
const defaultRetentionTTLSeconds = int32(25 * 60 * 60)

func streamIngestionJobDescription(
p sql.PlanHookState, streamIngestion *tree.CreateTenantFromReplication,
p sql.PlanHookState, sourceAddr string, streamIngestion *tree.CreateTenantFromReplication,
) (string, error) {
redactedSourceAddr, err := redactSourceURI(sourceAddr)
if err != nil {
return "", err
}

redactedCreateStmt := &tree.CreateTenantFromReplication{
TenantSpec: streamIngestion.TenantSpec,
ReplicationSourceTenantName: streamIngestion.ReplicationSourceTenantName,
ReplicationSourceAddress: tree.NewDString(redactedSourceAddr),
Options: streamIngestion.Options,
}
ann := p.ExtendedEvalContext().Annotations
return tree.AsStringWithFQNames(streamIngestion, ann), nil
return tree.AsStringWithFQNames(redactedCreateStmt, ann), nil
}

func redactSourceURI(addr string) (string, error) {
return cloud.SanitizeExternalStorageURI(addr, streamclient.RedactableURLParameters)
}

func ingestionTypeCheck(
Expand Down Expand Up @@ -205,7 +221,7 @@ func ingestionPlanHook(
ReplicationStartTime: replicationProducerSpec.ReplicationStartTime,
}

jobDescription, err := streamIngestionJobDescription(p, ingestionStmt)
jobDescription, err := streamIngestionJobDescription(p, from, ingestionStmt)
if err != nil {
return err
}
Expand Down
17 changes: 17 additions & 0 deletions pkg/ccl/streamingccl/streamingest/testdata/simple
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,23 @@ SELECT clock_timestamp()::timestamp::string
wait-until-high-watermark ts=$start
----

# The job description should be redacted.
#
# The substring business in this test is to extract the hostname which has a random port it in.
exec-sql as=destination-system
CREATE FUNCTION strip_host(s string) returns string language sql AS $$ select concat(substring(s FOR position('@' IN s)), substring(s FROM position('?' IN s))) $$
----

query-sql as=destination-system
SELECT strip_host(description) FROM [SHOW JOBS] WHERE job_type='STREAM INGESTION'
----
CREATE TENANT destination FROM REPLICATION OF source ON 'postgres://root@?sslcert=redacted&sslkey=redacted&sslmode=verify-full&sslrootcert=redacted'

query-sql as=destination-system
SELECT strip_host(source_cluster_uri) FROM [SHOW TENANT destination WITH REPLICATION STATUS]
----
postgres://root@?sslcert=redacted&sslkey=redacted&sslmode=verify-full&sslrootcert=redacted

exec-sql as=source-tenant
CREATE TABLE d.x (id INT PRIMARY KEY, n INT);
----
Expand Down