Skip to content

Commit

Permalink
Purge compute job after recording results. Closes #1978 (#1979)
Browse files Browse the repository at this point in the history
* Purge compute job after recording results. Closes #1978

* Replace return with await
  • Loading branch information
brollb committed Nov 11, 2020
1 parent cead1ec commit 387accb
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 7 deletions.
2 changes: 1 addition & 1 deletion src/common/compute/backends/local/Client.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ define([
return JSON.parse(resultsTxt);
}

async purge (job) {
async purgeJob (job) {
const {hash} = job;
if (hash === this.currentJob) {
throw new Error('Cannot purge running job.');
Expand Down
20 changes: 14 additions & 6 deletions src/plugins/ExecuteJob/ExecuteJob.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ define([
ExecuteJobMetadata.call(this);
this.pluginMetadata = pluginMetadata;
this._running = null;
this._computeJobs = {};

// Metadata updating
this.lastAppliedCmd = {};
Expand Down Expand Up @@ -190,20 +191,24 @@ define([

compute.on('end',
async (id/*, info*/) => {
const computeJob = this._computeJobs[id];
try {
const job = this.getNodeForJobId(id);
if (job === null) {
assert(
this.canceled,
`Cannot find node for job ID in running pipeline: ${id}`
);
await compute.purgeJob(computeJob);
return;
}
this.cleanJobHashInfo(id);
await this.onOperationEnd(null, job);
await compute.purgeJob(computeJob);
} catch (err) {
this.logger.error(`Error when processing operation end: ${err}`);
await this.save('Saving remaining edits before pipeline exits w/ error.');
await compute.purgeJob(computeJob);
throw err;
}
}
Expand Down Expand Up @@ -534,6 +539,7 @@ define([
this.startExecHeartBeat();
}

this._computeJobs[jobInfo.hash] = computeJob;
return await this.recordJobOrigin(jobInfo.hash, job);
};

Expand Down Expand Up @@ -642,9 +648,11 @@ define([

ExecuteJob.prototype.onOperationEnd = async function (err, job) {
if (err) {
return this.onOperationFail(job, err);
await this.onOperationFail(job, err);
return;
} else if (this.isLocalOperation(job)) {
return this.onOperationComplete(job);
await this.onOperationComplete(job);
return;
}

const op = await this.getOperation(job);
Expand All @@ -660,7 +668,7 @@ define([
this.logger.debug(`"${name}" has been CANCELED!`);
const stdout = await this.logManager.getLog(jobId);
this.core.setAttribute(job, 'stdout', stdout);
return this.onOperationCanceled(op);
await this.onOperationCanceled(op);
}

if (status === this.compute.SUCCESS || status === this.compute.FAILED) {
Expand All @@ -678,9 +686,9 @@ define([
// Parse the most precise error and present it in the toast...
const lastline = result.stdout.split('\n').filter(l => !!l).pop() || '';
if (lastline.includes('Error')) {
this.onOperationFail(op, lastline);
await this.onOperationFail(op, lastline);
} else {
this.onOperationFail(op, `Operation "${opName}" failed!`);
await this.onOperationFail(op, `Operation "${opName}" failed!`);
}
}
} else { // something bad happened...
Expand All @@ -689,7 +697,7 @@ define([

this.core.setAttribute(job, 'stdout', consoleErr);
this.logger.error(err);
return this.onOperationFail(op, err);
await this.onOperationFail(op, err);
}
};

Expand Down

0 comments on commit 387accb

Please sign in to comment.