Skip to content

Commit

Permalink
* Updated dependencies
Browse files Browse the repository at this point in the history
 * Wrapped service callback functions in a Promise, so they can be awaited
 * Updated docs
  • Loading branch information
kfitzgerald committed Mar 17, 2019
1 parent accd207 commit f69b7a6
Show file tree
Hide file tree
Showing 14 changed files with 437 additions and 2,076 deletions.
22 changes: 20 additions & 2 deletions .eslintrc.json
Original file line number Diff line number Diff line change
@@ -1,11 +1,29 @@
{
"env": {
"node": true,
"es6": true
"node": true
},
"parserOptions": {
"ecmaVersion": 8,
"sourceType": "module"
},
"plugins": [
],
"extends": "eslint:recommended",
"globals": {
"require": true,
"module": true,
"describe": true,
"it": true,
"before": true,
"after": true,
"Promise": true
},
"overrides": [
{
"files": ["docs/**"],
"rules": {
"no-console": "off"
}
}
]
}
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ coverage
node_modules
.idea
*.tgz
.nyc_output
.nyc_output
package-lock.json
4 changes: 3 additions & 1 deletion .npmignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,6 @@ test
coverage
.nyc_output
.idea
docs
docs
.eslint*
.travis.yml
3 changes: 2 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ sudo: true
language: node_js
node_js:
- lts/*
- '6'
- '10'
- '8'
services:
- docker
before_install:
Expand Down
83 changes: 45 additions & 38 deletions Governor.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,51 +44,58 @@ class Governor {
* @param callback
*/
runTask(closure, callback) {
return new Promise((resolve, reject) => {

// Try to obtain a lock on each concurrency "worker"
// The first to come through will abort the subsequent locks
// Try to obtain a lock on each concurrency "worker"
// The first to come through will abort the subsequent locks

let gotLock = false;
let calledBack = false;
let workersCompleted = 0;
let gotLock = false;
let calledBack = false;
let workersCompleted = 0;

const run = (workerNumber) => {
let canReply = false;
this.service.lock(
this.locker,
`${this.prefix}:worker:${workerNumber}`,
this.ttl,
(unlock, lock) => {
if (!gotLock) {
// We're the first to get the lock on this task!
gotLock = true;
canReply = true;
const run = (workerNumber) => {
let canReply = false;
this.service.lock(
this.locker,
`${this.prefix}:worker:${workerNumber}`,
this.ttl,
(unlock, lock) => {
if (!gotLock) {
// We're the first to get the lock on this task!
gotLock = true;
canReply = true;

// Pass on the unlock callback/ lock for extensions
closure(unlock, lock, workerNumber);
} else {
// Already locked, abort
unlock();
// Pass on the unlock callback/ lock for extensions
closure(unlock, lock, workerNumber);
} else {
// Already locked, abort
unlock();
}
},
(err) => {
// callback if this worker ran the task or was the last worker to fire (process of elimination)
workersCompleted++;
if (!calledBack && (canReply || workersCompleted >= this.maximumConcurrency)) {
// we're the first to callback, so do that
calledBack = true;
if (callback) return callback(err);
if (err) {
return reject(err);
} else {
return resolve();
}
}
// If already called back, then obviously don't do it again
}
},
(err) => {
// callback if this worker ran the task or was the last worker to fire (process of elimination)
workersCompleted++;
if (!calledBack && (canReply || workersCompleted >= this.maximumConcurrency)) {
// we're the first to callback, so do that
calledBack = true;
callback(err);
}
// If already called back, then obviously don't do it again
}
);
};
);
};


for (let i = 0; i < this.maximumConcurrency; i++) {
this._updateNextWorker();
run(this._nextWorker);
}
for (let i = 0; i < this.maximumConcurrency; i++) {
this._updateNextWorker();
run(this._nextWorker);
}
});
}

}
Expand Down
29 changes: 18 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -330,17 +330,18 @@ Creates a new redis service instance.
Creates a new redlock instance. Useful for creating different locking algorithms for different purposes, where a one-size-fits-all approach does not work.
* `config` – Redlock configuration object. See [node-redlock](https://github.com/mike-marcacci/node-redlock#configuration) for additional options.

### `service.getSet(key, notCachedClosure, callback)`
### `service.getSet(key, notCachedClosure, [callback])`
* Gets a value, or sets the value if not already set. Useful for cache lookups.
* `key` - String key name to fetch
* `notCachedClosure((err, obj, ttl) => {...})` – Function to fire when the value is not cached in redis. The function is expected to callback with the value to cache.
* `err` – Set this to truthy if there was an error getting the value to store
* `obj` – The value to store in redis under the `key` cache key. If you try to store an `undefined` value, no redis key will be created.
* `ttl` – How long to store the key in redis, in milliseconds
* `callback(err, obj, wasCached)`Function to fire after getting/setting the cached value
* `callback(err, obj, wasCached)`Optional function to fire after getting/setting the cached value
* `err` – Error, if applicable, when getting or setting the cached value
* `obj` – The decoded cached object.
* `wasCached` – Boolean whether the value was already cached (`true`) or not (`false`)
* Returns a `Promise`.

Cached values are JSON serialized, so you can safely send it nearly anything. If you try to store `undefined`, it will not skip setting a key in redis.

Expand All @@ -367,7 +368,7 @@ service.getSet(
);
```

### `service.lock(redlock, key, ttl, whenLocked, callback)`
### `service.lock(redlock, key, ttl, whenLocked, [callback])`
Wrapper for locking using an arbitrary redlock instance.
* `redlock` – Redlock instance to use for the lock
* `key` – The key to lock
Expand All @@ -376,11 +377,12 @@ Wrapper for locking using an arbitrary redlock instance.
* `done(err) => { ... }` – Callback to fire when you are done with the lock.
* `err` – Set an error if you experienced an error during your lock
* `lock` – The [Redlock lock instance](https://github.com/mike-marcacci/node-redlock#locking-and-extending-1). Useful if you need to extend the lock because something is taking longer than expected.
* `callback(err)`Function to fire when done locking or if the lock could not be obtained.
* `callback(err)`Optional function to fire when done locking or if the lock could not be obtained.
* `err` – Error, if applicable, from redlock or the whenLocked function.
* `err.lockFailed` – If set, the error resulted from failure to acquire the lock. Useful if you simply want to retry than explode your app.
* Returns a `Promise`.

### `service.lockResource(resourceType, resourceId, whenLocked, callback)`
### `service.lockResource(resourceType, resourceId, whenLocked, [callback])`
Utility function to get a simple resource lock on something. Great for synchronizing resource changes across competing applications or instances.
* `resourceType` – The string type of thing you are locking. For example: `account`
* `resourceId` – The id of the instance of thing you are locking, For example, `12345`
Expand All @@ -389,9 +391,10 @@ Utility function to get a simple resource lock on something. Great for synchroni
* `done(err) => { ... }` – Callback to fire when you are done with the lock.
* `err` – Set an error if you experienced an error during your lock
* `lock` – The [Redlock lock instance](https://github.com/mike-marcacci/node-redlock#locking-and-extending-1). Useful if you need to extend the lock because something is taking longer than expected.
* `callback(err)`Function to fire when done locking or if the lock could not be obtained.
* `callback(err)`Optional function to fire when done locking or if the lock could not be obtained.
* `err` – Error, if applicable, from redlock or the whenLocked function.
* `err.lockFailed` – If set, the error resulted from failure to acquire the lock. Useful if you simply want to retry than explode your app.
* Returns a `Promise`

### `service.getSubscriber(channels, [options])`
Gets a new Subscriber class instance. The redis connection of the service is cloned.
Expand Down Expand Up @@ -437,12 +440,14 @@ Creates a new instance of the governor.
* `options.redlock` – Redlock configuration object. See [node-redlock](https://github.com/mike-marcacci/node-redlock#configuration) for additional options.
* `options.ttl` – How long in milliseconds the task has to run before the underlying lock expires. Defaults to `55000` (55s).

### `govenror.runTask(closure, callback)`
### `govenror.runTask(closure, [callback])`
Runs a task on the governor, when able to do so.
* `closure(unlock, lock, workerNumber)` – Task function to run
* `unlock(err)` – Callback to fire when done running the task. Set `err` if you need to bubble an error out.
* `lock` – The [Redlock lock instance](https://github.com/mike-marcacci/node-redlock#locking-and-extending-1). Useful if you need to extend the lock because something is taking longer than expected.
* `workerNumber` – Which worker slot is running this task. For example, if your concurrency is `2`, then workerNumber could be either `0` or `1`, depending on which slot is available.
* `workerNumber` – Which worker slot is running this task. For example, if your concurrency is `2`, then workerNumber could be either `0` or `1`, depending on which slot is available.
* `callback(err)` – Optional function to fire when task has ended
* Returns a `Promise`.

## Events

Expand Down Expand Up @@ -477,17 +482,19 @@ Note: instead of creating your own instance of this class, you can use `redisSer
### `subscriber.unsubscribe([channels], [callback])`
Unsubscribes from the given channels or all channels if empty.
* `channels` – Array of channels to unsubscribe from. If not present, all subscriber channels will be selected.
* `callback(err)` – Function to fire when unsubscribed
* `callback(err)` – Optional, function to fire when unsubscribed
* Returns a `Promise`.

For example:
* `subscriber.unsubscribe()` – Unsub from everything, no callback
* `subscriber.unsubscribe(channels)` – Unsub from the given channels, no callback
* `subscriber.unsubscribe(callback)` – Unsub from everything, and callback
* `subscriber.unsubscribe(channels, callback)` – Unsub from the given channels and callback

### `subscriber.quit(callback)`
### `subscriber.quit([callback])`
Unsubscribes from all channels and closes the underlying redis connection. Use this when all done with the instance.
* `callback(err)` – Function to fire when shutdown
* `callback(err)` – Optional, function to fire when shutdown
* Returns a `Promise`.

## Events

Expand Down

0 comments on commit f69b7a6

Please sign in to comment.