Skip to content

Commit

Permalink
nfs-proxy: introduce nfsv4.1 based proxy
Browse files Browse the repository at this point in the history
It will re-use an existing mover if one is found. This allows writes
to continue on the same mover if the client, due to network
errors, falls back to IO through the MDS.

The implementation uses only a single slot per session on the
server. As a result, all IO requests for a single client will
be serialized.

The NFS door is updated to provide layouts for NFSv4.0 clients
as well. The modified CLOSE operation is required to return the
layout on close for v4.0 clients.

This is a first dirty implementation, but good enough to
replace DCAP based proxy.

Target: master
Acked-by: Paul Millar
Require-notes: yes
Require-book: no
  • Loading branch information
kofemann committed Feb 19, 2015
1 parent 4a49c48 commit 96dc283
Show file tree
Hide file tree
Showing 7 changed files with 487 additions and 40 deletions.
Expand Up @@ -56,8 +56,9 @@
import org.dcache.chimera.FsInode;
import org.dcache.chimera.FsInodeType;
import org.dcache.chimera.JdbcFs;
import org.dcache.chimera.nfsv41.door.proxy.DcapProxyIoFactory;
import org.dcache.chimera.nfsv41.door.proxy.NfsProxyIoFactory;
import org.dcache.chimera.nfsv41.door.proxy.ProxyIoAdapter;
import org.dcache.chimera.nfsv41.door.proxy.ProxyIoFactory;
import org.dcache.chimera.nfsv41.door.proxy.ProxyIoMdsOpFactory;
import org.dcache.chimera.nfsv41.mover.NFS4ProtocolInfo;
import org.dcache.commons.stats.RequestExecutionTimeGauges;
Expand All @@ -77,11 +78,13 @@
import org.dcache.nfs.v4.Layout;
import org.dcache.nfs.v4.MDSOperationFactory;
import org.dcache.nfs.v4.NFS4Client;
import org.dcache.nfs.v4.NFS4State;
import org.dcache.nfs.v4.NFSServerV41;
import org.dcache.nfs.v4.NFSv41DeviceManager;
import org.dcache.nfs.v4.NFSv41Session;
import org.dcache.nfs.v4.NFSv4Defaults;
import org.dcache.nfs.v4.RoundRobinStripingPattern;
import org.dcache.nfs.v4.StateDisposeListener;
import org.dcache.nfs.v4.StripingPattern;
import org.dcache.nfs.v4.xdr.device_addr4;
import org.dcache.nfs.v4.xdr.deviceid4;
Expand Down Expand Up @@ -186,7 +189,7 @@ public class NFSv41Door extends AbstractCellComponent implements

private LoginBrokerHandler _loginBrokerHandler;

private DcapProxyIoFactory _proxyIoFactory;
private ProxyIoFactory _proxyIoFactory;

