Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: support chunked trailing headers in transformDataBytes #2748

Merged

Conversation

@pasikon
Copy link
Contributor

pasikon commented Oct 10, 2019

Purpose

To support trailing headers in akka.http.scaladsl.model.HttpEntity.Chunked#transformDataBytes to improve compatibility with akka-grpc which is setting trailing header (gRPC status) in akka.http.scaladsl.model.HttpEntity.Chunked.

When data of such an entity is about to be processed with akka.http.scaladsl.model.HttpEntity.Chunked#transformDataBytes we get IllegalArgumentException("Chunked.transformDataBytes not allowed for chunks with metadata")

References

References #2218 #45 #1869

Changes

If the subject of data transormation - akka.http.scaladsl.model.HttpEntity.chunks is containing the chunk with trailing headers concatenate it to the chunks which are the result of data transformation with akka.http.scaladsl.model.HttpEntity.Chunked#transformDataBytes

Background Context

I'm instrumenting akka-grpc calls with kamon-akka which performing such http entity transformations, they are not supported at the moment, I would like to fix that

@lightbend-cla-validator

This comment has been minimized.

Copy link

lightbend-cla-validator commented Oct 10, 2019

Hi @pasikon,

Thank you for your contribution! We really value the time you've taken to put this together.

Before we proceed with reviewing this pull request, please sign the Lightbend Contributors License Agreement:

http://www.lightbend.com/contribute/cla

@akka-ci

This comment has been minimized.

Copy link
Collaborator

akka-ci commented Oct 10, 2019

Thank you for your pull request! After a quick sanity check one of the team will reply with 'OK TO TEST' to kick off our automated validation on Jenkins. This compiles the project, runs the tests, and checks for things like binary compatibility and source code formatting. When two team members have also manually reviewed and (perhaps after asking for some amendments) accepted your contribution, it should be good to be merged.

For more details about our contributing process, check out CONTRIBUTING.md - and feel free to ask!

@jrudolph

This comment has been minimized.

Copy link
Member

jrudolph commented Oct 10, 2019

OK TO TEST

@akka-ci

This comment has been minimized.

Copy link
Collaborator

akka-ci commented Oct 10, 2019

Test FAILed.

Pull request validation report

Failed Test Suites

Test result for 'akka-http-tests / Pr-validation / ./ executeTests'

[info] ScalaTest
[info] Run completed in 3 minutes, 57 seconds.
[info] Total number of tests run: 1233
[info] Suites: completed 63, aborted 0
[info] Tests: succeeded 1231, failed 2, canceled 0, ignored 0, pending 39
[info] *** 2 TESTS FAILED ***
[error] Failed: Total 1386, Failed 2, Errors 0, Passed 1384, Pending 39
[error] Failed tests:
[error] 	akka.http.scaladsl.server.directives.CodingDirectivesSpec
@akka-ci

This comment has been minimized.

Copy link
Collaborator

akka-ci commented Oct 11, 2019

Test PASSed.

@pasikon

This comment has been minimized.

Copy link
Contributor Author

pasikon commented Oct 22, 2019

Can somebody please look at it, I need it in my project and I wonder if this is the correct way of fixing the issue... 😼

Copy link
Member

jrudolph left a comment

Interesting solution but not sure it really works in all cases. In any case the mutable state needs to be removed. Would be good if it could be simplified. We also need to assess if the added complexity and performance cost is worth the benefit.

@jrudolph

This comment has been minimized.

Copy link
Member

jrudolph commented Oct 22, 2019

Here's an alternative slightly simpler version:

    override def transformDataBytes(transformer: Flow[ByteString, ByteString, Any]): HttpEntity.Chunked = {
      val transformChunks = GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
        import akka.stream.scaladsl.GraphDSL.Implicits._

        val bcast = builder.add(Broadcast[ChunkStreamPart](2))
        val concat = builder.add(Concat[ChunkStreamPart](2))

        val chunkTransformer: Flow[ChunkStreamPart, ChunkStreamPart, Any] =
          Flow[ChunkStreamPart]
            .collect {
              case Chunk(b, "") => b
            }
            .via(transformer)
            .map(b => Chunk(b))

        val trailerBypass: Flow[ChunkStreamPart, ChunkStreamPart, Any] =
          Flow[ChunkStreamPart]
            // make sure to filter out any errors here, otherwise they don't go through the user transformer
            .recover { case NonFatal(ex) => Chunk(ByteString(0), "") }
            .collect { case lc @ LastChunk(_, s) if s.nonEmpty => lc }

        bcast ~> chunkTransformer ~> concat
        bcast ~> trailerBypass ~> concat
        FlowShape(bcast.in, concat.out)
      }

      HttpEntity.Chunked(contentType, chunks.via(transformChunks))
    }
@jrudolph

This comment has been minimized.

Copy link
Member

jrudolph commented Oct 22, 2019

I agree it would be good to support trailers in the light of grpc using trailing headers. On the other hand, this change might be somewhat expensive because the processing path for each chunk is now somewhat more heavy with control flow. On the other hand, you can always amortize the processing costs by providing bigger chunks.

