Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,15 @@

import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;

import ch.vorburger.exec.ManagedProcessException;
import ch.vorburger.mariadb4j.DB;
import ch.vorburger.mariadb4j.DBConfigurationBuilder;
import ch.vorburger.mariadb4j.junit.MariaDB4jRule;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.math.BigInteger;
Expand All @@ -42,6 +45,7 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
Expand All @@ -56,27 +60,58 @@
*/
public class UnsignedTypeConversionITCase extends AbstractTestBase {

private static final Logger logger = LoggerFactory.getLogger(UnsignedTypeConversionITCase.class);
private static final String DEFAULT_DB_NAME = "test";
private static final String TABLE_NAME = "unsigned_test";
private static final int INITIALIZE_DB_MAX_RETRY = 3;
private static DB db;
private static String dbUrl;
private static Connection connection;

private StreamTableEnvironment tEnv;
private String dbUrl;
private Connection connection;

@ClassRule
public static MariaDB4jRule db4jRule = new MariaDB4jRule(
DBConfigurationBuilder.newBuilder().build(),
DEFAULT_DB_NAME,
null);
@BeforeClass
public static void prepareMariaDB() throws IllegalStateException {
boolean initDbSuccess = false;
int i = 0;
//The initialization of maria db instance is a little unstable according to past CI tests.
//Add retry logic here to avoid initialization failure.
while (i < INITIALIZE_DB_MAX_RETRY) {
try {
db = DB.newEmbeddedDB(DBConfigurationBuilder.newBuilder().build());
db.start();
dbUrl = db.getConfiguration().getURL(DEFAULT_DB_NAME);
connection = DriverManager.getConnection(dbUrl);
try (Statement statement = connection.createStatement()) {
statement.execute("CREATE DATABASE IF NOT EXISTS `" + DEFAULT_DB_NAME + "`;");
ResultSet resultSet = statement.executeQuery("SELECT SCHEMA_NAME FROM " +
"INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = '" + DEFAULT_DB_NAME + "';");
if (resultSet.next()) {
String dbName = resultSet.getString(1);
initDbSuccess = DEFAULT_DB_NAME.equalsIgnoreCase(dbName);
}
}
} catch (Exception e) {
logger.warn("Initialize DB fail caused by {}", e);
stopDb();
}
if (initDbSuccess) {
break;
}
i++;
}
if (!initDbSuccess) {
throw new IllegalStateException(String.format("Initialize MySQL database instance failed after {} attempts," +
" please open an issue.", INITIALIZE_DB_MAX_RETRY));
}
}

@Before
public void setUp() throws SQLException, ClassNotFoundException {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
tEnv = StreamTableEnvironment.create(env);
public void setUp() throws SQLException, IllegalStateException {
//dbUrl: jdbc:mysql://localhost:3306/test
dbUrl = db4jRule.getURL();
connection = DriverManager.getConnection(dbUrl);
createMysqlTable();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
tEnv = StreamTableEnvironment.create(env);
createFlinkTable();
prepareData();
}
Expand Down Expand Up @@ -177,8 +212,18 @@ private void prepareData() {
}

@After
public void cleanup() throws Exception {
PreparedStatement preparedStatement = connection.prepareStatement(String.format("drop table %s", TABLE_NAME));
preparedStatement.execute();
public void cleanup() {
stopDb();
}

private static void stopDb() {
if (db == null) {
return;
}
try {
db.stop();
} catch (ManagedProcessException e1) {
logger.warn("Stop DB instance fail caused by {}", e1);
}
}
}