Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PoolManager] - Synchronized -> Semaphore, wrap side-effects in Effect[F].delay #1934

Merged
merged 22 commits into from Aug 17, 2018

Conversation

@tbrown1979
Copy link
Contributor

@tbrown1979 tbrown1979 commented Jul 3, 2018

This PR is an attempt to change the use of synchronized { ... } to using a Semaphore. To make this easier I wrapped all the Unit functions in F so that .acquireing a permit was easier. I opted to .acquire and release the permits instead of doing .withPermit just so we could be explicit in all the cases during the .borrow method. Doing so allowed me to push the async handling down to the specific cases that require it, instead of every case hitting the callback.

This is the first step towards purifying the PoolManager and hopefully making it a little more easy to reason about. I have plans for further changes after this, such as removing the vars.

Thoughts are appreciated 馃槃If you'd like to see this done a different way I'm all ears!

tbrown1979 added 6 commits Jun 15, 2018
tbrown1979
tbrown1979
tbrown1979
tbrown1979
tbrown1979
tbrown1979
maxWaitQueueLimit = maxWaitQueueLimit,
maxConnectionsPerRequestKey = maxConnectionsPerRequestKey
))
}.unsafeRunSync()

This comment has been minimized.

@jmcardon

jmcardon Jul 3, 2018
Member

If we're going to force toIO, might as well make the whole call return F[Client[F]] instead. There's no need to maintain source compat on a deprecated method since M7 of 0.18.

This comment has been minimized.

@tbrown1979

tbrown1979 Jul 4, 2018
Author Contributor

@jmcardon Is it okay to just remove the PooledHttp1Client then? I only did this so that we could keep it around, but figured I'd get questions about it.

connection.foreach { s =>
if (!s.isClosed) s.shutdown()
}
decrConnection(key)

This comment has been minimized.

@jmcardon

jmcardon Jul 3, 2018
Member

This doesn't look right. disposeConnection now lack synchronization entirely.

This comment has been minimized.

@tbrown1979

tbrown1979 Jul 4, 2018
Author Contributor

@jmcardon Good catch. I will get this fixed!

This comment has been minimized.

@tbrown1979

tbrown1979 Jul 4, 2018
Author Contributor

@jmcardon This should be fixed. Let me know if you have any suggestions about the changes I made.

tbrown1979 added 4 commits Jul 4, 2018
tbrown1979
tbrown1979
tbrown1979
tbrown1979
import cats.implicits._
//import cats.syntax.all._

//import scala.annotation.tailrec

This comment has been minimized.

@aeons

aeons Jul 4, 2018
Member

Delete these?

This comment has been minimized.

@tbrown1979

tbrown1979 Jul 4, 2018
Author Contributor

Will do!

tbrown1979
Copy link
Member

@jmcardon jmcardon left a comment

Notes

maxConnectionsPerRequestKey = maxConnectionsPerRequestKey
))
}
.unsafeRunSync()

This comment has been minimized.

@jmcardon

jmcardon Jul 4, 2018
Member

Honestly I am not ok with this unsafeRunSync call if it's literally the same thing as http1client.

Can we remove this altogether? @rossabaker @ChristopherDavenport

This comment has been minimized.

@ChristopherDavenport

ChristopherDavenport Jul 5, 2018
Member

I mean I'm a fan of enforcing safety by construction. Any reason we should leave this?

This comment has been minimized.

This comment has been minimized.

@tbrown1979

tbrown1979 Jul 5, 2018
Author Contributor

