From 3836f8d84433261b7a4c01241324653eb879652b Mon Sep 17 00:00:00 2001
From: "yuzhao.cyz"
Date: Mon, 28 Mar 2022 20:02:04 +0800
Subject: [PATCH] [HUDI-3728] Set the sort operator parallelism for flink bucket bulk insert

---
 .../bucket/BucketStreamWriteFunction.java     | 67 +++++++------------
 .../org/apache/hudi/sink/utils/Pipelines.java |  6 +-
 2 files changed, 30 insertions(+), 43 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
index 7e4cf686ce0e..1456e8882f02 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
@@ -18,16 +18,12 @@
 
 package org.apache.hudi.sink.bucket;
 
-import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.index.bucket.BucketIdentifier;
 import org.apache.hudi.sink.StreamWriteFunction;
-import org.apache.hudi.table.HoodieFlinkTable;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -39,12 +35,9 @@
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import static java.util.stream.Collectors.toList;
-
 /**
  * A stream write function with bucket hash index.
  *
@@ -58,18 +51,14 @@ public class BucketStreamWriteFunction extends StreamWriteFunction {
 
   private static final Logger LOG = LoggerFactory.getLogger(BucketStreamWriteFunction.class);
 
-  private int maxParallelism;
-
   private int parallelism;
 
   private int bucketNum;
 
-  private transient HoodieFlinkTable<?> table;
-
   private String indexKeyFields;
 
   /**
-   * BucketID should be load in this task.
+   * BucketID should be loaded in this task.
    */
   private Set<Integer> bucketToLoad;
 
@@ -86,6 +75,11 @@ public class BucketStreamWriteFunction extends StreamWriteFunction {
    */
   private Set<String> incBucketIndex;
 
+  /**
+   * Returns whether this is an empty table.
+   */
+  private boolean isEmptyTable;
+
   /**
    * Constructs a BucketStreamWriteFunction.
   *
@@ -102,17 +96,15 @@ public void open(Configuration parameters) throws IOException {
     this.indexKeyFields = config.getString(FlinkOptions.INDEX_KEY_FIELD);
     this.taskID = getRuntimeContext().getIndexOfThisSubtask();
     this.parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
-    this.maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
-    this.bucketToLoad = new HashSet<>();
+    this.bucketToLoad = getBucketToLoad();
     this.bucketIndex = new HashMap<>();
     this.incBucketIndex = new HashSet<>();
-    getBucketToLoad();
+    this.isEmptyTable = !this.metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().isPresent();
   }
 
   @Override
   public void initializeState(FunctionInitializationContext context) throws Exception {
     super.initializeState(context);
-    this.table = this.writeClient.getHoodieTable();
   }
 
   @Override
@@ -129,19 +121,19 @@ public void processElement(I i, ProcessFunction.Context context, Coll
     final HoodieRecordLocation location;
 
     bootstrapIndexIfNeed(partition);
-    Map<Integer, String> bucketToFileIdMap = bucketIndex.get(partition);
+    Map<Integer, String> bucketToFileId = bucketIndex.computeIfAbsent(partition, p -> new HashMap<>());
     final int bucketNum = BucketIdentifier.getBucketId(hoodieKey, indexKeyFields, this.bucketNum);
-    final String partitionBucketId = BucketIdentifier.partitionBucketIdStr(hoodieKey.getPartitionPath(), bucketNum);
+    final String bucketId = partition + bucketNum;
 
-    if (incBucketIndex.contains(partitionBucketId)) {
-      location = new HoodieRecordLocation("I", bucketToFileIdMap.get(bucketNum));
-    } else if (bucketToFileIdMap.containsKey(bucketNum)) {
-      location = new HoodieRecordLocation("U", bucketToFileIdMap.get(bucketNum));
+    if (incBucketIndex.contains(bucketId)) {
+      location = new HoodieRecordLocation("I", bucketToFileId.get(bucketNum));
+    } else if (bucketToFileId.containsKey(bucketNum)) {
+      location = new HoodieRecordLocation("U", bucketToFileId.get(bucketNum));
     } else {
       String newFileId = BucketIdentifier.newBucketFileIdPrefix(bucketNum);
       location = new HoodieRecordLocation("I", newFileId);
-      bucketToFileIdMap.put(bucketNum,newFileId);
-      incBucketIndex.add(partitionBucketId);
+      bucketToFileId.put(bucketNum, newFileId);
+      incBucketIndex.add(bucketId);
     }
     record.unseal();
     record.setCurrentLocation(location);
@@ -153,39 +145,32 @@ public void processElement(I i, ProcessFunction.Context context, Coll
    * Bootstrap bucket info from existing file system,
    * bucketNum % totalParallelism == this taskID belongs to this task.
    */
-  private void getBucketToLoad() {
+  private Set<Integer> getBucketToLoad() {
+    Set<Integer> bucketToLoad = new HashSet<>();
     for (int i = 0; i < bucketNum; i++) {
       int partitionOfBucket = BucketIdentifier.mod(i, parallelism);
       if (partitionOfBucket == taskID) {
-        LOG.info(String.format("Bootstrapping index. Adding bucket %s , "
-            + "Current parallelism: %s , Max parallelism: %s , Current task id: %s",
-            i, parallelism, maxParallelism, taskID));
         bucketToLoad.add(i);
       }
     }
-    bucketToLoad.forEach(bucket -> LOG.info(String.format("bucketToLoad contains %s", bucket)));
+    LOG.info("Bucket number that belongs to task [{}/{}]: {}", taskID, parallelism, bucketToLoad);
+    return bucketToLoad;
   }
 
   /**
   * Get partition_bucket -> fileID mapping from the existing hudi table.
   * This is a required operation for each restart to avoid having duplicate file ids for one bucket.
  */
-  private void bootstrapIndexIfNeed(String partition) throws IOException {
-    if (bucketIndex.containsKey(partition)) {
-      return;
-    }
-    Option<HoodieInstant> latestCommitTime = table.getHoodieView().getTimeline().filterCompletedInstants().lastInstant();
-    if (!latestCommitTime.isPresent()) {
-      bucketIndex.put(partition, new HashMap<>());
+  private void bootstrapIndexIfNeed(String partition) {
+    if (isEmptyTable || bucketIndex.containsKey(partition)) {
       return;
     }
-    LOG.info(String.format("Loading Hoodie Table %s, with path %s", table.getMetaClient().getTableConfig().getTableName(),
-        table.getMetaClient().getBasePath() + "/" + partition));
+    LOG.info(String.format("Loading Hoodie Table %s, with path %s", this.metaClient.getTableConfig().getTableName(),
+        this.metaClient.getBasePath() + "/" + partition));
 
     // Load existing fileID belongs to this task
     Map<Integer, String> bucketToFileIDMap = new HashMap<>();
-    List<FileSlice> fileSlices = table.getHoodieView().getLatestFileSlices(partition).collect(toList());
-    for (FileSlice fileSlice : fileSlices) {
+    this.writeClient.getHoodieTable().getHoodieView().getLatestFileSlices(partition).forEach(fileSlice -> {
       String fileID = fileSlice.getFileId();
       int bucketNumber = BucketIdentifier.bucketIdFromFileId(fileID);
       if (bucketToLoad.contains(bucketNumber)) {
@@ -198,7 +183,7 @@ private void bootstrapIndexIfNeed(String partition) throws IOException {
          bucketToFileIDMap.put(bucketNumber, fileID);
        }
      }
-    }
+    });
     bucketIndex.put(partition, bucketToFileIDMap);
   }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 1992eddd6338..9f0a817536a8 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -98,10 +98,12 @@ public static DataStreamSink bulkInsert(Configuration conf, RowType rowT
       InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(rowTypeWithFileId);
       dataStream = dataStream.partitionCustom(partitioner, rowDataKeyGen::getRecordKey)
           .map(record -> BucketBulkInsertWriterHelper.rowWithFileId(rowDataKeyGen, record, indexKeyFields, bucketNum),
-              typeInfo);
+              typeInfo)
+          .setParallelism(dataStream.getParallelism()); // same parallelism as source
       if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
         SortOperatorGen sortOperatorGen = BucketBulkInsertWriterHelper.getFileIdSorterGen(rowTypeWithFileId);
-        dataStream = dataStream.transform("file_sorter", typeInfo, sortOperatorGen.createSortOperator());
+        dataStream = dataStream.transform("file_sorter", typeInfo, sortOperatorGen.createSortOperator())
+            .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); // same parallelism as write task to avoid shuffle
         ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
             conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
       }
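
The two setParallelism(...) calls above carry the fix: when an operator's parallelism is not set explicitly, Flink falls back to the job default, and a mismatch between the file_id tagging map, the file_sorter, and the downstream write tasks can introduce a rebalance (network shuffle) right before the sort. The sketch below is not part of the patch; it is a minimal, self-contained Flink job with hypothetical class and operator names that shows the same pattern of pinning consecutive operators to matching parallelism, using plain DataStream operators instead of the Hudi helpers.

// ParallelismPinningSketch.java -- an illustrative stand-alone job, not Hudi code.
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismPinningSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    int writeTasks = 4; // stands in for conf.getInteger(FlinkOptions.WRITE_TASKS)

    DataStream<String> source = env.fromElements("a", "b", "c");

    // Tagging step: kept at the source parallelism, mirroring
    // .setParallelism(dataStream.getParallelism()) in the patch.
    SingleOutputStreamOperator<String> tagged = source
        .map(value -> value + "#file_id") // stands in for rowWithFileId(...)
        .setParallelism(source.getParallelism());

    // Sorting step: pinned to the write-task parallelism, mirroring
    // .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)), so records
    // reach the sink below without an extra redistribution.
    SingleOutputStreamOperator<String> sorted = tagged
        .map(value -> value.toUpperCase()) // stands in for the file_sorter transform
        .setParallelism(writeTasks);

    // Sink at the same parallelism as the "sorter" above.
    sorted.print().setParallelism(writeTasks);

    env.execute("parallelism-pinning-sketch");
  }
}

When consecutive operators share a parallelism over a forward connection, Flink can chain them into a single task; when the parallelism differs, it inserts a rebalance exchange, which is the shuffle the in-line comments in the patch refer to.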