Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Branch: master
Fetching contributors…

Cannot retrieve contributors at this time

353 lines (227 sloc) 10.608 kB

advanced-pool

About

advanced-pool provides a simple and generic ways to manage resource pools in NodeJS applications.

Unlike the other pool libraries I found, advanced-pool allows easy extension of both, the pool object as well as the container managing the request queue.

Travis-CI status: Build Status

Installation

npm install advanced-pool

Changes

0.3.1 - 23.07.2014:
    - Removed some old (and not existing) reference from testing code.

0.3.0 - 23.07.2014:
    - Moved from Expresso to Mocha test framework.
    - Added acquireThunk() to work with co

0.2.0 - 07.06.2014:
    - Reworked error handling.
    - Improved documentation.
    - Acquire callbacks are always called asynchronously.

0.1.1 - 05.06.2014:
    - Added .gitignore removing some files fro NPM release.

0.1.0 - 05.06.2014:
    - Added travis-ci and tests.
    - Added TimedQueue.
    - Added queue size limit.

0.0.1 - 28.11.2012:
    - Initial release

Current state

This package was created to be used in one of my projects. I'll be fixing bugs and add improvements with time.

If you find any issues, please report them on GitHub: https://github.com/atheros/node-advanced-pool/issues

Selecting the appropriate queue class

Once pool resources runs out, following acquire requests are queued while waiting for resources to get released or created. There are currently two queue classes: SimpleQueue and TimedQueue. Choosing the correct queue for your needs is very important.

SimpleQueue

SimpleQueue offers only the basics. A FIFO queue with an optional size limit. This sort of queue is lightweight, both in memory and CPU usage, but lacks some of the features needed in many applications.

This queue should be used in data processing applications, when clients waiting for resources are not interactive. If resources stored within the pool can become unavailable (like database connections), clients requesting then will hang forever. In such cases you should use TimedQueue.

TimedQueue

TimedQueue implements a FIFO queue with timeout feature. This is very important for interactive application, where it is better to display an error message if resource is not available than to hang for ever. So this is the queue to use to pool resources like database connections and sockets in web applications.

Simple example

Example bellow demonstrates how to use the pool. It creates 10 workers trying to get access to limited resources. Once all workers received their access to requested resource, the pool is closed.

var advancedPool = require('advanced-pool');
var resources = 0;
var finished = 0;
var count = 10;
var pool = new advancedPool.Pool({
    min: 3,
    max: 4,
    create: function (callback) {
        // create the resource
        var resource = {name: "Resource #" + resources};
        console.log("created resource #" + resource.name);
        callback(null, resource);
        resources++;
    }
});
var i, fn;

// create some resource requests
for (i = 0; i < count; i++) {
    fn = function (err, resource) {
        if (err) {
            // something went wrong
            console.log(err);
        } else {
            // we got our resource!
            console.log("FN #" + this.id + " got resource: " + resource.name);
            setTimeout(function () {
                // release it after some time
                pool.release(resource);
                finished++;
            }, 10);
        }
    };
    // request resource
    // the bind part is there only to get an ID of the request and is optional
    pool.acquire(fn.bind({ id: i }));
}

// wait for all requests to complete
var interval = setInterval(function () {
    if (finished == count) {
        console.log('All workers finished, closing pool');
        pool.close();
        clearInterval(interval);
    }
}, 100);

Pool API

Exports

var advancedPool = require('advanced-pool');

advanced-pool comes with 3 constructors:

advancedPool.Pool(options)
advancedPool.SimpleQueue(queueSize)
advancedPool.TimedQueue(options)

Pool(options)

Pool constructor.

Pool accepts a number of options in the options argument:

  • name - Name of the pool, defaults to pool.
  • create - The object creator function, accepts a single argument, which is a callback function. The callback function takes two aguments, error and object. If error is set, it is assumed something when wrong, and object won't be added to the pool.
  • destroy - This function is called to destroy objects in the pool. It takes only one argument, the object. This function is optional.
  • queue - Queue object instance to use. If not specified, an instance of SimpleQueue object will be created.
  • min - Minimum number of objects in the pool (default 2).
  • max - Maximum number of objects in the pool (default 4). It must be greater or equal than min.
  • idleTimeout - time in milliseconds for an idle object above min to timeout (default 30000)
  • idleCheckInterval - interval in milliseconds for checking for timedout objects (default 1000)
  • log - logging accepts the following values:
    • true - console.log will be used for logging (default)
    • function - a function with 2 arguments, message and severity
    • false and any other value - no logging

