Skip to content

Commit

Permalink
Clarify receiveEndOfStream vs sendEndOfStream
Browse files Browse the repository at this point in the history
Correct comment re streamId for initial HTTP upgrade
Refactor the handling of the switch from HTTP/1.1 to HTTP/2 (still not completely happy with this)
Refactor StreamStateMachine
- add debug logging for state changes
- remove checks for changes - these are handled by checkFrameType
  (considering more changes here too)
Use -1 as the connection ID for the test client to make debug logs easier to read
Add another test for section 5.1

git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@1683844 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
markt-asf committed Jun 5, 2015
1 parent db24886 commit b9b99f6
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 94 deletions.
10 changes: 5 additions & 5 deletions java/org/apache/coyote/http2/Http2Parser.java
Expand Up @@ -138,13 +138,13 @@ private void readDataFrame(int streamId, int flags, int payloadSize) throws IOEx
if (dest == null) { if (dest == null) {
swallow(payloadSize); swallow(payloadSize);
if (endOfStream) { if (endOfStream) {
output.endOfStream(streamId); output.receiveEndOfStream(streamId);
} }
} else { } else {
synchronized (dest) { synchronized (dest) {
input.fill(true, dest, payloadSize); input.fill(true, dest, payloadSize);
if (endOfStream) { if (endOfStream) {
output.endOfStream(streamId); output.receiveEndOfStream(streamId);
} }
dest.notifyAll(); dest.notifyAll();
} }
Expand Down Expand Up @@ -200,7 +200,7 @@ private void readHeadersFrame(int streamId, int flags, int payloadSize) throws I


if (Flags.isEndOfStream(flags)) { if (Flags.isEndOfStream(flags)) {
if (headersCurrentStream == -1) { if (headersCurrentStream == -1) {
output.endOfStream(streamId); output.receiveEndOfStream(streamId);
} else { } else {
headersEndStream = true; headersEndStream = true;
} }
Expand Down Expand Up @@ -318,7 +318,7 @@ private void readContinuationFrame(int streamId, int flags, int payloadSize)
output.headersEnd(streamId); output.headersEnd(streamId);
headersCurrentStream = -1; headersCurrentStream = -1;
if (headersEndStream) { if (headersEndStream) {
output.endOfStream(streamId); output.receiveEndOfStream(streamId);
headersEndStream = false; headersEndStream = false;
} }
} }
Expand Down Expand Up @@ -494,7 +494,7 @@ static interface Output {


// Data frames // Data frames
ByteBuffer getInputByteBuffer(int streamId, int payloadSize) throws Http2Exception; ByteBuffer getInputByteBuffer(int streamId, int payloadSize) throws Http2Exception;
void endOfStream(int streamId); void receiveEndOfStream(int streamId);


// Header frames // Header frames
HeaderEmitter headersStart(int streamId) throws Http2Exception; HeaderEmitter headersStart(int streamId) throws Http2Exception;
Expand Down
7 changes: 4 additions & 3 deletions java/org/apache/coyote/http2/Http2UpgradeHandler.java
Expand Up @@ -131,7 +131,7 @@ public Http2UpgradeHandler(Adapter adapter, Request coyoteRequest) {
this.adapter = adapter; this.adapter = adapter;
this.connectionId = Integer.toString(connectionIdGenerator.getAndIncrement()); this.connectionId = Integer.toString(connectionIdGenerator.getAndIncrement());


// Initial HTTP request becomes stream 0. // Initial HTTP request becomes stream 1.
if (coyoteRequest != null) { if (coyoteRequest != null) {
Integer key = Integer.valueOf(1); Integer key = Integer.valueOf(1);
Stream stream = new Stream(key, this, coyoteRequest); Stream stream = new Stream(key, this, coyoteRequest);
Expand Down Expand Up @@ -423,6 +423,7 @@ void writeBody(Stream stream, ByteBuffer data, int len) throws IOException {
header[3] = FrameType.DATA.getIdByte(); header[3] = FrameType.DATA.getIdByte();
if (stream.getOutputBuffer().isFinished()) { if (stream.getOutputBuffer().isFinished()) {
header[4] = FLAG_END_OF_STREAM; header[4] = FLAG_END_OF_STREAM;
stream.sendEndOfStream();
} }
ByteUtil.set31Bits(header, 5, stream.getIdentifier().intValue()); ByteUtil.set31Bits(header, 5, stream.getIdentifier().intValue());
socketWrapper.write(true, header, 0, header.length); socketWrapper.write(true, header, 0, header.length);
Expand Down Expand Up @@ -716,10 +717,10 @@ public ByteBuffer getInputByteBuffer(int streamId, int payloadSize) throws Http2




@Override @Override
public void endOfStream(int streamId) { public void receiveEndOfStream(int streamId) {
Stream stream = getStream(streamId); Stream stream = getStream(streamId);
if (stream != null) { if (stream != null) {
stream.setEndOfStream(); stream.receiveEndOfStream();
} }
} }


Expand Down
4 changes: 2 additions & 2 deletions java/org/apache/coyote/http2/LocalStrings.properties
Expand Up @@ -61,8 +61,8 @@ stream.write=Connection [{0}], Stream [{1}]


streamProcessor.httpupgrade.notsupported=HTTP upgrade is not supported within HTTP/2 streams streamProcessor.httpupgrade.notsupported=HTTP upgrade is not supported within HTTP/2 streams


streamStateMachine.invalidFrame.windowUpdate=Connection [{0}], Received Data frame for stream [{1}] in state [{2}] streamStateMachine.debug.change=Connection [{0}], Stream [{1}], State changed from [{2}] to [{3}]
streamStateMachine.invalidFrame.windowUpdate=Connection [{0}], Received Window Update frame for stream [{1}] in state [{2}] streamStateMachine.invalidFrame=Connection [{0}], Stream [{1}], State [{2}], Frame type [{3}]


upgradeHandler.connectionError=An error occurred that requires the HTTP/2 connection to be closed. upgradeHandler.connectionError=An error occurred that requires the HTTP/2 connection to be closed.
upgradeHandler.goaway.debug=Connection [{0}], Goaway, Last stream [{1}], Error code [{2}], Debug data [{3}] upgradeHandler.goaway.debug=Connection [{0}], Goaway, Last stream [{1}], Error code [{2}], Debug data [{3}]
Expand Down
33 changes: 23 additions & 10 deletions java/org/apache/coyote/http2/Stream.java
Expand Up @@ -40,13 +40,13 @@ public class Stream extends AbstractStream implements HeaderEmitter {
private final Http2UpgradeHandler handler; private final Http2UpgradeHandler handler;
private final Request coyoteRequest; private final Request coyoteRequest;
private final Response coyoteResponse = new Response(); private final Response coyoteResponse = new Response();
private final StreamInputBuffer inputBuffer = new StreamInputBuffer(); private final StreamInputBuffer inputBuffer;
private final StreamOutputBuffer outputBuffer = new StreamOutputBuffer(); private final StreamOutputBuffer outputBuffer = new StreamOutputBuffer();
private final StreamStateMachine state; private final StreamStateMachine state;




public Stream(Integer identifier, Http2UpgradeHandler handler) { public Stream(Integer identifier, Http2UpgradeHandler handler) {
this(identifier, handler, new Request()); this(identifier, handler, null);
} }




Expand All @@ -56,15 +56,22 @@ public Stream(Integer identifier, Http2UpgradeHandler handler, Request coyoteReq
setParentStream(handler); setParentStream(handler);
setWindowSize(handler.getRemoteSettings().getInitialWindowSize()); setWindowSize(handler.getRemoteSettings().getInitialWindowSize());
state = new StreamStateMachine(this); state = new StreamStateMachine(this);
this.coyoteRequest = coyoteRequest; if (coyoteRequest == null) {
this.coyoteRequest.setInputBuffer(inputBuffer); // HTTP/2 new request
this.coyoteResponse.setOutputBuffer(outputBuffer); this.coyoteRequest = new Request();
this.coyoteRequest.setResponse(coyoteResponse); this.inputBuffer = new StreamInputBuffer();
if (coyoteRequest.isFinished()) { this.coyoteRequest.setInputBuffer(inputBuffer);
// Update the state machine } else {
// HTTP/1.1 upgrade
this.coyoteRequest = coyoteRequest;
this.inputBuffer = null;
// Headers have been populated by this point
state.receiveHeaders(); state.receiveHeaders();
// TODO Assuming the body has been read at this point is not valid
state.recieveEndOfStream(); state.recieveEndOfStream();
} }
this.coyoteResponse.setOutputBuffer(outputBuffer);
this.coyoteRequest.setResponse(coyoteResponse);
} }




Expand Down Expand Up @@ -101,7 +108,7 @@ public void reset(long errorCode) {
log.debug(sm.getString("stream.reset.debug", getConnectionId(), getIdentifier(), log.debug(sm.getString("stream.reset.debug", getConnectionId(), getIdentifier(),
Long.toString(errorCode))); Long.toString(errorCode)));
} }
state.recieveReset(); state.receiveReset();
} }




Expand Down Expand Up @@ -233,10 +240,16 @@ void headersEnd() {
} }




void setEndOfStream() { void receiveEndOfStream() {
state.recieveEndOfStream(); state.recieveEndOfStream();
} }



void sendEndOfStream() {
state.sendEndOfStream();
}


StreamOutputBuffer getOutputBuffer() { StreamOutputBuffer getOutputBuffer() {
return outputBuffer; return outputBuffer;
} }
Expand Down
120 changes: 48 additions & 72 deletions java/org/apache/coyote/http2/StreamStateMachine.java
Expand Up @@ -19,6 +19,8 @@
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;


import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.res.StringManager; import org.apache.tomcat.util.res.StringManager;


/** /**
Expand All @@ -33,117 +35,87 @@
*/ */
public class StreamStateMachine { public class StreamStateMachine {


private static final Log log = LogFactory.getLog(StreamStateMachine.class);
private static final StringManager sm = StringManager.getManager(StreamStateMachine.class); private static final StringManager sm = StringManager.getManager(StreamStateMachine.class);


private final Stream stream; private final Stream stream;
private State state = State.IDLE; private State state;




public StreamStateMachine(Stream stream) { public StreamStateMachine(Stream stream) {
this.stream = stream; this.stream = stream;
stateChange(null, State.IDLE);
} }




public synchronized void sendPushPromise() { public synchronized void sendPushPromise() {
if (state == State.IDLE) { stateChange(State.IDLE, State.RESERVED_LOCAL);
state = State.RESERVED_LOCAL;
} else {
// TODO: ProtocolExcpetion? i18n
throw new IllegalStateException();
}
} }




public synchronized void receivePushPromis() { public synchronized void receivePushPromis() {
if (state == State.IDLE) { stateChange(State.IDLE, State.RESERVED_REMOTE);
state = State.RESERVED_REMOTE;
} else {
// TODO: ProtocolExcpetion? i18n
throw new IllegalStateException();
}
} }




public synchronized void sendHeaders() { public synchronized void sendHeaders() {
if (state == State.IDLE) { stateChange(State.IDLE, State.OPEN);
state = State.OPEN; stateChange(State.RESERVED_LOCAL, State.HALF_CLOSED_REMOTE);
} else if (state == State.RESERVED_LOCAL) {
state = State.HALF_CLOSED_REMOTE;
} else {
// TODO: ProtocolExcpetion? i18n
throw new IllegalStateException();
}
} }




public synchronized void receiveHeaders() { public synchronized void receiveHeaders() {
if (state == State.IDLE) { stateChange(State.IDLE, State.OPEN);
state = State.OPEN; stateChange(State.RESERVED_REMOTE, State.HALF_CLOSED_LOCAL);
} else if (state == State.RESERVED_REMOTE) {
state = State.HALF_CLOSED_LOCAL;
} else {
// TODO: ProtocolExcpetion? i18n
throw new IllegalStateException();
}
} }




public synchronized void sendEndOfStream() { public synchronized void sendEndOfStream() {
if (state == State.OPEN) { stateChange(State.OPEN, State.HALF_CLOSED_LOCAL);
state = State.HALF_CLOSED_LOCAL; stateChange(State.HALF_CLOSED_REMOTE, State.CLOSED_TX);
} else if (state == State.HALF_CLOSED_REMOTE) {
state = State.CLOSED;
} else {
// TODO: ProtocolExcpetion? i18n
throw new IllegalStateException();
}
} }




public synchronized void recieveEndOfStream() { public synchronized void recieveEndOfStream() {
if (state == State.OPEN) { stateChange(State.OPEN, State.HALF_CLOSED_REMOTE);
state = State.HALF_CLOSED_REMOTE; stateChange(State.HALF_CLOSED_LOCAL, State.CLOSED_RX);
} else if (state == State.HALF_CLOSED_LOCAL) {
state = State.CLOSED;
} else {
// TODO: ProtocolExcpetion? i18n
throw new IllegalStateException();
}
} }




public synchronized void sendReset() { private void stateChange(State oldState, State newState) {
state = State.CLOSED; if (state == oldState) {
state = newState;
if (log.isDebugEnabled()) {
log.debug(sm.getString("streamStateMachine.debug.change", stream.getConnectionId(),
stream.getIdentifier(), oldState, newState));
}
}
} }




public synchronized void recieveReset() { public synchronized void sendReset() {
state = State.CLOSED_RESET; state = State.CLOSED_TX;
} }




public synchronized void receiveHeader() { public synchronized void receiveReset() {
// Doesn't change state (that happens at the end of the headers when state = State.CLOSED_RST;
// receiveHeaders() is called. This just checks that the stream is in a
// valid state to receive headers.
if (state == State.CLOSED_RESET) {
// Allow this. Client may not know that stream has been reset.
} else if (state == State.IDLE || state == State.RESERVED_REMOTE) {
// Allow these. This is normal operation.
} else {
// TODO: ProtocolExcpetion? i18n
throw new IllegalStateException();
}
} }




public synchronized void checkFrameType(FrameType frameType) throws Http2Exception { public synchronized void checkFrameType(FrameType frameType) throws Http2Exception {
// No state change. Checks that the frame type is valid for the current // No state change. Checks that the frame type is valid for the current
// state of this stream. // state of this stream.
if (!isFrameTypePermitted(frameType)) { if (!isFrameTypePermitted(frameType)) {
int errorStream;
if (state.connectionErrorForInvalidFrame) {
errorStream = 0;
} else {
errorStream = stream.getIdentifier().intValue();
}
throw new Http2Exception(sm.getString("streamStateMachine.invalidFrame", throw new Http2Exception(sm.getString("streamStateMachine.invalidFrame",
stream.getConnectionId(), stream.getIdentifier(), state, frameType), stream.getConnectionId(), stream.getIdentifier(), state, frameType),
0, state.errorCodeForInvalidFrame); errorStream, state.errorCodeForInvalidFrame);
} }
} }


Expand All @@ -154,27 +126,31 @@ public synchronized boolean isFrameTypePermitted(FrameType frameType) {




private enum State { private enum State {
IDLE (ErrorCode.PROTOCOL_ERROR, FrameType.HEADERS, FrameType.PRIORITY), IDLE (true, ErrorCode.PROTOCOL_ERROR, FrameType.HEADERS, FrameType.PRIORITY),
OPEN (ErrorCode.PROTOCOL_ERROR, FrameType.DATA, FrameType.HEADERS, OPEN (true, ErrorCode.PROTOCOL_ERROR, FrameType.DATA, FrameType.HEADERS,
FrameType.PRIORITY, FrameType.RST, FrameType.PUSH_PROMISE, FrameType.PRIORITY, FrameType.RST, FrameType.PUSH_PROMISE,
FrameType.WINDOW_UPDATE), FrameType.WINDOW_UPDATE),
RESERVED_LOCAL (ErrorCode.PROTOCOL_ERROR, FrameType.PRIORITY, FrameType.RST, RESERVED_LOCAL (true, ErrorCode.PROTOCOL_ERROR, FrameType.PRIORITY, FrameType.RST,
FrameType.WINDOW_UPDATE), FrameType.WINDOW_UPDATE),
RESERVED_REMOTE (ErrorCode.PROTOCOL_ERROR, FrameType.HEADERS, FrameType.PRIORITY, RESERVED_REMOTE (true, ErrorCode.PROTOCOL_ERROR, FrameType.HEADERS, FrameType.PRIORITY,
FrameType.RST), FrameType.RST),
HALF_CLOSED_LOCAL (ErrorCode.PROTOCOL_ERROR, FrameType.DATA, FrameType.HEADERS, HALF_CLOSED_LOCAL (true, ErrorCode.PROTOCOL_ERROR, FrameType.DATA, FrameType.HEADERS,
FrameType.PRIORITY, FrameType.RST, FrameType.PUSH_PROMISE, FrameType.PRIORITY, FrameType.RST, FrameType.PUSH_PROMISE,
FrameType.WINDOW_UPDATE), FrameType.WINDOW_UPDATE),
HALF_CLOSED_REMOTE (ErrorCode.STREAM_CLOSED, FrameType.PRIORITY, FrameType.RST, HALF_CLOSED_REMOTE (true, ErrorCode.STREAM_CLOSED, FrameType.PRIORITY, FrameType.RST,
FrameType.WINDOW_UPDATE),
CLOSED (ErrorCode.PROTOCOL_ERROR, FrameType.PRIORITY, FrameType.RST,
FrameType.WINDOW_UPDATE), FrameType.WINDOW_UPDATE),
CLOSED_RESET (ErrorCode.PROTOCOL_ERROR, FrameType.PRIORITY); CLOSED_RX (true, ErrorCode.STREAM_CLOSED, FrameType.PRIORITY),
CLOSED_RST (false, ErrorCode.STREAM_CLOSED, FrameType.PRIORITY),
CLOSED_TX (true, ErrorCode.STREAM_CLOSED, FrameType.PRIORITY, FrameType.RST,
FrameType.WINDOW_UPDATE);


private final boolean connectionErrorForInvalidFrame;
private final ErrorCode errorCodeForInvalidFrame; private final ErrorCode errorCodeForInvalidFrame;
private final Set<FrameType> frameTypesPermitted = new HashSet<>(); private final Set<FrameType> frameTypesPermitted = new HashSet<>();


private State(ErrorCode errorCode, FrameType... frameTypes) { private State(boolean connectionErrorForInvalidFrame, ErrorCode errorCode,
FrameType... frameTypes) {
this.connectionErrorForInvalidFrame = connectionErrorForInvalidFrame;
this.errorCodeForInvalidFrame = errorCode; this.errorCodeForInvalidFrame = errorCode;
for (FrameType frameType : frameTypes) { for (FrameType frameType : frameTypes) {
frameTypesPermitted.add(frameType); frameTypesPermitted.add(frameType);
Expand Down
4 changes: 2 additions & 2 deletions test/org/apache/coyote/http2/Http2TestBase.java
Expand Up @@ -236,7 +236,7 @@ protected void openClientConnection() throws IOException {


input = new TestInput(is); input = new TestInput(is);
output = new TestOutput(); output = new TestOutput();
parser = new Http2Parser("0", input, output); parser = new Http2Parser("-1", input, output);
} }




Expand Down Expand Up @@ -442,7 +442,7 @@ public ByteBuffer getInputByteBuffer(int streamId, int payloadSize) {




@Override @Override
public void endOfStream(int streamId) { public void receiveEndOfStream(int streamId) {
lastStreamId = Integer.toString(streamId); lastStreamId = Integer.toString(streamId);
trace.append(lastStreamId + "-EndOfStream\n"); trace.append(lastStreamId + "-EndOfStream\n");
} }
Expand Down
13 changes: 13 additions & 0 deletions test/org/apache/coyote/http2/TestHttp2Section_5_1.java
Expand Up @@ -73,7 +73,20 @@ public void halfClosedRemoteInvalidFrame() throws Exception {


// This should trigger a stream error // This should trigger a stream error
sendData(3, new byte[] {}); sendData(3, new byte[] {});
parser.readFrame(true);

Assert.assertTrue(output.getTrace(),
output.getTrace().startsWith("0-Goaway-[2147483647]-[" +
ErrorCode.STREAM_CLOSED.getErrorCode() + "]-["));
}


@Test
public void testClosedInvalidFrame() throws Exception {
http2Connect();


// Stream 1 is closed. This should trigger a stream error
sendData(1, new byte[] {});
parser.readFrame(true); parser.readFrame(true);


Assert.assertTrue(output.getTrace(), Assert.assertTrue(output.getTrace(),
Expand Down

0 comments on commit b9b99f6

Please sign in to comment.