diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java
index 376e22ec71..030333bb4c 100644
--- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java
+++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java
@@ -35,11 +35,10 @@
 import org.apache.iceberg.util.BinPacking;
 import org.apache.iceberg.util.Pair;
 
-import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -227,20 +226,15 @@ public Weight getWeight() {
   }
 
   /**
-   * When splitTask has only one undersized segment file, it needs to be triggered again to
-   * determine whether to rewrite pos. If needed, add it to rewritePosDataFiles and bin-packing
-   * together, else reserved delete files.
+   * If the undersized segment file should not be rewritten, we should dispose of it by either
+   * rewriting pos delete or skipping optimizing (reserving its delete files).
    */
-  protected void disposeUndersizedSegmentFile(SplitTask splitTask) {
-    Optional<DataFile> dataFile = splitTask.getRewriteDataFiles().stream().findFirst();
-    if (dataFile.isPresent()) {
-      DataFile rewriteDataFile = dataFile.get();
-      List<ContentFile<?>> deletes = new ArrayList<>(splitTask.getDeleteFiles());
-      if (evaluator().segmentShouldRewritePos(rewriteDataFile, deletes)) {
-        rewritePosDataFiles.put(rewriteDataFile, deletes);
-      } else {
-        reservedDeleteFiles(deletes);
-      }
+  protected void disposeUndersizedSegmentFile(DataFile undersizedSegmentFile) {
+    List<ContentFile<?>> deletes = undersizedSegmentFiles.get(undersizedSegmentFile);
+    if (evaluator().segmentShouldRewritePos(undersizedSegmentFile, deletes)) {
+      rewritePosDataFiles.put(undersizedSegmentFile, deletes);
+    } else {
+      reservedDeleteFiles(deletes);
     }
   }
 
@@ -329,19 +323,53 @@ public boolean isRewritePosDataFile() {
 
   protected class BinPackingTaskSplitter implements TaskSplitter {
 
+    private final Map<DataFile, List<ContentFile<?>>> rewriteDataFiles;
+    private final Map<DataFile, List<ContentFile<?>>> rewritePosDataFiles;
+    private final Map<DataFile, List<ContentFile<?>>> undersizedSegmentFiles;
+    private boolean limitByTaskCount = false;
+
+    public BinPackingTaskSplitter(
+        Map<DataFile, List<ContentFile<?>>> rewriteDataFiles,
+        Map<DataFile, List<ContentFile<?>>> rewritePosDataFiles,
+        Map<DataFile, List<ContentFile<?>>> undersizedSegmentFiles) {
+      this.rewriteDataFiles = rewriteDataFiles;
+      this.rewritePosDataFiles = rewritePosDataFiles;
+      this.undersizedSegmentFiles = undersizedSegmentFiles;
+    }
+
+    /**
+     * In this mode, the number of tasks will be limited to avoid the high cost of excessive
+     * duplicate delete file reads.
+     */
+    public BinPackingTaskSplitter limitByTaskCount() {
+      limitByTaskCount = true;
+      return this;
+    }
+
     @Override
     public List<SplitTask> splitTasks(int targetTaskCount) {
+      if (allEmpty()) {
+        return Collections.emptyList();
+      }
       List<SplitTask> results = Lists.newArrayList();
       List<FileTask> fileTasks = Lists.newArrayList();
+      long taskSize = Math.max(config.getTargetSize(), config.getMaxTaskSize());
+      if (limitByTaskCount) {
+        taskSize = Math.max(taskSize, totalSize() / targetTaskCount + 1);
+      }
       // bin-packing for undersized segment files
       undersizedSegmentFiles.forEach(
           (dataFile, deleteFiles) -> fileTasks.add(new FileTask(dataFile, deleteFiles, true)));
-      for (SplitTask splitTask : genSplitTasks(fileTasks)) {
+      for (SplitTask splitTask : genSplitTasks(fileTasks, taskSize)) {
         if (splitTask.getRewriteDataFiles().size() > 1) {
           results.add(splitTask);
-          continue;
+        } else {
+          // If there is only one undersized segment file in the split task, it's meaningless to
+          // rewrite it.
+          splitTask
+              .getRewriteDataFiles()
+              .forEach(AbstractPartitionPlan.this::disposeUndersizedSegmentFile);
         }
-        disposeUndersizedSegmentFile(splitTask);
       }
 
       // bin-packing for fragment file and rewrite pos data file
@@ -350,16 +378,33 @@ public List<SplitTask> splitTasks(int targetTaskCount) {
           (dataFile, deleteFiles) -> fileTasks.add(new FileTask(dataFile, deleteFiles, true)));
       rewritePosDataFiles.forEach(
           (dataFile, deleteFiles) -> fileTasks.add(new FileTask(dataFile, deleteFiles, false)));
-      results.addAll(genSplitTasks(fileTasks));
+      results.addAll(genSplitTasks(fileTasks, taskSize));
       return results;
     }
 
-    private Collection<SplitTask> genSplitTasks(List<FileTask> allDataFiles) {
+    private boolean allEmpty() {
+      return rewriteDataFiles.isEmpty()
+          && rewritePosDataFiles.isEmpty()
+          && undersizedSegmentFiles.isEmpty();
+    }
+
+    private long totalSize() {
+      long totalSize = 0;
+      for (DataFile dataFile : rewriteDataFiles.keySet()) {
+        totalSize += dataFile.fileSizeInBytes();
+      }
+      for (DataFile dataFile : rewritePosDataFiles.keySet()) {
+        totalSize += dataFile.fileSizeInBytes();
+      }
+      for (DataFile dataFile : undersizedSegmentFiles.keySet()) {
+        totalSize += dataFile.fileSizeInBytes();
+      }
+      return totalSize;
+    }
+
+    private Collection<SplitTask> genSplitTasks(List<FileTask> allDataFiles, long taskSize) {
       List<List<FileTask>> packed =
-          new BinPacking.ListPacker<FileTask>(
-                  Math.max(config.getTargetSize(), config.getMaxTaskSize()),
-                  Integer.MAX_VALUE,
-                  false)
+          new BinPacking.ListPacker<FileTask>(taskSize, Integer.MAX_VALUE, false)
              .pack(allDataFiles, f -> f.getFile().fileSizeInBytes());
       List<SplitTask> results = Lists.newArrayListWithCapacity(packed.size());
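
The heart of this patch is the new task-size clamp in splitTasks: when limitByTaskCount() is enabled, the bin capacity is raised to at least totalSize() / targetTaskCount + 1, so a packing that fills bins by size needs at most targetTaskCount bins. A minimal standalone sketch of that arithmetic (the sizes, task count, and class name below are hypothetical, not taken from the patch):

```java
import java.util.Arrays;
import java.util.List;

public class TaskSizeClampSketch {
  public static void main(String[] args) {
    // Stand-ins for config.getTargetSize() and config.getMaxTaskSize()
    long targetSize = 128L << 20;  // 128 MiB
    long maxTaskSize = 256L << 20; // 256 MiB
    // Hypothetical candidate file sizes; totalSize() sums these in the real splitter
    List<Long> fileSizes = Arrays.asList(100L << 20, 300L << 20, 700L << 20, 900L << 20);
    long totalSize = fileSizes.stream().mapToLong(Long::longValue).sum(); // 2000 MiB
    int targetTaskCount = 2;

    long taskSize = Math.max(targetSize, maxTaskSize);
    // limitByTaskCount mode: raise the bin capacity so that, by size alone, at most
    // targetTaskCount bins are required (first-fit packing may still open one extra
    // bin when items pack loosely)
    taskSize = Math.max(taskSize, totalSize / targetTaskCount + 1);

    System.out.println("bin capacity = " + taskSize); // 1000 MiB + 1
    System.out.println("size lower bound on bin count = "
        + (totalSize + taskSize - 1) / taskSize);     // 2
  }
}
```
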
diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/IcebergPartitionPlan.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/IcebergPartitionPlan.java
index 4d9db0f284..c311a8d39b 100644
--- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/IcebergPartitionPlan.java
+++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/IcebergPartitionPlan.java
@@ -40,7 +40,8 @@ protected IcebergPartitionPlan(
 
   @Override
   protected TaskSplitter buildTaskSplitter() {
-    return new BinPackingTaskSplitter();
+    return new BinPackingTaskSplitter(
+        rewriteDataFiles, rewritePosDataFiles, undersizedSegmentFiles);
   }
 
   @Override
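
Looking back at disposeUndersizedSegmentFile in AbstractPartitionPlan: it now takes the undersized file itself and looks its delete files up in undersizedSegmentFiles, instead of unpacking a single-file SplitTask. A simplified, self-contained sketch of the decision it makes (String stands in for the real DataFile/ContentFile types, and the rewrite-pos threshold is invented):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DisposeUndersizedSketch {
  final Map<String, List<String>> undersizedSegmentFiles = new HashMap<>();
  final Map<String, List<String>> rewritePosDataFiles = new HashMap<>();
  final List<String> reservedDeleteFiles = new ArrayList<>();

  // Stand-in for evaluator().segmentShouldRewritePos(dataFile, deletes)
  boolean segmentShouldRewritePos(String dataFile, List<String> deletes) {
    return deletes.size() >= 2; // hypothetical threshold
  }

  // Mirrors the reworked method: look the delete files up by key, then either
  // schedule a pos-delete rewrite or reserve the delete files and skip the file
  void disposeUndersizedSegmentFile(String undersizedSegmentFile) {
    List<String> deletes = undersizedSegmentFiles.get(undersizedSegmentFile);
    if (segmentShouldRewritePos(undersizedSegmentFile, deletes)) {
      rewritePosDataFiles.put(undersizedSegmentFile, deletes);
    } else {
      reservedDeleteFiles.addAll(deletes);
    }
  }
}
```
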
diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedHivePartitionPlan.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedHivePartitionPlan.java
index 24edbfc686..1eeeb3db82 100644
--- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedHivePartitionPlan.java
+++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedHivePartitionPlan.java
@@ -70,6 +70,8 @@ protected void beforeSplit() {
     // files not in hive location to hive location, so the files in the hive location should not
     // be optimizing.
     Preconditions.checkArgument(reservedDeleteFiles.isEmpty(), "delete files should be empty");
+    Preconditions.checkArgument(
+        undersizedSegmentFiles.isEmpty(), "undersized segment files should be empty");
     rewriteDataFiles.entrySet().removeIf(entry -> evaluator().inHiveLocation(entry.getKey()));
     rewritePosDataFiles.entrySet().removeIf(entry -> evaluator().inHiveLocation(entry.getKey()));
   }
diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedIcebergPartitionPlan.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedIcebergPartitionPlan.java
index 5c58c6bdbf..7f56463b3f 100644
--- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedIcebergPartitionPlan.java
+++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedIcebergPartitionPlan.java
@@ -33,15 +33,14 @@
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.util.Pair;
 
 import javax.annotation.Nonnull;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.function.Predicate;
 
@@ -94,7 +93,8 @@ protected TaskSplitter buildTaskSplitter() {
     if (isKeyedTable()) {
       return new TreeNodeTaskSplitter();
     } else {
-      return new BinPackingTaskSplitter();
+      return new BinPackingTaskSplitter(
+          rewriteDataFiles, rewritePosDataFiles, undersizedSegmentFiles);
     }
   }
 
@@ -253,40 +253,44 @@ public List<SplitTask> splitTasks(int targetTaskCount) {
       List<SplitTask> result = Lists.newArrayList();
       FileTree rootTree = FileTree.newTreeRoot();
       undersizedSegmentFiles.forEach(rootTree::addRewriteDataFile);
-      for (SplitTask splitTask : genSplitTasks(rootTree)) {
+      for (SplitTask splitTask : genSplitTasks(rootTree, 0)) {
         if (splitTask.getRewriteDataFiles().size() > 1) {
           result.add(splitTask);
-          continue;
+        } else {
+          // If there is only one undersized segment file in the split task, it's meaningless to
+          // rewrite it.
+          splitTask
+              .getRewriteDataFiles()
+              .forEach(MixedIcebergPartitionPlan.this::disposeUndersizedSegmentFile);
         }
-        disposeUndersizedSegmentFile(splitTask);
       }
 
       rootTree = FileTree.newTreeRoot();
       rewritePosDataFiles.forEach(rootTree::addRewritePosDataFile);
       rewriteDataFiles.forEach(rootTree::addRewriteDataFile);
-      result.addAll(genSplitTasks(rootTree));
+      result.addAll(genSplitTasks(rootTree, targetTaskCount - result.size()));
       return result;
     }
 
-    private Collection<SplitTask> genSplitTasks(FileTree rootTree) {
+    /**
+     * Generate split tasks with the target task count; if the target task count is <= the node
+     * count, it will be ignored.
+     */
+    private Collection<SplitTask> genSplitTasks(FileTree rootTree, int targetTaskCount) {
       List<SplitTask> result = Lists.newArrayList();
       rootTree.completeTree();
       List<FileTree> subTrees = Lists.newArrayList();
       rootTree.splitFileTree(subTrees, new SplitIfNoFileExists());
+      int taskCountForNode = Math.max(1, targetTaskCount / subTrees.size());
       for (FileTree subTree : subTrees) {
         Map<DataFile, List<ContentFile<?>>> rewriteDataFiles = Maps.newHashMap();
         Map<DataFile, List<ContentFile<?>>> rewritePosDataFiles = Maps.newHashMap();
-        Set<ContentFile<?>> deleteFiles = Sets.newHashSet();
         subTree.collectRewriteDataFiles(rewriteDataFiles);
         subTree.collectRewritePosDataFiles(rewritePosDataFiles);
-        // A subTree will also be generated when there is no file, filter it
-        if (rewriteDataFiles.size() == 0 && rewritePosDataFiles.size() == 0) {
-          continue;
-        }
-        rewriteDataFiles.forEach((f, deletes) -> deleteFiles.addAll(deletes));
-        rewritePosDataFiles.forEach((f, deletes) -> deleteFiles.addAll(deletes));
-        result.add(
-            new SplitTask(rewriteDataFiles.keySet(), rewritePosDataFiles.keySet(), deleteFiles));
+        BinPackingTaskSplitter binPacking =
+            new BinPackingTaskSplitter(
+                rewriteDataFiles, rewritePosDataFiles, Collections.emptyMap());
+        result.addAll(binPacking.limitByTaskCount().splitTasks(taskCountForNode));
       }
       return result;
     }
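
The keyed-table splitter above spreads its target across tree nodes via taskCountForNode = Math.max(1, targetTaskCount / subTrees.size()) and then lets a count-limited bin-packer handle each node. A small sketch of how that division behaves (node counts and the target are made up):

```java
public class TaskCountPerNodeSketch {
  public static void main(String[] args) {
    int[] subTreeCounts = {1, 3, 8};
    int targetTaskCount = 6;
    for (int subTrees : subTreeCounts) {
      // Each node gets an equal share of the target, but never less than one task,
      // so a target smaller than the node count is effectively ignored.
      int taskCountForNode = Math.max(1, targetTaskCount / subTrees);
      System.out.println(subTrees + " node(s) -> " + taskCountForNode + " task(s) per node");
    }
    // prints: 1 -> 6, 3 -> 2, 8 -> 1
  }
}
```
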
diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/OptimizingPlanner.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/OptimizingPlanner.java
index c89d0feaec..326827f32c 100644
--- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/OptimizingPlanner.java
+++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/OptimizingPlanner.java
@@ -154,16 +154,17 @@ public List<TaskDescriptor> planTasks() {
       return cacheAndReturnTasks(Collections.emptyList());
     }
 
-    List<PartitionEvaluator> evaluators = new ArrayList<>(partitionPlanMap.values());
-    // prioritize partitions with high cost to avoid starvation
-    evaluators.sort(Comparator.comparing(PartitionEvaluator::getWeight, Comparator.reverseOrder()));
+    List<PartitionEvaluator> partitionPlans = new ArrayList<>(partitionPlanMap.values());
+    // Prioritize partitions with high cost to avoid starvation
+    partitionPlans.sort(
+        Comparator.comparing(PartitionEvaluator::getWeight, Comparator.reverseOrder()));
 
     double maxInputSize = maxInputSizePerThread * availableCore;
     actualPartitionPlans = Lists.newArrayList();
     long actualInputSize = 0;
-    for (PartitionEvaluator evaluator : evaluators) {
-      actualPartitionPlans.add((AbstractPartitionPlan) evaluator);
-      actualInputSize += evaluator.getCost();
+    for (PartitionEvaluator partitionPlan : partitionPlans) {
+      actualPartitionPlans.add((AbstractPartitionPlan) partitionPlan);
+      actualInputSize += partitionPlan.getCost();
       if (actualInputSize > maxInputSize) {
         break;
       }
@@ -172,14 +173,14 @@ public List<TaskDescriptor> planTasks() {
     double avgThreadCost = actualInputSize / availableCore;
     List<TaskDescriptor> tasks = Lists.newArrayList();
     for (AbstractPartitionPlan partitionPlan : actualPartitionPlans) {
-      tasks.addAll(partitionPlan.splitTasks((int) (actualInputSize / avgThreadCost)));
+      tasks.addAll(partitionPlan.splitTasks((int) (partitionPlan.getCost() / avgThreadCost)));
     }
 
     if (!tasks.isEmpty()) {
-      if (evaluators.stream()
-          .anyMatch(evaluator -> evaluator.getOptimizingType() == OptimizingType.FULL)) {
+      if (actualPartitionPlans.stream()
+          .anyMatch(plan -> plan.getOptimizingType() == OptimizingType.FULL)) {
         optimizingType = OptimizingType.FULL;
-      } else if (evaluators.stream()
-          .anyMatch(evaluator -> evaluator.getOptimizingType() == OptimizingType.MAJOR)) {
+      } else if (actualPartitionPlans.stream()
+          .anyMatch(plan -> plan.getOptimizingType() == OptimizingType.MAJOR)) {
         optimizingType = OptimizingType.MAJOR;
       } else {
         optimizingType = OptimizingType.MINOR;
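
The planner change above makes each partition's target task count proportional to its own cost: the old expression actualInputSize / avgThreadCost evaluated to availableCore for every partition, while the new one uses partitionPlan.getCost() / avgThreadCost. A standalone sketch of the allocation (costs and core count are hypothetical):

```java
public class TaskCountAllocationSketch {
  public static void main(String[] args) {
    long[] partitionCosts = {800, 150, 50};
    int availableCore = 4;

    long actualInputSize = 0;
    for (long cost : partitionCosts) {
      actualInputSize += cost;
    }
    double avgThreadCost = (double) actualInputSize / availableCore; // 250.0

    for (long cost : partitionCosts) {
      // Old behavior: (int) (actualInputSize / avgThreadCost) == availableCore for all.
      // New behavior: proportional to this partition's cost share.
      int targetTaskCount = (int) (cost / avgThreadCost);
      System.out.println("cost=" + cost + " -> targetTaskCount=" + targetTaskCount);
    }
    // prints: cost=800 -> 3, cost=150 -> 0, cost=50 -> 0
  }
}
```
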
diff --git a/ams/server/src/test/java/com/netease/arctic/server/optimizing/plan/MixedTablePlanTestBase.java b/ams/server/src/test/java/com/netease/arctic/server/optimizing/plan/MixedTablePlanTestBase.java
index 8d9f5a2ccd..c725f81579 100644
--- a/ams/server/src/test/java/com/netease/arctic/server/optimizing/plan/MixedTablePlanTestBase.java
+++ b/ams/server/src/test/java/com/netease/arctic/server/optimizing/plan/MixedTablePlanTestBase.java
@@ -270,7 +270,7 @@ public void testWithDeleteFilesBase() {
     segmentFiles.addAll(rePosSegmentFiles);
     segmentFiles.addAll(rewrittenSegmentFiles);
 
-    setTargetSize(segmentFiles, true);
+    setTargetSize(segmentFiles);
     setFragmentRatio(segmentFiles);
     assertSegmentFiles(segmentFiles);
     assertFragmentFiles(fragmentFiles);
@@ -350,20 +350,32 @@ protected AbstractPartitionPlan buildPlanWithCurrentFiles() {
     return partitionPlan;
   }
 
-  private void setTargetSize(List<DataFile> dataFiles, boolean isCompleteSegment) {
+  protected void setTargetSize(List<DataFile> dataFiles) {
     Long maxFileSizeBytes =
         dataFiles.stream()
            .map(ContentFile::fileSizeInBytes)
            .max(Long::compareTo)
            .orElseThrow(() -> new IllegalStateException("dataFiles can't not be empty"));
-    long targetFileSizeBytes =
-        isCompleteSegment ? maxFileSizeBytes + 1 : (maxFileSizeBytes * 2 + 1);
+    long targetFileSizeBytes = maxFileSizeBytes + 1;
     getArcticTable()
        .updateProperties()
        .set(TableProperties.SELF_OPTIMIZING_TARGET_SIZE, targetFileSizeBytes + "")
        .commit();
   }
 
+  protected void setMaxTaskSize(List<DataFile> dataFiles) {
+    Long maxFileSizeBytes =
+        dataFiles.stream()
+            .map(ContentFile::fileSizeInBytes)
+            .max(Long::compareTo)
+            .orElseThrow(() -> new IllegalStateException("dataFiles can't be empty"));
+    long targetFileSizeBytes = maxFileSizeBytes + 1;
+    getArcticTable()
+        .updateProperties()
+        .set(TableProperties.SELF_OPTIMIZING_MAX_TASK_SIZE, targetFileSizeBytes + "")
+        .commit();
+  }
+
   private void setFragmentRatio(List<DataFile> dataFiles) {
     Long minFileSizeBytes =
         dataFiles.stream()
@@ -404,12 +416,6 @@ protected void closeMinorOptimizingInterval() {
         .commit();
   }
 
-  private void resetTargetSize() {
-    updateTableProperty(
-        TableProperties.SELF_OPTIMIZING_TARGET_SIZE,
-        TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT + "");
-  }
-
   protected void updateTableProperty(String key, String value) {
     getArcticTable().updateProperties().set(key, value).commit();
   }
@@ -509,6 +515,20 @@ protected void assertTask(
     assertTaskProperties(buildProperties(), actual.properties());
   }
 
+  protected void assertTaskFileCount(
+      TaskDescriptor actual,
+      int rewrittenDataFiles,
+      int rePosDeletedDataFiles,
+      int readOnlyDeleteFiles,
+      int rewrittenDeleteFiles) {
+    Assert.assertEquals(actual.getPartition(), getPartitionPath());
+    Assert.assertEquals(rewrittenDeleteFiles, actual.getInput().rewrittenDeleteFiles().length);
+    Assert.assertEquals(rewrittenDataFiles, actual.getInput().rewrittenDataFiles().length);
+    Assert.assertEquals(readOnlyDeleteFiles, actual.getInput().readOnlyDeleteFiles().length);
+    Assert.assertEquals(rePosDeletedDataFiles, actual.getInput().rePosDeletedDataFiles().length);
+    assertTaskProperties(buildProperties(), actual.properties());
+  }
+
   protected void assertTaskProperties(Map<String, String> expect, Map<String, String> actual) {
     Assert.assertEquals(expect, actual);
   }
diff --git a/ams/server/src/test/java/com/netease/arctic/server/optimizing/plan/TestKeyedPartitionPlan.java b/ams/server/src/test/java/com/netease/arctic/server/optimizing/plan/TestKeyedPartitionPlan.java
index 7a1e6cd4c6..8f0f45dce2 100644
--- a/ams/server/src/test/java/com/netease/arctic/server/optimizing/plan/TestKeyedPartitionPlan.java
+++ b/ams/server/src/test/java/com/netease/arctic/server/optimizing/plan/TestKeyedPartitionPlan.java
@@ -145,12 +145,15 @@ public void testChangeFilesWithDelete() {
             tableTestHelper()
                 .writeChangeStore(
                     getArcticTable(), transactionId, ChangeAction.INSERT, newRecords, false)));
+    setTargetSize(dataFiles);
+    setMaxTaskSize(dataFiles);
     Snapshot toSnapshot = getArcticTable().changeTable().currentSnapshot();
 
     AbstractPartitionPlan plan = buildPlanWithCurrentFiles();
     Assert.assertEquals(fromSnapshot.sequenceNumber(), (long) plan.getFromSequence());
     Assert.assertEquals(toSnapshot.sequenceNumber(), (long) plan.getToSequence());
 
+    // 1. Split tasks with tree nodes
     List<TaskDescriptor> taskDescriptors = plan.splitTasks(0);
 
     Assert.assertEquals(1, taskDescriptors.size());
@@ -161,6 +164,14 @@ public void testChangeFilesWithDelete() {
         Collections.emptyList(),
         Collections.emptyList(),
         deleteFiles);
+
+    // 2. Split tasks with tree nodes and bin-packing
+    plan = buildPlanWithCurrentFiles();
+    taskDescriptors = plan.splitTasks(2);
+    Assert.assertEquals(2, taskDescriptors.size());
+    for (TaskDescriptor taskDescriptor : taskDescriptors) {
+      assertTaskFileCount(taskDescriptor, 1, 0, 0, 1);
+    }
   }
 
   @Test
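
A closing note on the test helpers: both setTargetSize and setMaxTaskSize commit maxFileSizeBytes + 1 (via TableProperties.SELF_OPTIMIZING_TARGET_SIZE and SELF_OPTIMIZING_MAX_TASK_SIZE), which makes every existing file strictly smaller than the configured size, i.e. an undersized-segment candidate that a single task can hold. A runnable illustration of that invariant (file sizes are made up):

```java
import java.util.Arrays;
import java.util.List;

public class TargetSizeHelperSketch {
  public static void main(String[] args) {
    List<Long> fileSizes = Arrays.asList(10L << 20, 25L << 20, 40L << 20);
    long maxFileSizeBytes =
        fileSizes.stream().max(Long::compareTo).orElseThrow(IllegalStateException::new);
    // The value the helpers commit as the target / max-task size
    long targetFileSizeBytes = maxFileSizeBytes + 1;
    for (long size : fileSizes) {
      // Every file is strictly below the target, so none is a "complete" segment
      System.out.println(size + " < " + targetFileSizeBytes + " : " + (size < targetFileSizeBytes));
    }
  }
}
```
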