Skip to content

Commit

Permalink
Merge pull request #2080 from gbehrmann/fix/2.13/rb8889
Browse files Browse the repository at this point in the history
srm: Fix saving of transient states to database
  • Loading branch information
paulmillar committed Jan 7, 2016
2 parents 572986a + 4946340 commit 56485c3
Show file tree
Hide file tree
Showing 12 changed files with 43 additions and 137 deletions.
Expand Up @@ -181,9 +181,7 @@ protected FileRequest(
}

public void addDebugHistoryEvent(String description) {
if(getJobStorage().isJdbcLogRequestHistoryInDBEnabled()) {
addHistoryEvent( description);
}
addHistoryEvent( description);
}

/**
Expand Down
Expand Up @@ -202,12 +202,6 @@ protected Request(
@Nullable
private String description;

public void addDebugHistoryEvent(String description) {
if(getJobStorage().isJdbcLogRequestHistoryInDBEnabled()) {
addHistoryEvent( description);
}
}

/**
* gets request id as int
* @return
Expand Down
Expand Up @@ -151,12 +151,6 @@ public DatabaseJobStorage(Configuration.DatabaseParameters configuration, Schedu
dbInit();
}

@Override
public boolean isJdbcLogRequestHistoryInDBEnabled()
{
return logHistory;
}

public static final String createFileRequestTablePrefix =
"ID "+ longType+" NOT NULL PRIMARY KEY"+
","+
Expand Down Expand Up @@ -368,44 +362,6 @@ private J getJob(Connection _con, ResultSet set) throws SQLException
return job;
}

private int updateJob(Connection connection, Job job) throws SQLException
{
PreparedStatement updateStatement = null;
try {
job.rlock();
try {
updateStatement = getUpdateStatement(connection, job);
} finally {
job.runlock();
}
return updateStatement.executeUpdate();
} finally {
SqlHelper.tryToClose(updateStatement);
}
}

private void createJob(Connection connection, Job job) throws SQLException
{
PreparedStatement createStatement = null;
PreparedStatement batchCreateStatement = null;
try {
job.rlock();
try {
createStatement = getCreateStatement(connection, job);
batchCreateStatement = getBatchCreateStatement(connection, job);
} finally {
job.runlock();
}
createStatement.executeUpdate();
if (batchCreateStatement != null) {
batchCreateStatement.executeBatch();
}
} finally {
SqlHelper.tryToClose(createStatement);
SqlHelper.tryToClose(batchCreateStatement);
}
}

private void saveHistory(Connection connection, Job job,
List<Job.JobHistory> history) throws SQLException
{
Expand Down Expand Up @@ -442,22 +398,41 @@ private List<Job.JobHistory> getJobHistoriesToSave(Job job)
@Override
public void saveJob(final Job job, boolean force) throws DataAccessException
{
if (!force && !logHistory) {
return;
}
List<Job.JobHistory> savedHistory =
transactionTemplate.execute(status -> jdbcTemplate.execute((Connection con) -> {
List<Job.JobHistory> history;
PreparedStatement updateStatement = null;
PreparedStatement createStatement = null;
PreparedStatement batchCreateStatement = null;
try {
job.rlock();
try {
history = getJobHistoriesToSave(job);
updateStatement = getUpdateStatement(con, job);
createStatement = getCreateStatement(con, job);
batchCreateStatement = getBatchCreateStatement(con, job);
} finally {
job.runlock();
}

final List<Job.JobHistory> history = getJobHistoriesToSave(job);
transactionTemplate.execute(status -> jdbcTemplate.execute((Connection con) -> {
int rowCount = updateJob(con, job);
if (rowCount == 0) {
createJob(con, job);
}
if (!history.isEmpty()) {
saveHistory(con, job, history);
}
return null;
}));
markHistoryAsSaved(history);
int rowCount = updateStatement.executeUpdate();
if (rowCount == 0) {
createStatement.executeUpdate();
if (batchCreateStatement != null) {
batchCreateStatement.executeBatch();
}
}
if (!history.isEmpty()) {
saveHistory(con, job, history);
}
} finally {
SqlHelper.tryToClose(createStatement);
SqlHelper.tryToClose(batchCreateStatement);
SqlHelper.tryToClose(updateStatement);
}
return history;
}));
markHistoryAsSaved(savedHistory);
}

protected PreparedStatement getBatchCreateStatement(Connection connection, Job job)
Expand Down
Expand Up @@ -32,7 +32,7 @@
import org.dcache.srm.request.ReserveSpaceRequest;
import org.dcache.srm.scheduler.AsynchronousSaveJobStorage;
import org.dcache.srm.scheduler.CanonicalizingJobStorage;
import org.dcache.srm.scheduler.FinalStateOnlyJobStorageDecorator;
import org.dcache.srm.scheduler.ForceOnlyJobStorageDecorator;
import org.dcache.srm.scheduler.JobStorage;
import org.dcache.srm.scheduler.JobStorageFactory;
import org.dcache.srm.scheduler.NoopJobStorage;
Expand Down Expand Up @@ -69,7 +69,7 @@ private <J extends Job> void add(Configuration.DatabaseParameters config,
.newInstance(config, scheduledExecutor);
js = new AsynchronousSaveJobStorage<>(js, executor);
if (config.getStoreCompletedRequestsOnly()) {
js = new FinalStateOnlyJobStorageDecorator<>(js);
js = new ForceOnlyJobStorageDecorator<>(js);
}
} else {
js = new NoopJobStorage<>();
Expand Down
Expand Up @@ -66,10 +66,6 @@ public Set<J> getJobs(String scheduler, State state) throws DataAccessException

public void saveJob(final J job, final boolean force)
{
if (!force && !isJdbcLogRequestHistoryInDBEnabled()) {
return;
}

UpdateState existingState;
if (force) {
existingState = states.put(job.getId(), UpdateState.QUEUED_FORCED);
Expand Down Expand Up @@ -124,12 +120,6 @@ public void run()
}
}

@Override
public boolean isJdbcLogRequestHistoryInDBEnabled()
{
return storage.isJdbcLogRequestHistoryInDBEnabled();
}

@Override
public Set<Long> getLatestCompletedJobIds(int maxNum) throws DataAccessException
{
Expand Down
Expand Up @@ -106,12 +106,6 @@ public void saveJob(J job, boolean force) throws DataAccessException
storage.saveJob(job, force);
}

@Override
public boolean isJdbcLogRequestHistoryInDBEnabled()
{
return storage.isJdbcLogRequestHistoryInDBEnabled();
}

@Override
public Set<Long> getLatestCompletedJobIds(int maxNum) throws DataAccessException
{
Expand Down
Expand Up @@ -12,10 +12,10 @@
*
* @author timur
*/
public class FinalStateOnlyJobStorageDecorator<J extends Job> implements JobStorage<J> {
public class ForceOnlyJobStorageDecorator<J extends Job> implements JobStorage<J> {

private final JobStorage<J> jobStorage;
public FinalStateOnlyJobStorageDecorator(JobStorage<J> jobStorage ) {
public ForceOnlyJobStorageDecorator(JobStorage<J> jobStorage ) {
this.jobStorage = jobStorage;
}

Expand Down Expand Up @@ -47,7 +47,7 @@ public Set<J> getJobs(String scheduler, State state) throws DataAccessException

@Override
public void saveJob(J job, boolean force) throws DataAccessException {
if (force || job.getState().isFinal()) {
if (force) {
jobStorage.saveJob(job, force);
}
}
Expand Down Expand Up @@ -82,9 +82,4 @@ public Set<J> getActiveJobs()
return jobStorage.getActiveJobs();
}

@Override
public boolean isJdbcLogRequestHistoryInDBEnabled()
{
return jobStorage.isJdbcLogRequestHistoryInDBEnabled();
}
}
Expand Up @@ -101,8 +101,6 @@ public interface JobStorage<J extends Job> {
void saveJob(J job, boolean force)
throws DataAccessException;

boolean isJdbcLogRequestHistoryInDBEnabled();

Set<Long> getLatestCompletedJobIds(int maxNum) throws DataAccessException;
Set<Long> getLatestDoneJobIds(int maxNum) throws DataAccessException;
Set<Long> getLatestFailedJobIds(int maxNum) throws DataAccessException;
Expand Down
Expand Up @@ -74,9 +74,4 @@ public Set<J> getActiveJobs()
return Collections.emptySet();
}

@Override
public boolean isJdbcLogRequestHistoryInDBEnabled()
{
return false;
}
}
Expand Up @@ -5,7 +5,6 @@
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
Expand Down Expand Up @@ -123,12 +122,6 @@ public void saveJob(J job, boolean force) throws DataAccessException
updateExpirationSet(job);
}

