Skip to content

Commit

Permalink
Fix JobMaster.cancel(jobId)
Browse files Browse the repository at this point in the history
JobMaster.cancel() previously cleaned up properly only if the JobWorker
thread was actually executing the task. In practice, however, almost all
tasks are still waiting in the executor queue when they are cancelled.

pr-link: #10634
change-id: cid-a9af8ad612139b0540e8de972380f8f61c00bcb1
  • Loading branch information
bradyoo authored and alluxio-bot committed Dec 16, 2019
1 parent dddf1ae commit 5064335
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 40 deletions.
Expand Up @@ -75,11 +75,9 @@ public void run() {
Serializable result;
try {
result = definition.runTask(mJobConfig, mTaskArgs, mContext);
if (Thread.interrupted()) {
mTaskExecutorManager.notifyTaskCancellation(mJobId, mTaskId);
}
} catch (InterruptedException e) {
mTaskExecutorManager.notifyTaskCancellation(mJobId, mTaskId);
// Cleanup around the interruption should already have been handled by a different thread
Thread.currentThread().interrupt();
return;
} catch (Throwable t) {
if (ServerConfiguration.getBoolean(PropertyKey.DEBUG)) {
Expand Down
Expand Up @@ -142,20 +142,6 @@ public synchronized void notifyTaskFailure(long jobId, long taskId, String error
LOG.info("Task {} for job {} failed: {}", taskId, jobId, errorMessage);
}

/**
 * Notifies the cancellation of the task.
 *
 * <p>Marks the task as {@code CANCELED} and finishes it. If no unfinished task is registered
 * under the given ids (e.g. it was already finished through another path), the notification is
 * ignored instead of failing with a {@link NullPointerException}.
 *
 * @param jobId the job id
 * @param taskId the task id
 */
public synchronized void notifyTaskCancellation(long jobId, long taskId) {
  Pair<Long, Long> id = new Pair<>(jobId, taskId);
  TaskInfo taskInfo = mUnfinishedTasks.get(id);
  if (taskInfo == null) {
    // Nothing left to update: the lookup found no unfinished task for this id, so there is no
    // status to set and no task to finish.
    LOG.info("Task {} for job {} is not an unfinished task; ignoring cancellation", taskId,
        jobId);
    return;
  }
  taskInfo.setStatus(Status.CANCELED);
  finishTask(id);
  LOG.info("Task {} for job {} canceled", taskId, jobId);
}

/**
* Executes the given task.
*
Expand Down Expand Up @@ -192,11 +178,15 @@ public synchronized void cancelTask(long jobId, long taskId) {
return;
}

LOG.info("Task {} for job {} canceled", taskId, jobId);
Future<?> future = mTaskFutures.get(id);
if (!future.cancel(true)) {
taskInfo.setStatus(Status.FAILED);
taskInfo.setErrorMessage("Failed to cancel the task");
finishTask(id);
} else {
taskInfo.setStatus(Status.CANCELED);
finishTask(id);
}
}

Expand Down
Expand Up @@ -89,25 +89,4 @@ public void runFailure() throws Exception {
Mockito.verify(mTaskExecutorManager).notifyTaskFailure(Mockito.eq(jobId), Mockito.eq(taskId),
Mockito.anyString());
}

/**
 * Runs a {@code TaskExecutor} whose mocked plan definition throws an
 * {@link InterruptedException} from {@code runTask}, and verifies that the task executor
 * manager is notified of the task's cancellation.
 */
@Test
public void runCancelation() throws Exception {
  final long jobId = 1;
  final long taskId = 2;

  // Collaborators handed to the executor under test.
  JobConfig config = Mockito.mock(JobConfig.class);
  RunTaskContext runContext = Mockito.mock(RunTaskContext.class);
  Serializable args = Lists.newArrayList(1);

  // The plan definition returned by the registry is interrupted as soon as the task runs.
  @SuppressWarnings("unchecked")
  PlanDefinition<JobConfig, Serializable, Serializable> interruptedPlan =
      Mockito.mock(PlanDefinition.class);
  Mockito.when(mRegistry.getJobDefinition(config)).thenReturn(interruptedPlan);
  Mockito.doThrow(new InterruptedException("interupt"))
      .when(interruptedPlan)
      .runTask(config, args, runContext);

  new TaskExecutor(jobId, taskId, config, args, runContext, mTaskExecutorManager).run();

  Mockito.verify(mTaskExecutorManager).notifyTaskCancellation(jobId, taskId);
}
}
Expand Up @@ -195,7 +195,7 @@ public void throttleJobWorkerTasks() throws Exception {
JobTestUtils.waitForJobStatus(mJobMaster, jobId0,
Sets.newHashSet(Status.RUNNING, Status.COMPLETED));

long jobId1 = mJobMaster.run(new SleepJobConfig(5000));
long jobId1 = mJobMaster.run(new SleepJobConfig(50000));
JobTestUtils.waitForJobStatus(mJobMaster, jobId1, Status.RUNNING);
JobTestUtils.waitForJobStatus(mJobMaster, jobId0, Status.COMPLETED);

Expand All @@ -211,5 +211,10 @@ public void throttleJobWorkerTasks() throws Exception {

assertEquals(1, mJobMaster.getAllWorkerHealth().get(0).getTaskPoolSize());
assertEquals(1, mJobMaster.getAllWorkerHealth().get(0).getNumActiveTasks());

mJobMaster.cancel(jobId1);

JobTestUtils.waitForJobStatus(mJobMaster, jobId2, Status.COMPLETED);
JobTestUtils.waitForJobStatus(mJobMaster, jobId3, Status.COMPLETED);
}
}

0 comments on commit 5064335

Please sign in to comment.