Skip to content

Commit 935ba81

Browse files
committed
feat: expose lock & add timeout on push job so that queue can't be stuck
BREAKING CHANGE: earlier it was not necessary to ensure that we operate on the taken lock, so it was possible for the lock to expire during long jobs and for another process to take control of the job, creating multiple parallel workers. To avoid that, the `onJobComplete` function exposes `lock` as a property, which allows us to preserve backward compatibility. However, it must be noted that the user must take care of extending the lock during lengthy tasks, and that jobs will fail with a timeout error if the result is not resolved within the default lock timeout multiplied by 2, or within the custom timeout provided to the `push` function.
1 parent ef94761 commit 935ba81

File tree

4 files changed

+312
-253
lines changed

4 files changed

+312
-253
lines changed

__tests__/integration.js

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ describe('integration tests', () => {
1818
client: this.redis,
1919
pubsub: this.pubsub,
2020
pubsubChannel: 'dlock',
21+
lock: {
22+
timeout: 2000,
23+
},
2124
});
2225
return null;
2326
})
@@ -93,6 +96,31 @@ describe('integration tests', () => {
9396
});
9497
});
9598

99+
it('#push: fails after timeout', () => {
100+
const job = sinon.spy();
101+
const onComplete = sinon.spy();
102+
const failedToQueue = sinon.spy();
103+
const unexpectedError = sinon.spy();
104+
105+
return Promise.map(this.queueManagers, (queueManager, idx) => {
106+
const id = String(idx % 3);
107+
return queueManager.dlock
108+
.push(id, (...args) => onComplete(...args)) /* to ensure functions are unique */
109+
.then(job)
110+
.catch(isLockAcquisitionError, failedToQueue)
111+
.catch(unexpectedError);
112+
})
113+
.delay(4500) /* must be called after timeout * 2 */
114+
.then(() => {
115+
assert.equal(job.callCount, 3);
116+
assert.equal(onComplete.callCount, 10);
117+
assert.equal(onComplete.withArgs(sinon.match({ message: 'queue-no-response' })).callCount, 10);
118+
assert.equal(failedToQueue.callCount, 7, 'unexpected error was raised');
119+
assert.equal(unexpectedError.called, false, 'fatal error was raised');
120+
return null;
121+
});
122+
});
123+
96124
it('#push: when job fails onComplete is called with an error', () => {
97125
const args = new Error('fail');
98126
const job = sinon.spy(next => next(args));
@@ -233,9 +261,6 @@ describe('integration tests', () => {
233261
.map(Array(50), (_, i) => {
234262
const semaphore = this.semaphores[i % this.semaphores.length];
235263
return Promise.using(semaphore.take(), async () => {
236-
process.stderr.write(this.counter);
237-
process.stderr.write('\n');
238-
239264
this.counter += 1;
240265
// if it's possible for other contestants
241266
// to run out of semaphore lock - this.counter will

package.json

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,21 +47,21 @@
4747
"@makeomatic/deploy": "^5.0.2",
4848
"@makeomatic/last-release-npm": "^1.0.1",
4949
"babel-cli": "^6.26.0",
50-
"babel-eslint": "^8.1.2",
50+
"babel-eslint": "^8.2.1",
5151
"babel-plugin-istanbul": "^4.1.5",
5252
"babel-plugin-transform-strict-mode": "^6.24.1",
5353
"babel-register": "^6.26.0",
5454
"codecov": "^3.0.0",
5555
"cross-env": "^5.1.3",
56-
"eslint": "^4.14.0",
56+
"eslint": "^4.15.0",
5757
"eslint-config-makeomatic": "^2.0.1",
5858
"eslint-plugin-import": "^2.8.0",
5959
"eslint-plugin-promise": "^3.6.0",
6060
"ioredis": "^3.2.2",
61-
"jest-cli": "^22.0.4",
61+
"jest-cli": "^22.1.1",
6262
"mocha": "^4.1.0",
6363
"nyc": "^11.4.1",
64-
"sinon": "^4.1.3"
64+
"sinon": "^4.1.6"
6565
},
6666
"engine": {
6767
"node": ">= 8.9.0"

src/distributed-callback-queue.js

Lines changed: 39 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ const notLockAcquisitionError = e => e.name !== 'LockAcquisitionError';
2222
const isBoolean = filter(Boolean);
2323
const toFlattenedTruthyArray = compose(isBoolean, flatten);
2424
const couldNotAcquireLockError = new LockAcquisitionError('job is already running');
25+
const TimeoutError = new Promise.TimeoutError('queue-no-response');
2526

2627
/**
2728
* @class DistributedCallbackQueue
@@ -32,7 +33,7 @@ const couldNotAcquireLockError = new LockAcquisitionError('job is already runnin
3233
* @param {redisClient} pubsub: redis connection that will be used for notifications
3334
* @param {String} pubsubChannel - will be used to pass notifications
3435
* @param {Object} lock - configuration for redislock:
35-
* @param {Number} timeout - defaults to 1000
36+
* @param {Number} timeout - defaults to 10000
3637
* @param {Number} retries - defaults to 0
3738
* @param {Number} delay - defaults to 100
3839
* @param {Object|Boolean} log: sets up logger. If set to false suppresses all warnings
@@ -55,7 +56,7 @@ class DistributedCallbackQueue {
5556

5657
const lockOptions = defaults(options.lock || {}, {
5758
timeout: 10000,
58-
retries: 1,
59+
retries: 2,
5960
delay: 100,
6061
});
6162

@@ -124,10 +125,11 @@ class DistributedCallbackQueue {
124125
* Adds callback to distributed queue
125126
* @param {String} suffix - queue identifier
126127
* @param {Function} next - callback to be called when request is finished
128+
* @param {number} [timeout=this.lockOptions.timeout * 2] - fail after <timeout>, set to 0 to disable
127129
* @returns {Promise} if promise is resolved then we must act, if it's rejected - then
128130
* somebody else is working on the same task right now
129131
*/
130-
push(suffix, next) {
132+
push(suffix, next, timeout = this.lockOptions.timeout * 2) {
131133
assert(suffix, 'must be a truthy string');
132134

133135
// first queue locally to make use of pending requests
@@ -140,6 +142,13 @@ class DistributedCallbackQueue {
140142
return Promise.reject(couldNotAcquireLockError);
141143
}
142144

145+
if (timeout) {
146+
/* we are first in the local queue */
147+
const onTimeout = setTimeout(callbackQueue._call, timeout, lockRedisKey, [TimeoutError], this.logger);
148+
/* if we have no response from dlock -> without timeout, clean local queue */
149+
callbackQueue.add(lockRedisKey, () => clearTimeout(onTimeout));
150+
}
151+
143152
// create lock
144153
const lock = this.getLock();
145154

@@ -254,33 +263,45 @@ class DistributedCallbackQueue {
254263
* all queued callbacks
255264
*/
256265
createWorker(lockRedisKey, lock) {
257-
const dlock = this;
258266
/**
259267
* This function must be called when job has been completed
260268
* @param {Error} err
261269
* @param {Array} ...args
262270
*/
263-
return (err, ...args) => {
271+
const broadcastJobStatus = async (err, ...args) => {
272+
/* clean ref */
273+
broadcastJobStatus.lock = null;
274+
264275
// must release lock now. Technically there could be an event
265276
// where lock had not been released, notification already emitted
266277
// and callback is stuck in the queue, to avoid that we can add retry
267278
// to lock acquisition. Decision and constraints are up to you. Ideally
268279
// you would want to cache result of the function for some time - and then
269280
// this race is completed. Multi() command is not possible to use here
270-
return lock
271-
.release()
272-
.then(() => {
273-
// emit event
274-
// at this point we are sure that this job still belongs to us,
275-
// if it doesn't - we can't publish response, because this task may be acquired
276-
// by someone else
277-
return dlock.publish(lockRedisKey, err, ...args);
278-
})
279-
.catch((error) => {
280-
// because a job may take too much time, other listeners must implement timeout/retry strategy
281-
dlock.logger.warn('failed to release lock and publish results', error);
282-
});
281+
try {
282+
// ensure lock still belongs to us
283+
await lock.extend();
284+
} catch (error) {
285+
// because a job may take too much time, other listeners must implement timeout/retry strategy
286+
this.logger.warn('failed to release lock and publish results', error);
287+
return null;
288+
}
289+
290+
// emit event
291+
// at this point we are sure that this job still belongs to us,
292+
// if it doesn't - we can't publish response, because this task may be acquired
293+
// by someone else
294+
return this
295+
.publish(lockRedisKey, err, ...args)
296+
/* ensure we release the lock once publish is completed */
297+
/* during race conditions we rely on _retry_ setting to re-acquire lock */
298+
.finally(() => lock.release().reflect());
283299
};
300+
301+
// set associated lock -> lengthy jobs must extend this
302+
broadcastJobStatus.lock = lock;
303+
304+
return broadcastJobStatus;
284305
}
285306
}
286307

0 commit comments

Comments
 (0)