Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-14654: Connector classes should statically initialize with plugin classloader #13165

Merged
merged 13 commits into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -360,17 +360,22 @@ private PluginScanResult scanPluginPath(
builder.useParallelExecutor();
Reflections reflections = new InternalReflections(builder);

return new PluginScanResult(
getPluginDesc(reflections, SinkConnector.class, loader),
getPluginDesc(reflections, SourceConnector.class, loader),
getPluginDesc(reflections, Converter.class, loader),
getPluginDesc(reflections, HeaderConverter.class, loader),
getTransformationPluginDesc(loader, reflections),
getPredicatePluginDesc(loader, reflections),
getServiceLoaderPluginDesc(ConfigProvider.class, loader),
getServiceLoaderPluginDesc(ConnectRestExtension.class, loader),
getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, loader)
);
ClassLoader savedLoader = Plugins.compareAndSwapLoaders(loader);
gharris1727 marked this conversation as resolved.
Show resolved Hide resolved
try {
return new PluginScanResult(
getPluginDesc(reflections, SinkConnector.class, loader),
getPluginDesc(reflections, SourceConnector.class, loader),
getPluginDesc(reflections, Converter.class, loader),
getPluginDesc(reflections, HeaderConverter.class, loader),
getTransformationPluginDesc(loader, reflections),
getPredicatePluginDesc(loader, reflections),
getServiceLoaderPluginDesc(ConfigProvider.class, loader),
getServiceLoaderPluginDesc(ConnectRestExtension.class, loader),
getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, loader)
);
} finally {
Plugins.compareAndSwapLoaders(savedLoader);
}
}

@SuppressWarnings({"unchecked"})
Expand Down Expand Up @@ -419,23 +424,18 @@ private <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String version,

