Skip to content

Commit

Permalink
More work on servlet 3.1 non-blocking for HTTP/2. NumberWriter works.
Browse files Browse the repository at this point in the history
git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@1705349 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
markt-asf committed Sep 25, 2015
1 parent 8c0b095 commit 3214a83
Show file tree
Hide file tree
Showing 7 changed files with 261 additions and 42 deletions.
3 changes: 2 additions & 1 deletion java/org/apache/coyote/AbstractProcessor.java
Expand Up @@ -78,7 +78,8 @@ public AbstractProcessor(AbstractEndpoint<?> endpoint) {
}


private AbstractProcessor(AbstractEndpoint<?> endpoint, Request coyoteRequest, Response coyoteResponse) {
private AbstractProcessor(AbstractEndpoint<?> endpoint, Request coyoteRequest,
Response coyoteResponse) {
this.endpoint = endpoint;
asyncStateMachine = new AsyncStateMachine(this);
request = coyoteRequest;
Expand Down
8 changes: 5 additions & 3 deletions java/org/apache/coyote/ActionCode.java
Expand Up @@ -195,13 +195,15 @@ public enum ActionCode {

/**
* Indicator that Servlet is interested in being
* notified when data is available to be read
* notified when data is available to be read.
*/
NB_READ_INTEREST,

/**
*Indicator that the Servlet is interested
*in being notified when it can write data
* Used with non-blocking writes to determine if a write is currently
* allowed (sets passed parameter to <code>true</code>) or not (sets passed
* parameter to <code>false</code>). If a write is not allowed then callback
* will be triggered at some future point when write becomes possible again.
*/
NB_WRITE_INTEREST,

Expand Down
2 changes: 2 additions & 0 deletions java/org/apache/coyote/http2/AbstractStream.java
Expand Up @@ -147,4 +147,6 @@ protected synchronized void decrementWindowSize(int decrement) {
protected abstract String getConnectionId();

protected abstract int getWeight();

protected abstract void doNotifyAll();
}
9 changes: 8 additions & 1 deletion java/org/apache/coyote/http2/Http2UpgradeHandler.java
Expand Up @@ -638,14 +638,21 @@ private synchronized void releaseBackLog(int increment) {
if (allocation > 0) {
backLogSize -= allocation;
synchronized (entry.getKey()) {
entry.getKey().notifyAll();
entry.getKey().doNotifyAll();
}
}
}
}
}



@Override
protected synchronized void doNotifyAll() {
this.notifyAll();
}


private int allocate(AbstractStream stream, int allocation) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("upgradeHandler.allocate.debug", getConnectionId(),
Expand Down
3 changes: 3 additions & 0 deletions java/org/apache/coyote/http2/LocalStrings.properties
Expand Up @@ -72,7 +72,10 @@ stream.write=Connection [{0}], Stream [{1}]

stream.outputBuffer.flush.debug=Connection [{0}], Stream [{1}], flushing output with buffer at position [{2}], writeInProgress [{3}] and closed [{4}]

streamProcessor.dispatch=Connection [{0}], Stream [{1}], status [{2}]
streamProcessor.httpupgrade.notsupported=HTTP upgrade is not supported within HTTP/2 streams
streamProcessor.process.loopend=Connection [{0}], Stream [{1}], loop end, state [{2}], dispatches [{3}]
streamProcessor.process.loopstart=Connection [{0}], Stream [{1}], loop start, status [{2}], dispatches [{3}]
streamProcessor.ssl.error=Unable to retrieve SSL request attributes

streamStateMachine.debug.change=Connection [{0}], Stream [{1}], State changed from [{2}] to [{3}]
Expand Down
71 changes: 59 additions & 12 deletions java/org/apache/coyote/http2/Stream.java
Expand Up @@ -20,6 +20,7 @@
import java.nio.ByteBuffer;
import java.util.Iterator;

import org.apache.coyote.ActionCode;
import org.apache.coyote.InputBuffer;
import org.apache.coyote.OutputBuffer;
import org.apache.coyote.Request;
Expand Down Expand Up @@ -134,11 +135,15 @@ public synchronized void incrementWindowSize(int windowSizeIncrement) throws Htt
}


private synchronized int reserveWindowSize(int reservation) throws IOException {
private synchronized int reserveWindowSize(int reservation, boolean block) throws IOException {
long windowSize = getWindowSize();
while (windowSize < 1) {
try {
wait();
if (block) {
wait();
} else {
return 0;
}
} catch (InterruptedException e) {
// Possible shutdown / rst or similar. Use an IOException to
// signal to the client that further I/O isn't possible for this
Expand All @@ -158,6 +163,20 @@ private synchronized int reserveWindowSize(int reservation) throws IOException {
}


@Override
protected synchronized void doNotifyAll() {
if (coyoteResponse.getWriteListener() == null) {
// Blocking IO so thread will be waiting. Release it.
// Use notifyAll() to be safe (should be unnecessary)
this.notifyAll();
} else {
if (outputBuffer.isRegisteredForWrite()) {
coyoteResponse.action(ActionCode.DISPATCH_WRITE, null);
}
}
}


@Override
public void emitHeader(String name, String value, boolean neverIndex) {
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -226,7 +245,7 @@ void flushData() throws IOException {
if (log.isDebugEnabled()) {
log.debug(sm.getString("stream.write", getConnectionId(), getIdentifier()));
}
outputBuffer.flush();
outputBuffer.flush(true);
}


Expand Down Expand Up @@ -308,6 +327,7 @@ class StreamOutputBuffer implements OutputBuffer {
private volatile long written = 0;
private volatile boolean closed = false;
private volatile boolean endOfStreamSent = false;
private volatile boolean writeInterest = false;

/* The write methods are synchronized to ensure that only one thread at
* a time is able to access the buffer. Without this protection, a
Expand All @@ -330,22 +350,25 @@ public synchronized int doWrite(ByteChunk chunk) throws IOException {
if (len > 0 && !buffer.hasRemaining()) {
// Only flush if we have more data to write and the buffer
// is full
flush(true);
if (flush(true, coyoteResponse.getWriteListener() == null)) {
break;
}
}
}
written += offset;
return offset;
}

public synchronized void flush() throws IOException {
flush(false);
public synchronized boolean flush(boolean block) throws IOException {
return flush(false, block);
}

private synchronized void flush(boolean writeInProgress) throws IOException {
private synchronized boolean flush(boolean writeInProgress, boolean block)
throws IOException {
if (log.isDebugEnabled()) {
log.debug(sm.getString("stream.outputBuffer.flush.debug", getConnectionId(), getIdentifier(),
Integer.toString(buffer.position()), Boolean.toString(writeInProgress),
Boolean.toString(closed)));
log.debug(sm.getString("stream.outputBuffer.flush.debug", getConnectionId(),
getIdentifier(), Integer.toString(buffer.position()),
Boolean.toString(writeInProgress), Boolean.toString(closed)));
}
if (!coyoteResponse.isCommitted()) {
coyoteResponse.sendHeaders();
Expand All @@ -357,12 +380,17 @@ private synchronized void flush(boolean writeInProgress) throws IOException {
handler.writeBody(Stream.this, buffer, 0, true);
}
// Buffer is empty. Nothing to do.
return;
return false;
}
buffer.flip();
int left = buffer.remaining();
while (left > 0) {
int streamReservation = reserveWindowSize(left);
int streamReservation = reserveWindowSize(left, block);
if (streamReservation == 0) {
// Must be non-blocking
buffer.compact();
return true;
}
while (streamReservation > 0) {
int connectionReservation =
handler.reserveWindowSize(Stream.this, streamReservation);
Expand All @@ -375,6 +403,25 @@ private synchronized void flush(boolean writeInProgress) throws IOException {
}
}
buffer.clear();
return false;
}

synchronized boolean isReady() {
if (getWindowSize() > 0 && handler.getWindowSize() > 0) {
return true;
} else {
writeInterest = true;
return false;
}
}

synchronized boolean isRegisteredForWrite() {
if (writeInterest) {
writeInterest = false;
return true;
} else {
return false;
}
}

@Override
Expand Down

0 comments on commit 3214a83

Please sign in to comment.