Skip to content

Commit

Permalink
dcache-xrootd: (WIP) Add support to the xrootd (kxr_)posc flag in (kX…
Browse files Browse the repository at this point in the history
…R_)open.

Motivation:

The xrootd protocol has a kxr_posc flag(enabling persist on successful close semantics)
in kXR_open. If the file isn’t explicitly closed, it is
not persisted (i.e., automatically deleted).

Modification:

The plan is to do something similar to what SRM does if the kxr_posc flag is set:
- create a temporary upload file (from the door)
- then move the file into the proper place after successful close (from the door)

Result:

The file will be visible and persisted when closed.

Target: master
Require-notes: yes
Require-book: no
Ticket: http://rt.dcache.org/Ticket/Display.html?id=9195
Patch: https://rb.dcache.org/r/10374
Committed: master@74ba7de921
  • Loading branch information
vgaronne committed Sep 26, 2017
1 parent 5a3c2e1 commit c9155d1
Show file tree
Hide file tree
Showing 4 changed files with 196 additions and 24 deletions.
Expand Up @@ -52,6 +52,9 @@
import diskCacheV111.vehicles.DoorTransferFinishedMessage;
import diskCacheV111.vehicles.IoDoorEntry;
import diskCacheV111.vehicles.IoDoorInfo;
import diskCacheV111.vehicles.PnfsCancelUpload;
import diskCacheV111.vehicles.PnfsCommitUpload;
import diskCacheV111.vehicles.PnfsCreateUploadPath;
import diskCacheV111.vehicles.PoolIoFileMessage;
import diskCacheV111.vehicles.PoolMoverKillMessage;

Expand All @@ -60,6 +63,7 @@
import dmg.cells.nucleus.CellInfoProvider;
import dmg.cells.nucleus.CellMessageReceiver;
import dmg.cells.nucleus.CellPath;
import dmg.cells.nucleus.NoRouteToCellException;
import dmg.cells.services.login.LoginManagerChildrenInfo;

import org.dcache.acl.enums.AccessType;
Expand All @@ -71,6 +75,7 @@
import org.dcache.cells.MessageCallback;
import org.dcache.namespace.ACLPermissionHandler;
import org.dcache.namespace.ChainedPermissionHandler;
import org.dcache.namespace.CreateOption;
import org.dcache.namespace.FileAttribute;
import org.dcache.namespace.FileType;
import org.dcache.namespace.PermissionHandler;
Expand Down Expand Up @@ -128,6 +133,7 @@ public class XrootdDoor
private List<FsPath> _readPaths = Collections.singletonList(FsPath.ROOT);
private List<FsPath> _writePaths = Collections.singletonList(FsPath.ROOT);

private CellStub _pnfsStub;
private CellStub _poolStub;
private PoolManagerStub _poolManagerStub;
private CellStub _billingStub;
Expand Down Expand Up @@ -156,6 +162,11 @@ public class XrootdDoor
private final Map<Integer,XrootdTransfer> _transfers =
new ConcurrentHashMap<>();

@Required
public void setPnfsStub(CellStub pnfsStub) {
_pnfsStub = pnfsStub;
}

@Required
public void setPoolStub(CellStub stub)
{
Expand Down Expand Up @@ -311,6 +322,97 @@ public void getInfo(PrintWriter pw)
XROOTD_PROTOCOL_MINOR_VERSION));
}

private void uploadDone(Subject subject, Restriction restriction,
FsPath path, FsPath uploadPath, boolean createDir,
boolean overwrite)
throws CacheException {
try {
EnumSet<CreateOption> options = EnumSet.noneOf(CreateOption.class);
if (overwrite) {
options.add(CreateOption.OVERWRITE_EXISTING);
}
PnfsCommitUpload msg
= new PnfsCommitUpload(subject,
restriction,
uploadPath,
path,
options,
EnumSet.of(PNFSID, SIZE, STORAGEINFO));
msg = _pnfsStub.sendAndWait(msg);
} catch (InterruptedException ex) {
throw new CacheException("Operation interrupted", ex);
} catch (NoRouteToCellException ex) {
throw new CacheException("Internal communication failure", ex);
}
}

