From ea619c6516678384667d4d7a5fe99974409e41e5 Mon Sep 17 00:00:00 2001
From: Y Ethan Guo
Date: Thu, 14 Sep 2023 22:44:25 -0700
Subject: [PATCH 1/2] [HUDI-6863] Revert auto-tuning of dedup parallelism

---
 .../action/commit/HoodieWriteHelper.java      |  7 +--
 .../TestHoodieClientOnCopyOnWriteStorage.java |  6 +--
 .../hudi/TestHoodieSparkSqlWriter.scala       | 46 ++++++++++++++-----
 3 files changed, 39 insertions(+), 20 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
index d7640c28e50d..b56ac08e16fe 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
@@ -60,9 +60,6 @@ public HoodieData> deduplicateRecords(
       HoodieData> records, HoodieIndex index, int parallelism, String schemaStr, TypedProperties props, HoodieRecordMerger merger) {
     boolean isIndexingGlobal = index.isGlobal();
     final SerializableSchema schema = new SerializableSchema(schemaStr);
-    // Auto-tunes the parallelism for reduce transformation based on the number of data partitions
-    // in engine-specific representation
-    int reduceParallelism = Math.max(1, Math.min(records.getNumPartitions(), parallelism));
     return records.mapToPair(record -> {
       HoodieKey hoodieKey = record.getKey();
       // If index used is global, then records are expected to differ in their partitionPath
@@ -74,7 +71,7 @@ public HoodieData> deduplicateRecords(
     }).reduceByKey((rec1, rec2) -> {
       HoodieRecord reducedRecord;
       try {
-        reducedRecord = merger.merge(rec1, schema.get(), rec2, schema.get(), props).get().getLeft();
+        reducedRecord = merger.merge(rec1, schema.get(), rec2, schema.get(), props).get().getLeft();
       } catch (IOException e) {
         throw new HoodieException(String.format("Error to merge two records, %s, %s", rec1, rec2), e);
       }
@@ -82,6 +79,6 @@ public HoodieData> deduplicateRecords(
       HoodieKey reducedKey = choosePrev ? rec1.getKey() : rec2.getKey();
       HoodieOperation operation = choosePrev ? rec1.getOperation() : rec2.getOperation();
       return reducedRecord.newInstance(reducedKey, operation);
-    }, reduceParallelism).map(Pair::getRight);
+    }, parallelism).map(Pair::getRight);
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 5fcc4c0adf3e..764be044bc2f 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -479,12 +479,12 @@ private void testDeduplication(
     // Global dedup should be done based on recordKey only
     HoodieIndex index = mock(HoodieIndex.class);
     when(index.isGlobal()).thenReturn(true);
-    int dedupParallelism = records.getNumPartitions() + 100;
+    int dedupParallelism = records.getNumPartitions() + 2;
     HoodieData> dedupedRecsRdd =
         (HoodieData>) HoodieWriteHelper.newInstance()
             .deduplicateRecords(records, index, dedupParallelism, writeConfig.getSchema(), writeConfig.getProps(), HoodiePreCombineAvroRecordMerger.INSTANCE);
     List> dedupedRecs = dedupedRecsRdd.collectAsList();
-    assertEquals(records.getNumPartitions(), dedupedRecsRdd.getNumPartitions());
+    assertEquals(dedupParallelism, dedupedRecsRdd.getNumPartitions());
     assertEquals(1, dedupedRecs.size());
     assertEquals(dedupedRecs.get(0).getPartitionPath(), recordThree.getPartitionPath());
     assertNodupesWithinPartition(dedupedRecs);
@@ -496,7 +496,7 @@ private void testDeduplication(
         (HoodieData>) HoodieWriteHelper.newInstance()
             .deduplicateRecords(records, index, dedupParallelism, writeConfig.getSchema(), writeConfig.getProps(), HoodiePreCombineAvroRecordMerger.INSTANCE);
     dedupedRecs = dedupedRecsRdd.collectAsList();
-    assertEquals(records.getNumPartitions(), dedupedRecsRdd.getNumPartitions());
+    assertEquals(dedupParallelism, dedupedRecsRdd.getNumPartitions());
     assertEquals(2, dedupedRecs.size());
     assertNodupesWithinPartition(dedupedRecs);
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 7f89817a7f8c..c31bd30b060a 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -49,7 +49,6 @@ import org.mockito.Mockito.{spy, times, verify}
 import org.scalatest.Assertions.assertThrows
 import org.scalatest.Matchers.{be, convertToAnyShouldWrapper, intercept}
 
-import java.io.IOException
 import java.time.format.DateTimeFormatterBuilder
 import java.time.temporal.ChronoField
 import java.time.{Instant, ZoneId}
@@ -298,13 +297,17 @@ class TestHoodieSparkSqlWriter {
   def testValidateTableConfigWithOverwriteSaveMode(): Unit = {
     //create a new table
     val tableModifier1 = Map("path" -> tempBasePath, HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
-      "hoodie.datasource.write.recordkey.field" -> "uuid")
+      "hoodie.datasource.write.recordkey.field" -> "uuid",
+      HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2",
+      HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2")
     val dataFrame = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime)))
     HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, tableModifier1, dataFrame)
 
     //on same path try write with different RECORDKEY_FIELD_NAME and Append SaveMode should throw an exception
     val tableModifier2 = Map("path" -> tempBasePath, HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
-      "hoodie.datasource.write.recordkey.field" -> "ts")
+      "hoodie.datasource.write.recordkey.field" -> "ts",
+      HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2",
+      HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2")
     val dataFrame2 = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime)))
     val hoodieException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, tableModifier2, dataFrame2))
     assert(hoodieException.getMessage.contains("Config conflict"))
