Skip to content

Commit

Permalink
dcache-qos: propagate subject to QoS Adjuster
Browse files Browse the repository at this point in the history
Motivation:

Most of the QoS engine was brought over/adpated from Resilience;
in the context of the latter, it was not necessary to propagate
the Subject of the request in the various internal messages.

With QoS, there is an added function allowing for QoS modification;
the subject is needed in this context.

Modification:

Add all the hooks to propagate the subject downstream to the
actual adjustement tasks.  Also, allow the Engine to determine
what to do with a QoSModify request on the basis of subject
and file attributes.

Result:

Proper handling of the QoSModify requests.

Target: master
Request: 9.0
Request: 8.2
Requires-notes: yes
  • Loading branch information
alrossi committed Apr 12, 2023
1 parent 777aa7b commit 7d898c5
Show file tree
Hide file tree
Showing 29 changed files with 340 additions and 49 deletions.
Expand Up @@ -93,7 +93,7 @@ public enum TargetType {

public static final Set<FileAttribute> MINIMALLY_REQUIRED_ATTRIBUTES
= Collections.unmodifiableSet(EnumSet.of(FileAttribute.PNFSID, FileAttribute.TYPE,
FileAttribute.RETENTION_POLICY));
FileAttribute.OWNER_GROUP, FileAttribute.OWNER, FileAttribute.RETENTION_POLICY));

private static final BulkTargetRetryPolicy DEFAULT_RETRY_POLICY = new NoRetryPolicy();

