Skip to content

Commit

Permalink
dcache-qos,dcache-bulk: allow qos update to call engine asynchronously
Browse files Browse the repository at this point in the history
Motivation:

During testing of the rule engine extension, it was noticed
that the message queue of the System cell for the QoSEngine
was backing up considerably.  Inspection of the code
revealed a blunder made here:

master@7d898c5601d30c6f8cc027e4080f3b82c2072f4b
`dcache-qos:  propagate subject to QoS Adjuster`

where a get() on the message future was incorrectly added to
the messageArrived method of `QoSRequirementsReceiver`,
nullifying the effect of passing off the work to
a different thread pool.

Modification:

This patch removes the get(), but also adds the ability
to use the remote client asynchronously by making
the receiver return a Reply, and by adding an
async version of the call to the remote client.
The latter is then incorporated into the bulk
service's activity logic.

Result:

The message queue blocking is eliminated and
the bulk service communication with QoS is
more fully asynchronous.

Target: master
Request: 9.1
Request: 9.0
Request: 8.2
Requires-notes: yes (the wrongful message-queue blocking)
Patch: https://rb.dcache.org/r/14067
Acked-by: Lea
  • Loading branch information
alrossi committed Aug 25, 2023
1 parent 4af8e26 commit de47e03
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 20 deletions.
Expand Up @@ -59,10 +59,18 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
*/
package org.dcache.services.bulk.activity.plugin.qos;

import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import diskCacheV111.util.PnfsId;
import diskCacheV111.vehicles.Message;
import dmg.cells.nucleus.CellMessageReceiver;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.dcache.qos.data.QoSAction;
import org.dcache.vehicles.qos.QoSTransitionCompletedMessage;

/**
Expand All @@ -83,6 +91,24 @@ public QoSTransitionFuture register(String pnfsid) {
return future;
}

/**
 * Attaches a listener to the future of an asynchronously sent QoS request so that a failed
 * submission is reported through {@link #messageArrived(QoSTransitionCompletedMessage)}.
 * <p>
 * The listener runs on a direct executor. If the reply carries a non-null error object, or if
 * obtaining the reply fails, a {@link QoSTransitionCompletedMessage} with
 * {@link QoSAction#VOID} and that error is delivered for the given file. Nothing is reported
 * when the reply carries no error.
 *
 * @param pnfsid      identifies the file whose transition is reported as completed on error.
 * @param asyncFuture future of the sent message; any {@link Message} subtype is accepted, so
 *                    callers need not fall back to a raw type.
 */
public void setAsyncListener(PnfsId pnfsid, ListenableFuture<? extends Message> asyncFuture) {
    asyncFuture.addListener(() -> {
        Serializable error;
        try {
            error = asyncFuture.get().getErrorObject();
        } catch (ExecutionException e) {
            error = Throwables.getRootCause(e);
        } catch (InterruptedException e) {
            /*
             * Restore the interrupt status for the thread running this listener
             * instead of silently consuming it.
             */
            Thread.currentThread().interrupt();
            error = Throwables.getRootCause(e);
        }

        /*
         * Only report completion on submission error.
         */
        if (error != null) {
            messageArrived(new QoSTransitionCompletedMessage(pnfsid, QoSAction.VOID, error));
        }
    }, MoreExecutors.directExecutor());
}

