Skip to content

Commit

Permalink
Merge #65431
Browse files Browse the repository at this point in the history
65431: changefeedccl: Add query parameter for specifying certificates for schema registries r=spiffyyeng a=spiffyyeng

changefeedccl: Add query parameter for specifying certificates for schema registries

Added query params to schema registry  URL to indicate custom CA
certificates to trust while dialing Confluent schema registries.

Resolves #64622

Release note (enterprise change): Added ca_cert as a query param
to Confluent registry schema URL to trust custom certs on connection.

Co-authored-by: Ryan Min <ryanmin42@gmail.com>
  • Loading branch information
craig[bot] and spiffyy99 committed May 24, 2021
2 parents 3eeb35f + 7bb2cc7 commit d77738d
Show file tree
Hide file tree
Showing 11 changed files with 410 additions and 13 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"sink_kafka.go",
"sink_sql.go",
"testing_knobs.go",
"tls.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl",
visibility = ["//visibility:public"],
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/cdctest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
"nemeses.go",
"schema_registry.go",
"testfeed.go",
"tls_util.go",
"validator.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest",
Expand Down
16 changes: 16 additions & 0 deletions pkg/ccl/changefeedccl/cdctest/schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package cdctest

import (
"crypto/tls"
"encoding/binary"
"encoding/json"
"net/http"
Expand Down Expand Up @@ -41,6 +42,21 @@ func MakeTestSchemaRegistry() *SchemaRegistry {
return r
}

// MakeTestSchemaRegistryWithTLS creates and starts schema registry for tests with TLS enabled.
func MakeTestSchemaRegistryWithTLS(certificate *tls.Certificate) (*SchemaRegistry, error) {
r := &SchemaRegistry{}
r.mu.schemas = make(map[int32]string)
r.mu.subjects = make(map[string]int32)
r.server = httptest.NewUnstartedServer(http.HandlerFunc(r.register))
if certificate != nil {
r.server.TLS = &tls.Config{
Certificates: []tls.Certificate{*certificate},
}
}
r.server.StartTLS()
return r, nil
}

// Close closes this schema registry.
func (r *SchemaRegistry) Close() {
r.server.Close()
Expand Down
91 changes: 91 additions & 0 deletions pkg/ccl/changefeedccl/cdctest/tls_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2021 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package cdctest

import (
"crypto/rand"
"crypto/rsa"
"crypto/x509"
"crypto/x509/pkix"
"encoding/base64"
"encoding/pem"
"math/big"
"net"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)

const certLifetime = 30 * 24 * time.Hour

// EncodeBase64ToString stores the base64 encoding of src in dest
func EncodeBase64ToString(src []byte, dest *string) {
if src != nil {
encoded := base64.StdEncoding.EncodeToString(src)
*dest = encoded
}
}

// GenerateCACert generates a new self-signed CA cert using priv
func GenerateCACert(priv *rsa.PrivateKey) ([]byte, *x509.Certificate, error) {
serial, err := randomSerial()
if err != nil {
return nil, nil, err
}

certSpec := &x509.Certificate{
SerialNumber: serial,
Subject: pkix.Name{
Country: []string{"US"},
Organization: []string{"Cockroach Labs"},
OrganizationalUnit: []string{"Engineering"},
CommonName: "Roachtest Temporary Insecure CA",
},
NotBefore: timeutil.Now(),
NotAfter: timeutil.Now().Add(certLifetime),
KeyUsage: x509.KeyUsageCertSign | x509.KeyUsageCRLSign,
IsCA: true,
BasicConstraintsValid: true,
MaxPathLenZero: true,
IPAddresses: []net.IP{net.ParseIP("127.0.0.1"), net.ParseIP("::1")},
}
cert, err := x509.CreateCertificate(rand.Reader, certSpec, certSpec, &priv.PublicKey, priv)
return cert, certSpec, err
}

func pemEncode(dataType string, data []byte) (string, error) {
ret := new(strings.Builder)
err := pem.Encode(ret, &pem.Block{Type: dataType, Bytes: data})
if err != nil {
return "", err
}

return ret.String(), nil
}

// PemEncodePrivateKey encodes key in PEM format
func PemEncodePrivateKey(key *rsa.PrivateKey) (string, error) {
return pemEncode("RSA PRIVATE KEY", x509.MarshalPKCS1PrivateKey(key))
}

// PemEncodeCert encodes cert in PEM format
func PemEncodeCert(cert []byte) (string, error) {
return pemEncode("CERTIFICATE", cert)
}

func randomSerial() (*big.Int, error) {
limit := new(big.Int).Lsh(big.NewInt(1), 128)
ret, err := rand.Int(rand.Reader, limit)
if err != nil {
return nil, errors.Wrap(err, "generate random serial")
}
return ret, nil
}
12 changes: 12 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2550,6 +2550,18 @@ func TestChangefeedErrors(t *testing.T) {
t, `cannot specify both initial_scan and no_initial_scan`,
`CREATE CHANGEFEED FOR foo INTO $1 WITH no_initial_scan, initial_scan`, `kafka://nope`,
)

// Sanity check schema registry tls parameters.
sqlDB.ExpectErr(
t, `param ca_cert must be base 64 encoded`,
`CREATE CHANGEFEED FOR foo INTO $1 WITH format='experimental_avro', confluent_schema_registry=$2`,
`kafka://nope`, `https://schemareg-nope/?ca_cert=!`,
)
sqlDB.ExpectErr(
t, `failed to parse certificate data`,
`CREATE CHANGEFEED FOR foo INTO $1 WITH format='experimental_avro', confluent_schema_registry=$2`,
`kafka://nope`, `https://schemareg-nope/?ca_cert=Zm9v`,
)
}

func TestChangefeedDescription(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ const (
SinkParamSASLUser = `sasl_user`
SinkParamSASLPassword = `sasl_password`
SinkParamSASLMechanism = `sasl_mechanism`

RegistryParamCACert = `ca_cert`
)

// ChangefeedOptionExpectValues is used to parse changefeed options using
Expand Down
53 changes: 52 additions & 1 deletion pkg/ccl/changefeedccl/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,8 @@ type confluentAvroEncoder struct {
keyCache map[tableIDAndVersion]confluentRegisteredKeySchema
valueCache map[tableIDAndVersionPair]confluentRegisteredEnvelopeSchema
resolvedCache map[string]confluentRegisteredEnvelopeSchema

httpClient *httputil.Client
}

type tableIDAndVersion uint64
Expand All @@ -308,6 +310,21 @@ func newConfluentAvroEncoder(
) (*confluentAvroEncoder, error) {
e := &confluentAvroEncoder{registryURL: opts[changefeedbase.OptConfluentSchemaRegistry]}

err := e.setupHTTPClient()
if err != nil {
return nil, err
}

regURL, err := url.Parse(e.registryURL)
if err != nil {
return nil, err
}
// remove query param to ensure compatibility with schema registry implementation
query := regURL.Query()
query.Del(changefeedbase.RegistryParamCACert)
regURL.RawQuery = query.Encode()
e.registryURL = regURL.String()

e.schemaPrefix = opts[changefeedbase.OptAvroSchemaPrefix]
e.targets = targets

Expand Down Expand Up @@ -345,6 +362,40 @@ func newConfluentAvroEncoder(
return e, nil
}

// Setup the httputil.Client to use when dialing Confluent schema registry. If `ca_cert`
// is set as a query param in the registry URL, client should trust the corresponding
// cert while dialing. Otherwise, use the DefaultClient.
func (e *confluentAvroEncoder) setupHTTPClient() error {
regURL, err := url.Parse(e.registryURL)
if err != nil {
return errors.Wrapf(err, "failed parsing registry url:%s", e.registryURL)
}
params := regURL.Query()

var caCert []byte

if caCertString := params.Get(changefeedbase.RegistryParamCACert); caCertString != "" {
err := decodeBase64FromString(caCertString, &caCert)
if err != nil {
return errors.Wrapf(err, `param %s must be base 64 encoded`, changefeedbase.RegistryParamCACert)
}
}

if caCert != nil {
var err error
e.httpClient, err = newClientFromTLSKeyPair(caCert)
if regURL.Scheme == "http" {
log.Warningf(context.Background(), "CA certificate provided but schema registry %s uses HTTP", e.registryURL)
}
if err != nil {
return err
}
} else {
e.httpClient = httputil.DefaultClient
}
return nil
}

// Get the raw SQL-formatted string for a table name
// and apply full_table_name and avro_schema_prefix options
func (e *confluentAvroEncoder) rawTableName(desc catalog.TableDescriptor) string {
Expand Down Expand Up @@ -529,7 +580,7 @@ func (e *confluentAvroEncoder) register(
// actionable issues in the operator's environment that which they might be
// able to resolve if we made them visible in a failure instead.
if err := retry.WithMaxAttempts(ctx, base.DefaultRetryOptions(), 3, func() error {
resp, err := httputil.Post(ctx, url.String(), confluentSchemaContentType, &buf)
resp, err := e.httpClient.Post(ctx, url.String(), confluentSchemaContentType, &buf)
if err != nil {
return errors.Wrap(err, "contacting confluent schema registry")
}
Expand Down
Loading

0 comments on commit d77738d

Please sign in to comment.