Permalink
Browse files

Added keepalive for socket, added timeout for jobs, created some time…

…out related tests, added travis support
  • Loading branch information...
1 parent e1d8b0a commit f7ca7fba679d2acbd6fc54bd214355753f419f47 Andris Reinman committed Jan 30, 2012
Showing with 211 additions and 17 deletions.
  1. +1 −0 .gitignore
  2. +15 −0 .travis.yml
  3. +45 −2 README.md
  4. +9 −0 examples/client.js
  5. +38 −11 lib/gearman.js
  6. +7 −1 package.json
  7. +2 −0 run_tests.js
  8. +94 −3 test/test.js
View
@@ -0,0 +1 @@
+node_modules
View
@@ -0,0 +1,15 @@
+language: node_js
+node_js:
+ - 0.4
+ - 0.6
+ - 0.7
+
+before_install:
+ - sudo apt-get install gearman
+
+notifications:
+ email:
+ recipients:
+ - andris@node.ee
+ on_success: change
+ on_failure: change
View
@@ -2,6 +2,10 @@
**node-gearman** is an extremely simple Gearman client/worker module for Node.JS. You can register workers and you can submit jobs, that's all about it.
+[![Build Status](https://secure.travis-ci.org/andris9/node-gearman.png)](http://travis-ci.org/andris9/node-gearman)
+
+**NB!** Breaking API - `'connected'` events etc are now called '`connect`'.
+
## Installation
Install through *npm*
@@ -30,15 +34,15 @@ This doesn't actually create the connection yet. Connection is created when need
The following events can be listened for a `Gearman` object:
- * **connected** - when the connection has been successfully established to the server
+ * **connect** - when the connection has been successfully established to the server
* **idle** - when a there's no jobs available for workers
* **close** - connection closed
* **error** - an error occured. Connection is automatically closed.
Example:
var gearman = new Gearman(hostname, port);
- gearman.on("connected", function(){
+ gearman.on("connect", function(){
console.log("Connected to the server!");
});
gearman.connect();
@@ -91,6 +95,37 @@ Example:
worker.end(reversed);
});
+## Job timeout
+
+You can set an optional timeout value (in milliseconds) for a job to abort it automatically when the timeout occurs.
+
+Timeout automatically aborts further processing of the job.
+
+ job.setTimeout(timeout[, timeoutCallback]);
+
+If `timeoutCallback` is not set, a `'timeout'` event is emitted on timeout.
+
+ job.setTimeout(10*1000); // timeout in 10 secs
+ job.on("timeout", function(){
+ console.log("Timeout exceeded for the worker. Job aborted.");
+ });
+
+## Close connection
+
+You can close the Geamrna connection with `close()`
+
+ var gearman = new Gearman();
+ ...
+ gearman.close();
+
+The connection is closed when a `'close'` event for the Gearman object is emitted
+
+ gearman.on("close", function(){
+ console.log("Connection closed");
+ });
+
+ gearman.close();
+
## Streaming
Worker and job objects also act as Stream objects (workers are writable and jobs readable streams), so you can stream data with `pipe` from a worker to a client (but not the other way round). This is useful for zipping/unzipping etc.
@@ -113,7 +148,15 @@ Worker and job objects also act as Stream objects (workers are writable and jobs
// save incoming stream to file
job.pipe(output);
+## Run tests
+
+Run the tests with
+
+ npm test
+
+or alternatively
+ node run_tests.js
## License
**MIT**
View
@@ -3,8 +3,16 @@ var Gearman = require("../lib/gearman"),
var job = gearman.submitJob("reverse", "test string");
+job.setTimeout(2000);
+
+job.on("timeout", function(){
+ console.log("Timeout!");
+ gearman.close();
+})
+
job.on("error", function(err){
console.log("ERROR: ", err.message || err);
+ gearman.close();
});
job.on("data", function(reversed){
@@ -13,4 +21,5 @@ job.on("data", function(reversed){
job.on("end", function(){
console.log("Ready!");
+ gearman.close();
});
View
@@ -28,10 +28,6 @@ Gearman.prototype.init = function(){
this.currentWorkers = {};
this.workers = {};
-
- this.noopTimer = setInterval((function(){
- this.sendCommand("NOOP");
- }).bind(this), 15000);
}
Gearman.packetTypes = {
@@ -149,18 +145,17 @@ Gearman.prototype.connect = function(){
if(this.debug){
console.log("connected!");
}
- this.emit("connected");
+ this.emit("connect");
this.processCommandQueue();
}).bind(this));
+ this.socket.setKeepAlive(true);
+
this.socket.on("end", this.close.bind(this));
this.socket.on("close", this.close.bind(this));
- this.socket.on("timeout", this.close.bind(this));
-
this.socket.on("error", this.errorHandler.bind(this));
-
this.socket.on("data", this.receive.bind(this));
};
@@ -174,8 +169,6 @@ Gearman.prototype.close = function(){
Gearman.prototype.closeConnection = function(){
var i;
- clearInterval(this.noopTimer);
-
if(this.connected){
if(this.socket){
try{
@@ -443,6 +436,7 @@ Gearman.prototype.receive_WORK_FAIL = function(handle){
Gearman.prototype.receive_WORK_DATA = function(handle, payload){
if(this.currentJobs[handle] && !this.currentJobs[handle].aborted){
this.currentJobs[handle].emit("data", payload);
+ this.currentJobs[handle].updateTimeout();
}
};
@@ -451,6 +445,8 @@ Gearman.prototype.receive_WORK_COMPLETE = function(handle, payload){
if((job = this.currentJobs[handle])){
delete this.currentJobs[handle];
if(!job.aborted){
+ clearTimeout(job.timeoutTimer);
+
if(payload){
job.emit("data", payload);
}
@@ -530,11 +526,42 @@ Gearman.prototype.Job = function(gearman, name, payload){
this.name = name;
this.payload = payload;
+ this.timeoutTimer = null;
+
gearman.sendCommand("SUBMIT_JOB", name, false, payload, this.receiveHandle.bind(this));
};
utillib.inherits(Gearman.prototype.Job, Stream);
+Gearman.prototype.Job.prototype.setTimeout = function(timeout, timeoutCallback){
+ this.timeoutValue = timeout;
+ this.timeoutCallback = timeoutCallback;
+ this.updateTimeout();
+}
+
+Gearman.prototype.Job.prototype.updateTimeout = function(){
+ if(this.timeoutValue){
+ clearTimeout(this.timeoutTimer);
+ this.timeoutTimer = setTimeout(this.onTimeout.bind(this), this.timeoutValue);
+ }
+}
+
+Gearman.prototype.Job.prototype.onTimeout = function(){
+ if(this.handle){
+ delete this.gearman.currentJobs[this.handle];
+ }
+ if(!this.aborted){
+ this.abort();
+ var error = new Error("Timeout exceeded for the job");
+ if(typeof this.timeoutCallback == "function"){
+ this.timeoutCallback(error);
+ }else{
+ this.emit("timeout", error);
+ }
+ }
+}
+
Gearman.prototype.Job.prototype.abort = function(){
+ clearTimeout(this.timeoutTimer);
this.aborted = true;
}
@@ -543,6 +570,6 @@ Gearman.prototype.Job.prototype.receiveHandle = function(handle){
this.handle = handle;
this.gearman.currentJobs[handle] = this;
}else{
- callback(new Error("Invalid response from server"));
+ this.emit("error", new Error("Invalid response from server"));
}
};
View
@@ -1,7 +1,7 @@
{
"name": "node-gearman",
"description": "Simple Gearman client/worker module for Node.JS",
- "version": "0.1.5",
+ "version": "0.2.0",
"author" : "Andris Reinman",
"maintainers":[
{
@@ -14,6 +14,12 @@
"type" : "git",
"url" : "http://github.com/andris9/gearman.git"
},
+ "scripts":{
+ "test": "node ./run_tests.js"
+ },
+ "devDependencies": {
+ "nodeunit": "*"
+ },
"main" : "./lib/gearman",
"licenses" : [
{
View
@@ -0,0 +1,2 @@
+var reporter = require('nodeunit').reporters["default"];
+reporter.run(['test']);
View
@@ -3,7 +3,7 @@ var Gearman = require("../lib/gearman"),
// CREATE INSTANCE
-var gearman = new Gearman("pangalink.net");
+var gearman = new Gearman("localhost");
exports["Test Connection"] = {
@@ -19,7 +19,7 @@ exports["Test Connection"] = {
gearman.connect();
gearman.on("error", function(e){
- test.ifError(e);
+ test.ok(false, "Should not occur");
test.done();
});
@@ -46,7 +46,7 @@ exports["Worker and Client"] = {
setUp: function(callback){
- this.gearman = new Gearman("pangalink.net");
+ this.gearman = new Gearman("localhost");
this.gearman.on("connect", function(){
callback();
});
@@ -128,3 +128,94 @@ exports["Worker and Client"] = {
}
};
+
+exports["Job timeout"] = {
+ setUp: function(callback){
+
+ this.gearman = new Gearman("localhost");
+ this.gearman.on("connect", function(){
+ callback();
+ });
+ this.gearman.on("error", function(e){
+ console.log(e.message);
+ });
+ this.gearman.connect();
+
+ this.gearman.registerWorker("test", function(payload, worker){
+ setTimeout(function(){
+ worker.end("OK");
+ }, 300);
+ });
+ },
+
+ tearDown: function(callback){
+ this.gearman.on("close", function(){
+ callback();
+ });
+ this.gearman.close();
+ },
+
+ "Timeout event": function(test){
+ test.expect(1);
+
+ var job = this.gearman.submitJob("test", "test");
+ job.setTimeout(100);
+
+ job.on("timeout", function(){
+ test.ok(1,"TImeout occured");
+ test.done();
+ });
+
+ job.on("error", function(err){
+ test.ok(false,"Job failed");
+ test.done();
+ });
+
+ job.on("end", function(err){
+ test.ok(false, "Job should not complete");
+ test.done();
+ });
+ },
+
+ "Timeout callback": function(test){
+ test.expect(1);
+
+ var job = this.gearman.submitJob("test", "test");
+ job.setTimeout(100, function(){
+ test.ok(true,"TImeout occured");
+ test.done();
+ });
+
+ job.on("error", function(err){
+ test.ok(false,"Job failed");
+ test.done();
+ });
+
+ job.on("end", function(err){
+ test.ok(false, "Job should not complete");
+ test.done();
+ });
+ },
+
+ "Timeout set but does not occur": function(test){
+ test.expect(1);
+
+ var job = this.gearman.submitJob("test", "test");
+ job.setTimeout(400, function(){
+ test.ok(false,"Timeout occured");
+ test.done();
+ });
+
+ job.on("error", function(err){
+ test.ok(false,"Job failed");
+ test.done();
+ });
+
+ job.on("end", function(err){
+ test.ok(true, "Job completed before timeout");
+ test.done();
+ });
+ }
+}
+
+

0 comments on commit f7ca7fb

Please sign in to comment.