From 9b26287c3f0ed325793c37890fa7f1bafa578589 Mon Sep 17 00:00:00 2001 From: Greg Harris Date: Fri, 2 Jun 2023 13:32:58 -0700 Subject: [PATCH 01/12] KAFKA-15031: Add configurable scanning modes to Plugins Signed-off-by: Greg Harris --- .../kafka/connect/runtime/WorkerConfig.java | 24 +++++++ .../isolation/PluginDiscoveryMode.java | 57 +++++++++++++++ .../connect/runtime/isolation/Plugins.java | 70 +++++++++++++++++-- .../util/clusters/EmbeddedConnectCluster.java | 2 + 4 files changed, 148 insertions(+), 5 deletions(-) create mode 100644 connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDiscoveryMode.java diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java index 6ba7cea74064..d6906d69001a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java @@ -26,6 +26,7 @@ import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode; import org.apache.kafka.connect.runtime.rest.RestServerConfig; import org.apache.kafka.connect.storage.SimpleHeaderConverter; import org.slf4j.Logger; @@ -35,6 +36,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.concurrent.ExecutionException; @@ -122,6 +124,18 @@ public class WorkerConfig extends AbstractConfig { + "by the worker's scanner before config providers are initialized and used to " + "replace variables."; + public static final String PLUGIN_DISCOVERY_CONFIG = "plugin.discovery"; + protected static final String PLUGIN_DISCOVERY_DOC = "Method to use to discover plugins provided in the " + + "plugin.path configuration. This can be one of multiple values with the following meanings:\n" + + "* ONLY_SCAN: Discover plugins only by reflection. " + + "Plugins which are not discoverable by ServiceLoader will not impact worker startup.\n" + + "* HYBRID_WARN: Discover plugins reflectively and by ServiceLoader. " + + "Plugins which are not discoverable by ServiceLoader will print warnings during worker startup.\n" + + "* HYBRID_FAIL: Discover plugins reflectively and by ServiceLoader." + + "Plugins which are not discoverable by ServiceLoader will cause worker startup to fail.\n" + + "* SERVICE_LOAD: Discover plugins only by ServiceLoader. Faster startup than prior modes. " + + "Plugins which are not discoverable by ServiceLoader will not be found or usable."; + public static final String CONFIG_PROVIDERS_CONFIG = "config.providers"; protected static final String CONFIG_PROVIDERS_DOC = "Comma-separated names of ConfigProvider classes, loaded and used " @@ -199,6 +213,11 @@ protected static ConfigDef baseConfigDef() { null, Importance.LOW, PLUGIN_PATH_DOC) + .define(PLUGIN_DISCOVERY_CONFIG, + Type.STRING, + PluginDiscoveryMode.HYBRID_WARN.toString(), + Importance.LOW, + PLUGIN_DISCOVERY_DOC) .define(METRICS_SAMPLE_WINDOW_MS_CONFIG, Type.LONG, 30000, atLeast(0), Importance.LOW, CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC) @@ -401,6 +420,11 @@ public static String pluginPath(Map props) { return props.get(WorkerConfig.PLUGIN_PATH_CONFIG); } + public static PluginDiscoveryMode pluginDiscovery(Map props) { + String value = props.getOrDefault(WorkerConfig.PLUGIN_DISCOVERY_CONFIG, PluginDiscoveryMode.HYBRID_WARN.toString()); + return PluginDiscoveryMode.valueOf(value.toUpperCase(Locale.ROOT)); + } + public WorkerConfig(ConfigDef definition, Map props) { super(definition, props); logInternalConverterRemovalWarnings(props); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDiscoveryMode.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDiscoveryMode.java new file mode 100644 index 000000000000..9dbfd28cba3b --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDiscoveryMode.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.isolation; + +/** + * Method to use to discover plugins usable on a Connect worker. + */ +public enum PluginDiscoveryMode { + + /** + * Scan for plugins reflectively. This corresponds to the legacy behavior of Connect prior to KIP-898. + *

Note: the following plugins are still loaded using {@link java.util.ServiceLoader} in this mode: + *

    + *
  • {@link org.apache.kafka.common.config.provider.ConfigProvider}
  • + *
  • {@link org.apache.kafka.connect.rest.ConnectRestExtension}
  • + *
  • {@link org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy}
  • + *
+ */ + ONLY_SCAN, + /** + * Scan for plugins reflectively and via {@link java.util.ServiceLoader}. + * Emit warnings if one or more plugins is not available via {@link java.util.ServiceLoader} + */ + HYBRID_WARN, + /** + * Scan for plugins reflectively and via {@link java.util.ServiceLoader}. + * Fail worker during startup if one or more plugins is not available via {@link java.util.ServiceLoader} + */ + HYBRID_FAIL, + /** + * Discover plugins via {@link java.util.ServiceLoader} only. + * Plugins will not be present in the REST API if it is not available via {@link java.util.ServiceLoader} + */ + SERVICE_LOAD; + + public static boolean reflectivelyScan(PluginDiscoveryMode pluginDiscoveryMode) { + return pluginDiscoveryMode != SERVICE_LOAD; + } + + public static boolean serviceLoad(PluginDiscoveryMode pluginDiscoveryMode) { + return pluginDiscoveryMode != ONLY_SCAN; + } +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java index 83dec38a6fb2..2a710711c7c7 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java @@ -39,11 +39,15 @@ import java.nio.file.Path; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.SortedSet; import java.util.TreeSet; +import java.util.stream.Collectors; public class Plugins { @@ -63,16 +67,72 @@ public Plugins(Map props) { // VisibleForTesting Plugins(Map props, ClassLoader parent, ClassLoaderFactory factory) { String pluginPath = WorkerConfig.pluginPath(props); + PluginDiscoveryMode discoveryMode = WorkerConfig.pluginDiscovery(props); List pluginLocations = PluginUtils.pluginLocations(pluginPath); delegatingLoader = factory.newDelegatingClassLoader(parent); Set pluginSources = PluginUtils.pluginSources(pluginLocations, delegatingLoader, factory); - scanResult = initLoaders(pluginSources); + scanResult = initLoaders(pluginSources, discoveryMode); } - private PluginScanResult initLoaders(Set pluginSources) { - PluginScanResult reflectiveScanResult = new ReflectionScanner().discoverPlugins(pluginSources); - delegatingLoader.installDiscoveredPlugins(reflectiveScanResult); - return reflectiveScanResult; + public PluginScanResult initLoaders(Set pluginSources, PluginDiscoveryMode discoveryMode) { + PluginScanResult empty = new PluginScanResult(Collections.emptyList()); + PluginScanResult serviceLoadingScanResult; + try { + serviceLoadingScanResult = PluginDiscoveryMode.serviceLoad(discoveryMode) ? + new ServiceLoaderScanner().discoverPlugins(pluginSources) : empty; + } catch (Throwable t) { + log.error("Unable to perform ServiceLoader scanning as requested by {}={}, this error may be avoided by reconfiguring {}={}", + WorkerConfig.PLUGIN_DISCOVERY_CONFIG, discoveryMode, + WorkerConfig.PLUGIN_DISCOVERY_CONFIG, PluginDiscoveryMode.ONLY_SCAN, t); + throw t; + } + PluginScanResult reflectiveScanResult = PluginDiscoveryMode.reflectivelyScan(discoveryMode) ? + new ReflectionScanner().discoverPlugins(pluginSources) : empty; + PluginScanResult scanResult = new PluginScanResult(Arrays.asList(reflectiveScanResult, serviceLoadingScanResult)); + maybeReportHybridDiscoveryIssue(discoveryMode, serviceLoadingScanResult, scanResult); + delegatingLoader.installDiscoveredPlugins(scanResult); + return scanResult; + } + + private static void maybeReportHybridDiscoveryIssue(PluginDiscoveryMode discoveryMode, PluginScanResult serviceLoadingScanResult, PluginScanResult mergedResult) { + SortedSet> missingPlugins = new TreeSet<>(); + mergedResult.forEach(missingPlugins::add); + serviceLoadingScanResult.forEach(missingPlugins::remove); + if (missingPlugins.isEmpty()) { + switch (discoveryMode) { + case ONLY_SCAN: + log.debug("Service loading of plugins disabled, consider reconfiguring {}={}", + WorkerConfig.PLUGIN_DISCOVERY_CONFIG, PluginDiscoveryMode.HYBRID_WARN); + break; + case HYBRID_WARN: + case HYBRID_FAIL: + log.warn("All plugins have ServiceLoader manifests, consider reconfiguring {}={}", + WorkerConfig.PLUGIN_DISCOVERY_CONFIG, PluginDiscoveryMode.SERVICE_LOAD); + break; + case SERVICE_LOAD: + log.debug("Reflective loading of plugins disabled, plugins without manifests will not be visible"); + break; + default: + throw new IllegalArgumentException("Unknown discovery mode"); + } + } else { + String message = String.format( + "Plugins are missing ServiceLoader manifests, these plugins will not be visible with %s=%s: %s", + WorkerConfig.PLUGIN_DISCOVERY_CONFIG, + PluginDiscoveryMode.SERVICE_LOAD, + missingPlugins.stream() + .map(pluginDesc -> pluginDesc.location() + "\t" + pluginDesc.className() + "\t" + pluginDesc.version()) + .collect(Collectors.joining("\n", "[\n", "\n]"))); + switch (discoveryMode) { + case HYBRID_WARN: + log.warn(message); + break; + case HYBRID_FAIL: + throw new ConnectException(message); + default: + throw new IllegalArgumentException("Unknown discovery mode"); + } + } } private static String pluginNames(Collection> plugins) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java index 36e0fc765a03..4f0c4369f89e 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java @@ -57,6 +57,7 @@ import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.WorkerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.connect.runtime.WorkerConfig.PLUGIN_DISCOVERY_CONFIG; import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG; import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONFIG_TOPIC_CONFIG; import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG; @@ -276,6 +277,7 @@ public void startConnect() { putIfAbsent(workerProps, STATUS_STORAGE_REPLICATION_FACTOR_CONFIG, internalTopicsReplFactor); putIfAbsent(workerProps, KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter"); putIfAbsent(workerProps, VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter"); + putIfAbsent(workerProps, PLUGIN_DISCOVERY_CONFIG, "hybrid_fail"); for (int i = 0; i < numInitialWorkers; i++) { addWorker(); From 2a0f702ec7267f24d0a72b215e25724a49183e5a Mon Sep 17 00:00:00 2001 From: Greg Harris Date: Wed, 28 Dec 2022 17:15:53 -0800 Subject: [PATCH 02/12] fixup: Add services files for all Connect plugins Signed-off-by: Greg Harris --- ...org.apache.kafka.connect.storage.Converter | 16 ++++++++ ...ache.kafka.connect.storage.HeaderConverter | 17 ++++++++ ...rg.apache.kafka.connect.sink.SinkConnector | 16 ++++++++ ...pache.kafka.connect.source.SourceConnector | 16 ++++++++ ...rg.apache.kafka.connect.sink.SinkConnector | 16 ++++++++ ...pache.kafka.connect.source.SourceConnector | 16 ++++++++ ...org.apache.kafka.connect.storage.Converter | 16 ++++++++ ...ache.kafka.connect.storage.HeaderConverter | 16 ++++++++ ...pache.kafka.connect.source.SourceConnector | 18 ++++++++ ...org.apache.kafka.connect.storage.Converter | 21 ++++++++++ ...ache.kafka.connect.storage.HeaderConverter | 21 ++++++++++ ...rg.apache.kafka.connect.sink.SinkConnector | 20 +++++++++ ...pache.kafka.connect.source.SourceConnector | 25 +++++++++++ ...org.apache.kafka.connect.storage.Converter | 19 +++++++++ ...ache.kafka.connect.storage.HeaderConverter | 18 ++++++++ ...he.kafka.connect.transforms.Transformation | 22 ++++++++++ ...ka.connect.transforms.predicates.Predicate | 17 ++++++++ ...he.kafka.connect.transforms.Transformation | 41 +++++++++++++++++++ ...ka.connect.transforms.predicates.Predicate | 18 ++++++++ 19 files changed, 369 insertions(+) create mode 100644 connect/api/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.Converter create mode 100644 connect/api/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter create mode 100644 connect/api/src/test/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector create mode 100644 connect/api/src/test/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector create mode 100644 connect/file/src/main/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector create mode 100644 connect/file/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector create mode 100644 connect/json/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.Converter create mode 100644 connect/json/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter create mode 100644 connect/mirror/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector create mode 100644 connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.Converter create mode 100644 connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter create mode 100644 connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector create mode 100644 connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector create mode 100644 connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.storage.Converter create mode 100644 connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter create mode 100644 connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.transforms.Transformation create mode 100644 connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.transforms.predicates.Predicate create mode 100644 connect/transforms/src/main/resources/META-INF/services/org.apache.kafka.connect.transforms.Transformation create mode 100644 connect/transforms/src/main/resources/META-INF/services/org.apache.kafka.connect.transforms.predicates.Predicate diff --git a/connect/api/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.Converter b/connect/api/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.Converter new file mode 100644 index 000000000000..78e322373bb6 --- /dev/null +++ b/connect/api/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.Converter @@ -0,0 +1,16 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.storage.StringConverter \ No newline at end of file diff --git a/connect/api/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter b/connect/api/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter new file mode 100644 index 000000000000..42e02d13aaf2 --- /dev/null +++ b/connect/api/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter @@ -0,0 +1,17 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.storage.SimpleHeaderConverter +org.apache.kafka.connect.storage.StringConverter \ No newline at end of file diff --git a/connect/api/src/test/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector b/connect/api/src/test/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector new file mode 100644 index 000000000000..4a1f3a9baf20 --- /dev/null +++ b/connect/api/src/test/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector @@ -0,0 +1,16 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.sink.SinkConnectorTest$TestSinkConnector \ No newline at end of file diff --git a/connect/api/src/test/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector b/connect/api/src/test/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector new file mode 100644 index 000000000000..ab7f14d0aebb --- /dev/null +++ b/connect/api/src/test/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector @@ -0,0 +1,16 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.source.SourceConnectorTest$TestSourceConnector \ No newline at end of file diff --git a/connect/file/src/main/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector b/connect/file/src/main/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector new file mode 100644 index 000000000000..4acecd76b5a3 --- /dev/null +++ b/connect/file/src/main/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector @@ -0,0 +1,16 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.file.FileStreamSinkConnector \ No newline at end of file diff --git a/connect/file/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector b/connect/file/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector new file mode 100644 index 000000000000..66a0c5d8588d --- /dev/null +++ b/connect/file/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector @@ -0,0 +1,16 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.file.FileStreamSourceConnector \ No newline at end of file diff --git a/connect/json/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.Converter b/connect/json/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.Converter new file mode 100644 index 000000000000..0ea37b79c01d --- /dev/null +++ b/connect/json/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.Converter @@ -0,0 +1,16 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.json.JsonConverter \ No newline at end of file diff --git a/connect/json/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter b/connect/json/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter new file mode 100644 index 000000000000..0ea37b79c01d --- /dev/null +++ b/connect/json/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter @@ -0,0 +1,16 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.json.JsonConverter \ No newline at end of file diff --git a/connect/mirror/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector b/connect/mirror/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector new file mode 100644 index 000000000000..4836e08f3e61 --- /dev/null +++ b/connect/mirror/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector @@ -0,0 +1,18 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.mirror.MirrorCheckpointConnector +org.apache.kafka.connect.mirror.MirrorHeartbeatConnector +org.apache.kafka.connect.mirror.MirrorSourceConnector \ No newline at end of file diff --git a/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.Converter b/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.Converter new file mode 100644 index 000000000000..134262474b95 --- /dev/null +++ b/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.Converter @@ -0,0 +1,21 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.converters.ByteArrayConverter +org.apache.kafka.connect.converters.DoubleConverter +org.apache.kafka.connect.converters.FloatConverter +org.apache.kafka.connect.converters.IntegerConverter +org.apache.kafka.connect.converters.LongConverter +org.apache.kafka.connect.converters.ShortConverter diff --git a/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter b/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter new file mode 100644 index 000000000000..134262474b95 --- /dev/null +++ b/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter @@ -0,0 +1,21 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.converters.ByteArrayConverter +org.apache.kafka.connect.converters.DoubleConverter +org.apache.kafka.connect.converters.FloatConverter +org.apache.kafka.connect.converters.IntegerConverter +org.apache.kafka.connect.converters.LongConverter +org.apache.kafka.connect.converters.ShortConverter diff --git a/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector new file mode 100644 index 000000000000..4c26fece1840 --- /dev/null +++ b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector @@ -0,0 +1,20 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.integration.BlockingConnectorTest$BlockingSinkConnector +org.apache.kafka.connect.integration.BlockingConnectorTest$TaskInitializeBlockingSinkConnector +org.apache.kafka.connect.integration.ErrantRecordSinkConnector +org.apache.kafka.connect.integration.MonitorableSinkConnector +org.apache.kafka.connect.runtime.SampleSinkConnector \ No newline at end of file diff --git a/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector new file mode 100644 index 000000000000..73033ca23c02 --- /dev/null +++ b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector @@ -0,0 +1,25 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.integration.BlockingConnectorTest$BlockingConnector +org.apache.kafka.connect.integration.BlockingConnectorTest$InitializeBlockingConnector +org.apache.kafka.connect.integration.BlockingConnectorTest$ConfigBlockingConnector +org.apache.kafka.connect.integration.BlockingConnectorTest$ValidateBlockingConnector +org.apache.kafka.connect.integration.BlockingConnectorTest$BlockingSourceConnector +org.apache.kafka.connect.integration.BlockingConnectorTest$TaskInitializeBlockingSourceConnector +org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest$NaughtyConnector +org.apache.kafka.connect.integration.MonitorableSourceConnector +org.apache.kafka.connect.runtime.SampleSourceConnector +org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResourceTest$ConnectorPluginsResourceTestConnector diff --git a/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.storage.Converter b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.storage.Converter new file mode 100644 index 000000000000..c3219493495a --- /dev/null +++ b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.storage.Converter @@ -0,0 +1,19 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.runtime.SampleConverterWithHeaders +org.apache.kafka.connect.runtime.ErrorHandlingTaskTest$FaultyConverter +org.apache.kafka.connect.runtime.isolation.PluginsTest$TestConverter +org.apache.kafka.connect.runtime.isolation.PluginsTest$TestInternalConverter \ No newline at end of file diff --git a/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter new file mode 100644 index 000000000000..5e1aa1966ad0 --- /dev/null +++ b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter @@ -0,0 +1,18 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.runtime.SampleHeaderConverter +org.apache.kafka.connect.runtime.isolation.PluginsTest$TestHeaderConverter +org.apache.kafka.connect.runtime.isolation.PluginsTest$TestInternalConverter \ No newline at end of file diff --git a/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.transforms.Transformation b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.transforms.Transformation new file mode 100644 index 000000000000..36beccff5d25 --- /dev/null +++ b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.transforms.Transformation @@ -0,0 +1,22 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.integration.ErrorHandlingIntegrationTest$FaultyPassthrough +org.apache.kafka.connect.runtime.ErrorHandlingTaskTest$FaultyPassthrough +org.apache.kafka.connect.runtime.ConnectorConfigTest$SimpleTransformation +org.apache.kafka.connect.runtime.ConnectorConfigTest$HasDuplicateConfigTransformation +org.apache.kafka.connect.runtime.ConnectorConfigTest$AbstractKeyValueTransformation$Key +org.apache.kafka.connect.runtime.ConnectorConfigTest$AbstractKeyValueTransformation$Value +org.apache.kafka.connect.runtime.SampleTransformation diff --git a/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.transforms.predicates.Predicate b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.transforms.predicates.Predicate new file mode 100644 index 000000000000..b235b1fec512 --- /dev/null +++ b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.transforms.predicates.Predicate @@ -0,0 +1,17 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.runtime.ConnectorConfigTest$TestPredicate +org.apache.kafka.connect.runtime.SamplePredicate \ No newline at end of file diff --git a/connect/transforms/src/main/resources/META-INF/services/org.apache.kafka.connect.transforms.Transformation b/connect/transforms/src/main/resources/META-INF/services/org.apache.kafka.connect.transforms.Transformation new file mode 100644 index 000000000000..cf9646be3768 --- /dev/null +++ b/connect/transforms/src/main/resources/META-INF/services/org.apache.kafka.connect.transforms.Transformation @@ -0,0 +1,41 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.transforms.Cast$Key +org.apache.kafka.connect.transforms.Cast$Value +org.apache.kafka.connect.transforms.DropHeaders +org.apache.kafka.connect.transforms.ExtractField$Key +org.apache.kafka.connect.transforms.ExtractField$Value +org.apache.kafka.connect.transforms.Filter +org.apache.kafka.connect.transforms.Flatten$Key +org.apache.kafka.connect.transforms.Flatten$Value +org.apache.kafka.connect.transforms.HeaderFrom$Key +org.apache.kafka.connect.transforms.HeaderFrom$Value +org.apache.kafka.connect.transforms.HoistField$Key +org.apache.kafka.connect.transforms.HoistField$Value +org.apache.kafka.connect.transforms.InsertField$Key +org.apache.kafka.connect.transforms.InsertField$Value +org.apache.kafka.connect.transforms.InsertHeader +org.apache.kafka.connect.transforms.MaskField$Key +org.apache.kafka.connect.transforms.MaskField$Value +org.apache.kafka.connect.transforms.RegexRouter +org.apache.kafka.connect.transforms.ReplaceField$Key +org.apache.kafka.connect.transforms.ReplaceField$Value +org.apache.kafka.connect.transforms.SetSchemaMetadata$Key +org.apache.kafka.connect.transforms.SetSchemaMetadata$Value +org.apache.kafka.connect.transforms.TimestampConverter$Key +org.apache.kafka.connect.transforms.TimestampConverter$Value +org.apache.kafka.connect.transforms.TimestampRouter +org.apache.kafka.connect.transforms.ValueToKey diff --git a/connect/transforms/src/main/resources/META-INF/services/org.apache.kafka.connect.transforms.predicates.Predicate b/connect/transforms/src/main/resources/META-INF/services/org.apache.kafka.connect.transforms.predicates.Predicate new file mode 100644 index 000000000000..b451672377b8 --- /dev/null +++ b/connect/transforms/src/main/resources/META-INF/services/org.apache.kafka.connect.transforms.predicates.Predicate @@ -0,0 +1,18 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.transforms.predicates.HasHeaderKey +org.apache.kafka.connect.transforms.predicates.RecordIsTombstone +org.apache.kafka.connect.transforms.predicates.TopicNameMatches \ No newline at end of file From c92cb3d36400d827dd2942eb18d99dd01093d7bc Mon Sep 17 00:00:00 2001 From: Greg Harris Date: Fri, 26 May 2023 16:32:36 -0700 Subject: [PATCH 03/12] fixup: Add service manifests for Mock, Schema, Verifiable connectors Signed-off-by: Greg Harris --- ...org.apache.kafka.connect.sink.SinkConnector | 17 +++++++++++++++++ ...apache.kafka.connect.source.SourceConnector | 18 ++++++++++++++++++ 2 files changed, 35 insertions(+) create mode 100644 connect/test-plugins/src/main/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector create mode 100644 connect/test-plugins/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector diff --git a/connect/test-plugins/src/main/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector b/connect/test-plugins/src/main/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector new file mode 100644 index 000000000000..170043754d58 --- /dev/null +++ b/connect/test-plugins/src/main/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector @@ -0,0 +1,17 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.tools.MockSinkConnector +org.apache.kafka.connect.tools.VerifiableSinkConnector diff --git a/connect/test-plugins/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector b/connect/test-plugins/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector new file mode 100644 index 000000000000..acc2ddce7188 --- /dev/null +++ b/connect/test-plugins/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector @@ -0,0 +1,18 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.tools.MockSourceConnector +org.apache.kafka.connect.tools.SchemaSourceConnector +org.apache.kafka.connect.tools.VerifiableSourceConnector \ No newline at end of file From 266c39fc6b494e5c625c29fd1163877c9df17ead Mon Sep 17 00:00:00 2001 From: Greg Harris Date: Wed, 19 Jul 2023 17:18:03 -0700 Subject: [PATCH 04/12] fixup: move Mock, Schema, Verifiable manifests to their old location Signed-off-by: Greg Harris --- .../META-INF/services/org.apache.kafka.connect.sink.SinkConnector | 0 .../services/org.apache.kafka.connect.source.SourceConnector | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename connect/{test-plugins => runtime}/src/main/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector (100%) rename connect/{test-plugins => runtime}/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector (100%) diff --git a/connect/test-plugins/src/main/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector b/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector similarity index 100% rename from connect/test-plugins/src/main/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector rename to connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector diff --git a/connect/test-plugins/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector b/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector similarity index 100% rename from connect/test-plugins/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector rename to connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector From 28e0f50f0e32e5ca934ff50b6af24f01dc32a602 Mon Sep 17 00:00:00 2001 From: Greg Harris Date: Wed, 19 Jul 2023 17:21:04 -0700 Subject: [PATCH 05/12] fixup: add PluginUtilsTest$Colliding plugins Signed-off-by: Greg Harris --- .../services/org.apache.kafka.connect.storage.Converter | 3 ++- .../services/org.apache.kafka.connect.storage.HeaderConverter | 3 ++- .../org.apache.kafka.connect.transforms.Transformation | 1 + 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.storage.Converter b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.storage.Converter index c3219493495a..6d38aebee3d9 100644 --- a/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.storage.Converter +++ b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.storage.Converter @@ -16,4 +16,5 @@ org.apache.kafka.connect.runtime.SampleConverterWithHeaders org.apache.kafka.connect.runtime.ErrorHandlingTaskTest$FaultyConverter org.apache.kafka.connect.runtime.isolation.PluginsTest$TestConverter -org.apache.kafka.connect.runtime.isolation.PluginsTest$TestInternalConverter \ No newline at end of file +org.apache.kafka.connect.runtime.isolation.PluginsTest$TestInternalConverter +org.apache.kafka.connect.runtime.isolation.PluginUtilsTest$CollidingConverter \ No newline at end of file diff --git a/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter index 5e1aa1966ad0..38d00e4f7036 100644 --- a/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter +++ b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter @@ -15,4 +15,5 @@ org.apache.kafka.connect.runtime.SampleHeaderConverter org.apache.kafka.connect.runtime.isolation.PluginsTest$TestHeaderConverter -org.apache.kafka.connect.runtime.isolation.PluginsTest$TestInternalConverter \ No newline at end of file +org.apache.kafka.connect.runtime.isolation.PluginsTest$TestInternalConverter +org.apache.kafka.connect.runtime.isolation.PluginUtilsTest$CollidingHeaderConverter \ No newline at end of file diff --git a/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.transforms.Transformation b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.transforms.Transformation index 36beccff5d25..6d36ee90888b 100644 --- a/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.transforms.Transformation +++ b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.transforms.Transformation @@ -20,3 +20,4 @@ org.apache.kafka.connect.runtime.ConnectorConfigTest$HasDuplicateConfigTransform org.apache.kafka.connect.runtime.ConnectorConfigTest$AbstractKeyValueTransformation$Key org.apache.kafka.connect.runtime.ConnectorConfigTest$AbstractKeyValueTransformation$Value org.apache.kafka.connect.runtime.SampleTransformation +org.apache.kafka.connect.runtime.isolation.PluginUtilsTest$Colliding From da71b8d0593a4c7a73ee2769d1360508d888200e Mon Sep 17 00:00:00 2001 From: Greg Harris Date: Fri, 21 Jul 2023 10:30:31 -0700 Subject: [PATCH 06/12] fixup: review comments Signed-off-by: Greg Harris --- .../kafka/connect/runtime/WorkerConfig.java | 25 ++++++--- .../isolation/PluginDiscoveryMode.java | 13 ++--- .../connect/runtime/isolation/Plugins.java | 53 ++++++++----------- 3 files changed, 47 insertions(+), 44 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java index d6906d69001a..b43ad8448eda 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode; import org.apache.kafka.connect.runtime.rest.RestServerConfig; @@ -44,6 +45,10 @@ import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; import static org.apache.kafka.common.config.ConfigDef.ValidString.in; import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TOPIC_CREATION_PREFIX; +import static org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode.ONLY_SCAN; +import static org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode.HYBRID_WARN; +import static org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode.HYBRID_FAIL; +import static org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode.SERVICE_LOAD; /** * Common base class providing configuration for Kafka Connect workers, whether standalone or distributed. @@ -125,15 +130,15 @@ public class WorkerConfig extends AbstractConfig { + "replace variables."; public static final String PLUGIN_DISCOVERY_CONFIG = "plugin.discovery"; - protected static final String PLUGIN_DISCOVERY_DOC = "Method to use to discover plugins provided in the " - + "plugin.path configuration. This can be one of multiple values with the following meanings:\n" - + "* ONLY_SCAN: Discover plugins only by reflection. " + protected static final String PLUGIN_DISCOVERY_DOC = "Method to use to discover plugins present in the classpath " + + "and plugin.path configuration. This can be one of multiple values with the following meanings:\n" + + "* " + ONLY_SCAN.name() + ": Discover plugins only by reflection. " + "Plugins which are not discoverable by ServiceLoader will not impact worker startup.\n" - + "* HYBRID_WARN: Discover plugins reflectively and by ServiceLoader. " + + "* " + HYBRID_WARN.name() + ": Discover plugins reflectively and by ServiceLoader. " + "Plugins which are not discoverable by ServiceLoader will print warnings during worker startup.\n" - + "* HYBRID_FAIL: Discover plugins reflectively and by ServiceLoader." + + "* " + HYBRID_FAIL.name() + ": Discover plugins reflectively and by ServiceLoader. " + "Plugins which are not discoverable by ServiceLoader will cause worker startup to fail.\n" - + "* SERVICE_LOAD: Discover plugins only by ServiceLoader. Faster startup than prior modes. " + + "* " + SERVICE_LOAD.name() + ": Discover plugins only by ServiceLoader. Faster startup than other modes. " + "Plugins which are not discoverable by ServiceLoader will not be found or usable."; public static final String CONFIG_PROVIDERS_CONFIG = "config.providers"; @@ -216,6 +221,7 @@ protected static ConfigDef baseConfigDef() { .define(PLUGIN_DISCOVERY_CONFIG, Type.STRING, PluginDiscoveryMode.HYBRID_WARN.toString(), + ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(PluginDiscoveryMode.class)), Importance.LOW, PLUGIN_DISCOVERY_DOC) .define(METRICS_SAMPLE_WINDOW_MS_CONFIG, Type.LONG, @@ -422,7 +428,12 @@ public static String pluginPath(Map props) { public static PluginDiscoveryMode pluginDiscovery(Map props) { String value = props.getOrDefault(WorkerConfig.PLUGIN_DISCOVERY_CONFIG, PluginDiscoveryMode.HYBRID_WARN.toString()); - return PluginDiscoveryMode.valueOf(value.toUpperCase(Locale.ROOT)); + try { + return PluginDiscoveryMode.valueOf(value.toUpperCase(Locale.ROOT)); + } catch (IllegalArgumentException e) { + throw new ConnectException("Invalid " + PLUGIN_DISCOVERY_CONFIG + " value, must be one of " + + Arrays.toString(Utils.enumOptions(PluginDiscoveryMode.class))); + } } public WorkerConfig(ConfigDef definition, Map props) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDiscoveryMode.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDiscoveryMode.java index 9dbfd28cba3b..0fea08b2c55b 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDiscoveryMode.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDiscoveryMode.java @@ -17,7 +17,8 @@ package org.apache.kafka.connect.runtime.isolation; /** - * Method to use to discover plugins usable on a Connect worker. + * Strategy to use to discover plugins usable on a Connect worker. + * @see KIP-898 */ public enum PluginDiscoveryMode { @@ -43,15 +44,15 @@ public enum PluginDiscoveryMode { HYBRID_FAIL, /** * Discover plugins via {@link java.util.ServiceLoader} only. - * Plugins will not be present in the REST API if it is not available via {@link java.util.ServiceLoader} + * Plugins will not be present in the REST API if they are not available via {@link java.util.ServiceLoader} */ SERVICE_LOAD; - public static boolean reflectivelyScan(PluginDiscoveryMode pluginDiscoveryMode) { - return pluginDiscoveryMode != SERVICE_LOAD; + public boolean reflectivelyScan() { + return this != SERVICE_LOAD; } - public static boolean serviceLoad(PluginDiscoveryMode pluginDiscoveryMode) { - return pluginDiscoveryMode != ONLY_SCAN; + public boolean serviceLoad() { + return this != ONLY_SCAN; } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java index 2a710711c7c7..c6cfcaea4f37 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java @@ -78,15 +78,15 @@ public PluginScanResult initLoaders(Set pluginSources, PluginDisco PluginScanResult empty = new PluginScanResult(Collections.emptyList()); PluginScanResult serviceLoadingScanResult; try { - serviceLoadingScanResult = PluginDiscoveryMode.serviceLoad(discoveryMode) ? + serviceLoadingScanResult = discoveryMode.serviceLoad() ? new ServiceLoaderScanner().discoverPlugins(pluginSources) : empty; } catch (Throwable t) { - log.error("Unable to perform ServiceLoader scanning as requested by {}={}, this error may be avoided by reconfiguring {}={}", + throw new ConnectException(String.format( + "Unable to perform ServiceLoader scanning as requested by %s=%s. It may be possible to fix this issue by reconfiguring %s=%s", WorkerConfig.PLUGIN_DISCOVERY_CONFIG, discoveryMode, - WorkerConfig.PLUGIN_DISCOVERY_CONFIG, PluginDiscoveryMode.ONLY_SCAN, t); - throw t; + WorkerConfig.PLUGIN_DISCOVERY_CONFIG, PluginDiscoveryMode.ONLY_SCAN), t); } - PluginScanResult reflectiveScanResult = PluginDiscoveryMode.reflectivelyScan(discoveryMode) ? + PluginScanResult reflectiveScanResult = discoveryMode.reflectivelyScan() ? new ReflectionScanner().discoverPlugins(pluginSources) : empty; PluginScanResult scanResult = new PluginScanResult(Arrays.asList(reflectiveScanResult, serviceLoadingScanResult)); maybeReportHybridDiscoveryIssue(discoveryMode, serviceLoadingScanResult, scanResult); @@ -99,38 +99,29 @@ private static void maybeReportHybridDiscoveryIssue(PluginDiscoveryMode discover mergedResult.forEach(missingPlugins::add); serviceLoadingScanResult.forEach(missingPlugins::remove); if (missingPlugins.isEmpty()) { - switch (discoveryMode) { - case ONLY_SCAN: - log.debug("Service loading of plugins disabled, consider reconfiguring {}={}", - WorkerConfig.PLUGIN_DISCOVERY_CONFIG, PluginDiscoveryMode.HYBRID_WARN); - break; - case HYBRID_WARN: - case HYBRID_FAIL: - log.warn("All plugins have ServiceLoader manifests, consider reconfiguring {}={}", - WorkerConfig.PLUGIN_DISCOVERY_CONFIG, PluginDiscoveryMode.SERVICE_LOAD); - break; - case SERVICE_LOAD: - log.debug("Reflective loading of plugins disabled, plugins without manifests will not be visible"); - break; - default: - throw new IllegalArgumentException("Unknown discovery mode"); + if (discoveryMode == PluginDiscoveryMode.HYBRID_WARN || discoveryMode == PluginDiscoveryMode.HYBRID_FAIL) { + log.warn("All plugins have ServiceLoader manifests, consider reconfiguring {}={}", + WorkerConfig.PLUGIN_DISCOVERY_CONFIG, PluginDiscoveryMode.SERVICE_LOAD); } } else { String message = String.format( - "Plugins are missing ServiceLoader manifests, these plugins will not be visible with %s=%s: %s", - WorkerConfig.PLUGIN_DISCOVERY_CONFIG, + "One or more plugins are missing ServiceLoader manifests and will not be visible with %s=%s: %s\n" + + "Read the documentation at %s for instructions on migrating your plugins " + + "to take advantage of the performance improvements of %s mode.", + WorkerConfig.PLUGIN_DISCOVERY_CONFIG, PluginDiscoveryMode.SERVICE_LOAD, missingPlugins.stream() .map(pluginDesc -> pluginDesc.location() + "\t" + pluginDesc.className() + "\t" + pluginDesc.version()) - .collect(Collectors.joining("\n", "[\n", "\n]"))); - switch (discoveryMode) { - case HYBRID_WARN: - log.warn(message); - break; - case HYBRID_FAIL: - throw new ConnectException(message); - default: - throw new IllegalArgumentException("Unknown discovery mode"); + .collect(Collectors.joining("\n", "[\n", "\n]")), + "https://kafka.apache.org/documentation.html#connect_plugindiscovery", + PluginDiscoveryMode.SERVICE_LOAD + ); + if (discoveryMode == PluginDiscoveryMode.HYBRID_WARN) { + log.warn("{} To silence this warning, set {}={} in the worker config.", + message, WorkerConfig.PLUGIN_DISCOVERY_CONFIG, PluginDiscoveryMode.ONLY_SCAN); + } else if (discoveryMode == PluginDiscoveryMode.HYBRID_FAIL) { + throw new ConnectException(String.format("%s To silence this error, set %s=%s in the worker config.", + message, WorkerConfig.PLUGIN_DISCOVERY_CONFIG, PluginDiscoveryMode.HYBRID_WARN)); } } } From 346e23c70e811fdba2ecd17b05c31a9160cb7ffd Mon Sep 17 00:00:00 2001 From: Greg Harris Date: Fri, 21 Jul 2023 10:53:42 -0700 Subject: [PATCH 07/12] fixup: add tests for maybeReportHybridDiscoveryIssue Signed-off-by: Greg Harris --- .../connect/runtime/isolation/Plugins.java | 5 +- .../runtime/isolation/PluginsTest.java | 99 +++++++++++++++++++ 2 files changed, 102 insertions(+), 2 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java index c6cfcaea4f37..e9cb250f5b8a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java @@ -94,7 +94,8 @@ public PluginScanResult initLoaders(Set pluginSources, PluginDisco return scanResult; } - private static void maybeReportHybridDiscoveryIssue(PluginDiscoveryMode discoveryMode, PluginScanResult serviceLoadingScanResult, PluginScanResult mergedResult) { + // visible for testing + static void maybeReportHybridDiscoveryIssue(PluginDiscoveryMode discoveryMode, PluginScanResult serviceLoadingScanResult, PluginScanResult mergedResult) { SortedSet> missingPlugins = new TreeSet<>(); mergedResult.forEach(missingPlugins::add); serviceLoadingScanResult.forEach(missingPlugins::remove); @@ -105,7 +106,7 @@ private static void maybeReportHybridDiscoveryIssue(PluginDiscoveryMode discover } } else { String message = String.format( - "One or more plugins are missing ServiceLoader manifests and will not be visible with %s=%s: %s\n" + + "One or more plugins are missing ServiceLoader manifests and will not be visible with %s=%s: %s%n" + "Read the documentation at %s for instructions on migrating your plugins " + "to take advantage of the performance improvements of %s mode.", WorkerConfig.PLUGIN_DISCOVERY_CONFIG, diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java index 75418b27a6f3..889a9d0a5cfd 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java @@ -31,6 +31,7 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.provider.ConfigProvider; +import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.converters.ByteArrayConverter; @@ -46,6 +47,7 @@ import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage; import org.apache.kafka.connect.runtime.isolation.TestPlugins.TestPlugin; import org.apache.kafka.connect.runtime.rest.RestServerConfig; +import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.ConverterConfig; import org.apache.kafka.connect.storage.ConverterType; @@ -58,6 +60,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.SortedSet; import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; @@ -76,6 +79,9 @@ public class PluginsTest { private TestConverter converter; private TestHeaderConverter headerConverter; private TestInternalConverter internalConverter; + private PluginScanResult nonEmpty; + private PluginScanResult empty; + private String missingPluginClass; @Before public void setup() { @@ -94,6 +100,22 @@ public void setup() { props.put(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, TestHeaderConverter.class.getName()); props.put("header.converter.extra.config", "baz"); + // Set up some PluginScanResult instances to test the plugin discovery modes + SortedSet> sinkConnectors = (SortedSet>) plugins.sinkConnectors(); + missingPluginClass = sinkConnectors.first().className(); + nonEmpty = new PluginScanResult( + sinkConnectors, + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet() + ); + empty = new PluginScanResult(Collections.emptyList()); + createConfig(); } @@ -476,6 +498,83 @@ public void subclassedServiceLoadedPluginShouldNotAppearIsolated() { } } + @Test + public void testOnlyScanNoPlugins() { + try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Plugins.class)) { + Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.ONLY_SCAN, empty, empty); + assertTrue(logCaptureAppender.getEvents().stream().noneMatch(e -> e.getLevel().contains("ERROR") || e.getLevel().equals("WARN"))); + } + } + + @Test + public void testOnlyScanWithPlugins() { + try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Plugins.class)) { + Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.ONLY_SCAN, empty, nonEmpty); + assertTrue(logCaptureAppender.getEvents().stream().noneMatch(e -> e.getLevel().contains("ERROR") || e.getLevel().equals("WARN"))); + } + } + + @Test + public void testHybridWarnNoPlugins() { + try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Plugins.class)) { + Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.HYBRID_WARN, empty, empty); + assertTrue(logCaptureAppender.getEvents().stream().anyMatch(e -> e.getLevel().equals("WARN"))); + } + } + + @Test + public void testHybridWarnWithPlugins() { + try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Plugins.class)) { + Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.HYBRID_WARN, nonEmpty, nonEmpty); + assertTrue(logCaptureAppender.getEvents().stream().anyMatch(e -> e.getLevel().equals("WARN") && !e.getMessage().contains(missingPluginClass))); + } + } + + @Test + public void testHybridWarnMissingPlugins() { + try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Plugins.class)) { + Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.HYBRID_WARN, empty, nonEmpty); + assertTrue(logCaptureAppender.getEvents().stream().anyMatch(e -> e.getLevel().equals("WARN") && e.getMessage().contains(missingPluginClass))); + } + } + + @Test + public void testHybridFailNoPlugins() { + try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Plugins.class)) { + Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.HYBRID_FAIL, empty, empty); + assertTrue(logCaptureAppender.getEvents().stream().anyMatch(e -> e.getLevel().equals("WARN"))); + } + } + + @Test + public void testHybridFailWithPlugins() { + try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Plugins.class)) { + Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.HYBRID_FAIL, nonEmpty, nonEmpty); + assertTrue(logCaptureAppender.getEvents().stream().anyMatch(e -> e.getLevel().equals("WARN") && !e.getMessage().contains(missingPluginClass))); + } + } + + @Test + public void testHybridFailMissingPlugins() { + assertThrows(ConnectException.class, () -> Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.HYBRID_FAIL, empty, nonEmpty)); + } + + @Test + public void testServiceLoadNoPlugins() { + try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Plugins.class)) { + Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.SERVICE_LOAD, empty, empty); + assertTrue(logCaptureAppender.getEvents().stream().noneMatch(e -> e.getLevel().contains("ERROR") || e.getLevel().equals("WARN"))); + } + } + + @Test + public void testServiceLoadWithPlugins() { + try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Plugins.class)) { + Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.SERVICE_LOAD, nonEmpty, nonEmpty); + assertTrue(logCaptureAppender.getEvents().stream().noneMatch(e -> e.getLevel().contains("ERROR") || e.getLevel().equals("WARN"))); + } + } + private void assertClassLoaderReadsVersionFromResource( TestPlugin parentResource, TestPlugin childResource, String className, String... expectedVersions) { URL[] systemPath = TestPlugins.pluginPath(parentResource) From 436f595e91c8304f0d7a5e3d2ffd0e0dc5c6726e Mon Sep 17 00:00:00 2001 From: Greg Harris Date: Wed, 26 Jul 2023 14:58:08 -0700 Subject: [PATCH 08/12] fixup: assert that hybrid warn messages include config key Signed-off-by: Greg Harris --- .../runtime/isolation/PluginsTest.java | 29 +++++++++++++++---- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java index 889a9d0a5cfd..189d75a842aa 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java @@ -518,7 +518,11 @@ public void testOnlyScanWithPlugins() { public void testHybridWarnNoPlugins() { try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Plugins.class)) { Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.HYBRID_WARN, empty, empty); - assertTrue(logCaptureAppender.getEvents().stream().anyMatch(e -> e.getLevel().equals("WARN"))); + assertTrue(logCaptureAppender.getEvents().stream().anyMatch(e -> + e.getLevel().equals("WARN") + // These log messages must contain the config name, it is referenced in the documentation. + && e.getMessage().contains(WorkerConfig.PLUGIN_DISCOVERY_CONFIG) + )); } } @@ -526,7 +530,11 @@ public void testHybridWarnNoPlugins() { public void testHybridWarnWithPlugins() { try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Plugins.class)) { Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.HYBRID_WARN, nonEmpty, nonEmpty); - assertTrue(logCaptureAppender.getEvents().stream().anyMatch(e -> e.getLevel().equals("WARN") && !e.getMessage().contains(missingPluginClass))); + assertTrue(logCaptureAppender.getEvents().stream().anyMatch(e -> + e.getLevel().equals("WARN") + && !e.getMessage().contains(missingPluginClass) + && e.getMessage().contains(WorkerConfig.PLUGIN_DISCOVERY_CONFIG) + )); } } @@ -534,7 +542,11 @@ public void testHybridWarnWithPlugins() { public void testHybridWarnMissingPlugins() { try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Plugins.class)) { Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.HYBRID_WARN, empty, nonEmpty); - assertTrue(logCaptureAppender.getEvents().stream().anyMatch(e -> e.getLevel().equals("WARN") && e.getMessage().contains(missingPluginClass))); + assertTrue(logCaptureAppender.getEvents().stream().anyMatch(e -> + e.getLevel().equals("WARN") + && e.getMessage().contains(missingPluginClass) + && e.getMessage().contains(WorkerConfig.PLUGIN_DISCOVERY_CONFIG) + )); } } @@ -542,7 +554,10 @@ public void testHybridWarnMissingPlugins() { public void testHybridFailNoPlugins() { try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Plugins.class)) { Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.HYBRID_FAIL, empty, empty); - assertTrue(logCaptureAppender.getEvents().stream().anyMatch(e -> e.getLevel().equals("WARN"))); + assertTrue(logCaptureAppender.getEvents().stream().anyMatch(e -> + e.getLevel().equals("WARN") + && e.getMessage().contains(WorkerConfig.PLUGIN_DISCOVERY_CONFIG) + )); } } @@ -550,7 +565,11 @@ public void testHybridFailNoPlugins() { public void testHybridFailWithPlugins() { try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Plugins.class)) { Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.HYBRID_FAIL, nonEmpty, nonEmpty); - assertTrue(logCaptureAppender.getEvents().stream().anyMatch(e -> e.getLevel().equals("WARN") && !e.getMessage().contains(missingPluginClass))); + assertTrue(logCaptureAppender.getEvents().stream().anyMatch(e -> + e.getLevel().equals("WARN") + && !e.getMessage().contains(missingPluginClass) + && e.getMessage().contains(WorkerConfig.PLUGIN_DISCOVERY_CONFIG) + )); } } From 22cbcd044788e5992809335ddb9615fb827617da Mon Sep 17 00:00:00 2001 From: Greg Harris Date: Wed, 26 Jul 2023 15:20:30 -0700 Subject: [PATCH 09/12] fixup: soften language for SERVICE_LOAD Signed-off-by: Greg Harris --- .../java/org/apache/kafka/connect/runtime/WorkerConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java index b43ad8448eda..5878e6d09f50 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java @@ -139,7 +139,7 @@ public class WorkerConfig extends AbstractConfig { + "* " + HYBRID_FAIL.name() + ": Discover plugins reflectively and by ServiceLoader. " + "Plugins which are not discoverable by ServiceLoader will cause worker startup to fail.\n" + "* " + SERVICE_LOAD.name() + ": Discover plugins only by ServiceLoader. Faster startup than other modes. " - + "Plugins which are not discoverable by ServiceLoader will not be found or usable."; + + "Plugins which are not discoverable by ServiceLoader may not be usable."; public static final String CONFIG_PROVIDERS_CONFIG = "config.providers"; protected static final String CONFIG_PROVIDERS_DOC = From e0ccb0c079648d4200ae17c2255afb021e378f4d Mon Sep 17 00:00:00 2001 From: Greg Harris Date: Tue, 1 Aug 2023 10:28:07 -0700 Subject: [PATCH 10/12] fixup: lowercase enum, soften service_load language Signed-off-by: Greg Harris --- .../org/apache/kafka/connect/runtime/WorkerConfig.java | 8 ++++---- .../connect/runtime/isolation/PluginDiscoveryMode.java | 9 ++++++++- .../apache/kafka/connect/runtime/isolation/Plugins.java | 2 +- 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java index 5878e6d09f50..c1fd69317bce 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java @@ -132,13 +132,13 @@ public class WorkerConfig extends AbstractConfig { public static final String PLUGIN_DISCOVERY_CONFIG = "plugin.discovery"; protected static final String PLUGIN_DISCOVERY_DOC = "Method to use to discover plugins present in the classpath " + "and plugin.path configuration. This can be one of multiple values with the following meanings:\n" - + "* " + ONLY_SCAN.name() + ": Discover plugins only by reflection. " + + "* " + ONLY_SCAN + ": Discover plugins only by reflection. " + "Plugins which are not discoverable by ServiceLoader will not impact worker startup.\n" - + "* " + HYBRID_WARN.name() + ": Discover plugins reflectively and by ServiceLoader. " + + "* " + HYBRID_WARN + ": Discover plugins reflectively and by ServiceLoader. " + "Plugins which are not discoverable by ServiceLoader will print warnings during worker startup.\n" - + "* " + HYBRID_FAIL.name() + ": Discover plugins reflectively and by ServiceLoader. " + + "* " + HYBRID_FAIL + ": Discover plugins reflectively and by ServiceLoader. " + "Plugins which are not discoverable by ServiceLoader will cause worker startup to fail.\n" - + "* " + SERVICE_LOAD.name() + ": Discover plugins only by ServiceLoader. Faster startup than other modes. " + + "* " + SERVICE_LOAD + ": Discover plugins only by ServiceLoader. Faster startup than other modes. " + "Plugins which are not discoverable by ServiceLoader may not be usable."; public static final String CONFIG_PROVIDERS_CONFIG = "config.providers"; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDiscoveryMode.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDiscoveryMode.java index 0fea08b2c55b..bd390a929b46 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDiscoveryMode.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDiscoveryMode.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.connect.runtime.isolation; +import java.util.Locale; + /** * Strategy to use to discover plugins usable on a Connect worker. * @see KIP-898 @@ -44,7 +46,7 @@ public enum PluginDiscoveryMode { HYBRID_FAIL, /** * Discover plugins via {@link java.util.ServiceLoader} only. - * Plugins will not be present in the REST API if they are not available via {@link java.util.ServiceLoader} + * Plugins may not be usable if they are not available via {@link java.util.ServiceLoader} */ SERVICE_LOAD; @@ -55,4 +57,9 @@ public boolean reflectivelyScan() { public boolean serviceLoad() { return this != ONLY_SCAN; } + + @Override + public String toString() { + return name().toLowerCase(Locale.ROOT); + } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java index e9cb250f5b8a..2fe1605e858e 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java @@ -106,7 +106,7 @@ static void maybeReportHybridDiscoveryIssue(PluginDiscoveryMode discoveryMode, P } } else { String message = String.format( - "One or more plugins are missing ServiceLoader manifests and will not be visible with %s=%s: %s%n" + + "One or more plugins are missing ServiceLoader manifests may not be usable with %s=%s: %s%n" + "Read the documentation at %s for instructions on migrating your plugins " + "to take advantage of the performance improvements of %s mode.", WorkerConfig.PLUGIN_DISCOVERY_CONFIG, From fd2f69356e2f7c95d92044054cbde5b612b8a54a Mon Sep 17 00:00:00 2001 From: Greg Harris Date: Tue, 1 Aug 2023 10:49:15 -0700 Subject: [PATCH 11/12] fixup: fix SynchronizationTest broken by serviceloading Signed-off-by: Greg Harris --- .../kafka/connect/runtime/isolation/SynchronizationTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java index c7b884c3605f..16b4d3a0e9fd 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java @@ -44,13 +44,13 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; -import org.apache.kafka.connect.json.JsonConverter; import org.apache.kafka.connect.runtime.WorkerConfig; import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -239,7 +239,8 @@ public void testSimultaneousUpwardAndDownwardDelegating() throws Exception { }; // THREAD 2: loads a class by delegating upward starting from the PluginClassLoader - String t2Class = JsonConverter.class.getName(); + // Use any non-plugin class that no plugins depend on, so that the class isn't loaded during plugin discovery + String t2Class = Mockito.class.getName(); // PluginClassLoader breakpoint will only trigger on this thread pclBreakpoint.set(t2Class::equals); Runnable thread2 = () -> { From 7d65156853961882346968cd6dc7d2a5fb101adf Mon Sep 17 00:00:00 2001 From: Greg Harris Date: Tue, 1 Aug 2023 17:40:49 -0700 Subject: [PATCH 12/12] fixup: missing headerconverter manifest now that plugintype bug is fixed Signed-off-by: Greg Harris --- .../org/apache/kafka/connect/runtime/isolation/Plugins.java | 2 +- .../services/org.apache.kafka.connect.storage.HeaderConverter | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java index c301498f22f5..72fe40a50a5b 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java @@ -112,7 +112,7 @@ static void maybeReportHybridDiscoveryIssue(PluginDiscoveryMode discoveryMode, P WorkerConfig.PLUGIN_DISCOVERY_CONFIG, PluginDiscoveryMode.SERVICE_LOAD, missingPlugins.stream() - .map(pluginDesc -> pluginDesc.location() + "\t" + pluginDesc.className() + "\t" + pluginDesc.version()) + .map(pluginDesc -> pluginDesc.location() + "\t" + pluginDesc.className() + "\t" + pluginDesc.type() + "\t" + pluginDesc.version()) .collect(Collectors.joining("\n", "[\n", "\n]")), "https://kafka.apache.org/documentation.html#connect_plugindiscovery", PluginDiscoveryMode.SERVICE_LOAD diff --git a/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter index 38d00e4f7036..a5b008543b17 100644 --- a/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter +++ b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter @@ -14,6 +14,7 @@ # limitations under the License. org.apache.kafka.connect.runtime.SampleHeaderConverter +org.apache.kafka.connect.runtime.ErrorHandlingTaskTest$FaultyConverter org.apache.kafka.connect.runtime.isolation.PluginsTest$TestHeaderConverter org.apache.kafka.connect.runtime.isolation.PluginsTest$TestInternalConverter org.apache.kafka.connect.runtime.isolation.PluginUtilsTest$CollidingHeaderConverter \ No newline at end of file