Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15031: Add plugin.discovery to Connect worker configuration (KIP-898) #14055

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.kafka.connect.storage.StringConverter
@@ -0,0 +1,17 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.kafka.connect.storage.SimpleHeaderConverter
org.apache.kafka.connect.storage.StringConverter
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.kafka.connect.sink.SinkConnectorTest$TestSinkConnector
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.kafka.connect.source.SourceConnectorTest$TestSourceConnector
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.kafka.connect.file.FileStreamSinkConnector
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.kafka.connect.file.FileStreamSourceConnector
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.kafka.connect.json.JsonConverter
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.kafka.connect.json.JsonConverter
@@ -0,0 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.kafka.connect.mirror.MirrorCheckpointConnector
org.apache.kafka.connect.mirror.MirrorHeartbeatConnector
org.apache.kafka.connect.mirror.MirrorSourceConnector
Expand Up @@ -26,6 +26,7 @@
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode;
import org.apache.kafka.connect.runtime.rest.RestServerConfig;
import org.apache.kafka.connect.storage.SimpleHeaderConverter;
import org.slf4j.Logger;
Expand All @@ -35,6 +36,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -122,6 +124,18 @@ public class WorkerConfig extends AbstractConfig {
+ "by the worker's scanner before config providers are initialized and used to "
+ "replace variables.";

public static final String PLUGIN_DISCOVERY_CONFIG = "plugin.discovery";
protected static final String PLUGIN_DISCOVERY_DOC = "Method to use to discover plugins provided in the "
+ "plugin.path configuration. This can be one of multiple values with the following meanings:\n"
C0urante marked this conversation as resolved.
Show resolved Hide resolved
+ "* ONLY_SCAN: Discover plugins only by reflection. "
gharris1727 marked this conversation as resolved.
Show resolved Hide resolved
+ "Plugins which are not discoverable by ServiceLoader will not impact worker startup.\n"
+ "* HYBRID_WARN: Discover plugins reflectively and by ServiceLoader. "
+ "Plugins which are not discoverable by ServiceLoader will print warnings during worker startup.\n"
+ "* HYBRID_FAIL: Discover plugins reflectively and by ServiceLoader."
gharris1727 marked this conversation as resolved.
Show resolved Hide resolved
+ "Plugins which are not discoverable by ServiceLoader will cause worker startup to fail.\n"
+ "* SERVICE_LOAD: Discover plugins only by ServiceLoader. Faster startup than prior modes. "
gharris1727 marked this conversation as resolved.
Show resolved Hide resolved
+ "Plugins which are not discoverable by ServiceLoader will not be found or usable.";

public static final String CONFIG_PROVIDERS_CONFIG = "config.providers";
protected static final String CONFIG_PROVIDERS_DOC =
"Comma-separated names of <code>ConfigProvider</code> classes, loaded and used "
Expand Down Expand Up @@ -199,6 +213,11 @@ protected static ConfigDef baseConfigDef() {
null,
Importance.LOW,
PLUGIN_PATH_DOC)
.define(PLUGIN_DISCOVERY_CONFIG,
Type.STRING,
PluginDiscoveryMode.HYBRID_WARN.toString(),
gharris1727 marked this conversation as resolved.
Show resolved Hide resolved
Importance.LOW,
PLUGIN_DISCOVERY_DOC)
.define(METRICS_SAMPLE_WINDOW_MS_CONFIG, Type.LONG,
30000, atLeast(0), Importance.LOW,
CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
Expand Down Expand Up @@ -401,6 +420,11 @@ public static String pluginPath(Map<String, String> props) {
return props.get(WorkerConfig.PLUGIN_PATH_CONFIG);
}

public static PluginDiscoveryMode pluginDiscovery(Map<String, String> props) {
String value = props.getOrDefault(WorkerConfig.PLUGIN_DISCOVERY_CONFIG, PluginDiscoveryMode.HYBRID_WARN.toString());
return PluginDiscoveryMode.valueOf(value.toUpperCase(Locale.ROOT));
gharris1727 marked this conversation as resolved.
Show resolved Hide resolved
}

public WorkerConfig(ConfigDef definition, Map<String, String> props) {
super(definition, props);
logInternalConverterRemovalWarnings(props);
Expand Down
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime.isolation;

/**
* Method to use to discover plugins usable on a Connect worker.
gharris1727 marked this conversation as resolved.
Show resolved Hide resolved
*/
public enum PluginDiscoveryMode {

/**
* Scan for plugins reflectively. This corresponds to the legacy behavior of Connect prior to KIP-898.
* <p>Note: the following plugins are still loaded using {@link java.util.ServiceLoader} in this mode:
* <ul>
* <li>{@link org.apache.kafka.common.config.provider.ConfigProvider}</li>
* <li>{@link org.apache.kafka.connect.rest.ConnectRestExtension}</li>
* <li>{@link org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy}</li>
* </ul>
*/
ONLY_SCAN,
/**
* Scan for plugins reflectively and via {@link java.util.ServiceLoader}.
* Emit warnings if one or more plugins is not available via {@link java.util.ServiceLoader}
*/
HYBRID_WARN,
/**
* Scan for plugins reflectively and via {@link java.util.ServiceLoader}.
* Fail worker during startup if one or more plugins is not available via {@link java.util.ServiceLoader}
*/
HYBRID_FAIL,
/**
* Discover plugins via {@link java.util.ServiceLoader} only.
* Plugins will not be present in the REST API if it is not available via {@link java.util.ServiceLoader}
gharris1727 marked this conversation as resolved.
Show resolved Hide resolved
*/
SERVICE_LOAD;

public static boolean reflectivelyScan(PluginDiscoveryMode pluginDiscoveryMode) {
return pluginDiscoveryMode != SERVICE_LOAD;
}

public static boolean serviceLoad(PluginDiscoveryMode pluginDiscoveryMode) {
return pluginDiscoveryMode != ONLY_SCAN;
}
gharris1727 marked this conversation as resolved.
Show resolved Hide resolved
}
Expand Up @@ -39,11 +39,15 @@

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;

public class Plugins {

Expand All @@ -63,16 +67,72 @@ public Plugins(Map<String, String> props) {
// VisibleForTesting
Plugins(Map<String, String> props, ClassLoader parent, ClassLoaderFactory factory) {
String pluginPath = WorkerConfig.pluginPath(props);
PluginDiscoveryMode discoveryMode = WorkerConfig.pluginDiscovery(props);
List<Path> pluginLocations = PluginUtils.pluginLocations(pluginPath);
delegatingLoader = factory.newDelegatingClassLoader(parent);
Set<PluginSource> pluginSources = PluginUtils.pluginSources(pluginLocations, delegatingLoader, factory);
scanResult = initLoaders(pluginSources);
scanResult = initLoaders(pluginSources, discoveryMode);
}

private PluginScanResult initLoaders(Set<PluginSource> pluginSources) {
PluginScanResult reflectiveScanResult = new ReflectionScanner().discoverPlugins(pluginSources);
delegatingLoader.installDiscoveredPlugins(reflectiveScanResult);
return reflectiveScanResult;
public PluginScanResult initLoaders(Set<PluginSource> pluginSources, PluginDiscoveryMode discoveryMode) {
PluginScanResult empty = new PluginScanResult(Collections.emptyList());
PluginScanResult serviceLoadingScanResult;
try {
serviceLoadingScanResult = PluginDiscoveryMode.serviceLoad(discoveryMode) ?
new ServiceLoaderScanner().discoverPlugins(pluginSources) : empty;
} catch (Throwable t) {
log.error("Unable to perform ServiceLoader scanning as requested by {}={}, this error may be avoided by reconfiguring {}={}",
gharris1727 marked this conversation as resolved.
Show resolved Hide resolved
WorkerConfig.PLUGIN_DISCOVERY_CONFIG, discoveryMode,
WorkerConfig.PLUGIN_DISCOVERY_CONFIG, PluginDiscoveryMode.ONLY_SCAN, t);
throw t;
gharris1727 marked this conversation as resolved.
Show resolved Hide resolved
}
PluginScanResult reflectiveScanResult = PluginDiscoveryMode.reflectivelyScan(discoveryMode) ?
new ReflectionScanner().discoverPlugins(pluginSources) : empty;
PluginScanResult scanResult = new PluginScanResult(Arrays.asList(reflectiveScanResult, serviceLoadingScanResult));
maybeReportHybridDiscoveryIssue(discoveryMode, serviceLoadingScanResult, scanResult);
delegatingLoader.installDiscoveredPlugins(scanResult);
return scanResult;
}

private static void maybeReportHybridDiscoveryIssue(PluginDiscoveryMode discoveryMode, PluginScanResult serviceLoadingScanResult, PluginScanResult mergedResult) {
gharris1727 marked this conversation as resolved.
Show resolved Hide resolved
SortedSet<PluginDesc<?>> missingPlugins = new TreeSet<>();
mergedResult.forEach(missingPlugins::add);
serviceLoadingScanResult.forEach(missingPlugins::remove);
if (missingPlugins.isEmpty()) {
switch (discoveryMode) {
case ONLY_SCAN:
log.debug("Service loading of plugins disabled, consider reconfiguring {}={}",
gharris1727 marked this conversation as resolved.
Show resolved Hide resolved
WorkerConfig.PLUGIN_DISCOVERY_CONFIG, PluginDiscoveryMode.HYBRID_WARN);
break;
case HYBRID_WARN:
case HYBRID_FAIL:
log.warn("All plugins have ServiceLoader manifests, consider reconfiguring {}={}",
WorkerConfig.PLUGIN_DISCOVERY_CONFIG, PluginDiscoveryMode.SERVICE_LOAD);
break;
case SERVICE_LOAD:
log.debug("Reflective loading of plugins disabled, plugins without manifests will not be visible");
gharris1727 marked this conversation as resolved.
Show resolved Hide resolved
break;
default:
throw new IllegalArgumentException("Unknown discovery mode");
gharris1727 marked this conversation as resolved.
Show resolved Hide resolved
}
} else {
String message = String.format(
"Plugins are missing ServiceLoader manifests, these plugins will not be visible with %s=%s: %s",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: language

Suggested change
"Plugins are missing ServiceLoader manifests, these plugins will not be visible with %s=%s: %s",
"One or more plugins are missing ServiceLoader manifests and will not be visible with %s=%s: %s",

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, should we include instructions on what users should do in this case? For many of them, this is going to be the first time hearing about new discovery logic; we should make this message as informative as possible if we want them to start to make use of it.

Some options include:

  • Suggest that they add service loader manifests for these plugins (possibly by mentioning the CLI tool added in KAFKA-15030: Add connect-plugin-path command-line tool. #14064)
  • Link to a docs section on the website describing the new plugin discovery logic
  • Link to a docs section for the Java standard library describing the service loader feature
  • In the case of HYBRID_FAIL, suggest changing the discovery mode if the missing plugins cannot be updated

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also also, if the point about classpath-installed plugins being unaffected by the plugin discovery mode is correct, should we either filter these plugins out from missingPlugins or clarify that those plugins will still be usable with SERVICE_LOAD?

I suppose "visible" is accurate in the sense that these plugins won't be listed by the GET /connector-plugins endpoint, but that may confuse users if they notice that they're still able to use these plugins in a connector config after switching to SERVICE_LOAD.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For many of them, this is going to be the first time hearing about new discovery logic; we should make this message as informative as possible if we want them to start to make use of it.
Link to a docs section on the website describing the new plugin discovery logic

I think to explaining everything in one warning/error message is unreasonable, and linking out to external documentation is the better strategy. The error message is already lengthy enough with each plugin listing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Linked documentation is added in #14068

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great--one more thing: can we make the same tweak to the language here that we did in WorkerConfig by replacing "will not be visible" with "may not be usable"?

WorkerConfig.PLUGIN_DISCOVERY_CONFIG,
PluginDiscoveryMode.SERVICE_LOAD,
missingPlugins.stream()
.map(pluginDesc -> pluginDesc.location() + "\t" + pluginDesc.className() + "\t" + pluginDesc.version())
.collect(Collectors.joining("\n", "[\n", "\n]")));
switch (discoveryMode) {
case HYBRID_WARN:
log.warn(message);
break;
case HYBRID_FAIL:
throw new ConnectException(message);
default:
gharris1727 marked this conversation as resolved.
Show resolved Hide resolved
throw new IllegalArgumentException("Unknown discovery mode");
gharris1727 marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

private static <T> String pluginNames(Collection<PluginDesc<T>> plugins) {
Expand Down
@@ -0,0 +1,17 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.kafka.connect.tools.MockSinkConnector
org.apache.kafka.connect.tools.VerifiableSinkConnector