Skip to content

Commit 7393c47

Browse files
authored
[Improve] [Engine] Improve Engine performance. (#3216)
* [Engine] [Test] improve engine performance * [Engine] [Improve] engine performance improve * [Core] [Improve] Add test timeout * [Bug] [FakeSource] Add log for FakeSourceSplitEnumerator * [Bug] [Engine] Fix TaskLocation Serializable problem
1 parent ac47457 commit 7393c47

File tree

19 files changed

+81
-34
lines changed

19 files changed

+81
-34
lines changed

seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ private void assignPendingSplits() {
139139
// Mark pending splits as already assigned
140140
assignedSplits.addAll(pendingAssignmentForReader);
141141
// Assign pending splits to reader
142-
LOG.info("Assigning splits to readers {}", pendingAssignmentForReader);
142+
LOG.info("Assigning splits to readers {} {}", pendingReader, pendingAssignmentForReader);
143143
enumeratorContext.assignSplit(pendingReader, new ArrayList<>(pendingAssignmentForReader));
144144
enumeratorContext.signalNoMoreSplits(pendingReader);
145145
}

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
2323
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
2424
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
25+
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
2526
import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplit;
2627

2728
import org.slf4j.Logger;
@@ -47,7 +48,7 @@
4748

4849
public class JdbcInputFormat implements Serializable {
4950

50-
protected static final long serialVersionUID = 2L;
51+
private static final long serialVersionUID = 2L;
5152
protected static final Logger LOG = LoggerFactory.getLogger(JdbcInputFormat.class);
5253

5354
protected JdbcConnectionProvider connectionProvider;
@@ -63,19 +64,22 @@ public class JdbcInputFormat implements Serializable {
6364

6465
protected boolean hasNext;
6566

67+
protected JdbcDialect jdbcDialect;
68+
6669
public JdbcInputFormat(JdbcConnectionProvider connectionProvider,
67-
JdbcRowConverter jdbcRowConverter,
70+
JdbcDialect jdbcDialect,
6871
SeaTunnelRowType typeInfo,
6972
String queryTemplate,
7073
int fetchSize,
7174
Boolean autoCommit
7275
) {
7376
this.connectionProvider = connectionProvider;
74-
this.jdbcRowConverter = jdbcRowConverter;
77+
this.jdbcRowConverter = jdbcDialect.getRowConverter();
7578
this.typeInfo = typeInfo;
7679
this.queryTemplate = queryTemplate;
7780
this.fetchSize = fetchSize;
7881
this.autoCommit = autoCommit;
82+
this.jdbcDialect = jdbcDialect;
7983
}
8084

8185
public void openInputFormat() {
@@ -89,10 +93,7 @@ public void openInputFormat() {
8993
dbConn.setAutoCommit(autoCommit);
9094
}
9195

92-
statement = dbConn.prepareStatement(queryTemplate);
93-
if (fetchSize == Integer.MIN_VALUE || fetchSize > 0) {
94-
statement.setFetchSize(fetchSize);
95-
}
96+
statement = jdbcDialect.creatPreparedStatement(dbConn, queryTemplate, fetchSize);
9697
} catch (SQLException se) {
9798
throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
9899
} catch (ClassNotFoundException cnfe) {

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,7 @@ public SeaTunnelRow toInternal(ResultSet rs, ResultSetMetaData metaData, SeaTunn
5353
for (int i = 1; i <= seaTunnelDataTypes.length; i++) {
5454
Object seatunnelField;
5555
SeaTunnelDataType<?> seaTunnelDataType = seaTunnelDataTypes[i - 1];
56-
if (null == rs.getObject(i)) {
57-
seatunnelField = null;
58-
}
59-
else if (BasicType.BOOLEAN_TYPE.equals(seaTunnelDataType)) {
56+
if (BasicType.BOOLEAN_TYPE.equals(seaTunnelDataType)) {
6057
seatunnelField = rs.getBoolean(i);
6158
} else if (BasicType.BYTE_TYPE.equals(seaTunnelDataType)) {
6259
seatunnelField = rs.getByte(i);
@@ -88,7 +85,9 @@ else if (BasicType.BOOLEAN_TYPE.equals(seaTunnelDataType)) {
8885
} else {
8986
throw new IllegalStateException("Unexpected value: " + seaTunnelDataType);
9087
}
91-
88+
if (rs.wasNull()) {
89+
seatunnelField = null;
90+
}
9291
fields.add(seatunnelField);
9392
}
9493

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@
2020
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
2121

2222
import java.io.Serializable;
23+
import java.sql.Connection;
24+
import java.sql.PreparedStatement;
25+
import java.sql.ResultSet;
26+
import java.sql.SQLException;
2327

2428
/**
2529
* Represents a dialect of SQL implemented by a particular JDBC system. Dialects should be immutable
@@ -45,8 +49,22 @@ public interface JdbcDialect extends Serializable {
4549

4650
/**
4751
* get jdbc meta-information type to seatunnel data type mapper.
52+
*
4853
* @return a type mapper for the database
4954
*/
5055
JdbcDialectTypeMapper getJdbcDialectTypeMapper();
5156

57+
/**
58+
* Different dialects optimize their PreparedStatement
59+
*
60+
* @return The logic about optimize PreparedStatement
61+
*/
62+
default PreparedStatement creatPreparedStatement(Connection connection, String queryTemplate, int fetchSize) throws SQLException {
63+
PreparedStatement statement = connection.prepareStatement(queryTemplate, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
64+
if (fetchSize == Integer.MIN_VALUE || fetchSize > 0) {
65+
statement.setFetchSize(fetchSize);
66+
}
67+
return statement;
68+
}
69+
5270
}

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@
2121
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
2222
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
2323

24+
import java.sql.Connection;
25+
import java.sql.PreparedStatement;
26+
import java.sql.ResultSet;
27+
import java.sql.SQLException;
28+
2429
public class MysqlDialect implements JdbcDialect {
2530
@Override
2631
public String dialectName() {
@@ -36,4 +41,11 @@ public JdbcRowConverter getRowConverter() {
3641
public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
3742
return new MySqlTypeMapper();
3843
}
44+
45+
@Override
46+
public PreparedStatement creatPreparedStatement(Connection connection, String queryTemplate, int fetchSize) throws SQLException {
47+
PreparedStatement statement = connection.prepareStatement(queryTemplate, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
48+
statement.setFetchSize(Integer.MIN_VALUE);
49+
return statement;
50+
}
3951
}

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
8686

8787
inputFormat = new JdbcInputFormat(
8888
jdbcConnectionProvider,
89-
jdbcDialect.getRowConverter(),
89+
jdbcDialect,
9090
typeInfo,
9191
query,
9292
0,
@@ -147,7 +147,7 @@ private SeaTunnelRowType initTableField(Connection conn) {
147147
} catch (Exception e) {
148148
LOG.warn("get row type info exception", e);
149149
}
150-
return new SeaTunnelRowType(fieldNames.toArray(new String[fieldNames.size()]), seaTunnelDataTypes.toArray(new SeaTunnelDataType<?>[seaTunnelDataTypes.size()]));
150+
return new SeaTunnelRowType(fieldNames.toArray(new String[0]), seaTunnelDataTypes.toArray(new SeaTunnelDataType<?>[0]));
151151
}
152152

153153
private PartitionParameter initPartitionParameter(String columnName, Connection connection) throws SQLException {

seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ public void testStreamJobRunOkIn3Node() throws ExecutionException, InterruptedEx
215215
return clientJobProxy.waitForJobComplete();
216216
});
217217

218-
Awaitility.await().atMost(60000, TimeUnit.MILLISECONDS)
218+
Awaitility.await().atMost(3, TimeUnit.MINUTES)
219219
.untilAsserted(() -> {
220220
Thread.sleep(2000);
221221
System.out.println(FileUtils.getFileLineNumberFromDir(testResources.getLeft()));

seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/loader/SeatunnelChildFirstClassLoader.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ public class SeatunnelChildFirstClassLoader extends SeatunnelBaseClassLoader {
2929
private final String[] alwaysParentFirstPatterns;
3030
private static final String[] DEFAULT_PARENT_FIRST_PATTERNS = new String[]{
3131
"java.",
32+
"javax.xml",
33+
"org.xml",
34+
"org.w3c",
35+
"org.apache.hadoop",
3236
"scala.",
3337
"org.apache.seatunnel.",
3438
"javax.annotation.",

seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ public ImmutablePair<List<Action>, Set<URL>> parse() {
123123
complexAnalyze(sourceConfigs, transformConfigs, sinkConfigs);
124124
}
125125
actions.forEach(this::addCommonPluginJarsToAction);
126+
jarUrlsSet.addAll(commonPluginJars);
126127
return new ImmutablePair<>(actions, jarUrlsSet);
127128
}
128129

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ public PassiveCompletableFuture<TaskExecutionState> deployTask(
177177
return new PassiveCompletableFuture<>(resultFuture);
178178
}
179179

180+
@Deprecated
180181
public PassiveCompletableFuture<TaskExecutionState> deployLocalTask(
181182
@NonNull TaskGroup taskGroup,
182183
@NonNull CompletableFuture<TaskExecutionState> resultFuture) {
@@ -272,7 +273,8 @@ private BlockingWorker(TaskTracker tracker, CountDownLatch startedLatch) {
272273

273274
@Override
274275
public void run() {
275-
ClassLoader classLoader = executionContexts.get(tracker.taskGroupExecutionTracker.taskGroup.getTaskGroupLocation()).getClassLoader();
276+
TaskExecutionService.TaskGroupExecutionTracker taskGroupExecutionTracker = tracker.taskGroupExecutionTracker;
277+
ClassLoader classLoader = executionContexts.get(taskGroupExecutionTracker.taskGroup.getTaskGroupLocation()).getClassLoader();
276278
ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader();
277279
Thread.currentThread().setContextClassLoader(classLoader);
278280
final Task t = tracker.task;
@@ -283,12 +285,12 @@ public void run() {
283285
do {
284286
result = t.call();
285287
} while (!result.isDone() && isRunning &&
286-
!tracker.taskGroupExecutionTracker.executionCompletedExceptionally());
288+
!taskGroupExecutionTracker.executionCompletedExceptionally());
287289
} catch (Throwable e) {
288290
logger.warning("Exception in " + t, e);
289-
tracker.taskGroupExecutionTracker.exception(e);
291+
taskGroupExecutionTracker.exception(e);
290292
} finally {
291-
tracker.taskGroupExecutionTracker.taskDone();
293+
taskGroupExecutionTracker.taskDone();
292294
}
293295
Thread.currentThread().setContextClassLoader(oldClassLoader);
294296
}

0 commit comments

Comments
 (0)