@@ -321,13 +324,17 @@ class TestHoodieSparkSqlWriter {
   def testChangePartitionPath(): Unit = {
     //create a new table
     val tableModifier1 = Map("path" -> tempBasePath, HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
-      "hoodie.datasource.write.recordkey.field" -> "uuid", "hoodie.datasource.write.partitionpath.field" -> "ts")
+      "hoodie.datasource.write.recordkey.field" -> "uuid", "hoodie.datasource.write.partitionpath.field" -> "ts",
+      HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2",
+      HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2")
     val dataFrame = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime)))
     HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, tableModifier1, dataFrame)
 
     //on same path try write with different partitionpath field and Append SaveMode should throw an exception
     val tableModifier2 = Map("path" -> tempBasePath, HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
-      "hoodie.datasource.write.recordkey.field" -> "uuid", "hoodie.datasource.write.partitionpath.field" -> "uuid")
+      "hoodie.datasource.write.recordkey.field" -> "uuid", "hoodie.datasource.write.partitionpath.field" -> "uuid",
+      HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2",
+      HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2")
     val dataFrame2 = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime)))
     val hoodieException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, tableModifier2, dataFrame2))
     assert(hoodieException.getMessage.contains("Config conflict"))
@@ -791,7 +798,10 @@ class TestHoodieSparkSqlWriter {
       DataSourceWriteOptions.RECORDKEY_FIELD.key -> "keyid",
       DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "",
       DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
-      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test")
+      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+      HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2",
+      HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2"
+    )
     val df = spark.range(0, 1000).toDF("keyid")
       .withColumn("col3", expr("keyid"))
       .withColumn("age", lit(1))
@@ -1051,7 +1061,9 @@ class TestHoodieSparkSqlWriter {
     val options = Map(
       DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
       DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts",
-      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt"
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt",
+      HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2",
+      HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2"
     )
 
     // case 1: test table which created by sql
@@ -1129,7 +1141,9 @@ class TestHoodieSparkSqlWriter {
     val df = Seq((1, "a1", 10, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt")
     val options = Map(
       DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
-      DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts"
+      DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts",
+      HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2",
+      HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2"
     )
 
     // case 1: When commit C1 specifies a key generator and commit C2 does not specify key generator
@@ -1158,7 +1172,9 @@ class TestHoodieSparkSqlWriter {
     val options = Map(
       DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
       DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts",
-      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt"
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt",
+      HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2",
+      HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2"
     )
 
     // case 1: When commit C1 does not specify key generator and commit C2 specifies a key generator
@@ -1191,7 +1207,9 @@ class TestHoodieSparkSqlWriter {
     val options = Map(
       DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
       DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts",
-      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt"
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt",
+      HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2",
+      HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2"
    )
 
     // case 1: When commit C1 specifies a key generator and commkt C2 does not specify key generator
@@ -1223,7 +1241,9 @@ class TestHoodieSparkSqlWriter {
     val options = Map(
       DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
       DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts",
-      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt"
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt",
+      HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2",
+      HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2"
     )
 
     // case 1: When commit C1 specifies a key generator and commkt C2 does not specify key generator
@@ -1279,7 +1299,9 @@ class TestHoodieSparkSqlWriter {
       DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
       DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts",
       HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key -> "CONSISTENT_HASHING",
-      HoodieIndexConfig.INDEX_TYPE.key -> "BUCKET"
+      HoodieIndexConfig.INDEX_TYPE.key -> "BUCKET",
+      HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2",
+      HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2"
     )
 
     val (tableName1, tablePath1) = ("hoodie_test_params_1", s"$tempBasePath" + "_1")

From 09a14fbd65818bab270d260b40fe92fa9d5b28b9 Mon Sep 17 00:00:00 2001
From: Y Ethan Guo
Date: Fri, 15 Sep 2023 10:30:25 -0700
Subject: [PATCH 2/2] Remove unnecessary parallelism configs in tests

---
 .../hudi/TestHoodieSparkSqlWriter.scala | 46 +++++--------------
 1 file changed, 12 insertions(+), 34 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index c31bd30b060a..7f89817a7f8c 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -49,6 +49,7 @@ import org.mockito.Mockito.{spy, times, verify}
 import org.scalatest.Assertions.assertThrows
 import org.scalatest.Matchers.{be, convertToAnyShouldWrapper, intercept}
 
+import java.io.IOException
 import java.time.format.DateTimeFormatterBuilder
 import java.time.temporal.ChronoField
 import java.time.{Instant, ZoneId}
@@ -297,17 +298,13 @@ class TestHoodieSparkSqlWriter {
   def testValidateTableConfigWithOverwriteSaveMode(): Unit = {
     //create a new table
     val tableModifier1 = Map("path" -> tempBasePath, HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
-      "hoodie.datasource.write.recordkey.field" -> "uuid",
-      HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2",
-      HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2")
+      "hoodie.datasource.write.recordkey.field" -> "uuid")
     val dataFrame = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime)))
     HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, tableModifier1, dataFrame)
 
     //on same path try write with different RECORDKEY_FIELD_NAME and Append SaveMode should throw an exception
     val tableModifier2 = Map("path" -> tempBasePath, HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
-      "hoodie.datasource.write.recordkey.field" -> "ts",
-      HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2",
-      HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2")
+      "hoodie.datasource.write.recordkey.field" -> "ts")
     val dataFrame2 = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime)))
     val hoodieException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, tableModifier2, dataFrame2))
     assert(hoodieException.getMessage.contains("Config conflict"))
