Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Potential fix for issue 16

  • Loading branch information...
commit 7aebd59cff24d03a94640e598d08dff03ee85ab6 1 parent c29e15b
@chriso authored
Showing with 19 additions and 24 deletions.
  1. +19 −24 lib/node.io/process_master.js
View
43 lib/node.io/process_master.js
@@ -57,8 +57,9 @@ Processor.prototype.handleWorkerMessage = function (data) {
Processor.prototype.setupMasterEvents = function (job, workers) {
var self = this,
master = job.master,
- worker_count = workers.length;
-
+ worker_count = workers.length,
+ completeCheckInterval;
+
//Provide a method to check if workers are complete
var areWorkersComplete = function () {
if (job.pull_requests < worker_count) {
@@ -71,31 +72,31 @@ Processor.prototype.setupMasterEvents = function (job, workers) {
}
return true;
};
-
+
master.on('start', function () {
if (worker_count > 0) {
-
+
self.status('Running ' + worker_count + ' workers..', 'debug');
-
+
//Each worker is initially idle (complete) and requesting input
job.pull_requests = worker_count;
-
+
//Tell each worker to load the job - send `i` so that the worker
//can identify itself later
for (var i = 0; i < worker_count; i++) {
workers[i].send(['load', job.job_name, self.options, i]);
}
-
+
} else {
self.status('Running 1 worker..', 'debug');
}
-
+
//Pull the initial input
master.emit('pullInput');
});
-
+
master.on('pullInput', function (for_worker) {
-
+
//Determine how much input we need to pull
var pull = job.options.max * job.options.take;
if (worker_count > 0) {
@@ -105,37 +106,31 @@ Processor.prototype.setupMasterEvents = function (job, workers) {
pull = pull * worker_count;
}
}
-
+
//Handle input limits when the `input` op is set
if (job.options.input && (job.input_offset + pull) > job.options.input) {
pull = Math.max(job.options.input - job.input_offset, 0);
}
-
+
//Callback for when input is received from job.input()
var handle_input = function (input) {
if (typeof input !== 'undefined' && input !== null && input !== false) {
master.emit('input', input, for_worker);
} else {
-
+
//No input? We might be done..
-
- var completeCheckInterval;
-
+
var isComplete = function () {
//Check if any input was added dynamically
if (job.input.length > 0) {
job.is_complete = false;
- //self.spawnInstance(job_name);
- if (completeCheckInterval) {
- clearInterval(completeCheckInterval);
- }
return false;
}
-
+
//Wait for workers or instances that are still working
return worker_count > 0 ? areWorkersComplete() : job.instances <= 0;
};
-
+
//If we're not complete, check periodically
if (isComplete()) {
master.emit('complete');
@@ -149,9 +144,9 @@ Processor.prototype.setupMasterEvents = function (job, workers) {
}
}
};
-
+
if (pull > 0) {
-
+
//Incr the input offset
var offset = job.input_offset;
job.input_offset += pull;
Please sign in to comment.
Something went wrong with that request. Please try again.