Skip to content

Commit

Permalink
srm: Fix asynchroneous job storage leak
Browse files Browse the repository at this point in the history
The asynchroneous job storage fails to preserve the force flag when
collapsing several save calls. In case srm.persistence.enable.history
is false (the default), this could lead to incomplete data in the
database. This in case could lead to incomplete cleanup of upload
directories after an SRM restart.

The patch also renames the boolean parameter to something that I find
less confusing.

Target: trunk
Request: 2.10
Request: 2.9
Request: 2.8
Request: 2.7
Require-notes: yes
Require-book: no
Acked-by: Paul Millar <paul.millar@desy.de>
Patch: https://rb.dcache.org/r/7307/
(cherry picked from commit 98f2115)

Conflicts:
	modules/srm-server/src/main/java/org/dcache/srm/scheduler/FinalStateOnlyJobStorageDecorator.java
  • Loading branch information
gbehrmann committed Sep 25, 2014
1 parent 4ee3b2b commit 5a659f4
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 21 deletions.
Expand Up @@ -446,9 +446,9 @@ private List<Job.JobHistory> getJobHistoriesToSave(Job job)


@Override
public void saveJob(final Job job, boolean saveifmonitoringisdesabled) throws DataAccessException
public void saveJob(final Job job, boolean force) throws DataAccessException
{
if (!saveifmonitoringisdesabled && !logHistory) {
if (!force && !logHistory) {
return;
}

Expand Down
Expand Up @@ -9,7 +9,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

import org.dcache.srm.request.Job;
Expand All @@ -20,14 +20,14 @@ public class AsynchronousSaveJobStorage<J extends Job> implements JobStorage<J>

private final JobStorage<J> storage;
private final ConcurrentMap<Long,UpdateState> states = new ConcurrentHashMap<>();
private final ExecutorService executor;
private final Executor executor;

private static enum UpdateState
{
QUEUED, PROCESSING
QUEUED_FORCED, QUEUED_NOT_FORCED, PROCESSING
}

public AsynchronousSaveJobStorage(JobStorage<J> storage, ExecutorService executor)
public AsynchronousSaveJobStorage(JobStorage<J> storage, Executor executor)
{
this.storage = storage;
this.executor = executor;
Expand Down Expand Up @@ -63,12 +63,21 @@ public Set<J> getJobs(String scheduler, State state) throws DataAccessException
return storage.getJobs(scheduler, state);
}

public void saveJob(final J job, final boolean saveIfMonitoringDisabled)
public void saveJob(final J job, final boolean force)
{
if (!saveIfMonitoringDisabled && !isJdbcLogRequestHistoryInDBEnabled()) {
if (!force && !isJdbcLogRequestHistoryInDBEnabled()) {
return;
}
if (states.put(job.getId(), UpdateState.QUEUED) == null) {

UpdateState state;
if (force) {
state = states.put(job.getId(), UpdateState.QUEUED_FORCED);
} else {
while ((state = states.putIfAbsent(job.getId(), UpdateState.QUEUED_NOT_FORCED)) == UpdateState.PROCESSING &&
!states.replace(job.getId(), UpdateState.PROCESSING, UpdateState.QUEUED_NOT_FORCED));
}

if (state == null) {
boolean success = false;
try {
Runnable task =
Expand All @@ -77,9 +86,9 @@ public void saveJob(final J job, final boolean saveIfMonitoringDisabled)
@Override
public void run()
{
states.put(job.getId(), UpdateState.PROCESSING);
UpdateState state = states.put(job.getId(), UpdateState.PROCESSING);
try {
storage.saveJob(job, saveIfMonitoringDisabled);
storage.saveJob(job, state == UpdateState.QUEUED_FORCED);
} catch (DataAccessException e) {
LOGGER.error("SQL statement failed: {}", e.getMessage());
} catch (Throwable e) {
Expand Down
Expand Up @@ -105,13 +105,13 @@ public Set<J> getJobs(String scheduler, State state) throws DataAccessException
}

@Override
public void saveJob(J job, boolean saveIfMonitoringDisabled) throws DataAccessException
public void saveJob(J job, boolean force) throws DataAccessException
{
Job other = map.putIfAbsent(job.getId(), job);
if (other != null && other != job) {
throw new IllegalStateException("Duplicate job #" + job.getId());
}
storage.saveJob(job, saveIfMonitoringDisabled);
storage.saveJob(job, force);
}

@Override
Expand Down
Expand Up @@ -47,9 +47,9 @@ public Set<J> getJobs(String scheduler, State state) throws DataAccessException
}

@Override
public void saveJob(J job, boolean saveIfMonitoringDisabled) throws DataAccessException {
public void saveJob(J job, boolean force) throws DataAccessException {
if(job.getState().isFinalState()) {
jobStorage.saveJob(job,saveIfMonitoringDisabled);
jobStorage.saveJob(job, force);
}
}

Expand Down
Expand Up @@ -94,11 +94,11 @@ public interface JobStorage<J extends Job> {
/**
*
* @param job Job to save
* @param saveIfMonitoringDisabled if this is false and monitoring jdbc login
* @param force if this is false and monitoring jdbc login
* disabled, this operation will be ignored
* @throws SQLException
*/
void saveJob(J job, boolean saveIfMonitoringDisabled)
void saveJob(J job, boolean force)
throws DataAccessException;

boolean isJdbcLogRequestHistoryInDBEnabled();
Expand Down
Expand Up @@ -41,7 +41,7 @@ public Set<J> getJobs(String scheduler, State state) {
}

@Override
public void saveJob(J job, boolean saveIfMonitoringDisabled) {
public void saveJob(J job, boolean force) {
}

@Override
Expand Down
Expand Up @@ -10,7 +10,6 @@
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import org.dcache.srm.request.Job;

Expand Down Expand Up @@ -126,9 +125,9 @@ public Set<J> getJobs(String scheduler, State state) throws DataAccessException
}

@Override
public void saveJob(J job, boolean saveIfMonitoringDisabled) throws DataAccessException
public void saveJob(J job, boolean force) throws DataAccessException
{
storage.saveJob(job, saveIfMonitoringDisabled);
storage.saveJob(job, force);
sharedMemoryCache.update(job);
updateExpirationSet(job);
}
Expand Down
@@ -0,0 +1,120 @@
package org.dcache.srm.scheduler;

import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

import org.dcache.srm.request.Job;

import static org.mockito.Mockito.*;

public class AsynchronousSaveJobStorageTest
{
private JobStorage<Job> storage;
private List<Runnable> tasks;
private AsynchronousSaveJobStorage<Job> asyncStorage;
private Job job;

@Before
public void setUp() throws Exception
{
storage = mock(JobStorage.class);
job = mock(Job.class);
tasks = new ArrayList<>();
asyncStorage = new AsynchronousSaveJobStorage<>(storage, new ListExecutor(tasks));
}

@Test
public void whenRequestHistoryLoggingIsDisabledAndSavingWithForceThenActualSaveIsWithForce() throws Exception
{
when(storage.isJdbcLogRequestHistoryInDBEnabled()).thenReturn(false);
asyncStorage.saveJob(job, true);
runTasks();
verify(storage).saveJob(job, true);
}

@Test
public void whenRequestHistoryLoggingIsEnabledAndSavingWithForceThenActualSaveIsWithForce() throws Exception
{
when(storage.isJdbcLogRequestHistoryInDBEnabled()).thenReturn(true);
asyncStorage.saveJob(job, true);
runTasks();
verify(storage).saveJob(job, true);
}

@Test
public void whenRequestHistoryLoggingIsEnabledAndSavingWithoutForceThenActualSaveIsWithoutForce() throws Exception
{
when(storage.isJdbcLogRequestHistoryInDBEnabled()).thenReturn(true);
asyncStorage.saveJob(job, false);
runTasks();
verify(storage).saveJob(job, false);
}

@Test
public void whenSavingTwiceWithoutForceThenActualSaveIsOnceWithoutForce() throws Exception
{
when(storage.isJdbcLogRequestHistoryInDBEnabled()).thenReturn(true);
asyncStorage.saveJob(job, false);
asyncStorage.saveJob(job, false);
runTasks();
verify(storage).saveJob(job, false);
}

@Test
public void whenSavingTwiceWithForceThenActualSaveIsOnceWithForce() throws Exception
{
when(storage.isJdbcLogRequestHistoryInDBEnabled()).thenReturn(true);
asyncStorage.saveJob(job, true);
asyncStorage.saveJob(job, true);
runTasks();
verify(storage).saveJob(job, true);
}

@Test
public void whenSavingTwiceWithAndWithoutForceThenActualSaveIsOnceWithForce() throws Exception
{
when(storage.isJdbcLogRequestHistoryInDBEnabled()).thenReturn(true);
asyncStorage.saveJob(job, true);
asyncStorage.saveJob(job, false);
runTasks();
verify(storage).saveJob(job, true);
}

@Test
public void whenSavingTwiceWithoutAndWithForceThenActualSaveIsOnceWithForce() throws Exception
{
when(storage.isJdbcLogRequestHistoryInDBEnabled()).thenReturn(true);
asyncStorage.saveJob(job, false);
asyncStorage.saveJob(job, true);
runTasks();
verify(storage).saveJob(job, true);
}

private void runTasks()
{
for (Runnable task : tasks) {
task.run();
}
}

private static class ListExecutor implements Executor
{
private final List<Runnable> tasks;

private ListExecutor(List<Runnable> tasks)
{
this.tasks = tasks;
}

@Override
public void execute(
Runnable command)
{
tasks.add(command);
}
}
}

0 comments on commit 5a659f4

Please sign in to comment.