Skip to content

Commit

Permalink
srmmanager: Refactor how various operations handle active transfers
Browse files Browse the repository at this point in the history
Motivation:

Some operations have to check and affect ongoing transfer on a particular SURL.

Modification:

As a first step towards support this in a clustered SrmManager deployment, this
patch centralizes the logic in the SRM class.

Result:

No user visible changes.

Target: trunk
Request: 3.0
Require-notes: no
Require-book: no
Acked-by: Tigran Mkrtchyan <tigran.mkrtchyan@desy.de>
Acked-by: Paul Millar <paul.millar@desy.de>

Reviewed at https://rb.dcache.org/r/9880/
  • Loading branch information
gbehrmann committed Nov 8, 2016
1 parent bd75e3d commit 7ab3f5e
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 107 deletions.
118 changes: 104 additions & 14 deletions modules/srm-server/src/main/java/org/dcache/srm/SRM.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,10 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING

package org.dcache.srm;

import static com.google.common.base.Preconditions.*;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.io.Files;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Required;
Expand All @@ -85,13 +84,14 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.StreamSupport;

import dmg.cells.nucleus.CellLifeCycleAware;

Expand Down Expand Up @@ -126,9 +126,9 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import org.dcache.srm.scheduler.State;
import org.dcache.srm.util.Configuration;

import static com.google.common.base.Preconditions.*;
import static com.google.common.collect.Iterables.concat;
import static java.util.Arrays.asList;
import static java.util.concurrent.TimeUnit.SECONDS;

/**
* SRM class creates an instance of SRM client class and publishes it on a
Expand All @@ -139,6 +139,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
public class SRM implements CellLifeCycleAware
{
private static final Logger logger = LoggerFactory.getLogger(SRM.class);
private static final String SFN_STRING = "SFN=";
private final InetAddress host;
private final Configuration configuration;
private RequestCredentialStorage requestCredentialStorage;
Expand Down Expand Up @@ -644,26 +645,115 @@ public <T extends Job> Set<Long> getActiveJobIds(Class<T> type, String descripti
return ids;
}

public <T extends FileRequest<?>> Iterable<T> getActiveFileRequests(Class<T> type, final URI surl)
throws DataAccessException
{
return Iterables.filter(getActiveJobs(type), request -> request.isTouchingSurl(surl));
}

/**
* Returns true if an upload on the given SURL exists.
*/
public boolean isFileBusy(URI surl) throws DataAccessException
{
return hasActivePutRequests(surl);
return StreamSupport.stream(getActiveFileRequests(PutFileRequest.class, surl).spliterator(), false)
.findAny().isPresent();
}

private boolean hasActivePutRequests(URI surl) throws DataAccessException
/**
* Returns true if multiple uploads on the given SURL exist.
*/
public boolean hasMultipleUploads(URI surl)
{
return StreamSupport.stream(getActiveFileRequests(PutFileRequest.class, surl).spliterator(), false)
.limit(2).count() > 1;
}

/**
* Returns the file id of an active upload on the given SURL.
*/
public String getUploadFileId(URI surl)
{
return StreamSupport.stream(getActiveFileRequests(PutFileRequest.class, surl).spliterator(), false)
.map(PutFileRequest::getFileId)
.filter(Objects::nonNull)
.findFirst()
.orElse(null);
}

