From 7d898c5601d30c6f8cc027e4080f3b82c2072f4b Mon Sep 17 00:00:00 2001 From: Albert Rossi Date: Wed, 12 Apr 2023 03:31:42 -0500 Subject: [PATCH] dcache-qos: propagate subject to QoS Adjuster Motivation: Most of the QoS engine was brought over/adpated from Resilience; in the context of the latter, it was not necessary to propagate the Subject of the request in the various internal messages. With QoS, there is an added function allowing for QoS modification; the subject is needed in this context. Modification: Add all the hooks to propagate the subject downstream to the actual adjustement tasks. Also, allow the Engine to determine what to do with a QoSModify request on the basis of subject and file attributes. Result: Proper handling of the QoSModify requests. Target: master Request: 9.0 Request: 8.2 Requires-notes: yes --- .../services/bulk/activity/BulkActivity.java | 2 +- .../plugin/qos/UpdateQoSActivity.java | 4 +- .../resources/namespace/FileResources.java | 7 +- .../listeners/QoSVerificationListener.java | 4 +- .../clients/LocalQoSRequirementsClient.java | 10 ++- .../clients/LocalQoSVerificationClient.java | 3 +- .../clients/RemoteQoSVerificationClient.java | 7 +- .../receivers/QoSRequirementsReceiver.java | 17 +++- .../adjuster/adjusters/QoSAdjuster.java | 24 ++++++ .../adjusters/ReplicaStateAdjuster.java | 1 + .../adjuster/adjusters/StagingAdjuster.java | 2 + .../adjuster/util/QoSAdjusterTask.java | 11 ++- .../engine/handler/FileQoSStatusHandler.java | 44 +++++++--- .../provider/ALRPStorageUnitQoSProvider.java | 17 ++-- .../provider/QoSRequirementsProvider.java | 5 +- .../verifier/data/VerifyOperation.java | 11 +++ .../data/VerifyOperationDaoDelegate.java | 14 ++++ .../verifier/data/db/JdbcOperationUpdate.java | 8 ++ .../data/db/JdbcVerifyOperationDao.java | 40 ++++++++- .../verifier/data/db/VerifyOperationDao.java | 7 +- .../handlers/VerifyOperationHandler.java | 1 + .../dcache/qos/util/QoSPermissionUtils.java | 83 +++++++++++++++++++ .../qos/vehicles/QoSAdjustmentRequest.java | 12 ++- .../org/dcache/qos/model/db.changelog-8.0.xml | 11 +++ .../org/dcache/qos/data/FileQoSUpdate.java | 11 +++ .../QoSCancelRequirementsModifiedMessage.java | 4 +- .../qos/QoSRequirementsModifiedMessage.java | 4 +- .../listeners/QoSRequirementsListener.java | 5 +- .../clients/RemoteQoSRequirementsClient.java | 20 ++++- 29 files changed, 340 insertions(+), 49 deletions(-) create mode 100644 modules/dcache-qos/src/main/java/org/dcache/qos/util/QoSPermissionUtils.java diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivity.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivity.java index e2cd997a704..7b99a07bafb 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivity.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivity.java @@ -93,7 +93,7 @@ public enum TargetType { public static final Set MINIMALLY_REQUIRED_ATTRIBUTES = Collections.unmodifiableSet(EnumSet.of(FileAttribute.PNFSID, FileAttribute.TYPE, - FileAttribute.RETENTION_POLICY)); + FileAttribute.OWNER_GROUP, FileAttribute.OWNER, FileAttribute.RETENTION_POLICY)); private static final BulkTargetRetryPolicy DEFAULT_RETRY_POLICY = new NoRetryPolicy(); diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/qos/UpdateQoSActivity.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/qos/UpdateQoSActivity.java index 364d6dc00d7..8ade2470bce 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/qos/UpdateQoSActivity.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/qos/UpdateQoSActivity.java @@ -111,7 +111,7 @@ public synchronized void cancel(BulkRequestTarget target) { client.setRequirementsService(qosEngine); PnfsId pnfsId = target.getAttributes().getPnfsId(); try { - client.fileQoSRequirementsModifiedCancelled(pnfsId); + client.fileQoSRequirementsModifiedCancelled(pnfsId, subject); } catch (QoSException e) { LOGGER.error("fileQoSRequirementsModifiedCancelled failed: {}, {}.", pnfsId, e.getMessage()); @@ -151,7 +151,7 @@ public ListenableFuture perform(String rid, long client.setRequirementsService(qosEngine); try { - client.fileQoSRequirementsModified(requirements); + client.fileQoSRequirementsModified(requirements, subject); } catch (CacheException | InterruptedException | NoRouteToCellException e) { return Futures.immediateFailedFuture(e); } diff --git a/modules/dcache-frontend/src/main/java/org/dcache/restful/resources/namespace/FileResources.java b/modules/dcache-frontend/src/main/java/org/dcache/restful/resources/namespace/FileResources.java index 13e202261f5..f7aaed69a17 100644 --- a/modules/dcache-frontend/src/main/java/org/dcache/restful/resources/namespace/FileResources.java +++ b/modules/dcache-frontend/src/main/java/org/dcache/restful/resources/namespace/FileResources.java @@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit; import javax.inject.Inject; import javax.inject.Named; +import javax.security.auth.Subject; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.BadRequestException; import javax.ws.rs.Consumes; @@ -66,6 +67,7 @@ import org.dcache.pinmanager.PinManagerPinMessage; import org.dcache.pinmanager.PinManagerUnpinMessage; import org.dcache.poolmanager.PoolMonitor; +import org.dcache.qos.QoSException; import org.dcache.qos.QoSTransitionEngine; import org.dcache.qos.data.FileQoSRequirements; import org.dcache.qos.remote.clients.RemoteQoSRequirementsClient; @@ -444,6 +446,7 @@ public Response cmrResources( break; case "qos": String targetQos = reqPayload.getString("target"); + Subject subject = RequestUser.isAdmin() ? Subjects.ROOT : RequestUser.getSubject(); if (!useQosService) { new QoSTransitionEngine(poolmanager, poolMonitor, @@ -458,11 +461,11 @@ public Response cmrResources( FileAttributes attr = pnfsHandler.getFileAttributes(path.toString(), NamespaceUtils.getRequestedAttributes(false, false, - true, false, false)); + true, false, true)); FileQoSRequirements requirements = getBasicRequirements(targetQos, attr); RemoteQoSRequirementsClient client = new RemoteQoSRequirementsClient(); client.setRequirementsService(qosEngine); - client.fileQoSRequirementsModified(requirements); + client.fileQoSRequirementsModified(requirements, subject); } break; case "pin": diff --git a/modules/dcache-qos/src/main/java/org/dcache/qos/listeners/QoSVerificationListener.java b/modules/dcache-qos/src/main/java/org/dcache/qos/listeners/QoSVerificationListener.java index df51996579f..2593faad61b 100644 --- a/modules/dcache-qos/src/main/java/org/dcache/qos/listeners/QoSVerificationListener.java +++ b/modules/dcache-qos/src/main/java/org/dcache/qos/listeners/QoSVerificationListener.java @@ -60,6 +60,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING package org.dcache.qos.listeners; import diskCacheV111.util.PnfsId; +import javax.security.auth.Subject; import org.dcache.qos.QoSException; import org.dcache.qos.vehicles.QoSAdjustmentResponse; import org.dcache.qos.vehicles.QoSScannerVerificationRequest; @@ -95,8 +96,9 @@ void fileQoSVerificationRequested(QoSScannerVerificationRequest verificationRequ * This is a notification to cancel and remove the verification operation for a file. * * @param pnfsId the file for which to cancel verification operation. + * @param subject of the request */ - void fileQoSVerificationCancelled(PnfsId pnfsId) throws QoSException; + void fileQoSVerificationCancelled(PnfsId pnfsId, Subject subject) throws QoSException; /** * Scanning activity has been cancelled; this is a notification to cancel all outstanding diff --git a/modules/dcache-qos/src/main/java/org/dcache/qos/local/clients/LocalQoSRequirementsClient.java b/modules/dcache-qos/src/main/java/org/dcache/qos/local/clients/LocalQoSRequirementsClient.java index 8960601a41a..02ac1ee87ea 100644 --- a/modules/dcache-qos/src/main/java/org/dcache/qos/local/clients/LocalQoSRequirementsClient.java +++ b/modules/dcache-qos/src/main/java/org/dcache/qos/local/clients/LocalQoSRequirementsClient.java @@ -59,7 +59,9 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING */ package org.dcache.qos.local.clients; +import diskCacheV111.util.CacheException; import diskCacheV111.util.PnfsId; +import javax.security.auth.Subject; import org.dcache.qos.QoSException; import org.dcache.qos.data.FileQoSRequirements; import org.dcache.qos.data.FileQoSUpdate; @@ -80,13 +82,13 @@ public FileQoSRequirements fileQoSRequirementsRequested(FileQoSUpdate update) } @Override - public void fileQoSRequirementsModified(FileQoSRequirements newRequirements) - throws QoSException { - provider.handleModifiedRequirements(newRequirements); + public void fileQoSRequirementsModified(FileQoSRequirements newRequirements, Subject subject) + throws QoSException, CacheException { + provider.handleModifiedRequirements(newRequirements, subject); } @Override - public void fileQoSRequirementsModifiedCancelled(PnfsId pnfsid) { + public void fileQoSRequirementsModifiedCancelled(PnfsId pnfsid, Subject subject) { /* * The local client is an embedded one and thus would not be used to * dispatch a cancellation request. This is a NOP. diff --git a/modules/dcache-qos/src/main/java/org/dcache/qos/local/clients/LocalQoSVerificationClient.java b/modules/dcache-qos/src/main/java/org/dcache/qos/local/clients/LocalQoSVerificationClient.java index c5fb4871b73..a082d006a2a 100644 --- a/modules/dcache-qos/src/main/java/org/dcache/qos/local/clients/LocalQoSVerificationClient.java +++ b/modules/dcache-qos/src/main/java/org/dcache/qos/local/clients/LocalQoSVerificationClient.java @@ -60,6 +60,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING package org.dcache.qos.local.clients; import diskCacheV111.util.PnfsId; +import javax.security.auth.Subject; import org.dcache.qos.QoSException; import org.dcache.qos.listeners.QoSVerificationListener; import org.dcache.qos.services.verifier.handlers.VerifyOperationHandler; @@ -91,7 +92,7 @@ public void fileQoSAdjustmentCompleted(QoSAdjustmentResponse response) { } @Override - public void fileQoSVerificationCancelled(PnfsId pnfsId) throws QoSException { + public void fileQoSVerificationCancelled(PnfsId pnfsId, Subject subject) throws QoSException { fileOpHandler.handleFileOperationCancelled(pnfsId); } diff --git a/modules/dcache-qos/src/main/java/org/dcache/qos/remote/clients/RemoteQoSVerificationClient.java b/modules/dcache-qos/src/main/java/org/dcache/qos/remote/clients/RemoteQoSVerificationClient.java index 1bf12dbd578..a422d67db93 100644 --- a/modules/dcache-qos/src/main/java/org/dcache/qos/remote/clients/RemoteQoSVerificationClient.java +++ b/modules/dcache-qos/src/main/java/org/dcache/qos/remote/clients/RemoteQoSVerificationClient.java @@ -60,6 +60,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING package org.dcache.qos.remote.clients; import diskCacheV111.util.PnfsId; +import javax.security.auth.Subject; import org.dcache.cells.CellStub; import org.dcache.qos.QoSException; import org.dcache.qos.listeners.QoSVerificationListener; @@ -97,8 +98,10 @@ public void fileQoSAdjustmentCompleted(QoSAdjustmentResponse adjustmentResponse) } @Override - public void fileQoSVerificationCancelled(PnfsId pnfsId) throws QoSException { - verificationService.send(new QoSVerificationCancelledMessage(pnfsId)); + public void fileQoSVerificationCancelled(PnfsId pnfsId, Subject subject) throws QoSException { + QoSVerificationCancelledMessage msg = new QoSVerificationCancelledMessage(pnfsId); + msg.setSubject(subject); + verificationService.send(msg); } public void fileQoSBatchedVerificationCancelled(String id) { diff --git a/modules/dcache-qos/src/main/java/org/dcache/qos/remote/receivers/QoSRequirementsReceiver.java b/modules/dcache-qos/src/main/java/org/dcache/qos/remote/receivers/QoSRequirementsReceiver.java index 76d3eaca3f6..a1f2ae335c1 100644 --- a/modules/dcache-qos/src/main/java/org/dcache/qos/remote/receivers/QoSRequirementsReceiver.java +++ b/modules/dcache-qos/src/main/java/org/dcache/qos/remote/receivers/QoSRequirementsReceiver.java @@ -59,10 +59,13 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING */ package org.dcache.qos.remote.receivers; +import com.google.common.base.Throwables; +import diskCacheV111.util.CacheException; import diskCacheV111.vehicles.Message; import diskCacheV111.vehicles.PnfsAddCacheLocationMessage; import diskCacheV111.vehicles.PnfsClearCacheLocationMessage; import dmg.cells.nucleus.CellMessageReceiver; +import java.util.concurrent.ExecutionException; import java.util.function.Consumer; import org.dcache.cells.MessageReply; import org.dcache.qos.services.engine.handler.FileQoSStatusHandler; @@ -151,12 +154,22 @@ public MessageReply messageArrived( return reply; } + /** + * Made this synchronous to support Frontend request. + */ public QoSRequirementsModifiedMessage messageArrived(QoSRequirementsModifiedMessage message) { if (messageGuard.getStatus("QoSRequirementsModifiedMessage", message) == Status.DISABLED) { + message.setFailed(CacheException.SERVICE_UNAVAILABLE, "messages disabled"); return message; } - fileStatusHandler.handleQoSModification(message.getRequirements()); + try { + fileStatusHandler.handleQoSModification(message).get(); + } catch (InterruptedException e) { + message.setFailed(CacheException.TIMEOUT, e); + } catch (ExecutionException e) { + message.setFailed(CacheException.UNEXPECTED_SYSTEM_EXCEPTION, Throwables.getRootCause(e)); + } return message; } @@ -165,7 +178,7 @@ public void messageArrived(QoSCancelRequirementsModifiedMessage message) { == Status.DISABLED) { return; } - fileStatusHandler.handleQoSModificationCancelled(message.getPnfsId()); + fileStatusHandler.handleQoSModificationCancelled(message.getPnfsId(), message.getSubject()); } public void messageArrived(QoSActionCompleteMessage message) { diff --git a/modules/dcache-qos/src/main/java/org/dcache/qos/services/adjuster/adjusters/QoSAdjuster.java b/modules/dcache-qos/src/main/java/org/dcache/qos/services/adjuster/adjusters/QoSAdjuster.java index 1b7e1a5d85c..42d6b3e8164 100644 --- a/modules/dcache-qos/src/main/java/org/dcache/qos/services/adjuster/adjusters/QoSAdjuster.java +++ b/modules/dcache-qos/src/main/java/org/dcache/qos/services/adjuster/adjusters/QoSAdjuster.java @@ -59,8 +59,13 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING */ package org.dcache.qos.services.adjuster.adjusters; +import static org.dcache.qos.util.QoSPermissionUtils.canModifyQos; + import com.google.common.collect.ImmutableList; +import diskCacheV111.util.PermissionDeniedCacheException; import diskCacheV111.util.PnfsId; +import java.util.Optional; +import javax.security.auth.Subject; import org.dcache.pool.classic.Cancellable; import org.dcache.pool.repository.StickyRecord; import org.dcache.qos.data.QoSAction; @@ -85,12 +90,24 @@ public abstract class QoSAdjuster implements Cancellable { protected PnfsId pnfsId; protected FileAttributes attributes; protected QoSAction action; + protected Subject subject ; protected QoSAdjustTaskCompletionHandler completionHandler; public void adjustQoS(QoSAdjusterTask task) { pnfsId = task.getPnfsId(); action = task.getAction(); attributes = task.getAttributes(); + subject = task.getSubject(); + + try { + checkPermissions(); + } catch (PermissionDeniedCacheException e) { + /* + * handler is injected by factory build + */ + completionHandler.taskFailed(pnfsId, Optional.empty(), e); + return; + } /* * Generate the SESSION ID. This is used by the QoS status endpoint @@ -104,4 +121,11 @@ public void adjustQoS(QoSAdjusterTask task) { } protected abstract void runAdjuster(QoSAdjusterTask task); + + private void checkPermissions() throws PermissionDeniedCacheException { + if (!canModifyQos(subject, attributes)) { + throw new PermissionDeniedCacheException(String.format("subject does not have " + + "permission for %s on %s.", action, pnfsId)); + } + } } diff --git a/modules/dcache-qos/src/main/java/org/dcache/qos/services/adjuster/adjusters/ReplicaStateAdjuster.java b/modules/dcache-qos/src/main/java/org/dcache/qos/services/adjuster/adjusters/ReplicaStateAdjuster.java index c76a25d023f..c5b366f4ed0 100644 --- a/modules/dcache-qos/src/main/java/org/dcache/qos/services/adjuster/adjusters/ReplicaStateAdjuster.java +++ b/modules/dcache-qos/src/main/java/org/dcache/qos/services/adjuster/adjusters/ReplicaStateAdjuster.java @@ -132,6 +132,7 @@ private synchronized void sendMessageToRepository() { LOGGER.debug("Sending {} message to {} for {}.", action, target, pnfsId); ACTIVITY_LOGGER.info("Sending {} message to {} for {}.", action, target, pnfsId); + msg.setSubject(subject); future = pools.send(new CellPath(target), msg); } diff --git a/modules/dcache-qos/src/main/java/org/dcache/qos/services/adjuster/adjusters/StagingAdjuster.java b/modules/dcache-qos/src/main/java/org/dcache/qos/services/adjuster/adjusters/StagingAdjuster.java index ec0bf1de5d8..3403b553bdf 100644 --- a/modules/dcache-qos/src/main/java/org/dcache/qos/services/adjuster/adjusters/StagingAdjuster.java +++ b/modules/dcache-qos/src/main/java/org/dcache/qos/services/adjuster/adjusters/StagingAdjuster.java @@ -174,6 +174,7 @@ private synchronized void handleStaging() { getProtocolInfo(), QOS_PIN_REQUEST_ID, QOS_PIN_TEMP_LIFETIME); + message.setSubject(subject); future = pinManager.send(message, Long.MAX_VALUE); future.addListener(this::handleCompletion, executorService); LOGGER.debug("handleStaging, sent pin manager request for {}.", pnfsId); @@ -192,6 +193,7 @@ private void cancelPin() { LOGGER.debug("handleStaging, cancelling pin {}.", pnfsId); ACTIVITY_LOGGER.info("handleStaging, cancelling pin {}", pnfsId); PinManagerUnpinMessage message = new PinManagerUnpinMessage(pnfsId); + message.setSubject(subject); pinManager.send(message, Long.MAX_VALUE); LOGGER.debug("handleStaging, sent pin manager request to unpin {}.", pnfsId); } diff --git a/modules/dcache-qos/src/main/java/org/dcache/qos/services/adjuster/util/QoSAdjusterTask.java b/modules/dcache-qos/src/main/java/org/dcache/qos/services/adjuster/util/QoSAdjusterTask.java index ca9a7996c49..9db5a55977b 100644 --- a/modules/dcache-qos/src/main/java/org/dcache/qos/services/adjuster/util/QoSAdjusterTask.java +++ b/modules/dcache-qos/src/main/java/org/dcache/qos/services/adjuster/util/QoSAdjusterTask.java @@ -64,6 +64,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.util.Optional; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import javax.security.auth.Subject; import org.dcache.pool.classic.Cancellable; import org.dcache.pool.migration.PoolMigrationCopyFinishedMessage; import org.dcache.qos.data.FileQoSUpdate; @@ -86,6 +87,7 @@ public final class QoSAdjusterTask extends ErrorAwareTask implements Cancellable private final PnfsId pnfsId; private final QoSAction type; + private final Subject subject; private final int retry; private final QoSAdjusterFactory factory; private final FileAttributes attributes; @@ -120,13 +122,13 @@ public QoSAdjusterTask(QoSAdjustmentRequest request, QoSAdjusterFactory factory) this.source = request.getSource(); this.target = request.getTarget(); this.poolGroup = request.getPoolGroup(); + this.subject = request.getSubject(); this.status = Status.INITIALIZED; } public QoSAdjusterTask(QoSAdjusterTask task, int retry) { this.pnfsId = task.pnfsId; this.type = task.type; - ; this.retry = retry; this.factory = task.factory; this.attributes = task.attributes; @@ -135,6 +137,7 @@ public QoSAdjusterTask(QoSAdjusterTask task, int retry) { this.target = task.target; this.poolGroup = task.poolGroup; this.status = task.status; + this.subject = task.subject; } @Override @@ -142,7 +145,6 @@ public void run() { synchronized (this) { status = Status.RUNNING; exception = null; - adjuster = factory.newBuilder().of(type).build(); startTime = System.currentTimeMillis(); } @@ -154,6 +156,7 @@ public void run() { if (isCancelled()) { break; } + adjuster = factory.newBuilder().of(type).build(); adjuster.adjustQoS(this); break; } @@ -221,6 +224,10 @@ public String getSource() { return source; } + public Subject getSubject() { + return subject; + } + public String getTarget() { return target; } diff --git a/modules/dcache-qos/src/main/java/org/dcache/qos/services/engine/handler/FileQoSStatusHandler.java b/modules/dcache-qos/src/main/java/org/dcache/qos/services/engine/handler/FileQoSStatusHandler.java index 36d3b4783e6..0ccb7ae77a4 100644 --- a/modules/dcache-qos/src/main/java/org/dcache/qos/services/engine/handler/FileQoSStatusHandler.java +++ b/modules/dcache-qos/src/main/java/org/dcache/qos/services/engine/handler/FileQoSStatusHandler.java @@ -74,6 +74,8 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.io.PrintWriter; import java.io.Serializable; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import javax.security.auth.Subject; import org.dcache.cells.CellStub; import org.dcache.cells.MessageReply; import org.dcache.qos.QoSException; @@ -85,6 +87,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import org.dcache.qos.listeners.QoSVerificationListener; import org.dcache.qos.services.engine.util.QoSEngineCounters; import org.dcache.qos.vehicles.QoSVerificationRequest; +import org.dcache.vehicles.qos.QoSRequirementsModifiedMessage; import org.dcache.vehicles.qos.QoSRequirementsRequestMessage; import org.dcache.vehicles.qos.QoSTransitionCompletedMessage; import org.slf4j.Logger; @@ -158,33 +161,54 @@ public void handleClearCacheLocation(PnfsId pnfsId, String pool) { }); } - public void handleQoSModification(FileQoSRequirements requirements) { + public Future handleQoSModification( + QoSRequirementsModifiedMessage message) { counters.increment(QOS_MODIFIED.name()); + final FileQoSRequirements requirements = message.getRequirements(); + final Subject subject = message.getSubject(); PnfsId pnfsId = requirements.getPnfsId(); - executor.execute(() -> { + return executor.submit(() -> { + Exception exception = null; try { LOGGER.debug("handleQoSModification calling fileQoSRequirementsModified for {}.", pnfsId); - requirementsListener.fileQoSRequirementsModified(requirements); + requirementsListener.fileQoSRequirementsModified(requirements, subject); LOGGER.debug("handleQoSModification calling fileQoSStatusChanged for {}, {}.", pnfsId, QOS_MODIFIED); - fileQoSStatusChanged(new FileQoSUpdate(pnfsId, null, QOS_MODIFIED)); - } catch (QoSException | CacheException | NoRouteToCellException | InterruptedException e) { + FileQoSUpdate update = new FileQoSUpdate(pnfsId, null, QOS_MODIFIED); + update.setSubject(subject); + fileQoSStatusChanged(update); + message.setSucceeded(); + } catch (CacheException e) { + exception = e; + message.setFailed(e.getRc(), e); + } catch (QoSException e) { + exception = e; + message.setFailed(CacheException.UNEXPECTED_SYSTEM_EXCEPTION, e); + } catch (NoRouteToCellException e) { + exception = e; + message.setFailed(CacheException.SERVICE_UNAVAILABLE, e); + } catch (InterruptedException e) { + message.setFailed(CacheException.TIMEOUT, e); + exception = e; + } + + if (exception != null) { LOGGER.error("Failed to handle QoS requirements for {}: {}.", - requirements.getPnfsId(), e.getMessage()); - handleActionCompleted(pnfsId, VOID, e.toString()); + requirements.getPnfsId(), exception.getMessage()); + handleActionCompleted(pnfsId, VOID, exception.toString()); } - }); + }, message); } - public void handleQoSModificationCancelled(PnfsId pnfsId) { + public void handleQoSModificationCancelled(PnfsId pnfsId, Subject subject) { counters.increment(QOS_MODIFIED_CANCELED.name()); executor.execute(() -> { try { LOGGER.debug( "handleQoSModificationCancelled notifying verification listener to cancel {}.", pnfsId); - verificationListener.fileQoSVerificationCancelled(pnfsId); + verificationListener.fileQoSVerificationCancelled(pnfsId, subject); } catch (QoSException e) { LOGGER.error("Failed to handle QoS requirements for {}: {}.", pnfsId, e.toString()); } diff --git a/modules/dcache-qos/src/main/java/org/dcache/qos/services/engine/provider/ALRPStorageUnitQoSProvider.java b/modules/dcache-qos/src/main/java/org/dcache/qos/services/engine/provider/ALRPStorageUnitQoSProvider.java index 59ebfbd9c66..d3c91d20c47 100644 --- a/modules/dcache-qos/src/main/java/org/dcache/qos/services/engine/provider/ALRPStorageUnitQoSProvider.java +++ b/modules/dcache-qos/src/main/java/org/dcache/qos/services/engine/provider/ALRPStorageUnitQoSProvider.java @@ -67,12 +67,14 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import static org.dcache.qos.data.QoSMessageType.QOS_MODIFIED; import static org.dcache.qos.data.QoSMessageType.SYSTEM_SCAN; import static org.dcache.qos.data.QoSMessageType.VALIDATE_ONLY; +import static org.dcache.qos.util.QoSPermissionUtils.canModifyQos; import com.google.common.annotations.VisibleForTesting; import diskCacheV111.poolManager.PoolSelectionUnit; import diskCacheV111.poolManager.StorageUnit; import diskCacheV111.util.AccessLatency; import diskCacheV111.util.CacheException; +import diskCacheV111.util.PermissionDeniedCacheException; import diskCacheV111.util.PnfsHandler; import diskCacheV111.util.PnfsId; import diskCacheV111.util.RetentionPolicy; @@ -84,6 +86,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.util.HashSet; import java.util.List; import java.util.Set; +import javax.security.auth.Subject; import org.dcache.auth.Subjects; import org.dcache.auth.attributes.Restrictions; import org.dcache.cells.CellStub; @@ -114,6 +117,8 @@ public class ALRPStorageUnitQoSProvider implements QoSRequirementsProvider, Cell public static final Set REQUIRED_QOS_ATTRIBUTES = Collections.unmodifiableSet(EnumSet.of(FileAttribute.PNFSID, + FileAttribute.OWNER, + FileAttribute.OWNER_GROUP, FileAttribute.ACCESS_LATENCY, FileAttribute.RETENTION_POLICY, FileAttribute.STORAGEINFO, @@ -198,8 +203,8 @@ public FileQoSRequirements fetchRequirements(FileQoSUpdate update) throws QoSExc * REVISIT For now, we do not handle changes to number or partitioning of copies. */ @Override - public void handleModifiedRequirements(FileQoSRequirements newRequirements) - throws QoSException { + public void handleModifiedRequirements(FileQoSRequirements newRequirements, Subject subject) + throws CacheException, QoSException { PnfsId pnfsId = newRequirements.getPnfsId(); LOGGER.debug("handleModifiedRequirements for {}.", pnfsId); @@ -233,11 +238,11 @@ public void handleModifiedRequirements(FileQoSRequirements newRequirements) modifiedAttributes.setRetentionPolicy(REPLICA); } - try { + if (canModifyQos(subject, currentAttributes)) { pnfsHandler().setFileAttributes(pnfsId, modifiedAttributes); - } catch (CacheException e) { - throw new QoSException("Failed to set attributes for " + newRequirements.getPnfsId(), - e); + } else { + throw new PermissionDeniedCacheException("User does not have permissions to set " + + "attributes for " + newRequirements.getPnfsId()); } } diff --git a/modules/dcache-qos/src/main/java/org/dcache/qos/services/engine/provider/QoSRequirementsProvider.java b/modules/dcache-qos/src/main/java/org/dcache/qos/services/engine/provider/QoSRequirementsProvider.java index 26e2c2adf64..ace1f43d345 100644 --- a/modules/dcache-qos/src/main/java/org/dcache/qos/services/engine/provider/QoSRequirementsProvider.java +++ b/modules/dcache-qos/src/main/java/org/dcache/qos/services/engine/provider/QoSRequirementsProvider.java @@ -59,6 +59,8 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING */ package org.dcache.qos.services.engine.provider; +import diskCacheV111.util.CacheException; +import javax.security.auth.Subject; import org.dcache.qos.QoSException; import org.dcache.qos.data.FileQoSRequirements; import org.dcache.qos.data.FileQoSUpdate; @@ -80,6 +82,7 @@ public interface QoSRequirementsProvider { * * @param newRequirements in particular the number and distribution of persistent disk and tape * replicas. + * @param subject subject of the request. */ - void handleModifiedRequirements(FileQoSRequirements newRequirements) throws QoSException; + void handleModifiedRequirements(FileQoSRequirements newRequirements, Subject subject) throws QoSException, CacheException; } diff --git a/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/data/VerifyOperation.java b/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/data/VerifyOperation.java index 7b3782cc416..5fe81b90bfb 100644 --- a/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/data/VerifyOperation.java +++ b/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/data/VerifyOperation.java @@ -73,6 +73,8 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.util.Collection; import java.util.HashSet; import java.util.Set; +import javax.security.auth.Subject; +import org.dcache.auth.Subjects; import org.dcache.qos.QoSException; import org.dcache.qos.data.FileQoSUpdate; import org.dcache.qos.data.QoSAction; @@ -103,6 +105,7 @@ public final class VerifyOperation implements Comparable { private VerifyOperationState state; private QoSAction previousAction; private QoSAction action; + private Subject subject; private String poolGroup; private String storageUnit; @@ -240,6 +243,10 @@ public String getStorageUnit() { return storageUnit; } + public Subject getSubject() { + return subject == null ? Subjects.ROOT : subject; + } + public String getTarget() { return target; } @@ -374,6 +381,10 @@ public void setStorageUnit(String storageUnit) { this.storageUnit = storageUnit; } + public void setSubject(Subject subject) { + this.subject = subject; + } + public void setTarget(String target) { this.target = target; } diff --git a/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/data/VerifyOperationDaoDelegate.java b/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/data/VerifyOperationDaoDelegate.java index 1942eae43e7..32e8097ac16 100644 --- a/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/data/VerifyOperationDaoDelegate.java +++ b/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/data/VerifyOperationDaoDelegate.java @@ -92,6 +92,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; import javax.annotation.concurrent.GuardedBy; +import org.dcache.qos.QoSException; import org.dcache.qos.data.FileQoSUpdate; import org.dcache.qos.data.QoSMessageType; import org.dcache.qos.services.verifier.data.db.VerifyOperationDao; @@ -325,6 +326,8 @@ public int count(VerifyOperationFilter filter) { * Operation is written to the store but not to the queues, to preserve * the prioritization by arrival time (since gaps may be created when * the queues are close to max capacity). + * If a duplicate request arrives with a second subject, the original subject + * is not changed. REVISIT */ public boolean createOrUpdateOperation(FileQoSUpdate data) { PnfsId pnfsId = data.getPnfsId(); @@ -354,6 +357,7 @@ public boolean createOrUpdateOperation(FileQoSUpdate data) { operation.setRetried(0); operation.setNeeded(0); operation.setState(READY); + operation.setSubject(data.getSubject()); write.lock(); try { @@ -376,6 +380,10 @@ public boolean createOrUpdateOperation(FileQoSUpdate data) { pnfsId, storageUnit); return false; } + } catch (QoSException e) { + LOGGER.error("createOrUpdateOperation, could not store operation for {}: {}.", pnfsId, + e.toString()); + return false; } finally { write.unlock(); } @@ -527,6 +535,8 @@ public void resetOperation(VerifyOperation operation, boolean retry) { if (retry || operation.getNeededAdjustments() < 2) { enqueueFirst(operation); } + } catch (QoSException e) { + LOGGER.error("resetOperation, could not update reset {}: {}", operation, e.toString()); } finally { write.unlock(); } @@ -617,6 +627,8 @@ public void updateOperation(PnfsId pnfsId, CacheException error) { dao.update(dao.whereUnique().pnfsId(pnfsId), dao.fromOperation(operation)); callback.signal(); } + } catch (QoSException e) { + LOGGER.debug("could not update operation for ({}, {}): {}.", pnfsId, error, e.toString()); } finally { write.unlock(); } @@ -680,6 +692,8 @@ public void voidOperation(VerifyOperation operation) { try { dao.update(dao.whereUnique().pnfsId(pnfsId), dao.fromOperation(operation)); callback.signal(); + } catch (QoSException e) { + LOGGER.error("could not void operation {}: {}.", operation, e.toString()); } finally { write.unlock(); } diff --git a/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/data/db/JdbcOperationUpdate.java b/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/data/db/JdbcOperationUpdate.java index 189e178ab3a..07b9d7aceed 100644 --- a/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/data/db/JdbcOperationUpdate.java +++ b/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/data/db/JdbcOperationUpdate.java @@ -41,6 +41,14 @@ public VerifyOperationUpdate pnfsid(PnfsId pnfsId) { return this; } + @Override + public VerifyOperationUpdate subject(String subject) { + if (subject != null) { + set("subject", subject); + } + return this; + } + @Override public VerifyOperationUpdate arrivalTime(long epochMillis) { set("arrived", epochMillis); diff --git a/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/data/db/JdbcVerifyOperationDao.java b/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/data/db/JdbcVerifyOperationDao.java index 3c22b619a12..c39754d3f36 100644 --- a/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/data/db/JdbcVerifyOperationDao.java +++ b/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/data/db/JdbcVerifyOperationDao.java @@ -20,11 +20,17 @@ import diskCacheV111.util.CacheException; import diskCacheV111.util.PnfsId; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.Arrays; +import java.util.Base64; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -33,8 +39,10 @@ import java.util.stream.Stream; import javax.annotation.Nullable; import javax.annotation.ParametersAreNonnullByDefault; +import javax.security.auth.Subject; import org.dcache.db.JdbcCriterion; import org.dcache.db.JdbcUpdate; +import org.dcache.qos.QoSException; import org.dcache.qos.data.QoSAction; import org.dcache.qos.data.QoSMessageType; import org.dcache.qos.services.verifier.data.VerifyOperation; @@ -103,10 +111,32 @@ private static VerifyOperation toOperation(ResultSet rs, int row) throws SQLExce operation.setException(new CacheException(rs.getInt("rc"), error)); } + operation.setSubject(Subject.class.cast(deserialize(rs.getString("subject")))); + LOGGER.debug("toOperation, returning {}.", operation); return operation; } + private static String serialize(Subject subject) throws QoSException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (ObjectOutputStream ostream = new ObjectOutputStream(baos)) { + ostream.writeObject(subject); + } catch (IOException e) { + throw new QoSException("problem serializing subject", e); + } + return Base64.getEncoder().encodeToString(baos.toByteArray()); + } + + private static Object deserialize(String base64) throws SQLException { + byte[] array = Base64.getDecoder().decode(base64); + ByteArrayInputStream bais = new ByteArrayInputStream(array); + try (ObjectInputStream istream = new ObjectInputStream(bais)) { + return istream.readObject(); + } catch (IOException | ClassNotFoundException e) { + throw new SQLException("problem deserializing subject", e); + } + } + private static Object[] concatArguments(Collection first, Collection second) { return Stream.concat(first.stream(), second.stream()).toArray(Object[]::new); } @@ -134,7 +164,7 @@ public VerifyOperationUpdate set() { } @Override - public VerifyOperationUpdate fromOperation(VerifyOperation operation) { + public VerifyOperationUpdate fromOperation(VerifyOperation operation) throws QoSException { return set().exception(operation.getException()) .retried(operation.getRetried()) .tried(operation.getTried()) @@ -144,7 +174,8 @@ public VerifyOperationUpdate fromOperation(VerifyOperation operation) { .previous(operation.getPreviousAction()) .needed(operation.getNeededAdjustments()) .source(operation.getSource()) - .target(operation.getTarget()); + .target(operation.getTarget()) + .subject(serialize(operation.getSubject())); } /** @@ -159,7 +190,7 @@ public VerifyOperationUpdate fromOperation(VerifyOperation operation) { * @return true if stored, false if just updated. */ @Override - public boolean store(VerifyOperation operation) { + public boolean store(VerifyOperation operation) throws QoSException { PnfsId pnfsId = operation.getPnfsId(); String storageUnit = operation.getStorageUnit(); VerifyOperationUpdate insert = set().pnfsid(pnfsId) @@ -172,7 +203,8 @@ public boolean store(VerifyOperation operation) { .needed(operation.getNeededAdjustments()) .parent(operation.getParent()) .source(operation.getSource()) - .state(operation.getState()); + .state(operation.getState()) + .subject(serialize(operation.getSubject())); LOGGER.debug("store operation for {}.", operation.getPnfsId()); diff --git a/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/data/db/VerifyOperationDao.java b/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/data/db/VerifyOperationDao.java index 46014eaa8a1..fab2b847c64 100644 --- a/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/data/db/VerifyOperationDao.java +++ b/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/data/db/VerifyOperationDao.java @@ -25,6 +25,7 @@ import java.util.Map; import javax.annotation.Nullable; import javax.annotation.ParametersAreNonnullByDefault; +import org.dcache.qos.QoSException; import org.dcache.qos.data.QoSAction; import org.dcache.qos.data.QoSMessageType; import org.dcache.qos.services.verifier.data.VerifyOperation; @@ -91,6 +92,8 @@ interface VerifyOperationUpdate { VerifyOperationUpdate pnfsid(PnfsId pnfsId); + VerifyOperationUpdate subject(String subject); + VerifyOperationUpdate arrivalTime(long epochMillis); VerifyOperationUpdate lastUpdate(long epochMillis); @@ -141,12 +144,12 @@ interface VerifyOperationUpdate { * Returns a field value builder based on the fields of the file operation which can change * while being processed. */ - VerifyOperationUpdate fromOperation(VerifyOperation operation); + VerifyOperationUpdate fromOperation(VerifyOperation operation) throws QoSException; /** * Returns true if stored, false if not. */ - boolean store(VerifyOperation operation); + boolean store(VerifyOperation operation) throws QoSException; /** * Returns the VerifyOperations matching a selection criterion with an upper limit on the diff --git a/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/handlers/VerifyOperationHandler.java b/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/handlers/VerifyOperationHandler.java index 71575a59930..a697a880153 100644 --- a/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/handlers/VerifyOperationHandler.java +++ b/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/handlers/VerifyOperationHandler.java @@ -574,6 +574,7 @@ private void handleAdjustment(FileQoSRequirements requirements, request.setPnfsId(requirements.getPnfsId()); request.setAttributes(requirements.getAttributes()); request.setPoolGroup(operation.getPoolGroup()); + request.setSubject(operation.getSubject()); String source = operation.getSource(); diff --git a/modules/dcache-qos/src/main/java/org/dcache/qos/util/QoSPermissionUtils.java b/modules/dcache-qos/src/main/java/org/dcache/qos/util/QoSPermissionUtils.java new file mode 100644 index 00000000000..a941a9e8071 --- /dev/null +++ b/modules/dcache-qos/src/main/java/org/dcache/qos/util/QoSPermissionUtils.java @@ -0,0 +1,83 @@ +/* +COPYRIGHT STATUS: +Dec 1st 2001, Fermi National Accelerator Laboratory (FNAL) documents and +software are sponsored by the U.S. Department of Energy under Contract No. +DE-AC02-76CH03000. Therefore, the U.S. Government retains a world-wide +non-exclusive, royalty-free license to publish or reproduce these documents +and software for U.S. Government purposes. All documents and software +available from this server are protected under the U.S. and Foreign +Copyright Laws, and FNAL reserves all rights. + +Distribution of the software available from this server is free of +charge subject to the user following the terms of the Fermitools +Software Legal Information. + +Redistribution and/or modification of the software shall be accompanied +by the Fermitools Software Legal Information (including the copyright +notice). + +The user is asked to feed back problems, benefits, and/or suggestions +about the software to the Fermilab Software Providers. + +Neither the name of Fermilab, the URA, nor the names of the contributors +may be used to endorse or promote products derived from this software +without specific prior written permission. + +DISCLAIMER OF LIABILITY (BSD): + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL FERMILAB, +OR THE URA, OR THE U.S. DEPARTMENT of ENERGY, OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT +OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR +BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +Liabilities of the Government: + +This software is provided by URA, independent from its Prime Contract +with the U.S. Department of Energy. URA is acting independently from +the Government and in its own private capacity and is not acting on +behalf of the U.S. Government, nor as its contractor nor its agent. +Correspondingly, it is understood and agreed that the U.S. Government +has no connection to this software and in no manner whatsoever shall +be liable for nor assume any responsibility or obligation for any claim, +cost, or damages arising out of or resulting from the use of the software +available from this server. + +Export Control: + +All documents and software available from this server are subject to U.S. +export control laws. Anyone downloading information from this server is +obligated to secure any necessary Government licenses before exporting +documents or software obtained from this server. + */ +package org.dcache.qos.util; + +import javax.security.auth.Subject; + +import org.dcache.auth.Subjects; +import org.dcache.vehicles.FileAttributes; + +public class QoSPermissionUtils { + + /** + * Determines if the user is allowed to modify qos. + * Currently the user must either be the owner of the file or be ROOT. + * + * @param subject + * @param attributes + */ + public static boolean canModifyQos(Subject subject, FileAttributes attributes) { + return Subjects.isRoot(subject) || Subjects.getUid(subject) == attributes.getOwner(); + } + + private QoSPermissionUtils() { + // static class + } +} diff --git a/modules/dcache-qos/src/main/java/org/dcache/qos/vehicles/QoSAdjustmentRequest.java b/modules/dcache-qos/src/main/java/org/dcache/qos/vehicles/QoSAdjustmentRequest.java index 6a25d948806..007d06201c4 100644 --- a/modules/dcache-qos/src/main/java/org/dcache/qos/vehicles/QoSAdjustmentRequest.java +++ b/modules/dcache-qos/src/main/java/org/dcache/qos/vehicles/QoSAdjustmentRequest.java @@ -62,14 +62,16 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import diskCacheV111.util.PnfsId; import diskCacheV111.vehicles.PoolManagerPoolInformation; import java.io.Serializable; +import javax.security.auth.Subject; import org.dcache.qos.data.QoSAction; import org.dcache.vehicles.FileAttributes; public class QoSAdjustmentRequest implements Serializable { - private static final long serialVersionUID = -1201617117743113180L; + private static final long serialVersionUID = -561618660521542972L; private PnfsId pnfsId; + private Subject subject; private QoSAction action; private FileAttributes attributes; private PoolManagerPoolInformation targetInfo; @@ -132,4 +134,12 @@ public String getPoolGroup() { public void setPoolGroup(String poolGroup) { this.poolGroup = poolGroup; } + + public Subject getSubject() { + return subject; + } + + public void setSubject(Subject subject) { + this.subject = subject; + } } diff --git a/modules/dcache-qos/src/main/resources/org/dcache/qos/model/db.changelog-8.0.xml b/modules/dcache-qos/src/main/resources/org/dcache/qos/model/db.changelog-8.0.xml index 8cc6c5ef58a..fbe3bf2b486 100644 --- a/modules/dcache-qos/src/main/resources/org/dcache/qos/model/db.changelog-8.0.xml +++ b/modules/dcache-qos/src/main/resources/org/dcache/qos/model/db.changelog-8.0.xml @@ -153,4 +153,15 @@ + + + + + + + + + + + diff --git a/modules/dcache-vehicles/src/main/java/org/dcache/qos/data/FileQoSUpdate.java b/modules/dcache-vehicles/src/main/java/org/dcache/qos/data/FileQoSUpdate.java index 94efbaed3c7..97357b23796 100644 --- a/modules/dcache-vehicles/src/main/java/org/dcache/qos/data/FileQoSUpdate.java +++ b/modules/dcache-vehicles/src/main/java/org/dcache/qos/data/FileQoSUpdate.java @@ -64,6 +64,8 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.time.Instant; import java.time.ZoneId; import java.time.format.DateTimeFormatter; +import javax.security.auth.Subject; +import org.dcache.auth.Subjects; /** * A transient encapsulation of pertinent configuration data regarding a file, synthesized from a @@ -83,6 +85,7 @@ public static String getFormattedDateFromMillis(long millis) { private final PnfsId pnfsId; private final QoSMessageType type; + private Subject subject; private String pool; private String effectivePoolGroup; private String storageUnit; @@ -120,6 +123,10 @@ public PnfsId getPnfsId() { return pnfsId; } + public Subject getSubject() { + return subject == null ? Subjects.ROOT: subject; + } + public QoSMessageType getMessageType() { return type; } @@ -136,6 +143,10 @@ public void setPool(String pool) { this.pool = pool; } + public void setSubject(Subject subject) { + this.subject = subject; + } + public String toString() { return String.format("(%s)(%s)(%s)(%s)(%s)", pnfsId, pool, type, effectivePoolGroup, storageUnit); diff --git a/modules/dcache-vehicles/src/main/java/org/dcache/vehicles/qos/QoSCancelRequirementsModifiedMessage.java b/modules/dcache-vehicles/src/main/java/org/dcache/vehicles/qos/QoSCancelRequirementsModifiedMessage.java index 061cff5827f..fd6eced60e9 100644 --- a/modules/dcache-vehicles/src/main/java/org/dcache/vehicles/qos/QoSCancelRequirementsModifiedMessage.java +++ b/modules/dcache-vehicles/src/main/java/org/dcache/vehicles/qos/QoSCancelRequirementsModifiedMessage.java @@ -61,13 +61,15 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import diskCacheV111.util.PnfsId; import diskCacheV111.vehicles.Message; +import javax.security.auth.Subject; public class QoSCancelRequirementsModifiedMessage extends Message { private final PnfsId pnfsId; - public QoSCancelRequirementsModifiedMessage(PnfsId pnfsId) { + public QoSCancelRequirementsModifiedMessage(PnfsId pnfsId, Subject subject) { this.pnfsId = pnfsId; + setSubject(subject); } public PnfsId getPnfsId() { diff --git a/modules/dcache-vehicles/src/main/java/org/dcache/vehicles/qos/QoSRequirementsModifiedMessage.java b/modules/dcache-vehicles/src/main/java/org/dcache/vehicles/qos/QoSRequirementsModifiedMessage.java index aa9431926cb..7a6475b666f 100644 --- a/modules/dcache-vehicles/src/main/java/org/dcache/vehicles/qos/QoSRequirementsModifiedMessage.java +++ b/modules/dcache-vehicles/src/main/java/org/dcache/vehicles/qos/QoSRequirementsModifiedMessage.java @@ -60,14 +60,16 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING package org.dcache.vehicles.qos; import diskCacheV111.vehicles.Message; +import javax.security.auth.Subject; import org.dcache.qos.data.FileQoSRequirements; public class QoSRequirementsModifiedMessage extends Message { private final FileQoSRequirements requirements; - public QoSRequirementsModifiedMessage(FileQoSRequirements requirements) { + public QoSRequirementsModifiedMessage(FileQoSRequirements requirements, Subject subject) { this.requirements = requirements; + setSubject(subject); } public FileQoSRequirements getRequirements() { diff --git a/modules/dcache/src/main/java/org/dcache/qos/listeners/QoSRequirementsListener.java b/modules/dcache/src/main/java/org/dcache/qos/listeners/QoSRequirementsListener.java index 9d11607bff0..c6dfcc5c7e2 100644 --- a/modules/dcache/src/main/java/org/dcache/qos/listeners/QoSRequirementsListener.java +++ b/modules/dcache/src/main/java/org/dcache/qos/listeners/QoSRequirementsListener.java @@ -62,6 +62,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import diskCacheV111.util.CacheException; import diskCacheV111.util.PnfsId; import dmg.cells.nucleus.NoRouteToCellException; +import javax.security.auth.Subject; import org.dcache.qos.QoSException; import org.dcache.qos.data.FileQoSRequirements; import org.dcache.qos.data.FileQoSUpdate; @@ -83,7 +84,7 @@ public interface QoSRequirementsListener { * @param newRequirements describing principally how many peristent disk and tape copies are * required. */ - void fileQoSRequirementsModified(FileQoSRequirements newRequirements) + void fileQoSRequirementsModified(FileQoSRequirements newRequirements, Subject subject) throws QoSException, CacheException, NoRouteToCellException, InterruptedException; /** @@ -91,5 +92,5 @@ void fileQoSRequirementsModified(FileQoSRequirements newRequirements) * * @param pnfsid of the file for which the modification was requested. */ - void fileQoSRequirementsModifiedCancelled(PnfsId pnfsid) throws QoSException; + void fileQoSRequirementsModifiedCancelled(PnfsId pnfsid, Subject subject) throws QoSException; } diff --git a/modules/dcache/src/main/java/org/dcache/qos/remote/clients/RemoteQoSRequirementsClient.java b/modules/dcache/src/main/java/org/dcache/qos/remote/clients/RemoteQoSRequirementsClient.java index 15326538e28..00e8e5e149d 100644 --- a/modules/dcache/src/main/java/org/dcache/qos/remote/clients/RemoteQoSRequirementsClient.java +++ b/modules/dcache/src/main/java/org/dcache/qos/remote/clients/RemoteQoSRequirementsClient.java @@ -65,6 +65,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import dmg.cells.nucleus.NoRouteToCellException; import java.io.Serializable; import java.util.concurrent.ExecutionException; +import javax.security.auth.Subject; import org.dcache.cells.CellStub; import org.dcache.qos.QoSException; import org.dcache.qos.data.FileQoSRequirements; @@ -110,17 +111,28 @@ public FileQoSRequirements fileQoSRequirementsRequested(FileQoSUpdate update) } @Override - public void fileQoSRequirementsModified(FileQoSRequirements newRequirements) + public void fileQoSRequirementsModified(FileQoSRequirements newRequirements, Subject subject) throws CacheException, NoRouteToCellException, InterruptedException { - requirementsService.sendAndWait(new QoSRequirementsModifiedMessage(newRequirements)); + QoSRequirementsModifiedMessage message = new QoSRequirementsModifiedMessage(newRequirements, subject); + message = requirementsService.sendAndWait(message); + Serializable error = message.getErrorObject(); + if (error == null) { + return; + } + + if (error instanceof CacheException) { + throw (CacheException) error; + } else { + throw new CacheException(CacheException.UNEXPECTED_SYSTEM_EXCEPTION, String.valueOf(error)); + } } @Override - public void fileQoSRequirementsModifiedCancelled(PnfsId pnfsid) throws QoSException { + public void fileQoSRequirementsModifiedCancelled(PnfsId pnfsid, Subject subject) throws QoSException { /* * Fire and forget. The sender will need to listen for a response. */ - requirementsService.send(new QoSCancelRequirementsModifiedMessage(pnfsid)); + requirementsService.send(new QoSCancelRequirementsModifiedMessage(pnfsid, subject)); } public void setRequirementsService(CellStub requirementsService) {