Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeedccl: Add query parameter for specifying certificates for schema registries #65431

Merged
merged 1 commit into from
May 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"sink_kafka.go",
"sink_sql.go",
"testing_knobs.go",
"tls.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl",
visibility = ["//visibility:public"],
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/cdctest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
"nemeses.go",
"schema_registry.go",
"testfeed.go",
"tls_util.go",
"validator.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest",
Expand All @@ -21,6 +22,7 @@ go_library(
"//pkg/util/log",
"//pkg/util/randutil",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_linkedin_goavro_v2//:goavro",
],
Expand Down
16 changes: 16 additions & 0 deletions pkg/ccl/changefeedccl/cdctest/schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package cdctest

import (
"crypto/tls"
"encoding/binary"
"encoding/json"
"net/http"
Expand Down Expand Up @@ -41,6 +42,21 @@ func MakeTestSchemaRegistry() *SchemaRegistry {
return r
}

// MakeTestSchemaRegistryWithTLS creates and starts schema registry for tests with TLS enabled.
func MakeTestSchemaRegistryWithTLS(certificate *tls.Certificate) (*SchemaRegistry, error) {
r := &SchemaRegistry{}
r.mu.schemas = make(map[int32]string)
r.mu.subjects = make(map[string]int32)
r.server = httptest.NewUnstartedServer(http.HandlerFunc(r.register))
if certificate != nil {
r.server.TLS = &tls.Config{
Certificates: []tls.Certificate{*certificate},
}
}
r.server.StartTLS()
return r, nil
}

// Close closes this schema registry.
func (r *SchemaRegistry) Close() {
r.server.Close()
Expand Down
91 changes: 91 additions & 0 deletions pkg/ccl/changefeedccl/cdctest/tls_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2021 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package cdctest

import (
"crypto/rand"
"crypto/rsa"
"crypto/x509"
"crypto/x509/pkix"
"encoding/base64"
"encoding/pem"
"math/big"
"net"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)

const certLifetime = 30 * 24 * time.Hour

// EncodeBase64ToString stores the base64 encoding of src in dest
func EncodeBase64ToString(src []byte, dest *string) {
if src != nil {
encoded := base64.StdEncoding.EncodeToString(src)
*dest = encoded
}
}

// GenerateCACert generates a new self-signed CA cert using priv
func GenerateCACert(priv *rsa.PrivateKey) ([]byte, *x509.Certificate, error) {
serial, err := randomSerial()
if err != nil {
return nil, nil, err
}

certSpec := &x509.Certificate{
SerialNumber: serial,
Subject: pkix.Name{
Country: []string{"US"},
Organization: []string{"Cockroach Labs"},
OrganizationalUnit: []string{"Engineering"},
CommonName: "Roachtest Temporary Insecure CA",
},
NotBefore: timeutil.Now(),
NotAfter: timeutil.Now().Add(certLifetime),
KeyUsage: x509.KeyUsageCertSign | x509.KeyUsageCRLSign,
IsCA: true,
BasicConstraintsValid: true,
MaxPathLenZero: true,
IPAddresses: []net.IP{net.ParseIP("127.0.0.1"), net.ParseIP("::1")},
}
cert, err := x509.CreateCertificate(rand.Reader, certSpec, certSpec, &priv.PublicKey, priv)
return cert, certSpec, err
}

func pemEncode(dataType string, data []byte) (string, error) {
ret := new(strings.Builder)
err := pem.Encode(ret, &pem.Block{Type: dataType, Bytes: data})
if err != nil {
return "", err
}

return ret.String(), nil
}

// PemEncodePrivateKey encodes key in PEM format
func PemEncodePrivateKey(key *rsa.PrivateKey) (string, error) {
return pemEncode("RSA PRIVATE KEY", x509.MarshalPKCS1PrivateKey(key))
}

// PemEncodeCert encodes cert in PEM format
func PemEncodeCert(cert []byte) (string, error) {
return pemEncode("CERTIFICATE", cert)
}

func randomSerial() (*big.Int, error) {
limit := new(big.Int).Lsh(big.NewInt(1), 128)
ret, err := rand.Int(rand.Reader, limit)
if err != nil {
return nil, errors.Wrap(err, "generate random serial")
}
return ret, nil
}
12 changes: 12 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2550,6 +2550,18 @@ func TestChangefeedErrors(t *testing.T) {
t, `cannot specify both initial_scan and no_initial_scan`,
`CREATE CHANGEFEED FOR foo INTO $1 WITH no_initial_scan, initial_scan`, `kafka://nope`,
)

// Sanity check schema registry tls parameters.
sqlDB.ExpectErr(
t, `param ca_cert must be base 64 encoded`,
`CREATE CHANGEFEED FOR foo INTO $1 WITH format='experimental_avro', confluent_schema_registry=$2`,
`kafka://nope`, `https://schemareg-nope/?ca_cert=!`,
)
sqlDB.ExpectErr(
t, `failed to parse certificate data`,
`CREATE CHANGEFEED FOR foo INTO $1 WITH format='experimental_avro', confluent_schema_registry=$2`,
`kafka://nope`, `https://schemareg-nope/?ca_cert=Zm9v`,
)
}

func TestChangefeedDescription(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ const (
SinkParamSASLUser = `sasl_user`
SinkParamSASLPassword = `sasl_password`
SinkParamSASLMechanism = `sasl_mechanism`

RegistryParamCACert = `ca_cert`
)

// ChangefeedOptionExpectValues is used to parse changefeed options using
Expand Down
53 changes: 52 additions & 1 deletion pkg/ccl/changefeedccl/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,8 @@ type confluentAvroEncoder struct {
keyCache map[tableIDAndVersion]confluentRegisteredKeySchema
valueCache map[tableIDAndVersionPair]confluentRegisteredEnvelopeSchema
resolvedCache map[string]confluentRegisteredEnvelopeSchema

httpClient *httputil.Client
}

type tableIDAndVersion uint64
Expand All @@ -308,6 +310,21 @@ func newConfluentAvroEncoder(
) (*confluentAvroEncoder, error) {
e := &confluentAvroEncoder{registryURL: opts[changefeedbase.OptConfluentSchemaRegistry]}

err := e.setupHTTPClient()
if err != nil {
return nil, err
}

regURL, err := url.Parse(e.registryURL)
if err != nil {
return nil, err
}
// remove query param to ensure compatibility with schema registry implementation
query := regURL.Query()
query.Del(changefeedbase.RegistryParamCACert)
regURL.RawQuery = query.Encode()
e.registryURL = regURL.String()

e.schemaPrefix = opts[changefeedbase.OptAvroSchemaPrefix]
e.targets = targets

Expand Down Expand Up @@ -345,6 +362,40 @@ func newConfluentAvroEncoder(
return e, nil
}

// Setup the httputil.Client to use when dialing Confluent schema registry. If `ca_cert`
// is set as a query param in the registry URL, client should trust the corresponding
// cert while dialing. Otherwise, use the DefaultClient.
func (e *confluentAvroEncoder) setupHTTPClient() error {
regURL, err := url.Parse(e.registryURL)
if err != nil {
return errors.Wrapf(err, "failed parsing registry url:% s", e.registryURL)
}
params := regURL.Query()

var caCert []byte

if caCertString := params.Get(changefeedbase.RegistryParamCACert); caCertString != "" {
err := decodeBase64FromString(caCertString, &caCert)
if err != nil {
return errors.Wrapf(err, "param %s must be base 64 encoded", changefeedbase.RegistryParamCACert)
}
}

if caCert != nil {
var err error
e.httpClient, err = newClientFromTLSKeyPair(caCert)
if regURL.Scheme == "http" {
log.Warningf(context.Background(), "CA certificate provided but schema registry %s uses HTTP", e.registryURL)
}
if err != nil {
return err
}
} else {
e.httpClient = httputil.DefaultClient
}
return nil
}

// Get the raw SQL-formatted string for a table name
// and apply full_table_name and avro_schema_prefix options
func (e *confluentAvroEncoder) rawTableName(desc catalog.TableDescriptor) string {
Expand Down Expand Up @@ -529,7 +580,7 @@ func (e *confluentAvroEncoder) register(
// actionable issues in the operator's environment that which they might be
// able to resolve if we made them visible in a failure instead.
if err := retry.WithMaxAttempts(ctx, base.DefaultRetryOptions(), 3, func() error {
resp, err := httputil.Post(ctx, url.String(), confluentSchemaContentType, &buf)
resp, err := e.httpClient.Post(ctx, url.String(), confluentSchemaContentType, &buf)
if err != nil {
return errors.Wrap(err, "contacting confluent schema registry")
}
Expand Down
Loading