Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeedccl: warn when using https sink urls #107572

Merged
merged 2 commits into from Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 19 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Expand Up @@ -904,6 +904,25 @@ func validateSink(
if err != nil {
return err
}
u, err := url.Parse(details.SinkURI)
if err != nil {
return err
}

ambiguousSchemes := map[string][2]string{
changefeedbase.DeprecatedSinkSchemeHTTP: {changefeedbase.SinkSchemeCloudStorageHTTP, changefeedbase.SinkSchemeWebhookHTTP},
changefeedbase.DeprecatedSinkSchemeHTTPS: {changefeedbase.SinkSchemeCloudStorageHTTPS, changefeedbase.SinkSchemeWebhookHTTPS},
}

if disambiguations, isAmbiguous := ambiguousSchemes[u.Scheme]; isAmbiguous {
p.BufferClientNotice(ctx, pgnotice.Newf(
`Interpreting deprecated URI scheme %s as %s. For webhook semantics, use %s.`,
u.Scheme,
disambiguations[0],
disambiguations[1],
))
}

var nilOracle timestampLowerBoundOracle
canarySink, err := getAndDialSink(ctx, &p.ExecCfg().DistSQLSrv.ServerConfig, details,
nilOracle, p.User(), jobID, sli)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_test.go
Expand Up @@ -4821,7 +4821,7 @@ func TestChangefeedErrors(t *testing.T) {
`CREATE CHANGEFEED FOR foo INTO $1`, `webhook-https://fake-host?ca_cert=Zm9v`,
)
sqlDB.ExpectErr(
t, `sink requires https`,
t, `sink requires webhook-https`,
`CREATE CHANGEFEED FOR foo INTO $1`, `webhook-http://fake-host`,
)
sqlDB.ExpectErr(
Expand Down
11 changes: 7 additions & 4 deletions pkg/ccl/changefeedccl/changefeedbase/options.go
Expand Up @@ -162,6 +162,11 @@ const (
DeprecatedSinkSchemeCloudStorageNodelocal = `experimental-nodelocal`
DeprecatedSinkSchemeCloudStorageS3 = `experimental-s3`

// DeprecatedSinkSchemeHTTP is interpreted as cloudstorage over HTTP PUT.
DeprecatedSinkSchemeHTTP = `http`
// DeprecatedSinkSchemeHTTPS is interpreted as cloudstorage over HTTPS PUT.
DeprecatedSinkSchemeHTTPS = `https`

// OptKafkaSinkConfig is a JSON configuration for kafka sink (kafkaSinkConfig).
OptKafkaSinkConfig = `kafka_sink_config`
OptPubsubSinkConfig = `pubsub_sink_config`
Expand All @@ -186,13 +191,11 @@ const (
SinkParamTopicName = `topic_name`
SinkSchemeCloudStorageAzure = `azure`
SinkSchemeCloudStorageGCS = `gs`
SinkSchemeCloudStorageHTTP = `http`
SinkSchemeCloudStorageHTTPS = `https`
SinkSchemeCloudStorageHTTP = `file-http`
SinkSchemeCloudStorageHTTPS = `file-https`
SinkSchemeCloudStorageNodelocal = `nodelocal`
SinkSchemeCloudStorageS3 = `s3`
SinkSchemeExperimentalSQL = `experimental-sql`
SinkSchemeHTTP = `http`
SinkSchemeHTTPS = `https`
SinkSchemeKafka = `kafka`
SinkSchemeNull = `null`
SinkSchemeWebhookHTTP = `webhook-http`
Expand Down
5 changes: 5 additions & 0 deletions pkg/ccl/changefeedccl/sink_cloudstorage.go
Expand Up @@ -49,6 +49,10 @@ func isCloudStorageSink(u *url.URL) bool {
changefeedbase.SinkSchemeCloudStorageNodelocal, changefeedbase.SinkSchemeCloudStorageHTTP,
changefeedbase.SinkSchemeCloudStorageHTTPS, changefeedbase.SinkSchemeCloudStorageAzure:
return true
// During the deprecation period, we need to keep parsing these as cloudstorage for backwards
// compatibility. Afterwards we'll either remove them or move them to webhook.
case changefeedbase.DeprecatedSinkSchemeHTTP, changefeedbase.DeprecatedSinkSchemeHTTPS:
return true
default:
return false
}
Expand Down Expand Up @@ -375,6 +379,7 @@ func makeCloudStorageSink(
}
}
u.Scheme = strings.TrimPrefix(u.Scheme, `experimental-`)
u.Scheme = strings.TrimPrefix(u.Scheme, `file-`)

sinkID := atomic.AddInt64(&cloudStorageSinkIDAtomic, 1)
sessID, err := generateChangefeedSessionID()
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/sink_external_connection.go
Expand Up @@ -92,6 +92,8 @@ var supportedExternalConnectionTypes = map[string]connectionpb.ConnectionProvide
GcpScheme: connectionpb.ConnectionProvider_gcpubsub,
changefeedbase.SinkSchemeCloudStorageHTTP: connectionpb.ConnectionProvider_http,
changefeedbase.SinkSchemeCloudStorageHTTPS: connectionpb.ConnectionProvider_https,
changefeedbase.DeprecatedSinkSchemeHTTP: connectionpb.ConnectionProvider_http,
changefeedbase.DeprecatedSinkSchemeHTTPS: connectionpb.ConnectionProvider_https,
changefeedbase.SinkSchemeCloudStorageNodelocal: connectionpb.ConnectionProvider_nodelocal,
changefeedbase.SinkSchemeCloudStorageS3: connectionpb.ConnectionProvider_s3,
changefeedbase.SinkSchemeKafka: connectionpb.ConnectionProvider_kafka,
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/sink_webhook.go
Expand Up @@ -233,7 +233,7 @@ func makeDeprecatedWebhookSink(
mb metricsRecorderBuilder,
) (Sink, error) {
if u.Scheme != changefeedbase.SinkSchemeWebhookHTTPS {
return nil, errors.Errorf(`this sink requires %s`, changefeedbase.SinkSchemeHTTPS)
return nil, errors.Errorf(`this sink requires %s`, changefeedbase.SinkSchemeWebhookHTTPS)
}
u.Scheme = strings.TrimPrefix(u.Scheme, `webhook-`)

Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/sink_webhook_v2.go
Expand Up @@ -229,7 +229,7 @@ func validateWebhookOpts(
u sinkURL, encodingOpts changefeedbase.EncodingOptions, opts changefeedbase.WebhookSinkOptions,
) error {
if u.Scheme != changefeedbase.SinkSchemeWebhookHTTPS {
return errors.Errorf(`this sink requires %s`, changefeedbase.SinkSchemeHTTPS)
return errors.Errorf(`this sink requires %s`, changefeedbase.SinkSchemeWebhookHTTPS)
}

switch encodingOpts.Format {
Expand Down