Permalink
Browse files

[Completed #10218395] Add a redundancy measure to ensure no loss of d…

…ata.
  • Loading branch information...
1 parent 42bcbe2 commit bd8fac56fe4297745c07736fc158562652d13cb8 @tim-smart tim-smart committed Feb 20, 2011
Showing with 51 additions and 15 deletions.
  1. +46 −11 index.js
  2. +5 −4 test/worker.test.js
View
@@ -97,21 +97,26 @@ var Worker = function (options) {
// Call parent
events.EventEmitter.call(this);
- this.host = options.host;
- this.port = options.port;
- this.auth = options.auth;
- this.prefix = options.prefix || '';
- this.name = options.name;
- this.queues = {};
+ this.host = options.host;
+ this.port = options.port;
+ this.auth = options.auth;
+ this.prefix = options.prefix || '';
+ this.name = options.name;
+ this.queues = {};
+ this._current_id = null;
// TODO: Rename?
- this.continual = false;
+ this.continual = false;
// Client for use with child jobs.
this._child_client = redis.createClient(this.port, this.host, this.auth);
- this._child_client.on('error', function (error) {
- self.emit('error', error);
- });
+ this._onError = function (error) {
+ if (error) {
+ self.emit('error', error);
+ }
+ }
+
+ this._child_client.on('error', this._onError);
/**
* Callback for blpop responses.
@@ -129,8 +134,19 @@ var Worker = function (options) {
data = JSON.parse(Buffer.isBuffer(data[1]) ? data[1].toString() : data[1]);
var job = new Job(self, data);
+ if (!self.continual) {
+ self._current_id = job.id;
+ try {
+ self.client.hset(self.prefix + 'pending:' + self.name, job.id, JSON.stringify(job), self._onError);
+ } catch (error) {
+ self._onError(error);
+ }
+ }
+
self.emit('message', job);
- } catch (json_error) {}
+ } catch (json_error) {
+ self._onError(json_error);
+ }
if (!self.client.quitting && self.continual) {
// Listen for more jobs.
@@ -158,6 +174,10 @@ exports.Worker = Worker;
* Listen for the next job. Only has to be called by user if `continual` is false.
*/
Worker.prototype.next = function () {
+ if (this._current_id) {
+ this.client.hdel(this.prefix + 'pending:' + this.name, this._current_id, this._onError);
+ }
+
this.client.blpop(this.prefix + 'queue:' + this.name, 0, this._onPop);
};
@@ -235,3 +255,18 @@ Job.prototype.retry = function (callback) {
if (callback) callback(null, self.id);
});
};
+
+/**
+ * For JSON.stringify
+ *
+ * @return {Object}
+ */
+Job.prototype.toJSON = function () {
+ return {
+ id: this.id,
+ payload: this.payload,
+ error_count: this.error_count,
+ errors: this.errors,
+ modified: this.modified
+ };
+};
View
@@ -49,6 +49,8 @@ module.exports = {
},
"test Job#retry": function (done) {
w.on('message', function (job) {
+ w.next();
+
assert.ok(job);
assert.equal(job.id, 2);
@@ -60,6 +62,7 @@ module.exports = {
done();
});
+ w.continual = false;
w.start();
job.retry(function (error, id) {
@@ -70,9 +73,7 @@ module.exports = {
after: function () {
q.client.del('queue:worker.test');
q.client.del('id:worker.test');
- q.client.quit();
- w._child_client.quit();
- w.stop();
+ setTimeout(process.exit, 100);
}
-}
+};

0 comments on commit bd8fac5

Please sign in to comment.