Skip to content

Commit

Permalink
srm: Refactor Scheduler#schedule method
Browse files Browse the repository at this point in the history
Motivation:

The Scheduler#schedule has had a somewhat ambigious role, where it
was both used to queue a request and to submit a request for
execution, depending on the state.

Modification:

Split the method into a queue method and an execute method. queue
puts the request into the QUEUED state and places it on the queue,
while EXECUTE doesn't affect the state and submits the request
for execution on the thread pool.

Calling code is updated accordingly. This also affects that code
to restore files from the database. In particular the RESTORED
state is never used as a state for requests now - it is used
as a virtual source state for the state change notification
issued when restoring a request from the database upon restart.

Result:

Minor observable differences in the event history of requests
during restart.

Target: trunk
Require-notes: no
Require-book: no
Acked-by: Tigran Mkrtchyan <tigran.mkrtchyan@desy.de>
Patch: https://rb.dcache.org/r/8552/
  • Loading branch information
gbehrmann committed Sep 21, 2015
1 parent 5783a1f commit 99bdf4b
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 103 deletions.
Expand Up @@ -352,14 +352,9 @@ protected void onSrmRestartForActiveJob(Scheduler scheduler)
State state = getState();

switch (state) {
case UNSCHEDULED:
case RETRYWAIT:
case QUEUED:
case INPROGRESS:
setState(State.RESTORED, "Rescheduled after SRM service restart");
// Fall through
case RESTORED:
scheduler.schedule(this);
addHistoryEvent("Rescheduled after SRM service restart.");
scheduler.queue(this);
break;

// All other states are invalid.
Expand Down Expand Up @@ -598,9 +593,7 @@ public void run()
}
} catch (SRMInternalErrorException e) {
if (!fr.getState().isFinal()) {
Scheduler<?> scheduler =
Scheduler.getScheduler(fr.getSchedulerId());
scheduler.schedule(fr);
Scheduler.getScheduler(fr.getSchedulerId()).execute(fr);
}
} catch (SRMException e) {
fr.setStateAndStatusCode(
Expand Down
Expand Up @@ -1036,12 +1036,7 @@ public void run()
LOG.debug("PutCallbacks success for file {}", fr.getDestinationSurl());
fr.setDestinationFileId(fileId);
fr.saveJob(true);
Scheduler scheduler = Scheduler.getScheduler(fr.getSchedulerId());
try {
scheduler.schedule(fr);
} catch (Exception ie) {
LOG.error(ie.toString());
}
Scheduler.getScheduler(fr.getSchedulerId()).execute(fr);
}
} catch (SRMException e) {
fr.setStateAndStatusCode(
Expand Down Expand Up @@ -1099,11 +1094,7 @@ public void copyFailed(SRMException e)
State state = copyFileRequest.getState();
Scheduler scheduler = Scheduler.getScheduler(copyFileRequest.getSchedulerId());
if (!state.isFinal() && scheduler != null) {
try {
scheduler.schedule(copyFileRequest);
} catch (IllegalStateTransition ie) {
LOG.error(ie.toString());
}
scheduler.execute(copyFileRequest);
}
}
}
Expand Down
Expand Up @@ -629,7 +629,7 @@ public void run()
fr.setFileId(pin.fileMetaData.fileId);
fr.setFileMetaData(pin.fileMetaData);
fr.setPinId(pin.pinId);
Scheduler.getScheduler(fr.getSchedulerId()).schedule(fr);
Scheduler.getScheduler(fr.getSchedulerId()).execute(fr);
}
} catch (SRMException e) {
fr.setStateAndStatusCode(State.FAILED,
Expand Down
33 changes: 14 additions & 19 deletions modules/srm-server/src/main/java/org/dcache/srm/request/Job.java
Expand Up @@ -332,8 +332,7 @@ private boolean isValidTransition(State currentState, State newState)
|| newState == State.RETRYWAIT
|| newState == State.RQUEUED
|| newState == State.READY
|| newState == State.DONE
|| newState == State.RESTORED;
|| newState == State.DONE;
case RETRYWAIT:
return newState == State.CANCELED
|| newState == State.FAILED
Expand Down Expand Up @@ -958,7 +957,7 @@ public void scheduleWith(Scheduler scheduler) throws InterruptedException,
"] has state " + state + "(not UNSCHEDULED)");
}
setScheduler(scheduler.getId(), scheduler.getTimestamp());
scheduler.schedule(this);
scheduler.queue(this);
} finally {
wunlock();
}
Expand Down Expand Up @@ -1041,22 +1040,21 @@ public void onSrmRestart(Scheduler scheduler, boolean shouldFailJobs)
}

