Skip to content

Commit

Permalink
Investigating 57799 I found some issues in the SSL impl:
Browse files Browse the repository at this point in the history
- Possibly incomplete writes.
- After some errors (which shouldn't happen however), the pending flag could still be set.
- A bad idea to deal with possible behavior differences for read IO calls between NIO and NIO2 (according to my testing, it doesn't seem to be needed). Test: the byte counter.

git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@1672297 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
rmaucher committed Apr 9, 2015
1 parent d1e2146 commit a3fc253
Showing 1 changed file with 34 additions and 71 deletions.
105 changes: 34 additions & 71 deletions java/org/apache/tomcat/util/net/SecureNio2Channel.java
Expand Up @@ -470,26 +470,29 @@ public void close(boolean force) throws IOException {
} }


private class FutureRead implements Future<Integer> { private class FutureRead implements Future<Integer> {
private ByteBuffer dst; private final ByteBuffer dst;
private final Future<Integer> integer;
public FutureRead(ByteBuffer dst) { public FutureRead(ByteBuffer dst) {
this.dst = dst; this.dst = dst;
this.integer = sc.read(netInBuffer);
} }
@Override @Override
public boolean cancel(boolean mayInterruptIfRunning) { public boolean cancel(boolean mayInterruptIfRunning) {
return false; readPending = false;
return integer.cancel(mayInterruptIfRunning);
} }
@Override @Override
public boolean isCancelled() { public boolean isCancelled() {
return false; return integer.isCancelled();
} }
@Override @Override
public boolean isDone() { public boolean isDone() {
return true; return integer.isDone();
} }
@Override @Override
public Integer get() throws InterruptedException, ExecutionException { public Integer get() throws InterruptedException, ExecutionException {
try { try {
return unwrap(netInBuffer.position()); return unwrap(integer.get().intValue());
} finally { } finally {
readPending = false; readPending = false;
} }
Expand All @@ -499,17 +502,17 @@ public Integer get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, throws InterruptedException, ExecutionException,
TimeoutException { TimeoutException {
try { try {
return unwrap(netInBuffer.position()); return unwrap(integer.get(timeout, unit).intValue());
} finally { } finally {
readPending = false; readPending = false;
} }
} }
protected Integer unwrap(int netread) throws ExecutionException { private Integer unwrap(int nRead) throws ExecutionException {
//are we in the middle of closing or closed? //are we in the middle of closing or closed?
if (closing || closed) if (closing || closed)
return Integer.valueOf(-1); return Integer.valueOf(-1);
//did we reach EOF? if so send EOF up one layer. //did we reach EOF? if so send EOF up one layer.
if (netread == -1) if (nRead < 0)
return Integer.valueOf(-1); return Integer.valueOf(-1);
//the data read //the data read
int read = 0; int read = 0;
Expand All @@ -526,16 +529,18 @@ protected Integer unwrap(int netread) throws ExecutionException {
} }
//compact the buffer //compact the buffer
netInBuffer.compact(); netInBuffer.compact();
if (unwrap.getStatus()==Status.OK || unwrap.getStatus()==Status.BUFFER_UNDERFLOW) { if (unwrap.getStatus() == Status.OK || unwrap.getStatus() == Status.BUFFER_UNDERFLOW) {
//we did receive some data, add it to our total //we did receive some data, add it to our total
read += unwrap.bytesProduced(); read += unwrap.bytesProduced();
//perform any tasks if needed //perform any tasks if needed
if (unwrap.getHandshakeStatus() == HandshakeStatus.NEED_TASK) if (unwrap.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
tasks(); tasks();
}
//if we need more network data, then bail out for now. //if we need more network data, then bail out for now.
if (unwrap.getStatus() == Status.BUFFER_UNDERFLOW) if (unwrap.getStatus() == Status.BUFFER_UNDERFLOW) {
break; break;
} else if (unwrap.getStatus()==Status.BUFFER_OVERFLOW && read > 0) { }
} else if (unwrap.getStatus() == Status.BUFFER_OVERFLOW && read > 0) {
//buffer overflow can happen, if we have read data, then //buffer overflow can happen, if we have read data, then
//empty out the dst buffer before we do another read //empty out the dst buffer before we do another read
break; break;
Expand All @@ -550,46 +555,6 @@ protected Integer unwrap(int netread) throws ExecutionException {
} }
} }


private class FutureNetRead extends FutureRead {
private Future<Integer> integer;
protected FutureNetRead(ByteBuffer dst) {
super(dst);
this.integer = sc.read(netInBuffer);
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return integer.cancel(mayInterruptIfRunning);
}
@Override
public boolean isCancelled() {
return integer.isCancelled();
}
@Override
public boolean isDone() {
return integer.isDone();
}
@Override
public Integer get() throws InterruptedException, ExecutionException {
try {
int netread = integer.get().intValue();
return unwrap(netread);
} finally {
readPending = false;
}
}
@Override
public Integer get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException,
TimeoutException {
try {
int netread = integer.get(timeout, unit).intValue();
return unwrap(netread);
} finally {
readPending = false;
}
}
}

/** /**
* Reads a sequence of bytes from this channel into the given buffer. * Reads a sequence of bytes from this channel into the given buffer.
* *
Expand All @@ -599,19 +564,15 @@ public Integer get(long timeout, TimeUnit unit)
*/ */
@Override @Override
public Future<Integer> read(ByteBuffer dst) { public Future<Integer> read(ByteBuffer dst) {
if (!handshakeComplete) {
throw new IllegalStateException(sm.getString("channel.nio.ssl.incompleteHandshake"));
}
if (readPending) { if (readPending) {
throw new ReadPendingException(); throw new ReadPendingException();
} else { } else {
readPending = true; readPending = true;
} }
//did we finish our handshake? return new FutureRead(dst);
if (!handshakeComplete)
throw new IllegalStateException(sm.getString("channel.nio.ssl.incompleteHandshake"));
if (netInBuffer.position() > 0) {
return new FutureRead(dst);
} else {
return new FutureNetRead(dst);
}
} }


private class FutureWrite implements Future<Integer> { private class FutureWrite implements Future<Integer> {
Expand All @@ -630,6 +591,7 @@ protected FutureWrite(ByteBuffer src) {
} }
@Override @Override
public boolean cancel(boolean mayInterruptIfRunning) { public boolean cancel(boolean mayInterruptIfRunning) {
writePending = false;
return integer.cancel(mayInterruptIfRunning); return integer.cancel(mayInterruptIfRunning);
} }
@Override @Override
Expand All @@ -649,6 +611,9 @@ public Integer get() throws InterruptedException, ExecutionException {
if (integer.get().intValue() > 0 && written == 0) { if (integer.get().intValue() > 0 && written == 0) {
wrap(); wrap();
return get(); return get();
} else if (netOutBuffer.hasRemaining()) {
integer = sc.write(netOutBuffer);
return get();
} else { } else {
writePending = false; writePending = false;
return Integer.valueOf(written); return Integer.valueOf(written);
Expand All @@ -665,6 +630,9 @@ public Integer get(long timeout, TimeUnit unit)
if (integer.get(timeout, unit).intValue() > 0 && written == 0) { if (integer.get(timeout, unit).intValue() > 0 && written == 0) {
wrap(); wrap();
return get(timeout, unit); return get(timeout, unit);
} else if (netOutBuffer.hasRemaining()) {
integer = sc.write(netOutBuffer);
return get(timeout, unit);
} else { } else {
writePending = false; writePending = false;
return Integer.valueOf(written); return Integer.valueOf(written);
Expand Down Expand Up @@ -739,8 +707,9 @@ public void completed(Integer nBytes, A attach) {
if (unwrap.getHandshakeStatus() == HandshakeStatus.NEED_TASK) if (unwrap.getHandshakeStatus() == HandshakeStatus.NEED_TASK)
tasks(); tasks();
//if we need more network data, then bail out for now. //if we need more network data, then bail out for now.
if (unwrap.getStatus() == Status.BUFFER_UNDERFLOW) if (unwrap.getStatus() == Status.BUFFER_UNDERFLOW) {
break; break;
}
} else if (unwrap.getStatus() == Status.BUFFER_OVERFLOW && read > 0) { } else if (unwrap.getStatus() == Status.BUFFER_OVERFLOW && read > 0) {
//buffer overflow can happen, if we have read data, then //buffer overflow can happen, if we have read data, then
//empty out the dst buffer before we do another read //empty out the dst buffer before we do another read
Expand Down Expand Up @@ -776,21 +745,15 @@ public <A> void read(final ByteBuffer dst,
handler.completed(Integer.valueOf(-1), attachment); handler.completed(Integer.valueOf(-1), attachment);
return; return;
} }
if (readPending) {
throw new ReadPendingException();
} else {
readPending = true;
}
if (!handshakeComplete) { if (!handshakeComplete) {
throw new IllegalStateException(sm.getString("channel.nio.ssl.incompleteHandshake")); throw new IllegalStateException(sm.getString("channel.nio.ssl.incompleteHandshake"));
} }

if (readPending) {
ReadCompletionHandler<A> readCompletionHandler = new ReadCompletionHandler<>(dst, handler); throw new ReadPendingException();
if (netInBuffer.position() > 0 ) {
readCompletionHandler.completed(Integer.valueOf(netInBuffer.position()), attachment);
} else { } else {
sc.read(netInBuffer, timeout, unit, attachment, readCompletionHandler); readPending = true;
} }
sc.read(netInBuffer, timeout, unit, attachment, new ReadCompletionHandler<>(dst, handler));
} }


// TODO: Possible optimization for scatter // TODO: Possible optimization for scatter
Expand Down

0 comments on commit a3fc253

Please sign in to comment.