Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1021,6 +1021,8 @@ public static void assertDataEventuallyOnEnv(
String expectedHeader,
Set<String> expectedResSet,
long timeoutSeconds) {
final long startTime = System.currentTimeMillis();
final boolean[] flushed = {false};
try (Connection connection = env.getConnection();
Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
Expand All @@ -1032,6 +1034,13 @@ public static void assertDataEventuallyOnEnv(
.untilAsserted(
() -> {
try {
// For IoTV2 batch mode, the pipe receiver may need to flush because the replica
// sync requires tsFile to process. We flush in the middle of assertion because we
// don't know when the data reaches the receiver in general cases
if (!flushed[0] && System.currentTimeMillis() - startTime > timeoutSeconds >> 1) {
flushed[0] = true;
statement.execute("flush");
}
TestUtils.assertResultSetEqual(
executeQueryWithRetry(statement, sql), expectedHeader, expectedResSet);
} catch (Exception e) {
Expand All @@ -1055,6 +1064,8 @@ public static void assertDataSizeEventuallyOnEnv(
final int size,
final long timeoutSeconds,
final String dataBaseName) {
final long startTime = System.currentTimeMillis();
final boolean[] flushed = {false};
try (final Connection connection = env.getConnection(BaseEnv.TABLE_SQL_DIALECT);
final Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
Expand All @@ -1069,6 +1080,13 @@ public static void assertDataSizeEventuallyOnEnv(
if (dataBaseName != null) {
statement.execute("use " + dataBaseName);
}
// For IoTV2 batch mode, the pipe receiver may need to flush because the replica
// sync requires tsFile to process. We flush in the middle of assertion because we
// don't know when the data reaches the receiver in general cases
if (!flushed[0] && System.currentTimeMillis() - startTime > timeoutSeconds >> 1) {
flushed[0] = true;
statement.execute("flush");
}
if (sql != null && !sql.isEmpty()) {
TestUtils.assertResultSetSize(executeQueryWithRetry(statement, sql), size);
}
Expand All @@ -1088,6 +1106,8 @@ public static void assertDataEventuallyOnEnv(
final Set<String> expectedResSet,
final long timeoutSeconds,
final Consumer<String> handleFailure) {
final long startTime = System.currentTimeMillis();
final boolean[] flushed = {false};
try (Connection connection = env.getConnection();
Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
Expand All @@ -1099,6 +1119,13 @@ public static void assertDataEventuallyOnEnv(
.untilAsserted(
() -> {
try {
// For IoTV2 batch mode, the pipe receiver may need to flush because the replica
// sync requires tsFile to process. We flush in the middle of assertion because we
// don't know when the data reaches the receiver in general cases
if (!flushed[0] && System.currentTimeMillis() - startTime > timeoutSeconds >> 1) {
flushed[0] = true;
statement.execute("flush");
}
TestUtils.assertResultSetEqual(
executeQueryWithRetry(statement, sql), expectedHeader, expectedResSet);
} catch (Exception e) {
Expand Down Expand Up @@ -1147,6 +1174,8 @@ public static void assertDataEventuallyOnEnv(
final long timeoutSeconds,
final String databaseName,
final Consumer<String> handleFailure) {
final long startTime = System.currentTimeMillis();
final boolean[] flushed = {false};
try (Connection connection = env.getConnection(BaseEnv.TABLE_SQL_DIALECT);
Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
Expand All @@ -1161,6 +1190,13 @@ public static void assertDataEventuallyOnEnv(
if (databaseName != null) {
statement.execute("use " + databaseName);
}
// For IoTV2 batch mode, the pipe receiver may need to flush because the replica
// sync requires tsFile to process. We flush in the middle of assertion because we
// don't know when the data reaches the receiver in general cases
if (!flushed[0] && System.currentTimeMillis() - startTime > timeoutSeconds >> 1) {
flushed[0] = true;
statement.execute("flush");
}
if (sql != null && !sql.isEmpty()) {
TestUtils.assertResultSetEqual(
executeQueryWithRetry(statement, sql), expectedHeader, expectedResSet);
Expand Down Expand Up @@ -1189,6 +1225,8 @@ public static void assertDataEventuallyOnEnv(

public static void assertDataEventuallyOnEnv(
BaseEnv env, String sql, Map<String, String> expectedHeaderWithResult, long timeoutSeconds) {
final long startTime = System.currentTimeMillis();
final boolean[] flushed = {false};
try (Connection connection = env.getConnection();
Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
Expand All @@ -1200,6 +1238,13 @@ public static void assertDataEventuallyOnEnv(
.untilAsserted(
() -> {
try {
// For IoTV2 batch mode, the pipe receiver may need to flush because the replica
// sync requires tsFile to process. We flush in the middle of assertion because we
// don't know when the data reaches the receiver in general cases
if (!flushed[0] && System.currentTimeMillis() - startTime > timeoutSeconds >> 1) {
flushed[0] = true;
statement.execute("flush");
}
TestUtils.assertSingleResultSetEqual(
executeQueryWithRetry(statement, sql), expectedHeaderWithResult);
} catch (Exception e) {
Expand Down Expand Up @@ -1245,6 +1290,8 @@ public static void assertDataAlwaysOnEnv(
Set<String> expectedResSet,
long consistentSeconds,
final String database) {
final long startTime = System.currentTimeMillis();
final boolean[] flushed = {false};
try (Connection connection =
env.getConnection(
Objects.isNull(database) ? BaseEnv.TREE_SQL_DIALECT : BaseEnv.TABLE_SQL_DIALECT);
Expand All @@ -1261,6 +1308,14 @@ public static void assertDataAlwaysOnEnv(
if (Objects.nonNull(database)) {
statement.execute("use " + database);
}
// For IoTV2 batch mode, the pipe receiver may need to flush because the replica
// sync requires tsFile to process. We flush in the middle of assertion because we
// don't know when the data reaches the receiver in general cases
if (!flushed[0]
&& System.currentTimeMillis() - startTime > consistentSeconds >> 1) {
flushed[0] = true;
statement.execute("flush");
}
TestUtils.assertResultSetEqual(
executeQueryWithRetry(statement, sql), expectedHeader, expectedResSet);
} catch (Exception e) {
Expand All @@ -1280,9 +1335,10 @@ public static void assertDataAlwaysOnEnv(
Set<String> expectedResSet,
long consistentSeconds,
Consumer<String> handleFailure) {
final long startTime = System.currentTimeMillis();
final boolean[] flushed = {false};
try (Connection connection = env.getConnection();
Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
await()
.pollInSameThread()
.pollDelay(1L, TimeUnit.SECONDS)
Expand All @@ -1291,6 +1347,14 @@ public static void assertDataAlwaysOnEnv(
.failFast(
() -> {
try {
// For IoTV2 batch mode, the pipe receiver may need to flush because the replica
// sync requires tsFile to process. We flush in the middle of assertion because we
// don't know when the data reaches the receiver in general cases
if (!flushed[0]
&& System.currentTimeMillis() - startTime > consistentSeconds >> 1) {
flushed[0] = true;
statement.execute("flush");
}
TestUtils.assertResultSetEqual(
executeQueryWithRetry(statement, sql), expectedHeader, expectedResSet);
} catch (Exception e) {
Expand Down Expand Up @@ -1319,6 +1383,8 @@ public static void assertDataAlwaysOnEnv(
long consistentSeconds,
String database,
Consumer<String> handleFailure) {
final long startTime = System.currentTimeMillis();
final boolean[] flushed = {false};
try (Connection connection = env.getConnection();
Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
Expand All @@ -1333,6 +1399,14 @@ public static void assertDataAlwaysOnEnv(
if (database != null) {
statement.execute("use " + database);
}
// For IoTV2 batch mode, the pipe receiver may need to flush because the replica
// sync requires tsFile to process. We flush in the middle of assertion because we
// don't know when the data reaches the receiver in general cases
if (!flushed[0]
&& System.currentTimeMillis() - startTime > consistentSeconds >> 1) {
flushed[0] = true;
statement.execute("flush");
}
TestUtils.assertResultSetEqual(
executeQueryWithRetry(statement, sql), expectedHeader, expectedResSet);
} catch (Exception e) {
Expand Down