Skip to content

Commit

Permalink
[Improve][Core][Starter] Add PluginDiscovery Method to Support Search…
Browse files Browse the repository at this point in the history
… Plugin (#3488)

* [Core] [PluginDiscovery] Add Connector search method

* [PluginDiscovery] [Connector] Add PluginDiscovery Method to Support Web

* [Core] [Plugin] Fix Ci problem

* [Core] [Plugin] Fix Ci problem

* [Core] [Plugin] Change Search Plugin Method

* [Plugin] [Core] Remove useless engine type
  • Loading branch information
Hisoka-X committed Nov 24, 2022
1 parent 5af632e commit 4e60418
Show file tree
Hide file tree
Showing 7 changed files with 284 additions and 20 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@

<properties>
<!--todo The classification is too confusing, reclassify by type-->
<revision>2.1.3-SNAPSHOT</revision>
<revision>2.3.1-SNAPSHOT</revision>
<seatunnel.config.shade.version>2.1.1</seatunnel.config.shade.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,33 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Slf4j
public class FileUtils {

public static List<URL> searchJarFiles(@NonNull Path directory) throws IOException {
try (Stream<Path> paths = Files.walk(directory, FileVisitOption.FOLLOW_LINKS)) {
return paths.filter(path -> path.toString().endsWith(".jar"))
.map(path -> {
try {
return path.toUri().toURL();
} catch (MalformedURLException e) {
throw new SeaTunnelException(e);
}
}).collect(Collectors.toList());
}
}

public static String readFileToStr(Path path) {
try {
byte[] bytes = Files.readAllBytes(path);
Expand Down Expand Up @@ -87,7 +106,7 @@ public static void createNewFile(String filePath) {
* return the line number of file
*
* @param filePath The file need be read
* @return
* @return The file line number
*/
public static Long getFileLineNumber(@NonNull String filePath) {
try {
Expand All @@ -101,7 +120,7 @@ public static Long getFileLineNumber(@NonNull String filePath) {
* return the line number of all files in the dirPath
*
* @param dirPath dirPath
* @return
* @return The file line number of dirPath
*/
public static Long getFileLineNumberFromDir(@NonNull String dirPath) {
File file = new File(dirPath);
Expand Down Expand Up @@ -135,7 +154,6 @@ public static void createNewDir(@NonNull String dirPath) {
* clear dir and the sub dir
*
* @param filePath filePath
* @return
*/
public static void deleteFile(@NonNull String filePath) {
File file = new File(filePath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
Expand All @@ -34,19 +35,14 @@
import org.apache.commons.lang3.tuple.ImmutablePair;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class JobExecutionEnvironment {

Expand Down Expand Up @@ -88,16 +84,7 @@ public JobExecutionEnvironment(JobConfig jobConfig, String jobFilePath,
private Set<URL> searchPluginJars() {
try {
if (Files.exists(Common.pluginRootDir())) {
try (Stream<Path> paths = Files.walk(Common.pluginRootDir(), FileVisitOption.FOLLOW_LINKS)) {
return paths.filter(path -> path.toString().endsWith(".jar"))
.map(path -> {
try {
return path.toUri().toURL();
} catch (MalformedURLException e) {
throw new SeaTunnelEngineException(e);
}
}).collect(Collectors.toSet());
}
return new HashSet<>(FileUtils.searchJarFiles(Common.pluginRootDir()));
}
} catch (IOException | SeaTunnelEngineException e) {
LOGGER.warning(String.format("Can't search plugin jars in %s.", Common.pluginRootDir()), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,16 @@
package org.apache.seatunnel.plugin.discovery;

import org.apache.seatunnel.api.common.PluginIdentifierInterface;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableTransformFactory;
import org.apache.seatunnel.apis.base.plugin.Plugin;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.constants.CollectionConstants;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.common.utils.ReflectionUtils;

import org.apache.seatunnel.shade.com.typesafe.config.Config;
Expand All @@ -31,16 +39,20 @@
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -80,6 +92,10 @@ public AbstractPluginDiscovery(String pluginSubDir) {
this(Common.connectorJarDir(pluginSubDir), loadConnectorPluginConfig());
}

public AbstractPluginDiscovery(Path pluginDir) {
this(pluginDir, loadConnectorPluginConfig());
}

public AbstractPluginDiscovery(Path pluginDir,
Config pluginConfig) {
this(pluginDir, pluginConfig, DEFAULT_URL_TO_CLASSLOADER);
Expand Down Expand Up @@ -117,6 +133,28 @@ public List<T> getAllPlugins(List<PluginIdentifier> pluginIdentifiers) {
.collect(Collectors.toList());
}

/**
* Get all support plugin by plugin type
*
* @param pluginType plugin type, not support transform
* @return the all plugin identifier of the engine with artifactId
*/
public static @Nonnull Map<PluginIdentifier, String> getAllSupportedPlugins(PluginType pluginType) {
Config config = loadConnectorPluginConfig();
Map<PluginIdentifier, String> pluginIdentifiers = new HashMap<>();
if (config.isEmpty() || !config.hasPath(CollectionConstants.SEATUNNEL_PLUGIN)) {
return pluginIdentifiers;
}
Config engineConfig = config.getConfig(CollectionConstants.SEATUNNEL_PLUGIN);
if (engineConfig.hasPath(pluginType.getType())) {
engineConfig.getConfig(pluginType.getType()).entrySet().forEach(entry -> {
pluginIdentifiers.put(PluginIdentifier.of(CollectionConstants.SEATUNNEL_PLUGIN, pluginType.getType(), entry.getKey()),
entry.getValue().unwrapped().toString());
});
}
return pluginIdentifiers;
}

@Override
public T createPluginInstance(PluginIdentifier pluginIdentifier) {
return (T) createPluginInstance(pluginIdentifier, Collections.EMPTY_LIST);
Expand Down Expand Up @@ -160,6 +198,32 @@ public T createPluginInstance(PluginIdentifier pluginIdentifier, Collection<URL>
throw new RuntimeException("Plugin " + pluginIdentifier + " not found.");
}

/**
* Get all support plugin already in SEATUNNEL_HOME, only support connector-v2
*
* @param pluginType choose which type plugin should be returned
* @return the all plugin identifier of the engine
*/
@SuppressWarnings("unchecked")
public @Nonnull List<PluginIdentifier> getAllPlugin(PluginType pluginType) throws IOException {
List<URL> files = FileUtils.searchJarFiles(pluginDir);
List<PluginIdentifier> plugins = new ArrayList<>();
List factories;
if (pluginType.equals(PluginType.SOURCE)) {
factories = FactoryUtil.discoverFactories(new URLClassLoader(files.toArray(new URL[0])), TableSourceFactory.class);
} else if (pluginType.equals(PluginType.SINK)) {
factories = FactoryUtil.discoverFactories(new URLClassLoader(files.toArray(new URL[0])), TableSinkFactory.class);
} else if (pluginType.equals(PluginType.TRANSFORM)) {
factories = FactoryUtil.discoverFactories(new URLClassLoader(files.toArray(new URL[0])), TableTransformFactory.class);
} else {
throw new IllegalArgumentException("Unsupported plugin type: " + pluginType);
}
factories.forEach(plugin -> {
plugins.add(PluginIdentifier.of("seatunnel", pluginType.getType(), ((Factory) plugin).factoryIdentifier()));
});
return plugins;
}

@Nullable
private T loadPluginInstance(PluginIdentifier pluginIdentifier, ClassLoader classLoader) {
ServiceLoader<T> serviceLoader = ServiceLoader.load(getPluginBaseClass(), classLoader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
package org.apache.seatunnel.plugin.discovery.seatunnel;

import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;

public class SeaTunnelTransformPluginDiscovery extends AbstractPluginDiscovery<SeaTunnelTransform> {

public SeaTunnelTransformPluginDiscovery() {
super("seatunnel");
super(Common.libDir());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.plugin.discovery;

import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.common.constants.PluginType;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;

import java.util.Map;
import java.util.Objects;

@DisabledOnOs(OS.WINDOWS)
public class AbstractPluginDiscoveryTest {

@Test
public void testGetAllPlugins() {
Common.setDeployMode(DeployMode.CLIENT);
System.setProperty("SEATUNNEL_HOME", Objects.requireNonNull(AbstractPluginDiscoveryTest.class.getResource("/home")).getPath());
Map<PluginIdentifier, String> sourcePlugins = AbstractPluginDiscovery.getAllSupportedPlugins(PluginType.SOURCE);
Assertions.assertEquals(27, sourcePlugins.size());

Map<PluginIdentifier, String> sinkPlugins = AbstractPluginDiscovery.getAllSupportedPlugins(PluginType.SINK);
Assertions.assertEquals(30, sinkPlugins.size());

}

}
Loading

0 comments on commit 4e60418

Please sign in to comment.