Skip to content

Commit fba17e6

Browse files
committed
feat: provide new .fanout method as a helper around .push
BREAKING CHANGE: upgrades deps, uses async/await
1 parent 467f1d9 commit fba17e6

File tree

12 files changed

+3113
-2334
lines changed

12 files changed

+3113
-2334
lines changed

.eslintrc

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
11
{
2-
"extends": "makeomatic",
3-
"parser": "babel-eslint"
2+
"extends": "makeomatic"
43
}

.mdeprc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
2-
"node": "10.15.1",
2+
"node": "10.16.3",
33
"nycCoverage": false,
4-
"test_framework": "jest --coverage --coverageDirectory <coverageDirectory> --forceExit",
4+
"test_framework": "jest --coverage --coverageDirectory <coverageDirectory>",
55
"tests": "__tests__/*.js",
66
"docker_compose": "__tests__/docker-compose.yml"
77
}

README.md

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,38 @@ function onJobCompleted(err, ...args) {
9494
}
9595
```
9696

97+
### Async/Await in-flight request caching
98+
99+
#### Fanout(jobId: String, [timeout: Number], job: Function)
100+
101+
Use `fanout(...)` method for the easiest way to handle job subscriptions where
102+
one actor must perform long-running job, but as soon as it's done - everyone who
103+
queued for the results of this job must be notified.
104+
105+
Sample of code is provided to make use of this feature:
106+
107+
```js
108+
const jobId = 'xxx';
109+
const job = async () => {
110+
await Promise.delay(10000);
111+
return 'done';
112+
}
113+
114+
let result;
115+
try {
116+
result = await callbackQueue.fanout(jobId, 2000, job);
117+
} catch (e) {
118+
// handle timeout error - because it will fail
119+
}
120+
121+
try {
122+
result = await callbackQueue.fanout(jobId, 20000, job);
123+
// result will be equal 'done'
124+
} catch (e) {
125+
// no error is expected, but make sure you handle unexpected cases
126+
}
127+
```
128+
97129
### Distributed Resource Locking
98130

99131
Allows to acquire lock across multiple processes with redis based lock

__tests__/docker-compose.yml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@ version: '2'
22

33
services:
44
redis:
5-
image: redis:4.0.11-alpine
5+
image: redis:5-alpine
66
container_name: redis
77
hostname: redis
88
expose:
99
- 6379
1010

1111
tester:
12-
image: makeomatic/node:10.15.0-tester
12+
image: makeomatic/node:10.16.3-tester
1313
hostname: tester
1414
links:
1515
- redis
@@ -18,4 +18,5 @@ services:
1818
- ${PWD}:/src
1919
environment:
2020
NODE_ENV: "test"
21+
DEBUG: "${DEBUG}"
2122
command: tail -f /dev/null

__tests__/integration.js

Lines changed: 180 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
const Promise = require('bluebird');
22
const Redis = require('ioredis');
33
const assert = require('assert');
4-
const DLock = require('..');
54
const sinon = require('sinon');
5+
const DLock = require('..');
66

77
describe('integration tests', () => {
88
jest.setTimeout(10000);
@@ -42,17 +42,23 @@ describe('integration tests', () => {
4242

4343
it('#push: job is performed only once', () => {
4444
const args = [null, 'completed'];
45-
const job = sinon.spy(next => setTimeout(next, 500, ...args));
45+
const job = sinon.spy((next) => setTimeout(next, 500, ...args));
4646
const onComplete = sinon.spy();
4747
const failedToQueue = sinon.spy();
4848
const unexpectedError = sinon.spy();
4949

50-
return Promise.map(this.queueManagers, (queueManager) => {
51-
return queueManager.dlock
52-
.push('1', (...data) => onComplete(...data))
53-
.then(job)
54-
.catch(isLockAcquisitionError, failedToQueue)
55-
.catch(unexpectedError);
50+
return Promise.map(this.queueManagers, async (queueManager) => {
51+
try {
52+
await queueManager.dlock
53+
.push('1', (...data) => onComplete(...data))
54+
.then(job);
55+
} catch (e) {
56+
if (isLockAcquisitionError(e)) {
57+
failedToQueue(e);
58+
} else {
59+
unexpectedError(e);
60+
}
61+
}
5662
})
5763
.delay(600)
5864
.then(() => {
@@ -67,22 +73,29 @@ describe('integration tests', () => {
6773

6874
it('#push: multiple jobs are completed only once', () => {
6975
const args = [null, 'completed'];
70-
const job = sinon.spy(next => next(...args));
76+
const job = sinon.spy((next) => next(...args));
7177
const onComplete = sinon.spy();
7278
const failedToQueue = sinon.spy();
7379
const unexpectedError = sinon.spy();
7480

75-
return Promise.map(this.queueManagers, (queueManager, idx) => {
81+
return Promise.map(this.queueManagers, async (queueManager, idx) => {
7682
// 0 1 2
7783
// 0 1 2
7884
// 0 1 2
7985
// 0
8086
const id = String(idx % 3);
81-
return queueManager.dlock
82-
.push(id, (...data) => onComplete(id, ...data))
83-
.then(job)
84-
.catch(isLockAcquisitionError, failedToQueue)
85-
.catch(unexpectedError);
87+
88+
try {
89+
await queueManager.dlock
90+
.push(id, (...data) => onComplete(id, ...data))
91+
.then(job);
92+
} catch (e) {
93+
if (isLockAcquisitionError(e)) {
94+
failedToQueue(e);
95+
} else {
96+
unexpectedError(e);
97+
}
98+
}
8699
})
87100
.delay(100)
88101
.then(() => {
@@ -102,13 +115,19 @@ describe('integration tests', () => {
102115
const failedToQueue = sinon.spy();
103116
const unexpectedError = sinon.spy();
104117

105-
return Promise.map(this.queueManagers, (queueManager, idx) => {
118+
return Promise.map(this.queueManagers, async (queueManager, idx) => {
106119
const id = String(idx % 3);
107-
return queueManager.dlock
108-
.push(id, (...args) => onComplete(...args)) /* to ensure functions are unique */
109-
.then(job)
110-
.catch(isLockAcquisitionError, failedToQueue)
111-
.catch(unexpectedError);
120+
try {
121+
await queueManager.dlock
122+
.push(id, (...args) => onComplete(...args)) /* to ensure functions are unique */
123+
.then(job);
124+
} catch (e) {
125+
if (isLockAcquisitionError(e)) {
126+
failedToQueue(e);
127+
} else {
128+
unexpectedError(e);
129+
}
130+
}
112131
})
113132
.delay(4500) /* must be called after timeout * 2 */
114133
.then(() => {
@@ -123,17 +142,23 @@ describe('integration tests', () => {
123142

124143
it('#push: when job fails onComplete is called with an error', () => {
125144
const args = new Error('fail');
126-
const job = sinon.spy(next => next(args));
145+
const job = sinon.spy((next) => next(args));
127146
const onComplete = sinon.spy();
128147
const failedToQueue = sinon.spy();
129148
const unexpectedError = sinon.spy();
130149

131-
return Promise.map(this.queueManagers, (queueManager) => {
132-
return queueManager.dlock
133-
.push('error', (...data) => onComplete(...data))
134-
.then(job)
135-
.catch(isLockAcquisitionError, failedToQueue)
136-
.catch(unexpectedError);
150+
return Promise.map(this.queueManagers, async (queueManager) => {
151+
try {
152+
await queueManager.dlock
153+
.push('error', (...data) => onComplete(...data))
154+
.then(job);
155+
} catch (e) {
156+
if (isLockAcquisitionError(e)) {
157+
failedToQueue(e);
158+
} else {
159+
unexpectedError(e);
160+
}
161+
}
137162
})
138163
.delay(100)
139164
.then(() => {
@@ -152,6 +177,128 @@ describe('integration tests', () => {
152177
});
153178
});
154179

180+
it('#fanout: job is performed only once', () => {
181+
const args = ['completed'];
182+
const job = sinon.spy(async () => {
183+
await Promise.delay(500);
184+
return [...args];
185+
});
186+
const onComplete = sinon.spy();
187+
const unexpectedError = sinon.spy();
188+
189+
return Promise.map(this.queueManagers, async (queueManager) => {
190+
try {
191+
onComplete(await queueManager.dlock.fanout('1', job));
192+
} catch (e) {
193+
unexpectedError(e);
194+
}
195+
})
196+
.delay(600)
197+
.then(() => {
198+
assert(job.calledOnce, 'job was called more than once');
199+
assert(onComplete.alwaysCalledWithExactly(args), 'onComplete was called with incorrect args');
200+
assert.equal(onComplete.callCount, 10, 'onComplete was called wrong amount of times');
201+
assert.equal(unexpectedError.called, false, 'fatal error was raised');
202+
return null;
203+
});
204+
});
205+
206+
it('#fanout: multiple jobs are completed only once', () => {
207+
const args = ['completed'];
208+
const job = sinon.spy(() => args);
209+
const onComplete = sinon.spy();
210+
const unexpectedError = sinon.spy();
211+
212+
return Promise.map(this.queueManagers, async (queueManager, idx) => {
213+
// 0 1 2
214+
// 0 1 2
215+
// 0 1 2
216+
// 0
217+
const id = String(idx % 3);
218+
219+
try {
220+
onComplete(id, await queueManager.dlock.fanout(id, job));
221+
} catch (e) {
222+
unexpectedError(e);
223+
}
224+
})
225+
.delay(100)
226+
.then(() => {
227+
assert.equal(job.callCount, 3);
228+
assert.equal(onComplete.withArgs('0', args).callCount, 4);
229+
assert.equal(onComplete.withArgs('1', args).callCount, 3);
230+
assert.equal(onComplete.withArgs('2', args).callCount, 3);
231+
assert.equal(unexpectedError.called, false, 'fatal error was raised');
232+
return null;
233+
});
234+
});
235+
236+
it('#fanout: fails after timeout', async () => {
237+
const job = sinon.spy(async () => {
238+
await Promise.delay(3000);
239+
});
240+
const onComplete = sinon.spy();
241+
const timeoutError = sinon.spy();
242+
const unexpectedError = sinon.spy();
243+
244+
await Promise.map(this.queueManagers, async (queueManager, idx) => {
245+
const id = String(idx % 3);
246+
247+
try {
248+
const result = await queueManager.dlock.fanout(id, 1500, job);
249+
onComplete(result);
250+
} catch (e) {
251+
if (e.message === 'queue-no-response') {
252+
timeoutError(e);
253+
} else {
254+
unexpectedError(e);
255+
}
256+
}
257+
});
258+
259+
assert.equal(job.callCount, 3);
260+
assert.equal(onComplete.callCount, 0);
261+
assert.equal(timeoutError.callCount, 10);
262+
assert.equal(timeoutError.withArgs(sinon.match({ message: 'queue-no-response' })).callCount, 10);
263+
assert.equal(unexpectedError.called, false, 'fatal error was raised');
264+
});
265+
266+
it('#fanout: when job fails onComplete is called with an error', () => {
267+
const args = new Error('fail');
268+
const job = sinon.spy(async () => {
269+
throw args;
270+
});
271+
const onComplete = sinon.spy();
272+
const unexpectedError = sinon.spy();
273+
274+
return Promise.map(this.queueManagers, async (queueManager) => {
275+
try {
276+
const results = await queueManager.dlock.fanout('error', job);
277+
onComplete(null, results);
278+
} catch (e) {
279+
if (e.name === args.name && e.message === args.message) {
280+
onComplete(e);
281+
} else {
282+
unexpectedError(e);
283+
}
284+
}
285+
})
286+
.delay(100)
287+
.then(() => {
288+
assert(job.calledOnce, 'job was called more than once');
289+
assert.equal(onComplete.callCount, 10, 'onComplete was called wrong amount of times');
290+
onComplete.args.forEach((it) => {
291+
const [err] = it;
292+
const { name, message, stack } = err;
293+
assert.equal(args.name, name);
294+
assert.equal(args.message, message);
295+
assert.ok(stack);
296+
});
297+
assert.equal(unexpectedError.called, false, 'fatal error was raised');
298+
return null;
299+
});
300+
});
301+
155302
it('#once - performs task once and rejects others', () => {
156303
const job = sinon.spy();
157304
const failedToQueue = sinon.spy();
@@ -183,15 +330,15 @@ describe('integration tests', () => {
183330
const job = sinon.spy();
184331
const failedToQueue = sinon.spy();
185332
const unexpectedError = sinon.spy();
186-
const queueManager = this.queueManagers[0];
333+
const [queueManager] = this.queueManagers;
187334

188335
return queueManager
189336
.dlock
190337
.multi('1', '2')
191338
.tap(job)
192-
.tap(lock => lock.extend(10000))
339+
.tap((lock) => lock.extend(10000))
193340
.tap(job)
194-
.tap(lock => lock.release())
341+
.tap((lock) => lock.release())
195342
.tap(job)
196343
.catch(DLock.MultiLockError, failedToQueue)
197344
.catch(unexpectedError)
@@ -231,7 +378,7 @@ describe('integration tests', () => {
231378
.map(this.queueManagers, (queueManager) => {
232379
return queueManager.dlock
233380
.multi('1', '2', '3')
234-
.then(lock => (
381+
.then((lock) => (
235382
Promise
236383
.delay(1000)
237384
.then(() => lock.release())
@@ -251,7 +398,7 @@ describe('integration tests', () => {
251398
describe('#semaphore', () => {
252399
beforeEach(() => {
253400
this.counter = 0;
254-
this.semaphores = this.queueManagers.map(manager => (
401+
this.semaphores = this.queueManagers.map((manager) => (
255402
manager.dlock.semaphore('test-semaphore')
256403
));
257404
});

0 commit comments

Comments
 (0)