Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-14654: Connector classes should statically initialize with plugin classloader #13165

Merged
merged 13 commits into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ private <T> Collection<PluginDesc<T>> getPluginDesc(
Collection<PluginDesc<T>> result = new ArrayList<>();
for (Class<? extends T> plugin : plugins) {
if (PluginUtils.isConcrete(plugin)) {
try {
try (LoaderSwap loaderSwap = withClassLoader(loader)) {
result.add(pluginDesc(plugin, versionFor(plugin), loader));
} catch (ReflectiveOperationException | LinkageError e) {
log.error("Failed to discover {}: Unable to instantiate {}{}", klass.getSimpleName(), plugin.getSimpleName(), reflectiveErrorDescription(e), e);
Expand All @@ -419,11 +419,10 @@ private <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String version,

@SuppressWarnings("unchecked")
private <T> Collection<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, ClassLoader loader) {
ClassLoader savedLoader = Plugins.compareAndSwapLoaders(loader);
Collection<PluginDesc<T>> result = new ArrayList<>();
try {
ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
for (Iterator<T> iterator = serviceLoader.iterator(); iterator.hasNext(); ) {
ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
for (Iterator<T> iterator = serviceLoader.iterator(); iterator.hasNext(); ) {
try (LoaderSwap loaderSwap = withClassLoader(loader)) {
T pluginImpl;
try {
pluginImpl = iterator.next();
Expand All @@ -434,8 +433,6 @@ private <T> Collection<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass,
result.add(pluginDesc((Class<? extends T>) pluginImpl.getClass(),
versionFor(pluginImpl), loader));
}
} finally {
Plugins.compareAndSwapLoaders(savedLoader);
}
return result;
}
Expand Down Expand Up @@ -473,6 +470,16 @@ private static String reflectiveErrorDescription(Throwable t) {
}
}

public LoaderSwap withClassLoader(ClassLoader loader) {
ClassLoader savedLoader = Plugins.compareAndSwapLoaders(loader);
try {
return new LoaderSwap(savedLoader);
} catch (Throwable t) {
Plugins.compareAndSwapLoaders(savedLoader);
throw t;
}
}

@Override
protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
String fullName = aliases.getOrDefault(name, name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
Expand Down Expand Up @@ -295,7 +296,7 @@ public void newPluginShouldServiceLoadWithPluginClassLoader() {
// Assert that the service loaded subclass is found in both environments
assertTrue(samples.containsKey("ServiceLoadedSubclass.static"));
assertTrue(samples.containsKey("ServiceLoadedSubclass.dynamic"));
assertPluginClassLoaderAlwaysActive(samples);
assertPluginClassLoaderAlwaysActive(plugin);
}

@Test
Expand All @@ -306,9 +307,7 @@ public void newPluginShouldInstantiateWithPluginClassLoader() {
Converter.class
);

assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples");
Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin).flatten();
assertPluginClassLoaderAlwaysActive(samples);
assertPluginClassLoaderAlwaysActive(plugin);
}

@Test
Expand All @@ -334,7 +333,7 @@ public void newConverterShouldConfigureWithPluginClassLoader() {
assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples");
Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin).flatten();
assertTrue(samples.containsKey("configure"));
assertPluginClassLoaderAlwaysActive(samples);
assertPluginClassLoaderAlwaysActive(plugin);
}

@Test
Expand All @@ -357,7 +356,7 @@ public void newConfigProviderShouldConfigureWithPluginClassLoader() {
assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples");
Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin).flatten();
assertTrue(samples.containsKey("configure"));
assertPluginClassLoaderAlwaysActive(samples);
assertPluginClassLoaderAlwaysActive(plugin);
}

@Test
Expand All @@ -377,7 +376,17 @@ public void newHeaderConverterShouldConfigureWithPluginClassLoader() {
assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples");
Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin).flatten();
assertTrue(samples.containsKey("configure")); // HeaderConverter::configure was called
assertPluginClassLoaderAlwaysActive(samples);
assertPluginClassLoaderAlwaysActive(plugin);
}

@Test
public void newConnectorShouldInstantiateWithPluginClassLoader() {
Connector plugin = plugins.newConnector(TestPlugin.SAMPLING_CONNECTOR.className());

assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples");
Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin).flatten();
assertTrue(samples.containsKey("<init>")); // constructor was called
assertPluginClassLoaderAlwaysActive(plugin);
}

@Test
Expand All @@ -393,7 +402,7 @@ public void newPluginsShouldConfigureWithPluginClassLoader() {
assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples");
Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin).flatten();
assertTrue(samples.containsKey("configure")); // Configurable::configure was called
assertPluginClassLoaderAlwaysActive(samples);
assertPluginClassLoaderAlwaysActive(plugin);
}

