Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add signaling support for connection pool waiting #115

Merged
merged 3 commits into from
Apr 3, 2018

Conversation

KJTsanaktsidis
Copy link

The current behaviour when the poolLimit is reached and a new connection
is required is to poll every 100ms to see if there is now headroom to
make a new connection. This adds tremendous latency to the
limit-hit-path.

This commit changes the checkout behaviour to watch on a condition
variable for connections to become available, and the checkin behaviour
to signal this variable. This should allow waiters to use connections
immediately after they become available.

A new parameter is also added to DialInfo, PoolTimeout, which is the
maximum time that clients will wait for connection headroom to become
available. By default this is unlimited.

Full disclosure - I haven't yet actually tried this in our production environment yet. The only testing this has so far is that it passes the regression tests. I'm pull-requesting this early to get some feedback on the design of this change and to see if this would be useful for other people :). I intend to try and get this into production sometime in the next couple of weeks.

cluster.go Outdated
if err == errPoolTimeout {
// No need to remove servers from the topology if acquiring a socket fails for this reason.
return nil, err
} else if err != nil {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be caught by go vet ( or gometalinter, I can't remember which one would complain), but the else if can be replaced by a normal if, as the last statement in the previous if case is return.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch - this should be fixed now

case <-waitDone:
case <-time.After(poolTimeout):
// timeoutHit is part of the wait condition, so needs to be changed under mutex.
server.Lock()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, I might be missing something here, but,
the server lock is locked in line 124.

The waiter on line 158 wait for a signal, but the signal is sent only if a socket is returned to the pool.
The broadcast in line 151 won't be executed before an already used socket is released, as that code path tries to acquire lock that is is held by the parent goroutine (line 124) and releases on Wait() (line 158), no?

This is only after a cursory reading of the code. Please correct me if I am wrong.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the server lock is locked in line 124.

This lock on 148 is in a separate goroutine - there is a new goroutine spawned here to wait for the timeout to expire and signal the waiters if so.

The broadcast in line 151 won't be executed before an already used socket is released, as that code path tries to acquire lock that is is held by the parent goroutine

Perhaps this is the most confusing part - the invocation of Wait on 158 actually releases the server mutex - the CondVar's L member is initialised to the server mutex on line 90, and the Wait method is defined to unlock L, wait for a signal, and then lock L again before returning control to the caller.

So, in the case where a socket is released in time -

  • The call to AcquireSocket locks the server mutex
  • The call to AcquireSocket observes that there are no free connections
  • The call to AcquireSocket unlocks the server mutex and waits for a signal (I believe this is atomic)
  • The call to RecycleSocket locks the server mutex
  • The call to RecycleSocket returns a connection to the free list
  • The call to RecycleSocket broadcasts
  • The call to AcquireSocket is woken from its Wait, but is still blocked waiting for the server mutex, so Wait does not yet return
  • The call to RecycleSocket unlocks the server mutex
  • The call to AcquireSocket locks the server mutex and returns control from Wait
  • The call to AcquireSocket observes that there are now free connections and continues as usual.

In the case where a socket is not released on time -

  • The call to AcquireSocket locks the server mutex
  • The call to AcquireSocket observes that there are no free connections
  • The call to AcquireSocket unlocks the server mutex and waits for a signal
  • The timer goroutine hits the time.After timeout
  • The timer goroutine takes the server mutex
  • The timer goroutine sets the timeout flag
  • The timer goroutine broadcasts
  • The call to AcquireSocket is woken from its Wait, but is still blocked waiting for the server mutex, so Wait does not yet return
  • The timer goroutine unlocks the server mutex
  • The call to AcquireSocket locks the server mutex and returns control from Wait
  • The call to AcquireSocket notices that the timeout flag has been set
  • The call to AcquireSocket unlocks the mutex and errors out

I mean, this is concurrent code with mutexs and semaphores, so I'm practically guaranteed to be wrong, but this is my understanding of how this works and hopefully this answers your question.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. I forgot that the Wait() releases the lock before yielding the goroutine.

// that there is always a connection available for replset topology discovery). Thus, once
// a connection is returned to the pool, _every_ waiter needs to check if the connection count
// is underneath their particular value for poolLimit.
server.poolWaiter.Broadcast()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, I am not 100% sure of my reading of the code ( mgo was never super fun to read, and I haven't had a closer look at it since some time ago, but you take a lock on the server when you try to acquire a socket, and block wait on a signal. The signal won't be raised, if you first need to take a lock on the same mutex, no ?

Before, the pool was unlocked before returning an ErrPoolLimit, but it is not the case anymore.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mgo was never super fun to read

Yyyyyyup......

The signal won't be raised, if you first need to take a lock on the same mutex, no ?

I think my answer above covers this - the acquiring code implicitly releases the mutex in the call to Wait and re-acquires it when Wait returns. So the release code should be able to grab the mutex and send the signal.

Before, the pool was unlocked before returning an ErrPoolLimit, but it is not the case anymore.

I think i'm calling Unlock() on line 167:

} else {
	if len(server.liveSockets)-len(server.unusedSockets) >= poolLimit {
		server.Unlock()
		return nil, false, errPoolLimit
	}
}

Is this not sufficient?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is. I'd like to blame the low caffeine level in my blood for raising this issue ;)

@KJTsanaktsidis
Copy link
Author

Thanks a bunch for your review. I know this kind of code is really complex to reason about so I'm really glad to have another pair of eyes on it! Let me know if the above makes sense or if I just sound crazy...

@KJTsanaktsidis
Copy link
Author

The test failure in TestStreamsNextNoEventTimeout just on go 1.9 mongo 3.6 looks unrelated...

szank
szank previously approved these changes Feb 28, 2018
@domodwyer
Copy link

Hi @KJTsanaktsidis,

This is a great improvement to the limit code, it seems a lot of people use the pool limit so this will definitely be a big win!

I'll try and get something together to drive this code path properly - all my existing benchmark/verification setup limits concurrency at the application layer - it might take a couple days, I hope you understand.

Thanks for taking the time to improve this - it's really appreciated - I'll get back to you!

Dom

@KJTsanaktsidis
Copy link
Author

No worries, thanks for your effort picking up this project and maintaining this important piece of infrastructure for Go/Mongo users!

Looks like I got some conflicts from #116 - I'll push up a fix shortly. I'm still hoping to get this into production for our product in the next couple of weeks and give you a report on how it went.

@KJTsanaktsidis
Copy link
Author

KJTsanaktsidis commented Mar 23, 2018

@domodwyer I added a couple of commits to this PR to report metrics around how long goroutines spend waiting for the pool limit. In order to deploy this to production, we want to be able to report pool wait times into a Datadog histogram. Unfortunately, the stats mechanism in mgo is only really good for counters and gauges.

I added in the last commit a mechanism for mgo to send event-based information to the library user over a set of channels; the user of the library can optionally watch these channels and do something with the events (we are sending them to statsd for histogram/percentile estimation).

We kind of need this information in some way to know if the connection limit stuff is going to have an adverse effect, but I'm totally flexible on the API design if you have any better ideas. The channel-solution had the benefit of being low-overhead, optional, and unopinionated about what to do with event data. It also shouldn't be a breaking API change to add extra channels to the struct to publish different kinds of events. I did consider doing histogram bucketing in the stats code, but that raises lots of questions about what buckets to use, what percentiles to export, etc.

szank
szank previously requested changes Mar 23, 2018
@@ -0,0 +1,2 @@
{
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you wanted this file to be added.
Could you please remove it and add .vscode to .gitgnore?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops. That's embarrassing :)

cluster_test.go Outdated
return nil
}
}
c.Assert(getEvent().WaitTime, Equals, time.Duration(0))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a nitpick, but I would check if the result of getEvent() is not nil before accessing the WaitTime.
If all works well that's not a problem, but if in subsequent commits someone breaks the stats by chance, this will panic. And failing unit test is much easier to deal with than a panicking one (imho).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is fixed; just added an extra NotNil assert before this one

cluster_test.go Outdated
case ev := <-statsEvents.OnPoolTimeout:
c.Assert(ev.WaitTime, Equals, stats.TotalPoolWaitTime)
default:
c.Fail()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

c.Fatalf("Could not retrieve OnPoolTimeout statsEvent")?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yeah, this should have a message. fixed.

stats.go Outdated
case statsEvents.OnSocketAcquired <- eventStruct:
default:
}
statsMutex.Unlock()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another nitpick:
You could either defer the mutex unlock, or move if before the send on the channel.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The channel send doesn't need the mutex, and there's no particular reason to order the events in the channel according to what order the stats mutex was acquired in (which isn't even guaranteed to be the order mongo saw the connections in) so I moved the unlock above the send.

stats.go Outdated
func GetStatsEvents() *StatsEvents {
return statsEvents
}

// ResetStats reset Stats to the previous database state
func ResetStats() {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After the library user calls ResetStats(), GetStatsEvents() needs to be called immediately, at least from my understanding of the intended usage.

Why not return new stats from the ResetStats() func. It can always be ignored.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is that this would be a breaking change - someone might be passing the ResetStats function somewhere that expects a func() callback type or something, for example.

It might be better to just decouple the StatsEvents thing from the Stats thing entirely, and require an explicit call to SetStatsEvents(bool) or something to turn it on and off (and don't provide a reset function at all - doesn't really seem necessary)

@szank
Copy link

szank commented Mar 23, 2018

Hi @KJTsanaktsidis.
I like the metrics idea.
Here is a suggestion though.
What do you think about making the Stats an interface?
Something like

type MgoMetrics interface{
OnSocketAcquired SocketAcquireMetric
OnPoolTimeout SocketAcquireMetric
}

You could modify your current implementation to send data on the chan on every call of the interface,
provide a default NOP implementation if library user doesn't want stats.

Your current implementation could stay as a reference one, it's perfectly valid.
I am suggesting it because:

  1. Setting an arbitrary chan size rubs be the wrong way. Using my suggested approach the user would be able to decide by himself what channel sizes to use.

  2. I don't think that the metrics (while very useful !) are an integral part of the database API. Using the interface approach is IMHO cleaner. The reference metrics implementation could even stay in another package, somewhere under mgo/metrics?

  3. The user would have an easier time integrating the metrics in their own system if they wanted to have a struct gathering and processing metrics from the whole system, that would implement the mgo metrics interface among other things.

@KJTsanaktsidis
Copy link
Author

I did consider the interface approach - unfortunately, if we did that, adding a new kind of event to it would be a breaking change since people's structs would no longer match the expanded interface. In a similar vein, I considered a struct with a whole bunch of callback-members you could assign, like this:

type StatsEventsCallbacks struct {
    OnSocketAcquired func(time.Duration),
    OnPoolTimeout func(time.Duration),
}

// library user code
callbacks := &StatsEventsCallbacks{
    OnSocketAcquired: func(t time.Duration) {
        // something
    }
}
mgo.SetStatsEventsCallbacks(callbacks)

That way, if new events were added, users could just not fill them in on their structs and so it wouldn't be a breaking change. Ultimately the channel sends just felt more idiomatic go to me, but if you prefer the callback approach I'm more than happy to roll with that instead (and maybe just wrap it in to a set of channels in my application code).

If the only thing that concerns you about the channel approach is the arbitrary channel size, I'm also happy to let the caller specify that in some kind of SetStatsEvents(enabled bool, bufferSize int) call and to separate it out of the SetStats/ResetStats mechanism entirely.

@KJTsanaktsidis
Copy link
Author

Thanks again for the feedback by the way :)

@KJTsanaktsidis
Copy link
Author

FYI - I have shipped this change into production for my team today. I will report back any findings in maybe a week or so after it's baked for a bit.

@domodwyer
Copy link

Hi @KJTsanaktsidis

Again, sorry for disappearing as I was out of the country but seems @szank has been keeping this going along 👍

I'm a little concerned this has grown the scope of the PR a fair bit - I understand you needed it for metrics to ensure the fix worked as expected, but as you correctly point out there's quite a lot of complexity around the existing, and proposed metrics (how to collect, who's job it is to aggregate, if/how to handle metric drops, etc.)

You're totally right though, it's time the stats were improved - I think it might serve the public mgo better to tackle this as a separate PR however, maybe in a different branch till we're happy with the design and get this connection pooling change merged sooner rather than later. I might have an idea for a lockless solution that should be backwards compatible, but I'm currently trying to stave off jet lag by keeping myself busy so might not fit the job ;)

Thanks again for all your hard work and solid solutions.

Dom

@KJTsanaktsidis
Copy link
Author

Yeah I totally understand the metrics raise some broader design questions about how that should be done. Sounds like I should back that bit out tomorrow; its not a huge patch for us to carry out of tree and it won’t really stop us switching to an upstream solution once that’s baked.

Hopefully this metrics stuff has been a good spike to explore some ideas :)

