Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15150: Add ServiceLoaderScanner implementation #13971

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@
* </li>
* </ul>
* <p>Note: This scanner has a runtime proportional to the number of overall classes in the passed-in
* {@link PluginSource} objects, which may be significant for plugins with large dependencies.
* {@link PluginSource} objects, which may be significant for plugins with large dependencies. For a more performant
* implementation, consider using {@link ServiceLoaderScanner} and follow migration instructions for KIP-898.
gharris1727 marked this conversation as resolved.
Show resolved Hide resolved
*/
public class ReflectionScanner extends PluginScanner {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime.isolation;

import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;

import java.util.ServiceLoader;
import java.util.SortedSet;

/**
* A {@link PluginScanner} implementation which uses {@link ServiceLoader} to discover plugins.
* <p>This implements the modern discovery strategy, which uses only service loading in order to discover plugins.
* Specifically, a plugin appears in the scan result if all the following conditions are true:
* <ul>
* <li>The class is concrete</li>
* <li>The class is public</li>
* <li>The class has a no-args constructor</li>
* <li>The no-args constructor is public</li>
* <li>Static initialization of the class completes without throwing an exception</li>
* <li>The no-args constructor completes without throwing an exception</li>
* <li>The class is a subclass of {@link SinkConnector}, {@link SourceConnector}, {@link Converter},
* {@link HeaderConverter}, {@link Transformation}, {@link Predicate}, {@link ConfigProvider},
* {@link ConnectRestExtension}, or {@link ConnectorClientConfigOverridePolicy}
* </li>
* <li>The class has a {@link ServiceLoader} compatible manifest file or module declaration</li>
* </ul>
* <p>Note: This scanner can only find plugins with corresponding {@link ServiceLoader} manifests, which are
* not required to be packaged with plugins. This has the effect that some plugins discoverable by the
* {@link ReflectionScanner} may not be visible with this implementation.
*/
public class ServiceLoaderScanner extends PluginScanner {

@Override
protected PluginScanResult scanPlugins(PluginSource source) {
ClassLoader loader = source.loader();
return new PluginScanResult(
getServiceLoaderPluginDesc(SinkConnector.class, loader),
getServiceLoaderPluginDesc(SourceConnector.class, loader),
getServiceLoaderPluginDesc(Converter.class, loader),
getServiceLoaderPluginDesc(HeaderConverter.class, loader),
getTransformationPluginDesc(loader),
getPredicatePluginDesc(loader),
getServiceLoaderPluginDesc(ConfigProvider.class, loader),
getServiceLoaderPluginDesc(ConnectRestExtension.class, loader),
getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, loader)
);
}

@SuppressWarnings({"unchecked"})
private SortedSet<PluginDesc<Predicate<?>>> getPredicatePluginDesc(ClassLoader loader) {
return (SortedSet<PluginDesc<Predicate<?>>>) (SortedSet<?>) getServiceLoaderPluginDesc(Predicate.class, loader);
}

@SuppressWarnings({"unchecked"})
private SortedSet<PluginDesc<Transformation<?>>> getTransformationPluginDesc(ClassLoader loader) {
return (SortedSet<PluginDesc<Transformation<?>>>) (SortedSet<?>) getServiceLoaderPluginDesc(Transformation.class, loader);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,79 +20,114 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

@RunWith(Parameterized.class)
public class PluginScannerTest {

private enum ScannerType { Reflection, ServiceLoader };

@Rule
public TemporaryFolder pluginDir = new TemporaryFolder();

public PluginScanner scanner;
gharris1727 marked this conversation as resolved.
Show resolved Hide resolved

@Parameterized.Parameters
public static Collection<Object[]> parameters() {
List<Object[]> values = new ArrayList<>();
for (ScannerType type : ScannerType.values()) {
values.add(new Object[]{type});
}
return values;
}

public PluginScannerTest(ScannerType scannerType) {
switch (scannerType) {
case Reflection:
this.scanner = new ReflectionScanner();
break;
case ServiceLoader:
this.scanner = new ServiceLoaderScanner();
break;
default:
throw new IllegalArgumentException("Unknown type " + scannerType);
}
}

@Test
public void testLoadingUnloadedPluginClass() {
DelegatingClassLoader classLoader = initClassLoader(
public void testScanningEmptyPluginPath() {
PluginScanResult result = scan(
Collections.emptyList()
);
for (String pluginClassName : TestPlugins.pluginClasses()) {
assertThrows(ClassNotFoundException.class, () -> classLoader.loadClass(pluginClassName));
}
assertTrue(result.isEmpty());
}

@Test
public void testLoadingPluginClass() throws ClassNotFoundException {
DelegatingClassLoader classLoader = initClassLoader(
public void testScanningPluginClasses() {
PluginScanResult result = scan(
TestPlugins.pluginPath()
);
Set<String> classes = new HashSet<>();
result.forEach(pluginDesc -> classes.add(pluginDesc.className()));
for (String pluginClassName : TestPlugins.pluginClasses()) {
assertNotNull(classLoader.loadClass(pluginClassName));
assertNotNull(classLoader.pluginClassLoader(pluginClassName));
assertTrue("Expected " + pluginClassName + "to be discovered but it was not",
classes.contains(pluginClassName));
C0urante marked this conversation as resolved.
Show resolved Hide resolved
}
}

@Test
public void testLoadingInvalidUberJar() throws Exception {
public void testScanningInvalidUberJar() throws Exception {
pluginDir.newFile("invalid.jar");

initClassLoader(
PluginScanResult result = scan(
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath())
);
assertTrue(result.isEmpty());
}

@Test
public void testLoadingPluginDirContainsInvalidJarsOnly() throws Exception {
public void testScanningPluginDirContainsInvalidJarsOnly() throws Exception {
pluginDir.newFolder("my-plugin");
pluginDir.newFile("my-plugin/invalid.jar");

initClassLoader(
PluginScanResult result = scan(
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath())
);
assertTrue(result.isEmpty());
}

@Test
public void testLoadingNoPlugins() {
initClassLoader(
public void testScanningNoPlugins() {
PluginScanResult result = scan(
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath())
);
assertTrue(result.isEmpty());
}

@Test
public void testLoadingPluginDirEmpty() throws Exception {
public void testScanningPluginDirEmpty() throws Exception {
pluginDir.newFolder("my-plugin");

initClassLoader(
PluginScanResult result = scan(
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath())
);
assertTrue(result.isEmpty());
}

@Test
public void testLoadingMixOfValidAndInvalidPlugins() throws Exception {
public void testScanningMixOfValidAndInvalidPlugins() throws Exception {
pluginDir.newFile("invalid.jar");
pluginDir.newFolder("my-plugin");
pluginDir.newFile("my-plugin/invalid.jar");
Expand All @@ -102,22 +137,21 @@ public void testLoadingMixOfValidAndInvalidPlugins() throws Exception {
Files.copy(source, pluginPath.resolve(source.getFileName()));
}

DelegatingClassLoader classLoader = initClassLoader(
PluginScanResult result = scan(
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath())
);
Set<String> classes = new HashSet<>();
result.forEach(pluginDesc -> classes.add(pluginDesc.className()));
for (String pluginClassName : TestPlugins.pluginClasses()) {
assertNotNull(classLoader.loadClass(pluginClassName));
assertNotNull(classLoader.pluginClassLoader(pluginClassName));
assertTrue("Expected " + pluginClassName + "to be discovered but it was not",
classes.contains(pluginClassName));
}
}

private DelegatingClassLoader initClassLoader(List<Path> pluginLocations) {
private PluginScanResult scan(List<Path> pluginLocations) {
ClassLoaderFactory factory = new ClassLoaderFactory();
DelegatingClassLoader classLoader = factory.newDelegatingClassLoader(DelegatingClassLoader.class.getClassLoader());
Set<PluginSource> pluginSources = PluginUtils.pluginSources(pluginLocations, classLoader, factory);
PluginScanResult scanResult = new ReflectionScanner().discoverPlugins(pluginSources);
classLoader.installDiscoveredPlugins(scanResult);
return classLoader;
Set<PluginSource> pluginSources = PluginUtils.pluginSources(pluginLocations, PluginScannerTest.class.getClassLoader(), factory);
return scanner.discoverPlugins(pluginSources);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

test.plugins.AliasedStaticField
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

test.plugins.AlwaysThrowException
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

test.plugins.CoLocatedPlugin
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

test.plugins.ThingOne
test.plugins.ThingTwo
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

test.plugins.ReadVersionFromResource
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

test.plugins.ReadVersionFromResource
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

test.plugins.SamplingConfigurable
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

test.plugins.SamplingConnector