Skip to content

Commit

Permalink
Fix first set of issues found with NIO2 and new SocketBufferHandler
Browse files Browse the repository at this point in the history
git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@1652004 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
markt-asf committed Jan 15, 2015
1 parent 9718447 commit a9c95a3
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 117 deletions.
3 changes: 1 addition & 2 deletions java/org/apache/coyote/http11/InternalNio2InputBuffer.java
Expand Up @@ -118,8 +118,7 @@ protected class SocketInputBuffer implements InputBuffer {
* Read bytes into the specified chunk.
*/
@Override
public int doRead(ByteChunk chunk, Request req )
throws IOException {
public int doRead(ByteChunk chunk, Request req ) throws IOException {

if (pos >= lastValid) {
if (!fill(true)) //read body, must be blocking, as the thread is inside the app
Expand Down
10 changes: 1 addition & 9 deletions java/org/apache/tomcat/util/net/Nio2Channel.java
Expand Up @@ -52,21 +52,13 @@ public void reset(AsynchronousSocketChannel channel, SocketWrapperBase<Nio2Chann
throws IOException {
this.sc = channel;
this.socket = socket;
bufHandler.getReadBuffer().clear();
bufHandler.getWriteBuffer().clear();
bufHandler.reset();
}

public SocketWrapperBase<Nio2Channel> getSocket() {
return socket;
}

public int getBufferSize() {
if ( bufHandler == null ) return 0;
int size = 0;
size += bufHandler.getReadBuffer()!=null?bufHandler.getReadBuffer().capacity():0;
size += bufHandler.getWriteBuffer()!=null?bufHandler.getWriteBuffer().capacity():0;
return size;
}

/**
* Closes this channel.
Expand Down
138 changes: 62 additions & 76 deletions java/org/apache/tomcat/util/net/Nio2Endpoint.java
Expand Up @@ -726,8 +726,7 @@ protected AtomicInteger initialValue() {
private boolean upgradeInit = false;

private final CompletionHandler<Integer, SocketWrapperBase<Nio2Channel>> readCompletionHandler;
private boolean flipped = false;
private volatile boolean readPending = false;
private final Semaphore readPending = new Semaphore(1);
private volatile boolean readInterest = true;

private final CompletionHandler<Integer, ByteBuffer> writeCompletionHandler;
Expand All @@ -751,7 +750,7 @@ public void completed(Integer nBytes, SocketWrapperBase<Nio2Channel> attachment)
if (nBytes.intValue() < 0) {
failed(new EOFException(), attachment);
} else {
readPending = false;
readPending.release();
if (readInterest && !Nio2Endpoint.isInline()) {
readInterest = false;
notify = true;
Expand All @@ -771,7 +770,7 @@ public void failed(Throwable exc, SocketWrapperBase<Nio2Channel> attachment) {
ioe = new IOException(exc);
}
Nio2SocketWrapper.this.setError(ioe);
readPending = false;
readPending.release();
if (exc instanceof AsynchronousCloseException) {
// If already closed, don't call onError and close again
return;
Expand Down Expand Up @@ -947,30 +946,21 @@ public boolean isUpgradeInit() {
@Override
public boolean isReady() throws IOException {
synchronized (readCompletionHandler) {
if (readPending) {
if (!readPending.tryAcquire()) {
readInterest = true;
return false;
}
ByteBuffer readBuffer = getSocket().getBufHandler().getReadBuffer();
if (!flipped) {
readBuffer.flip();
flipped = true;
}
if (readBuffer.remaining() > 0) {

socketBufferHandler.configureReadBufferForRead();
if (!socketBufferHandler.isReadBufferEmpty()) {
return true;
}

readBuffer.clear();
flipped = false;
int nRead = fillReadBuffer(false);

boolean isReady = nRead > 0;
if (isReady) {
if (!flipped) {
readBuffer.flip();
flipped = true;
}
} else {

if (!isReady) {
readInterest = true;
}
return isReady;
Expand All @@ -988,75 +978,70 @@ public int read(boolean block, byte[] b, int off, int len) throws IOException {
log.debug("Socket: [" + this + "], block: [" + block + "], length: [" + len + "]");
}

synchronized (readCompletionHandler) {
if (readPending) {
if (block) {
try {
readPending.acquire();
} catch (InterruptedException e) {
throw new IOException(e);
}
} else {
if (!readPending.tryAcquire()) {
if (log.isDebugEnabled()) {
log.debug("Socket: [" + this + "], Read: [0]");
}
return 0;
}
}

ByteBuffer readBuffer = getSocket().getBufHandler().getReadBuffer();
synchronized (readCompletionHandler) {
socketBufferHandler.configureReadBufferForRead();

int remaining = socketBufferHandler.getReadBuffer().remaining();

if (!flipped) {
readBuffer.flip();
flipped = true;
}
int remaining = readBuffer.remaining();
// Is there enough data in the read buffer to satisfy this request?
if (remaining >= len) {
readBuffer.get(b, off, len);
socketBufferHandler.getReadBuffer().get(b, off, len);
if (log.isDebugEnabled()) {
log.debug("Socket: [" + this + "], Read from buffer: [" + len + "]");
}
readPending.release();
return len;
}

// Copy what data there is in the read buffer to the byte array
int leftToWrite = len;
int newOffset = off;
if (remaining > 0) {
readBuffer.get(b, off, remaining);
socketBufferHandler.getReadBuffer().get(b, off, remaining);
leftToWrite -= remaining;
newOffset += remaining;
}

// Fill the read buffer as best we can
readBuffer.clear();
flipped = false;
int nRead = fillReadBuffer(block);
// Fill the read buffer as best we can. Only do a blocking read if
// the current read is blocking AND there wasn't any data left over
// in the read buffer.
int nRead = fillReadBuffer(block && remaining == 0);

// Full as much of the remaining byte array as possible with the data
// that was just read
// Fill as much of the remaining byte array as possible with the
// data that was just read
if (nRead > 0) {
if (!flipped) {
readBuffer.flip();
flipped = true;
}
socketBufferHandler.configureReadBufferForRead();
if (nRead > leftToWrite) {
readBuffer.get(b, newOffset, leftToWrite);
socketBufferHandler.getReadBuffer().get(b, newOffset, leftToWrite);
leftToWrite = 0;
} else {
readBuffer.get(b, newOffset, nRead);
socketBufferHandler.getReadBuffer().get(b, newOffset, nRead);
leftToWrite -= nRead;
}
} else if (nRead == 0) {
if (block) {
if (!flipped) {
readBuffer.flip();
flipped = true;
}
} else {
readInterest = true;
}
} else if (nRead == 0 && !block) {
readInterest = true;
} else if (nRead == -1) {
throw new EOFException();
}

if (log.isDebugEnabled()) {
log.debug("Socket: [" + this + "], Read: [" + (len - leftToWrite) + "]");
}

return len - leftToWrite;
}
}
Expand All @@ -1065,7 +1050,8 @@ public int read(boolean block, byte[] b, int off, int len) throws IOException {
@Override
public void unRead(ByteBuffer returnedInput) {
if (returnedInput != null) {
getSocket().getBufHandler().getReadBuffer().put(returnedInput);
socketBufferHandler.configureReadBufferForWrite();
socketBufferHandler.getReadBuffer().put(returnedInput);
}
}

Expand All @@ -1079,17 +1065,17 @@ public void close() throws IOException {
}


/* Callers of this method must:
* - have acquired the readPending semaphore
* - have acquired a lock on readCompletionHandler
*/
private int fillReadBuffer(boolean block) throws IOException {
ByteBuffer readBuffer = getSocket().getBufHandler().getReadBuffer();
socketBufferHandler.configureReadBufferForWrite();
int nRead = 0;
if (block) {
readPending = true;
readBuffer.clear();
flipped = false;
try {
nRead = getSocket().read(readBuffer)
.get(getTimeout(), TimeUnit.MILLISECONDS).intValue();
readPending = false;
nRead = getSocket().read(socketBufferHandler.getReadBuffer()).get(
getTimeout(), TimeUnit.MILLISECONDS).intValue();
} catch (ExecutionException e) {
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
Expand All @@ -1103,15 +1089,12 @@ private int fillReadBuffer(boolean block) throws IOException {
throw ex;
}
} else {
readPending = true;
readBuffer.clear();
flipped = false;
Nio2Endpoint.startInline();
getSocket().read(readBuffer, getTimeout(), TimeUnit.MILLISECONDS,
getSocket().read(socketBufferHandler.getReadBuffer(), getTimeout(), TimeUnit.MILLISECONDS,
this, readCompletionHandler);
Nio2Endpoint.endInline();
if (!readPending) {
nRead = readBuffer.position();
if (readPending.availablePermits() == 1) {
nRead = socketBufferHandler.getReadBuffer().position();
}
}
return nRead;
Expand Down Expand Up @@ -1230,7 +1213,9 @@ private boolean flushNonBlocking(boolean hasPermit) {
writeCompletionHandler);
} else {
// Nothing was written
writePending.release();
if (!hasPermit) {
writePending.release();
}
}
}
return hasDataToWrite();
Expand All @@ -1250,19 +1235,21 @@ public boolean hasDataToWrite() {
@Override
public boolean isReadPending() {
synchronized (readCompletionHandler) {
return readPending;
return readPending.availablePermits() == 0;
}
}


@Override
public void registerReadInterest() {
synchronized (readCompletionHandler) {
if (readPending) {
readInterest = true;
} else {
if (readPending.tryAcquire()) {
readPending.release();

// If no read is pending, notify
getEndpoint().processSocket(this, SocketStatus.OPEN_READ, true);
} else {
readInterest = true;
}
}
}
Expand Down Expand Up @@ -1352,10 +1339,9 @@ public void awaitBytes(SocketWrapperBase<Nio2Channel> socket) {
if (socket == null || socket.getSocket() == null) {
return;
}
ByteBuffer byteBuffer = socket.getSocket().getBufHandler().getReadBuffer();
byteBuffer.clear();
socket.getSocket().read(byteBuffer, socket.getTimeout(),
TimeUnit.MILLISECONDS, socket, awaitBytes);
socket.getSocket().getBufHandler().configureReadBufferForWrite();
socket.getSocket().read(socket.getSocket().getBufHandler().getReadBuffer(),
socket.getTimeout(), TimeUnit.MILLISECONDS, socket, awaitBytes);
}

public enum SendfileState {
Expand Down Expand Up @@ -1449,8 +1435,8 @@ public SendfileState processSendfile(Nio2SocketWrapper socket) {
return SendfileState.ERROR;
}
}
socket.getSocket().getBufHandler().configureReadBufferForWrite();
ByteBuffer buffer = socket.getSocket().getBufHandler().getWriteBuffer();
buffer.clear();
int nRead = -1;
try {
nRead = data.fchannel.read(buffer);
Expand All @@ -1459,10 +1445,10 @@ public SendfileState processSendfile(Nio2SocketWrapper socket) {
}

if (nRead >= 0) {
buffer.flip();
data.socket = socket;
data.buffer = buffer;
data.length -= nRead;
socket.getSocket().getBufHandler().configureReadBufferForRead();
socket.getSocket().write(buffer, socket.getTimeout(), TimeUnit.MILLISECONDS,
data, sendfile);
if (data.doneInline) {
Expand Down
7 changes: 0 additions & 7 deletions java/org/apache/tomcat/util/net/NioChannel.java
Expand Up @@ -64,13 +64,6 @@ public void reset() throws IOException {
this.sendFile = false;
}

public int getBufferSize() {
if ( bufHandler == null ) return 0;
int size = 0;
size += bufHandler.getReadBuffer().capacity();
size += bufHandler.getWriteBuffer().capacity();
return size;
}

/**
* Returns true if the network buffer has been flushed out and is empty.
Expand Down
6 changes: 4 additions & 2 deletions java/org/apache/tomcat/util/net/NioEndpoint.java
Expand Up @@ -1438,8 +1438,10 @@ public int read(boolean block, byte[] b, int off, int len)
newOffset += remaining;
}

// Fill the read buffer as best we can
int nRead = fillReadBuffer(block);
// Fill the read buffer as best we can. Only do a blocking read if
// the current read is blocking AND there wasn't any data left over
// in the read buffer.
int nRead = fillReadBuffer(block && remaining == 0);

// Full as much of the remaining byte array as possible with the
// data that was just read
Expand Down

0 comments on commit a9c95a3

Please sign in to comment.