From 1e20ce321a06d1eab3da9f3136dd5440359db78f Mon Sep 17 00:00:00 2001 From: Chris Egerton Date: Fri, 8 Dec 2023 15:00:44 -0500 Subject: [PATCH] KAFKA-13328, KAFKA-13329 (1): Add preflight validations for key, value, and header converter classes (#14304) Reviewers: Yash Mayya , Greg Harris --- .../org/apache/kafka/common/utils/Utils.java | 42 +++++ .../connect/runtime/ConnectorConfig.java | 42 ++--- .../util/ConcreteSubClassValidator.java | 48 ++++++ .../util/InstantiableClassValidator.java | 47 +++++ .../ConnectorValidationIntegrationTest.java | 161 ++++++++++++++++++ .../connect/runtime/ConnectorConfigTest.java | 38 ++--- 6 files changed, 337 insertions(+), 41 deletions(-) create mode 100644 connect/runtime/src/main/java/org/apache/kafka/connect/util/ConcreteSubClassValidator.java create mode 100644 connect/runtime/src/main/java/org/apache/kafka/connect/util/InstantiableClassValidator.java diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 6e1c3cefbb1d..924b9792fa82 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.utils; +import java.lang.reflect.Modifier; import java.nio.BufferUnderflowException; import java.nio.ByteOrder; import java.nio.file.StandardOpenOption; @@ -1108,6 +1109,15 @@ public interface UncheckedCloseable extends AutoCloseable { void close(); } + /** + * Closes {@code maybeCloseable} if it implements the {@link AutoCloseable} interface, + * and if an exception is thrown, it is logged at the WARN level. + */ + public static void maybeCloseQuietly(Object maybeCloseable, String name) { + if (maybeCloseable instanceof AutoCloseable) + closeQuietly((AutoCloseable) maybeCloseable, name); + } + /** * Closes {@code closeable} and if an exception is thrown, it is logged at the WARN level. * Be cautious when passing method references as an argument. For example: @@ -1576,6 +1586,38 @@ public static String[] enumOptions(Class> enumClass) { .toArray(String[]::new); } + /** + * Ensure that the class is concrete (i.e., not abstract), and that it subclasses a given base class. + * If it is abstract or does not subclass the given base class, throw a {@link ConfigException} + * with a friendly error message suggesting a list of concrete child subclasses (if any are known). + * @param baseClass the expected superclass; may not be null + * @param klass the class to check; may not be null + * @throws ConfigException if the class is not concrete + */ + public static void ensureConcreteSubclass(Class baseClass, Class klass) { + Objects.requireNonNull(baseClass); + Objects.requireNonNull(klass); + + if (!baseClass.isAssignableFrom(klass)) { + String inheritFrom = baseClass.isInterface() ? "implement" : "extend"; + String baseClassType = baseClass.isInterface() ? "interface" : "class"; + throw new ConfigException("Class " + klass + " does not " + inheritFrom + " the " + baseClass.getSimpleName() + " " + baseClassType); + } + + if (Modifier.isAbstract(klass.getModifiers())) { + String childClassNames = Stream.of(klass.getClasses()) + .filter(baseClass::isAssignableFrom) + .filter(c -> !Modifier.isAbstract(c.getModifiers())) + .filter(c -> Modifier.isPublic(c.getModifiers())) + .map(Class::getName) + .collect(Collectors.joining(", ")); + String message = "This class is abstract and cannot be created."; + if (!Utils.isBlank(childClassNames)) + message += " Did you mean " + childClassNames + "?"; + throw new ConfigException(message); + } + } + /** * Convert time instant to readable string for logging * @param timestamp the timestamp of the instant to be converted. diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java index 4f20a32f81b2..f33f00e40efa 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java @@ -28,12 +28,15 @@ import org.apache.kafka.connect.runtime.errors.ToleranceType; import org.apache.kafka.connect.runtime.isolation.PluginDesc; import org.apache.kafka.connect.runtime.isolation.Plugins; +import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.storage.HeaderConverter; import org.apache.kafka.connect.transforms.Transformation; import org.apache.kafka.connect.transforms.predicates.Predicate; +import org.apache.kafka.connect.util.ConcreteSubClassValidator; +import org.apache.kafka.connect.util.InstantiableClassValidator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.reflect.Modifier; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -42,7 +45,6 @@ import java.util.Locale; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; import java.util.stream.Stream; import static org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars; @@ -82,10 +84,18 @@ public class ConnectorConfig extends AbstractConfig { public static final String KEY_CONVERTER_CLASS_CONFIG = WorkerConfig.KEY_CONVERTER_CLASS_CONFIG; public static final String KEY_CONVERTER_CLASS_DOC = WorkerConfig.KEY_CONVERTER_CLASS_DOC; public static final String KEY_CONVERTER_CLASS_DISPLAY = "Key converter class"; + private static final ConfigDef.Validator KEY_CONVERTER_CLASS_VALIDATOR = ConfigDef.CompositeValidator.of( + ConcreteSubClassValidator.forSuperClass(Converter.class), + new InstantiableClassValidator() + ); public static final String VALUE_CONVERTER_CLASS_CONFIG = WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG; public static final String VALUE_CONVERTER_CLASS_DOC = WorkerConfig.VALUE_CONVERTER_CLASS_DOC; public static final String VALUE_CONVERTER_CLASS_DISPLAY = "Value converter class"; + private static final ConfigDef.Validator VALUE_CONVERTER_CLASS_VALIDATOR = ConfigDef.CompositeValidator.of( + ConcreteSubClassValidator.forSuperClass(Converter.class), + new InstantiableClassValidator() + ); public static final String HEADER_CONVERTER_CLASS_CONFIG = WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG; public static final String HEADER_CONVERTER_CLASS_DOC = WorkerConfig.HEADER_CONVERTER_CLASS_DOC; @@ -93,6 +103,10 @@ public class ConnectorConfig extends AbstractConfig { // The Connector config should not have a default for the header converter, since the absence of a config property means that // the worker config settings should be used. Thus, we set the default to null here. public static final String HEADER_CONVERTER_CLASS_DEFAULT = null; + private static final ConfigDef.Validator HEADER_CONVERTER_CLASS_VALIDATOR = ConfigDef.CompositeValidator.of( + ConcreteSubClassValidator.forSuperClass(HeaderConverter.class), + new InstantiableClassValidator() + ); public static final String TASKS_MAX_CONFIG = "tasks.max"; private static final String TASKS_MAX_DOC = "Maximum number of tasks to use for this connector."; @@ -181,9 +195,9 @@ public static ConfigDef configDef() { .define(NAME_CONFIG, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, nonEmptyStringWithoutControlChars(), Importance.HIGH, NAME_DOC, COMMON_GROUP, ++orderInGroup, Width.MEDIUM, NAME_DISPLAY) .define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.LONG, CONNECTOR_CLASS_DISPLAY) .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, TASK_MAX_DISPLAY) - .define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, KEY_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, KEY_CONVERTER_CLASS_DISPLAY) - .define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, VALUE_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, VALUE_CONVERTER_CLASS_DISPLAY) - .define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS, HEADER_CONVERTER_CLASS_DEFAULT, Importance.LOW, HEADER_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, HEADER_CONVERTER_CLASS_DISPLAY) + .define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, null, KEY_CONVERTER_CLASS_VALIDATOR, Importance.LOW, KEY_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, KEY_CONVERTER_CLASS_DISPLAY) + .define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, null, VALUE_CONVERTER_CLASS_VALIDATOR, Importance.LOW, VALUE_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, VALUE_CONVERTER_CLASS_DISPLAY) + .define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS, HEADER_CONVERTER_CLASS_DEFAULT, HEADER_CONVERTER_CLASS_VALIDATOR, Importance.LOW, HEADER_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, HEADER_CONVERTER_CLASS_DISPLAY) .define(TRANSFORMS_CONFIG, Type.LIST, Collections.emptyList(), aliasValidator("transformation"), Importance.LOW, TRANSFORMS_DOC, TRANSFORMS_GROUP, ++orderInGroup, Width.LONG, TRANSFORMS_DISPLAY) .define(PREDICATES_CONFIG, Type.LIST, Collections.emptyList(), aliasValidator("predicate"), Importance.LOW, PREDICATES_DOC, PREDICATES_GROUP, ++orderInGroup, Width.LONG, PREDICATES_DISPLAY) .define(CONFIG_RELOAD_ACTION_CONFIG, Type.STRING, CONFIG_RELOAD_ACTION_RESTART, @@ -491,21 +505,11 @@ protected ConfigDef initialConfigDef() { * @param cls The subclass of the baseclass. */ ConfigDef getConfigDefFromConfigProvidingClass(String key, Class cls) { - if (cls == null || !baseClass.isAssignableFrom(cls)) { - throw new ConfigException(key, String.valueOf(cls), "Not a " + baseClass.getSimpleName()); - } - if (Modifier.isAbstract(cls.getModifiers())) { - String childClassNames = Stream.of(cls.getClasses()) - .filter(cls::isAssignableFrom) - .filter(c -> !Modifier.isAbstract(c.getModifiers())) - .filter(c -> Modifier.isPublic(c.getModifiers())) - .map(Class::getName) - .collect(Collectors.joining(", ")); - String message = Utils.isBlank(childClassNames) ? - aliasKind + " is abstract and cannot be created." : - aliasKind + " is abstract and cannot be created. Did you mean " + childClassNames + "?"; - throw new ConfigException(key, String.valueOf(cls), message); + if (cls == null) { + throw new ConfigException(key, null, "Not a " + baseClass.getSimpleName()); } + Utils.ensureConcreteSubclass(baseClass, cls); + T transformation; try { transformation = Utils.newInstance(cls, baseClass); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConcreteSubClassValidator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConcreteSubClassValidator.java new file mode 100644 index 000000000000..cbb4dedb126d --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConcreteSubClassValidator.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.util; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.utils.Utils; + +public class ConcreteSubClassValidator implements ConfigDef.Validator { + private final Class expectedSuperClass; + + private ConcreteSubClassValidator(Class expectedSuperClass) { + this.expectedSuperClass = expectedSuperClass; + } + + public static ConcreteSubClassValidator forSuperClass(Class expectedSuperClass) { + return new ConcreteSubClassValidator(expectedSuperClass); + } + + @Override + public void ensureValid(String name, Object value) { + if (value == null) { + // The value will be null if the class couldn't be found; no point in performing follow-up validation + return; + } + + Class cls = (Class) value; + Utils.ensureConcreteSubclass(expectedSuperClass, cls); + } + + @Override + public String toString() { + return "A concrete subclass of " + expectedSuperClass.getName(); + } +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/InstantiableClassValidator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/InstantiableClassValidator.java new file mode 100644 index 000000000000..796be4ed4833 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/InstantiableClassValidator.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.util; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.utils.Utils; + +public class InstantiableClassValidator implements ConfigDef.Validator { + + @Override + public void ensureValid(String name, Object value) { + if (value == null) { + // The value will be null if the class couldn't be found; no point in performing follow-up validation + return; + } + + Class cls = (Class) value; + try { + Object o = cls.getDeclaredConstructor().newInstance(); + Utils.maybeCloseQuietly(o, o + " (instantiated for preflight validation"); + } catch (NoSuchMethodException | IllegalAccessException e) { + throw new ConfigException(name, cls.getName(), "Could not find a public no-argument constructor for class" + (e.getMessage() != null ? ": " + e.getMessage() : "")); + } catch (ReflectiveOperationException | LinkageError | RuntimeException e) { + throw new ConfigException(name, cls.getName(), "Could not instantiate class" + (e.getMessage() != null ? ": " + e.getMessage() : "")); + } + } + + @Override + public String toString() { + return "A class with a public, no-argument constructor"; + } +} diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java index 88646112ffca..b3e37c9eeeb8 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java @@ -16,7 +16,13 @@ */ package org.apache.kafka.connect.integration; +import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.storage.HeaderConverter; import org.apache.kafka.connect.storage.StringConverter; import org.apache.kafka.connect.transforms.Filter; import org.apache.kafka.connect.transforms.predicates.RecordIsTombstone; @@ -27,6 +33,7 @@ import org.junit.Test; import org.junit.experimental.categories.Category; +import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -296,6 +303,55 @@ public void testConnectorHasMissingConverterClass() throws InterruptedException ); } + @Test + public void testConnectorHasInvalidConverterClassType() throws InterruptedException { + Map config = defaultSinkConnectorProps(); + config.put(KEY_CONVERTER_CLASS_CONFIG, MonitorableSinkConnector.class.getName()); + connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( + config.get(CONNECTOR_CLASS_CONFIG), + config, + 1, + "Connector config should fail preflight validation when a converter with a class of the wrong type is specified", + 0 + ); + } + + @Test + public void testConnectorHasAbstractConverter() throws InterruptedException { + Map config = defaultSinkConnectorProps(); + config.put(KEY_CONVERTER_CLASS_CONFIG, AbstractTestConverter.class.getName()); + connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( + config.get(CONNECTOR_CLASS_CONFIG), + config, + 1, + "Connector config should fail preflight validation when an abstract converter class is specified" + ); + } + + @Test + public void testConnectorHasConverterWithNoSuitableConstructor() throws InterruptedException { + Map config = defaultSinkConnectorProps(); + config.put(KEY_CONVERTER_CLASS_CONFIG, TestConverterWithPrivateConstructor.class.getName()); + connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( + config.get(CONNECTOR_CLASS_CONFIG), + config, + 1, + "Connector config should fail preflight validation when a converter class with no suitable constructor is specified" + ); + } + + @Test + public void testConnectorHasConverterThatThrowsExceptionOnInstantiation() throws InterruptedException { + Map config = defaultSinkConnectorProps(); + config.put(KEY_CONVERTER_CLASS_CONFIG, TestConverterWithConstructorThatThrowsException.class.getName()); + connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( + config.get(CONNECTOR_CLASS_CONFIG), + config, + 1, + "Connector config should fail preflight validation when a converter class that throws an exception on instantiation is specified" + ); + } + @Test public void testConnectorHasMissingHeaderConverterClass() throws InterruptedException { Map config = defaultSinkConnectorProps(); @@ -309,6 +365,111 @@ public void testConnectorHasMissingHeaderConverterClass() throws InterruptedExce ); } + @Test + public void testConnectorHasInvalidHeaderConverterClassType() throws InterruptedException { + Map config = defaultSinkConnectorProps(); + config.put(HEADER_CONVERTER_CLASS_CONFIG, MonitorableSinkConnector.class.getName()); + connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( + config.get(CONNECTOR_CLASS_CONFIG), + config, + 1, + "Connector config should fail preflight validation when a header converter with a class of the wrong type is specified" + ); + } + + @Test + public void testConnectorHasAbstractHeaderConverter() throws InterruptedException { + Map config = defaultSinkConnectorProps(); + config.put(HEADER_CONVERTER_CLASS_CONFIG, AbstractTestConverter.class.getName()); + connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( + config.get(CONNECTOR_CLASS_CONFIG), + config, + 1, + "Connector config should fail preflight validation when an abstract header converter class is specified" + ); + } + + @Test + public void testConnectorHasHeaderConverterWithNoSuitableConstructor() throws InterruptedException { + Map config = defaultSinkConnectorProps(); + config.put(HEADER_CONVERTER_CLASS_CONFIG, TestConverterWithPrivateConstructor.class.getName()); + connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( + config.get(CONNECTOR_CLASS_CONFIG), + config, + 1, + "Connector config should fail preflight validation when a header converter class with no suitable constructor is specified" + ); + } + + @Test + public void testConnectorHasHeaderConverterThatThrowsExceptionOnInstantiation() throws InterruptedException { + Map config = defaultSinkConnectorProps(); + config.put(HEADER_CONVERTER_CLASS_CONFIG, TestConverterWithConstructorThatThrowsException.class.getName()); + connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( + config.get(CONNECTOR_CLASS_CONFIG), + config, + 1, + "Connector config should fail preflight validation when a header converter class that throws an exception on instantiation is specified" + ); + } + + public static abstract class TestConverter implements Converter, HeaderConverter { + + // Defined by both Converter and HeaderConverter interfaces + @Override + public ConfigDef config() { + return null; + } + + // Defined by Converter interface + @Override + public void configure(Map configs, boolean isKey) { + } + + @Override + public byte[] fromConnectData(String topic, Schema schema, Object value) { + return null; + } + + @Override + public SchemaAndValue toConnectData(String topic, byte[] value) { + return null; + } + + // Defined by HeaderConverter interface + @Override + public void close() throws IOException { + } + + @Override + public void configure(Map configs) { + } + + @Override + public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value) { + return null; + } + + @Override + public byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value) { + return new byte[0]; + } + } + + public static abstract class AbstractTestConverter extends TestConverter { + } + + public static class TestConverterWithPrivateConstructor extends TestConverter { + private TestConverterWithPrivateConstructor() { + } + } + + public static class TestConverterWithConstructorThatThrowsException extends TestConverter { + public TestConverterWithConstructorThatThrowsException() { + throw new ConnectException("whoops"); + } + } + private Map defaultSourceConnectorProps() { // setup up props for the source connector Map props = new HashMap<>(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java index c113039912e4..f4e890fcaf36 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java @@ -198,13 +198,10 @@ public void abstractTransform() { props.put("connector.class", TestConnector.class.getName()); props.put("transforms", "a"); props.put("transforms.a.type", AbstractTransformation.class.getName()); - try { - new ConnectorConfig(MOCK_PLUGINS, props); - } catch (ConfigException ex) { - assertTrue( - ex.getMessage().contains("Transformation is abstract and cannot be created.") - ); - } + ConfigException ex = assertThrows(ConfigException.class, () -> new ConnectorConfig(MOCK_PLUGINS, props)); + assertTrue( + ex.getMessage().contains("This class is abstract and cannot be created.") + ); } @Test public void abstractKeyValueTransform() { @@ -213,19 +210,16 @@ public void abstractKeyValueTransform() { props.put("connector.class", TestConnector.class.getName()); props.put("transforms", "a"); props.put("transforms.a.type", AbstractKeyValueTransformation.class.getName()); - try { - new ConnectorConfig(MOCK_PLUGINS, props); - } catch (ConfigException ex) { - assertTrue( - ex.getMessage().contains("Transformation is abstract and cannot be created.") - ); - assertTrue( - ex.getMessage().contains(AbstractKeyValueTransformation.Key.class.getName()) - ); - assertTrue( - ex.getMessage().contains(AbstractKeyValueTransformation.Value.class.getName()) - ); - } + ConfigException ex = assertThrows(ConfigException.class, () -> new ConnectorConfig(MOCK_PLUGINS, props)); + assertTrue( + ex.getMessage().contains("This class is abstract and cannot be created.") + ); + assertTrue( + ex.getMessage().contains(AbstractKeyValueTransformation.Key.class.getName()) + ); + assertTrue( + ex.getMessage().contains(AbstractKeyValueTransformation.Value.class.getName()) + ); } @Test @@ -240,7 +234,7 @@ public void wrongPredicateType() { props.put("predicates", "my-pred"); props.put("predicates.my-pred.type", TestConnector.class.getName()); ConfigException e = assertThrows(ConfigException.class, () -> new ConnectorConfig(MOCK_PLUGINS, props)); - assertTrue(e.getMessage().contains("Not a Predicate")); + assertEquals("Class " + TestConnector.class + " does not implement the Predicate interface", e.getMessage()); } @Test @@ -287,7 +281,7 @@ public void abstractPredicate() { props.put("predicates.my-pred.type", AbstractTestPredicate.class.getName()); props.put("predicates.my-pred.int", "84"); ConfigException e = assertThrows(ConfigException.class, () -> new ConnectorConfig(MOCK_PLUGINS, props)); - assertTrue(e.getMessage().contains("Predicate is abstract and cannot be created")); + assertTrue(e.getMessage().contains("This class is abstract and cannot be created")); } private void assertTransformationStageWithPredicate(Map props, boolean expectedNegated) {