core: support chunked trailing headers in transformDataBytes #2748

Merged
@@ -507,15 +507,38 @@ object HttpEntity {
withSizeLimit(SizeLimit.Disabled)

override def transformDataBytes(transformer: Flow[ByteString, ByteString, Any]): HttpEntity.Chunked = {
val newData =
chunks.map {
case Chunk(data, "") => data
case LastChunk("", Nil) => ByteString.empty
case _ =>
throw new IllegalArgumentException("Chunked.transformDataBytes not allowed for chunks with metadata")
} via transformer

HttpEntity.Chunked.fromData(contentType, newData)
// This construction makes it possible to keep trailing headers. To do so, the stream is split
// into two tracks: one for the regular chunks and one for the LastChunk. Only the regular chunks
// are run through the user-supplied transformer; the LastChunk is passed on unchanged. The two
// tracks are then concatenated to produce the final stream.
val transformChunks = GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import akka.stream.scaladsl.GraphDSL.Implicits._

val partition = builder.add(Partition[HttpEntity.ChunkStreamPart](2, {
case c: Chunk => 0
case c: LastChunk => 1
}))
val concat = builder.add(Concat[HttpEntity.ChunkStreamPart](2))

val chunkTransformer: Flow[HttpEntity.ChunkStreamPart, HttpEntity.ChunkStreamPart, Any] =
Flow[HttpEntity.ChunkStreamPart]
.map(_.data)
.via(transformer)
.map(b => Chunk(b))

val trailerBypass: Flow[HttpEntity.ChunkStreamPart, HttpEntity.ChunkStreamPart, Any] =
Flow[HttpEntity.ChunkStreamPart]
// swallow any upstream failure on this track so that errors only reach downstream through the user transformer
.recover { case NonFatal(ex) => Chunk(ByteString(0), "") }
// only needed to filter out the dummy element emitted by the recover in the line above
.collect { case lc @ LastChunk(_, s) if s.nonEmpty => lc }

partition ~> chunkTransformer ~> concat
partition ~> trailerBypass ~> concat
FlowShape(partition.in, concat.out)
}

HttpEntity.Chunked(contentType, chunks.via(transformChunks))
}

def withContentType(contentType: ContentType): HttpEntity.Chunked =
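For illustration, a minimal caller-level sketch of what this change enables (assuming an implicit ActorSystem and materializer are in scope; the content type, header name, and byte-doubling flow are placeholders for whatever a real caller would use): the trailer attached to the LastChunk now survives transformDataBytes instead of triggering the IllegalArgumentException the old implementation threw.

import akka.http.scaladsl.model.{ ContentTypes, HttpEntity }
import akka.http.scaladsl.model.HttpEntity.{ Chunk, LastChunk }
import akka.http.scaladsl.model.headers.RawHeader
import akka.stream.scaladsl.{ Flow, Source }
import akka.util.ByteString

// A chunked entity whose final LastChunk carries a trailer header.
val entity = HttpEntity.Chunked(
  ContentTypes.`application/octet-stream`,
  Source(List(
    Chunk(ByteString("abc")),
    LastChunk("", RawHeader("Foo", "pip apo") :: Nil))))

// Stand-in for a real transformation (e.g. a coder): double every emitted ByteString.
val doubleBytes = Flow[ByteString].map(bytes => bytes ++ bytes)

// With this change, the result still ends in LastChunk("", List(RawHeader("Foo", "pip apo"))),
// e.g. observable by materializing transformed.chunks with Sink.seq.
val transformed = entity.transformDataBytes(doubleBytes)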
@@ -10,6 +10,7 @@ import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.impl.util.StreamUtils
import akka.http.scaladsl.model.HttpEntity._
import akka.http.scaladsl.model.headers.RawHeader
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.testkit._
@@ -161,6 +162,22 @@ class HttpEntitySpec extends FreeSpec with MustMatchers with BeforeAndAfterAll {
Chunked(tpe, source(Chunk(abc), Chunk(fgh), Chunk(ijk), LastChunk, LastChunk)) must
transformTo(Strict(tpe, doubleChars("abcfghijk") ++ trailer))
}
"Chunked with LastChunk with trailer header" in {
Chunked(tpe, source(Chunk(abc), Chunk(fgh), Chunk(ijk), LastChunk("", RawHeader("Foo", "pip apo") :: Nil))) must
transformTo(Strict(tpe, doubleChars("abcfghijk") ++ trailer))
}
"Chunked with LastChunk with trailer header keep header chunk" in {
val entity = Chunked(tpe, source(Chunk(abc), Chunk(fgh), Chunk(ijk), LastChunk("", RawHeader("Foo", "pip apo") :: Nil)))
val transformed = entity.transformDataBytes(duplicateBytesTransformer())
val parts = transformed.chunks.runWith(Sink.seq).awaitResult(100.millis)

parts.map(_.data).reduce(_ ++ _) mustEqual doubleChars("abcfghijk") ++ trailer

val lastPart = parts.last
lastPart.isLastChunk mustBe (true)
lastPart mustBe a[LastChunk]
lastPart.asInstanceOf[LastChunk].trailer mustEqual (RawHeader("Foo", "pip apo") :: Nil)
}
}
"support toString" - {
"Strict with binary MediaType" in {
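As a complementary consumer-side note (a sketch only: trailerOf is a hypothetical helper, not part of the API, and it assumes an implicit Materializer and ExecutionContext are in scope), the preserved trailer can be extracted from a transformed entity along the same lines as the test above:

import scala.collection.immutable
import scala.concurrent.{ ExecutionContext, Future }
import akka.http.scaladsl.model.{ HttpEntity, HttpHeader }
import akka.http.scaladsl.model.HttpEntity.LastChunk
import akka.stream.Materializer
import akka.stream.scaladsl.Sink

// Hypothetical helper: returns the trailer headers of a chunked entity, or Nil if there are none.
def trailerOf(entity: HttpEntity.Chunked)(implicit mat: Materializer, ec: ExecutionContext): Future[immutable.Seq[HttpHeader]] =
  entity.chunks.runWith(Sink.last).map {
    case LastChunk(_, trailer) => trailer
    case _                     => Nil
  }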