switch (state) {
// Pending jobs were never worked on before the SRM restart; we
// simply schedule them now.
case QUEUED:
setState(State.UNSCHEDULED, "Restarting request.");
scheduler.schedule(this);
break;

// Unscheduled or queued jobs were never worked on before the SRM restart; we
// simply queue them now.
case UNSCHEDULED:
scheduler.schedule(this);
case QUEUED:
case RETRYWAIT:
addHistoryEvent("Restored from database.");
scheduler.queue(this);
break;

// Jobs in RQUEUED or READY states require no further processing.
// We can leave them for the client to discover the TURL or place
// the job into the DONE state, respectively.
case READY:
// Jobs in RQUEUED, READY or TRANSFERRING states require no further
// processing. We can leave them for the client to discover the TURL
// or place the job into the DONE state, respectively.
case RQUEUED:
case READY:
case TRANSFERRING:
break;

// Other job states need request-specific recovery process.
Expand All @@ -1073,10 +1071,7 @@ public void onSrmRestart(Scheduler scheduler, boolean shouldFailJobs)

/**
* Provide request-specific recovery for jobs that were being processed
* when SRM was restarted. This corresponds to jobs in states:
*
* PRIORITYTQUEUED, TQUEUED, RUNNING, RETRYWAIT, ASYNCWAIT,
* TRANSFERRING, RESTORED, RUNNINGWITHOUTTHREAD.
* when SRM was restarted. This corresponds to jobs in state INPROGRESS.
*
* In general, such jobs require some request-specific procedure.
* Subclasses are expected to override this method to provide this
Expand Down
Expand Up @@ -599,12 +599,7 @@ public void run()
logger.trace("Storage info arrived for file {}.", fr.getSurlString());
fr.setFileId(fileId);
fr.saveJob(true);
Scheduler<?> scheduler = Scheduler.getScheduler(fr.getSchedulerId());
try {
scheduler.schedule(fr);
} catch (Exception ie) {
logger.error(ie.toString());
}
Scheduler.getScheduler(fr.getSchedulerId()).execute(fr);
break;
case CANCELED:
case FAILED:
Expand Down
Expand Up @@ -77,7 +77,6 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down
Expand Up @@ -517,7 +517,7 @@ public void schedulePendingJobs(Scheduler scheduler)
" WHERE SCHEDULERID is NULL and State=" + State.UNSCHEDULED.getStateId();
for (Long ID : jdbcTemplate.queryForList(sql, Long.class)) {
try {
scheduler.schedule(Job.getJob(ID, jobType));
scheduler.queue(Job.getJob(ID, jobType));
} catch (SRMInvalidRequestException ire) {
logger.error(ire.toString());
}
Expand Down
Expand Up @@ -84,7 +84,6 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

import org.dcache.srm.SRMAuthorizationException;
import org.dcache.srm.SRMException;
Expand All @@ -99,7 +98,8 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import org.dcache.srm.v2_2.TStatusCode;

import static com.google.common.base.Preconditions.*;
import static com.google.common.base.Strings.*;
import static com.google.common.base.Strings.padEnd;
import static com.google.common.base.Strings.repeat;

public class Scheduler <T extends Job>
{
Expand Down Expand Up @@ -206,47 +206,47 @@ public void stop()
pooledExecutor.shutdownNow();
}


public void schedule(Job job)
throws IllegalStateException, IllegalArgumentException,
IllegalStateTransition
/**
* Places a job in the queue.
*
* The job is moved to the QUEUED state. The job will be moved to the INPROGRESS state
* in accordance with the scheduling strategy of this scheduler.
*
* This action is subject to request limits and may cause the job to be failed
* immediately if the limits are exceeded.
*/
public void queue(Job job) throws IllegalStateTransition
{
checkState(running, "scheduler is not running");
checkState(running, "Scheduler is not running");
checkOwnership(job);
LOGGER.trace("schedule is called for job with id={} in state={}", job.getId(), job.getState());
LOGGER.trace("queue is called for job with id={} in state={}", job.getId(), job.getState());

job.wlock();
try {
switch (job.getState()) {
case UNSCHEDULED:
case RETRYWAIT:
case RESTORED:
if (threadQueue(job)) {
job.setState(State.QUEUED, "Queued.");
} else {
LOGGER.warn("Maximum request limit reached.");
job.setState(State.FAILED, "Site busy: Too many queued requests.");
}
break;
case QUEUED:
job.setState(State.INPROGRESS, "In progress.");
// fall through
case INPROGRESS:
LOGGER.trace("putting job in a thread queue, job#{}", job.getId());
try {
pooledExecutor.execute(new JobWrapper(job));
} catch (RejectedExecutionException e) {
job.setState(State.FAILED, "Site busy: Too many queued requests.");
}
break;
default:
throw new IllegalStateException("cannot schedule job in state =" + job.getState());
if (job.getState().isFinal()) {
throw new IllegalStateException("Cannot queue job in state " + job.getState());
} else if (threadQueue(job)) {
job.setState(State.QUEUED, "Queued.");
} else {
LOGGER.warn("Maximum request limit reached.");
job.setState(State.FAILED, "Site busy: Too many queued requests.");
}
} finally {
job.wunlock();
}
}

/**
* Requests the job's run method to be called from this scheduler's thread pool.
*/
public void execute(Job job)
{
checkState(running, "Scheduler is not running");
checkOwnership(job);
LOGGER.trace("execute is called for job with id={} in state={}", job.getId(), job.getState());
pooledExecutor.execute(new JobWrapper(job));
}

public synchronized int getTotalQueued()
{
return jobs.get(State.QUEUED).size();
Expand Down Expand Up @@ -375,7 +375,8 @@ private void updateThreadQueue()
try {
if (job.getState() == org.dcache.srm.scheduler.State.QUEUED) {
try {
schedule(job);
job.setState(org.dcache.srm.scheduler.State.INPROGRESS, "In progress.");
execute(job);
} catch (IllegalStateTransition e) {
LOGGER.error("Bug detected.", e);
try {
Expand Down Expand Up @@ -473,7 +474,7 @@ public void run()
job.wlock();
try {
if (job.getState() == State.RETRYWAIT) {
schedule(job);
queue(job);
}
} catch (IllegalStateTransition e) {
LOGGER.error("Bug detected.", e);
Expand Down Expand Up @@ -691,26 +692,6 @@ public synchronized void printReadyQueue(StringBuilder sb)
printQueue(sb, jobs.get(State.RQUEUED));
}

/**
* Getter for property queuesUpdateMaxWait.
*
* @return Value of property queuesUpdateMaxWait.
*/
public synchronized long getQueuesUpdateMaxWait()
{
return queuesUpdateMaxWait;
}

/**
* Setter for property queuesUpdateMaxWait.
*
* @param queuesUpdateMaxWait New value of property queuesUpdateMaxWait.
*/
public synchronized void setQueuesUpdateMaxWait(long queuesUpdateMaxWait)
{
this.queuesUpdateMaxWait = queuesUpdateMaxWait;
}

public Class<T> getType()
{
return type;
Expand Down

0 comments on commit 99bdf4b

Please sign in to comment.