Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15150: Add ServiceLoaderScanner implementation #13971

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.function.Supplier;

/**
* Superclass for plugin discovery implementations.
Expand Down Expand Up @@ -118,35 +120,79 @@ private void loadJdbcDrivers(final ClassLoader loader) {
}

@SuppressWarnings({"rawtypes", "unchecked"})
protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String version, ClassLoader loader) {
return new PluginDesc(plugin, version, loader);
protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String version, PluginSource source) {
return new PluginDesc(plugin, version, source.loader());
}

@SuppressWarnings("unchecked")
protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, ClassLoader loader) {
protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, PluginSource source) {
SortedSet<PluginDesc<T>> result = new TreeSet<>();
ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
for (Iterator<T> iterator = serviceLoader.iterator(); iterator.hasNext(); ) {
try (LoaderSwap loaderSwap = withClassLoader(loader)) {
ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, source.loader());
Iterator<T> iterator = serviceLoader.iterator();
while (handleLinkageError(klass, source, iterator::hasNext)) {
try (LoaderSwap loaderSwap = withClassLoader(source.loader())) {
T pluginImpl;
try {
pluginImpl = iterator.next();
pluginImpl = handleLinkageError(klass, source, iterator::next);
} catch (ServiceConfigurationError t) {
log.error("Failed to discover {}{}", klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t);
log.error("Failed to discover {} in {}{}",
klass.getSimpleName(), source.location(), reflectiveErrorDescription(t.getCause()), t);
C0urante marked this conversation as resolved.
Show resolved Hide resolved
continue;
}
Class<? extends T> pluginKlass = (Class<? extends T>) pluginImpl.getClass();
if (pluginKlass.getClassLoader() != loader) {
if (pluginKlass.getClassLoader() != source.loader()) {
log.debug("{} from other classloader {} is visible from {}, excluding to prevent isolated loading",
pluginKlass.getSimpleName(), pluginKlass.getClassLoader(), loader);
pluginKlass.getSimpleName(), pluginKlass.getClassLoader(), source.location());
continue;
}
result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), loader));
result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), source));
}
}
return result;
}

/**
* Helper to evaluate a {@link ServiceLoader} operation while handling {@link LinkageError}s.
*
* @param klass The plugin superclass which is being loaded
* @param function A function on a {@link ServiceLoader}'s {@link Iterator} which may throw {@link LinkageError}
* @return the return value of function
* @throws Error errors thrown by the passed-in function
* @param <T> Type being iterated over by the ServiceLoader
* @param <U> Return value of the passed-in function
*/
private <T, U> U handleLinkageError(Class<T> klass, PluginSource source, Supplier<U> function) {
// It's difficult to know for sure if the iterator was able to advance past the first broken
// plugin class, or if it will continue to fail on that broken class for any subsequent calls
// to Iterator::hasNext or Iterator::next
// For reference, see https://bugs.openjdk.org/browse/JDK-8196182, which describes
// the behavior we are trying to mitigate with this logic as buggy, but indicates that a fix
// in the JDK standard library ServiceLoader implementation is unlikely to land
LinkageError lastError = null;
// Try a fixed maximum number of times in case the ServiceLoader cannot move past a faulty plugin,
// but the LinkageError varies between calls. This limit is chosen to be higher than the typical number
// of plugins in a single plugin location, and to limit the amount of log-spam on startup.
for (int i = 0; i < 100; i++) {
try {
return function.get();
} catch (LinkageError t) {
// As an optimization, hide subsequent error logs if two consecutive errors look similar.
// This reduces log-spam for iterators which cannot advance and rethrow the same exception.
if (lastError == null
|| !Objects.equals(lastError.getClass(), t.getClass())
|| !Objects.equals(lastError.getMessage(), t.getMessage())) {
log.error("Failed to discover {} in {}{}",
klass.getSimpleName(), source.location(), reflectiveErrorDescription(t.getCause()), t);
}
lastError = t;
}
}
log.error("Received excessive ServiceLoader errors: assuming the runtime ServiceLoader implementation cannot " +
"skip faulty implementations. Use a different JRE, or resolve LinkageErrors for plugins in {}",
source.location(), lastError);
throw lastError;
}

