diff --git a/modules/srm-server/src/main/java/org/dcache/srm/request/BringOnlineFileRequest.java b/modules/srm-server/src/main/java/org/dcache/srm/request/BringOnlineFileRequest.java index 2e4fbc6a37c..82a8aeed701 100644 --- a/modules/srm-server/src/main/java/org/dcache/srm/request/BringOnlineFileRequest.java +++ b/modules/srm-server/src/main/java/org/dcache/srm/request/BringOnlineFileRequest.java @@ -352,14 +352,9 @@ protected void onSrmRestartForActiveJob(Scheduler scheduler) State state = getState(); switch (state) { - case UNSCHEDULED: - case RETRYWAIT: - case QUEUED: case INPROGRESS: - setState(State.RESTORED, "Rescheduled after SRM service restart"); - // Fall through - case RESTORED: - scheduler.schedule(this); + addHistoryEvent("Rescheduled after SRM service restart."); + scheduler.queue(this); break; // All other states are invalid. @@ -598,9 +593,7 @@ public void run() } } catch (SRMInternalErrorException e) { if (!fr.getState().isFinal()) { - Scheduler scheduler = - Scheduler.getScheduler(fr.getSchedulerId()); - scheduler.schedule(fr); + Scheduler.getScheduler(fr.getSchedulerId()).execute(fr); } } catch (SRMException e) { fr.setStateAndStatusCode( diff --git a/modules/srm-server/src/main/java/org/dcache/srm/request/CopyFileRequest.java b/modules/srm-server/src/main/java/org/dcache/srm/request/CopyFileRequest.java index f96f90b56c5..ce437d16808 100644 --- a/modules/srm-server/src/main/java/org/dcache/srm/request/CopyFileRequest.java +++ b/modules/srm-server/src/main/java/org/dcache/srm/request/CopyFileRequest.java @@ -1036,12 +1036,7 @@ public void run() LOG.debug("PutCallbacks success for file {}", fr.getDestinationSurl()); fr.setDestinationFileId(fileId); fr.saveJob(true); - Scheduler scheduler = Scheduler.getScheduler(fr.getSchedulerId()); - try { - scheduler.schedule(fr); - } catch (Exception ie) { - LOG.error(ie.toString()); - } + Scheduler.getScheduler(fr.getSchedulerId()).execute(fr); } } catch (SRMException e) { fr.setStateAndStatusCode( @@ -1099,11 +1094,7 @@ public void copyFailed(SRMException e) State state = copyFileRequest.getState(); Scheduler scheduler = Scheduler.getScheduler(copyFileRequest.getSchedulerId()); if (!state.isFinal() && scheduler != null) { - try { - scheduler.schedule(copyFileRequest); - } catch (IllegalStateTransition ie) { - LOG.error(ie.toString()); - } + scheduler.execute(copyFileRequest); } } } diff --git a/modules/srm-server/src/main/java/org/dcache/srm/request/GetFileRequest.java b/modules/srm-server/src/main/java/org/dcache/srm/request/GetFileRequest.java index a8659de0d9a..cdba32391d2 100644 --- a/modules/srm-server/src/main/java/org/dcache/srm/request/GetFileRequest.java +++ b/modules/srm-server/src/main/java/org/dcache/srm/request/GetFileRequest.java @@ -629,7 +629,7 @@ public void run() fr.setFileId(pin.fileMetaData.fileId); fr.setFileMetaData(pin.fileMetaData); fr.setPinId(pin.pinId); - Scheduler.getScheduler(fr.getSchedulerId()).schedule(fr); + Scheduler.getScheduler(fr.getSchedulerId()).execute(fr); } } catch (SRMException e) { fr.setStateAndStatusCode(State.FAILED, diff --git a/modules/srm-server/src/main/java/org/dcache/srm/request/Job.java b/modules/srm-server/src/main/java/org/dcache/srm/request/Job.java index ef68062fcc8..d373540d695 100644 --- a/modules/srm-server/src/main/java/org/dcache/srm/request/Job.java +++ b/modules/srm-server/src/main/java/org/dcache/srm/request/Job.java @@ -332,8 +332,7 @@ private boolean isValidTransition(State currentState, State newState) || newState == State.RETRYWAIT || newState == State.RQUEUED || newState == State.READY - || newState == State.DONE - || newState == State.RESTORED; + || newState == State.DONE; case RETRYWAIT: return newState == State.CANCELED || newState == State.FAILED @@ -958,7 +957,7 @@ public void scheduleWith(Scheduler scheduler) throws InterruptedException, "] has state " + state + "(not UNSCHEDULED)"); } setScheduler(scheduler.getId(), scheduler.getTimestamp()); - scheduler.schedule(this); + scheduler.queue(this); } finally { wunlock(); } @@ -1041,22 +1040,21 @@ public void onSrmRestart(Scheduler scheduler, boolean shouldFailJobs) } switch (state) { - // Pending jobs were never worked on before the SRM restart; we - // simply schedule them now. - case QUEUED: - setState(State.UNSCHEDULED, "Restarting request."); - scheduler.schedule(this); - break; - + // Unscheduled or queued jobs were never worked on before the SRM restart; we + // simply queue them now. case UNSCHEDULED: - scheduler.schedule(this); + case QUEUED: + case RETRYWAIT: + addHistoryEvent("Restored from database."); + scheduler.queue(this); break; - // Jobs in RQUEUED or READY states require no further processing. - // We can leave them for the client to discover the TURL or place - // the job into the DONE state, respectively. - case READY: + // Jobs in RQUEUED, READY or TRANSFERRING states require no further + // processing. We can leave them for the client to discover the TURL + // or place the job into the DONE state, respectively. case RQUEUED: + case READY: + case TRANSFERRING: break; // Other job states need request-specific recovery process. @@ -1073,10 +1071,7 @@ public void onSrmRestart(Scheduler scheduler, boolean shouldFailJobs) /** * Provide request-specific recovery for jobs that were being processed - * when SRM was restarted. This corresponds to jobs in states: - * - * PRIORITYTQUEUED, TQUEUED, RUNNING, RETRYWAIT, ASYNCWAIT, - * TRANSFERRING, RESTORED, RUNNINGWITHOUTTHREAD. + * when SRM was restarted. This corresponds to jobs in state INPROGRESS. * * In general, such jobs require some request-specific procedure. * Subclasses are expected to override this method to provide this diff --git a/modules/srm-server/src/main/java/org/dcache/srm/request/PutFileRequest.java b/modules/srm-server/src/main/java/org/dcache/srm/request/PutFileRequest.java index bd849e4706b..5be9a4ca40d 100644 --- a/modules/srm-server/src/main/java/org/dcache/srm/request/PutFileRequest.java +++ b/modules/srm-server/src/main/java/org/dcache/srm/request/PutFileRequest.java @@ -599,12 +599,7 @@ public void run() logger.trace("Storage info arrived for file {}.", fr.getSurlString()); fr.setFileId(fileId); fr.saveJob(true); - Scheduler scheduler = Scheduler.getScheduler(fr.getSchedulerId()); - try { - scheduler.schedule(fr); - } catch (Exception ie) { - logger.error(ie.toString()); - } + Scheduler.getScheduler(fr.getSchedulerId()).execute(fr); break; case CANCELED: case FAILED: diff --git a/modules/srm-server/src/main/java/org/dcache/srm/request/Request.java b/modules/srm-server/src/main/java/org/dcache/srm/request/Request.java index a8fb35de8f0..0e5d3ff69ea 100644 --- a/modules/srm-server/src/main/java/org/dcache/srm/request/Request.java +++ b/modules/srm-server/src/main/java/org/dcache/srm/request/Request.java @@ -77,7 +77,6 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.dao.DataAccessException; import javax.annotation.Nonnull; import javax.annotation.Nullable; diff --git a/modules/srm-server/src/main/java/org/dcache/srm/request/sql/DatabaseJobStorage.java b/modules/srm-server/src/main/java/org/dcache/srm/request/sql/DatabaseJobStorage.java index db301c593c0..4c81b19bf9e 100644 --- a/modules/srm-server/src/main/java/org/dcache/srm/request/sql/DatabaseJobStorage.java +++ b/modules/srm-server/src/main/java/org/dcache/srm/request/sql/DatabaseJobStorage.java @@ -517,7 +517,7 @@ public void schedulePendingJobs(Scheduler scheduler) " WHERE SCHEDULERID is NULL and State=" + State.UNSCHEDULED.getStateId(); for (Long ID : jdbcTemplate.queryForList(sql, Long.class)) { try { - scheduler.schedule(Job.getJob(ID, jobType)); + scheduler.queue(Job.getJob(ID, jobType)); } catch (SRMInvalidRequestException ire) { logger.error(ire.toString()); } diff --git a/modules/srm-server/src/main/java/org/dcache/srm/scheduler/Scheduler.java b/modules/srm-server/src/main/java/org/dcache/srm/scheduler/Scheduler.java index 0e68e3b05f9..e9423bdf567 100644 --- a/modules/srm-server/src/main/java/org/dcache/srm/scheduler/Scheduler.java +++ b/modules/srm-server/src/main/java/org/dcache/srm/scheduler/Scheduler.java @@ -84,7 +84,6 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.RejectedExecutionException; import org.dcache.srm.SRMAuthorizationException; import org.dcache.srm.SRMException; @@ -99,7 +98,8 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import org.dcache.srm.v2_2.TStatusCode; import static com.google.common.base.Preconditions.*; -import static com.google.common.base.Strings.*; +import static com.google.common.base.Strings.padEnd; +import static com.google.common.base.Strings.repeat; public class Scheduler { @@ -206,47 +206,47 @@ public void stop() pooledExecutor.shutdownNow(); } - - public void schedule(Job job) - throws IllegalStateException, IllegalArgumentException, - IllegalStateTransition + /** + * Places a job in the queue. + * + * The job is moved to the QUEUED state. The job will be moved to the INPROGRESS state + * in accordance with the scheduling strategy of this scheduler. + * + * This action is subject to request limits and may cause the job to be failed + * immediately if the limits are exceeded. + */ + public void queue(Job job) throws IllegalStateTransition { - checkState(running, "scheduler is not running"); + checkState(running, "Scheduler is not running"); checkOwnership(job); - LOGGER.trace("schedule is called for job with id={} in state={}", job.getId(), job.getState()); + LOGGER.trace("queue is called for job with id={} in state={}", job.getId(), job.getState()); job.wlock(); try { - switch (job.getState()) { - case UNSCHEDULED: - case RETRYWAIT: - case RESTORED: - if (threadQueue(job)) { - job.setState(State.QUEUED, "Queued."); - } else { - LOGGER.warn("Maximum request limit reached."); - job.setState(State.FAILED, "Site busy: Too many queued requests."); - } - break; - case QUEUED: - job.setState(State.INPROGRESS, "In progress."); - // fall through - case INPROGRESS: - LOGGER.trace("putting job in a thread queue, job#{}", job.getId()); - try { - pooledExecutor.execute(new JobWrapper(job)); - } catch (RejectedExecutionException e) { - job.setState(State.FAILED, "Site busy: Too many queued requests."); - } - break; - default: - throw new IllegalStateException("cannot schedule job in state =" + job.getState()); + if (job.getState().isFinal()) { + throw new IllegalStateException("Cannot queue job in state " + job.getState()); + } else if (threadQueue(job)) { + job.setState(State.QUEUED, "Queued."); + } else { + LOGGER.warn("Maximum request limit reached."); + job.setState(State.FAILED, "Site busy: Too many queued requests."); } } finally { job.wunlock(); } } + /** + * Requests the job's run method to be called from this scheduler's thread pool. + */ + public void execute(Job job) + { + checkState(running, "Scheduler is not running"); + checkOwnership(job); + LOGGER.trace("execute is called for job with id={} in state={}", job.getId(), job.getState()); + pooledExecutor.execute(new JobWrapper(job)); + } + public synchronized int getTotalQueued() { return jobs.get(State.QUEUED).size(); @@ -375,7 +375,8 @@ private void updateThreadQueue() try { if (job.getState() == org.dcache.srm.scheduler.State.QUEUED) { try { - schedule(job); + job.setState(org.dcache.srm.scheduler.State.INPROGRESS, "In progress."); + execute(job); } catch (IllegalStateTransition e) { LOGGER.error("Bug detected.", e); try { @@ -473,7 +474,7 @@ public void run() job.wlock(); try { if (job.getState() == State.RETRYWAIT) { - schedule(job); + queue(job); } } catch (IllegalStateTransition e) { LOGGER.error("Bug detected.", e); @@ -691,26 +692,6 @@ public synchronized void printReadyQueue(StringBuilder sb) printQueue(sb, jobs.get(State.RQUEUED)); } - /** - * Getter for property queuesUpdateMaxWait. - * - * @return Value of property queuesUpdateMaxWait. - */ - public synchronized long getQueuesUpdateMaxWait() - { - return queuesUpdateMaxWait; - } - - /** - * Setter for property queuesUpdateMaxWait. - * - * @param queuesUpdateMaxWait New value of property queuesUpdateMaxWait. - */ - public synchronized void setQueuesUpdateMaxWait(long queuesUpdateMaxWait) - { - this.queuesUpdateMaxWait = queuesUpdateMaxWait; - } - public Class getType() { return type;