From 5a659f4a9bb229336c7cefa6e6801bac5f486271 Mon Sep 17 00:00:00 2001 From: Gerd Behrmann Date: Tue, 23 Sep 2014 21:58:49 +0200 Subject: [PATCH] srm: Fix asynchroneous job storage leak The asynchroneous job storage fails to preserve the force flag when collapsing several save calls. In case srm.persistence.enable.history is false (the default), this could lead to incomplete data in the database. This in case could lead to incomplete cleanup of upload directories after an SRM restart. The patch also renames the boolean parameter to something that I find less confusing. Target: trunk Request: 2.10 Request: 2.9 Request: 2.8 Request: 2.7 Require-notes: yes Require-book: no Acked-by: Paul Millar Patch: https://rb.dcache.org/r/7307/ (cherry picked from commit 98f21155a8927aae68a2619c3738bb31f0110fa1) Conflicts: modules/srm-server/src/main/java/org/dcache/srm/scheduler/FinalStateOnlyJobStorageDecorator.java --- .../srm/request/sql/DatabaseJobStorage.java | 4 +- .../scheduler/AsynchronousSaveJobStorage.java | 27 ++-- .../scheduler/CanonicalizingJobStorage.java | 4 +- .../FinalStateOnlyJobStorageDecorator.java | 4 +- .../org/dcache/srm/scheduler/JobStorage.java | 4 +- .../dcache/srm/scheduler/NoopJobStorage.java | 2 +- .../SharedMemoryCacheJobStorage.java | 5 +- .../AsynchronousSaveJobStorageTest.java | 120 ++++++++++++++++++ 8 files changed, 149 insertions(+), 21 deletions(-) create mode 100644 modules/srm-server/src/test/java/org/dcache/srm/scheduler/AsynchronousSaveJobStorageTest.java diff --git a/modules/srm-server/src/main/java/org/dcache/srm/request/sql/DatabaseJobStorage.java b/modules/srm-server/src/main/java/org/dcache/srm/request/sql/DatabaseJobStorage.java index 05566a8e00b..2f61a90b485 100644 --- a/modules/srm-server/src/main/java/org/dcache/srm/request/sql/DatabaseJobStorage.java +++ b/modules/srm-server/src/main/java/org/dcache/srm/request/sql/DatabaseJobStorage.java @@ -446,9 +446,9 @@ private List getJobHistoriesToSave(Job job) @Override - public void saveJob(final Job job, boolean saveifmonitoringisdesabled) throws DataAccessException + public void saveJob(final Job job, boolean force) throws DataAccessException { - if (!saveifmonitoringisdesabled && !logHistory) { + if (!force && !logHistory) { return; } diff --git a/modules/srm-server/src/main/java/org/dcache/srm/scheduler/AsynchronousSaveJobStorage.java b/modules/srm-server/src/main/java/org/dcache/srm/scheduler/AsynchronousSaveJobStorage.java index 2f56df36bb3..e1eaab42206 100644 --- a/modules/srm-server/src/main/java/org/dcache/srm/scheduler/AsynchronousSaveJobStorage.java +++ b/modules/srm-server/src/main/java/org/dcache/srm/scheduler/AsynchronousSaveJobStorage.java @@ -9,7 +9,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; import org.dcache.srm.request.Job; @@ -20,14 +20,14 @@ public class AsynchronousSaveJobStorage implements JobStorage private final JobStorage storage; private final ConcurrentMap states = new ConcurrentHashMap<>(); - private final ExecutorService executor; + private final Executor executor; private static enum UpdateState { - QUEUED, PROCESSING + QUEUED_FORCED, QUEUED_NOT_FORCED, PROCESSING } - public AsynchronousSaveJobStorage(JobStorage storage, ExecutorService executor) + public AsynchronousSaveJobStorage(JobStorage storage, Executor executor) { this.storage = storage; this.executor = executor; @@ -63,12 +63,21 @@ public Set getJobs(String scheduler, State state) throws DataAccessException return storage.getJobs(scheduler, state); } - public void saveJob(final J job, final boolean saveIfMonitoringDisabled) + public void saveJob(final J job, final boolean force) { - if (!saveIfMonitoringDisabled && !isJdbcLogRequestHistoryInDBEnabled()) { + if (!force && !isJdbcLogRequestHistoryInDBEnabled()) { return; } - if (states.put(job.getId(), UpdateState.QUEUED) == null) { + + UpdateState state; + if (force) { + state = states.put(job.getId(), UpdateState.QUEUED_FORCED); + } else { + while ((state = states.putIfAbsent(job.getId(), UpdateState.QUEUED_NOT_FORCED)) == UpdateState.PROCESSING && + !states.replace(job.getId(), UpdateState.PROCESSING, UpdateState.QUEUED_NOT_FORCED)); + } + + if (state == null) { boolean success = false; try { Runnable task = @@ -77,9 +86,9 @@ public void saveJob(final J job, final boolean saveIfMonitoringDisabled) @Override public void run() { - states.put(job.getId(), UpdateState.PROCESSING); + UpdateState state = states.put(job.getId(), UpdateState.PROCESSING); try { - storage.saveJob(job, saveIfMonitoringDisabled); + storage.saveJob(job, state == UpdateState.QUEUED_FORCED); } catch (DataAccessException e) { LOGGER.error("SQL statement failed: {}", e.getMessage()); } catch (Throwable e) { diff --git a/modules/srm-server/src/main/java/org/dcache/srm/scheduler/CanonicalizingJobStorage.java b/modules/srm-server/src/main/java/org/dcache/srm/scheduler/CanonicalizingJobStorage.java index 9c96490c707..6e8664c0b89 100644 --- a/modules/srm-server/src/main/java/org/dcache/srm/scheduler/CanonicalizingJobStorage.java +++ b/modules/srm-server/src/main/java/org/dcache/srm/scheduler/CanonicalizingJobStorage.java @@ -105,13 +105,13 @@ public Set getJobs(String scheduler, State state) throws DataAccessException } @Override - public void saveJob(J job, boolean saveIfMonitoringDisabled) throws DataAccessException + public void saveJob(J job, boolean force) throws DataAccessException { Job other = map.putIfAbsent(job.getId(), job); if (other != null && other != job) { throw new IllegalStateException("Duplicate job #" + job.getId()); } - storage.saveJob(job, saveIfMonitoringDisabled); + storage.saveJob(job, force); } @Override diff --git a/modules/srm-server/src/main/java/org/dcache/srm/scheduler/FinalStateOnlyJobStorageDecorator.java b/modules/srm-server/src/main/java/org/dcache/srm/scheduler/FinalStateOnlyJobStorageDecorator.java index 1b796af0384..e623bce1bad 100644 --- a/modules/srm-server/src/main/java/org/dcache/srm/scheduler/FinalStateOnlyJobStorageDecorator.java +++ b/modules/srm-server/src/main/java/org/dcache/srm/scheduler/FinalStateOnlyJobStorageDecorator.java @@ -47,9 +47,9 @@ public Set getJobs(String scheduler, State state) throws DataAccessException } @Override - public void saveJob(J job, boolean saveIfMonitoringDisabled) throws DataAccessException { + public void saveJob(J job, boolean force) throws DataAccessException { if(job.getState().isFinalState()) { - jobStorage.saveJob(job,saveIfMonitoringDisabled); + jobStorage.saveJob(job, force); } } diff --git a/modules/srm-server/src/main/java/org/dcache/srm/scheduler/JobStorage.java b/modules/srm-server/src/main/java/org/dcache/srm/scheduler/JobStorage.java index ea022b8d638..f41841b4ab7 100644 --- a/modules/srm-server/src/main/java/org/dcache/srm/scheduler/JobStorage.java +++ b/modules/srm-server/src/main/java/org/dcache/srm/scheduler/JobStorage.java @@ -94,11 +94,11 @@ public interface JobStorage { /** * * @param job Job to save - * @param saveIfMonitoringDisabled if this is false and monitoring jdbc login + * @param force if this is false and monitoring jdbc login * disabled, this operation will be ignored * @throws SQLException */ - void saveJob(J job, boolean saveIfMonitoringDisabled) + void saveJob(J job, boolean force) throws DataAccessException; boolean isJdbcLogRequestHistoryInDBEnabled(); diff --git a/modules/srm-server/src/main/java/org/dcache/srm/scheduler/NoopJobStorage.java b/modules/srm-server/src/main/java/org/dcache/srm/scheduler/NoopJobStorage.java index 307fb1a01cf..db5389bc8dc 100644 --- a/modules/srm-server/src/main/java/org/dcache/srm/scheduler/NoopJobStorage.java +++ b/modules/srm-server/src/main/java/org/dcache/srm/scheduler/NoopJobStorage.java @@ -41,7 +41,7 @@ public Set getJobs(String scheduler, State state) { } @Override - public void saveJob(J job, boolean saveIfMonitoringDisabled) { + public void saveJob(J job, boolean force) { } @Override diff --git a/modules/srm-server/src/main/java/org/dcache/srm/scheduler/SharedMemoryCacheJobStorage.java b/modules/srm-server/src/main/java/org/dcache/srm/scheduler/SharedMemoryCacheJobStorage.java index ff13759e397..1adaa4b4e83 100644 --- a/modules/srm-server/src/main/java/org/dcache/srm/scheduler/SharedMemoryCacheJobStorage.java +++ b/modules/srm-server/src/main/java/org/dcache/srm/scheduler/SharedMemoryCacheJobStorage.java @@ -10,7 +10,6 @@ import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; import org.dcache.srm.request.Job; @@ -126,9 +125,9 @@ public Set getJobs(String scheduler, State state) throws DataAccessException } @Override - public void saveJob(J job, boolean saveIfMonitoringDisabled) throws DataAccessException + public void saveJob(J job, boolean force) throws DataAccessException { - storage.saveJob(job, saveIfMonitoringDisabled); + storage.saveJob(job, force); sharedMemoryCache.update(job); updateExpirationSet(job); } diff --git a/modules/srm-server/src/test/java/org/dcache/srm/scheduler/AsynchronousSaveJobStorageTest.java b/modules/srm-server/src/test/java/org/dcache/srm/scheduler/AsynchronousSaveJobStorageTest.java new file mode 100644 index 00000000000..ae3741b7030 --- /dev/null +++ b/modules/srm-server/src/test/java/org/dcache/srm/scheduler/AsynchronousSaveJobStorageTest.java @@ -0,0 +1,120 @@ +package org.dcache.srm.scheduler; + +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executor; + +import org.dcache.srm.request.Job; + +import static org.mockito.Mockito.*; + +public class AsynchronousSaveJobStorageTest +{ + private JobStorage storage; + private List tasks; + private AsynchronousSaveJobStorage asyncStorage; + private Job job; + + @Before + public void setUp() throws Exception + { + storage = mock(JobStorage.class); + job = mock(Job.class); + tasks = new ArrayList<>(); + asyncStorage = new AsynchronousSaveJobStorage<>(storage, new ListExecutor(tasks)); + } + + @Test + public void whenRequestHistoryLoggingIsDisabledAndSavingWithForceThenActualSaveIsWithForce() throws Exception + { + when(storage.isJdbcLogRequestHistoryInDBEnabled()).thenReturn(false); + asyncStorage.saveJob(job, true); + runTasks(); + verify(storage).saveJob(job, true); + } + + @Test + public void whenRequestHistoryLoggingIsEnabledAndSavingWithForceThenActualSaveIsWithForce() throws Exception + { + when(storage.isJdbcLogRequestHistoryInDBEnabled()).thenReturn(true); + asyncStorage.saveJob(job, true); + runTasks(); + verify(storage).saveJob(job, true); + } + + @Test + public void whenRequestHistoryLoggingIsEnabledAndSavingWithoutForceThenActualSaveIsWithoutForce() throws Exception + { + when(storage.isJdbcLogRequestHistoryInDBEnabled()).thenReturn(true); + asyncStorage.saveJob(job, false); + runTasks(); + verify(storage).saveJob(job, false); + } + + @Test + public void whenSavingTwiceWithoutForceThenActualSaveIsOnceWithoutForce() throws Exception + { + when(storage.isJdbcLogRequestHistoryInDBEnabled()).thenReturn(true); + asyncStorage.saveJob(job, false); + asyncStorage.saveJob(job, false); + runTasks(); + verify(storage).saveJob(job, false); + } + + @Test + public void whenSavingTwiceWithForceThenActualSaveIsOnceWithForce() throws Exception + { + when(storage.isJdbcLogRequestHistoryInDBEnabled()).thenReturn(true); + asyncStorage.saveJob(job, true); + asyncStorage.saveJob(job, true); + runTasks(); + verify(storage).saveJob(job, true); + } + + @Test + public void whenSavingTwiceWithAndWithoutForceThenActualSaveIsOnceWithForce() throws Exception + { + when(storage.isJdbcLogRequestHistoryInDBEnabled()).thenReturn(true); + asyncStorage.saveJob(job, true); + asyncStorage.saveJob(job, false); + runTasks(); + verify(storage).saveJob(job, true); + } + + @Test + public void whenSavingTwiceWithoutAndWithForceThenActualSaveIsOnceWithForce() throws Exception + { + when(storage.isJdbcLogRequestHistoryInDBEnabled()).thenReturn(true); + asyncStorage.saveJob(job, false); + asyncStorage.saveJob(job, true); + runTasks(); + verify(storage).saveJob(job, true); + } + + private void runTasks() + { + for (Runnable task : tasks) { + task.run(); + } + } + + private static class ListExecutor implements Executor + { + private final List tasks; + + private ListExecutor(List tasks) + { + this.tasks = tasks; + } + + @Override + public void execute( + Runnable command) + { + tasks.add(command); + } + } +} \ No newline at end of file