Other than letting this bake in our production environment for a bit, is there anything else I should do here do you think?

@KJTsanaktsidis
Copy link
Author

FWIW I’d be keen to hear your thoughts on the metrics when you’ve put them together too :) I’ll keep a look out here!

@domodwyer
Copy link

No I think the connections bit of the PR is all good! No problems with a merge, I'll run a test during the normal release process and should be good to go for a release.

Regarding the metrics I'd love it if you helped collaborate on the design/implementation/whatever-you-like - I'll open an issue for the improvement design in the coming days - even just a "this would work for us" or a "we also need X" would be most appreciated!

Dom

The current behaviour when the poolLimit is reached and a new connection
is required is to poll every 100ms to see if there is now headroom to
make a new connection. This adds tremendous latency to the
limit-hit-path.

This commit changes the checkout behaviour to watch on a condition
variable for connections to become available, and the checkin behaviour
to signal this variable. This should allow waiters to use connections
immediately after they become available.

A new parameter is also added to DialInfo, PoolTimeout, which is the
maximum time that clients will wait for connection headroom to become
available. By default this is unlimited.
I'm using vscode and accidently committed the .vscode directroy;
.gitignore this footgun.
This exposes four new counters

* The number of times a socket was successfully obtained from the
  connection pool
* The number of times the connection pool needed to be waited on
* The total time that has been spent waiting for a conneciton to become
  available
