Skip to content

Commit 9affedc

Browse files
support unsubscribing to a process handler
1 parent 5811c76 commit 9affedc

File tree

5 files changed

+205
-29
lines changed

5 files changed

+205
-29
lines changed

example/queueClient.js

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,41 @@
11
"use strict";
2+
var Q = require('q');
3+
var Queue = require('../lib/queue');
24

3-
var Queue = require('../lib/queue');
4-
5-
process.on('uncaughtException', function (err)
6-
{
5+
process.on('uncaughtException', function (err) {
76
console.error('Uncaught Exception:' + err.stack);
87
});
98

109
var queue = new Queue("myChannel", {
1110
workers: 50,
1211
servers: ['nats://192.168.99.100:4222'],
13-
timeout: 1000
12+
timeout: 5000
1413
});
1514

1615
var req = {
1716
field: "myField"
1817
};
1918

2019

21-
for (var i = 0; i < 1; i++){
22-
queue.request(req)
23-
.then(function (res) { // jshint ignore:line
24-
console.log(`finished: ${JSON.stringify(res)}`);
25-
})
26-
.progress(function (info) { // jshint ignore:line
27-
console.log(`progress: ${info.toString()}`);
20+
var send = () => {
21+
return Q.delay(1000)
22+
.then(() => {
23+
queue.request(req)
24+
.then(function (res) { // jshint ignore:line
25+
console.log(`finished: ${JSON.stringify(res)}`);
26+
})
27+
.progress(function (info) { // jshint ignore:line
28+
console.log(`progress: ${info.toString()}`);
29+
})
30+
.catch(function (err) { // jshint ignore:line
31+
console.error(`error: ${JSON.stringify(err.toString())}`);
32+
});
2833
})
29-
.catch(function (err) { // jshint ignore:line
30-
console.error(`error: ${JSON.stringify(err.toString())}`);
34+
.then(() => {
35+
send();
3136
});
32-
}
37+
};
38+
39+
send();
40+
41+

example/queueServer.js

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,16 @@ var queue = new Queue("myChannel", {
99

1010

1111

12-
queue.process(function(request, callback){
13-
request.progress("progress");
12+
var unsubscribe = queue.process(function(request, callback){
13+
//request.progress("progress");
1414
setTimeout(() => {
15-
console.log(`got request: ${JSON.stringify(request)}`);
15+
//console.log(`got request: ${JSON.stringify(request)}`);
1616
callback(null, "ok");
1717
//callback(null, "ok");
1818
request.progress("progress");
19-
}, 4000);
20-
});
19+
}, 1000);
20+
});
21+
22+
setTimeout(() => {
23+
unsubscribe();
24+
}, 2000);

lib/queue.js

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -164,17 +164,23 @@ Queue.prototype.process = function (opts, handler) {
164164
}
165165

166166
var subscribed = false;
167+
var unsubscribed = false;
168+
var sid;
167169

168170
var subscribe = function () {
169171
if (subscribed) {
170172
console.log(`subscriber for job: ${self.name} is already waiting, returning`);
171173
return;
172174
}
173-
subscribed = true;
175+
if (unsubscribed) {
176+
console.log(`subscriber for job: ${self.name} was unsubscribed, returning`);
177+
return;
178+
}
174179

180+
subscribed = true;
175181
console.log(`subscribing for job: ${self.name}. total[${self.totalWorkers}], running[${self.runningWorkers}]`);
176182

177-
var sid = self.connection.subscribe(self.name, {queue: self.name}, function (toBeConvertedPayload, replyTo) {
183+
sid = self.connection.subscribe(self.name, {queue: self.name}, function (toBeConvertedPayload, replyTo) {
178184

179185
var payload;
180186
var d = domain.create();
@@ -183,7 +189,7 @@ Queue.prototype.process = function (opts, handler) {
183189
var finishProcess = function () {
184190
finished = true;
185191
self.runningWorkers--;
186-
console.log(`job: ${self.name} finished with error. total[${self.totalWorkers}], running[${self.runningWorkers}]`);
192+
console.log(`job: ${self.name} finished. total[${self.totalWorkers}], running[${self.runningWorkers}]`);
187193
subscribe();
188194
};
189195

@@ -269,6 +275,18 @@ Queue.prototype.process = function (opts, handler) {
269275
};
270276

271277
subscribe();
278+
279+
var unsubscribe = () => {
280+
if (unsubscribed) {
281+
return;
282+
}
283+
unsubscribed = true;
284+
if (subscribed) {
285+
self.connection.unsubscribe(sid);
286+
console.log(`unsubscribing job: ${self.name}. total[${self.totalWorkers}], running[${self.runningWorkers}]`);
287+
}
288+
};
289+
return unsubscribe;
272290
};
273291

274292
Queue.prototype.publish = function (data) {

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "cf-queue",
3-
"version": "0.0.35",
3+
"version": "0.0.36",
44
"description": "Queue wrapper for nats.io",
55
"main": "index.js",
66
"scripts": {

test/queue.unit.spec.js

Lines changed: 150 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,7 @@ describe('request/process tests', function () {
376376
done(err);
377377
});
378378
d.run(() => {
379-
expect(job.request).to.deep.equal({ field: 'value' });
379+
expect(job.request).to.deep.equal({field: 'value'});
380380
callback();
381381
expect(publishSpy).to.have.been.calledOnce; // jshint ignore:line
382382
expect(unsubscribeSpy).to.not.have.been.called; // jshint ignore:line
@@ -422,7 +422,7 @@ describe('request/process tests', function () {
422422
done(err);
423423
});
424424
d.run(() => {
425-
expect(job.request).to.deep.equal({ field: 'value' });
425+
expect(job.request).to.deep.equal({field: 'value'});
426426
callback();
427427
expect(publishSpy).to.have.been.calledOnce; // jshint ignore:line
428428
expect(unsubscribeSpy).to.not.have.been.called; // jshint ignore:line
@@ -470,7 +470,7 @@ describe('request/process tests', function () {
470470
done(err);
471471
});
472472
d.run(() => {
473-
expect(job.request).to.deep.equal({ field: 'value' });
473+
expect(job.request).to.deep.equal({field: 'value'});
474474
expect(unsubscribeSpy).to.have.been.calledOnce; // jshint ignore:line
475475
expect(subscribeSpy).to.have.been.calledOnce; // jshint ignore:line
476476
done();
@@ -514,7 +514,7 @@ describe('request/process tests', function () {
514514
done(err);
515515
});
516516
d.run(() => {
517-
expect(job.request).to.deep.equal({ field: 'value' });
517+
expect(job.request).to.deep.equal({field: 'value'});
518518
expect(unsubscribeSpy).to.have.been.calledOnce; // jshint ignore:line
519519
expect(subscribeSpy).to.have.been.calledOnce; // jshint ignore:line
520520
callback();
@@ -525,6 +525,151 @@ describe('request/process tests', function () {
525525
queue.process(handler);
526526
});
527527

528+
it('should call connection unsubscribe in case of unsubscribing', (done) => {
529+
var subscribed = false;
530+
var unsubscribeSpy = sinon.spy();
531+
var publishSpy = sinon.spy();
532+
var subscribeSpy = sinon.spy((name, options, callback) => {
533+
process.nextTick(() => {
534+
if (!subscribed) {
535+
subscribed = true;
536+
callback(Buffer(JSON.stringify({
537+
data: {
538+
field: "value"
539+
}
540+
}), 'utf8').toString('base64'));
541+
}
542+
});
543+
return 1;
544+
});
545+
var Queue = proxyquire('../lib/queue', {
546+
'nats': {
547+
connect: () => {
548+
return {
549+
unsubscribe: unsubscribeSpy,
550+
subscribe: subscribeSpy,
551+
publish: publishSpy
552+
};
553+
}
554+
}
555+
});
556+
var queue = new Queue("myChannel", {workers: 2});
557+
var handler = sinon.spy((job, callback) => {
558+
var d = domain.create();
559+
d.on('error', (err) => {
560+
done(err);
561+
});
562+
d.run(() => {
563+
expect(job.request).to.deep.equal({field: 'value'});
564+
callback();
565+
expect(publishSpy).to.have.been.calledOnce; // jshint ignore:line
566+
expect(unsubscribeSpy).to.not.have.been.called; // jshint ignore:line
567+
callback();
568+
unsubscribe();
569+
expect(unsubscribeSpy).to.have.been.calledOnce; // jshint ignore:line
570+
done();
571+
});
572+
});
573+
var unsubscribe = queue.process(handler); // jshint ignore:line
574+
});
575+
576+
it('should not succeed to subscribe again in case unsubscribe was called and after that a handler of a previous request finishes its execution and tries to re-subscribe', (done) => {
577+
var subscribed = false;
578+
var unsubscribeSpy = sinon.spy();
579+
var publishSpy = sinon.spy();
580+
var subscribeSpy = sinon.spy((name, options, callback) => {
581+
setTimeout(() => {
582+
if (!subscribed) {
583+
subscribed = true;
584+
callback(Buffer(JSON.stringify({
585+
data: {
586+
field: "value"
587+
}
588+
}), 'utf8').toString('base64'));
589+
}
590+
}, 500);
591+
return 1;
592+
});
593+
var Queue = proxyquire('../lib/queue', {
594+
'nats': {
595+
connect: () => {
596+
return {
597+
unsubscribe: unsubscribeSpy,
598+
subscribe: subscribeSpy,
599+
publish: publishSpy
600+
};
601+
}
602+
}
603+
});
604+
var queue = new Queue("myChannel", {workers: 2});
605+
var handler = sinon.spy((job, callback) => {
606+
var d = domain.create();
607+
d.on('error', (err) => {
608+
done(err);
609+
});
610+
d.run(() => {
611+
expect(job.request).to.deep.equal({field: 'value'});
612+
callback();
613+
expect(publishSpy).to.have.been.calledOnce; // jshint ignore:line
614+
expect(unsubscribeSpy).to.have.been.calledOnce; // jshint ignore:line
615+
expect(subscribeSpy).to.have.been.calledOnce; // jshint ignore:line
616+
callback();
617+
done();
618+
});
619+
});
620+
var unsubscribe = queue.process(handler);
621+
unsubscribe();
622+
});
623+
624+
it('should subscribe again in case worker reached max, completed and only after that unsubscribe was called', (done) => {
625+
var subscribed = false;
626+
var unsubscribeSpy = sinon.spy();
627+
var publishSpy = sinon.spy();
628+
var subscribeSpy = sinon.spy((name, options, callback) => {
629+
setTimeout(() => {
630+
if (!subscribed) {
631+
subscribed = true;
632+
callback(Buffer(JSON.stringify({
633+
data: {
634+
field: "value"
635+
}
636+
}), 'utf8').toString('base64'));
637+
}
638+
}, 500);
639+
return 1;
640+
});
641+
var Queue = proxyquire('../lib/queue', {
642+
'nats': {
643+
connect: () => {
644+
return {
645+
unsubscribe: unsubscribeSpy,
646+
subscribe: subscribeSpy,
647+
publish: publishSpy
648+
};
649+
}
650+
}
651+
});
652+
var queue = new Queue("myChannel", {workers: 1});
653+
var handler = sinon.spy((job, callback) => {
654+
var d = domain.create();
655+
d.on('error', (err) => {
656+
done(err);
657+
});
658+
d.run(() => {
659+
expect(job.request).to.deep.equal({field: 'value'});
660+
callback();
661+
expect(publishSpy).to.have.been.calledOnce; // jshint ignore:line
662+
expect(unsubscribeSpy).to.have.been.calledOnce; // jshint ignore:line
663+
expect(subscribeSpy).to.have.been.calledTwice; // jshint ignore:line
664+
unsubscribe();
665+
expect(unsubscribeSpy).to.have.been.calledTwice; // jshint ignore:line
666+
callback();
667+
done();
668+
});
669+
});
670+
var unsubscribe = queue.process(handler); // jshint ignore:line
671+
});
672+
528673
});
529674

530675
describe('negative tests', () => {
@@ -571,7 +716,7 @@ describe('request/process tests', function () {
571716
done(err);
572717
});
573718
d.run(() => {
574-
expect(job.request).to.deep.equal({ field: 'value' });
719+
expect(job.request).to.deep.equal({field: 'value'});
575720
expect(subscribeSpy).to.have.been.calledOnce; // jshint ignore:line
576721
callback(new Error("handling error"));
577722
});

0 commit comments

Comments
 (0)