Skip to content

Commit

Permalink
update it base class and add snapshot only case
Browse files Browse the repository at this point in the history
  • Loading branch information
whhe committed Mar 13, 2024
1 parent 32fa202 commit 6f5c066
Show file tree
Hide file tree
Showing 4 changed files with 208 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public class OceanBaseTableSource implements ScanTableSource, SupportsReadingMet
private final String serverTimeZone;

private final String hostname;
private final Integer port;
private final int port;
private final String compatibleMode;
private final String jdbcDriver;
private final Properties jdbcProperties;
Expand Down Expand Up @@ -99,7 +99,7 @@ public OceanBaseTableSource(
String serverTimeZone,
Duration connectTimeout,
String hostname,
Integer port,
int port,
String compatibleMode,
String jdbcDriver,
Properties jdbcProperties,
Expand All @@ -116,19 +116,19 @@ public OceanBaseTableSource(
this.startupOptions = checkNotNull(startupOptions);
this.username = checkNotNull(username);
this.password = checkNotNull(password);
this.tenantName = checkNotNull(tenantName);
this.tenantName = tenantName;
this.databaseName = databaseName;
this.tableName = tableName;
this.tableList = tableList;
this.serverTimeZone = serverTimeZone;
this.connectTimeout = connectTimeout;
this.hostname = hostname;
this.hostname = checkNotNull(hostname);
this.port = port;
this.compatibleMode = compatibleMode;
this.jdbcDriver = jdbcDriver;
this.jdbcProperties = jdbcProperties;
this.logProxyHost = checkNotNull(logProxyHost);
this.logProxyPort = checkNotNull(logProxyPort);
this.logProxyHost = logProxyHost;
this.logProxyPort = logProxyPort;
this.logProxyClientId = logProxyClientId;
this.startupTimestamp = startupTimestamp;
this.rsList = rsList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,45 +61,64 @@ public abstract class OceanBaseTestBase extends TestLogger {

@ClassRule public static LegacyRowResource usesLegacyRows = LegacyRowResource.INSTANCE;

protected String obServerHost;
protected int obServerPort;
protected String logProxyHost;
protected int logProxyPort;
protected String username;
protected String password;
protected String tenant;

protected String commonOptionString() {
return " 'connector' = 'oceanbase-cdc',"
+ " 'scan.startup.mode' = 'initial',"
+ " 'username' = '"
+ username
+ "',"
+ " 'password' = '"
+ password
+ "',"
+ " 'tenant-name' = '"
+ tenant
+ "',"
+ " 'hostname' = '"
+ obServerHost
+ "',"
+ " 'port' = '"
+ obServerPort
+ "',"
+ " 'logproxy.host' = '"
+ logProxyHost
+ "',"
+ " 'logproxy.port' = '"
+ logProxyPort
+ "',"
+ " 'compatible-mode' = '"
+ compatibleMode()
+ "',"
+ " 'working-mode' = 'memory',";
protected final String compatibleMode;
protected final String username;
protected final String password;
protected final String hostname;
protected final int port;
protected final String logProxyHost;
protected final int logProxyPort;
protected final String tenant;

public OceanBaseTestBase(
String compatibleMode,
String username,
String password,
String hostname,
int port,
String logProxyHost,
int logProxyPort,
String tenant) {
this.compatibleMode = compatibleMode;
this.username = username;
this.password = password;
this.hostname = hostname;
this.port = port;
this.logProxyHost = logProxyHost;
this.logProxyPort = logProxyPort;
this.tenant = tenant;
}

protected abstract String compatibleMode();
protected String commonOptionsString() {
return String.format(
" 'connector' = 'oceanbase-cdc', "
+ " 'username' = '%s', "
+ " 'password' = '%s', "
+ " 'hostname' = '%s', "
+ " 'port' = '%s', "
+ " 'compatible-mode' = '%s'",
username, password, hostname, port, compatibleMode);
}

protected String logProxyOptionsString() {
return String.format(
" 'working-mode' = 'memory',"
+ " 'tenant-name' = '%s',"
+ " 'logproxy.host' = '%s',"
+ " 'logproxy.port' = '%s'",
tenant, logProxyHost, logProxyPort);
}

protected String initialOptionsString() {
return " 'scan.startup.mode' = 'initial', "
+ commonOptionsString()
+ ", "
+ logProxyOptionsString();
}

protected String snapshotOptionsString() {
return " 'scan.startup.mode' = 'snapshot', " + commonOptionsString();
}

protected abstract Connection getJdbcConnection() throws SQLException;

Expand All @@ -111,7 +130,7 @@ protected void setGlobalTimeZone(String serverTimeZone) throws SQLException {
}

protected void initializeTable(String sqlFile) {
final String ddlFile = String.format("ddl/%s/%s.sql", compatibleMode(), sqlFile);
final String ddlFile = String.format("ddl/%s/%s.sql", compatibleMode, sqlFile);
final URL ddlTestFile = getClass().getClassLoader().getResource(ddlFile);
assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
try (Connection connection = getJdbcConnection();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
Expand All @@ -44,10 +46,12 @@
import java.time.Duration;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

/** Integration tests for OceanBase MySQL mode table source. */
@RunWith(Parameterized.class)
public class OceanBaseMySQLModeITCase extends OceanBaseTestBase {

private static final Logger LOG = LoggerFactory.getLogger(OceanBaseMySQLModeITCase.class);
Expand Down Expand Up @@ -99,46 +103,54 @@ public static void stopContainers() {
LOG.info("Containers are stopped.");
}

protected final String rsList;

public OceanBaseMySQLModeITCase() {
this.obServerHost = "127.0.0.1";
this.obServerPort = 2881;
this.logProxyHost = "127.0.0.1";
this.logProxyPort = 2983;
this.username = "root@test";
this.password = "123456";
this.tenant = "test";
this.rsList = "127.0.0.1:2882:2881";
}

@Before
public void before() {
TestValuesTableFactory.clearAllData();
env.enableCheckpointing(1000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
}

@Override
protected String compatibleMode() {
return "mysql";
private final String rsList;

public OceanBaseMySQLModeITCase(
String username,
String password,
String hostname,
int port,
String logProxyHost,
int logProxyPort,
String tenant,
String rsList) {
super("mysql", username, password, hostname, port, logProxyHost, logProxyPort, tenant);
this.rsList = rsList;
}

@Parameterized.Parameters
public static List<Object[]> parameters() {
return Collections.singletonList(
new Object[] {
"root@test",
"123456",
"127.0.0.1",
2881,
"127.0.0.1",
2983,
"test",
"127.0.0.1:2882:2881"
});
}

@Override
protected Connection getJdbcConnection() throws SQLException {
return DriverManager.getConnection(
"jdbc:mysql://" + obServerHost + ":" + obServerPort + "/?useSSL=false",
username,
password);
protected String logProxyOptionsString() {
return super.logProxyOptionsString()
+ " , "
+ String.format(" 'rootserver-list' = '%s'", rsList);
}

@Override
protected String commonOptionString() {
return super.commonOptionString()
+ " 'rootserver-list' = '"
+ rsList
+ "',"
+ " 'jdbc.properties.useSSL' = 'false',";
protected Connection getJdbcConnection() throws SQLException {
return DriverManager.getConnection(
"jdbc:mysql://" + hostname + ":" + port + "/?useSSL=false", username, password);
}

@Test
Expand All @@ -154,7 +166,8 @@ public void testTableList() throws Exception {
+ " weight DECIMAL(20, 10),"
+ " PRIMARY KEY (`id`) NOT ENFORCED"
+ ") WITH ("
+ commonOptionString()
+ initialOptionsString()
+ ", "
+ " 'table-list' = '%s'"
+ ")",
"inventory.products");
Expand Down Expand Up @@ -259,7 +272,8 @@ public void testMetadataColumns() throws Exception {
+ " weight DECIMAL(20, 10),"
+ " PRIMARY KEY (`id`) NOT ENFORCED"
+ ") WITH ("
+ commonOptionString()
+ initialOptionsString()
+ ","
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s'"
+ ")",
Expand Down Expand Up @@ -388,7 +402,8 @@ public void testAllDataTypes() throws Exception {
+ " json_c STRING,\n"
+ " primary key (`id`) not enforced"
+ ") WITH ("
+ commonOptionString()
+ initialOptionsString()
+ ","
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'server-time-zone' = '%s'"
Expand Down Expand Up @@ -540,7 +555,8 @@ public void testTimeDataTypes(String serverTimeZone) throws Exception {
+ " timestamp_c TIMESTAMP,\n"
+ " primary key (`id`) not enforced"
+ ") WITH ("
+ commonOptionString()
+ initialOptionsString()
+ ","
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'server-time-zone' = '%s'"
Expand Down Expand Up @@ -592,4 +608,58 @@ public void testTimeDataTypes(String serverTimeZone) throws Exception {
assertContainsInAnyOrder(expected, actual);
result.getJobClient().get().cancel().get();
}

@Test
public void testSnapshotOnly() throws Exception {
initializeTable("inventory");

String sourceDDL =
String.format(
"CREATE TABLE ob_source ("
+ " `id` INT NOT NULL,"
+ " name STRING,"
+ " description STRING,"
+ " weight DECIMAL(20, 10),"
+ " PRIMARY KEY (`id`) NOT ENFORCED"
+ ") WITH ("
+ snapshotOptionsString()
+ ", "
+ " 'table-list' = '%s'"
+ ")",
"inventory.products");

String sinkDDL =
"CREATE TABLE sink ("
+ " `id` INT NOT NULL,"
+ " name STRING,"
+ " description STRING,"
+ " weight DECIMAL(20, 10),"
+ " PRIMARY KEY (`id`) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'values',"
+ " 'sink-insert-only' = 'false',"
+ " 'sink-expected-messages-num' = '30'"
+ ")";

tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);

TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM ob_source");

waitForSinkSize("sink", 9);

List<String> expected =
Arrays.asList(
"+I(101,scooter,Small 2-wheel scooter,3.1400000000)",
"+I(102,car battery,12V car battery,8.1000000000)",
"+I(103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8000000000)",
"+I(104,hammer,12oz carpenter's hammer,0.7500000000)",
"+I(105,hammer,14oz carpenter's hammer,0.8750000000)",
"+I(106,hammer,16oz carpenter's hammer,1.0000000000)",
"+I(107,rocks,box of assorted rocks,5.3000000000)",
"+I(108,jacket,water resistent black wind breaker,0.1000000000)",
"+I(109,spare tire,24 inch spare tire,22.2000000000)");
List<String> actual = TestValuesTableFactory.getRawResults("sink");
assertContainsInAnyOrder(expected, actual);
}
}
Loading

0 comments on commit 6f5c066

Please sign in to comment.