Skip to content

Commit d64d452

Browse files
authored
[Improve][e2e] Add driver-jar to lib (#2719)
* [Improve][e2e] add driver-jar to lib * discovery third-party jars * Remove excess code * modify e2e test case * modify e2e test case * fix e2e error,if i run docker in another machine, the case will error * fix some bug * fix some bug * fix some bug * use root to start spark container. * move `addURLToClassLoader` into `registerPlugin` * move `ADD_URL_TO_CLASSLOADER` into `FlinkCommon`
1 parent 46cf839 commit d64d452

File tree

17 files changed

+223
-152
lines changed

17 files changed

+223
-152
lines changed

seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ public static Path pluginLibDir(String pluginName) {
145145
/**
146146
* return plugin's dependent jars, which located in 'plugins/${pluginName}/lib/*'.
147147
*/
148-
public static List<Path> getPluginsJarDependencies(){
148+
public static List<Path> getPluginsJarDependencies() {
149149
Path pluginRootDir = Common.pluginRootDir();
150150
if (!Files.exists(pluginRootDir) || !Files.isDirectory(pluginRootDir)) {
151151
return Collections.emptyList();

seatunnel-connectors-v2/connector-jdbc/pom.xml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
<modelVersion>4.0.0</modelVersion>
2929

3030
<artifactId>connector-jdbc</artifactId>
31-
31+
3232
<properties>
3333
<phoenix.version>5.2.5-HBase-2.x</phoenix.version>
3434
<mysql.version>8.0.16</mysql.version>
@@ -49,6 +49,7 @@
4949
<groupId>org.postgresql</groupId>
5050
<artifactId>postgresql</artifactId>
5151
<version>${postgresql.version}</version>
52+
<scope>provided</scope>
5253
</dependency>
5354
<dependency>
5455
<groupId>com.dameng</groupId>
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.core.starter.flink.config;
19+
20+
import org.apache.seatunnel.common.utils.ReflectionUtils;
21+
22+
import java.net.URL;
23+
import java.net.URLClassLoader;
24+
import java.util.function.BiConsumer;
25+
26+
public class FlinkCommon {
27+
28+
/**
29+
* Add jar url to classloader. The different engine should have different logic to add url into
30+
* their own classloader
31+
*/
32+
public static BiConsumer<ClassLoader, URL> ADD_URL_TO_CLASSLOADER = (classLoader, url) -> {
33+
if (classLoader.getClass().getName().endsWith("SafetyNetWrapperClassLoader")) {
34+
URLClassLoader c = (URLClassLoader) ReflectionUtils.getField(classLoader, "inner").get();
35+
ReflectionUtils.invoke(c, "addURL", url);
36+
} else if (classLoader instanceof URLClassLoader) {
37+
ReflectionUtils.invoke(classLoader, "addURL", url);
38+
} else {
39+
throw new RuntimeException("Unsupported classloader: " + classLoader.getClass().getName());
40+
}
41+
};
42+
}

seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractPluginExecuteProcessor.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import static org.apache.seatunnel.apis.base.plugin.Plugin.SOURCE_TABLE_NAME;
2222

2323
import org.apache.seatunnel.api.common.JobContext;
24-
import org.apache.seatunnel.common.utils.ReflectionUtils;
24+
import org.apache.seatunnel.core.starter.flink.config.FlinkCommon;
2525
import org.apache.seatunnel.flink.FlinkEnvironment;
2626
import org.apache.seatunnel.flink.util.TableUtil;
2727

@@ -33,7 +33,6 @@
3333
import org.apache.flink.types.Row;
3434

3535
import java.net.URL;
36-
import java.net.URLClassLoader;
3736
import java.util.List;
3837
import java.util.Optional;
3938
import java.util.function.BiConsumer;
@@ -47,16 +46,7 @@ public abstract class AbstractPluginExecuteProcessor<T> implements PluginExecute
4746
protected static final String ENGINE_TYPE = "seatunnel";
4847
protected static final String PLUGIN_NAME = "plugin_name";
4948

50-
protected final BiConsumer<ClassLoader, URL> addUrlToClassloader = (classLoader, url) -> {
51-
if (classLoader.getClass().getName().endsWith("SafetyNetWrapperClassLoader")) {
52-
URLClassLoader c = (URLClassLoader) ReflectionUtils.getField(classLoader, "inner").get();
53-
ReflectionUtils.invoke(c, "addURL", url);
54-
} else if (classLoader instanceof URLClassLoader) {
55-
ReflectionUtils.invoke(classLoader, "addURL", url);
56-
} else {
57-
throw new RuntimeException("Unsupported classloader: " + classLoader.getClass().getName());
58-
}
59-
};
49+
protected final BiConsumer<ClassLoader, URL> addUrlToClassloader = FlinkCommon.ADD_URL_TO_CLASSLOADER;
6050

6151
protected AbstractPluginExecuteProcessor(FlinkEnvironment flinkEnvironment,
6252
JobContext jobContext,

seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.seatunnel.common.config.Common;
2323
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
2424
import org.apache.seatunnel.core.starter.execution.TaskExecution;
25+
import org.apache.seatunnel.core.starter.flink.config.FlinkCommon;
2526
import org.apache.seatunnel.core.starter.flink.config.FlinkEnvironmentFactory;
2627
import org.apache.seatunnel.flink.FlinkEnvironment;
2728

@@ -54,10 +55,10 @@ public FlinkExecution(Config config) {
5455
this.flinkEnvironment = new FlinkEnvironmentFactory(config).getEnvironment();
5556
JobContext jobContext = new JobContext();
5657
jobContext.setJobMode(flinkEnvironment.getJobMode());
58+
registerPlugin();
5759
this.sourcePluginExecuteProcessor = new SourceExecuteProcessor(flinkEnvironment, jobContext, config.getConfigList(Constants.SOURCE));
5860
this.transformPluginExecuteProcessor = new TransformExecuteProcessor(flinkEnvironment, jobContext, config.getConfigList(Constants.TRANSFORM));
5961
this.sinkPluginExecuteProcessor = new SinkExecuteProcessor(flinkEnvironment, jobContext, config.getConfigList(Constants.SINK));
60-
registerPlugin();
6162
}
6263

6364
@Override
@@ -75,7 +76,7 @@ public void execute() throws TaskExecuteException {
7576
}
7677
}
7778

78-
private void registerPlugin(){
79+
private void registerPlugin() {
7980
List<URL> pluginsJarDependencies = Common.getPluginsJarDependencies().stream()
8081
.map(Path::toUri)
8182
.map(uri -> {
@@ -86,6 +87,9 @@ private void registerPlugin(){
8687
}
8788
})
8889
.collect(Collectors.toList());
90+
91+
pluginsJarDependencies.forEach(url -> FlinkCommon.ADD_URL_TO_CLASSLOADER.accept(Thread.currentThread().getContextClassLoader(), url));
92+
8993
flinkEnvironment.registerPlugin(pluginsJarDependencies);
9094
}
9195
}

seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractContainer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ public AbstractContainer() {
6464

6565
protected abstract List<String> getExtraStartShellCommands();
6666

67+
protected void executeExtraCommands(GenericContainer<?> container) throws IOException, InterruptedException {
68+
//do nothing
69+
}
70+
6771
protected void copySeaTunnelStarter(GenericContainer<?> container) {
6872
ContainerUtil.copySeaTunnelStarter(container,
6973
this.startModuleName,
@@ -81,6 +85,8 @@ protected Container.ExecResult executeJob(GenericContainer<?> container, String
8185
getConnectorNamePrefix(),
8286
getConnectorType(),
8387
getSeaTunnelHomeInContainer());
88+
// execute extra commands
89+
executeExtraCommands(container);
8490
return executeCommand(container, confInContainerPath);
8591
}
8692

seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractSparkContainer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ public void before() {
6161
.withNetworkAliases("spark-master")
6262
.withExposedPorts()
6363
.withEnv("SPARK_MODE", "master")
64-
.withLogConsumer(new Slf4jLogConsumer(LOG));
64+
.withLogConsumer(new Slf4jLogConsumer(LOG))
65+
.withCreateContainerCmdModifier(cmd -> cmd.withUser("root"));
6566
// In most case we can just use standalone mode to execute a spark job, if we want to use cluster mode, we need to
6667
// start a worker.
6768
Startables.deepStart(Stream.of(master)).join();
@@ -79,7 +80,7 @@ public void close() {
7980
@Override
8081
protected List<String> getExtraStartShellCommands() {
8182
return Arrays.asList("--master local",
82-
"--deploy-mode client");
83+
"--deploy-mode client");
8384
}
8485

8586
public Container.ExecResult executeSeaTunnelSparkJob(String confFile) throws IOException, InterruptedException {

seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
<groupId>org.postgresql</groupId>
7373
<artifactId>postgresql</artifactId>
7474
<version>${postgresql.version}</version>
75+
<scope>provided</scope>
7576
</dependency>
7677
<dependency>
7778
<groupId>com.dameng</groupId>
@@ -80,4 +81,4 @@
8081
</dependency>
8182
</dependencies>
8283

83-
</project>
84+
</project>

seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.slf4j.Logger;
3030
import org.slf4j.LoggerFactory;
3131
import org.testcontainers.containers.Container;
32+
import org.testcontainers.containers.GenericContainer;
3233
import org.testcontainers.containers.PostgreSQLContainer;
3334
import org.testcontainers.containers.output.Slf4jLogConsumer;
3435
import org.testcontainers.lifecycle.Startables;
@@ -47,14 +48,15 @@
4748
public class FakeSourceToJdbcIT extends FlinkContainer {
4849
private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceToJdbcIT.class);
4950
private PostgreSQLContainer<?> psl;
51+
private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar";
5052

5153
@SuppressWarnings("checkstyle:MagicNumber")
5254
@BeforeEach
5355
public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
5456
psl = new PostgreSQLContainer<>(DockerImageName.parse("postgres:alpine3.16"))
55-
.withNetwork(NETWORK)
56-
.withNetworkAliases("postgresql")
57-
.withLogConsumer(new Slf4jLogConsumer(LOGGER));
57+
.withNetwork(NETWORK)
58+
.withNetworkAliases("postgresql")
59+
.withLogConsumer(new Slf4jLogConsumer(LOGGER));
5860
Startables.deepStart(Stream.of(psl)).join();
5961
LOGGER.info("PostgreSql container started");
6062
Class.forName(psl.getDriverClassName());
@@ -70,8 +72,8 @@ private void initializeJdbcTable() {
7072
try (Connection connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword())) {
7173
Statement statement = connection.createStatement();
7274
String sql = "CREATE TABLE test (\n" +
73-
" name varchar(255) NOT NULL\n" +
74-
")";
75+
" name varchar(255) NOT NULL\n" +
76+
")";
7577
statement.execute(sql);
7678
} catch (SQLException e) {
7779
throw new RuntimeException("Initializing PostgreSql table failed!", e);
@@ -101,4 +103,10 @@ public void closePostgreSqlContainer() {
101103
psl.stop();
102104
}
103105
}
106+
107+
@Override
108+
protected void executeExtraCommands(GenericContainer<?> container) throws IOException, InterruptedException {
109+
Container.ExecResult extraCommands = container.execInContainer("bash", "-c", "mkdir -p /tmp/flink/seatunnel/plugins/Jdbc/lib && cd /tmp/flink/seatunnel/plugins/Jdbc/lib && curl -O " + THIRD_PARTY_PLUGINS_URL);
110+
Assertions.assertEquals(0, extraCommands.getExitCode());
111+
}
104112
}

seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcDmdbIT.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,7 @@ public class JdbcDmdbIT extends FlinkContainer {
5151
private static final String DOCKER_IMAGE = "laglangyue/dmdb8";
5252
private static final String DRIVER_CLASS = "dm.jdbc.driver.DmDriver";
5353
private static final String HOST = "flink_e2e_dmdb";
54-
private static final String LOCAL_HOST = "localhost";
55-
private static final String URL = "jdbc:dm://" + LOCAL_HOST + ":5236";
54+
private static final String URL = "jdbc:dm://%s:5236";
5655
private static final String USERNAME = "SYSDBA";
5756
private static final String PASSWORD = "SYSDBA";
5857
private static final String DATABASE = "SYSDBA";
@@ -81,7 +80,8 @@ public void startDmdbContainer() throws ClassNotFoundException, SQLException {
8180
}
8281

8382
private void initializeJdbcConnection() throws SQLException {
84-
jdbcConnection = DriverManager.getConnection(URL, USERNAME, PASSWORD);
83+
jdbcConnection = DriverManager.getConnection(String.format(
84+
URL, dbServer.getHost()), USERNAME, PASSWORD);
8585
}
8686

8787
/**
@@ -110,8 +110,7 @@ private void initializeJdbcTable() {
110110
}
111111

112112
private void assertHasData(String table) {
113-
try (Connection connection = DriverManager.getConnection(URL, USERNAME, PASSWORD)) {
114-
Statement statement = connection.createStatement();
113+
try (Statement statement = jdbcConnection.createStatement();) {
115114
String sql = String.format("select * from %s.%s limit 1", DATABASE, table);
116115
ResultSet source = statement.executeQuery(sql);
117116
Assertions.assertTrue(source.next());

0 commit comments

Comments
 (0)