Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create custom Properties and Map configs classes with toString method that hides password properties #1196

Merged
merged 4 commits into from Aug 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
215 changes: 204 additions & 11 deletions kafka-rest/src/main/java/io/confluent/kafkarest/KafkaRestConfig.java
Expand Up @@ -31,6 +31,7 @@
import com.google.common.collect.MapDifference;
import com.google.common.collect.MapDifference.ValueDifference;
import com.google.common.collect.Maps;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.KafkaJsonSerializerConfig;
import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializerConfig;
Expand All @@ -43,19 +44,23 @@
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import javax.ws.rs.core.MediaType;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Range;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.metrics.Metrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -963,12 +968,20 @@ public KafkaRestConfig(ConfigDef configDef, Properties props, Time time) {
this(configDef, props);
}

/**
* Callers of this function are advised to use the return value as a configuration object as it
* can hide sensitive information in logging.
*/
public Properties getOriginalProperties() {
Properties properties = new Properties();
Properties properties = new PropertiesWithSafeToString(this);
properties.putAll(originals());
return properties;
}

/**
* Callers of this function are advised to use the return value as a configuration object as it
* can hide sensitive information in logging.
*/
public Map<String, Object> getSchemaRegistryConfigs() {
ImmutableSet<String> mask =
ImmutableSet.<String>builder()
Expand Down Expand Up @@ -1022,23 +1035,37 @@ public Map<String, Object> getSchemaRegistryConfigs() {
}
configs.put(USE_LATEST_VERSION, false);

return configs;
return new ConfigsWithSafeToString(configs, this);
}

/**
* Callers of this function are advised to use the return value as a configuration object as it
* can hide sensitive information in logging.
*/
public final Map<String, Object> getJsonSerializerConfigs() {
Set<String> mask = singleton(KafkaJsonSerializerConfig.JSON_INDENT_OUTPUT);
return new ConfigsBuilder(mask).addConfigs("client.").addConfigs("producer.").build();
Map<String, Object> configs =
new ConfigsBuilder(mask).addConfigs("client.").addConfigs("producer.").build();
return new ConfigsWithSafeToString(configs, this);
}

/**
* Callers of this function are advised to use the return value as a configuration object as it
* can hide sensitive information in logging.
*/
public final Map<String, Object> getAvroSerializerConfigs() {
Set<String> mask = AbstractKafkaSchemaSerDeConfig.baseConfigDef().names();
HashMap<String, Object> configs =
new HashMap<>(
new ConfigsBuilder(mask).addConfigs("client.").addConfigs("producer.").build());
configs.putAll(getSchemaRegistryConfigs());
return configs;
return new ConfigsWithSafeToString(configs, this);
}

