diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/qos/QoSResponseReceiver.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/qos/QoSResponseReceiver.java index 4c491977147..ff62f2ec83e 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/qos/QoSResponseReceiver.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/qos/QoSResponseReceiver.java @@ -59,10 +59,18 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING */ package org.dcache.services.bulk.activity.plugin.qos; +import com.google.common.base.Throwables; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import diskCacheV111.util.PnfsId; +import diskCacheV111.vehicles.Message; import dmg.cells.nucleus.CellMessageReceiver; +import java.io.Serializable; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ExecutionException; +import org.dcache.qos.data.QoSAction; import org.dcache.vehicles.qos.QoSTransitionCompletedMessage; /** @@ -83,6 +91,24 @@ public QoSTransitionFuture register(String pnfsid) { return future; } + public void setAsyncListener(PnfsId pnfsid, ListenableFuture asyncFuture) { + asyncFuture.addListener(() -> { + Serializable error; + try { + error = asyncFuture.get().getErrorObject(); + } catch (ExecutionException | InterruptedException e) { + error = Throwables.getRootCause(e); + } + + /* + * Only report completion on submission error. + */ + if (error != null) { + messageArrived(new QoSTransitionCompletedMessage(pnfsid, QoSAction.VOID, error)); + } + }, MoreExecutors.directExecutor()); + } + public void messageArrived(QoSTransitionCompletedMessage message) { QoSTransitionFuture future = waiting.remove(message.getPnfsId().toString()); if (future != null) { diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/qos/UpdateQoSActivity.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/qos/UpdateQoSActivity.java index 8ade2470bce..88434e694c8 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/qos/UpdateQoSActivity.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/qos/UpdateQoSActivity.java @@ -151,7 +151,8 @@ public ListenableFuture perform(String rid, long client.setRequirementsService(qosEngine); try { - client.fileQoSRequirementsModified(requirements, subject); + ListenableFuture asyncFuture = client.fileQoSRequirementsModifiedAsync(requirements, subject); + responseReceiver.setAsyncListener(pnfsId, asyncFuture); } catch (CacheException | InterruptedException | NoRouteToCellException e) { return Futures.immediateFailedFuture(e); } diff --git a/modules/dcache-qos/src/main/java/org/dcache/qos/remote/receivers/QoSRequirementsReceiver.java b/modules/dcache-qos/src/main/java/org/dcache/qos/remote/receivers/QoSRequirementsReceiver.java index a1f2ae335c1..f32b446b7da 100644 --- a/modules/dcache-qos/src/main/java/org/dcache/qos/remote/receivers/QoSRequirementsReceiver.java +++ b/modules/dcache-qos/src/main/java/org/dcache/qos/remote/receivers/QoSRequirementsReceiver.java @@ -59,13 +59,12 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING */ package org.dcache.qos.remote.receivers; -import com.google.common.base.Throwables; import diskCacheV111.util.CacheException; import diskCacheV111.vehicles.Message; import diskCacheV111.vehicles.PnfsAddCacheLocationMessage; import diskCacheV111.vehicles.PnfsClearCacheLocationMessage; import dmg.cells.nucleus.CellMessageReceiver; -import java.util.concurrent.ExecutionException; +import dmg.cells.nucleus.Reply; import java.util.function.Consumer; import org.dcache.cells.MessageReply; import org.dcache.qos.services.engine.handler.FileQoSStatusHandler; @@ -154,23 +153,16 @@ public MessageReply messageArrived( return reply; } - /** - * Made this synchronous to support Frontend request. - */ - public QoSRequirementsModifiedMessage messageArrived(QoSRequirementsModifiedMessage message) { + public MessageReply messageArrived(QoSRequirementsModifiedMessage message) { + MessageReply reply = new MessageReply<>(); if (messageGuard.getStatus("QoSRequirementsModifiedMessage", message) == Status.DISABLED) { message.setFailed(CacheException.SERVICE_UNAVAILABLE, "messages disabled"); - return message; - } - try { - fileStatusHandler.handleQoSModification(message).get(); - } catch (InterruptedException e) { - message.setFailed(CacheException.TIMEOUT, e); - } catch (ExecutionException e) { - message.setFailed(CacheException.UNEXPECTED_SYSTEM_EXCEPTION, Throwables.getRootCause(e)); + reply.reply(message); + return reply; } - return message; + + return fileStatusHandler.handleQoSModification(message); } public void messageArrived(QoSCancelRequirementsModifiedMessage message) { diff --git a/modules/dcache-qos/src/main/java/org/dcache/qos/services/engine/handler/FileQoSStatusHandler.java b/modules/dcache-qos/src/main/java/org/dcache/qos/services/engine/handler/FileQoSStatusHandler.java index 0ccb7ae77a4..49cb6e8620b 100644 --- a/modules/dcache-qos/src/main/java/org/dcache/qos/services/engine/handler/FileQoSStatusHandler.java +++ b/modules/dcache-qos/src/main/java/org/dcache/qos/services/engine/handler/FileQoSStatusHandler.java @@ -74,7 +74,6 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.io.PrintWriter; import java.io.Serializable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import javax.security.auth.Subject; import org.dcache.cells.CellStub; import org.dcache.cells.MessageReply; @@ -161,13 +160,14 @@ public void handleClearCacheLocation(PnfsId pnfsId, String pool) { }); } - public Future handleQoSModification( + public MessageReply handleQoSModification( QoSRequirementsModifiedMessage message) { counters.increment(QOS_MODIFIED.name()); final FileQoSRequirements requirements = message.getRequirements(); final Subject subject = message.getSubject(); PnfsId pnfsId = requirements.getPnfsId(); - return executor.submit(() -> { + MessageReply reply = new MessageReply<>(); + executor.submit(() -> { Exception exception = null; try { LOGGER.debug("handleQoSModification calling fileQoSRequirementsModified for {}.", @@ -198,7 +198,10 @@ public Future handleQoSModification( requirements.getPnfsId(), exception.getMessage()); handleActionCompleted(pnfsId, VOID, exception.toString()); } - }, message); + reply.reply(message); + }); + + return reply; } public void handleQoSModificationCancelled(PnfsId pnfsId, Subject subject) { diff --git a/modules/dcache/src/main/java/org/dcache/qos/listeners/QoSRequirementsListener.java b/modules/dcache/src/main/java/org/dcache/qos/listeners/QoSRequirementsListener.java index c6dfcc5c7e2..3ae90bc9b49 100644 --- a/modules/dcache/src/main/java/org/dcache/qos/listeners/QoSRequirementsListener.java +++ b/modules/dcache/src/main/java/org/dcache/qos/listeners/QoSRequirementsListener.java @@ -80,6 +80,7 @@ public interface QoSRequirementsListener { /** * A client sends this when it wishes to change a file's QoS requirements. + * This call is assumed to block. * * @param newRequirements describing principally how many peristent disk and tape copies are * required. diff --git a/modules/dcache/src/main/java/org/dcache/qos/remote/clients/RemoteQoSRequirementsClient.java b/modules/dcache/src/main/java/org/dcache/qos/remote/clients/RemoteQoSRequirementsClient.java index 00e8e5e149d..c0eb06c060e 100644 --- a/modules/dcache/src/main/java/org/dcache/qos/remote/clients/RemoteQoSRequirementsClient.java +++ b/modules/dcache/src/main/java/org/dcache/qos/remote/clients/RemoteQoSRequirementsClient.java @@ -127,6 +127,18 @@ public void fileQoSRequirementsModified(FileQoSRequirements newRequirements, Sub } } + /** + * An async version which can be used in the context of bulk processing. + * + * @param newRequirements describing principally how many persistent disk and tape copies are + * required. + */ + public ListenableFuture + fileQoSRequirementsModifiedAsync(FileQoSRequirements newRequirements, Subject subject) + throws CacheException, NoRouteToCellException, InterruptedException { + return requirementsService.send(new QoSRequirementsModifiedMessage(newRequirements, subject)); + } + @Override public void fileQoSRequirementsModifiedCancelled(PnfsId pnfsid, Subject subject) throws QoSException { /*