New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ability to configure SSL truststore and keystore for SR Clients #957
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, @xvrl ?
@@ -23,4 +23,51 @@ | |||
"schema.registry.basic.auth.user.info"; | |||
public static final String USER_INFO_CONFIG = | |||
"basic.auth.user.info"; | |||
|
|||
public static final String SSL_KEYSTORE_LOCATION_CONFIG = "ssl.keystore.location"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there a way we can avoid duplicating those configs and somehow reuse existing configdefs?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The existing config defs were only on the server side. Moreover, on the client side, the configs need namespacing as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we extract the configs used across clients and server into a single utility with various methods for adding config sets to a ConfigDef?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mageshn, I used AK security configs client side in Elastic connector: confluentinc/kafka-connect-elasticsearch#278
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand why config-def from rest-utils precludes using AK configs?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mageshn Any update? Can you fix the conflict as well?
any update? |
@mageshn, the ES Connector also is an SSL Client, and I think this strategy of code re-use works really well. WDYT? |
Can some one please take look at it and merge it ? If its just waiting on conflict resolution I can take it up? |
Would you check it? I want to use this feature. |
@yohei1126 I can...But I need some support of commiters who can merge my code. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few comments so far.
super(config, props); | ||
Importance.MEDIUM, VALUE_SUBJECT_NAME_STRATEGY_DOC) | ||
.define( | ||
CLIENT_NAMESPACE + SchemaRegistryClientConfig.SSL_KEYSTORE_LOCATION_CONFIG, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC this will create a config name of schema.registryssl.keystore.location
without a .
between registry
and ssl
. Should the CLIENT_NAMESPACE
constant value end with .
to make concatenation easier?
try { | ||
createKeystore( | ||
(String) configs.get(prefix + SchemaRegistryClientConfig.SSL_KEYSTORE_TYPE_CONFIG), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same question here about the prefix needed a trailing .
.
@@ -118,7 +118,7 @@ public KafkaSchemaRegistry(SchemaRegistryConfig config, | |||
this.isEligibleForMasterElector = config.getBoolean(SchemaRegistryConfig.MASTER_ELIGIBILITY); | |||
this.myIdentity = new SchemaRegistryIdentity(host, schemeAndPort.port, | |||
isEligibleForMasterElector, schemeAndPort.scheme); | |||
this.sslFactory = new SslFactory(config); | |||
this.sslFactory = new SslFactory(config.originalsWithPrefix(""), false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not originals()
?
@mageshn do you want to resolve the conflicts? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There doesn't seem to be any testing. |
Thanks for the integration tests -- they will pay off for stability +1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the tests @mageshn ! Just left a minor comment, otherwise LGTM!
props.put(SchemaRegistryConfig.SSL_CLIENT_AUTHENTICATION_CONFIG, SchemaRegistryConfig.SSL_CLIENT_AUTHENTICATION_REQUIRED); | ||
|
||
} catch (Exception e) { | ||
e.printStackTrace(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you want to rethrow the exception?
Is it possible to specify the keystore and the truststore (not their locations!) via API? The goal is to work without having to deal with the file system alltogether. This is of importance in e.g. cloud environment. Thanks |
hi @mageshn, I am still seeing I am just wondering if the new jar is only required in the clients or would the schema registry server needs to be update to 5.4.0 also |
I am still facing that issue with the required JVM level properties for SR clients using Avro SerDe. ## rpm -qa confluent*
confluent-common-5.4.2-1.noarch
confluent-control-center-fe-5.4.2-1.noarch
confluent-control-center-5.4.2-1.noarch
confluent-hub-client-5.4.2-1.noarch
confluent-kafka-2.12-5.4.2-1.noarch
confluent-rest-utils-5.4.2-1.noarch
confluent-rebalancer-5.4.2-1.noarch
confluent-schema-registry-5.4.2-1.noarch
## cat /etc/ikep/secrets/local_client.ssl.properties
security.protocol=SSL
ssl.truststore.location=/etc/ikep/secrets/local_client.truststore.p12
ssl.truststore.type=PKCS12
ssl.truststore.password=localPwd
ssl.keystore.location=/etc/ikep/secrets/local_client.keystore.p12
ssl.keystore.type=PKCS12
ssl.keystore.password=localPwd
ssl.key.password=localPwd
ssl.endpoint.identification.algorithm=
## Start avro-console-producer
echo '{"f1":"value1"}' | kafka-avro-console-producer \
--broker-list kafka:9${IKEP_LOCAL_ENV:-09}5 \
--topic local_a_testAvro \
--producer.config /etc/ikep/secrets/local_client.ssl.properties \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}' \
--property schema.registry.url="https://schema-registry:8${IKEP_LOCAL_ENV:-08}1"
[2020-05-12 12:00:50,613] ERROR Failed to send HTTP request to endpoint: https://schema-registry:8081/subjects/local_a_testAvro-value/versions (io.confluent.kafka.schemaregistry.client.rest.RestService:254)
javax.net.ssl.SSLHandshakeException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
at java.base/sun.security.ssl.Alert.createSSLException(Alert.java:131)
at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:320)
at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:263)
at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:258)
at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.checkServerCerts(CertificateMessage.java:645)
at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.onCertificate(CertificateMessage.java:464)
at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.consume(CertificateMessage.java:360)
at java.base/sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:392)
at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:443)
at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:421)
at java.base/sun.security.ssl.TransportContext.dispatch(TransportContext.java:177)
at java.base/sun.security.ssl.SSLTransport.decode(SSLTransport.java:164)
at java.base/sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1151)
at java.base/sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1062)
at java.base/sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:402)
at java.base/sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:567)
at java.base/sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:185)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1362)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1337)
at java.base/sun.net.www.protocol.https.HttpsURLConnectionImpl.getOutputStream(HttpsURLConnectionImpl.java:246)
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:250)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:334)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:434)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:426)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:412)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:140)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:196)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:172)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:71)
at io.confluent.kafka.formatter.AvroMessageReader.readMessage(AvroMessageReader.java:181)
at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:55)
at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)
Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
at java.base/sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:439)
at java.base/sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:306)
at java.base/sun.security.validator.Validator.validate(Validator.java:264)
at java.base/sun.security.ssl.X509TrustManagerImpl.validate(X509TrustManagerImpl.java:313)
at java.base/sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:222)
at java.base/sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:129)
at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.checkServerCerts(CertificateMessage.java:629)
... 27 more
Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
at java.base/sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:141)
at java.base/sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:126)
at java.base/java.security.cert.CertPathBuilder.build(CertPathBuilder.java:297)
at java.base/sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:434)
... 33 more
org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: javax.net.ssl.SSLHandshakeException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
at java.base/sun.security.ssl.Alert.createSSLException(Alert.java:131)
at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:320)
at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:263)
at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:258)
at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.checkServerCerts(CertificateMessage.java:645)
at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.onCertificate(CertificateMessage.java:464)
at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.consume(CertificateMessage.java:360)
at java.base/sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:392)
at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:443)
at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:421)
at java.base/sun.security.ssl.TransportContext.dispatch(TransportContext.java:177)
at java.base/sun.security.ssl.SSLTransport.decode(SSLTransport.java:164)
at java.base/sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1151)
at java.base/sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1062)
at java.base/sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:402)
at java.base/sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:567)
at java.base/sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:185)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1362)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1337)
at java.base/sun.net.www.protocol.https.HttpsURLConnectionImpl.getOutputStream(HttpsURLConnectionImpl.java:246)
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:250)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:334)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:434)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:426)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:412)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:140)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:196)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:172)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:71)
at io.confluent.kafka.formatter.AvroMessageReader.readMessage(AvroMessageReader.java:181)
at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:55)
at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)
Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
at java.base/sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:439)
at java.base/sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:306)
at java.base/sun.security.validator.Validator.validate(Validator.java:264)
at java.base/sun.security.ssl.X509TrustManagerImpl.validate(X509TrustManagerImpl.java:313)
at java.base/sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:222)
at java.base/sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:129)
at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.checkServerCerts(CertificateMessage.java:629)
... 27 more
Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
at java.base/sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:141)
at java.base/sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:126)
at java.base/java.security.cert.CertPathBuilder.build(CertPathBuilder.java:297)
at java.base/sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:434)
... 33 more |
What is the solution here? What configuration can we use to specify a SR that uses a truststore/keystore? |
Historically, SR clients were required to specify the key store and trust store settings via the JVM args. This was not consistent with the rest of the platform and AK. This PR enabled the ability to specify these via configurations. If the approach looks good, I will revise this PR to include doc changes.