Skip to content

Commit

Permalink
[hotfix][e2e][jdbc] fix some error when docker version is old (#2907)
Browse files Browse the repository at this point in the history
  • Loading branch information
liugddx committed Oct 6, 2022
1 parent de34578 commit 4c76129
Show file tree
Hide file tree
Showing 12 changed files with 49 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import org.testcontainers.utility.DockerImageName;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
Expand All @@ -50,6 +52,7 @@
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -62,9 +65,9 @@ public class JdbcMysqlIT extends FlinkContainer {

@SuppressWarnings("checkstyle:MagicNumber")
@BeforeEach
public void startPostgreSqlContainer() throws Exception {
public void startMySqlContainer() throws Exception {
// Non-root users need to grant XA_RECOVER_ADMIN permission on is_exactly_once = "true"
mc = new MySQLContainer<>(DockerImageName.parse("mysql:8.0.29"))
mc = new MySQLContainer<>(DockerImageName.parse("bitnami/mysql:8.0.29").asCompatibleSubstituteFor("mysql"))
.withNetwork(NETWORK)
.withNetworkAliases("mysql")
.withUsername("root")
Expand All @@ -77,17 +80,14 @@ public void startPostgreSqlContainer() throws Exception {
.atLeast(100, TimeUnit.MILLISECONDS)
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> initializeJdbcTable());
.untilAsserted(this::initializeJdbcTable);
batchInsertData();
}

private void initializeJdbcTable() {
java.net.URL resource = FlinkContainer.class.getResource("/jdbc/init_sql/mysql_init.conf");
if (resource == null) {
throw new IllegalArgumentException("can't find find file");
}
private void initializeJdbcTable() throws URISyntaxException {
URI resource = Objects.requireNonNull(FlinkContainer.class.getResource("/jdbc/init_sql/mysql_init.conf")).toURI();

config = new ConfigBuilder(Paths.get(resource.getPath())).getConfig();
config = new ConfigBuilder(Paths.get(resource)).getConfig();

CheckConfigUtil.checkAllExists(this.config, "source_table", "sink_table", "type_source_table",
"type_sink_table", "insert_type_source_table_sql", "check_type_sink_table_sql");
Expand Down Expand Up @@ -125,15 +125,15 @@ private void batchInsertData() {
@Test
public void testJdbcMysqlSourceAndSink() throws Exception {
Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/jdbc_mysql_source_and_sink.conf");
Assertions.assertEquals(0, execResult.getExitCode());
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());

Assertions.assertIterableEquals(generateTestDataset(), queryResult());
}

@Test
public void testJdbcMysqlSourceAndSinkParallel() throws Exception {
Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/jdbc_mysql_source_and_sink_parallel.conf");
Assertions.assertEquals(0, execResult.getExitCode());
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());

//Sorting is required, because it is read in parallel, so there will be out of order
List<List> sortedResult = queryResult().stream().sorted(Comparator.comparing(list -> (Integer) list.get(1)))
Expand All @@ -145,7 +145,7 @@ public void testJdbcMysqlSourceAndSinkParallel() throws Exception {
public void testJdbcMysqlSourceAndSinkParallelUpperLower() throws Exception {
Container.ExecResult execResult =
executeSeaTunnelFlinkJob("/jdbc/jdbc_mysql_source_and_sink_parallel_upper_lower.conf");
Assertions.assertEquals(0, execResult.getExitCode());
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());

//Sorting is required, because it is read in parallel, so there will be out of order
List<List> sortedResult = queryResult().stream().sorted(Comparator.comparing(list -> (Integer) list.get(1)))
Expand All @@ -167,7 +167,7 @@ public void testJdbcMysqlSourceAndSinkXA() throws Exception {
@Test
public void testJdbcMysqlSourceAndSinkDataType() throws Exception {
Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/jdbc_mysql_source_and_sink_datatype.conf");
Assertions.assertEquals(0, execResult.getExitCode());
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
checkSinkDataTypeTable();
}

Expand Down Expand Up @@ -213,7 +213,7 @@ private static List<List> generateTestDataset() {
}

@AfterEach
public void closePostgreSqlContainer() {
public void closeMySqlContainer() {
if (mc != null) {
mc.stop();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.seatunnel.e2e.flink.FlinkContainer;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -49,6 +50,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Slf4j
public class JdbcPostgresIT extends FlinkContainer {
private static final Logger LOGGER = LoggerFactory.getLogger(JdbcPostgresIT.class);
private PostgreSQLContainer<?> pg;
Expand All @@ -57,10 +59,11 @@ public class JdbcPostgresIT extends FlinkContainer {
@SuppressWarnings("checkstyle:MagicNumber")
@BeforeEach
public void startPostgreSqlContainer() throws Exception {
pg = new PostgreSQLContainer<>(DockerImageName.parse("postgres:14.3"))
pg = new PostgreSQLContainer<>(DockerImageName.parse("postgres:14-alpine"))
.withNetwork(NETWORK)
.withNetworkAliases("postgresql")
.withCommand("postgres -c max_prepared_transactions=100")
.withUsername("root")
.withLogConsumer(new Slf4jLogConsumer(LOGGER));
Startables.deepStart(Stream.of(pg)).join();
LOGGER.info("Postgres container started");
Expand Down Expand Up @@ -115,14 +118,14 @@ private void batchInsertData() {
@Test
public void testJdbcPostgresSourceAndSink() throws Exception {
Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/jdbc_postgres_source_and_sink.conf");
Assertions.assertEquals(0, execResult.getExitCode());
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
Assertions.assertIterableEquals(generateTestDataset(), queryResult());
}

@Test
public void testJdbcPostgresSourceAndSinkParallel() throws Exception {
Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/jdbc_postgres_source_and_sink_parallel.conf");
Assertions.assertEquals(0, execResult.getExitCode());
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());

//Sorting is required, because it is read in parallel, so there will be out of order
List<List> sortedResult = queryResult().stream().sorted(Comparator.comparing(list -> (Integer) list.get(1)))
Expand All @@ -134,7 +137,7 @@ public void testJdbcPostgresSourceAndSinkParallel() throws Exception {
public void testJdbcPostgresSourceAndSinkParallelUpperLower() throws Exception {
Container.ExecResult execResult =
executeSeaTunnelFlinkJob("/jdbc/jdbc_postgres_source_and_sink_parallel_upper_lower.conf");
Assertions.assertEquals(0, execResult.getExitCode());
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());

//Sorting is required, because it is read in parallel, so there will be out of order
List<List> sortedResult = queryResult().stream().sorted(Comparator.comparing(list -> (Integer) list.get(1)))
Expand All @@ -148,7 +151,7 @@ public void testJdbcPostgresSourceAndSinkParallelUpperLower() throws Exception {
@Test
public void testJdbcPostgresSourceAndSinkXA() throws Exception {
Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/jdbc_postgres_source_and_sink_xa.conf");
Assertions.assertEquals(0, execResult.getExitCode());
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());

Assertions.assertIterableEquals(generateTestDataset(), queryResult());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ source{
jdbc{
url = "jdbc:postgresql://postgresql:5432/test"
driver = "org.postgresql.Driver"
user = "test"
user = "root"
password = "test"
query = "select name , age from source"
}
Expand All @@ -37,8 +37,8 @@ sink {
jdbc {
url = "jdbc:postgresql://postgresql:5432/test"
driver = "org.postgresql.Driver"
user = "test"
user = "root"
password = "test"
query = "insert into sink(name,age) values(?,?)"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ source{
jdbc{
url = "jdbc:postgresql://postgresql:5432/test"
driver = "org.postgresql.Driver"
user = "test"
user = "root"
password = "test"
query = "select user_id, name , age from source"
partition_column= "user_id"
Expand All @@ -44,9 +44,9 @@ sink {

url = "jdbc:postgresql://postgresql:5432/test"
driver = "org.postgresql.Driver"
user = "test"
user = "root"
password = "test"
connection_check_timeout_sec = 100
query = "insert into sink(name,age) values(?,?)"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ source{
jdbc{
url = "jdbc:postgresql://postgresql:5432/test"
driver = "org.postgresql.Driver"
user = "test"
user = "root"
password = "test"
query = "select user_id, name , age from source"
partition_column= "user_id"
Expand All @@ -47,9 +47,9 @@ sink {
url = "jdbc:postgresql://postgresql:5432/test"
driver = "org.postgresql.Driver"

user = "test"
user = "root"
password = "test"
connection_check_timeout_sec = 100
query = "insert into sink(name,age) values(?,?)"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ source {
jdbc{
url = "jdbc:postgresql://postgresql:5432/test"
driver = "org.postgresql.Driver"
user = "test"
user = "root"
password = "test"
query = "select name , age from source"
}
Expand All @@ -38,7 +38,7 @@ sink {
jdbc {
url = "jdbc:postgresql://postgresql:5432/test"
driver = "org.postgresql.Driver"
user = "test"
user = "root"
password = "test"

max_retries = 0
Expand All @@ -50,4 +50,4 @@ sink {
max_commit_attempts = 3
transaction_timeout_sec = 86400
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@
import org.testcontainers.utility.DockerImageName;

import java.io.IOException;
import java.net.URL;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
Expand All @@ -51,6 +52,7 @@
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -63,9 +65,9 @@ public class JdbcMysqlIT extends SparkContainer {

@SuppressWarnings("checkstyle:MagicNumber")
@BeforeEach
public void startPostgreSqlContainer() throws Exception {
public void startMySqlContainer() throws Exception {
// Non-root users need to grant XA_RECOVER_ADMIN permission on is_exactly_once = "true"
mc = new MySQLContainer<>(DockerImageName.parse("mysql:8.0.29"))
mc = new MySQLContainer<>(DockerImageName.parse("bitnami/mysql:8.0.29").asCompatibleSubstituteFor("mysql"))
.withNetwork(NETWORK)
.withNetworkAliases("mysql")
.withUsername("root")
Expand All @@ -78,18 +80,14 @@ public void startPostgreSqlContainer() throws Exception {
.atLeast(100, TimeUnit.MILLISECONDS)
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> initializeJdbcTable());
.untilAsserted(this::initializeJdbcTable);
batchInsertData();
}

private void initializeJdbcTable() {
URL resource = JdbcMysqlIT.class.getResource("/jdbc/init_sql/mysql_init.conf");
if (resource == null) {
throw new IllegalArgumentException("can't find find file");
}

config = new ConfigBuilder(Paths.get(resource.getPath())).getConfig();
private void initializeJdbcTable() throws URISyntaxException {

URI resource = Objects.requireNonNull(JdbcMysqlIT.class.getResource("/jdbc/init_sql/mysql_init.conf")).toURI();
config = new ConfigBuilder(Paths.get(resource)).getConfig();
CheckConfigUtil.checkAllExists(this.config, "source_table", "sink_table", "type_source_table",
"type_sink_table", "insert_type_source_table_sql", "check_type_sink_table_sql");

Expand Down Expand Up @@ -213,7 +211,7 @@ private static List<List> generateTestDataset() {
}

@AfterEach
public void closePostgreSqlContainer() {
public void closeMySqlContainer() {
if (mc != null) {
mc.stop();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class JdbcPostgresIT extends SparkContainer {
@SuppressWarnings("checkstyle:MagicNumber")
@BeforeEach
public void startPostgreSqlContainer() throws Exception {
pg = new PostgreSQLContainer<>(DockerImageName.parse("postgres:14.3"))
pg = new PostgreSQLContainer<>(DockerImageName.parse("postgres:14-alpine"))
.withNetwork(NETWORK)
.withNetworkAliases("postgresql")
.withCommand("postgres -c max_prepared_transactions=100")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,4 @@ sink {
password = "test"
query = "insert into sink(name,age) values(?,?)"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,4 @@ sink {
connection_check_timeout_sec = 100
query = "insert into sink(name,age) values(?,?)"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,4 @@ sink {
connection_check_timeout_sec = 100
query = "insert into sink(name,age) values(?,?)"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,4 @@ sink {
max_commit_attempts = 3
transaction_timeout_sec = 86400
}
}
}

0 comments on commit 4c76129

Please sign in to comment.