/**
* Callers of this function are advised to use the return value as a configuration object as it
* can hide sensitive information in logging.
*/
public final Map<String, Object> getJsonschemaSerializerConfigs() {
Set<String> mask =
ImmutableSet.of(
Expand All @@ -1050,19 +1077,27 @@ public final Map<String, Object> getJsonschemaSerializerConfigs() {
new HashMap<>(
new ConfigsBuilder(mask).addConfigs("client.").addConfigs("producer.").build());
configs.putAll(getSchemaRegistryConfigs());
return configs;
return new ConfigsWithSafeToString(configs, this);
}

/**
* Callers of this function are advised to use the return value as a configuration object as it
* can hide sensitive information in logging.
*/
public final Map<String, Object> getProtobufSerializerConfigs() {
Set<String> mask =
singleton(KafkaProtobufSerializerConfig.REFERENCE_SUBJECT_NAME_STRATEGY_CONFIG);
HashMap<String, Object> configs =
new HashMap<>(
new ConfigsBuilder(mask).addConfigs("client.").addConfigs("producer.").build());
configs.putAll(getSchemaRegistryConfigs());
return configs;
return new ConfigsWithSafeToString(configs, this);
}

/**
* Callers of this function are advised to use the return value as a configuration object as it
* can hide sensitive information in logging.
*/
public Properties getProducerProperties() {
Map<String, Object> producerConfigs =
new ConfigsBuilder()
Expand All @@ -1072,7 +1107,7 @@ public Properties getProducerProperties() {
.addConfigs("schema.registry.", false)
.build();

Properties producerProperties = new Properties();
Properties producerProperties = new PropertiesWithSafeToString(this);
producerProperties.putAll(producerConfigs);

// KREST-4606: Disable idempotency until at the very least KAFKA-13668 is fixed, but maybe
Expand All @@ -1088,13 +1123,23 @@ public Properties getProducerProperties() {
return producerProperties;
}

/**
* Callers of this function are advised to use the return value as a configuration object as it
* can hide sensitive information in logging.
*/
public Map<String, Object> getProducerConfigs() {
return getProducerProperties().entrySet().stream()
.collect(Collectors.toMap(entry -> entry.getKey().toString(), Entry::getValue));
return new ConfigsWithSafeToString(
getProducerProperties().entrySet().stream()
.collect(Collectors.toMap(entry -> entry.getKey().toString(), Entry::getValue)),
this);
}

/**
* Callers of this function are advised to use the return value as a configuration object as it
* can hide sensitive information in logging.
*/
public Properties getConsumerProperties() {
Properties consumerProps = new Properties();
Properties consumerProps = new PropertiesWithSafeToString(this);

consumerProps.setProperty(BOOTSTRAP_SERVERS_CONFIG, getString(BOOTSTRAP_SERVERS_CONFIG));
consumerProps.setProperty(MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS_VALUE);
Expand All @@ -1107,8 +1152,12 @@ public Properties getConsumerProperties() {
return consumerProps;
}

/**
* Callers of this function are advised to use the return value as a configuration object as it
* can hide sensitive information in logging.
*/
public Properties getAdminProperties() {
Properties adminProps = new Properties();
Properties adminProps = new PropertiesWithSafeToString(this);

adminProps.setProperty(BOOTSTRAP_SERVERS_CONFIG, getString(BOOTSTRAP_SERVERS_CONFIG));

Expand Down Expand Up @@ -1287,4 +1336,148 @@ private static ConfigValue create(String origin, Object value) {
return new AutoValue_KafkaRestConfig_ConfigValue(origin, value);
}
}

// this set contains configs that are a password type but not listed in AbstractConfig
private static final Set<String> passwordTypeConfigs =
ImmutableSet.of(
SaslConfigs.SASL_JAAS_CONFIG,
SchemaRegistryClientConfig.SCHEMA_REGISTRY_USER_INFO_CONFIG,
SchemaRegistryClientConfig.USER_INFO_CONFIG,
SchemaRegistryClientConfig.BEARER_AUTH_TOKEN_CONFIG,
SchemaRegistryClientConfig.BEARER_AUTH_CLIENT_SECRET);

/**
* This function relies on {@link AbstractConfig} object to detect whether a config is a {@link
* Password}. Therefore, it is important for the implementation to make sure that a config is
* registered correctly to not accidentally leaking sensitive information. If there is a config
* that is sensitive but not registered correctly in its corresponding {@link AbstractConfig}
* object, please add that config to {@link KafkaRestConfig#passwordTypeConfigs} field above.
*/
static <K, V> String mapToStringHideSensitiveConfigs(Map<K, V> map, AbstractConfig config) {
StringBuilder sb = new StringBuilder();
Set<Entry<K, V>> entries = map.entrySet();
sb.append('{');
Iterator<Entry<K, V>> it = entries.iterator();
while (it.hasNext()) {
Entry<K, V> entry = it.next();
K key = entry.getKey();
V value = entry.getValue();
if (key instanceof String) {
if (passwordTypeConfigs.contains(key)) {
toStringKeyAndValue(sb, key, Password.HIDDEN, it.hasNext());
} else {
Type type = config.typeOf((String) key);
if (type == Type.PASSWORD) {
toStringKeyAndValue(sb, key, config.getPassword((String) key), it.hasNext());
TemirAskhat marked this conversation as resolved.
Show resolved Hide resolved
} else {
toStringKeyAndValue(sb, key, value, it.hasNext());
}
}
}
}
return sb.append('}').toString();
}

private static void toStringKeyAndValue(
StringBuilder sb, Object key, Object value, boolean appendDelimiter) {
sb.append(key);
sb.append("=");
sb.append(value);
if (appendDelimiter) {
sb.append(", ");
}
}

/** {@link Properties} class with toString function that hide sensitive configs */
public static class PropertiesWithSafeToString extends Properties {

private final AbstractConfig config;

public PropertiesWithSafeToString(AbstractConfig config) {
super();
this.config = config;
}

@Override
public synchronized String toString() {
return mapToStringHideSensitiveConfigs(this, config);
}
}

/** {@link Map} configs class with toString function that hide sensitive configs */
public static class ConfigsWithSafeToString implements Map<String, Object> {

private final Map<String, Object> delegate;
private final AbstractConfig config;

public ConfigsWithSafeToString(Map<String, Object> delegate, AbstractConfig config) {
this.delegate = delegate;
this.config = config;
}

@Override
public String toString() {
return mapToStringHideSensitiveConfigs(this, config);
}

@Override
public int size() {
return delegate.size();
}

@Override
public boolean isEmpty() {
return delegate.isEmpty();
}

@Override
public boolean containsKey(Object key) {
return delegate.containsKey(key);
}

@Override
public boolean containsValue(Object value) {
return delegate.containsValue(value);
}

@Override
public Object get(Object key) {
return delegate.get(key);
}

@Override
public Object put(String key, Object value) {
return delegate.put(key, value);
}

@Override
public Object remove(Object key) {
return delegate.remove(key);
}

@Override
public void putAll(Map<? extends String, ?> m) {
delegate.putAll(m);
}

@Override
public void clear() {
delegate.clear();
}

@Override
public Set<String> keySet() {
return delegate.keySet();
}

@Override
public Collection<Object> values() {
return delegate.values();
}

@Override
public Set<Entry<String, Object>> entrySet() {
return delegate.entrySet();
}
}
}