Skip to content

Commit db434ad

Browse files
authored
[Feature] [Connector-V2 E2E] Add mysql and postgres e2e test and bug fix (#2838)
* add mysql and postgres e2e test * fix bug
1 parent e0459bb commit db434ad

File tree

35 files changed

+2590
-5
lines changed

35 files changed

+2590
-5
lines changed

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeMapper.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int colIndex) th
103103
case MYSQL_MEDIUMINT_UNSIGNED:
104104
case MYSQL_INT:
105105
case MYSQL_INTEGER:
106+
case MYSQL_YEAR:
106107
return BasicType.INT_TYPE;
107108
case MYSQL_INT_UNSIGNED:
108109
case MYSQL_INTEGER_UNSIGNED:
@@ -138,8 +139,6 @@ public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int colIndex) th
138139
+ "the precision will be set to 2147483647.",
139140
MYSQL_LONGTEXT);
140141
return BasicType.STRING_TYPE;
141-
142-
case MYSQL_YEAR:
143142
case MYSQL_DATE:
144143
return LocalTimeType.LOCAL_DATE_TYPE;
145144
case MYSQL_TIME:

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.Collection;
3131
import java.util.Iterator;
3232
import java.util.List;
33+
import java.util.stream.Collectors;
3334

3435
public class XaGroupOpsImpl
3536
implements XaGroupOps {
@@ -112,8 +113,11 @@ public GroupXaOperationResult<XidInfo> failAndRollback(Collection<XidInfo> xids)
112113
}
113114

