Skip to content

Commit

Permalink
Fix tracking of finished jobs within JobMaster
Browse files Browse the repository at this point in the history
After some investigation it was found that the JobMaster was not
properly
tracking finished jobs due to discrepancies in the implementation of the
`compareTo` and `equals` methods of `JobInfo`. This led to jobs being
removed from the ConcurrentSkipListSet without ever actually calling
the `remove` function. The resulting behavior is that we would see the
master hit the maximum job capacity very frequently as there would be
un-flushable jobs in the `mIdToCoordinators` map.

This change fixes the original issue by altering the underlying
structure
used to keep track of the finished jobs and by making more guarantees
about how the status of jobs may change.

First, with this change we no longer allow a JobInfo to change states
after
it has been marked with a state where `JobInfo#isFinished` returns true.
This means all jobs states should now follow a DAG (RUNNING is an
optional state as a job may move from CREATED to any one of the finished
states):

```
CREATED -> (RUNNING) -> [FAILED|CANCELLED|COMPLETED]
```

Second, instead of utilizing a ConcurrentSkipListSet I've opted for a
simpler concurrent structure; the LinkedBlockingQueue. Using a FIFO
queue with the guarantee that once jobs are finished they cannot change
states, we gain the opportunity to use a simple queue that has O(1)
offer/poll operations. With this approach jobs that are marked as
finished
may get added in different orders depending on how the internal locks
within the queue operate. This implementation no longer strictly
guarantees
that objects in the queue are ordered by the lastStatusChangeMs time.
The
amount of time difference between concurrent jobs being added should not
be so great that leaves jobs such that the retention time severely
affects
the amount of jobs that can be evicted.

This implementation introduces the JobTracker class which is now
responsible for adding and evicting jobs from the job master. It
encapsulates
what was the mIdToCoordinator map, and also houses the FIFO queue that
used to be the mFinishedJobs set.

Closes #9874

pr-link: #9934
change-id: cid-c4495dfd0a353e90a154b29c640f15b66f3a4208
  • Loading branch information
