Skip to content

Commit 439f686

Browse files
authored
[Improve][Connector-v2] Unset AutoCommit default to true (#3451)
1 parent 9781a6a commit 439f686

File tree

2 files changed

+18
-4
lines changed

2 files changed

+18
-4
lines changed

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -129,9 +129,6 @@ public Connection getOrEstablishConnection()
129129
"No suitable driver found for " + jdbcOptions.getUrl(), "08001");
130130
}
131131

132-
//Auto commit is used by default
133-
connection.setAutoCommit(true);
134-
135132
return connection;
136133
}
137134

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.commons.lang3.SerializationUtils;
3333

3434
import java.io.IOException;
35+
import java.sql.SQLException;
3536
import java.util.Collections;
3637
import java.util.List;
3738
import java.util.Optional;
@@ -42,12 +43,14 @@ public class JdbcSinkWriter implements SinkWriter<SeaTunnelRow, XidInfo, JdbcSin
4243
private final SinkWriter.Context context;
4344
private transient boolean isOpen;
4445

46+
private JdbcConnectionProvider connectionProvider;
47+
4548
public JdbcSinkWriter(
4649
SinkWriter.Context context,
4750
JdbcStatementBuilder<SeaTunnelRow> statementBuilder,
4851
JdbcSinkOptions jdbcSinkOptions) {
4952

50-
JdbcConnectionProvider connectionProvider = new SimpleJdbcConnectionProvider(jdbcSinkOptions.getJdbcConnectionOptions());
53+
connectionProvider = new SimpleJdbcConnectionProvider(jdbcSinkOptions.getJdbcConnectionOptions());
5154

5255
this.context = context;
5356
this.outputFormat = new JdbcOutputFormat<>(
@@ -81,6 +84,13 @@ public Optional<XidInfo> prepareCommit()
8184
throws IOException {
8285
tryOpen();
8386
outputFormat.flush();
87+
try {
88+
if (!connectionProvider.getConnection().getAutoCommit()){
89+
connectionProvider.getConnection().commit();
90+
}
91+
} catch (SQLException e) {
92+
throw new IOException(e);
93+
}
8494
return Optional.empty();
8595
}
8696

@@ -94,6 +104,13 @@ public void close()
94104
throws IOException {
95105
tryOpen();
96106
outputFormat.flush();
107+
try {
108+
if (!connectionProvider.getConnection().getAutoCommit()){
109+
connectionProvider.getConnection().commit();
110+
}
111+
} catch (SQLException e) {
112+
throw new IOException(e);
113+
}
97114
outputFormat.close();
98115
}
99116
}

0 commit comments

Comments
 (0)