/**
* Aborts uploads and downloads on the given SURL. Returns true if and only if an
* upload was aborted.
*/
public boolean abortTransfers(URI surl, String reason) throws SRMException
{
Set<PutFileRequest> requests = getActiveJobs(PutFileRequest.class);
for (PutFileRequest request: requests) {
if (request.getSurl().equals(surl)) {
return true;
boolean didAbortUpload = false;

for (PutFileRequest request : getActiveFileRequests(PutFileRequest.class, surl)) {
try {
request.abort(reason);
didAbortUpload = true;
} catch (IllegalStateTransition e) {
// The request likely aborted or finished before we could abort it
logger.debug("Attempted to abort put request {}, but failed: {}",
request.getId(), e.getMessage());
}
}
return false;

for (GetFileRequest request : getActiveFileRequests(GetFileRequest.class, surl)) {
try {
request.abort(reason);
} catch (IllegalStateTransition e) {
// The request likely aborted or finished before we could abort it
logger.debug("Attempted to abort get request {}, but failed: {}",
request.getId(), e.getMessage());
}
}

return didAbortUpload;
}

public <T extends FileRequest<?>> Iterable<T> getActiveFileRequests(Class<T> type, final URI surl)
throws DataAccessException
/**
* Checks if an active upload blocks the removal of a directory.
*/
public void checkRemoveDirectory(URI surl) throws SRMInvalidPathException, SRMNonEmptyDirectoryException
{
return Iterables.filter(getActiveJobs(type), request -> request.isTouchingSurl(surl));
String path = getPath(surl);
for (PutFileRequest putFileRequest : getActiveJobs(PutFileRequest.class)) {
String requestPath = getPath(putFileRequest.getSurl());
if (path.equals(requestPath)) {
throw new SRMInvalidPathException("Not a directory");
}
if (requestPath.startsWith(path)) {
throw new SRMNonEmptyDirectoryException("Directory is not empty");
}
}
}

private static String getPath(URI surl)
{
String path = surl.getPath();
String query = surl.getQuery();
if (query != null) {
int i = query.indexOf(SFN_STRING);
if (i != -1) {
path = query.substring(i + SFN_STRING.length());
}
}
/* REVISIT
*
* This is not correct in the presence of symlinked directories. The
* simplified path may refer to a different directory than the one
* we will delete.
*
* For now we ignore this problem - fixing it requires resolving the
* paths to an absolute path, which requires additional name space
* lookups.
*/
path = Files.simplifyPath(path);
if (!path.endsWith("/")) {
path = path + "/";
}
return path;
}
}
64 changes: 22 additions & 42 deletions modules/srm-server/src/main/java/org/dcache/srm/handler/SrmRm.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,49 +99,29 @@ private SrmRmResponse srmRm()
if (returnStatus.getStatus().getStatusCode() == TStatusCode.SRM_INTERNAL_ERROR) {
throw new SRMInternalErrorException(returnStatus.getStatus().getExplanation());
}
if (returnStatus.getStatus().getStatusCode() == TStatusCode.SRM_AUTHORIZATION_FAILURE) {
continue;
}

// [SRM 2.2, 4.3.2, e)] srmRm aborts the SURLs from srmPrepareToPut requests not yet
// in SRM_PUT_DONE state, and must set its file status as SRM_ABORTED.
//
// [SRM 2.2, 4.3.2, f)] srmRm must remove SURLs even if the statuses of the SURLs
// are SRM_FILE_BUSY. In this case, operations such as srmPrepareToPut or srmCopy
// that holds the SURL status as SRM_FILE_BUSY must return SRM_INVALID_PATH upon
// status request or srmPutDone.
//
// It seems the SRM specs is undecided about whether to move put requests to
// SRM_ABORTED or SRM_INVALID_PATH. We choose SRM_ABORTED as it seems like the saner
// of the two options.
URI surl = URI.create(surls[i].toString());
for (PutFileRequest request : srm.getActiveFileRequests(PutFileRequest.class, surl)) {
try {
request.abort("file was deleted by request " + JDC.getSession() + ".");
returnStatus.setStatus(new TReturnStatus(TStatusCode.SRM_SUCCESS, "Upload was aborted."));
} catch (IllegalStateTransition e) {
// The request likely aborted or finished before we could abort it
LOGGER.debug("srmRm attempted to abort put request {}, but failed: {}",
request.getId(), e.getMessage());
} catch (SRMException e) {
returnStatus.setStatus(new TReturnStatus(e.getStatusCode(), e.getMessage()));
}
}

if (returnStatus.getStatus().getStatusCode() != TStatusCode.SRM_SUCCESS) {
continue;
}

// [SRM 2.2, 4.3.2, d)] srmLs,srmPrepareToGet or srmBringOnline must not find these
// removed files any more. It must set file requests on SURL from srmPrepareToGet
// as SRM_ABORTED.
for (GetFileRequest request : srm.getActiveFileRequests(GetFileRequest.class, surl)) {
if (returnStatus.getStatus().getStatusCode() == TStatusCode.SRM_SUCCESS ||
returnStatus.getStatus().getStatusCode() == TStatusCode.SRM_INVALID_PATH) {

// [SRM 2.2, 4.3.2, e)] srmRm aborts the SURLs from srmPrepareToPut requests not yet
// in SRM_PUT_DONE state, and must set its file status as SRM_ABORTED.
//
// [SRM 2.2, 4.3.2, f)] srmRm must remove SURLs even if the statuses of the SURLs
// are SRM_FILE_BUSY. In this case, operations such as srmPrepareToPut or srmCopy
// that holds the SURL status as SRM_FILE_BUSY must return SRM_INVALID_PATH upon
// status request or srmPutDone.
//
// It seems the SRM specs is undecided about whether to move put requests to
// SRM_ABORTED or SRM_INVALID_PATH. We choose SRM_ABORTED as it seems like the saner
// of the two options.

// [SRM 2.2, 4.3.2, d)] srmLs,srmPrepareToGet or srmBringOnline must not find these
// removed files any more. It must set file requests on SURL from srmPrepareToGet
// as SRM_ABORTED.
URI surl = URI.create(surls[i].toString());
try {
request.abort("file was deleted by request " + JDC.getSession() + ".");
} catch (IllegalStateTransition e) {
// The request likely aborted or finished before we could abort it
LOGGER.debug("srmRm attempted to abort get request {}, but failed: {}",
request.getId(), e.getMessage());
if (srm.abortTransfers(surl, "File was deleted by request " + JDC.getSession() + ".")) {
returnStatus.setStatus(new TReturnStatus(TStatusCode.SRM_SUCCESS, "Upload was aborted."));
}
} catch (SRMException e) {
returnStatus.setStatus(new TReturnStatus(e.getStatusCode(), e.getMessage()));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.dcache.srm.handler;

import com.google.common.io.Files;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -14,7 +13,6 @@
import org.dcache.srm.SRMInvalidPathException;
import org.dcache.srm.SRMNonEmptyDirectoryException;
import org.dcache.srm.SRMUser;
import org.dcache.srm.request.PutFileRequest;
import org.dcache.srm.v2_2.SrmRmdirRequest;
import org.dcache.srm.v2_2.SrmRmdirResponse;
import org.dcache.srm.v2_2.TReturnStatus;
Expand All @@ -26,7 +24,6 @@ public class SrmRmdir
{
private static final Logger LOGGER =
LoggerFactory.getLogger(SrmRmdir.class);
private static final String SFN_STRING = "SFN=";

private final AbstractStorageElement storage;
private final SrmRmdirRequest request;
Expand Down Expand Up @@ -71,53 +68,17 @@ private SrmRmdirResponse srmRmdir()
throws SRMException
{
URI surl = URI.create(request.getSURL().toString());
String path = getPath(surl);

/* If surl is a prefix to any active upload, then we report the directory as
* non-empty. This is not strictly required by the SRM spec, however S2 tests
* (usecase.RmdirBeingPutInto) check for this behaviour.
*/
for (PutFileRequest putFileRequest : srm.getActiveJobs(PutFileRequest.class)) {
String requestPath = getPath(putFileRequest.getSurl());
if (path.equals(requestPath)) {
throw new SRMInvalidPathException("Not a directory");
}
if (requestPath.startsWith(path)) {
throw new SRMNonEmptyDirectoryException("Directory is not empty");
}
}
srm.checkRemoveDirectory(surl);

storage.removeDirectory(user, surl, request.getRecursive() != null && request.getRecursive());
return new SrmRmdirResponse(new TReturnStatus(TStatusCode.SRM_SUCCESS, null));
}

private static String getPath(URI surl)
{
String path = surl.getPath();
String query = surl.getQuery();
if (query != null) {
int i = query.indexOf(SFN_STRING);
if (i != -1) {
path = query.substring(i + SFN_STRING.length());
}
}
/* REVISIT
*
* This is not correct in the presence of symlinked directories. The
* simplified path may refer to a different directory than the one
* we will delete.
*
* For now we ignore this problem - fixing it requires resolving the
* paths to an absolute path, which requires additional name space
* lookups.
*/
path = Files.simplifyPath(path);
if (!path.endsWith("/")) {
path = path + "/";
}
return path;
}

public static final SrmRmdirResponse getFailedResponse(String error)
{
return getFailedResponse(error, TStatusCode.SRM_FAILURE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,11 @@ public synchronized void run() throws IllegalStateTransition
t0 = System.currentTimeMillis();
}

PutFileRequest request =
Iterables.getFirst(SRM.getSRM().getActiveFileRequests(PutFileRequest.class, surl), null);
String fileId = SRM.getSRM().getUploadFileId(surl);

TMetaDataPathDetail detail;

if (request != null) {
if (fileId != null) {
// [SRM 2.2, 4.4.3]
//
// SRM_FILE_BUSY
Expand All @@ -142,7 +141,7 @@ public synchronized void run() throws IllegalStateTransition
try {
FileMetaData fmd = getStorage().getFileMetaData(getUser(),
surl,
request.getFileId());
fileId);
detail = convertFileMetaDataToTMetaDataPathDetail(surl,
fmd,
parent.getLongFormat());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,13 +309,11 @@ public void run() throws IllegalStateTransition, SRMException
// SRM_FILE_BUSY at the file level. If another srmPrepareToPut or srmCopy is requested on
// the same SURL, SRM_FILE_BUSY must be returned if the SURL can be overwritten, otherwise
// SRM_DUPLICATION_ERROR must be returned at the file level.
for (PutFileRequest request : SRM.getSRM().getActiveFileRequests(PutFileRequest.class, getSurl())) {
if (request != this) {
if (!getContainerRequest().isOverwrite()) {
throw new SRMDuplicationException("The requested SURL is locked by another upload.");
} else {
throw new SRMFileBusyException("The requested SURL is locked by another upload.");
}
if (SRM.getSRM().hasMultipleUploads(getSurl())) {
if (!getContainerRequest().isOverwrite()) {
throw new SRMDuplicationException("The requested SURL is locked by another upload.");
} else {
throw new SRMFileBusyException("The requested SURL is locked by another upload.");
}
}

Expand Down

0 comments on commit 7ab3f5e

Please sign in to comment.