Skip to content

Commit

Permalink
fxing errors on message routing to modules.
Browse files Browse the repository at this point in the history
creating e2e tests between module and orchestrator
fixing errors on message and module serialization when posting to queues
  • Loading branch information
Panthro committed Aug 16, 2016
1 parent a4175e7 commit b52435b
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 45 deletions.
39 changes: 32 additions & 7 deletions src/module.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class Module extends EventEmitter {
};

options = _.defaults(options, defaults);
logger.debug('Initializing module with options:', JSON.stringify(options));
logger.debug('Initializing module with options:', options);

// Simple properties that should be added to JSON
// ---------------------------------------------
Expand All @@ -48,13 +48,16 @@ class Module extends EventEmitter {
this.order = options.order;
this.registerQueue = options.registerQueue;
this.amqpURL = options.amqpURL;
this.messagesQueue = null;

// ---------------------------------------------

// Complex properties (Objects, classes, etc)
// ---------------------------------------------
this.workerQueueName = null;
this.amqpContext = null;
this.workerSocket = null;
this.messageQueueSocket = null;
// ---------------------------------------------
}

Expand All @@ -72,7 +75,8 @@ class Module extends EventEmitter {
order: this.order,
registerQueue: this.registerQueue,
amqpURL: this.amqpURL,
workerQueueName: this.workerQueueName
workerQueueName: this.workerQueueName,
messagesQueue: this.messagesQueue
});
}

