Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters #14309

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
840f2c9
KAFKA-13329: Add preflight validation for connector key and value con…
C0urante Aug 28, 2023
ac9caf8
KAFKA-13328: Add preflight validation for connector header converter …
C0urante Aug 28, 2023
dba0649
Add utility to close Object instances that may be AutoCloseable
C0urante Aug 29, 2023
abdc024
Update clients/src/main/java/org/apache/kafka/common/utils/Utils.java
C0urante Aug 31, 2023
1d10efa
Add newline to end of file, fix Javadoc code snippet example, extract…
C0urante Aug 31, 2023
735d35f
Remove warning about method references from Javadocs for Utils::maybe…
C0urante Sep 5, 2023
a554695
Simplify Utils::ensureConcrete, refine exception handling in Instanti…
C0urante Sep 5, 2023
8dfeca5
Fix failing unit tests
C0urante Sep 5, 2023
6cf1e13
Change Utils::ensureConcrete to Utils::ensureConcreteSubclass
C0urante Oct 31, 2023
29c95bc
KAFKA-13328, KAFKA-13329: Add custom preflight validation support for…
C0urante Aug 29, 2023
5b33be2
Change 'transformation' to 'pluginInstance' in EnrichablePlugin::getC…
C0urante Sep 5, 2023
6144031
Switch to try/finally instead of try-with-resources for Converter/Hea…
C0urante Oct 31, 2023
3c8157d
Make AbstractHerder::validateConverterConfig more generic and future-…
C0urante Oct 31, 2023
519306a
Merge branch 'trunk' into kafka-13328-13329-configdef-validation
C0urante Apr 15, 2024
da8473b
Merge branch 'trunk' into kafka-13328-13329-configdef-validation
C0urante Apr 30, 2024
4c55f4b
Address review comments
C0urante May 1, 2024
637610a
Revert unnecessary changes to Utils.java
C0urante May 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -52,6 +52,8 @@
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.ConverterConfig;
import org.apache.kafka.connect.storage.ConverterType;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.transforms.Transformation;
Expand Down Expand Up @@ -79,6 +81,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -90,6 +93,10 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.apache.kafka.connect.runtime.ConnectorConfig.HEADER_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;