private void abortUpload(Subject subject, Restriction restriction,
FsPath path, FsPath uploadPath, String reason)
throws CacheException {
try {
PnfsCancelUpload msg = new PnfsCancelUpload(subject, restriction,
uploadPath, path,
EnumSet.noneOf(FileAttribute.class),
"XROOTD upload aborted: " + reason);
_pnfsStub.sendAndWait(msg);
} catch (InterruptedException ex) {
throw new CacheException("Operation interrupted", ex);
} catch (NoRouteToCellException ex) {
throw new CacheException("Internal communication failure", ex);
}
}

private XrootdTransfer
createUploadTransfer(InetSocketAddress client, FsPath path,
String ioQueue, UUID uuid, InetSocketAddress local,
Subject subject, Restriction restriction, boolean createDir,
boolean overwrite, Long size, FsPath uploadPath)
throws CacheException, InterruptedException {

XrootdTransfer transfer
= new XrootdTransfer(_pnfs, subject, restriction, uploadPath) {
@Override
public synchronized void finished(CacheException error) {
try {
super.finished(error);

_transfers.remove(getFileHandle());
if (error == null) {
uploadDone(subject, restriction, path, uploadPath,
createDir, overwrite);

notifyBilling(0, "");
_log.info("Transfer {}@{} finished",
getPnfsId(), getPool());
} else {
int rc = error.getRc();
String message = error.getMessage();
abortUpload(subject, restriction, path, uploadPath, message);
notifyBilling(rc, message);
_log.warn("Transfer {}@{} failed: {} (error code={})",
getPnfsId(), getPool(), message, rc);
}
} catch (CacheException ex) {
String message = ex.getMessage();
int rc = ex.getRc();
notifyBilling(rc, message);
_log.warn("Post upload operation failed: {} (error code={})",
message, rc);
}
}
};
transfer.setCellAddress(getCellAddress());
transfer.setPoolManagerStub(_poolManagerStub);
transfer.setPoolStub(_poolStub);
transfer.setBillingStub(_billingStub);
transfer.setClientAddress(client);
transfer.setUUID(uuid);
transfer.setDoorAddress(local);
transfer.setIoQueue(ioQueue == null ? _ioQueue : ioQueue);
transfer.setFileHandle(_handleCounter.getAndIncrement());
return transfer;
}

private XrootdTransfer
createTransfer(InetSocketAddress client, FsPath path,
String ioQueue, UUID uuid, InetSocketAddress local, Subject subject,
Expand Down Expand Up @@ -398,18 +500,50 @@ public synchronized void finished(CacheException error)
return transfer;
}

private FsPath getUploadPath(Subject subject, Restriction restriction,
boolean createDir, boolean overwrite, Long size, FsPath path,
FsPath rootPath)
throws CacheException, InterruptedException {
try {
EnumSet<CreateOption> options = EnumSet.noneOf(CreateOption.class);
if (overwrite) {
options.add(CreateOption.OVERWRITE_EXISTING);
}
if (createDir) {
options.add(CreateOption.CREATE_PARENTS);
}
PnfsCreateUploadPath msg = new PnfsCreateUploadPath(subject,
restriction, path, rootPath, size, null, null, null,
options);
msg = _pnfsStub.sendAndWait(msg);
return msg.getUploadPath();
} catch (NoRouteToCellException ex) {
throw new CacheException("Internal communication failure", ex);
}
}

