New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Avoid incorrectly responding with an empty body in AHC client #3338
Avoid incorrectly responding with an empty body in AHC client #3338
Conversation
This seems reasonable. I'm rerunning the tests, and opened a PR to disable the unrelated flaky test that tanked the first run. We'll see if this runs green again.
We'll want to cherry-pick this back to series/0.20.
/cc @djspiewak, who has thought harder about this code than I have.
override def onCompleted(): Unit = | ||
invokeCallback(logger)(cb(Right(response -> dispose))) | ||
onStreamCalled.get | ||
.ifM(F.unit, F.delay(invokeCallback(logger)(cb(Right(response -> dispose))))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
F.delay
ing something that calls unsafeRunSync
to be called in an unsafeRunSync
feels abusive, even for a spot we need Effect
. I'd like to think harder about this, but more importantly, I'd like to merge the bugfix.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is also one of the places where using the ifTrue
and ifFalse
labels helps readability a lot.
Appreciate the feedback, I've pushed another commit addressing the comments. |
bodyDisposal <- Ref.of[F, F[Unit]] { | ||
subscriber.stream(subscribeF).take(0).compile.drain | ||
subscriber.stream(subscribeF).pull.uncons.void.stream.compile.drain |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please explain what "force finalization" means, @wemrysi?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kevinmeredith in our usage, we noticed that if the body of responses produced by the AHC client weren't consumed, the connection was never closed. A solution to this was to add an effect to the Resource
finalizer that would subscribe to the body stream and then immediately cancel
the subscription, which closes the connection.
The original attempt at this used take(0)
which turns out to be a noop (it results in a pure, empty stream) and thus the subscribe/cancel effects are never run. Switching to pull.uncons
ensures that the first effect in the stream is sequenced and any finalizers are run when the stream ends.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @wemrysi!
Oops, we merged this to master. Cherry-picking. I think it may fix #3354. |
…ty body in AHC client
onCompleted
so that it is only called ifonStream
wasn't called.bodyDisposal
to actually sequence the stream finalizer via.pull.uncons.void
.take(0)
turns out to be a noop, so the previous version wasn't finalizing.Fixes #3293