Permalink
Browse files

[feature] push back same xid/type for #5

[feature] better error handling for #3
[docs] version bump for `scheduleJobs` and modified `schedule`
  • Loading branch information...
1 parent e2fbd68 commit 5a2d465505f25511c4df1b71f0aeab274dbcb19b @cojohn cojohn committed Feb 29, 2012
Showing with 64 additions and 26 deletions.
  1. +62 −24 lib/leeloo.js
  2. +1 −1 package.json
  3. +1 −1 test/client.js
View
@@ -28,6 +28,21 @@ util.inherits(Leeloo, events.EventEmitter);
/**
+ * Broadcasts a payload to all listening clients.
+ *
+ * @param {Object} payload The message we're broadcasting
+ * @param {Function} callback `callback(error)` [optional]
+ * @api private
+ */
+Leeloo.prototype._broadcast = function(payload, callback) {
+ var self = this;
+
+ Object.keys(self._sockets).forEach(function(key) {
+ self._sockets[key].write(JSON.stringify(payload));
+ });
+}
+
+/**
* Cancels a job.
*
* @param {String} reason The reason the job is being canceled
@@ -52,9 +67,7 @@ Leeloo.prototype.cancelJob = function(reason, id, callback) {
self._processing = {};
self._processing.heartbeats = 0;
- Object.keys(self._sockets).forEach(function(key) {
- self._sockets[key].write(JSON.stringify({"command": "canceledJob", "params": upd}));
- });
+ self._broadcast({"command": "canceledJob", "params": upd});
});
});
}
@@ -160,10 +173,7 @@ Leeloo.prototype.finishJob = function(params, callback) {
self._status = "idle";
self._processing = {};
self._processing.heartbeats = {};
-
- Object.keys(self._sockets).forEach(function(key) {
- self._sockets[key].write(JSON.stringify({"command": "finished", "params": upd}));
- });
+ self._broadcast({"command": "finished", "params": upd});
}
});
});
@@ -239,10 +249,7 @@ Leeloo.prototype._heartbeat = function(check) {
self._status = "processing";
self._processing = job_in_progress;
self._processing.heartbeats = 0;
-
- Object.keys(self._sockets).forEach(function(key) {
- self._sockets[key].write(JSON.stringify({"command": "processJob", "params": job}));
- });
+ self._broadcast({"command": "processJob", "params": job_in_progress});
}
printHeartbeat();
@@ -322,10 +329,7 @@ Leeloo.prototype.processJobs = function() {
self._status = "processing";
self._processing = upd;
self._processing.heartbeats = 0;
-
- Object.keys(self._sockets).forEach(function(key) {
- self._sockets[key].write(JSON.stringify({"command": "processJob", "params": upd}));
- });
+ self._broadcast({"command": "processJob", "params": upd});
});
}
});
@@ -360,15 +364,51 @@ Leeloo.prototype.schedule = function(params, callback) {
// Reschedule on finish if > 0
job.r = params.reschedule || 0;
// External id
- job.x = params.ext || params.x || {};
+ job.xid = params.xid;
+ // Additional external parameters
+ job.x = params.ext || {};
// Current job status
job.s = "s";
- self._store.collection(self._context.mongodb.collection, function(error, collection) {
- collection.insert(job, {"safe": true}, function(error, documents) {
- callback && callback(error, {"job": documents[0] && documents[0]._id});
+ if (job.xid) {
+ self._store.collection(self._context.mongodb.collection, function(error, collection) {
+ collection.find({"xid": job.xid, "t": job.t, "s": "s"}, {"limit": 1}, function(error, cursor) {
+ if (error) {
+ console.log(error);
+ }
+ else {
+ cursor.toArray(function(error, prev) {
+ if (error) {
+ console.log(error);
+ }
+ else if (prev.length) {
+ if (job.w > prev[0].w) {
+ collection.findAndModify(
+ {"_id": prev[0]._id},
+ [["_id", "asc"]],
+ {"$set": {"w": job.w}},
+ {"safe": true, "multi": false, "upsert": false, "new": true},
+ function(error, upd) {
+ callback && callback(error, {"job": upd && upd._id});
+ });
+ }
+ else {
+ callback && callback(error, {"job": prev[0]._id});
+ }
+ }
+ else {
+ collection.insert(job, {"safe": true}, function(error, documents) {
+ callback && callback(error, {"job": documents[0] && documents[0]._id});
+ });
+ }
+ });
+ }
+ });
});
- });
+ }
+ else {
+ callback && callback(new Error("`xid` is a required parameter"));
+ }
}
@@ -384,13 +424,11 @@ Leeloo.prototype.scheduleJobs = function(jobs, callback) {
scheduled = [],
scheduleJob = function(i) {
if (i > j - 1) {
- Object.keys(self._sockets).forEach(function(key) {
- self._sockets[key].write(JSON.stringify({"command": "scheduledJobs", "params": scheduled}));
- });
+ self._broadcast({"command": "scheduledJobs", "params": scheduled});
}
else {
self.schedule(jobs[i], function(error, job) {
- scheduled.push(job);
+ (error && scheduled.push({"error": error.message})) || scheduled.push(job);
scheduleJob(i + 1);
});
}
View
@@ -5,7 +5,7 @@
],
"name": "leeloo",
"description": "Tip or Skip job queue server.",
- "version": "0.0.6",
+ "version": "0.1.0",
"repository": {
"type": "git",
"url": "git://github.com/tiporskip/leeloo.git"
View
@@ -2,7 +2,7 @@ var leeloo = require("../lib/leeloo");
leeloo.connect(1997, function(error) {
- leeloo.dispatch("scheduleJobs", [{"w": Date.now() + 10000},{"w": Date.now() + 20000},{"w": Date.now() + 30000}]);
+ leeloo.dispatch("scheduleJobs", [{"w": Date.now() + 10000},{"w": Date.now() + 20000, "xid": 2},{"w": Date.now() + 30000}]);
leeloo.on("scheduledJobs", function(jobs) {
console.log(jobs);

0 comments on commit 5a2d465

Please sign in to comment.