Skip to content

Commit

Permalink
Rebase & use configs defs from AK
Browse files Browse the repository at this point in the history
  • Loading branch information
mageshn committed Jul 29, 2019
1 parent 57354af commit 2b4bc11
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 180 deletions.
@@ -1,12 +1,12 @@
/*
* Copyright 2018 Confluent Inc.
* <p>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -29,17 +29,12 @@
import io.confluent.kafka.serializers.subject.TopicNameStrategy;
import io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy;

import java.util.List;
import java.util.Map;

/**
* Base class for configs for serializers and deserializers, defining a few common configs and
* defaults.
*/
public class AbstractKafkaAvroSerDeConfig extends AbstractConfig {

private static final String CLIENT_NAMESPACE = "schema.registry";

/**
* Configurations beginning with this prefix can be used to specify headers to include in requests
* made to Schema Registry. For example, to include an {@code Authorization} header with a value
Expand All @@ -53,7 +48,7 @@ public class AbstractKafkaAvroSerDeConfig extends AbstractConfig {
public static final String
SCHEMA_REGISTRY_URL_DOC =
"Comma-separated list of URLs for schema registry instances that can be used to register "
+ "or look up schemas. "
+ "or look up schemas. "
+ "If you wish to get a connection to a mocked schema registry for testing, "
+ "you can specify a scope using the 'mock://' pseudo-protocol. For example, "
+ "'mock://my-scope-name' corresponds to "
Expand All @@ -74,7 +69,7 @@ public class AbstractKafkaAvroSerDeConfig extends AbstractConfig {
public static final String BASIC_AUTH_CREDENTIALS_SOURCE_DEFAULT = "URL";
public static final String BASIC_AUTH_CREDENTIALS_SOURCE_DOC =
"Specify how to pick the credentials for Basic Auth header. "
+ "The supported values are URL, USER_INFO and SASL_INHERIT";
+ "The supported values are URL, USER_INFO and SASL_INHERIT";

public static final String BEARER_AUTH_CREDENTIALS_SOURCE = SchemaRegistryClientConfig
.BEARER_AUTH_CREDENTIALS_SOURCE;
Expand Down Expand Up @@ -104,22 +99,18 @@ public class AbstractKafkaAvroSerDeConfig extends AbstractConfig {
TopicNameStrategy.class.getName();
public static final String KEY_SUBJECT_NAME_STRATEGY_DOC =
"Determines how to construct the subject name under which the key schema is registered "
+ "with the schema registry. By default, <topic>-key is used as subject.";
+ "with the schema registry. By default, <topic>-key is used as subject.";

public static final String VALUE_SUBJECT_NAME_STRATEGY = "value.subject.name.strategy";
public static final String VALUE_SUBJECT_NAME_STRATEGY_DEFAULT =
TopicNameStrategy.class.getName();
public static final String VALUE_SUBJECT_NAME_STRATEGY_DOC =
"Determines how to construct the subject name under which the value schema is registered "
+ "with the schema registry. By default, <topic>-value is used as subject.";


public AbstractKafkaAvroSerDeConfig(ConfigDef config, Map<?, ?> props) {
super(config, props);
}
+ "with the schema registry. By default, <topic>-value is used as subject.";

public static ConfigDef baseConfigDef() {
return new ConfigDef()
ConfigDef configDef = new ConfigDef();
configDef
.define(SCHEMA_REGISTRY_URL_CONFIG, Type.LIST,
Importance.HIGH, SCHEMA_REGISTRY_URL_DOC)
.define(MAX_SCHEMAS_PER_SUBJECT_CONFIG, Type.INT, MAX_SCHEMAS_PER_SUBJECT_DEFAULT,
Expand All @@ -129,85 +120,24 @@ public static ConfigDef baseConfigDef() {
.define(BASIC_AUTH_CREDENTIALS_SOURCE, Type.STRING, BASIC_AUTH_CREDENTIALS_SOURCE_DEFAULT,
Importance.MEDIUM, BASIC_AUTH_CREDENTIALS_SOURCE_DOC)
.define(BEARER_AUTH_CREDENTIALS_SOURCE, Type.STRING, BEARER_AUTH_CREDENTIALS_SOURCE_DEFAULT,
Importance.MEDIUM, BEARER_AUTH_CREDENTIALS_SOURCE_DOC)
Importance.MEDIUM, BEARER_AUTH_CREDENTIALS_SOURCE_DOC)
.define(SCHEMA_REGISTRY_USER_INFO_CONFIG, Type.PASSWORD, SCHEMA_REGISTRY_USER_INFO_DEFAULT,
Importance.MEDIUM, SCHEMA_REGISTRY_USER_INFO_DOC)
.define(USER_INFO_CONFIG, Type.PASSWORD, USER_INFO_DEFAULT,
Importance.MEDIUM, SCHEMA_REGISTRY_USER_INFO_DOC)
.define(BEARER_AUTH_TOKEN_CONFIG, Type.PASSWORD, BEARER_AUTH_TOKEN_DEFAULT,
Importance.MEDIUM, BEARER_AUTH_TOKEN_DOC)
Importance.MEDIUM, BEARER_AUTH_TOKEN_DOC)
.define(KEY_SUBJECT_NAME_STRATEGY, Type.CLASS, KEY_SUBJECT_NAME_STRATEGY_DEFAULT,
Importance.MEDIUM, KEY_SUBJECT_NAME_STRATEGY_DOC)
.define(VALUE_SUBJECT_NAME_STRATEGY, Type.CLASS, VALUE_SUBJECT_NAME_STRATEGY_DEFAULT,
Importance.MEDIUM, VALUE_SUBJECT_NAME_STRATEGY_DOC)
.define(
CLIENT_NAMESPACE + SchemaRegistryClientConfig.SSL_KEYSTORE_LOCATION_CONFIG,
Type.STRING,
SchemaRegistryClientConfig.SSL_KEYSTORE_LOCATION_DEFAULT,
Importance.HIGH,
SchemaRegistryClientConfig.SSL_KEYSTORE_LOCATION_DOC
)
.define(
CLIENT_NAMESPACE + SchemaRegistryClientConfig.SSL_KEYSTORE_PASSWORD_CONFIG,
Type.PASSWORD,
SchemaRegistryClientConfig.SSL_KEYSTORE_PASSWORD_DEFAULT,
Importance.HIGH,
SchemaRegistryClientConfig.SSL_KEYSTORE_PASSWORD_DOC
)
.define(
CLIENT_NAMESPACE + SchemaRegistryClientConfig.SSL_KEY_PASSWORD_CONFIG,
Type.PASSWORD,
SchemaRegistryClientConfig.SSL_KEY_PASSWORD_DEFAULT,
Importance.HIGH,
SchemaRegistryClientConfig.SSL_KEY_PASSWORD_DOC)
.define(
CLIENT_NAMESPACE + SchemaRegistryClientConfig.SSL_KEYSTORE_TYPE_CONFIG,
Type.STRING,
SchemaRegistryClientConfig.SSL_KEYSTORE_TYPE_DEFAULT,
Importance.MEDIUM,
SchemaRegistryClientConfig.SSL_KEYSTORE_TYPE_DOC)
.define(
CLIENT_NAMESPACE + SchemaRegistryClientConfig.SSL_KEYMANAGER_ALGORITHM_CONFIG,
Type.STRING,
SchemaRegistryClientConfig.SSL_KEYMANAGER_ALGORITHM_DEFAULT,
Importance.LOW,
SchemaRegistryClientConfig.SSL_KEYMANAGER_ALGORITHM_DOC)
.define(
CLIENT_NAMESPACE + SchemaRegistryClientConfig.SSL_TRUSTSTORE_LOCATION_CONFIG,
Type.STRING,
SchemaRegistryClientConfig.SSL_TRUSTSTORE_LOCATION_DEFAULT,
Importance.HIGH,
SchemaRegistryClientConfig.SSL_TRUSTSTORE_LOCATION_DOC)
.define(
CLIENT_NAMESPACE + SchemaRegistryClientConfig.SSL_TRUSTSTORE_PASSWORD_CONFIG,
Type.PASSWORD,
SchemaRegistryClientConfig.SSL_TRUSTSTORE_PASSWORD_DEFAULT,
Importance.HIGH,
SchemaRegistryClientConfig.SSL_TRUSTSTORE_PASSWORD_DOC)
.define(
CLIENT_NAMESPACE + SchemaRegistryClientConfig.SSL_TRUSTSTORE_TYPE_CONFIG,
Type.STRING,
SchemaRegistryClientConfig.SSL_TRUSTSTORE_TYPE_DEFAULT,
Importance.MEDIUM,
SchemaRegistryClientConfig.SSL_TRUSTSTORE_TYPE_DOC)
.define(
CLIENT_NAMESPACE + SchemaRegistryClientConfig.SSL_TRUSTMANAGER_ALGORITHM_CONFIG,
Type.STRING,
SchemaRegistryClientConfig.SSL_TRUSTMANAGER_ALGORITHM_DEFAULT,
Importance.LOW,
SchemaRegistryClientConfig.SSL_TRUSTMANAGER_ALGORITHM_DOC)
.define(
CLIENT_NAMESPACE + SchemaRegistryClientConfig.SSL_PROTOCOL_CONFIG,
Type.STRING,
SchemaRegistryClientConfig.SSL_PROTOCOL_DEFAULT,
Importance.MEDIUM,
SchemaRegistryClientConfig.SSL_PROTOCOL_DOC)
.define(
CLIENT_NAMESPACE + SchemaRegistryClientConfig.SSL_PROVIDER_CONFIG,
Type.STRING,
SchemaRegistryClientConfig.SSL_PROVIDER_DEFAULT,
Importance.MEDIUM,
SchemaRegistryClientConfig.SSL_PROVIDER_DOC);
Importance.MEDIUM, VALUE_SUBJECT_NAME_STRATEGY_DOC);
SchemaRegistryClientConfig.withClientSslSupport(
configDef, SchemaRegistryClientConfig.CLIENT_NAMESPACE);
return configDef;
}

public AbstractKafkaAvroSerDeConfig(ConfigDef config, Map<?, ?> props) {
super(config, props);
}

public int getMaxSchemasPerSubject() {
Expand Down
Expand Up @@ -19,13 +19,15 @@
import java.util.Collections;
import java.util.Objects;

import io.confluent.kafka.schemaregistry.client.security.SslFactory;
import org.apache.avro.Schema;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.kafka.schemaregistry.client.rest.Versions;
Expand Down Expand Up @@ -120,6 +122,14 @@ public CachedSchemaRegistryClient(
}
if (configs != null && !configs.isEmpty()) {
restService.configure(configs);

Map<String, Object> sslConfigs = configs.entrySet().stream()
.filter(e -> e.getKey().startsWith(SchemaRegistryClientConfig.CLIENT_NAMESPACE))
.collect(Collectors.toMap(
e -> e.getKey().substring(SchemaRegistryClientConfig.CLIENT_NAMESPACE.length()),
Map.Entry::getValue));
SslFactory sslFactory = new SslFactory(sslConfigs);
restService.setSslFactory(sslFactory);
}
}

Expand Down
Expand Up @@ -16,7 +16,12 @@

package io.confluent.kafka.schemaregistry.client;

import io.confluent.common.config.ConfigDef;

public class SchemaRegistryClientConfig {

public static final String CLIENT_NAMESPACE = "schema.registry.";

public static final String BASIC_AUTH_CREDENTIALS_SOURCE = "basic.auth.credentials.source";
@Deprecated
public static final String SCHEMA_REGISTRY_USER_INFO_CONFIG =
Expand All @@ -26,50 +31,29 @@ public class SchemaRegistryClientConfig {
public static final String BEARER_AUTH_CREDENTIALS_SOURCE = "bearer.auth.credentials.source";
public static final String BEARER_AUTH_TOKEN_CONFIG = "bearer.auth.token";

public static final String SSL_KEYSTORE_LOCATION_CONFIG = "ssl.keystore.location";
public static final String SSL_KEYSTORE_PASSWORD_CONFIG = "ssl.keystore.password";
public static final String SSL_KEY_PASSWORD_CONFIG = "ssl.key.password";
public static final String SSL_KEYSTORE_TYPE_CONFIG = "ssl.keystore.type";
public static final String SSL_KEYMANAGER_ALGORITHM_CONFIG = "ssl.keymanager.algorithm";
public static final String SSL_TRUSTSTORE_LOCATION_CONFIG = "ssl.truststore.location";
public static final String SSL_TRUSTSTORE_PASSWORD_CONFIG = "ssl.truststore.password";
public static final String SSL_TRUSTSTORE_TYPE_CONFIG = "ssl.truststore.type";
public static final String SSL_TRUSTMANAGER_ALGORITHM_CONFIG = "ssl.trustmanager.algorithm";
public static final String SSL_PROTOCOL_CONFIG = "ssl.protocol";
public static final String SSL_PROVIDER_CONFIG = "ssl.provider";
public static final String SSL_KEYSTORE_LOCATION_DOC =
"Location of the keystore file to use for SSL. This is required for HTTPS.";
public static final String SSL_KEYSTORE_LOCATION_DEFAULT = "";
public static final String SSL_KEYSTORE_PASSWORD_DOC =
"The store password for the keystore file.";
public static final String SSL_KEYSTORE_PASSWORD_DEFAULT = "";
public static final String SSL_KEY_PASSWORD_DOC =
"The password of the private key in the keystore file.";
public static final String SSL_KEY_PASSWORD_DEFAULT = "";
public static final String SSL_KEYSTORE_TYPE_DOC =
"The type of keystore file.";
public static final String SSL_KEYSTORE_TYPE_DEFAULT = "JKS";
public static final String SSL_KEYMANAGER_ALGORITHM_DOC =
"The algorithm used by the key manager factory for SSL connections. "
+ "Leave blank to use Jetty's default.";
public static final String SSL_KEYMANAGER_ALGORITHM_DEFAULT = "";
public static final String SSL_TRUSTSTORE_LOCATION_DOC =
"Location of the trust store. Required only to authenticate HTTPS clients.";
public static final String SSL_TRUSTSTORE_LOCATION_DEFAULT = "";
public static final String SSL_TRUSTSTORE_PASSWORD_DOC =
"The store password for the trust store file.";
public static final String SSL_TRUSTSTORE_PASSWORD_DEFAULT = "";
public static final String SSL_TRUSTSTORE_TYPE_DOC =
"The type of trust store file.";
public static final String SSL_TRUSTSTORE_TYPE_DEFAULT = "JKS";
public static final String SSL_TRUSTMANAGER_ALGORITHM_DOC =
"The algorithm used by the trust manager factory for SSL connections. "
+ "Leave blank to use Jetty's default.";
public static final String SSL_TRUSTMANAGER_ALGORITHM_DEFAULT = "";
public static final String SSL_PROTOCOL_DOC =
"The SSL protocol used to generate the SslContextFactory.";
public static final String SSL_PROTOCOL_DEFAULT = "TLS";
public static final String SSL_PROVIDER_DOC =
"The SSL security provider name. Leave blank to use Java's default.";
public static final String SSL_PROVIDER_DEFAULT = "";

public static void withClientSslSupport(ConfigDef configDef, String namespace) {
org.apache.kafka.common.config.ConfigDef sslConfigDef = new org.apache.kafka.common.config
.ConfigDef();
sslConfigDef.withClientSslSupport();

for (org.apache.kafka.common.config.ConfigDef.ConfigKey configKey
: sslConfigDef.configKeys().values()) {
configDef.define(namespace + configKey.name,
typeFor(configKey.type),
configKey.defaultValue != null ? configKey.defaultValue : "",
importanceFor(configKey.importance),
configKey.documentation);
}
}

private static ConfigDef.Type typeFor(org.apache.kafka.common.config.ConfigDef.Type type) {
return ConfigDef.Type.valueOf(type.name());
}

private static ConfigDef.Importance importanceFor(
org.apache.kafka.common.config.ConfigDef.Importance importance) {
return ConfigDef.Importance.valueOf(importance.name());
}

}
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig;
import io.confluent.kafka.schemaregistry.client.security.SslFactory;
import io.confluent.kafka.schemaregistry.client.security.basicauth.BasicAuthCredentialProviderFactory;
import io.confluent.kafka.schemaregistry.client.security.bearerauth.BearerAuthCredentialProvider;

Expand All @@ -43,7 +44,6 @@
import java.util.Map;

import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLSocketFactory;

import io.confluent.kafka.schemaregistry.client.rest.entities.Config;
import io.confluent.kafka.schemaregistry.client.rest.entities.ErrorMessage;
Expand Down Expand Up @@ -135,7 +135,7 @@ public class RestService implements Configurable {
}

private UrlList baseUrls;
private SSLSocketFactory sslSocketFactory;
private SslFactory sslFactory;
private BasicAuthCredentialProvider basicAuthCredentialProvider;
private BearerAuthCredentialProvider bearerAuthCredentialProvider;
private Map<String, String> httpHeaders;
Expand Down Expand Up @@ -188,8 +188,8 @@ private static boolean isNonEmpty(String s) {
return s != null && !s.isEmpty();
}

public void setSslSocketFactory(SSLSocketFactory sslSocketFactory) {
this.sslSocketFactory = sslSocketFactory;
public void setSslFactory(SslFactory sslFactory) {
this.sslFactory = sslFactory;
}

/**
Expand Down Expand Up @@ -272,8 +272,10 @@ private <T> T sendHttpRequest(String requestUrl, String method, byte[] requestBo
}

private void setupSsl(HttpURLConnection connection) {
if (connection instanceof HttpsURLConnection && sslSocketFactory != null) {
((HttpsURLConnection)connection).setSSLSocketFactory(sslSocketFactory);
if (connection instanceof HttpsURLConnection && sslFactory != null
&& sslFactory.sslContext() != null) {
((HttpsURLConnection)connection)
.setSSLSocketFactory(sslFactory.sslContext().getSocketFactory());
}
}

Expand Down
Expand Up @@ -15,7 +15,7 @@

package io.confluent.kafka.schemaregistry.client.security;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig;
import org.apache.kafka.common.config.SslConfigs;

import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
Expand All @@ -39,30 +39,28 @@ public class SslFactory {
private SecurityStore truststore;
private SSLContext sslContext;

private static final String CLIENT_PREFIX = "client";

public SslFactory(Map<String, ?> configs, boolean client) {
String prefix = client ? CLIENT_PREFIX : "";
this.protocol = (String) configs.get(prefix + SchemaRegistryClientConfig.SSL_PROTOCOL_CONFIG);
this.provider = (String) configs.get(prefix + SchemaRegistryClientConfig.SSL_PROVIDER_CONFIG);
public SslFactory(Map<String, ?> configs) {
this.protocol = (String) configs.get(SslConfigs.SSL_PROTOCOL_CONFIG);
this.provider = (String) configs.get(SslConfigs.SSL_PROVIDER_CONFIG);

this.kmfAlgorithm = (String) configs.get(
prefix + SchemaRegistryClientConfig.SSL_KEYMANAGER_ALGORITHM_CONFIG);
SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG);
this.tmfAlgorithm = (String) configs.get(
prefix + SchemaRegistryClientConfig.SSL_TRUSTMANAGER_ALGORITHM_CONFIG);
SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG);

try {
createKeystore(
(String) configs.get(prefix + SchemaRegistryClientConfig.SSL_KEYSTORE_TYPE_CONFIG),
(String) configs.get(prefix + SchemaRegistryClientConfig.SSL_KEYSTORE_LOCATION_CONFIG),
(String) configs.get(prefix + SchemaRegistryClientConfig.SSL_KEYSTORE_PASSWORD_CONFIG),
(String) configs.get(prefix + SchemaRegistryClientConfig.SSL_KEY_PASSWORD_CONFIG)
(String) configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG),
(String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG),
(String) configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG),
(String) configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)
);

createTruststore(
(String) configs.get(prefix + SchemaRegistryClientConfig.SSL_TRUSTSTORE_TYPE_CONFIG),
(String) configs.get(prefix + SchemaRegistryClientConfig.SSL_TRUSTSTORE_LOCATION_CONFIG),
(String) configs.get(prefix + SchemaRegistryClientConfig.SSL_TRUSTSTORE_PASSWORD_CONFIG)
(String) configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG),
(String) configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG),
(String) configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)
);

this.sslContext = createSslContext();
Expand Down

0 comments on commit 2b4bc11

Please sign in to comment.