Skip to content

Commit

Permalink
nfsv41: garbage-collect expired moved
Browse files Browse the repository at this point in the history
kill movers if NFS client did not show up within session lease time.
Notice, this functionality is independent of timeout manager and
complaint with NFS spec

This is fully working code (the early versions was available in 1
March (http://rb.dcache.org/r/5239/)), nevertheless, still some
thinking required :)

Ticket: #8044
Acked-by: Karsten Schwank
Target: master
Require-book: no
Require-notes: no
  • Loading branch information
kofemann committed Dec 2, 2013
1 parent 4395eca commit 3caea80
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 18 deletions.
Expand Up @@ -44,6 +44,7 @@ public void process(CompoundContext context, nfs_resop4 result) {
throw new ChimeraNFSException(nfsstat.NFSERR_BAD_STATEID,
"No mover associated with given stateid");
}
mover.attachSession(context.getSession());

ByteBuffer bb = ByteBuffer.allocate(count);
RepositoryChannel fc = mover.getMoverChannel();
Expand Down
Expand Up @@ -49,6 +49,7 @@ public void process(CompoundContext context, nfs_resop4 result) {
"No mover associated with given stateid");
}

mover.attachSession(context.getSession());
if( mover.getIoMode() != IoMode.WRITE ) {
throw new ChimeraNFSException(nfsstat.NFSERR_PERM, "an attempt to write without IO mode enabled");
}
Expand Down
Expand Up @@ -17,25 +17,41 @@
*/
package org.dcache.chimera.nfsv41.mover;

import java.io.IOException;
import java.nio.channels.CompletionHandler;
import java.util.Collections;
import java.util.Set;

import diskCacheV111.vehicles.PoolIoFileMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import diskCacheV111.util.DiskErrorCacheException;
import diskCacheV111.vehicles.PoolIoFileMessage;
import dmg.cells.nucleus.CellPath;

import org.dcache.nfs.v4.NFS4State;
import org.dcache.nfs.v4.NFSv41Session;
import org.dcache.nfs.v4.xdr.stateid4;
import org.dcache.pool.classic.Cancellable;
import org.dcache.pool.classic.PostTransferService;
import org.dcache.pool.movers.MoverChannelMover;
import org.dcache.pool.repository.ReplicaDescriptor;
import org.dcache.util.Checksum;

public class NfsMover extends MoverChannelMover<NFS4ProtocolInfo, NfsMover> {

private static final Logger _log = LoggerFactory.getLogger(NfsTransferService.class);
private NFSv41Session _session;
private final NFSv4MoverHandler _nfsIO;
private final NFS4State _state;
private volatile CompletionHandler<Void, Void> _completionHandler;

public NfsMover(ReplicaDescriptor handle, PoolIoFileMessage message, CellPath pathToDoor,
NfsTransferService nfsTransferService,
PostTransferService postTransferService) {
super(handle, message, pathToDoor, nfsTransferService, postTransferService);
_nfsIO = nfsTransferService.getNfsMoverHandler();
_state = new MoverState();
}

@Override
Expand All @@ -62,4 +78,80 @@ protected String getStatus() {
.append("]");
return s.toString();
}

/**
* Enable access with this mover.
* @param completionHandler to be called when mover finishes.
* @return handle to cancel mover if needed
* @throws DiskErrorCacheException
*/
public Cancellable enable(final CompletionHandler<Void,Void> completionHandler) throws DiskErrorCacheException {

open();
_completionHandler = completionHandler;
_nfsIO.add(this);
return new Cancellable() {
@Override
public void cancel() {
disable(null);
}
};

}

/**
* Disable access with this mover. If {@code error} is not a {@code null},
* the {@link CompletionHandler#failed(Throwable, A)} method will be called.
* @param error error to report, or {@code null} on success
*/
private void disable(Throwable error) {
_nfsIO.remove(NfsMover.this);
detachSession();
try {
getMoverChannel().close();
} catch (IOException e) {
_log.error("failed to close RAF {}", e.toString());
}
if(error == null) {
_completionHandler.completed(null, null);
} else {
_completionHandler.failed(error, null);
}
}

/**
* Attach mover tho the client's NFSv41 session.
* @param session to attach to
*/
synchronized void attachSession(NFSv41Session session) {
if (_session == null) {
_session = session;
_session.getClient().attachState(_state);
}
}

/**
* Detach mover from the client's session.
*/
synchronized void detachSession() {
if (_session != null) {
_session.getClient().detachState(_state);
_session = null;
}
}

/**
* A special {@link NFS4State} to kill the mover when disposed.
*/
private class MoverState extends NFS4State {

MoverState() {
super(NfsMover.this.getProtocolInfo().stateId());
}

@Override
protected void dispose() {
disable(new InterruptedException("Killing mover due to client inactivity"));
}
}
}
Expand Up @@ -37,7 +37,6 @@
import org.dcache.pool.movers.Mover;
import org.dcache.pool.movers.MoverFactory;
import org.dcache.pool.repository.ReplicaDescriptor;
import org.dcache.pool.repository.RepositoryChannel;
import org.dcache.util.NetworkUtils;
import org.dcache.util.PortRange;
import org.dcache.xdr.OncRpcException;
Expand Down Expand Up @@ -100,28 +99,15 @@ public Mover<?> createMover(ReplicaDescriptor handle, PoolIoFileMessage message,
public Cancellable execute(final NfsMover mover, final CompletionHandler<Void,Void> completionHandler) {
try {

final stateid4 stateid = mover.getStateId();
final RepositoryChannel repositoryChannel = mover.open();
_nfsIO.add(mover);
final Cancellable cancellableMover = mover.enable(completionHandler);

CellPath directDoorPath = new CellPath(mover.getPathToDoor().getDestinationAddress());
_door.send(directDoorPath, new PoolPassiveIoFileMessage<>(getCellName(), _localSocketAddresses, stateid));
_door.send(directDoorPath, new PoolPassiveIoFileMessage<>(getCellName(), _localSocketAddresses, mover.getStateId()));

/* An NFS mover doesn't complete until it is cancelled (the door sends a mover kill
* message when the file is closed).
*/
return new Cancellable() {
@Override
public void cancel() {
_nfsIO.remove(mover);
try {
repositoryChannel.close();
} catch (IOException e) {
_log.error("failed to close RAF", e);
}
completionHandler.completed(null, null);
}
};
return cancellableMover;
} catch (DiskErrorCacheException e) {
_faultListener.faultOccurred(new FaultEvent("repository", FaultAction.DISABLED,
e.getMessage(), e));
Expand All @@ -132,6 +118,10 @@ public void cancel() {
return null;
}

public NFSv4MoverHandler getNfsMoverHandler() {
return _nfsIO;
}

public void setEnableGss(boolean withGss) {
_withGss = withGss;
}
Expand Down

0 comments on commit 3caea80

Please sign in to comment.