Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

API revision to Worker constructor

  • Loading branch information...
commit 2b6800ff248ac855dbf586b6aea78b26f103b13a 1 parent cf00cbd
Tim tim-smart authored

Showing 3 changed files with 48 additions and 71 deletions. Show diff stats Hide diff stats

  1. +8 8 README
  2. +29 54 index.js
  3. +11 9 test/worker.test.js
16 README
@@ -38,17 +38,17 @@ queue-worker.js:
38 38 // Require the node-redis-queue
39 39 var client = require('./node-redis-queue');
40 40
41   -// Create a worker.
42   -var worker = client.createWorker('localhost', 6379);
  41 +// Create a worker for each queue.
  42 +var worker = client.createWorker('user', 'localhost', 6379);
43 43
44   -// Setup events for each queue you want to listen to.
45   -worker.on('user', function (job) {
  44 +// Setup message event
  45 +worker.on('message', function (job) {
46 46 // Process message
47 47 process(job.payload);
48   -});
49 48
50   -// Listen for all messages on worker.
51   -worker.on('*', fn);
  49 + // Listen for next job
  50 + worker.next();
  51 +});
52 52
53 53 // Start listening.
54   -worker.listen('user');
  54 +worker.start();
83 index.js
@@ -84,13 +84,16 @@ Queue.prototype.push = function (payload, callback) {
84 84 * Inherits from EventEmitter.
85 85 * @constructor
86 86 */
87   -var Worker = function (host, port) {
  87 +var Worker = function (name, host, port) {
88 88 var self = this;
89 89
90   - this.host = host;
91   - this.port = port;
92   - this.prefix = 'queue:';
93   - this.queues = {};
  90 + this.host = host;
  91 + this.port = port;
  92 + this.prefix = 'queue:';
  93 + this.name = name;
  94 + this.queues = {};
  95 + // TODO: Rename?
  96 + this.continual = false;
94 97
95 98 // Client for use with child jobs.
96 99 this.child_client = redis.createClient(host, port);
@@ -106,12 +109,15 @@ var Worker = function (host, port) {
106 109 return self.emit('error', error);
107 110 }
108 111
109   - var key = removePrefix(self.prefix, data[0].toString()),
110   - data = JSON.parse(Buffer.isBuffer(data[1]) ? data[1].toString() : data),
111   - job = new Job(self, data, key);
  112 + var data = JSON.parse(Buffer.isBuffer(data[1]) ? data[1].toString() : data),
  113 + job = new Job(self, data);
  114 +
  115 + self.emit('message', job);
112 116
113   - self.emit(key, job);
114   - self.emit('*', job);
  117 + if (!self.client.quitting && self.continual) {
  118 + // Listen for more jobs.
  119 + self.client.blpop(self.prefix + self.name, 0, self._onPop);
  120 + }
115 121 };
116 122 };
117 123
@@ -131,57 +137,26 @@ exports.createWorker = function (host, port) {
131 137
132 138 exports.Worker = Worker;
133 139
134   -/*
135   - * Recycle the client.
136   - * @private
  140 +/**
  141 + * Listen for the next job. Only has to be called by user if `continual` is false.
137 142 */
138   -Worker.prototype._recycle = function () {
139   - if (this.client) {
140   - this.client.destroy();
141   - }
142   -
143   - var keys = Object.keys(this.queues);
144   -
145   - if (0 < keys.length) {
146   - var key_string = '';
147   - for (var i = 0, il = keys.length; i < il; i++) {
148   - key_string += this.prefix + keys[i] + ' ';
149   - }
150   - key_string = key_string.slice(0, -1);
151   -
152   - this.client = redis.createClient(this.host, this.port);
153   - this.client.blpop(key_string, 0, this._onPop);
154   - }
155   -
156   - // TODO: Recovery when disconnected. Is it needed? (node-redis might re-send the command for us)
157   -}
  143 +Worker.prototype.next = function () {
  144 + this.client.blpop(this.prefix + this.name, 0, this._onPop);
  145 +};
158 146
159 147 /**
160   - * Tell the worker what queue(s) to listen on.
161   - * We have to create a new redis client for each queue, as we use
162   - * blpop.
  148 + * Start the worker
163 149 */
164   -Worker.prototype.listen = function () {
165   - for (var i = 0, il = arguments.length; i < il; i++) {
166   - this.queues[arguments[i]] = null;
167   - }
168   -
169   - this._recycle();
  150 +Worker.prototype.start = function () {
  151 + this.client = redis.createClient(this.host, this.port);
  152 + this.next();
170 153 };
171 154
172   -/*
173   - * Un-listen to the specified queues
  155 +/**
  156 + * Stop the worker
174 157 */
175   -Worker.prototype.unlisten = function () {
176   - var name;
177   -
178   - for (var i = 0, il = arguments.length; i < il; i++) {
179   - name = arguments[i];
180   - delete this.queues[name];
181   - this.removeAllListeners(name);
182   - }
183   -
184   - this._recycle();
  158 +Worker.prototype.stop = function () {
  159 + this.client.destroy();
185 160 };
186 161
187 162
20 test/worker.test.js
@@ -2,9 +2,11 @@ var client = require('../'),
2 2 assert = require('assert');
3 3
4 4 var q = client.createQueue('worker.test'),
5   - w = client.createWorker(),
  5 + w = client.createWorker('worker.test'),
6 6 job;
7 7
  8 +w.continual = true;
  9 +
8 10 job = new client.Job(w, {
9 11 payload: 'testing',
10 12 id: 2,
@@ -19,7 +21,7 @@ q.push({
19 21
20 22 module.exports = {
21 23 "test Worker events": function (done) {
22   - w.on('worker.test', function (job) {
  24 + w.once('message', function (job) {
23 25 assert.ok(job);
24 26 assert.equal(typeof job.reportError, 'function');
25 27 assert.equal(typeof job.retry, 'function');
@@ -33,11 +35,11 @@ module.exports = {
33 35 done();
34 36 });
35 37
36   - w.listen('worker.test');
  38 + w.start();
37 39 },
38   - "test Worker#unlisten": function () {
39   - w.unlisten('worker.test');
40   - assert.equal(w.listeners('worker.test').length, 0);
  40 + "test Worker#stop": function () {
  41 + w.stop();
  42 + assert.ok(w.client.quitting);
41 43 },
42 44 "test Job#reportError": function () {
43 45 job.reportError(new Error('Bacon was not tasty enough.'));
@@ -46,7 +48,7 @@ module.exports = {
46 48 assert.equal(job.error_count, 1);
47 49 },
48 50 "test Job#retry": function (done) {
49   - w.on('worker.test', function (job) {
  51 + w.on('message', function (job) {
50 52 assert.ok(job);
51 53
52 54 assert.equal(job.id, 2);
@@ -58,7 +60,7 @@ module.exports = {
58 60 done();
59 61 });
60 62
61   - w.listen('worker.test');
  63 + w.start();
62 64
63 65 job.retry(function (error, id) {
64 66 assert.ok(!error);
@@ -71,6 +73,6 @@ module.exports = {
71 73 q.client.quit();
72 74
73 75 w.child_client.quit();
74   - w.client.destroy();
  76 + w.stop();
75 77 }
76 78 }

0 comments on commit 2b6800f

Please sign in to comment.
Something went wrong with that request. Please try again.