Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[hotfix][e2e][jdbc] fix some error when docker version is old #2907

Merged
merged 12 commits into from
Oct 6, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import org.testcontainers.utility.DockerImageName;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
Expand All @@ -50,6 +52,7 @@
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -62,9 +65,9 @@ public class JdbcMysqlIT extends FlinkContainer {

@SuppressWarnings("checkstyle:MagicNumber")
@BeforeEach
public void startPostgreSqlContainer() throws Exception {
public void startMySqlContainer() throws Exception {
// Non-root users need to grant XA_RECOVER_ADMIN permission on is_exactly_once = "true"
mc = new MySQLContainer<>(DockerImageName.parse("mysql:8.0.29"))
mc = new MySQLContainer<>(DockerImageName.parse("bitnami/mysql:8.0.29").asCompatibleSubstituteFor("mysql"))
.withNetwork(NETWORK)
.withNetworkAliases("mysql")
.withUsername("root")
Expand All @@ -77,17 +80,14 @@ public void startPostgreSqlContainer() throws Exception {
.atLeast(100, TimeUnit.MILLISECONDS)
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> initializeJdbcTable());
.untilAsserted(this::initializeJdbcTable);
batchInsertData();
}

private void initializeJdbcTable() {
java.net.URL resource = FlinkContainer.class.getResource("/jdbc/init_sql/mysql_init.conf");
if (resource == null) {
throw new IllegalArgumentException("can't find find file");
}
private void initializeJdbcTable() throws URISyntaxException {
URI resource = Objects.requireNonNull(FlinkContainer.class.getResource("/jdbc/init_sql/mysql_init.conf")).toURI();

config = new ConfigBuilder(Paths.get(resource.getPath())).getConfig();
config = new ConfigBuilder(Paths.get(resource)).getConfig();

CheckConfigUtil.checkAllExists(this.config, "source_table", "sink_table", "type_source_table",
"type_sink_table", "insert_type_source_table_sql", "check_type_sink_table_sql");
Expand Down Expand Up @@ -125,15 +125,15 @@ private void batchInsertData() {
@Test
public void testJdbcMysqlSourceAndSink() throws Exception {
Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/jdbc_mysql_source_and_sink.conf");
Assertions.assertEquals(0, execResult.getExitCode());
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());

Assertions.assertIterableEquals(generateTestDataset(), queryResult());
}

@Test
public void testJdbcMysqlSourceAndSinkParallel() throws Exception {
Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/jdbc_mysql_source_and_sink_parallel.conf");
Assertions.assertEquals(0, execResult.getExitCode());
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());

//Sorting is required, because it is read in parallel, so there will be out of order
List<List> sortedResult = queryResult().stream().sorted(Comparator.comparing(list -> (Integer) list.get(1)))
Expand All @@ -145,7 +145,7 @@ public void testJdbcMysqlSourceAndSinkParallel() throws Exception {
public void testJdbcMysqlSourceAndSinkParallelUpperLower() throws Exception {
Container.ExecResult execResult =
executeSeaTunnelFlinkJob("/jdbc/jdbc_mysql_source_and_sink_parallel_upper_lower.conf");
Assertions.assertEquals(0, execResult.getExitCode());
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());

//Sorting is required, because it is read in parallel, so there will be out of order
List<List> sortedResult = queryResult().stream().sorted(Comparator.comparing(list -> (Integer) list.get(1)))
Expand All @@ -167,7 +167,7 @@ public void testJdbcMysqlSourceAndSinkXA() throws Exception {
@Test
public void testJdbcMysqlSourceAndSinkDataType() throws Exception {
Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/jdbc_mysql_source_and_sink_datatype.conf");
Assertions.assertEquals(0, execResult.getExitCode());
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
checkSinkDataTypeTable();
}

Expand Down Expand Up @@ -213,7 +213,7 @@ private static List<List> generateTestDataset() {
}

@AfterEach
public void closePostgreSqlContainer() {
public void closeMySqlContainer() {
if (mc != null) {
mc.stop();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.seatunnel.e2e.flink.FlinkContainer;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -49,6 +50,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Slf4j
public class JdbcPostgresIT extends FlinkContainer {
private static final Logger LOGGER = LoggerFactory.getLogger(JdbcPostgresIT.class);
private PostgreSQLContainer<?> pg;
Expand All @@ -57,10 +59,11 @@ public class JdbcPostgresIT extends FlinkContainer {
@SuppressWarnings("checkstyle:MagicNumber")
@BeforeEach
public void startPostgreSqlContainer() throws Exception {
pg = new PostgreSQLContainer<>(DockerImageName.parse("postgres:14.3"))
pg = new PostgreSQLContainer<>(DockerImageName.parse("postgres:14-alpine"))
.withNetwork(NETWORK)
.withNetworkAliases("postgresql")
.withCommand("postgres -c max_prepared_transactions=100")
.withUsername("root")
.withLogConsumer(new Slf4jLogConsumer(LOGGER));
Startables.deepStart(Stream.of(pg)).join();
LOGGER.info("Postgres container started");
Expand Down Expand Up @@ -115,14 +118,14 @@ private void batchInsertData() {
@Test
public void testJdbcPostgresSourceAndSink() throws Exception {
Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/jdbc_postgres_source_and_sink.conf");
Assertions.assertEquals(0, execResult.getExitCode());
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
Assertions.assertIterableEquals(generateTestDataset(), queryResult());
}

@Test
public void testJdbcPostgresSourceAndSinkParallel() throws Exception {
Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/jdbc_postgres_source_and_sink_parallel.conf");
Assertions.assertEquals(0, execResult.getExitCode());
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());

//Sorting is required, because it is read in parallel, so there will be out of order
List<List> sortedResult = queryResult().stream().sorted(Comparator.comparing(list -> (Integer) list.get(1)))
Expand All @@ -134,7 +137,7 @@ public void testJdbcPostgresSourceAndSinkParallel() throws Exception {
public void testJdbcPostgresSourceAndSinkParallelUpperLower() throws Exception {
Container.ExecResult execResult =
executeSeaTunnelFlinkJob("/jdbc/jdbc_postgres_source_and_sink_parallel_upper_lower.conf");
Assertions.assertEquals(0, execResult.getExitCode());
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());

//Sorting is required, because it is read in parallel, so there will be out of order
List<List> sortedResult = queryResult().stream().sorted(Comparator.comparing(list -> (Integer) list.get(1)))
Expand All @@ -148,7 +151,7 @@ public void testJdbcPostgresSourceAndSinkParallelUpperLower() throws Exception {
@Test
public void testJdbcPostgresSourceAndSinkXA() throws Exception {
Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/jdbc_postgres_source_and_sink_xa.conf");
Assertions.assertEquals(0, execResult.getExitCode());
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());

Assertions.assertIterableEquals(generateTestDataset(), queryResult());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ source{
jdbc{
url = "jdbc:postgresql://postgresql:5432/test"
driver = "org.postgresql.Driver"
user = "test"
user = "root"
password = "test"
query = "select name , age from source"
}
Expand All @@ -37,8 +37,8 @@ sink {
jdbc {
url = "jdbc:postgresql://postgresql:5432/test"
driver = "org.postgresql.Driver"
user = "test"
user = "root"
password = "test"
query = "insert into sink(name,age) values(?,?)"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ source{
jdbc{
url = "jdbc:postgresql://postgresql:5432/test"
driver = "org.postgresql.Driver"
user = "test"
user = "root"
password = "test"
query = "select user_id, name , age from source"
partition_column= "user_id"
Expand All @@ -44,9 +44,9 @@ sink {

url = "jdbc:postgresql://postgresql:5432/test"
driver = "org.postgresql.Driver"
user = "test"
user = "root"
password = "test"
connection_check_timeout_sec = 100
query = "insert into sink(name,age) values(?,?)"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ source{
jdbc{
url = "jdbc:postgresql://postgresql:5432/test"
driver = "org.postgresql.Driver"
user = "test"
user = "root"
password = "test"
query = "select user_id, name , age from source"
partition_column= "user_id"
Expand All @@ -46,9 +46,9 @@ sink {
url = "jdbc:postgresql://postgresql:5432/test"
driver = "org.postgresql.Driver"

user = "test"
user = "root"
password = "test"
connection_check_timeout_sec = 100
query = "insert into sink(name,age) values(?,?)"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ source {
jdbc{
url = "jdbc:postgresql://postgresql:5432/test"
driver = "org.postgresql.Driver"
user = "test"
user = "root"
password = "test"
query = "select name , age from source"
}
Expand All @@ -38,7 +38,7 @@ sink {
jdbc {
url = "jdbc:postgresql://postgresql:5432/test"
driver = "org.postgresql.Driver"
user = "test"
user = "root"
password = "test"

max_retries = 0
Expand All @@ -50,4 +50,4 @@ sink {
max_commit_attempts = 3
transaction_timeout_sec = 86400
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@
import org.testcontainers.utility.DockerImageName;

import java.io.IOException;
import java.net.URL;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
Expand All @@ -51,6 +52,7 @@
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -63,9 +65,9 @@ public class JdbcMysqlIT extends SparkContainer {

@SuppressWarnings("checkstyle:MagicNumber")
@BeforeEach
public void startPostgreSqlContainer() throws Exception {
public void startMySqlContainer() throws Exception {
// Non-root users need to grant XA_RECOVER_ADMIN permission on is_exactly_once = "true"
mc = new MySQLContainer<>(DockerImageName.parse("mysql:8.0.29"))
mc = new MySQLContainer<>(DockerImageName.parse("bitnami/mysql:8.0.29").asCompatibleSubstituteFor("mysql"))
.withNetwork(NETWORK)
.withNetworkAliases("mysql")
.withUsername("root")
Expand All @@ -78,18 +80,14 @@ public void startPostgreSqlContainer() throws Exception {
.atLeast(100, TimeUnit.MILLISECONDS)
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> initializeJdbcTable());
.untilAsserted(this::initializeJdbcTable);
batchInsertData();
}

private void initializeJdbcTable() {
URL resource = JdbcMysqlIT.class.getResource("/jdbc/init_sql/mysql_init.conf");
if (resource == null) {
throw new IllegalArgumentException("can't find find file");
}

config = new ConfigBuilder(Paths.get(resource.getPath())).getConfig();
private void initializeJdbcTable() throws URISyntaxException {

URI resource = Objects.requireNonNull(JdbcMysqlIT.class.getResource("/jdbc/init_sql/mysql_init.conf")).toURI();
config = new ConfigBuilder(Paths.get(resource)).getConfig();
CheckConfigUtil.checkAllExists(this.config, "source_table", "sink_table", "type_source_table",
"type_sink_table", "insert_type_source_table_sql", "check_type_sink_table_sql");

Expand Down Expand Up @@ -213,7 +211,7 @@ private static List<List> generateTestDataset() {
}

@AfterEach
public void closePostgreSqlContainer() {
public void closeMySqlContainer() {
if (mc != null) {
mc.stop();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class JdbcPostgresIT extends SparkContainer {
@SuppressWarnings("checkstyle:MagicNumber")
@BeforeEach
public void startPostgreSqlContainer() throws Exception {
pg = new PostgreSQLContainer<>(DockerImageName.parse("postgres:14.3"))
pg = new PostgreSQLContainer<>(DockerImageName.parse("postgres:14-alpine"))
.withNetwork(NETWORK)
.withNetworkAliases("postgresql")
.withCommand("postgres -c max_prepared_transactions=100")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,4 @@ sink {
password = "test"
query = "insert into sink(name,age) values(?,?)"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,4 @@ sink {
connection_check_timeout_sec = 100
query = "insert into sink(name,age) values(?,?)"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,4 @@ sink {
connection_check_timeout_sec = 100
query = "insert into sink(name,age) values(?,?)"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,4 @@ sink {
max_commit_attempts = 3
transaction_timeout_sec = 86400
}
}
}