Skip to content

Commit

Permalink
dcap: bump max command size sent by client to 8MB
Browse files Browse the repository at this point in the history
Motivation:
large vector read request may send dcap commands bigger than mover
can handle.

Modification:
Bump max number up to 8MB. Auto-grow command buffer if needed. Removed
unused variable small cosmetic cleanup.

Result:
better handling of big READV requests.

Ticket: #8996
Acked-by: Gerd Behrmann
Target: master
Target: 2.16
Require-book: no
Require-notes: yes
  • Loading branch information
kofemann committed Jul 14, 2016
1 parent 4a57c27 commit 5d0825d
Showing 1 changed file with 46 additions and 26 deletions.
Expand Up @@ -8,7 +8,6 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.SocketChannel;
Expand Down Expand Up @@ -45,14 +44,18 @@
import static org.dcache.util.ByteUnit.KiB;
import static org.dcache.util.ByteUnit.MiB;


public class DCapProtocol_3_nio implements MoverProtocol, ChecksumMover, CellArgsAware
{
private static Logger _log = LoggerFactory.getLogger(DCapProtocol_3_nio.class);
private static Logger _logSocketIO = LoggerFactory.getLogger("logger.dev.org.dcache.io.socket");
private static final Logger _logSpaceAllocation = LoggerFactory.getLogger("logger.dev.org.dcache.poolspacemonitor." + DCapProtocol_3_nio.class.getName());
private static final int INC_SPACE = MiB.toBytes(50);

/**
* Max request size that client sent by client that we will accept.
*/
private static final long MAX_REQUEST_SIZE = MiB.toBytes(8);

private final Map<String,Object> _context;
private final CellEndpoint _cell;

Expand All @@ -68,7 +71,6 @@ public class DCapProtocol_3_nio implements MoverProtocol, ChecksumMover, CellArg
private long _ioError = -1;
private PnfsId _pnfsId;
private int _sessionId = -1;
private boolean _wasChanged;

private Checksum _clientChecksum;
private ChecksumFactory _checksumFactory;
Expand Down Expand Up @@ -185,14 +187,14 @@ private void newFilePosition(long newPosition){
//
// helper class to use nio channels for input requests.
//
private class RequestBlock {
private static class RequestBlock {

private ByteBuffer _buffer;
private int _commandSize;
private int _commandCode;

private RequestBlock(){
_buffer = ByteBuffer.allocate(16384);
_buffer = ByteBuffer.allocate(64);
}
private void read(SocketChannel channel) throws Exception {

Expand All @@ -203,17 +205,35 @@ private void read(SocketChannel channel) throws Exception {
_buffer.rewind();
_commandSize = _buffer.getInt();

if(_commandSize < 4) {
throw new
CacheException(44, "Protocol Violation (cl<4)");
if (_commandSize < 4) {
throw new CacheException(44, "Protocol Violation (cl<4)");
}

try {
_buffer.clear().limit(_commandSize);
}catch(IllegalArgumentException iae) {
_log.error("Command size excided command block size : " + _commandSize + "/" + _buffer.capacity());
throw iae;
if (_commandSize > _buffer.capacity()) {

if (_commandSize > MAX_REQUEST_SIZE) {
/*
* well, protocol tells nothing about command block size limit (my bad).
* but we will send "protocol violation" to indicate client that we cant handle it.
*/
_log.warn("Command size excided command block size : {}/{}", _commandSize, MAX_REQUEST_SIZE);
// eat the data to keep TCP send buffer on the client side happy
int n = 0;
while (n < _commandSize) {
_buffer.clear();
n += channel.read(_buffer);
}
throw new CacheException(44, "Protocol Violation: request block too big (" + _commandSize + ")");
}

_log.info("Growing command block size from: {} to: {}", _buffer.capacity(), _commandSize);

// we don't need any cind of synchronization as mover single threaded.
_buffer = ByteBuffer.allocate(_commandSize);
}

_buffer.clear().limit(_commandSize);

fillBuffer(channel);
_buffer.rewind();
_commandCode = _buffer.getInt();
Expand Down Expand Up @@ -427,22 +447,19 @@ public void runIO(FileAttributes fileAttributes,
}

//
// get size of next command
// read and process DCAP request
//
try{

try {
requestBlock.read(socketChannel);

_log.debug("Request Block : {}", requestBlock);

}catch(EOFException eofe){
_log.debug("Dataconnection closed by peer : {}", eofe.toString());
throw eofe;
}catch(BufferUnderflowException bue){
throw new
CacheException(43,"Protocol Violation (csl<4)");
} catch (CacheException e) {
// CacheException thrown only on protocol violation.
cntOut.writeACK(9, e.getRc(), e.getMessage());
socketChannel.write(cntOut.buffer());
continue;
}

_log.debug("Request Block : {}", requestBlock);

_lastTransferred = System.currentTimeMillis();

switch(requestBlock.getCommandCode()){
Expand Down Expand Up @@ -738,6 +755,9 @@ public void runIO(FileAttributes fileAttributes,
// clear interrupted state
Thread.interrupted();
ioException = new InterruptedException(ee.getMessage());
} catch (EOFException e) {
_log.debug("Dataconnection closed by peer : {}", e.toString());
ioException = e;
}catch(Exception e){
ioException = e;
}finally{
Expand Down Expand Up @@ -991,7 +1011,7 @@ private void doTheWrite(RepositoryChannel fileChannel,
if(rest < 0) {
break;
}
_wasChanged = true;

while(rest > 0 ){

size = _bigBuffer.capacity() > rest ?
Expand Down

0 comments on commit 5d0825d

Please sign in to comment.