Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Initial impl for externalizing secrets in configs #4990

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.config;

/**
* A callback passed to {@link ConfigProvider} for subscribing to changes.
*/
public interface ConfigChangeCallback {

/**
* Performs an action when configuration data changes.
*
* @param path the path at which the data resides
* @param data the configuration data
*/
void onChange(String path, ConfigData data);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.config;

import java.util.Map;

/**
* Configuration data from a {@link ConfigProvider}.
*/
public class ConfigData {

private final Map<String, String> data;
private final long ttl;

/**
* Creates a new ConfigData with the given data and TTL (in milliseconds).
*
* @param data a Map of key-value pairs
* @param ttl the time-to-live of the data in milliseconds
*/
public ConfigData(Map<String, String> data, long ttl) {
this.data = data;
this.ttl = ttl;
}

/**
* Creates a new ConfigData with the given data.
*
* @param data a Map of key-value pairs
*/
public ConfigData(Map<String, String> data) {
this(data, Long.MAX_VALUE);
}

/**
* Returns the data.
*
* @return data a Map of key-value pairs
*/
public Map<String, String> data() {
return data;
}

/**
* Returns the TTL (in milliseconds).
*
* @return ttl the time-to-live (in milliseconds) of the data.
*/
public long ttl() {
return ttl;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.config;

import org.apache.kafka.common.Configurable;

import java.io.Closeable;
import java.util.Set;

/**
* A provider of configuration data, which may optionally support subscriptions to configuration changes.
*/
public interface ConfigProvider extends Configurable, Closeable {

/**
* Retrieves the data at the given path.
*
* @param path the path where the data resides
* @return the configuration data
*/
ConfigData get(String path);

/**
* Retrieves the data with the given keys at the given path.
*
* @param path the path where the data resides
* @param keys the keys whose values will be retrieved
* @return the configuration data
*/
ConfigData get(String path, Set<String> keys);

/**
* Subscribes to changes for the given keys at the given path.
*
* @param path the path where the data resides
* @param keys the keys whose values will be retrieved
* @param callback the callback to invoke upon change
*/
void subscribe(String path, Set<String> keys, ConfigChangeCallback callback);

/**
* Unsubscribes to changes for the given keys at the given path.
*
* @param path the path where the data resides
* @param keys the keys whose values will be retrieved
*/
void unsubscribe(String path, Set<String> keys);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.config;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* This class wraps a set of {@link ConfigProvider} instances and uses them to perform
* transformations.
*/
public class ConfigTransformer {
private static final Pattern DEFAULT_PATTERN = Pattern.compile("\\$\\{(.*?):((.*?):)?(.*?)\\}");
private static final String EMPTY_PATH = "";

private final Map<String, ConfigProvider> configProviders;

/**
* Creates a ConfigTransformer with the default pattern, of the form <code>${provider:[path:]key}</code>.
*
* @param configProviders the set of {@link ConfigProvider} instances.
*/
public ConfigTransformer(Map<String, ConfigProvider> configProviders) {
this.configProviders = configProviders;
}

/**
* Transforms the given configuration data by using the {@link ConfigProvider} instances to
* look up values to replace the variables in the pattern.
*
* @param configs the configuration values to be transformed
* @return an instance of {@link ConfigTransformerResult}
*/
public ConfigTransformerResult transform(Map<String, String> configs) {
Map<String, Map<String, Set<String>>> keysByProvider = new HashMap<>();
Map<String, Map<String, Map<String, String>>> lookupsByProvider = new HashMap<>();

// Collect the variables from the given configs that need transformation
for (Map.Entry<String, String> config : configs.entrySet()) {
List<ConfigVariable> vars = getVars(config.getKey(), config.getValue(), DEFAULT_PATTERN);
for (ConfigVariable var : vars) {
Map<String, Set<String>> keysByPath = keysByProvider.computeIfAbsent(var.providerName, k -> new HashMap<>());
Set<String> keys = keysByPath.computeIfAbsent(var.path, k -> new HashSet<>());
keys.add(var.variable);
}
}

// Retrieve requested variables from the ConfigProviders
Map<String, Long> ttls = new HashMap<>();
for (Map.Entry<String, Map<String, Set<String>>> entry : keysByProvider.entrySet()) {
String providerName = entry.getKey();
ConfigProvider provider = configProviders.get(providerName);
Map<String, Set<String>> keysByPath = entry.getValue();
if (provider != null && keysByPath != null) {
for (Map.Entry<String, Set<String>> pathWithKeys : keysByPath.entrySet()) {
String path = pathWithKeys.getKey();
Set<String> keys = new HashSet<>(pathWithKeys.getValue());
ConfigData configData = provider.get(path, keys);
Map<String, String> data = configData.data();
long ttl = configData.ttl();
if (ttl >= 0 && ttl < Long.MAX_VALUE) {
ttls.put(path, ttl);
}
Map<String, Map<String, String>> keyValuesByPath =
lookupsByProvider.computeIfAbsent(providerName, k -> new HashMap<>());
keyValuesByPath.put(path, data);
}
}
}

// Perform the transformations by performing variable replacements
Map<String, String> data = new HashMap<>(configs);
for (Map.Entry<String, String> config : configs.entrySet()) {
data.put(config.getKey(), replace(lookupsByProvider, config.getValue(), DEFAULT_PATTERN));
}
return new ConfigTransformerResult(data, ttls);
}

private static List<ConfigVariable> getVars(String key, String value, Pattern pattern) {
List<ConfigVariable> configVars = new ArrayList<>();
Matcher matcher = pattern.matcher(value);
while (matcher.find()) {
configVars.add(new ConfigVariable(matcher));
}
return configVars;
}

private static String replace(Map<String, Map<String, Map<String, String>>> lookupsByProvider,
String value,
Pattern pattern) {
Matcher matcher = pattern.matcher(value);
StringBuilder builder = new StringBuilder();
int i = 0;
while (matcher.find()) {
ConfigVariable configVar = new ConfigVariable(matcher);
Map<String, Map<String, String>> lookupsByPath = lookupsByProvider.get(configVar.providerName);
if (lookupsByPath != null) {
Map<String, String> keyValues = lookupsByPath.get(configVar.path);
String replacement = keyValues.get(configVar.variable);
builder.append(value, i, matcher.start());
if (replacement == null) {
// No replacements will be performed; just return the original value
builder.append(matcher.group(0));
} else {
builder.append(replacement);
}
i = matcher.end();
}
}
builder.append(value, i, value.length());
return builder.toString();
}

private static class ConfigVariable {
final String providerName;
final String path;
final String variable;

ConfigVariable(Matcher matcher) {
this.providerName = matcher.group(1);
this.path = matcher.group(3) != null ? matcher.group(3) : EMPTY_PATH;
this.variable = matcher.group(4);
}

public String toString() {
return "(" + providerName + ":" + (path != null ? path + ":" : "") + variable + ")";
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.config;

import java.util.Map;

/**
* The result of a transformation from {@link ConfigTransformer}.
*/
public class ConfigTransformerResult {

private Map<String, Long> ttls;
private Map<String, String> data;

/**
* Creates a new ConfigTransformerResult with the given data and TTL values for a set of paths.
*
* @param data a Map of key-value pairs
* @param ttls a Map of path and TTL values (in milliseconds)
*/
public ConfigTransformerResult(Map<String, String> data, Map<String, Long> ttls) {
this.data = data;
this.ttls = ttls;
}

/**
* Returns the data.
*
* @return data a Map of key-value pairs
*/
public Map<String, String> data() {
return data;
}

/**
* Returns the TTL values (in milliseconds).
*
* @return data a Map of path and TTL values
*/
public Map<String, Long> ttls() {
return ttls;
}
}