From 0c55ba22149c926ac301bf39b5582f2641172b9e Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Thu, 25 Apr 2024 09:46:03 +0800 Subject: [PATCH 1/6] modify compaction schedule strategy --- .../execute/task/InnerSpaceCompactionTask.java | 8 ++++++++ .../schedule/CompactionScheduler.java | 3 +++ .../schedule/CompactionTaskManager.java | 17 ++++++++++++++++- .../impl/SizeTieredCompactionSelector.java | 4 ++++ 4 files changed, 31 insertions(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java index 01399c8d5fe4..43816eb097f9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java @@ -32,6 +32,7 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.SimpleCompactionLogger; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.TsFileIdentifier; +import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager; import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractInnerSpaceEstimator; import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.FastCompactionInnerCompactionEstimator; import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.ReadChunkInnerCompactionEstimator; @@ -508,6 +509,13 @@ public int getProcessedFileNum() { return selectedTsFileResourceList.size(); } + @Override + public void handleTaskCleanup() { + if (sumOfCompactionCount == 0) { + CompactionTaskManager.getInstance().getZeroLevelInnerCompactionTaskNum().decrementAndGet(); + } + } + @Override protected void createSummary() { if (performer instanceof FastCompactionPerformer) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java index 6ebb5578670a..8e6feb247b70 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java @@ -200,6 +200,9 @@ private static boolean canAddTaskToWaitingQueue(AbstractCompactionTask task) "Compaction task start check failed because disk free ratio is less than disk_space_warning_threshold"); return false; } + if (task.getEstimatedMemoryCost() > SystemInfo.getInstance().getMemorySizeForCompaction()) { + return false; + } return true; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java index 7fa2e1158aa2..8b7ab911fd78 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java @@ -86,6 +86,8 @@ public class CompactionTaskManager implements IService { private volatile boolean init = false; + private final AtomicInteger zeroLevelInnerCompactionTaskNum = new AtomicInteger(0); + public static CompactionTaskManager getInstance() { return INSTANCE; } @@ -212,8 +214,17 @@ public ServiceType getID() { return ServiceType.COMPACTION_SERVICE; } + public AtomicInteger getZeroLevelInnerCompactionTaskNum() { + return this.zeroLevelInnerCompactionTaskNum; + } + public boolean isWaitingQueueFull() { - return candidateCompactionTaskQueue.size() == candidateCompactionTaskQueue.getMaxSize(); + if (config.getCandidateCompactionTaskQueueSize() + > 2 * config.getCompactionScheduleThreadNum()) { + return candidateCompactionTaskQueue.size() + >= (candidateCompactionTaskQueue.getMaxSize() - config.getCompactionScheduleThreadNum()); + } + return candidateCompactionTaskQueue.size() >= candidateCompactionTaskQueue.getMaxSize(); } /** @@ -229,6 +240,10 @@ public synchronized boolean addTaskToWaitingQueue(AbstractCompactionTask compact && !candidateCompactionTaskQueue.contains(compactionTask) && !isTaskRunning(compactionTask) && compactionTask.setSourceFilesToCompactionCandidate()) { + if (compactionTask instanceof InnerSpaceCompactionTask + && ((InnerSpaceCompactionTask) compactionTask).getSumOfCompactionCount() == 0) { + zeroLevelInnerCompactionTaskNum.incrementAndGet(); + } candidateCompactionTaskQueue.put(compactionTask); return true; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SizeTieredCompactionSelector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SizeTieredCompactionSelector.java index 2dee4544b010..89b05f44e533 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SizeTieredCompactionSelector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SizeTieredCompactionSelector.java @@ -210,6 +210,10 @@ private List selectTaskBaseOnLevel() throws IOExceptio if (!selectedResourceList.isEmpty()) { return createCompactionTasks(selectedResourceList, CompactionTaskPriorityType.NORMAL); } + // If any zero level task exist, skip the selection of high level compaction task + if (CompactionTaskManager.getInstance().getZeroLevelInnerCompactionTaskNum().get() > 0) { + return Collections.emptyList(); + } } return Collections.emptyList(); } From 2d419721985fbdfb51d404e7b4e0c045e41b2bea Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Thu, 25 Apr 2024 16:36:20 +0800 Subject: [PATCH 2/6] skip submit compaction task if queue is full --- .../apache/iotdb/db/storageengine/dataregion/DataRegion.java | 3 +++ .../dataregion/compaction/schedule/CompactionScheduler.java | 3 +++ 2 files changed, 6 insertions(+) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 9555eb0b7f2c..d209899911ac 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -2400,6 +2400,9 @@ public int executeCompaction() throws InterruptedException { // could // be evicted due to the low priority of the task for (long timePartition : timePartitions) { + if (CompactionTaskManager.getInstance().isWaitingQueueFull()) { + break; + } CompactionScheduler.sharedLockCompactionSelection(); try { trySubmitCount += diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java index 8e6feb247b70..7280da2cb54d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java @@ -186,6 +186,9 @@ private static int addTaskToWaitingQueue(List private static boolean canAddTaskToWaitingQueue(AbstractCompactionTask task) throws InterruptedException { + if (CompactionTaskManager.getInstance().isWaitingQueueFull()) { + return false; + } if (Thread.interrupted()) { throw new InterruptedException(); } From 5cac37a2ce4c31abb1a67e6b9529bb4fc3107ffd Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Mon, 29 Apr 2024 14:31:12 +0800 Subject: [PATCH 3/6] delay estimate memory of InnerSpaceCompactionTask --- .../dataregion/compaction/schedule/CompactionScheduler.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java index 7280da2cb54d..845653d112e7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java @@ -203,9 +203,6 @@ private static boolean canAddTaskToWaitingQueue(AbstractCompactionTask task) "Compaction task start check failed because disk free ratio is less than disk_space_warning_threshold"); return false; } - if (task.getEstimatedMemoryCost() > SystemInfo.getInstance().getMemorySizeForCompaction()) { - return false; - } return true; } @@ -250,6 +247,9 @@ private static int tryToSubmitCrossSpaceCompactionTask( if (!config.isEnableCrossSpaceCompaction()) { return 0; } + if (CompactionTaskManager.getInstance().isWaitingQueueFull()) { + return 0; + } String logicalStorageGroupName = tsFileManager.getStorageGroupName(); String dataRegionId = tsFileManager.getDataRegionId(); ICrossSpaceSelector crossSpaceCompactionSelector = From 67f02336ec8de49ae738ffa63b00fbb5130ca07e Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Mon, 6 May 2024 19:05:34 +0800 Subject: [PATCH 4/6] decrement counter when task leave waiting queue --- .../compaction/schedule/CompactionTaskQueue.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskQueue.java index 9f0a5dbbc7dd..3b38343d73ae 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskQueue.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.schedule; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.AbstractCompactionTask; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask; import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo; import org.apache.iotdb.db.utils.datastructure.FixedPriorityBlockingQueue; @@ -51,6 +52,10 @@ public AbstractCompactionTask take() throws InterruptedException { Thread.sleep(TimeUnit.SECONDS.toMillis(1)); continue; } + if (task instanceof InnerSpaceCompactionTask + && ((InnerSpaceCompactionTask) task).getSumOfCompactionCount() == 0) { + CompactionTaskManager.getInstance().getZeroLevelInnerCompactionTaskNum().decrementAndGet(); + } return task; } } @@ -75,6 +80,10 @@ private boolean prepareTask(AbstractCompactionTask task) throws InterruptedExcep } private void dropCompactionTask(AbstractCompactionTask task) { + if (task instanceof InnerSpaceCompactionTask + && ((InnerSpaceCompactionTask) task).getSumOfCompactionCount() == 0) { + CompactionTaskManager.getInstance().getZeroLevelInnerCompactionTaskNum().decrementAndGet(); + } task.resetCompactionCandidateStatusForAllSourceFiles(); task.handleTaskCleanup(); task.releaseOccupiedResources(); From 5eca9e3fb46436255296b6ab9b5fe936fc109feb Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Mon, 6 May 2024 19:20:20 +0800 Subject: [PATCH 5/6] delete old code --- .../compaction/execute/task/InnerSpaceCompactionTask.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java index f33330e63dec..d0bcbfad439f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java @@ -33,7 +33,6 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.SimpleCompactionLogger; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.TsFileIdentifier; -import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager; import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractInnerSpaceEstimator; import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.FastCompactionInnerCompactionEstimator; import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.ReadChunkInnerCompactionEstimator; @@ -511,13 +510,6 @@ public int getProcessedFileNum() { return selectedTsFileResourceList.size(); } - @Override - public void handleTaskCleanup() { - if (sumOfCompactionCount == 0) { - CompactionTaskManager.getInstance().getZeroLevelInnerCompactionTaskNum().decrementAndGet(); - } - } - @Override protected void createSummary() { if (performer instanceof FastCompactionPerformer) { From ce2659662d5a7cae3fa210b68a2d9ddfce147382 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Mon, 6 May 2024 19:27:15 +0800 Subject: [PATCH 6/6] add poll last hook --- .../compaction/schedule/CompactionTaskManager.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java index e89e84e090ed..e9d4dda9187a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java @@ -122,6 +122,15 @@ public synchronized void start() { candidateCompactionTaskQueue.regsitPollLastHook( AbstractCompactionTask::resetCompactionCandidateStatusForAllSourceFiles); candidateCompactionTaskQueue.regsitPollLastHook(AbstractCompactionTask::handleTaskCleanup); + candidateCompactionTaskQueue.regsitPollLastHook( + task -> { + if (task instanceof InnerSpaceCompactionTask + && ((InnerSpaceCompactionTask) task).getSumOfCompactionCount() == 0) { + CompactionTaskManager.getInstance() + .getZeroLevelInnerCompactionTaskNum() + .decrementAndGet(); + } + }); init = true; } logger.info("Compaction task manager started.");