Skip to content

Commit

Permalink
Add saveArtifact support to interactive session. Closes #1902 (#1903)
Browse files Browse the repository at this point in the history
  • Loading branch information
brollb committed Sep 11, 2020
1 parent bd96716 commit 4b0d8b4
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 15 deletions.
3 changes: 2 additions & 1 deletion src/common/compute/interactive/message.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
}
}(this, function() {
const Constants = makeEnum('STDOUT', 'STDERR', 'RUN', 'ADD_ARTIFACT', 'KILL',
'ADD_FILE', 'REMOVE_FILE', 'ADD_USER_DATA', 'COMPLETE', 'ERROR', 'SET_ENV');
'ADD_FILE', 'REMOVE_FILE', 'ADD_USER_DATA', 'COMPLETE', 'ERROR', 'SET_ENV',
'SAVE_ARTIFACT');

function makeEnum() {
const names = Array.prototype.slice.call(arguments);
Expand Down
6 changes: 3 additions & 3 deletions src/common/compute/interactive/session-with-queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ define([

async runTask(task) {
const queuedTask = this.queueTask(task);
await queuedTask.promise;
return await queuedTask.promise;
}

queueTask(task) {
Expand All @@ -46,9 +46,9 @@ define([

async runNextTask() {
const queuedTask = this.tasks[0];
await super.runTask(queuedTask.unwrap());
const result = await super.runTask(queuedTask.unwrap());
this.tasks.shift();
queuedTask.resolve();
queuedTask.resolve(result);
this.checkTaskQueue();
}

Expand Down
15 changes: 14 additions & 1 deletion src/common/compute/interactive/session.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,10 @@ define([
this.ensureIdle('run task');

this.currentTask = task;
await task.run();
const result = await task.run();
this.currentTask = null;
this.checkReady();
return result;
}

async whenConnected() {
Expand Down Expand Up @@ -128,12 +129,24 @@ define([
}

async addArtifact(name, dataInfo, type, auth) {
auth = auth || {};
this.ensureIdle('add artifact');
const msg = new Message(Message.ADD_ARTIFACT, [name, dataInfo, type, auth]);
const task = new Task(this.channel, msg);
await this.runTask(task);
}

async saveArtifact(/*path, name, storageId, config*/) {
this.ensureIdle('save artifact');
const msg = new Message(Message.SAVE_ARTIFACT, [...arguments]);
const task = new Task(this.channel, msg);
const [exitCode, dataInfo] = await this.runTask(task);
if (exitCode) {
throw new CommandFailedError('saveArtifact', {exitCode});
}
return dataInfo;
}

async addFile(filepath, content) {
this.ensureIdle('add file');
const msg = new Message(Message.ADD_FILE, [filepath, content]);
Expand Down
8 changes: 6 additions & 2 deletions src/common/compute/interactive/task.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ define([
this.emitMessage(msg);
if (msg.type === Message.COMPLETE) {
this.channel.unlisten(handler);
deferred.resolve();
deferred.resolve(msg.data);
}
};
this.channel.listen(handler);
Expand All @@ -37,7 +37,11 @@ define([
}

emitMessage(msg) {
this.emit(msg.type, msg.data);
if (msg.type === Message.COMPLETE) {
this.emit(msg.type, ...msg.data);
} else {
this.emit(msg.type, msg.data);
}
}

static async getMessageData(wsMsg) {
Expand Down
33 changes: 25 additions & 8 deletions src/routers/InteractiveCompute/job-files/start.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class InteractiveClient {
if (msg.type === Message.RUN) {
const [cmd, ...opts] = InteractiveClient.parseCommand(msg.data);
this.subprocess = spawn(cmd, opts);
this.subprocess.on('exit', code => this.sendMessage(Message.COMPLETE, code));
this.subprocess.on('exit', code => this.sendMessage(Message.COMPLETE, [code]));
this.subprocess.stdout.on('data', data => this.sendMessage(Message.STDOUT, data));
this.subprocess.stderr.on('data', data => this.sendMessage(Message.STDERR, data));
} else if (msg.type === Message.KILL) {
Expand All @@ -46,15 +46,31 @@ class InteractiveClient {
Utils,
) => {
const {Storage} = Utils;

async function saveArtifact() {
const client = await Storage.getClient(dataInfo.backend, null, config);
const fetchArtifact = async () => {
const client = await Storage.getClient(dataInfo.backend, undefined, config);
const dataPath = path.join(...dirs.concat('data'));
const stream = await client.getFileStream(dataInfo);
await pipeline(stream, fs.createWriteStream(dataPath));
const filePath = path.join(...dirs.concat('__init__.py'));
await fsp.writeFile(filePath, initFile(name, type));
}
};

this.runTask(fetchArtifact);
});
} else if (msg.type === Message.SAVE_ARTIFACT) {
const [filepath, name, backend, config={}] = msg.data;
requirejs([
'./utils.build',
], (
Utils,
) => {
const {Storage} = Utils;
const saveArtifact = async () => {
const client = await Storage.getClient(backend, null, config);
const stream = await fs.createReadStream(filepath);
const dataInfo = await client.putFileStream(name, stream);
return dataInfo;
};

this.runTask(saveArtifact);
});
Expand All @@ -76,7 +92,7 @@ class InteractiveClient {
await fsp.unlink(filepath);
});
} else {
this.sendMessage(Message.COMPLETE, 2);
this.sendMessage(Message.COMPLETE, [2]);
}
}

Expand All @@ -99,13 +115,14 @@ class InteractiveClient {

async runTask(fn) {
let exitCode = 0;
let result;
try {
await fn();
result = await fn();
} catch (err) {
exitCode = 1;
console.log('Task failed with error:', err);
}
this.sendMessage(Message.COMPLETE, exitCode);
this.sendMessage(Message.COMPLETE, [exitCode, result]);
}

static parseCommand(cmd) {
Expand Down
7 changes: 7 additions & 0 deletions test/integration/InteractiveCompute.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ describe('InteractiveCompute', function() {
sleep(100).then(() => session.kill(task));
});

it('should save artifacts', async function() {
await session.exec('node -e \'fs.writeFileSync("test.txt", "hi")\'');
const dataInfo = await session.saveArtifact('test.txt', 'test', 'gme');
assert.equal(dataInfo.backend, 'gme');
assert(dataInfo.data);
});

function sleep(duration) {
return new Promise(resolve => setTimeout(resolve, duration));
}
Expand Down

0 comments on commit 4b0d8b4

Please sign in to comment.