Refactored code gen out of ExecuteJob. Fixes #1014 (#1018)
* WIP created GenerateJob plugin for generating exec code

* WIP moved templates to GenerateJob

* WIP #1014 GenerateJob tests passing

* WIP #1014 Added attribute file test

* WIP #1014 Added more GenerateJob tests

* WIP #1014 removed extra logs

* WIP #1014 Follow base for plugin, namespace

* WIP #1014 Added CodeGen component settings

* WIP #1014 Removed component settings

* WIP #1014 Fixed the loading of inputs

* WIP #1014 Fixed namespace detection

* WIP #1014 Fixed code gen of job refs

* WIP #1014 Fixed 2nd degree code gen

* WIP #1014 Removed old code generation file

* WIP #1014 Fixed code linting issues

* WIP #1014 Fixed blob fetch error handling

* WIP #1014 fixed error handling for bad blob fetch

* WIP #1014 Updated the commit hash for the ptr code gen

* WIP #1014 save after forwarding data from the successful job

* WIP #1014 updated error msg on blob retrieval fail

* WIP #1014 fixed code linting issues
brollb committed Apr 27, 2017
1 parent ff541be commit 73f7ba3
Showing 16 changed files with 563 additions and 263 deletions.
35 changes: 35 additions & 0 deletions src/common/plugin/Operation.js
@@ -0,0 +1,35 @@
/*globals define */
// This is a mixin containing helpers for working with operation nodes
define([],function() {

var OperationOps = function() {
};

OperationOps.prototype.getOutputs = function (node) {
return this.getOperationData(node, this.META.Outputs);
};

OperationOps.prototype.getInputs = function (node) {
return this.getOperationData(node, this.META.Inputs);
};

OperationOps.prototype.getOperationData = function (node, metaType) {
// Load the children and the output's children
return this.core.loadChildren(node)
.then(containers => {
var outputs = containers.find(c => this.core.isTypeOf(c, metaType));
return outputs ? this.core.loadChildren(outputs) : [];
})
.then(outputs => {
var bases = outputs.map(node => this.core.getMetaType(node));
// return [[arg1, Type1, node1], [arg2, Type2, node2]]
return outputs.map((node, i) => [
this.getAttribute(node, 'name'),
this.getAttribute(bases[i], 'name'),
node
]);
});
};

return OperationOps;
});
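
The mixin is consumed with underscore's _.extend, as ExecuteJob does further down in this commit. A minimal usage sketch follows (the plugin name and surrounding setup are assumptions; the mixin expects the host plugin to provide core, META, logger, and a getAttribute helper):

/*globals define */
// Hypothetical plugin that reuses the Operation mixin to inspect an
// operation node's declared inputs and outputs.
define([
    'underscore',
    'deepforge/plugin/Operation'
], function (_, OperationOps) {

    var MyOpInspector = function () {
    };

    // Same composition pattern as ExecuteJob below
    _.extend(MyOpInspector.prototype, OperationOps.prototype);

    MyOpInspector.prototype.logOperationInterface = function (opNode) {
        // getInputs/getOutputs resolve to [[argName, typeName, node], ...]
        return this.getInputs(opNode).then(inputs =>
            inputs.forEach(([name, type]) =>
                this.logger.info(`input "${name}" of type ${type}`)));
    };

    return MyOpInspector;
});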
64 changes: 51 additions & 13 deletions src/common/plugin/PtrCodeGen.js
@@ -6,27 +6,64 @@ define([
PluginUtils,
Q
) {

var CodeGen = {
Operation: {
pluginId: 'GenerateJob',
namespace: 'pipeline'
}
};

var PtrCodeGen = function() {
};

PtrCodeGen.prototype.getCodeGenPluginIdFor = function(node) {
var base = this.core.getBase(node),
name = this.core.getAttribute(node, 'name'),
namespace = this.core.getNamespace(node),
pluginId;

//this.logger.debug(`loaded pointer target of ${ptrId}: ${ptrNode}`);
pluginId = (this.core.getOwnRegistry(node, 'validPlugins') || '').split(' ').shift();
//this.logger.info(`generating code for ${this.core.getAttribute(ptrNode, 'name')} using ${pluginId}`);

if (this.core.isMetaNode(node) && CodeGen[name]) {
pluginId = CodeGen[name].pluginId || CodeGen[name];
namespace = CodeGen[name].namespace;
}

if (pluginId) {
return {
namespace: namespace,
pluginId: pluginId
};
} else if (base) {
return this.getCodeGenPluginIdFor(base);
} else {
return null;
}
};

PtrCodeGen.prototype.getPtrCodeHash = function(ptrId) {
return this.core.loadByPath(this.rootNode, ptrId)
.then(ptrNode => {
// Look up the plugin to use
var metanode = this.core.getMetaType(ptrNode),
pluginId;

this.logger.debug(`loaded pointer target of ${ptrId}: ${ptrNode}`);
pluginId = this.core.getRegistry(ptrNode, 'validPlugins').split(' ').shift();
this.logger.info(`generating code for ${this.core.getAttribute(ptrNode, 'name')} using ${pluginId}`);
var genInfo = this.getCodeGenPluginIdFor(ptrNode);

var context = {
namespace: this.core.getNamespace(metanode),
activeNode: this.core.getPath(ptrNode)
};
if (genInfo.pluginId) {
var context = {
namespace: genInfo.namespace,
activeNode: this.core.getPath(ptrNode)
};

// Load and run the plugin
return this.executePlugin(pluginId, context);
// Load and run the plugin
return this.executePlugin(genInfo.pluginId, context);
} else {
var metanode = this.core.getMetaType(ptrNode),
type = this.core.getAttribute(metanode, 'name');
this.logger.warn(`Could not find plugin for ${type}. Will try to proceed anyway`);
return null;
}
})
.then(hashes => hashes[0]); // Grab the first asset for now
};
@@ -56,12 +93,13 @@ define([
return PluginUtils.loadNodesAtCommitHash(
this.project,
this.core,
this.commitHash,
this.currentHash,
this.logger,
opts
).then(config => {
plugin.initialize(logger, this.blobClient, this.gmeConfig);
config.core = this.core;
config.project = this.project;
plugin.configure(config);
return plugin;
});
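
With this lookup in place, a caller only needs a node path: getPtrCodeHash resolves the appropriate code-generation plugin (GenerateJob for pipeline operations), runs it, and resolves to the blob hash of the first generated artifact. A hedged usage sketch mirroring the call ExecuteJob makes below (variable names are illustrative; `this` is a plugin that mixes in PtrCodeGen):

// Q-style promises, matching the .fail() handlers used in this codebase
var opPath = this.core.getPath(operationNode);

this.getPtrCodeHash(opPath)
    .then(hash => {
        // hash points at the generated execution files in blob storage
        this.logger.info(`generated code stored as ${hash}`);
    })
    .fail(err => this.logger.error(`code generation failed: ${err}`));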
184 changes: 24 additions & 160 deletions src/plugins/ExecuteJob/ExecuteJob.js
@@ -8,10 +8,10 @@ define([
'plugin/PluginBase',
'deepforge/plugin/LocalExecutor',
'deepforge/plugin/PtrCodeGen',
'deepforge/plugin/Operation',
'deepforge/api/JobLogsClient',
'deepforge/api/JobOriginClient',
'deepforge/api/ExecPulseClient',
'./ExecuteJob.Files',
'./ExecuteJob.Metadata',
'./ExecuteJob.SafeSave',
'deepforge/Constants',
@@ -26,11 +26,11 @@
PluginBase,
LocalExecutor, // DeepForge operation primitives
PtrCodeGen,
OperationPlugin,
JobLogsClient,
JobOriginClient,
ExecPulseClient,

ExecuteJobFiles,
ExecuteJobMetadata,
ExecuteJobSafeSave,

@@ -44,8 +44,7 @@

pluginMetadata = JSON.parse(pluginMetadata);

var OUTPUT_INTERVAL = 1500,
STDOUT_FILE = 'job_stdout.txt';
var STDOUT_FILE = 'job_stdout.txt';

/**
* Initializes a new instance of ExecuteJob.
@@ -453,9 +452,10 @@
children.find(child => this.isMetaTypeOf(child, this.META.Operation)));
};

ExecuteJob.prototype.onBlobRetrievalFail = function (node, input, err) {
// Handle the blob retrieval failed error
ExecuteJob.prototype.onBlobRetrievalFail = function (node, input) {
var job = this.core.getParent(node),
e = `Failed to retrieve "${input}" (${err})`,
e = `Failed to retrieve "${input}" (BLOB_FETCH_FAILED)`,
consoleErr = `Failed to execute operation: ${e}`;

consoleErr += [
@@ -473,141 +473,31 @@

ExecuteJob.prototype.executeJob = function (job) {
return this.getOperation(job).then(node => {
var jobId = this.core.getPath(job),
name = this.getAttribute(node, 'name'),
localTypeId = this.getLocalOperationType(node),
artifact,
artifactName,
files,
data = {},
inputs;
var name = this.getAttribute(node, 'name'),
localTypeId = this.getLocalOperationType(node);

// Execute any special operation types here - not on an executor
this.logger.debug(`Executing operation "${name}"`);
if (localTypeId !== null) {
return this.executeLocalOperation(localTypeId, node);
} else {
// Generate all execution files
return this.createOperationFiles(node).then(results => {
this.logger.info('Created operation files!');
files = results;
artifactName = `${name}_${jobId.replace(/\//g, '_')}-execution-files`;
artifact = this.blobClient.createArtifact(artifactName);

// Add the input assets
// - get the metadata (name)
// - add the given inputs
inputs = Object.keys(files.inputAssets);

return Q.all(
inputs.map(input => { // Get the metadata for each input
var hash = files.inputAssets[input];

// data asset for "input"
return this.blobClient.getMetadata(hash)
.fail(err => this.onBlobRetrievalFail(job, input, err));
})
);
})
.then(mds => {
// Record the large files
var inputData = {},
runsh = '# Bash script to download data files and run job\n' +
'if [ -z "$DEEPFORGE_URL" ]; then\n echo "Please set DEEPFORGE_URL and' +
' re-run:"\n echo "" \n echo " DEEPFORGE_URL=http://my.' +
'deepforge.server.com:8080 bash run.sh"\n echo ""\n exit 1\nfi\n';

mds.forEach((metadata, i) => {
// add the hashes for each input
var input = inputs[i],
hash = files.inputAssets[input],
dataPath = 'inputs/' + input + '/data',
url = this.blobClient.getRelativeDownloadURL(hash);

inputData[dataPath] = {
req: hash,
cache: metadata.content
};

// Add to the run.sh file
runsh += `wget $DEEPFORGE_URL${url} -O ${dataPath}\n`;
});

delete files.inputAssets;
files['input-data.json'] = JSON.stringify(inputData, null, 2);
runsh += 'th init.lua';
files['run.sh'] = runsh;

// Add pointer assets
Object.keys(files.ptrAssets)
.forEach(path => data[path] = files.ptrAssets[path]);

// Add the executor config
return this.getOutputs(node);
})
.then(outputArgs => {
var config,
outputs,
fileList,
ptrFiles = Object.keys(files.ptrAssets),
file;

delete files.ptrAssets;
fileList = Object.keys(files).concat(ptrFiles);

outputs = outputArgs.map(pair => pair[0])
.map(name => {
return {
name: name,
resultPatterns: [`outputs/${name}`]
};
});

outputs.push(
{
name: 'stdout',
resultPatterns: [STDOUT_FILE]
},
{
name: name + '-all-files',
resultPatterns: fileList
return this.getPtrCodeHash(this.core.getPath(node))
.fail(err => {
this.logger.error(`Could not generate files: ${err}`);
if (err.message.indexOf('BLOB_FETCH_FAILED') > -1) {
this.onBlobRetrievalFail(node, err.message.split(':')[1]);
}
);

config = {
cmd: 'node',
args: ['start.js'],
outputInterval: OUTPUT_INTERVAL,
resultArtifacts: outputs
};
files['executor_config.json'] = JSON.stringify(config, null, 4);

// Save the artifact
// Remove empty hashes
for (file in data) {
if (!data[file]) {
this.logger.warn(`Empty data hash has been found for file "${file}". Removing it...`);
delete data[file];
}
}
return artifact.addObjectHashes(data);
})
.then(() => {
this.logger.info(`Added ptr/input data hashes for "${artifactName}"`);
return artifact.addFiles(files);
})
.then(() => {
this.logger.info(`Added execution files for "${artifactName}"`);
return artifact.save();
})
.then(hash => {
this.logger.info(`Saved execution files "${artifactName}"`);
this.result.addArtifact(hash); // Probably only need this for debugging...
this.executeDistOperation(job, node, hash);
})
.fail(e => {
this.onOperationFail(node, `Distributed operation "${name}" failed ${e}`);
});
throw err;
})
.then(hash => {
this.logger.info(`Saved execution files`);
this.result.addArtifact(hash); // Probably only need this for debugging...
this.executeDistOperation(job, node, hash);
})
.fail(e => {
this.onOperationFail(node, `Distributed operation "${name}" failed ${e}`);
});
}
});
};
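
For reference, the refactored executeJob reduces to the following, assembled from the added lines in the hunk above (grouping and indentation are approximate):

ExecuteJob.prototype.executeJob = function (job) {
    return this.getOperation(job).then(node => {
        var name = this.getAttribute(node, 'name'),
            localTypeId = this.getLocalOperationType(node);

        // Execute any special operation types here - not on an executor
        this.logger.debug(`Executing operation "${name}"`);
        if (localTypeId !== null) {
            return this.executeLocalOperation(localTypeId, node);
        } else {
            // Delegate generation of the execution files to the code-gen plugin
            return this.getPtrCodeHash(this.core.getPath(node))
                .fail(err => {
                    this.logger.error(`Could not generate files: ${err}`);
                    if (err.message.indexOf('BLOB_FETCH_FAILED') > -1) {
                        this.onBlobRetrievalFail(node, err.message.split(':')[1]);
                    }
                    throw err;
                })
                .then(hash => {
                    this.logger.info(`Saved execution files`);
                    this.result.addArtifact(hash); // Probably only need this for debugging...
                    this.executeDistOperation(job, node, hash);
                })
                .fail(e => {
                    this.onOperationFail(node, `Distributed operation "${name}" failed ${e}`);
                });
        }
    });
};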
@@ -881,32 +771,6 @@ define([
.fail(e => this.onOperationFail(node, `Operation ${nodeId} failed: ${e}`));
};

ExecuteJob.prototype.getOutputs = function (node) {
return this.getOperationData(node, this.META.Outputs);
};

ExecuteJob.prototype.getInputs = function (node) {
return this.getOperationData(node, this.META.Inputs);
};

ExecuteJob.prototype.getOperationData = function (node, metaType) {
// Load the children and the output's children
return this.core.loadChildren(node)
.then(containers => {
var outputs = containers.find(c => this.core.isTypeOf(c, metaType));
return outputs ? this.core.loadChildren(outputs) : [];
})
.then(outputs => {
var bases = outputs.map(node => this.core.getMetaType(node));
// return [[arg1, Type1, node1], [arg2, Type2, node2]]
return outputs.map((node, i) => [
this.getAttribute(node, 'name'),
this.getAttribute(bases[i], 'name'),
node
]);
});
};

//////////////////////////// Special Operations ////////////////////////////
ExecuteJob.prototype.executeLocalOperation = function (type, node) {
// Retrieve the given LOCAL_OP type
@@ -920,7 +784,7 @@

_.extend(
ExecuteJob.prototype,
ExecuteJobFiles.prototype,
OperationPlugin.prototype,
ExecuteJobMetadata.prototype,
ExecuteJobSafeSave.prototype,
PtrCodeGen.prototype,
