Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-13328, KAFKA-13329 (1): Add preflight validations for key, value, and header converter classes #14304

Merged
merged 9 commits into from Dec 8, 2023
42 changes: 42 additions & 0 deletions clients/src/main/java/org/apache/kafka/common/utils/Utils.java
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.utils;

import java.lang.reflect.Modifier;
import java.nio.BufferUnderflowException;
import java.nio.ByteOrder;
import java.nio.file.StandardOpenOption;
Expand Down Expand Up @@ -1097,6 +1098,15 @@ public interface UncheckedCloseable extends AutoCloseable {
void close();
}

/**
* Closes {@code maybeCloseable} if it implements the {@link AutoCloseable} interface,
* and if an exception is thrown, it is logged at the WARN level.
*/
public static void maybeCloseQuietly(Object maybeCloseable, String name) {
if (maybeCloseable instanceof AutoCloseable)
closeQuietly((AutoCloseable) maybeCloseable, name);
}

/**
* Closes {@code closeable} and if an exception is thrown, it is logged at the WARN level.
* <b>Be cautious when passing method references as an argument.</b> For example:
Expand Down Expand Up @@ -1565,6 +1575,38 @@ public static String[] enumOptions(Class<? extends Enum<?>> enumClass) {
.toArray(String[]::new);
}

/**
* Ensure that the class is concrete (i.e., not abstract), and that it subclasses a given base class.
* If it is abstract or does not subclass the given base class, throw a {@link ConfigException}
* with a friendly error message suggesting a list of concrete child subclasses (if any are known).
* @param baseClass the expected superclass; may not be null
* @param klass the class to check; may not be null
* @throws ConfigException if the class is not concrete
*/
public static void ensureConcreteSubclass(Class<?> baseClass, Class<?> klass) {
Objects.requireNonNull(baseClass);
Objects.requireNonNull(klass);

if (!baseClass.isAssignableFrom(klass)) {
String inheritFrom = baseClass.isInterface() ? "implement" : "extend";
String baseClassType = baseClass.isInterface() ? "interface" : "class";
throw new ConfigException("Class " + klass + " does not " + inheritFrom + " the " + baseClass.getSimpleName() + " " + baseClassType);
}

if (Modifier.isAbstract(klass.getModifiers())) {
String childClassNames = Stream.of(klass.getClasses())
.filter(baseClass::isAssignableFrom)
.filter(c -> !Modifier.isAbstract(c.getModifiers()))
.filter(c -> Modifier.isPublic(c.getModifiers()))
.map(Class::getName)
.collect(Collectors.joining(", "));
String message = "This class is abstract and cannot be created.";
if (!Utils.isBlank(childClassNames))
message += " Did you mean " + childClassNames + "?";
throw new ConfigException(message);
}
}

