From 76810870801196ffe7323eddbb950edbbada242a Mon Sep 17 00:00:00 2001 From: Zac Blanco Date: Tue, 24 Sep 2019 11:46:19 -0700 Subject: [PATCH] Fix tracking of finished jobs within JobMaster After some investigation it was found that the JobMaster was not properly tracking finished jobs due to discrepancies in the implementation of the `compareTo` and `equals` methods of `JobInfo`. This led to jobs being removed from the ConcurrentSkipListSet without ever actually calling the `remove` function. The resulting behavior is that we would see the master hit the maximum job capacity very frequently as there would be un-flushable jobs in the `mIdToCoordinators` map. This change fixes the original issue by altering the underlying structure used to keep track of the finished jobs and by making more guarantees about how the status of jobs may change. First, with this change we no longer allow a JobInfo to change states after it has been marked with a state where `JobInfo#isFinished` returns true. This means all jobs states should now follow a DAG (RUNNING is an optional state as a job may move from CREATED to any one of the finished states): ``` CREATED -> (RUNNING) -> [FAILED|CANCELLED|COMPLETED] ``` Second, instead of utilizing a ConcurrentSkipListSet I've opted for a simpler concurrent structure; the LinkedBlockingQueue. Using a FIFO queue with the guarantee that once jobs are finished they cannot change states, we gain the opportunity to use a simple queue that has O(1) offer/poll operations. With this approach jobs that are marked as finished may get added in different orders depending on how the internal locks within the queue operate. This implementation no longer strictly guarantees that objects in the queue are ordered by the lastStatusChangeMs time. The amount of time difference between concurrent jobs being added should not be so great that leaves jobs such that the retention time severely affects the amount of jobs that can be evicted. This implementation introduces the JobTracker class which is now responsible for adding and evicting jobs from the job master. It encapsulates what was the mIdToCoordinator map, and also houses the FIFO queue that used to be the mFinishedJobs set. Closes #9874 pr-link: Alluxio/alluxio#9934 change-id: cid-c4495dfd0a353e90a154b29c640f15b66f3a4208 --- .../main/java/alluxio/conf/PropertyKey.java | 10 + .../main/java/alluxio/job/meta/JobInfo.java | 54 +++- .../java/alluxio/job/meta/JobInfoTest.java | 9 +- .../alluxio/master/job/JobCoordinator.java | 84 ++++-- .../java/alluxio/master/job/JobMaster.java | 110 +++----- .../java/alluxio/master/job/JobTracker.java | 240 ++++++++++++++++++ .../alluxio/master/job/JobMasterTest.java | 8 +- .../alluxio/master/job/JobTrackerTest.java | 159 ++++++++++++ ...ileOutStreamAsyncWriteIntegrationTest.java | 1 - .../replicate/ReplicateIntegrationTest.java | 10 + 10 files changed, 559 insertions(+), 126 deletions(-) create mode 100644 job/server/src/main/java/alluxio/master/job/JobTracker.java create mode 100644 job/server/src/test/java/alluxio/master/job/JobTrackerTest.java diff --git a/core/common/src/main/java/alluxio/conf/PropertyKey.java b/core/common/src/main/java/alluxio/conf/PropertyKey.java index 13607bd01879..7bbbc2204f30 100644 --- a/core/common/src/main/java/alluxio/conf/PropertyKey.java +++ b/core/common/src/main/java/alluxio/conf/PropertyKey.java @@ -3525,6 +3525,14 @@ public String toString() { + "job master.") .setDefaultValue(1024) .build(); + public static final PropertyKey JOB_MASTER_FINISHED_JOB_PURGE_COUNT = + new Builder(Name.JOB_MASTER_FINISHED_JOB_PURGE_COUNT) + .setDescription("The maximum amount of jobs to purge at any single time when the job " + + "master reaches its maximum capacity. It is recommended to set this value when " + + "setting the capacity of the job master to a large ( > 10M) value. Default is -1 " + + "denoting an unlimited value") + .setDefaultValue("-1") + .build(); public static final PropertyKey JOB_MASTER_FINISHED_JOB_RETENTION_TIME = new Builder(Name.JOB_MASTER_FINISHED_JOB_RETENTION_TIME) .setDescription("The length of time the Alluxio Job Master should save information about " @@ -4369,6 +4377,8 @@ public static final class Name { // public static final String JOB_MASTER_CLIENT_THREADS = "alluxio.job.master.client.threads"; + public static final String JOB_MASTER_FINISHED_JOB_PURGE_COUNT = + "alluxio.job.master.finished.job.purge.count"; public static final String JOB_MASTER_FINISHED_JOB_RETENTION_TIME = "alluxio.job.master.finished.job.retention.time"; public static final String JOB_MASTER_JOB_CAPACITY = "alluxio.job.master.job.capacity"; diff --git a/job/common/src/main/java/alluxio/job/meta/JobInfo.java b/job/common/src/main/java/alluxio/job/meta/JobInfo.java index cff8e8638612..bec01da1ee69 100644 --- a/job/common/src/main/java/alluxio/job/meta/JobInfo.java +++ b/job/common/src/main/java/alluxio/job/meta/JobInfo.java @@ -16,13 +16,14 @@ import alluxio.job.wire.TaskInfo; import alluxio.util.CommonUtils; -import com.google.common.base.Function; +import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import javax.annotation.concurrent.ThreadSafe; @@ -31,14 +32,14 @@ */ @ThreadSafe public final class JobInfo implements Comparable { + private String mErrorMessage; private final long mId; private final JobConfig mJobConfig; - private final Map mTaskIdToInfo; - private long mLastStatusChangeMs; - private String mErrorMessage; - private Status mStatus; - private Function mStatusChangeCallback; + private volatile long mLastStatusChangeMs; private String mResult; + private volatile Status mStatus; + private Consumer mStatusChangeCallback; + private final Map mTaskIdToInfo; /** * Creates a new instance of {@link JobInfo}. @@ -47,7 +48,7 @@ public final class JobInfo implements Comparable { * @param jobConfig the job configuration * @param statusChangeCallback the callback to invoke upon status change */ - public JobInfo(long id, JobConfig jobConfig, Function statusChangeCallback) { + public JobInfo(long id, JobConfig jobConfig, Consumer statusChangeCallback) { mId = id; mJobConfig = Preconditions.checkNotNull(jobConfig); mTaskIdToInfo = Maps.newHashMap(); @@ -60,11 +61,17 @@ public JobInfo(long id, JobConfig jobConfig, Function statusChang /** * {@inheritDoc} * - * This method orders jobs using the time their status was last modified. + * This method orders jobs using the time their status was last modified. If the status is + * equal, they are compared by jobId */ @Override public synchronized int compareTo(JobInfo other) { - return Long.compare(mLastStatusChangeMs, other.getLastStatusChangeMs()); + int res = Long.compare(mLastStatusChangeMs, other.mLastStatusChangeMs); + if (res != 0) { + return res; + } + // Order by jobId as a secondary measure + return Long.compare(mId, other.mId); } /** @@ -139,14 +146,22 @@ public synchronized List getTaskIdList() { } /** + * Sets the status of a job. + * + * A job can only move from one status to another if the job hasn't already finished. If a job + * is finished and the caller tries to change the status, this method is a no-op. + * * @param status the job status */ public synchronized void setStatus(Status status) { + if (mStatus.isFinished()) { + return; + } Status oldStatus = mStatus; mStatus = status; mLastStatusChangeMs = CommonUtils.getCurrentMs(); if (mStatusChangeCallback != null && status != oldStatus) { - mStatusChangeCallback.apply(this); + mStatusChangeCallback.accept(this); } } @@ -177,4 +192,23 @@ public synchronized String getResult() { public synchronized List getTaskInfoList() { return Lists.newArrayList(mTaskIdToInfo.values()); } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (!(o instanceof JobInfo)) { + return false; + } + + JobInfo other = (JobInfo) o; + return Objects.equal(mId, other.mId); + } + + @Override + public int hashCode() { + return Objects.hashCode(mId); + } } diff --git a/job/common/src/test/java/alluxio/job/meta/JobInfoTest.java b/job/common/src/test/java/alluxio/job/meta/JobInfoTest.java index 50b3fa9b836e..0ec22bf0b234 100644 --- a/job/common/src/test/java/alluxio/job/meta/JobInfoTest.java +++ b/job/common/src/test/java/alluxio/job/meta/JobInfoTest.java @@ -16,7 +16,6 @@ import alluxio.job.wire.Status; import alluxio.util.CommonUtils; -import com.google.common.base.Function; import org.junit.Assert; import org.junit.Test; @@ -42,13 +41,7 @@ public void compare() { public void callback() { final String result = "I was here!"; JobConfig jobConfig = new TestJobConfig("unused"); - JobInfo a = new JobInfo(0L, jobConfig, new Function() { - @Override - public Void apply(JobInfo jobInfo) { - jobInfo.setResult(result); - return null; - } - }); + JobInfo a = new JobInfo(0L, jobConfig, jobInfo -> jobInfo.setResult(result)); a.setStatus(Status.COMPLETED); Assert.assertEquals(result, a.getResult()); } diff --git a/job/server/src/main/java/alluxio/master/job/JobCoordinator.java b/job/server/src/main/java/alluxio/master/job/JobCoordinator.java index 8a8ada1b01ff..47b9c47e7b62 100644 --- a/job/server/src/main/java/alluxio/master/job/JobCoordinator.java +++ b/job/server/src/main/java/alluxio/master/job/JobCoordinator.java @@ -23,7 +23,7 @@ import alluxio.master.job.command.CommandManager; import alluxio.wire.WorkerInfo; -import com.google.common.base.Function; +import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import org.slf4j.Logger; @@ -33,6 +33,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.function.Consumer; import javax.annotation.concurrent.ThreadSafe; @@ -69,7 +70,7 @@ public final class JobCoordinator { private JobCoordinator(CommandManager commandManager, JobServerContext jobServerContext, List workerInfoList, Long jobId, JobConfig jobConfig, - Function statusChangeCallback) { + Consumer statusChangeCallback) { Preconditions.checkNotNull(jobConfig); mJobServerContext = jobServerContext; mJobInfo = new JobInfo(jobId, jobConfig, statusChangeCallback); @@ -91,7 +92,7 @@ private JobCoordinator(CommandManager commandManager, JobServerContext jobServer */ public static JobCoordinator create(CommandManager commandManager, JobServerContext jobServerContext, List workerInfoList, Long jobId, - JobConfig jobConfig, Function statusChangeCallback) + JobConfig jobConfig, Consumer statusChangeCallback) throws JobDoesNotExistException { Preconditions.checkNotNull(commandManager, "commandManager"); JobCoordinator jobCoordinator = new JobCoordinator(commandManager, jobServerContext, @@ -152,11 +153,13 @@ public synchronized void cancel() { * * @param taskInfoList List of @TaskInfo instances to update */ - public synchronized void updateTasks(List taskInfoList) { - for (TaskInfo taskInfo : taskInfoList) { - mJobInfo.setTaskInfo(taskInfo.getTaskId(), taskInfo); + public void updateTasks(List taskInfoList) { + synchronized (mJobInfo) { + for (TaskInfo taskInfo : taskInfoList) { + mJobInfo.setTaskInfo(taskInfo.getTaskId(), taskInfo); + } + updateStatus(); } - updateStatus(); } /** @@ -166,14 +169,25 @@ public synchronized boolean isJobFinished() { return mJobInfo.getStatus().isFinished(); } + /** + * @return the id corresponding to the job + */ + public long getJobId() { + return mJobInfo.getId(); + } + /** * Sets the job as failed with given error message. * * @param errorMessage Error message to set for failure */ - public synchronized void setJobAsFailed(String errorMessage) { - mJobInfo.setStatus(Status.FAILED); - mJobInfo.setErrorMessage(errorMessage); + public void setJobAsFailed(String errorMessage) { + synchronized (mJobInfo) { + if (!mJobInfo.getStatus().isFinished()) { + mJobInfo.setStatus(Status.FAILED); + mJobInfo.setErrorMessage(errorMessage); + } + } } /** @@ -181,18 +195,22 @@ public synchronized void setJobAsFailed(String errorMessage) { * * @param workerId the id of the worker to fail tasks for */ - public synchronized void failTasksForWorker(long workerId) { - Integer taskId = mWorkerIdToTaskId.get(workerId); - if (taskId == null) { - return; - } - TaskInfo taskInfo = mJobInfo.getTaskInfo(taskId); - if (taskInfo.getStatus().isFinished()) { - return; + public void failTasksForWorker(long workerId) { + synchronized (mJobInfo) { + Integer taskId = mWorkerIdToTaskId.get(workerId); + if (taskId == null) { + return; + } + TaskInfo taskInfo = mJobInfo.getTaskInfo(taskId); + if (taskInfo.getStatus().isFinished()) { + return; + } + if (!mJobInfo.getStatus().isFinished()) { + taskInfo.setStatus(Status.FAILED); + taskInfo.setErrorMessage("Job worker was lost before the task could complete"); + updateStatus(); + } } - taskInfo.setStatus(Status.FAILED); - taskInfo.setErrorMessage("Job worker was lost before the task could complete"); - updateStatus(); } /** @@ -206,7 +224,7 @@ public synchronized alluxio.job.wire.JobInfo getJobInfoWire() { * Updates the status of the job. When all the tasks are completed, run the join method in the * definition. */ - private void updateStatus() { + private synchronized void updateStatus() { int completed = 0; List taskInfoList = mJobInfo.getTaskInfoList(); for (TaskInfo info : taskInfoList) { @@ -244,8 +262,9 @@ private void updateStatus() { // all the tasks completed, run join try { - mJobInfo.setStatus(Status.COMPLETED); + // Try to join first, so that in case of failure we don't move to a completed state yet mJobInfo.setResult(join(taskInfoList)); + mJobInfo.setStatus(Status.COMPLETED); } catch (Exception e) { mJobInfo.setStatus(Status.FAILED); mJobInfo.setErrorMessage(e.getMessage()); @@ -270,4 +289,23 @@ private String join(List taskInfoList) throws Exception { } return definition.join(mJobInfo.getJobConfig(), taskResults); } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (!(o instanceof JobCoordinator)) { + return false; + } + + JobCoordinator other = (JobCoordinator) o; + return Objects.equal(mJobInfo, other.mJobInfo); + } + + @Override + public int hashCode() { + return Objects.hashCode(mJobInfo); + } } diff --git a/job/server/src/main/java/alluxio/master/job/JobMaster.java b/job/server/src/main/java/alluxio/master/job/JobMaster.java index 6c7c41fc4c94..f53955a8fa84 100644 --- a/job/server/src/main/java/alluxio/master/job/JobMaster.java +++ b/job/server/src/main/java/alluxio/master/job/JobMaster.java @@ -31,10 +31,7 @@ import alluxio.heartbeat.HeartbeatThread; import alluxio.job.JobConfig; import alluxio.job.JobServerContext; -import alluxio.job.meta.JobIdGenerator; -import alluxio.job.meta.JobInfo; import alluxio.job.meta.MasterWorkerInfo; -import alluxio.job.wire.Status; import alluxio.job.wire.TaskInfo; import alluxio.master.AbstractMaster; import alluxio.master.MasterContext; @@ -58,12 +55,8 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.SortedSet; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -75,6 +68,20 @@ @ThreadSafe public final class JobMaster extends AbstractMaster implements NoopJournaled { private static final Logger LOG = LoggerFactory.getLogger(JobMaster.class); + + /** + * The total number of jobs that the JobMaster may run at any moment. + */ + private static final long JOB_CAPACITY = + ServerConfiguration.getLong(PropertyKey.JOB_MASTER_JOB_CAPACITY); + /** + * The max number of jobs to purge when the master reaches maximum capacity. + */ + private static final long MAX_PURGE_COUNT = + ServerConfiguration.getLong(PropertyKey.JOB_MASTER_FINISHED_JOB_PURGE_COUNT); + /** + * The minimum amount of time to retain finished jobs. + */ private static final long RETENTION_MS = ServerConfiguration.getMs(PropertyKey.JOB_MASTER_FINISHED_JOB_RETENTION_TIME); @@ -100,11 +107,6 @@ public WorkerNetAddress getFieldValue(MasterWorkerInfo o) { */ private final JobServerContext mJobServerContext; - /** - * The total number of jobs that the JobMaster may run at any moment. - */ - private final long mCapacity = ServerConfiguration.getLong(PropertyKey.JOB_MASTER_JOB_CAPACITY); - /** * All worker information. Access must be controlled on mWorkers using the RW lock(mWorkerRWLock). */ @@ -121,52 +123,38 @@ public WorkerNetAddress getFieldValue(MasterWorkerInfo o) { */ private final AtomicLong mNextWorkerId = new AtomicLong(CommonUtils.getCurrentMs()); - /** - * Used to generate Id for new jobs. - */ - private final JobIdGenerator mJobIdGenerator; - /** * Manager for worker tasks. */ private final CommandManager mCommandManager; /** - * Used to store JobCoordinator instances per job Id. - * This member is accessed concurrently and its instance type is ConcurrentHashMap. - */ - private final Map mIdToJobCoordinator; - - /** - * Used to keep track of finished jobs that are still within retention policy. - * This member is accessed concurrently and its instance type is ConcurrentSkipListSet. + * Manager for adding and removing jobs. */ - private final SortedSet mFinishedJobs; + private final JobTracker mTracker; /** * Creates a new instance of {@link JobMaster}. * * @param masterContext the context for Alluxio master - * @param filesystem the Alluxio filesystem client the job master uses to communicate - * @param fsContext the filesystem client's underlying context - * @param ufsManager the ufs manager + * @param filesystem the Alluxio filesystem client the job master uses to communicate + * @param fsContext the filesystem client's underlying context + * @param ufsManager the ufs manager */ public JobMaster(MasterContext masterContext, FileSystem filesystem, FileSystemContext fsContext, UfsManager ufsManager) { super(masterContext, new SystemClock(), ExecutorServiceFactories.cachedThreadPool(Constants.JOB_MASTER_NAME)); mJobServerContext = new JobServerContext(filesystem, fsContext, ufsManager); - mJobIdGenerator = new JobIdGenerator(); mCommandManager = new CommandManager(); - mIdToJobCoordinator = new ConcurrentHashMap<>(); - mFinishedJobs = new ConcurrentSkipListSet<>(); + mTracker = new JobTracker(JOB_CAPACITY, RETENTION_MS, MAX_PURGE_COUNT); } @Override public void start(Boolean isLeader) throws IOException { super.start(isLeader); // Fail any jobs that were still running when the last job master stopped. - for (JobCoordinator jobCoordinator : mIdToJobCoordinator.values()) { + for (JobCoordinator jobCoordinator : mTracker.coordinators()) { if (!jobCoordinator.isJobFinished()) { jobCoordinator.setJobAsFailed("Job failed: Job master shut down during execution"); } @@ -200,8 +188,7 @@ public String getName() { * * @param jobConfig the job configuration * @return the job id tracking the progress - * - * @throws JobDoesNotExistException when the job doesn't exist + * @throws JobDoesNotExistException when the job doesn't exist * @throws ResourceExhaustedException if the job master is too busy to run the job */ public synchronized long run(JobConfig jobConfig) @@ -212,45 +199,8 @@ public synchronized long run(JobConfig jobConfig) Context forkedCtx = Context.current().fork(); Context prevCtx = forkedCtx.attach(); try { - if (mIdToJobCoordinator.size() == mCapacity) { - if (mFinishedJobs.isEmpty()) { - // The job master is at full capacity and no job has finished. - throw new ResourceExhaustedException( - ExceptionMessage.JOB_MASTER_FULL_CAPACITY.getMessage(mCapacity)); - } - // Discard old jobs that have completion time beyond retention policy - Iterator jobIterator = mFinishedJobs.iterator(); - // Used to denote whether space could be reserved for the new job - // It's 'true' if job master is at full capacity - boolean isFull = true; - while (jobIterator.hasNext()) { - JobInfo oldestJob = jobIterator.next(); - long completedBeforeMs = CommonUtils.getCurrentMs() - oldestJob.getLastStatusChangeMs(); - if (completedBeforeMs < RETENTION_MS) { - // mFinishedJobs is sorted. Can't iterate to a job within retention policy - break; - } - jobIterator.remove(); - mIdToJobCoordinator.remove(oldestJob.getId()); - isFull = false; - } - if (isFull) { - throw new ResourceExhaustedException( - ExceptionMessage.JOB_MASTER_FULL_CAPACITY.getMessage(mCapacity)); - } - } - long jobId = mJobIdGenerator.getNewJobId(); - JobCoordinator jobCoordinator = JobCoordinator.create(mCommandManager, mJobServerContext, - getWorkerInfoList(), jobId, jobConfig, (jobInfo) -> { - Status status = jobInfo.getStatus(); - mFinishedJobs.remove(jobInfo); - if (status.isFinished()) { - mFinishedJobs.add(jobInfo); - } - return null; - }); - mIdToJobCoordinator.put(jobId, jobCoordinator); - return jobId; + return mTracker.addJob(jobConfig, mCommandManager, mJobServerContext, + getWorkerInfoList()); } finally { forkedCtx.detach(prevCtx); } @@ -263,7 +213,7 @@ public synchronized long run(JobConfig jobConfig) * @throws JobDoesNotExistException when the job does not exist */ public void cancel(long jobId) throws JobDoesNotExistException { - JobCoordinator jobCoordinator = mIdToJobCoordinator.get(jobId); + JobCoordinator jobCoordinator = mTracker.getCoordinator(jobId); if (jobCoordinator == null) { throw new JobDoesNotExistException(ExceptionMessage.JOB_DOES_NOT_EXIST.getMessage(jobId)); } @@ -274,7 +224,7 @@ public void cancel(long jobId) throws JobDoesNotExistException { * @return list all the job ids */ public List list() { - return Lists.newArrayList(mIdToJobCoordinator.keySet()); + return Lists.newArrayList(mTracker.jobs()); } /** @@ -285,7 +235,7 @@ public List list() { * @throws JobDoesNotExistException if the job does not exist */ public alluxio.job.wire.JobInfo getStatus(long jobId) throws JobDoesNotExistException { - JobCoordinator jobCoordinator = mIdToJobCoordinator.get(jobId); + JobCoordinator jobCoordinator = mTracker.getCoordinator(jobId); if (jobCoordinator == null) { throw new JobDoesNotExistException(ExceptionMessage.JOB_DOES_NOT_EXIST.getMessage(jobId)); } @@ -310,7 +260,7 @@ public long registerWorker(WorkerNetAddress workerNetAddress) { + "address", workerNetAddress); MasterWorkerInfo deadWorker = mWorkers.getFirstByField(mAddressIndex, workerNetAddress); - for (JobCoordinator jobCoordinator : mIdToJobCoordinator.values()) { + for (JobCoordinator jobCoordinator : mTracker.coordinators()) { jobCoordinator.failTasksForWorker(deadWorker.getId()); } mWorkers.remove(deadWorker); @@ -367,7 +317,7 @@ public List workerHeartbeat(long workerId, List taskInfoLi taskInfosPerJob.get(taskInfo.getJobId()).add(taskInfo); } for (Map.Entry> taskInfosPair : taskInfosPerJob.entrySet()) { - JobCoordinator jobCoordinator = mIdToJobCoordinator.get(taskInfosPair.getKey()); + JobCoordinator jobCoordinator = mTracker.getCoordinator(taskInfosPair.getKey()); if (jobCoordinator != null) { jobCoordinator.updateTasks(taskInfosPair.getValue()); } @@ -397,7 +347,7 @@ public void heartbeat() { if (lastUpdate > masterWorkerTimeoutMs) { LOG.warn("The worker {} timed out after {}ms without a heartbeat!", worker, lastUpdate); lostWorkers.add(worker); - for (JobCoordinator jobCoordinator : mIdToJobCoordinator.values()) { + for (JobCoordinator jobCoordinator : mTracker.coordinators()) { jobCoordinator.failTasksForWorker(worker.getId()); } } diff --git a/job/server/src/main/java/alluxio/master/job/JobTracker.java b/job/server/src/main/java/alluxio/master/job/JobTracker.java new file mode 100644 index 000000000000..dd4f52b549d1 --- /dev/null +++ b/job/server/src/main/java/alluxio/master/job/JobTracker.java @@ -0,0 +1,240 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.master.job; + +import alluxio.conf.PropertyKey; +import alluxio.conf.ServerConfiguration; +import alluxio.exception.ExceptionMessage; +import alluxio.exception.JobDoesNotExistException; +import alluxio.exception.status.ResourceExhaustedException; +import alluxio.job.JobConfig; +import alluxio.job.JobServerContext; +import alluxio.job.meta.JobIdGenerator; +import alluxio.job.meta.JobInfo; +import alluxio.job.wire.Status; +import alluxio.master.job.command.CommandManager; +import alluxio.util.CommonUtils; +import alluxio.wire.WorkerInfo; + +import com.google.common.base.Preconditions; +import net.jcip.annotations.ThreadSafe; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; + +import javax.annotation.Nullable; + +/** + * The {@link JobTracker} is used to create, remove, and provide access to the set of currently + * scheduled or finished jobs. + * + * All modification of the status of jobs should occur within this class, + */ +@ThreadSafe +public class JobTracker { + private static final Logger LOG = LoggerFactory.getLogger(JobTracker.class); + + /** + * The maximum amount of jobs that will get purged when removing jobs from the queue. + * + * By default this value will be -1 (unlimited), however situations may arise where it is + * desirable to cap the amount of jobs removed. A scenario where the max capacity is large + * (10M+) could cause an RPC to the job master to time out due to significant time to remove a + * large number of jobs. While one RPC of 10M may seem insignificant, providing guarantees that + * RPCs won't fail on long-running production clusters is probably something that we want to + * provide. + * + * One caveat to setting this value is that there will be a lower bound on the amount of + * memory that the job master will use when it is at capacity. This may or may not be a + * reasonable trade-off to guarantee that RPCs don't time out due to job eviction. + * @see PropertyKey#JOB_MASTER_FINISHED_JOB_PURGE_COUNT + */ + private final long mMaxJobPurgeCount; + + /** The maximum amount of jobs that can be tracked at any one time. */ + private final long mCapacity; + + /** The minimum amount of time that finished jobs should be retained for. */ + private final long mRetentionMs; + + /** Used to generate Id for new jobs. */ + private final JobIdGenerator mJobIdGenerator; + + /** The main index to track jobs through their Job Id. */ + private final ConcurrentHashMap mCoordinators; + + /** A FIFO queue used to track jobs which have status {@link Status#isFinished()} as true. */ + private final LinkedBlockingQueue mFinished; + + /** + * Create a new instance of {@link JobTracker}. + * + * @param capacity the capacity of jobs that can be handled + * @param retentionMs the minimum amount of time to retain jobs + * @param maxJobPurgeCount the max amount of jobs to purge when reaching max capacity + */ + public JobTracker(long capacity, long retentionMs, long maxJobPurgeCount) { + Preconditions.checkArgument(capacity >= 0); + mCapacity = capacity; + Preconditions.checkArgument(retentionMs >= 0); + mRetentionMs = retentionMs; + mMaxJobPurgeCount = maxJobPurgeCount <= 0 ? Long.MAX_VALUE : maxJobPurgeCount; + mCoordinators = new ConcurrentHashMap<>(0, + 0.95f, ServerConfiguration.getInt(PropertyKey.MASTER_RPC_EXECUTOR_PARALLELISM)); + mFinished = new LinkedBlockingQueue<>(); + mJobIdGenerator = new JobIdGenerator(); + } + + private void statusChangeCallback(JobInfo jobInfo) { + if (jobInfo == null) { + return; + } + Status status = jobInfo.getStatus(); + if (!status.isFinished()) { + return; + } + // Retry if offering to mFinished doesn't work + for (int i = 0; i < 2; i++) { + if (mFinished.offer(jobInfo)) { + return; + } + if (!removeFinished()) { + // Still couldn't add to mFinished - remove from coordinators so that it doesn't + // get stuck + LOG.warn("Failed to remove any jobs from the finished queue in status change callback"); + } + } + if (mFinished.offer(jobInfo)) { + return; + } + //remove from the coordinator map preemptively so that it doesn't get lost forever even if + // it's still within the retention time + LOG.warn("Failed to offer job id {} to finished queue, removing from tracking preemptively", + jobInfo.getId()); + } + + /** + * Gets a {@link JobCoordinator} associated with the given job Id. + * + * @param jobId the job id associated with the {@link JobCoordinator} + * @return The {@link JobCoordinator} associated with the id, or null if there is no association + */ + @Nullable + public JobCoordinator getCoordinator(long jobId) { + return mCoordinators.get(jobId); + } + + /** + * Adds a job with the given {@link JobConfig} to the job tracker. + * + * @param jobConfig configuration for the job + * @param manager command manager for jobs + * @param ctx the {@link JobServerContext} from the job master + * @param workers a list of available workers + * @return the job id of the newly added job + * @throws JobDoesNotExistException if the job type does not exist + * @throws ResourceExhaustedException if there is no more space available in the job master + */ + public synchronized long addJob(JobConfig jobConfig, CommandManager manager, + JobServerContext ctx, List workers) throws + JobDoesNotExistException, ResourceExhaustedException { + if (removeFinished()) { + long jobId = mJobIdGenerator.getNewJobId(); + JobCoordinator jobCoordinator = JobCoordinator.create(manager, ctx, + workers, jobId, jobConfig, this::statusChangeCallback); + mCoordinators.put(jobId, jobCoordinator); + return jobId; + } else { + throw new ResourceExhaustedException( + ExceptionMessage.JOB_MASTER_FULL_CAPACITY.getMessage(mCapacity)); + } + } + + /** + * Removes all finished jobs outside of the retention time from the queue. + * + * @return true if at least one job was removed, or if not at maximum capacity yet, false if at + * capacity and no job was removed + */ + private synchronized boolean removeFinished() { + boolean removedJob = false; + boolean isFull = mCoordinators.size() >= mCapacity; + if (!isFull) { + return true; + } + // coordinators at max capacity + // Try to clear the queue + if (mFinished.isEmpty()) { + // The job master is at full capacity and no job has finished. + return false; + } + int removeCount = 0; + while (!mFinished.isEmpty() && removeCount < mMaxJobPurgeCount) { + JobInfo oldestJob = mFinished.peek(); + if (oldestJob == null) { // no items to remove + break; + } + long timeSinceCompletion = CommonUtils.getCurrentMs() - oldestJob.getLastStatusChangeMs(); + // Once inserted into mFinished, the status of a job should not change - so the peek() + // /poll() methods guarantee to some extent that the job at the top of the queue is one + // of the oldest jobs. Thus, if it is still within retention time here, we likely can't + // remove anything else from the queue. Though it should be noted that it is not strictly + // guaranteed that the job at the top of is the oldest. + if (timeSinceCompletion < mRetentionMs) { + break; + } + + // Remove the top item since we know it's old enough now. + // Assumes there are no concurrent poll() operations taking place between here and the + // first peek() + if (mFinished.poll() == null) { + // This should not happen because peek() returned an element + // there should be no other concurrent operations that remove from mFinished + LOG.warn("Polling the queue resulted in a null element"); + break; + } + // Remove from the job coordinator + if (mCoordinators.remove(oldestJob.getId()) == null) { + LOG.warn("Did not find a coordinator with id {}", oldestJob.getId()); + } else { + removedJob = true; + removeCount++; + } + } + return removedJob; + } + + /** + * A collection of all job Ids currently tracked in the job master. Jobs may be in a finished + * state. + * + * @return An unmodifiable collection of all tracked Job Ids + */ + public Collection jobs() { + return Collections.unmodifiableCollection(mCoordinators.keySet()); + } + + /** + * A collection of all {@link JobCoordinator} currently tracked by the job master. May contain + * coordinators for jobs which have finished. + * + * @return An unmodifiable collection of all tracked {@link JobCoordinator} + */ + public Collection coordinators() { + return Collections.unmodifiableCollection(mCoordinators.values()); + } +} diff --git a/job/server/src/test/java/alluxio/master/job/JobMasterTest.java b/job/server/src/test/java/alluxio/master/job/JobMasterTest.java index 899839bf5333..048e22fd3d5a 100644 --- a/job/server/src/test/java/alluxio/master/job/JobMasterTest.java +++ b/job/server/src/test/java/alluxio/master/job/JobMasterTest.java @@ -26,7 +26,6 @@ import alluxio.master.journal.noop.NoopJournalSystem; import alluxio.underfs.UfsManager; -import com.google.common.collect.Maps; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -133,10 +132,11 @@ public void cancelNonExistingJob() { @Test public void cancel() throws Exception { JobCoordinator coordinator = Mockito.mock(JobCoordinator.class); - Map map = Maps.newHashMap(); long jobId = 1L; - map.put(jobId, coordinator); - Whitebox.setInternalState(mJobMaster, "mIdToJobCoordinator", map); + JobTracker tracker = new JobTracker(10, 0, -1); + ((Map) Whitebox.getInternalState(tracker, "mCoordinators")) + .put(jobId, coordinator); + Whitebox.setInternalState(mJobMaster, "mTracker", tracker); mJobMaster.cancel(jobId); Mockito.verify(coordinator).cancel(); } diff --git a/job/server/src/test/java/alluxio/master/job/JobTrackerTest.java b/job/server/src/test/java/alluxio/master/job/JobTrackerTest.java new file mode 100644 index 000000000000..13bed9121379 --- /dev/null +++ b/job/server/src/test/java/alluxio/master/job/JobTrackerTest.java @@ -0,0 +1,159 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.master.job; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import alluxio.exception.status.ResourceExhaustedException; +import alluxio.job.JobServerContext; +import alluxio.job.SleepJobConfig; +import alluxio.master.job.command.CommandManager; +import alluxio.util.FormatUtils; +import alluxio.wire.WorkerInfo; + +import com.google.common.collect.Lists; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.mockito.Mockito; +import org.powermock.reflect.Whitebox; + +import java.util.List; +import java.util.Queue; + +public class JobTrackerTest { + + private static final long CAPACITY = 25; + private static final long RETENTION_TIME = 0; + private static final long PURGE_CONUT = -1; + private List mWorkers; + private JobTracker mTracker; + private CommandManager mMockCommandManager; + private JobServerContext mMockJobServerContext; + + @Rule + public ExpectedException mException = ExpectedException.none(); + + @Before + public void before() { + mTracker = new JobTracker(CAPACITY, RETENTION_TIME, PURGE_CONUT); + mMockCommandManager = new CommandManager(); + mMockJobServerContext = Mockito.mock(JobServerContext.class); + mWorkers = Lists.newArrayList(new WorkerInfo()); + } + + @Test + public void testAddJobIncreaesCount() throws Exception { + assertEquals("tracker should be empty", 0, mTracker.coordinators().size()); + addJob(100); + assertEquals("tracker should have one job", 1, mTracker.coordinators().size()); + } + + @Test + public void testAddJobUpToCapacity() throws Exception { + assertEquals("tracker should be empty", 0, mTracker.coordinators().size()); + fillJobTracker(CAPACITY); + mException.expect(ResourceExhaustedException.class); + addJob(100); + } + + @Test + public void testAddAndPurge() throws Exception { + assertEquals("tracker should be empty", 0, mTracker.coordinators().size()); + fillJobTracker(CAPACITY); + try { + addJob(100); + fail("Should have failed to add a job over capacity"); + } catch (ResourceExhaustedException e) { + // Empty on purpose + } + finishAllJobs(); + try { + addJob(100); + } catch (ResourceExhaustedException e) { + fail("Should not have failed to add a job over capacity when all are finished"); + } + } + + @Test + public void testPurgeCount() throws Exception { + JobTracker tracker = new JobTracker(10, 0, 5); + assertEquals("tracker should be empty", 0, tracker.coordinators().size()); + fillJobTracker(tracker, 10); + finishAllJobs(tracker); + addJob(tracker, 100); + assertEquals(6, tracker.coordinators().size()); + } + + @Test + public void testRetentionTime() throws Exception { + JobTracker tracker = new JobTracker(10, FormatUtils.parseTimeSize("24h"), -1); + assertEquals("tracker should be empty", 0, tracker.coordinators().size()); + fillJobTracker(tracker, 10); + finishAllJobs(tracker); + mException.expect(ResourceExhaustedException.class); + addJob(tracker, 100); + } + + @Test + public void testGetCoordinator() throws Exception { + long jobId = addJob(100); + assertNull("job id should not exist", mTracker.getCoordinator(-1)); + assertNotNull("job should exist", mTracker.getCoordinator(jobId)); + assertFalse("job should not be finished", mTracker.getCoordinator(jobId).isJobFinished()); + finishAllJobs(); + assertTrue("job should be finished", mTracker.getCoordinator(jobId).isJobFinished()); + assertEquals("finished should be of size 1", 1, ((Queue) Whitebox.getInternalState(mTracker, + "mFinished")).size()); + } + + private long addJob(int sleepTimeMs) throws Exception { + return addJob(mTracker, sleepTimeMs); + } + + private long addJob(JobTracker tracker, int sleepTimeMs) throws Exception { + return tracker.addJob(new SleepJobConfig(sleepTimeMs), mMockCommandManager, + mMockJobServerContext, mWorkers); + } + + private void fillJobTracker(long nJobs) throws Exception { + fillJobTracker(mTracker, nJobs); + } + + private void fillJobTracker(JobTracker tracker, long nJobs) throws Exception { + int initial = tracker.coordinators().size(); + for (int i = 1; i <= nJobs; i++) { + addJob(tracker, 100); + int expectedCount = initial + i; + assertEquals(String.format("tracker should have %d job(s)", expectedCount), expectedCount, + tracker.coordinators().size()); + assertEquals(String.format("tracker should have %d job ids", expectedCount), expectedCount, + tracker.jobs().size()); + } + } + + private void finishAllJobs() { + // Put all jobs in a failed state + finishAllJobs(mTracker); + } + + private void finishAllJobs(JobTracker tracker) { + // Put all jobs in a failed state + tracker.coordinators().forEach(c -> c.setJobAsFailed("failed for test")); + } +} diff --git a/tests/src/test/java/alluxio/client/fs/FileOutStreamAsyncWriteIntegrationTest.java b/tests/src/test/java/alluxio/client/fs/FileOutStreamAsyncWriteIntegrationTest.java index e5d73d40241e..82d8e17c7323 100644 --- a/tests/src/test/java/alluxio/client/fs/FileOutStreamAsyncWriteIntegrationTest.java +++ b/tests/src/test/java/alluxio/client/fs/FileOutStreamAsyncWriteIntegrationTest.java @@ -34,7 +34,6 @@ /** * Integration tests for {@link alluxio.client.file.FileOutStream} of under storage type being async * persist. - * */ public final class FileOutStreamAsyncWriteIntegrationTest extends AbstractFileOutStreamIntegrationTest { diff --git a/tests/src/test/java/alluxio/job/replicate/ReplicateIntegrationTest.java b/tests/src/test/java/alluxio/job/replicate/ReplicateIntegrationTest.java index d76fffb5151a..8a50061fa7e9 100644 --- a/tests/src/test/java/alluxio/job/replicate/ReplicateIntegrationTest.java +++ b/tests/src/test/java/alluxio/job/replicate/ReplicateIntegrationTest.java @@ -23,6 +23,7 @@ import alluxio.heartbeat.HeartbeatScheduler; import alluxio.heartbeat.ManuallyScheduleHeartbeat; import alluxio.job.JobIntegrationTest; +import alluxio.master.job.JobTracker; import alluxio.testutils.LocalAlluxioClusterResource; import alluxio.util.io.BufferUtils; import alluxio.wire.BlockInfo; @@ -32,6 +33,7 @@ import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; +import org.powermock.reflect.Whitebox; /** * Integration tests for {@link ReplicateDefinition}. @@ -71,8 +73,12 @@ private void createFileOutsideOfAlluxio(AlluxioURI uri) throws Exception { } @Test + @LocalAlluxioClusterResource.Config(confParams = {PropertyKey.Name.JOB_MASTER_JOB_CAPACITY, "1", + PropertyKey.Name.JOB_MASTER_FINISHED_JOB_RETENTION_TIME, "0"}) public void replicateFullBlockFromUFS() throws Exception { // run the replicate job for mBlockId1 + // hack - use a job tracker with capacity of 1 + Whitebox.setInternalState(mJobMaster, "mTracker", new JobTracker(1, 0, -1)); waitForJobToFinish(mJobMaster.run(new ReplicateConfig(TEST_URI, mBlockId1, 1))); BlockInfo blockInfo1 = AdjustJobTestUtils.getBlock(mBlockId1, mFsContext); @@ -84,8 +90,12 @@ public void replicateFullBlockFromUFS() throws Exception { } @Test + @LocalAlluxioClusterResource.Config(confParams = {PropertyKey.Name.JOB_MASTER_JOB_CAPACITY, "1", + PropertyKey.Name.JOB_MASTER_FINISHED_JOB_RETENTION_TIME, "0"}) public void replicateLastBlockFromUFS() throws Exception { // run the replicate job for mBlockId2 + // hack - use a job tracker with capacity of 1 + Whitebox.setInternalState(mJobMaster, "mTracker", new JobTracker(1, 0, -1)); waitForJobToFinish(mJobMaster.run(new ReplicateConfig(TEST_URI, mBlockId2, 1))); BlockInfo blockInfo1 = AdjustJobTestUtils.getBlock(mBlockId1, mFsContext);