Skip to content

Commit

Permalink
should be quite usable by now
Browse files Browse the repository at this point in the history
  • Loading branch information
andris9 committed Jun 24, 2011
1 parent 0a59f26 commit adaa65b
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 17 deletions.
49 changes: 45 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

### Worker

var Gearman = require("./gearman");
var Gearman = require("./gearnode");

worker= new Gearman();
worker.addServer("localhost", 7003);
Expand All @@ -17,7 +17,7 @@

### Client

var Gearman = require("./gearman");
var Gearman = require("./gearnode");

client = new Gearman();
client.addServer("localhost", 7003);
Expand All @@ -33,7 +33,7 @@

### Require Gearman library

var Gearman = require("./gearman");
var Gearman = require("./gearnode");

### Create a new Gearman worker/client

Expand Down Expand Up @@ -87,7 +87,7 @@ Where

Possible option values

* **encoding** - indicates the encoding for the job response (default is Buffer). Can be "utf-8", "ascii", "base64" or "buffer"
* **encoding** - indicates the encoding for the job response (default is Buffer). Can be "utf-8", "ascii", "base64", "number" or "buffer"
* **background** - if set to true, detach the job from the client (complete and error events will not be sent to the client)
* **priority** - indicates the priority of the job. Possible values "low", "normal" (default) and "high"

Expand Down Expand Up @@ -119,3 +119,44 @@ Example

### Create a worker function

worker.addFunction(func_name[, encoding], worker_func)

Where

* **func_name** is the name of the function to be created
* **endocing** is the input encoding (default is buffer)
* **worker_func** is the actual worker function

#### Worker function

worker_func = function(payload, job)

Where

* **payload** is the data sent by the client and in the encoding specified with *addFunction*
* **job** is a Gearman Job object that can be used to send data back

#### Job object

Job object has the following methods

* **complete(response)** - send the result of the function back to the client
* **error(error)** - throw an exception (and end the job with *failed* status)
* **fail()** - end the function without response data when the function failed
* **warning(warning)** - send a warning message to the client
* **data(response)** - send a partial response data to the client
* **setStatus(numerator, denominator)** - send a progress event to the client

#### Example

var Gearman = require("./gearnode");

var worker = new Gearman();
worker.addServer();

worker.addFunction("sqr", "number", function(payload, job){
if(payload < 0){
job.warning("Used number is smaller than zero!");
}
job.complete(payload * payload);
});
7 changes: 4 additions & 3 deletions client.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
var Gearman = require("./gearman");
var Gearman = require("./gearnode");

client = new Gearman();
client.addServer("localhost", 7003);
Expand All @@ -7,7 +7,7 @@ client.getExceptions(function(err, success){
console.log(success && "Registered for exceptions" || "No exceptions");
});

var job = client.submitJob("reverse", "Hello world!", {encoding:"base64"});
var job = client.submitJob("sqr", -25, {encoding:"number"});

job.on("created", function(handle){
console.log("Job created as '"+handle+"'");
Expand Down Expand Up @@ -37,4 +37,5 @@ job.on("data", function(message){

job.on("status", function(nu, de){
console.log("Status "+nu+" / "+de);
});
});

6 changes: 6 additions & 0 deletions gearman-connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,9 @@ GearmanConnection.prototype.handler_WORK_COMPLETE = function(command, handle, re
case "base64":
response = response && response.toString(encoding) || "";
break;
case "number":
response = Number(response && response.toString("ascii") || "") || 0;
break;
case "buffer":
default:
// keep buffer
Expand Down Expand Up @@ -625,6 +628,9 @@ GearmanConnection.prototype.handler_WORK_DATA = function(command, handle, payloa
case "base64":
payload = payload && payload.toString(encoding) || "";
break;
case "number":
payload = Number(payload && payload.toString("ascii") || "") || 0;
break;
case "buffer":
default:
// keep buffer
Expand Down
40 changes: 36 additions & 4 deletions gearman.js → gearnode.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,25 @@ Gearman.prototype.addServer = function(server_name, server_port){
Gearman.prototype.runJob = function(server_name, handle, func_name, payload, uid){
uid = uid || null;
if(this.functions[func_name]){

var encoding = this.functions[func_name].encoding.toLowerCase() || "buffer";

switch(encoding){
case "utf-8":
case "ascii":
case "base64":
payload = payload && payload.toString(encoding) || "";
break;
case "number":
payload = Number(payload && payload.toString("ascii") || "") || 0;
break;
case "buffer":
default:
// keep buffer
}

var job = new Gearman.GearmanWorker(handle, server_name, this);
this.functions[func_name](payload, job);
this.functions[func_name].func(payload, job);
}else{
this.servers[server_name].connection.jobError(handle, "Function "+func_name+" not found");
}
Expand Down Expand Up @@ -212,17 +229,30 @@ Gearman.prototype.setWorkerId = function(server_name, id){
}
}

Gearman.prototype.addFunction = function(name, func){
Gearman.prototype.addFunction = function(name, encoding, func){
if(!name){
return false;
}

if(!func && typeof encoding=="function"){
func = encoding;
encoding = null;
}else if(typeof func != "function"){
return;
}

if(!(name in this.functions)){
this.functions[name] = func;
this.functions[name] = {
func: func,
encoding: encoding || "buffer"
}
this.function_names.push(name);
this.register(name);
}else{
this.functions[name] = func;
this.functions[name] = {
func: func,
encoding: encoding || "buffer"
}
this.function_names.push(name);
}

Expand Down Expand Up @@ -291,6 +321,8 @@ Gearman.GearmanWorker.prototype.error = function(error){
}

Gearman.GearmanWorker.prototype.setStatus = function(numerator, denominator){
numerator = parseInt(numerator, 10) || 0;
denominator = parseInt(denominator, 10) || 0;
this.gm.servers[this.server_name].connection.jobStatus(this.handle, numerator, denominator);
}

Expand Down
14 changes: 8 additions & 6 deletions worker.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
var Gearman = require("./gearman");
var Gearman = require("./gearnode");

String.prototype.reverse = function(){
splitext = this.split("");
Expand All @@ -11,8 +11,8 @@ worker= new Gearman();
worker.addServer("localhost", 7003);
worker.setWorkerId("testkast");

worker.addFunction("reverse", function(payload, job){
var str = payload.toString("utf-8"),
worker.addFunction("reverse", "utf-8", function(payload, job){
var str = payload,
reversed = str.reverse();

setTimeout(function(){
Expand All @@ -35,7 +35,9 @@ worker.addFunction("reverse", function(payload, job){
},500);
});

worker.addFunction("reverse2", function(payload){
var str = payload.toString("utf-8");
return str.reverse();
worker.addFunction("sqr", "number", function(payload, job){
if(payload < 0){
job.warning("Used number is smaller than zero!");
}
job.complete(payload * payload);
});

0 comments on commit adaa65b

Please sign in to comment.