Permalink
Browse files

Initial Commit

  • Loading branch information...
Malte Ubl Malte Ubl
Malte Ubl authored and Malte Ubl committed Mar 12, 2010
0 parents commit 67af743b025c3848311d1ccabb32fe310547c975
Showing with 429 additions and 0 deletions.
  1. +20 −0 LICENSE
  2. +38 −0 README.md
  3. +18 −0 example/example.js
  4. +9 −0 example/worker.js
  5. +202 −0 lib/worker.js
  6. +24 −0 test/common.js
  7. +40 −0 test/fixtures/worker.js
  8. +78 −0 test/test-worker.js
20 LICENSE
@@ -0,0 +1,20 @@
+Copyright (c) 2009-2010 Malte Ubl
+
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+"Software"), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
@@ -0,0 +1,38 @@
+# node-worker - 0.0.1
+
+node-worker is an implementation of the WebWorker API for node.js.
+http://www.whatwg.org/specs/web-workers/current-work/
+
+## Example
+
+ var Worker = require("../lib/worker").Worker;
+
+ var worker = new Worker("worker.js");
+
+ worker.postMessage({
+ hello: "world"
+ });
+
+ worker.onmessage = function (msg) {
+ sys.puts(msg.hello);
+ };
+
+ worker.addListener("message", function (msg) {
+ sys.puts(msg.hello);
+ worker.terminate();
+ });
+
+## Example Worker File
+
+ var worker = require("worker").worker;
+
+ worker.onmessage = function (msg) {
+ worker.postMessage({
+ hello: "mother"
+ });
+ };
+
+
+
+
+
@@ -0,0 +1,18 @@
+var sys = require("sys");
+
+var Worker = require("../lib/worker").Worker;
+
+var worker = new Worker("worker.js");
+
+worker.postMessage({
+ hello: "world"
+});
+
+worker.onmessage = function (msg) {
+ sys.puts(msg.hello);
+};
+
+worker.addListener("message", function (msg) {
+ sys.puts(msg.hello);
+ worker.terminate();
+});
@@ -0,0 +1,9 @@
+// the actual web worker
+
+var worker = require("worker").worker;
+
+worker.onmessage = function (msg) {
+ worker.postMessage({
+ hello: "mother"
+ });
+};
@@ -0,0 +1,202 @@
+// An implementation of the web worker API for node.js
+
+var sys = require('sys');
+
+var workerIndex = 0;
+var MESSAGE_SPLITTER = "\r\n";
+var WORKER_PARAS = ["-mode", "worker"];
+var HANDSHAKE = "HANDSHAKE";
+var workerImplementation = WorkerChild;
+var workerProcessImplementation = WorkerProcess;
+
+function debug(msg) {
+ //sys.error("WorkerDebug "+process.pid+" - "+msg)
+}
+
+exports.importScripts = function () {
+ for (var i = 0, len = arguments.length; i < len; ++i) {
+ require(arguments[i]);
+ }
+};
+
+var Worker = function (filename) {
+ var self = this;
+ process.EventEmitter.call(this);
+ this.addListener("message", function (message) {
+ if (self.onmessage) {
+ self.onmessage(message);
+ }
+ });
+ this.addListener("error", function (message) {
+ if (self.onerror) {
+ self.onerror(message);
+ } else {
+ if(self.listeners("error").length === 1) {
+ throw new Error(message)
+ }
+ }
+ });
+
+ this.impl = new workerImplementation(this, filename);
+ this.workerIndex = workerIndex++;
+};
+
+sys.inherits(Worker, process.EventEmitter);
+Worker.prototype.postMessage = function (payload) {
+ var message = JSON.stringify(payload);
+ this.impl.postMessage(message);
+};
+
+Worker.prototype.terminate = function () {
+ this.impl.terminate();
+};
+
+exports.Worker = Worker;
+
+function WorkerChild (eventDest, filename) {
+ var self = this;
+ this.eventDest = eventDest;
+ this.filename = filename;
+ this.child = process.createChildProcess("node", [this.filename].concat(WORKER_PARAS));
+ this.child.addListener("output", function (data) {
+ debug("From worker " + data);
+ self.handleData(data);
+ });
+
+ this.child.addListener("error", function (data) {
+ if(data !== null) {
+ debug(self.child.pid + ": "+ data);
+ }
+ });
+
+ this.child.addListener("exit", function (code) {
+ debug(self.child.pid + ": exit "+code);
+ });
+
+ this.buffer = "";
+ this.active = false;
+ this.queue = [];
+}
+
+WorkerChild.prototype = {
+
+ postMessage: function (message) {
+ if(this.active) {
+ debug("Sending data "+message);
+ this.child.write(message+MESSAGE_SPLITTER, "utf8");
+ } else {
+ this.queue.push(message);
+ }
+ },
+
+ postQueue: function () {
+ for(var i = 0, len = this.queue.length; i < len; ++i) {
+ this.postMessage(this.queue[i]);
+ }
+ this.queue = [];
+ },
+
+ handleData: function (data) {
+ var self = this;
+ this.buffer += (data || "");
+ debug("Received data "+this.buffer);
+
+ if(this.buffer !== "") {
+ var parts = this.buffer.split(MESSAGE_SPLITTER);
+ while (parts.length > 1) {
+ var message = parts.shift();
+ if (message !== "") {
+ if(message === HANDSHAKE) {
+ this.active = true;
+ this.postQueue();
+ setTimeout(function () {
+ self.child.write(MESSAGE_SPLITTER, "utf8")
+ }, 10)
+ } else {
+ this.handleMessage(message);
+ }
+
+ this.buffer = parts.join(MESSAGE_SPLITTER);
+ if(this.buffer !== "") {
+ this.handleData("");
+ return;
+ }
+ }
+ }
+ }
+ },
+
+ handleMessage: function (message) {
+ debug("Emit event "+message);
+ var obj = JSON.parse(message);
+ if (obj.__exception__) {
+ this.eventDest.emit("error", obj)
+ } else {
+ this.eventDest.emit("message", obj)
+ }
+
+ },
+
+ terminate: function () {
+ this.child.close();
+ }
+};
+
+var workerProcess;
+var i = 0;
+function WorkerProcess(eventDest) {
+ sys.print(HANDSHAKE+MESSAGE_SPLITTER);
+ var self = this;
+ this.eventDest = eventDest;
+ process.stdio.open();
+ process.stdio.addListener("data", function (data) {
+ debug("Process receiving data "+data);
+ self.handleData(data);
+ });
+ this.buffer = "";
+}
+
+WorkerProcess.prototype = {
+ postMessage: function (message) {
+ debug("Process posting message "+message);
+ sys.print(message+MESSAGE_SPLITTER);
+ },
+
+ handleData: WorkerChild.prototype.handleData,
+ handleMessage: WorkerChild.prototype.handleMessage
+};
+
+function WorkerNode () {
+ var self = this;
+ this.impl = new workerProcessImplementation(this);
+
+ process.EventEmitter.call(this);
+ this.addListener("message", function (message) {
+ if (self.onmessage) {
+ self.onmessage(message);
+ }
+ });
+
+ process.addListener("uncaughtException", function (exception) {
+ self.postMessage({
+ __exception__: exception
+ })
+ })
+}
+sys.inherits(WorkerNode, process.EventEmitter);
+
+WorkerNode.prototype.postMessage = function (payload) {
+ this.impl.postMessage(JSON.stringify(payload));
+};
+
+(function () {
+ if (len = process.ARGV.length < 4) return;
+ for (var i = 2, len = process.ARGV.length; i < len; ++i) {
+ var arg = process.ARGV[i];
+ if (arg != WORKER_PARAS[i-2]) {
+ return;
+ }
+ }
+ // if we are here, we are a worker
+ exports.worker = new WorkerNode();
+})();
@@ -0,0 +1,24 @@
+var path = require("path");
+
+exports.testDir = path.dirname(__filename);
+exports.fixturesDir = path.join(exports.testDir, "fixtures");
+exports.libDir = path.join(exports.testDir, "../../lib");
+
+require.paths.unshift(exports.libDir);
+
+var assert = require('assert');
+var sys = require("sys");
+
+process.mixin(exports, sys);
+exports.assert = require('assert');
+exports.path = path;
+
+var ok = assert.ok;
+assert.ok = function (bool, msg) {
+ if(bool) {
+ sys.print("OK ")
+ } else {
+ sys.print("NOT OK ")
+ }
+ sys.puts(msg);
+}
@@ -0,0 +1,40 @@
+/* TODO: Take this code duplication out when NODE_PATH in env works */
+var path = require("path");
+
+exports.testDir = path.dirname(__filename);
+exports.libDir = path.join(exports.testDir, "../../../lib");
+
+require.paths.unshift(exports.libDir);
+
+var sys = require("sys");
+var worker = require("worker").worker;
+
+worker.onmessage = function (msg) {
+ sys.error("OnMessage " +JSON.stringify(msg));
+
+ if(msg.fib) {
+ worker.postMessage(fib(msg.fib*1));
+ return;
+ }
+
+ if(msg.wait) {
+ setTimeout(function () {
+ worker.postMessage("Waited")
+ }, 1000)
+ return;
+ }
+
+ if(msg.error) {
+ throw("ErrorMarker");
+ }
+
+ msg.output = msg.input * 3;
+ setTimeout(function () {
+ worker.postMessage(msg);
+ }, 100 * msg.output)
+};
+
+
+function fib(n) {
+ return n < 2 ? n : fib(n-1)+fib(n-2);
+}
Oops, something went wrong.

0 comments on commit 67af743

Please sign in to comment.