Skip to content

Commit

Permalink
updated .kill method in CodeEngine to broadcast exit message
Browse files Browse the repository at this point in the history
and kill its Code objects concurrently. There is a concurrency
problem with the .kill call and each of CodeEngine's
reportStatus interval, which uses the pubsub.

The issue happens when the following happens:
- reportStatus interval timer is cleared (by the .kill call)
- pubsub is closed (by the .kill call)
- the pending callback for reportStatus (that was already in
  the event queue) fires, but since pubsub is closed it throws.

While we can safely ignore this error (in this case),
we should still throw because there would be other cases
where we want to be notified about the failure of a publish call.

Putting an extra check in the publish call (to see if pubsub is open)
would be too expensive, as the check would happen for every
invocation of .publish(). It's probably better to just throw
and handle the exception.

If we were to eliminate this concurrency issue, we need to
redesign the Pubsub API - the current implementation does not take
into account multiple objects adding event handlers (subscriptions)
to the Pubsub object; it assumes there is always a single object
accessing the Pubsub. This is the reason CodeEngine and Code
objects also have to do bookkeeping and track their own subscription
handlers (and unsubscribe accordingly when it is going down)
  • Loading branch information
jungkumseok committed Oct 16, 2018
1 parent ac81683 commit ee5c4a1
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 19 deletions.
22 changes: 11 additions & 11 deletions lib/core/CodeEngine.js
Original file line number Diff line number Diff line change
Expand Up @@ -230,22 +230,22 @@ CodeEngine.prototype.kill = function(){
var self = this;
this.status = 'dead';
clearInterval(this.reportTimer);
return Promise.all(Object.values(this.codes).map(function(code){
return code.kill(self.pubsub !== code.pubsub);
})).then(function(){
return self.reportStatus('kill')
}).then(function(){

return Promise.all(([self.reportStatus('kill')])
.concat(Object.values(self.codes)
.map(function(code){
return code.kill(self.pubsub !== code.pubsub);
})
)
)
.then(function(){
console.log(chalk.green('[Engine:'+self.id+'] Killed gracefully'));
return self.pubsub.kill()
}).catch(function(err){
console.log(err);
console.log(chalk.green('[Engine:'+self.id+'] Killed forcibly'));
return self.pubsub.kill()
})
// return this.reportStatus()
// .then(function(){
// console.log(chalk.green('[Engine:'+self.id+'] Killed gracefully'));
// return self.pubsub.kill()
// })
});
}

CodeEngine.validateConfig = function(file_path){
Expand Down
2 changes: 1 addition & 1 deletion lib/core/Dispatcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ function Engine(dispatcher, id, meta){
if (self.console.length > 200) self.console.shift();
});
})
console.log('Engine '+id+' connected');
// console.log('Engine '+id+' connected');
}
Engine.prototype.getStat = function(){
return this.stats[this.stats.length-1];
Expand Down
14 changes: 7 additions & 7 deletions lib/core/Scheduler.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ function Scheduler(config){
});

self.on('engine-registry-update', function(engine, message){
console.log('Engine Registry Updated', engine.id);
// console.log('Engine Registry Updated', engine.id);
// console.log(message);
self.logEvent('engine-registry-update', {
engine: engine.id,
Expand All @@ -62,7 +62,7 @@ function Scheduler(config){
});
});
self.on('program-monitor-update', function(program, message){
console.log('Program Monitor Updated', program.code_name+':'+program.id);
// console.log('Program Monitor Updated', program.code_name+':'+program.id);
self.logEvent('program-monitor-update', {
code_name: program.code_name,
instance_id: program.id,
Expand Down Expand Up @@ -181,9 +181,9 @@ Scheduler.prototype._assess = function(){
Scheduler.prototype._compute = function(state){
var self = this;
return new Promise(function(resolve, reject){
console.log('Current Mapping', state.mapping);
// console.log('Current Mapping', state.mapping);
var new_mapping = Scheduler.Algorithms['first_fit'](state.engines, state.tasks, state.mapping);
console.log('New Mapping', new_mapping);
// console.log('New Mapping', new_mapping);

var actions = Scheduler.computeActions(state.mapping, new_mapping);

Expand Down Expand Up @@ -264,14 +264,14 @@ Scheduler.Behaviours = {
return self.invoke()
.then(function(result){
(DEBUG && console.log(chalk.green('[Scheduler] Run Application Successfully Finished')));
console.log(result);
// console.log(result);
self.state.apps[trx_token] = {
name: kwargs.name,
status: 'Running',
startedAt: Date.now(),
procs: result.actions.map(function(action){ return action.result })
}
console.log(self.state);
// console.log(self.state);
self.reportStatus();

return {
Expand Down Expand Up @@ -519,7 +519,7 @@ Scheduler.computeActions = function(cur_mapping, next_mapping){
})
});

console.log(actions);
// console.log(actions);

return actions;
}
Expand Down

0 comments on commit ee5c4a1

Please sign in to comment.