* The number of times socket acquisition failed due to a pool timeout
@KJTsanaktsidis
Copy link
Author

Alright, I've done some rebasing. I've left in "Add stats for connection pool timings", which simply adds some counters to the existing stats struct, but taken out "send events through a channel", which added the new API surface for receiving individual events. If you prefer I can remove the counters too but they don't add much in the way of new API style so I think they don't make a ton of tech debt and they're useful enough to leave there.

@domodwyer
Copy link

Honestly, this is great work - thanks @KJTsanaktsidis.

Dom

@KJTsanaktsidis
Copy link
Author

Thank you for your help!

Just an update on how this is going in our production usage. I had the pool limit kick in a couple of times this week with our app, and it seems that my change had the desired effect - the wait time hit a max of ~3ms, whereas the old code would have had this at at least 100ms due to the sleep-polling. This can be seen in the attached datadog graph (sorry about the redactions); when the app goes to open up a whole bunch of new connections, the limit kicks in and queues everything up for a few ms:

screenshot 2018-03-27 09 48 06

All in all, I'm pretty happy with how this is going. (subsequent to this graph, i turned on maxIdleTimeMs as well, which makes the surge in connections drop off after a couple of minutes - thanks to @gnawux !).

@domodwyer domodwyer merged commit 76ea203 into globalsign:development Apr 3, 2018
@domodwyer
Copy link

That's cool! Thanks for sharing (and thanks @gnawux too!).

I'll look at getting our staging environment up and running to check for any performance regressions, but I don't expect to find any - after that I'll cut a release.

Dom

@domodwyer domodwyer mentioned this pull request Apr 23, 2018
libi pushed a commit to libi/mgo that referenced this pull request Dec 1, 2022
* Add signaling support for connection pool waiting

The current behaviour when the poolLimit is reached and a new connection
is required is to poll every 100ms to see if there is now headroom to
make a new connection. This adds tremendous latency to the
limit-hit-path.

This commit changes the checkout behaviour to watch on a condition
variable for connections to become available, and the checkin behaviour
to signal this variable. This should allow waiters to use connections
immediately after they become available.

A new parameter is also added to DialInfo, PoolTimeout, which is the
maximum time that clients will wait for connection headroom to become
available. By default this is unlimited.

* Add stats for connection pool timings

This exposes four new counters

* The number of times a socket was successfully obtained from the
  connection pool
* The number of times the connection pool needed to be waited on
* The total time that has been spent waiting for a conneciton to become
  available
* The number of times socket acquisition failed due to a pool timeout

* Gitignore .vscode directory

I'm using vscode and accidently committed the .vscode directroy;
.gitignore this footgun.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants