Skip to content

Commit

Permalink
dcache-qos: cache modify requests until processed by executor
Browse files Browse the repository at this point in the history
Motivation:

With ingest queues servicing large
numbers of requests, as we have
for performance reasons configured as
defaults on QoS and Bulk, there is a
potential for a race between the
processing of the original modify
request and a subsequent cancellation
request, such that cancellation
may not find the request as it
is still in the executor queue.

Modification:

On both the engine and the verifier,
we cache the request pnfsid upon
arrival and remove it when processed
on a worker thread.

Should it be cancelled and still
be awaiting processing, it will
simply be eliminated from the
cache; otherwise, the full
cancellation proceeds as usual.

Result:

We do not have a trailing stream
of requests still being processed
after cancellation (from Bulk)
completes.

Target: master
Request: 9.2
Patch: https://rb.dcache.org/r/14166/
Requires-notes: yes
Acked-by: Tigran
  • Loading branch information
alrossi authored and mksahakyan committed Nov 20, 2023
1 parent 3a255bb commit dee8cbc
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 49 deletions.
Expand Up @@ -85,6 +85,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -141,6 +142,11 @@ public final class FileQoSStatusHandler implements CellInfoProvider,

private final AtomicLong handledExpired = new AtomicLong(0L);

/**
* This is just a concurrent implementation of a set, which is how it is used here.
*/
private final Set<PnfsId> modifyRequests = new ConcurrentSkipListSet<>();

