[AMORO-2438] Mixed Format KeyedTable support bin-packing splitting optimizing tasks #2446

Open · wants to merge 12 commits into master
@@ -35,11 +35,10 @@
import org.apache.iceberg.util.BinPacking;
import org.apache.iceberg.util.Pair;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

@@ -227,20 +226,15 @@
}

/**
* When splitTask has only one undersized segment file, it needs to be triggered again to
* determine whether to rewrite pos. If needed, add it to rewritePosDataFiles and bin-packing
* together, else reserved delete files.
* If the undersized segment file should not be rewritten, we should dispose of it by either
* rewriting its pos-delete files or skipping optimizing (reserving its delete files).
*/
protected void disposeUndersizedSegmentFile(SplitTask splitTask) {
Optional<DataFile> dataFile = splitTask.getRewriteDataFiles().stream().findFirst();
if (dataFile.isPresent()) {
DataFile rewriteDataFile = dataFile.get();
List<ContentFile<?>> deletes = new ArrayList<>(splitTask.getDeleteFiles());
if (evaluator().segmentShouldRewritePos(rewriteDataFile, deletes)) {
rewritePosDataFiles.put(rewriteDataFile, deletes);
} else {
reservedDeleteFiles(deletes);
}
protected void disposeUndersizedSegmentFile(DataFile undersizedSegmentFile) {
List<ContentFile<?>> deletes = undersizedSegmentFiles.get(undersizedSegmentFile);
if (evaluator().segmentShouldRewritePos(undersizedSegmentFile, deletes)) {
rewritePosDataFiles.put(undersizedSegmentFile, deletes);
} else {
reservedDeleteFiles(deletes);
}
}
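// For example (as the task splitters below do), a split task holding a single
// undersized segment file is passed back here file by file:
//   splitTask.getRewriteDataFiles().forEach(this::disposeUndersizedSegmentFile);
// The file's delete files are then recovered from the undersizedSegmentFiles map.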

@@ -329,19 +323,53 @@

protected class BinPackingTaskSplitter implements TaskSplitter {

private final Map<DataFile, List<ContentFile<?>>> rewriteDataFiles;
private final Map<DataFile, List<ContentFile<?>>> rewritePosDataFiles;
private final Map<DataFile, List<ContentFile<?>>> undersizedSegmentFiles;
private boolean limitByTaskCount = false;

public BinPackingTaskSplitter(
Map<DataFile, List<ContentFile<?>>> rewriteDataFiles,
Map<DataFile, List<ContentFile<?>>> rewritePosDataFiles,
Map<DataFile, List<ContentFile<?>>> undersizedSegmentFiles) {
this.rewriteDataFiles = rewriteDataFiles;
this.rewritePosDataFiles = rewritePosDataFiles;
this.undersizedSegmentFiles = undersizedSegmentFiles;
}

/**
* In this mode, the number of tasks will be limited to avoid the high cost of excessive
* duplicate delete-file reads.
*/
public BinPackingTaskSplitter limitByTaskCount() {
limitByTaskCount = true;
return this;
}
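// Illustrative numbers (assumed, not from this change): if targetSize/maxTaskSize
// cap taskSize at 128 MB while totalSize() is 1 GB and targetTaskCount is 4, then
// taskSize becomes max(128 MB, 1 GB / 4 + 1) ≈ 256 MB, so bin-packing produces
// about 4 tasks instead of 8 and each shared delete file is read fewer times.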

@Override
public List<SplitTask> splitTasks(int targetTaskCount) {
if (allEmpty()) {
return Collections.emptyList();
}
List<SplitTask> results = Lists.newArrayList();
List<FileTask> fileTasks = Lists.newArrayList();
long taskSize = Math.max(config.getTargetSize(), config.getMaxTaskSize());
if (limitByTaskCount) {
taskSize = Math.max(taskSize, totalSize() / targetTaskCount + 1);
}
// bin-packing for undersized segment files
undersizedSegmentFiles.forEach(
(dataFile, deleteFiles) -> fileTasks.add(new FileTask(dataFile, deleteFiles, true)));
for (SplitTask splitTask : genSplitTasks(fileTasks)) {
for (SplitTask splitTask : genSplitTasks(fileTasks, taskSize)) {
if (splitTask.getRewriteDataFiles().size() > 1) {
results.add(splitTask);
continue;
} else {
// If there is only one undersized segment file in the split task, it's meaningless to
// rewrite it.
splitTask
.getRewriteDataFiles()
.forEach(AbstractPartitionPlan.this::disposeUndersizedSegmentFile);
}
disposeUndersizedSegmentFile(splitTask);
}

// bin-packing for fragment file and rewrite pos data file
@@ -350,16 +378,33 @@
(dataFile, deleteFiles) -> fileTasks.add(new FileTask(dataFile, deleteFiles, true)));
rewritePosDataFiles.forEach(
(dataFile, deleteFiles) -> fileTasks.add(new FileTask(dataFile, deleteFiles, false)));
results.addAll(genSplitTasks(fileTasks));
results.addAll(genSplitTasks(fileTasks, taskSize));
return results;
}

private Collection<? extends SplitTask> genSplitTasks(List<FileTask> allDataFiles) {
private boolean allEmpty() {
return rewriteDataFiles.isEmpty()
&& rewritePosDataFiles.isEmpty()
&& undersizedSegmentFiles.isEmpty();
}

private long totalSize() {
long totalSize = 0;
for (DataFile dataFile : rewriteDataFiles.keySet()) {
totalSize += dataFile.fileSizeInBytes();
}
for (DataFile dataFile : rewritePosDataFiles.keySet()) {
totalSize += dataFile.fileSizeInBytes();
}
for (DataFile dataFile : undersizedSegmentFiles.keySet()) {
totalSize += dataFile.fileSizeInBytes();
}

(Codecov / codecov/patch: added lines #L400-L401 in ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/AbstractPartitionPlan.java were not covered by tests.)
return totalSize;
}

private Collection<SplitTask> genSplitTasks(List<FileTask> allDataFiles, long taskSize) {
List<List<FileTask>> packed =
new BinPacking.ListPacker<FileTask>(
Math.max(config.getTargetSize(), config.getMaxTaskSize()),
Integer.MAX_VALUE,
false)
new BinPacking.ListPacker<FileTask>(taskSize, Integer.MAX_VALUE, false)
.pack(allDataFiles, f -> f.getFile().fileSizeInBytes());

List<SplitTask> results = Lists.newArrayListWithCapacity(packed.size());
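A note on the packer used above (a sketch of Iceberg's BinPacking.ListPacker as this diff uses it; the sizes are hypothetical): pack(items, weightFunc) greedily fills bins up to the target weight, and a lookback of Integer.MAX_VALUE keeps every open bin eligible for placement. Packing file sizes 9, 5, 5, and 2 against a target of 10 should yield roughly [9], [5, 5], [2]:

// Hypothetical weights, for illustration only (assumes java.util.Arrays
// alongside the imports above)
List<List<Long>> bins =
    new BinPacking.ListPacker<Long>(10L, Integer.MAX_VALUE, false)
        .pack(Arrays.asList(9L, 5L, 5L, 2L), size -> size);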
@@ -40,7 +40,8 @@ protected IcebergPartitionPlan(

@Override
protected TaskSplitter buildTaskSplitter() {
return new BinPackingTaskSplitter();
return new BinPackingTaskSplitter(
rewriteDataFiles, rewritePosDataFiles, undersizedSegmentFiles);
}

@Override
@@ -70,6 +70,8 @@ protected void beforeSplit() {
// files not in hive location to hive location, so the files in the hive location should not
// be optimized.
Preconditions.checkArgument(reservedDeleteFiles.isEmpty(), "delete files should be empty");
Preconditions.checkArgument(
undersizedSegmentFiles.isEmpty(), "undersized segment files should be empty");
rewriteDataFiles.entrySet().removeIf(entry -> evaluator().inHiveLocation(entry.getKey()));
rewritePosDataFiles.entrySet().removeIf(entry -> evaluator().inHiveLocation(entry.getKey()));
}
@@ -33,15 +33,14 @@
import org.apache.iceberg.StructLike;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.Pair;

import javax.annotation.Nonnull;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

public class MixedIcebergPartitionPlan extends AbstractPartitionPlan {
@@ -94,7 +93,8 @@ protected TaskSplitter buildTaskSplitter() {
if (isKeyedTable()) {
return new TreeNodeTaskSplitter();
} else {
return new BinPackingTaskSplitter();
return new BinPackingTaskSplitter(
rewriteDataFiles, rewritePosDataFiles, undersizedSegmentFiles);
}
}

@@ -253,40 +253,44 @@ public List<SplitTask> splitTasks(int targetTaskCount) {
List<SplitTask> result = Lists.newArrayList();
FileTree rootTree = FileTree.newTreeRoot();
undersizedSegmentFiles.forEach(rootTree::addRewriteDataFile);
for (SplitTask splitTask : genSplitTasks(rootTree)) {
for (SplitTask splitTask : genSplitTasks(rootTree, 0)) {
if (splitTask.getRewriteDataFiles().size() > 1) {
result.add(splitTask);
continue;
} else {
// If there is only one undersized segment file in the split task, it's meaningless to
// rewrite it.
splitTask
.getRewriteDataFiles()
.forEach(MixedIcebergPartitionPlan.this::disposeUndersizedSegmentFile);
}
disposeUndersizedSegmentFile(splitTask);
}

rootTree = FileTree.newTreeRoot();
rewritePosDataFiles.forEach(rootTree::addRewritePosDataFile);
rewriteDataFiles.forEach(rootTree::addRewriteDataFile);
result.addAll(genSplitTasks(rootTree));
result.addAll(genSplitTasks(rootTree, targetTaskCount - result.size()));
return result;
}

private Collection<? extends SplitTask> genSplitTasks(FileTree rootTree) {
/**
* Generate split tasks with a target task count; if the target task count is <= the number of
* tree nodes, it will be ignored.
*/
private Collection<SplitTask> genSplitTasks(FileTree rootTree, int targetTaskCount) {
List<SplitTask> result = Lists.newArrayList();
rootTree.completeTree();
List<FileTree> subTrees = Lists.newArrayList();
rootTree.splitFileTree(subTrees, new SplitIfNoFileExists());
int taskCountForNode = Math.max(1, targetTaskCount / subTrees.size());
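// Illustrative numbers (assumed): targetTaskCount == 4 with two non-empty
// subtrees gives taskCountForNode == max(1, 4 / 2) == 2, so each node's files
// are bin-packed into about two tasks by the limited splitter below.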
for (FileTree subTree : subTrees) {
Map<DataFile, List<ContentFile<?>>> rewriteDataFiles = Maps.newHashMap();
Map<DataFile, List<ContentFile<?>>> rewritePosDataFiles = Maps.newHashMap();
Set<ContentFile<?>> deleteFiles = Sets.newHashSet();
subTree.collectRewriteDataFiles(rewriteDataFiles);
subTree.collectRewritePosDataFiles(rewritePosDataFiles);
// A subTree will also be generated when there are no files; filter it out
if (rewriteDataFiles.size() == 0 && rewritePosDataFiles.size() == 0) {
continue;
}
rewriteDataFiles.forEach((f, deletes) -> deleteFiles.addAll(deletes));
rewritePosDataFiles.forEach((f, deletes) -> deleteFiles.addAll(deletes));
result.add(
new SplitTask(rewriteDataFiles.keySet(), rewritePosDataFiles.keySet(), deleteFiles));
BinPackingTaskSplitter binPacking =
new BinPackingTaskSplitter(
rewriteDataFiles, rewritePosDataFiles, Collections.emptyMap());
result.addAll(binPacking.limitByTaskCount().splitTasks(taskCountForNode));
}
return result;
}
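// Collections.emptyMap() is passed for undersizedSegmentFiles above, presumably
// because undersized segment files were already disposed of in the first pass.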
@@ -154,16 +154,17 @@ public List<TaskDescriptor> planTasks() {
return cacheAndReturnTasks(Collections.emptyList());
}

List<PartitionEvaluator> evaluators = new ArrayList<>(partitionPlanMap.values());
// prioritize partitions with high cost to avoid starvation
evaluators.sort(Comparator.comparing(PartitionEvaluator::getWeight, Comparator.reverseOrder()));
List<PartitionEvaluator> partitionPlans = new ArrayList<>(partitionPlanMap.values());
// Prioritize partitions with high cost to avoid starvation
partitionPlans.sort(
Comparator.comparing(PartitionEvaluator::getWeight, Comparator.reverseOrder()));

double maxInputSize = maxInputSizePerThread * availableCore;
actualPartitionPlans = Lists.newArrayList();
long actualInputSize = 0;
for (PartitionEvaluator evaluator : evaluators) {
actualPartitionPlans.add((AbstractPartitionPlan) evaluator);
actualInputSize += evaluator.getCost();
for (PartitionEvaluator partitionPlan : partitionPlans) {
actualPartitionPlans.add((AbstractPartitionPlan) partitionPlan);
actualInputSize += partitionPlan.getCost();
if (actualInputSize > maxInputSize) {
break;
}
@@ -172,14 +173,14 @@
double avgThreadCost = actualInputSize / availableCore;
List<TaskDescriptor> tasks = Lists.newArrayList();
for (AbstractPartitionPlan partitionPlan : actualPartitionPlans) {
tasks.addAll(partitionPlan.splitTasks((int) (actualInputSize / avgThreadCost)));
tasks.addAll(partitionPlan.splitTasks((int) (partitionPlan.getCost() / avgThreadCost)));
Contributor Author: Fix the calculation of the target task count for each partition.
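For example, with illustrative numbers: if actualInputSize is 100 GB over 10 available cores, avgThreadCost is 10 GB, so a partition plan whose cost is 30 GB is now split into about 3 tasks, instead of every partition being split by the same global count of 100 GB / 10 GB = 10.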

}
if (!tasks.isEmpty()) {
if (evaluators.stream()
.anyMatch(evaluator -> evaluator.getOptimizingType() == OptimizingType.FULL)) {
if (actualPartitionPlans.stream()
.anyMatch(plan -> plan.getOptimizingType() == OptimizingType.FULL)) {
optimizingType = OptimizingType.FULL;
} else if (evaluators.stream()
.anyMatch(evaluator -> evaluator.getOptimizingType() == OptimizingType.MAJOR)) {
} else if (actualPartitionPlans.stream()
.anyMatch(plan -> plan.getOptimizingType() == OptimizingType.MAJOR)) {
optimizingType = OptimizingType.MAJOR;
} else {
optimizingType = OptimizingType.MINOR;
@@ -270,7 +270,7 @@ public void testWithDeleteFilesBase() {
segmentFiles.addAll(rePosSegmentFiles);
segmentFiles.addAll(rewrittenSegmentFiles);

setTargetSize(segmentFiles, true);
setTargetSize(segmentFiles);
setFragmentRatio(segmentFiles);
assertSegmentFiles(segmentFiles);
assertFragmentFiles(fragmentFiles);
@@ -350,20 +350,32 @@ protected AbstractPartitionPlan buildPlanWithCurrentFiles() {
return partitionPlan;
}

private void setTargetSize(List<DataFile> dataFiles, boolean isCompleteSegment) {
protected void setTargetSize(List<DataFile> dataFiles) {
Long maxFileSizeBytes =
dataFiles.stream()
.map(ContentFile::fileSizeInBytes)
.max(Long::compareTo)
.orElseThrow(() -> new IllegalStateException("dataFiles can't be empty"));
long targetFileSizeBytes =
isCompleteSegment ? maxFileSizeBytes + 1 : (maxFileSizeBytes * 2 + 1);
long targetFileSizeBytes = maxFileSizeBytes + 1;
getArcticTable()
.updateProperties()
.set(TableProperties.SELF_OPTIMIZING_TARGET_SIZE, targetFileSizeBytes + "")
.commit();
}
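// Setting the size just above the largest file (maxFileSizeBytes + 1) presumably
// ensures every data file fits within the configured limit on its own, so
// bin-packing tends to place one file per task when both helpers are applied.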

protected void setMaxTaskSize(List<DataFile> dataFiles) {
Long maxFileSizeBytes =
dataFiles.stream()
.map(ContentFile::fileSizeInBytes)
.max(Long::compareTo)
.orElseThrow(() -> new IllegalStateException("dataFiles can't be empty"));
long maxTaskSizeBytes = maxFileSizeBytes + 1;
getArcticTable()
.updateProperties()
.set(TableProperties.SELF_OPTIMIZING_MAX_TASK_SIZE, maxTaskSizeBytes + "")
.commit();
}

private void setFragmentRatio(List<DataFile> dataFiles) {
Long minFileSizeBytes =
dataFiles.stream()
@@ -404,12 +416,6 @@ protected void closeMinorOptimizingInterval() {
.commit();
}

private void resetTargetSize() {
updateTableProperty(
TableProperties.SELF_OPTIMIZING_TARGET_SIZE,
TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT + "");
}

protected void updateTableProperty(String key, String value) {
getArcticTable().updateProperties().set(key, value).commit();
}
@@ -509,6 +515,20 @@ protected void assertTask(
assertTaskProperties(buildProperties(), actual.properties());
}

protected void assertTaskFileCount(
TaskDescriptor actual,
int rewrittenDataFiles,
int rePosDeletedDataFiles,
int readOnlyDeleteFiles,
int rewrittenDeleteFiles) {
Assert.assertEquals(getPartitionPath(), actual.getPartition());
Assert.assertEquals(rewrittenDeleteFiles, actual.getInput().rewrittenDeleteFiles().length);
Assert.assertEquals(rewrittenDataFiles, actual.getInput().rewrittenDataFiles().length);
Assert.assertEquals(readOnlyDeleteFiles, actual.getInput().readOnlyDeleteFiles().length);
Assert.assertEquals(rePosDeletedDataFiles, actual.getInput().rePosDeletedDataFiles().length);
assertTaskProperties(buildProperties(), actual.properties());
}

protected void assertTaskProperties(Map<String, String> expect, Map<String, String> actual) {
Assert.assertEquals(expect, actual);
}
@@ -145,12 +145,15 @@ public void testChangeFilesWithDelete() {
tableTestHelper()
.writeChangeStore(
getArcticTable(), transactionId, ChangeAction.INSERT, newRecords, false)));
setTargetSize(dataFiles);
setMaxTaskSize(dataFiles);
Snapshot toSnapshot = getArcticTable().changeTable().currentSnapshot();

AbstractPartitionPlan plan = buildPlanWithCurrentFiles();
Assert.assertEquals(fromSnapshot.sequenceNumber(), (long) plan.getFromSequence());
Assert.assertEquals(toSnapshot.sequenceNumber(), (long) plan.getToSequence());

// 1. Split tasks with tree nodes
List<TaskDescriptor> taskDescriptors = plan.splitTasks(0);

Assert.assertEquals(1, taskDescriptors.size());
@@ -161,6 +164,14 @@
Collections.emptyList(),
Collections.emptyList(),
deleteFiles);

// 2. Split tasks with tree nodes and bin-packing
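// With the target size and max task size pinned just above the largest file
// (see setTargetSize/setMaxTaskSize above), each data file presumably lands in
// its own bin, yielding the two single-file tasks asserted below.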
plan = buildPlanWithCurrentFiles();
taskDescriptors = plan.splitTasks(2);
Assert.assertEquals(2, taskDescriptors.size());
for (TaskDescriptor taskDescriptor : taskDescriptors) {
assertTaskFileCount(taskDescriptor, 1, 0, 0, 1);
}
}

@Test