Skip to content

Commit

Permalink
srm: More Scheduler refactoring
Browse files Browse the repository at this point in the history
Target: trunk
Require-notes: no
Require-book: no
Acked-by: Dmitry Litvintsev <litvinse@fnal.gov>
Acked-by: Paul Millar <paul.millar@desy.de>
Patch: http://rb.dcache.org/r/6209/
  • Loading branch information
gbehrmann committed Nov 15, 2013
1 parent 28ff5c5 commit 59a6828
Showing 1 changed file with 16 additions and 27 deletions.
Expand Up @@ -171,11 +171,11 @@ public final class Scheduler implements Runnable

// this will contain the number of
private final long timeStamp = System.currentTimeMillis();
private long queuesUpdateMaxWait = 60 * 1000; // one
private long queuesUpdateMaxWait = 60 * 1000;

private static volatile Map<String, Scheduler> schedulers = ImmutableMap.of();

private volatile JobPriorityPolicyInterface jobAppraiser;
private JobPriorityPolicyInterface jobAppraiser;
private String priorityPolicyPlugin;

public static Scheduler getScheduler(String id)
Expand Down Expand Up @@ -243,35 +243,27 @@ public void schedule(Job job)
// fall through
case RESTORED:
if (getTotalTQueued() >= getMaxThreadQueueSize()) {
job.setState(State.FAILED, "Roo many jobs in the queue.");
job.setState(State.FAILED, "Too many jobs in the queue.");
return;
}
// now we try to add the job to the thread queue without blocking
job.setState(State.TQUEUED, "Queued for execution.");
if (threadQueue(job)) {
// offer returned true -> successfully added job to the queue
return;
if (!threadQueue(job)) {
LOGGER.warn("Thread queue limit reached.");
job.setState(State.FAILED, "Site busy: too many queued requests.");
}

// if offer returned false or if it threw an exception,
// the job could not be scheduled, so it fails
job.setState(State.FAILED, "Site busy: too many queued requests.");
break;
case ASYNCWAIT:
case RETRYWAIT:
case RUNNINGWITHOUTTHREAD:
// put blocks if priorityThreadQueue is full
// this will block the retry timer (or the event handler)
LOGGER.trace("putting job in a priority thread queue, which might block, job#{}", job.getId());
LOGGER.trace("putting job in a priority thread queue, job#{}", job.getId());
job.setState(State.PRIORITYTQUEUED, "queued for execution");
if (!priorityQueue(job)) {
LOGGER.warn("Thread queue limit reached.");
LOGGER.warn("Priority thread queue limit reached.");
job.setState(State.FAILED, "Site busy: too many queued requests.");
}
LOGGER.trace("done putting job in a priority thread queue");
break;
default:
throw new IllegalStateException("can not schedule job in state =" + job.getState());
throw new IllegalStateException("cannot schedule job in state =" + job.getState());
}
} finally {
job.wunlock();
Expand Down Expand Up @@ -656,16 +648,15 @@ public int calculateValue(

public void tryToReadyJob(Job job)
{
if (getTotalReady() >= maxReadyJobs) {
if (getTotalReady() >= getMaxReadyJobs()) {
// can't add any more jobs to ready state
return;
}
try {
job.setState(State.READY, "Execution succeeded.");
} catch (IllegalStateTransition ist) {
//nothing more we can do here
LOGGER.error("Illegal State Transition : " + ist.getMessage());

LOGGER.error("Illegal State Transition : {}", ist.getMessage());
}
}

Expand Down Expand Up @@ -754,12 +745,10 @@ private boolean readyQueue(Job job)
return false;
}

private void jobAddedToQueue()
private synchronized void jobAddedToQueue()
{
synchronized (this) {
notifyAll();
notified = true;
}
notified = true;
notifyAll();
}

@Override
Expand Down Expand Up @@ -802,8 +791,8 @@ public void run()

private class JobWrapper implements Runnable
{
Job job;
boolean started;
private final Job job;
private boolean started;

public JobWrapper(Job job)
{
Expand Down

0 comments on commit 59a6828

Please sign in to comment.