private QoSRequirementsListener requirementsListener;
private QoSVerificationListener verificationListener;
private CellStub qosTransitionTopic;
Expand Down Expand Up @@ -228,67 +234,87 @@ public void handleClearCacheLocation(PnfsId pnfsId, String pool) {
public MessageReply<QoSRequirementsModifiedMessage> handleQoSModification(
QoSRequirementsModifiedMessage message) {
counters.increment(QOS_MODIFIED.name());
modifyRequests.add(message.getRequirements().getPnfsId());
MessageReply<QoSRequirementsModifiedMessage> reply = new MessageReply<>();
qosModifyExecutor.submit(() -> {
final FileQoSRequirements requirements = message.getRequirements();
final Subject subject = message.getSubject();
PnfsId pnfsId = requirements.getPnfsId();
Exception exception = null;
try {
LOGGER.debug("handleQoSModification calling fileQoSRequirementsModified for {}.",
pnfsId);
requirementsListener.fileQoSRequirementsModified(requirements, subject);
LOGGER.debug("handleQoSModification calling fileQoSStatusChanged for {}, {}.",
pnfsId, QOS_MODIFIED);
policyStateExecutor.submit(() -> {
FileQoSUpdate update = new FileQoSUpdate(pnfsId, null, QOS_MODIFIED);
update.setSubject(subject);
try {
fileQoSStatusChanged(update);
} catch (QoSException e) {
String error = String.format("could not complete fileQoSStatusChanged "
+ "for %s: %s, cause %s.", update, e.getMessage(),
Throwables.getRootCause(e));
handleActionCompleted(pnfsId, VOID, error);
}
});
if (!modifyRequests.contains(pnfsId)) {
LOGGER.debug("handleQoSModification, for {} was cancelled, "
+ "skipping.", pnfsId);
message.setSucceeded();
} catch (CacheException e) {
exception = e;
message.setFailed(e.getRc(), e);
} catch (QoSException e) {
exception = e;
message.setFailed(CacheException.UNEXPECTED_SYSTEM_EXCEPTION, e);
} catch (NoRouteToCellException e) {
exception = e;
message.setFailed(CacheException.SERVICE_UNAVAILABLE, e);
} catch (InterruptedException e) {
message.setFailed(CacheException.TIMEOUT, e);
exception = e;
} else {
try {
LOGGER.debug(
"handleQoSModification calling fileQoSRequirementsModified for {}.",
pnfsId);
requirementsListener.fileQoSRequirementsModified(requirements, subject);
LOGGER.debug("handleQoSModification calling fileQoSStatusChanged for {}, {}.",
pnfsId, QOS_MODIFIED);
policyStateExecutor.submit(() -> {
if (!modifyRequests.remove(pnfsId)) {
LOGGER.debug("handleQoSModification, for {} was cancelled, "
+ "skipping.", pnfsId);
message.setSucceeded();
} else {
FileQoSUpdate update = new FileQoSUpdate(pnfsId, null, QOS_MODIFIED);
update.setSubject(subject);
try {
fileQoSStatusChanged(update);
message.setSucceeded();
} catch (QoSException e) {
String error = String.format(
"could not complete fileQoSStatusChanged "
+ "for %s: %s, cause %s.", update, e.getMessage(),
Throwables.getRootCause(e));
message.setFailed(CacheException.UNEXPECTED_SYSTEM_EXCEPTION, e);
handleActionCompleted(pnfsId, VOID, error);
}
}
});
} catch (CacheException e) {
exception = e;
message.setFailed(e.getRc(), e);
} catch (QoSException e) {
exception = e;
message.setFailed(CacheException.UNEXPECTED_SYSTEM_EXCEPTION, e);
} catch (NoRouteToCellException e) {
exception = e;
message.setFailed(CacheException.SERVICE_UNAVAILABLE, e);
} catch (InterruptedException e) {
message.setFailed(CacheException.TIMEOUT, e);
exception = e;
}
}

if (exception != null) {
LOGGER.error("Failed to handle QoS requirements for {}: {}.",
requirements.getPnfsId(), exception.getMessage());
handleActionCompleted(pnfsId, VOID, exception.toString());
}

reply.reply(message);
});
return reply;
}

public void handleQoSModificationCancelled(PnfsId pnfsId, Subject subject) {
counters.increment(QOS_MODIFIED_CANCELED.name());
qosModifyExecutor.execute(() -> {
try {
LOGGER.debug(
"handleQoSModificationCancelled notifying verification listener to cancel {}.",
pnfsId);
verificationListener.fileQoSVerificationCancelled(pnfsId, subject);
} catch (QoSException e) {
LOGGER.error("Failed to handle QoS requirements for {}: {}.", pnfsId, e.toString());
}
});
if (!modifyRequests.remove(pnfsId)) {
qosModifyExecutor.execute(() -> {
try {
LOGGER.debug(
"handleQoSModificationCancelled notifying verification listener to cancel {}.",
pnfsId);
verificationListener.fileQoSVerificationCancelled(pnfsId, subject);
} catch (QoSException e) {
LOGGER.error("Failed to handle QoS requirements for {}: {}.", pnfsId,
e.toString());
}
});
}
}

public void handleQoSPolicyInfoRequest(FileQoSPolicyInfoMessage message, MessageReply<Message> reply) {
Expand Down Expand Up @@ -458,17 +484,15 @@ private void fileQoSStatusChanged(FileQoSUpdate update) throws QoSException {
return;
}

FileAttributes attributes = requirements.getAttributes();
switch (messageType) {
case ADD_CACHE_LOCATION:
/*
* provide for lazy update inside namespace
*/
updateQosOnNamespace(pnfsId, attributes);
case QOS_MODIFIED:
FileAttributes attributes = requirements.getAttributes();
try {
if (messageType == ADD_CACHE_LOCATION) {
/*
* provide for lazy update inside namespace
*/
updateQosOnNamespace(pnfsId, attributes);
}
LOGGER.debug(
"fileQoSStatusChanged calling updateQoSTransition for {}, {}.",
pnfsId, messageType);
Expand All @@ -481,7 +505,6 @@ private void fileQoSStatusChanged(FileQoSUpdate update) throws QoSException {
}
break;
case CLEAR_CACHE_LOCATION:
attributes = requirements.getAttributes();
if (attributes.isUndefined(FileAttribute.LOCATIONS) || attributes.getLocations().isEmpty()) {
// empty location here could mean file deletion
engineDao.delete(pnfsId);
Expand Down
Expand Up @@ -477,6 +477,7 @@ public void cancel(PnfsId pnfsId, boolean remove) {
VerifyOperationFilter filter = new VerifyOperationFilter();
filter.setPnfsIds(pnfsId);
cancel(new VerifyOperationCancelFilter(filter, remove));

}

public void cancelFileOpForPool(String pool, boolean onlyParent) {
Expand Down
Expand Up @@ -73,6 +73,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import org.dcache.alarms.AlarmMarkerFactory;
Expand Down Expand Up @@ -168,6 +169,12 @@ private static void sendOutOfSyncAlarm() {
}
}

/**
* Tracks individual modify requests.
* This is just a concurrent implementation of a set, which is how it is used here.
*/
private final Set<PnfsId> modifyRequests = new ConcurrentSkipListSet<>();

/**
* Tracks scan requests and cancellations.
*/
Expand Down Expand Up @@ -268,7 +275,9 @@ public void handleFileOperationsCancelledForPool(String pool) {
*/
public void handleFileOperationCancelled(PnfsId pnfsId) {
counters.incrementReceived(QoSVerifierCounters.VRF_CNCL_MESSAGE);
updateExecutor.submit(() -> manager.cancel(pnfsId, true));
if (!modifyRequests.remove(pnfsId)) {
updateExecutor.submit(() -> manager.cancel(pnfsId, true));
}
}

/**
Expand Down Expand Up @@ -299,6 +308,9 @@ public void handleQoSActionCompleted(PnfsId pnfsId, VerifyOperationState opState
*/
public void handleUpdate(FileQoSUpdate data) {
LOGGER.debug("handleUpdate, update to be registered: {}", data);
if (!modifyRequests.remove(data.getPnfsId())) {
LOGGER.debug("handleUpdate, update has been cancelled: {}", data);
}
if (!manager.createOrUpdateOperation(data)) {
LOGGER.debug("handleUpdate, operation already registered for: {}", data.getPnfsId());
handleVerificationNop(data, false);
Expand Down Expand Up @@ -441,6 +453,8 @@ public void handleVerification(PnfsId pnfsId) {
public void handleVerificationRequest(QoSVerificationRequest request) {
counters.incrementReceived(QoSVerifierCounters.VRF_REQ_MESSAGE);
LOGGER.debug("handleVerificationRequest for {}.", request.getUpdate());
PnfsId pnfsId = request.getUpdate().getPnfsId();
modifyRequests.add(pnfsId);
updateExecutor.submit(() -> handleUpdate(request.getUpdate()));
}

Expand Down

0 comments on commit dee8cbc

Please sign in to comment.