Skip to content

Commit

Permalink
Allow a maximum size parameter for HttpEntity 'toStrict' (akka#2186)
Browse files Browse the repository at this point in the history
And apply configurable default limit otherwise.

Fixes akka#268. Refs akka#2137.
  • Loading branch information
raboof authored and Synesso committed Sep 5, 2018
1 parent 4c6deda commit 91eae3a
Show file tree
Hide file tree
Showing 14 changed files with 224 additions and 19 deletions.
Expand Up @@ -128,7 +128,7 @@ public interface HttpEntity {
HttpEntity withoutSizeLimit();

/**
* Returns a future of a strict entity that contains the same data as this entity
* Returns a CompletionStage of a strict entity that contains the same data as this entity
* which is only completed when the complete entity has been collected. As the
* duration of receiving the complete entity cannot be predicted, a timeout needs to
* be specified to guard the process against running and keeping resources infinitely.
Expand All @@ -138,6 +138,17 @@ public interface HttpEntity {
*/
CompletionStage<HttpEntity.Strict> toStrict(long timeoutMillis, Materializer materializer);

/**
* Returns a CompletionStage of a strict entity that contains the same data as this entity
* which is only completed when the complete entity has been collected. As the
* duration of receiving the complete entity cannot be predicted, a timeout needs to
* be specified to guard the process against running and keeping resources infinitely.
*
* Use getDataBytes and stream processing instead if the expected data is big or
* is likely to take a long time.
*/
CompletionStage<HttpEntity.Strict> toStrict(long timeoutMillis, long maxBytes, Materializer materializer);

/**
* Discards the entities data bytes by running the {@code dataBytes} Source contained in this entity.
*
Expand Down
Expand Up @@ -182,7 +182,7 @@ interface MessageTransformations<Self> {
<T> Self transformEntityDataBytes(Graph<FlowShape<ByteString, ByteString>, T> transformer);

/**
* Returns a future of Self message with strict entity that contains the same data as this entity
* Returns a CompletionStage of Self message with strict entity that contains the same data as this entity
* which is only completed when the complete entity has been collected. As the
* duration of receiving the complete entity cannot be predicted, a timeout needs to
* be specified to guard the process against running and keeping resources infinitely.
Expand All @@ -191,5 +191,16 @@ interface MessageTransformations<Self> {
* is likely to take a long time.
*/
CompletionStage<? extends Self> toStrict(long timeoutMillis, Executor ec, Materializer materializer);

/**
* Returns a CompletionStage of Self message with strict entity that contains the same data as this entity
* which is only completed when the complete entity has been collected. As the
* duration of receiving the complete entity cannot be predicted, a timeout needs to
* be specified to guard the process against running and keeping resources infinitely.
*
* Use getEntity().getDataBytes and stream processing instead if the expected data is big or
* is likely to take a long time.
*/
CompletionStage<? extends Self> toStrict(long timeoutMillis, long maxBytes, Executor ec, Materializer materializer);
}
}
13 changes: 13 additions & 0 deletions akka-http-core/src/main/mima-filters/10.1.4.backwards.excludes
@@ -0,0 +1,13 @@
# Not meant for user extension, so new methods should be fine:
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.javadsl.model.HttpEntity.toStrict")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.HttpEntity.toStrict")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.javadsl.model.HttpMessage#MessageTransformations.toStrict")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.HttpMessage.toStrict")

