diff --git a/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroSerDeConfig.java b/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroSerDeConfig.java index 514fdefeae3..f335d8030e3 100644 --- a/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroSerDeConfig.java +++ b/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroSerDeConfig.java @@ -1,12 +1,12 @@ /* * Copyright 2018 Confluent Inc. - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

+ * * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -29,17 +29,12 @@ import io.confluent.kafka.serializers.subject.TopicNameStrategy; import io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy; -import java.util.List; -import java.util.Map; - /** * Base class for configs for serializers and deserializers, defining a few common configs and * defaults. */ public class AbstractKafkaAvroSerDeConfig extends AbstractConfig { - private static final String CLIENT_NAMESPACE = "schema.registry"; - /** * Configurations beginning with this prefix can be used to specify headers to include in requests * made to Schema Registry. For example, to include an {@code Authorization} header with a value @@ -53,7 +48,7 @@ public class AbstractKafkaAvroSerDeConfig extends AbstractConfig { public static final String SCHEMA_REGISTRY_URL_DOC = "Comma-separated list of URLs for schema registry instances that can be used to register " - + "or look up schemas. " + + "or look up schemas. " + "If you wish to get a connection to a mocked schema registry for testing, " + "you can specify a scope using the 'mock://' pseudo-protocol. For example, " + "'mock://my-scope-name' corresponds to " @@ -74,7 +69,7 @@ public class AbstractKafkaAvroSerDeConfig extends AbstractConfig { public static final String BASIC_AUTH_CREDENTIALS_SOURCE_DEFAULT = "URL"; public static final String BASIC_AUTH_CREDENTIALS_SOURCE_DOC = "Specify how to pick the credentials for Basic Auth header. " - + "The supported values are URL, USER_INFO and SASL_INHERIT"; + + "The supported values are URL, USER_INFO and SASL_INHERIT"; public static final String BEARER_AUTH_CREDENTIALS_SOURCE = SchemaRegistryClientConfig .BEARER_AUTH_CREDENTIALS_SOURCE; @@ -104,22 +99,18 @@ public class AbstractKafkaAvroSerDeConfig extends AbstractConfig { TopicNameStrategy.class.getName(); public static final String KEY_SUBJECT_NAME_STRATEGY_DOC = "Determines how to construct the subject name under which the key schema is registered " - + "with the schema registry. By default, -key is used as subject."; + + "with the schema registry. By default, -key is used as subject."; public static final String VALUE_SUBJECT_NAME_STRATEGY = "value.subject.name.strategy"; public static final String VALUE_SUBJECT_NAME_STRATEGY_DEFAULT = TopicNameStrategy.class.getName(); public static final String VALUE_SUBJECT_NAME_STRATEGY_DOC = "Determines how to construct the subject name under which the value schema is registered " - + "with the schema registry. By default, -value is used as subject."; - - - public AbstractKafkaAvroSerDeConfig(ConfigDef config, Map props) { - super(config, props); - } + + "with the schema registry. By default, -value is used as subject."; public static ConfigDef baseConfigDef() { - return new ConfigDef() + ConfigDef configDef = new ConfigDef(); + configDef .define(SCHEMA_REGISTRY_URL_CONFIG, Type.LIST, Importance.HIGH, SCHEMA_REGISTRY_URL_DOC) .define(MAX_SCHEMAS_PER_SUBJECT_CONFIG, Type.INT, MAX_SCHEMAS_PER_SUBJECT_DEFAULT, @@ -129,85 +120,24 @@ public static ConfigDef baseConfigDef() { .define(BASIC_AUTH_CREDENTIALS_SOURCE, Type.STRING, BASIC_AUTH_CREDENTIALS_SOURCE_DEFAULT, Importance.MEDIUM, BASIC_AUTH_CREDENTIALS_SOURCE_DOC) .define(BEARER_AUTH_CREDENTIALS_SOURCE, Type.STRING, BEARER_AUTH_CREDENTIALS_SOURCE_DEFAULT, - Importance.MEDIUM, BEARER_AUTH_CREDENTIALS_SOURCE_DOC) + Importance.MEDIUM, BEARER_AUTH_CREDENTIALS_SOURCE_DOC) .define(SCHEMA_REGISTRY_USER_INFO_CONFIG, Type.PASSWORD, SCHEMA_REGISTRY_USER_INFO_DEFAULT, Importance.MEDIUM, SCHEMA_REGISTRY_USER_INFO_DOC) .define(USER_INFO_CONFIG, Type.PASSWORD, USER_INFO_DEFAULT, Importance.MEDIUM, SCHEMA_REGISTRY_USER_INFO_DOC) .define(BEARER_AUTH_TOKEN_CONFIG, Type.PASSWORD, BEARER_AUTH_TOKEN_DEFAULT, - Importance.MEDIUM, BEARER_AUTH_TOKEN_DOC) + Importance.MEDIUM, BEARER_AUTH_TOKEN_DOC) .define(KEY_SUBJECT_NAME_STRATEGY, Type.CLASS, KEY_SUBJECT_NAME_STRATEGY_DEFAULT, Importance.MEDIUM, KEY_SUBJECT_NAME_STRATEGY_DOC) .define(VALUE_SUBJECT_NAME_STRATEGY, Type.CLASS, VALUE_SUBJECT_NAME_STRATEGY_DEFAULT, - Importance.MEDIUM, VALUE_SUBJECT_NAME_STRATEGY_DOC) - .define( - CLIENT_NAMESPACE + SchemaRegistryClientConfig.SSL_KEYSTORE_LOCATION_CONFIG, - Type.STRING, - SchemaRegistryClientConfig.SSL_KEYSTORE_LOCATION_DEFAULT, - Importance.HIGH, - SchemaRegistryClientConfig.SSL_KEYSTORE_LOCATION_DOC - ) - .define( - CLIENT_NAMESPACE + SchemaRegistryClientConfig.SSL_KEYSTORE_PASSWORD_CONFIG, - Type.PASSWORD, - SchemaRegistryClientConfig.SSL_KEYSTORE_PASSWORD_DEFAULT, - Importance.HIGH, - SchemaRegistryClientConfig.SSL_KEYSTORE_PASSWORD_DOC - ) - .define( - CLIENT_NAMESPACE + SchemaRegistryClientConfig.SSL_KEY_PASSWORD_CONFIG, - Type.PASSWORD, - SchemaRegistryClientConfig.SSL_KEY_PASSWORD_DEFAULT, - Importance.HIGH, - SchemaRegistryClientConfig.SSL_KEY_PASSWORD_DOC) - .define( - CLIENT_NAMESPACE + SchemaRegistryClientConfig.SSL_KEYSTORE_TYPE_CONFIG, - Type.STRING, - SchemaRegistryClientConfig.SSL_KEYSTORE_TYPE_DEFAULT, - Importance.MEDIUM, - SchemaRegistryClientConfig.SSL_KEYSTORE_TYPE_DOC) - .define( - CLIENT_NAMESPACE + SchemaRegistryClientConfig.SSL_KEYMANAGER_ALGORITHM_CONFIG, - Type.STRING, - SchemaRegistryClientConfig.SSL_KEYMANAGER_ALGORITHM_DEFAULT, - Importance.LOW, - SchemaRegistryClientConfig.SSL_KEYMANAGER_ALGORITHM_DOC) - .define( - CLIENT_NAMESPACE + SchemaRegistryClientConfig.SSL_TRUSTSTORE_LOCATION_CONFIG, - Type.STRING, - SchemaRegistryClientConfig.SSL_TRUSTSTORE_LOCATION_DEFAULT, - Importance.HIGH, - SchemaRegistryClientConfig.SSL_TRUSTSTORE_LOCATION_DOC) - .define( - CLIENT_NAMESPACE + SchemaRegistryClientConfig.SSL_TRUSTSTORE_PASSWORD_CONFIG, - Type.PASSWORD, - SchemaRegistryClientConfig.SSL_TRUSTSTORE_PASSWORD_DEFAULT, - Importance.HIGH, - SchemaRegistryClientConfig.SSL_TRUSTSTORE_PASSWORD_DOC) - .define( - CLIENT_NAMESPACE + SchemaRegistryClientConfig.SSL_TRUSTSTORE_TYPE_CONFIG, - Type.STRING, - SchemaRegistryClientConfig.SSL_TRUSTSTORE_TYPE_DEFAULT, - Importance.MEDIUM, - SchemaRegistryClientConfig.SSL_TRUSTSTORE_TYPE_DOC) - .define( - CLIENT_NAMESPACE + SchemaRegistryClientConfig.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, - Type.STRING, - SchemaRegistryClientConfig.SSL_TRUSTMANAGER_ALGORITHM_DEFAULT, - Importance.LOW, - SchemaRegistryClientConfig.SSL_TRUSTMANAGER_ALGORITHM_DOC) - .define( - CLIENT_NAMESPACE + SchemaRegistryClientConfig.SSL_PROTOCOL_CONFIG, - Type.STRING, - SchemaRegistryClientConfig.SSL_PROTOCOL_DEFAULT, - Importance.MEDIUM, - SchemaRegistryClientConfig.SSL_PROTOCOL_DOC) - .define( - CLIENT_NAMESPACE + SchemaRegistryClientConfig.SSL_PROVIDER_CONFIG, - Type.STRING, - SchemaRegistryClientConfig.SSL_PROVIDER_DEFAULT, - Importance.MEDIUM, - SchemaRegistryClientConfig.SSL_PROVIDER_DOC); + Importance.MEDIUM, VALUE_SUBJECT_NAME_STRATEGY_DOC); + SchemaRegistryClientConfig.withClientSslSupport( + configDef, SchemaRegistryClientConfig.CLIENT_NAMESPACE); + return configDef; + } + + public AbstractKafkaAvroSerDeConfig(ConfigDef config, Map props) { + super(config, props); } public int getMaxSchemasPerSubject() { diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/client/CachedSchemaRegistryClient.java b/client/src/main/java/io/confluent/kafka/schemaregistry/client/CachedSchemaRegistryClient.java index aa03ae64d12..3ff9d344a1b 100644 --- a/client/src/main/java/io/confluent/kafka/schemaregistry/client/CachedSchemaRegistryClient.java +++ b/client/src/main/java/io/confluent/kafka/schemaregistry/client/CachedSchemaRegistryClient.java @@ -19,6 +19,7 @@ import java.util.Collections; import java.util.Objects; +import io.confluent.kafka.schemaregistry.client.security.SslFactory; import org.apache.avro.Schema; import java.io.IOException; @@ -26,6 +27,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import io.confluent.kafka.schemaregistry.client.rest.RestService; import io.confluent.kafka.schemaregistry.client.rest.Versions; @@ -120,6 +122,14 @@ public CachedSchemaRegistryClient( } if (configs != null && !configs.isEmpty()) { restService.configure(configs); + + Map sslConfigs = configs.entrySet().stream() + .filter(e -> e.getKey().startsWith(SchemaRegistryClientConfig.CLIENT_NAMESPACE)) + .collect(Collectors.toMap( + e -> e.getKey().substring(SchemaRegistryClientConfig.CLIENT_NAMESPACE.length()), + Map.Entry::getValue)); + SslFactory sslFactory = new SslFactory(sslConfigs); + restService.setSslFactory(sslFactory); } } diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/client/SchemaRegistryClientConfig.java b/client/src/main/java/io/confluent/kafka/schemaregistry/client/SchemaRegistryClientConfig.java index 4813222c2fa..08c354edcbe 100644 --- a/client/src/main/java/io/confluent/kafka/schemaregistry/client/SchemaRegistryClientConfig.java +++ b/client/src/main/java/io/confluent/kafka/schemaregistry/client/SchemaRegistryClientConfig.java @@ -16,7 +16,12 @@ package io.confluent.kafka.schemaregistry.client; +import io.confluent.common.config.ConfigDef; + public class SchemaRegistryClientConfig { + + public static final String CLIENT_NAMESPACE = "schema.registry."; + public static final String BASIC_AUTH_CREDENTIALS_SOURCE = "basic.auth.credentials.source"; @Deprecated public static final String SCHEMA_REGISTRY_USER_INFO_CONFIG = @@ -26,50 +31,29 @@ public class SchemaRegistryClientConfig { public static final String BEARER_AUTH_CREDENTIALS_SOURCE = "bearer.auth.credentials.source"; public static final String BEARER_AUTH_TOKEN_CONFIG = "bearer.auth.token"; - public static final String SSL_KEYSTORE_LOCATION_CONFIG = "ssl.keystore.location"; - public static final String SSL_KEYSTORE_PASSWORD_CONFIG = "ssl.keystore.password"; - public static final String SSL_KEY_PASSWORD_CONFIG = "ssl.key.password"; - public static final String SSL_KEYSTORE_TYPE_CONFIG = "ssl.keystore.type"; - public static final String SSL_KEYMANAGER_ALGORITHM_CONFIG = "ssl.keymanager.algorithm"; - public static final String SSL_TRUSTSTORE_LOCATION_CONFIG = "ssl.truststore.location"; - public static final String SSL_TRUSTSTORE_PASSWORD_CONFIG = "ssl.truststore.password"; - public static final String SSL_TRUSTSTORE_TYPE_CONFIG = "ssl.truststore.type"; - public static final String SSL_TRUSTMANAGER_ALGORITHM_CONFIG = "ssl.trustmanager.algorithm"; - public static final String SSL_PROTOCOL_CONFIG = "ssl.protocol"; - public static final String SSL_PROVIDER_CONFIG = "ssl.provider"; - public static final String SSL_KEYSTORE_LOCATION_DOC = - "Location of the keystore file to use for SSL. This is required for HTTPS."; - public static final String SSL_KEYSTORE_LOCATION_DEFAULT = ""; - public static final String SSL_KEYSTORE_PASSWORD_DOC = - "The store password for the keystore file."; - public static final String SSL_KEYSTORE_PASSWORD_DEFAULT = ""; - public static final String SSL_KEY_PASSWORD_DOC = - "The password of the private key in the keystore file."; - public static final String SSL_KEY_PASSWORD_DEFAULT = ""; - public static final String SSL_KEYSTORE_TYPE_DOC = - "The type of keystore file."; - public static final String SSL_KEYSTORE_TYPE_DEFAULT = "JKS"; - public static final String SSL_KEYMANAGER_ALGORITHM_DOC = - "The algorithm used by the key manager factory for SSL connections. " - + "Leave blank to use Jetty's default."; - public static final String SSL_KEYMANAGER_ALGORITHM_DEFAULT = ""; - public static final String SSL_TRUSTSTORE_LOCATION_DOC = - "Location of the trust store. Required only to authenticate HTTPS clients."; - public static final String SSL_TRUSTSTORE_LOCATION_DEFAULT = ""; - public static final String SSL_TRUSTSTORE_PASSWORD_DOC = - "The store password for the trust store file."; - public static final String SSL_TRUSTSTORE_PASSWORD_DEFAULT = ""; - public static final String SSL_TRUSTSTORE_TYPE_DOC = - "The type of trust store file."; - public static final String SSL_TRUSTSTORE_TYPE_DEFAULT = "JKS"; - public static final String SSL_TRUSTMANAGER_ALGORITHM_DOC = - "The algorithm used by the trust manager factory for SSL connections. " - + "Leave blank to use Jetty's default."; - public static final String SSL_TRUSTMANAGER_ALGORITHM_DEFAULT = ""; - public static final String SSL_PROTOCOL_DOC = - "The SSL protocol used to generate the SslContextFactory."; - public static final String SSL_PROTOCOL_DEFAULT = "TLS"; - public static final String SSL_PROVIDER_DOC = - "The SSL security provider name. Leave blank to use Java's default."; - public static final String SSL_PROVIDER_DEFAULT = ""; + + public static void withClientSslSupport(ConfigDef configDef, String namespace) { + org.apache.kafka.common.config.ConfigDef sslConfigDef = new org.apache.kafka.common.config + .ConfigDef(); + sslConfigDef.withClientSslSupport(); + + for (org.apache.kafka.common.config.ConfigDef.ConfigKey configKey + : sslConfigDef.configKeys().values()) { + configDef.define(namespace + configKey.name, + typeFor(configKey.type), + configKey.defaultValue != null ? configKey.defaultValue : "", + importanceFor(configKey.importance), + configKey.documentation); + } + } + + private static ConfigDef.Type typeFor(org.apache.kafka.common.config.ConfigDef.Type type) { + return ConfigDef.Type.valueOf(type.name()); + } + + private static ConfigDef.Importance importanceFor( + org.apache.kafka.common.config.ConfigDef.Importance importance) { + return ConfigDef.Importance.valueOf(importance.name()); + } + } diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/RestService.java b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/RestService.java index a71f20dd81b..efb229795e5 100644 --- a/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/RestService.java +++ b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/RestService.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig; +import io.confluent.kafka.schemaregistry.client.security.SslFactory; import io.confluent.kafka.schemaregistry.client.security.basicauth.BasicAuthCredentialProviderFactory; import io.confluent.kafka.schemaregistry.client.security.bearerauth.BearerAuthCredentialProvider; @@ -43,7 +44,6 @@ import java.util.Map; import javax.net.ssl.HttpsURLConnection; -import javax.net.ssl.SSLSocketFactory; import io.confluent.kafka.schemaregistry.client.rest.entities.Config; import io.confluent.kafka.schemaregistry.client.rest.entities.ErrorMessage; @@ -135,7 +135,7 @@ public class RestService implements Configurable { } private UrlList baseUrls; - private SSLSocketFactory sslSocketFactory; + private SslFactory sslFactory; private BasicAuthCredentialProvider basicAuthCredentialProvider; private BearerAuthCredentialProvider bearerAuthCredentialProvider; private Map httpHeaders; @@ -188,8 +188,8 @@ private static boolean isNonEmpty(String s) { return s != null && !s.isEmpty(); } - public void setSslSocketFactory(SSLSocketFactory sslSocketFactory) { - this.sslSocketFactory = sslSocketFactory; + public void setSslFactory(SslFactory sslFactory) { + this.sslFactory = sslFactory; } /** @@ -272,8 +272,10 @@ private T sendHttpRequest(String requestUrl, String method, byte[] requestBo } private void setupSsl(HttpURLConnection connection) { - if (connection instanceof HttpsURLConnection && sslSocketFactory != null) { - ((HttpsURLConnection)connection).setSSLSocketFactory(sslSocketFactory); + if (connection instanceof HttpsURLConnection && sslFactory != null + && sslFactory.sslContext() != null) { + ((HttpsURLConnection)connection) + .setSSLSocketFactory(sslFactory.sslContext().getSocketFactory()); } } diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/client/security/SslFactory.java b/client/src/main/java/io/confluent/kafka/schemaregistry/client/security/SslFactory.java index 1954475ee80..05ac2fcaca2 100644 --- a/client/src/main/java/io/confluent/kafka/schemaregistry/client/security/SslFactory.java +++ b/client/src/main/java/io/confluent/kafka/schemaregistry/client/security/SslFactory.java @@ -15,7 +15,7 @@ package io.confluent.kafka.schemaregistry.client.security; -import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig; +import org.apache.kafka.common.config.SslConfigs; import javax.net.ssl.KeyManager; import javax.net.ssl.KeyManagerFactory; @@ -39,30 +39,28 @@ public class SslFactory { private SecurityStore truststore; private SSLContext sslContext; - private static final String CLIENT_PREFIX = "client"; - public SslFactory(Map configs, boolean client) { - String prefix = client ? CLIENT_PREFIX : ""; - this.protocol = (String) configs.get(prefix + SchemaRegistryClientConfig.SSL_PROTOCOL_CONFIG); - this.provider = (String) configs.get(prefix + SchemaRegistryClientConfig.SSL_PROVIDER_CONFIG); + public SslFactory(Map configs) { + this.protocol = (String) configs.get(SslConfigs.SSL_PROTOCOL_CONFIG); + this.provider = (String) configs.get(SslConfigs.SSL_PROVIDER_CONFIG); this.kmfAlgorithm = (String) configs.get( - prefix + SchemaRegistryClientConfig.SSL_KEYMANAGER_ALGORITHM_CONFIG); + SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG); this.tmfAlgorithm = (String) configs.get( - prefix + SchemaRegistryClientConfig.SSL_TRUSTMANAGER_ALGORITHM_CONFIG); + SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG); try { createKeystore( - (String) configs.get(prefix + SchemaRegistryClientConfig.SSL_KEYSTORE_TYPE_CONFIG), - (String) configs.get(prefix + SchemaRegistryClientConfig.SSL_KEYSTORE_LOCATION_CONFIG), - (String) configs.get(prefix + SchemaRegistryClientConfig.SSL_KEYSTORE_PASSWORD_CONFIG), - (String) configs.get(prefix + SchemaRegistryClientConfig.SSL_KEY_PASSWORD_CONFIG) + (String) configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG), + (String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG), + (String) configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG), + (String) configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG) ); createTruststore( - (String) configs.get(prefix + SchemaRegistryClientConfig.SSL_TRUSTSTORE_TYPE_CONFIG), - (String) configs.get(prefix + SchemaRegistryClientConfig.SSL_TRUSTSTORE_LOCATION_CONFIG), - (String) configs.get(prefix + SchemaRegistryClientConfig.SSL_TRUSTSTORE_PASSWORD_CONFIG) + (String) configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG), + (String) configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG), + (String) configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG) ); this.sslContext = createSslContext(); diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java index 0f200df391b..ba1b024b34a 100644 --- a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java @@ -15,25 +15,6 @@ package io.confluent.kafka.schemaregistry.storage; -import org.apache.avro.reflect.Nullable; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.URI; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.Vector; -import java.util.concurrent.TimeUnit; - import io.confluent.common.metrics.JmxReporter; import io.confluent.common.metrics.MetricConfig; import io.confluent.common.metrics.MetricName; @@ -53,6 +34,7 @@ import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import io.confluent.kafka.schemaregistry.client.rest.utils.UrlList; +import io.confluent.kafka.schemaregistry.client.security.SslFactory; import io.confluent.kafka.schemaregistry.exceptions.IdDoesNotMatchException; import io.confluent.kafka.schemaregistry.exceptions.IdGenerationException; import io.confluent.kafka.schemaregistry.exceptions.IncompatibleSchemaException; @@ -70,7 +52,6 @@ import io.confluent.kafka.schemaregistry.masterelector.kafka.KafkaGroupMasterElector; import io.confluent.kafka.schemaregistry.masterelector.zookeeper.ZookeeperMasterElector; import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig; -import io.confluent.kafka.schemaregistry.client.security.SslFactory; import io.confluent.kafka.schemaregistry.rest.VersionId; import io.confluent.kafka.schemaregistry.rest.exceptions.Errors; import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException; @@ -80,6 +61,24 @@ import io.confluent.rest.Application; import io.confluent.rest.RestConfig; import io.confluent.rest.exceptions.RestException; +import org.apache.avro.reflect.Nullable; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.Vector; +import java.util.concurrent.TimeUnit; public class KafkaSchemaRegistry implements SchemaRegistry, MasterAwareSchemaRegistry { @@ -128,7 +127,7 @@ public KafkaSchemaRegistry(SchemaRegistryConfig config, this.allowModeChanges = config.getBoolean(SchemaRegistryConfig.MODE_MUTABILITY); this.myIdentity = new SchemaRegistryIdentity(host, schemeAndPort.port, isEligibleForMasterElector, schemeAndPort.scheme); - this.sslFactory = new SslFactory(config.originalsWithPrefix(""), false); + this.sslFactory = new SslFactory(config.originals()); this.kafkaStoreTimeoutMs = config.getInt(SchemaRegistryConfig.KAFKASTORE_TIMEOUT_CONFIG); this.initTimeout = config.getInt(SchemaRegistryConfig.KAFKASTORE_INIT_TIMEOUT_CONFIG); @@ -291,9 +290,7 @@ public void setMaster(@Nullable SchemaRegistryIdentity newMaster) masterRestService = null; } else { masterRestService = new RestService(masterIdentity.getUrl()); - if (sslFactory != null && sslFactory.sslContext() != null) { - masterRestService.setSslSocketFactory(sslFactory.sslContext().getSocketFactory()); - } + masterRestService.setSslFactory(sslFactory); } if (masterIdentity != null && !masterIdentity.equals(previousMaster) && isMaster()) {