ZacBlanco authored and alluxio-bot committed Sep 24, 2019
1 parent 6f40083 commit 7681087
Show file tree
Hide file tree
Showing 10 changed files with 559 additions and 126 deletions.
10 changes: 10 additions & 0 deletions core/common/src/main/java/alluxio/conf/PropertyKey.java
Expand Up @@ -3525,6 +3525,14 @@ public String toString() {
+ "job master.")
.setDefaultValue(1024)
.build();
public static final PropertyKey JOB_MASTER_FINISHED_JOB_PURGE_COUNT =
new Builder(Name.JOB_MASTER_FINISHED_JOB_PURGE_COUNT)
.setDescription("The maximum amount of jobs to purge at any single time when the job "
+ "master reaches its maximum capacity. It is recommended to set this value when "
+ "setting the capacity of the job master to a large ( > 10M) value. Default is -1 "
+ "denoting an unlimited value")
.setDefaultValue("-1")
.build();
public static final PropertyKey JOB_MASTER_FINISHED_JOB_RETENTION_TIME =
new Builder(Name.JOB_MASTER_FINISHED_JOB_RETENTION_TIME)
.setDescription("The length of time the Alluxio Job Master should save information about "
Expand Down Expand Up @@ -4369,6 +4377,8 @@ public static final class Name {
//
public static final String JOB_MASTER_CLIENT_THREADS =
"alluxio.job.master.client.threads";
public static final String JOB_MASTER_FINISHED_JOB_PURGE_COUNT =
"alluxio.job.master.finished.job.purge.count";
public static final String JOB_MASTER_FINISHED_JOB_RETENTION_TIME =
"alluxio.job.master.finished.job.retention.time";
public static final String JOB_MASTER_JOB_CAPACITY = "alluxio.job.master.job.capacity";
Expand Down
54 changes: 44 additions & 10 deletions job/common/src/main/java/alluxio/job/meta/JobInfo.java
Expand Up @@ -16,13 +16,14 @@
import alluxio.job.wire.TaskInfo;
import alluxio.util.CommonUtils;

import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

import javax.annotation.concurrent.ThreadSafe;

Expand All @@ -31,14 +32,14 @@
*/
@ThreadSafe
public final class JobInfo implements Comparable<JobInfo> {
private String mErrorMessage;
private final long mId;
private final JobConfig mJobConfig;
private final Map<Integer, TaskInfo> mTaskIdToInfo;
private long mLastStatusChangeMs;
private String mErrorMessage;
private Status mStatus;
private Function<JobInfo, Void> mStatusChangeCallback;
private volatile long mLastStatusChangeMs;
private String mResult;
private volatile Status mStatus;
private Consumer<JobInfo> mStatusChangeCallback;
private final Map<Integer, TaskInfo> mTaskIdToInfo;

/**
* Creates a new instance of {@link JobInfo}.
Expand All @@ -47,7 +48,7 @@ public final class JobInfo implements Comparable<JobInfo> {
* @param jobConfig the job configuration
* @param statusChangeCallback the callback to invoke upon status change
*/
public JobInfo(long id, JobConfig jobConfig, Function<JobInfo, Void> statusChangeCallback) {
public JobInfo(long id, JobConfig jobConfig, Consumer<JobInfo> statusChangeCallback) {
mId = id;
mJobConfig = Preconditions.checkNotNull(jobConfig);
mTaskIdToInfo = Maps.newHashMap();
Expand All @@ -60,11 +61,17 @@ public JobInfo(long id, JobConfig jobConfig, Function<JobInfo, Void> statusChang
/**
* {@inheritDoc}
*
* This method orders jobs using the time their status was last modified.
* This method orders jobs using the time their status was last modified. If the status is
* equal, they are compared by jobId
*/
@Override
public synchronized int compareTo(JobInfo other) {
return Long.compare(mLastStatusChangeMs, other.getLastStatusChangeMs());
int res = Long.compare(mLastStatusChangeMs, other.mLastStatusChangeMs);
if (res != 0) {
return res;
}
// Order by jobId as a secondary measure
return Long.compare(mId, other.mId);
}

/**
Expand Down Expand Up @@ -139,14 +146,22 @@ public synchronized List<Integer> getTaskIdList() {
}

/**
* Sets the status of a job.
*
* A job can only move from one status to another if the job hasn't already finished. If a job
* is finished and the caller tries to change the status, this method is a no-op.
*
* @param status the job status
*/
public synchronized void setStatus(Status status) {
if (mStatus.isFinished()) {
return;
}
Status oldStatus = mStatus;
mStatus = status;
mLastStatusChangeMs = CommonUtils.getCurrentMs();
if (mStatusChangeCallback != null && status != oldStatus) {
mStatusChangeCallback.apply(this);
mStatusChangeCallback.accept(this);
}
}

Expand Down Expand Up @@ -177,4 +192,23 @@ public synchronized String getResult() {
public synchronized List<TaskInfo> getTaskInfoList() {
return Lists.newArrayList(mTaskIdToInfo.values());
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}

if (!(o instanceof JobInfo)) {
return false;
}

JobInfo other = (JobInfo) o;
return Objects.equal(mId, other.mId);
}

@Override
public int hashCode() {
return Objects.hashCode(mId);
}
}
9 changes: 1 addition & 8 deletions job/common/src/test/java/alluxio/job/meta/JobInfoTest.java
Expand Up @@ -16,7 +16,6 @@
import alluxio.job.wire.Status;
import alluxio.util.CommonUtils;

import com.google.common.base.Function;
import org.junit.Assert;
import org.junit.Test;

Expand All @@ -42,13 +41,7 @@ public void compare() {
public void callback() {
final String result = "I was here!";
JobConfig jobConfig = new TestJobConfig("unused");
JobInfo a = new JobInfo(0L, jobConfig, new Function<JobInfo, Void>() {
@Override
public Void apply(JobInfo jobInfo) {
jobInfo.setResult(result);
return null;
}
});
JobInfo a = new JobInfo(0L, jobConfig, jobInfo -> jobInfo.setResult(result));
a.setStatus(Status.COMPLETED);
Assert.assertEquals(result, a.getResult());
}
Expand Down
84 changes: 61 additions & 23 deletions job/server/src/main/java/alluxio/master/job/JobCoordinator.java
Expand Up @@ -23,7 +23,7 @@
import alluxio.master.job.command.CommandManager;
import alluxio.wire.WorkerInfo;

import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import org.slf4j.Logger;
Expand All @@ -33,6 +33,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.Consumer;

import javax.annotation.concurrent.ThreadSafe;

Expand Down Expand Up @@ -69,7 +70,7 @@ public final class JobCoordinator {

private JobCoordinator(CommandManager commandManager, JobServerContext jobServerContext,
List<WorkerInfo> workerInfoList, Long jobId, JobConfig jobConfig,
Function<JobInfo, Void> statusChangeCallback) {
Consumer<JobInfo> statusChangeCallback) {
Preconditions.checkNotNull(jobConfig);
mJobServerContext = jobServerContext;
mJobInfo = new JobInfo(jobId, jobConfig, statusChangeCallback);
Expand All @@ -91,7 +92,7 @@ private JobCoordinator(CommandManager commandManager, JobServerContext jobServer
*/
public static JobCoordinator create(CommandManager commandManager,
JobServerContext jobServerContext, List<WorkerInfo> workerInfoList, Long jobId,
JobConfig jobConfig, Function<JobInfo, Void> statusChangeCallback)
JobConfig jobConfig, Consumer<JobInfo> statusChangeCallback)
throws JobDoesNotExistException {
Preconditions.checkNotNull(commandManager, "commandManager");
JobCoordinator jobCoordinator = new JobCoordinator(commandManager, jobServerContext,
Expand Down Expand Up @@ -152,11 +153,13 @@ public synchronized void cancel() {
*
* @param taskInfoList List of @TaskInfo instances to update
*/
public synchronized void updateTasks(List<TaskInfo> taskInfoList) {
for (TaskInfo taskInfo : taskInfoList) {
mJobInfo.setTaskInfo(taskInfo.getTaskId(), taskInfo);
public void updateTasks(List<TaskInfo> taskInfoList) {
synchronized (mJobInfo) {
for (TaskInfo taskInfo : taskInfoList) {
mJobInfo.setTaskInfo(taskInfo.getTaskId(), taskInfo);
}
updateStatus();
}
updateStatus();
}

/**
Expand All @@ -166,33 +169,48 @@ public synchronized boolean isJobFinished() {
return mJobInfo.getStatus().isFinished();
}

/**
* @return the id corresponding to the job
*/
public long getJobId() {
return mJobInfo.getId();
}

/**
* Sets the job as failed with given error message.
*
* @param errorMessage Error message to set for failure
*/
public synchronized void setJobAsFailed(String errorMessage) {
mJobInfo.setStatus(Status.FAILED);
mJobInfo.setErrorMessage(errorMessage);
public void setJobAsFailed(String errorMessage) {
synchronized (mJobInfo) {
if (!mJobInfo.getStatus().isFinished()) {
mJobInfo.setStatus(Status.FAILED);
mJobInfo.setErrorMessage(errorMessage);
}
}
}

/**
* Fails any incomplete tasks being run on the specified worker.
*
* @param workerId the id of the worker to fail tasks for
*/
public synchronized void failTasksForWorker(long workerId) {
Integer taskId = mWorkerIdToTaskId.get(workerId);
if (taskId == null) {
return;
}
TaskInfo taskInfo = mJobInfo.getTaskInfo(taskId);
if (taskInfo.getStatus().isFinished()) {
return;
public void failTasksForWorker(long workerId) {
synchronized (mJobInfo) {
Integer taskId = mWorkerIdToTaskId.get(workerId);
if (taskId == null) {
return;
}
TaskInfo taskInfo = mJobInfo.getTaskInfo(taskId);
if (taskInfo.getStatus().isFinished()) {
return;
}
if (!mJobInfo.getStatus().isFinished()) {
taskInfo.setStatus(Status.FAILED);
taskInfo.setErrorMessage("Job worker was lost before the task could complete");
updateStatus();
}
}
taskInfo.setStatus(Status.FAILED);
taskInfo.setErrorMessage("Job worker was lost before the task could complete");
updateStatus();
}

/**
Expand All @@ -206,7 +224,7 @@ public synchronized alluxio.job.wire.JobInfo getJobInfoWire() {
* Updates the status of the job. When all the tasks are completed, run the join method in the
* definition.
*/
private void updateStatus() {
private synchronized void updateStatus() {
int completed = 0;
List<TaskInfo> taskInfoList = mJobInfo.getTaskInfoList();
for (TaskInfo info : taskInfoList) {
Expand Down Expand Up @@ -244,8 +262,9 @@ private void updateStatus() {

// all the tasks completed, run join
try {
mJobInfo.setStatus(Status.COMPLETED);
// Try to join first, so that in case of failure we don't move to a completed state yet
mJobInfo.setResult(join(taskInfoList));
mJobInfo.setStatus(Status.COMPLETED);
} catch (Exception e) {
mJobInfo.setStatus(Status.FAILED);
mJobInfo.setErrorMessage(e.getMessage());
Expand All @@ -270,4 +289,23 @@ private String join(List<TaskInfo> taskInfoList) throws Exception {
}
return definition.join(mJobInfo.getJobConfig(), taskResults);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}

if (!(o instanceof JobCoordinator)) {
return false;
}

JobCoordinator other = (JobCoordinator) o;
return Objects.equal(mJobInfo, other.mJobInfo);
}

@Override
public int hashCode() {
return Objects.hashCode(mJobInfo);
}
}

0 comments on commit 7681087

Please sign in to comment.