diff --git a/modules/srm-server/src/main/java/org/dcache/srm/request/sql/DatabaseJobStorage.java b/modules/srm-server/src/main/java/org/dcache/srm/request/sql/DatabaseJobStorage.java index 05566a8e00b..2f61a90b485 100644 --- a/modules/srm-server/src/main/java/org/dcache/srm/request/sql/DatabaseJobStorage.java +++ b/modules/srm-server/src/main/java/org/dcache/srm/request/sql/DatabaseJobStorage.java @@ -446,9 +446,9 @@ private List getJobHistoriesToSave(Job job) @Override - public void saveJob(final Job job, boolean saveifmonitoringisdesabled) throws DataAccessException + public void saveJob(final Job job, boolean force) throws DataAccessException { - if (!saveifmonitoringisdesabled && !logHistory) { + if (!force && !logHistory) { return; } diff --git a/modules/srm-server/src/main/java/org/dcache/srm/scheduler/AsynchronousSaveJobStorage.java b/modules/srm-server/src/main/java/org/dcache/srm/scheduler/AsynchronousSaveJobStorage.java index 2f56df36bb3..e1eaab42206 100644 --- a/modules/srm-server/src/main/java/org/dcache/srm/scheduler/AsynchronousSaveJobStorage.java +++ b/modules/srm-server/src/main/java/org/dcache/srm/scheduler/AsynchronousSaveJobStorage.java @@ -9,7 +9,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; import org.dcache.srm.request.Job; @@ -20,14 +20,14 @@ public class AsynchronousSaveJobStorage implements JobStorage private final JobStorage storage; private final ConcurrentMap states = new ConcurrentHashMap<>(); - private final ExecutorService executor; + private final Executor executor; private static enum UpdateState { - QUEUED, PROCESSING + QUEUED_FORCED, QUEUED_NOT_FORCED, PROCESSING } - public AsynchronousSaveJobStorage(JobStorage storage, ExecutorService executor) + public AsynchronousSaveJobStorage(JobStorage storage, Executor executor) { this.storage = storage; this.executor = executor; @@ -63,12 +63,21 @@ public Set getJobs(String scheduler, State state) throws DataAccessException return storage.getJobs(scheduler, state); } - public void saveJob(final J job, final boolean saveIfMonitoringDisabled) + public void saveJob(final J job, final boolean force) { - if (!saveIfMonitoringDisabled && !isJdbcLogRequestHistoryInDBEnabled()) { + if (!force && !isJdbcLogRequestHistoryInDBEnabled()) { return; } - if (states.put(job.getId(), UpdateState.QUEUED) == null) { + + UpdateState state; + if (force) { + state = states.put(job.getId(), UpdateState.QUEUED_FORCED); + } else { + while ((state = states.putIfAbsent(job.getId(), UpdateState.QUEUED_NOT_FORCED)) == UpdateState.PROCESSING && + !states.replace(job.getId(), UpdateState.PROCESSING, UpdateState.QUEUED_NOT_FORCED)); + } + + if (state == null) { boolean success = false; try { Runnable task = @@ -77,9 +86,9 @@ public void saveJob(final J job, final boolean saveIfMonitoringDisabled) @Override public void run() { - states.put(job.getId(), UpdateState.PROCESSING); + UpdateState state = states.put(job.getId(), UpdateState.PROCESSING); try { - storage.saveJob(job, saveIfMonitoringDisabled); + storage.saveJob(job, state == UpdateState.QUEUED_FORCED); } catch (DataAccessException e) { LOGGER.error("SQL statement failed: {}", e.getMessage()); } catch (Throwable e) { diff --git a/modules/srm-server/src/main/java/org/dcache/srm/scheduler/CanonicalizingJobStorage.java b/modules/srm-server/src/main/java/org/dcache/srm/scheduler/CanonicalizingJobStorage.java index 9c96490c707..6e8664c0b89 100644 --- a/modules/srm-server/src/main/java/org/dcache/srm/scheduler/CanonicalizingJobStorage.java +++ b/modules/srm-server/src/main/java/org/dcache/srm/scheduler/CanonicalizingJobStorage.java @@ -105,13 +105,13 @@ public Set getJobs(String scheduler, State state) throws DataAccessException } @Override - public void saveJob(J job, boolean saveIfMonitoringDisabled) throws DataAccessException + public void saveJob(J job, boolean force) throws DataAccessException { Job other = map.putIfAbsent(job.getId(), job); if (other != null && other != job) { throw new IllegalStateException("Duplicate job #" + job.getId()); } - storage.saveJob(job, saveIfMonitoringDisabled); + storage.saveJob(job, force); } @Override diff --git a/modules/srm-server/src/main/java/org/dcache/srm/scheduler/FinalStateOnlyJobStorageDecorator.java b/modules/srm-server/src/main/java/org/dcache/srm/scheduler/FinalStateOnlyJobStorageDecorator.java index 1b796af0384..e623bce1bad 100644 --- a/modules/srm-server/src/main/java/org/dcache/srm/scheduler/FinalStateOnlyJobStorageDecorator.java +++ b/modules/srm-server/src/main/java/org/dcache/srm/scheduler/FinalStateOnlyJobStorageDecorator.java @@ -47,9 +47,9 @@ public Set getJobs(String scheduler, State state) throws DataAccessException } @Override - public void saveJob(J job, boolean saveIfMonitoringDisabled) throws DataAccessException { + public void saveJob(J job, boolean force) throws DataAccessException { if(job.getState().isFinalState()) { - jobStorage.saveJob(job,saveIfMonitoringDisabled); + jobStorage.saveJob(job, force); } } diff --git a/modules/srm-server/src/main/java/org/dcache/srm/scheduler/JobStorage.java b/modules/srm-server/src/main/java/org/dcache/srm/scheduler/JobStorage.java index ea022b8d638..f41841b4ab7 100644 --- a/modules/srm-server/src/main/java/org/dcache/srm/scheduler/JobStorage.java +++ b/modules/srm-server/src/main/java/org/dcache/srm/scheduler/JobStorage.java @@ -94,11 +94,11 @@ public interface JobStorage { /** * * @param job Job to save - * @param saveIfMonitoringDisabled if this is false and monitoring jdbc login + * @param force if this is false and monitoring jdbc login * disabled, this operation will be ignored * @throws SQLException */ - void saveJob(J job, boolean saveIfMonitoringDisabled) + void saveJob(J job, boolean force) throws DataAccessException; boolean isJdbcLogRequestHistoryInDBEnabled(); diff --git a/modules/srm-server/src/main/java/org/dcache/srm/scheduler/NoopJobStorage.java b/modules/srm-server/src/main/java/org/dcache/srm/scheduler/NoopJobStorage.java index 307fb1a01cf..db5389bc8dc 100644 --- a/modules/srm-server/src/main/java/org/dcache/srm/scheduler/NoopJobStorage.java +++ b/modules/srm-server/src/main/java/org/dcache/srm/scheduler/NoopJobStorage.java @@ -41,7 +41,7 @@ public Set getJobs(String scheduler, State state) { } @Override - public void saveJob(J job, boolean saveIfMonitoringDisabled) { + public void saveJob(J job, boolean force) { } @Override diff --git a/modules/srm-server/src/main/java/org/dcache/srm/scheduler/SharedMemoryCacheJobStorage.java b/modules/srm-server/src/main/java/org/dcache/srm/scheduler/SharedMemoryCacheJobStorage.java index ff13759e397..1adaa4b4e83 100644 --- a/modules/srm-server/src/main/java/org/dcache/srm/scheduler/SharedMemoryCacheJobStorage.java +++ b/modules/srm-server/src/main/java/org/dcache/srm/scheduler/SharedMemoryCacheJobStorage.java @@ -10,7 +10,6 @@ import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; import org.dcache.srm.request.Job; @@ -126,9 +125,9 @@ public Set getJobs(String scheduler, State state) throws DataAccessException } @Override - public void saveJob(J job, boolean saveIfMonitoringDisabled) throws DataAccessException + public void saveJob(J job, boolean force) throws DataAccessException { - storage.saveJob(job, saveIfMonitoringDisabled); + storage.saveJob(job, force); sharedMemoryCache.update(job); updateExpirationSet(job); } diff --git a/modules/srm-server/src/test/java/org/dcache/srm/scheduler/AsynchronousSaveJobStorageTest.java b/modules/srm-server/src/test/java/org/dcache/srm/scheduler/AsynchronousSaveJobStorageTest.java new file mode 100644 index 00000000000..ae3741b7030 --- /dev/null +++ b/modules/srm-server/src/test/java/org/dcache/srm/scheduler/AsynchronousSaveJobStorageTest.java @@ -0,0 +1,120 @@ +package org.dcache.srm.scheduler; + +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executor; + +import org.dcache.srm.request.Job; + +import static org.mockito.Mockito.*; + +public class AsynchronousSaveJobStorageTest +{ + private JobStorage storage; + private List tasks; + private AsynchronousSaveJobStorage asyncStorage; + private Job job; + + @Before + public void setUp() throws Exception + { + storage = mock(JobStorage.class); + job = mock(Job.class); + tasks = new ArrayList<>(); + asyncStorage = new AsynchronousSaveJobStorage<>(storage, new ListExecutor(tasks)); + } + + @Test + public void whenRequestHistoryLoggingIsDisabledAndSavingWithForceThenActualSaveIsWithForce() throws Exception + { + when(storage.isJdbcLogRequestHistoryInDBEnabled()).thenReturn(false); + asyncStorage.saveJob(job, true); + runTasks(); + verify(storage).saveJob(job, true); + } + + @Test + public void whenRequestHistoryLoggingIsEnabledAndSavingWithForceThenActualSaveIsWithForce() throws Exception + { + when(storage.isJdbcLogRequestHistoryInDBEnabled()).thenReturn(true); + asyncStorage.saveJob(job, true); + runTasks(); + verify(storage).saveJob(job, true); + } + + @Test + public void whenRequestHistoryLoggingIsEnabledAndSavingWithoutForceThenActualSaveIsWithoutForce() throws Exception + { + when(storage.isJdbcLogRequestHistoryInDBEnabled()).thenReturn(true); + asyncStorage.saveJob(job, false); + runTasks(); + verify(storage).saveJob(job, false); + } + + @Test + public void whenSavingTwiceWithoutForceThenActualSaveIsOnceWithoutForce() throws Exception + { + when(storage.isJdbcLogRequestHistoryInDBEnabled()).thenReturn(true); + asyncStorage.saveJob(job, false); + asyncStorage.saveJob(job, false); + runTasks(); + verify(storage).saveJob(job, false); + } + + @Test + public void whenSavingTwiceWithForceThenActualSaveIsOnceWithForce() throws Exception + { + when(storage.isJdbcLogRequestHistoryInDBEnabled()).thenReturn(true); + asyncStorage.saveJob(job, true); + asyncStorage.saveJob(job, true); + runTasks(); + verify(storage).saveJob(job, true); + } + + @Test + public void whenSavingTwiceWithAndWithoutForceThenActualSaveIsOnceWithForce() throws Exception + { + when(storage.isJdbcLogRequestHistoryInDBEnabled()).thenReturn(true); + asyncStorage.saveJob(job, true); + asyncStorage.saveJob(job, false); + runTasks(); + verify(storage).saveJob(job, true); + } + + @Test + public void whenSavingTwiceWithoutAndWithForceThenActualSaveIsOnceWithForce() throws Exception + { + when(storage.isJdbcLogRequestHistoryInDBEnabled()).thenReturn(true); + asyncStorage.saveJob(job, false); + asyncStorage.saveJob(job, true); + runTasks(); + verify(storage).saveJob(job, true); + } + + private void runTasks() + { + for (Runnable task : tasks) { + task.run(); + } + } + + private static class ListExecutor implements Executor + { + private final List tasks; + + private ListExecutor(List tasks) + { + this.tasks = tasks; + } + + @Override + public void execute( + Runnable command) + { + tasks.add(command); + } + } +} \ No newline at end of file