Skip to content

Commit

Permalink
* package.json: Updated dependencies to latest (resolving some proce…
Browse files Browse the repository at this point in the history
…ss hold and other amqp issues)

 * package.json: Added some dev deps for diagnosing process holding issues
 * QueueWorker.js: Updated to better handle the graceful shutdown process and not leave danging processes behind
  • Loading branch information
kfitzgerald committed Aug 6, 2019
1 parent 07038fd commit 271601c
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 17 deletions.
25 changes: 14 additions & 11 deletions QueueWorker.js
Expand Up @@ -250,19 +250,22 @@ class QueueWorker extends OkanjoWorker {
/**
* Starts the internal shutdown process (hook point)
*/
prepareForShutdown(canAsync) { /* eslint-disable-line no-unused-vars */
async prepareForShutdown(canAsync) { /* eslint-disable-line no-unused-vars */

this.log(` !! Shutting down the ${this.subscriptionName} queue`);
(async () => {
// If there's a message in the works, wait for it to end
try {
await this.service.broker.shutdown();
} catch (err) /* istanbul ignore next: faking this is unreliable */ {
await this.app.report('QueueWorker: Failed to shutdown broker', err, {
subscription: this.subscriptionName
});
}
})();
this.unbindProcessSignals();

// If there's a message in the works, wait for it to end
try {
await this.service.broker.shutdown();
} catch (err) /* istanbul ignore next: faking this is unreliable */ {
await this.app.report('QueueWorker: Failed to shutdown broker', err, {
subscription: this.subscriptionName
});
}

/* istanbul ignore next: die now */
this.shutdown();
}
}

Expand Down
14 changes: 8 additions & 6 deletions package.json
@@ -1,6 +1,6 @@
{
"name": "okanjo-app-queue",
"version": "2.1.0",
"version": "3.0.0",
"description": "Service for interfacing with RabbitMQ",
"main": "QueueService.js",
"scripts": {
Expand All @@ -24,21 +24,23 @@
"author": "Okanjo Partners Inc",
"license": "MIT",
"devDependencies": {
"amqplib": "^0.5.5",
"eslint": "^5.15.3",
"mocha": "^6.0.2",
"mocha": "^6.2.0",
"nyc": "^13.3.0",
"okanjo-app": "^2.0.0",
"okanjo-app": "^2.1.0",
"okanjo-app-broker": "^2.2.0",
"should": "^13.2.3",
"amqplib": "^0.5.3"
"why-is-node-running": "^2.1.0"
},
"peerDependencies": {
"okanjo-app": ">=2",
"okanjo-app-broker": ">=2.2",
"amqplib": "^0.5.3"
},
"dependencies": {
"async": "^2.6.2",
"okanjo-app-broker": "^1.0.0",
"rascal": "^4.2.3"
"rascal": "^4.6.0"
},
"nyc": {
"reporter": [
Expand Down
8 changes: 8 additions & 0 deletions test/BatchQueueWorker.test.js
@@ -1,5 +1,6 @@
"use strict";

const log = require('why-is-node-running'); // should be your first require
const should = require('should');

describe('BatchQueueWorker', () => {
Expand Down Expand Up @@ -34,6 +35,13 @@ describe('BatchQueueWorker', () => {
await app.connectToServices();
});

after((done) => {
app.services.queue.broker.shutdown(() => {
log(); // logs out active handles that are keeping node running
done();
});
});

it('should be bound to app', function () {
should(app.services.queue).be.an.Object();
app.services.queue.should.be.instanceof(QueueService);
Expand Down
8 changes: 8 additions & 0 deletions test/QueueService.test.js
@@ -1,5 +1,6 @@
"use strict";

const log = require('why-is-node-running'); // should be your first require
const should = require('should');
const TestUtil = require('./TestUtil');

Expand Down Expand Up @@ -27,6 +28,13 @@ describe('QueueService', () => {
await app.connectToServices();
});

after((done) => {
app.services.queue.broker.shutdown(() => {
log(); // logs out active handles that are keeping node running
done();
});
});

it('should be bound to app', function () {
app.services.queue.should.be.an.Object();
app.services.queue.should.be.instanceof(QueueService);
Expand Down
8 changes: 8 additions & 0 deletions test/QueueWorker.test.js
@@ -1,5 +1,6 @@
"use strict";

const log = require('why-is-node-running'); // should be your first require
const should = require('should');

// process.env.DEBUG='rascal:Subscription,rascal:SubscriberError,rascal:SubscriberSession';
Expand Down Expand Up @@ -35,6 +36,13 @@ describe('QueueWorker', () => {
await app.connectToServices();
});

after((done) => {
// app.services.queue.broker.shutdown(() => {
log(); // logs out active handles that are keeping node running
done();
// });
});

it('should be bound to app', function () {
should(app.services.queue).be.an.Object();
app.services.queue.should.be.instanceof(QueueService);
Expand Down

0 comments on commit 271601c

Please sign in to comment.