New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
http2: more robust handling of IncomingStreamBuffer.onPull #3621
http2: more robust handling of IncomingStreamBuffer.onPull #3621
Conversation
Test PASSed. |
@@ -359,8 +364,8 @@ private[http2] trait Http2StreamHandling { self: GraphStageLogic with LogHelper | |||
outstandingConnectionLevelWindow += windowSizeIncrement | |||
} | |||
|
|||
buffer.onDataFrame(d).getOrElse( | |||
maybeFinishStream(d.endStream)) | |||
buffer.onDataFrame(d) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The main change is that, before, we moved out of the state receiving data when we received the frame with END_STREAM
which is right if we watch the states from the outside. However, internally, the final frames might not yet have been dispatched in which case we would have dropped the buffer on the floor and leaked it in cases where the connection was aborted in this state.
Alternatively we could have spelled this out as another state (END_STREAM received but not yet all data dispatched), however, since we already have a state matrix with the product of the state of all the four ends of each stream (network and user in and out), this would have easily let to a multitude of new states. That's why I went with the slightly impure solution of extending the meaning of receiving states until after the user handler has actually pulled all the data.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM (but not 100% I get it)
akka-http2-support/src/test/scala/akka/http/impl/engine/http2/Http2ServerSpec.scala
Outdated
Show resolved
Hide resolved
27c653f
to
e0da123
Compare
Test PASSed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good. I didn't understand yet how the state machine goes to Closed
now, but I guess this is via the outlet failure/completion?
multiplexer.pushControlFrame(RstStreamFrame(streamId, ErrorCode.FLOW_CONTROL_ERROR)) | ||
// also close response delivery if that has already started | ||
multiplexer.closeStream(streamId) | ||
Some(Closed) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so the transition to Closed
is no longer signaled from here.
How does it go into that state now? Is it an effect of shutdown
failing the outlet?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's now using afterBufferEvent
consistently to figure out the next state after each call into IncomingStreamBuffer
where before this return value was sometimes used and sometimes ignored.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
right, which looks at buffer.isDone
which checks outlet.isClosed
👍
Refs #3233