Skip to content
Browse files

Initial commit (move stuff from

  • Loading branch information...
0 parents commit e3d8e0f90da8720eb6b1b29844138af09f4c188b @Kami committed Mar 30, 2011
20 LICENSE
@@ -0,0 +1,20 @@
+Copyright (c) 2009-2010 Malte Ubl
+
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+"Software"), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
7 Makefile
@@ -0,0 +1,7 @@
+test:
+ @find test/test-*.js | xargs -n 1 -t node
+
+test-pool:
+ @find test/test-worker-pool*.js | xargs -n 1 -t expresso
+
+.PHONY: test test-pool
38 README.md
@@ -0,0 +1,38 @@
+# node-worker - 0.0.1
+
+node-worker is an implementation of the WebWorker API for node.js.
+http://www.whatwg.org/specs/web-workers/current-work/
+
+## Example
+
+ var Worker = require("../lib/worker").Worker;
+
+ var worker = new Worker("worker.js");
+
+ worker.postMessage({
+ hello: "world"
+ });
+
+ worker.onmessage = function (msg) {
+ sys.puts(msg.hello);
+ };
+
+ worker.addListener("message", function (msg) {
+ sys.puts(msg.hello);
+ worker.terminate();
+ });
+
+## Example Worker File
+
+ var worker = require("worker").worker;
+
+ worker.onmessage = function (msg) {
+ worker.postMessage({
+ hello: "mother"
+ });
+ };
+
+
+
+
+
18 example/example.js
@@ -0,0 +1,18 @@
+var sys = require("sys");
+
+var Worker = require("../lib/worker").Worker;
+
+var worker = new Worker("worker.js");
+
+worker.postMessage({
+ hello: "world"
+});
+
+worker.onmessage = function (msg) {
+ sys.puts(msg.hello);
+};
+
+worker.addListener("message", function (msg) {
+ sys.puts(msg.hello);
+ worker.terminate();
+});
9 example/worker.js
@@ -0,0 +1,9 @@
+// the actual web worker
+
+var worker = require("worker").worker;
+
+worker.onmessage = function (msg) {
+ worker.postMessage({
+ hello: "mother"
+ });
+};
2 index.js
@@ -0,0 +1,2 @@
+exports.worker = require('./lib/worker');
+exports.worker_pool = require('./lib/worker_pool');
21 lib/common.js
@@ -0,0 +1,21 @@
+var constants = require('./constants')
+
+var WORKER_PARAS = constants.WORKER_PARAS;
+
+exports.get_worker = function(process_argv, worker_class) {
+ var argv_len = process_argv.length;
+
+ if (len = argv_len < 4) {
+ return null;
+ }
+
+ for (var i = 2, len = argv_len; i < len; ++i) {
+ var arg = process_argv[i];
+ if (arg != WORKER_PARAS[i-2]) {
+ return null;
+ }
+ }
+
+ // if we are here, we are a worker
+ return new worker_class();
+};
5 lib/constants.js
@@ -0,0 +1,5 @@
+exports.MESSAGE_SPLITTER = "\r\n";
+exports.WORKER_PARAS = ["-mode", "worker"];
+exports.HANDSHAKE = "HANDSHAKE";
+
+exports.DEBUG = false;
166 lib/tcpworker.js
@@ -0,0 +1,166 @@
+var tcp = require("net");
+var sys = require("sys");
+var worker = require("./worker");
+
+var MESSAGE_SPLITTER = "\r\n";
+var HANDSHAKE = "HANDSHAKE";
+
+function debug(msg) {
+ //sys.error("Workerdebug "+process.pid+" - "+msg)
+}
+
+var WorkerServer = {};
+
+exports.makeWorker = function (filename, port, hostname, callback) {
+
+ var dest = port+":"+hostname;
+
+ var server = WorkerServer[dest];
+
+ var paras = {
+ port: null, // not yet known
+ hostname: hostname
+ };
+
+ var w = new worker.Worker(filename, WorkerChild, paras);
+
+ if(!server) {
+ var server = new worker.Worker("server", WorkerChild, {
+ port: port,
+ hostname: hostname
+ });
+ WorkerServer[dest] = server;
+
+ server.onmessage = function (port) {
+ paras.port = port;
+ w.impl.connect(paras);
+ };
+ }
+ server.postMessage(filename);
+
+ w.terminateServer = function () {
+ WorkerServer[dest] = null;
+ server.terminate();
+ };
+
+ return w;
+}
+
+exports.startWorker = function (port, hostname) {
+ var w = worker.startWorker(WorkerProcess, {
+ port: port,
+ hostname: hostname
+ });
+ sys.puts("System up");
+ return w;
+}
+
+exports.WorkerChild = WorkerChild;
+exports.WorkerProcess = WorkerProcess;
+
+function WorkerChild (eventDest, filename, paras) {
+ var self = this;
+ this.eventDest = eventDest;
+ this.filename = filename;
+
+ if(paras.port) {
+ this.connect(paras)
+ }
+
+ this.buffer = "";
+ this.active = false;
+ this.queue = [];
+}
+
+sys.inherits(WorkerChild, worker._WorkerChild);
+
+WorkerChild.prototype.connect = function (paras) {
+ var self = this;
+ var socket = tcp.createConnection(paras.port, paras.hostname || "localhost");
+ this.socket = socket;
+
+ socket.setEncoding("utf8");
+ socket.addListener("connect", function () {
+ self.active = true;
+ });
+ socket.addListener("data", function (data) {
+ debug("From worker " + data);
+ self.handleData(data);
+ });
+ socket.addListener("timeout", function (data) {
+ debug("timeout "+paras.port);
+ });
+ socket.addListener("end", function () {
+ socket.close();
+ });
+
+ socket.addListener("close", function (hadError) {
+ debug("Closing socket "+paras.port);
+ self.stopKeepAlive();
+ if(hadError) {
+ debug("Socket closed due to error");
+ }
+ });
+
+ this.keepAliveInterval = setInterval(function () {
+ self.write(MESSAGE_SPLITTER);
+ }, 5 * 1000)
+}
+
+WorkerChild.prototype.write = function (msg) {
+ this.socket.write(msg, "utf8")
+};
+
+WorkerChild.prototype.stopKeepAlive = function () {
+ if(this.keepAliveInterval) {
+ clearInterval(this.keepAliveInterval);
+ this.keepAliveInterval = null;
+ }
+}
+
+WorkerChild.prototype.terminate = function () {
+ this.stopKeepAlive();
+ this.socket.close();
+}
+
+function WorkerProcess(eventDest, paras) {
+ var self = this;
+ this.eventDest = eventDest;
+
+ this.socket = null;
+
+ var server = tcp.createServer(function (socket) {
+ if(self.socket) {
+ throw("Only one socket may be open at any given time (I can only have one parent).")
+ }
+ self.socket = socket;
+ socket.setEncoding("utf8");
+ socket.addListener("connect", function () {
+ socket.write(HANDSHAKE+MESSAGE_SPLITTER);
+ });
+ socket.addListener("data", function (data) {
+ debug("Process receiving data "+data);
+ self.handleData(data);
+ });
+ socket.addListener("timeout", function (data) {
+ debug("timeout "+paras.port);
+ });
+ socket.addListener("end", function () {
+ self.socket = null;
+ socket.close();
+ });
+ });
+ server.listen(paras.port, paras.hostname || "localhost");
+ this.buffer = "";
+ debug("Listening on "+paras.port);
+}
+
+WorkerProcess.prototype = {
+ postMessage: function (message) {
+ debug("Process posting message "+message);
+ this.socket.write(message+MESSAGE_SPLITTER);
+ },
+
+ handleData: WorkerChild.prototype.handleData,
+ handleMessage: WorkerChild.prototype.handleMessage
+};
43 lib/tcpworkerserver.js
@@ -0,0 +1,43 @@
+
+var sys = require("sys");
+
+var port = parseInt(process.ARGV[2], 10);
+var filename = process.ARGV[3];
+
+var worker = require("./tcpworker").startWorker(port);
+
+var processes = [];
+
+worker.onmessage = function (filename) {
+ port = port + 1;
+ var curPort = port;
+ sys.error("Starting child "+curPort);
+ var child = process.createChildProcess("node", [filename, curPort]);
+
+ child.addListener("error", function (data) {
+ sys.error(data);
+ });
+
+ var init = false;
+ child.addListener("output", function (data) {
+ if(!init) {
+ init = true;
+ worker.postMessage(curPort);
+ }
+ sys.puts(data);
+ });
+
+ child.addListener("exit", function (data) {
+ child.dead = true;
+ });
+
+ processes.push(child);
+}
+
+process.addListener("exit", function () {
+ processes.forEach(function (child) {
+ if(child && !child.dead) {
+ child.kill();
+ }
+ })
+})
289 lib/worker.js
@@ -0,0 +1,289 @@
+// An implementation of the web worker API for node.js
+
+var sys = require('sys');
+var child_process = require('child_process');
+
+var constants = require('./constants');
+var common = require('./common');
+
+var workerIndex = 0;
+
+var MESSAGE_SPLITTER = constants.MESSAGE_SPLITTER;
+var WORKER_PARAS = constants.WORKER_PARAS;
+var HANDSHAKE = constants.HANDSHAKE;
+var DEBUG = constants.DEBUG;
+
+function debug(msg) {
+ if (DEBUG) {
+ sys.error("debug "+process.pid+" - "+msg)
+ }
+}
+
+exports.importScripts = function () {
+ for (var i = 0, len = arguments.length; i < len; ++i) {
+ require(arguments[i]);
+ }
+};
+
+exports.getWorker = function(file_name, options) {
+ return new Worker(file_name, null, options);
+};
+
+var Worker = function (filename, impl, options) {
+ var self = this;
+ process.EventEmitter.call(this);
+ this.addListener("message", function (message) {
+ if (self.onmessage) {
+ self.onmessage(message);
+ }
+ });
+
+ this.addListener("error", function (message) {
+ if (self.onerror) {
+ self.onerror(message);
+ } else {
+ if(self.listeners("error").length === 1) {
+ throw new Error(message)
+ }
+ }
+ });
+
+ if(!impl) impl = WorkerChild;
+ this.impl = new impl(this, filename, options);
+ this.workerIndex = workerIndex++;
+};
+
+sys.inherits(Worker, process.EventEmitter);
+Worker.prototype.postMessage = function (payload) {
+ var message = JSON.stringify(payload);
+ this.impl.postMessage(message);
+};
+
+Worker.prototype.terminate = function () {
+ this.impl.terminate();
+};
+
+Worker.prototype.isTerminated = function () {
+ return this.impl.isTerminated();
+};
+
+exports.Worker = Worker;
+
+function WorkerChild (eventDest, filename, options) {
+ var timeout_id;
+ var options = options || {};
+ var self = this;
+ this.eventDest = eventDest;
+ this.filename = filename;
+ this.child = child_process.spawn("node", [this.filename].concat(WORKER_PARAS));
+ this.child.stdout.addListener("data", function (data) {
+ debug("From worker " + data);
+ self.handleData(data);
+ });
+
+ this.child.stderr.addListener("data", function (data) {
+ if(data !== null) {
+ if((data+"").match("SyntaxError")) { // highly depends on node's error reporting behavior
+ self.eventDest.emit("error", new SyntaxError(data));
+ }
+ }
+ });
+
+ this.child.addListener("exit", function (code) {
+ if (!this.terminated) {
+ this.terminated = true;
+ }
+
+ if (self.timeout_id) {
+ clearTimeout(self.timeout_id);
+ }
+
+ self.eventDest.emit("exit", code);
+ debug(self.child.pid + ": exit "+code);
+ });
+
+ if (options.timeout) {
+ // Set a timeout handler which kills the process if it has not finished
+ // executing after options.timeout number of milliseconds.
+ timeout_id = setTimeout(function() {
+ self.timeoutHandler();
+ }, options.timeout);
+ }
+ else {
+ timeout_id = null;
+ }
+
+ this.buffer = "";
+ this.active = false;
+ this.terminated = false;
+ this.queue = [];
+
+ this.options = options;
+ this.timeout_id = timeout_id;
+}
+
+WorkerChild.prototype = {
+
+ timeoutHandler: function() {
+ if (!this.terminated) {
+ this.child.kill('SIGTERM');
+
+ err = new Error('Child killed, because it did not finish after '
+ + this.options.timeout + ' milliseconds');
+ this.eventDest.emit("error", err);
+ }
+ },
+
+ postMessage: function (message) {
+ if (!this.active && this.terminated) {
+ throw new Error('Cannot post message to a terminated child');
+ }
+
+ if(this.active) {
+ debug("Sending data "+message);
+ this.write(message+MESSAGE_SPLITTER);
+ } else {
+ this.queue.push(message);
+ }
+ },
+
+ postQueue: function () {
+ for(var i = 0, len = this.queue.length; i < len; ++i) {
+ this.postMessage(this.queue[i]);
+ }
+ this.queue = [];
+ },
+
+ handleData: function (data, handle_message_function) {
+ var self = this;
+ var handle_function;
+ this.buffer += (data || "");
+ debug("Received data "+this.buffer);
+
+ if (handle_message_function) {
+ handle_function = handle_message_function;
+ }
+ else {
+ handle_function = this.handleMessage;
+ }
+
+ if(this.buffer !== "") {
+ var parts = this.buffer.split(MESSAGE_SPLITTER);
+ while (parts.length > 1) {
+ var message = parts.shift();
+ if (message !== "") {
+ if(message === HANDSHAKE) {
+ this.active = true;
+ this.postQueue();
+ self.write(MESSAGE_SPLITTER);
+ } else {
+ handle_function.call(this, message);
+ }
+
+ this.buffer = parts.join(MESSAGE_SPLITTER);
+ if(this.buffer !== "") {
+ this.handleData("");
+ return;
+ }
+ }
+ }
+ }
+ },
+
+ write: function (msg) {
+ if (this.terminated) {
+ return;
+ }
+
+ this.child.stdin.write(msg, "utf8")
+ },
+
+ handleMessage: function (message) {
+ debug("Emit event "+message);
+
+ try {
+ var obj = JSON.parse(message);
+ }
+ catch (err) {
+ this.eventDest.emit("error", err);
+ return;
+ }
+
+ if (obj.__exception__) {
+ this.eventDest.emit("error", obj.__exception__)
+ } else {
+ this.eventDest.emit("message", obj)
+ }
+ },
+
+ terminate: function () {
+ this.active = false;
+ this.terminated = true;
+ this.child.stdin.end();
+ },
+
+ isTerminated: function() {
+ return this.terminated;
+ },
+};
+
+var workerProcess;
+var i = 0;
+function WorkerProcess(eventDest) {
+ sys.print(HANDSHAKE+MESSAGE_SPLITTER);
+ var self = this;
+ this.eventDest = eventDest;
+ var stdin = process.openStdin();
+ stdin.addListener("data", function (data) {
+ debug("Process receiving data "+data);
+ self.handleData(data);
+ });
+ this.buffer = "";
+}
+
+WorkerProcess.prototype = {
+ postMessage: function (message) {
+ //debug("Process posting message "+message);
+ sys.print(message+MESSAGE_SPLITTER);
+ },
+
+ handleData: WorkerChild.prototype.handleData,
+ handleMessage: WorkerChild.prototype.handleMessage
+};
+
+function WorkerNode (impl, options) {
+ var self = this;
+ if(!impl) impl = WorkerProcess;
+ this.impl = new impl(this, options);
+
+ process.EventEmitter.call(this);
+ this.addListener("message", function (message) {
+ if (self.onmessage) {
+ self.onmessage(message);
+ }
+ });
+
+ process.addListener("uncaughtException", function (exception) {
+ debug("Exception in Worker "+exception)
+ self.postMessage({
+ __exception__: exception
+ });
+ })
+}
+sys.inherits(WorkerNode, process.EventEmitter);
+
+WorkerNode.prototype.postMessage = function (payload) {
+ this.impl.postMessage(JSON.stringify(payload));
+};
+
+// only for inheritance
+exports._Worker = Worker;
+exports._WorkerChild = WorkerChild;
+exports._WorkerProcess = WorkerProcess;
+exports._WorkerNode = WorkerNode;
+
+if (module.parent && module.parent.filename.indexOf('worker_pool') === -1) {
+ // This is only exported if this module is not included from the worker pool
+ // module and if it is run by a child.
+ exports.worker = common.get_worker(process.ARGV.slice(), WorkerNode);
+}
540 lib/worker_pool.js
@@ -0,0 +1,540 @@
+// Worker pool
+
+CHECK_PENDING_JOBS_INTERVAL = 200; // How often to check for pending jobs
+KILL_ACTIVE_WORKER_TIMEOUT = (30 * 60 * 1000); // 30 minutes by default
+
+var sys = require('util');
+var events = require('events');
+
+var constants = require('./constants');
+var common = require('./common');
+var worker_lib = require('./worker');
+
+var Worker = worker_lib._Worker;
+MESSAGE_SPLITTER = constants.MESSAGE_SPLITTER;
+
+/*
+ * Worker job class.
+ *
+ * @param {Object} msg Message object.
+ * @param {Options} options Worker options.
+ * @param {Function} callback Callback which is called with an optional error
+ * as the first argument and the worker object as the
+ * second one.
+ */
+function WorkerJob(msg, options, callback) {
+ this.msg = msg || {};
+ this.options = options || {};
+ this.callback = callback;
+}
+
+/*
+ * Pool worker class.
+ */
+function WorkerPoolChild(eventDest, filename, options) {
+ worker_lib._WorkerChild.call(this, eventDest, filename, options);
+
+ this._finished = false;
+}
+
+sys.inherits(WorkerPoolChild, worker_lib._WorkerChild);
+
+/*
+ * Create a new pool of workers which run script file_name.
+ *
+ * @param {Number} pool_size Pool size.
+ * @param {String} file_name Name of the script which workers will run.
+ */
+function WorkerPool(pool_size, file_name) {
+ events.EventEmitter.call(this);
+
+ this._pool_size = pool_size;
+ this._file_name = file_name;
+
+ this._workers = []; // All the available workers
+ this._active_workers = []; // Active workers
+ this._idle_workers = []; // Idle workers
+ this._pending_removal_workers = []; // Workers which are pending to be removed
+
+ this._pending_jobs = [];
+ this._is_terminated = false;
+ this._pending_jobs_timeout_id = null;
+
+ this._initialize();
+}
+
+sys.inherits(WorkerPool, events.EventEmitter);
+
+/*
+ * Initialize the pool (start the requested number of worker processes).
+ */
+WorkerPool.prototype._initialize = function() {
+ var self = this;
+
+ this._add_workers(this._pool_size);
+
+ this._pending_jobs_timeout_id = setInterval(function() {
+ self._run_pending_jobs();
+ }, CHECK_PENDING_JOBS_INTERVAL);
+
+ process.nextTick(function() {
+ self.emit('ready');
+ });
+};
+
+/*
+ * Return pool size.
+ *
+ * @return {Number} pool size.
+ */
+WorkerPool.prototype.get_size = function() {
+ return this._pool_size;
+};
+
+/*
+ * Ensures that the pool always contains exactly _pool_size number of workers.
+ *
+ * @param {Boolean} forcefully If false and there are no idle workers to be removed,
+ * wait before one becomes inactive, otherwise kill
+ * a random active worker.
+ */
+WorkerPool.prototype._ensure_pool_size = function(forcefully) {
+ var forcefully_ = forcefully || false;
+ var diff = (this._pool_size - this._workers.length);
+
+ if (diff === 0) {
+ return;
+ }
+ else if (diff > 0) {
+ this._add_workers(diff);
+ }
+ else if (diff < 0) {
+ this._remove_workers(Math.abs(diff), forcefully_);
+ }
+};
+
+/*
+ * Add workers to the pool.
+ *
+ * @param {Number} count Number of workers to add.
+ */
+WorkerPool.prototype._add_workers = function(count) {
+ var i, worker;
+
+ for (i = 0; i < count; i++) {
+ worker = new Worker(this._file_name, WorkerPoolChild, {});
+
+ this._workers.push(worker);
+ this._idle_workers.push(worker);
+ }
+};
+
+/*
+ * Remove workers from the pool.
+ *
+ * @param {Number} count How many workers to remove.
+ * @param {Boolean} forcefully If true and there are not idle workers, kill random
+ * active worker, otherwise wait for it to become idle.
+ */
+WorkerPool.prototype._remove_workers = function(count, forcefully) {
+ var self = this;
+ var forcefully_ = forcefully || false;
+ var i, worker, idle_workers_count, active_workers, count, removed = 0;
+
+ idle_workers_count = this._idle_workers.length;
+
+ for (i = 0; i < idle_workers_count && removed < count; i++) {
+ worker = this._idle_workers[idle_workers_count - i - 1];
+
+ this._kill_worker(worker);
+ removed++;
+ }
+
+ if (removed === count) {
+ // We are done
+ return;
+ }
+
+ active_workers_count = this._active_workers.length;
+ for (i = 0; i < active_workers_count && removed < count; i++) {
+ worker = this._active_workers[active_workers_count - i - 1];
+
+ if (forcefully_) {
+ this._kill_worker(worker);
+ removed++;
+ }
+ else {
+ // Worker is removed from the list of the active workers, but will actually
+ // be killed after is has finished processing or it has been killed because
+ // of a timeout.
+ this._remove_worker(worker);
+ this._pending_removal_workers.push(worker);
+ worker.on('result', function() {
+ self._kill_worker(this);
+ });
+
+ removed++;
+ }
+ }
+};
+
+/*
+ * Return a worker object if any idle worker are available, false otherwise.
+ *
+ * @return {WorkerPoolChild/Boolean} Worker object if any idle workers are
+ * available, false otherwise.
+ */
+WorkerPool.prototype._get_idle_worker = function() {
+ var worker;
+ var idle_len = this._idle_workers.length;
+
+ if (idle_len > 0) {
+ worker = this._idle_workers.shift();
+ this._active_workers.push(worker);
+
+ return worker;
+ }
+
+ return false;
+};
+
+/*
+ * Send the message to the idle worker and run it in the pool.
+ * Worker is returned back to the pool, when it emits an result event or if a
+ * terminated function is called.
+ *
+ * @oaram {Object} msg Message (payload).
+ * @param {Object} options Worker options.
+ * @param {Function} callback Callback which is called with a possible error as
+ * the first argument and Worker object as the second
+ * one.
+ */
+WorkerPool.prototype.run_in_pool = function(msg, options, callback) {
+ var job;
+
+ if (this._terminated) {
+ // Pool has been terminated - not accepting new tasks
+ callback(new Error('Worker pool has been terminated, not accepting new jobs'));
+ return;
+ }
+
+ job = new WorkerJob(msg, options, callback);
+ this._pending_jobs.push(job);
+};
+
+/*
+ * This function is called periodically and runs pending jobs if there are any
+ * idle workers available.
+ */
+WorkerPool.prototype._run_pending_jobs = function() {
+ var worker, job;
+
+ if (this._pending_jobs.length === 0 || this.is_terminated()) {
+ // No pending jobs or pool has been terminated
+ return;
+ }
+
+ worker = this._get_idle_worker();
+
+ if (!worker) {
+ // No idle workers
+ return;
+ }
+
+ job = this._pending_jobs.shift();
+ this._run_in_worker(worker, job);
+};
+
+/*
+ * Run job in the provided worker.
+ *
+ * @param {WorkerPoolChild} worker Worker in which the job will run.
+ * @param {WorkerJob} job Job to run.
+ */
+WorkerPool.prototype._run_in_worker = function(worker, job) {
+ var self = this;
+ var callback, msg, timeout, timeout_id;
+
+ callback = job.callback;
+ timeout = job.options.timeout || KILL_ACTIVE_WORKER_TIMEOUT;
+ msg = job.msg;
+
+ // Pass worker handle to the caller
+ callback(null, worker);
+
+ timeout_id = this._add_kill_timeout(worker, timeout);
+ worker._timeout_id = timeout_id;
+
+ // Free a worker after it has finished processing
+ worker.on('result', function() {
+ self._free_worker(worker);
+ });
+
+ // Send worker a message
+ worker.postMessage(msg);
+};
+
+/*
+ * Add a timeout after which the worker is killed if it has not finished processing.
+ *
+ * @param {Object} worker Worker object.
+ * @param {Number} timeout Timeout (in milliseconds).
+ */
+WorkerPool.prototype._add_kill_timeout = function(worker, timeout) {
+ var timeout_id;
+ var self = this;
+
+ // Automatically kill the worker if it has not finished processing after
+ // timeout number of milliseconds.
+ // This prevents bugs in a worker script to exhaust all the active workers.
+ timeout_id = setTimeout(function() {
+ worker.emit('timeout');
+ self._kill_worker(worker);
+ }, timeout);
+
+ return timeout_id;
+};
+
+/*
+ * Add worker back to the available workers.
+ * This is called after the worker has finished processing.
+ *
+ * @param {Object} worker Worker object.
+ */
+WorkerPool.prototype._free_worker = function(worker) {
+ var index = this._active_workers.indexOf(worker);
+
+ this._active_workers.splice(index, 1);
+ this._idle_workers.push(worker);
+
+ if (worker._timeout_id) {
+ // Clear the timeout
+ clearTimeout(worker._timeout_id);
+ worker._timeout_id = null;
+ }
+
+ // Remove all the listeners
+ worker.removeAllListeners('result');
+ worker.removeAllListeners('error');
+ worker.removeAllListeners('timeout');
+
+ this.emit('worker_freed', worker);
+};
+
+/*
+ * Remove a worker from the pool.
+ *
+ * @param {Worker} Worker object.
+ */
+WorkerPool.prototype._remove_worker = function(worker) {
+ var index_workers = this._workers.indexOf(worker);
+ var index_active = this._active_workers.indexOf(worker);
+ var index_idle = this._idle_workers.indexOf(worker);
+
+ if (index_workers === -1) {
+ // This worker has probably already been removed from the pool.
+ return;
+ }
+
+ this._workers.splice(index_workers, 1);
+
+ if (index_active !== -1) {
+ this._active_workers.splice(index_active, 1);
+ }
+
+ if (index_idle !== -1) {
+ this._idle_workers.splice(index_idle, 1);
+ }
+
+ // Remove all the listeners
+ worker.removeAllListeners('result');
+ worker.removeAllListeners('error');
+ worker.removeAllListeners('timeout');
+
+ this.emit('worker_removed', worker);
+}
+
+/*
+ * Forcefully kill a worker process.
+ *
+ * @param {WorkerPoolChild} worker Child worker object.
+ */
+WorkerPool.prototype._kill_worker = function(worker) {
+ var self = this;
+
+ var in_active = this._active_workers.indexOf(worker);
+ var in_idle = this._idle_workers.indexOf(worker);
+ var in_pending_removal = this._pending_removal_workers.indexOf(worker);
+
+ // Make sure that the worker is actually still running
+ if (in_active === -1 && in_pending_removal === -1 && in_idle === -1) {
+ return;
+ }
+
+ if (in_pending_removal !== -1) {
+ this._pending_removal_workers.splice(in_pending_removal, 1);
+ }
+
+ if (worker._timeout_id) {
+ // Clear the timeout
+ clearTimeout(worker._timeout_id);
+ worker._timeout_id = null;
+ }
+
+ // Because the worker has been forcefully terminated, _remove_worker function
+ // needs to be called manually.
+ worker.terminate(true);
+
+ this._remove_worker(worker);
+ process.nextTick(function() {
+ self._ensure_pool_size();
+ });
+};
+
+/*
+ * Resize the pool.
+ *
+ * @param {Number} new_size New pool size.
+ */
+WorkerPool.prototype.resize_pool = function(new_size) {
+ if (this._terminated) {
+ return;
+ }
+
+ this._pool_size = new_size;
+ this._ensure_pool_size();
+};
+
+/*
+ * Return worker pool status.
+ *
+ * @return {Boolean} true if the pool has been terminated (stopped), false
+ * otherwise.
+ */
+WorkerPool.prototype.is_terminated = function() {
+ return this._is_terminated;
+};
+
+/*
+ * Kill all the pool workers.
+ *
+ * @param {Boolean} forcefully If forcefully equals false, wait until all the
+ * workers have finished processing before terminating.
+ *
+ * Note: If there are any pending jobs left in the queue, they won't be processed,
+ * even if forcefully equals false.
+*/
+WorkerPool.prototype.terminate = function(forcefully) {
+ var self = this;
+ var pool_size = this._pool_size;
+
+ var forcefully_ = forcefully || false;
+ this._terminated = true;
+ this._pool_size = 0;
+ this._pending_jobs = [];
+
+ clearInterval(this._pending_jobs_timeout_id);
+
+ process.nextTick(function() {
+ self._remove_workers(pool_size, forcefully_);
+ });
+};
+
+function WorkerPoolProcess(eventDest) {
+ worker_lib._WorkerProcess.call(this, eventDest);
+}
+
+sys.inherits(WorkerPoolProcess, worker_lib._WorkerProcess);
+
+/*
+ * Post the result.
+ *
+ * @param {String} result JSON encoded result.
+ */
+WorkerPoolProcess.prototype.postResult = function(result) {
+ sys.print(result+MESSAGE_SPLITTER);
+};
+
+function WorkerPoolNode(options) {
+ var self = this;
+ worker_lib._WorkerNode.call(this, WorkerPoolProcess, {});
+
+ // Add a custom result and uncaughtException handler
+ this.removeAllListeners('message');
+ process.removeAllListeners('uncaughtException');
+
+ this.addListener('result', function(result) {
+ if (self.onresult) {
+ self.onresult(result);
+ }
+ });
+
+ process.addListener('uncaughtException', function (exception) {
+ self.postResult({
+ __exception__: exception
+ });
+ })
+}
+
+sys.inherits(WorkerPoolNode, worker_lib._WorkerNode);
+
+WorkerPoolNode.prototype.postResult = function(payload) {
+ this.impl.postResult(JSON.stringify(payload));
+};
+
+// postMessage function is only available with the normal workers. Pool workers
+// use 'postResult' function.
+//
+// Note: postResult must only be called once after the worker has finished all
+// the processing.
+WorkerPoolNode.prototype.postMessage = undefined;
+WorkerPoolNode.prototype.handleMessage = undefined;
+
+/*
+ * Emit the result event and propagate the result back to the caller.
+ *
+ * @param {String} result JSON encoded result.
+ */
+WorkerPoolChild.prototype.handleResult = function(result) {
+ var obj;
+
+ try {
+ obj = JSON.parse(result);
+ }
+ catch (err) {
+ this.eventDest.emit('error', err);
+
+ return;
+ }
+
+ if (obj.__exception__) {
+ this.eventDest.emit('error', obj.__exception__);
+ }
+ else {
+ this.eventDest.emit('result', obj);
+ }
+};
+
+/*
+ * Handle the worker data.
+ *
+ * @param {Buffer} data Data.
+ */
+WorkerPoolChild.prototype.handleData = function(data) {
+ worker_lib._WorkerChild.prototype.handleData.call(this, data,
+ WorkerPoolChild.prototype.handleResult);
+};
+
+/*
+ * Terminate (kill) a worker process.
+ */
+WorkerPoolChild.prototype.terminate = function() {
+ worker_lib._WorkerChild.prototype.terminate.call(this);
+
+ this.child.kill('SIGTERM');
+};
+
+exports.WorkerPool = WorkerPool;
+
+// This value will only be defined, if this script is run by a worker
+exports.worker = common.get_worker(process.ARGV.slice(), WorkerPoolNode);
10 package.json
@@ -0,0 +1,10 @@
+{
+ "name" : "worker",
+ "version" : "0.2.1",
+ "directories" : {
+ "lib" : "./lib",
+ "example" : "./example",
+ "test" : "./test"
+ },
+ "main" : "./lib/worker"
+}
22 test/common.js
@@ -0,0 +1,22 @@
+var path = require("path");
+
+exports.testDir = path.dirname(__filename);
+exports.fixturesDir = path.join(exports.testDir, "fixtures");
+exports.libDir = path.join(exports.testDir, "../../lib");
+
+require.paths.unshift(exports.libDir);
+
+var assert = require('assert');
+var sys = require("sys");
+
+exports.path = path;
+
+var ok = assert.ok;
+assert.ok = function (bool, msg) {
+ if(bool) {
+ sys.print("OK ")
+ } else {
+ sys.print("NOT OK ")
+ }
+ sys.puts(msg);
+}
117 test/deprecated-tcp-worker.js
@@ -0,0 +1,117 @@
+var common = require("./common");
+var sys = require("./sys");
+var assert = require('assert');
+
+var tcpWorker = require("../lib/tcpworker");
+
+process.ENV["NODE_PATH"] = common.libDir;
+
+function makeWorker (filename, cb) {
+ filename = __dirname+"/fixtures/"+filename;
+ return tcpWorker.makeWorker(filename, 7000, "localhost", cb)
+}
+
+// basic test
+var worker = makeWorker("tcp-worker.js")
+
+worker.onmessage = function (msg) {
+ if (msg.input) {
+ assert.ok(msg.output == msg.input * 3, "We can multiply asyncly");
+ if(msg.input * 3 == 12) {
+ worker.terminate();
+ worker.terminateServer();
+ }
+ }
+};
+
+worker.postMessage({
+ input: 1
+});
+worker.postMessage({
+ input: 2
+});
+worker.postMessage({
+ input: 3
+});
+worker.postMessage({
+ input: 4
+});
+
+// error handling
+setTimeout(function () {
+ setTimeout(function () {
+
+ var w2 = makeWorker("tcp-worker.js");
+ w2.postMessage({
+ error: true
+ });
+ w2.addListener("error", function () {
+ assert.ok(true, "Received expected error via event");
+ w2.terminate();
+ });
+ w2.addListener("message", function () {
+ assert.ok(false, "Wanted an error, but got a message");
+ w2.terminate();
+ });
+
+ var w3 = makeWorker("tcp-worker.js");
+ w3.postMessage({
+ error: true
+ });
+ w3.onerror = function () {
+ assert.ok(true, "Received expected error with onerror");
+ w3.terminate();
+ };
+ w3.addListener("message", function () {
+ assert.ok(false, "Wanted an error, but got a message");
+ w3.terminate();
+ });
+
+ }, 10);
+}, 10);/*
+
+// syntax error handling
+var syntaxError = makeWorker("syntax-error-worker.js");
+var hadSyntaxError = false;
+syntaxError.onerror = function (err) {
+ if(err instanceof SyntaxError) {
+ hadSyntaxError = true;
+ }
+}
+process.addListener("exit", function () {
+ assert.ok(hadSyntaxError, "detected syntax error");
+})
+
+// long running processes
+var fibWorker = makeWorker("fib-worker.js");
+sys.puts("Giving hard work to worker")
+fibWorker.postMessage({
+ fib: 41
+});
+var longRunningReturned = false;
+var interval = setInterval(function () {
+ sys.puts("# working in the background")
+}, 500);
+fibWorker.addListener("message", function (fib) {
+ sys.error("Fib "+fib);
+ longRunningReturned = true;
+ assert.ok(fib === 165580141, "Worker can do long running stuff.")
+ fibWorker.terminate();
+ clearInterval(interval);
+});
+for(var i = 0; i < 100; ++i) {
+ // counting;
+}
+assert.ok(!longRunningReturned, "Can do work while background spins");
+process.addListener("exit", function () {
+ assert.ok(longRunningReturned, "Did long running calculation")
+})
+
+var waitWorker = makeWorker("worker.js");
+waitWorker.postMessage({
+ wait: true
+});
+waitWorker.addListener("message", function () {
+ assert.ok(true, "Worker response can be async.")
+ waitWorker.terminate();
+});*/
22 test/fixtures/fib-worker.js
@@ -0,0 +1,22 @@
+/* TODO: Take this code duplication out when NODE_PATH in env works */
+var path = require("path");
+
+exports.testDir = path.dirname(__filename);
+exports.libDir = path.join(exports.testDir, "../../lib");
+
+require.paths.unshift(exports.libDir);
+
+var worker = require("worker").worker;
+
+worker.addListener("message", function (msg) {
+ if(msg.fib >= 0) {
+ worker.postMessage(fib(msg.fib*1));
+ return;
+ }
+ throw(msg)
+});
+
+
+function fib(n) {
+ return n < 2 ? n : fib(n-1)+fib(n-2);
+}
18 test/fixtures/pool-worker.js
@@ -0,0 +1,18 @@
+var worker = require('../../index').worker_pool.worker;
+
+worker.addListener('message', function(msg) {
+ var result = 0;
+ var return_after = msg.return_after;
+ var throw_error = msg.throw_error;
+ var number = msg.number;
+
+ if (throw_error) {
+ throw new Error('Worker thrown an error');
+ }
+
+ result = number + 10;
+
+ setTimeout(function() {
+ worker.postResult({'result': result});
+ }, return_after);
+});
1 test/fixtures/syntax-error-worker.js
@@ -0,0 +1 @@
+{
30 test/fixtures/tcp-worker.js
@@ -0,0 +1,30 @@
+/* TODO: Take this code duplication out when NODE_PATH in env works */
+var path = require("path");
+
+exports.testDir = path.dirname(__filename);
+exports.libDir = path.join(exports.testDir, "../../lib");
+
+require.paths.unshift(exports.libDir);
+
+var port = parseInt(process.ARGV[2]);
+
+var sys = require("sys");
+var worker = require("tcpworker").startWorker(port);
+
+worker.onmessage = function (msg) {
+ if(msg.wait) {
+ setTimeout(function () {
+ worker.postMessage("Waited")
+ }, 1000)
+ return;
+ }
+
+ if(msg.error) {
+ throw("ErrorMarker");
+ }
+
+ msg.output = msg.input * 3;
+ setTimeout(function () {
+ worker.postMessage(msg);
+ }, 100 * msg.output)
+};
28 test/fixtures/worker.js
@@ -0,0 +1,28 @@
+/* TODO: Take this code duplication out when NODE_PATH in env works */
+var path = require("path");
+
+exports.testDir = path.dirname(__filename);
+exports.libDir = path.join(exports.testDir, "../../lib");
+
+require.paths.unshift(exports.libDir);
+
+var sys = require("sys");
+var worker = require("worker").worker;
+
+worker.onmessage = function (msg) {
+ if(msg.wait) {
+ setTimeout(function () {
+ worker.postMessage("Waited")
+ }, 1000)
+ return;
+ }
+
+ if(msg.error) {
+ throw("ErrorMarker");
+ }
+
+ msg.output = msg.input * 3;
+ setTimeout(function () {
+ worker.postMessage(msg);
+ }, 100 * msg.output)
+};
46 test/test-parallelity.js
@@ -0,0 +1,46 @@
+var common = require("./common");
+var sys = require("sys");
+var assert = require('assert');
+
+var Worker = require("../lib/worker").Worker;
+
+process.ENV["NODE_PATH"] = common.libDir;
+
+function makeWorker (filename) {
+ return new Worker(__dirname+"/fixtures/"+filename);
+}
+
+var results = [0,1,1,2,3,5,8,13,21,34,55,89,144,233,377,610,987,1597,2584,4181,6765,10946,17711,28657,46368,75025,121393,196418,317811,514229,832040,1346269,2178309,3524578,5702887,9227465,14930352,24157817,39088169,63245986,102334155,165580141];
+var all = [];
+
+var count = 0;
+var place = 0;
+function startAndExpect(count) {
+ sys.puts("Starting worker "+count)
+ var worker = makeWorker("fib-worker.js");
+ setTimeout(function () { // let everybody spawn
+ worker.postMessage({
+ fib: count
+ });
+ worker.addListener("message", function (result) {
+ assert.ok(result === results[count], count + "Result is correct: "+result);
+ all[count] = result;
+ sys.puts(count +"th worker came in "+place);
+ place++;
+ worker.terminate();
+ })
+ worker.onerror = function (err) {
+ sys.error(JSON.stringify(err));
+ }
+ }, 1000);
+}
+
+/*
+// gather results
+process.addListener("exit", function () {
+ sys.puts(JSON.stringify(all));
+})*/
+
+for(var i = 40; i >= 0; i--) { // start with the hardest
+ startAndExpect(i);
+}
431 test/test-worker-pool.js
@@ -0,0 +1,431 @@
+var assert = require('assert')
+
+var WorkerPool = require('../index').worker_pool.WorkerPool;
+
+exports['test basic functionallity'] = function(beforeExit) {
+ var n = 0;
+ var worker_pool = new WorkerPool(5, __dirname + '/fixtures/pool-worker.js');
+ var results = [ 11, 12, 13, 14, 15, 16, 17, 18, 19, 20 ];
+ var timeout = 1000;
+
+ setTimeout(function() {
+ worker_pool.terminate();
+ }, 4000);
+
+ worker_pool.on('ready', function() {
+ n++;
+
+ for (var i = 1; i <= 10; i++) {
+ (function(i) {
+ var worker_msg = { 'number': i, 'return_after': 0 };
+
+ worker_pool.run_in_pool(worker_msg, { 'timeout': timeout }, function(err, worker) {
+ if (!err) {
+ n++;
+
+ worker.addListener('result', function(result) {
+ n++;
+ assert.ok(result.result == results[i - 1], 'result equal');
+ });
+
+ worker.addListener('error', function(err) {
+ assert.fail('worker ' + i + ' emitted error: ' + err.message);
+ });
+
+ worker.addListener('timeout', function() {
+ assert.fail('emitted timeout');
+ });
+ }
+ else {
+ assert.fail('run_in_pool returned error: ' + err.message)
+ }
+ });
+ }(i));
+ }
+ });
+
+ beforeExit(function() {
+ assert.equal(n, 1 + 10 + 10);
+ });
+};
+
+exports['test many jobs'] = function(beforeExit) {
+ var n = 0;
+ var worker_pool1 = new WorkerPool(1, __dirname + '/fixtures/pool-worker.js');
+ var worker_pool2 = new WorkerPool(20, __dirname + '/fixtures/pool-worker.js');
+
+ var worker_pool1_start, worker_pool1_total;
+ var worker_pool2_start, worker_pool2_total;
+ var timeout = 10000;
+
+ worker_pool1.on('ready', function() {
+ n++;
+ worker_pool1_start = new Date().getTime();
+
+ for (var i = 1; i <= 40; i++) {
+ (function(i) {
+ var worker_msg = { 'number': i, 'return_after': 200 };
+
+ worker_pool1.run_in_pool(worker_msg, { 'timeout': timeout }, function(err, worker) {
+ if (!err) {
+ n++;
+
+ worker.addListener('result', function(result) {
+ n++;
+ assert.ok(result.result == (10 + i), 'result equal');
+
+ if (i == 40) {
+ worker_pool1.terminate();
+ worker_pool1_total = (new Date().getTime() - worker_pool1_start);
+ }
+ });
+
+ worker.addListener('error', function(err) {
+ assert.fail('worker ' + i + ' emitted error: ' + err.message);
+ });
+
+ worker.addListener('timeout', function() {
+ assert.fail('emitted timeout');
+ });
+ }
+ else {
+ assert.fail('run_in_pool returned error: ' + err.message)
+ }
+ });
+ }(i));
+ }
+ });
+
+ worker_pool2.on('ready', function() {
+ n++;
+ worker_pool2_start = new Date().getTime();
+
+ for (var i = 1; i <= 40; i++) {
+ (function(i) {
+ var worker_msg = { 'number': i, 'return_after': 200 };
+
+ worker_pool2.run_in_pool(worker_msg, { 'timeout': timeout }, function(err, worker) {
+ if (!err) {
+ n++;
+
+ worker.addListener('result', function(result) {
+ n++;
+ assert.ok(result.result == (10 + i), 'result equal');
+
+ if (i == 40) {
+ worker_pool2.terminate();
+ worker_pool2_total = (new Date().getTime() - worker_pool2_start);
+ }
+ });
+
+ worker.addListener('error', function(err) {
+ assert.fail('worker ' + i + ' emitted error: ' + err.message);
+ });
+
+ worker.addListener('timeout', function() {
+ assert.fail('emitted timeout');
+ });
+ }
+ else {
+ assert.fail('run_in_pool returned error: ' + err.message)
+ }
+ });
+ }(i));
+ }
+ });
+
+ beforeExit(function() {
+ assert.equal(n, 2 + 80 + 80);
+ assert.ok(worker_pool2_total < worker_pool1_total);
+ });
+};
+
+exports['test pool resize'] = function(beforeExit) {
+ var n = 0;
+ var worker_pool = new WorkerPool(5, __dirname + '/fixtures/pool-worker.js');
+
+ worker_pool.on('ready', function() {
+ n++;
+
+ assert.equal(5, worker_pool.get_size());
+
+ setTimeout(function() {
+ n++;
+ worker_pool.resize_pool(20);
+
+ assert.equal(20, worker_pool.get_size());
+ }, 1000);
+
+ setTimeout(function() {
+ n++;
+ worker_pool.resize_pool(3);
+
+ assert.equal(3, worker_pool.get_size());
+ }, 4000);
+
+ setTimeout(function() {
+ n++;
+ worker_pool.terminate();
+ }, 6000);
+ });
+
+ beforeExit(function() {
+ assert.equal(n, 4);
+ });
+};
+
+exports['test pool terminate'] = function(beforeExit) {
+ var n = 0;
+ var worker_pool = new WorkerPool(5, __dirname + '/fixtures/pool-worker.js');
+
+ worker_pool.on('ready', function() {
+ n++;
+
+ assert.equal(5, worker_pool.get_size());
+ worker_pool.terminate();
+ assert.equal(0, worker_pool.get_size());
+
+ setTimeout(function() {
+ var timeout = 1000;
+ var worker_msg = { 'number': 1, 'return_after': 0 };
+ worker_pool.run_in_pool(worker_msg, { 'timeout': timeout }, function(err, worker) {
+ if (!err) {
+ worker.addListener('result', function(result) {
+ assert.fail('emitted result: ' + result);
+ });
+
+ worker.addListener('error', function(err) {
+ assert.fail('emitted error' + err.message);
+ });
+
+ worker.addListener('timeout', function() {
+ assert.fail('emitted timeout');
+ });
+ }
+ else {
+ n++;
+
+ assert.ok('run_in_pool returned error: ' + err.message);
+ assert.match(err.message, /worker pool has been terminated/i);
+ }
+ });
+ }, 2000);
+
+ setTimeout(function() {
+ n++;
+ worker_pool.resize_pool(20);
+ assert.equal(0, worker_pool.get_size());
+ }, 3000);
+ });
+
+ beforeExit(function() {
+ assert.equal(n, 3);
+ });
+};
+
+exports['test worker timeout'] = function(beforeExit) {
+ var n = 0;
+ var worker_pool = new WorkerPool(5, __dirname + '/fixtures/pool-worker.js');
+
+ worker_pool.on('ready', function() {
+ n++;
+
+ var timeout = 2000;
+ var worker_msg = { 'number': 1, 'return_after': 10000 };
+ worker_pool.run_in_pool(worker_msg, { 'timeout': timeout }, function(err, worker) {
+ if (!err) {
+ n++;
+
+ worker.addListener('result', function(result) {
+ assert.fail('emitted result: ' + result);
+ });
+
+ worker.addListener('error', function(err) {
+ assert.fail('emitted error' + err.message);
+ });
+
+ worker.addListener('timeout', function() {
+ n++;
+
+ assert.ok('emitted timeout');
+
+ setTimeout(function() {
+ // Make sure that the pool size is ensured
+ n++;
+ assert.equal(5, worker_pool.get_size());
+
+ worker_pool.terminate();
+ }, 1000);
+ });
+ }
+ else {
+ assert.fail('run_in_pool returned error: ' + err.message)
+ }
+ });
+ });
+
+ beforeExit(function() {
+ assert.equal(n, 4);
+ });
+};
+
+exports['test worker error'] = function(beforeExit) {
+ var n = 0;
+ var worker_pool = new WorkerPool(5, __dirname + '/fixtures/pool-worker.js');
+
+ worker_pool.on('ready', function() {
+ n++;
+
+ var timeout = 10000;
+ var worker_msg = { 'number': 1, 'return_after': 0, 'throw_error': true };
+ worker_pool.run_in_pool(worker_msg, { 'timeout': timeout }, function(err, worker) {
+ if (!err) {
+ n++;
+
+ worker.addListener('result', function(result) {
+ assert.fail('emitted result: ' + result);
+ });
+
+ worker.addListener('error', function(err) {
+ n++;
+ assert.ok('emitted error' + err.message);
+ assert.match(err.message, /worker thrown an error/i);
+
+ worker_pool.terminate();
+ });
+
+ worker.addListener('timeout', function() {
+ assert.fail('emitted timeout');
+ });
+ }
+ else {
+ assert.fail('run_in_pool returned error: ' + err.message)
+ }
+ });
+ });
+
+ beforeExit(function() {
+ assert.equal(n, 3);
+ });
+};
+
+exports['test timeout and respawn'] = function(beforeExit) {
+ var n = 0;
+ var worker_pool = new WorkerPool(5, __dirname + '/fixtures/pool-worker.js');
+
+ worker_pool.on('ready', function() {
+ n++;
+
+ var timeout = 200;
+
+ for (var i = 1; i <= 5; i++) {
+ (function(i) {
+ var worker_msg = { 'number': i, 'return_after': 5000, 'throw_error': false };
+ worker_pool.run_in_pool(worker_msg, { 'timeout': timeout }, function(err, worker) {
+ if (!err) {
+ n++;
+
+ worker.addListener('result', function(result) {
+ assert.fail('emitted result: ' + result);
+ });
+
+ worker.addListener('error', function(err) {
+ assert.fail('emitted error' + err.message);
+ });
+
+ worker.addListener('timeout', function() {
+ n++;
+
+ assert.ok('emitted timeout');
+ });
+ }
+ else {
+ assert.fail('run_in_pool returned error: ' + err.message)
+ }
+ });
+ }(i));
+ }
+
+ setTimeout(function() {
+ var timeout = 5000;
+ for (var i = 1; i <= 5; i++) {
+ (function(i) {
+ var worker_msg = { 'number': i, 'return_after': 0, 'throw_error': false };
+ worker_pool.run_in_pool(worker_msg, { 'timeout': timeout }, function(err, worker) {
+ if (!err) {
+ n++;
+
+ worker.addListener('result', function(result) {
+ n++;
+ assert.ok(result.result == (10 + i), 'result equal');
+
+ if (i == 5) {
+ worker_pool.terminate();
+ }
+ });
+
+ worker.addListener('error', function(err) {
+ assert.fail('emitted error' + err.message);
+ });
+
+ worker.addListener('timeout', function() {
+ assert.fail('emitted timeout');
+ });
+ }
+ else {
+ assert.fail('run_in_pool returned error: ' + err.message)
+ }
+ });
+ }(i));
+ }
+ }, 8000);
+ });
+
+ beforeExit(function() {
+ assert.equal(n, 1 + 10 + 10);
+ });
+};
+
+exports['test job queuing'] = function(beforeExit) {
+ var n = 0;
+ var got_worker_handle_time, ready_time;
+ var worker_pool = new WorkerPool(5, __dirname + '/fixtures/pool-worker.js');
+
+ var timeout = 5000;
+ var worker_msg = { 'number': 1, 'return_after': 200, 'throw_error': false };
+ worker_pool.run_in_pool(worker_msg, { 'timeout': timeout }, function(err, worker) {
+ got_worker_handle_time = new Date().getTime();
+
+ if (!err) {
+ n++;
+
+ worker.addListener('result', function(result) {
+ n++;
+ assert.ok(result.result == 10 + 1, 'result equal');
+
+ worker_pool.terminate();
+ });
+
+ worker.addListener('error', function(err) {
+ assert.fail('emitted error' + err.message);
+ });
+
+ worker.addListener('timeout', function() {
+ assert.fail('emitted timeout');
+ });
+ }
+ else {
+ assert.fail('run_in_pool returned error: ' + err.message)
+ }
+ });
+
+ worker_pool.on('ready', function() {
+ ready_time = new Date().getTime();
+ n++;
+ });
+
+ beforeExit(function() {
+ assert.equal(n, 3);
+ assert.ok(ready_time < got_worker_handle_time);
+ });
+};
115 test/test-worker.js
@@ -0,0 +1,115 @@
+var common = require("./common");
+var sys = require("sys");
+var assert = require('assert');
+
+var Worker = require("../lib/worker").Worker;
+
+process.ENV["NODE_PATH"] = common.libDir;
+
+function makeWorker (filename) {
+ return new Worker(__dirname+"/fixtures/"+filename);
+}
+
+// basic test
+var worker = makeWorker("worker.js");
+
+worker.onmessage = function (msg) {
+ if (msg.input) {
+ assert.ok(msg.output == msg.input * 3, "We can multiply asyncly");
+ if(msg.input * 3 == 12) {
+ worker.terminate();
+ }
+ }
+};
+
+worker.postMessage({
+ input: 1
+});
+worker.postMessage({
+ input: 2
+});
+worker.postMessage({
+ input: 3
+});
+worker.postMessage({
+ input: 4
+});
+
+// error handling
+setTimeout(function () {
+ setTimeout(function () {
+
+ var w2 = makeWorker("worker.js");
+ w2.postMessage({
+ error: true
+ });
+ w2.addListener("error", function () {
+ assert.ok(true, "Received expected error via event");
+ w2.terminate();
+ });
+ w2.addListener("message", function () {
+ assert.ok(false, "Wanted an error, but got a message");
+ w2.terminate();
+ });
+
+ var w3 = makeWorker("worker.js");
+ w3.postMessage({
+ error: true
+ });
+ w3.onerror = function () {
+ assert.ok(true, "Received expected error with onerror");
+ w3.terminate();
+ };
+ w3.addListener("message", function () {
+ assert.ok(false, "Wanted an error, but got a message");
+ w3.terminate();
+ });
+
+ }, 10);
+}, 10);
+
+// syntax error handling
+var syntaxError = makeWorker("syntax-error-worker.js");
+var hadSyntaxError = false;
+syntaxError.onerror = function (err) {
+ if(err instanceof SyntaxError) {
+ hadSyntaxError = true;
+ }
+}
+process.addListener("exit", function () {
+ assert.ok(hadSyntaxError, "detected syntax error");
+})
+
+// long running processes
+var fibWorker = makeWorker("fib-worker.js");
+sys.puts("Giving hard work to worker")
+fibWorker.postMessage({
+ fib: 41
+});
+var longRunningReturned = false;
+var interval = setInterval(function () {
+ sys.puts("# working in the background")
+}, 500);
+fibWorker.addListener("message", function (fib) {
+ sys.error("Fib "+fib);
+ longRunningReturned = true;
+ assert.ok(fib === 165580141, "Worker can do long running stuff.")
+ fibWorker.terminate();
+ clearInterval(interval);
+});
+for(var i = 0; i < 100; ++i) {
+ // counting;
+}
+assert.ok(!longRunningReturned, "Can do work while background spins");
+process.addListener("exit", function () {
+ assert.ok(longRunningReturned, "Did long running calculation")
+})
+
+var waitWorker = makeWorker("worker.js");
+waitWorker.postMessage({
+ wait: true
+});
+waitWorker.addListener("message", function () {
+ assert.ok(true, "Worker response can be async.")
+ waitWorker.terminate();
+});

0 comments on commit e3d8e0f

Please sign in to comment.
Something went wrong with that request. Please try again.