Skip to content

Commit

Permalink
http2: support Strict entities in Http2SubStream
Browse files Browse the repository at this point in the history
This avoids materializing a stream for outgoing Strict entities.
  • Loading branch information
jrudolph committed Jun 1, 2021
1 parent b210c37 commit 5612e4e
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 30 deletions.
Expand Up @@ -34,7 +34,7 @@ private[http2] case class Http2SubStream(
// outgoing response trailing headers can either be passed in eagerly via an attribute
// or streaming as the LastChunk of a chunked data stream
trailingHeaders: OptionVal[ParsedHeadersFrame],
data: Source[Any /* ByteString | HttpEntity.ChunkStreamPart */ , Any],
data: Either[ByteString, Source[Any /* ByteString | HttpEntity.ChunkStreamPart */ , Any]],
correlationAttributes: Map[AttributeKey[_], _]
) {
def streamId: Int = initialHeaders.streamId
Expand All @@ -49,22 +49,28 @@ private[http2] case class Http2SubStream(
def createEntity(contentLength: Long, contentTypeOption: OptionVal[ContentType]): RequestEntity = {
def contentType: ContentType = contentTypeOption.getOrElse(ContentTypes.`application/octet-stream`)

if (data == Source.empty || contentLength == 0 || !hasEntity) {
if (contentTypeOption.isEmpty) HttpEntity.Empty
else HttpEntity.Strict(contentType, ByteString.empty)
} else if (contentLength > 0) {
val byteSource: Source[ByteString, Any] = data.collect {
case b: ByteString => b
case HttpEntity.Chunk(data, _) => data
// ignore: HttpEntity.LastChunk
}
HttpEntity.Default(contentType, contentLength, byteSource)
} else {
val chunkSource: Source[HttpEntity.ChunkStreamPart, Any] = data.map {
case b: ByteString => HttpEntity.Chunk(b)
case p: HttpEntity.ChunkStreamPart => p
}
HttpEntity.Chunked(contentType, chunkSource)
data match {
case Right(data) =>
if (data == Source.empty || contentLength == 0 || !hasEntity) {
if (contentTypeOption.isEmpty) HttpEntity.Empty
else HttpEntity.Strict(contentType, ByteString.empty)
} else if (contentLength > 0) {
val byteSource: Source[ByteString, Any] = data.collect {
case b: ByteString => b
case HttpEntity.Chunk(data, _) => data
// ignore: HttpEntity.LastChunk
}
HttpEntity.Default(contentType, contentLength, byteSource)
} else {
val chunkSource: Source[HttpEntity.ChunkStreamPart, Any] = data.map {
case b: ByteString => HttpEntity.Chunk(b)
case p: HttpEntity.ChunkStreamPart => p
}
HttpEntity.Chunked(contentType, chunkSource)
}
case Left(dataBytes) =>
if (dataBytes.isEmpty && contentTypeOption.isEmpty) HttpEntity.Empty
else HttpEntity.Strict(contentType, dataBytes)
}
}
}
Expand All @@ -73,8 +79,9 @@ private[http2] object Http2SubStream {
def apply(entity: HttpEntity, headers: ParsedHeadersFrame, trailingHeaders: OptionVal[ParsedHeadersFrame], correlationAttributes: Map[AttributeKey[_], _] = Map.empty): Http2SubStream = {
val data =
entity match {
case HttpEntity.Chunked(_, chunks) => chunks
case x => x.dataBytes
case HttpEntity.Chunked(_, chunks) => Right(chunks)
case HttpEntity.Strict(_, data) => Left(data)
case x => Right(x.dataBytes)
}
Http2SubStream(headers, trailingHeaders, data, correlationAttributes)
}
Expand Down
Expand Up @@ -362,7 +362,7 @@ private[http2] abstract class Http2Demux(http2Settings: Http2CommonSettings, ini
// keep the buffer limited to the number of concurrent streams as negotiated
// with the other side.
val bufferedSubStreamOutput = new BufferedOutlet[Http2SubStream](substreamOut)
override def dispatchSubstream(initialHeaders: ParsedHeadersFrame, data: Source[Any, Any], correlationAttributes: Map[AttributeKey[_], _]): Unit =
override def dispatchSubstream(initialHeaders: ParsedHeadersFrame, data: Either[ByteString, Source[Any, Any]], correlationAttributes: Map[AttributeKey[_], _]): Unit =
bufferedSubStreamOutput.push(Http2SubStream(initialHeaders, OptionVal.None, data, correlationAttributes))

setHandler(substreamIn, new InHandler {
Expand Down
Expand Up @@ -34,7 +34,7 @@ private[http2] trait Http2StreamHandling { self: GraphStageLogic with LogHelper
def multiplexer: Http2Multiplexer
def settings: Http2CommonSettings
def pushGOAWAY(errorCode: ErrorCode, debug: String): Unit
def dispatchSubstream(initialHeaders: ParsedHeadersFrame, data: Source[Any, Any], correlationAttributes: Map[AttributeKey[_], _]): Unit
def dispatchSubstream(initialHeaders: ParsedHeadersFrame, data: Either[ByteString, Source[Any, Any]], correlationAttributes: Map[AttributeKey[_], _]): Unit
def isUpgraded: Boolean

def wrapTrailingHeaders(headers: ParsedHeadersFrame): Option[HttpEntity.ChunkStreamPart]
Expand Down Expand Up @@ -114,7 +114,7 @@ private[http2] trait Http2StreamHandling { self: GraphStageLogic with LogHelper
}
} else
// stream was cancelled by peer before our response was ready
stream.data.runWith(Sink.cancelled)(subFusingMaterializer)
stream.data.foreach(_.runWith(Sink.cancelled)(subFusingMaterializer))

}

Expand Down Expand Up @@ -254,7 +254,7 @@ private[http2] trait Http2StreamHandling { self: GraphStageLogic with LogHelper

// FIXME: after multiplexer PR is merged
// prioInfo.foreach(multiplexer.updatePriority)
dispatchSubstream(frame, data, correlationAttributes)
dispatchSubstream(frame, Right(data), correlationAttributes)
nextState

case x => receivedUnexpectedFrame(x)
Expand Down Expand Up @@ -596,10 +596,15 @@ private[http2] trait Http2StreamHandling { self: GraphStageLogic with LogHelper
}
object OutStream {
def apply(sub: Http2SubStream): OutStream = {
val subIn = new SubSinkInlet[Any](s"substream-in-${sub.streamId}")
val info = new OutStreamImpl(sub.streamId, OptionVal.None, multiplexer.currentInitialWindow, sub.trailingHeaders)
info.registerIncomingData(subIn)
sub.data.runWith(subIn.sink)(subFusingMaterializer)
sub.data match {
case Right(data) =>
val subIn = new SubSinkInlet[Any](s"substream-in-${sub.streamId}")
info.registerIncomingData(subIn)
data.runWith(subIn.sink)(subFusingMaterializer)
case Left(data) =>
info.addAllData(data)
}
info
}
}
Expand All @@ -626,6 +631,12 @@ private[http2] trait Http2StreamHandling { self: GraphStageLogic with LogHelper
inlet.pull()
inlet.setHandler(this)
}
    // Hands the entire payload of a strict (already-in-memory) entity to this
    // out-stream in one step, instead of materializing a Source and feeding
    // bytes in via a SubSinkInlet. The bytes become the stream's whole send
    // buffer and the upstream is immediately considered finished.
    def addAllData(data: ByteString): Unit = {
      // Must be called before any streamed data was buffered: a strict entity
      // replaces the buffer wholesale, it never appends to streamed data.
      require(buffer.isEmpty)
      buffer = data
      // Mark the upstream as done so no pull is ever attempted (there is no
      // inlet for a strict entity) and END_STREAM can be set once drained.
      upstreamClosed = true
      // If flow control currently permits sending, schedule this stream with
      // the multiplexer right away; otherwise it is enqueued later when the
      // window opens. NOTE(review): relies on canSend/enqueueOutStream
      // semantics defined elsewhere in this trait — not visible in this hunk.
      if (canSend) multiplexer.enqueueOutStream(streamId)
    }

def nextFrame(maxBytesToSend: Int): DataFrame = {
val toTake = maxBytesToSend min buffer.size min outboundWindowLeft
Expand Down Expand Up @@ -656,12 +667,11 @@ private[http2] trait Http2StreamHandling { self: GraphStageLogic with LogHelper
} else
None

private def maybePull(): Unit = {
private def maybePull(): Unit =
// TODO: Check that buffer is not too much over the limit (which we might warn the user about)
// The problem here is that backpressure will only work properly if batch elements like
// ByteString have a reasonable size.
if (buffer.size < multiplexer.maxBytesToBufferPerSubstream && !inlet.hasBeenPulled && !inlet.isClosed) inlet.pull()
}
if (!upstreamClosed && buffer.size < multiplexer.maxBytesToBufferPerSubstream && !inlet.hasBeenPulled && !inlet.isClosed) inlet.pull()

/** Cleans up internal state (but not external) */
private def cleanupStream(): Unit = {
Expand Down
Expand Up @@ -40,7 +40,7 @@ class RequestParsingSpec extends AkkaSpec() with Inside with Inspectors {
priorityInfo = None
),
trailingHeaders = OptionVal.None,
data = data,
data = Right(data),
correlationAttributes = Map.empty
)
// Create the parsing function
Expand Down

0 comments on commit 5612e4e

Please sign in to comment.