@@ -324,17 +321,13 @@ class TestHoodieSparkSqlWriter {
   def testChangePartitionPath(): Unit = {
     //create a new table
     val tableModifier1 = Map("path" -> tempBasePath, HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
-      "hoodie.datasource.write.recordkey.field" -> "uuid", "hoodie.datasource.write.partitionpath.field" -> "ts",
-      HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2",
-      HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2")
+      "hoodie.datasource.write.recordkey.field" -> "uuid", "hoodie.datasource.write.partitionpath.field" -> "ts")
     val dataFrame = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime)))
     HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, tableModifier1, dataFrame)
 
     //on same path try write with different partitionpath field and Append SaveMode should throw an exception
     val tableModifier2 = Map("path" -> tempBasePath, HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
-      "hoodie.datasource.write.recordkey.field" -> "uuid", "hoodie.datasource.write.partitionpath.field" -> "uuid",
-      HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2",
-      HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2")
+      "hoodie.datasource.write.recordkey.field" -> "uuid", "hoodie.datasource.write.partitionpath.field" -> "uuid")
    val dataFrame2 = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime)))
     val hoodieException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, tableModifier2, dataFrame2))
     assert(hoodieException.getMessage.contains("Config conflict"))
@@ -798,10 +791,7 @@ class TestHoodieSparkSqlWriter {
       DataSourceWriteOptions.RECORDKEY_FIELD.key -> "keyid",
       DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "",
       DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
-      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
-      HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2",
-      HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2"
-    )
+      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test")
     val df = spark.range(0, 1000).toDF("keyid")
       .withColumn("col3", expr("keyid"))
       .withColumn("age", lit(1))
@@ -1061,9 +1051,7 @@ class TestHoodieSparkSqlWriter {
     val options = Map(
       DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
       DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts",
-      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt",
-      HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2",
-      HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2"
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt"
     )
 
     // case 1: test table which created by sql
@@ -1141,9 +1129,7 @@ class TestHoodieSparkSqlWriter {
     val df = Seq((1, "a1", 10, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt")
     val options = Map(
       DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
-      DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts",
-      HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2",
-      HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2"
+      DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts"
     )
 
     // case 1: When commit C1 specifies a key generator and commit C2 does not specify key generator
@@ -1172,9 +1158,7 @@ class TestHoodieSparkSqlWriter {
     val options = Map(
       DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
       DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts",
-      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt",
-      HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2",
-      HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2"
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt"
     )
 
     // case 1: When commit C1 does not specify key generator and commit C2 specifies a key generator
@@ -1207,9 +1191,7 @@ class TestHoodieSparkSqlWriter {
     val options = Map(
       DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
       DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts",
-      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt",
-      HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2",
-      HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2"
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt"
     )
 
     // case 1: When commit C1 specifies a key generator and commkt C2 does not specify key generator
@@ -1241,9 +1223,7 @@ class TestHoodieSparkSqlWriter {
     val options = Map(
       DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
       DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts",
-      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt",
-      HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2",
-      HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2"
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt"
    )
 
     // case 1: When commit C1 specifies a key generator and commkt C2 does not specify key generator
@@ -1299,9 +1279,7 @@ class TestHoodieSparkSqlWriter {
       DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
       DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts",
       HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key -> "CONSISTENT_HASHING",
-      HoodieIndexConfig.INDEX_TYPE.key -> "BUCKET",
-      HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "2",
-      HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "2"
+      HoodieIndexConfig.INDEX_TYPE.key -> "BUCKET"
     )
 
     val (tableName1, tablePath1) = ("hoodie_test_params_1", s"$tempBasePath" + "_1")