public XrootdTransfer
write(InetSocketAddress client, FsPath path, String ioQueue, UUID uuid,
boolean createDir, boolean overwrite, Long size,
InetSocketAddress local, Subject subject, Restriction restriction)
throws CacheException, InterruptedException
{
write(InetSocketAddress client, FsPath path, String ioQueue, UUID uuid,
boolean createDir, boolean overwrite, Long size,
InetSocketAddress local, Subject subject, Restriction restriction,
boolean persistOnSuccessfulClose, FsPath rootPath)
throws CacheException, InterruptedException {

if (!isWriteAllowed(path)) {
throw new PermissionDeniedCacheException("Write permission denied");
}

XrootdTransfer transfer =
createTransfer(client, path, ioQueue, uuid, local, subject, restriction);
XrootdTransfer transfer;
if (persistOnSuccessfulClose) {
FsPath uploadPath = getUploadPath(subject, restriction, createDir,
overwrite, size, path, rootPath);
transfer = createUploadTransfer(client, path, ioQueue, uuid, local,
subject, restriction, createDir, overwrite, size,
uploadPath);
} else {
transfer = createTransfer(client, path, ioQueue, uuid, local,
subject, restriction);
}
transfer.setOverwriteAllowed(overwrite);
int handle = transfer.getFileHandle();
InetSocketAddress address = null;
Expand All @@ -432,8 +566,8 @@ public synchronized void finished(CacheException error)
throw new CacheException(transfer.getPool() + " failed to open TCP socket");
}

transfer.setStatus("Mover " + transfer.getPool() + "/" +
transfer.getMoverId() + ": Receiving");
transfer.setStatus("Mover " + transfer.getPool() + "/"
+ transfer.getMoverId() + ": Receiving");
} finally {
if (address == null) {
transfer.deleteNameSpaceEntry();
Expand All @@ -446,12 +580,12 @@ public synchronized void finished(CacheException error)
} catch (InterruptedException e) {
explanation = "transfer interrupted";
transfer.notifyBilling(CacheException.UNEXPECTED_SYSTEM_EXCEPTION,
"Transfer interrupted");
"Transfer interrupted");
throw e;
} catch (RuntimeException e) {
explanation = "bug found: " + e.toString();
transfer.notifyBilling(CacheException.UNEXPECTED_SYSTEM_EXCEPTION,
e.toString());
e.toString());
throw e;
} finally {
if (address == null) {
Expand Down
Expand Up @@ -17,6 +17,7 @@
*/
package org.dcache.xrootd.door;

import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.net.InetAddresses;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
Expand Down Expand Up @@ -45,13 +46,15 @@
import org.dcache.auth.LoginReply;
import org.dcache.auth.attributes.Restriction;
import org.dcache.auth.attributes.Restrictions;
import org.dcache.auth.attributes.RootDirectory;
import org.dcache.cells.AbstractMessageCallback;
import org.dcache.namespace.FileAttribute;
import org.dcache.util.Checksum;
import org.dcache.util.Checksums;
import org.dcache.util.list.DirectoryEntry;
import org.dcache.vehicles.PnfsListDirectoryMessage;
import org.dcache.xrootd.core.XrootdException;
import org.dcache.xrootd.protocol.XrootdProtocol;
import org.dcache.xrootd.protocol.messages.DirListRequest;
import org.dcache.xrootd.protocol.messages.DirListResponse;
import org.dcache.xrootd.protocol.messages.MkDirRequest;
Expand Down Expand Up @@ -82,10 +85,13 @@ public class XrootdRedirectHandler extends ConcurrentXrootdRequestHandler
LoggerFactory.getLogger(XrootdRedirectHandler.class);

private final XrootdDoor _door;
private final FsPath _rootPath;

private Restriction _authz = Restrictions.denyAll();
private final Map<String,String> _appIoQueues;
private final Map<String, String> _appIoQueues;

private FsPath _rootPath;
private FsPath _userRootPath;
private boolean _isLoggedIn;

/**
* Custom entries for kXR_Qconfig requests.
Expand Down Expand Up @@ -177,16 +183,18 @@ protected XrootdResponse<OpenRequest> doOnOpen(ChannelHandlerContext ctx, OpenRe
XrootdTransfer transfer;
if (neededPerm == FilePerm.WRITE) {
boolean createDir = req.isMkPath();
boolean overwrite = req.isDelete();

transfer =
_door.write(remoteAddress, createFullPath(req.getPath()), ioQueue,
uuid, createDir, overwrite, size, localAddress,
req.getSubject(), _authz);
boolean overwrite = req.isDelete() && !req.isNew();
boolean persistOnSuccessfulClose = (req.getOptions()
& XrootdProtocol.kXR_posc) == XrootdProtocol.kXR_posc;
// TODO: replace with req.isPersistOnSuccessfulClose() with the latest xrootd4j

transfer = _door.write(remoteAddress, createFullPath(req.getPath()),
ioQueue, uuid, createDir, overwrite, size, localAddress,
req.getSubject(), _authz, persistOnSuccessfulClose,
((_isLoggedIn) ? _userRootPath : _rootPath));
} else {
transfer =
_door.read(remoteAddress, createFullPath(req.getPath()), ioQueue,
uuid, localAddress, req.getSubject(), _authz);
transfer = _door.read(remoteAddress, createFullPath(req.getPath()), ioQueue,
uuid, localAddress, req.getSubject(), _authz);
}

// ok, open was successful
Expand Down Expand Up @@ -540,6 +548,9 @@ private void logDebugOnOpen(OpenRequest req)
if ((options & kXR_retstat) == kXR_retstat) {
openFlags += " kXR_retstat";
}
if ((options & kXR_posc) == kXR_posc) {
openFlags += " kXR_posc";
}

_log.debug("open flags: "+openFlags);

Expand Down Expand Up @@ -734,12 +745,21 @@ public void success(PnfsListDirectoryMessage message)
/**
* Execute login strategy to make an user authorization decision.
*/
private void loggedIn(LoginEvent event)
{
private void loggedIn(LoginEvent event) {
LoginReply reply = event.getLoginReply();
_authz = Restrictions.none();
if (reply != null) {
_authz = reply.getRestriction();
_isLoggedIn = true;
_userRootPath = reply.getLoginAttributes().stream()
.filter(RootDirectory.class::isInstance)
.findFirst()
.map(RootDirectory.class::cast)
.map(RootDirectory::getRoot)
.map(FsPath::create)
.orElse(FsPath.ROOT);
} else {
_isLoggedIn = false;
}
}

Expand Down
Expand Up @@ -27,6 +27,15 @@
<property name="excludedDestinations" value="${xrootd.loginbroker.update-topic}"/>
</bean>

<bean id="pnfs-stub" class="org.dcache.cells.CellStub">
<description>PNFS manager communication stub</description>
<property name="destination" value="${xrootd.service.pnfsmanager}"/>
<property name="timeout" value="${xrootd.service.pnfsmanager.timeout}"/>
<property name="timeoutUnit" value="${xrootd.service.pnfsmanager.timeout.unit}"/>
<property name="flags" value="#{ T(dmg.cells.nucleus.CellEndpoint.SendFlag).RETRY_ON_NO_ROUTE_TO_CELL }"/>
</bean>


<bean id="pool-stub" class="org.dcache.cells.CellStub">
<description>Pool cell stub</description>
<property name="timeout" value="${xrootd.service.pool.timeout}"/>
Expand Down Expand Up @@ -120,6 +129,7 @@

<bean id="door" class="org.dcache.xrootd.door.XrootdDoor">
<description>Gateway between xrootd protocol handler and dCache</description>
<property name="pnfsStub" ref="pnfs-stub"/>
<property name="poolStub" ref="pool-stub"/>
<property name="poolManagerStub">
<bean class="org.dcache.poolmanager.PoolManagerStub">
Expand Down
8 changes: 8 additions & 0 deletions skel/share/defaults/xrootd.properties
Expand Up @@ -70,6 +70,14 @@ xrootd.service.poolmanager.timeout = 5400000
# Cell address of pnfsmanager service
xrootd.service.pnfsmanager=${dcache.service.pnfsmanager}

# Timeout for pnfsmanager requests
xrootd.service.pnfsmanager.timeout = 120
(one-of?MILLISECONDS|\
SECONDS|\
MINUTES|\
HOURS|DAYS)\
xrootd.service.pnfsmanager.timeout.unit=SECONDS

# Cell address of gplazma service
xrootd.service.gplazma=${dcache.service.gplazma}

Expand Down

0 comments on commit c9155d1

Please sign in to comment.