Skip to content

Commit

Permalink
nfs: use buffer pool and shallow encoding to avoid extra copy
Browse files Browse the repository at this point in the history
Motivation:
To avoid an extra copy, EDSOperationREAD uses a direct byte buffer which is
stored in a thread-local variable. This model works well for a fixed number
of threads, but with a large number of processing threads (for example, in
a thread-per-request model) the number of buffers might grow unexpectedly.

Modification:
Use PooledMemoryManager from Grizzly. In addition, a new ShallowREAD4resok
class is used to avoid yet another internal copy, for better
throughput.

Result:
The number of direct buffers is independent of the number of processing
threads. The shallow copy improves READ performance by 30%-70%.

IO test (8 threads reading 10GB file)
  fio --name 'read-test' --description 'Simple NFS read test' --rw=read  --numjobs 8 --thread --blocksize 1M --filename 10GB

without:
  READ: bw=928MiB/s (973MB/s), 116MiB/s-116MiB/s (122MB/s-122MB/s), io=80.0GiB (85.9GB), run=88246-88246msec

with:
  READ: bw=1819MiB/s (1907MB/s), 227MiB/s-227MiB/s (238MB/s-238MB/s), io=80.0GiB (85.9GB), run=45042-45044msec

Acked-by: Albert Rossi
Target: master
Require-book: no
Require-notes: yes
  • Loading branch information
kofemann committed Jan 10, 2023
1 parent bdc2382 commit a2b1b0e
Showing 1 changed file with 45 additions and 15 deletions.
Expand Up @@ -2,31 +2,41 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import org.dcache.nfs.ChimeraNFSException;
import org.dcache.nfs.nfsstat;
import org.dcache.nfs.v4.AbstractNFSv4Operation;
import org.dcache.nfs.v4.CompoundContext;
import org.dcache.nfs.v4.NFSv4Defaults;
import org.dcache.nfs.v4.xdr.READ4res;
import org.dcache.nfs.v4.xdr.READ4resok;
import org.dcache.nfs.v4.xdr.nfs_argop4;
import org.dcache.nfs.v4.xdr.nfs_opnum4;
import org.dcache.nfs.v4.xdr.nfs_resop4;
import org.dcache.oncrpc4j.grizzly.GrizzlyUtils;
import org.dcache.oncrpc4j.rpc.OncRpcException;
import org.dcache.oncrpc4j.xdr.Xdr;
import org.dcache.oncrpc4j.xdr.XdrEncodingStream;
import org.dcache.pool.repository.RepositoryChannel;
import org.dcache.util.ByteUnit;
import org.glassfish.grizzly.Buffer;
import org.glassfish.grizzly.memory.MemoryManager;
import org.glassfish.grizzly.memory.PooledMemoryManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EDSOperationREAD extends AbstractNFSv4Operation {

private static final Logger _log = LoggerFactory.getLogger(EDSOperationREAD.class.getName());

// Each thread lazily gets its own direct buffer, sized to NFS4_MAXIOBUFFERSIZE.
private static final ThreadLocal<ByteBuffer> BUFFERS =
        ThreadLocal.withInitial(
                () -> ByteBuffer.allocateDirect((int) NFSv4Defaults.NFS4_MAXIOBUFFERSIZE));
// Shared allocator for READ buffers: a single pool of 1 MiB chunks
// (1 MiB being the max NFS rsize).
private final static MemoryManager<? extends Buffer> POOLED_BUFFER_ALLOCATOR =
new PooledMemoryManager(
ByteUnit.MiB.toBytes(1), // base chunk size
1, // number of pools
2, // growth factor per pool; ignored, since there is only one pool (see above)
GrizzlyUtils.getDefaultWorkerPoolSize(), // expected concurrency
PooledMemoryManager.DEFAULT_HEAP_USAGE_PERCENTAGE,
PooledMemoryManager.DEFAULT_PREALLOCATED_BUFFERS_PERCENTAGE,
true // direct buffers
);

private final NfsTransferService nfsTransferService;

Expand All @@ -51,17 +61,22 @@ public void process(CompoundContext context, nfs_resop4 result) {
return;
}

ByteBuffer bb = BUFFERS.get();
var gBuffer = POOLED_BUFFER_ALLOCATOR.allocate(count);
gBuffer.allowBufferDispose(true);
ByteBuffer bb = gBuffer.toByteBuffer();
bb.clear().limit(count);
RepositoryChannel fc = mover.getMoverChannel();

bb.rewind();
RepositoryChannel fc = mover.getMoverChannel();
int bytesRead = fc.read(bb, offset);

if (bytesRead > 0) {
// the positions of Buffer and ByteBuffer are independent, thus keep it in sync manually
gBuffer.position(bytesRead);
}
gBuffer.flip();

res.status = nfsstat.NFS_OK;
res.resok4 = new READ4resok();
bb.flip();
res.resok4.data = bb;
res.resok4 = new ShallowREAD4resok(gBuffer);
if (bytesRead == -1 || offset + bytesRead == fc.size()) {
res.resok4.eof = true;
}
Expand All @@ -77,4 +92,19 @@ public void process(CompoundContext context, nfs_resop4 result) {
res.status = nfsstat.NFSERR_SERVERFAULT;
}
}

// version of READ4resok that uses shallow encoding to avoid extra copy
/**
 * A {@link READ4resok} whose data is kept in a Grizzly {@link Buffer} and
 * written to the XDR stream with shallow encoding, to avoid an extra copy.
 */
private static class ShallowREAD4resok extends READ4resok {

    /** Buffer holding the data to be sent with the reply. */
    private final Buffer payload;

    public ShallowREAD4resok(Buffer buf) {
        payload = buf;
    }

    /**
     * Writes the eof flag followed by the payload buffer.
     *
     * The stream is cast to {@link Xdr} to reach
     * {@code xdrEncodeShallowByteBuffer}; a stream of any other type fails
     * with a ClassCastException after the eof flag has been written.
     */
    public void xdrEncode(XdrEncodingStream xdr)
            throws OncRpcException, IOException {
        xdr.xdrEncodeBoolean(eof);
        Xdr shallowEncoder = (Xdr) xdr;
        shallowEncoder.xdrEncodeShallowByteBuffer(payload);
    }
}
}

0 comments on commit a2b1b0e

Please sign in to comment.