Skip to content

Commit

Permalink
core: remove complex graph building from streamed entity rendering
Browse files Browse the repository at this point in the history
It turned out that aside from materialization which is still somewhat slow
at least some bits of slowness were transferred from materialization to building
stream graphs. The culprit in this case is `Source.concat`, which is really
slow.
  • Loading branch information
jrudolph committed Aug 14, 2019
1 parent 720b646 commit 9fd6b0b
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 13 deletions.
@@ -0,0 +1,2 @@
# Changes to internal classes
ProblemFilters.exclude[Problem]("akka.http.impl.engine.rendering.*")
Expand Up @@ -21,6 +21,7 @@ import RenderSupport._
import HttpProtocols._
import akka.annotation.InternalApi
import ResponseRenderingContext.CloseRequested
import akka.stream.scaladsl.Sink
import headers._

import scala.util.control.NonFatal
Expand Down Expand Up @@ -81,9 +82,11 @@ private[http] class HttpResponseRendererFactory(
case Strict(outElement) =>
push(out, outElement)
if (close) completeStage()
case Streamed(outStream) =>
try transfer(outStream)
catch {
case HeadersAndStreamedEntity(headerData, outStream) =>
try {
push(out, ResponseRenderingOutput.HttpData(headerData))
transfer(outStream)
} catch {
case NonFatal(e) =>
transferring = false
log.error(e, s"Rendering of response failed because response entity stream materialization failed with '${e.getMessage}'. Sending out 500 response instead.")
Expand All @@ -99,9 +102,9 @@ private[http] class HttpResponseRendererFactory(
def onPull(): Unit = if (!hasBeenPulled(in)) tryPull(in)
}
setHandler(out, waitForDemandHandler)
def transfer(outStream: Source[ResponseRenderingOutput, Any]): Unit = {
def transfer(outStream: Source[ByteString, Any]): Unit = {
transferring = true
val sinkIn = new SubSinkInlet[ResponseRenderingOutput]("RenderingSink")
val sinkIn = new SubSinkInlet[ByteString]("RenderingSink")
def stopTransfer(): Unit = {
transferring = false
setHandler(out, waitForDemandHandler)
Expand All @@ -110,7 +113,7 @@ private[http] class HttpResponseRendererFactory(
}

sinkIn.setHandler(new InHandler {
override def onPush(): Unit = push(out, sinkIn.grab())
override def onPush(): Unit = push(out, ResponseRenderingOutput.HttpData(sinkIn.grab()))
override def onUpstreamFinish(): Unit =
if (close) completeStage()
else stopTransfer()
Expand All @@ -125,7 +128,7 @@ private[http] class HttpResponseRendererFactory(

try {
outStream.runWith(sinkIn.sink)(interpreter.subFusingMaterializer)
sinkIn.pull()
if (isAvailable(out)) sinkIn.pull()
} catch {
case NonFatal(e) =>
stopTransfer()
Expand Down Expand Up @@ -240,8 +243,16 @@ private[http] class HttpResponseRendererFactory(
def renderContentLengthHeader(contentLength: Long) =
if (status.allowsEntity) r ~~ `Content-Length` ~~ contentLength ~~ CrLf else r

def byteStrings(entityBytes: => Source[ByteString, Any]): Source[ResponseRenderingOutput, Any] =
renderByteStrings(r.asByteString, entityBytes, skipEntity = noEntity).map(ResponseRenderingOutput.HttpData(_))
def headersAndEntity(entityBytes: => Source[ByteString, Any]): StrictOrStreamed =
if (noEntity) {
entityBytes.runWith(Sink.cancelled)(subFusingMaterializer)
Strict(ResponseRenderingOutput.HttpData(r.asByteString))
} else {
HeadersAndStreamedEntity(
r.asByteString,
entityBytes
)
}

@tailrec def completeResponseRendering(entity: ResponseEntity): StrictOrStreamed =
entity match {
Expand Down Expand Up @@ -269,20 +280,20 @@ private[http] class HttpResponseRendererFactory(
renderHeaders(headers)
renderEntityContentType(r, entity)
renderContentLengthHeader(contentLength) ~~ CrLf
Streamed(byteStrings(data.via(CheckContentLengthTransformer.flow(contentLength))))
headersAndEntity(data.via(CheckContentLengthTransformer.flow(contentLength)))

case HttpEntity.CloseDelimited(_, data) =>
renderHeaders(headers, alwaysClose = ctx.requestMethod != HttpMethods.HEAD)
renderEntityContentType(r, entity) ~~ CrLf
Streamed(byteStrings(data))
headersAndEntity(data)

case HttpEntity.Chunked(contentType, chunks) =>
if (ctx.requestProtocol == `HTTP/1.0`)
completeResponseRendering(HttpEntity.CloseDelimited(contentType, chunks.map(_.data)))
else {
renderHeaders(headers)
renderEntityContentType(r, entity) ~~ CrLf
Streamed(byteStrings(chunks.via(ChunkTransformer.flow)))
headersAndEntity(chunks.via(ChunkTransformer.flow))
}
}

Expand All @@ -293,7 +304,7 @@ private[http] class HttpResponseRendererFactory(

sealed trait StrictOrStreamed
case class Strict(bytes: ResponseRenderingOutput) extends StrictOrStreamed
case class Streamed(source: Source[ResponseRenderingOutput, Any]) extends StrictOrStreamed
case class HeadersAndStreamedEntity(headerBytes: ByteString, remainingData: Source[ByteString, Any]) extends StrictOrStreamed
}

sealed trait CloseMode
Expand Down

0 comments on commit 9fd6b0b

Please sign in to comment.