Skip to content

Commit

Permalink
Avoid a blocking write of the internal buffer
Browse files Browse the repository at this point in the history
When using this API, this IO block is extremely unlikely to occur, but
it still breaks the API contract, so fix it as NIO makes it rather easy.
With this fix the functionality now seems "final" to me.
Shuffle around code to make read and write identical, and remove common
code. I will now continue refactoring NIO2 and APR based on the template
to see if I can move some code to SocketWrapperBase.
  • Loading branch information
rmaucher committed May 14, 2019
1 parent 58a95c6 commit e2c5671
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 88 deletions.
138 changes: 50 additions & 88 deletions java/org/apache/tomcat/util/net/NioEndpoint.java
Expand Up @@ -1503,11 +1503,32 @@ public void run() {
return;
}
if (read) {
nBytes = getSocket().read(buffers, offset, length);
updateLastRead();
// Read from main buffer first
if (!socketBufferHandler.isReadBufferEmpty()) {
// There is still data inside the main read buffer, it needs to be read first
socketBufferHandler.configureReadBufferForRead();
for (int i = 0; i < length && !socketBufferHandler.isReadBufferEmpty(); i++) {
nBytes += transfer(socketBufferHandler.getReadBuffer(), buffers[offset + i]);
}
}
if (nBytes == 0) {
nBytes = getSocket().read(buffers, offset, length);
updateLastRead();
}
} else {
nBytes = getSocket().write(buffers, offset, length);
updateLastWrite();
boolean doWrite = true;
// Write from main buffer first
if (!socketBufferHandler.isWriteBufferEmpty()) {
// There is still data inside the main write buffer, it needs to be written first
doWrite(false);
if (!socketBufferHandler.isWriteBufferEmpty()) {
doWrite = false;
}
}
if (doWrite) {
nBytes = getSocket().write(buffers, offset, length);
updateLastWrite();
}
}
if (nBytes != 0) {
completionDone = false;
Expand Down Expand Up @@ -1543,90 +1564,37 @@ public void run() {
public <A> CompletionState read(ByteBuffer[] dsts, int offset, int length,
BlockingMode block, long timeout, TimeUnit unit, A attachment,
CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
IOException ioe = getError();
if (ioe != null) {
handler.failed(ioe, attachment);
return CompletionState.ERROR;
}
if (timeout == -1) {
timeout = toTimeout(getReadTimeout());
} else if (unit.toMillis(timeout) != getReadTimeout()) {
setReadTimeout(unit.toMillis(timeout));
}
if (block == BlockingMode.BLOCK || block == BlockingMode.SEMI_BLOCK) {
try {
if (!readPending.tryAcquire(timeout, unit)) {
handler.failed(new SocketTimeoutException(), attachment);
return CompletionState.ERROR;
}
} catch (InterruptedException e) {
handler.failed(e, attachment);
return CompletionState.ERROR;
}
} else {
if (!readPending.tryAcquire()) {
if (block == BlockingMode.NON_BLOCK) {
return CompletionState.NOT_DONE;
} else {
handler.failed(new ReadPendingException(), attachment);
return CompletionState.ERROR;
}
}
}
VectoredIOCompletionHandler<A> completion = new VectoredIOCompletionHandler<>();
OperationState<A> state = new OperationState<>(true, dsts, offset, length, block,
attachment, check, handler, readPending, completion);
readOperation = state;
long nBytes = 0;
if (!socketBufferHandler.isReadBufferEmpty()) {
// There is still data inside the main read buffer, use it to fill out the destination buffers
// Note: It is not necessary to put this code in the completion handler
socketBufferHandler.configureReadBufferForRead();
for (int i = 0; i < length && !socketBufferHandler.isReadBufferEmpty(); i++) {
nBytes += transfer(socketBufferHandler.getReadBuffer(), dsts[offset + i]);
}
if (nBytes > 0) {
completion.completed(Long.valueOf(nBytes), state);
}
}
if (nBytes == 0) {
state.run();
}
if (block == BlockingMode.BLOCK) {
synchronized (state) {
if (state.state == CompletionState.PENDING) {
try {
state.wait(unit.toMillis(timeout));
if (state.state == CompletionState.PENDING) {
return CompletionState.ERROR;
}
} catch (InterruptedException e) {
handler.failed(new SocketTimeoutException(), attachment);
return CompletionState.ERROR;
}
}
}
}
return state.state;
return readOrWrite(true, dsts, offset, length, block, timeout, unit, attachment, check, handler);
}

@Override
public <A> CompletionState write(ByteBuffer[] srcs, int offset, int length,
BlockingMode block, long timeout, TimeUnit unit, A attachment,
CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
return readOrWrite(false, srcs, offset, length, block, timeout, unit, attachment, check, handler);
}

private <A> CompletionState readOrWrite(boolean read,
ByteBuffer[] buffers, int offset, int length,
BlockingMode block, long timeout, TimeUnit unit, A attachment,
CompletionCheck check, CompletionHandler<Long, ? super A> handler) {
IOException ioe = getError();
if (ioe != null) {
handler.failed(ioe, attachment);
return CompletionState.ERROR;
}
if (timeout == -1) {
timeout = toTimeout(getWriteTimeout());
} else if (unit.toMillis(timeout) != getWriteTimeout()) {
setWriteTimeout(unit.toMillis(timeout));
timeout = toTimeout(read ? getReadTimeout() : getWriteTimeout());
} else if (unit.toMillis(timeout) != (read ? getReadTimeout() : getWriteTimeout())) {
if (read) {
setReadTimeout(unit.toMillis(timeout));
} else {
setWriteTimeout(unit.toMillis(timeout));
}
}
if (block == BlockingMode.BLOCK || block == BlockingMode.SEMI_BLOCK) {
try {
if (!writePending.tryAcquire(timeout, unit)) {
if (read ? !readPending.tryAcquire(timeout, unit) : !writePending.tryAcquire(timeout, unit)) {
handler.failed(new SocketTimeoutException(), attachment);
return CompletionState.ERROR;
}
Expand All @@ -1635,29 +1603,23 @@ public <A> CompletionState write(ByteBuffer[] srcs, int offset, int length,
return CompletionState.ERROR;
}
} else {
if (!writePending.tryAcquire()) {
if (read ? !readPending.tryAcquire() : !writePending.tryAcquire()) {
if (block == BlockingMode.NON_BLOCK) {
return CompletionState.NOT_DONE;
} else {
handler.failed(new WritePendingException(), attachment);
handler.failed(read ? new ReadPendingException() : new WritePendingException(), attachment);
return CompletionState.ERROR;
}
}
}
if (!socketBufferHandler.isWriteBufferEmpty()) {
// First flush the main buffer as needed
try {
doWrite(true);
} catch (IOException e) {
handler.failed(e, attachment);
return CompletionState.ERROR;
}
}
VectoredIOCompletionHandler<A> completion = new VectoredIOCompletionHandler<>();
OperationState<A> state = new OperationState<>(false, srcs, offset, length, block,
attachment, check, handler, writePending, completion);
writeOperation = state;
// It should be less necessary to check the buffer state as it is easy to flush before
OperationState<A> state = new OperationState<>(read, buffers, offset, length, block,
attachment, check, handler, read ? readPending : writePending, completion);
if (read) {
readOperation = state;
} else {
writeOperation = state;
}
state.run();
if (block == BlockingMode.BLOCK) {
synchronized (state) {
Expand Down
4 changes: 4 additions & 0 deletions webapps/docs/changelog.xml
Expand Up @@ -100,6 +100,10 @@
Add async IO for APR connector for consistency, but disable it by
default due to low performance. (remm)
</add>
<fix>
Avoid blocking write of internal buffer for NIO when using async IO.
(remm)
</fix>
</changelog>
</subsection>
<subsection name="Other">
Expand Down

0 comments on commit e2c5671

Please sign in to comment.