Skip to content

Commit

Permalink
http2: 'pull' was lost when closing while waiting for window (#3672)
Browse files Browse the repository at this point in the history
* http2: 'pull' was lost when closing while waiting for window

* Scenario where a misbehaving server causes an error
  • Loading branch information
raboof committed Dec 3, 2020
1 parent f09f4c2 commit ab38d36
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 2 deletions.
Expand Up @@ -228,10 +228,13 @@ private[http2] trait Http2MultiplexerSupport { logic: GraphStageLogic with Stage
if (sendableOutstreams.contains(streamId)) {
val sendableExceptClosed = sendableOutstreams - streamId

if (sendableExceptClosed.isEmpty) Idle
if (sendableExceptClosed.isEmpty)
if (pulled) WaitingForData else Idle
else withSendableOutstreams(sendableExceptClosed)
} else
this

def pulled: Boolean
}

private[http2] case class WaitingForNetworkToSendData(sendableOutstreams: immutable.Set[Int]) extends WithSendableOutStreams {
Expand All @@ -251,6 +254,8 @@ private[http2] trait Http2MultiplexerSupport { logic: GraphStageLogic with Stage

def withSendableOutstreams(sendableOutStreams: Set[Int]) =
WaitingForNetworkToSendData(sendableOutStreams)

override def pulled = false
}

/** Pulled and data is pending but no connection-level window available */
Expand All @@ -270,6 +275,8 @@ private[http2] trait Http2MultiplexerSupport { logic: GraphStageLogic with Stage

def withSendableOutstreams(sendableOutStreams: Set[Int]) =
WaitingForConnectionWindow(sendableOutStreams)

override def pulled = true
}

def maxBytesToBufferPerSubstream = 2 * currentMaxFrameSize // for now, let's buffer two frames per substream
Expand Down
Expand Up @@ -233,6 +233,7 @@ private[http2] trait Http2StreamHandling { self: GraphStageLogic with LogHelper
def receivedUnexpectedFrame(e: StreamFrameEvent): StreamState = {
debug(s"Received unexpected frame of type ${e.frameTypeName} for stream ${e.streamId} in state $stateName")
pushGOAWAY(ErrorCode.PROTOCOL_ERROR, s"Received unexpected frame of type ${e.frameTypeName} for stream ${e.streamId} in state $stateName")
multiplexer.closeStream(e.streamId)
Closed
}

Expand Down Expand Up @@ -337,7 +338,9 @@ private[http2] trait Http2StreamHandling { self: GraphStageLogic with LogHelper
multiplexer.closeStream(r.streamId)
outStream.cancelStream()
Closed
case _ => receivedUnexpectedFrame(event)
case _ =>
outStream.cancelStream()
receivedUnexpectedFrame(event)
}

override def handleOutgoingEnded(): StreamState = HalfClosedLocalWaitingForPeerStream(correlationAttributes)
Expand Down
Expand Up @@ -452,6 +452,60 @@ class Http2ClientSpec extends AkkaSpecWithMaterializer("""

connectionShouldStillBeUsable()
}
"handle RST_STREAM while waiting for a window update" in new WaitingForRequestData {
val entitySize = 70000
entityDataOut.sendNext(ByteString(Array.fill[Byte](entitySize)(0x23))) // 70000 > Http2Protocol.InitialWindowSize
network.sendWINDOW_UPDATE(TheStreamId, 10000) // enough window for the stream but not for the window

network.expectDATA(TheStreamId, false, Http2Protocol.InitialWindowSize)

// enough stream-level WINDOW, but too little connection-level WINDOW
network.expectNoBytes(100.millis)

// now the demuxer is in the WaitingForConnectionWindow state, cancel the connection
network.sendRST_STREAM(TheStreamId, ErrorCode.CANCEL)

entityDataOut.expectCancellation()
network.expectNoBytes(100.millis)

// now increase connection-level window again and see if everything still works
network.sendWINDOW_UPDATE(0, 10000)
network.expectNoBytes(100.millis) // don't expect anything, stream has been cancelled in the meantime

connectionShouldStillBeUsable()
}
"handle unknown frames while waiting for a window update" in new WaitingForRequestData {
user.emitRequest(Get("/secondRequest"))
val otherRequestStreamId = network.expect[HeadersFrame]().streamId

val entitySize = 70000
entityDataOut.sendNext(ByteString(Array.fill[Byte](entitySize)(0x23))) // 70000 > Http2Protocol.InitialWindowSize
network.sendWINDOW_UPDATE(TheStreamId, 10000) // enough window for the stream but not for the window

network.expectDATA(TheStreamId, false, Http2Protocol.InitialWindowSize)

// enough stream-level WINDOW, but too little connection-level WINDOW
network.expectNoBytes(100.millis)

// now the stream handler is in the OpenSendingData state, and waiting for the
// response headers, unexpectedly send response data:
network.sendDATA(TheStreamId, endStream = false, ByteString("surprise!"))

entityDataOut.expectCancellation()
network.expectGOAWAY(otherRequestStreamId)

// make sure the demuxer also moved back from WaitingForConnectionWindow to Idle
network.sendWINDOW_UPDATE(0, 10000)
network.expectNoBytes(100.millis) // don't expect anything, stream has been cancelled in the meantime

// TODO the client stack should not accept new connections anymore
// TODO what to do with requests that are already in the queue at this point?
// user.requestOut.expectCancellation()

// Check finishing old requests is still allowed
network.sendHEADERS(otherRequestStreamId, true, Seq(RawHeader(":status", "200")))
user.expectResponse()
}
}

"respect flow-control" should {
Expand Down

0 comments on commit ab38d36

Please sign in to comment.