Skip to content

Commit

Permalink
Fix CI E2e testcase.
Browse files Browse the repository at this point in the history
Needs to ensure we're in incremental stage before triggering any DDL events.
  • Loading branch information
yuxiqian committed May 24, 2024
1 parent 76bced0 commit daa32df
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -234,15 +234,37 @@ public void testSyncWholeDatabase() throws Exception {
DriverManager.getConnection(
mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
Statement stat = conn.createStatement()) {

stat.execute(
"INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2, null, null, null);"); // 110

validateSinkResult(
mysqlInventoryDatabase.getDatabaseName(),
"products",
7,
Arrays.asList(
"101 | scooter | Small 2-wheel scooter | 3.14 | red | {\"key1\": \"value1\"} | {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}",
"102 | car battery | 12V car battery | 8.1 | white | {\"key2\": \"value2\"} | {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}",
"103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 | red | {\"key3\": \"value3\"} | {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}",
"104 | hammer | 12oz carpenter's hammer | 0.75 | white | {\"key4\": \"value4\"} | {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}",
"105 | hammer | 14oz carpenter's hammer | 0.875 | red | {\"k1\": \"v1\", \"k2\": \"v2\"} | {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}",
"106 | hammer | 16oz carpenter's hammer | 1.0 | null | null | null",
"107 | rocks | box of assorted rocks | 5.3 | null | null | null",
"108 | jacket | water resistent black wind breaker | 0.1 | null | null | null",
"109 | spare tire | 24 inch spare tire | 22.2 | null | null | null",
"110 | jacket | water resistent white wind breaker | 0.2 | null | null | null"));

stat.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
stat.execute("UPDATE products SET weight='5.1' WHERE id=107;");

// modify table schema
stat.execute("ALTER TABLE products DROP COLUMN point_c;");
stat.execute("DELETE FROM products WHERE id=101;");
stat.execute(
"INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2, null, null);"); // 110

stat.execute(
"INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18, null, null);"); // 111
stat.execute(
"INSERT INTO products VALUES (default,'finally', null, 2.14, null, null);"); // 112
} catch (SQLException e) {
LOG.error("Update table for CDC failed.", e);
throw e;
Expand All @@ -261,7 +283,8 @@ public void testSyncWholeDatabase() throws Exception {
"108 | jacket | water resistent black wind breaker | 0.1 | null | null | null",
"109 | spare tire | 24 inch spare tire | 22.2 | null | null | null",
"110 | jacket | water resistent white wind breaker | 0.2 | null | null | null",
"111 | scooter | Big 2-wheel scooter | 5.18 | null | null | null"));
"111 | scooter | Big 2-wheel scooter | 5.18 | null | null | null",
"112 | finally | null | 2.14 | null | null | null"));
}

public static void createDorisDatabase(String databaseName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,12 @@ public void testSyncWholeDatabase() throws Exception {
submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
waitUntilJobRunning(Duration.ofSeconds(30));
LOG.info("Pipeline job is running");
waitUtilSpecificEvent(
waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai, 123567891234], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
60000L);
waitUtilSpecificEvent(
waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null], op=INSERT, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
Expand Down Expand Up @@ -186,6 +186,14 @@ public void testSyncWholeDatabase() throws Exception {
Statement stat = conn.createStatement()) {
stat.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
stat.execute("UPDATE products SET weight='5.1' WHERE id=107;");

// Perform DDL changes after the binlog is generated
waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.products, before=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], after=[106, hammer, 18oz carpenter hammer, 1.0, null, null, null], op=UPDATE, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
20000L);

// modify table schema
stat.execute("ALTER TABLE products ADD COLUMN new_col INT;");
stat.execute(
Expand All @@ -201,7 +209,7 @@ public void testSyncWholeDatabase() throws Exception {
throw e;
}

waitUtilSpecificEvent(
waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], after=[], op=DELETE, meta=()}",
mysqlInventoryDatabase.getDatabaseName()),
Expand Down Expand Up @@ -238,11 +246,11 @@ public void testSyncWholeDatabase() throws Exception {

private void validateResult(List<String> expectedEvents) throws Exception {
for (String event : expectedEvents) {
waitUtilSpecificEvent(event, 6000L);
waitUntilSpecificEvent(event, 6000L);
}
}

private void waitUtilSpecificEvent(String event, long timeout) throws Exception {
private void waitUntilSpecificEvent(String event, long timeout) throws Exception {
boolean result = false;
long endTimeout = System.currentTimeMillis() + timeout;
while (System.currentTimeMillis() < endTimeout) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,13 @@ public void testHeteroSchemaTransform() throws Exception {
submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
waitUntilJobRunning(Duration.ofSeconds(30));
LOG.info("Pipeline job is running");
waitUtilSpecificEvent(
waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.terminus, before=[], after=[1011, 11], op=INSERT, meta=()}",
transformRenameDatabase.getDatabaseName()),
60000L);

waitUtilSpecificEvent(
waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.terminus, before=[], after=[2014, 14], op=INSERT, meta=()}",
transformRenameDatabase.getDatabaseName()),
Expand Down Expand Up @@ -184,19 +184,19 @@ public void testHeteroSchemaTransform() throws Exception {
throw e;
}

waitUtilSpecificEvent(
waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.terminus, before=[], after=[3007, 7], op=INSERT, meta=()}",
transformRenameDatabase.getDatabaseName()),
20000L);

waitUtilSpecificEvent(
waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.terminus, before=[1009, 8.1], after=[1009, 100], op=UPDATE, meta=()}",
transformRenameDatabase.getDatabaseName()),
20000L);

waitUtilSpecificEvent(
waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.terminus, before=[2011, 11], after=[], op=DELETE, meta=()}",
transformRenameDatabase.getDatabaseName()),
Expand All @@ -208,11 +208,11 @@ public void testHeteroSchemaTransform() throws Exception {

private void validateResult(List<String> expectedEvents) throws Exception {
for (String event : expectedEvents) {
waitUtilSpecificEvent(event, 6000L);
waitUntilSpecificEvent(event, 6000L);
}
}

private void waitUtilSpecificEvent(String event, long timeout) throws Exception {
private void waitUntilSpecificEvent(String event, long timeout) throws Exception {
boolean result = false;
long endTimeout = System.currentTimeMillis() + timeout;
while (System.currentTimeMillis() < endTimeout) {
Expand Down

0 comments on commit daa32df

Please sign in to comment.