Skip to content

Commit

Permalink
Add support for KILL messages in interactive compute. Closes #1898 (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
brollb committed Sep 9, 2020
1 parent 0895528 commit bd96716
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 18 deletions.
2 changes: 1 addition & 1 deletion src/common/compute/interactive/message.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
module.exports = (root.utils = factory());
}
}(this, function() {
const Constants = makeEnum('STDOUT', 'STDERR', 'RUN', 'ADD_ARTIFACT',
const Constants = makeEnum('STDOUT', 'STDERR', 'RUN', 'ADD_ARTIFACT', 'KILL',
'ADD_FILE', 'REMOVE_FILE', 'ADD_USER_DATA', 'COMPLETE', 'ERROR', 'SET_ENV');

function makeEnum() {
Expand Down
11 changes: 11 additions & 0 deletions src/common/compute/interactive/session-with-queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,17 @@ define([
this.checkTaskQueue();
}

async kill(task) {
const index = this.tasks
.findIndex(queuedTask => queuedTask.unwrap() === task);

if (index > 0) {
this.tasks.splice(index, 1);
} else {
super.kill(task);
}
}

static async new(id, config) {
return await Session.new(id, config, SessionWithQueue);
}
Expand Down
59 changes: 52 additions & 7 deletions src/common/compute/interactive/session.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ define([
const msg = Message.decode(data);
if (msg.type === Message.COMPLETE) {
const err = msg.data;
this.ws.onmessage = null;
this.channel = new MessageChannel(this.ws);
if (err) {
this.connected.reject(err);
} else {
Expand Down Expand Up @@ -85,7 +85,7 @@ define([
this.ensureIdle('spawn a task');

const msg = new Message(Message.RUN, cmd);
const task = new Task(this.ws, msg);
const task = new Task(this.channel, msg);
this.runTask(task);
return task;
}
Expand All @@ -111,7 +111,7 @@ define([
async exec(cmd) {
this.ensureIdle('exec a task');
const msg = new Message(Message.RUN, cmd);
const task = new Task(this.ws, msg);
const task = new Task(this.channel, msg);
const result = {
stdout: '',
stderr: '',
Expand All @@ -130,31 +130,44 @@ define([
async addArtifact(name, dataInfo, type, auth) {
this.ensureIdle('add artifact');
const msg = new Message(Message.ADD_ARTIFACT, [name, dataInfo, type, auth]);
const task = new Task(this.ws, msg);
const task = new Task(this.channel, msg);
await this.runTask(task);
}

async addFile(filepath, content) {
this.ensureIdle('add file');
const msg = new Message(Message.ADD_FILE, [filepath, content]);
const task = new Task(this.ws, msg);
const task = new Task(this.channel, msg);
await this.runTask(task);
}

async removeFile(filepath) {
this.ensureIdle('remove file');
const msg = new Message(Message.REMOVE_FILE, [filepath]);
const task = new Task(this.ws, msg);
const task = new Task(this.channel, msg);
await this.runTask(task);
}

async setEnvVar(name, value) {
this.ensureIdle('set env var');
const msg = new Message(Message.SET_ENV, [name, value]);
const task = new Task(this.ws, msg);
const task = new Task(this.channel, msg);
await this.runTask(task);
}

async kill(task) {
assert(
task.msg.type === Message.RUN,
'Cannot kill task. Must be a RUN task.'
);
if (task === this.currentTask) {
const msg = new Message(Message.KILL, task.msg.data);
const killTask = new Task(this.channel, msg);
await killTask.run();
this.checkReady();
}
}

close() {
this.ws.close();
}
Expand All @@ -166,7 +179,39 @@ define([
}
}

function assert(cond, msg) {
if (!cond) {
throw new Error(msg);
}
}

Object.assign(InteractiveSession, Message.Constants);

class MessageChannel {
constructor(ws) {
this.ws = ws;
this.listeners = [];

this.ws.onmessage = message => {
this.listeners.forEach(fn => fn(message));
};
}

send(data) {
this.ws.send(data);
}

listen(fn) {
this.listeners.push(fn);
}

unlisten(fn) {
const index = this.listeners.indexOf(fn);
if (index !== -1) {
this.listeners.splice(index, 1);
}
}
}

return InteractiveSession;
});
11 changes: 6 additions & 5 deletions src/common/compute/interactive/task.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,27 @@ define([

const isNodeJs = typeof window === 'undefined';
class Task extends EventEmitter {
constructor(ws, msg) {
constructor(channel, msg) {
super();
this.ws = ws;
this.channel = channel;
this.msg = msg;
}

async run() {
const deferred = utils.defer();

this.ws.send(this.msg.encode());
this.ws.onmessage = async wsMsg => {
this.channel.send(this.msg.encode());
const handler = async wsMsg => {
const data = await Task.getMessageData(wsMsg);

const msg = Message.decode(data);
this.emitMessage(msg);
if (msg.type === Message.COMPLETE) {
this.ws.onmessage = null;
this.channel.unlisten(handler);
deferred.resolve();
}
};
this.channel.listen(handler);

return deferred.promise;
}
Expand Down
12 changes: 8 additions & 4 deletions src/routers/InteractiveCompute/job-files/start.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,14 @@ class InteractiveClient {
async onMessage(msg) {
if (msg.type === Message.RUN) {
const [cmd, ...opts] = InteractiveClient.parseCommand(msg.data);
const subprocess = spawn(cmd, opts);
subprocess.on('close', code => this.sendMessage(Message.COMPLETE, code));
subprocess.stdout.on('data', data => this.sendMessage(Message.STDOUT, data));
subprocess.stderr.on('data', data => this.sendMessage(Message.STDERR, data));
this.subprocess = spawn(cmd, opts);
this.subprocess.on('exit', code => this.sendMessage(Message.COMPLETE, code));
this.subprocess.stdout.on('data', data => this.sendMessage(Message.STDOUT, data));
this.subprocess.stderr.on('data', data => this.sendMessage(Message.STDERR, data));
} else if (msg.type === Message.KILL) {
if (this.subprocess) { // TODO: Add more checking here...
this.subprocess.kill();
}
} else if (msg.type === Message.ADD_ARTIFACT) {
const [name, dataInfo, type, config={}] = msg.data;
const dirs = ['artifacts', name];
Expand Down
12 changes: 11 additions & 1 deletion test/integration/InteractiveCompute.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ describe('InteractiveCompute', function() {
const testFixture = require('../globals');
const gmeConfig = testFixture.getGmeConfig();
const server = new testFixture.WebGME.standaloneServer(gmeConfig);
const Message = testFixture.requirejs('deepforge/compute/interactive/message');
server.start = promisify(server.start);
server.stop = promisify(server.stop);
let session;
Expand All @@ -29,7 +30,6 @@ describe('InteractiveCompute', function() {
});

it('should be able to spawn commands', function(done) {
const Message = testFixture.requirejs('deepforge/compute/interactive/message');
const task = session.spawn('ls');
task.on(Message.COMPLETE, exitCode => {
assert.equal(exitCode, 0);
Expand All @@ -54,4 +54,14 @@ describe('InteractiveCompute', function() {
assert(err.jobResult.stderr.includes('No such file'));
}
});

it('should cancel tasks', function(done) {
const task = session.spawn('sleep 20');
task.on(Message.COMPLETE, () => done());
sleep(100).then(() => session.kill(task));
});

function sleep(duration) {
return new Promise(resolve => setTimeout(resolve, duration));
}
});

0 comments on commit bd96716

Please sign in to comment.