114115
@Override
115-
public void recoverAndRollback(JobContext context, SinkWriter.Context sinkContext, XidGenerator xidGenerator, Xid excludeXid) {
116-
Collection<Xid> recovered = xaFacade.recover();
116+
public void recoverAndRollback(JobContext context, SinkWriter.Context sinkContext, XidGenerator xidGenerator,
117+
Xid excludeXid) {
118+
Collection<Xid> recovered = xaFacade.recover().stream()
119+
.map(x -> new XidImpl(x.getFormatId(), x.getGlobalTransactionId(), x.getBranchQualifier())).collect(
120+
Collectors.toList());
117121
recovered.remove(excludeXid);
118122
if (recovered.isEmpty()) {
119123
return;

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ public void write(SeaTunnelRow element)
134134
@Override
135135
public Optional<XidInfo> prepareCommit()
136136
throws IOException {
137+
tryOpen();
137138
prepareCurrentTx();
138139
this.currentXid = null;
139140
beginTx();

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ public void write(SeaTunnelRow element)
7979
@Override
8080
public Optional<XidInfo> prepareCommit()
8181
throws IOException {
82+
tryOpen();
8283
outputFormat.flush();
8384
return Optional.empty();
8485
}
@@ -91,6 +92,8 @@ public void abortPrepare() {
9192
@Override
9293
public void close()
9394
throws IOException {
95+
tryOpen();
96+
outputFormat.flush();
9497
outputFormat.close();
9598
}
9699
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
20+
21+
import org.apache.seatunnel.api.common.JobContext;
22+
import org.apache.seatunnel.api.sink.DefaultSinkWriterContext;
23+
24+
import org.junit.jupiter.api.Assertions;
25+
import org.junit.jupiter.api.BeforeEach;
26+
import org.junit.jupiter.api.Test;
27+
28+
import javax.transaction.xa.Xid;
29+
30+
class SemanticXidGeneratorTest {
31+
private JobContext jobContext;
32+
private SemanticXidGenerator xidGenerator;
33+
34+
@BeforeEach
35+
void before() {
36+
jobContext = new JobContext();
37+
xidGenerator = new SemanticXidGenerator();
38+
xidGenerator.open();
39+
}
40+
41+
@Test
42+
void testBelongsToSubtask() {
43+
DefaultSinkWriterContext dc1 = new DefaultSinkWriterContext(1);
44+
Xid xid1 = xidGenerator.generateXid(jobContext, dc1, System.currentTimeMillis());
45+
Assertions.assertTrue(xidGenerator.belongsToSubtask(xid1, jobContext, dc1));
46+
Assertions.assertFalse(xidGenerator.belongsToSubtask(xid1, jobContext, new DefaultSinkWriterContext(2)));
47+
Assertions.assertFalse(xidGenerator.belongsToSubtask(xid1, new JobContext(), dc1));
48+
}
49+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
Licensed to the Apache Software Foundation (ASF) under one or more
4+
contributor license agreements. See the NOTICE file distributed with
5+
this work for additional information regarding copyright ownership.
6+
The ASF licenses this file to You under the Apache License, Version 2.0
7+
(the "License"); you may not use this file except in compliance with
8+
the License. You may obtain a copy of the License at
9+
http://www.apache.org/licenses/LICENSE-2.0
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
-->
16+
<project xmlns="http://maven.apache.org/POM/4.0.0"
17+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
18+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
19+
<parent>
20+
<artifactId>seatunnel-connector-v2-e2e</artifactId>
21+
<groupId>org.apache.seatunnel</groupId>
22+
<version>${revision}</version>
23+
</parent>
24+
<modelVersion>4.0.0</modelVersion>
25+
26+
<artifactId>connector-jdbc-it</artifactId>
27+
28+
<properties>
29+
<testcontainers.version>1.17.3</testcontainers.version>
30+
</properties>
31+
32+
<dependencies>
33+
<!-- SeaTunnel connectors -->
34+
<dependency>
35+
<groupId>org.apache.seatunnel</groupId>
36+
<artifactId>connector-jdbc</artifactId>
37+
<version>${project.version}</version>
38+
<scope>test</scope>
39+
</dependency>
40+
41+
<dependency>
42+
<groupId>org.testcontainers</groupId>
43+
<artifactId>mysql</artifactId>
44+
<version>${testcontainers.version}</version>
45+
<scope>test</scope>
46+
</dependency>
47+
48+
</dependencies>
49+
50+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
20+
21+
import static javax.transaction.xa.XAResource.TMSTARTRSCAN;
22+
23+
import org.apache.seatunnel.api.common.JobContext;
24+
import org.apache.seatunnel.api.sink.DefaultSinkWriterContext;
25+
import org.apache.seatunnel.api.sink.SinkWriter;
26+
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.DataSourceUtils;
27+
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectionOptions;
28+
29+
import org.junit.jupiter.api.AfterEach;
30+
import org.junit.jupiter.api.Assertions;
31+
import org.junit.jupiter.api.BeforeEach;
32+
import org.junit.jupiter.api.Test;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
35+
import org.testcontainers.containers.MySQLContainer;
36+
import org.testcontainers.containers.output.Slf4jLogConsumer;
37+
import org.testcontainers.lifecycle.Startables;
38+
import org.testcontainers.utility.DockerImageName;
39+
40+
import javax.sql.XADataSource;
41+
import javax.transaction.xa.XAException;
42+
import javax.transaction.xa.XAResource;
43+
import javax.transaction.xa.Xid;
44+
45+
import java.util.stream.Stream;
46+
47+
class XaGroupOpsImplIT {
48+
49+
private static final Logger LOGGER = LoggerFactory.getLogger(XaGroupOpsImplIT.class);
50+
private MySQLContainer<?> mc;
51+
private XaGroupOps xaGroupOps;
52+
private SemanticXidGenerator xidGenerator;
53+
private JdbcConnectionOptions jdbcConnectionOptions;
54+
private XaFacade xaFacade;
55+
private XAResource xaResource;
56+
57+
@BeforeEach
58+
void before() throws Exception {
59+
// Non-root users need to grant XA_RECOVER_ADMIN permission
60+
mc = new MySQLContainer<>(DockerImageName.parse("mysql:8.0.29"))
61+
.withUsername("root")
62+
.withLogConsumer(new Slf4jLogConsumer(LOGGER));
63+
Startables.deepStart(Stream.of(mc)).join();
64+
65+
jdbcConnectionOptions = JdbcConnectionOptions.builder()
66+
.withUrl(mc.getJdbcUrl())
67+
.withUsername(mc.getUsername())
68+
.withPassword(mc.getPassword())
69+
.withXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource")
70+
.build();
71+
72+
xidGenerator = new SemanticXidGenerator();
73+
xidGenerator.open();
74+
xaFacade = new XaFacadeImplAutoLoad(jdbcConnectionOptions);
75+
xaFacade.open();
76+
xaGroupOps = new XaGroupOpsImpl(xaFacade);
77+
78+
XADataSource xaDataSource = (XADataSource) DataSourceUtils.buildCommonDataSource(jdbcConnectionOptions);
79+
xaResource = xaDataSource.getXAConnection().getXAResource();
80+
81+
}
82+
83+
@Test
84+
void testRecoverAndRollback() throws Exception {
85+
JobContext jobContext = new JobContext();
86+
SinkWriter.Context writerContext1 = new DefaultSinkWriterContext(1);
87+
Xid xid1 =
88+
xidGenerator.generateXid(jobContext, writerContext1, System.currentTimeMillis());
89+
Xid xid2 =
90+
xidGenerator.generateXid(jobContext, writerContext1, System.currentTimeMillis() + 1);
91+
92+
xaFacade.start(xid1);
93+
xaFacade.endAndPrepare(xid1);
94+
95+
xaFacade.start(xid2);
96+
xaFacade.endAndPrepare(xid2);
97+
98+
Assertions.assertTrue(checkPreparedXid(xid1));
99+
Assertions.assertTrue(checkPreparedXid(xid2));
100+
101+
xaGroupOps.recoverAndRollback(jobContext, writerContext1, xidGenerator, xid2);
102+
103+
Assertions.assertFalse(checkPreparedXid(xid1));
104+
Assertions.assertTrue(checkPreparedXid(xid2));
105+
106+
}
107+
108+
private boolean checkPreparedXid(Xid xidCrr) throws XAException {
109+
Xid[] recover = xaResource.recover(TMSTARTRSCAN);
110+
for (int i = 0; i < recover.length; i++) {
111+
XidImpl xid = new XidImpl(recover[i].getFormatId(), recover[i].getGlobalTransactionId(),
112+
recover[i].getBranchQualifier());
113+
if (xid.equals(xidCrr)) {
114+
return true;
115+
}
116+
}
117+
return false;
118+
}
119+
120+
@AfterEach
121+
public void closePostgreSqlContainer() {
122+
if (mc != null) {
123+
mc.stop();
124+
}
125+
}
126+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
# Set everything to be logged to the console
18+
log4j.rootCategory=INFO, console
19+
log4j.appender.console=org.apache.log4j.ConsoleAppender
20+
log4j.appender.console.target=System.err
21+
log4j.appender.console.layout=org.apache.log4j.PatternLayout
22+
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
<packaging>pom</packaging>
2626
<modules>
2727
<module>connector-assert-e2e</module>
28+
<module>connector-jdbc-it</module>
2829
</modules>
2930

3031
<artifactId>seatunnel-connector-v2-e2e</artifactId>

seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@
2525

2626
<artifactId>connector-jdbc-flink-e2e</artifactId>
2727

28+
<properties>
29+
<testcontainers.version>1.17.3</testcontainers.version>
30+
</properties>
31+
2832
<dependencies>
2933
<dependency>
3034
<groupId>org.apache.seatunnel</groupId>
@@ -59,9 +63,17 @@
5963
<dependency>
6064
<groupId>org.testcontainers</groupId>
6165
<artifactId>postgresql</artifactId>
62-
<version>1.17.3</version>
66+
<version>${testcontainers.version}</version>
6367
<scope>test</scope>
6468
</dependency>
69+
70+
<dependency>
71+
<groupId>org.testcontainers</groupId>
72+
<artifactId>mysql</artifactId>
73+
<version>${testcontainers.version}</version>
74+
<scope>test</scope>
75+
</dependency>
76+
6577
<dependency>
6678
<groupId>mysql</groupId>
6779
<artifactId>mysql-connector-java</artifactId>

0 commit comments

Comments
 (0)