Permalink
Browse files

command queue, different fingerprints; bumping minor version again be…

…cause of all the core changes
  • Loading branch information...
1 parent d612840 commit afb00111ac4a142e31d02f31b0dac78651065c09 @cojohn cojohn committed Apr 26, 2012
Showing with 96 additions and 81 deletions.
  1. +1 −0 .gitignore
  2. +94 −80 lib/leeloo.js
  3. +1 −1 package.json
View
@@ -1,2 +1,3 @@
conf/env.json
.DS_Store
+node_modules/
View
@@ -20,6 +20,8 @@ var Leeloo = function() {
this._status = "idle";
this._processing;
this._heartbeats = 0;
+ this.command_queue = [];
+ this.command_idle = true;
this.MAXIMUM_HEARTBEATS_PROCESSING = 3;
events.EventEmitter.call(this);
@@ -50,7 +52,7 @@ Leeloo.prototype._broadcast = function(owner, payload) {
* @param {ObjectID} params.id The identifier of the job to cancel [optional]
* @param {Function} callback `callback(error)` [optional]
*/
-Leeloo.prototype.cancelJob = function(params, callback) {
+Leeloo.prototype.cancelJob = function(owner, params, callback) {
var self = this;
if (params instanceof Function) {
@@ -80,18 +82,20 @@ Leeloo.prototype.cancelJob = function(params, callback) {
self._idle();
}
- upd.o && self._broadcast(upd.o, {"command": "canceledJob", "params": upd});
console.log("[canceled] job " + upd._id.toString() + " `" + reason + "`");
+ self._broadcast(owner, {"command": "canceledJob", "params": upd});
}
else if (self._processing && self._processing._id && self._processing._id.equals(id)) {
// For some reason we can't find the record for the currently processing job
self._idle();
- };
+ }
+
+ return callback && callback();
});
});
}
else {
- callback && callback();
+ return callback && callback();
}
}
@@ -124,18 +128,26 @@ Leeloo.prototype.connect = function(port, host, callback) {
});
self._connection.on("data", function(chunk) {
- try {
data += chunk;
var payloads = data.split("\n\n");
data = "";
-
+
if (payloads.length) {
self._connection.pause();
+
payloads.forEach(function(payload) {
if (payload.slice(-1) === "}") {
- payload = JSON.parse(payload);
- payload.command && self.emit(payload.command, payload.params || {});
+ try {
+ payload = JSON.parse(payload);
+ payload.command && self.emit(payload.command, payload.params || {});
+ }
+ catch (ex) {
+ data = payload;
+ self._connection.resume();
+ ex.payload = payload;
+ self.emit("error", ex);
+ }
}
else if (payload.length) {
data = payload;
@@ -146,16 +158,18 @@ Leeloo.prototype.connect = function(port, host, callback) {
}
});
}
- }
- catch (ex) {
- self.emit("error", ex);
- }
});
self._connection.on("end", function() {
+ console.log("end");
self.emit("disconnected");
});
+ self._connection.on("close", function(had_error) {
+ console.log("closed", had_error);
+ });
+
+
self._connection.on("drain", function() {
});
@@ -172,9 +186,7 @@ Leeloo.prototype.connect = function(port, host, callback) {
* @param {Function} callback `callback(error)` [optional]
*/
Leeloo.prototype.disconnect = function(callback) {
- var self = this;
-
- self._connection && self._connection.end();
+ this._connection && this._connection.end();
callback && callback();
}
@@ -212,7 +224,7 @@ Leeloo.prototype.dispatch = function(command, params, callback) {
* @param {ObjectID} params.id The job to finish
* @param {Function} callback `callback(error)` [optional]
*/
-Leeloo.prototype.finishJob = function(params, callback) {
+Leeloo.prototype.finishJob = function(owner, params, callback) {
var self = this;
if (params.id) {
@@ -222,18 +234,20 @@ Leeloo.prototype.finishJob = function(params, callback) {
collection.findAndModify({"_id": new self._store.bson_serializer.ObjectID(params.id), "s": "p"}, [["id", "asc"]], {"$set": {"s": "f"}}, {"safe": true, "upsert": false}, function(error, upd) {
if (upd) {
console.log("[finished] job " + ((upd && upd._id && upd._id.toString()) || ""));
- upd.o && self._broadcast(upd.o, {"command": "finishedJob", "params": upd || {}});
+ upd.o && self._broadcast(owner, {"command": "finishedJob", "params": upd || {}});
if (upd && upd.r) {
// Reschdule this job for the fuuuuuture
- self.schedule({"when": Date.now() + upd.r, "reschedule": upd.r, "xid": upd.xid, "ext": upd.x || {}, "type": upd.t});
+ self.schedule(owner, {"when": Date.now() + upd.r, "reschedule": upd.r, "xid": upd.xid, "ext": upd.x || {}, "type": upd.t});
}
}
+
+ return callback && callback();
});
});
}
else {
- callback && callback();
+ return callback && callback();
}
}
@@ -279,26 +293,28 @@ Leeloo.prototype.help = function(details, callback) {
Leeloo.prototype._heartbeat = function(check) {
var self = this,
printHeartbeat = function() {
+ var num_conn = Object.keys(self._sockets).length;
switch (self._status) {
case "processing":
if (self._heartbeats >= self.MAXIMUM_HEARTBEATS_PROCESSING) {
- console.log("[heartbeat][" + Object.keys(self._sockets).length + "] processing " + ((self._processing && self._processing._id) || "") + " (" + self._heartbeats + ")");
+ console.log("[heartbeat][" + num_conn + "][" + ((self._processing && self._processing.o) || "anon") + "] processing " + ((self._processing && self._processing._id) || "") + " (" + self._heartbeats + ")");
if (self._processing && self._processing._id) {
- self.cancelJob({"reason": "timed out", "id": self._processing._d});
+ self.cancelJob(self._processing.o, {"reason": "timed out", "id": self._processing._d});
}
else {
self._idle();
}
}
else {
- console.log("[heartbeat][" + Object.keys(self._sockets).length + "] processing " + ((self._processing && self._processing._id) || "") + " (" + self._heartbeats + ")");
+ console.log("[heartbeat][" + num_conn + "][" + ((self._processing && self._processing.o) || "anon") + "] processing " + ((self._processing && self._processing._id) || "") + " (" + self._heartbeats + ")");
self._heartbeats += 1;
}
break;
default:
- console.log("[heartbeat][" + Object.keys(self._sockets).length + "] " + self._status);
- self.processJobs();
+ // Don't process jobs if there are no consumers connected
+ console.log("[heartbeat][" + num_conn + "] " + self._status);
+ num_conn && self.processJobs();
}
};
@@ -321,7 +337,11 @@ Leeloo.prototype._heartbeat = function(check) {
printHeartbeat();
}
- setTimeout(function() { self._heartbeat(); }, self._context && self._context.heartbeat || 60000);
+ if (self.command_idle && self.command_queue.length) {
+ self.emit("processCommand");
+ }
+
+ setTimeout(function() { self._heartbeat(); }, (self._context && self._context.heartbeat) || 60000);
}
@@ -334,6 +354,12 @@ Leeloo.prototype._idle = function() {
}
+Leeloo.prototype.idleCommand = function() {
+ this.command_idle = true;
+ this.emit("processCommand");
+}
+
+
/**
* Opens the MongoDB database.
*
@@ -360,24 +386,6 @@ Leeloo.prototype._openDatabase = function(callback) {
/**
- * Lists jobs.
- *
- * @param {Function} callback `callback(error)` [optional]
- */
-Leeloo.prototype.list = function(params, callback) {
- var self = this;
-
- self._store.collection(self._context.mongodb.collection, function(error, collection) {
- collection.find({"s": "s"}, {}, {}, function(error, cursor) {
- cursor.toArray(function(error, array) {
- callback && callback(error, {"jobs": array});
- });
- });
- });
-}
-
-
-/**
* Processes jobs in the queue.
*
* @param {Function} callback `callback(error)`
@@ -387,7 +395,7 @@ Leeloo.prototype.processJobs = function() {
self._store.collection(self._context.mongodb.collection, function(error, collection) {
// Grab the highest priority and oldest job that's been scheduled before now
- collection.findOne({"s": "s", "w": {"$lt": Date.now()}}, {"sort": {"p": -1, "w": 1}}, function(error, job) {
+ collection.findOne({"s": "s", "w": {"$lt": Date.now()}}, {}, {"sort": {"p": -1, "w": 1}}, function(error, job) {
if (error) {
self.emit("error", error);
}
@@ -414,7 +422,7 @@ Leeloo.prototype.processJobs = function() {
* @param {Object} params.ext Additional external parameters
* @param {Function} callback `callback(error)` [optional]
*/
-Leeloo.prototype.schedule = function(params, callback) {
+Leeloo.prototype.schedule = function(owner, params, callback) {
var self = this,
callback = (params instanceof Function)
? params
@@ -453,11 +461,13 @@ Leeloo.prototype.schedule = function(params, callback) {
collection.find({"xid": job.xid, "t": job.t, "s": "s"}, {"limit": 1}, function(error, cursor) {
if (error) {
console.log(error);
+ callback && callback();
}
else {
cursor.toArray(function(error, prev) {
if (error) {
console.log(error);
+ callback && callback();
}
else if (prev.length) {
if (job.w > prev[0].w) {
@@ -467,16 +477,19 @@ Leeloo.prototype.schedule = function(params, callback) {
{"$set": {"w": job.w}},
{"safe": true, "multi": false, "upsert": false, "new": true},
function(error, upd) {
- callback && callback(error, {"command": "scheduledJob", "params": upd});
+ self._broadcast(owner, {"command": "scheduledJob", "params": upd || {}});
+ callback && callback();
});
}
else {
- callback && callback(error, {"command": "scheduledJob", "params": (prev && prev[0]) || {}});
+ self._broadcast(owner, {"command": "scheduledJob", "params": prev[0]});
+ callback && callback();
}
}
else {
collection.insert(job, {"safe": true}, function(error, documents) {
- callback && callback(error, {"command": "scheduledJob", "params": (documents && documents[0]) || {}});
+ self._broadcast(owner, {"command": "scheduledJob", "params": (documents && documents[0]) || {}});
+ callback && callback();
});
}
});
@@ -507,7 +520,7 @@ Leeloo.prototype.scheduleJobs = function(owner, jobs, callback) {
else {
jobs[i].o = owner;
- self.schedule(jobs[i], function(error, job) {
+ self.schedule(owner, jobs[i], function(error, job) {
(error && scheduled.push({"error": error.message})) || scheduled.push(job);
scheduleJob(i + 1);
});
@@ -572,19 +585,12 @@ Leeloo.prototype.startServer = function(params, callback) {
console.log("[" + socket.remoteAddress + "]" + ((socket.identity && " [" + socket.identity + "] ") || " ") + payload.command + " " + JSON.stringify(payload.params));
- if (payload.command !== "scheduleJobs") {
- payload.command && self[payload.command] && self[payload.command](payload.params, function(error, message) {
- message = message || {};
-
- socket.write(JSON.stringify(message) + "\n\n");
- });
- }
- else {
- self.scheduleJobs(socket.identity, payload.params, function(error, message) {
- message = message || {};
-
- socket.write(JSON.stringify(message) + "\n\n");
- });
+ if (payload.command !== "identifyClient") {
+ self.command_queue.push({
+ "command": payload.command,
+ "owner": socket.identity,
+ "params": payload.params,
+ });
}
}
else if (payload.length) {
@@ -606,53 +612,61 @@ Leeloo.prototype.startServer = function(params, callback) {
// Client finished writing
socket.on("drain", function() {
});
+
+ socket.on("close", function(had_error) {
+ console.log("[" + socket.identity + "] disconnected; closed" + (had_error ? " with error" : ""));
+ });
// Client finished
socket.on("end", function() {
- console.log("[" + socket.remoteAddress + "] disconnected");
+ console.log("[" + socket.identity + "] disconnected");
socket.removeAllListeners();
if (socket.identity) {
delete self._sockets[socket.identity];
}
- console.log(self._sockets);
socket.end();
});
-
- socket.on("close", function() {
- });
}).listen(1997);
self._server.on("error", function(error) {
console.log(error.message);
console.log(error.stack);
callback && callback(error);
});
+
+
+ // Process commands in order as they roll in
+ self.on("processCommand", function() {
+ if (self.command_queue.length) {
+ self.command_idle = false;
+
+ var command = self.command_queue.shift();
+
+ console.log("command " + command.command + " issued (" + self.command_queue.length + ")");
+ self[command.command](command.owner, command.params, function() {
+ console.log("command " + command.command + " finished (" + self.command_queue.length + ")");
+ self.emit("processCommand");
+ });
+ }
+ else {
+ self.command_idle = true;
+ }
+ });
callback && callback();
});
}
-/**
- * Returns Leeloo's status information.
- *
- * @param {Function} callback `callback(error, status)`
- */
-Leeloo.prototype.status = function(params, callback) {
- callback(null, {"status": this._status, "uptime": Date.now() - this._started})
-}
-
/**
* Stops Leeloo. Temporarily.
*
* @param {Function} callback `callback(error)` [optional]
*/
Leeloo.prototype.stopServer = function(params, callback) {
- var self = this;
-
- self._store.close();
- self._server.close();
+ this._store.close();
+ this._server.close();
callback && callback(null, "bye!");
process.exit();
Oops, something went wrong.

0 comments on commit afb0011

Please sign in to comment.