Skip to content

Commit

Permalink
added tests (nodeunit)
Browse files Browse the repository at this point in the history
  • Loading branch information
andris9 committed Jun 25, 2011
1 parent bdb9eab commit 29479d9
Show file tree
Hide file tree
Showing 4 changed files with 547 additions and 74 deletions.
33 changes: 30 additions & 3 deletions README.md
Expand Up @@ -4,6 +4,17 @@

**NB!** this is usable beta but probably not yet ready for production, I'll yet have to do a lot of refactoring and optimization

## Installation

npm install gearnode

## Tests

Tests are run with *nodeunit*

npm install nodeunit -g
nodeunit test.js

## Usage

### Worker
Expand Down Expand Up @@ -77,6 +88,22 @@ Example
console.log(exception);
});

### Assign an ID for the Worker

Worker ID's identify unique workers for monitoring Gearman.

worker.setWorkerId(id)

Where

* **id** is a string that will act as the name for the worker

Example

worker = new Gearman();
worker.addServer(); // use default values
worker.setWorkerId("my_worker");

### Submit a job

client.submitJob(func, payload[, options])
Expand All @@ -93,7 +120,7 @@ Possible option values
* **background** - if set to true, detach the job from the client (complete and error events will not be sent to the client)
* **priority** - indicates the priority of the job. Possible values "low", "normal" (default) and "high"

Returns a Gearman Job object with the following events
Returns a Client Job object with the following events

* **created** - when the function is queued by the server (params: handle value)
* **complete** - when the function returns (params: response data in encoding specified by the options value)
Expand Down Expand Up @@ -138,9 +165,9 @@ Where
* **payload** is the data sent by the client and in the encoding specified with *addFunction*
* **job** is a Gearman Job object that can be used to send data back

#### Job object
#### Worker Job object

Job object has the following methods
Worker Job object has the following methods

* **complete(response)** - send the result of the function back to the client
* **error(error)** - throw an exception (and end the job with *failed* status)
Expand Down
19 changes: 7 additions & 12 deletions gearman-connection.js
Expand Up @@ -16,11 +16,12 @@ function GearmanConnection(server, port){

this.queued_jobs = {};

this.workerId = null;

this.connected = false;
this.processing = false;
this.failed = false;

this.retries = 0;
this.remainder = false;

this.debug = false;
Expand Down Expand Up @@ -140,12 +141,7 @@ GearmanConnection.prototype.processQueue = function(){

// if no connection yet, open one
if(!this.connected){
if(this.retries<5){
this.connect();
}else{
console.log("failed");
this.failed = true;
}
return this.connect();
return false;
}

Expand Down Expand Up @@ -265,15 +261,13 @@ GearmanConnection.prototype.connect = function(){
this.socket.on("connect", (function(){
this.connecting = false;
this.connected = true;
this.retries = 0;

if(this.debug){
console.log("connected!");
}
}

this.processQueue();

}).bind(this));


this.socket.on("end", this.close.bind(this));
this.socket.on("error", this.close.bind(this));
Expand All @@ -298,7 +292,7 @@ GearmanConnection.prototype.close = function(){
}
this.connected = false;
this.connecting = false;
this.retries++;
this.emit("disconnect");
}
}

Expand Down Expand Up @@ -420,6 +414,7 @@ GearmanConnection.prototype.getExceptions = function(callback){
}

