changefeedccl: Add query parameter for specifying certificates for schema registries #65431
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @spiffyyeng)
a discussion (no related file):
Thanks for working on this!
I didn't do a line-by-line review yet, but I wanted to get some of my larger concerns to you for consideration. Happy to discuss any of them further!
pkg/ccl/changefeedccl/encoder.go, line 374 at r1 (raw file):
        return errors.Wrapf(err, `param %s must be a bool`, changefeedbase.SinkParamRegistryTLSEnabled)
    }
}
Over in sink_kafka there is a consumeBool helper func, perhaps we can promote that to a method on the sinkURL struct itself? However, in my other comment I argue for removing this option altogether so perhaps that isn't necessary.
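A rough sketch of what promoting the helper to a method might look like. The `sinkURL` type below is a simplified stand-in, and `newSinkURL`, `consumeParam`, and the exact `consumeBool` signature are assumptions made for illustration, not the actual changefeedccl code.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// sinkURL is a simplified stand-in for the changefeedccl type of the same
// name; the real struct may carry different state.
type sinkURL struct {
	*url.URL
	q url.Values
}

// newSinkURL is a hypothetical constructor that parses raw and caches its
// query parameters so they can be consumed one by one.
func newSinkURL(raw string) (*sinkURL, error) {
	parsed, err := url.Parse(raw)
	if err != nil {
		return nil, err
	}
	return &sinkURL{URL: parsed, q: parsed.Query()}, nil
}

// consumeParam returns the value of param p and removes it from the cache.
func (u *sinkURL) consumeParam(p string) string {
	v := u.q.Get(p)
	u.q.Del(p)
	return v
}

// consumeBool parses param p as a bool into dest. It reports whether the
// param was present and returns an error if the value is not a valid bool.
func (u *sinkURL) consumeBool(p string, dest *bool) (bool, error) {
	v := u.consumeParam(p)
	if v == "" {
		return false, nil
	}
	b, err := strconv.ParseBool(v)
	if err != nil {
		return false, fmt.Errorf("param %s must be a bool: %w", p, err)
	}
	*dest = b
	return true, nil
}
```

With a method like this, each call site can parse a boolean parameter without repeating the "must be a bool" error handling.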
pkg/ccl/changefeedccl/encoder.go, line 389 at r1 (raw file):
    }
} else if caCert != nil {
    return errors.Errorf(`%s requires %s=true`, changefeedbase.SinkParamRegistryCACert, changefeedbase.SinkParamRegistryTLSEnabled)
Can we avoid requiring the tls_enabled flag here? We have that option for kafka because there is no way to know from the scheme whether it expects TLS or not. But for the schema registry, whether or not to use TLS is clear from the scheme being used (http vs https).
If we went the route of not requiring the tls_enabled flag, it might be nice to warn the user if they've specified a ca with an http sink.
pkg/ccl/changefeedccl/tls.go, line 76 at r1 (raw file):
}
client := &httputil.Client{
This is used as an alternative to the DefaultClient. The default client additionally sets some timeouts and disables keepalives. It might be good to avoid having different behaviors between the clients. We could do that by calling NewClientWithTimeout here and then mutating the returned struct.
pkg/ccl/changefeedccl/tls.go, line 139 at r1 (raw file):
    }
    return ret, nil
}
We've got some of these TLS-related functions in a few places now. Perhaps as a follow-up we could work on a nice util package where we could share them. Not necessary in this PR though.
pkg/ccl/changefeedccl/changefeedbase/options.go, line 95 at r1 (raw file):
SinkParamFileSize = `file_size`
SinkParamRegistryCACert = `registry_ca_cert`
SinkParamRegistryTLSEnabled = `registry_tls_enabled`
I feel like it is a bit odd for these to be URL parameters on the kafka sink URL while the registry URL itself is an option. Perhaps these should be options?
Reviewed 5 of 11 files at r1.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @spiffyyeng and @stevendanna)
pkg/ccl/changefeedccl/changefeed_processors.go, line 165 at r1 (raw file):
}
if ca.encoder, err = getEncoder(ca.spec.Feed.Opts, ca.spec.Feed.Targets, &sinkURL{URL: parsedSink}); err != nil {
I'm a bit confused why we need to pass SinkURI to create an encoder. It appears that the schema registry cert params could have been specified as part of the confluent_schema_registry option (which has to be specified anyway)? Why do we want to add these schema specific params to SinkURI?
pkg/ccl/changefeedccl/encoder.go, line 374 at r1 (raw file):
Previously, stevendanna (Steven Danna) wrote…
Over in sink_kafka there is a consumeBool helper func, perhaps we can promote that to a method on the sinkURL struct itself? However, in my other comment I argue for removing this option altogether so perhaps that isn't necessary.
Also, I'd love to pull out some of the tls option parsing and setup into some helper methods.
pkg/ccl/changefeedccl/tls.go, line 76 at r1 (raw file):
Previously, stevendanna (Steven Danna) wrote…
This is used as an alternative to the DefaultClient. The default client additionally sets some timeouts and disables keepalives. It might be good to avoid having different behaviors between the clients. We could do that by calling NewClientWithTimeout here and then mutating the returned struct.
Just a note to self: I assume we handle timeouts in http/https talking to registry much the same way we'd handle sink errors -- i.e. by retrying the whole changefeed.
pkg/ccl/changefeedccl/tls.go, line 88 at r1 (raw file):
}
func generateCACert(priv *rsa.PrivateKey) ([]byte, *x509.Certificate, error) {
This looks like a test function -- it doesn't belong here (actually, neither do the majority of these methods).
You might consider moving them to tls_test.go, or even tls.go under cdctest package.
Both of those things make sense to me - definitely don't think we should add the tls flag if it isn't needed. We'll just have to be careful about how we document it. A warning is also a good idea.
Two thoughts for making it a query param: 1) ca_cert is specified as a query param 2) Normally the pattern is sink-specific stuff goes in as a query param, which was kind of the thinking here. Per our discussion earlier, confluent schema registry doesn't necessarily need to be used for kafka, so maybe that's actually a reason to use an option instead.
What would specifying it this way look like?
@amruss
@miretskiy @amruss @stevendanna okay, I've changed it so that the registry schema URL has a query param
This looks good to me, but please wait for @stevendanna to also chime in.
Reviewed 7 of 13 files at r2.
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @miretskiy, @spiffyyeng, and @stevendanna)
pkg/ccl/changefeedccl/encoder_test.go, line 381 at r1 (raw file):
}
t.Run("format=experimental_avro,envelope=key_only", func(t *testing.T) {
If you feel adventurous, you can add an end-to-end test to changefeed_stmt, where you can try running kafkaFeed (you'd need to rebase on master to get kafkaFeed for tests). Also, you'd probably need to change the kafkaFeed implementation a bit to create a registry w/ tls.
I left a few more comments. Overall this is moving in the right direction I think. Let me know if you want to talk synchronously about any of them.
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @miretskiy, @spiffyyeng, and @stevendanna)
pkg/ccl/changefeedccl/encoder.go, line 287 at r2 (raw file):
resolvedCache map[string]confluentRegisteredEnvelopeSchema
httpClient *httputil.Client
🤔 [just thinking out loud] In the general case, http.Client has a CloseIdleConnections method that we might want to call when we no longer need an http.Client that isn't going to live for the lifetime of the program. However, since our httputil.Client sets DisableKeepAlives, I don't think we have to call it since by default our connections will only be used for a single request.
pkg/ccl/changefeedccl/encoder.go, line 361 at r2 (raw file):
regURL, err := url.Parse(e.registryURL)
if err != nil {
    return errors.Errorf("failed parsing registry url:%s due to error:%v", e.registryURL, err)
When wrapping an error like this, there are some functions you can call specifically for wrapping that you might want to prefer:
errors.Wrapf(err, "failed parsing registry url %q", e.registryURL)
pkg/ccl/changefeedccl/encoder.go, line 367 at r2 (raw file):
var caCert []byte
if caCertString := params.Get(changefeedbase.RegistryParamCACert); caCertString != "" {
I agree that ca_cert as a parameter to the registry URL feels better than as a parameter to the kafka URL, but I think this will pose a problem we need to address:
We are leaving the ca_cert query parameter on the URL. Are we sure that Confluent's schema registry implementation will allow this? Our test schema registry is very accepting of all requests, so my guess is if we only tested against the test implementation we won't see a problem.
Conversely, since this is an HTTP API we are talking to, perhaps some other schema registry implementation requires query parameters that have nothing to do with us but conflict with parameters we think we control? How do we know if the query parameter we are using is actually safe to remove?
For now, we could punt on the harder version of this question and say that we control the ca_cert parameter and that we don't send it onto the schema registry if it is provided.
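A minimal sketch of the "punt" option, assuming we own the `ca_cert` parameter and strip it before contacting the registry. The helper name `splitRegistryURL` and the constant `registryParamCACert` are hypothetical, and the sketch uses only the standard library (`fmt.Errorf` with `%w` stands in for `errors.Wrapf`).

```go
package main

import (
	"fmt"
	"net/url"
)

// registryParamCACert mirrors the ca_cert parameter discussed above; the
// constant name is made up for this sketch.
const registryParamCACert = "ca_cert"

// splitRegistryURL extracts the ca_cert query parameter from raw and returns
// it along with the URL that should actually be sent to the registry, i.e.
// the same URL with ca_cert removed. Other query parameters are preserved.
func splitRegistryURL(raw string) (caCert string, cleaned string, err error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", "", fmt.Errorf("failed parsing registry url %q: %w", raw, err)
	}
	q := u.Query()
	caCert = q.Get(registryParamCACert)
	q.Del(registryParamCACert)
	// Encode re-serializes the remaining parameters sorted by key.
	u.RawQuery = q.Encode()
	return caCert, u.String(), nil
}
```

One consequence of this approach is that a registry which itself expects a parameter named `ca_cert` could never receive it, which is the harder version of the question raised above.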
pkg/ccl/changefeedccl/encoder.go, line 377 at r2 (raw file):
var err error
e.httpClient, err = newClientFromTLSKeyPair(caCert)
if strings.HasPrefix(e.registryURL, "http://") {
The parsed URL you constructed above should have a Scheme member that you can compare directly against.
pkg/ccl/changefeedccl/encoder.go, line 378 at r2 (raw file):
e.httpClient, err = newClientFromTLSKeyPair(caCert)
if strings.HasPrefix(e.registryURL, "http://") {
    log.Warningf(context.Background(), "attempting to use CA cert for schema registry:%s with HTTP scheme", e.registryURL)
Perhaps
log.Warningf(context.Background(), "CA certificate provided but schema registry %q uses HTTP", e.registryURL)
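The scheme comparison suggested in the two comments above could be sketched as follows. `shouldWarnCAOverHTTP` is a hypothetical helper that only decides whether a warning is warranted; the actual logging call is left to the caller.

```go
package main

import "net/url"

// shouldWarnCAOverHTTP reports whether a CA certificate was supplied for a
// registry URL whose scheme is plain http, in which case the certificate
// would have no effect and the user probably wants a warning.
func shouldWarnCAOverHTTP(registryURL string, caCert []byte) (bool, error) {
	u, err := url.Parse(registryURL)
	if err != nil {
		return false, err
	}
	// url.Parse lowercases the scheme, so comparing the Scheme field also
	// covers inputs such as "HTTP://..." that a HasPrefix check would miss.
	return len(caCert) > 0 && u.Scheme == "http", nil
}
```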
pkg/ccl/changefeedccl/encoder_test.go, line 321 at r2 (raw file):
}
func NewCACertBase64Encoded() (*tls.Certificate, string, error) {
I don't think we need to export this
pkg/ccl/changefeedccl/tls.go, line 54 at r2 (raw file):
if !rootCAs.AppendCertsFromPEM(caCert) {
    return nil, errors.Errorf("failed to parse certificate data:%s", string(caCert))
}
Note to self: it looks like AppendCertsFromPEM deals with len(caCerts) == 0 without returning an error, which is likely what we want.
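For reference, a small sketch of building a cert pool while treating an empty certificate as "no custom CA". The helper name `caPool` and the choice to return a nil pool for empty input are assumptions, not what the PR does. The explicit length check is there because `AppendCertsFromPEM` returns a bool that is false whenever it appends no certificates, which includes empty input.

```go
package main

import (
	"crypto/x509"
	"errors"
)

// caPool builds a certificate pool from PEM-encoded data. Empty input is
// treated as "no custom CA" and yields a nil pool so that callers can fall
// back to their defaults (an assumption made for this sketch).
func caPool(caCert []byte) (*x509.CertPool, error) {
	if len(caCert) == 0 {
		return nil, nil
	}
	pool := x509.NewCertPool()
	// AppendCertsFromPEM reports whether at least one certificate was added.
	if !pool.AppendCertsFromPEM(caCert) {
		return nil, errors.New("failed to parse certificate data")
	}
	return pool, nil
}
```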
pkg/ccl/changefeedccl/tls.go, line 65 at r2 (raw file):
clientWrapper := httputil.NewClientWithTimeout(httputil.StandardHTTPTimeout)
clientWrapper.Client = client
I don't think this is quite what you want since it is going to replace the entire client, which will still have the effect of constructing a client with different defaults.
pkg/util/httputil/client.go, line 25 at r2 (raw file):
var DefaultClient = NewClientWithTimeout(StandardHTTPTimeout)
// StandardHTTPTimeout is the default timeeout to use for HTTP connections.
s/timeeout/timeout/
Reviewable status: complete! 1 of 0 LGTMs obtained (and 1 stale) (waiting on @miretskiy and @spiffyyeng)
a discussion (no related file):
Nice work!
I've left a few comments, but none are blocking, so take them or leave them.
As a follow-up, we might want to add something to the cdc/schemareg roachtest to make sure this is tested against the confluent schema registry occasionally.
Part of me still feels that an options-based way to pass the certs might be slightly cleaner, but this is more consistent.
pkg/ccl/changefeedccl/encoder.go, line 371 at r3 (raw file):
regURL, err := url.Parse(e.registryURL)
if err != nil {
    return errors.Wrapf(err, "failed parsing registry url:%s", e.registryURL)
[nit] I think in most places we would add a space after the :
return errors.Wrapf(err, "failed parsing registry url: %s", e.registryURL)
pkg/ccl/changefeedccl/encoder.go, line 380 at r3 (raw file):
err := decodeBase64FromString(caCertString, &caCert)
if err != nil {
    return errors.Wrapf(err, `param %s must be base 64 encoded`, changefeedbase.RegistryParamCACert)
[nit] We use a raw string here (backquotes) but double-quoted strings in the rest of this function.
pkg/ccl/changefeedccl/tls.go, line 76 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
Just a note to self: I assume we handle timeouts in http/https talking to registry much the same way we'd handle sink errors -- i.e. by retrying the whole changefeed.
Yes, we mark the errors from the schema registry POST as Retryable. We have 3 retries around that POST so it won't immediately retry the whole changefeed. It is worth noting that we should only be contacting the registry at startup and on schema changes.
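A generic sketch of a bounded retry loop of the kind described here. It is not the changefeed's actual retry code: `withRetries` and `errTransient` are hypothetical names, "3 retries" is interpreted as three attempts in total, and no backoff is applied.

```go
package main

import (
	"errors"
	"fmt"
)

// errTransient is a placeholder for a retryable failure, such as a timeout
// while POSTing a schema to the registry.
var errTransient = errors.New("transient registry error")

// withRetries calls op up to maxAttempts times and returns nil on the first
// success. If every attempt fails, the last error is wrapped and returned so
// that a caller can decide to retry at a higher level.
func withRetries(maxAttempts int, op func() error) error {
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = op(); err == nil {
			return nil
		}
	}
	return fmt.Errorf("giving up after %d attempts: %w", maxAttempts, err)
}
```

Because the final error wraps the underlying one, a caller can still use `errors.Is` to classify it as retryable before restarting anything larger.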
pkg/ccl/changefeedccl/tls.go, line 61 at r3 (raw file):
client := httputil.NewClientWithTimeout(httputil.StandardHTTPTimeout)
transport := client.Client.Transport.(*http.Transport)
If we wanted to be paranoid about NewClientWithTimeout changing such that this would panic, I suppose we could do something like:
if transport, ok := client.Client.Transport.(*http.Transport); ok {
    transport.TLSClientConfig = tlsConfig
    client.Client.Transport = transport
} else {
    client.Client.Transport = &http.Transport{
        TLSClientConfig: tlsConfig,
    }
}
But, we also have tests that will blow up if the type ever changes, so I'm cool with it as-is.
Interestingly, it looks like there aren't many callers of NewClientWithTimeout. I feel like in the long run, that httputil API isn't quite right as what you usually want to get is a Transport with all the default configuration and not a Client directly. But, let's tackle that some other day.
pkg/ccl/changefeedccl/tls.go, line 63 at r3 (raw file):
transport := client.Client.Transport.(*http.Transport)
transport.TLSClientConfig = tlsConfig
client.Client.Transport = transport
Just as an FYI, I don't think you need to write this back because you are modifying it through the reference above. But, it doesn't hurt anything and is perhaps a bit more clear.
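To illustrate the point about not needing the write-back, here is a small standard-library sketch. It uses a plain `http.Client` rather than the `httputil.Client` wrapper, and the function names are made up for the example.

```go
package main

import (
	"crypto/tls"
	"net/http"
)

// applyTLSConfig sets cfg on the client's transport when that transport is an
// *http.Transport, and reports whether it could do so.
func applyTLSConfig(c *http.Client, cfg *tls.Config) bool {
	transport, ok := c.Transport.(*http.Transport)
	if !ok {
		return false
	}
	// transport is a pointer to the same struct that c.Transport holds, so
	// this assignment is visible through the client without writing it back.
	transport.TLSClientConfig = cfg
	return true
}

// configVisibleThroughClient builds a client, applies a TLS config through
// the type-asserted pointer, and checks that the client observes the change.
func configVisibleThroughClient() bool {
	c := &http.Client{Transport: &http.Transport{}}
	cfg := &tls.Config{MinVersion: tls.VersionTLS12}
	if !applyTLSConfig(c, cfg) {
		return false
	}
	return c.Transport.(*http.Transport).TLSClientConfig == cfg
}
```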
bors r+
65431: changefeedccl: Add query parameter for specifying certificates for schema registries r=spiffyyeng a=spiffyyeng changefeedccl: Add query parameter for specifying certificates for schema registries Added query params to schema registry URL to indicate custom CA certificates to trust while dialing Confluent schema registries. Resolves #64622 Release note (enterprise change): Added ca_cert as a query param to Confluent registry schema URL to trust custom certs on connection. Co-authored-by: Ryan Min <ryanmin42@gmail.com>
Build failed:
bors r+
Build succeeded:
changefeedccl: Add query parameter for specifying certificates for schema registries
Added query params to schema registry URL to indicate custom CA
certificates to trust while dialing Confluent schema registries.
Resolves #64622
Release note (enterprise change): Added ca_cert as a query param
to Confluent registry schema URL to trust custom certs on connection.