Skip to content

Commit

Permalink
Merge #95835
Browse files Browse the repository at this point in the history
95835: streamingccl: redact ssl cert parameters in job description r=adityamaru a=stevendanna

This redacts SSL-related query parameters from the job description of the stream ingestion job. We want these redacted because, if the user is using an sslinline URL, they will contain the certificate and key content itself.

One may claim that we really only need to redact sslkey in the case of sslinline=true. But many users get spooked by seeing anything that looks like a security-related artifact in these job descriptions, so I've opted to redact:

    sslkey
    sslcert
    sslrootcert

Epic: none

Release note: None

Co-authored-by: Steven Danna <danna@cockroachlabs.com>
  • Loading branch information
craig[bot] and stevendanna committed Feb 21, 2023
2 parents 154712a + 81c7719 commit 31b610b
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 14 deletions.
38 changes: 28 additions & 10 deletions pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,20 +382,38 @@ type tlsCerts struct {
rootCertPool *x509.CertPool
}

// Names of the TLS-related query parameters recognized in a stream
// connection URL.
const (
	sslModeURLParam     = "sslmode"
	sslCertURLParam     = "sslcert"
	sslKeyURLParam      = "sslkey"
	sslRootCertURLParam = "sslrootcert"

	// sslInlineURLParam is not a standard connection URL parameter.
	// When it is set to true, the values of sslcert, sslkey, and
	// sslrootcert are assumed to be URL-encoded data instead of
	// file paths.
	sslInlineURLParam = "sslinline"
)

// RedactableURLParameters names the SSL-related connection URL query
// parameters whose values should be redacted before a stream address is
// displayed. With sslinline set, these values can hold certificate and key
// content rather than file paths.
var RedactableURLParameters = []string{sslCertURLParam, sslKeyURLParam, sslRootCertURLParam}