馃敟 killed!

}
incrConnection(key) *>
F.liftIO {
F.runAsync(Async.shift(executionContext) *> builder(key)) {

This comment has been minimized.

@jmcardon

jmcardon Jul 4, 2018
Member

This doesn't seem necessary to do at all considering we're in F[_] now.

This should be done without runAsync and liftIO imo.

tbrown1979 added 2 commits Jul 5, 2018
tbrown1979
tbrown1979
鈥ect
IO { promise.completeWith(pipe.channelWrite(buffer).map(Function.const(false))); () }
case Left(t) =>
IO { promise.failure(t); () }
F.start {

This comment has been minimized.

@edmundnoble

edmundnoble Jul 5, 2018
Contributor

You need to run the outer F to kick it off when you use start; as it is, this code does nothing at all anymore.

Copy link
Member

@jmcardon jmcardon left a comment

Is there a reason this bubbled up to blaze-core? I'm definitely missing something.

IO { promise.completeWith(pipe.channelWrite(buffer).map(Function.const(false))); () }
case Left(t) =>
IO { promise.failure(t); () }
F.start {

This comment has been minimized.

@jmcardon

jmcardon Jul 5, 2018
Member

This never runs the action in start, LOL.

Do you mind reverting this? This isn't a necessary change and I'd rather not touch blaze core here.

disposeConnection(key, None)
IO(callback(Left(error)))
}
F.start {

This comment has been minimized.

@jmcardon

jmcardon Jul 5, 2018
Member

incrConnection(key) *> F.start

@@ -7,7 +7,7 @@ import fs2._
import org.http4s.blaze.pipeline.Command.EOF
import scala.concurrent.{ExecutionContext, Future}

class FailingWriter(implicit protected val F: Effect[IO]) extends EntityBodyWriter[IO] {
class FailingWriter(implicit protected val F: Concurrent[IO]) extends EntityBodyWriter[IO] {

This comment has been minimized.

@jmcardon

jmcardon Jul 5, 2018
Member

Wait why???

tbrown1979 and others added 6 commits Jul 5, 2018
tbrown1979
tbrown1979
tbrown1979
tbrown1979
tbrown1979
鈥own1979/http4s into attempt-at-purifying-pool-mgr
@jmcardon jmcardon dismissed their stale review Aug 2, 2018

out of date

@tbrown1979
Copy link
Contributor Author

@tbrown1979 tbrown1979 commented Aug 3, 2018

I've been on vacation for the last few weeks. I'll get back around to this soon 馃槃

@rossabaker
Copy link
Member

@rossabaker rossabaker commented Aug 7, 2018

You got some nice comments from others, and I've lost track of this. What else is in scope for this refactoring?

@tbrown1979
Copy link
Contributor Author

@tbrown1979 tbrown1979 commented Aug 7, 2018

@rossabaker I think the last thing I had left to do was to convert MimeLoaderApp to an IOApp. Again, apologies that I haven't been able to get around to that. I will pick this up this week and get that done
Edit: Made the last change that was requested. This should be good to review provided it passes all ci checks 馃槃

tbrown1979
Copy link
Member

@rossabaker rossabaker left a comment

Thanks! I dug in a bit deeper. One question with regard to borrow().

decrConnection(key)
if (!isClosed) {
def go(): F[NextConnection] =
semaphore.acquire *>

This comment has been minimized.

@rossabaker

rossabaker Aug 11, 2018
Member

We used to check isClosed inside the lock. Is it going to hurt anything to not?

This comment has been minimized.

@tbrown1979

tbrown1979 Aug 13, 2018
Author Contributor

This does seem like it could be an issue. I'll wait for a response on the follow-up comment just to make sure I do this the correct way. Good catch!

}

F.delay(logger.debug(s"Requesting connection: $stats")) *>
go()

This comment has been minimized.

@rossabaker

rossabaker Aug 11, 2018
Member

How about pulling the acquire and all the releases out of go() and doing a semaphore.withPermit? That would be both more concise and robust in case of failures. The recursive call to go() could continue with the single permit.

This comment has been minimized.

@tbrown1979

tbrown1979 Aug 13, 2018
Author Contributor

@rossabaker I originally did this, but I found I couldn't match the previous behavior with that. If we use the semaphore.withPermit here then we'll be waiting for the F.asyncF calls to finish, which may take a while - and this would be a change in behavior from what the PoolManager did before this PR.

This executes async, which immediately finishes the synchronized block of code.

I think this probably could be done with F.asyncF surrounding the whole thing and then hitting the callback in each case. However, I thought that the explicit .release we're doing here allows a little more clarity and preciseness than the alternative.

Please let me know what your thoughts are on that. Let me know if I'm missing something 馃槃

This comment has been minimized.

@SystemFw

SystemFw Aug 13, 2018
Member

if the equivalent getConnection is also async (with start), shouldn't be enough to not have asyncF in that case, and use withPermit?

tbrown1979 added 2 commits Aug 14, 2018
tbrown1979
tbrown1979
@rossabaker rossabaker added this to the 0.19.0-M2 milestone Aug 15, 2018
Copy link
Member

@rossabaker rossabaker left a comment

I think this is right now. The connection pool scares me -- and us all -- but tests pass.

Copy link
Member

@ChristopherDavenport ChristopherDavenport left a comment

I'm petrified of this code. Not because of the code here, but because of the code its interacting with.

Tests pass, it looks good. We should test the snapshot with this before we release next week.

@rossabaker rossabaker merged commit 183ab81 into http4s:master Aug 17, 2018
2 checks passed
2 checks passed
continuous-integration/appveyor/pr AppVeyor build succeeded
Details
continuous-integration/travis-ci/pr The Travis CI build passed
Details
@tbrown1979
Copy link
Contributor Author

@tbrown1979 tbrown1979 commented Aug 17, 2018

diesalbla added a commit to diesalbla/http4s that referenced this pull request Nov 1, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Linked issues

Successfully merging this pull request may close these issues.

None yet

7 participants
You can鈥檛 perform that action at this time.