Skip to content

Commit

Permalink
http2: more robust handling of IncomingStreamBuffer.onPull
Browse files Browse the repository at this point in the history
  • Loading branch information
jrudolph committed Nov 16, 2020
1 parent e956fb4 commit e0da123
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 22 deletions.
Expand Up @@ -137,6 +137,10 @@ private[http2] trait Http2StreamHandling { self: GraphStageLogic with LogHelper
def pullNextFrame(streamId: Int, maxSize: Int): PullFrameResult =
updateStateAndReturn(streamId, _.pullNextFrame(maxSize))

/** Entry-point to handle IncomingStreamBuffer.onPull through the state machine */
def incomingStreamPulled(streamId: Int): Unit =
updateState(streamId, _.incomingStreamPulled())

private def updateAllStates(handle: StreamState => StreamState): Unit =
streamStates.keys.foreach(updateState(_, handle))

Expand Down Expand Up @@ -257,6 +261,7 @@ private[http2] trait Http2StreamHandling { self: GraphStageLogic with LogHelper
}

def pullNextFrame(maxSize: Int): (StreamState, PullFrameResult) = throw new IllegalStateException(s"pullNextFrame not supported in state $stateName")
def incomingStreamPulled(): StreamState = throw new IllegalStateException(s"incomingStreamPulled not supported in state $stateName")

/** Called to cleanup any state when the connection is torn down */
def shutdown(): Unit = ()
Expand Down Expand Up @@ -359,8 +364,8 @@ private[http2] trait Http2StreamHandling { self: GraphStageLogic with LogHelper
outstandingConnectionLevelWindow += windowSizeIncrement
}

buffer.onDataFrame(d).getOrElse(
maybeFinishStream(d.endStream))
buffer.onDataFrame(d)
afterBufferEvent
}
case r: RstStreamFrame =>
buffer.onRstStreamFrame(r)
Expand All @@ -369,8 +374,7 @@ private[http2] trait Http2StreamHandling { self: GraphStageLogic with LogHelper

case h: ParsedHeadersFrame =>
buffer.onTrailingHeaders(h)

maybeFinishStream(h.endStream)
afterBufferEvent

case w: WindowUpdateFrame =>
incrementWindow(w.windowSizeIncrement)
Expand All @@ -379,15 +383,19 @@ private[http2] trait Http2StreamHandling { self: GraphStageLogic with LogHelper
}
protected def onReset(streamId: Int): Unit

protected def maybeFinishStream(endStream: Boolean): StreamState =
if (endStream) afterEndStreamReceived else this
override def incomingStreamPulled(): StreamState = {
buffer.dispatchNextChunk()
afterBufferEvent
}

override def shutdown(): Unit = {
buffer.shutdown()
super.shutdown()
}

def incrementWindow(delta: Int): StreamState

def afterBufferEvent: StreamState = if (buffer.isDone) afterEndStreamReceived else this
}

