Skip to content

Commit

Permalink
Ensure ReadListener.onError() is fired if client drops the connection
Browse files Browse the repository at this point in the history
  • Loading branch information
markt-asf committed Feb 18, 2021
1 parent 16d4beb commit 86ccc43
Show file tree
Hide file tree
Showing 4 changed files with 388 additions and 14 deletions.
34 changes: 21 additions & 13 deletions java/org/apache/coyote/http11/Http11InputBuffer.java
Expand Up @@ -761,11 +761,13 @@ void init(SocketWrapperBase<?> socketWrapper) {
private boolean fill(boolean block) throws IOException {

if (log.isDebugEnabled()) {
log.debug("Before fill(): [" + parsingHeader +
log.debug("Before fill(): parsingHeader: [" + parsingHeader +
"], parsingRequestLine: [" + parsingRequestLine +
"], parsingRequestLinePhase: [" + parsingRequestLinePhase +
"], parsingRequestLineStart: [" + parsingRequestLineStart +
"], byteBuffer.position() [" + byteBuffer.position() + "]");
"], byteBuffer.position(): [" + byteBuffer.position() +
"], byteBuffer.limit(): [" + byteBuffer.limit() +
"], end: [" + end + "]");
}

if (parsingHeader) {
Expand All @@ -780,19 +782,25 @@ private boolean fill(boolean block) throws IOException {
byteBuffer.limit(end).position(end);
}

byteBuffer.mark();
if (byteBuffer.position() < byteBuffer.limit()) {
byteBuffer.position(byteBuffer.limit());
}
byteBuffer.limit(byteBuffer.capacity());
SocketWrapperBase<?> socketWrapper = this.wrapper;
int nRead = -1;
if (socketWrapper != null) {
nRead = socketWrapper.read(block, byteBuffer);
} else {
throw new CloseNowException(sm.getString("iib.eof.error"));
byteBuffer.mark();
try {
if (byteBuffer.position() < byteBuffer.limit()) {
byteBuffer.position(byteBuffer.limit());
}
byteBuffer.limit(byteBuffer.capacity());
SocketWrapperBase<?> socketWrapper = this.wrapper;
if (socketWrapper != null) {
nRead = socketWrapper.read(block, byteBuffer);
} else {
throw new CloseNowException(sm.getString("iib.eof.error"));
}
} finally {
// Ensure that the buffer limit and position are returned to a
// consistent "ready for read" state if an error occurs during in
// the above code block.
byteBuffer.limit(byteBuffer.position()).reset();
}
byteBuffer.limit(byteBuffer.position()).reset();

if (log.isDebugEnabled()) {
log.debug("Received ["
Expand Down
170 changes: 169 additions & 1 deletion test/org/apache/catalina/core/TestAsyncContextImpl.java
Expand Up @@ -17,6 +17,7 @@
package org.apache.catalina.core;

import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.net.URI;
import java.net.URISyntaxException;
Expand Down Expand Up @@ -866,7 +867,7 @@ public void run() {
}
}

private static class TrackingListener implements AsyncListener {
public static class TrackingListener implements AsyncListener {

private final boolean completeOnError;
private final boolean completeOnTimeout;
Expand Down Expand Up @@ -3016,4 +3017,171 @@ public void run() {
}
}


/*
* Tests an error on an async thread when the client closes the connection
* before fully writing the request body.
*
* Required sequence is:
* - enter Servlet's service() method
* - startAsync()
* - start async thread
* - read partial body
* - close client connection
* - read on async thread -> I/O error
* - exit Servlet's service() method
*
* This test makes extensive use of instance fields in the Servlet that
* would normally be considered very poor practice. It is only safe in this
* test as the Servlet only processes a single request.
*/
@Test
public void testCanceledPost() throws Exception {
CountDownLatch partialReadLatch = new CountDownLatch(1);
CountDownLatch clientCloseLatch = new CountDownLatch(1);
CountDownLatch threadCompleteLatch = new CountDownLatch(1);

AtomicBoolean testFailed = new AtomicBoolean(true);

// Setup Tomcat instance
Tomcat tomcat = getTomcatInstance();

// No file system docBase required
Context ctx = tomcat.addContext("", null);

PostServlet postServlet = new PostServlet(partialReadLatch, clientCloseLatch, threadCompleteLatch, testFailed);
Wrapper wrapper = Tomcat.addServlet(ctx, "postServlet", postServlet);
wrapper.setAsyncSupported(true);
ctx.addServletMappingDecoded("/*", "postServlet");

tomcat.start();

PostClient client = new PostClient();
client.setPort(getPort());
client.setRequest(new String[] { "POST / HTTP/1.1" + SimpleHttpClient.CRLF +
"Host: localhost:" + SimpleHttpClient.CRLF +
"Content-Length: 100" + SimpleHttpClient.CRLF +
SimpleHttpClient.CRLF +
"This is 16 bytes"
});
client.connect();
client.sendRequest();

// Wait server to read partial request body
partialReadLatch.await();

client.disconnect();

clientCloseLatch.countDown();

threadCompleteLatch.await();

Assert.assertFalse(testFailed.get());
}


private static final class PostClient extends SimpleHttpClient {

@Override
public boolean isResponseBodyOK() {
return true;
}
}


private static final class PostServlet extends HttpServlet {

private static final long serialVersionUID = 1L;

private final transient CountDownLatch partialReadLatch;
private final transient CountDownLatch clientCloseLatch;
private final transient CountDownLatch threadCompleteLatch;
private final AtomicBoolean testFailed;

public PostServlet(CountDownLatch doPostLatch, CountDownLatch clientCloseLatch,
CountDownLatch threadCompleteLatch, AtomicBoolean testFailed) {
this.partialReadLatch = doPostLatch;
this.clientCloseLatch = clientCloseLatch;
this.threadCompleteLatch = threadCompleteLatch;
this.testFailed = testFailed;
}

@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {

AsyncContext ac = req.startAsync();
Thread t = new PostServletThread(ac, partialReadLatch, clientCloseLatch, threadCompleteLatch, testFailed);
t.start();

try {
threadCompleteLatch.await();
} catch (InterruptedException e) {
// Ignore
}
}
}


private static final class PostServletThread extends Thread {

private final AsyncContext ac;
private final CountDownLatch partialReadLatch;
private final CountDownLatch clientCloseLatch;
private final CountDownLatch threadCompleteLatch;
private final AtomicBoolean testFailed;

public PostServletThread(AsyncContext ac, CountDownLatch partialReadLatch, CountDownLatch clientCloseLatch,
CountDownLatch threadCompleteLatch, AtomicBoolean testFailed) {
this.ac = ac;
this.partialReadLatch = partialReadLatch;
this.clientCloseLatch = clientCloseLatch;
this.threadCompleteLatch = threadCompleteLatch;
this.testFailed = testFailed;
}

@Override
public void run() {
try {
int bytesRead = 0;
byte[] buffer = new byte[32];
InputStream is = null;

try {
is = ac.getRequest().getInputStream();

// Read the partial request body
while (bytesRead < 16) {
int read = is.read(buffer);
if (read == -1) {
// Error condition
return;
}
bytesRead += read;
}
} catch (IOException ioe) {
// Error condition
return;
} finally {
partialReadLatch.countDown();
}

// Wait for client to close connection
clientCloseLatch.await();

// Read again
try {
is.read();
} catch (IOException e) {
e.printStackTrace();
// Required. Clear the error marker.
testFailed.set(false);
}
} catch (InterruptedException e) {
// Ignore
} finally {
threadCompleteLatch.countDown();
}
}
}
}

0 comments on commit 86ccc43

Please sign in to comment.