Here's another iteration that uses Partition instead of Broadcast to prevent the second branch from being active until the last element:

override def transformDataBytes(transformer: Flow[ByteString, ByteString, Any]): HttpEntity.Chunked = {
      val transformChunks = GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
        import akka.stream.scaladsl.GraphDSL.Implicits._

        val partition = builder.add(Partition[ChunkStreamPart](2, {
          case c: Chunk     => 0
          case c: LastChunk => 1
        }))
        val concat = builder.add(Concat[ChunkStreamPart](2))

        val chunkTransformer: Flow[ChunkStreamPart, ChunkStreamPart, Any] =
          Flow[ChunkStreamPart]
            .map(_.data)
            .via(transformer)
            .map(b => Chunk(b))

        val trailerBypass: Flow[ChunkStreamPart, ChunkStreamPart, Any] =
          Flow[ChunkStreamPart]
            // make sure to filter out any errors here, otherwise they don't go through the user transformer
            .recover { case NonFatal(ex) => Chunk(ByteString(0), "") }
            .collect { case lc @ LastChunk(_, s) if s.nonEmpty => lc }

        partition ~> chunkTransformer ~> concat
        partition ~> trailerBypass ~> concat
        FlowShape(partition.in, concat.out)
      }

      HttpEntity.Chunked(contentType, chunks.via(transformChunks))
    }

WDYT? I might find this acceptable. (And anyway, thanks for raising this issue and proposing a fix, @pasikon).

@akka-ci akka-ci added validating and removed tested labels Oct 22, 2019
@pasikon

This comment has been minimized.

Copy link
Contributor Author

pasikon commented Oct 22, 2019

@jrudolph yeah I will rely a lot on such byte processing, difficult to judge perf impact now but widening the grpc support is critical to my project, thanks for improving my proposal, its great to gain some experience with Akka Streams!
I have committed your improvement also the issue is that its not compiling with Scala 2.11, maybe you can help with that?

[error] /home/michal/dev/sources/akka-http/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala:536:50: type mismatch;
[error]  found   : akka.stream.Graph[akka.stream.FlowShape[Chunked.this.ChunkStreamPart,Chunked.this.ChunkStreamPart],akka.NotUsed]
[error]     (which expands to)  akka.stream.Graph[akka.stream.FlowShape[akka.http.javadsl.model.HttpEntity.ChunkStreamPart,akka.http.javadsl.model.HttpEntity.ChunkStreamPart],akka.NotUsed]
[error]  required: akka.stream.Graph[akka.stream.FlowShape[akka.http.scaladsl.model.HttpEntity.ChunkStreamPart,akka.http.scaladsl.model.HttpEntity.ChunkStreamPart],?]
[error]       HttpEntity.Chunked(contentType, chunks.via(transformChunks))

@akka-ci akka-ci added tested and removed validating labels Oct 22, 2019
@akka-ci

This comment has been minimized.

Copy link
Collaborator

akka-ci commented Oct 22, 2019

Test PASSed.

@jrudolph

This comment has been minimized.

Copy link
Member

jrudolph commented Oct 22, 2019

I have committed your improvement also the issue is that its not compiling with Scala 2.11, maybe you can help with that?

Try replacing ChunkStreamPart with HttpEntity.ChunkStreamPart. Otherwise, Scala 2.11 seems to confuse it with the nested interface in javadsl.model.Chunked.ChunkStreamPart.

@akka-ci akka-ci added validating and removed tested labels Oct 22, 2019
@akka-ci akka-ci added tested and removed validating labels Oct 22, 2019
@akka-ci

This comment has been minimized.

Copy link
Collaborator

akka-ci commented Oct 22, 2019

Test PASSed.

@pasikon pasikon requested a review from jrudolph Oct 22, 2019
@akka-ci akka-ci added validating and removed tested labels Oct 23, 2019
Copy link
Member

jrudolph left a comment

LGTM, thanks a lot @pasikon. (I added a few cosmetic changes)

@akka-ci akka-ci added needs-attention and removed validating labels Oct 23, 2019
@akka-ci

This comment has been minimized.

Copy link
Collaborator

akka-ci commented Oct 23, 2019

Test FAILed.

!!! Couldn't read commit file !!!

@akka-ci

This comment has been minimized.

Copy link
Collaborator

akka-ci commented Oct 23, 2019

Test PASSed.

@jrudolph jrudolph changed the title HttpEntity trailing headers transformDataBytes support core: support chunked trailing headers in transformDataBytes Oct 23, 2019
@jrudolph jrudolph merged commit a82046e into akka:master Oct 23, 2019
4 checks passed
4 checks passed
Jenkins PR Auto-Formatter Successful
Details
Jenkins PR Validation 4177 tests run, 1074 skipped, 0 failed.
Details
continuous-integration/travis-ci/pr The Travis CI build passed
Details
typesafe-cla-validator All users have signed the CLA
Details
@jrudolph

This comment has been minimized.

Copy link
Member

jrudolph commented Oct 23, 2019

Thanks a lot, @pasikon.

@jrudolph jrudolph added this to the 10.1.11 milestone Oct 23, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants
You can’t perform that action at this time.