Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Initial commit

  • Loading branch information...
commit 8cc1a7ca123ac1da72516b5d64e8ed6e0924804f 0 parents
@tj tj authored
4 .gitignore
@@ -0,0 +1,4 @@
+.DS_Store
+node_modules
+*.sock
+testing
4 .npmignore
@@ -0,0 +1,4 @@
+support
+test
+examples
+*.sock
5 History.md
@@ -0,0 +1,5 @@
+
+0.0.1 / 2010-01-03
+==================
+
+ * Initial release
5 Makefile
@@ -0,0 +1,5 @@
+
+test:
+ @echo "populate me"
+
+.PHONY: test
29 Readme.md
@@ -0,0 +1,29 @@
+
+# q
+
+ Job Queue
+
+## License
+
+(The MIT License)
+
+Copyright (c) 2011 TJ Holowaychuk <tj@learnboost.com>
+
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+'Software'), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
2  index.js
@@ -0,0 +1,2 @@
+
+module.exports = require('./lib/q');
242 lib/job.js
@@ -0,0 +1,242 @@
+
+/**
+ * Module dependencies.
+ */
+
+var redis = require('redis')
+ , pool = require('./pool')
+ , noop = function(){};
+
+/**
+ * Expose `Job`.
+ */
+
+exports = module.exports = Job;
+
+/**
+ * Default job priority map.
+ */
+
+var priorities = exports.priorities = {
+ low: 10
+ , normal: 0
+ , medium: -5
+ , high: -10
+ , critical: -15
+};
+
+/**
+ * Get job with `id` and callback `fn(err, job)`.
+ *
+ * @param {Number} id
+ * @param {Function} fn
+ * @api public
+ */
+
+exports.get = function(id, fn){
+ var client = pool.alloc()
+ , job = new Job;
+
+ job.id = id;
+ client.hgetall('q:job:' + job.id, function(err, hash){
+ if (err) return fn(err);
+ job.type = hash.type;
+ job.priority(Number(hash.priority));
+ try {
+ job.data = JSON.parse(hash.data);
+ fn(err, job);
+ } catch (err) {
+ fn(err);
+ }
+ });
+};
+
+/**
+ * Initialize a new `Job` with the given `type` and `data`.
+ *
+ * @param {String} type
+ * @param {Object} data
+ * @api public
+ */
+
+function Job(type, data) {
+ this.type = type;
+ this.data = data || {};
+ this.client = pool.alloc();
+ this.priority('normal');
+}
+
+/**
+ * Set job `key` to `val`.
+ *
+ * @param {String} key
+ * @param {String} val
+ * @param {String} fn
+ * @return {Job} for chaining
+ * @api public
+ */
+
+Job.prototype.set = function(key, val, fn){
+ this.client.hset('q:job:' + this.id, key, val, fn || noop);
+ return this;
+};
+
+/**
+ * Set the job progress by telling the job
+ * how `complete` it is relative to `total`.
+ *
+ * @param {Number} complete
+ * @param {Number} total
+ * @return {Job} for chaining
+ * @api public
+ */
+
+Job.prototype.progress = function(complete, total){
+ this.set('progress', complete / total * 100 | 0);
+ return this;
+};
+
+/**
+ * Set the priority `level`, which is one
+ * of "low", "normal", "medium", and "high", or
+ * a number in the range of -10..10.
+ *
+ * @param {String|Number} level
+ * @return {Job} for chainging
+ * @api public
+ */
+
+Job.prototype.priority = function(level){
+ this._priority = null == priorities[level]
+ ? level
+ : priorities[level];
+ return this;
+};
+
+/**
+ * Fetch attemps, invoking callback `fn(remaining, attempts, max)`.
+ *
+ * @param {Function} fn
+ * @return {Job} for chaining
+ * @api public
+ */
+
+Job.prototype.attempts = function(fn){
+ // TODO: settings ...
+ var max = 5;
+ this.client.incr('q:job:' + this.id + ':attempts', function(err, attempts){
+ fn(err, max - attempts, attempts, max);
+ });
+ return this;
+};
+
+/**
+ * Remove `status`.
+ *
+ * @param {String} status
+ * @return {Job} for chaining
+ * @api public
+ */
+
+Job.prototype.removeStatus = function(status){
+ this.client.zrem('q:jobs:' + status, this.id);
+ this.client.zrem('q:jobs:' + this.type + ':' + status, this.id);
+ return this;
+};
+
+/**
+ * Set state to `status`.
+ *
+ * @param {String} status
+ * @return {Job} for chaining
+ * @api public
+ */
+
+Job.prototype.status = function(status){
+ this.state = status;
+ this.removeStatus('complete');
+ this.removeStatus('failures');
+ this.removeStatus('inactive');
+ this.removeStatus('active');
+ this.set('state', status);
+ this.client.zadd('q:jobs:' + status, this._priority, this.id);
+ this.client.zadd('q:jobs:' + this.type + ':' + status, this._priority, this.id);
+ return this;
+};
+
+/**
+ * Set status to "complete", and progress to 100%.
+ */
+
+Job.prototype.complete = function(){
+ return this.set('progress', 100).status('complete');
+};
+
+/**
+ * Set status to "failed".
+ */
+
+Job.prototype.failed = function(){
+ return this.status('failures');
+};
+
+/**
+ * Set status to "inactive".
+ */
+
+Job.prototype.inactive = function(){
+ return this.status('inactive');
+};
+
+/**
+ * Set status to "active".
+ */
+
+Job.prototype.active = function(){
+ return this.status('active');
+};
+
+/**
+ * Save the job, optionally invoking the callback `fn(err)`.
+ *
+ * - converts the job data to JSON
+ * - increments and assigns a job id
+ * - adds the job's type to the set
+ * - marks the job as "inactive"
+ *
+ * @param {Function} fn
+ * @api private
+ */
+
+Job.prototype.save = function(fn){
+ var client = this.client
+ , fn = fn || noop
+ , self = this
+ , json;
+
+ // serialize json data
+ try {
+ json = JSON.stringify(this.data);
+ } catch (err) {
+ return fn(err);
+ }
+
+ // incr id
+ client.incr('q:ids', function(err, id){
+ if (err) return fn(err);
+ self.id = id;
+
+ // type
+ client.sadd('q:job:types', self.type);
+ self.set('type', self.type);
+
+ // priority
+ self.set('priority', self._priority);
+
+ // push
+ self.inactive();
+
+ // data
+ self.set('data', json, fn);
+ });
+};
35 lib/pool.js
@@ -0,0 +1,35 @@
+
+/**
+ * Module dependencies.
+ */
+
+var redis = require('redis');
+
+/**
+ * Max connections in the pool.
+ */
+
+exports.maxConnections = 5;
+
+/**
+ * Connection pool.
+ */
+
+exports.pool = [];
+
+/**
+ * Allocate a redis connection.
+ *
+ * @return {RedisClient}
+ * @api private
+ */
+
+exports.alloc = function(){
+ var client;
+ if (exports.pool.length == exports.maxConnections) {
+ client = exports.pool[Math.random() * exports.maxConnections | 0];
+ } else {
+ exports.pool.push(client = redis.createClient());
+ }
+ return client;
+};
164 lib/q.js
@@ -0,0 +1,164 @@
+
+/*!
+ * q
+ * Copyright(c) 2011 TJ Holowaychuk <tj@learnboost.com>
+ * MIT Licensed
+ */
+
+/**
+ * Module dependencies.
+ */
+
+var EventEmitter = require('events').EventEmitter
+ , Worker = require('./worker')
+ , Job = require('./job')
+ , redis = require('redis');
+
+/**
+ * Expose `Queue`.
+ */
+
+exports = module.exports = Queue;
+
+/**
+ * Library version.
+ */
+
+exports.version = '0.0.1';
+
+/**
+ * Expose `Job`.
+ */
+
+exports.Job = Job;
+
+/**
+ * Initialize a new job `Queue`.
+ *
+ * @api public
+ */
+
+function Queue() {
+ this.client = redis.createClient();
+}
+
+/**
+ * Inherit from `EventEmitter.prototype`.
+ */
+
+Queue.prototype.__proto__ = EventEmitter.prototype;
+
+/**
+ * Create a `Job` with the given `type` and `data`.
+ *
+ * @param {String} type
+ * @param {Object} data
+ * @return {Job}
+ * @api public
+ */
+
+Queue.prototype.create =
+Queue.prototype.createJob = function(type, data){
+ return new Job(type, data);
+};
+
+/**
+ * Get setting `name` and invoke `fn(err, res)`.
+ *
+ * @param {String} name
+ * @param {Function} fn
+ * @return {Queue} for chaining
+ * @api public
+ */
+
+Queue.prototype.setting = function(name, fn){
+ this.client.hget('q:settings', name, fn);
+ return this;
+};
+
+/**
+ * Process jobs with the given `type`, invoking `fn(job)`.
+ *
+ * @param {String} type
+ * @param {Number|Function} n
+ * @param {Function} fn
+ * @api public
+ */
+
+Queue.prototype.process = function(type, n, fn){
+ var self = this;
+
+ if ('function' == typeof n) fn = n, n = 1;
+
+ while (n--) {
+ (function(worker){
+ worker.on('error', function(err){
+ self.emit('error', err);
+ });
+
+ worker.on('job complete', function(job){
+ self.client.incrby('q:stats:work-time', job.duration);
+ });
+ })(new Worker(this, type).start(fn));
+ }
+};
+
+/**
+ * Get the job types present and callback `fn(err, types)`.
+ *
+ * @param {Function} fn
+ * @return {Queue} for chaining
+ * @api public
+ */
+
+Queue.prototype.types = function(fn){
+ this.client.smembers('q:job:types', fn);
+ return this;
+};
+
+/**
+ * Return job ids for the given `status`, and
+ * callback `fn(err, ids)`.
+ *
+ * @param {String} status
+ * @param {Function} fn
+ * @return {Queue} for chaining
+ * @api public
+ */
+
+Queue.prototype.status = function(status, fn){
+ this.client.zrange('q:jobs:' + status, 0, -1, fn);
+ return this;
+};
+
+/**
+ * Completed jobs.
+ */
+
+Queue.prototype.complete = function(fn){
+ return this.status('complete', fn);
+};
+
+/**
+ * Failed jobs.
+ */
+
+Queue.prototype.failures = function(fn){
+ return this.status('failures', fn);
+};
+
+/**
+ * Inactive jobs (queued).
+ */
+
+Queue.prototype.inactive = function(fn){
+ return this.status('inactive', fn);
+};
+
+/**
+ * Active jobs (mid-process).
+ */
+
+Queue.prototype.active = function(fn){
+ return this.status('active', fn);
+};
164 lib/worker.js
@@ -0,0 +1,164 @@
+
+/**
+ * Module dependencies.
+ */
+
+var EventEmitter = require('events').EventEmitter
+ , redis = require('redis')
+ , Job = require('./job');
+
+/**
+ * Expose `Worker`.
+ */
+
+module.exports = Worker;
+
+/**
+ * Initialize a new `Worker` with the given Queue
+ * targetting jobs of `type`.
+ *
+ * @param {Queue} queue
+ * @param {String} type
+ * @api private
+ */
+
+function Worker(queue, type) {
+ this.queue = queue;
+ this.type = type;
+ this.client = redis.createClient();
+ this.interval = 1000;
+}
+
+/**
+ * Inherit from `EventEmitter.prototype`.
+ */
+
+Worker.prototype.__proto__ = EventEmitter.prototype;
+
+/**
+ * Start processing jobs with the given `fn`,
+ * checking for jobs every second (by default).
+ *
+ * @param {Function} fn
+ * @return {Worker} for chaining
+ * @api private
+ */
+
+Worker.prototype.start = function(fn){
+ var self = this;
+ self.getJob(function(err, job){
+ if (err) self.error(err);
+ if (!job || err) return setTimeout(function(){ self.start(fn); }, self.interval);
+ self.process(job, fn);
+ });
+ return this;
+};
+
+/**
+ * Error handler, emitting "error" on the queue.
+ *
+ * @param {Error} err
+ * @return {Worker} for chaining
+ * @api private
+ */
+
+Worker.prototype.error = function(err){
+ this.queue.emit('error', err);
+ return this;
+};
+
+/**
+ * Process a failed `job`. Set's the job's state
+ * to "failed" unless more attempts remain, in which
+ * case the job is marked as "inactive" and remains
+ * in the queue.
+ *
+ * @param {Function} fn
+ * @return {Worker} for chaining
+ * @api private
+ */
+
+Worker.prototype.failed = function(job, err, fn){
+ var self = this;
+ job.failed();
+ self.error(err);
+ job.attempts(function(err, remaining, attempts, max){
+ if (err) return self.error(err);
+ remaining
+ ? job.inactive()
+ : job.failed();
+ self.start(fn);
+ });
+};
+
+/**
+ * Process `job`, marking it as active,
+ * invoking the given callback `fn(job)`,
+ * if the job fails `Worker#failed()` is invoked,
+ * otherwise the job is marked as "complete".
+ *
+ * @param {Job} job
+ * @param {Function} fn
+ * @return {Worker} for chaining
+ * @api public
+ */
+
+Worker.prototype.process = function(job, fn){
+ var self = this
+ , start = new Date;
+ job.active();
+ fn(job, function(err){
+ if (err) return self.failed(job, err, fn);
+ job.complete();
+ job.set('duration', job.duration = new Date - start);
+ self.emit('job complete', job);
+ self.start(fn);
+ });
+ return this;
+};
+
+/**
+ * Atomic ZPOP implementation.
+ *
+ * @param {String} key
+ * @param {Function} fn
+ * @api private
+ */
+
+Worker.prototype.zpop = function(key, fn){
+ var client = this.client;
+ client.watch(key);
+ client.zrange(key, 0, 0, function(err, ids){
+ if (err) return fn(err);
+ var id = ids.shift();
+
+ if (!id) {
+ client.unwatch();
+ return fn();
+ }
+
+ client
+ .multi()
+ .zrem(key, id)
+ .exec(function(err, res){
+ if (err) return fn(err);
+ if (!res) return fn();
+ fn(null, id);
+ });
+ });
+};
+
+/**
+ * Attempt to fetch the next job.
+ *
+ * @param {Function} fn
+ * @api private
+ */
+
+Worker.prototype.getJob = function(fn){
+ this.zpop('q:jobs:' + this.type + ':inactive', function(err, id){
+ if (err) return fn(err);
+ if (!id) return fn();
+ Job.get(id, fn);
+ });
+};
14 package.json
@@ -0,0 +1,14 @@
+{
+ "name": "q"
+ , "version": "0.0.1"
+ , "description": "Job Queue"
+ , "keywords": ["redis", "job", "queue", "worker"]
+ , "author": "TJ Holowaychuk <tj@learnboost.com>"
+ , "dependencies": {
+ "redis": "0.6.0"
+ , "log": "1.2.0"
+ , "express": "2.3.12"
+ }
+ , "main": "index"
+ , "engines": { "node": "0.4.x" }
+}
Please sign in to comment.
Something went wrong with that request. Please try again.