@Test
Expand Down Expand Up @@ -460,19 +469,23 @@ private void assertClassLoaderReadsVersionFromResource(
converter.toConnectData(null, null).value());
}

public static void assertPluginClassLoaderAlwaysActive(Map<String, SamplingTestPlugin> samples) {
for (Entry<String, SamplingTestPlugin> e : samples.entrySet()) {
String sampleName = "\"" + e.getKey() + "\" (" + e.getValue() + ")";
assertInstanceOf(
PluginClassLoader.class,
e.getValue().staticClassloader(),
sampleName + " has incorrect static classloader"
);
assertInstanceOf(
PluginClassLoader.class,
e.getValue().classloader(),
sampleName + " has incorrect dynamic classloader"
);
public static void assertPluginClassLoaderAlwaysActive(Object plugin) {
assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples");
for (SamplingTestPlugin instance : ((SamplingTestPlugin) plugin).allInstances()) {
Map<String, SamplingTestPlugin> samples = instance.flatten();
for (Entry<String, SamplingTestPlugin> e : samples.entrySet()) {
String sampleName = "\"" + e.getKey() + "\" (" + e.getValue() + ")";
assertInstanceOf(
PluginClassLoader.class,
e.getValue().staticClassloader(),
sampleName + " has incorrect static classloader"
);
assertInstanceOf(
PluginClassLoader.class,
e.getValue().classloader(),
sampleName + " has incorrect dynamic classloader"
);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,36 +19,44 @@

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

/**
* Base class for plugins so we can sample information about their initialization
*/
public abstract class SamplingTestPlugin {
public interface SamplingTestPlugin {

/**
* @return the ClassLoader used to statically initialize this plugin class
*/
public abstract ClassLoader staticClassloader();
ClassLoader staticClassloader();

/**
* @return the ClassLoader used to initialize this plugin instance
*/
public abstract ClassLoader classloader();
ClassLoader classloader();

/**
* @return All known instances of this class, including this instance.
*/
default List<SamplingTestPlugin> allInstances() {
return Collections.singletonList(this);
}

/**
* @return a group of other SamplingTestPlugin instances known by this plugin
* This should only return direct children, and not reference this instance directly
*/
public Map<String, SamplingTestPlugin> otherSamples() {
default Map<String, SamplingTestPlugin> otherSamples() {
return Collections.emptyMap();
}

/**
* @return a flattened list of child samples including this entry keyed as "this"
*/
public Map<String, SamplingTestPlugin> flatten() {
default Map<String, SamplingTestPlugin> flatten() {
Map<String, SamplingTestPlugin> out = new HashMap<>();
Map<String, SamplingTestPlugin> otherSamples = otherSamples();
if (otherSamples != null) {
Expand All @@ -71,7 +79,7 @@ public Map<String, SamplingTestPlugin> flatten() {
* Stores only the last invocation of each method if there are multiple invocations.
* @param samples The collection of samples to which this method call should be added
*/
public void logMethodCall(Map<String, SamplingTestPlugin> samples) {
default void logMethodCall(Map<String, SamplingTestPlugin> samples) {
StackTraceElement[] stackTraces = Thread.currentThread().getStackTrace();
if (stackTraces.length < 2) {
return;
Expand All @@ -88,7 +96,7 @@ public void logMethodCall(Map<String, SamplingTestPlugin> samples) {
));
}

public static class MethodCallSample extends SamplingTestPlugin {
class MethodCallSample implements SamplingTestPlugin {

private final StackTraceElement caller;
private final ClassLoader staticClassLoader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ public enum TestPlugin {
* which samples information about its method calls.
*/
SAMPLING_CONFIG_PROVIDER("sampling-config-provider", "test.plugins.SamplingConfigProvider"),
/**
* A {@link org.apache.kafka.connect.sink.SinkConnector}
* which samples information about its method calls.
*/
SAMPLING_CONNECTOR("sampling-connector", "test.plugins.SamplingConnector"),
/**
* A plugin which uses a {@link java.util.ServiceLoader}
* to load internal classes, and samples information about their initialization.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* <p>Samples data about its initialization environment for later analysis.
* Samples are shared between instances of the same class in a static variable.
*/
public class AliasedStaticField extends SamplingTestPlugin implements Converter {
public class AliasedStaticField implements SamplingTestPlugin, Converter {

private static final Map<String, SamplingTestPlugin> SAMPLES;
private static final ClassLoader STATIC_CLASS_LOADER;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin;
import org.apache.kafka.connect.storage.Converter;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin;
import org.apache.kafka.connect.storage.Converter;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin;
import org.apache.kafka.connect.runtime.isolation.TestPlugins;
import org.apache.kafka.connect.storage.Converter;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@

package test.plugins;

import java.util.Collections;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.Map;
import java.util.HashMap;

import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.config.ConfigData;
import org.apache.kafka.common.config.ConfigChangeCallback;
Expand All @@ -34,14 +38,16 @@
* See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
* <p>Samples data about its initialization environment for later analysis.
*/
public class SamplingConfigProvider extends SamplingTestPlugin implements ConfigProvider {
public final class SamplingConfigProvider implements SamplingTestPlugin, ConfigProvider {

private static final ClassLoader STATIC_CLASS_LOADER;
private static List<SamplingTestPlugin> instances;
private final ClassLoader classloader;
private Map<String, SamplingTestPlugin> samples;

static {
STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader();
instances = Collections.synchronizedList(new ArrayList<>());
}

{
Expand All @@ -61,6 +67,11 @@ public ConfigData get(String path, Set<String> keys) {
return null;
}

public SamplingConfigProvider() {
logMethodCall(samples);
instances.add(this);
}

@Override
public void subscribe(String path, Set<String> keys, ConfigChangeCallback callback) {
logMethodCall(samples);
Expand Down Expand Up @@ -100,4 +111,10 @@ public ClassLoader classloader() {
public Map<String, SamplingTestPlugin> otherSamples() {
return samples;
}


@Override
public List<SamplingTestPlugin> allInstances() {
return instances;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@

package test.plugins;

import java.util.Collections;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.HashMap;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
Expand All @@ -30,21 +34,28 @@
* See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
* <p>Samples data about its initialization environment for later analysis.
*/
public class SamplingConfigurable extends SamplingTestPlugin implements Converter, Configurable {
public final class SamplingConfigurable implements SamplingTestPlugin, Converter, Configurable {

private static final ClassLoader STATIC_CLASS_LOADER;
private static List<SamplingTestPlugin> instances;
private final ClassLoader classloader;
private Map<String, SamplingTestPlugin> samples;

static {
STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader();
instances = Collections.synchronizedList(new ArrayList<>());
}

{
samples = new HashMap<>();
classloader = Thread.currentThread().getContextClassLoader();
}

public SamplingConfigurable() {
logMethodCall(samples);
instances.add(this);
}

@Override
public void configure(final Map<String, ?> configs) {
logMethodCall(samples);
Expand Down Expand Up @@ -78,4 +89,9 @@ public ClassLoader classloader() {
public Map<String, SamplingTestPlugin> otherSamples() {
return samples;
}

@Override
public List<SamplingTestPlugin> allInstances() {
return instances;
}
}