diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java index f855fd15f0014..303fc049f9d2b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java @@ -286,6 +286,21 @@ private void compactNonAlignedSeries( throw (StopReadTsFileByInterruptException) cause; } throw new IOException("[Compaction] SubCompactionTask meet errors ", e); + } catch (InterruptedException e) { + abortAllSubTasks(futures); + throw e; + } + } + } + + private void abortAllSubTasks(List> futures) { + for (Future future : futures) { + future.cancel(true); + } + for (Future future : futures) { + try { + future.get(); + } catch (Exception ignored) { } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java index bd0c01b6dc2ad..598ac2b674739 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java @@ -334,8 +334,8 @@ public synchronized Future submitSubTask(Callable subCompactionTask) /** * Abort all compactions of a database. The running compaction tasks will be returned as a list, - * the compaction threads for the database are not terminated util all the tasks in the list is - * finish. The outer caller can use function isAnyTaskInListStillRunning to determine this. + * the compaction threads for the database are interrupted immediately. The outer caller can use + * function isAnyTaskInListStillRunning to determine whether the tasks in list are finished. */ public synchronized List abortCompaction(String storageGroupName) { List compactionTaskOfCurSG = new ArrayList<>(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java index b70a9f47f8918..03c0636cb388d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java @@ -99,16 +99,21 @@ public boolean processOneCompactionTask(AbstractCompactionTask task) { return taskSuccess; } - static class CompactionTaskFuture implements Future { + public static class CompactionTaskFuture implements Future { CompactionTaskSummary summary; + Thread thread; public CompactionTaskFuture(CompactionTaskSummary summary) { + this.thread = Thread.currentThread(); this.summary = summary; } @Override public boolean cancel(boolean mayInterruptIfRunning) { summary.cancel(); + if (mayInterruptIfRunning) { + this.thread.interrupt(); + } return true; } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionWorkerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionWorkerTest.java index 556a596273e7b..df433c2fa2124 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionWorkerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionWorkerTest.java @@ -21,9 +21,11 @@ import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.AbstractCompactionTask; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CompactionTaskSummary; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CrossSpaceCompactionTask; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskQueue; +import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionWorker; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.comparator.DefaultCompactionTaskComparatorImpl; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; @@ -42,7 +44,11 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; public class CompactionWorkerTest { @Before @@ -261,4 +267,31 @@ public void testFailedToCheckValidInInnerTask() throws InterruptedException { thread.interrupt(); thread.join(); } + + @Test + public void testAbortCompactionTask() { + AtomicReference compactionTaskSummary = + new AtomicReference<>(); + AtomicBoolean isInterrupted = new AtomicBoolean(false); + CountDownLatch latch = new CountDownLatch(1); + Phaser phaser = new Phaser(2); + Thread t = + new Thread( + () -> { + compactionTaskSummary.set( + new CompactionWorker.CompactionTaskFuture(new CompactionTaskSummary())); + phaser.arriveAndAwaitAdvance(); + try { + latch.await(); + } catch (InterruptedException ignored) { + isInterrupted.set(true); + } + phaser.arriveAndAwaitAdvance(); + }); + t.start(); + phaser.arriveAndAwaitAdvance(); + compactionTaskSummary.get().cancel(true); + phaser.arriveAndAwaitAdvance(); + Assert.assertTrue(isInterrupted.get()); + } }