Skip to content

Commit

Permalink
[HUDI-4924] Auto-tune dedup parallelism
Browse files Browse the repository at this point in the history
  • Loading branch information
yihua authored and nsivabalan committed Sep 27, 2022
1 parent be9b419 commit 4828d6c
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 3 deletions.
Expand Up @@ -56,6 +56,9 @@ public HoodieData<HoodieRecord<T>> deduplicateRecords(
HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr) {
boolean isIndexingGlobal = index.isGlobal();
final SerializableSchema schema = new SerializableSchema(schemaStr);
// Auto-tunes the parallelism for reduce transformation based on the number of data partitions
// in engine-specific representation
int reduceParallelism = Math.max(1, Math.min(records.getNumPartitions(), parallelism));
return records.mapToPair(record -> {
HoodieKey hoodieKey = record.getKey();
// If index used is global, then records are expected to differ in their partitionPath
Expand All @@ -67,7 +70,7 @@ public HoodieData<HoodieRecord<T>> deduplicateRecords(
HoodieKey reducedKey = rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey();

return new HoodieAvroRecord<>(reducedKey, reducedData);
}, parallelism).map(Pair::getRight);
}, reduceParallelism).map(Pair::getRight);
}

}
Expand Up @@ -102,6 +102,11 @@ public long count() {
return rddData.count();
}

@Override
public int getNumPartitions() {
return rddData.getNumPartitions();
}

@Override
public <O> HoodieData<O> map(SerializableFunction<T, O> func) {
return HoodieJavaRDD.of(rddData.map(func::apply));
Expand Down
Expand Up @@ -67,14 +67,19 @@ public interface HoodieData<T> extends Serializable {

/**
* Returns number of objects held in the collection
*
* <p>
* NOTE: This is a terminal operation
*/
long count();

/**
* @return the number of data partitions in the engine-specific representation.
*/
int getNumPartitions();

/**
* Maps every element in the collection using provided mapping {@code func}.
*
* <p>
* This is an intermediate operation
*
* @param func serializable map function
Expand Down
Expand Up @@ -175,6 +175,11 @@ public long count() {
return super.count();
}

@Override
public int getNumPartitions() {
return 1;
}

@Override
public List<T> collectAsList() {
return super.collectAsList();
Expand Down

0 comments on commit 4828d6c

Please sign in to comment.