From 18445419696da668a93a5a73b31978e6d3f4be29 Mon Sep 17 00:00:00 2001 From: wangtao Date: Fri, 15 Dec 2023 18:39:28 +0800 Subject: [PATCH 1/8] fix cost calculation --- .../optimizing/plan/OptimizingPlanner.java | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/OptimizingPlanner.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/OptimizingPlanner.java index 7bdd1f323c..01a0be6ac1 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/OptimizingPlanner.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/OptimizingPlanner.java @@ -128,16 +128,17 @@ public List planTasks() { return cacheAndReturnTasks(Collections.emptyList()); } - List evaluators = new ArrayList<>(partitionPlanMap.values()); - // prioritize partitions with high cost to avoid starvation - evaluators.sort(Comparator.comparing(PartitionEvaluator::getWeight, Comparator.reverseOrder())); + List partitionPlans = new ArrayList<>(partitionPlanMap.values()); + // Prioritize partitions with high cost to avoid starvation + partitionPlans.sort( + Comparator.comparing(PartitionEvaluator::getWeight, Comparator.reverseOrder())); double maxInputSize = maxInputSizePerThread * availableCore; actualPartitionPlans = Lists.newArrayList(); long actualInputSize = 0; - for (PartitionEvaluator evaluator : evaluators) { - actualPartitionPlans.add((AbstractPartitionPlan) evaluator); - actualInputSize += evaluator.getCost(); + for (PartitionEvaluator partitionPlan : partitionPlans) { + actualPartitionPlans.add((AbstractPartitionPlan) partitionPlan); + actualInputSize += partitionPlan.getCost(); if (actualInputSize > maxInputSize) { break; } @@ -146,14 +147,14 @@ public List planTasks() { double avgThreadCost = actualInputSize / availableCore; List tasks = Lists.newArrayList(); for (AbstractPartitionPlan partitionPlan : actualPartitionPlans) { - tasks.addAll(partitionPlan.splitTasks((int) (actualInputSize / avgThreadCost))); + tasks.addAll(partitionPlan.splitTasks((int) (partitionPlan.getCost() / avgThreadCost))); } if (!tasks.isEmpty()) { - if (evaluators.stream() - .anyMatch(evaluator -> evaluator.getOptimizingType() == OptimizingType.FULL)) { + if (actualPartitionPlans.stream() + .anyMatch(plan -> plan.getOptimizingType() == OptimizingType.FULL)) { optimizingType = OptimizingType.FULL; - } else if (evaluators.stream() - .anyMatch(evaluator -> evaluator.getOptimizingType() == OptimizingType.MAJOR)) { + } else if (actualPartitionPlans.stream() + .anyMatch(plan -> plan.getOptimizingType() == OptimizingType.MAJOR)) { optimizingType = OptimizingType.MAJOR; } else { optimizingType = OptimizingType.MINOR; From 2e57999528e7817f8a6905633e25e4188e45a153 Mon Sep 17 00:00:00 2001 From: wangtao Date: Mon, 18 Dec 2023 15:00:34 +0800 Subject: [PATCH 2/8] split tasks by bin-packing in a tree node --- .../plan/AbstractPartitionPlan.java | 32 ++++++++++++++++--- .../optimizing/plan/IcebergPartitionPlan.java | 2 +- .../plan/MixedIcebergPartitionPlan.java | 14 +++----- 3 files changed, 34 insertions(+), 14 deletions(-) diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java index 3903e55562..6810691fe1 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java @@ -274,6 +274,26 @@ public boolean isRewritePosDataFile() { protected class BinPackingTaskSplitter implements TaskSplitter { + private final Map>> rewriteDataFiles; + private final Map>> rewritePosDataFiles; + private boolean limitByTaskCount = false; + + public BinPackingTaskSplitter( + Map>> rewriteDataFiles, + Map>> rewritePosDataFiles) { + this.rewriteDataFiles = rewriteDataFiles; + this.rewritePosDataFiles = rewritePosDataFiles; + } + + /** + * In this mode, the number of tasks will be limited to avoid the high cost of excessive + * duplicate delete files reads. + */ + public BinPackingTaskSplitter limitByTaskCount() { + limitByTaskCount = true; + return this; + } + @Override public List splitTasks(int targetTaskCount) { // bin-packing @@ -283,11 +303,15 @@ public List splitTasks(int targetTaskCount) { rewritePosDataFiles.forEach( (dataFile, deleteFiles) -> allDataFiles.add(new FileTask(dataFile, deleteFiles, false))); + long taskSize = Math.max(config.getTargetSize(), config.getMaxTaskSize()); + if (limitByTaskCount) { + long totalSize = + allDataFiles.stream().map(f -> f.getFile().fileSizeInBytes()).reduce(0L, Long::sum); + taskSize = Math.max(taskSize, totalSize / targetTaskCount + 1); + } + List> packed = - new BinPacking.ListPacker( - Math.max(config.getTargetSize(), config.getMaxTaskSize()), - Integer.MAX_VALUE, - false) + new BinPacking.ListPacker(taskSize, Integer.MAX_VALUE, false) .pack(allDataFiles, f -> f.getFile().fileSizeInBytes()); // collect diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/IcebergPartitionPlan.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/IcebergPartitionPlan.java index 7004d1d4d5..6abd973faa 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/IcebergPartitionPlan.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/IcebergPartitionPlan.java @@ -32,7 +32,7 @@ protected IcebergPartitionPlan( @Override protected TaskSplitter buildTaskSplitter() { - return new BinPackingTaskSplitter(); + return new BinPackingTaskSplitter(rewriteDataFiles, rewritePosDataFiles); } @Override diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedIcebergPartitionPlan.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedIcebergPartitionPlan.java index 1c2730b47a..42d1962321 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedIcebergPartitionPlan.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedIcebergPartitionPlan.java @@ -32,13 +32,10 @@ import org.apache.iceberg.FileContent; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; import javax.annotation.Nonnull; - import java.util.List; import java.util.Map; -import java.util.Set; import java.util.function.Predicate; public class MixedIcebergPartitionPlan extends AbstractPartitionPlan { @@ -88,7 +85,7 @@ protected TaskSplitter buildTaskSplitter() { if (isKeyedTable()) { return new TreeNodeTaskSplitter(); } else { - return new BinPackingTaskSplitter(); + return new BinPackingTaskSplitter(rewriteDataFiles, rewritePosDataFiles); } } @@ -241,16 +238,15 @@ public List splitTasks(int targetTaskCount) { rootTree.completeTree(); List subTrees = Lists.newArrayList(); rootTree.splitFileTree(subTrees, new SplitIfNoFileExists()); + int taskCountForNode = Math.max(1, targetTaskCount / subTrees.size()); for (FileTree subTree : subTrees) { Map>> rewriteDataFiles = Maps.newHashMap(); Map>> rewritePosDataFiles = Maps.newHashMap(); - Set> deleteFiles = Sets.newHashSet(); subTree.collectRewriteDataFiles(rewriteDataFiles); subTree.collectRewritePosDataFiles(rewritePosDataFiles); - rewriteDataFiles.forEach((f, deletes) -> deleteFiles.addAll(deletes)); - rewritePosDataFiles.forEach((f, deletes) -> deleteFiles.addAll(deletes)); - result.add( - new SplitTask(rewriteDataFiles.keySet(), rewritePosDataFiles.keySet(), deleteFiles)); + BinPackingTaskSplitter binPacking = + new BinPackingTaskSplitter(rewriteDataFiles, rewritePosDataFiles); + result.addAll(binPacking.limitByTaskCount().splitTasks(taskCountForNode)); } return result; } From caaefe1c7ed5b348fd2af3bb236125687e2e8689 Mon Sep 17 00:00:00 2001 From: wangtao Date: Mon, 18 Dec 2023 20:18:22 +0800 Subject: [PATCH 3/8] spotless:apply --- .../arctic/server/optimizing/plan/MixedIcebergPartitionPlan.java | 1 + 1 file changed, 1 insertion(+) diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedIcebergPartitionPlan.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedIcebergPartitionPlan.java index 42d1962321..bba5ce1fbb 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedIcebergPartitionPlan.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedIcebergPartitionPlan.java @@ -34,6 +34,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps; import javax.annotation.Nonnull; + import java.util.List; import java.util.Map; import java.util.function.Predicate; From de981e00805e308800378eb481c3f8f900cd0ae4 Mon Sep 17 00:00:00 2001 From: wangtao Date: Tue, 19 Dec 2023 21:40:05 +0800 Subject: [PATCH 4/8] fix conflict --- .../plan/AbstractPartitionPlan.java | 73 ++++++++++++------- .../optimizing/plan/IcebergPartitionPlan.java | 3 +- .../plan/MixedHivePartitionPlan.java | 2 + .../plan/MixedIcebergPartitionPlan.java | 33 +++++---- 4 files changed, 69 insertions(+), 42 deletions(-) diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java index e1d31eaea6..c19c1a8372 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java @@ -31,11 +31,10 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.util.BinPacking; -import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -220,20 +219,15 @@ public Weight getWeight() { } /** - * When splitTask has only one undersized segment file, it needs to be triggered again to - * determine whether to rewrite pos. If needed, add it to rewritePosDataFiles and bin-packing - * together, else reserved delete files. + * If the undersized segment file should not be rewritten, we should dispose of it by either + * rewriting pos delete or skipping optimizing(reserving its delete files). */ - protected void disposeUndersizedSegmentFile(SplitTask splitTask) { - Optional dataFile = splitTask.getRewriteDataFiles().stream().findFirst(); - if (dataFile.isPresent()) { - DataFile rewriteDataFile = dataFile.get(); - List> deletes = new ArrayList<>(splitTask.getDeleteFiles()); - if (evaluator().segmentShouldRewritePos(rewriteDataFile, deletes)) { - rewritePosDataFiles.put(rewriteDataFile, deletes); - } else { - reservedDeleteFiles(deletes); - } + protected void disposeUndersizedSegmentFile(DataFile undersizedSegmentFile) { + List> deletes = undersizedSegmentFiles.get(undersizedSegmentFile); + if (evaluator().segmentShouldRewritePos(undersizedSegmentFile, deletes)) { + rewritePosDataFiles.put(undersizedSegmentFile, deletes); + } else { + reservedDeleteFiles(deletes); } } @@ -318,13 +312,16 @@ protected class BinPackingTaskSplitter implements TaskSplitter { private final Map>> rewriteDataFiles; private final Map>> rewritePosDataFiles; + private final Map>> undersizedSegmentFiles; private boolean limitByTaskCount = false; public BinPackingTaskSplitter( Map>> rewriteDataFiles, - Map>> rewritePosDataFiles) { + Map>> rewritePosDataFiles, + Map>> undersizedSegmentFiles) { this.rewriteDataFiles = rewriteDataFiles; this.rewritePosDataFiles = rewritePosDataFiles; + this.undersizedSegmentFiles = undersizedSegmentFiles; } /** @@ -338,17 +335,26 @@ public BinPackingTaskSplitter limitByTaskCount() { @Override public List splitTasks(int targetTaskCount) { + if (allEmpty()) { + return Collections.emptyList(); + } List results = Lists.newArrayList(); List fileTasks = Lists.newArrayList(); + long taskSize = Math.max(config.getTargetSize(), config.getMaxTaskSize()); + if (limitByTaskCount) { + taskSize = Math.max(taskSize, totalSize() / targetTaskCount + 1); + } // bin-packing for undersized segment files undersizedSegmentFiles.forEach( (dataFile, deleteFiles) -> fileTasks.add(new FileTask(dataFile, deleteFiles, true))); - for (SplitTask splitTask : genSplitTasks(fileTasks)) { + for (SplitTask splitTask : genSplitTasks(fileTasks, taskSize)) { if (splitTask.getRewriteDataFiles().size() > 1) { results.add(splitTask); - continue; + } else { + splitTask + .getRewriteDataFiles() + .forEach(AbstractPartitionPlan.this::disposeUndersizedSegmentFile); } - disposeUndersizedSegmentFile(splitTask); } // bin-packing for fragment file and rewrite pos data file @@ -357,16 +363,31 @@ public List splitTasks(int targetTaskCount) { (dataFile, deleteFiles) -> fileTasks.add(new FileTask(dataFile, deleteFiles, true))); rewritePosDataFiles.forEach( (dataFile, deleteFiles) -> fileTasks.add(new FileTask(dataFile, deleteFiles, false))); - results.addAll(genSplitTasks(fileTasks)); + results.addAll(genSplitTasks(fileTasks, taskSize)); return results; } - private Collection genSplitTasks(List allDataFiles) { - long taskSize = Math.max(config.getTargetSize(), config.getMaxTaskSize()); - if (limitByTaskCount) { - long totalSize = - allDataFiles.stream().map(f -> f.getFile().fileSizeInBytes()).reduce(0L, Long::sum); - taskSize = Math.max(taskSize, totalSize / targetTaskCount + 1); + private boolean allEmpty() { + return rewriteDataFiles.isEmpty() + && rewritePosDataFiles.isEmpty() + && undersizedSegmentFiles.isEmpty(); + } + + private long totalSize() { + long totalSize = 0; + for (DataFile dataFile : rewriteDataFiles.keySet()) { + totalSize += dataFile.fileSizeInBytes(); + } + for (DataFile dataFile : rewritePosDataFiles.keySet()) { + totalSize += dataFile.fileSizeInBytes(); + } + for (DataFile dataFile : undersizedSegmentFiles.keySet()) { + totalSize += dataFile.fileSizeInBytes(); + } + return totalSize; + } + + private Collection genSplitTasks(List allDataFiles, long taskSize) { List> packed = new BinPacking.ListPacker(taskSize, Integer.MAX_VALUE, false) .pack(allDataFiles, f -> f.getFile().fileSizeInBytes()); diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/IcebergPartitionPlan.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/IcebergPartitionPlan.java index 25b8c5f3e7..fc1efec86c 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/IcebergPartitionPlan.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/IcebergPartitionPlan.java @@ -35,7 +35,8 @@ protected IcebergPartitionPlan( @Override protected TaskSplitter buildTaskSplitter() { - return new BinPackingTaskSplitter(rewriteDataFiles, rewritePosDataFiles); + return new BinPackingTaskSplitter( + rewriteDataFiles, rewritePosDataFiles, undersizedSegmentFiles); } @Override diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedHivePartitionPlan.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedHivePartitionPlan.java index 7ea5c67ec8..7703074fdf 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedHivePartitionPlan.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedHivePartitionPlan.java @@ -68,6 +68,8 @@ protected void beforeSplit() { // files not in hive location to hive location, so the files in the hive location should not // be optimizing. Preconditions.checkArgument(reservedDeleteFiles.isEmpty(), "delete files should be empty"); + Preconditions.checkArgument( + undersizedSegmentFiles.isEmpty(), "undersized segment files should be empty"); rewriteDataFiles.entrySet().removeIf(entry -> evaluator().inHiveLocation(entry.getKey())); rewritePosDataFiles.entrySet().removeIf(entry -> evaluator().inHiveLocation(entry.getKey())); } diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedIcebergPartitionPlan.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedIcebergPartitionPlan.java index 1309f07bf2..61ed5cfbde 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedIcebergPartitionPlan.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedIcebergPartitionPlan.java @@ -34,8 +34,8 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps; import javax.annotation.Nonnull; - import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.function.Predicate; @@ -87,7 +87,8 @@ protected TaskSplitter buildTaskSplitter() { if (isKeyedTable()) { return new TreeNodeTaskSplitter(); } else { - return new BinPackingTaskSplitter(rewriteDataFiles, rewritePosDataFiles); + return new BinPackingTaskSplitter( + rewriteDataFiles, rewritePosDataFiles, undersizedSegmentFiles); } } @@ -246,22 +247,28 @@ public List splitTasks(int targetTaskCount) { List result = Lists.newArrayList(); FileTree rootTree = FileTree.newTreeRoot(); undersizedSegmentFiles.forEach(rootTree::addRewriteDataFile); - for (SplitTask splitTask : genSplitTasks(rootTree)) { + for (SplitTask splitTask : genSplitTasks(rootTree, 0)) { if (splitTask.getRewriteDataFiles().size() > 1) { result.add(splitTask); - continue; + } else { + splitTask + .getRewriteDataFiles() + .forEach(MixedIcebergPartitionPlan.this::disposeUndersizedSegmentFile); } - disposeUndersizedSegmentFile(splitTask); } rootTree = FileTree.newTreeRoot(); rewritePosDataFiles.forEach(rootTree::addRewritePosDataFile); rewriteDataFiles.forEach(rootTree::addRewriteDataFile); - result.addAll(genSplitTasks(rootTree)); + result.addAll(genSplitTasks(rootTree, targetTaskCount - result.size())); return result; } - private Collection genSplitTasks(FileTree rootTree) { + /** + * Generate split tasks with target task count, if target task count <= the count of nodes, it + * will be ignored. + */ + private Collection genSplitTasks(FileTree rootTree, int targetTaskCount) { List result = Lists.newArrayList(); rootTree.completeTree(); List subTrees = Lists.newArrayList(); @@ -272,14 +279,10 @@ private Collection genSplitTasks(FileTree rootTree) { Map>> rewritePosDataFiles = Maps.newHashMap(); subTree.collectRewriteDataFiles(rewriteDataFiles); subTree.collectRewritePosDataFiles(rewritePosDataFiles); - // A subTree will also be generated when there is no file, filter it - if (rewriteDataFiles.size() == 0 && rewritePosDataFiles.size() == 0) { - continue; - } - rewriteDataFiles.forEach((f, deletes) -> deleteFiles.addAll(deletes)); - rewritePosDataFiles.forEach((f, deletes) -> deleteFiles.addAll(deletes)); - result.add( - new SplitTask(rewriteDataFiles.keySet(), rewritePosDataFiles.keySet(), deleteFiles)); + BinPackingTaskSplitter binPacking = + new BinPackingTaskSplitter( + rewriteDataFiles, rewritePosDataFiles, Collections.emptyMap()); + result.addAll(binPacking.limitByTaskCount().splitTasks(taskCountForNode)); } return result; } From f176311119886fe49438937b999b2722085cc0a2 Mon Sep 17 00:00:00 2001 From: wangtao Date: Tue, 19 Dec 2023 21:50:44 +0800 Subject: [PATCH 5/8] modify comments --- .../arctic/server/optimizing/plan/AbstractPartitionPlan.java | 2 ++ .../server/optimizing/plan/MixedIcebergPartitionPlan.java | 2 ++ 2 files changed, 4 insertions(+) diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java index c19c1a8372..07eaa806e2 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java @@ -351,6 +351,8 @@ public List splitTasks(int targetTaskCount) { if (splitTask.getRewriteDataFiles().size() > 1) { results.add(splitTask); } else { + // If there are only one undersized segment file in the split task, it's meaningless to + // rewrite it. splitTask .getRewriteDataFiles() .forEach(AbstractPartitionPlan.this::disposeUndersizedSegmentFile); diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedIcebergPartitionPlan.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedIcebergPartitionPlan.java index 61ed5cfbde..53d1e33020 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedIcebergPartitionPlan.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedIcebergPartitionPlan.java @@ -251,6 +251,8 @@ public List splitTasks(int targetTaskCount) { if (splitTask.getRewriteDataFiles().size() > 1) { result.add(splitTask); } else { + // If there are only one undersized segment file in the split task, it's meaningless to + // rewrite it. splitTask .getRewriteDataFiles() .forEach(MixedIcebergPartitionPlan.this::disposeUndersizedSegmentFile); From 81f00497a7f7ca641ad3b0aeb078147276d6fce6 Mon Sep 17 00:00:00 2001 From: wangtao Date: Tue, 19 Dec 2023 21:52:10 +0800 Subject: [PATCH 6/8] spotless: apply --- .../arctic/server/optimizing/plan/MixedIcebergPartitionPlan.java | 1 + 1 file changed, 1 insertion(+) diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedIcebergPartitionPlan.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedIcebergPartitionPlan.java index 53d1e33020..74b294eeaa 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedIcebergPartitionPlan.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/MixedIcebergPartitionPlan.java @@ -34,6 +34,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps; import javax.annotation.Nonnull; + import java.util.Collection; import java.util.Collections; import java.util.List; From 130c2466d5b0ce06e5cac54aae9c8aa745dff0dc Mon Sep 17 00:00:00 2001 From: wangtao Date: Wed, 27 Dec 2023 21:04:45 +0800 Subject: [PATCH 7/8] as head --- .../plan/MixedTablePlanTestBase.java | 40 ++++++++++++++----- .../plan/TestKeyedPartitionPlan.java | 11 +++++ 2 files changed, 41 insertions(+), 10 deletions(-) diff --git a/ams/server/src/test/java/com/netease/arctic/server/optimizing/plan/MixedTablePlanTestBase.java b/ams/server/src/test/java/com/netease/arctic/server/optimizing/plan/MixedTablePlanTestBase.java index 95bb2f93f6..d9b63d2c9f 100644 --- a/ams/server/src/test/java/com/netease/arctic/server/optimizing/plan/MixedTablePlanTestBase.java +++ b/ams/server/src/test/java/com/netease/arctic/server/optimizing/plan/MixedTablePlanTestBase.java @@ -266,7 +266,7 @@ public void testWithDeleteFilesBase() { segmentFiles.addAll(rePosSegmentFiles); segmentFiles.addAll(rewrittenSegmentFiles); - setTargetSize(segmentFiles, true); + setTargetSize(segmentFiles); setFragmentRatio(segmentFiles); assertSegmentFiles(segmentFiles); assertFragmentFiles(fragmentFiles); @@ -346,20 +346,32 @@ protected AbstractPartitionPlan buildPlanWithCurrentFiles() { return partitionPlan; } - private void setTargetSize(List dataFiles, boolean isCompleteSegment) { + protected void setTargetSize(List dataFiles) { Long maxFileSizeBytes = dataFiles.stream() .map(ContentFile::fileSizeInBytes) .max(Long::compareTo) .orElseThrow(() -> new IllegalStateException("dataFiles can't not be empty")); - long targetFileSizeBytes = - isCompleteSegment ? maxFileSizeBytes + 1 : (maxFileSizeBytes * 2 + 1); + long targetFileSizeBytes = maxFileSizeBytes + 1; getArcticTable() .updateProperties() .set(TableProperties.SELF_OPTIMIZING_TARGET_SIZE, targetFileSizeBytes + "") .commit(); } + protected void setMaxTaskSize(List dataFiles) { + Long maxFileSizeBytes = + dataFiles.stream() + .map(ContentFile::fileSizeInBytes) + .max(Long::compareTo) + .orElseThrow(() -> new IllegalStateException("dataFiles can't not be empty")); + long targetFileSizeBytes = maxFileSizeBytes + 1; + getArcticTable() + .updateProperties() + .set(TableProperties.SELF_OPTIMIZING_MAX_TASK_SIZE, targetFileSizeBytes + "") + .commit(); + } + private void setFragmentRatio(List dataFiles) { Long minFileSizeBytes = dataFiles.stream() @@ -400,12 +412,6 @@ protected void closeMinorOptimizingInterval() { .commit(); } - private void resetTargetSize() { - updateTableProperty( - TableProperties.SELF_OPTIMIZING_TARGET_SIZE, - TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT + ""); - } - protected void updateTableProperty(String key, String value) { getArcticTable().updateProperties().set(key, value).commit(); } @@ -505,6 +511,20 @@ protected void assertTask( assertTaskProperties(buildProperties(), actual.properties()); } + protected void assertTaskFileCount( + TaskDescriptor actual, + int rewrittenDataFiles, + int rePosDeletedDataFiles, + int readOnlyDeleteFiles, + int rewrittenDeleteFiles) { + Assert.assertEquals(actual.getPartition(), getPartition()); + Assert.assertEquals(rewrittenDeleteFiles, actual.getInput().rewrittenDeleteFiles().length); + Assert.assertEquals(rewrittenDataFiles, actual.getInput().rewrittenDataFiles().length); + Assert.assertEquals(readOnlyDeleteFiles, actual.getInput().readOnlyDeleteFiles().length); + Assert.assertEquals(rePosDeletedDataFiles, actual.getInput().rePosDeletedDataFiles().length); + assertTaskProperties(buildProperties(), actual.properties()); + } + protected void assertTaskProperties(Map expect, Map actual) { Assert.assertEquals(expect, actual); } diff --git a/ams/server/src/test/java/com/netease/arctic/server/optimizing/plan/TestKeyedPartitionPlan.java b/ams/server/src/test/java/com/netease/arctic/server/optimizing/plan/TestKeyedPartitionPlan.java index 7a1e6cd4c6..8f0f45dce2 100644 --- a/ams/server/src/test/java/com/netease/arctic/server/optimizing/plan/TestKeyedPartitionPlan.java +++ b/ams/server/src/test/java/com/netease/arctic/server/optimizing/plan/TestKeyedPartitionPlan.java @@ -145,12 +145,15 @@ public void testChangeFilesWithDelete() { tableTestHelper() .writeChangeStore( getArcticTable(), transactionId, ChangeAction.INSERT, newRecords, false))); + setTargetSize(dataFiles); + setMaxTaskSize(dataFiles); Snapshot toSnapshot = getArcticTable().changeTable().currentSnapshot(); AbstractPartitionPlan plan = buildPlanWithCurrentFiles(); Assert.assertEquals(fromSnapshot.sequenceNumber(), (long) plan.getFromSequence()); Assert.assertEquals(toSnapshot.sequenceNumber(), (long) plan.getToSequence()); + // 1.Split tasks with tree nodes List taskDescriptors = plan.splitTasks(0); Assert.assertEquals(1, taskDescriptors.size()); @@ -161,6 +164,14 @@ public void testChangeFilesWithDelete() { Collections.emptyList(), Collections.emptyList(), deleteFiles); + + // 2.Split tasks with tree nodes and bin-packing + plan = buildPlanWithCurrentFiles(); + taskDescriptors = plan.splitTasks(2); + Assert.assertEquals(2, taskDescriptors.size()); + for (TaskDescriptor taskDescriptor : taskDescriptors) { + assertTaskFileCount(taskDescriptor, 1, 0, 0, 1); + } } @Test From ca2fdaafb2e6f90c591e1a9c3b9d24e3a90d7795 Mon Sep 17 00:00:00 2001 From: wangtao Date: Thu, 28 Dec 2023 09:58:43 +0800 Subject: [PATCH 8/8] fix unit test --- .../arctic/server/optimizing/plan/MixedTablePlanTestBase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ams/server/src/test/java/com/netease/arctic/server/optimizing/plan/MixedTablePlanTestBase.java b/ams/server/src/test/java/com/netease/arctic/server/optimizing/plan/MixedTablePlanTestBase.java index d92c31735d..c725f81579 100644 --- a/ams/server/src/test/java/com/netease/arctic/server/optimizing/plan/MixedTablePlanTestBase.java +++ b/ams/server/src/test/java/com/netease/arctic/server/optimizing/plan/MixedTablePlanTestBase.java @@ -521,7 +521,7 @@ protected void assertTaskFileCount( int rePosDeletedDataFiles, int readOnlyDeleteFiles, int rewrittenDeleteFiles) { - Assert.assertEquals(actual.getPartition(), getPartition()); + Assert.assertEquals(actual.getPartition(), getPartitionPath()); Assert.assertEquals(rewrittenDeleteFiles, actual.getInput().rewrittenDeleteFiles().length); Assert.assertEquals(rewrittenDataFiles, actual.getInput().rewrittenDataFiles().length); Assert.assertEquals(readOnlyDeleteFiles, actual.getInput().readOnlyDeleteFiles().length);