Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streamed response processing performance improvements #2645

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Changes to internal classes
ProblemFilters.exclude[Problem]("akka.http.impl.engine.rendering.*")
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import HttpProtocols._
import akka.annotation.InternalApi
import ResponseRenderingContext.CloseRequested
import akka.http.impl.engine.server.UpgradeToOtherProtocolResponseHeader
import akka.stream.scaladsl.Sink
import headers._

import scala.util.control.NonFatal
Expand Down Expand Up @@ -80,8 +81,8 @@ private[http] class HttpResponseRendererFactory(
case Strict(outElement) =>
push(out, outElement)
if (close) completeStage()
case Streamed(outStream) =>
try transfer(outStream)
case HeadersAndStreamedEntity(headerData, outStream) =>
try transfer(headerData, outStream)
catch {
case NonFatal(e) =>
transferring = false
Expand All @@ -98,9 +99,9 @@ private[http] class HttpResponseRendererFactory(
def onPull(): Unit = if (!hasBeenPulled(in)) tryPull(in)
}
setHandler(out, waitForDemandHandler)
def transfer(outStream: Source[ResponseRenderingOutput, Any]): Unit = {
def transfer(headerData: ByteString, outStream: Source[ByteString, Any]): Unit = {
transferring = true
val sinkIn = new SubSinkInlet[ResponseRenderingOutput]("RenderingSink")
val sinkIn = new SubSinkInlet[ByteString]("RenderingSink")
def stopTransfer(): Unit = {
transferring = false
setHandler(out, waitForDemandHandler)
Expand All @@ -109,13 +110,21 @@ private[http] class HttpResponseRendererFactory(
}

sinkIn.setHandler(new InHandler {
override def onPush(): Unit = push(out, sinkIn.grab())
override def onPush(): Unit = push(out, ResponseRenderingOutput.HttpData(sinkIn.grab()))
override def onUpstreamFinish(): Unit =
if (close) completeStage()
else stopTransfer()
})

var headersSent = false
def sendHeaders(): Unit = {
push(out, ResponseRenderingOutput.HttpData(headerData))
headersSent = true
}
setHandler(out, new OutHandler {
override def onPull(): Unit = sinkIn.pull()
override def onPull(): Unit =
if (!headersSent) sendHeaders()
else sinkIn.pull()
override def onDownstreamFinish(): Unit = {
completeStage()
sinkIn.cancel()
Expand All @@ -124,7 +133,7 @@ private[http] class HttpResponseRendererFactory(

try {
outStream.runWith(sinkIn.sink)(interpreter.subFusingMaterializer)
sinkIn.pull()
if (isAvailable(out)) sendHeaders()
} catch {
case NonFatal(e) =>
stopTransfer()
Expand Down Expand Up @@ -239,8 +248,16 @@ private[http] class HttpResponseRendererFactory(
def renderContentLengthHeader(contentLength: Long) =
if (status.allowsEntity) r ~~ `Content-Length` ~~ contentLength ~~ CrLf else r

def byteStrings(entityBytes: => Source[ByteString, Any]): Source[ResponseRenderingOutput, Any] =
renderByteStrings(r.asByteString, entityBytes, skipEntity = noEntity).map(ResponseRenderingOutput.HttpData(_))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, so the concat was in here, and is replaced with simply separate 'push' calls here. Makes sense.

It seems renderByteStrings is now unused entirely, so perhaps we should remove it?

def headersAndEntity(entityBytes: => Source[ByteString, Any]): StrictOrStreamed =
if (noEntity) {
entityBytes.runWith(Sink.cancelled)(subFusingMaterializer)
Strict(ResponseRenderingOutput.HttpData(r.asByteString))
} else {
HeadersAndStreamedEntity(
r.asByteString,
entityBytes
)
}

@tailrec def completeResponseRendering(entity: ResponseEntity): StrictOrStreamed =
entity match {
Expand Down Expand Up @@ -268,20 +285,20 @@ private[http] class HttpResponseRendererFactory(
renderHeaders(headers)
renderEntityContentType(r, entity)
renderContentLengthHeader(contentLength) ~~ CrLf
Streamed(byteStrings(data.via(CheckContentLengthTransformer.flow(contentLength))))
headersAndEntity(data.via(CheckContentLengthTransformer.flow(contentLength)))

case HttpEntity.CloseDelimited(_, data) =>
renderHeaders(headers, alwaysClose = ctx.requestMethod != HttpMethods.HEAD)
renderEntityContentType(r, entity) ~~ CrLf
Streamed(byteStrings(data))
headersAndEntity(data)

case HttpEntity.Chunked(contentType, chunks) =>
if (ctx.requestProtocol == `HTTP/1.0`)
completeResponseRendering(HttpEntity.CloseDelimited(contentType, chunks.map(_.data)))
else {
renderHeaders(headers)
renderEntityContentType(r, entity) ~~ CrLf
Streamed(byteStrings(chunks.via(ChunkTransformer.flow)))
headersAndEntity(chunks.via(ChunkTransformer.flow))
}
}

Expand All @@ -292,7 +309,7 @@ private[http] class HttpResponseRendererFactory(

sealed trait StrictOrStreamed
case class Strict(bytes: ResponseRenderingOutput) extends StrictOrStreamed
case class Streamed(source: Source[ResponseRenderingOutput, Any]) extends StrictOrStreamed
case class HeadersAndStreamedEntity(headerBytes: ByteString, remainingData: Source[ByteString, Any]) extends StrictOrStreamed
}

sealed trait CloseMode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,6 @@ private[rendering] object RenderSupport {
r ~~ headers.`Content-Type` ~~ ct ~~ CrLf
}

def renderByteStrings(header: ByteString, entityBytes: => Source[ByteString, Any],
skipEntity: Boolean = false): Source[ByteString, Any] = {
val messageStart = Source.single(header)
val messageBytes =
if (!skipEntity) (messageStart ++ entityBytes).mapMaterializedValue(_ => ())
else CancelSecond(messageStart, entityBytes)
messageBytes
}

object ChunkTransformer {
val flow = Flow.fromGraph(new ChunkTransformer).named("renderChunks")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import scala.util.control.NonFatal
import akka.NotUsed
import akka.actor.Cancellable
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.japi.Function
import akka.event.LoggingAdapter
import akka.util.ByteString
Expand All @@ -35,6 +36,8 @@ import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws.Message
import akka.http.impl.util.LogByteStringTools._

import scala.util.Failure

/**
* INTERNAL API
*
Expand Down Expand Up @@ -422,9 +425,11 @@ private[http] object HttpServerBluePrint {
if (response0.entity.isStrict) response0 // response stream cannot fail
else response0.mapEntity { e =>
val (newEntity, fut) = HttpEntity.captureTermination(e)
fut.failed.foreach { ex =>
log.error(ex, s"Response stream for [${requestStart.debugString}] failed with '${ex.getMessage}'. Aborting connection.")
}(materializer.executionContext)
fut.onComplete {
case Failure(ex) =>
log.error(ex, s"Response stream for [${requestStart.debugString}] failed with '${ex.getMessage}'. Aborting connection.")
case _ => // ignore
}(ExecutionContexts.sameThreadExecutionContext)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

newEntity
}

Expand Down