private final static TransferRetryPolicy RETRY_POLICY =
new TransferRetryPolicy(Integer.MAX_VALUE, NFS_RETRY_PERIOD,
Expand Down Expand Up @@ -283,19 +286,9 @@ public void init() throws Exception {
break;
case V41:
final NFSv41DeviceManager _dm = this;
_proxyIoFactory = new DcapProxyIoFactory(getCellAddress().getCellName() + "-dcap-proxy", "",
getArgs().getBooleanOption("export"));
_proxyIoFactory.setBillingStub(_billingStub);
_proxyIoFactory.setFileSystemProvider(_fileFileSystemProvider);
_proxyIoFactory.setPnfsHandler(_pnfsHandler);
_proxyIoFactory.setPoolManager(_poolManagerStub.getDestinationPath());
_proxyIoFactory.setIoQueue(_ioQueue);
_proxyIoFactory.setRetryPolicy(RETRY_POLICY);
_proxyIoFactory.startAdapter();
_nfs4 = new NFSServerV41(new ProxyIoMdsOpFactory(
_proxyIoFactory.getCellName(),
_proxyIoFactory.getCellDomainName(),
_proxyIoFactory, new MDSOperationFactory()),
_proxyIoFactory = new NfsProxyIoFactory(_dm);
_nfs4 = new NFSServerV41(new ProxyIoMdsOpFactory( this.getCellName(), this.getCellDomainName()
, _proxyIoFactory, new MDSOperationFactory()),
_dm, _vfs, _exportFile);
_rpcService.register(new OncRpcProgram(nfs4_prot.NFS4_PROGRAM, nfs4_prot.NFS_V4), _nfs4);
_loginBrokerHandler.start();
Expand All @@ -316,7 +309,7 @@ public void destroy() throws IOException {
if (_nfs4 != null) {
_nfs4.getStateHandler().shutdown();
}
if(_proxyIoFactory != null) {
if (_proxyIoFactory != null) {
_proxyIoFactory.shutdown();
}
}
Expand Down Expand Up @@ -513,7 +506,44 @@ public Layout layoutGet(CompoundContext context, Inode nfsInode, int layoutType,
layout4 layout = Layout.getLayoutSegment(deviceid, NFSv4Defaults.NFS4_STRIPE_SIZE, fh, ioMode,
0, nfs4_prot.NFS4_UINT64_MAX);

return new Layout(true, stateid, new layout4[]{layout});
/*
if we need to run proxy-io with NFSv4.0
*/
final NFS4Client client;
if (context.getMinorversion() == 0) {
client = context.getStateHandler().getClientIdByStateId(stateid);
} else {
client = context.getSession().getClient();
}
/*
* on error the client will issue a layout return.
* Therefore we need a different stateid for the layout to keep
* the mover as long as the file is open.
*
* Well, according to spec we have to return a different
* stateid anyway.
*/
final NFS4State layoutStateId = client.createState();

/*
* as we will never see layout return with this stateid clean it
* when open state id is disposed
*/
client.state(stateid).addDisposeListener(
// use java7 for 2.10 backport
new StateDisposeListener() {

@Override
public void notifyDisposed(NFS4State state) {
try {
client.releaseState(layoutStateId.stateid());
}catch(ChimeraNFSException e) {
_log.warn("can't release layout stateid.: {}", e.getMessage() );
}
}
}
);
return new Layout(true, layoutStateId.stateid(), new layout4[]{layout});

} catch (FileInCacheException e) {
cleanStateAndKillMover(stateid);
Expand Down
@@ -0,0 +1,248 @@
package org.dcache.chimera.nfsv41.door.proxy;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.dcache.util.NetworkUtils;

import org.dcache.nfs.nfsstat;
import org.dcache.nfs.v4.client.CompoundBuilder;
import org.dcache.nfs.v4.xdr.COMPOUND4args;
import org.dcache.nfs.v4.xdr.COMPOUND4res;
import org.dcache.nfs.v4.xdr.clientid4;
import org.dcache.nfs.v4.xdr.nfs_fh4;
import org.dcache.nfs.v4.xdr.nfs4_prot;
import org.dcache.nfs.v4.xdr.nfs_opnum4;
import org.dcache.nfs.v4.xdr.nfs_resop4;
import org.dcache.nfs.v4.xdr.sequenceid4;
import org.dcache.nfs.v4.xdr.sessionid4;
import org.dcache.nfs.v4.xdr.stateid4;
import org.dcache.nfs.v4.xdr.state_protect_how4;
import org.dcache.nfs.vfs.Inode;
import org.dcache.xdr.IpProtocolType;
import org.dcache.xdr.OncRpcException;
import org.dcache.xdr.OncRpcClient;
import org.dcache.xdr.RpcAuth;
import org.dcache.xdr.RpcAuthTypeUnix;
import org.dcache.xdr.RpcCall;
import org.dcache.xdr.XdrTransport;

/**
 * A {@link ProxyIoAdapter} which proxies requests to another NFSv4.1 server.
 *
 * <p>Every instance opens its own TCP connection to the given pool address,
 * performs EXCHANGE_ID and CREATE_SESSION in the constructor and then
 * schedules a periodic SEQUENCE call on a dedicated single-thread executor.
 * All methods that send a compound are {@code synchronized} on the instance
 * and always use slot 0, so requests going through one adapter are
 * serialized.
 */
public class NfsProxyIo implements ProxyIoAdapter {

    // FIXME: for now we will use only a single slot, i.e. serialize all requests
    private final static int SLOT_ID = 0;
    private final static int MAX_SLOT_ID = 0;

    private final static int ROOT_UID = 0;
    private final static int ROOT_GID = 0;
    private final static int[] ROOT_GIDS = new int[0];

    private final static String IMPL_DOMAIN = "dCache.ORG";
    private final static String IMPL_NAME = "proxyio-nfs-client";

    /**
     * Most up-to-date seqid for a given stateid as defined by rfc5661.
     */
    private final static int SEQ_UP_TO_DATE = 0;

    /**
     * Initial delay and period, in seconds, of the periodic SEQUENCE call.
     */
    private final static long SESSION_KEEPALIVE_SECONDS = 60;

    private clientid4 _clientIdByServer;
    private sequenceid4 _sequenceID;
    private sessionid4 _sessionid;

    private final stateid4 stateid;
    private final nfs_fh4 fh;

    private final InetSocketAddress remoteClient;
    private final RpcCall client;
    private final OncRpcClient rpcClient;
    private final XdrTransport transport;
    private final ScheduledExecutorService sessionThread;

    /**
     * Connects to the pool and establishes an NFSv4.1 session with it.
     *
     * @param poolAddress address of the NFSv4.1 server to proxy to
     * @param remoteClient address of the original client; only used by
     * {@link #toString()}
     * @param inode file to operate on; its NFS handle is sent with PUTFH
     * @param stateid state id whose {@code other} part is sent with every
     * READ and WRITE (the seqid is replaced by {@code SEQ_UP_TO_DATE})
     * @param size currently unused
     * @throws IOException if the connection or the session handshake fails;
     * in the latter case the connection is closed again before the exception
     * is propagated
     */
    public NfsProxyIo(InetSocketAddress poolAddress, InetSocketAddress remoteClient, Inode inode, stateid4 stateid, long size) throws IOException {
        this.remoteClient = remoteClient;

        // everything that does not need the connection is prepared first, so
        // that after connect() only the session handshake can fail
        fh = new nfs_fh4(inode.toNfsHandle());
        this.stateid = new stateid4(stateid.other, SEQ_UP_TO_DATE);

        RpcAuth credential = new RpcAuthTypeUnix(ROOT_UID, ROOT_GID, ROOT_GIDS,
                (int) (System.currentTimeMillis() / 1000),
                NetworkUtils.getCanonicalHostName());
        sessionThread = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactoryBuilder()
                .setNameFormat("proxy-nfs-session-" + poolAddress.getAddress().getHostAddress() + "-%d")
                .build()
        );

        rpcClient = new OncRpcClient(poolAddress, IpProtocolType.TCP);
        transport = rpcClient.connect();
        client = new RpcCall(nfs4_prot.NFS4_PROGRAM, nfs4_prot.NFS_V4, credential, transport);

        try {
            exchange_id();
            create_session();
        } catch (IOException | RuntimeException e) {
            // nobody will ever call close() on a half-constructed adapter:
            // release the executor and the connection here
            sessionThread.shutdownNow();
            try {
                rpcClient.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Sends SEQUENCE + PUTFH + READ and copies the returned data into
     * {@code dst}.
     *
     * @param dst buffer to fill; at most {@code dst.remaining()} bytes are
     * requested
     * @param position file offset to read from
     * @return number of bytes put into {@code dst}
     * @throws IOException if the call fails or the server returns an error
     */
    @Override
    public synchronized int read(ByteBuffer dst, long position) throws IOException {

        int needToRead = dst.remaining();
        COMPOUND4args args = new CompoundBuilder()
                .withSequence(false, _sessionid, _sequenceID.value, SLOT_ID, MAX_SLOT_ID)
                .withPutfh(fh)
                .withRead(needToRead, position, stateid)
                .withTag("pNFS read")
                .build();
        COMPOUND4res compound4res = sendCompound(args);
        // index 2: result of READ (0 = SEQUENCE, 1 = PUTFH)
        dst.put(compound4res.resarray.get(2).opread.resok4.data);
        return needToRead - dst.remaining();
    }

    /**
     * Sends SEQUENCE + PUTFH + WRITE with all remaining bytes of {@code src}.
     *
     * @param src data to write; fully consumed by this call
     * @param position file offset to write at
     * @return byte count reported by the server
     * @throws IOException if the call fails or the server returns an error
     */
    @Override
    public synchronized int write(ByteBuffer src, long position) throws IOException {

        byte[] data = new byte[src.remaining()];
        src.get(data);

        COMPOUND4args args = new CompoundBuilder()
                .withSequence(false, _sessionid, _sequenceID.value, SLOT_ID, MAX_SLOT_ID)
                .withPutfh(fh)
                .withWrite(position, data, stateid)
                .withTag("pNFS write")
                .build();

        COMPOUND4res compound4res = sendCompound(args);
        // index 2: result of WRITE (0 = SEQUENCE, 1 = PUTFH)
        return compound4res.resarray.get(2).opwrite.resok4.count.value;
    }

    @Override
    public String toString() {
        return String.format(" OS=%s, cl=[%s], pool=[%s]",
                stateid,
                remoteClient.getAddress().getHostAddress(),
                transport.getRemoteSocketAddress().getAddress().getHostAddress());
    }

    /**
     * Always returns {@code 0}.
     *
     * NOTE(review): the {@code size} constructor argument is not stored, so
     * this does not report the file size - confirm that callers do not rely
     * on it.
     */
    @Override
    public long size() {
        return 0L;
    }

    @Override
    public int getSessionId() {
        // forced by interface
        throw new UnsupportedOperationException("Not supported yet.");
    }

    /**
     * Stops the periodic SEQUENCE task, sends DESTROY_SESSION and closes the
     * connection. The connection is closed even if DESTROY_SESSION fails.
     *
     * @throws IOException if DESTROY_SESSION or closing the connection fails
     */
    @Override
    public void close() throws IOException {
        sessionThread.shutdown();
        try {
            destroy_session();
        } finally {
            rpcClient.close();
        }
    }

    /**
     * Call remote procedure nfsProcCompound.
     *
     * @param arg parameter (of type COMPOUND4args) to the remote procedure
     * call.
     * @return Result from remote procedure call (of type COMPOUND4res).
     * @throws OncRpcException if an ONC/RPC error occurs.
     * @throws IOException if an I/O error occurs.
     */
    public COMPOUND4res nfsProcCompound(COMPOUND4args arg)
            throws OncRpcException, IOException {
        COMPOUND4res result = new COMPOUND4res();

        client.call(nfs4_prot.NFSPROC4_COMPOUND_4, arg, result);

        return result;
    }

    /**
     * @return the transport used by the underlying RPC call object
     */
    public XdrTransport getTransport() {
        return client.getTransport();
    }

    /**
     * Sends a compound, updates the slot sequence number from the reply and
     * converts a non-OK compound status into an exception.
     */
    private COMPOUND4res sendCompound(COMPOUND4args compound4args)
            throws OncRpcException, IOException {

        COMPOUND4res compound4res = nfsProcCompound(compound4args);
        processSequence(compound4res);
        nfsstat.throwIfNeeded(compound4res.status);
        return compound4res;
    }

    /**
     * Sends EXCHANGE_ID with a random owner id and records the client id and
     * sequence id assigned by the server.
     *
     * @throws IOException if the call fails or the server does not advertise
     * the pNFS data server role
     */
    private synchronized void exchange_id() throws OncRpcException, IOException {

        COMPOUND4args args = new CompoundBuilder()
                .withExchangeId(IMPL_DOMAIN, IMPL_NAME, UUID.randomUUID().toString(), 0, state_protect_how4.SP4_NONE)
                .withTag("exchange_id")
                .build();

        COMPOUND4res compound4res = sendCompound(args);

        _clientIdByServer = compound4res.resarray.get(0).opexchange_id.eir_resok4.eir_clientid;
        _sequenceID = compound4res.resarray.get(0).opexchange_id.eir_resok4.eir_sequenceid;

        if ((compound4res.resarray.get(0).opexchange_id.eir_resok4.eir_flags.value
                & nfs4_prot.EXCHGID4_FLAG_USE_PNFS_DS) == 0) {
            throw new IOException("remote server is not a DS");
        }
    }

    /**
     * Sends CREATE_SESSION, records the session id, resets the slot sequence
     * number to 0 and schedules the periodic SEQUENCE call.
     */
    private synchronized void create_session() throws OncRpcException, IOException {

        COMPOUND4args args = new CompoundBuilder()
                .withCreatesession(_clientIdByServer, _sequenceID)
                .withTag("create_session")
                .build();

        COMPOUND4res compound4res = sendCompound(args);

        _sessionid = compound4res.resarray.get(0).opcreate_session.csr_resok4.csr_sessionid;
        _sequenceID.value = 0;

        sessionThread.scheduleAtFixedRate(() -> {
            try {
                this.sequence();
            } catch (IOException ignored) {
                // Best effort: a failed periodic SEQUENCE is deliberately not
                // propagated, as an exception escaping from here would cancel
                // all further runs of this task. Presumably a broken session
                // shows up as an error on the next read or write.
            }
        },
                SESSION_KEEPALIVE_SECONDS, SESSION_KEEPALIVE_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Advances the slot sequence number if the first operation of the reply
     * is a successful SEQUENCE. Replies without any operation result are
     * ignored, so that the compound status can still be evaluated by the
     * caller instead of failing here with an index error.
     *
     * @param compound4res reply to inspect
     */
    public void processSequence(COMPOUND4res compound4res) {

        if (compound4res.resarray == null || compound4res.resarray.isEmpty()) {
            return;
        }

        nfs_resop4 res = compound4res.resarray.get(0);
        if (res.resop == nfs_opnum4.OP_SEQUENCE && res.opsequence.sr_status == nfsstat.NFS_OK) {
            ++_sequenceID.value;
        }
    }

    /**
     * Sends a compound containing only SEQUENCE; used by the periodic task.
     */
    private synchronized void sequence() throws OncRpcException, IOException {

        COMPOUND4args args = new CompoundBuilder()
                .withSequence(false, _sessionid, _sequenceID.value, SLOT_ID, MAX_SLOT_ID)
                .withTag("sequence")
                .build();
        sendCompound(args);
    }

    /**
     * Sends DESTROY_SESSION for the session created in the constructor.
     */
    private synchronized void destroy_session() throws OncRpcException, IOException {

        COMPOUND4args args = new CompoundBuilder()
                .withDestroysession(_sessionid)
                .withTag("destroy_session")
                .build();

        sendCompound(args);
    }
}

0 comments on commit 96dc283

Please sign in to comment.