Skip to content

Commit

Permalink
externalconn, streamingccl: support external connections as replicati…
Browse files Browse the repository at this point in the history
…on sources

This PR adds support for creating external connections with the
postgresql scheme and using them in a
`CREATE TENANT ... FROM REPLICATION OF external://name` statement.
Among other benefits, this lets users validate their postgresql
connection strings before creating the job, as creating the external
connection will ping the source DB.

Release note (enterprise change): Added support for `CREATE EXTERNAL CONNECTION ... AS` with a `postgresql://` or `postgres://` URI. These external connections may be specified as the source in streaming replication.
  • Loading branch information
HonoreDB committed Feb 10, 2023
1 parent 8e24570 commit e260ebe
Show file tree
Hide file tree
Showing 11 changed files with 113 additions and 13 deletions.
2 changes: 2 additions & 0 deletions pkg/ccl/streamingccl/streamclient/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/ccl/streamingccl",
"//pkg/cloud/externalconn",
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/repstream/streampb",
Expand All @@ -23,6 +24,7 @@ go_library(
"//pkg/sql/catalog/catpb",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/isql",
"//pkg/sql/rowenc",
"//pkg/sql/rowenc/valueside",
"//pkg/sql/sem/tree",
Expand Down
30 changes: 28 additions & 2 deletions pkg/ccl/streamingccl/streamclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/cloud/externalconn"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -140,7 +142,7 @@ type Subscription interface {

// NewStreamClient creates a new stream client based on the stream address.
func NewStreamClient(
ctx context.Context, streamAddress streamingccl.StreamAddress,
ctx context.Context, streamAddress streamingccl.StreamAddress, db isql.DB,
) (Client, error) {
var streamClient Client
streamURL, err := streamAddress.URL()
Expand All @@ -153,6 +155,15 @@ func NewStreamClient(
// The canonical PostgreSQL URL scheme is "postgresql", however our
// own client commands also accept "postgres".
return NewPartitionedStreamClient(ctx, streamURL)
case "external":
if db == nil {
return nil, errors.AssertionFailedf("nil db handle can't be used to dereference external URI")
}
addr, err := lookupExternalConnection(ctx, streamURL.Host, db)
if err != nil {
return nil, err
}
return NewStreamClient(ctx, addr, db)
case RandomGenScheme:
streamClient, err = newRandomStreamClient(streamURL)
if err != nil {
Expand All @@ -165,6 +176,21 @@ func NewStreamClient(
return streamClient, nil
}

// lookupExternalConnection resolves the external connection called name to
// the stream address stored in it. The connection object is loaded inside a
// transaction on localDB, and its unredacted URI is returned as the address.
func lookupExternalConnection(
	ctx context.Context, name string, localDB isql.DB,
) (streamingccl.StreamAddress, error) {
	var conn externalconn.ExternalConnection

	// load fetches the external connection object from the system table and
	// stashes it in conn for use once the transaction has finished.
	load := func(ctx context.Context, txn isql.Txn) error {
		loaded, loadErr := externalconn.LoadExternalConnection(ctx, name, txn)
		conn = loaded
		return loadErr
	}

	txnErr := localDB.Txn(ctx, load)
	if txnErr != nil {
		return "", errors.Wrap(txnErr, "failed to load external connection object")
	}

	uri := conn.ConnectionProto().UnredactedURI()
	return streamingccl.StreamAddress(uri), nil
}

// GetFirstActiveClient iterates through each provided stream address
// and returns the first client it's able to successfully Dial.
func GetFirstActiveClient(ctx context.Context, streamAddresses []string) (Client, error) {
Expand All @@ -174,7 +200,7 @@ func GetFirstActiveClient(ctx context.Context, streamAddresses []string) (Client
var combinedError error = nil
for _, address := range streamAddresses {
streamAddress := streamingccl.StreamAddress(address)
client, err := NewStreamClient(ctx, streamAddress)
client, err := NewStreamClient(ctx, streamAddress, nil)
if err == nil {
err = client.Dial(ctx)
if err == nil {
Expand Down
4 changes: 4 additions & 0 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go_library(
name = "streamingest",
srcs = [
"alter_replication_job.go",
"external_connection.go",
"metrics.go",
"stream_ingest_manager.go",
"stream_ingestion_frontier_processor.go",
Expand All @@ -22,6 +23,8 @@ go_library(
"//pkg/ccl/streamingccl/replicationutils",
"//pkg/ccl/streamingccl/streamclient",
"//pkg/ccl/utilccl",
"//pkg/cloud/externalconn",
"//pkg/cloud/externalconn/connectionpb",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/jobs/jobsprotectedts",
Expand All @@ -34,6 +37,7 @@ go_library(
"//pkg/repstream",
"//pkg/repstream/streampb",
"//pkg/roachpb",
"//pkg/security/username",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql",
Expand Down
55 changes: 55 additions & 0 deletions pkg/ccl/streamingccl/streamingest/external_connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package streamingest

import (
"context"
"net/url"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
"github.com/cockroachdb/cockroach/pkg/cloud/externalconn"
"github.com/cockroachdb/cockroach/pkg/cloud/externalconn/connectionpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
)

// postgresSchemes lists the URL schemes treated as PostgreSQL connection
// strings. "postgresql" is the canonical scheme, but our own client commands
// accept "postgres" as well, so both are returned.
func postgresSchemes() [2]string {
	var schemes [2]string
	schemes[0] = "postgres"
	schemes[1] = "postgresql"
	return schemes
}

// validatePostgresConnectionURI checks that uri is usable as a replication
// source. It parses the URI, builds a partitioned stream client for it, dials
// the source, and then closes the client again.
//
// execCfg and user are not consulted here; they are present because this
// function is registered through externalconn.RegisterDefaultValidation.
//
// The returned error is the first failure encountered: the parse error, the
// client construction error, the dial error, or the error from closing a
// successfully dialed client. A nil return means every step succeeded.
func validatePostgresConnectionURI(
	ctx context.Context, execCfg interface{}, user username.SQLUsername, uri string,
) error {
	parsedURI, err := url.Parse(uri)
	if err != nil {
		return err
	}
	conn, err := streamclient.NewPartitionedStreamClient(ctx, parsedURI)
	if err != nil {
		return err
	}
	if err := conn.Dial(ctx); err != nil {
		// Release the client before reporting the failure so that whatever it
		// holds is not leaked. The dial error is the actionable one, so any
		// error from this best-effort Close is deliberately dropped.
		_ = conn.Close(ctx)
		return err
	}
	return conn.Close(ctx)
}

// init wires every PostgreSQL URL scheme into the external connection
// framework: each scheme gets the simple-URI connection details factory under
// the sql provider, followed by the default validation that dials the URI.
func init() {
	schemes := postgresSchemes()
	for i := 0; i < len(schemes); i++ {
		scheme := schemes[i]

		// Factory first, then validation, for each scheme in turn.
		externalconn.RegisterConnectionDetailsFromURIFactory(
			scheme, connectionpb.ConnectionProvider_sql, externalconn.SimpleURIFactory,
		)
		externalconn.RegisterDefaultValidation(scheme, validatePostgresConnectionURI)
	}
}
9 changes: 4 additions & 5 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,10 @@ func connectToActiveClient(
// topology it may have been changed to a new valid address via an ALTER
// statement
}

// Without a list of addresses from existing progress we use the stream
// address from the creation statement
streamAddress := streamingccl.StreamAddress(details.StreamAddress)
client, err := streamclient.NewStreamClient(ctx, streamAddress)
client, err := streamclient.NewStreamClient(ctx, streamAddress, ingestionJob.GetInternalDB())

return client, errors.Wrapf(err, "ingestion job %d failed to connect to stream address or existing topology for planning", ingestionJob.ID())
}
Expand Down Expand Up @@ -612,11 +611,11 @@ func activateTenant(ctx context.Context, execCtx interface{}, newTenantID roachp
}

func (s *streamIngestionResumer) cancelProducerJob(
ctx context.Context, details jobspb.StreamIngestionDetails,
ctx context.Context, details jobspb.StreamIngestionDetails, db isql.DB,
) {
streamID := streampb.StreamID(details.StreamID)
addr := streamingccl.StreamAddress(details.StreamAddress)
client, err := streamclient.NewStreamClient(ctx, addr)
client, err := streamclient.NewStreamClient(ctx, addr, db)
if err != nil {
log.Warningf(ctx, "encountered error when creating the stream client: %v", err)
return
Expand Down Expand Up @@ -645,7 +644,7 @@ func (s *streamIngestionResumer) OnFailOrCancel(
// ingestion anymore.
jobExecCtx := execCtx.(sql.JobExecContext)
details := s.job.Details().(jobspb.StreamIngestionDetails)
s.cancelProducerJob(ctx, details)
s.cancelProducerJob(ctx, details, jobExecCtx.ExecCfg().InternalDB)

execCfg := jobExecCtx.ExecCfg()
return execCfg.InternalDB.Txn(ctx, func(
Expand Down
15 changes: 13 additions & 2 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,13 @@ SET enable_experimental_stream_replication = true;
sourceSQL.QueryRow(t, "SELECT cluster_logical_timestamp()").Scan(&startTime)

destSQL.Exec(t,
`CREATE TENANT "destination-tenant" FROM REPLICATION OF "source-tenant" ON $1 `,
pgURL.String(),
fmt.Sprintf(`CREATE EXTERNAL CONNECTION "replication-source-addr" AS "%s"`,
pgURL.String()),
)

destSQL.Exec(t,
`CREATE TENANT "destination-tenant" FROM REPLICATION OF "source-tenant" ON $1`,
"external://replication-source-addr",
)
streamProducerJobID, ingestionJobID := replicationtestutils.GetStreamJobIds(t, ctx, destSQL, "destination-tenant")

Expand Down Expand Up @@ -203,6 +208,12 @@ func TestTenantStreamingCreationErrors(t *testing.T) {
DestSysSQL.Exec(t, "CREATE TENANT \"100\"")
DestSysSQL.ExpectErr(t, "pq: tenant with name \"100\" already exists",
`CREATE TENANT "100" FROM REPLICATION OF source ON $1`, srcPgURL.String())

badPgURL := srcPgURL
badPgURL.Host = "nonexistent_test_endpoint"
DestSysSQL.ExpectErr(t, "pq: failed to construct External Connection details: failed to connect",
fmt.Sprintf(`CREATE EXTERNAL CONNECTION "replication-source-addr" AS "%s"`,
badPgURL.String()))
}

func TestCutoverBuiltin(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func ingestionPlanHook(
}

// Create a new stream with stream client.
client, err := streamclient.NewStreamClient(ctx, streamAddress)
client, err := streamclient.NewStreamClient(ctx, streamAddress, p.ExecCfg().InternalDB)
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ func (sip *streamIngestionProcessor) Start(ctx context.Context) {
streamClient = sip.forceClientForTests
log.Infof(ctx, "using testing client")
} else {
streamClient, err = streamclient.NewStreamClient(ctx, streamingccl.StreamAddress(addr))
streamClient, err = streamclient.NewStreamClient(ctx, streamingccl.StreamAddress(addr), db)
if err != nil {
sip.MoveToDraining(errors.Wrapf(err, "creating client for partition spec %q from %q", token, addr))
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ func TestRandomClientGeneration(t *testing.T) {
streamAddr := getTestRandomClientURI(tenantID, tenantName)

// The random client returns system and table data partitions.
streamClient, err := streamclient.NewStreamClient(ctx, streamingccl.StreamAddress(streamAddr))
streamClient, err := streamclient.NewStreamClient(ctx, streamingccl.StreamAddress(streamAddr), nil)
require.NoError(t, err)

randomStreamClient, ok := streamClient.(*streamclient.RandomStreamClient)
Expand Down
4 changes: 3 additions & 1 deletion pkg/cloud/externalconn/connectionpb/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ func (d *ConnectionDetails) Type() ConnectionType {
return TypeStorage
case ConnectionProvider_gcp_kms, ConnectionProvider_aws_kms:
return TypeKMS
case ConnectionProvider_kafka, ConnectionProvider_http, ConnectionProvider_https, ConnectionProvider_sql,
case ConnectionProvider_kafka, ConnectionProvider_http, ConnectionProvider_https,
ConnectionProvider_webhookhttp, ConnectionProvider_webhookhttps, ConnectionProvider_gcpubsub:
// Changefeed sink providers are TypeStorage for now because they overlap with backup storage providers.
return TypeStorage
case ConnectionProvider_sql:
return TypeForeignData
default:
panic(errors.AssertionFailedf("ConnectionDetails.Type called on a details with an unknown type: %s", d.Provider.String()))
}
Expand Down
1 change: 1 addition & 0 deletions pkg/cloud/externalconn/connectionpb/connection.proto
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ enum ConnectionType {
UNSPECIFIED = 0 [(gogoproto.enumvalue_customname) = "TypeUnspecified"];
STORAGE = 1 [(gogoproto.enumvalue_customname) = "TypeStorage"];
KMS = 2 [(gogoproto.enumvalue_customname) = "TypeKMS"];
FOREIGNDATA = 3 [(gogoproto.enumvalue_customname) = "TypeForeignData"];
}

// SimpleURI encapsulates the information that represents an External Connection
Expand Down

0 comments on commit e260ebe

Please sign in to comment.