Skip to content

Commit

Permalink
Rework notify to handle nested inline completion handlers correctly.
Browse files Browse the repository at this point in the history
git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@1650285 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
markt-asf committed Jan 8, 2015
1 parent ba36136 commit 852991e
Showing 1 changed file with 27 additions and 22 deletions.
49 changes: 27 additions & 22 deletions java/org/apache/tomcat/util/net/Nio2Endpoint.java
Expand Up @@ -714,6 +714,13 @@ private void closeSocket(AsynchronousSocketChannel socket) {


public static class Nio2SocketWrapper extends SocketWrapperBase<Nio2Channel> { public static class Nio2SocketWrapper extends SocketWrapperBase<Nio2Channel> {


private static final ThreadLocal<Boolean> writeCompletionInProgress = new ThreadLocal<Boolean>() {
@Override
protected Boolean initialValue() {
return Boolean.FALSE;
}
};

private SendfileData sendfileData = null; private SendfileData sendfileData = null;
private boolean upgradeInit = false; private boolean upgradeInit = false;


Expand All @@ -726,6 +733,7 @@ public static class Nio2SocketWrapper extends SocketWrapperBase<Nio2Channel> {
private final CompletionHandler<Long, ByteBuffer[]> gatheringWriteCompletionHandler; private final CompletionHandler<Long, ByteBuffer[]> gatheringWriteCompletionHandler;
private final Semaphore writePending = new Semaphore(1); private final Semaphore writePending = new Semaphore(1);
private volatile boolean writeInterest = true; private volatile boolean writeInterest = true;
private boolean writeNotify = false;




public Nio2SocketWrapper(Nio2Channel channel, Nio2Endpoint endpoint) { public Nio2SocketWrapper(Nio2Channel channel, Nio2Endpoint endpoint) {
Expand Down Expand Up @@ -774,11 +782,12 @@ public void failed(Throwable exc, SocketWrapperBase<Nio2Channel> attachment) {
this.writeCompletionHandler = new CompletionHandler<Integer, ByteBuffer>() { this.writeCompletionHandler = new CompletionHandler<Integer, ByteBuffer>() {
@Override @Override
public void completed(Integer nBytes, ByteBuffer attachment) { public void completed(Integer nBytes, ByteBuffer attachment) {
boolean notify = false; writeNotify = false;
synchronized (writeCompletionHandler) { synchronized (writeCompletionHandler) {
if (nBytes.intValue() < 0) { if (nBytes.intValue() < 0) {
failed(new EOFException(sm.getString("iob.failedwrite")), attachment); failed(new EOFException(sm.getString("iob.failedwrite")), attachment);
} else if (Nio2SocketWrapper.this.bufferedWrites.size() > 0) { } else if (Nio2SocketWrapper.this.bufferedWrites.size() > 0) {
writeCompletionInProgress.set(Boolean.TRUE);
// Continue writing data using a gathering write // Continue writing data using a gathering write
ArrayList<ByteBuffer> arrayList = new ArrayList<>(); ArrayList<ByteBuffer> arrayList = new ArrayList<>();
if (attachment.hasRemaining()) { if (attachment.hasRemaining()) {
Expand All @@ -793,22 +802,25 @@ public void completed(Integer nBytes, ByteBuffer attachment) {
Nio2SocketWrapper.this.getSocket().write(array, 0, array.length, Nio2SocketWrapper.this.getSocket().write(array, 0, array.length,
Nio2SocketWrapper.this.getTimeout(), TimeUnit.MILLISECONDS, Nio2SocketWrapper.this.getTimeout(), TimeUnit.MILLISECONDS,
array, gatheringWriteCompletionHandler); array, gatheringWriteCompletionHandler);
writeCompletionInProgress.set(Boolean.FALSE);
} else if (attachment.hasRemaining()) { } else if (attachment.hasRemaining()) {
// Regular write // Regular write
writeCompletionInProgress.set(Boolean.TRUE);
Nio2SocketWrapper.this.getSocket().write(attachment, Nio2SocketWrapper.this.getTimeout(), Nio2SocketWrapper.this.getSocket().write(attachment, Nio2SocketWrapper.this.getTimeout(),
TimeUnit.MILLISECONDS, attachment, writeCompletionHandler); TimeUnit.MILLISECONDS, attachment, writeCompletionHandler);
writeCompletionInProgress.set(Boolean.FALSE);
} else { } else {
// All data has been written // All data has been written
if (writeInterest && !Nio2Endpoint.isInline()) { if (writeInterest) {
writeInterest = false; writeInterest = false;
notify = true; writeNotify = true;
} }
writePending.release(); writePending.release();
socketWriteBuffer.clear(); socketWriteBuffer.clear();
writeBufferFlipped = false; writeBufferFlipped = false;
} }
} }
if (notify) { if (writeNotify && !writeCompletionInProgress.get().booleanValue()) {
endpoint.processSocket(Nio2SocketWrapper.this, SocketStatus.OPEN_WRITE, false); endpoint.processSocket(Nio2SocketWrapper.this, SocketStatus.OPEN_WRITE, false);
} }
} }
Expand All @@ -830,12 +842,13 @@ public void failed(Throwable exc, ByteBuffer attachment) {
gatheringWriteCompletionHandler = new CompletionHandler<Long, ByteBuffer[]>() { gatheringWriteCompletionHandler = new CompletionHandler<Long, ByteBuffer[]>() {
@Override @Override
public void completed(Long nBytes, ByteBuffer[] attachment) { public void completed(Long nBytes, ByteBuffer[] attachment) {
boolean notify = false; writeNotify = false;
synchronized (writeCompletionHandler) { synchronized (writeCompletionHandler) {
if (nBytes.longValue() < 0) { if (nBytes.longValue() < 0) {
failed(new EOFException(sm.getString("iob.failedwrite")), attachment); failed(new EOFException(sm.getString("iob.failedwrite")), attachment);
} else if (Nio2SocketWrapper.this.bufferedWrites.size() > 0 || arrayHasData(attachment)) { } else if (Nio2SocketWrapper.this.bufferedWrites.size() > 0 || arrayHasData(attachment)) {
// Continue writing data // Continue writing data
writeCompletionInProgress.set(Boolean.TRUE);
ArrayList<ByteBuffer> arrayList = new ArrayList<>(); ArrayList<ByteBuffer> arrayList = new ArrayList<>();
for (ByteBuffer buffer : attachment) { for (ByteBuffer buffer : attachment) {
if (buffer.hasRemaining()) { if (buffer.hasRemaining()) {
Expand All @@ -851,18 +864,19 @@ public void completed(Long nBytes, ByteBuffer[] attachment) {
Nio2SocketWrapper.this.getSocket().write(array, 0, array.length, Nio2SocketWrapper.this.getSocket().write(array, 0, array.length,
Nio2SocketWrapper.this.getTimeout(), TimeUnit.MILLISECONDS, Nio2SocketWrapper.this.getTimeout(), TimeUnit.MILLISECONDS,
array, gatheringWriteCompletionHandler); array, gatheringWriteCompletionHandler);
writeCompletionInProgress.set(Boolean.FALSE);
} else { } else {
// All data has been written // All data has been written
if (writeInterest && !Nio2Endpoint.isInline()) { if (writeInterest) {
writeInterest = false; writeInterest = false;
notify = true; writeNotify = true;
} }
writePending.release(); writePending.release();
socketWriteBuffer.clear(); socketWriteBuffer.clear();
writeBufferFlipped = false; writeBufferFlipped = false;
} }
} }
if (notify) { if (writeNotify && !writeCompletionInProgress.get().booleanValue()) {
endpoint.processSocket(Nio2SocketWrapper.this, SocketStatus.OPEN_WRITE, false); endpoint.processSocket(Nio2SocketWrapper.this, SocketStatus.OPEN_WRITE, false);
} }
} }
Expand Down Expand Up @@ -1115,8 +1129,8 @@ public void write(boolean block, byte[] buf, int off, int len) throws IOExceptio
// Could be "smart" with coordination with the main CoyoteOutputStream to // Could be "smart" with coordination with the main CoyoteOutputStream to
// indicate the end of a write // indicate the end of a write
// Uses: if (writePending.tryAcquire(socketWrapper.getTimeout(), TimeUnit.MILLISECONDS)) // Uses: if (writePending.tryAcquire(socketWrapper.getTimeout(), TimeUnit.MILLISECONDS))
if (writePending.tryAcquire()) { synchronized (writeCompletionHandler) {
synchronized (writeCompletionHandler) { if (writePending.tryAcquire()) {
// No pending completion handler, so writing to the main buffer // No pending completion handler, so writing to the main buffer
// is possible // is possible
int thisTime = transfer(buf, off, len, socketWriteBuffer); int thisTime = transfer(buf, off, len, socketWriteBuffer);
Expand All @@ -1127,9 +1141,7 @@ public void write(boolean block, byte[] buf, int off, int len) throws IOExceptio
addToBuffers(buf, off, len); addToBuffers(buf, off, len);
} }
flush(false, true); flush(false, true);
} } else {
} else {
synchronized (writeCompletionHandler) {
addToBuffers(buf, off, len); addToBuffers(buf, off, len);
} }
} }
Expand Down Expand Up @@ -1212,7 +1224,6 @@ protected boolean flush(boolean block, boolean hasPermit) throws IOException {
socketWriteBuffer.flip(); socketWriteBuffer.flip();
writeBufferFlipped = true; writeBufferFlipped = true;
} }
Nio2Endpoint.startInline();
if (bufferedWrites.size() > 0) { if (bufferedWrites.size() > 0) {
// Gathering write of the main buffer plus all leftovers // Gathering write of the main buffer plus all leftovers
ArrayList<ByteBuffer> arrayList = new ArrayList<>(); ArrayList<ByteBuffer> arrayList = new ArrayList<>();
Expand All @@ -1235,7 +1246,6 @@ protected boolean flush(boolean block, boolean hasPermit) throws IOException {
// Nothing was written // Nothing was written
writePending.release(); writePending.release();
} }
Nio2Endpoint.endInline();
if (writePending.availablePermits() > 0) { if (writePending.availablePermits() > 0) {
if (socketWriteBuffer.remaining() == 0) { if (socketWriteBuffer.remaining() == 0) {
socketWriteBuffer.clear(); socketWriteBuffer.clear();
Expand Down Expand Up @@ -1475,13 +1485,8 @@ public SendfileState processSendfile(Nio2SocketWrapper socket) {
data.socket = socket; data.socket = socket;
data.buffer = buffer; data.buffer = buffer;
data.length -= nRead; data.length -= nRead;
startInline(); socket.getSocket().write(buffer, socket.getTimeout(), TimeUnit.MILLISECONDS,
try { data, sendfile);
socket.getSocket().write(buffer, socket.getTimeout(), TimeUnit.MILLISECONDS,
data, sendfile);
} finally {
endInline();
}
if (data.doneInline) { if (data.doneInline) {
if (data.error) { if (data.error) {
return SendfileState.ERROR; return SendfileState.ERROR;
Expand Down

0 comments on commit 852991e

Please sign in to comment.