diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java index 777ba0c1e6c6d..2ddd981249487 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceScheduler.java @@ -166,12 +166,18 @@ public void abortQuery(QueryId queryId) { @Override public void abortFragmentInstance(FragmentInstanceId instanceId) { - // TODO(EricPai) + FragmentInstanceTask task = timeoutQueue.get(new FragmentInstanceTaskID(instanceId)); + if (task == null) { + return; + } + task.lock(); + try { + clearFragmentInstanceTask(task); + } finally { + task.unlock(); + } } - @Override - public void fetchFragmentInstance(Driver instance) {} - @Override public double getSchedulePriority(FragmentInstanceId instanceId) { FragmentInstanceTask task = timeoutQueue.get(new FragmentInstanceTaskID(instanceId)); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceScheduler.java index bc4c69ea009b1..2c597c15e7926 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceScheduler.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceScheduler.java @@ -44,15 +44,12 @@ public interface IFragmentInstanceScheduler { void abortQuery(QueryId queryId); /** - * Abort the fragment instance. + * Abort the fragment instance. If the instance is not existed, nothing will happen. * * @param instanceId the id of the fragment instance to be aborted. */ void abortFragmentInstance(FragmentInstanceId instanceId); - /** Fetch an {@link org.apache.iotdb.db.mpp.execution.Driver}. */ - void fetchFragmentInstance(Driver instance); - /** * Return the schedule priority of a fragment. * diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java index 3cc1fe7ed39fc..233908ca44c76 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceSchedulerTest.java @@ -116,9 +116,24 @@ public void testManagingFragmentInstance() { Assert.assertTrue(manager.getQueryMap().get(queryId2).contains(task4)); Assert.assertEquals(FragmentInstanceTaskStatus.READY, task4.getStatus()); - // Abort the query + // Abort one FragmentInstance + manager.abortFragmentInstance(instanceId1); + Mockito.verify(mockDataBlockManager, Mockito.times(1)) + .forceDeregisterFragmentInstance(Mockito.any()); + Assert.assertTrue(manager.getBlockedTasks().isEmpty()); + Assert.assertEquals(2, manager.getQueryMap().size()); + Assert.assertTrue(manager.getQueryMap().containsKey(queryId)); + Assert.assertEquals(3, manager.getTimeoutQueue().size()); + Assert.assertEquals(3, manager.getReadyQueue().size()); + Assert.assertEquals(FragmentInstanceTaskStatus.ABORTED, task1.getStatus()); + Assert.assertEquals(FragmentInstanceTaskStatus.READY, task2.getStatus()); + Assert.assertEquals(FragmentInstanceTaskStatus.READY, task3.getStatus()); + Assert.assertEquals(FragmentInstanceTaskStatus.READY, task4.getStatus()); + + // Abort the whole query + Mockito.reset(mockDataBlockManager); manager.abortQuery(queryId); - Mockito.verify(mockDataBlockManager, Mockito.times(3)) + Mockito.verify(mockDataBlockManager, Mockito.times(2)) .forceDeregisterFragmentInstance(Mockito.any()); Assert.assertTrue(manager.getBlockedTasks().isEmpty()); Assert.assertEquals(1, manager.getQueryMap().size());