Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Ember H2 end of stream handling #6882

Merged
merged 21 commits into from
Jan 16, 2023

Conversation

valencik
Copy link
Member

@valencik valencik commented Dec 28, 2022

This PR attempts to fix three issues:

Silent errors

Errors in processCreatedStreams were being silently dropped by a .attempt + .drain combo.
Fix: errors are now logged via logger.error

sendData on Closed stream

sendMessageBody would call sendData after closing the stream.

This was the hardest part, so I'm going to spend some time on it here.
Previously, within sendMessageBody looked like:

  def sendMessageBody(mess: Message[F]): F[Unit] = {
    val trailers = mess.attributes.lookup(Message.Keys.TrailerHeaders[F])
    mess.body.chunks.noneTerminate.zipWithNext
      .foreach {
        case (Some(c), Some(Some(_))) =>
          sendData(c.toByteVector, false)
        case (Some(c), Some(None) | None) =>
          sendData(c.toByteVector, trailers.isEmpty)
        case (None, _) =>
          if (trailers.isDefined) Applicative[F].unit
          else sendData(ByteVector.empty, true)
      }
      .compile
      .drain

Within the foreach the three cases map to:

case (Some(c), Some(Some(_))) => // more data to come
case (Some(c), Some(None) | None) => // last chunk
case (None, _) => // end of the stream

When processing the last chunk, the middle case, sendData would be called with endStream=true if no trailer headers were defined. As a result, sendData would modify the stream state to be closed. Therefore, when the final element of the stream hits and we try to send an empty bytevector, we do so on an already closed stream.

It's perhaps important to note that this issue is currently happening on http2 enabled servers but is hidden by the silent errors issue. Also that this issue does not affect clients / remote receivers as the second sendData call fails on the server.

Fix: handling an empty stream is now separate from handling the end of a stream, we no longer call sendData on a closed stream

MaxFrameSize

sendMessageBody did not consider the remote streams maxFrameSize
This PR applies the fix from #6845, so thank you very much @janilcgarcia for your work there.
Fix: maxFrameSize is respected via chunkLimit(maxFrameSize)

Prior to this commit `sendMessageBody` would call `sendData` any number
of times, then once for the last chunk, and finally one more time for
the stream's ending `None`. This fix removes the `noneTerminate` and
instead handles the empty stream case in an `ifEmpty`.
@mergify mergify bot added series/0.23 PRs targeting 0.23.x module:ember-core labels Dec 28, 2022
@@ -305,8 +305,10 @@ private[ember] object H2Server {
def processCreatedStreams(h2: H2Connection[F]): F[Unit] =
Stream
.fromQueueUnterminated(h2.createdStreams)
.map(i => Stream.eval(processCreatedStream(h2, i).attempt))
.parJoin(localSettings.maxConcurrentStreams.maxConcurrency)
.parEvalMap(localSettings.maxConcurrentStreams.maxConcurrency)(i =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are discarding the results, this can/should be parEvalMapUnordered I think.

Comment on lines 80 to 85
Stream.eval(
// Message empty with trailing headers, do nothing
if (trailers.isDefined) Applicative[F].unit
// Message empty no trailing headers, send empty bytevector
else sendData(ByteVector.empty, true)
) >> Stream.empty
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be Stream.exec(...) :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw: this was a big readability improvement 🙏

@armanbilge armanbilge marked this pull request as ready for review December 29, 2022 01:25
Comment on lines 89 to 94
.foreach {
case (c, Some(_)) =>
sendData(c.toByteVector, endStream = false)
case (c, None) =>
sendData(c.toByteVector, endStream = trailers.isEmpty)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.foreach {
case (c, Some(_)) =>
sendData(c.toByteVector, endStream = false)
case (c, None) =>
sendData(c.toByteVector, endStream = trailers.isEmpty)
}
.foreach { case (c, opt) =>
val endStream = opt.isEmpty && trailers.isEmpty
sendData(c.toByteVector, endStream)
}

Co-authored-by: Diego E. Alonso <diesalbla@gmail.com>
Comment on lines 81 to 84
// Message empty with trailing headers, do nothing
if (trailers.isDefined) Applicative[F].unit
// Message empty no trailing headers, send empty bytevector
else sendData(ByteVector.empty, true)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a thing, just a tiny improvement suggestion

Suggested change
// Message empty with trailing headers, do nothing
if (trailers.isDefined) Applicative[F].unit
// Message empty no trailing headers, send empty bytevector
else sendData(ByteVector.empty, true)
// if the Message is empty with trailing headers, do nothing
// otherwise, if the Message is empty with no trailing headers, send an empty ByteVector
sendData(ByteVector.empty, true).when(trailers.isEmpty)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! there's also unlessA(trailers.isDefined) , maybe that reads even better:

"send an empty bytevector unless there are trailers"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"send an empty bytevector when there are no trailers" also works fine for me 😛

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is fun...
Now we're always using trailers.isEmpty which makes me realize we only care about their absense. How about this:

   def sendMessageBody(mess: Message[F]): F[Unit] = {
-    val trailers = mess.attributes.lookup(Message.Keys.TrailerHeaders[F])
+    val noTrailers = mess.attributes.lookup(Message.Keys.TrailerHeaders[F]).isEmpty
     val maxFrameSize = remoteSettings.map(_.maxFrameSize.frameSize)
     maxFrameSize.flatMap(maxFrameSize =>
       mess.body
         .ifEmpty[F, Byte](
-          Stream.exec(
-            // send empty bytevector when there are no trailing headers
-            sendData(ByteVector.empty, true).whenA(trailers.isEmpty)
-          )
+          Stream.exec(sendData(ByteVector.empty, true).whenA(noTrailers))
         )
         .chunkLimit(maxFrameSize)
         .zipWithNext
         .foreach { case (c, nextChunk) =>
-          val isEndStream = nextChunk.isEmpty && trailers.isEmpty
+          val isEndStream = nextChunk.isEmpty && noTrailers
           sendData(c.toByteVector, isEndStream)
         }
         .compile

At this point the empty stream handling code is shorter than the comment explaining it.
Additionally this makes me wonder if Vault should have an exists method that returns Boolean 🤔

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mmmm, yes, I like it, I'm going with it. :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I decided to open a quick PR to Vault and go with contains instead of exists to match the underlying Map API.
typelevel/vault#447

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to use the new contains :)

Copy link
Member

@armanbilge armanbilge left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixes, tests, write-ups all look very good to me (to the best of my understanding of H2 😇 ). Thank you and @janilcgarcia for all of your work on this!

Comment on lines 47 to 50
_ <- writeBlock.complete(Either.unit)
_ <- req.complete(Left(new Exception()))
_ <- resp.complete(Left(new Exception()))
_ <- trailers.complete(Right(Headers.empty))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity is completing all of these Deferreds necessary for the tests to run?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch, no, completing these was not necessary.

@rossabaker
Copy link
Member

Is anything preventing a merge here? I'm looking to prepare a release...

@armanbilge
Copy link
Member

I reviewed, but I'm not confident in my H2 knowledge. Would still be good to get it in.

@valencik
Copy link
Member Author

Took another look at this, reviewing each commit, and confirming locally that tests still pass when we merge latest series/0.23. ✅
I also removed the chunkLimit(maxFrameSize) and confirmed that any test that has a body greater than the max frame size setting fails.

Finally, I published locally and ran a modified version of the scala-cli reproduction from the original issue: #6844 (comment)

click for scala-cli script
//> using scala "2.13.10"
//> using lib "org.typelevel::cats-effect:3.4.2"
//> using lib "co.fs2::fs2-core:3.4.0"
//> using lib "co.fs2::fs2-io:3.4.0"
//> using lib "org.http4s::http4s-dsl:0.23.16-406-c9b8ee6-SNAPSHOT"
//> using lib "org.http4s::http4s-ember-server:0.23.16-406-c9b8ee6-SNAPSHOT"
//> using lib "org.slf4j:slf4j-simple:2.0.5"

import cats.syntax.all._
import cats.effect.{Async, ExitCode, IO, IOApp, Resource}
import cats.effect.syntax.all._
import org.http4s.HttpRoutes
import org.http4s.MediaType
import org.http4s.headers.`Content-Type`
import org.http4s.dsl.Http4sDsl
import org.http4s.ember.server.EmberServerBuilder
import com.comcast.ip4s._

object RunServer extends IOApp with Http4sDsl[IO] {
  private val app = HttpRoutes.of[IO] {
    case GET -> Root / "bytes" / IntVar(numberOfBytes) =>
      val bytes = Array.fill(numberOfBytes)('A'.toByte)

      Ok(bytes).map(_.withContentType(`Content-Type`(MediaType.text.plain)))
    }.orNotFound

    private val server =
      EmberServerBuilder
        .default[IO]
        .withHttpApp(app)
        .withHttp2
        .withPort(port"5000")
        .build

    override def run(args: List[String]): IO[ExitCode] =
      server.useForever
}

The bug no longer exists on my locally published (this) version.

I feel pretty confident that this PR fixes the issues outlined in the description.
I think this PR is good to merge.

@valencik
Copy link
Member Author

gRPC test

After chatting with @ChristopherDavenport yesterday, he encouraged me to try out https://github.com/ChristopherDavenport/grpc-playground with a snapshot from this work.
Because gRPC uses http/2 and specifically the Trailer headers for communicating the gRPC status, it's a bit of an integration test.

The long and short of it is that both 0.23.16 and my snapshot work in the grpc-playground.
I've copied the full log output here: https://gist.github.com/valencik/a2666aa38e6fbe095d6dfa20c4f5b057

We can see the incoming request in ember's logs:

HTTP/2.0 POST http://127.0.0.1:8080/Greeter/sayHello Headers(content-type: application/grpc, te: trailers, user-agent: grpc-java-netty/1.47.1, grpc-accept-encoding: gzip)

The response (still in ember) with Trailer header

HTTP/1.1 200 OK Headers(Content-Length: 12, Content-Type: application/grpc+proto, Trailer: grpc-status) 

And as http/2 frame writes:

Write - Headers(identifier=3, dependency=None, endStream=false, endHeaders=true, headerBlock=ByteVector(39 bytes, 0x885c0231325f901d75d0620d263d4c4d6564ff75d8749f40854d833505b3889acac8b21234da8f), padding=None)
Write - Data(identifier=3, data=ByteVector(12 bytes, 0x00000000070a055361726168), pad=None, endStream=false)

Note the endStream=false on the DATA frame, because we have a Trailer.
And finally, those trailing headers:

Write - Headers(identifier=3, dependency=None, endStream=true, endHeaders=true, headerBlock=ByteVector(12 bytes, 0x40889acac8b21234da8f0130), padding=None)

This is reflected in the test client's logs:

INBOUND HEADERS: streamId=3 headers=GrpcHttp2ResponseHeaders[:status: 200, content-length: 12, content-type: applicati[17/1935]
INBOUND DATA: streamId=3 padding=0 endStream=false length=12 bytes=00000000070a055361726168 
OUTBOUND PING: ack=false bytes=1234 
INBOUND HEADERS: streamId=3 headers=GrpcHttp2ResponseHeaders[grpc-status: 0] padding=0 endStream=true 

FIN

With this grpc test, the scala-cli script now working, the new unit tests, the reviews, and the CI a nice shade of spring time green, I'm gonna use my new merging powers and smash that merge button!
Thanks for all the help and guidance everyone ❤️

@channingwalton
Copy link

Does this fix #4935?

@armanbilge
Copy link
Member

@channingwalton oh, are you using HTTP/2?

@channingwalton
Copy link

@channingwalton oh, are you using HTTP/2?

doh! Sorry, I got too excited about ember stream handling and didn't read the PR properly.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants