Skip to content

Commit

Permalink
dcache-bulk: increment/decrement in-memory counts exclusively in the …
Browse files Browse the repository at this point in the history
…target object

Motivation:

See again #7164
BULK: in-memory counter is not decrementing when RELEASE activity completes

The current approach to keeping in-memory counts is flawed by the
fact that there is no single point at which the increment/decrement
takes place, leading to inconsistencies.

Modifications:

Inject the statistics class into the in-memory BulkRequestTarget
object, and do all incrementing and decrementing there.
Remove all calls to the statistics class from all other classes.

Result:

It looks like the counts are finally consistent.

Target: master
Request: 9.0
Patch: https://rb.dcache.org/r/13994/
Closes: #13994
Requires-notes: yes
Acked-by: Lea
  • Loading branch information
alrossi committed May 23, 2023
1 parent d7653b9 commit 4525fc1
Show file tree
Hide file tree
Showing 12 changed files with 49 additions and 58 deletions.
Expand Up @@ -91,7 +91,7 @@ protected LogTargetActivity(String name, TargetType targetType) {
public ListenableFuture<BulkRequestTarget> perform(String ruid, long tid, FsPath path,
FileAttributes attributes) {
long now = System.currentTimeMillis();
BulkRequestTarget t = BulkRequestTargetBuilder.builder().activity(this.getName()).id(tid)
BulkRequestTarget t = BulkRequestTargetBuilder.builder(null).activity(this.getName()).id(tid)
.ruid(ruid).state(State.RUNNING).path(path).createdAt(now).attributes(attributes)
.startedAt(now).lastUpdated(now).build();

Expand Down
Expand Up @@ -64,7 +64,6 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import static org.dcache.services.bulk.util.BulkRequestTarget.State.COMPLETED;
import static org.dcache.services.bulk.util.BulkRequestTarget.State.FAILED;
import static org.dcache.services.bulk.util.BulkRequestTarget.State.READY;
import static org.dcache.services.bulk.util.BulkRequestTarget.State.RUNNING;
import static org.dcache.services.bulk.util.BulkRequestTarget.State.SKIPPED;
import static org.dcache.services.bulk.util.BulkRequestTarget.computeFsPath;

Expand Down Expand Up @@ -194,7 +193,6 @@ public void cancel() {
containerState = ContainerState.STOP;

target.cancel();
statistics.decrement(RUNNING.name());

LOGGER.debug("cancel {}: target state is now {}.", ruid, target.getState());

Expand All @@ -206,7 +204,6 @@ public void cancel() {
LOGGER.debug("cancel {}: waiting {}.", ruid, waiting.size());
waiting.values().forEach(r -> r.cancel(activity));
LOGGER.debug("cancel {}: waiting targets cancelled.", ruid);
statistics.decrement(RUNNING.name(), waiting.size());
waiting.clear();
}

Expand All @@ -222,7 +219,6 @@ public void cancel(long id) {
BatchedResult result = i.next();
if (result.getTarget().getId() == id) {
result.cancel(activity);
statistics.decrement(RUNNING.name());
i.remove();
break;
}
Expand Down Expand Up @@ -301,7 +297,6 @@ public void run() {
semaphore = new Semaphore(1); /* synchronous */
processDirTargets();
containerState = ContainerState.STOP;
statistics.decrement(RUNNING.name());
update(COMPLETED);
break;
default:
Expand Down Expand Up @@ -346,7 +341,6 @@ public void uncaughtException(Thread t, Throwable e) {
* manager thread.
*/
containerState = ContainerState.STOP;
statistics.decrement(RUNNING.name());
target.setErrorObject(e);
update(FAILED);
ThreadGroup group = t.getThreadGroup();
Expand Down Expand Up @@ -440,7 +434,6 @@ protected void expandDepthFirst(Long id, PID pid, FsPath path, FileAttributes di
if (pid == PID.INITIAL) {
targetStore.storeOrUpdate(toTarget(id, pid, path, Optional.of(dirAttributes),
SKIPPED, null));
statistics.increment(SKIPPED.name());
}
}
}
Expand Down Expand Up @@ -512,7 +505,7 @@ protected BulkRequestTarget toTarget(Long id, PID pid, FsPath path, Optional<Fil
errorMessage = root.getMessage();
}

return BulkRequestTargetBuilder.builder().attributes(attributes.orElse(null))
return BulkRequestTargetBuilder.builder(statistics).attributes(attributes.orElse(null))
.activity(activity.getName()).id(id).pid(pid).rid(rid).ruid(ruid).state(state)
.createdAt(System.currentTimeMillis()).errorType(errorType)
.errorMessage(errorMessage).path(path).build();
Expand Down
Expand Up @@ -213,7 +213,6 @@ protected void retryFailed(BatchedResult result, FileAttributes attributes)
throws BulkStorageException {
BulkRequestTarget completedTarget = result.getTarget();
completedTarget.resetToReady();
statistics.decrement(completedTarget.getState().name());
try {
perform(completedTarget);
} catch (InterruptedException e) {
Expand All @@ -237,7 +236,6 @@ private void addInfo(BulkRequestTarget target) {
LOGGER.error("addInfo {}, path {}, error {}.", ruid, target.getPath(), e.getMessage());
target.setState(FAILED);
target.setErrorObject(e);
statistics.increment(FAILED.name());
try {
targetStore.storeOrUpdate(target);
} catch (BulkStorageException ex) {
Expand All @@ -254,7 +252,6 @@ private void handleCompletion(BatchedResult result) {

BulkRequestTarget completedTarget = result.getTarget();
State state = completedTarget.getState();
statistics.decrement(RUNNING.name());

try {
if (state == FAILED && activity.getRetryPolicy().shouldRetry(completedTarget)) {
Expand Down
Expand Up @@ -146,7 +146,6 @@ protected void processFileTargets() throws InterruptedException {
LOGGER.error("problem handling target {}: {}.", tgt, e.toString());
tgt.setState(FAILED);
tgt.setErrorObject(e);
statistics.increment(FAILED.name());
try {
targetStore.storeOrUpdate(tgt);
} catch (BulkStorageException ex) {
Expand Down Expand Up @@ -185,7 +184,6 @@ protected void retryFailed(BatchedResult result, FileAttributes attributes)
FsPath path = completedTarget.getPath();
PID pid = completedTarget.getPid();
completedTarget.resetToReady();
statistics.decrement(completedTarget.getState().name());
try {
perform(id, pid, path, attributes);
} catch (InterruptedException e) {
Expand All @@ -201,7 +199,6 @@ private void handleCompletion(BatchedResult result, FileAttributes attributes) {

BulkRequestTarget completedTarget = result.getTarget();
State state = completedTarget.getState();
statistics.decrement(RUNNING.name());

try {
if (state == FAILED && activity.getRetryPolicy().shouldRetry(completedTarget)) {
Expand Down Expand Up @@ -285,7 +282,6 @@ private void register(Long id, PID pid, FsPath path, ListenableFuture future, Fi
* returned from the database.
*/
targetStore.storeOrUpdate(target);
statistics.increment(RUNNING.name());
} catch (BulkStorageException e) {
LOGGER.error("{}, could not store target from result {}, {}, {}: {}.", ruid, result,
attributes, e.toString());
Expand Down
Expand Up @@ -112,7 +112,7 @@ public AbstractRequestContainerJob createRequestJob(BulkRequest request)
FileAttributes attributes = new FileAttributes();
attributes.setFileType(FileType.SPECIAL);
attributes.setPnfsId(PLACEHOLDER_PNFSID);
BulkRequestTarget target = BulkRequestTargetBuilder.builder()
BulkRequestTarget target = BulkRequestTargetBuilder.builder(statistics)
.activity(activity.getName())
.rid(request.getId()).ruid(request.getUid()).pid(PID.ROOT).attributes(attributes)
.path(ROOT_REQUEST_PATH).build();
Expand Down
Expand Up @@ -189,7 +189,7 @@ public void abort(BulkRequest request, Throwable exception) {

Throwable root = Throwables.getRootCause(exception);

BulkRequestTarget target = BulkRequestTargetBuilder.builder().rid(request.getId())
BulkRequestTarget target = BulkRequestTargetBuilder.builder(statistics).rid(request.getId())
.pid(PID.ROOT).activity(request.getActivity())
.path(ROOT_REQUEST_PATH).attributes(attributes)
.errorType(root.getClass().getCanonicalName())
Expand Down
Expand Up @@ -72,7 +72,6 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import org.dcache.services.bulk.util.BulkRequestTarget;
import org.dcache.services.bulk.util.BulkRequestTarget.PID;
import org.dcache.services.bulk.util.BulkRequestTarget.State;
import org.dcache.services.bulk.util.BulkServiceStatistics;
import org.dcache.services.bulk.util.BulkTargetFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -86,7 +85,6 @@ public final class JdbcBulkTargetStore implements BulkTargetStore {
private static final Logger LOGGER = LoggerFactory.getLogger(JdbcBulkTargetStore.class);

private JdbcRequestTargetDao targetDao;
private BulkServiceStatistics statistics;

@Override
public void abort(BulkRequestTarget target)
Expand All @@ -106,15 +104,13 @@ public void abort(BulkRequestTarget target)

@Override
public void cancel(long id) {
int count = targetDao.update(targetDao.where().id(id), targetDao.set().state(State.CANCELLED));
statistics.increment(State.CANCELLED.name(), count);
targetDao.update(targetDao.where().id(id), targetDao.set().state(State.CANCELLED));
}

@Override
public void cancelAll(Long rid) {
int count = targetDao.update(targetDao.where().rid(rid).state(NON_TERMINAL),
targetDao.update(targetDao.where().rid(rid).state(NON_TERMINAL),
targetDao.set().state(State.CANCELLED));
statistics.increment(State.CANCELLED.name(), count);
}

@Override
Expand Down Expand Up @@ -184,11 +180,6 @@ public void setTargetDao(JdbcRequestTargetDao targetDao) {
this.targetDao = targetDao;
}

@Required
public void setStatistics(BulkServiceStatistics statistics) {
this.statistics = statistics;
}

@Override
public boolean store(BulkRequestTarget target) throws BulkStorageException {
targetDao.insert(prepareUpdate(target))
Expand All @@ -208,9 +199,8 @@ public void storeOrUpdate(BulkRequestTarget target) throws BulkStorageException

@Override
public void update(Long id, State state, String errorType, String errorMessage) throws BulkStorageException {
int count = targetDao.update(targetDao.where().id(id),
targetDao.update(targetDao.where().id(id),
targetDao.set().state(state).errorType(errorType).errorMessage(errorMessage));
statistics.increment(state.name(), count);
}

private JdbcRequestTargetUpdate prepareUpdate(BulkRequestTarget target) {
Expand Down
Expand Up @@ -85,6 +85,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import org.dcache.services.bulk.util.BulkRequestTarget.PID;
import org.dcache.services.bulk.util.BulkRequestTarget.State;
import org.dcache.services.bulk.util.BulkRequestTargetBuilder;
import org.dcache.services.bulk.util.BulkServiceStatistics;
import org.dcache.vehicles.FileAttributes;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.jdbc.core.ParameterizedPreparedStatementSetter;
Expand Down Expand Up @@ -136,6 +137,7 @@ private static String getSelect(JdbcRequestTargetCriterion criterion) {
return criterion.isJoined() ? JOINED_SELECT : SELECT;
}

private BulkServiceStatistics statistics;
private JdbcBulkDaoUtils utils;

public int count(JdbcRequestTargetCriterion criterion) {
Expand Down Expand Up @@ -193,6 +195,11 @@ public JdbcRequestTargetUpdate set() {
return new JdbcRequestTargetUpdate();
}

@Required
public void setStatistics(BulkServiceStatistics statistics) {
this.statistics = statistics;
}

@Required
public void setUtils(JdbcBulkDaoUtils utils) {
this.utils = utils;
Expand Down Expand Up @@ -230,7 +237,7 @@ public BulkRequestTarget toRequestTarget(ResultSet rs, int row) throws SQLExcept

String errorType = rs.getString("error_type");
String errorMessage = rs.getString("error_message");
return BulkRequestTargetBuilder.builder()
return BulkRequestTargetBuilder.builder(statistics)
.id(rs.getLong("id"))
.pid(PID.values()[rs.getInt("pid")])
.rid(rs.getLong("rid"))
Expand Down
Expand Up @@ -60,11 +60,16 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
package org.dcache.services.bulk.util;

import static java.util.Objects.requireNonNull;
import static org.dcache.services.bulk.util.BulkRequestTarget.State.CANCELLED;
import static org.dcache.services.bulk.util.BulkRequestTarget.State.CREATED;
import static org.dcache.services.bulk.util.BulkRequestTarget.State.READY;
import static org.dcache.services.bulk.util.BulkRequestTarget.State.RUNNING;

import com.google.common.base.Throwables;
import diskCacheV111.util.FsPath;
import diskCacheV111.util.PnfsId;
import java.sql.Timestamp;
import java.util.Optional;
import org.dcache.namespace.FileType;
import org.dcache.services.bulk.BulkServiceException;
import org.dcache.vehicles.FileAttributes;
Expand All @@ -78,8 +83,8 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
*/
public final class BulkRequestTarget {

public static final State[] NON_TERMINAL = new State[]{State.CREATED, State.READY,
State.RUNNING};
public static final State[] NON_TERMINAL = new State[]{CREATED, READY,
RUNNING};

public static FsPath computeFsPath(String prefix, String target) {
if (prefix == null) {
Expand Down Expand Up @@ -115,6 +120,8 @@ public static String[] parse(String key) {
return key.split(KEY_SEPARATOR);
}

private final Optional<BulkServiceStatistics> statistics;

private Long id;
private PID pid;
private Long rid;
Expand All @@ -130,11 +137,15 @@ public static String[] parse(String key) {
private String errorMessage;
private FileAttributes attributes;

BulkRequestTarget() {
BulkRequestTarget(BulkServiceStatistics statistics) {
this.statistics = Optional.ofNullable(statistics);
/**
* Constructed by fluid builder.
*/
state = State.CREATED;
state = CREATED;
if (statistics != null) {
statistics.increment(CREATED.name());
}
createdAt = System.currentTimeMillis();
}

Expand All @@ -143,7 +154,8 @@ public synchronized boolean cancel() {
case CREATED:
case READY:
case RUNNING:
state = State.CANCELLED;
updateCounters(CANCELLED);
state = CANCELLED;
if (errorType == null) {
errorType = BulkServiceException.class.getCanonicalName();
errorMessage = getKey() + ": " + state;
Expand Down Expand Up @@ -190,7 +202,8 @@ public synchronized boolean isTerminated() {

public synchronized void resetToReady() {
++retried;
state = State.READY;
updateCounters(READY);
state = READY;
setErrorObject(null);
lastUpdated = System.currentTimeMillis();
}
Expand Down Expand Up @@ -278,7 +291,6 @@ public void setCreatedAt(long createdAt) {
public void setErrorObject(Object error) {
Throwable errorObject;
errorType = null;
errorObject = null;
if (error != null) {
if (error instanceof Throwable) {
errorObject = Throwables.getRootCause((Throwable) error);
Expand Down Expand Up @@ -352,6 +364,7 @@ public synchronized boolean setState(State state) {
case COMPLETED:
case FAILED:
case SKIPPED:
updateCounters(state);
this.state = state;
lastUpdated = System.currentTimeMillis();
return true;
Expand All @@ -369,6 +382,7 @@ public synchronized boolean setState(State state) {
case FAILED:
case SKIPPED:
case RUNNING:
updateCounters(state);
this.state = state;
lastUpdated = System.currentTimeMillis();
return true;
Expand All @@ -381,6 +395,7 @@ public synchronized boolean setState(State state) {
}
case CREATED:
default:
updateCounters(state);
this.state = state;
lastUpdated = System.currentTimeMillis();
return true;
Expand All @@ -393,4 +408,11 @@ public String toString() {
state, new Timestamp(createdAt), startedAt == null ? null : new Timestamp(startedAt),
new Timestamp(lastUpdated), retried, getType(), getPnfsId(), path, errorType, errorMessage);
}

private void updateCounters(State state) {
statistics.ifPresent(s -> {
s.decrement(this.state.name());
s.increment(state.name());
});
}
}
Expand Up @@ -71,8 +71,8 @@ public final class BulkRequestTargetBuilder {

private final BulkRequestTarget target;

public static BulkRequestTargetBuilder builder() {
return new BulkRequestTargetBuilder();
public static BulkRequestTargetBuilder builder(BulkServiceStatistics statistics) {
return new BulkRequestTargetBuilder(statistics);
}

public BulkRequestTargetBuilder id(Long id) {
Expand Down Expand Up @@ -149,7 +149,7 @@ public BulkRequestTarget build() {
return target;
}

private BulkRequestTargetBuilder() {
target = new BulkRequestTarget();
private BulkRequestTargetBuilder(BulkServiceStatistics statistics) {
target = new BulkRequestTarget(statistics);
}
}

0 comments on commit 4525fc1

Please sign in to comment.