Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add maxUses pool config option #2147

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ I will __happily__ accept your pull request if it:

If your change involves breaking backwards compatibility please please point that out in the pull request & we can discuss & plan when and how to release it and what type of documentation or communication it will require.

### Setting up for local development

1. Clone the repo
2. From your workspace root run `yarn` and then `yarn lerna bootstrap`
3. Ensure you have a PostgreSQL instance running with SSL enabled and an empty database for tests
4. Ensure you have the proper environment variables configured for connecting to the instance
5. Run `yarn test` to run all the tests

## Troubleshooting and FAQ

The causes and solutions to common errors can be found among the [Frequently Asked Questions (FAQ)](https://github.com/brianc/node-postgres/wiki/FAQ)
Expand Down
26 changes: 26 additions & 0 deletions packages/pg-pool/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ var pool2 = new Pool({
max: 20, // set pool max size to 20
idleTimeoutMillis: 1000, // close idle clients after 1 second
connectionTimeoutMillis: 1000, // return an error after 1 second if connection could not be established
maxUses: 7500, // close (and replace) a connection after it has been used 7500 times (see below for discussion)
})

//you can supply a custom client constructor
Expand Down Expand Up @@ -330,6 +331,31 @@ var bluebirdPool = new Pool({

__please note:__ in node `<=0.12.x` the pool will throw if you do not provide a promise constructor in one of the two ways mentioned above. In node `>=4.0.0` the pool will use the native promise implementation by default; however, the two methods above still allow you to "bring your own."

## maxUses and read-replica autoscaling (e.g. AWS Aurora)

The maxUses config option can help an application instance rebalance load against a replica set that has been auto-scaled after the connection pool is already full of healthy connections.

The mechanism here is that a connection is considered "expended" after it has been acquired and released `maxUses` number of times. Depending on the load on your system, this means there will be an approximate time in which any given connection will live, thus creating a window for rebalancing.

Imagine a scenario where you have 10 app instances providing an API running against a replica cluster of 3 that are accessed via a round-robin DNS entry. Each instance runs a connection pool size of 20. With an ambient load of 50 requests per second, the connection pool will fill likely up in a few minutes with healthy connections.

If you have weekly bursts of traffic which peak at 1,000 requests per second, you might want to grow your replicas to 10 during this period. Without setting `maxUses`, the new replicas will not be adopted by the app servers without an intervention -- namely, restarting each in turn in order to build up new connection pools that are balanced against all the replicas. Adding additional app server instances will help to some extent because they will adopt all the replicas in an even way, but the initial app servers will continue to focus additional load on the original replicas.

This is where the `maxUses` configuration option comes into play. Setting `maxUses` to 7500, will ensure that over a period of 30 minutes or so the new replicas will be adopted as the pre-existing connections are closed and replaced with new ones, thus creating a window for eventual balance.

You'll want to test based on your own scenarios, but one way to make a first guess at `maxUses` is to identify an acceptable window for rebalancing and then solve for the value:

```
maxUses = rebalanceWindowSeconds * totalRequestsPerSecond / numAppInstances / poolSize
```

In the example above, assuming we acquire and release 1 connection per request and we are aiming for a 30 minute rebalancing window:

```
maxUses = rebalanceWindowSeconds * totalRequestsPerSecond / numAppInstances / poolSize
7200 = 1800 * 1000 / 10 / 25
```

## tests

To run tests clone the repo, `npm i` in the working dir, and then run `npm test`
Expand Down
91 changes: 52 additions & 39 deletions packages/pg-pool/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,16 @@ const removeWhere = (list, predicate) => {
: list.splice(i, 1)[0]
}

class IdleItem {
constructor (client, idleListener, timeoutId) {
class ClientItem {
constructor(client, useCount) {
this.client = client
this.useCount = useCount
}
}

class IdleItem {
constructor (clientItem, idleListener, timeoutId) {
this.clientItem = clientItem
this.idleListener = idleListener
this.timeoutId = timeoutId
}
Expand Down Expand Up @@ -45,18 +52,18 @@ function promisify (Promise, callback) {
return { callback: cb, result: result }
}

function makeIdleListener (pool, client) {
function makeIdleListener (pool, clientItem) {
return function idleListener (err) {
err.client = client
err.client = clientItem.client

client.removeListener('error', idleListener)
client.on('error', () => {
clientItem.client.removeListener('error', idleListener)
clientItem.client.on('error', () => {
pool.log('additional client error after disconnection due to error', err)
})
pool._remove(client)
pool._remove(clientItem)
// TODO - document that once the pool emits an error
// the client has already been closed & purged and is unusable
pool.emit('error', err, client)
pool.emit('error', err, clientItem.client)
}
}

Expand All @@ -65,6 +72,7 @@ class Pool extends EventEmitter {
super()
this.options = Object.assign({}, options)
this.options.max = this.options.max || this.options.poolSize || 10
this.options.maxUses = this.options.maxUses || Infinity
this.log = this.options.log || function () { }
this.Client = this.options.Client || Client || require('pg').Client
this.Promise = this.options.Promise || global.Promise
Expand Down Expand Up @@ -95,7 +103,7 @@ class Pool extends EventEmitter {
this.log('pulse queue on ending')
if (this._idle.length) {
this._idle.slice().map(item => {
this._remove(item.client)
this._remove(item.clientItem)
})
}
if (!this._clients.length) {
Expand All @@ -117,30 +125,30 @@ class Pool extends EventEmitter {
if (this._idle.length) {
const idleItem = this._idle.pop()
clearTimeout(idleItem.timeoutId)
const client = idleItem.client
const clientItem = idleItem.clientItem
const idleListener = idleItem.idleListener

return this._acquireClient(client, pendingItem, idleListener, false)
return this._acquireClient(clientItem, pendingItem, idleListener, false)
}
if (!this._isFull()) {
return this.newClient(pendingItem)
}
throw new Error('unexpected condition')
}

_remove (client) {
_remove (clientItem) {
const removed = removeWhere(
this._idle,
item => item.client === client
item => item.clientItem === clientItem
)

if (removed !== undefined) {
clearTimeout(removed.timeoutId)
}

this._clients = this._clients.filter(c => c !== client)
client.end()
this.emit('remove', client)
this._clients = this._clients.filter(c => c !== clientItem)
clientItem.client.end()
this.emit('remove', clientItem.client)
}

connect (cb) {
Expand Down Expand Up @@ -191,8 +199,9 @@ class Pool extends EventEmitter {

newClient (pendingItem) {
const client = new this.Client(this.options)
this._clients.push(client)
const idleListener = makeIdleListener(this, client)
const clientItem = new ClientItem(client, 0)
this._clients.push(clientItem)
const idleListener = makeIdleListener(this, clientItem)

this.log('checking client timeout')

Expand All @@ -217,7 +226,7 @@ class Pool extends EventEmitter {
if (err) {
this.log('client failed to connect', err)
// remove the dead client from our list of clients
this._clients = this._clients.filter(c => c !== client)
this._clients = this._clients.filter(c => c !== clientItem)
if (timeoutHit) {
err.message = 'Connection terminated due to connection timeout'
}
Expand All @@ -230,63 +239,67 @@ class Pool extends EventEmitter {
}
} else {
this.log('new client connected')

return this._acquireClient(client, pendingItem, idleListener, true)
return this._acquireClient(clientItem, pendingItem, idleListener, true)
}
})
}

// acquire a client for a pending work item
_acquireClient (client, pendingItem, idleListener, isNew) {
_acquireClient (clientItem, pendingItem, idleListener, isNew) {
if (isNew) {
this.emit('connect', client)
this.emit('connect', clientItem.client)
}

this.emit('acquire', client)
this.emit('acquire', clientItem.client)

let released = false

client.release = (err) => {
clientItem.useCount += 1

clientItem.client.release = (err) => {
if (released) {
throwOnDoubleRelease()
}

released = true
this._release(client, idleListener, err)
this._release(clientItem, idleListener, err)
}

client.removeListener('error', idleListener)
clientItem.client.removeListener('error', idleListener)

if (!pendingItem.timedOut) {
if (isNew && this.options.verify) {
this.options.verify(client, (err) => {
this.options.verify(clientItem.client, (err) => {
if (err) {
client.release(err)
clientItem.client.release(err)
return pendingItem.callback(err, undefined, NOOP)
}

pendingItem.callback(undefined, client, client.release)
pendingItem.callback(undefined, clientItem.client, clientItem.client.release)
})
} else {
pendingItem.callback(undefined, client, client.release)
pendingItem.callback(undefined, clientItem.client, clientItem.client.release)
}
} else {
if (isNew && this.options.verify) {
this.options.verify(client, client.release)
this.options.verify(clientItem.client, clientItem.client.release)
} else {
client.release()
clientItem.client.release()
}
}
}

// release a client back to the poll, include an error
// to remove it from the pool
_release (client, idleListener, err) {
client.on('error', idleListener)
_release (clientItem, idleListener, err) {
clientItem.client.on('error', idleListener)

// TODO(bmc): expose a proper, public interface _queryable and _ending
if (err || this.ending || !client._queryable || client._ending) {
this._remove(client)
if (err || this.ending || !clientItem.client._queryable || clientItem.client._ending || clientItem.useCount >= this.options.maxUses) {
if (clientItem.useCount >= this.options.maxUses) {
this.log('removing expended client')
}
this._remove(clientItem)
this._pulseQueue()
return
}
Expand All @@ -296,11 +309,11 @@ class Pool extends EventEmitter {
if (this.options.idleTimeoutMillis) {
tid = setTimeout(() => {
this.log('remove idle client')
this._remove(client)
this._remove(clientItem)
}, this.options.idleTimeoutMillis)
}

this._idle.push(new IdleItem(client, idleListener, tid))
this._idle.push(new IdleItem(clientItem, idleListener, tid))
this._pulseQueue()
}

Expand Down
27 changes: 16 additions & 11 deletions packages/pg-pool/test/bring-your-own-promise.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,34 @@ const BluebirdPromise = require('bluebird')

const Pool = require('../')

const checkType = promise => {
const checkType = (promise, expectError) => {
expect(promise).to.be.a(BluebirdPromise)
return promise.catch(e => undefined)
return promise.catch(e => {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note for reviewer: It took me a while to figure out why this test kept failing with an error about the value being undefined, so I added the new argument so we can differentiate an unexpected error scenario and subsequently report it as a test failure.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice - thanks. As an aside, I think it's probably time I deprecate BYOP for pg@9.0 as all current supported versions of node include a native promise.

if (expectError) {
return undefined
}
expect().fail(e)
})
}

describe('Bring your own promise', function () {
it('uses supplied promise for operations', co.wrap(function * () {
const pool = new Pool({ Promise: BluebirdPromise })
const client1 = yield checkType(pool.connect())
const client1 = yield checkType(pool.connect(), false)
client1.release()
yield checkType(pool.query('SELECT NOW()'))
const client2 = yield checkType(pool.connect())
yield checkType(pool.query('SELECT NOW()'), false)
const client2 = yield checkType(pool.connect(), false)
// TODO - make sure pg supports BYOP as well
client2.release()
yield checkType(pool.end())
yield checkType(pool.end(), false)
}))

it('uses promises in errors', co.wrap(function * () {
const pool = new Pool({ Promise: BluebirdPromise, port: 48484 })
yield checkType(pool.connect())
yield checkType(pool.end())
yield checkType(pool.connect())
yield checkType(pool.query())
yield checkType(pool.end())
yield checkType(pool.connect(), true)
yield checkType(pool.end(), true)
yield checkType(pool.connect(), true)
yield checkType(pool.query(), true)
yield checkType(pool.end(), true)
}))
})
2 changes: 1 addition & 1 deletion packages/pg-pool/test/error-handling.js
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ describe('pool error handling', function () {
})

setTimeout(() => {
pool._clients[0].end()
pool._clients[0].client.end()
}, 1000)
})
})
59 changes: 59 additions & 0 deletions packages/pg-pool/test/max-uses.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
const expect = require('expect.js')
const co = require('co')
const _ = require('lodash')

const describe = require('mocha').describe
const it = require('mocha').it

const Pool = require('../')

describe('maxUses of 2', () => {
it('can create a single client and use it once', co.wrap(function * () {
const pool = new Pool({ maxUses: 2 })
expect(pool.waitingCount).to.equal(0)
const client = yield pool.connect()
const res = yield client.query('SELECT $1::text as name', ['hi'])
expect(res.rows[0].name).to.equal('hi')
client.release()
pool.end()
}))

it('getting a connection a second time returns the same connection and releasing it also closes it', co.wrap(function * () {
const pool = new Pool({ maxUses: 2 })
expect(pool.waitingCount).to.equal(0)
const client = yield pool.connect()
client.release()
const client2 = yield pool.connect()
expect(client).to.equal(client2)
expect(client2._ending).to.equal(false)
client2.release()
expect(client2._ending).to.equal(true)
return yield pool.end()
}))

it('getting a connection a third time returns a new connection', co.wrap(function * () {
const pool = new Pool({ maxUses: 2 })
expect(pool.waitingCount).to.equal(0)
const client = yield pool.connect()
client.release()
const client2 = yield pool.connect()
expect(client).to.equal(client2)
client2.release()
const client3 = yield pool.connect()
expect(client3).not.to.equal(client2)
client3.release()
return yield pool.end()
}))

it('logs when removing an expended client', co.wrap(function * () {
const messages = []
const log = function (msg) {
messages.push(msg)
}
const pool = new Pool({ maxUses: 1, log })
const client = yield pool.connect()
client.release()
expect(messages).to.contain('removing expended client')
return yield pool.end()
}))
})
Loading