From e2c5671b3ab1710754c278660f941411fb66a798 Mon Sep 17 00:00:00 2001 From: remm Date: Tue, 14 May 2019 11:35:20 +0200 Subject: [PATCH] Avoid a blocking write of the internal buffer When using this API, this IO block is extremely unlikely to occur, but it still breaks the API contract, so fix it as NIO makes it rather easy. With this fix the functionality now seems "final" to me. Shuffle around code to make read and write identical, and remove common code. I will now continue refactoring NIO2 and APR based on the template to see if I can move some code to SocketWrapperBase. --- .../apache/tomcat/util/net/NioEndpoint.java | 138 +++++++----------- webapps/docs/changelog.xml | 4 + 2 files changed, 54 insertions(+), 88 deletions(-) diff --git a/java/org/apache/tomcat/util/net/NioEndpoint.java b/java/org/apache/tomcat/util/net/NioEndpoint.java index 0831273d641b..a6f0f62a5e30 100644 --- a/java/org/apache/tomcat/util/net/NioEndpoint.java +++ b/java/org/apache/tomcat/util/net/NioEndpoint.java @@ -1503,11 +1503,32 @@ public void run() { return; } if (read) { - nBytes = getSocket().read(buffers, offset, length); - updateLastRead(); + // Read from main buffer first + if (!socketBufferHandler.isReadBufferEmpty()) { + // There is still data inside the main read buffer, it needs to be read first + socketBufferHandler.configureReadBufferForRead(); + for (int i = 0; i < length && !socketBufferHandler.isReadBufferEmpty(); i++) { + nBytes += transfer(socketBufferHandler.getReadBuffer(), buffers[offset + i]); + } + } + if (nBytes == 0) { + nBytes = getSocket().read(buffers, offset, length); + updateLastRead(); + } } else { - nBytes = getSocket().write(buffers, offset, length); - updateLastWrite(); + boolean doWrite = true; + // Write from main buffer first + if (!socketBufferHandler.isWriteBufferEmpty()) { + // There is still data inside the main write buffer, it needs to be written first + doWrite(false); + if (!socketBufferHandler.isWriteBufferEmpty()) { + doWrite = false; + } + } + if (doWrite) { + nBytes = getSocket().write(buffers, offset, length); + updateLastWrite(); + } } if (nBytes != 0) { completionDone = false; @@ -1543,90 +1564,37 @@ public void run() { public CompletionState read(ByteBuffer[] dsts, int offset, int length, BlockingMode block, long timeout, TimeUnit unit, A attachment, CompletionCheck check, CompletionHandler handler) { - IOException ioe = getError(); - if (ioe != null) { - handler.failed(ioe, attachment); - return CompletionState.ERROR; - } - if (timeout == -1) { - timeout = toTimeout(getReadTimeout()); - } else if (unit.toMillis(timeout) != getReadTimeout()) { - setReadTimeout(unit.toMillis(timeout)); - } - if (block == BlockingMode.BLOCK || block == BlockingMode.SEMI_BLOCK) { - try { - if (!readPending.tryAcquire(timeout, unit)) { - handler.failed(new SocketTimeoutException(), attachment); - return CompletionState.ERROR; - } - } catch (InterruptedException e) { - handler.failed(e, attachment); - return CompletionState.ERROR; - } - } else { - if (!readPending.tryAcquire()) { - if (block == BlockingMode.NON_BLOCK) { - return CompletionState.NOT_DONE; - } else { - handler.failed(new ReadPendingException(), attachment); - return CompletionState.ERROR; - } - } - } - VectoredIOCompletionHandler completion = new VectoredIOCompletionHandler<>(); - OperationState state = new OperationState<>(true, dsts, offset, length, block, - attachment, check, handler, readPending, completion); - readOperation = state; - long nBytes = 0; - if (!socketBufferHandler.isReadBufferEmpty()) { - // There is still data inside the main read buffer, use it to fill out the destination buffers - // Note: It is not necessary to put this code in the completion handler - socketBufferHandler.configureReadBufferForRead(); - for (int i = 0; i < length && !socketBufferHandler.isReadBufferEmpty(); i++) { - nBytes += transfer(socketBufferHandler.getReadBuffer(), dsts[offset + i]); - } - if (nBytes > 0) { - completion.completed(Long.valueOf(nBytes), state); - } - } - if (nBytes == 0) { - state.run(); - } - if (block == BlockingMode.BLOCK) { - synchronized (state) { - if (state.state == CompletionState.PENDING) { - try { - state.wait(unit.toMillis(timeout)); - if (state.state == CompletionState.PENDING) { - return CompletionState.ERROR; - } - } catch (InterruptedException e) { - handler.failed(new SocketTimeoutException(), attachment); - return CompletionState.ERROR; - } - } - } - } - return state.state; + return readOrWrite(true, dsts, offset, length, block, timeout, unit, attachment, check, handler); } @Override public CompletionState write(ByteBuffer[] srcs, int offset, int length, BlockingMode block, long timeout, TimeUnit unit, A attachment, CompletionCheck check, CompletionHandler handler) { + return readOrWrite(false, srcs, offset, length, block, timeout, unit, attachment, check, handler); + } + + private CompletionState readOrWrite(boolean read, + ByteBuffer[] buffers, int offset, int length, + BlockingMode block, long timeout, TimeUnit unit, A attachment, + CompletionCheck check, CompletionHandler handler) { IOException ioe = getError(); if (ioe != null) { handler.failed(ioe, attachment); return CompletionState.ERROR; } if (timeout == -1) { - timeout = toTimeout(getWriteTimeout()); - } else if (unit.toMillis(timeout) != getWriteTimeout()) { - setWriteTimeout(unit.toMillis(timeout)); + timeout = toTimeout(read ? getReadTimeout() : getWriteTimeout()); + } else if (unit.toMillis(timeout) != (read ? getReadTimeout() : getWriteTimeout())) { + if (read) { + setReadTimeout(unit.toMillis(timeout)); + } else { + setWriteTimeout(unit.toMillis(timeout)); + } } if (block == BlockingMode.BLOCK || block == BlockingMode.SEMI_BLOCK) { try { - if (!writePending.tryAcquire(timeout, unit)) { + if (read ? !readPending.tryAcquire(timeout, unit) : !writePending.tryAcquire(timeout, unit)) { handler.failed(new SocketTimeoutException(), attachment); return CompletionState.ERROR; } @@ -1635,29 +1603,23 @@ public CompletionState write(ByteBuffer[] srcs, int offset, int length, return CompletionState.ERROR; } } else { - if (!writePending.tryAcquire()) { + if (read ? !readPending.tryAcquire() : !writePending.tryAcquire()) { if (block == BlockingMode.NON_BLOCK) { return CompletionState.NOT_DONE; } else { - handler.failed(new WritePendingException(), attachment); + handler.failed(read ? new ReadPendingException() : new WritePendingException(), attachment); return CompletionState.ERROR; } } } - if (!socketBufferHandler.isWriteBufferEmpty()) { - // First flush the main buffer as needed - try { - doWrite(true); - } catch (IOException e) { - handler.failed(e, attachment); - return CompletionState.ERROR; - } - } VectoredIOCompletionHandler completion = new VectoredIOCompletionHandler<>(); - OperationState state = new OperationState<>(false, srcs, offset, length, block, - attachment, check, handler, writePending, completion); - writeOperation = state; - // It should be less necessary to check the buffer state as it is easy to flush before + OperationState state = new OperationState<>(read, buffers, offset, length, block, + attachment, check, handler, read ? readPending : writePending, completion); + if (read) { + readOperation = state; + } else { + writeOperation = state; + } state.run(); if (block == BlockingMode.BLOCK) { synchronized (state) { diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml index c8f851517c56..92f2aa0da705 100644 --- a/webapps/docs/changelog.xml +++ b/webapps/docs/changelog.xml @@ -100,6 +100,10 @@ Add async IO for APR connector for consistency, but disable it by default due to low performance. (remm) + + Avoid blocking write of internal buffer for NIO when using async IO. + (remm) +