/**
* Abstract Herder implementation which handles connector/task lifecycle tracking. Extensions
* must invoke the lifecycle hooks appropriately.
Expand Down Expand Up @@ -392,6 +399,161 @@ protected Map<String, ConfigValue> validateSourceConnectorConfig(SourceConnector
return configDef.validateAll(config);
}

/**
* General-purpose validation logic for converters that are configured directly
* in a connector config (as opposed to inherited from the worker config).
* @param connectorConfig the configuration for the connector; may not be null
* @param pluginConfigValue the {@link ConfigValue} for the converter property in the connector config;
* may be null, in which case no validation will be performed under the assumption that the
* connector will use inherit the converter settings from the worker. Some errors encountered
* during validation may be {@link ConfigValue#addErrorMessage(String) added} to this object
* @param pluginInterface the interface for the plugin type
* (e.g., {@code org.apache.kafka.connect.storage.Converter.class});
* may not be null
* @param configDefAccessor an accessor that can be used to retrieve a {@link ConfigDef}
* from an instance of the plugin type (e.g., {@code Converter::config});
* may not be null
* @param pluginName a lowercase, human-readable name for the type of plugin (e.g., {@code "key converter"});
* may not be null
* @param pluginProperty the property used to define a custom class for the plugin type
* in a connector config (e.g., {@link ConnectorConfig#KEY_CONVERTER_CLASS_CONFIG});
* may not be null
* @param defaultProperties any default properties to include in the configuration that will be used for
* the plugin; may be null

* @return a {@link ConfigInfos} object containing validation results for the plugin in the connector config,
* or null if either no custom validation was performed (possibly because no custom plugin was defined in the
* connector config), or if custom validation failed

* @param <T> the plugin class to perform validation for
*/
private <T> ConfigInfos validateConverterConfig(
Map<String, String> connectorConfig,
ConfigValue pluginConfigValue,
Class<T> pluginInterface,
Function<T, ConfigDef> configDefAccessor,
String pluginName,
String pluginProperty,
Map<String, String> defaultProperties,
Function<String, TemporaryStage> reportStage
) {
Objects.requireNonNull(connectorConfig);
Objects.requireNonNull(pluginInterface);
Objects.requireNonNull(configDefAccessor);
Objects.requireNonNull(pluginName);
Objects.requireNonNull(pluginProperty);

String pluginClass = connectorConfig.get(pluginProperty);

if (pluginClass == null
|| pluginConfigValue == null
|| !pluginConfigValue.errorMessages().isEmpty()
) {
// Either no custom converter was specified, or one was specified but there's a problem with it.
// No need to proceed any further.
return null;
}

T pluginInstance;
String stageDescription = "instantiating the connector's " + pluginName + " for validation";
try (TemporaryStage stage = reportStage.apply(stageDescription)) {
pluginInstance = Utils.newInstance(pluginClass, pluginInterface);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is where the connector validation used the tempConnectors cache to re-use the connector objects.
Personally i'm fine with per-call instantiation, but thought it would be worth mentioning.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, and definitely worth calling out. I suspect this won't be a resource bottleneck and GC will be sufficient to clean up these instances before they eat up too much memory, but if not then we can certainly look into caching these plugin instances.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This initializes the plugin with the wrong classloader (and I guess applies the configDefAccessor with the wrong classloader too).

There's already Plugins#newPlugin(String) which you could use, but it returns Object. Perhaps we could add a Class argument to make the type signature better.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is actually correct. All calls to validateConverterConfig take place within a LoaderSwap that causes the connector's classloader to be used, which unless I'm mistaken matches the behavior when instantiating tasks (loader swap here, converter instantiation here). It's true that Plugins::newConverter and Plugins::newHeaderConverter are used instead of Utils::newInstance when starting tasks, but when invoking the Plugins methods with classLoaderUsage set to CURRENT_CLASSLOADER, no classloader swapping takes place, so the connector loader is still used.

} catch (ClassNotFoundException | RuntimeException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
} catch (ClassNotFoundException | RuntimeException e) {
} catch (Exception e) {

log.error("Failed to instantiate {} class {}; this should have been caught by prior validation logic", pluginName, pluginClass, e);
pluginConfigValue.addErrorMessage("Failed to load class " + pluginClass + (e.getMessage() != null ? ": " + e.getMessage() : ""));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think mutating the passed-in value is correct here, but it's not clear from the signature that this happens.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack, added a note to the Javadoc.

return null;
}

try {
ConfigDef configDef;
stageDescription = "retrieving the configuration definition from the connector's " + pluginName;
try (TemporaryStage stage = reportStage.apply(stageDescription)) {
configDef = configDefAccessor.apply(pluginInstance);
} catch (RuntimeException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
} catch (RuntimeException e) {
} catch (Exception e) {

log.error("Failed to load ConfigDef from {} of type {}", pluginName, pluginClass, e);
pluginConfigValue.addErrorMessage("Failed to load ConfigDef from " + pluginName + (e.getMessage() != null ? ": " + e.getMessage() : ""));
return null;
}
if (configDef == null) {
log.warn("{}.config() has returned a null ConfigDef; no further preflight config validation for this converter will be performed", pluginClass);
// Older versions of Connect didn't do any converter validation.
// Even though converters are technically required to return a non-null ConfigDef object from their config() method,
// we permit this case in order to avoid breaking existing converters that, despite not adhering to this requirement,
// can be used successfully with a connector.
return null;
}
final String pluginPrefix = pluginProperty + ".";
Map<String, String> pluginConfig = Utils.entriesWithPrefix(connectorConfig, pluginPrefix);
if (defaultProperties != null)
defaultProperties.forEach(pluginConfig::putIfAbsent);

List<ConfigValue> configValues;
stageDescription = "performing config validation for the connector's " + pluginName;
try (TemporaryStage stage = reportStage.apply(stageDescription)) {
configValues = configDef.validate(pluginConfig);
} catch (RuntimeException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
} catch (RuntimeException e) {
} catch (Exception e) {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this suggestion. When could a checked exception be thrown?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this and the Utils.newInstance comment i was thinking about preparing for #13185 but I suppose these can be addressed in that PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it--yeah, I'd prefer to leave as-is to reduce FUD in case #13185 doesn't get merged soon.

log.error("Failed to perform config validation for {} of type {}", pluginName, pluginClass, e);
pluginConfigValue.addErrorMessage("Failed to perform config validation for " + pluginName + (e.getMessage() != null ? ": " + e.getMessage() : ""));
return null;
}

return prefixedConfigInfos(configDef.configKeys(), configValues, pluginPrefix);
} finally {
Utils.maybeCloseQuietly(pluginInstance, pluginName + " " + pluginClass);
}
}

