Skip to content

Commit

Permalink
Fix some flushing issues identified when running the Autobhan WebSock…
Browse files Browse the repository at this point in the history
…et test suite

git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@1651219 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
markt-asf committed Jan 12, 2015
1 parent 82d2c0e commit 8823e03
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 21 deletions.
Expand Up @@ -45,6 +45,8 @@ public class UpgradeServletOutputStream extends ServletOutputStream {
// synchronization may be required (see fireListenerLock for an example).
private final Object writeLock = new Object();

private volatile boolean flushing = false;

private volatile boolean closeRequired = false;

// Start in blocking-mode
Expand All @@ -69,11 +71,17 @@ public final boolean isReady() {
}

// Make sure isReady() and onWritePossible() have a consistent view of
// buffer and fireListener when determining if the listener should fire
// fireListener when determining if the listener should fire
synchronized (fireListenerLock) {
boolean result = socketWrapper.isReadyForWrite();
fireListener = !result;
return result;
if (flushing) {
socketWrapper.registerWriteInterest();
fireListener = true;
return false;
} else {
boolean result = socketWrapper.isReadyForWrite();
fireListener = !result;
return result;
}
}
}

Expand Down Expand Up @@ -124,10 +132,30 @@ public void write(byte[] b, int off, int len) throws IOException {

@Override
public void flush() throws IOException {
socketWrapper.flush(listener == null);
flushInternal(listener == null, true);
}


private void flushInternal(boolean block, boolean updateFlushing) throws IOException {
try {
synchronized (writeLock) {
if (updateFlushing) {
flushing = socketWrapper.flush(block);
} else {
socketWrapper.flush(block);
}
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
onError(t);
if (t instanceof IOException) {
throw (IOException) t;
} else {
throw new IOException(t);
}
}
}

@Override
public void close() throws IOException {
closeRequired = true;
Expand Down Expand Up @@ -156,18 +184,14 @@ private void writeInternal(byte[] b, int off, int len) throws IOException {


protected final void onWritePossible() throws IOException {
try {
synchronized (writeLock) {
socketWrapper.flush(false);
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
onError(t);
if (t instanceof IOException) {
throw t;
} else {
throw new IOException(t);
if (flushing) {
flushInternal(false, true);
if (flushing) {
socketWrapper.registerWriteInterest();
return;
}
} else {
flushInternal(false, false);
}

// Make sure isReady() and onWritePossible() have a consistent view
Expand Down
Expand Up @@ -104,15 +104,17 @@ public void onWritePossible(boolean useDispatch) {
}
if (complete) {
sos.flush();
wsWriteTimeout.unregister(this);
clearHandler(null, useDispatch);
if (close) {
close();
complete = sos.isReady();
if (complete) {
wsWriteTimeout.unregister(this);
clearHandler(null, useDispatch);
if (close) {
close();
}
}
break;
}
}

} catch (IOException ioe) {
wsWriteTimeout.unregister(this);
clearHandler(ioe, useDispatch);
Expand Down

0 comments on commit 8823e03

Please sign in to comment.