Skip to content
Browse files

changed directory structure

  • Loading branch information...
1 parent c4a367b commit 649852bfe7dd6e89094a2be760a54e6b9a89de92 andris9 committed Jun 26, 2011
View
1 .gitignore
@@ -0,0 +1 @@
+node_modules
View
12 README.md
@@ -12,8 +12,7 @@
Tests are run with *nodeunit*
- npm install nodeunit -g
- nodeunit tests.js
+ ./run_tests.sh
Tests expect a Gearman daemon running on port 7003
@@ -192,3 +191,12 @@ Worker Job object has the following methods
}
job.complete(payload * payload);
});
+
+### Detect connection errors
+
+When the connection is lost a "disconnect" event is emitted to the client/worker
+
+ worker.addServer("gearman.lan");
+ worker.on("disconnect", function(server){
+ console.log("Connection lost from "+server_name);
+ });
View
2 client.js → examples/client.js
@@ -1,4 +1,4 @@
-var Gearman = require("./gearnode");
+var Gearman = require("../lib/gearnode");
client = new Gearman();
client.addServer("localhost", 7003);
View
2 worker.js → examples/worker.js
@@ -1,4 +1,4 @@
-var Gearman = require("./gearnode");
+var Gearman = require("../lib/gearnode");
String.prototype.reverse = function(){
splitext = this.split("");
View
16 gearman-connection.js → lib/gearman-connection.js
@@ -292,6 +292,16 @@ GearmanConnection.prototype.close = function(){
}
this.connected = false;
this.connecting = false;
+
+ // kill all pending jobs
+ var handles = Object.keys(this.queued_jobs), original;
+
+ for(var i=0, len = handles.length; i<len; i++){
+ original = this.queued_jobs[handles[i]] || {};
+ delete this.queued_jobs[handles[i]];
+ this.emit("fail", handles[i], original.options);
+ }
+
this.emit("disconnect");
}
};
@@ -457,8 +467,9 @@ GearmanConnection.prototype.receive = function(chunk){
// TODO: here should be some kind of mechanism to recover sync
for(var i=0; i<4; i++){
if(magicRES[i] != buf[i]){
- console.log("WARNING: out of sync!");
- break;
+ console.log("ERROR: out of sync!");
+ this.close();
+ return;
}
}
@@ -608,6 +619,7 @@ GearmanConnection.prototype.handler_WORK_COMPLETE = function(command, handle, re
};
GearmanConnection.prototype.handler_WORK_EXCEPTION = function(command, handle, error){
+ console.log(arguments)
var original = this.queued_jobs[handle] || {};
this.emit("exception", handle, error, original.options);
};
View
4 gearnode.js → lib/gearnode.js
@@ -81,9 +81,7 @@ Gearman.prototype.addServer = function(server_name, server_port){
}).bind(this));
this.servers[server_name].connection.on("disconnect", (function(){
- if(options && options.job){
- options.job.emit("disconnect", server_name);
- }
+ this.emit("disconnect", server_name);
}).bind(this));
this.update(server_name);
View
0 tools.js → lib/tools.js
File renamed without changes.
View
6 package.json
@@ -14,13 +14,17 @@
"type" : "git",
"url" : "http://github.com/andris9/nodemailer.git"
},
- "main" : "./gearnode",
+ "main" : "./lib/gearnode",
"licenses" : [
{
"type": "MIT",
"url": "http://github.com/andris9/gearnode/blob/master/LICENSE"
}
],
+ "dependencies": {
+ "optimist":"*",
+ "nodeunit":"*"
+ },
"engine": [ "node >=0.3.0" ],
"keywords": ["gearman", "worker", "message queue"]
}
View
8 run_tests.sh
@@ -0,0 +1,8 @@
+#!/usr/local/bin/node
+
+require.paths.push(__dirname);
+require.paths.push(__dirname + '/lib');
+var testrunner = require('nodeunit').testrunner;
+
+process.chdir(__dirname);
+testrunner.run(['test']);
View
121 tests.js → test/tests.js
@@ -1,5 +1,5 @@
-var Gearnode = require("./gearnode"),
- GearmanConnection = require("./gearman-connection"),
+var Gearnode = require("../lib/gearnode"),
+ GearmanConnection = require("../lib/gearman-connection"),
testCase = require('nodeunit').testCase;
@@ -189,38 +189,44 @@ module.exports["worker behavior"] = testCase({
this.client = new Gearnode();
this.client.addServer("localhost",7003);
- this.worker.addFunction("upper", function(payload, job){
+ this.worker.addFunction("testjob_upper", function(payload, job){
job.complete(payload.toString("utf-8").toUpperCase());
});
- this.worker.addFunction("upper_utf8","utf-8", function(payload, job){
+ this.worker.addFunction("testjob_upper_utf8","utf-8", function(payload, job){
job.complete(payload.toUpperCase());
});
- this.worker.addFunction("upper_base64","base64", function(payload, job){
+ this.worker.addFunction("testjob_upper_base64","base64", function(payload, job){
job.complete(new Buffer(payload, "base64").toString("utf-8").toUpperCase());
});
- this.worker.addFunction("getexception",function(payload, job){
+ this.worker.addFunction("testjob_getexception",function(payload, job){
job.error(new Error("Error happened"));
});
- this.worker.addFunction("partial",function(payload, job){
+ this.worker.addFunction("testjob_partial",function(payload, job){
for(var i=0; i<4; i++){
job.data("data" + i);
}
job.complete("ready");
});
- this.worker.addFunction("getwarning",function(payload, job){
+ this.worker.addFunction("testjob_getwarning",function(payload, job){
job.warning("foo");
job.complete("bar");
});
- this.worker.addFunction("getfail",function(payload, job){
+ this.worker.addFunction("testjob_getfail",function(payload, job){
job.fail();
});
+ this.worker.addFunction("testjob_disconnect",function(payload, job){
+ setTimeout(function(){
+ job.complete("bar");
+ },1000);
+ });
+
callback();
},
@@ -233,7 +239,7 @@ module.exports["worker behavior"] = testCase({
test.expect(1);
- var job = this.client.submitJob("upper","test");
+ var job = this.client.submitJob("testjob_upper","test");
job.on("complete", function(data){
test.equal(data.toString("utf-8"), "TEST", "Function success");
@@ -254,7 +260,7 @@ module.exports["worker behavior"] = testCase({
test.expect(1);
- var job = this.client.submitJob("upper_utf8","test");
+ var job = this.client.submitJob("testjob_upper_utf8","test");
job.on("complete", function(data){
test.equal(data.toString("utf-8"), "TEST", "Function success");
@@ -275,7 +281,7 @@ module.exports["worker behavior"] = testCase({
test.expect(1);
- var job = this.client.submitJob("upper_base64","test");
+ var job = this.client.submitJob("testjob_upper_base64","test");
job.on("complete", function(data){
test.equal(data.toString("utf-8"), "TEST", "Function success");
@@ -296,7 +302,7 @@ module.exports["worker behavior"] = testCase({
test.expect(1);
- var job = this.client.submitJob("upper","test", {encoding:"utf-8"});
+ var job = this.client.submitJob("testjob_upper","test", {encoding:"utf-8"});
job.on("complete", function(data){
test.equal(data, "TEST", "Function success");
@@ -317,7 +323,7 @@ module.exports["worker behavior"] = testCase({
test.expect(1);
- var job = this.client.submitJob("upper","test", {encoding:"base64"});
+ var job = this.client.submitJob("testjob_upper","test", {encoding:"base64"});
job.on("complete", function(data){
test.equal(data, new Buffer("TEST","utf-8").toString("base64"), "Function success");
@@ -335,35 +341,18 @@ module.exports["worker behavior"] = testCase({
},
"get exceptions": function(test){
- test.expect(2);
+ test.expect(1);
this.client.getExceptions((function(err, success){
test.ok(success,"Listening for exceptions");
-
- var job = this.client.submitJob("getexception","test");
-
- job.on("complete", function(data){
- test.ok("false","No exceptions");
- test.done();
- });
-
- job.on("fail", function(){
- test.ok("false","No exceptions");
- test.done();
- });
-
- job.on("error", function(){
- test.ok(true, "Function failed with error");
- test.done();
- });
-
+ test.done();
}).bind(this));
},
"fail": function (test) {
test.expect(1);
- var job = this.client.submitJob("getfail","test", {encoding:"utf-8"});
+ var job = this.client.submitJob("testjob_getfail","test", {encoding:"utf-8"});
job.on("complete", function(data){
test.ok(false, "Should not complete");
@@ -384,7 +373,7 @@ module.exports["worker behavior"] = testCase({
"partial data": function(test){
test.expect(5);
- var job = this.client.submitJob("partial", "test", {encoding:"utf-8"}),
+ var job = this.client.submitJob("testjob_partial", "test", {encoding:"utf-8"}),
i = 0;
job.on("complete", function(data){
@@ -411,7 +400,7 @@ module.exports["worker behavior"] = testCase({
test.expect(2);
- var job = this.client.submitJob("getwarning","test", {encoding:"utf-8"});
+ var job = this.client.submitJob("testjob_getwarning","test", {encoding:"utf-8"});
job.on("complete", function(data){
test.equal(data, "bar", "Completed");
@@ -431,8 +420,62 @@ module.exports["worker behavior"] = testCase({
test.ok(false, "Function failed with error");
test.done();
});
+ },
+
+ "disconnect server": function (test) {
+
+ test.expect(1);
+
+ var job = this.client.submitJob("testjob_disconnect","test");
+
+ job.on("complete", function(data){
+ test.ok(false, "Should not complete")
+ test.done();
+ });
+
+ job.on("fail", function(){
+ test.ok(true, "Function failed");
+ test.done();
+ });
+
+ job.on("error", function(){
+ test.ok(false, "Function failed with error");
+ test.done();
+ });
+
+ setTimeout((function(){
+ this.client.servers[this.client.server_names[this.client.server_names.length-1]].connection.close();
+ }).bind(this), 100);
+ },
+
+ "disconnect event": function (test) {
+
+ test.expect(2);
+
+ var job = this.client.submitJob("testjob_disconnect","test");
+
+ this.client.on("disconnect", function(server_name){
+ test.equal(server_name, "localhost", "Server disconnected");
+ test.done();
+ });
+
+ job.on("complete", function(data){
+ test.ok(false, "Should not complete")
+ test.done();
+ });
+
+ job.on("fail", function(){
+ test.ok(true, "Function failed");
+ //test.done();
+ });
+
+ job.on("error", function(){
+ test.ok(false, "Function failed with error");
+ test.done();
+ });
+
+ setTimeout((function(){
+ this.client.servers[this.client.server_names[this.client.server_names.length-1]].connection.close();
+ }).bind(this), 100);
}
});
-
-
-

0 comments on commit 649852b

Please sign in to comment.
Something went wrong with that request. Please try again.