KAFKA-13327: Gracefully report connector validation errors instead of returning 500 responses #14303

Merged 4 commits · Aug 29, 2023
(The diff below shows changes from 2 of the 4 commits.)
@@ -371,7 +371,9 @@ public ConnectorStateInfo.TaskState taskStatus(ConnectorTaskId id) {
     }

     protected Map<String, ConfigValue> validateSinkConnectorConfig(SinkConnector connector, ConfigDef configDef, Map<String, String> config) {
-        return configDef.validateAll(config);
+        Map<String, ConfigValue> result = configDef.validateAll(config);
+        SinkConnectorConfig.validate(config, result);
+        return result;
     }

     protected Map<String, ConfigValue> validateSourceConnectorConfig(SourceConnector connector, ConfigDef configDef, Map<String, String> config) {

@@ -478,7 +480,6 @@ ConfigInfos validateConnectorConfig(Map<String, String> connectorProps, boolean
             enrichedConfigDef = ConnectorConfig.enrich(plugins(), SourceConnectorConfig.configDef(), connectorProps, false);
             validatedConnectorConfig = validateSourceConnectorConfig((SourceConnector) connector, enrichedConfigDef, connectorProps);
         } else {
-            SinkConnectorConfig.validate(connectorProps);
             connectorType = org.apache.kafka.connect.health.ConnectorType.SINK;
             enrichedConfigDef = ConnectorConfig.enrich(plugins(), SinkConnectorConfig.configDef(), connectorProps, false);
             validatedConnectorConfig = validateSinkConnectorConfig((SinkConnector) connector, enrichedConfigDef, connectorProps);
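To make the behavioral change in this hunk concrete: the removed pre-enrichment call threw a ConfigException, which the REST layer surfaced as a 500, while the new path records the same problems as per-property error messages. A minimal sketch (class name ours; assumes the post-merge connect-runtime on the classpath) of the one-argument overload, which still throws and is what used to run here:

```java
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.runtime.SinkConnectorConfig;

import java.util.HashMap;
import java.util.Map;

public class PreflightThrowSketch {
    public static void main(String[] args) {
        // Neither "topics" nor "topics.regex" is set, which is invalid for a sink.
        Map<String, String> props = new HashMap<>();
        try {
            // The one-argument overload still throws on the first error it finds;
            // the herder no longer calls it on the validation path, which is what
            // previously bubbled up as a 500 from the REST API.
            SinkConnectorConfig.validate(props);
        } catch (ConfigException e) {
            System.out.println("Old-style failure: " + e.getMessage());
        }
    }
}
```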
@@ -429,7 +429,9 @@ void enrich(ConfigDef newDef) {
         final ConfigDef.Validator typeValidator = ConfigDef.LambdaValidator.with(
             (String name, Object value) -> {
                 validateProps(prefix);
-                getConfigDefFromConfigProvidingClass(typeConfig, (Class<?>) value);
+                if (value != null) {
+                    getConfigDefFromConfigProvidingClass(typeConfig, (Class<?>) value);
+                }
             },
             () -> "valid configs for " + alias + " " + aliasKind.toLowerCase(Locale.ENGLISH));
         newDef.define(typeConfig, Type.CLASS, ConfigDef.NO_DEFAULT_VALUE, typeValidator, Importance.HIGH,

Review thread on the new null check:

Contributor Author: The value here is null if the class couldn't be loaded, in which case it isn't necessary to attempt any further validation.

Contributor: Ah, this hides the exception message from "Not a (something)" and only shows the "Missing required configuration (name) which has no default value". I think that is reasonable.

The other call site for this method is ConnectorConfig.EnrichablePlugin, which swallows this error on the validate() code path and propagates it when instantiating the ConnectorConfig. Instantiating the ConnectorConfig will also throw the "Missing required configuration" error, so it is not necessary to throw the error here.

I think you could safely change the getConfigDefFromConfigProvidingClass implementation to return an empty stream when the value is null, rather than throwing an exception. I don't think this is necessary, but maybe it keeps these two code paths more similar.

Contributor Author:

> I think you could safely change the getConfigDefFromConfigProvidingClass implementation to return an empty stream when the value is null, rather than throwing an exception.

Are you sure? It looks like the error swallowing in ConfigDef.EnrichablePlugin::populateConfigDef takes place conditionally, and we still throw exceptions originating from getConfigDefFromConfigProvidingClass in some cases.

Contributor:

> It looks like the error swallowing in ConfigDef.EnrichablePlugin::populateConfigDef takes place conditionally

Yeah, the condition depends on whether validation is being executed or whether the full ConnectorConfig is being constructed.

> we still throw exceptions originating from getConfigDefFromConfigProvidingClass in some cases.

I think the other exceptions, for non-null classes, are fine to leave as-is. We could only reasonably change the null-class behavior.

Contributor Author (@C0urante, Aug 29, 2023): Ah sorry, I misread your comment!

I'm hesitant to change the behavior of instantiating a ConnectorConfig unless it provides a practical, positive impact for users. Moving the null check doesn't seem significantly cleaner either, since guarding against null Class<?> instances is an inherent necessity of the current ConfigDef.Validator API (one that the validator classes added in #14304 also take into account).

I've added a comment indicating when we expect null values in the validator. I'm hopeful that this is enough to offset the cognitive load added by this change.
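For readers following the thread, here is a standalone sketch of the reviewer's "return an empty stream" alternative. The method below is a hypothetical stand-in, not Kafka's actual getConfigDefFromConfigProvidingClass signature; it only illustrates the proposed shape of the change.

```java
import java.util.stream.Stream;

final class NullClassSketch {
    // Hypothetical stand-in: treat a null (unloadable) class as "nothing to
    // contribute" instead of throwing, so the validate() path and the
    // ConnectorConfig-construction path behave the same.
    static Stream<String> configDefsFrom(Class<?> cls) {
        if (cls == null) {
            // Class could not be loaded; the missing/invalid class is already
            // reported elsewhere as "Missing required configuration ...".
            return Stream.empty();
        }
        return Stream.of(cls.getName()); // placeholder for real ConfigDef extraction
    }

    public static void main(String[] args) {
        System.out.println(configDefsFrom(null).count());          // 0
        System.out.println(configDefsFrom(String.class).count());  // 1
    }
}
```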
@@ -19,13 +19,16 @@
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.transforms.util.RegexValidator;

+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;

@@ -90,50 +93,83 @@ public SinkConnectorConfig(Plugins plugins, Map<String, String> props) {
      * @param props sink configuration properties
      */
     public static void validate(Map<String, String> props) {
-        final boolean hasTopicsConfig = hasTopicsConfig(props);
-        final boolean hasTopicsRegexConfig = hasTopicsRegexConfig(props);
-        final boolean hasDlqTopicConfig = hasDlqTopicConfig(props);
+        Map<String, ConfigValue> validatedConfig = new LinkedHashMap<>();
+        validate(props, validatedConfig);
+        validatedConfig.values().stream()
+                .filter(configValue -> !configValue.errorMessages().isEmpty())
+                .findFirst()
+                .ifPresent(configValue -> {
+                    throw new ConfigException(configValue.name(), configValue.value(), configValue.errorMessages().get(0));
+                });
+    }
+
+    /**
+     * Perform preflight validation for the sink-specific properties for a connector.
+     *
+     * @param props the configuration for the sink connector
+     * @param validatedConfig any already-known {@link ConfigValue validation results} for the configuration.
+     *                        May be empty, but may not be null. Any configuration errors discovered by this
+     *                        method will be {@link ConfigValue#addErrorMessage(String) added} to a value in
+     *                        this map, adding a new entry if one for the problematic property does not
+     *                        already exist.
+     */
+    public static void validate(Map<String, String> props, Map<String, ConfigValue> validatedConfig) {
+        final String topicsList = props.get(TOPICS_CONFIG);
+        final String topicsRegex = props.get(TOPICS_REGEX_CONFIG);
+        final String dlqTopic = props.getOrDefault(DLQ_TOPIC_NAME_CONFIG, "").trim();
+        final boolean hasTopicsConfig = !Utils.isBlank(topicsList);
+        final boolean hasTopicsRegexConfig = !Utils.isBlank(topicsRegex);
+        final boolean hasDlqTopicConfig = !Utils.isBlank(dlqTopic);

         if (hasTopicsConfig && hasTopicsRegexConfig) {
-            throw new ConfigException(SinkTask.TOPICS_CONFIG + " and " + SinkTask.TOPICS_REGEX_CONFIG +
-                    " are mutually exclusive options, but both are set.");
+            String errorMessage = TOPICS_CONFIG + " and " + TOPICS_REGEX_CONFIG + " are mutually exclusive options, but both are set.";
+            addErrorMessage(validatedConfig, TOPICS_CONFIG, topicsList, errorMessage);
+            addErrorMessage(validatedConfig, TOPICS_REGEX_CONFIG, topicsRegex, errorMessage);
         }

         if (!hasTopicsConfig && !hasTopicsRegexConfig) {
-            throw new ConfigException("Must configure one of " +
-                    SinkTask.TOPICS_CONFIG + " or " + SinkTask.TOPICS_REGEX_CONFIG);
+            String errorMessage = "Must configure one of " + TOPICS_CONFIG + " or " + TOPICS_REGEX_CONFIG;
+            addErrorMessage(validatedConfig, TOPICS_CONFIG, topicsList, errorMessage);
+            addErrorMessage(validatedConfig, TOPICS_REGEX_CONFIG, topicsRegex, errorMessage);
         }

         if (hasDlqTopicConfig) {
-            String dlqTopic = props.get(DLQ_TOPIC_NAME_CONFIG).trim();
             if (hasTopicsConfig) {
                 List<String> topics = parseTopicsList(props);
                 if (topics.contains(dlqTopic)) {
-                    throw new ConfigException(String.format("The DLQ topic '%s' may not be included in the list of "
-                            + "topics ('%s=%s') consumed by the connector", dlqTopic, SinkTask.TOPICS_REGEX_CONFIG, topics));
+                    String errorMessage = String.format(
+                            "The DLQ topic '%s' may not be included in the list of topics ('%s=%s') consumed by the connector",
+                            dlqTopic, TOPICS_CONFIG, topics
+                    );
+                    addErrorMessage(validatedConfig, TOPICS_CONFIG, topicsList, errorMessage);
                 }
             }
             if (hasTopicsRegexConfig) {
-                String topicsRegexStr = props.get(SinkTask.TOPICS_REGEX_CONFIG);
-                Pattern pattern = Pattern.compile(topicsRegexStr);
+                Pattern pattern = Pattern.compile(topicsRegex);
                 if (pattern.matcher(dlqTopic).matches()) {
-                    throw new ConfigException(String.format("The DLQ topic '%s' may not be included in the regex matching the "
-                            + "topics ('%s=%s') consumed by the connector", dlqTopic, SinkTask.TOPICS_REGEX_CONFIG, topicsRegexStr));
+                    String errorMessage = String.format(
+                            "The DLQ topic '%s' may not be included in the regex matching the topics ('%s=%s') consumed by the connector",
+                            dlqTopic, TOPICS_REGEX_CONFIG, topicsRegex
+                    );
+                    addErrorMessage(validatedConfig, TOPICS_REGEX_CONFIG, topicsRegex, errorMessage);
                 }
             }
         }
     }

+    private static void addErrorMessage(Map<String, ConfigValue> validatedConfig, String name, String value, String errorMessage) {
+        validatedConfig.computeIfAbsent(
+                name,
+                p -> new ConfigValue(name, value, Collections.emptyList(), new ArrayList<>())
+        ).addErrorMessage(
+                errorMessage
+        );
+    }
+
     public static boolean hasTopicsConfig(Map<String, String> props) {
         String topicsStr = props.get(TOPICS_CONFIG);
         return !Utils.isBlank(topicsStr);
     }

     public static boolean hasTopicsRegexConfig(Map<String, String> props) {
         String topicsRegexStr = props.get(TOPICS_REGEX_CONFIG);
         return !Utils.isBlank(topicsRegexStr);
     }

     public static boolean hasDlqTopicConfig(Map<String, String> props) {
         String dqlTopicStr = props.get(DLQ_TOPIC_NAME_CONFIG);
         return !Utils.isBlank(dqlTopicStr);

Review thread on the regex error message ("may not be included in the regex matching the topics"):

Contributor: Should we use "match" rather than "include" here?

Contributor Author: This is the existing language on trunk, but I agree that "match" is clearer than "include". Done 👍
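A short usage sketch of the new two-argument overload introduced in this hunk (class name ours; assumes the post-merge connect-runtime on the classpath). It shows the key difference from the old behavior: instead of throwing on the first problem, every offending property collects an error message in a single pass.

```java
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.runtime.SinkConnectorConfig;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class SinkValidateOverloadSketch {
    public static void main(String[] args) {
        // Both topic settings supplied, which is invalid: they are mutually exclusive.
        Map<String, String> props = new HashMap<>();
        props.put("topics", "orders");
        props.put("topics.regex", "orders-.*");

        // The new overload records problems instead of throwing.
        Map<String, ConfigValue> validated = new LinkedHashMap<>();
        SinkConnectorConfig.validate(props, validated);

        validated.values().forEach(v ->
                v.errorMessages().forEach(msg ->
                        System.out.println(v.name() + ": " + msg)));
        // Expected: the same "mutually exclusive" message attached to both
        // the "topics" and "topics.regex" entries.
    }
}
```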
@@ -110,6 +110,8 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;

+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX;
 import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG;
 import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.CONNECT_PROTOCOL_V0;
 import static org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility.EAGER;

@@ -906,7 +908,7 @@ public void deleteConnectorConfig(final String connName, final Callback<Created<
     @Override
     protected Map<String, ConfigValue> validateSinkConnectorConfig(SinkConnector connector, ConfigDef configDef, Map<String, String> config) {
         Map<String, ConfigValue> result = super.validateSinkConnectorConfig(connector, configDef, config);
-        validateSinkConnectorGroupId(result);
+        validateSinkConnectorGroupId(config, result);
         return result;
     }

@@ -919,12 +921,25 @@ protected Map<String, ConfigValue> validateSourceConnectorConfig(SourceConnector
     }


-    private void validateSinkConnectorGroupId(Map<String, ConfigValue> validatedConfig) {
-        ConfigValue validatedName = validatedConfig.get(ConnectorConfig.NAME_CONFIG);
-        String name = (String) validatedName.value();
-        if (workerGroupId.equals(SinkUtils.consumerGroupId(name))) {
-            validatedName.addErrorMessage("Consumer group for sink connector named " + name +
-                    " conflicts with Connect worker group " + workerGroupId);
+    private void validateSinkConnectorGroupId(Map<String, String> config, Map<String, ConfigValue> validatedConfig) {
+        String overriddenConsumerGroupIdConfig = CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + GROUP_ID_CONFIG;
+        if (config.containsKey(overriddenConsumerGroupIdConfig)) {
+            String consumerGroupId = config.get(overriddenConsumerGroupIdConfig);
+            ConfigValue validatedGroupId = validatedConfig.computeIfAbsent(
+                    overriddenConsumerGroupIdConfig,
+                    p -> new ConfigValue(overriddenConsumerGroupIdConfig, consumerGroupId, Collections.emptyList(), new ArrayList<>())
+            );
+            if (workerGroupId.equals(consumerGroupId)) {
+                validatedGroupId.addErrorMessage("Consumer group " + consumerGroupId +
+                        " conflicts with Connect worker group " + workerGroupId);
+            }
+        } else {
+            ConfigValue validatedName = validatedConfig.get(ConnectorConfig.NAME_CONFIG);
+            String name = (String) validatedName.value();
+            if (workerGroupId.equals(SinkUtils.consumerGroupId(name))) {
+                validatedName.addErrorMessage("Consumer group for sink connector named " + name +
+                        " conflicts with Connect worker group " + workerGroupId);
+            }
         }
     }
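A small sketch of the two cases the rewritten validateSinkConnectorGroupId distinguishes (class name and the worker group value are hypothetical; SinkUtils.consumerGroupId is the real helper, which prefixes the connector name with "connect-"):

```java
import org.apache.kafka.connect.util.SinkUtils;

public class GroupIdConflictSketch {
    public static void main(String[] args) {
        String workerGroupId = "connect-cluster"; // hypothetical worker group.id

        // Case 1: an explicit consumer.override.group.id is present, so only
        // the override is checked against the worker's group.
        String override = "connect-cluster"; // value of consumer.override.group.id
        System.out.println("override conflicts: " + workerGroupId.equals(override));

        // Case 2: no override, so the group id derived from the connector
        // name ("connect-" + name) is checked instead, as before this PR.
        String name = "cluster";
        String derived = SinkUtils.consumerGroupId(name);
        System.out.println("derived group " + derived
                + " conflicts: " + workerGroupId.equals(derived));
    }
}
```

The design point here is that an explicit override takes precedence over the derived group id, so the error is attached to the consumer.override.group.id entry when one exists, rather than to the connector name.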