Expand Down Expand Up @@ -120,14 +124,16 @@ class Module extends EventEmitter {
* for new jobs to process
*/
listen() {
logger.debug('Listening for new messages on queue: ', this.workerQueueName);
this.workerSocket = this.amqpContext.socket('WORKER');
this.workerSocket.connect(this.workerQueueName);
const _this = this;

this.workerSocket.on('data', (message) => {
logger.debug('New message received', message);

logger.debug(`New message received ${message.toString()}`);
try {
message = JSON.parse(message.toString());
message = JSON.parse(message);
} catch (err) {
logger.debug('Could not convert message to JSON, sending raw value');
message = message.toString();
Expand All @@ -140,12 +146,31 @@ class Module extends EventEmitter {
* Send an acknowledge message back to the orchestrator, this ack the last received job
* and should be called for every message handled inside #handleMessage
* @see handleMessage
* @param {*} message
*/
sendAck() {
logger.debug('Sending ACK for last received message');
this.workerSocket.ack();
afterProcess(message) {
this._connectToMessageQueue().then(() => {
this.messageQueueSocket.write(JSON.stringify(message));
logger.debug('Sending ACK for last received message');
this.workerSocket.ack();
});
}

/**
* Connects to the message queue that the Orchestrator has given back in the field messageQueue
* @private
* @return {Promise}
*/
_connectToMessageQueue() {
if (this.messageQueueSocket) {
return Promise.resolve();
}
return new Promise((resolve) => {
this.messageQueueSocket = this.amqpContext.socket('PUSH');
this.messageQueueSocket.connect(this.messagesQueue, resolve);
});

}
}


Expand Down
59 changes: 29 additions & 30 deletions src/orchestrator.js
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ class Orchestrator {
const _this = this;

workerSocket.on('data', (message) => {
_this._onMessage(message)
_this._onMessage(JSON.parse(message.toString()))
.then(() => {
workerSocket.ack();
});
Expand All @@ -200,7 +200,7 @@ class Orchestrator {
replySocket.on('data', (message) => {
logger.debug('Received new module registration');
return this.onNewModule(message).then((m) => {
logger.debug('Module registered, writing back response', m);
logger.debug('Module registered, writing back response', m.toJSON());
replySocket.write(m.toJSON());
logger.debug('Response written');
return m;
Expand Down Expand Up @@ -257,30 +257,26 @@ class Orchestrator {
* @return {Promise} that resolves if handling of the message is OK
*/
_onMessage(originalMessage) {
return this._storeMessage(originalMessage).then((storedMessage) => {
try {
storedMessage = JSON.parse(storedMessage.toString());
this.findMatchingModules(storedMessage).then((modules) => {
if (_.isEmpty(modules)) {
logger.info('Finished pipeline for message, storing and not redirecting to any module');
this._storeMessage(storedMessage);
} else {
const module = modules[0];

logger.info('Redirecting message to module', module.service, module.name);
logger.info('Sending message to queue', module.workerQueueName);
const pushSocket = this.amqpContext.socket('PUSH');

pushSocket.connect(module.workerQueueName);
pushSocket.write(storedMessage);
}
});
} catch (err) {
logger.err('Could not convert message to JSON', storedMessage.toString());
logger.err('Message will be discarded');
}
});

return this._storeMessage(originalMessage)
.then((storedMessage) => {
return this.findMatchingModules(storedMessage)
.then((modules) => {
if (_.isEmpty(modules)) {
logger.info('Finished pipeline for message, storing and not redirecting to any module');
this._storeMessage(storedMessage);
} else {
const module = modules[0];

logger.info('Redirecting message to module', module.service, module.name);
logger.debug('Sending message to queue', storedMessage, module.workerQueueName);
const pushSocket = this.amqpContext.socket('PUSH');

pushSocket.connect(module.workerQueueName, () => {
pushSocket.write(JSON.stringify(storedMessage));
});
}
});
});
}

/**
Expand Down Expand Up @@ -366,6 +362,7 @@ class Orchestrator {
});
}
module.order = ++_this._order;
module.messagesQueue = _this.messagesQueue;
module.workerQueueName = _this.generateModuleQueueName(module);
return this.modulesCollection.insert(module);
});
Expand All @@ -382,16 +379,18 @@ class Orchestrator {
return new Promise((resolve) => {
logger.debug('Finding modules that matches', message);
const modules = this.modulesCollection.where((module) => {
let matches = false;
let matchesPositive = true;
let matchesNegative = true;


if (module.positivePath) {
matches = Orchestrator.matchesPath(message, module.positivePath);
matchesPositive = Orchestrator.matchesPath(message, module.positivePath);
}

if (module.negativePath) {
matches = !Orchestrator.matchesPath(message, module.negativePath);
matchesNegative = !Orchestrator.matchesPath(message, module.negativePath);
}
return matches;
return matchesPositive && matchesNegative;
});

logger.debug('Found modules:', modules.length);
Expand Down
95 changes: 95 additions & 0 deletions test/module.e2e.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/**
* Common test Dependencies
*/

import * as chai from 'chai';
import dirtyChai from 'dirty-chai';
import Orchestrator from '../src/orchestrator';
import Module from '../src/module';
import uuid from 'node-uuid';
import logger from '../src/logging/logger';

// TEST SETUP
// =============================================================================
chai.use(dirtyChai);

describe('Module integration', function () {

const o = new Orchestrator({
registerQueue: `register-${uuid.v4()}`,
messagesQueue: `messages-${uuid.v4()}`,
messagesIndex: `index-${uuid.v4()}`,
messagesType: `type-${uuid.v4()}`
});

after(() => {
o.shutdown();
});

it('Should send back messages processed to the Orchestrator', function () {


const m1 = new Module({
service: 'm1',
name: 'm1',
registerQueue: o.registerQueue,
positivePath: '$.uuid',
negativePath: '$.m1Key'
});

const m2 = new Module({
service: 'm2',
name: 'm2',
registerQueue: o.registerQueue,
positivePath: '$.m1Key',
negativePath: '$.m2Key'
});

return o.listen()
.then(() => {
return m1.register();
})
.then(() => {
return m2.register();
})
.then(() => {
return new Promise((resolve) => {
const expectedMessage = {
uuid: uuid.v4()
};

let passedM1 = false;

// Setup listener to proccess received message
m1.on('data', (message) => {
chai.expect(passedM1).to.be.false('Already passed m1');
logger.info(`[${m1.name}] Received message: ${message.toString()}`);
message.m1Key = uuid.v4();
chai.expect(message.uuid).to.be.equals(expectedMessage.uuid, 'not same message');
passedM1 = true;
m1.afterProcess(message);
});

m2.on('data', (message) => {
logger.info(`[${m2.name}] Received message: ${message.toString()}`);
message.m2key = uuid.v4();
chai.expect(message.uuid).to.be.equals(expectedMessage.uuid, 'not same message');
chai.expect(message.m1Key).to.be.ok('m1Key is not set');
chai.expect(passedM1).to.be.true('passed m2 before passing m1');
m2.afterProcess(message);
resolve();
});

// Sent message to tue message queue
const push = o.amqpContext.socket('PUSH');

logger.debug('Sending validation message to ', o.messagesQueue);
push.connect(o.messagesQueue, () => {
push.write(JSON.stringify(expectedMessage));
});
});
});


});
});
14 changes: 6 additions & 8 deletions test/orchestrator.e2e.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import * as chai from 'chai';
import dirtyChai from 'dirty-chai';
import Orchestrator from '../src/orchestrator';
import uuid from 'node-uuid';

import logger from '../src/logging/logger';

// TEST SETUP
// =============================================================================
Expand All @@ -16,7 +16,6 @@ describe('Orchestrator Integration', function () {

it('Should store message on ES', function () {

this.timeout(10000); // eslint-disable-line no-invalid-this

const o = new Orchestrator({
registerQueue: `register-${uuid.v4()}`,
Expand All @@ -34,25 +33,24 @@ describe('Orchestrator Integration', function () {
const pub = o.amqpContext.socket('PUSH');
const message = { uuid: uuid.v4() };

pub.connect(o.messagesQueue);
setTimeout(() => {
pub.connect(o.messagesQueue, () => {
pub.write(JSON.stringify(message));
}, 1000);
});

return new Promise((resolve, reject) => {
return new Promise((resolve) => {
const checkInterval = setInterval(() => {
o.esClient.count({
index: o.messagesIndex,
ignoreUnavailable: true
}, (err, response) => {
if (err) {
reject(err);
logger.warn(`ElasticSearch is returning error ${err}`);
} else if (response.count > 0) {
clearInterval(checkInterval);
resolve(response.count);
}
});
}, 1000);
}, 100);
});
});
});
Expand Down
20 changes: 20 additions & 0 deletions test/orchestrator.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,26 @@ describe('Orchestrator', function () {
});
});

it('Should find a module by positivePath AND negativePath', function () {
// const o = new Orchestrator();

return o.listen().then(() => {
const originalModule = new Module({ service: serviceName });

originalModule.positivePath = '$.positiveKeyName';
originalModule.negativePath = '$.negativeKeyName';

return o.register(originalModule).then(() => {
return o.findMatchingModules({ positiveKeyName: 'value' });
}).then((modules) => {
chai.expect(modules).to.be.a('array');
chai.expect(modules[0])
.to.have.property('uuid')
.that.is.equals(originalModule.uuid);
});
});
});

it('Should NOT find a module by positivePath', function () {
// const o = new Orchestrator();

Expand Down

0 comments on commit b52435b

Please sign in to comment.