@SuppressWarnings("unchecked")
private <T> Collection<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, ClassLoader loader) {
ClassLoader savedLoader = Plugins.compareAndSwapLoaders(loader);
Collection<PluginDesc<T>> result = new ArrayList<>();
try {
ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
for (Iterator<T> iterator = serviceLoader.iterator(); iterator.hasNext(); ) {
T pluginImpl;
try {
pluginImpl = iterator.next();
} catch (ServiceConfigurationError t) {
log.error("Failed to discover {}{}", klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t);
continue;
}
result.add(pluginDesc((Class<? extends T>) pluginImpl.getClass(),
versionFor(pluginImpl), loader));
ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
for (Iterator<T> iterator = serviceLoader.iterator(); iterator.hasNext(); ) {
T pluginImpl;
try {
pluginImpl = iterator.next();
} catch (ServiceConfigurationError t) {
log.error("Failed to discover {}{}", klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t);
continue;
}
} finally {
Plugins.compareAndSwapLoaders(savedLoader);
result.add(pluginDesc((Class<? extends T>) pluginImpl.getClass(),
versionFor(pluginImpl), loader));
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
Expand Down Expand Up @@ -380,6 +381,16 @@ public void newHeaderConverterShouldConfigureWithPluginClassLoader() {
assertPluginClassLoaderAlwaysActive(samples);
}

@Test
public void newConnectorShouldInstantiateWithPluginClassLoader() {
Connector plugin = plugins.newConnector(TestPlugin.SAMPLING_CONNECTOR.className());

assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples");
Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin).flatten();
assertTrue(samples.containsKey("<init>")); // constructor was called
assertPluginClassLoaderAlwaysActive(samples);
}

@Test
public void newPluginsShouldConfigureWithPluginClassLoader() {
List<Configurable> configurables = plugins.newPlugins(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,30 +25,30 @@
/**
* Base class for plugins so we can sample information about their initialization
*/
public abstract class SamplingTestPlugin {
public interface SamplingTestPlugin {

/**
* @return the ClassLoader used to statically initialize this plugin class
*/
public abstract ClassLoader staticClassloader();
ClassLoader staticClassloader();

/**
* @return the ClassLoader used to initialize this plugin instance
*/
public abstract ClassLoader classloader();
ClassLoader classloader();

/**
* @return a group of other SamplingTestPlugin instances known by this plugin
* This should only return direct children, and not reference this instance directly
*/
public Map<String, SamplingTestPlugin> otherSamples() {
default Map<String, SamplingTestPlugin> otherSamples() {
return Collections.emptyMap();
}

/**
* @return a flattened list of child samples including this entry keyed as "this"
*/
public Map<String, SamplingTestPlugin> flatten() {
default Map<String, SamplingTestPlugin> flatten() {
Map<String, SamplingTestPlugin> out = new HashMap<>();
Map<String, SamplingTestPlugin> otherSamples = otherSamples();
if (otherSamples != null) {
Expand All @@ -71,7 +71,7 @@ public Map<String, SamplingTestPlugin> flatten() {
* Stores only the last invocation of each method if there are multiple invocations.
* @param samples The collection of samples to which this method call should be added
*/
public void logMethodCall(Map<String, SamplingTestPlugin> samples) {
default void logMethodCall(Map<String, SamplingTestPlugin> samples) {
StackTraceElement[] stackTraces = Thread.currentThread().getStackTrace();
if (stackTraces.length < 2) {
return;
Expand All @@ -88,7 +88,7 @@ public void logMethodCall(Map<String, SamplingTestPlugin> samples) {
));
}

public static class MethodCallSample extends SamplingTestPlugin {
class MethodCallSample implements SamplingTestPlugin {

private final StackTraceElement caller;
private final ClassLoader staticClassLoader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ public enum TestPlugin {
* which samples information about its method calls.
*/
SAMPLING_CONFIG_PROVIDER("sampling-config-provider", "test.plugins.SamplingConfigProvider"),
/**
* A {@link org.apache.kafka.connect.sink.SinkConnector}
* which samples information about its method calls.
*/
SAMPLING_CONNECTOR("sampling-connector", "test.plugins.SamplingConnector"),
/**
* A plugin which uses a {@link java.util.ServiceLoader}
* to load internal classes, and samples information about their initialization.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* <p>Samples data about its initialization environment for later analysis.
* Samples are shared between instances of the same class in a static variable.
*/
public class AliasedStaticField extends SamplingTestPlugin implements Converter {
public class AliasedStaticField implements SamplingTestPlugin, Converter {

private static final Map<String, SamplingTestPlugin> SAMPLES;
private static final ClassLoader STATIC_CLASS_LOADER;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin;
import org.apache.kafka.connect.storage.Converter;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin;
import org.apache.kafka.connect.storage.Converter;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin;
import org.apache.kafka.connect.runtime.isolation.TestPlugins;
import org.apache.kafka.connect.storage.Converter;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
* See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
* <p>Samples data about its initialization environment for later analysis.
*/
public class SamplingConfigProvider extends SamplingTestPlugin implements ConfigProvider {
public class SamplingConfigProvider implements SamplingTestPlugin, ConfigProvider {

private static final ClassLoader STATIC_CLASS_LOADER;
private final ClassLoader classloader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
* <p>Samples data about its initialization environment for later analysis.
*/
public class SamplingConfigurable extends SamplingTestPlugin implements Converter, Configurable {
public class SamplingConfigurable implements SamplingTestPlugin, Converter, Configurable {

private static final ClassLoader STATIC_CLASS_LOADER;
private final ClassLoader classloader;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package test.plugins;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.HashMap;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin;

/**
* Fake plugin class for testing classloading isolation.
* See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
* <p>Samples data about its initialization environment for later analysis.
*/
public class SamplingConnector extends SinkConnector implements SamplingTestPlugin {

private static final ClassLoader STATIC_CLASS_LOADER;
private final ClassLoader classloader;
private Map<String, SamplingTestPlugin> samples;

static {
STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader();
}

{
samples = new HashMap<>();
classloader = Thread.currentThread().getContextClassLoader();
}

public SamplingConnector() {
logMethodCall(samples);
}

@Override
public void start(Map<String, String> props) {
logMethodCall(samples);
}

@Override
public Class<? extends Task> taskClass() {
logMethodCall(samples);
return null;
}

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
logMethodCall(samples);
return null;
}

@Override
public void stop() {
logMethodCall(samples);
}

@Override
public ConfigDef config() {
logMethodCall(samples);
return null;
}

@Override
public String version() {
logMethodCall(samples);
return "1.0.0";
}

@Override
public ClassLoader staticClassloader() {
return STATIC_CLASS_LOADER;
}

@Override
public ClassLoader classloader() {
return classloader;
}

@Override
public Map<String, SamplingTestPlugin> otherSamples() {
return samples;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
* <p>Samples data about its initialization environment for later analysis.
*/
public class SamplingConverter extends SamplingTestPlugin implements Converter {
public class SamplingConverter implements SamplingTestPlugin, Converter {

private static final ClassLoader STATIC_CLASS_LOADER;
private final ClassLoader classloader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
* <p>Samples data about its initialization environment for later analysis.
*/
public class SamplingHeaderConverter extends SamplingTestPlugin implements HeaderConverter {
public class SamplingHeaderConverter implements SamplingTestPlugin, HeaderConverter {

private static final ClassLoader STATIC_CLASS_LOADER;
private final ClassLoader classloader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
/**
* Superclass for service loaded classes
*/
public class ServiceLoadedClass extends SamplingTestPlugin {
public class ServiceLoadedClass implements SamplingTestPlugin {

private static final ClassLoader STATIC_CLASS_LOADER;
private final ClassLoader classloader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
* <p>Samples data about its initialization environment for later analysis.
*/
public class ServiceLoaderPlugin extends SamplingTestPlugin implements Converter {
public class ServiceLoaderPlugin implements SamplingTestPlugin, Converter {

private static final ClassLoader STATIC_CLASS_LOADER;
private static final Map<String, SamplingTestPlugin> SAMPLES;
Expand Down