GearmanConnection.prototype.setWorkerId = function(id){
this.workerId = id;
this.sendCommand({
type: "SET_CLIENT_ID",
params: [id]
Expand Down
128 changes: 69 additions & 59 deletions gearnode.js
Expand Up @@ -10,6 +10,8 @@ function Gearman(){

this.function_names = [];
this.functions = {};

this.workerId = null;
}
utillib.inherits(Gearman, EventEmitter);

Expand All @@ -31,7 +33,6 @@ Gearman.prototype.addServer = function(server_name, server_port){

this.servers[server_name].connection.on("error", function(err){
console.log("Error with "+server_name);
//console.log(err.message);
console.log(err.stack);
});

Expand Down Expand Up @@ -82,33 +83,6 @@ Gearman.prototype.addServer = function(server_name, server_port){
this.update(server_name);
}

Gearman.prototype.runJob = function(server_name, handle, func_name, payload, uid){
uid = uid || null;
if(this.functions[func_name]){

var encoding = this.functions[func_name].encoding.toLowerCase() || "buffer";

switch(encoding){
case "utf-8":
case "ascii":
case "base64":
payload = payload && payload.toString(encoding) || "";
break;
case "number":
payload = Number(payload && payload.toString("ascii") || "") || 0;
break;
case "buffer":
default:
// keep buffer
}

var job = new Gearman.GearmanWorker(handle, server_name, this);
this.functions[func_name].func(payload, job);
}else{
this.servers[server_name].connection.jobError(handle, "Function "+func_name+" not found");
}
}

Gearman.prototype.removeServer = function(server_name){
var connection, pos;

Expand All @@ -130,6 +104,11 @@ Gearman.prototype.removeServer = function(server_name){
return true;
}

Gearman.prototype.end = function(){
for(var i=this.server_names.length-1; i>=0; i--){
this.removeServer(this.server_names[i]);
}
}

Gearman.prototype.update = function(server_name){
if(!server_name){
Expand All @@ -139,6 +118,10 @@ Gearman.prototype.update = function(server_name){
this.function_names.forEach((function(func_name){
this.register(func_name, server_name);
}).bind(this));

if(this.workerId){
this.setWorkerId(server_name, this.workerId);
}
}

Gearman.prototype.register = function(func_name, server_name){
Expand Down Expand Up @@ -172,38 +155,37 @@ Gearman.prototype.unregister = function(func_name, server_name){
}
}

// WORKER FUNCTIONS

Gearman.prototype.getExceptions = function(server_name, callback){
var pos;

if(!callback && typeof server_name =="function"){
callback = server_name;
server_name = null;
}

if(server_name){
if(this.servers[server_name]){

this.servers[server_name].connection.getExceptions((function(err, success){
if(callback){
return callback(err, success);
}
if(err){
console.log("Server "+server_name+" responded with error: "+(err.message || err));
}else{
console.log("Exceptions are followed from "+server_name);
}
}).bind(this));
Gearman.prototype.runJob = function(server_name, handle, func_name, payload, uid){
uid = uid || null;
if(this.functions[func_name]){

var encoding = this.functions[func_name].encoding.toLowerCase() || "buffer";

switch(encoding){
case "utf-8":
case "ascii":
case "base64":
payload = payload && payload.toString(encoding) || "";
break;
case "number":
payload = Number(payload && payload.toString("ascii") || "") || 0;
break;
case "buffer":
default:
// keep buffer
}

var job = new Gearman.GearmanWorker(handle, server_name, this);
this.functions[func_name].func(payload, job);
}else{
this.server_names.forEach((function(server_name){
if(server_name){
this.getExceptions(server_name, callback);
}
}).bind(this))
this.servers[server_name].connection.jobError(handle, "Function "+func_name+" not found");
}
}



Gearman.prototype.setWorkerId = function(server_name, id){
var pos;

Expand All @@ -217,10 +199,10 @@ Gearman.prototype.setWorkerId = function(server_name, id){

if(server_name){
if(this.servers[server_name]){

this.servers[server_name].connection.setWorkerId(id);
}
}else{
this.workerId = id;
this.server_names.forEach((function(server_name){
if(server_name){
this.setWorkerId(server_name, id);
Expand Down Expand Up @@ -253,7 +235,6 @@ Gearman.prototype.addFunction = function(name, encoding, func){
func: func,
encoding: encoding || "buffer"
}
this.function_names.push(name);
}

}
Expand All @@ -272,9 +253,36 @@ Gearman.prototype.removeFunction = function(name){
}
}

Gearman.prototype.end = function(){
for(var i=this.server_names.length-1; i>=0; i--){
this.removeServer(this.server_names[i]);
// CLIENT FUNCTIONS

Gearman.prototype.getExceptions = function(server_name, callback){
var pos;

if(!callback && typeof server_name =="function"){
callback = server_name;
server_name = null;
}

if(server_name){
if(this.servers[server_name]){

this.servers[server_name].connection.getExceptions((function(err, success){
if(callback){
return callback(err, success);
}
if(err){
console.log("Server "+server_name+" responded with error: "+(err.message || err));
}else{
console.log("Exceptions are followed from "+server_name);
}
}).bind(this));
}
}else{
this.server_names.forEach((function(server_name){
if(server_name){
this.getExceptions(server_name, callback);
}
}).bind(this))
}
}

Expand All @@ -288,6 +296,8 @@ Gearman.prototype.submitJob = function(func_name, payload, options){
return new Gearman.GearmanJob(func_name, payload, options, server);
}


// WORKER JOB
Gearman.GearmanJob = function(func_name, payload, options, server){
EventEmitter.call(this);

Expand Down

0 comments on commit 29479d9

Please sign in to comment.