Skip to content

Commit

Permalink
Merge pull request #1 from KurtPattyn/addStartMethod
Browse files Browse the repository at this point in the history
Add start method to worker
  • Loading branch information
KurtPattyn committed Sep 25, 2015
2 parents 4a315a1 + cbfa1a8 commit 6e8760a
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 11 deletions.
2 changes: 2 additions & 0 deletions benchmarks/krewtest.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,5 @@ worker.on("ready", function() {
worker.on("error", function(err) {
console.error("Worker encountered error:", err);
});

worker.start();
2 changes: 2 additions & 0 deletions examples/krewtest.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,5 @@ worker.on("ready", function() {
worker.on("error", function(err) {
console.error("Worker encountered error:", err);
});

worker.start();
49 changes: 40 additions & 9 deletions lib/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ var Transport = require("kimbu").Transport;
var Client = require("kimbu").Client;
var util = require("util");
var assert = require("assert");
var Promise = require("bluebird");

/**
* Emitted when the worker is ready for sending events. The event takes no arguments.
Expand Down Expand Up @@ -92,24 +93,21 @@ function Worker(workerName, transport, messages) {

var self = this;

var deferred = Promise.pending();

this._client = new Client(workerName, transport, function(err) {
/* istanbul ignore if */
if (err) {
self.emit("error", err);
deferred.reject(err);
} else {
self._messages = messages;
self._registerMessages(messages);
self._client.start(function(err) {
/* istanbul ignore if */
if (err) {
self.emit("error", err);
} else {
self.emit("ready");
}
});
deferred.resolve();
}
});

this._connectionPromise = deferred.promise;

//TODO: catch messagebus disconnected event
}
util.inherits(Worker, EventEmitter);
Expand Down Expand Up @@ -180,6 +178,39 @@ Worker.prototype.publish = function(event, parameters, options, callback) {
this._client.publish(event, parameters, options, callback);
};

/**
* Starts the worker. Once started the worker will receive messages and will be able to
* send messages itself.
* Emits `ready` when started successfully or `error` when an error occurred.
*
* @param {Function} [callback] - Optional callback called when stop() has finished.
*
* @emits ready
* @emits error
* @public
*/
Worker.prototype.start = function(callback) {
assert(util.isNullOrUndefined(callback) | util.isFunction(callback));
var self = this;

this._connectionPromise.then(function() {
self._client.start(function(err) {
/* istanbul ignore if */
if (err) {
self.emit("error", err);
} else {
self.emit("ready");
}
if (callback) {
setImmediate(callback.bind(self, err));
}
});
}, /* istanbul ignore next */
function(err) {
self.emit("error");
});
};

/**
* Stops the worker. Once stopped the worker will not receive messages nor will be able to
* send messages itself.
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"node >=0.11.0"
],
"dependencies": {
"bluebird": "^2.10.1",
"kimbu": "0.x"
},
"devDependencies": {
Expand Down
62 changes: 60 additions & 2 deletions test/worker.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,49 @@ describe("Worker", function() {
rmq.disconnect();
done();
});
});

describe(".start", function() {
it("should not throw an error when no callback is provided", function(done) {
var rmq = new (Transport.providers.RabbitMQTransport)();
var w1 = new Worker("someworker", rmq, {
"msg": function(parameters, next) { next(); }
});

w1.on("ready", function() {
w1.stop();
done();
});
assert.doesNotThrow(function() {
w1.start();
}, "Should not throw any exception");
});

it("should throw an error when an invalid callback is provided", function(done) {
var rmq = new (Transport.providers.RabbitMQTransport)();
var w1 = new Worker("someworker", rmq, {
"msg": function(parameters, next) { next(); }
});

assert.throws(function() {
w1.start("invalid callback");
}, /AssertionError/,
"Should throw an AssertionError");

done();
});

it("should call the callback when started or on error", function(done) {
var rmq = new (Transport.providers.RabbitMQTransport)();
var w1 = new Worker("someworker", rmq, {
"msg": function(parameters, next) { next(); }
});

w1.start(function(err) {
w1.stop();
done();
});
});

it("should emit ready or error", function (done) {
var rmq = new (Transport.providers.RabbitMQTransport)();
Expand All @@ -79,6 +122,7 @@ describe("Worker", function() {
rmq.disconnect();
done();
});
w.start();
});

it("should deliver the registered message names to the given methods", function (done) {
Expand All @@ -99,6 +143,7 @@ describe("Worker", function() {
assert(util.isNullOrUndefined(err));
});
});
w.start();
});
});

Expand All @@ -123,6 +168,8 @@ describe("Worker", function() {
w1.on("error", function(err) {
console.error("Error occurred", err);
});

w1.start();
});

it("should publish the event to all registered methods", function(done) {
Expand Down Expand Up @@ -150,6 +197,9 @@ describe("Worker", function() {
assert(util.isNullOrUndefined(err));
});
});

w2.start();
w1.start();
});
});

Expand All @@ -175,6 +225,8 @@ describe("Worker", function() {
w1.on("error", function(err) {
console.error("Error occurred", err);
});

w1.start();
});

it("should publish the command to all of the different workers", function(done) {
Expand Down Expand Up @@ -206,6 +258,9 @@ describe("Worker", function() {
}, 0);
});
});
w2.start(function(err) {
w1.start();
});
});

it("should publish the command in a round robin way to a cluster of the same workers", function(done) {
Expand Down Expand Up @@ -242,15 +297,18 @@ describe("Worker", function() {
assert.equal(reply, "another msg body!");

setTimeout(function () {
assert.equal(numMessagesReceivedByWorker1, 1);
assert.equal(numMessagesReceivedByWorker2, 1);
assert.equal(numMessagesReceivedByWorker1, 1, "Worker1 should receive exactly 1 message");
assert.equal(numMessagesReceivedByWorker2, 1, "Worker2 should receive exactly 1 message");
w1.stop();
w2.stop();
done();
}, 0);
});
});
});
w2.start(function(err) {
w1.start();
});
});
});
});

0 comments on commit 6e8760a

Please sign in to comment.