-
Notifications
You must be signed in to change notification settings - Fork 789
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Close H2Stream
readBuffer
on data.endStream
#7147
Conversation
def extractAuthority(headers: List[(String, String)]): Option[Uri.Authority] = | ||
headers.collectFirstSome { | ||
case (PseudoHeaders.AUTHORITY, value) => | ||
val index = value.indexOf(":") | ||
if (index > 0 && index < value.length) { | ||
Option( | ||
Uri.Authority( | ||
userInfo = None, | ||
host = Uri.RegName(value.take(index)), | ||
port = value.drop(index + 1).toInt.some, | ||
) | ||
) | ||
} else Option.empty | ||
Uri.fromString(value).toOption.flatMap(_.authority) | ||
case (_, _) => None | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test revealed another bug 😅
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not an Ember expert, but changes look good to me
@@ -339,7 +339,9 @@ private[h2] class H2Stream[F[_]: Concurrent]( | |||
if (needsWindowUpdate && !isClosed && sizeReadOk) { | |||
enqueue.offer(Chunk.singleton(H2Frame.WindowUpdate(id, windowSize - newSize))) | |||
} else Applicative[F].unit | |||
_ <- if (data.endStream) s.trailWith(List.empty).void else Applicative[F].unit | |||
_ <- | |||
if (data.endStream) s.readBuffer.close *> s.trailWith(List.empty).void |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably it's kind of bikeshedding, but is it intentional that H2Stream#receiveData
doesn't close readBuffer
on cancelation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a good question, cancelation is handled in various places by sending an exception to the readBuffer
.
http4s/ember-core/shared/src/main/scala/org/http4s/ember/core/h2/H2Stream.scala
Lines 440 to 448 in a1687d9
private[H2Stream] def cancelWith(msg: String)(implicit F: Monad[F]): F[Unit] = { | |
// Unsure of this, but also unsure about exposing custom throwable | |
val t = new CancellationException(msg) | |
writeBlock.complete(Left(t)) >> | |
request.complete(Left(t)) >> | |
response.complete(Left(t)) >> | |
readBuffer.send(Left(t)) >> | |
trailers.complete(Left(t)).void | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, but does it work for receiveData
as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
receiveData
is called only from the readLoop
. The readLoop
runs forever when you create a server or a client: the only way to cancel it is to close the server/client resource. But to close the server/client resource you must have first closed all on-going requests.
http4s/ember-core/shared/src/main/scala/org/http4s/ember/core/h2/H2Connection.scala
Line 204 in fe80361
def readLoop: F[Unit] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the explanation. I can't beat my feeling suspicious about this. But this kind of leak perhaps would be noticeable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The H2 state machine is rather complex and difficult to reason about, it's not impossible there are some bugs here. This PR and the previous one it fixes (which itself was a fix) proves that 😅 probably we are missing some test coverage in this area, that would help address your suspicions, and part of that question is how to cancel receiveData
in the first place.
Hello all! I just discovered this PR and took a look at its changes. I found that - _ <-
- if (data.endStream) s.readBuffer.close *> s.trailWith(List.empty).void
- else Applicative[F].unit
+ _ <- data.endStream.whenA(s.readBuffer.close *> s.trailWith(List.empty).void) I am not that experienced at scala, so can I ask you, is |
Yes, you are right, this would be a good place to use It would look like this: _ <- (s.readBuffer.close *> s.trailWith(List.empty)).void.whenA(data.endStream) There is a (very small) performance penalty to use it instead of an |
This performance penalty caused because of method calling rather then directly using |
The signature for def whenA[A](cond: Boolean)(f: => F[A]): F[Unit] The https://docs.scala-lang.org/tour/by-name-parameters.html What this means is that instead of directly passing |
Thanks for the explanation, @armanbilge! |
Fixes #7146. The
Channel
introduced in #7096 was not being closed in the right place.