@Override
public boolean isJdbcLogRequestHistoryInDBEnabled()
{
return storage.isJdbcLogRequestHistoryInDBEnabled();
}

@Override
public Set<Long> getLatestCompletedJobIds(int maxNum) throws DataAccessException
{
Expand Down
Expand Up @@ -29,37 +29,16 @@ public void setUp() throws Exception
}

@Test
public void whenRequestHistoryLoggingIsDisabledAndSavingWithForceThenActualSaveIsWithForce() throws Exception
public void whenSavingWithForceThenActualSaveIsWithForce() throws Exception
{
when(storage.isJdbcLogRequestHistoryInDBEnabled()).thenReturn(false);
asyncStorage.saveJob(job, true);
runTasks();
verify(storage).saveJob(job, true);
}

@Test
public void whenRequestHistoryLoggingIsEnabledAndSavingWithForceThenActualSaveIsWithForce() throws Exception
public void whenSavingWithoutForceThenActualSaveIsWithoutForce() throws Exception
{
when(storage.isJdbcLogRequestHistoryInDBEnabled()).thenReturn(true);
asyncStorage.saveJob(job, true);
runTasks();
verify(storage).saveJob(job, true);
}

@Test
public void whenRequestHistoryLoggingIsEnabledAndSavingWithoutForceThenActualSaveIsWithoutForce() throws Exception
{
when(storage.isJdbcLogRequestHistoryInDBEnabled()).thenReturn(true);
asyncStorage.saveJob(job, false);
runTasks();
verify(storage).saveJob(job, false);
}

