Skip to content

Commit

Permalink
Throw an error if no worker machines available on execute. Closes #1054
Browse files Browse the repository at this point in the history
… (#1055)

* WIP #1054 Added execution env check to plugins

* WIP #1054 added check for execution workers on execute plugin start

* WIP #1054 minor adjustment to log msg

* WIP #1054 updated webgme-fab dep

* WIP #1054 updated tests
  • Loading branch information
brollb committed Jan 18, 2018
1 parent e41b733 commit 8777238
Show file tree
Hide file tree
Showing 8 changed files with 4,574 additions and 1,694 deletions.
6,161 changes: 4,495 additions & 1,666 deletions npm-shrinkwrap.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
"webgme-chflayout": "^2.0.0",
"webgme-easydag": "dfst/webgme-easydag",
"webgme-executor-worker": "^1.0.1",
"webgme-fab": "dfst/webgme-fab",
"webgme-fab": "github:dfst/webgme-fab",
"webgme-simple-nodes": "^2.1.0"
},
"devDependencies": {
Expand Down
48 changes: 48 additions & 0 deletions src/common/ExecutionEnv.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/* globals define */
define([
'superagent',
'q'
], function(
superagent,
Q
) {
const WORKER_ENDPOINT = '/rest/executor/worker';
const JOBS_ENDPOINT = '/rest/executor';
const values = dict => Object.keys(dict).map(k => dict[k]);

const ExecutionEnv = {};

ExecutionEnv.url = function(urlPath) {
if (typeof window === 'undefined') {
let gmeConfig = require('../../config');
return `http://127.0.0.1:${gmeConfig.server.port}${urlPath}`;
}
return urlPath;
};

ExecutionEnv.get = function(urlPath) {
const deferred = Q.defer();
const url = this.url(urlPath);

superagent.get(url)
.end((err, res) => {
if (err) {
return deferred.reject(err);
}
deferred.resolve(JSON.parse(res.text));
});

return deferred.promise;
};

ExecutionEnv.getWorkers = function() {
return this.get(WORKER_ENDPOINT)
.then(workerDict => values(workerDict));
};

ExecutionEnv.getJobs = function() {
return this.get(JOBS_ENDPOINT);
};

return ExecutionEnv;
});
19 changes: 17 additions & 2 deletions src/plugins/ExecuteJob/ExecuteJob.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ define([
'text!./metadata.json',
'executor/ExecutorClient',
'plugin/PluginBase',
'deepforge/ExecutionEnv',
'deepforge/plugin/LocalExecutor',
'deepforge/plugin/PtrCodeGen',
'deepforge/plugin/Operation',
Expand All @@ -24,6 +25,7 @@ define([
pluginMetadata,
ExecutorClient,
PluginBase,
ExecutionEnv,
LocalExecutor, // DeepForge operation primitives
PtrCodeGen,
OperationPlugin,
Expand Down Expand Up @@ -142,7 +144,8 @@ define([
this._callback = callback;
this.currentForkName = null;
this.forkNameBase = this.getAttribute(this.activeNode, 'name');
this.isResuming(this.activeNode)
this.checkExecutionEnv()
.then(() => this.isResuming(this.activeNode))
.then(resuming => {
this._resumed = resuming;
return this.prepare(resuming);
Expand All @@ -166,7 +169,19 @@ define([
return this.executeJob(this.activeNode);
}
})
.catch(err => this._callback(err));
.catch(err => this._callback(err, this.result));
};

ExecuteJob.prototype.checkExecutionEnv = function () {
// Throw an exception if no resources
this.logger.info(`Checking execution environment`);
return ExecutionEnv.getWorkers()
.then(workers => {
if (workers.length === 0) {
this.logger.info(`Cannot execute job(s): No connected workers`);
throw new Error('No connected workers');
}
});
};

ExecuteJob.prototype.isResuming = function (job) {
Expand Down
11 changes: 7 additions & 4 deletions src/plugins/ExecutePipeline/ExecutePipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -101,20 +101,20 @@ define([
* @param {function(string, plugin.PluginResult)} callback - the result callback
*/
ExecutePipeline.prototype.main = function (callback) {
var startPromise,
var startPromise = this.checkExecutionEnv(),
runId;

this.initRun();
if (this.core.isTypeOf(this.activeNode, this.META.Pipeline)) {
// If starting with a pipeline, we will create an Execution first
startPromise = this.createExecution(this.activeNode)
startPromise = startPromise
.then(() => this.createExecution(this.activeNode))
.then(execNode => {
this.logger.debug(`Finished creating execution "${this.getAttribute(execNode, 'name')}"`);
this.activeNode = execNode;
});
} else if (this.core.isTypeOf(this.activeNode, this.META.Execution)) {
this.logger.debug('Restarting execution');
startPromise = Q();
} else {
return callback('Current node is not a Pipeline or Execution!', this.result);
}
Expand Down Expand Up @@ -149,7 +149,10 @@ define([
});

})
.fail(e => this.logger.error(e));
.fail(err => {
this.logger.error(err);
callback(err, this.result);
});
};

ExecutePipeline.prototype.isResuming = function () {
Expand Down
25 changes: 4 additions & 21 deletions src/visualizers/panels/WorkerHeader/WorkerDialog.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
/* globals define, $ */
define([
'deepforge/ExecutionEnv',
'q',
'superagent',
'deepforge/viz/Utils',
'deepforge/api/JobOriginClient',
'text!./WorkerModal.html',
'text!./WorkerTemplate.html.ejs',
'text!./WorkerJobItem.html',
'css!./WorkerModal.css'
], function(
ExecutionEnv,
Q,
superagent,
utils,
JobOriginClient,
WorkerHtml,
Expand All @@ -19,9 +19,6 @@ define([
) {
'use strict';

var WORKER_ENDPOINT = '/rest/executor/worker',
JOBS_ENDPOINT = '/rest/executor';

var WorkerDialog = function(logger) {
this.workerDict = {};
this.workers = {};
Expand Down Expand Up @@ -53,25 +50,11 @@ define([
this.initialize();
};

WorkerDialog.prototype.get = function(url) {
var deferred = Q.defer();

superagent.get(url)
.end((err, res) => {
if (err) {
return deferred.reject(err);
}
deferred.resolve(JSON.parse(res.text));
});

return deferred.promise;
};

WorkerDialog.prototype.update = function() {
// Poll the workers
return Q.all([
this.get(WORKER_ENDPOINT).then(workers => this.updateWorkers(workers)),
this.get(JOBS_ENDPOINT).then(jobs => this.updateJobs(jobs))
ExecutionEnv.getWorkers().then(workers => this.updateWorkers(workers)),
ExecutionEnv.getJobs().then(jobs => this.updateJobs(jobs))
]).then(() => {
if (this.active) {
setTimeout(this.update.bind(this), 1000);
Expand Down
1 change: 1 addition & 0 deletions test/plugins/ExecuteJob/ExecuteJob.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ describe('ExecuteJob', function () {
return manager.initializePlugin(pluginName)
.then(plugin_ => {
plugin = plugin_;
plugin.checkExecutionEnv = () => Q();
return manager.configurePlugin(plugin, {}, context);
})
.then(() => node = plugin.activeNode)
Expand Down
1 change: 1 addition & 0 deletions test/plugins/ExecutePipeline/ExecutePipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ describe('ExecutePipeline', function () {
return manager.initializePlugin(pluginName)
.then(plugin_ => {
plugin = plugin_;
plugin.checkExecutionEnv = () => Q();
plugin.startExecHeartBeat = () => {};
return manager.configurePlugin(plugin, {}, context);
})
Expand Down

0 comments on commit 8777238

Please sign in to comment.