Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Clean up a few bugs and worker test

  • Loading branch information...
commit ae8c89609f69550d319465300b4acd60643d057e 1 parent e69d477
@tim-smart tim-smart authored
Showing with 35 additions and 18 deletions.
  1. +34 −17 index.js
  2. +1 −1  test/worker.test.js
View
51 index.js
@@ -2,7 +2,8 @@
* Requires.
*/
var redis = require('./deps/node-redis'),
- events = require('events');
+ events = require('events'),
+ util = require('util');
/**
* Handle an error appropiatly.
@@ -48,6 +49,9 @@ var Queue = function (name, host, port) {
});
};
+// Inherits from EventEmitter.
+util.inherits(Queue, events.EventEmitter);
+
/**
* Creates a new Queue object.
*
@@ -99,10 +103,16 @@ Queue.prototype.push = function (payload, callback) {
* Inherits from EventEmitter.
*
* @constructor
+ * @param {String} name The queue name.
+ * @param {String} host The host name for the Redis client.
+ * @param {Number} port The port number for the Redis client.
*/
var Worker = function (name, host, port) {
var self = this;
+ // Call parent
+ events.EventEmitter.call(this);
+
this.host = host;
this.port = port;
this.prefix = 'queue:';
@@ -111,12 +121,6 @@ var Worker = function (name, host, port) {
// TODO: Rename?
this.continual = false;
- // Client for use with child jobs.
- this.child_client = redis.createClient(host, port);
- this.child_client.on('error', function (error) {
- self.emit('error', error);
- });
-
/*
* Callback for blpop responses.
*
@@ -142,18 +146,18 @@ var Worker = function (name, host, port) {
};
// Inherits from EventEmitter.
-Worker.prototype = Object.create(events.EventEmitter.prototype);
-Worker.prototype.constructor = Worker;
+util.inherits(Worker, events.EventEmitter);
/**
* Creates a new Worker object.
*
+ * @param {String} name The queue name.
* @param {String} host The host name for the Redis client.
* @param {Number} port The port number for the Redis client.
* @returns {Worker}
*/
-exports.createWorker = function (host, port) {
- return new Worker(host, port);
+exports.createWorker = function (name, host, port) {
+ return new Worker(name, host, port);
};
exports.Worker = Worker;
@@ -175,6 +179,19 @@ Worker.prototype.start = function () {
this.client.on('error', function (error) {
self.emit('error', error);
});
+
+ if (this.continual) {
+ if (!this._child_client) {
+ // Client for use with child jobs.
+ this._child_client = redis.createClient(this.host, this.port);
+ this._child_client.on('error', function (error) {
+ self.emit('error', error);
+ });
+ }
+ } else {
+ this._child_client = this.client;
+ }
+
this.next();
};
@@ -190,20 +207,20 @@ Worker.prototype.stop = function () {
* Job prototype used by the workers.
*
* @constructor
- * @param {Worker} parent Parent prototype
+ * @param {Worker} worker Parent prototype
* @param payload The data to set as the payload.
*/
-var Job = function (parent, data, queue) {
+var Job = function (worker, data) {
this.payload = data.payload;
this.id = data.id;
this.error_count = data.error_count;
this.errors = data.errors;
this.modified = data.modified;
- this.queue = queue;
- this.prefix = parent.prefix;
+ this.queue = worker.name;
+ this.prefix = worker.prefix;
// Associate with a redis client.
- this.client = parent.child_client;
+ this.parent = worker;
};
exports.Job = Job;
@@ -226,7 +243,7 @@ Job.prototype.reportError = function (error) {
Job.prototype.retry = function (callback) {
var self = this;
- this.client.rpush(this.prefix + this.queue, JSON.stringify({
+ this.parent._child_client.rpush(this.prefix + this.queue, JSON.stringify({
id: this.id,
payload: this.payload,
error_count: this.error_count,
View
2  test/worker.test.js
@@ -72,7 +72,7 @@ module.exports = {
q.client.del('id:worker.test');
q.client.quit();
- w.child_client.quit();
+ w._child_client.quit();
w.stop();
}
}
Please sign in to comment.
Something went wrong with that request. Please try again.