@Test
public void whenSavingTwiceWithoutForceThenActualSaveIsOnceWithoutForce() throws Exception
{
when(storage.isJdbcLogRequestHistoryInDBEnabled()).thenReturn(true);
asyncStorage.saveJob(job, false);
asyncStorage.saveJob(job, false);
runTasks();
verify(storage).saveJob(job, false);
Expand All @@ -68,7 +47,6 @@ public void whenSavingTwiceWithoutForceThenActualSaveIsOnceWithoutForce() throws
@Test
public void whenSavingTwiceWithForceThenActualSaveIsOnceWithForce() throws Exception
{
when(storage.isJdbcLogRequestHistoryInDBEnabled()).thenReturn(true);
asyncStorage.saveJob(job, true);
asyncStorage.saveJob(job, true);
runTasks();
Expand All @@ -78,7 +56,6 @@ public void whenSavingTwiceWithForceThenActualSaveIsOnceWithForce() throws Excep
@Test
public void whenSavingTwiceWithAndWithoutForceThenActualSaveIsOnceWithForce() throws Exception
{
when(storage.isJdbcLogRequestHistoryInDBEnabled()).thenReturn(true);
asyncStorage.saveJob(job, true);
asyncStorage.saveJob(job, false);
runTasks();
Expand All @@ -88,7 +65,6 @@ public void whenSavingTwiceWithAndWithoutForceThenActualSaveIsOnceWithForce() th
@Test
public void whenSavingTwiceWithoutAndWithForceThenActualSaveIsOnceWithForce() throws Exception
{
when(storage.isJdbcLogRequestHistoryInDBEnabled()).thenReturn(true);
asyncStorage.saveJob(job, false);
asyncStorage.saveJob(job, true);
runTasks();
Expand All @@ -100,7 +76,6 @@ public void whenExecutionQueueIsFullForcedSaveIsStillExecuted() throws Exception
{
Executor executor = mock(Executor.class);
asyncStorage = new AsynchronousSaveJobStorage<>(storage, executor);
when(storage.isJdbcLogRequestHistoryInDBEnabled()).thenReturn(true);
doThrow(RejectedExecutionException.class).when(executor).execute(any(Runnable.class));
asyncStorage.saveJob(job, true);
verify(storage).saveJob(job, true);
Expand Down
3 changes: 1 addition & 2 deletions skel/share/defaults/srm.properties
Expand Up @@ -883,8 +883,7 @@ srm.persistence.reserve-space.enable.history = ${srm.persistence.enable.history}
# The setting does not control which information is stored in the database. When a
# request is eventually stored, all available information is stored.
#
# If srm.persistence.enable or srm.persistence.enable.history are set to false,
# this setting has no effect.
# If srm.persistence.enable is set to false, this setting has no effect.
#
(forbidden)srmStoreCompletedRequestsOnly = Use srm.persistence.enable.store-transient-state
(one-of?true|false)srm.persistence.enable.store-transient-state = false
Expand Down

0 comments on commit 56485c3

Please sign in to comment.