From ea8f9258ec1b2c281d88e58855750a3aa229f6f3 Mon Sep 17 00:00:00 2001
From: Y Ethan Guo
Date: Fri, 15 Sep 2023 18:18:20 -0700
Subject: [PATCH] [HUDI-6863] Revert auto-tuning of dedup parallelism (#9722)

Before this PR, the auto-tuning logic for dedup parallelism dictated the write
parallelism, so the user-configured `hoodie.upsert.shuffle.parallelism` was
ignored. This commit reverts #6802 to fix the issue.
---
 .../apache/hudi/table/action/commit/HoodieWriteHelper.java | 7 ++-----
 .../functional/TestHoodieClientOnCopyOnWriteStorage.java   | 6 +++---
 2 files changed, 5 insertions(+), 8 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
index d7640c28e50d..b56ac08e16fe 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
@@ -60,9 +60,6 @@ public HoodieData> deduplicateRecords(
       HoodieData> records, HoodieIndex index, int parallelism, String schemaStr, TypedProperties props, HoodieRecordMerger merger) {
     boolean isIndexingGlobal = index.isGlobal();
     final SerializableSchema schema = new SerializableSchema(schemaStr);
-    // Auto-tunes the parallelism for reduce transformation based on the number of data partitions
-    // in engine-specific representation
-    int reduceParallelism = Math.max(1, Math.min(records.getNumPartitions(), parallelism));
     return records.mapToPair(record -> {
       HoodieKey hoodieKey = record.getKey();
       // If index used is global, then records are expected to differ in their partitionPath
@@ -74,7 +71,7 @@ public HoodieData> deduplicateRecords(
     }).reduceByKey((rec1, rec2) -> {
       HoodieRecord reducedRecord;
       try {
-        reducedRecord = merger.merge(rec1, schema.get(), rec2, schema.get(), props).get().getLeft();
+        reducedRecord = merger.merge(rec1, schema.get(), rec2, schema.get(), props).get().getLeft();
       } catch (IOException e) {
         throw new HoodieException(String.format("Error to merge two records, %s, %s", rec1, rec2), e);
       }
@@ -82,6 +79,6 @@ public HoodieData> deduplicateRecords(
       HoodieKey reducedKey = choosePrev ? rec1.getKey() : rec2.getKey();
       HoodieOperation operation = choosePrev ? rec1.getOperation() : rec2.getOperation();
       return reducedRecord.newInstance(reducedKey, operation);
-    }, reduceParallelism).map(Pair::getRight);
+    }, parallelism).map(Pair::getRight);
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 5fcc4c0adf3e..764be044bc2f 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -479,12 +479,12 @@ private void testDeduplication(
     // Global dedup should be done based on recordKey only
     HoodieIndex index = mock(HoodieIndex.class);
     when(index.isGlobal()).thenReturn(true);
-    int dedupParallelism = records.getNumPartitions() + 100;
+    int dedupParallelism = records.getNumPartitions() + 2;
     HoodieData> dedupedRecsRdd =
         (HoodieData>) HoodieWriteHelper.newInstance()
             .deduplicateRecords(records, index, dedupParallelism, writeConfig.getSchema(), writeConfig.getProps(), HoodiePreCombineAvroRecordMerger.INSTANCE);
     List> dedupedRecs = dedupedRecsRdd.collectAsList();
-    assertEquals(records.getNumPartitions(), dedupedRecsRdd.getNumPartitions());
+    assertEquals(dedupParallelism, dedupedRecsRdd.getNumPartitions());
     assertEquals(1, dedupedRecs.size());
     assertEquals(dedupedRecs.get(0).getPartitionPath(), recordThree.getPartitionPath());
     assertNodupesWithinPartition(dedupedRecs);
@@ -496,7 +496,7 @@ private void testDeduplication(
         (HoodieData>) HoodieWriteHelper.newInstance()
             .deduplicateRecords(records, index, dedupParallelism, writeConfig.getSchema(), writeConfig.getProps(), HoodiePreCombineAvroRecordMerger.INSTANCE);
     dedupedRecs = dedupedRecsRdd.collectAsList();
-    assertEquals(records.getNumPartitions(), dedupedRecsRdd.getNumPartitions());
+    assertEquals(dedupParallelism, dedupedRecsRdd.getNumPartitions());
     assertEquals(2, dedupedRecs.size());
     assertNodupesWithinPartition(dedupedRecs);
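
Note on the reverted behavior: the removed auto-tuning capped the reduce parallelism at the number of input data partitions, so any larger value of `hoodie.upsert.shuffle.parallelism` passed in as `parallelism` never took effect. The standalone Java sketch below is not part of the patch; it uses assumed example values (10 input partitions, a configured parallelism of 200) purely to illustrate the difference.

    public class DedupParallelismSketch {
      public static void main(String[] args) {
        int numInputPartitions = 10;      // stand-in for records.getNumPartitions()
        int configuredParallelism = 200;  // stand-in for hoodie.upsert.shuffle.parallelism
        // Pre-revert: the parallelism handed to reduceByKey was capped at the input partition count.
        int tuned = Math.max(1, Math.min(numInputPartitions, configuredParallelism));
        System.out.println(tuned);                  // prints 10 -> the configured 200 is ignored
        // Post-revert: the configured value is passed to reduceByKey unchanged.
        System.out.println(configuredParallelism);  // prints 200
      }
    }

Accordingly, the updated test asserts that the deduplicated RDD has exactly dedupParallelism partitions rather than the input partition count.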