diff --git a/pom.xml b/pom.xml index 726327b142c..832e199eca6 100644 --- a/pom.xml +++ b/pom.xml @@ -113,7 +113,7 @@ - 2.1.3-SNAPSHOT + 2.3.1-SNAPSHOT 2.1.1 UTF-8 1.8 diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java index a4738b6cce7..58f95e6626e 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java @@ -25,14 +25,33 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.PrintStream; +import java.net.MalformedURLException; +import java.net.URL; +import java.nio.file.FileVisitOption; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; @Slf4j public class FileUtils { + public static List searchJarFiles(@NonNull Path directory) throws IOException { + try (Stream paths = Files.walk(directory, FileVisitOption.FOLLOW_LINKS)) { + return paths.filter(path -> path.toString().endsWith(".jar")) + .map(path -> { + try { + return path.toUri().toURL(); + } catch (MalformedURLException e) { + throw new SeaTunnelException(e); + } + }).collect(Collectors.toList()); + } + } + public static String readFileToStr(Path path) { try { byte[] bytes = Files.readAllBytes(path); @@ -87,7 +106,7 @@ public static void createNewFile(String filePath) { * return the line number of file * * @param filePath The file need be read - * @return + * @return The file line number */ public static Long getFileLineNumber(@NonNull String filePath) { try { @@ -101,7 +120,7 @@ public static Long getFileLineNumber(@NonNull String filePath) { * return the line number of all files in the dirPath * * @param dirPath dirPath - * @return + * @return The file line number of dirPath */ public static Long getFileLineNumberFromDir(@NonNull String dirPath) { File file = new File(dirPath); @@ -135,7 +154,6 @@ public static void createNewDir(@NonNull String dirPath) { * clear dir and the sub dir * * @param filePath filePath - * @return */ public static void deleteFile(@NonNull String filePath) { File file = new File(filePath); diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java index 4a04245cae3..5f9a6ffcdc8 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java +++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.api.common.JobContext; import org.apache.seatunnel.common.config.Common; +import org.apache.seatunnel.common.utils.FileUtils; import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient; import org.apache.seatunnel.engine.common.config.JobConfig; import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException; @@ -34,19 +35,14 @@ import org.apache.commons.lang3.tuple.ImmutablePair; import java.io.IOException; -import java.net.MalformedURLException; import java.net.URL; -import java.nio.file.FileVisitOption; import java.nio.file.Files; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.ExecutionException; -import java.util.stream.Collectors; -import java.util.stream.Stream; public class JobExecutionEnvironment { @@ -88,16 +84,7 @@ public JobExecutionEnvironment(JobConfig jobConfig, String jobFilePath, private Set searchPluginJars() { try { if (Files.exists(Common.pluginRootDir())) { - try (Stream paths = Files.walk(Common.pluginRootDir(), FileVisitOption.FOLLOW_LINKS)) { - return paths.filter(path -> path.toString().endsWith(".jar")) - .map(path -> { - try { - return path.toUri().toURL(); - } catch (MalformedURLException e) { - throw new SeaTunnelEngineException(e); - } - }).collect(Collectors.toSet()); - } + return new HashSet<>(FileUtils.searchJarFiles(Common.pluginRootDir())); } } catch (IOException | SeaTunnelEngineException e) { LOGGER.warning(String.format("Can't search plugin jars in %s.", Common.pluginRootDir()), e); diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java index b5d49e2fcaa..9a861a4e7b2 100644 --- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java +++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java @@ -18,8 +18,16 @@ package org.apache.seatunnel.plugin.discovery; import org.apache.seatunnel.api.common.PluginIdentifierInterface; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.FactoryUtil; +import org.apache.seatunnel.api.table.factory.TableSinkFactory; +import org.apache.seatunnel.api.table.factory.TableSourceFactory; +import org.apache.seatunnel.api.table.factory.TableTransformFactory; import org.apache.seatunnel.apis.base.plugin.Plugin; import org.apache.seatunnel.common.config.Common; +import org.apache.seatunnel.common.constants.CollectionConstants; +import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.common.utils.FileUtils; import org.apache.seatunnel.common.utils.ReflectionUtils; import org.apache.seatunnel.shade.com.typesafe.config.Config; @@ -31,16 +39,20 @@ import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.StringUtils; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.File; import java.io.FileFilter; +import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; import java.net.URLClassLoader; import java.nio.file.Path; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -80,6 +92,10 @@ public AbstractPluginDiscovery(String pluginSubDir) { this(Common.connectorJarDir(pluginSubDir), loadConnectorPluginConfig()); } + public AbstractPluginDiscovery(Path pluginDir) { + this(pluginDir, loadConnectorPluginConfig()); + } + public AbstractPluginDiscovery(Path pluginDir, Config pluginConfig) { this(pluginDir, pluginConfig, DEFAULT_URL_TO_CLASSLOADER); @@ -117,6 +133,28 @@ public List getAllPlugins(List pluginIdentifiers) { .collect(Collectors.toList()); } + /** + * Get all support plugin by plugin type + * + * @param pluginType plugin type, not support transform + * @return the all plugin identifier of the engine with artifactId + */ + public static @Nonnull Map getAllSupportedPlugins(PluginType pluginType) { + Config config = loadConnectorPluginConfig(); + Map pluginIdentifiers = new HashMap<>(); + if (config.isEmpty() || !config.hasPath(CollectionConstants.SEATUNNEL_PLUGIN)) { + return pluginIdentifiers; + } + Config engineConfig = config.getConfig(CollectionConstants.SEATUNNEL_PLUGIN); + if (engineConfig.hasPath(pluginType.getType())) { + engineConfig.getConfig(pluginType.getType()).entrySet().forEach(entry -> { + pluginIdentifiers.put(PluginIdentifier.of(CollectionConstants.SEATUNNEL_PLUGIN, pluginType.getType(), entry.getKey()), + entry.getValue().unwrapped().toString()); + }); + } + return pluginIdentifiers; + } + @Override public T createPluginInstance(PluginIdentifier pluginIdentifier) { return (T) createPluginInstance(pluginIdentifier, Collections.EMPTY_LIST); @@ -160,6 +198,32 @@ public T createPluginInstance(PluginIdentifier pluginIdentifier, Collection throw new RuntimeException("Plugin " + pluginIdentifier + " not found."); } + /** + * Get all support plugin already in SEATUNNEL_HOME, only support connector-v2 + * + * @param pluginType choose which type plugin should be returned + * @return the all plugin identifier of the engine + */ + @SuppressWarnings("unchecked") + public @Nonnull List getAllPlugin(PluginType pluginType) throws IOException { + List files = FileUtils.searchJarFiles(pluginDir); + List plugins = new ArrayList<>(); + List factories; + if (pluginType.equals(PluginType.SOURCE)) { + factories = FactoryUtil.discoverFactories(new URLClassLoader(files.toArray(new URL[0])), TableSourceFactory.class); + } else if (pluginType.equals(PluginType.SINK)) { + factories = FactoryUtil.discoverFactories(new URLClassLoader(files.toArray(new URL[0])), TableSinkFactory.class); + } else if (pluginType.equals(PluginType.TRANSFORM)) { + factories = FactoryUtil.discoverFactories(new URLClassLoader(files.toArray(new URL[0])), TableTransformFactory.class); + } else { + throw new IllegalArgumentException("Unsupported plugin type: " + pluginType); + } + factories.forEach(plugin -> { + plugins.add(PluginIdentifier.of("seatunnel", pluginType.getType(), ((Factory) plugin).factoryIdentifier())); + }); + return plugins; + } + @Nullable private T loadPluginInstance(PluginIdentifier pluginIdentifier, ClassLoader classLoader) { ServiceLoader serviceLoader = ServiceLoader.load(getPluginBaseClass(), classLoader); diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelTransformPluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelTransformPluginDiscovery.java index 395e857ed46..94ede43e107 100644 --- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelTransformPluginDiscovery.java +++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelTransformPluginDiscovery.java @@ -18,12 +18,13 @@ package org.apache.seatunnel.plugin.discovery.seatunnel; import org.apache.seatunnel.api.transform.SeaTunnelTransform; +import org.apache.seatunnel.common.config.Common; import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery; public class SeaTunnelTransformPluginDiscovery extends AbstractPluginDiscovery { public SeaTunnelTransformPluginDiscovery() { - super("seatunnel"); + super(Common.libDir()); } @Override diff --git a/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscoveryTest.java b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscoveryTest.java new file mode 100644 index 00000000000..0b64df270e7 --- /dev/null +++ b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscoveryTest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.plugin.discovery; + +import org.apache.seatunnel.common.config.Common; +import org.apache.seatunnel.common.config.DeployMode; +import org.apache.seatunnel.common.constants.PluginType; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; + +import java.util.Map; +import java.util.Objects; + +@DisabledOnOs(OS.WINDOWS) +public class AbstractPluginDiscoveryTest { + + @Test + public void testGetAllPlugins() { + Common.setDeployMode(DeployMode.CLIENT); + System.setProperty("SEATUNNEL_HOME", Objects.requireNonNull(AbstractPluginDiscoveryTest.class.getResource("/home")).getPath()); + Map sourcePlugins = AbstractPluginDiscovery.getAllSupportedPlugins(PluginType.SOURCE); + Assertions.assertEquals(27, sourcePlugins.size()); + + Map sinkPlugins = AbstractPluginDiscovery.getAllSupportedPlugins(PluginType.SINK); + Assertions.assertEquals(30, sinkPlugins.size()); + + } + +} diff --git a/seatunnel-plugin-discovery/src/test/resources/home/connectors/plugin-mapping.properties b/seatunnel-plugin-discovery/src/test/resources/home/connectors/plugin-mapping.properties new file mode 100644 index 00000000000..b3f9707aaf5 --- /dev/null +++ b/seatunnel-plugin-discovery/src/test/resources/home/connectors/plugin-mapping.properties @@ -0,0 +1,147 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# This mapping is used to resolve the Jar package name without version (or call artifactId) +# corresponding to the module in the user Config, helping SeaTunnel to load the correct Jar package. + +# Flink Source +flink.source.DruidSource = seatunnel-connector-flink-druid +flink.source.FakeSource = seatunnel-connector-flink-fake +flink.source.FakeSourceStream = seatunnel-connector-flink-fake +flink.source.FileSource = seatunnel-connector-flink-file +flink.source.InfluxDbSource = seatunnel-connector-flink-influxdb +flink.source.JdbcSource = seatunnel-connector-flink-jdbc +flink.source.KafkaTableStream = seatunnel-connector-flink-kafka +flink.source.SocketStream = seatunnel-connector-flink-socket +flink.source.Http = seatunnel-connector-flink-http + +# Flink Sink + +flink.sink.Clickhouse = seatunnel-connector-flink-clickhouse +flink.sink.ClickhouseFile = seatunnel-connector-flink-clickhouse +flink.sink.ConsoleSink = seatunnel-connector-flink-console +flink.sink.DorisSink = seatunnel-connector-flink-doris +flink.sink.DruidSink = seatunnel-connector-flink-druid +flink.sink.ElasticSearch = seatunnel-connector-flink-elasticsearch7 +flink.sink.FileSink = seatunnel-connector-flink-file +flink.sink.InfluxDbSink = seatunnel-connector-flink-influxdb +flink.sink.JdbcSink = seatunnel-connector-flink-jdbc +flink.sink.Kafka = seatunnel-connector-flink-kafka +flink.sink.AssertSink = seatunnel-connector-flink-assert + +# Spark Source + +spark.source.ElasticSearch = seatunnel-connector-spark-elasticsearch +spark.source.Fake = seatunnel-connector-spark-fake +spark.source.FakeStream = seatunnel-connector-spark-fake +spark.source.FeishuSheet = seatunnel-connector-spark-feishu +spark.source.File = seatunnel-connector-spark-file +spark.source.Hbase = seatunnel-connector-spark-hbase +spark.source.Hive = seatunnel-connector-spark-hive +spark.source.Http = seatunnel-connector-spark-http +spark.source.Hudi = seatunnel-connector-spark-hudi +spark.source.Iceberg = seatunnel-connector-spark-iceberg +spark.source.Jdbc = seatunnel-connector-spark-jdbc +spark.source.KafkaStream = seatunnel-connector-spark-kafka +spark.source.Kudu = seatunnel-connector-spark-kudu +spark.source.MongoDB = seatunnel-connector-spark-mongodb +spark.source.Neo4j = seatunnel-connector-spark-neo4j +spark.source.Phoenix = seatunnel-connector-spark-phoenix +spark.source.Redis = seatunnel-connector-spark-redis +spark.source.SocketStream = seatunnel-connector-spark-socket +spark.source.TiDB = seatunnel-connector-spark-tidb + +# Spark Sink + +spark.sink.Clickhouse = seatunnel-connector-spark-clickhouse +spark.sink.ClickhouseFile = seatunnel-connector-spark-clickhouse +spark.sink.Console = seatunnel-connector-spark-console +spark.sink.Doris = seatunnel-connector-spark-doris +spark.sink.ElasticSearch = seatunnel-connector-spark-elasticsearch +spark.sink.Email = seatunnel-connector-spark-email +spark.sink.File = seatunnel-connector-spark-file +spark.sink.Hbase = seatunnel-connector-spark-hbase +spark.sink.Hive = seatunnel-connector-spark-hive +spark.sink.Hudi = seatunnel-connector-spark-hudi +spark.sink.Iceberg = seatunnel-connector-spark-iceberg +spark.sink.Jdbc = seatunnel-connector-spark-jdbc +spark.sink.Kafka = seatunnel-connector-spark-kafka +spark.sink.Kudu = seatunnel-connector-spark-kudu +spark.sink.MongoDB = seatunnel-connector-spark-mongodb +spark.sink.Phoenix = seatunnel-connector-spark-phoenix +spark.sink.Redis = seatunnel-connector-spark-redis +spark.sink.TiDB = seatunnel-connector-spark-tidb + +# SeaTunnel new connector API + +seatunnel.source.FakeSource = connector-fake +seatunnel.sink.Console = connector-console +seatunnel.sink.Assert = connector-assert +seatunnel.source.Kafka = connector-kafka +seatunnel.sink.Kafka = connector-kafka +seatunnel.source.Http = connector-http-base +seatunnel.sink.Http = connector-http-base +seatunnel.sink.Feishu = connector-http-feishu +seatunnel.source.Socket = connector-socket +seatunnel.sink.Hive = connector-hive +seatunnel.source.Hive = connector-hive +seatunnel.source.Clickhouse = connector-clickhouse +seatunnel.sink.Clickhouse = connector-clickhouse +seatunnel.sink.ClickhouseFile = connector-clickhouse +seatunnel.source.Jdbc = connector-jdbc +seatunnel.sink.Jdbc = connector-jdbc +seatunnel.source.Kudu = connector-kudu +seatunnel.sink.Kudu = connector-kudu +seatunnel.sink.Email = connector-email +seatunnel.source.HdfsFile = connector-file-hadoop +seatunnel.sink.HdfsFile = connector-file-hadoop +seatunnel.source.LocalFile = connector-file-local +seatunnel.sink.LocalFile = connector-file-local +seatunnel.source.OssFile = connector-file-oss +seatunnel.sink.OssFile = connector-file-oss +seatunnel.source.Pulsar = connector-pulsar +seatunnel.source.Hudi = connector-hudi +seatunnel.sink.DingTalk = connector-dingtalk +seatunnel.source.Elasticsearch = connector-elasticsearch +seatunnel.sink.Elasticsearch = connector-elasticsearch +seatunnel.source.IoTDB = connector-iotdb +seatunnel.sink.IoTDB = connector-iotdb +seatunnel.source.Neo4j = connector-neo4j +seatunnel.sink.Neo4j = connector-neo4j +seatunnel.source.FtpFile = connector-file-ftp +seatunnel.sink.FtpFile = connector-file-ftp +seatunnel.source.SftpFile = connector-file-sftp +seatunnel.sink.SftpFile = connector-file-sftp +seatunnel.sink.Socket = connector-socket +seatunnel.source.Redis = connector-redis +seatunnel.sink.Redis = connector-redis +seatunnel.sink.DataHub = connector-datahub +seatunnel.sink.Sentry = connector-sentry +seatunnel.source.MongoDB = connector-mongodb +seatunnel.sink.MongoDB = connector-mongodb +seatunnel.source.Iceberg = connector-iceberg +seatunnel.source.InfluxDB = connector-influxdb +seatunnel.source.S3File = connector-file-s3 +seatunnel.sink.S3File = connector-file-s3 +seatunnel.source.AmazonDynamodb = connector-amazondynamodb +seatunnel.sink.AmazonDynamodb = connector-amazondynamodb +seatunnel.source.Cassandra = connector-cassandra +seatunnel.sink.Cassandra = connector-cassandra +seatunnel.sink.StarRocks = connector-starrocks +seatunnel.source.MyHours = connector-http-myhours +seatunnel.sink.InfluxDB = connector-influxdb +seatunnel.source.GoogleSheets = connector-google-sheets