// uriWithInlineTLSCertsRemoved handles the non-standard sslinline
// option. The returned URL can be passed to pgx. The returned
// tlsCerts struct can be used to apply the certificate data to the
// tls.Config produced by pgx.
func uriWithInlineTLSCertsRemoved(remote *url.URL) (*url.URL, *tlsCerts, error) {
if remote.Query().Get("sslinline") != "true" {
if remote.Query().Get(sslInlineURLParam) != "true" {
return remote, nil, nil
}

retURL := *remote
v := retURL.Query()
cert := v.Get("sslcert")
key := v.Get("sslkey")
rootcert := v.Get("sslrootcert")
cert := v.Get(sslCertURLParam)
key := v.Get(sslKeyURLParam)
rootcert := v.Get(sslRootCertURLParam)

if (cert != "" && key == "") || (cert == "" && key != "") {
return nil, nil, errors.New(`both "sslcert" and "sslkey" are required`)
Expand Down Expand Up @@ -430,14 +448,14 @@ func uriWithInlineTLSCertsRemoved(remote *url.URL) (*url.URL, *tlsCerts, error)
// TODO(ssd): This may be a sign that we should implement the
// entire configTLS function from pgx and remove all tls
// options.
if v.Get("sslmode") == "require" && rootcert != "" {
v.Set("sslmode", "verify-ca")
if v.Get(sslModeURLParam) == "require" && rootcert != "" {
v.Set(sslModeURLParam, "verify-ca")
}

v.Del("sslcert")
v.Del("sslkey")
v.Del("sslrootcert")
v.Del("sslinline")
v.Del(sslCertURLParam)
v.Del(sslKeyURLParam)
v.Del(sslRootCertURLParam)
v.Del(sslInlineURLParam)
retURL.RawQuery = v.Encode()
return &retURL, tlsInfo, nil
}
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ go_library(
"//pkg/ccl/streamingccl/replicationutils",
"//pkg/ccl/streamingccl/streamclient",
"//pkg/ccl/utilccl",
"//pkg/cloud",
"//pkg/cloud/externalconn",
"//pkg/cloud/externalconn/connectionpb",
"//pkg/jobs",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -941,7 +941,9 @@ func TestTenantStreamingShowTenant(t *testing.T) {
require.Equal(t, "replicating", status)
require.Equal(t, "none", serviceMode)
require.Equal(t, "source", source)
require.Equal(t, c.SrcURL.String(), sourceUri)
expectedURI, err := redactSourceURI(c.SrcURL.String())
require.NoError(t, err)
require.Equal(t, expectedURI, sourceUri)
require.Equal(t, ingestionJobID, jobId)
require.Less(t, maxReplTime, timeutil.Now())
require.Less(t, protectedTime, timeutil.Now())
Expand Down
6 changes: 6 additions & 0 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ func getReplicationStatsAndStatus(
return nil, jobspb.ReplicationError.String(),
errors.Newf("job with id %d is not a stream ingestion job", job.ID())
}

details.StreamAddress, err = redactSourceURI(details.StreamAddress)
if err != nil {
return nil, jobspb.ReplicationError.String(), err
}

stats, err := replicationutils.GetStreamIngestionStatsNoHeartbeat(ctx, details, job.Progress())
if err != nil {
return nil, jobspb.ReplicationError.String(), err
Expand Down
22 changes: 19 additions & 3 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
Expand All @@ -35,10 +36,25 @@ import (
const defaultRetentionTTLSeconds = int32(25 * 60 * 60)

// streamIngestionJobDescription renders the job description for a stream
// ingestion job. It builds a copy of the CREATE TENANT ... FROM REPLICATION
// statement whose source address has been passed through redactSourceURI and
// formats that copy, so the description does not carry the unredacted
// sslcert, sslkey, or sslrootcert values. If redaction fails, the error is
// returned together with an empty description.
func streamIngestionJobDescription(
p sql.PlanHookState, streamIngestion *tree.CreateTenantFromReplication,
p sql.PlanHookState, sourceAddr string, streamIngestion *tree.CreateTenantFromReplication,
) (string, error) {
redactedSourceAddr, err := redactSourceURI(sourceAddr)
if err != nil {
return "", err
}

// Copy the statement's fields, substituting the redacted address, rather
// than formatting the caller's statement directly.
redactedCreateStmt := &tree.CreateTenantFromReplication{
TenantSpec: streamIngestion.TenantSpec,
ReplicationSourceTenantName: streamIngestion.ReplicationSourceTenantName,
ReplicationSourceAddress: tree.NewDString(redactedSourceAddr),
Options: streamIngestion.Options,
}
ann := p.ExtendedEvalContext().Annotations
return tree.AsStringWithFQNames(streamIngestion, ann), nil
return tree.AsStringWithFQNames(redactedCreateStmt, ann), nil
}

// redactSourceURI returns addr with the query parameters named in
// streamclient.RedactableURLParameters sanitized by
// cloud.SanitizeExternalStorageURI. Any error from the sanitizer is passed
// through to the caller unchanged.
func redactSourceURI(addr string) (string, error) {
	redactedAddr, err := cloud.SanitizeExternalStorageURI(
		addr, streamclient.RedactableURLParameters,
	)
	return redactedAddr, err
}

func ingestionTypeCheck(
Expand Down Expand Up @@ -205,7 +221,7 @@ func ingestionPlanHook(
ReplicationStartTime: replicationProducerSpec.ReplicationStartTime,
}

jobDescription, err := streamIngestionJobDescription(p, ingestionStmt)
jobDescription, err := streamIngestionJobDescription(p, from, ingestionStmt)
if err != nil {
return err
}
Expand Down
17 changes: 17 additions & 0 deletions pkg/ccl/streamingccl/streamingest/testdata/simple
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,23 @@ SELECT clock_timestamp()::timestamp::string
wait-until-high-watermark ts=$start
----

# The job description should be redacted.
#
# The substring business in this test is to strip out the hostname, which has a random port in it.
exec-sql as=destination-system
CREATE FUNCTION strip_host(s string) returns string language sql AS $$ select concat(substring(s FOR position('@' IN s)), substring(s FROM position('?' IN s))) $$
----

query-sql as=destination-system
SELECT strip_host(description) FROM [SHOW JOBS] WHERE job_type='STREAM INGESTION'
----
CREATE TENANT destination FROM REPLICATION OF source ON 'postgres://root@?sslcert=redacted&sslkey=redacted&sslmode=verify-full&sslrootcert=redacted'

query-sql as=destination-system
SELECT strip_host(source_cluster_uri) FROM [SHOW TENANT destination WITH REPLICATION STATUS]
----
postgres://root@?sslcert=redacted&sslkey=redacted&sslmode=verify-full&sslrootcert=redacted

exec-sql as=source-tenant
CREATE TABLE d.x (id INT PRIMARY KEY, n INT);
----
Expand Down

0 comments on commit 31b610b

Please sign in to comment.