diff --git a/package.json b/package.json index 26ff182..6457b22 100644 --- a/package.json +++ b/package.json @@ -48,11 +48,11 @@ "body-parser": "^1.15.2", "chalk": "^1.1.1", "docker-names": "^1.0.0", - "elasticsearch": "^11.0.1", + "elasticsearch": "^12.0.1", "isprod": "^1.1.0", "json3": "^3.3.2", "jsonpath": "^0.2.6", - "lodash": "4.16.0", + "lodash": "4.16.4", "lokijs": "^1.4.1", "node-uuid": "^1.4.7", "rabbit.js": "0.4.4", @@ -60,11 +60,11 @@ "winston": "^2.2.0" }, "devDependencies": { - "babel-cli": "^6.5.1", - "babel-core": "^6.5.2", - "babel-eslint": "^6.0.0", + "babel-cli": "^6.18.0", + "babel-core": "^6.18.0", + "babel-eslint": "^7.0.0", "babel-plugin-transform-function-bind": "^6.5.2", - "babel-preset-es2015": "^6.13.2", + "babel-preset-es2015": "^6.18.0", "chai": "^3.5.0", "coveralls": "^2.11.13", "dirty-chai": "^1.2.2", diff --git a/setup-test.sh b/setup-test.sh new file mode 100755 index 0000000..7fad824 --- /dev/null +++ b/setup-test.sh @@ -0,0 +1,3 @@ +#!/usr/bin/env bash +docker run -d -p 9200:9200 -p 9300:9300 --name es4test elasticsearch:2 +docker run -d -p 5671:5671 -p 5672:5672 -p 25672:25672 --name mq4test rabbitmq:3 diff --git a/src/module.js b/src/module.js index b1f4f7c..6a666aa 100644 --- a/src/module.js +++ b/src/module.js @@ -29,6 +29,7 @@ class Module extends EventEmitter { * a Peristence module is the module that only persists data, you can only have a single persistence module (same instances can be launched by service to process heavy load) * @param {String} [options.positivePath] the json path {@see https://github.com/dchester/jsonpath} that when matched messages will be sent to this module * @param {String} [options.negativePath] the json path {@see https://github.com/dchester/jsonpath} that when NOT matched messages will be sent to this module + * @param {boolean} [options.resend] when false the orchestrator will never send the message to the module more than once * */ @@ -41,7 +42,8 @@ class Module extends EventEmitter { registerQueue: 'o_register', amqpURL: 'amqp://localhost:5672', type: 'processor', - prefetch: 1 + prefetch: 1, + resend: true }; if (typeof options === 'string') { @@ -66,6 +68,7 @@ class Module extends EventEmitter { this.workerQueueName = null; this.prefetch = options.prefetch; this.type = options.type; + this.resend = options.resend; // --------------------------------------------- @@ -94,7 +97,8 @@ class Module extends EventEmitter { workerQueueName: this.workerQueueName, messagesQueue: this.messagesQueue, prefetch: this.prefetch, - type: this.type + type: this.type, + resend: this.resend }); } diff --git a/src/orchestrator.js b/src/orchestrator.js index 13c332b..6188556 100644 --- a/src/orchestrator.js +++ b/src/orchestrator.js @@ -387,7 +387,7 @@ class Orchestrator { return Promise.resolve().then(() => { module = Orchestrator.checkModule(module); - logger.info('Registering new module', module); + logger.info('Registering new module', module.toJSON()); if (this.isRegistered(module)) { logger.info(`Module ${module.name} already registered for uuid ${module.uuid}`); @@ -413,15 +413,20 @@ class Orchestrator { * Find modules that matches for the give message * ordered by their registration _order * @param {Object} message + * @param {Object} meta the message metadata * @return {Promise} */ - findMatchingModules(message) { + findMatchingModules(message, meta) { return new Promise((resolve) => { logger.debug('Finding modules that matches', message); const modules = this.modulesCollection.where((module) => { if (module.type === 'persistence') { return false; } + if (meta && meta.service === module.service && !module.resend) { + return false; + } + let matchesPositive = true; let matchesNegative = true; diff --git a/test/module.spec.js b/test/module.spec.js index d866345..83b466a 100644 --- a/test/module.spec.js +++ b/test/module.spec.js @@ -31,7 +31,7 @@ describe('Module', function () { // if a new property is added that should be serialized is not added to the toJSON method // it will not be serialized and the functionality might fail. // After verification this constant should be increased by the number of new properties - const expectedKeysSize = 19; + const expectedKeysSize = 20; chai.expect(Object.keys(m).length).to.be.equals(expectedKeysSize); diff --git a/test/orchestrator.spec.js b/test/orchestrator.spec.js index 2772510..7e38491 100644 --- a/test/orchestrator.spec.js +++ b/test/orchestrator.spec.js @@ -198,6 +198,43 @@ describe('Orchestrator', function () { }); }); + + it('Should resend to the same service when module.resend is omitted', function () { + // const o = new Orchestrator(); + + return o.listen().then(() => { + const originalModule = new Module({ service: serviceName }); + + originalModule.negativePath = '$.negativeKeyName'; + + return o.register(originalModule).then(() => { + return o.findMatchingModules({ key: 'value' }, { service: serviceName }); + }).then((modules) => { + chai.expect(modules).to.be.a('array'); + chai.expect(modules).not.to.be.empty(); + chai.expect(modules[0]).to.have.property('uuid').that.is.equals(originalModule.uuid); + }); + }); + }); + + + it('Should not resend to the same service when module.resend is false', function () { + // const o = new Orchestrator(); + + return o.listen().then(() => { + const originalModule = new Module({ service: serviceName, resend: false }); + + originalModule.negativePath = '$.negativeKeyName'; + + return o.register(originalModule).then(() => { + return o.findMatchingModules({ key: 'value' }, { service: serviceName }); + }).then((modules) => { + chai.expect(modules).to.be.a('array'); + chai.expect(modules).to.be.empty(); + }); + }); + }); + it('Should listen with default options', function () { // const o = new Orchestrator();