protected static <T> String versionFor(T pluginImpl) {
try {
if (pluginImpl instanceof Versioned) {
Expand All @@ -169,6 +215,8 @@ protected static String reflectiveErrorDescription(Throwable t) {
return ": Failed to statically initialize plugin class";
} else if (t instanceof InvocationTargetException) {
return ": Failed to invoke plugin constructor";
} else if (t instanceof LinkageError) {
return ": Plugin class has a dependency which is missing or invalid";
} else {
return "";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@

import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Objects;

public class PluginSource {

public static final Path CLASSPATH = Paths.get("classpath");
private final Path location;
private final ClassLoader loader;
private final URL[] urls;
Expand All @@ -46,7 +48,7 @@ public URL[] urls() {
}

public boolean isolated() {
return loader instanceof PluginClassLoader;
return location != CLASSPATH;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ public static Set<PluginSource> pluginSources(List<Path> pluginLocations, ClassL
List<URL> parentUrls = new ArrayList<>();
parentUrls.addAll(ClasspathHelper.forJavaClassPath());
parentUrls.addAll(ClasspathHelper.forClassLoader(classLoader.getParent()));
pluginSources.add(new PluginSource(null, classLoader.getParent(), parentUrls.toArray(new URL[0])));
pluginSources.add(new PluginSource(PluginSource.CLASSPATH, classLoader.getParent(), parentUrls.toArray(new URL[0])));
return pluginSources;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
* <p>This implements the legacy discovery strategy, which uses a combination of reflection and service loading in
* order to discover plugins. Specifically, a plugin appears in the scan result if all the following conditions are true:
* <ul>
* <li>The class and direct dependencies can be loaded</li>
* <li>The class is concrete</li>
* <li>The class is public</li>
* <li>The class has a no-args constructor</li>
Expand All @@ -62,7 +63,9 @@
* </li>
* </ul>
* <p>Note: This scanner has a runtime proportional to the number of overall classes in the passed-in
* {@link PluginSource} objects, which may be significant for plugins with large dependencies.
* {@link PluginSource} objects, which may be significant for plugins with large dependencies. For a more performant
* implementation, consider using {@link ServiceLoaderScanner} and follow migration instructions for
* <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-898%3A+Modernize+Connect+plugin+discovery">KIP-898</a>.
*/
public class ReflectionScanner extends PluginScanner {

Expand All @@ -75,66 +78,67 @@ public static <T> String versionFor(Class<? extends T> pluginKlass) throws Refle

@Override
protected PluginScanResult scanPlugins(PluginSource source) {
ClassLoader loader = source.loader();
ConfigurationBuilder builder = new ConfigurationBuilder();
builder.setClassLoaders(new ClassLoader[]{loader});
builder.setClassLoaders(new ClassLoader[]{source.loader()});
builder.addUrls(source.urls());
builder.setScanners(new SubTypesScanner());
builder.useParallelExecutor();
Reflections reflections = new InternalReflections(builder);

return new PluginScanResult(
getPluginDesc(reflections, SinkConnector.class, loader),
getPluginDesc(reflections, SourceConnector.class, loader),
getPluginDesc(reflections, Converter.class, loader),
getPluginDesc(reflections, HeaderConverter.class, loader),
getTransformationPluginDesc(loader, reflections),
getPredicatePluginDesc(loader, reflections),
getServiceLoaderPluginDesc(ConfigProvider.class, loader),
getServiceLoaderPluginDesc(ConnectRestExtension.class, loader),
getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, loader)
getPluginDesc(reflections, SinkConnector.class, source),
getPluginDesc(reflections, SourceConnector.class, source),
getPluginDesc(reflections, Converter.class, source),
getPluginDesc(reflections, HeaderConverter.class, source),
getTransformationPluginDesc(source, reflections),
getPredicatePluginDesc(source, reflections),
getServiceLoaderPluginDesc(ConfigProvider.class, source),
getServiceLoaderPluginDesc(ConnectRestExtension.class, source),
getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, source)
);
}

@SuppressWarnings({"unchecked"})
private SortedSet<PluginDesc<Predicate<?>>> getPredicatePluginDesc(ClassLoader loader, Reflections reflections) {
return (SortedSet<PluginDesc<Predicate<?>>>) (SortedSet<?>) getPluginDesc(reflections, Predicate.class, loader);
private SortedSet<PluginDesc<Predicate<?>>> getPredicatePluginDesc(PluginSource source, Reflections reflections) {
return (SortedSet<PluginDesc<Predicate<?>>>) (SortedSet<?>) getPluginDesc(reflections, Predicate.class, source);
}

@SuppressWarnings({"unchecked"})
private SortedSet<PluginDesc<Transformation<?>>> getTransformationPluginDesc(ClassLoader loader, Reflections reflections) {
return (SortedSet<PluginDesc<Transformation<?>>>) (SortedSet<?>) getPluginDesc(reflections, Transformation.class, loader);
private SortedSet<PluginDesc<Transformation<?>>> getTransformationPluginDesc(PluginSource source, Reflections reflections) {
return (SortedSet<PluginDesc<Transformation<?>>>) (SortedSet<?>) getPluginDesc(reflections, Transformation.class, source);
}

private <T> SortedSet<PluginDesc<T>> getPluginDesc(
Reflections reflections,
Class<T> klass,
ClassLoader loader
PluginSource source
) {
Set<Class<? extends T>> plugins;
try {
plugins = reflections.getSubTypesOf(klass);
} catch (ReflectionsException e) {
log.debug("Reflections scanner could not find any classes for URLs: " +
reflections.getConfiguration().getUrls(), e);
log.debug("Reflections scanner could not find any {} in {} for URLs: {}",
klass, source.location(), source.urls(), e);
return Collections.emptySortedSet();
}

SortedSet<PluginDesc<T>> result = new TreeSet<>();
for (Class<? extends T> pluginKlass : plugins) {
if (!PluginUtils.isConcrete(pluginKlass)) {
log.debug("Skipping {} as it is not concrete implementation", pluginKlass);
log.debug("Skipping {} in {} as it is not concrete implementation", pluginKlass, source.location());
continue;
}
if (pluginKlass.getClassLoader() != loader) {
if (pluginKlass.getClassLoader() != source.loader()) {
log.debug("{} from other classloader {} is visible from {}, excluding to prevent isolated loading",
pluginKlass.getSimpleName(), pluginKlass.getClassLoader(), loader);
pluginKlass.getSimpleName(), pluginKlass.getClassLoader(), source.location());
continue;
}
try (LoaderSwap loaderSwap = withClassLoader(loader)) {
result.add(pluginDesc(pluginKlass, versionFor(pluginKlass), loader));
try (LoaderSwap loaderSwap = withClassLoader(source.loader())) {
result.add(pluginDesc(pluginKlass, versionFor(pluginKlass), source));
} catch (ReflectiveOperationException | LinkageError e) {
log.error("Failed to discover {}: Unable to instantiate {}{}", klass.getSimpleName(), pluginKlass.getSimpleName(), reflectiveErrorDescription(e), e);
log.error("Failed to discover {} in {}: Unable to instantiate {}{}",
klass.getSimpleName(), source.location(), pluginKlass.getSimpleName(),
reflectiveErrorDescription(e), e);
}
}
return result;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime.isolation;

import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;

import java.util.ServiceLoader;
import java.util.SortedSet;

/**
* A {@link PluginScanner} implementation which uses {@link ServiceLoader} to discover plugins.
* <p>This implements the modern discovery strategy, which uses only service loading in order to discover plugins.
* Specifically, a plugin appears in the scan result if all the following conditions are true:
* <ul>
* <li>The class and direct dependencies can be loaded</li>
* <li>The class is concrete</li>
* <li>The class is public</li>
* <li>The class has a no-args constructor</li>
* <li>The no-args constructor is public</li>
* <li>Static initialization of the class completes without throwing an exception</li>
* <li>The no-args constructor completes without throwing an exception</li>
* <li>The class is a subclass of {@link SinkConnector}, {@link SourceConnector}, {@link Converter},
* {@link HeaderConverter}, {@link Transformation}, {@link Predicate}, {@link ConfigProvider},
* {@link ConnectRestExtension}, or {@link ConnectorClientConfigOverridePolicy}
* </li>
* <li>The class has a {@link ServiceLoader} compatible manifest file or module declaration</li>
* </ul>
* <p>Note: This scanner can only find plugins with corresponding {@link ServiceLoader} manifests, which are
* not required to be packaged with plugins. This has the effect that some plugins discoverable by the
* {@link ReflectionScanner} may not be visible with this implementation.
*/
public class ServiceLoaderScanner extends PluginScanner {

@Override
protected PluginScanResult scanPlugins(PluginSource source) {
return new PluginScanResult(
getServiceLoaderPluginDesc(SinkConnector.class, source),
getServiceLoaderPluginDesc(SourceConnector.class, source),
getServiceLoaderPluginDesc(Converter.class, source),
getServiceLoaderPluginDesc(HeaderConverter.class, source),
getTransformationPluginDesc(source),
getPredicatePluginDesc(source),
getServiceLoaderPluginDesc(ConfigProvider.class, source),
getServiceLoaderPluginDesc(ConnectRestExtension.class, source),
getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, source)
);
}

@SuppressWarnings({"unchecked"})
private SortedSet<PluginDesc<Predicate<?>>> getPredicatePluginDesc(PluginSource source) {
return (SortedSet<PluginDesc<Predicate<?>>>) (SortedSet<?>) getServiceLoaderPluginDesc(Predicate.class, source);
}

@SuppressWarnings({"unchecked"})
private SortedSet<PluginDesc<Transformation<?>>> getTransformationPluginDesc(PluginSource source) {
return (SortedSet<PluginDesc<Transformation<?>>>) (SortedSet<?>) getServiceLoaderPluginDesc(Transformation.class, source);
}
}