public void messageArrived(QoSTransitionCompletedMessage message) {
QoSTransitionFuture future = waiting.remove(message.getPnfsId().toString());
if (future != null) {
Expand Down
Expand Up @@ -151,7 +151,8 @@ public ListenableFuture<QoSTransitionCompletedMessage> perform(String rid, long
client.setRequirementsService(qosEngine);

try {
client.fileQoSRequirementsModified(requirements, subject);
ListenableFuture asyncFuture = client.fileQoSRequirementsModifiedAsync(requirements, subject);
responseReceiver.setAsyncListener(pnfsId, asyncFuture);
} catch (CacheException | InterruptedException | NoRouteToCellException e) {
return Futures.immediateFailedFuture(e);
}
Expand Down
Expand Up @@ -59,13 +59,12 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
*/
package org.dcache.qos.remote.receivers;

import com.google.common.base.Throwables;
import diskCacheV111.util.CacheException;
import diskCacheV111.vehicles.Message;
import diskCacheV111.vehicles.PnfsAddCacheLocationMessage;
import diskCacheV111.vehicles.PnfsClearCacheLocationMessage;
import dmg.cells.nucleus.CellMessageReceiver;
import java.util.concurrent.ExecutionException;
import dmg.cells.nucleus.Reply;
import java.util.function.Consumer;
import org.dcache.cells.MessageReply;
import org.dcache.qos.services.engine.handler.FileQoSStatusHandler;
Expand Down Expand Up @@ -154,23 +153,16 @@ public MessageReply<QoSRequirementsRequestMessage> messageArrived(
return reply;
}

/**
* Made this synchronous to support Frontend request.
*/
public QoSRequirementsModifiedMessage messageArrived(QoSRequirementsModifiedMessage message) {
public MessageReply<QoSRequirementsModifiedMessage> messageArrived(QoSRequirementsModifiedMessage message) {
MessageReply<QoSRequirementsModifiedMessage> reply = new MessageReply<>();
if (messageGuard.getStatus("QoSRequirementsModifiedMessage", message)
== Status.DISABLED) {
message.setFailed(CacheException.SERVICE_UNAVAILABLE, "messages disabled");
return message;
}
try {
fileStatusHandler.handleQoSModification(message).get();
} catch (InterruptedException e) {
message.setFailed(CacheException.TIMEOUT, e);
} catch (ExecutionException e) {
message.setFailed(CacheException.UNEXPECTED_SYSTEM_EXCEPTION, Throwables.getRootCause(e));
reply.reply(message);
return reply;
}
return message;

return fileStatusHandler.handleQoSModification(message);
}

public void messageArrived(QoSCancelRequirementsModifiedMessage message) {
Expand Down
Expand Up @@ -74,7 +74,6 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.io.PrintWriter;
import java.io.Serializable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import javax.security.auth.Subject;
import org.dcache.cells.CellStub;
import org.dcache.cells.MessageReply;
Expand Down Expand Up @@ -161,13 +160,14 @@ public void handleClearCacheLocation(PnfsId pnfsId, String pool) {
});
}

public Future<QoSRequirementsModifiedMessage> handleQoSModification(
public MessageReply<QoSRequirementsModifiedMessage> handleQoSModification(
QoSRequirementsModifiedMessage message) {
counters.increment(QOS_MODIFIED.name());
final FileQoSRequirements requirements = message.getRequirements();
final Subject subject = message.getSubject();
PnfsId pnfsId = requirements.getPnfsId();
return executor.submit(() -> {
MessageReply<QoSRequirementsModifiedMessage> reply = new MessageReply<>();
executor.submit(() -> {
Exception exception = null;
try {
LOGGER.debug("handleQoSModification calling fileQoSRequirementsModified for {}.",
Expand Down Expand Up @@ -198,7 +198,10 @@ public Future<QoSRequirementsModifiedMessage> handleQoSModification(
requirements.getPnfsId(), exception.getMessage());
handleActionCompleted(pnfsId, VOID, exception.toString());
}
}, message);
reply.reply(message);
});

return reply;
}

public void handleQoSModificationCancelled(PnfsId pnfsId, Subject subject) {
Expand Down
Expand Up @@ -80,6 +80,7 @@ public interface QoSRequirementsListener {

/**
* A client sends this when it wishes to change a file's QoS requirements.
* This call is assumed to block.
*
* @param newRequirements describing principally how many persistent disk and tape copies are
* required.
Expand Down
Expand Up @@ -127,6 +127,18 @@ public void fileQoSRequirementsModified(FileQoSRequirements newRequirements, Sub
}
}

/**
 * Non-blocking variant of the requirements-modified call, usable in the context of bulk
 * processing.
 * <p>
 * Wraps the arguments in a {@link QoSRequirementsModifiedMessage}, hands that message to the
 * requirements service and returns the resulting future without waiting on it.
 *
 * @param newRequirements describing principally how many persistent disk and tape copies are
 *                        required.
 * @param subject         carried inside the message together with the requirements.
 * @return the future produced by sending the message to the requirements service.
 */
public ListenableFuture<QoSRequirementsModifiedMessage> fileQoSRequirementsModifiedAsync(
      FileQoSRequirements newRequirements, Subject subject)
      throws CacheException, NoRouteToCellException, InterruptedException {
    QoSRequirementsModifiedMessage request
          = new QoSRequirementsModifiedMessage(newRequirements, subject);
    return requirementsService.send(request);
}

@Override
public void fileQoSRequirementsModifiedCancelled(PnfsId pnfsid, Subject subject) throws QoSException {
/*
Expand Down

0 comments on commit de47e03

Please sign in to comment.