Skip to content

Commit

Permalink
KAFKA-13328, KAFKA-13329 (1): Add preflight validations for key, valu…
Browse files Browse the repository at this point in the history
…e, and header converter classes (apache#14304)

Reviewers: Yash Mayya <yash.mayya@gmail.com>, Greg Harris <greg.harris@aiven.io>
  • Loading branch information
C0urante authored and anurag-harness committed Feb 9, 2024
1 parent 76db5fc commit 1e20ce3
Show file tree
Hide file tree
Showing 6 changed files with 337 additions and 41 deletions.
42 changes: 42 additions & 0 deletions clients/src/main/java/org/apache/kafka/common/utils/Utils.java
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.utils;

import java.lang.reflect.Modifier;
import java.nio.BufferUnderflowException;
import java.nio.ByteOrder;
import java.nio.file.StandardOpenOption;
Expand Down Expand Up @@ -1108,6 +1109,15 @@ public interface UncheckedCloseable extends AutoCloseable {
void close();
}

/**
* Closes {@code maybeCloseable} if it implements the {@link AutoCloseable} interface,
* and if an exception is thrown, it is logged at the WARN level.
*/
public static void maybeCloseQuietly(Object maybeCloseable, String name) {
if (maybeCloseable instanceof AutoCloseable)
closeQuietly((AutoCloseable) maybeCloseable, name);
}

/**
* Closes {@code closeable} and if an exception is thrown, it is logged at the WARN level.
* <b>Be cautious when passing method references as an argument.</b> For example:
Expand Down Expand Up @@ -1576,6 +1586,38 @@ public static String[] enumOptions(Class<? extends Enum<?>> enumClass) {
.toArray(String[]::new);
}

/**
* Ensure that the class is concrete (i.e., not abstract), and that it subclasses a given base class.
* If it is abstract or does not subclass the given base class, throw a {@link ConfigException}
* with a friendly error message suggesting a list of concrete child subclasses (if any are known).
* @param baseClass the expected superclass; may not be null
* @param klass the class to check; may not be null
* @throws ConfigException if the class is not concrete
*/
public static void ensureConcreteSubclass(Class<?> baseClass, Class<?> klass) {
Objects.requireNonNull(baseClass);
Objects.requireNonNull(klass);

if (!baseClass.isAssignableFrom(klass)) {
String inheritFrom = baseClass.isInterface() ? "implement" : "extend";
String baseClassType = baseClass.isInterface() ? "interface" : "class";
throw new ConfigException("Class " + klass + " does not " + inheritFrom + " the " + baseClass.getSimpleName() + " " + baseClassType);
}

if (Modifier.isAbstract(klass.getModifiers())) {
String childClassNames = Stream.of(klass.getClasses())
.filter(baseClass::isAssignableFrom)
.filter(c -> !Modifier.isAbstract(c.getModifiers()))
.filter(c -> Modifier.isPublic(c.getModifiers()))
.map(Class::getName)
.collect(Collectors.joining(", "));
String message = "This class is abstract and cannot be created.";
if (!Utils.isBlank(childClassNames))
message += " Did you mean " + childClassNames + "?";
throw new ConfigException(message);
}
}

/**
* Convert time instant to readable string for logging
* @param timestamp the timestamp of the instant to be converted.
Expand Down
Expand Up @@ -28,12 +28,15 @@
import org.apache.kafka.connect.runtime.errors.ToleranceType;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import org.apache.kafka.connect.util.ConcreteSubClassValidator;
import org.apache.kafka.connect.util.InstantiableClassValidator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
Expand All @@ -42,7 +45,6 @@
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars;
Expand Down Expand Up @@ -82,17 +84,29 @@ public class ConnectorConfig extends AbstractConfig {
public static final String KEY_CONVERTER_CLASS_CONFIG = WorkerConfig.KEY_CONVERTER_CLASS_CONFIG;
public static final String KEY_CONVERTER_CLASS_DOC = WorkerConfig.KEY_CONVERTER_CLASS_DOC;
public static final String KEY_CONVERTER_CLASS_DISPLAY = "Key converter class";
private static final ConfigDef.Validator KEY_CONVERTER_CLASS_VALIDATOR = ConfigDef.CompositeValidator.of(
ConcreteSubClassValidator.forSuperClass(Converter.class),
new InstantiableClassValidator()
);

public static final String VALUE_CONVERTER_CLASS_CONFIG = WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG;
public static final String VALUE_CONVERTER_CLASS_DOC = WorkerConfig.VALUE_CONVERTER_CLASS_DOC;
public static final String VALUE_CONVERTER_CLASS_DISPLAY = "Value converter class";
private static final ConfigDef.Validator VALUE_CONVERTER_CLASS_VALIDATOR = ConfigDef.CompositeValidator.of(
ConcreteSubClassValidator.forSuperClass(Converter.class),
new InstantiableClassValidator()
);

public static final String HEADER_CONVERTER_CLASS_CONFIG = WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG;
public static final String HEADER_CONVERTER_CLASS_DOC = WorkerConfig.HEADER_CONVERTER_CLASS_DOC;
public static final String HEADER_CONVERTER_CLASS_DISPLAY = "Header converter class";
// The Connector config should not have a default for the header converter, since the absence of a config property means that
// the worker config settings should be used. Thus, we set the default to null here.
public static final String HEADER_CONVERTER_CLASS_DEFAULT = null;
private static final ConfigDef.Validator HEADER_CONVERTER_CLASS_VALIDATOR = ConfigDef.CompositeValidator.of(
ConcreteSubClassValidator.forSuperClass(HeaderConverter.class),
new InstantiableClassValidator()
);

public static final String TASKS_MAX_CONFIG = "tasks.max";
private static final String TASKS_MAX_DOC = "Maximum number of tasks to use for this connector.";
Expand Down Expand Up @@ -181,9 +195,9 @@ public static ConfigDef configDef() {
.define(NAME_CONFIG, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, nonEmptyStringWithoutControlChars(), Importance.HIGH, NAME_DOC, COMMON_GROUP, ++orderInGroup, Width.MEDIUM, NAME_DISPLAY)
.define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.LONG, CONNECTOR_CLASS_DISPLAY)
.define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, TASK_MAX_DISPLAY)
.define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, KEY_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, KEY_CONVERTER_CLASS_DISPLAY)
.define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, VALUE_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, VALUE_CONVERTER_CLASS_DISPLAY)
.define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS, HEADER_CONVERTER_CLASS_DEFAULT, Importance.LOW, HEADER_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, HEADER_CONVERTER_CLASS_DISPLAY)
.define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, null, KEY_CONVERTER_CLASS_VALIDATOR, Importance.LOW, KEY_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, KEY_CONVERTER_CLASS_DISPLAY)
.define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, null, VALUE_CONVERTER_CLASS_VALIDATOR, Importance.LOW, VALUE_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, VALUE_CONVERTER_CLASS_DISPLAY)
.define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS, HEADER_CONVERTER_CLASS_DEFAULT, HEADER_CONVERTER_CLASS_VALIDATOR, Importance.LOW, HEADER_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, HEADER_CONVERTER_CLASS_DISPLAY)
.define(TRANSFORMS_CONFIG, Type.LIST, Collections.emptyList(), aliasValidator("transformation"), Importance.LOW, TRANSFORMS_DOC, TRANSFORMS_GROUP, ++orderInGroup, Width.LONG, TRANSFORMS_DISPLAY)
.define(PREDICATES_CONFIG, Type.LIST, Collections.emptyList(), aliasValidator("predicate"), Importance.LOW, PREDICATES_DOC, PREDICATES_GROUP, ++orderInGroup, Width.LONG, PREDICATES_DISPLAY)
.define(CONFIG_RELOAD_ACTION_CONFIG, Type.STRING, CONFIG_RELOAD_ACTION_RESTART,
Expand Down Expand Up @@ -491,21 +505,11 @@ protected ConfigDef initialConfigDef() {
* @param cls The subclass of the baseclass.
*/
ConfigDef getConfigDefFromConfigProvidingClass(String key, Class<?> cls) {
if (cls == null || !baseClass.isAssignableFrom(cls)) {
throw new ConfigException(key, String.valueOf(cls), "Not a " + baseClass.getSimpleName());
}
if (Modifier.isAbstract(cls.getModifiers())) {
String childClassNames = Stream.of(cls.getClasses())
.filter(cls::isAssignableFrom)
.filter(c -> !Modifier.isAbstract(c.getModifiers()))
.filter(c -> Modifier.isPublic(c.getModifiers()))
.map(Class::getName)
.collect(Collectors.joining(", "));
String message = Utils.isBlank(childClassNames) ?
aliasKind + " is abstract and cannot be created." :
aliasKind + " is abstract and cannot be created. Did you mean " + childClassNames + "?";
throw new ConfigException(key, String.valueOf(cls), message);
if (cls == null) {
throw new ConfigException(key, null, "Not a " + baseClass.getSimpleName());
}
Utils.ensureConcreteSubclass(baseClass, cls);

T transformation;
try {
transformation = Utils.newInstance(cls, baseClass);
Expand Down
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.util;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.Utils;

public class ConcreteSubClassValidator implements ConfigDef.Validator {
private final Class<?> expectedSuperClass;

private ConcreteSubClassValidator(Class<?> expectedSuperClass) {
this.expectedSuperClass = expectedSuperClass;
}

public static ConcreteSubClassValidator forSuperClass(Class<?> expectedSuperClass) {
return new ConcreteSubClassValidator(expectedSuperClass);
}

@Override
public void ensureValid(String name, Object value) {
if (value == null) {
// The value will be null if the class couldn't be found; no point in performing follow-up validation
return;
}

Class<?> cls = (Class<?>) value;
Utils.ensureConcreteSubclass(expectedSuperClass, cls);
}

@Override
public String toString() {
return "A concrete subclass of " + expectedSuperClass.getName();
}
}
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.util;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Utils;

public class InstantiableClassValidator implements ConfigDef.Validator {

@Override
public void ensureValid(String name, Object value) {
if (value == null) {
// The value will be null if the class couldn't be found; no point in performing follow-up validation
return;
}

Class<?> cls = (Class<?>) value;
try {
Object o = cls.getDeclaredConstructor().newInstance();
Utils.maybeCloseQuietly(o, o + " (instantiated for preflight validation");
} catch (NoSuchMethodException | IllegalAccessException e) {
throw new ConfigException(name, cls.getName(), "Could not find a public no-argument constructor for class" + (e.getMessage() != null ? ": " + e.getMessage() : ""));
} catch (ReflectiveOperationException | LinkageError | RuntimeException e) {
throw new ConfigException(name, cls.getName(), "Could not instantiate class" + (e.getMessage() != null ? ": " + e.getMessage() : ""));
}
}

@Override
public String toString() {
return "A class with a public, no-argument constructor";
}
}

0 comments on commit 1e20ce3

Please sign in to comment.