From f869a1b11fa1a58bf7eae054ad3af33a157edfea Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 16 Jan 2025 09:57:49 +0800 Subject: [PATCH] Update TestUtils.java --- .../apache/iotdb/db/it/utils/TestUtils.java | 76 ++++++++++++++++++- 1 file changed, 75 insertions(+), 1 deletion(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java index fdbb40d510439..723dabc95f458 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java @@ -1021,6 +1021,8 @@ public static void assertDataEventuallyOnEnv( String expectedHeader, Set expectedResSet, long timeoutSeconds) { + final long startTime = System.currentTimeMillis(); + final boolean[] flushed = {false}; try (Connection connection = env.getConnection(); Statement statement = connection.createStatement()) { // Keep retrying if there are execution failures @@ -1032,6 +1034,13 @@ public static void assertDataEventuallyOnEnv( .untilAsserted( () -> { try { + // For IoTV2 batch mode, the pipe receiver may need to flush because the replica + // sync requires tsFile to process. We flush in the middle of assertion because we + // don't know when the data reaches the receiver in general cases + if (!flushed[0] && System.currentTimeMillis() - startTime > timeoutSeconds >> 1) { + flushed[0] = true; + statement.execute("flush"); + } TestUtils.assertResultSetEqual( executeQueryWithRetry(statement, sql), expectedHeader, expectedResSet); } catch (Exception e) { @@ -1055,6 +1064,8 @@ public static void assertDataSizeEventuallyOnEnv( final int size, final long timeoutSeconds, final String dataBaseName) { + final long startTime = System.currentTimeMillis(); + final boolean[] flushed = {false}; try (final Connection connection = env.getConnection(BaseEnv.TABLE_SQL_DIALECT); final Statement statement = connection.createStatement()) { // Keep retrying if there are execution failures @@ -1069,6 +1080,13 @@ public static void assertDataSizeEventuallyOnEnv( if (dataBaseName != null) { statement.execute("use " + dataBaseName); } + // For IoTV2 batch mode, the pipe receiver may need to flush because the replica + // sync requires tsFile to process. We flush in the middle of assertion because we + // don't know when the data reaches the receiver in general cases + if (!flushed[0] && System.currentTimeMillis() - startTime > timeoutSeconds >> 1) { + flushed[0] = true; + statement.execute("flush"); + } if (sql != null && !sql.isEmpty()) { TestUtils.assertResultSetSize(executeQueryWithRetry(statement, sql), size); } @@ -1088,6 +1106,8 @@ public static void assertDataEventuallyOnEnv( final Set expectedResSet, final long timeoutSeconds, final Consumer handleFailure) { + final long startTime = System.currentTimeMillis(); + final boolean[] flushed = {false}; try (Connection connection = env.getConnection(); Statement statement = connection.createStatement()) { // Keep retrying if there are execution failures @@ -1099,6 +1119,13 @@ public static void assertDataEventuallyOnEnv( .untilAsserted( () -> { try { + // For IoTV2 batch mode, the pipe receiver may need to flush because the replica + // sync requires tsFile to process. We flush in the middle of assertion because we + // don't know when the data reaches the receiver in general cases + if (!flushed[0] && System.currentTimeMillis() - startTime > timeoutSeconds >> 1) { + flushed[0] = true; + statement.execute("flush"); + } TestUtils.assertResultSetEqual( executeQueryWithRetry(statement, sql), expectedHeader, expectedResSet); } catch (Exception e) { @@ -1147,6 +1174,8 @@ public static void assertDataEventuallyOnEnv( final long timeoutSeconds, final String databaseName, final Consumer handleFailure) { + final long startTime = System.currentTimeMillis(); + final boolean[] flushed = {false}; try (Connection connection = env.getConnection(BaseEnv.TABLE_SQL_DIALECT); Statement statement = connection.createStatement()) { // Keep retrying if there are execution failures @@ -1161,6 +1190,13 @@ public static void assertDataEventuallyOnEnv( if (databaseName != null) { statement.execute("use " + databaseName); } + // For IoTV2 batch mode, the pipe receiver may need to flush because the replica + // sync requires tsFile to process. We flush in the middle of assertion because we + // don't know when the data reaches the receiver in general cases + if (!flushed[0] && System.currentTimeMillis() - startTime > timeoutSeconds >> 1) { + flushed[0] = true; + statement.execute("flush"); + } if (sql != null && !sql.isEmpty()) { TestUtils.assertResultSetEqual( executeQueryWithRetry(statement, sql), expectedHeader, expectedResSet); @@ -1189,6 +1225,8 @@ public static void assertDataEventuallyOnEnv( public static void assertDataEventuallyOnEnv( BaseEnv env, String sql, Map expectedHeaderWithResult, long timeoutSeconds) { + final long startTime = System.currentTimeMillis(); + final boolean[] flushed = {false}; try (Connection connection = env.getConnection(); Statement statement = connection.createStatement()) { // Keep retrying if there are execution failures @@ -1200,6 +1238,13 @@ public static void assertDataEventuallyOnEnv( .untilAsserted( () -> { try { + // For IoTV2 batch mode, the pipe receiver may need to flush because the replica + // sync requires tsFile to process. We flush in the middle of assertion because we + // don't know when the data reaches the receiver in general cases + if (!flushed[0] && System.currentTimeMillis() - startTime > timeoutSeconds >> 1) { + flushed[0] = true; + statement.execute("flush"); + } TestUtils.assertSingleResultSetEqual( executeQueryWithRetry(statement, sql), expectedHeaderWithResult); } catch (Exception e) { @@ -1245,6 +1290,8 @@ public static void assertDataAlwaysOnEnv( Set expectedResSet, long consistentSeconds, final String database) { + final long startTime = System.currentTimeMillis(); + final boolean[] flushed = {false}; try (Connection connection = env.getConnection( Objects.isNull(database) ? BaseEnv.TREE_SQL_DIALECT : BaseEnv.TABLE_SQL_DIALECT); @@ -1261,6 +1308,14 @@ public static void assertDataAlwaysOnEnv( if (Objects.nonNull(database)) { statement.execute("use " + database); } + // For IoTV2 batch mode, the pipe receiver may need to flush because the replica + // sync requires tsFile to process. We flush in the middle of assertion because we + // don't know when the data reaches the receiver in general cases + if (!flushed[0] + && System.currentTimeMillis() - startTime > consistentSeconds >> 1) { + flushed[0] = true; + statement.execute("flush"); + } TestUtils.assertResultSetEqual( executeQueryWithRetry(statement, sql), expectedHeader, expectedResSet); } catch (Exception e) { @@ -1280,9 +1335,10 @@ public static void assertDataAlwaysOnEnv( Set expectedResSet, long consistentSeconds, Consumer handleFailure) { + final long startTime = System.currentTimeMillis(); + final boolean[] flushed = {false}; try (Connection connection = env.getConnection(); Statement statement = connection.createStatement()) { - // Keep retrying if there are execution failures await() .pollInSameThread() .pollDelay(1L, TimeUnit.SECONDS) @@ -1291,6 +1347,14 @@ public static void assertDataAlwaysOnEnv( .failFast( () -> { try { + // For IoTV2 batch mode, the pipe receiver may need to flush because the replica + // sync requires tsFile to process. We flush in the middle of assertion because we + // don't know when the data reaches the receiver in general cases + if (!flushed[0] + && System.currentTimeMillis() - startTime > consistentSeconds >> 1) { + flushed[0] = true; + statement.execute("flush"); + } TestUtils.assertResultSetEqual( executeQueryWithRetry(statement, sql), expectedHeader, expectedResSet); } catch (Exception e) { @@ -1319,6 +1383,8 @@ public static void assertDataAlwaysOnEnv( long consistentSeconds, String database, Consumer handleFailure) { + final long startTime = System.currentTimeMillis(); + final boolean[] flushed = {false}; try (Connection connection = env.getConnection(); Statement statement = connection.createStatement()) { // Keep retrying if there are execution failures @@ -1333,6 +1399,14 @@ public static void assertDataAlwaysOnEnv( if (database != null) { statement.execute("use " + database); } + // For IoTV2 batch mode, the pipe receiver may need to flush because the replica + // sync requires tsFile to process. We flush in the middle of assertion because we + // don't know when the data reaches the receiver in general cases + if (!flushed[0] + && System.currentTimeMillis() - startTime > consistentSeconds >> 1) { + flushed[0] = true; + statement.execute("flush"); + } TestUtils.assertResultSetEqual( executeQueryWithRetry(statement, sql), expectedHeader, expectedResSet); } catch (Exception e) {