private ConfigInfos validateHeaderConverterConfig(
Map<String, String> connectorConfig,
ConfigValue headerConverterConfigValue,
Function<String, TemporaryStage> reportStage
) {
return validateConverterConfig(
connectorConfig,
headerConverterConfigValue,
HeaderConverter.class,
HeaderConverter::config,
"header converter",
HEADER_CONVERTER_CLASS_CONFIG,
Collections.singletonMap(ConverterConfig.TYPE_CONFIG, ConverterType.HEADER.getName()),
reportStage
);
}

private ConfigInfos validateKeyConverterConfig(
Map<String, String> connectorConfig,
ConfigValue keyConverterConfigValue,
Function<String, TemporaryStage> reportStage
) {
return validateConverterConfig(
connectorConfig,
keyConverterConfigValue,
Converter.class,
Converter::config,
"key converter",
KEY_CONVERTER_CLASS_CONFIG,
Collections.singletonMap(ConverterConfig.TYPE_CONFIG, ConverterType.KEY.getName()),
reportStage
);
}

private ConfigInfos validateValueConverterConfig(
Map<String, String> connectorConfig,
ConfigValue valueConverterConfigValue,
Function<String, TemporaryStage> reportStage
) {
return validateConverterConfig(
connectorConfig,
valueConverterConfigValue,
Converter.class,
Converter::config,
"value converter",
VALUE_CONVERTER_CLASS_CONFIG,
Collections.singletonMap(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName()),
reportStage
);
}

@Override
public void validateConnectorConfig(Map<String, String> connectorProps, Callback<ConfigInfos> callback) {
validateConnectorConfig(connectorProps, callback, true);
Expand Down Expand Up @@ -562,8 +724,25 @@ ConfigInfos validateConnectorConfig(
configKeys.putAll(configDef.configKeys());
allGroups.addAll(configDef.groups());
configValues.addAll(config.configValues());
ConfigInfos configInfos = generateResult(connType, configKeys, configValues, new ArrayList<>(allGroups));

// do custom converter-specific validation
ConfigInfos headerConverterConfigInfos = validateHeaderConverterConfig(
connectorProps,
validatedConnectorConfig.get(HEADER_CONVERTER_CLASS_CONFIG),
reportStage
);
ConfigInfos keyConverterConfigInfos = validateKeyConverterConfig(
connectorProps,
validatedConnectorConfig.get(KEY_CONVERTER_CLASS_CONFIG),
reportStage
);
ConfigInfos valueConverterConfigInfos = validateValueConverterConfig(
connectorProps,
validatedConnectorConfig.get(VALUE_CONVERTER_CLASS_CONFIG),
reportStage
);

ConfigInfos configInfos = generateResult(connType, configKeys, configValues, new ArrayList<>(allGroups));
AbstractConfig connectorConfig = new AbstractConfig(new ConfigDef(), connectorProps, doLog);
String connName = connectorProps.get(ConnectorConfig.NAME_CONFIG);
ConfigInfos producerConfigInfos = null;
Expand Down Expand Up @@ -612,7 +791,15 @@ ConfigInfos validateConnectorConfig(
connectorClientConfigOverridePolicy);
}
}
return mergeConfigInfos(connType, configInfos, producerConfigInfos, consumerConfigInfos, adminConfigInfos);
return mergeConfigInfos(connType,
configInfos,
producerConfigInfos,
consumerConfigInfos,
adminConfigInfos,
headerConverterConfigInfos,
keyConverterConfigInfos,
valueConverterConfigInfos
);
}
}