// on the incoming side there's (almost) no difference between Open and HalfClosedLocal
Expand Down Expand Up @@ -473,31 +481,38 @@ private[http2] trait Http2StreamHandling { self: GraphStageLogic with LogHelper
private var outstandingStreamWindow: Int = Http2Protocol.InitialWindowSize // adapt if we negotiate greater sizes by settings
outlet.setHandler(this)

def onPull(): Unit = dispatchNextChunk()
def onPull(): Unit = incomingStreamPulled(streamId)
override def onDownstreamFinish(): Unit = {
debug(s"Incoming side of stream [$streamId]: cancelling because downstream finished")
multiplexer.pushControlFrame(RstStreamFrame(streamId, ErrorCode.CANCEL))
// FIXME: go through state machine and don't manipulate vars directly here
streamStates -= streamId
wasClosed = true
buffer = ByteString.empty
trailingHeaders = None
}

def onDataFrame(data: DataFrame): Option[StreamState] = {
if (data.endStream) wasClosed = true
def isDone: Boolean = outlet.isClosed

outstandingStreamWindow -= data.sizeInWindow
if (outstandingStreamWindow < 0) {
def onDataFrame(data: DataFrame): Unit =
if (wasClosed) {
shutdown()
multiplexer.pushControlFrame(RstStreamFrame(streamId, ErrorCode.FLOW_CONTROL_ERROR))
// also close response delivery if that has already started
multiplexer.closeStream(streamId)
Some(Closed)
pushGOAWAY(ErrorCode.PROTOCOL_ERROR, s"Received unexpected DATA frame after stream was already (half-)closed")
} else {
buffer ++= data.payload
debug(s"Received DATA ${data.sizeInWindow} for stream [$streamId], remaining window space now $outstandingStreamWindow, buffered: ${buffer.size}")
dispatchNextChunk()
None // don't change state
if (data.endStream) wasClosed = true

outstandingStreamWindow -= data.sizeInWindow
if (outstandingStreamWindow < 0) {
shutdown()
multiplexer.pushControlFrame(RstStreamFrame(streamId, ErrorCode.FLOW_CONTROL_ERROR))
// also close response delivery if that has already started
multiplexer.closeStream(streamId)
} else {
buffer ++= data.payload
debug(s"Received DATA ${data.sizeInWindow} for stream [$streamId], remaining window space now $outstandingStreamWindow, buffered: ${buffer.size}")
dispatchNextChunk()
}
}
}
def onTrailingHeaders(headers: ParsedHeadersFrame): Unit = {
trailingHeaders = wrapTrailingHeaders(headers)
if (headers.endStream)
Expand All @@ -513,7 +528,7 @@ private[http2] trait Http2StreamHandling { self: GraphStageLogic with LogHelper
wasClosed = true
}

private def dispatchNextChunk(): Unit = {
def dispatchNextChunk(): Unit = {
if (buffer.nonEmpty && outlet.isAvailable) {
val dataSize = buffer.size min settings.requestEntityChunkSize
outlet.push(wrapData(buffer.take(dataSize)))
Expand All @@ -535,6 +550,7 @@ private[http2] trait Http2StreamHandling { self: GraphStageLogic with LogHelper
case None =>
outlet.complete()
}

}
}

Expand All @@ -558,7 +574,8 @@ private[http2] trait Http2StreamHandling { self: GraphStageLogic with LogHelper
s"remaining connection window space now $outstandingConnectionLevelWindow, total buffered: $totalBufferedData")
}

def shutdown(): Unit = outlet.fail(Http2StreamHandling.ConnectionWasAbortedException)
def shutdown(): Unit =
if (!outlet.isClosed) outlet.fail(Http2StreamHandling.ConnectionWasAbortedException)
}

trait OutStream {
Expand Down
Expand Up @@ -414,6 +414,30 @@ class Http2ServerSpec extends AkkaSpecWithMaterializer("""
sendFrame(DataFrame(TheStreamId, endStream = false, ByteString("0" * 512001))) // more than default `incoming-stream-level-buffer-size = 512kB`
expectRST_STREAM(TheStreamId, ErrorCode.FLOW_CONTROL_ERROR)
}
"not leak stream if request entity is not fully pulled when connection dies" inAssertAllStagesStopped new WaitingForRequestData {
sendDATA(TheStreamId, endStream = false, ByteString("0000"))
entityDataIn.expectUtf8EncodedString("0000")
pollForWindowUpdates(500.millis)

sendDATA(TheStreamId, endStream = false, ByteString("1111"))
sendDATA(TheStreamId, endStream = true, ByteString.empty)

// DATA is left in IncomingStreamBuffer because we never pulled
// test infra closes connection
}
"fail if DATA frame arrives after incoming stream has already been closed (before response was sent)" inAssertAllStagesStopped new WaitingForRequestData {
sendDATA(TheStreamId, endStream = false, ByteString("0000"))
entityDataIn.expectUtf8EncodedString("0000")
pollForWindowUpdates(500.millis)

sendDATA(TheStreamId, endStream = false, ByteString("1111"))
sendDATA(TheStreamId, endStream = true, ByteString.empty) // close stream

// now send more DATA: checks that we have moved into a state where DATA is not expected any more
sendDATA(TheStreamId, endStream = false, ByteString("more data"))
val (_, errorCode) = expectGOAWAY()
errorCode shouldEqual ErrorCode.PROTOCOL_ERROR
}
"fail entity stream if advertised content-length doesn't match" in pending
}

Expand Down

0 comments on commit e0da123

Please sign in to comment.