From 8e8ccddf942d8b2e2263c8bbeb54c10570ac8f50 Mon Sep 17 00:00:00 2001 From: Balaji Varadarajan Date: Thu, 13 Aug 2020 22:57:09 -0700 Subject: [PATCH] Fix Integration test flakiness in HoodieJavaStreamingApp --- .../apache/hudi/integ/ITTestHoodieSanity.java | 44 ++++++++++++------- .../apache/hudi/HoodieDataSourceHelpers.java | 2 +- hudi-spark/src/test/java/HoodieJavaApp.java | 4 ++ .../src/test/java/HoodieJavaStreamingApp.java | 4 +- .../functional/TestStructuredStreaming.scala | 4 +- 5 files changed, 37 insertions(+), 21 deletions(-) diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java index c7787a7a6e8cc..aba1d54e3ffe2 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java @@ -19,6 +19,7 @@ package org.apache.hudi.integ; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; @@ -34,20 +35,23 @@ */ public class ITTestHoodieSanity extends ITTestBase { + private static final String HDFS_BASE_URL = "hdfs://namenode"; + private static final String HDFS_STREAMING_SOURCE = HDFS_BASE_URL + "/streaming/source/"; + private static final String HDFS_STREAMING_CKPT = HDFS_BASE_URL + "/streaming/ckpt/"; + enum PartitionType { SINGLE_KEY_PARTITIONED, MULTI_KEYS_PARTITIONED, NON_PARTITIONED, } - @ParameterizedTest - @ValueSource(strings = { HOODIE_JAVA_APP, HOODIE_JAVA_STREAMING_APP }) + @Test /** * A basic integration test that runs HoodieJavaApp to create a sample COW Hoodie with single partition key data-set * and performs upserts on it. Hive integration and upsert functionality is checked by running a count query in hive * console. 
*/ - public void testRunHoodieJavaAppOnSinglePartitionKeyCOWTable(String command) throws Exception { - String hiveTableName = "docker_hoodie_single_partition_key_cow_test"; - testRunHoodieJavaApp(command, hiveTableName, HoodieTableType.COPY_ON_WRITE.name(), + public void testRunHoodieJavaAppOnSinglePartitionKeyCOWTable() throws Exception { + String hiveTableName = "docker_hoodie_single_partition_key_cow_test_" + HoodieActiveTimeline.createNewInstantTime(); + testRunHoodieJavaApp(hiveTableName, HoodieTableType.COPY_ON_WRITE.name(), PartitionType.SINGLE_KEY_PARTITIONED); dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name()); } @@ -59,9 +63,9 @@ public void testRunHoodieJavaAppOnSinglePartitionKeyCOWTable(String command) thr * data-set and performs upserts on it. Hive integration and upsert functionality is checked by running a count query * in hive console. */ - public void testRunHoodieJavaAppOnMultiPartitionKeysCOWTable(String command) throws Exception { - String hiveTableName = "docker_hoodie_multi_partition_key_cow_test"; - testRunHoodieJavaApp(command, hiveTableName, HoodieTableType.COPY_ON_WRITE.name(), + public void testRunHoodieJavaAppOnMultiPartitionKeysCOWTable() throws Exception { + String hiveTableName = "docker_hoodie_multi_partition_key_cow_test_" + HoodieActiveTimeline.createNewInstantTime(); + testRunHoodieJavaApp(HOODIE_JAVA_APP, hiveTableName, HoodieTableType.COPY_ON_WRITE.name(), PartitionType.MULTI_KEYS_PARTITIONED); dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name()); } @@ -73,21 +77,20 @@ public void testRunHoodieJavaAppOnMultiPartitionKeysCOWTable(String command) thr * console. 
*/ public void testRunHoodieJavaAppOnNonPartitionedCOWTable() throws Exception { - String hiveTableName = "docker_hoodie_non_partition_key_cow_test"; + String hiveTableName = "docker_hoodie_non_partition_key_cow_test_" + HoodieActiveTimeline.createNewInstantTime(); testRunHoodieJavaApp(hiveTableName, HoodieTableType.COPY_ON_WRITE.name(), PartitionType.NON_PARTITIONED); dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name()); } - @ParameterizedTest - @ValueSource(strings = { HOODIE_JAVA_APP, HOODIE_JAVA_STREAMING_APP }) + @Test /** * A basic integration test that runs HoodieJavaApp to create a sample MOR Hoodie with single partition key data-set * and performs upserts on it. Hive integration and upsert functionality is checked by running a count query in hive * console. */ - public void testRunHoodieJavaAppOnSinglePartitionKeyMORTable(String command) throws Exception { - String hiveTableName = "docker_hoodie_single_partition_key_mor_test"; - testRunHoodieJavaApp(command, hiveTableName, HoodieTableType.MERGE_ON_READ.name(), + public void testRunHoodieJavaAppOnSinglePartitionKeyMORTable() throws Exception { + String hiveTableName = "docker_hoodie_single_partition_key_mor_test_" + HoodieActiveTimeline.createNewInstantTime(); + testRunHoodieJavaApp(hiveTableName, HoodieTableType.MERGE_ON_READ.name(), PartitionType.SINGLE_KEY_PARTITIONED); dropHiveTables(hiveTableName, HoodieTableType.MERGE_ON_READ.name()); } @@ -100,7 +103,7 @@ public void testRunHoodieJavaAppOnSinglePartitionKeyMORTable(String command) thr * in hive console. 
*/ public void testRunHoodieJavaAppOnMultiPartitionKeysMORTable(String command) throws Exception { - String hiveTableName = "docker_hoodie_multi_partition_key_mor_test"; + String hiveTableName = "docker_hoodie_multi_partition_key_mor_test_" + HoodieActiveTimeline.createNewInstantTime(); testRunHoodieJavaApp(command, hiveTableName, HoodieTableType.MERGE_ON_READ.name(), PartitionType.MULTI_KEYS_PARTITIONED); dropHiveTables(hiveTableName, HoodieTableType.MERGE_ON_READ.name()); @@ -113,7 +116,7 @@ public void testRunHoodieJavaAppOnMultiPartitionKeysMORTable(String command) thr * console. */ public void testRunHoodieJavaAppOnNonPartitionedMORTable() throws Exception { - String hiveTableName = "docker_hoodie_non_partition_key_mor_test"; + String hiveTableName = "docker_hoodie_non_partition_key_mor_test_" + HoodieActiveTimeline.createNewInstantTime(); testRunHoodieJavaApp(hiveTableName, HoodieTableType.MERGE_ON_READ.name(), PartitionType.NON_PARTITIONED); dropHiveTables(hiveTableName, HoodieTableType.MERGE_ON_READ.name()); } @@ -127,7 +130,7 @@ public void testRunHoodieJavaApp(String command, String hiveTableName, String ta throws Exception { String hdfsPath = "/" + hiveTableName; - String hdfsUrl = "hdfs://namenode" + hdfsPath; + String hdfsUrl = HDFS_BASE_URL + hdfsPath; // Drop Table if it exists try { @@ -155,6 +158,13 @@ public void testRunHoodieJavaApp(String command, String hiveTableName, String ta cmd = command + " --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL + " --table-type " + tableType + " --hive-table " + hiveTableName + " --non-partitioned"; } + + if (command.equals(HOODIE_JAVA_STREAMING_APP)) { + String streamingSourcePath = HDFS_STREAMING_SOURCE + hiveTableName; + String streamingCkptPath = HDFS_STREAMING_CKPT + hiveTableName; + cmd = cmd + " --streaming-source-path " + streamingSourcePath + + " --streaming-checkpointing-path " + streamingCkptPath; + } executeCommandStringInDocker(ADHOC_1_CONTAINER, cmd, true); String 
snapshotTableName = tableType.equals(HoodieTableType.MERGE_ON_READ.name()) diff --git a/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java b/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java index 8d8e7ba20c56f..bed3bdab14068 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java +++ b/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java @@ -69,7 +69,7 @@ public static HoodieTimeline allCompletedCommitsCompactions(FileSystem fs, Strin if (metaClient.getTableType().equals(HoodieTableType.MERGE_ON_READ)) { return metaClient.getActiveTimeline().getTimelineOfActions( CollectionUtils.createSet(HoodieActiveTimeline.COMMIT_ACTION, - HoodieActiveTimeline.DELTA_COMMIT_ACTION)); + HoodieActiveTimeline.DELTA_COMMIT_ACTION)).filterCompletedInstants(); } else { return metaClient.getCommitTimeline().filterCompletedInstants(); } diff --git a/hudi-spark/src/test/java/HoodieJavaApp.java b/hudi-spark/src/test/java/HoodieJavaApp.java index 594d813980930..9c42232d183b5 100644 --- a/hudi-spark/src/test/java/HoodieJavaApp.java +++ b/hudi-spark/src/test/java/HoodieJavaApp.java @@ -16,6 +16,7 @@ * limitations under the License. */ +import org.apache.hadoop.fs.Path; import org.apache.hudi.DataSourceReadOptions; import org.apache.hudi.DataSourceWriteOptions; import org.apache.hudi.HoodieDataSourceHelpers; @@ -120,6 +121,9 @@ public void run() throws Exception { dataGen = new HoodieTestDataGenerator(); } + // Explicitly clear up the hoodie table path if it exists. 
+ fs.delete(new Path(tablePath), true); + /** * Commit with only inserts */ diff --git a/hudi-spark/src/test/java/HoodieJavaStreamingApp.java b/hudi-spark/src/test/java/HoodieJavaStreamingApp.java index e93784e75dfae..3b35ce9193cb9 100644 --- a/hudi-spark/src/test/java/HoodieJavaStreamingApp.java +++ b/hudi-spark/src/test/java/HoodieJavaStreamingApp.java @@ -273,7 +273,9 @@ private void waitTillNCommits(FileSystem fs, int numCommits, int timeoutSecs, in public int addInputAndValidateIngestion(SparkSession spark, FileSystem fs, String srcPath, int initialCommits, int expRecords, Dataset inputDF1, Dataset inputDF2, boolean instantTimeValidation) throws Exception { - inputDF1.write().mode(SaveMode.Append).json(srcPath); + // Ensure that we always write exactly one file. This is very important so that a single batch is reliably read + // atomically by one iteration of Spark streaming. + inputDF1.coalesce(1).write().mode(SaveMode.Append).json(srcPath); int numExpCommits = initialCommits + 1; // wait for spark streaming to process one microbatch diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala index 011573b8b2d10..226cf5313f5d0 100644 --- a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala +++ b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala @@ -102,7 +102,7 @@ class TestStructuredStreaming extends HoodieClientTestBase { } val f2 = Future { - inputDF1.write.mode(SaveMode.Append).json(sourcePath) + inputDF1.coalesce(1).write.mode(SaveMode.Append).json(sourcePath) // wait for spark streaming to process one microbatch val currNumCommits = waitTillAtleastNCommits(fs, destPath, 1, 120, 5) assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, destPath, "000")) @@ -112,7 +112,7 @@ class TestStructuredStreaming extends HoodieClientTestBase { .load(destPath + "/*/*/*/*") 
assert(hoodieROViewDF1.count() == 100) - inputDF2.write.mode(SaveMode.Append).json(sourcePath) + inputDF2.coalesce(1).write.mode(SaveMode.Append).json(sourcePath) // wait for spark streaming to process one microbatch waitTillAtleastNCommits(fs, destPath, currNumCommits + 1, 120, 5) val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, destPath)