Expand All @@ -638,10 +825,6 @@ private static ConfigInfos validateClientOverrides(String connName,
org.apache.kafka.connect.health.ConnectorType connectorType,
ConnectorClientConfigRequest.ClientType clientType,
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
int errorCount = 0;
List<ConfigInfo> configInfoList = new LinkedList<>();
Map<String, ConfigKey> configKeys = configDef.configKeys();
Set<String> groups = new LinkedHashSet<>();
Map<String, Object> clientConfigs = new HashMap<>();
for (Map.Entry<String, Object> rawClientConfig : connectorConfig.originalsWithPrefix(prefix).entrySet()) {
String configName = rawClientConfig.getKey();
Expand All @@ -655,27 +838,38 @@ private static ConfigInfos validateClientOverrides(String connName,
ConnectorClientConfigRequest connectorClientConfigRequest = new ConnectorClientConfigRequest(
connName, connectorType, connectorClass, clientConfigs, clientType);
List<ConfigValue> configValues = connectorClientConfigOverridePolicy.validate(connectorClientConfigRequest);
if (configValues != null) {
for (ConfigValue validatedConfigValue : configValues) {
ConfigKey configKey = configKeys.get(validatedConfigValue.name());
ConfigKeyInfo configKeyInfo = null;
if (configKey != null) {
if (configKey.group != null) {
groups.add(configKey.group);
}
configKeyInfo = convertConfigKey(configKey, prefix);
}

ConfigValue configValue = new ConfigValue(prefix + validatedConfigValue.name(), validatedConfigValue.value(),
validatedConfigValue.recommendedValues(), validatedConfigValue.errorMessages());
if (!configValue.errorMessages().isEmpty()) {
errorCount++;
return prefixedConfigInfos(configDef.configKeys(), configValues, prefix);
}

private static ConfigInfos prefixedConfigInfos(Map<String, ConfigKey> configKeys, List<ConfigValue> configValues, String prefix) {
int errorCount = 0;
Set<String> groups = new LinkedHashSet<>();
List<ConfigInfo> configInfos = new ArrayList<>();

if (configValues == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this null check is only relevant when the value is coming from the overridePolicy.validate, in validateConverterConfig, I think the ConfigDef#validate call will always be non-null.

Copy link
Contributor Author

@C0urante C0urante May 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConfigDef::validate is non-final, and plugin instances may return a subclass from their config methods that possibly returns null.

I acknowledge that this is extremely unlikely, but it seems like this null guard is the best way to handle that scenario as opposed to, e.g., throwing an error and causing a 500 response to be returned. Thoughts?

return new ConfigInfos("", 0, new ArrayList<>(groups), configInfos);
}

for (ConfigValue validatedConfigValue : configValues) {
ConfigKey configKey = configKeys.get(validatedConfigValue.name());
ConfigKeyInfo configKeyInfo = null;
if (configKey != null) {
if (configKey.group != null) {
groups.add(configKey.group);
}
ConfigValueInfo configValueInfo = convertConfigValue(configValue, configKey != null ? configKey.type : null);
configInfoList.add(new ConfigInfo(configKeyInfo, configValueInfo));
configKeyInfo = convertConfigKey(configKey, prefix);
}

ConfigValue configValue = new ConfigValue(prefix + validatedConfigValue.name(), validatedConfigValue.value(),
validatedConfigValue.recommendedValues(), validatedConfigValue.errorMessages());
if (configValue.errorMessages().size() > 0) {
errorCount++;
}
ConfigValueInfo configValueInfo = convertConfigValue(configValue, configKey != null ? configKey.type : null);
configInfos.add(new ConfigInfo(configKeyInfo, configValueInfo));
}
return new ConfigInfos(connectorClass.toString(), errorCount, new ArrayList<>(groups), configInfoList);
return new ConfigInfos("", errorCount, new ArrayList<>(groups), configInfos);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: i don't like this empty string, but i see that it has no overall effect. I wonder if the validator can just return List and compute errorCount/groups at the end.

This doesn't have to be done in this PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it's a little weird with the empty string here. Hopefully it's fine for now but if we continue augmenting and refactoring this class I agree that it might be worth changing.

}

// public for testing
Expand Down
Expand Up @@ -529,13 +529,13 @@ ConfigDef getConfigDefFromConfigProvidingClass(String key, Class<?> cls) {
}
Utils.ensureConcreteSubclass(baseClass, cls);

T transformation;
T pluginInstance;
try {
transformation = Utils.newInstance(cls, baseClass);
pluginInstance = Utils.newInstance(cls, baseClass);
} catch (Exception e) {
throw new ConfigException(key, String.valueOf(cls), "Error getting config definition from " + baseClass.getSimpleName() + ": " + e.getMessage());
}
ConfigDef configDef = config(transformation);
ConfigDef configDef = config(pluginInstance);
if (null == configDef) {
throw new ConnectException(
String.format(
Expand Down