Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,18 @@ public void abortQuery(QueryId queryId) {

@Override
public void abortFragmentInstance(FragmentInstanceId instanceId) {
// TODO(EricPai)
FragmentInstanceTask task = timeoutQueue.get(new FragmentInstanceTaskID(instanceId));
if (task == null) {
return;
}
task.lock();
try {
clearFragmentInstanceTask(task);
} finally {
task.unlock();
}
}

@Override
public void fetchFragmentInstance(Driver instance) {}

@Override
public double getSchedulePriority(FragmentInstanceId instanceId) {
FragmentInstanceTask task = timeoutQueue.get(new FragmentInstanceTaskID(instanceId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,12 @@ public interface IFragmentInstanceScheduler {
void abortQuery(QueryId queryId);

/**
* Abort the fragment instance.
* Abort the fragment instance. If the instance is not existed, nothing will happen.
*
* @param instanceId the id of the fragment instance to be aborted.
*/
void abortFragmentInstance(FragmentInstanceId instanceId);

/** Fetch an {@link org.apache.iotdb.db.mpp.execution.Driver}. */
void fetchFragmentInstance(Driver instance);

/**
* Return the schedule priority of a fragment.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,24 @@ public void testManagingFragmentInstance() {
Assert.assertTrue(manager.getQueryMap().get(queryId2).contains(task4));
Assert.assertEquals(FragmentInstanceTaskStatus.READY, task4.getStatus());

// Abort the query
// Abort one FragmentInstance
manager.abortFragmentInstance(instanceId1);
Mockito.verify(mockDataBlockManager, Mockito.times(1))
.forceDeregisterFragmentInstance(Mockito.any());
Assert.assertTrue(manager.getBlockedTasks().isEmpty());
Assert.assertEquals(2, manager.getQueryMap().size());
Assert.assertTrue(manager.getQueryMap().containsKey(queryId));
Assert.assertEquals(3, manager.getTimeoutQueue().size());
Assert.assertEquals(3, manager.getReadyQueue().size());
Assert.assertEquals(FragmentInstanceTaskStatus.ABORTED, task1.getStatus());
Assert.assertEquals(FragmentInstanceTaskStatus.READY, task2.getStatus());
Assert.assertEquals(FragmentInstanceTaskStatus.READY, task3.getStatus());
Assert.assertEquals(FragmentInstanceTaskStatus.READY, task4.getStatus());

// Abort the whole query
Mockito.reset(mockDataBlockManager);
manager.abortQuery(queryId);
Mockito.verify(mockDataBlockManager, Mockito.times(3))
Mockito.verify(mockDataBlockManager, Mockito.times(2))
.forceDeregisterFragmentInstance(Mockito.any());
Assert.assertTrue(manager.getBlockedTasks().isEmpty());
Assert.assertEquals(1, manager.getQueryMap().size());
Expand Down