diff --git a/java/org/apache/tomcat/util/net/NioEndpoint.java b/java/org/apache/tomcat/util/net/NioEndpoint.java index 0831273d641b..a6f0f62a5e30 100644 --- a/java/org/apache/tomcat/util/net/NioEndpoint.java +++ b/java/org/apache/tomcat/util/net/NioEndpoint.java @@ -1503,11 +1503,32 @@ public void run() { return; } if (read) { - nBytes = getSocket().read(buffers, offset, length); - updateLastRead(); + // Read from main buffer first + if (!socketBufferHandler.isReadBufferEmpty()) { + // There is still data inside the main read buffer, it needs to be read first + socketBufferHandler.configureReadBufferForRead(); + for (int i = 0; i < length && !socketBufferHandler.isReadBufferEmpty(); i++) { + nBytes += transfer(socketBufferHandler.getReadBuffer(), buffers[offset + i]); + } + } + if (nBytes == 0) { + nBytes = getSocket().read(buffers, offset, length); + updateLastRead(); + } } else { - nBytes = getSocket().write(buffers, offset, length); - updateLastWrite(); + boolean doWrite = true; + // Write from main buffer first + if (!socketBufferHandler.isWriteBufferEmpty()) { + // There is still data inside the main write buffer, it needs to be written first + doWrite(false); + if (!socketBufferHandler.isWriteBufferEmpty()) { + doWrite = false; + } + } + if (doWrite) { + nBytes = getSocket().write(buffers, offset, length); + updateLastWrite(); + } } if (nBytes != 0) { completionDone = false; @@ -1543,90 +1564,37 @@ public void run() { public CompletionState read(ByteBuffer[] dsts, int offset, int length, BlockingMode block, long timeout, TimeUnit unit, A attachment, CompletionCheck check, CompletionHandler handler) { - IOException ioe = getError(); - if (ioe != null) { - handler.failed(ioe, attachment); - return CompletionState.ERROR; - } - if (timeout == -1) { - timeout = toTimeout(getReadTimeout()); - } else if (unit.toMillis(timeout) != getReadTimeout()) { - setReadTimeout(unit.toMillis(timeout)); - } - if (block == BlockingMode.BLOCK || block == BlockingMode.SEMI_BLOCK) { - try { - if (!readPending.tryAcquire(timeout, unit)) { - handler.failed(new SocketTimeoutException(), attachment); - return CompletionState.ERROR; - } - } catch (InterruptedException e) { - handler.failed(e, attachment); - return CompletionState.ERROR; - } - } else { - if (!readPending.tryAcquire()) { - if (block == BlockingMode.NON_BLOCK) { - return CompletionState.NOT_DONE; - } else { - handler.failed(new ReadPendingException(), attachment); - return CompletionState.ERROR; - } - } - } - VectoredIOCompletionHandler completion = new VectoredIOCompletionHandler<>(); - OperationState state = new OperationState<>(true, dsts, offset, length, block, - attachment, check, handler, readPending, completion); - readOperation = state; - long nBytes = 0; - if (!socketBufferHandler.isReadBufferEmpty()) { - // There is still data inside the main read buffer, use it to fill out the destination buffers - // Note: It is not necessary to put this code in the completion handler - socketBufferHandler.configureReadBufferForRead(); - for (int i = 0; i < length && !socketBufferHandler.isReadBufferEmpty(); i++) { - nBytes += transfer(socketBufferHandler.getReadBuffer(), dsts[offset + i]); - } - if (nBytes > 0) { - completion.completed(Long.valueOf(nBytes), state); - } - } - if (nBytes == 0) { - state.run(); - } - if (block == BlockingMode.BLOCK) { - synchronized (state) { - if (state.state == CompletionState.PENDING) { - try { - state.wait(unit.toMillis(timeout)); - if (state.state == CompletionState.PENDING) { - return CompletionState.ERROR; - } - } catch (InterruptedException e) { - handler.failed(new SocketTimeoutException(), attachment); - return CompletionState.ERROR; - } - } - } - } - return state.state; + return readOrWrite(true, dsts, offset, length, block, timeout, unit, attachment, check, handler); } @Override public CompletionState write(ByteBuffer[] srcs, int offset, int length, BlockingMode block, long timeout, TimeUnit unit, A attachment, CompletionCheck check, CompletionHandler handler) { + return readOrWrite(false, srcs, offset, length, block, timeout, unit, attachment, check, handler); + } + + private CompletionState readOrWrite(boolean read, + ByteBuffer[] buffers, int offset, int length, + BlockingMode block, long timeout, TimeUnit unit, A attachment, + CompletionCheck check, CompletionHandler handler) { IOException ioe = getError(); if (ioe != null) { handler.failed(ioe, attachment); return CompletionState.ERROR; } if (timeout == -1) { - timeout = toTimeout(getWriteTimeout()); - } else if (unit.toMillis(timeout) != getWriteTimeout()) { - setWriteTimeout(unit.toMillis(timeout)); + timeout = toTimeout(read ? getReadTimeout() : getWriteTimeout()); + } else if (unit.toMillis(timeout) != (read ? getReadTimeout() : getWriteTimeout())) { + if (read) { + setReadTimeout(unit.toMillis(timeout)); + } else { + setWriteTimeout(unit.toMillis(timeout)); + } } if (block == BlockingMode.BLOCK || block == BlockingMode.SEMI_BLOCK) { try { - if (!writePending.tryAcquire(timeout, unit)) { + if (read ? !readPending.tryAcquire(timeout, unit) : !writePending.tryAcquire(timeout, unit)) { handler.failed(new SocketTimeoutException(), attachment); return CompletionState.ERROR; } @@ -1635,29 +1603,23 @@ public CompletionState write(ByteBuffer[] srcs, int offset, int length, return CompletionState.ERROR; } } else { - if (!writePending.tryAcquire()) { + if (read ? !readPending.tryAcquire() : !writePending.tryAcquire()) { if (block == BlockingMode.NON_BLOCK) { return CompletionState.NOT_DONE; } else { - handler.failed(new WritePendingException(), attachment); + handler.failed(read ? new ReadPendingException() : new WritePendingException(), attachment); return CompletionState.ERROR; } } } - if (!socketBufferHandler.isWriteBufferEmpty()) { - // First flush the main buffer as needed - try { - doWrite(true); - } catch (IOException e) { - handler.failed(e, attachment); - return CompletionState.ERROR; - } - } VectoredIOCompletionHandler completion = new VectoredIOCompletionHandler<>(); - OperationState state = new OperationState<>(false, srcs, offset, length, block, - attachment, check, handler, writePending, completion); - writeOperation = state; - // It should be less necessary to check the buffer state as it is easy to flush before + OperationState state = new OperationState<>(read, buffers, offset, length, block, + attachment, check, handler, read ? readPending : writePending, completion); + if (read) { + readOperation = state; + } else { + writeOperation = state; + } state.run(); if (block == BlockingMode.BLOCK) { synchronized (state) { diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml index c8f851517c56..92f2aa0da705 100644 --- a/webapps/docs/changelog.xml +++ b/webapps/docs/changelog.xml @@ -100,6 +100,10 @@ Add async IO for APR connector for consistency, but disable it by default due to low performance. (remm) + + Avoid blocking write of internal buffer for NIO when using async IO. + (remm) +