Skip to content

Commit

Permalink
Fix JobMaster.cancel(jobId)
Browse files Browse the repository at this point in the history
Cherry-pick of existing commit.
orig-pr: #10634
orig-commit: 5064335
orig-commit-author: Bradley Yoo <bradleyyoo@gmail.com>

pr-link: #10774
change-id: cid-a9af8ad612139b0540e8de972380f8f61c00bcb1
  • Loading branch information
bradyoo authored and alluxio-bot committed Jan 21, 2020
1 parent c6d31fa commit 95a3414
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 39 deletions.
Expand Up @@ -75,11 +75,9 @@ public void run() {
Object result;
try {
result = definition.runTask(mJobConfig, mTaskArgs, mContext);
if (Thread.interrupted()) {
mTaskExecutorManager.notifyTaskCancellation(mJobId, mTaskId);
}
} catch (InterruptedException e) {
mTaskExecutorManager.notifyTaskCancellation(mJobId, mTaskId);
// Cleanup around the interruption should already have been handled by a different thread
Thread.currentThread().interrupt();
return;
} catch (Throwable t) {
if (ServerConfiguration.getBoolean(PropertyKey.DEBUG)) {
Expand Down
Expand Up @@ -105,20 +105,6 @@ public synchronized void notifyTaskFailure(long jobId, int taskId, String errorM
LOG.info("Task {} for job {} failed: {}", taskId, jobId, errorMessage);
}

/**
* Notifies the cancellation of the task.
*
* @param jobId the job id
* @param taskId the task id
*/
public synchronized void notifyTaskCancellation(long jobId, int taskId) {
Pair<Long, Integer> id = new Pair<>(jobId, taskId);
TaskInfo.Builder taskInfo = mUnfinishedTasks.get(id);
taskInfo.setStatus(Status.CANCELED);
finishTask(id);
LOG.info("Task {} for job {} canceled", taskId, jobId);
}

/**
* Executes the given task.
*
Expand Down Expand Up @@ -157,11 +143,15 @@ public synchronized void cancelTask(long jobId, int taskId) {
return;
}

LOG.info("Task {} for job {} canceled", taskId, jobId);
Future<?> future = mTaskFutures.get(id);
if (!future.cancel(true)) {
taskInfo.setStatus(Status.FAILED);
taskInfo.setErrorMessage("Failed to cancel the task");
finishTask(id);
} else {
taskInfo.setStatus(Status.CANCELED);
finishTask(id);
}
}

Expand Down
Expand Up @@ -89,25 +89,4 @@ public void runFailure() throws Exception {
Mockito.verify(mTaskExecutorManager).notifyTaskFailure(Mockito.eq(jobId), Mockito.eq(taskId),
Mockito.anyString());
}

@Test
public void runCancelation() throws Exception {
long jobId = 1;
int taskId = 2;
JobConfig jobConfig = Mockito.mock(JobConfig.class);
Serializable taskArgs = Lists.newArrayList(1);
RunTaskContext context = Mockito.mock(RunTaskContext.class);
@SuppressWarnings("unchecked")
JobDefinition<JobConfig, Serializable, Serializable> jobDefinition =
Mockito.mock(JobDefinition.class);
Mockito.when(mRegistry.getJobDefinition(jobConfig)).thenReturn(jobDefinition);
Mockito.doThrow(new InterruptedException("interupt")).when(jobDefinition).runTask(jobConfig,
taskArgs, context);

TaskExecutor executor =
new TaskExecutor(jobId, taskId, jobConfig, taskArgs, context, mTaskExecutorManager);
executor.run();

Mockito.verify(mTaskExecutorManager).notifyTaskCancellation(jobId, taskId);
}
}

0 comments on commit 95a3414

Please sign in to comment.