# ToStrict is private[http]
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.http.impl.util.ToStrict.this")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.javadsl.settings.ParserSettings.getMaxToStrictBytes")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.javadsl.settings.ParserSettings.maxToStrictBytes")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.settings.ParserSettings.maxToStrictBytes")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.http.impl.settings.ParserSettingsImpl.*")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.http.impl.settings.ParserSettingsImpl.*")
4 changes: 4 additions & 0 deletions akka-http-core/src/main/resources/reference.conf
Expand Up @@ -413,6 +413,10 @@ akka.http {
# programmatically via `withSizeLimit`.)
max-content-length = 8m

# The maximum number of bytes to allow when reading the entire entity into memory with `toStrict`
# (which is used by the `toStrictEntity` and `extractStrictEntity` directives)
max-to-strict-bytes = 8m

# Sets the strictness mode for parsing request target URIs.
# The following values are defined:
#
Expand Down
Expand Up @@ -23,6 +23,7 @@ private[akka] final case class ParserSettingsImpl(
maxHeaderValueLength: Int,
maxHeaderCount: Int,
maxContentLength: Long,
maxToStrictBytes: Long,
maxChunkExtLength: Int,
maxChunkSize: Int,
uriParsingMode: Uri.ParsingMode,
Expand Down Expand Up @@ -75,6 +76,7 @@ object ParserSettingsImpl extends SettingsCompanion[ParserSettingsImpl]("akka.ht
c.getIntBytes("max-header-value-length"),
c.getIntBytes("max-header-count"),
c.getPossiblyInfiniteBytes("max-content-length"),
c.getPossiblyInfiniteBytes("max-to-strict-bytes"),
c.getIntBytes("max-chunk-ext-length"),
c.getIntBytes("max-chunk-size"),
Uri.ParsingMode(c.getString("uri-parsing-mode")),
Expand All @@ -88,7 +90,8 @@ object ParserSettingsImpl extends SettingsCompanion[ParserSettingsImpl]("akka.ht
c.getBoolean("modeled-header-parsing"),
noCustomMethods,
noCustomStatusCodes,
noCustomMediaTypes)
noCustomMediaTypes
)
}

}
Expand Down
14 changes: 10 additions & 4 deletions akka-http-core/src/main/scala/akka/http/impl/util/package.scala
Expand Up @@ -92,11 +92,12 @@ package object util {

package util {

import akka.http.scaladsl.model.{ ContentType, HttpEntity }
import akka.stream.{ Attributes, Outlet, Inlet, FlowShape }
import akka.http.scaladsl.model.{ ContentType, EntityStreamException, ErrorInfo, HttpEntity }
import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }

import scala.concurrent.duration.FiniteDuration

private[http] class ToStrict(timeout: FiniteDuration, contentType: ContentType)
private[http] class ToStrict(timeout: FiniteDuration, maxBytes: Option[Long], contentType: ContentType)
extends GraphStage[FlowShape[ByteString, HttpEntity.Strict]] {

val byteStringIn = Inlet[ByteString]("ToStrict.byteStringIn")
Expand Down Expand Up @@ -124,7 +125,12 @@ package util {
setHandler(byteStringIn, new InHandler {
override def onPush(): Unit = {
bytes ++= grab(byteStringIn)
pull(byteStringIn)
maxBytes match {
case Some(max) if bytes.length > max
failStage(new EntityStreamException(new ErrorInfo("Request too large", s"Request was longer than the maximum of $max")))
case _
pull(byteStringIn)
}
}
override def onUpstreamFinish(): Unit = {
if (isAvailable(httpEntityOut)) {
Expand Down
Expand Up @@ -31,6 +31,7 @@ abstract class ParserSettings private[akka] () extends BodyPartParser.Settings {
def getMaxHeaderValueLength: Int
def getMaxHeaderCount: Int
def getMaxContentLength: Long
def getMaxToStrictBytes: Long
def getMaxChunkExtLength: Int
def getMaxChunkSize: Int
def getUriParsingMode: Uri.ParsingMode
Expand All @@ -56,6 +57,7 @@ abstract class ParserSettings private[akka] () extends BodyPartParser.Settings {
def withMaxHeaderValueLength(newValue: Int): ParserSettings = self.copy(maxHeaderValueLength = newValue)
def withMaxHeaderCount(newValue: Int): ParserSettings = self.copy(maxHeaderCount = newValue)
def withMaxContentLength(newValue: Long): ParserSettings = self.copy(maxContentLength = newValue)
def withMaxToStrictBytes(newValue: Long): ParserSettings = self.copy(maxToStrictBytes = newValue)
def withMaxChunkExtLength(newValue: Int): ParserSettings = self.copy(maxChunkExtLength = newValue)
def withMaxChunkSize(newValue: Int): ParserSettings = self.copy(maxChunkSize = newValue)
def withUriParsingMode(newValue: Uri.ParsingMode): ParserSettings = self.copy(uriParsingMode = newValue.asScala)
Expand Down
Expand Up @@ -68,12 +68,31 @@ sealed trait HttpEntity extends jm.HttpEntity {

/**
* Collects all possible parts and returns a potentially future Strict entity for easier processing.
* The Future is failed with an TimeoutException if the stream isn't completed after the given timeout.
*/
def toStrict(timeout: FiniteDuration)(implicit fm: Materializer): Future[HttpEntity.Strict] =
dataBytes
.via(new akka.http.impl.util.ToStrict(timeout, contentType))
.runWith(Sink.head)
* The Future is failed with an TimeoutException if the stream isn't completed after the given timeout,
* or with a EntityStreamException when the end of the entity is not reached within the maximum number of bytes
* as configured in `akka.http.parsing.max-to-strict-bytes`. Not that this method does not support different
* defaults for client- and server use: if you want that, use the `toStrict` method and pass in an explicit
* maximum number of bytes.
*/
def toStrict(timeout: FiniteDuration)(implicit fm: Materializer): Future[HttpEntity.Strict] = {
import akka.http.impl.util._
val config = fm.asInstanceOf[ActorMaterializer].system.settings.config
toStrict(timeout, config.getPossiblyInfiniteBytes("akka.http.parsing.max-to-strict-bytes"))
}

/**
* Collects all possible parts and returns a potentially future Strict entity for easier processing.
* The Future is failed with an TimeoutException if the stream isn't completed after the given timeout,
* or with a EntityStreamException when the end of the entity is not reached within the maximum number of bytes.
*/
def toStrict(timeout: FiniteDuration, maxBytes: Long)(implicit fm: Materializer): Future[HttpEntity.Strict] = contentLengthOption match {
case Some(contentLength) if contentLength > maxBytes
FastFuture.failed(new EntityStreamException(new ErrorInfo("Request too large", s"Request of size $contentLength was longer than the maximum of $maxBytes")))
case _
dataBytes
.via(new akka.http.impl.util.ToStrict(timeout, Some(maxBytes), contentType))
.runWith(Sink.head)
}

/**
* Discards the entities data bytes by running the `dataBytes` Source contained in this `entity`.
Expand Down Expand Up @@ -172,6 +191,10 @@ sealed trait HttpEntity extends jm.HttpEntity {
override def toStrict(timeoutMillis: Long, materializer: Materializer): CompletionStage[jm.HttpEntity.Strict] =
toStrict(timeoutMillis.millis)(materializer).toJava

/** Java API */
override def toStrict(timeoutMillis: Long, maxBytes: Long, materializer: Materializer): CompletionStage[jm.HttpEntity.Strict] =
toStrict(timeoutMillis.millis)(materializer).toJava

/** Java API */
override def withContentType(contentType: jm.ContentType): HttpEntity = {
import JavaMapping.Implicits._
Expand Down
Expand Up @@ -95,6 +95,10 @@ sealed trait HttpMessage extends jm.HttpMessage {
def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: Materializer): Future[Self] =
entity.toStrict(timeout).fast.map(this.withEntity)

/** Returns a shareable and serializable copy of this message with a strict entity. */
def toStrict(timeout: FiniteDuration, maxBytes: Long)(implicit ec: ExecutionContext, fm: Materializer): Future[Self] =
entity.toStrict(timeout, maxBytes).fast.map(this.withEntity)

/** Returns a copy of this message with the entity and headers set to the given ones. */
def withHeadersAndEntity(headers: immutable.Seq[HttpHeader], entity: MessageEntity): Self

Expand Down Expand Up @@ -185,6 +189,11 @@ sealed trait HttpMessage extends jm.HttpMessage {
val ex = ExecutionContext.fromExecutor(ec)
toStrict(timeoutMillis.millis)(ex, materializer).toJava
}
/** Java API */
def toStrict(timeoutMillis: Long, maxBytes: Long, ec: Executor, materializer: Materializer): CompletionStage[Self] = {
val ex = ExecutionContext.fromExecutor(ec)
toStrict(timeoutMillis.millis, maxBytes)(ex, materializer).toJava
}
}

object HttpMessage {
Expand Down
Expand Up @@ -31,6 +31,7 @@ abstract class ParserSettings private[akka] () extends akka.http.javadsl.setting
def maxHeaderValueLength: Int
def maxHeaderCount: Int
def maxContentLength: Long
def maxToStrictBytes: Long
def maxChunkExtLength: Int
def maxChunkSize: Int
def uriParsingMode: Uri.ParsingMode
Expand All @@ -53,6 +54,7 @@ abstract class ParserSettings private[akka] () extends akka.http.javadsl.setting
override def getUriParsingMode: akka.http.javadsl.model.Uri.ParsingMode = uriParsingMode
override def getMaxHeaderCount = maxHeaderCount
override def getMaxContentLength = maxContentLength
override def getMaxToStrictBytes = maxToStrictBytes
override def getMaxHeaderValueLength = maxHeaderValueLength
override def getIncludeTlsSessionInfoHeader = includeTlsSessionInfoHeader
override def getIllegalHeaderWarnings = illegalHeaderWarnings
Expand Down Expand Up @@ -85,6 +87,7 @@ abstract class ParserSettings private[akka] () extends akka.http.javadsl.setting
override def withMaxHeaderValueLength(newValue: Int): ParserSettings = self.copy(maxHeaderValueLength = newValue)
override def withMaxHeaderCount(newValue: Int): ParserSettings = self.copy(maxHeaderCount = newValue)
override def withMaxContentLength(newValue: Long): ParserSettings = self.copy(maxContentLength = newValue)
override def withMaxToStrictBytes(newValue: Long): ParserSettings = self.copy(maxToStrictBytes = newValue)
override def withMaxChunkExtLength(newValue: Int): ParserSettings = self.copy(maxChunkExtLength = newValue)
override def withMaxChunkSize(newValue: Int): ParserSettings = self.copy(maxChunkSize = newValue)
override def withIllegalHeaderWarnings(newValue: Boolean): ParserSettings = self.copy(illegalHeaderWarnings = newValue)
Expand Down
Expand Up @@ -20,6 +20,7 @@ import akka.stream.ActorMaterializer
import akka.http.scaladsl.model.HttpEntity._
import akka.http.impl.util.StreamUtils
import akka.testkit._
import org.scalatest.concurrent.ScalaFutures

import scala.util.Random

Expand Down Expand Up @@ -99,6 +100,45 @@ class HttpEntitySpec extends FreeSpec with MustMatchers with BeforeAndAfterAll {
}.getMessage must be("HttpEntity.toStrict timed out after 100 milliseconds while still waiting for outstanding data")
}
}
"support toStrict with the default max size" - {
"Infinite data stream" in {
intercept[EntityStreamException] {
Await.result(Chunked(tpe, Source.repeat(Chunk(abc))).toStrict(awaitAtMost), awaitAtMost)
}.getMessage must be("Request too large: Request was longer than the maximum of 8388608")
}
}
"support toStrict with a max size" - {
"Strict" in {
intercept[EntityStreamException] {
Await.result(Strict(tpe, abc).toStrict(awaitAtMost, maxBytes = 1), awaitAtMost)
}.getMessage must be("Request too large: Request of size 3 was longer than the maximum of 1")
}
"Default" in {
intercept[EntityStreamException] {
Await.result(Default(tpe, 11, source(abc, de, fgh, ijk)).toStrict(awaitAtMost, maxBytes = 1), awaitAtMost)
}.getMessage must be("Request too large: Request of size 11 was longer than the maximum of 1")
}
"CloseDelimited" in {
intercept[EntityStreamException] {
Await.result(CloseDelimited(tpe, source(abc, de, fgh, ijk)).toStrict(awaitAtMost, maxBytes = 1), awaitAtMost)
}.getMessage must be("Request too large: Request was longer than the maximum of 1")
}
"Chunked w/o LastChunk" in {
intercept[EntityStreamException] {
Await.result(Chunked(tpe, source(Chunk(abc), Chunk(fgh), Chunk(ijk))).toStrict(awaitAtMost, maxBytes = 1), awaitAtMost)
}.getMessage must be("Request too large: Request was longer than the maximum of 1")
}
"Chunked with LastChunk" in {
intercept[EntityStreamException] {
Await.result(Chunked(tpe, source(Chunk(abc), Chunk(fgh), Chunk(ijk), LastChunk)).toStrict(awaitAtMost, maxBytes = 1), awaitAtMost)
}.getMessage must be("Request too large: Request was longer than the maximum of 1")
}
"Infinite data stream" in {
intercept[EntityStreamException] {
Await.result(Chunked(tpe, Source.repeat(Chunk(abc))).toStrict(awaitAtMost, maxBytes = 1), awaitAtMost)
}.getMessage must be("Request too large: Request was longer than the maximum of 1")
}
}
"support transformDataBytes" - {
"Strict" in {
Strict(tpe, abc) must transformTo(Strict(tpe, doubleChars("abc") ++ trailer))
Expand Down
5 changes: 5 additions & 0 deletions akka-http/src/main/mima-filters/10.1.4.backwards.excludes
@@ -1,5 +1,10 @@
# Changes against 10.1.4

ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.javadsl.server.directives.BasicDirectives.toStrictEntity")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.server.directives.BasicDirectives.toStrictEntity")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.javadsl.server.directives.BasicDirectives.extractStrictEntity")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.server.directives.BasicDirectives.extractStrictEntity")

# Changes to internal API
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.http.javadsl.unmarshalling.sse.EventStreamUnmarshalling.fromEventStream")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.http.scaladsl.unmarshalling.sse.EventStreamUnmarshalling.fromEventStream")
Expand Down

0 comments on commit 91eae3a

Please sign in to comment.