Expand Down
Expand Up @@ -111,7 +111,7 @@ public synchronized void cancel(BulkRequestTarget target) {
client.setRequirementsService(qosEngine);
PnfsId pnfsId = target.getAttributes().getPnfsId();
try {
client.fileQoSRequirementsModifiedCancelled(pnfsId);
client.fileQoSRequirementsModifiedCancelled(pnfsId, subject);
} catch (QoSException e) {
LOGGER.error("fileQoSRequirementsModifiedCancelled failed: {}, {}.", pnfsId,
e.getMessage());
Expand Down Expand Up @@ -151,7 +151,7 @@ public ListenableFuture<QoSTransitionCompletedMessage> perform(String rid, long
client.setRequirementsService(qosEngine);

try {
client.fileQoSRequirementsModified(requirements);
client.fileQoSRequirementsModified(requirements, subject);
} catch (CacheException | InterruptedException | NoRouteToCellException e) {
return Futures.immediateFailedFuture(e);
}
Expand Down
Expand Up @@ -39,6 +39,7 @@
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.inject.Named;
import javax.security.auth.Subject;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.BadRequestException;
import javax.ws.rs.Consumes;
Expand Down Expand Up @@ -66,6 +67,7 @@
import org.dcache.pinmanager.PinManagerPinMessage;
import org.dcache.pinmanager.PinManagerUnpinMessage;
import org.dcache.poolmanager.PoolMonitor;
import org.dcache.qos.QoSException;
import org.dcache.qos.QoSTransitionEngine;
import org.dcache.qos.data.FileQoSRequirements;
import org.dcache.qos.remote.clients.RemoteQoSRequirementsClient;
Expand Down Expand Up @@ -444,6 +446,7 @@ public Response cmrResources(
break;
case "qos":
String targetQos = reqPayload.getString("target");
Subject subject = RequestUser.isAdmin() ? Subjects.ROOT : RequestUser.getSubject();
if (!useQosService) {
new QoSTransitionEngine(poolmanager,
poolMonitor,
Expand All @@ -458,11 +461,11 @@ public Response cmrResources(
FileAttributes attr
= pnfsHandler.getFileAttributes(path.toString(),
NamespaceUtils.getRequestedAttributes(false, false,
true, false, false));
true, false, true));
FileQoSRequirements requirements = getBasicRequirements(targetQos, attr);
RemoteQoSRequirementsClient client = new RemoteQoSRequirementsClient();
client.setRequirementsService(qosEngine);
client.fileQoSRequirementsModified(requirements);
client.fileQoSRequirementsModified(requirements, subject);
}
break;
case "pin":
Expand Down
Expand Up @@ -60,6 +60,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
package org.dcache.qos.listeners;

import diskCacheV111.util.PnfsId;
import javax.security.auth.Subject;
import org.dcache.qos.QoSException;
import org.dcache.qos.vehicles.QoSAdjustmentResponse;
import org.dcache.qos.vehicles.QoSScannerVerificationRequest;
Expand Down Expand Up @@ -95,8 +96,9 @@ void fileQoSVerificationRequested(QoSScannerVerificationRequest verificationRequ
* This is a notification to cancel and remove the verification operation for a file.
*
* @param pnfsId the file for which to cancel verification operation.
* @param subject of the request
*/
void fileQoSVerificationCancelled(PnfsId pnfsId) throws QoSException;
void fileQoSVerificationCancelled(PnfsId pnfsId, Subject subject) throws QoSException;

/**
* Scanning activity has been cancelled; this is a notification to cancel all outstanding
Expand Down
Expand Up @@ -59,7 +59,9 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
*/
package org.dcache.qos.local.clients;

import diskCacheV111.util.CacheException;
import diskCacheV111.util.PnfsId;
import javax.security.auth.Subject;
import org.dcache.qos.QoSException;
import org.dcache.qos.data.FileQoSRequirements;
import org.dcache.qos.data.FileQoSUpdate;
Expand All @@ -80,13 +82,13 @@ public FileQoSRequirements fileQoSRequirementsRequested(FileQoSUpdate update)
}

@Override
public void fileQoSRequirementsModified(FileQoSRequirements newRequirements)
throws QoSException {
provider.handleModifiedRequirements(newRequirements);
public void fileQoSRequirementsModified(FileQoSRequirements newRequirements, Subject subject)
throws QoSException, CacheException {
provider.handleModifiedRequirements(newRequirements, subject);
}

@Override
public void fileQoSRequirementsModifiedCancelled(PnfsId pnfsid) {
public void fileQoSRequirementsModifiedCancelled(PnfsId pnfsid, Subject subject) {
/*
* The local client is an embedded one and thus would not be used to
* dispatch a cancellation request. This is a NOP.
Expand Down
Expand Up @@ -60,6 +60,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
package org.dcache.qos.local.clients;

import diskCacheV111.util.PnfsId;
import javax.security.auth.Subject;
import org.dcache.qos.QoSException;
import org.dcache.qos.listeners.QoSVerificationListener;
import org.dcache.qos.services.verifier.handlers.VerifyOperationHandler;
Expand Down Expand Up @@ -91,7 +92,7 @@ public void fileQoSAdjustmentCompleted(QoSAdjustmentResponse response) {
}

@Override
public void fileQoSVerificationCancelled(PnfsId pnfsId) throws QoSException {
public void fileQoSVerificationCancelled(PnfsId pnfsId, Subject subject) throws QoSException {
fileOpHandler.handleFileOperationCancelled(pnfsId);
}

Expand Down
Expand Up @@ -60,6 +60,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
package org.dcache.qos.remote.clients;

import diskCacheV111.util.PnfsId;
import javax.security.auth.Subject;
import org.dcache.cells.CellStub;
import org.dcache.qos.QoSException;
import org.dcache.qos.listeners.QoSVerificationListener;
Expand Down Expand Up @@ -97,8 +98,10 @@ public void fileQoSAdjustmentCompleted(QoSAdjustmentResponse adjustmentResponse)
}

@Override
public void fileQoSVerificationCancelled(PnfsId pnfsId) throws QoSException {
verificationService.send(new QoSVerificationCancelledMessage(pnfsId));
public void fileQoSVerificationCancelled(PnfsId pnfsId, Subject subject) throws QoSException {
QoSVerificationCancelledMessage msg = new QoSVerificationCancelledMessage(pnfsId);
msg.setSubject(subject);
verificationService.send(msg);
}

public void fileQoSBatchedVerificationCancelled(String id) {
Expand Down
Expand Up @@ -59,10 +59,13 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
*/
package org.dcache.qos.remote.receivers;

import com.google.common.base.Throwables;
import diskCacheV111.util.CacheException;
import diskCacheV111.vehicles.Message;
import diskCacheV111.vehicles.PnfsAddCacheLocationMessage;
import diskCacheV111.vehicles.PnfsClearCacheLocationMessage;
import dmg.cells.nucleus.CellMessageReceiver;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import org.dcache.cells.MessageReply;
import org.dcache.qos.services.engine.handler.FileQoSStatusHandler;
Expand Down Expand Up @@ -151,12 +154,22 @@ public MessageReply<QoSRequirementsRequestMessage> messageArrived(
return reply;
}

/**
* Made this synchronous to support Frontend request.
*/
public QoSRequirementsModifiedMessage messageArrived(QoSRequirementsModifiedMessage message) {
if (messageGuard.getStatus("QoSRequirementsModifiedMessage", message)
== Status.DISABLED) {
message.setFailed(CacheException.SERVICE_UNAVAILABLE, "messages disabled");
return message;
}
fileStatusHandler.handleQoSModification(message.getRequirements());
try {
fileStatusHandler.handleQoSModification(message).get();
} catch (InterruptedException e) {
message.setFailed(CacheException.TIMEOUT, e);
} catch (ExecutionException e) {
message.setFailed(CacheException.UNEXPECTED_SYSTEM_EXCEPTION, Throwables.getRootCause(e));
}
return message;
}

Expand All @@ -165,7 +178,7 @@ public void messageArrived(QoSCancelRequirementsModifiedMessage message) {
== Status.DISABLED) {
return;
}
fileStatusHandler.handleQoSModificationCancelled(message.getPnfsId());
fileStatusHandler.handleQoSModificationCancelled(message.getPnfsId(), message.getSubject());
}

public void messageArrived(QoSActionCompleteMessage message) {
Expand Down
Expand Up @@ -59,8 +59,13 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
*/
package org.dcache.qos.services.adjuster.adjusters;

import static org.dcache.qos.util.QoSPermissionUtils.canModifyQos;

import com.google.common.collect.ImmutableList;
import diskCacheV111.util.PermissionDeniedCacheException;
import diskCacheV111.util.PnfsId;
import java.util.Optional;
import javax.security.auth.Subject;
import org.dcache.pool.classic.Cancellable;
import org.dcache.pool.repository.StickyRecord;
import org.dcache.qos.data.QoSAction;
Expand All @@ -85,12 +90,24 @@ public abstract class QoSAdjuster implements Cancellable {
protected PnfsId pnfsId;
protected FileAttributes attributes;
protected QoSAction action;
protected Subject subject ;
protected QoSAdjustTaskCompletionHandler completionHandler;

public void adjustQoS(QoSAdjusterTask task) {
pnfsId = task.getPnfsId();
action = task.getAction();
attributes = task.getAttributes();
subject = task.getSubject();

try {
checkPermissions();
} catch (PermissionDeniedCacheException e) {
/*
* handler is injected by factory build
*/
completionHandler.taskFailed(pnfsId, Optional.empty(), e);
return;
}

/*
* Generate the SESSION ID. This is used by the QoS status endpoint
Expand All @@ -104,4 +121,11 @@ public void adjustQoS(QoSAdjusterTask task) {
}

protected abstract void runAdjuster(QoSAdjusterTask task);

private void checkPermissions() throws PermissionDeniedCacheException {
if (!canModifyQos(subject, attributes)) {
throw new PermissionDeniedCacheException(String.format("subject does not have "
+ "permission for %s on %s.", action, pnfsId));
}
}
}
Expand Up @@ -132,6 +132,7 @@ private synchronized void sendMessageToRepository() {

LOGGER.debug("Sending {} message to {} for {}.", action, target, pnfsId);
ACTIVITY_LOGGER.info("Sending {} message to {} for {}.", action, target, pnfsId);
msg.setSubject(subject);
future = pools.send(new CellPath(target), msg);
}

Expand Down
Expand Up @@ -174,6 +174,7 @@ private synchronized void handleStaging() {
getProtocolInfo(),
QOS_PIN_REQUEST_ID,
QOS_PIN_TEMP_LIFETIME);
message.setSubject(subject);
future = pinManager.send(message, Long.MAX_VALUE);
future.addListener(this::handleCompletion, executorService);
LOGGER.debug("handleStaging, sent pin manager request for {}.", pnfsId);
Expand All @@ -192,6 +193,7 @@ private void cancelPin() {
LOGGER.debug("handleStaging, cancelling pin {}.", pnfsId);
ACTIVITY_LOGGER.info("handleStaging, cancelling pin {}", pnfsId);
PinManagerUnpinMessage message = new PinManagerUnpinMessage(pnfsId);
message.setSubject(subject);
pinManager.send(message, Long.MAX_VALUE);
LOGGER.debug("handleStaging, sent pin manager request to unpin {}.", pnfsId);
}
Expand Down
Expand Up @@ -64,6 +64,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.util.Optional;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.security.auth.Subject;
import org.dcache.pool.classic.Cancellable;
import org.dcache.pool.migration.PoolMigrationCopyFinishedMessage;
import org.dcache.qos.data.FileQoSUpdate;
Expand All @@ -86,6 +87,7 @@ public final class QoSAdjusterTask extends ErrorAwareTask implements Cancellable

private final PnfsId pnfsId;
private final QoSAction type;
private final Subject subject;
private final int retry;
private final QoSAdjusterFactory factory;
private final FileAttributes attributes;
Expand Down Expand Up @@ -120,13 +122,13 @@ public QoSAdjusterTask(QoSAdjustmentRequest request, QoSAdjusterFactory factory)
this.source = request.getSource();
this.target = request.getTarget();
this.poolGroup = request.getPoolGroup();
this.subject = request.getSubject();
this.status = Status.INITIALIZED;
}

public QoSAdjusterTask(QoSAdjusterTask task, int retry) {
this.pnfsId = task.pnfsId;
this.type = task.type;
;
this.retry = retry;
this.factory = task.factory;
this.attributes = task.attributes;
Expand All @@ -135,14 +137,14 @@ public QoSAdjusterTask(QoSAdjusterTask task, int retry) {
this.target = task.target;
this.poolGroup = task.poolGroup;
this.status = task.status;
this.subject = task.subject;
}

@Override
public void run() {
synchronized (this) {
status = Status.RUNNING;
exception = null;
adjuster = factory.newBuilder().of(type).build();
startTime = System.currentTimeMillis();
}

Expand All @@ -154,6 +156,7 @@ public void run() {
if (isCancelled()) {
break;
}
adjuster = factory.newBuilder().of(type).build();
adjuster.adjustQoS(this);
break;
}
Expand Down Expand Up @@ -221,6 +224,10 @@ public String getSource() {
return source;
}

public Subject getSubject() {
return subject;
}

public String getTarget() {
return target;
}
Expand Down

0 comments on commit 7d898c5

Please sign in to comment.