More information can be found in the source code.

Pool derives from EventEmitter and produce the following events:

  • create-error (errorMessage) - Called when object creation failed.
  • object-added (object) - Called when a new object is added to the pool.
  • object-error (object) - Object is considered bad, it will be removed (object-removed event will be called too).
  • object-removed (object, error) - Object was removed, if it was due to an error, error will be set to true, false if it was due to a timeout.
  • object-expired (object) - Object expired by idleTimeout when pool contained more objects than minimum - will be followed by object-removed event.
  • pool-closed - Called when the close() method is called.

Arguments:

  • options Pool options object

Pool.acquire(client, queueOptions)

Acquire an object from the pool.

Arguments:

  • client is a function with two arguments, error and object. If error argument is null, the object argument will hold an object from the pool.
  • queueOptions should hold additional arguments for the queue, however SimpleQueue doesn't require any arguments (optional).

An object received in client callback should either be released with the release() method or marked as bad object with removeBadObject() method.

Client can receive the following errors: (Error.name property)

  • TimeoutError when the request times out.
  • OverflowError when the request queue gets filled.
  • CloseError when the pool gets closed.

Example:

pool.acquire(function (err, obj) {
    if (err) {
        console.log('Could not acquire object! ' + err);
    } else {
        // ...do something with the object
        pool.release(obj);
    }
});

Pool.acquireThunk(queueOptions)

Acquire an object from the pool using thunk.

Arguments

  • queueOptions should hold additional arguments for the queue, however SimpleQueue doesn't require any arguments (optional).

The behavior of this method is similar to Pool.acquire() with different asynchronous model. This method works with co (https://github.com/visionmedia/co).

This method only works with ES6 generators. That means nodejs version 0.11.0 or newer is needed, called with --harmony or --harmony-generators switch.

Example

co(function *() {
    db = yield pool.acquireThunk();
    yield db.insert({name: "Object"});
})();

Pool.release(object)

Release an object previously acquired with acquire() method.

Arguments:

  • object the object to release

Pool.removeBadObject(object)

Mark an object as bad and remove it from the pool.

This will only work for an object that is currently busy (acquired with the acquire() method).

Arguments:

  • object the object to remove

Example:

pool.acquire(function (err, obj) {
    if (err) {
        console.log('Could not acquire object! ' + err);
    } else {
        // ...do something with the object
        // ...something went wrong
        pool.removeBadObject(obj);
    }
});

Pool.adjustLimits(min, max)

Changes the limits of the pool.

If there is a need of creating additional objects right away, they will be created during this call.

If the limits lowered, resource objects will only be freed by the idleTimeout handler.

Arguments:

  • min - minimum number of objects in the pool
  • max - maximum number of objects in the pool

Pool.close()

Close the pool.

This will remove all free objects, prevent the creation of new objects, send an error to all following and pending acquire() calls.

The pool won't remove objects currently busy and will wait until they get released by release() or removeBadObject() calls.

Queue: SimpleQueue(queueSize)

SimpleQueue implements the simplest queue (First-In, First-Out queue) to be used with advanced-pool. The only methods needed are push(), pop() and size().

Arguments:

  • queueSize - the maximum size of the queue or 0 for unlimited

Queue: TimedQueue(queueSize)

TimedQueue implements a FIFO queue (First-In, First-Out queue) with optional timeout.

When using TimedQueue, the second argument of Pool.acquire() can be the timeout of the client, 0 for no timeout or null/undefined to use the default timeout.

Arguments:

  • options - the maximum size of the queue or 0 for unlimited

TimedQueue options:

  • defaultTimeout - Default time a client can be queued in milliseconds or 0 for no limit.
  • queueSize - Maximum queue size. 0 means there is no size limit.
  • checkInterval - How often should the timeouts be checked. By default it is 1/10 of default timeout or 1000ms if timeout is 0.

Queue API

If none of the queue classes fits your need, you can implement your own. Bellow is a description of the required interface.

queue.push()

Push a client to the queue.

Arguments:

  • client - a callback function with two arguments, error and object.
  • queueParams - Queue parameters for this client. This is the second argument of Pool.acquire() call.

Returns: nothing

queue.pop()

Returns the next client.

Returns: client callback function

queue.size()

Returns: the number of clients in the queue

queue.close()

This allows the queue to cleanup some internal state. This method is called from Pool.close().

Jump to Line
Something went wrong with that request. Please try again.