Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify the strategy of compaction schedule #12469

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2427,6 +2427,9 @@ public int executeCompaction() throws InterruptedException {
// could
// be evicted due to the low priority of the task
for (long timePartition : timePartitions) {
if (CompactionTaskManager.getInstance().isWaitingQueueFull()) {
break;
}
CompactionScheduler.sharedLockCompactionSelection();
try {
trySubmitCount +=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@ private static int addTaskToWaitingQueue(List<? extends AbstractCompactionTask>

private static boolean canAddTaskToWaitingQueue(AbstractCompactionTask task)
throws InterruptedException {
if (CompactionTaskManager.getInstance().isWaitingQueueFull()) {
return false;
}
if (Thread.interrupted()) {
throw new InterruptedException();
}
Expand Down Expand Up @@ -244,6 +247,9 @@ private static int tryToSubmitCrossSpaceCompactionTask(
if (!config.isEnableCrossSpaceCompaction()) {
return 0;
}
if (CompactionTaskManager.getInstance().isWaitingQueueFull()) {
return 0;
}
String logicalStorageGroupName = tsFileManager.getStorageGroupName();
String dataRegionId = tsFileManager.getDataRegionId();
ICrossSpaceSelector crossSpaceCompactionSelector =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ public class CompactionTaskManager implements IService {

private volatile boolean init = false;

private final AtomicInteger zeroLevelInnerCompactionTaskNum = new AtomicInteger(0);

public static CompactionTaskManager getInstance() {
return INSTANCE;
}
Expand All @@ -120,6 +122,15 @@ public synchronized void start() {
candidateCompactionTaskQueue.regsitPollLastHook(
AbstractCompactionTask::resetCompactionCandidateStatusForAllSourceFiles);
candidateCompactionTaskQueue.regsitPollLastHook(AbstractCompactionTask::handleTaskCleanup);
candidateCompactionTaskQueue.regsitPollLastHook(
task -> {
if (task instanceof InnerSpaceCompactionTask
&& ((InnerSpaceCompactionTask) task).getSumOfCompactionCount() == 0) {
CompactionTaskManager.getInstance()
.getZeroLevelInnerCompactionTaskNum()
.decrementAndGet();
}
});
init = true;
}
logger.info("Compaction task manager started.");
Expand Down Expand Up @@ -233,8 +244,17 @@ public ServiceType getID() {
return ServiceType.COMPACTION_SERVICE;
}

public AtomicInteger getZeroLevelInnerCompactionTaskNum() {
return this.zeroLevelInnerCompactionTaskNum;
}

public boolean isWaitingQueueFull() {
return candidateCompactionTaskQueue.size() == candidateCompactionTaskQueue.getMaxSize();
if (config.getCandidateCompactionTaskQueueSize()
> 2 * config.getCompactionScheduleThreadNum()) {
return candidateCompactionTaskQueue.size()
>= (candidateCompactionTaskQueue.getMaxSize() - config.getCompactionScheduleThreadNum());
}
return candidateCompactionTaskQueue.size() >= candidateCompactionTaskQueue.getMaxSize();
}

/**
Expand All @@ -250,6 +270,10 @@ public synchronized boolean addTaskToWaitingQueue(AbstractCompactionTask compact
&& !candidateCompactionTaskQueue.contains(compactionTask)
&& !isTaskRunning(compactionTask)
&& compactionTask.setSourceFilesToCompactionCandidate()) {
if (compactionTask instanceof InnerSpaceCompactionTask
&& ((InnerSpaceCompactionTask) compactionTask).getSumOfCompactionCount() == 0) {
zeroLevelInnerCompactionTaskNum.incrementAndGet();
}
candidateCompactionTaskQueue.put(compactionTask);
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iotdb.db.storageengine.dataregion.compaction.schedule;

import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.AbstractCompactionTask;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask;
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
import org.apache.iotdb.db.utils.datastructure.FixedPriorityBlockingQueue;

Expand Down Expand Up @@ -51,6 +52,10 @@ public AbstractCompactionTask take() throws InterruptedException {
Thread.sleep(TimeUnit.SECONDS.toMillis(1));
continue;
}
if (task instanceof InnerSpaceCompactionTask
&& ((InnerSpaceCompactionTask) task).getSumOfCompactionCount() == 0) {
CompactionTaskManager.getInstance().getZeroLevelInnerCompactionTaskNum().decrementAndGet();
}
return task;
}
}
Expand All @@ -75,6 +80,10 @@ private boolean prepareTask(AbstractCompactionTask task) throws InterruptedExcep
}

private void dropCompactionTask(AbstractCompactionTask task) {
if (task instanceof InnerSpaceCompactionTask
&& ((InnerSpaceCompactionTask) task).getSumOfCompactionCount() == 0) {
CompactionTaskManager.getInstance().getZeroLevelInnerCompactionTaskNum().decrementAndGet();
}
task.resetCompactionCandidateStatusForAllSourceFiles();
task.handleTaskCleanup();
task.releaseOccupiedResources();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ private List<InnerSpaceCompactionTask> selectTaskBaseOnLevel() throws IOExceptio
if (!selectedResourceList.isEmpty()) {
return createCompactionTasks(selectedResourceList, CompactionTaskPriorityType.NORMAL);
}
// If any zero level task exist, skip the selection of high level compaction task
if (CompactionTaskManager.getInstance().getZeroLevelInnerCompactionTaskNum().get() > 0) {
return Collections.emptyList();
}
}
return Collections.emptyList();
}
Expand Down
Loading