diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 6e1c3cefbb1d..924b9792fa82 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.utils;
+import java.lang.reflect.Modifier;
import java.nio.BufferUnderflowException;
import java.nio.ByteOrder;
import java.nio.file.StandardOpenOption;
@@ -1108,6 +1109,15 @@ public interface UncheckedCloseable extends AutoCloseable {
void close();
}
+ /**
+ * Closes {@code maybeCloseable} if it implements the {@link AutoCloseable} interface,
+ * and if an exception is thrown, it is logged at the WARN level.
+ */
+ public static void maybeCloseQuietly(Object maybeCloseable, String name) {
+ if (maybeCloseable instanceof AutoCloseable)
+ closeQuietly((AutoCloseable) maybeCloseable, name);
+ }
+
/**
* Closes {@code closeable} and if an exception is thrown, it is logged at the WARN level.
* Be cautious when passing method references as an argument. For example:
@@ -1576,6 +1586,38 @@ public static String[] enumOptions(Class extends Enum>> enumClass) {
.toArray(String[]::new);
}
+ /**
+ * Ensure that the class is concrete (i.e., not abstract), and that it subclasses a given base class.
+ * If it is abstract or does not subclass the given base class, throw a {@link ConfigException}
+ * with a friendly error message suggesting a list of concrete child subclasses (if any are known).
+ * @param baseClass the expected superclass; may not be null
+ * @param klass the class to check; may not be null
+ * @throws ConfigException if the class is not concrete
+ */
+ public static void ensureConcreteSubclass(Class> baseClass, Class> klass) {
+ Objects.requireNonNull(baseClass);
+ Objects.requireNonNull(klass);
+
+ if (!baseClass.isAssignableFrom(klass)) {
+ String inheritFrom = baseClass.isInterface() ? "implement" : "extend";
+ String baseClassType = baseClass.isInterface() ? "interface" : "class";
+ throw new ConfigException("Class " + klass + " does not " + inheritFrom + " the " + baseClass.getSimpleName() + " " + baseClassType);
+ }
+
+ if (Modifier.isAbstract(klass.getModifiers())) {
+ String childClassNames = Stream.of(klass.getClasses())
+ .filter(baseClass::isAssignableFrom)
+ .filter(c -> !Modifier.isAbstract(c.getModifiers()))
+ .filter(c -> Modifier.isPublic(c.getModifiers()))
+ .map(Class::getName)
+ .collect(Collectors.joining(", "));
+ String message = "This class is abstract and cannot be created.";
+ if (!Utils.isBlank(childClassNames))
+ message += " Did you mean " + childClassNames + "?";
+ throw new ConfigException(message);
+ }
+ }
+
/**
* Convert time instant to readable string for logging
* @param timestamp the timestamp of the instant to be converted.
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index 4f20a32f81b2..f33f00e40efa 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -28,12 +28,15 @@
import org.apache.kafka.connect.runtime.errors.ToleranceType;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
+import org.apache.kafka.connect.util.ConcreteSubClassValidator;
+import org.apache.kafka.connect.util.InstantiableClassValidator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -42,7 +45,6 @@
import java.util.Locale;
import java.util.Map;
import java.util.Set;
-import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars;
@@ -82,10 +84,18 @@ public class ConnectorConfig extends AbstractConfig {
public static final String KEY_CONVERTER_CLASS_CONFIG = WorkerConfig.KEY_CONVERTER_CLASS_CONFIG;
public static final String KEY_CONVERTER_CLASS_DOC = WorkerConfig.KEY_CONVERTER_CLASS_DOC;
public static final String KEY_CONVERTER_CLASS_DISPLAY = "Key converter class";
+ private static final ConfigDef.Validator KEY_CONVERTER_CLASS_VALIDATOR = ConfigDef.CompositeValidator.of(
+ ConcreteSubClassValidator.forSuperClass(Converter.class),
+ new InstantiableClassValidator()
+ );
public static final String VALUE_CONVERTER_CLASS_CONFIG = WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG;
public static final String VALUE_CONVERTER_CLASS_DOC = WorkerConfig.VALUE_CONVERTER_CLASS_DOC;
public static final String VALUE_CONVERTER_CLASS_DISPLAY = "Value converter class";
+ private static final ConfigDef.Validator VALUE_CONVERTER_CLASS_VALIDATOR = ConfigDef.CompositeValidator.of(
+ ConcreteSubClassValidator.forSuperClass(Converter.class),
+ new InstantiableClassValidator()
+ );
public static final String HEADER_CONVERTER_CLASS_CONFIG = WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG;
public static final String HEADER_CONVERTER_CLASS_DOC = WorkerConfig.HEADER_CONVERTER_CLASS_DOC;
@@ -93,6 +103,10 @@ public class ConnectorConfig extends AbstractConfig {
// The Connector config should not have a default for the header converter, since the absence of a config property means that
// the worker config settings should be used. Thus, we set the default to null here.
public static final String HEADER_CONVERTER_CLASS_DEFAULT = null;
+ private static final ConfigDef.Validator HEADER_CONVERTER_CLASS_VALIDATOR = ConfigDef.CompositeValidator.of(
+ ConcreteSubClassValidator.forSuperClass(HeaderConverter.class),
+ new InstantiableClassValidator()
+ );
public static final String TASKS_MAX_CONFIG = "tasks.max";
private static final String TASKS_MAX_DOC = "Maximum number of tasks to use for this connector.";
@@ -181,9 +195,9 @@ public static ConfigDef configDef() {
.define(NAME_CONFIG, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, nonEmptyStringWithoutControlChars(), Importance.HIGH, NAME_DOC, COMMON_GROUP, ++orderInGroup, Width.MEDIUM, NAME_DISPLAY)
.define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.LONG, CONNECTOR_CLASS_DISPLAY)
.define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, TASK_MAX_DISPLAY)
- .define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, KEY_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, KEY_CONVERTER_CLASS_DISPLAY)
- .define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, VALUE_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, VALUE_CONVERTER_CLASS_DISPLAY)
- .define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS, HEADER_CONVERTER_CLASS_DEFAULT, Importance.LOW, HEADER_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, HEADER_CONVERTER_CLASS_DISPLAY)
+ .define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, null, KEY_CONVERTER_CLASS_VALIDATOR, Importance.LOW, KEY_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, KEY_CONVERTER_CLASS_DISPLAY)
+ .define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, null, VALUE_CONVERTER_CLASS_VALIDATOR, Importance.LOW, VALUE_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, VALUE_CONVERTER_CLASS_DISPLAY)
+ .define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS, HEADER_CONVERTER_CLASS_DEFAULT, HEADER_CONVERTER_CLASS_VALIDATOR, Importance.LOW, HEADER_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, HEADER_CONVERTER_CLASS_DISPLAY)
.define(TRANSFORMS_CONFIG, Type.LIST, Collections.emptyList(), aliasValidator("transformation"), Importance.LOW, TRANSFORMS_DOC, TRANSFORMS_GROUP, ++orderInGroup, Width.LONG, TRANSFORMS_DISPLAY)
.define(PREDICATES_CONFIG, Type.LIST, Collections.emptyList(), aliasValidator("predicate"), Importance.LOW, PREDICATES_DOC, PREDICATES_GROUP, ++orderInGroup, Width.LONG, PREDICATES_DISPLAY)
.define(CONFIG_RELOAD_ACTION_CONFIG, Type.STRING, CONFIG_RELOAD_ACTION_RESTART,
@@ -491,21 +505,11 @@ protected ConfigDef initialConfigDef() {
* @param cls The subclass of the baseclass.
*/
ConfigDef getConfigDefFromConfigProvidingClass(String key, Class> cls) {
- if (cls == null || !baseClass.isAssignableFrom(cls)) {
- throw new ConfigException(key, String.valueOf(cls), "Not a " + baseClass.getSimpleName());
- }
- if (Modifier.isAbstract(cls.getModifiers())) {
- String childClassNames = Stream.of(cls.getClasses())
- .filter(cls::isAssignableFrom)
- .filter(c -> !Modifier.isAbstract(c.getModifiers()))
- .filter(c -> Modifier.isPublic(c.getModifiers()))
- .map(Class::getName)
- .collect(Collectors.joining(", "));
- String message = Utils.isBlank(childClassNames) ?
- aliasKind + " is abstract and cannot be created." :
- aliasKind + " is abstract and cannot be created. Did you mean " + childClassNames + "?";
- throw new ConfigException(key, String.valueOf(cls), message);
+ if (cls == null) {
+ throw new ConfigException(key, null, "Not a " + baseClass.getSimpleName());
}
+ Utils.ensureConcreteSubclass(baseClass, cls);
+
T transformation;
try {
transformation = Utils.newInstance(cls, baseClass);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConcreteSubClassValidator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConcreteSubClassValidator.java
new file mode 100644
index 000000000000..cbb4dedb126d
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConcreteSubClassValidator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.Utils;
+
+public class ConcreteSubClassValidator implements ConfigDef.Validator {
+ private final Class> expectedSuperClass;
+
+ private ConcreteSubClassValidator(Class> expectedSuperClass) {
+ this.expectedSuperClass = expectedSuperClass;
+ }
+
+ public static ConcreteSubClassValidator forSuperClass(Class> expectedSuperClass) {
+ return new ConcreteSubClassValidator(expectedSuperClass);
+ }
+
+ @Override
+ public void ensureValid(String name, Object value) {
+ if (value == null) {
+ // The value will be null if the class couldn't be found; no point in performing follow-up validation
+ return;
+ }
+
+ Class> cls = (Class>) value;
+ Utils.ensureConcreteSubclass(expectedSuperClass, cls);
+ }
+
+ @Override
+ public String toString() {
+ return "A concrete subclass of " + expectedSuperClass.getName();
+ }
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/InstantiableClassValidator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/InstantiableClassValidator.java
new file mode 100644
index 000000000000..796be4ed4833
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/InstantiableClassValidator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.Utils;
+
+public class InstantiableClassValidator implements ConfigDef.Validator {
+
+ @Override
+ public void ensureValid(String name, Object value) {
+ if (value == null) {
+ // The value will be null if the class couldn't be found; no point in performing follow-up validation
+ return;
+ }
+
+ Class> cls = (Class>) value;
+ try {
+ Object o = cls.getDeclaredConstructor().newInstance();
+ Utils.maybeCloseQuietly(o, o + " (instantiated for preflight validation");
+ } catch (NoSuchMethodException | IllegalAccessException e) {
+ throw new ConfigException(name, cls.getName(), "Could not find a public no-argument constructor for class" + (e.getMessage() != null ? ": " + e.getMessage() : ""));
+ } catch (ReflectiveOperationException | LinkageError | RuntimeException e) {
+ throw new ConfigException(name, cls.getName(), "Could not instantiate class" + (e.getMessage() != null ? ": " + e.getMessage() : ""));
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "A class with a public, no-argument constructor";
+ }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java
index 88646112ffca..b3e37c9eeeb8 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java
@@ -16,7 +16,13 @@
*/
package org.apache.kafka.connect.integration;
+import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.transforms.Filter;
import org.apache.kafka.connect.transforms.predicates.RecordIsTombstone;
@@ -27,6 +33,7 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@@ -296,6 +303,55 @@ public void testConnectorHasMissingConverterClass() throws InterruptedException
);
}
+ @Test
+ public void testConnectorHasInvalidConverterClassType() throws InterruptedException {
+ Map config = defaultSinkConnectorProps();
+ config.put(KEY_CONVERTER_CLASS_CONFIG, MonitorableSinkConnector.class.getName());
+ connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
+ config.get(CONNECTOR_CLASS_CONFIG),
+ config,
+ 1,
+ "Connector config should fail preflight validation when a converter with a class of the wrong type is specified",
+ 0
+ );
+ }
+
+ @Test
+ public void testConnectorHasAbstractConverter() throws InterruptedException {
+ Map config = defaultSinkConnectorProps();
+ config.put(KEY_CONVERTER_CLASS_CONFIG, AbstractTestConverter.class.getName());
+ connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
+ config.get(CONNECTOR_CLASS_CONFIG),
+ config,
+ 1,
+ "Connector config should fail preflight validation when an abstract converter class is specified"
+ );
+ }
+
+ @Test
+ public void testConnectorHasConverterWithNoSuitableConstructor() throws InterruptedException {
+ Map config = defaultSinkConnectorProps();
+ config.put(KEY_CONVERTER_CLASS_CONFIG, TestConverterWithPrivateConstructor.class.getName());
+ connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
+ config.get(CONNECTOR_CLASS_CONFIG),
+ config,
+ 1,
+ "Connector config should fail preflight validation when a converter class with no suitable constructor is specified"
+ );
+ }
+
+ @Test
+ public void testConnectorHasConverterThatThrowsExceptionOnInstantiation() throws InterruptedException {
+ Map config = defaultSinkConnectorProps();
+ config.put(KEY_CONVERTER_CLASS_CONFIG, TestConverterWithConstructorThatThrowsException.class.getName());
+ connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
+ config.get(CONNECTOR_CLASS_CONFIG),
+ config,
+ 1,
+ "Connector config should fail preflight validation when a converter class that throws an exception on instantiation is specified"
+ );
+ }
+
@Test
public void testConnectorHasMissingHeaderConverterClass() throws InterruptedException {
Map config = defaultSinkConnectorProps();
@@ -309,6 +365,111 @@ public void testConnectorHasMissingHeaderConverterClass() throws InterruptedExce
);
}
+ @Test
+ public void testConnectorHasInvalidHeaderConverterClassType() throws InterruptedException {
+ Map config = defaultSinkConnectorProps();
+ config.put(HEADER_CONVERTER_CLASS_CONFIG, MonitorableSinkConnector.class.getName());
+ connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
+ config.get(CONNECTOR_CLASS_CONFIG),
+ config,
+ 1,
+ "Connector config should fail preflight validation when a header converter with a class of the wrong type is specified"
+ );
+ }
+
+ @Test
+ public void testConnectorHasAbstractHeaderConverter() throws InterruptedException {
+ Map config = defaultSinkConnectorProps();
+ config.put(HEADER_CONVERTER_CLASS_CONFIG, AbstractTestConverter.class.getName());
+ connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
+ config.get(CONNECTOR_CLASS_CONFIG),
+ config,
+ 1,
+ "Connector config should fail preflight validation when an abstract header converter class is specified"
+ );
+ }
+
+ @Test
+ public void testConnectorHasHeaderConverterWithNoSuitableConstructor() throws InterruptedException {
+ Map config = defaultSinkConnectorProps();
+ config.put(HEADER_CONVERTER_CLASS_CONFIG, TestConverterWithPrivateConstructor.class.getName());
+ connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
+ config.get(CONNECTOR_CLASS_CONFIG),
+ config,
+ 1,
+ "Connector config should fail preflight validation when a header converter class with no suitable constructor is specified"
+ );
+ }
+
+ @Test
+ public void testConnectorHasHeaderConverterThatThrowsExceptionOnInstantiation() throws InterruptedException {
+ Map config = defaultSinkConnectorProps();
+ config.put(HEADER_CONVERTER_CLASS_CONFIG, TestConverterWithConstructorThatThrowsException.class.getName());
+ connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
+ config.get(CONNECTOR_CLASS_CONFIG),
+ config,
+ 1,
+ "Connector config should fail preflight validation when a header converter class that throws an exception on instantiation is specified"
+ );
+ }
+
+ public static abstract class TestConverter implements Converter, HeaderConverter {
+
+ // Defined by both Converter and HeaderConverter interfaces
+ @Override
+ public ConfigDef config() {
+ return null;
+ }
+
+ // Defined by Converter interface
+ @Override
+ public void configure(Map configs, boolean isKey) {
+ }
+
+ @Override
+ public byte[] fromConnectData(String topic, Schema schema, Object value) {
+ return null;
+ }
+
+ @Override
+ public SchemaAndValue toConnectData(String topic, byte[] value) {
+ return null;
+ }
+
+ // Defined by HeaderConverter interface
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public void configure(Map configs) {
+ }
+
+ @Override
+ public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value) {
+ return null;
+ }
+
+ @Override
+ public byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value) {
+ return new byte[0];
+ }
+ }
+
+ public static abstract class AbstractTestConverter extends TestConverter {
+ }
+
+ public static class TestConverterWithPrivateConstructor extends TestConverter {
+ private TestConverterWithPrivateConstructor() {
+ }
+ }
+
+ public static class TestConverterWithConstructorThatThrowsException extends TestConverter {
+ public TestConverterWithConstructorThatThrowsException() {
+ throw new ConnectException("whoops");
+ }
+ }
+
private Map defaultSourceConnectorProps() {
// setup up props for the source connector
Map props = new HashMap<>();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
index c113039912e4..f4e890fcaf36 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
@@ -198,13 +198,10 @@ public void abstractTransform() {
props.put("connector.class", TestConnector.class.getName());
props.put("transforms", "a");
props.put("transforms.a.type", AbstractTransformation.class.getName());
- try {
- new ConnectorConfig(MOCK_PLUGINS, props);
- } catch (ConfigException ex) {
- assertTrue(
- ex.getMessage().contains("Transformation is abstract and cannot be created.")
- );
- }
+ ConfigException ex = assertThrows(ConfigException.class, () -> new ConnectorConfig(MOCK_PLUGINS, props));
+ assertTrue(
+ ex.getMessage().contains("This class is abstract and cannot be created.")
+ );
}
@Test
public void abstractKeyValueTransform() {
@@ -213,19 +210,16 @@ public void abstractKeyValueTransform() {
props.put("connector.class", TestConnector.class.getName());
props.put("transforms", "a");
props.put("transforms.a.type", AbstractKeyValueTransformation.class.getName());
- try {
- new ConnectorConfig(MOCK_PLUGINS, props);
- } catch (ConfigException ex) {
- assertTrue(
- ex.getMessage().contains("Transformation is abstract and cannot be created.")
- );
- assertTrue(
- ex.getMessage().contains(AbstractKeyValueTransformation.Key.class.getName())
- );
- assertTrue(
- ex.getMessage().contains(AbstractKeyValueTransformation.Value.class.getName())
- );
- }
+ ConfigException ex = assertThrows(ConfigException.class, () -> new ConnectorConfig(MOCK_PLUGINS, props));
+ assertTrue(
+ ex.getMessage().contains("This class is abstract and cannot be created.")
+ );
+ assertTrue(
+ ex.getMessage().contains(AbstractKeyValueTransformation.Key.class.getName())
+ );
+ assertTrue(
+ ex.getMessage().contains(AbstractKeyValueTransformation.Value.class.getName())
+ );
}
@Test
@@ -240,7 +234,7 @@ public void wrongPredicateType() {
props.put("predicates", "my-pred");
props.put("predicates.my-pred.type", TestConnector.class.getName());
ConfigException e = assertThrows(ConfigException.class, () -> new ConnectorConfig(MOCK_PLUGINS, props));
- assertTrue(e.getMessage().contains("Not a Predicate"));
+ assertEquals("Class " + TestConnector.class + " does not implement the Predicate interface", e.getMessage());
}
@Test
@@ -287,7 +281,7 @@ public void abstractPredicate() {
props.put("predicates.my-pred.type", AbstractTestPredicate.class.getName());
props.put("predicates.my-pred.int", "84");
ConfigException e = assertThrows(ConfigException.class, () -> new ConnectorConfig(MOCK_PLUGINS, props));
- assertTrue(e.getMessage().contains("Predicate is abstract and cannot be created"));
+ assertTrue(e.getMessage().contains("This class is abstract and cannot be created"));
}
private void assertTransformationStageWithPredicate(Map props, boolean expectedNegated) {