/**
* Convert time instant to readable string for logging
* @param timestamp the timestamp of the instant to be converted.
Expand Down
Expand Up @@ -28,12 +28,15 @@
import org.apache.kafka.connect.runtime.errors.ToleranceType;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import org.apache.kafka.connect.util.ConcreteSubClassValidator;
import org.apache.kafka.connect.util.InstantiableClassValidator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
Expand All @@ -42,7 +45,6 @@
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars;
Expand Down Expand Up @@ -82,17 +84,29 @@ public class ConnectorConfig extends AbstractConfig {
public static final String KEY_CONVERTER_CLASS_CONFIG = WorkerConfig.KEY_CONVERTER_CLASS_CONFIG;
public static final String KEY_CONVERTER_CLASS_DOC = WorkerConfig.KEY_CONVERTER_CLASS_DOC;
public static final String KEY_CONVERTER_CLASS_DISPLAY = "Key converter class";
private static final ConfigDef.Validator KEY_CONVERTER_CLASS_VALIDATOR = ConfigDef.CompositeValidator.of(
ConcreteSubClassValidator.forSuperClass(Converter.class),
new InstantiableClassValidator()
);

public static final String VALUE_CONVERTER_CLASS_CONFIG = WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG;
public static final String VALUE_CONVERTER_CLASS_DOC = WorkerConfig.VALUE_CONVERTER_CLASS_DOC;
public static final String VALUE_CONVERTER_CLASS_DISPLAY = "Value converter class";
private static final ConfigDef.Validator VALUE_CONVERTER_CLASS_VALIDATOR = ConfigDef.CompositeValidator.of(
ConcreteSubClassValidator.forSuperClass(Converter.class),
new InstantiableClassValidator()
);

public static final String HEADER_CONVERTER_CLASS_CONFIG = WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG;
public static final String HEADER_CONVERTER_CLASS_DOC = WorkerConfig.HEADER_CONVERTER_CLASS_DOC;
public static final String HEADER_CONVERTER_CLASS_DISPLAY = "Header converter class";
// The Connector config should not have a default for the header converter, since the absence of a config property means that
// the worker config settings should be used. Thus, we set the default to null here.
public static final String HEADER_CONVERTER_CLASS_DEFAULT = null;
private static final ConfigDef.Validator HEADER_CONVERTER_CLASS_VALIDATOR = ConfigDef.CompositeValidator.of(
ConcreteSubClassValidator.forSuperClass(HeaderConverter.class),
new InstantiableClassValidator()
C0urante marked this conversation as resolved.
Show resolved Hide resolved
);

public static final String TASKS_MAX_CONFIG = "tasks.max";
private static final String TASKS_MAX_DOC = "Maximum number of tasks to use for this connector.";
Expand Down Expand Up @@ -181,9 +195,9 @@ public static ConfigDef configDef() {
.define(NAME_CONFIG, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, nonEmptyStringWithoutControlChars(), Importance.HIGH, NAME_DOC, COMMON_GROUP, ++orderInGroup, Width.MEDIUM, NAME_DISPLAY)
.define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.LONG, CONNECTOR_CLASS_DISPLAY)
.define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, TASK_MAX_DISPLAY)
.define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, KEY_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, KEY_CONVERTER_CLASS_DISPLAY)
.define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, VALUE_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, VALUE_CONVERTER_CLASS_DISPLAY)
.define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS, HEADER_CONVERTER_CLASS_DEFAULT, Importance.LOW, HEADER_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, HEADER_CONVERTER_CLASS_DISPLAY)
.define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, null, KEY_CONVERTER_CLASS_VALIDATOR, Importance.LOW, KEY_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, KEY_CONVERTER_CLASS_DISPLAY)
.define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, null, VALUE_CONVERTER_CLASS_VALIDATOR, Importance.LOW, VALUE_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, VALUE_CONVERTER_CLASS_DISPLAY)
.define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS, HEADER_CONVERTER_CLASS_DEFAULT, HEADER_CONVERTER_CLASS_VALIDATOR, Importance.LOW, HEADER_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, HEADER_CONVERTER_CLASS_DISPLAY)
.define(TRANSFORMS_CONFIG, Type.LIST, Collections.emptyList(), aliasValidator("transformation"), Importance.LOW, TRANSFORMS_DOC, TRANSFORMS_GROUP, ++orderInGroup, Width.LONG, TRANSFORMS_DISPLAY)
.define(PREDICATES_CONFIG, Type.LIST, Collections.emptyList(), aliasValidator("predicate"), Importance.LOW, PREDICATES_DOC, PREDICATES_GROUP, ++orderInGroup, Width.LONG, PREDICATES_DISPLAY)
.define(CONFIG_RELOAD_ACTION_CONFIG, Type.STRING, CONFIG_RELOAD_ACTION_RESTART,
Expand Down Expand Up @@ -491,21 +505,11 @@ protected ConfigDef initialConfigDef() {
* @param cls The subclass of the baseclass.
*/
ConfigDef getConfigDefFromConfigProvidingClass(String key, Class<?> cls) {
if (cls == null || !baseClass.isAssignableFrom(cls)) {
throw new ConfigException(key, String.valueOf(cls), "Not a " + baseClass.getSimpleName());
}
if (Modifier.isAbstract(cls.getModifiers())) {
String childClassNames = Stream.of(cls.getClasses())
.filter(cls::isAssignableFrom)
.filter(c -> !Modifier.isAbstract(c.getModifiers()))
.filter(c -> Modifier.isPublic(c.getModifiers()))
.map(Class::getName)
.collect(Collectors.joining(", "));
String message = Utils.isBlank(childClassNames) ?
aliasKind + " is abstract and cannot be created." :
aliasKind + " is abstract and cannot be created. Did you mean " + childClassNames + "?";
throw new ConfigException(key, String.valueOf(cls), message);
if (cls == null) {
throw new ConfigException(key, null, "Not a " + baseClass.getSimpleName());
}
Utils.ensureConcreteSubclass(baseClass, cls);

T transformation;
try {
transformation = Utils.newInstance(cls, baseClass);
Expand Down
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.util;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.Utils;

public class ConcreteSubClassValidator implements ConfigDef.Validator {
private final Class<?> expectedSuperClass;

private ConcreteSubClassValidator(Class<?> expectedSuperClass) {
this.expectedSuperClass = expectedSuperClass;
}

public static ConcreteSubClassValidator forSuperClass(Class<?> expectedSuperClass) {
C0urante marked this conversation as resolved.
Show resolved Hide resolved
return new ConcreteSubClassValidator(expectedSuperClass);
}

@Override
public void ensureValid(String name, Object value) {
if (value == null) {
// The value will be null if the class couldn't be found; no point in performing follow-up validation
return;
}
yashmayya marked this conversation as resolved.
Show resolved Hide resolved

Class<?> cls = (Class<?>) value;
Utils.ensureConcreteSubclass(expectedSuperClass, cls);
}

@Override
public String toString() {
return "A concrete subclass of " + expectedSuperClass.getName();
}
}
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.util;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Utils;

public class InstantiableClassValidator implements ConfigDef.Validator {

@Override
public void ensureValid(String name, Object value) {
if (value == null) {
// The value will be null if the class couldn't be found; no point in performing follow-up validation
return;
}

Class<?> cls = (Class<?>) value;
try {
Object o = cls.getDeclaredConstructor().newInstance();
Utils.maybeCloseQuietly(o, o + " (instantiated for preflight validation");
} catch (NoSuchMethodException | IllegalAccessException e) {
throw new ConfigException(name, cls.getName(), "Could not find a public no-argument constructor for class" + (e.getMessage() != null ? ": " + e.getMessage() : ""));
} catch (ReflectiveOperationException | LinkageError | RuntimeException e) {
throw new ConfigException(name, cls.getName(), "Could not instantiate class" + (e.getMessage() != null ? ": " + e.getMessage() : ""));
}
gharris1727 marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public String toString() {
return "A class with a public, no-argument constructor";
}
}