Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http2: 'pull' was lost when closing while waiting for window #3672

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -228,10 +228,13 @@ private[http2] trait Http2MultiplexerSupport { logic: GraphStageLogic with Stage
if (sendableOutstreams.contains(streamId)) {
val sendableExceptClosed = sendableOutstreams - streamId

if (sendableExceptClosed.isEmpty) Idle
if (sendableExceptClosed.isEmpty)
if (pulled) WaitingForData else Idle
else withSendableOutstreams(sendableExceptClosed)
} else
this

def pulled: Boolean
}

private[http2] case class WaitingForNetworkToSendData(sendableOutstreams: immutable.Set[Int]) extends WithSendableOutStreams {
Expand All @@ -251,6 +254,8 @@ private[http2] trait Http2MultiplexerSupport { logic: GraphStageLogic with Stage

def withSendableOutstreams(sendableOutStreams: Set[Int]) =
WaitingForNetworkToSendData(sendableOutStreams)

override def pulled = false
}

/** Pulled and data is pending but no connection-level window available */
Expand All @@ -270,6 +275,8 @@ private[http2] trait Http2MultiplexerSupport { logic: GraphStageLogic with Stage

def withSendableOutstreams(sendableOutStreams: Set[Int]) =
WaitingForConnectionWindow(sendableOutStreams)

override def pulled = true
}

def maxBytesToBufferPerSubstream = 2 * currentMaxFrameSize // for now, let's buffer two frames per substream
Expand Down
Expand Up @@ -233,6 +233,7 @@ private[http2] trait Http2StreamHandling { self: GraphStageLogic with LogHelper
def receivedUnexpectedFrame(e: StreamFrameEvent): StreamState = {
debug(s"Received unexpected frame of type ${e.frameTypeName} for stream ${e.streamId} in state $stateName")
pushGOAWAY(ErrorCode.PROTOCOL_ERROR, s"Received unexpected frame of type ${e.frameTypeName} for stream ${e.streamId} in state $stateName")
multiplexer.closeStream(e.streamId)
Closed
}

Expand Down Expand Up @@ -337,7 +338,9 @@ private[http2] trait Http2StreamHandling { self: GraphStageLogic with LogHelper
multiplexer.closeStream(r.streamId)
outStream.cancelStream()
Closed
case _ => receivedUnexpectedFrame(event)
case _ =>
outStream.cancelStream()
receivedUnexpectedFrame(event)
}

override def handleOutgoingEnded(): StreamState = HalfClosedLocalWaitingForPeerStream(correlationAttributes)
Expand Down
Expand Up @@ -452,6 +452,60 @@ class Http2ClientSpec extends AkkaSpecWithMaterializer("""

connectionShouldStillBeUsable()
}
"handle RST_STREAM while waiting for a window update" in new WaitingForRequestData {
val entitySize = 70000
entityDataOut.sendNext(ByteString(Array.fill[Byte](entitySize)(0x23))) // 70000 > Http2Protocol.InitialWindowSize
network.sendWINDOW_UPDATE(TheStreamId, 10000) // enough window for the stream but not for the window

network.expectDATA(TheStreamId, false, Http2Protocol.InitialWindowSize)

// enough stream-level WINDOW, but too little connection-level WINDOW
network.expectNoBytes(100.millis)

// now the demuxer is in the WaitingForConnectionWindow state, cancel the connection
network.sendRST_STREAM(TheStreamId, ErrorCode.CANCEL)

entityDataOut.expectCancellation()
network.expectNoBytes(100.millis)

// now increase connection-level window again and see if everything still works
network.sendWINDOW_UPDATE(0, 10000)
network.expectNoBytes(100.millis) // don't expect anything, stream has been cancelled in the meantime

connectionShouldStillBeUsable()
}
"handle unknown frames while waiting for a window update" in new WaitingForRequestData {
user.emitRequest(Get("/secondRequest"))
val otherRequestStreamId = network.expect[HeadersFrame]().streamId

val entitySize = 70000
entityDataOut.sendNext(ByteString(Array.fill[Byte](entitySize)(0x23))) // 70000 > Http2Protocol.InitialWindowSize
network.sendWINDOW_UPDATE(TheStreamId, 10000) // enough window for the stream but not for the window

network.expectDATA(TheStreamId, false, Http2Protocol.InitialWindowSize)

// enough stream-level WINDOW, but too little connection-level WINDOW
network.expectNoBytes(100.millis)

// now the stream handler is in the OpenSendingData state, and waiting for the
// response headers, unexpectedly send response data:
network.sendDATA(TheStreamId, endStream = false, ByteString("surprise!"))

entityDataOut.expectCancellation()
network.expectGOAWAY(otherRequestStreamId)

// make sure the demuxer also moved back from WaitingForConnectionWindow to Idle
network.sendWINDOW_UPDATE(0, 10000)
network.expectNoBytes(100.millis) // don't expect anything, stream has been cancelled in the meantime

// TODO the client stack should not accept new connections anymore
// TODO what to do with requests that are already in the queue at this point?
// user.requestOut.expectCancellation()
Copy link
Member Author

@raboof raboof Dec 3, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This indeed needs further changes before it can be enabled in the test, which are coming in #3673 - so OK to merge without this.


// Check finishing old requests is still allowed
network.sendHEADERS(otherRequestStreamId, true, Seq(RawHeader(":status", "200")))
user.expectResponse()
}
}

"respect flow-control" should {
Expand Down