Skip to content

Commit

Permalink
Abort queued messages if an error occurs
Browse files Browse the repository at this point in the history
The notebook send all execution requests without waiting for the reply and
expects the kernel to abort requests if an error occurs (that's at least the
behavior of the python kernel).

After this commit, the R kernel aborts all shell messages (where execution
requests come in) when the kernel was either interrupted or an error occurred
during the execution of some code.

Fixes: #232
  • Loading branch information
jankatins committed Feb 22, 2016
1 parent fdee8b0 commit bb2a3d0
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 4 deletions.
9 changes: 8 additions & 1 deletion R/execution.r
Expand Up @@ -63,6 +63,7 @@ Executor <- setRefClass(
'Executor',
fields = list(
send_response = 'function',
abort_queued_messages = 'function',
execution_count = 'integer',
payload = 'list',
err = 'list',
Expand Down Expand Up @@ -258,7 +259,13 @@ execute = function(request) {
}

send_response('execute_reply', request, 'shell', reply_content)


if (interrupted || !is.null(err$ename)){
# errors or interrupts should interrupt all currently queued messages,
# not only the currently running one...
abort_queued_messages()
}

if (!silent) {
execution_count <<- execution_count + 1L
}
Expand Down
36 changes: 33 additions & 3 deletions R/kernel.r
Expand Up @@ -91,7 +91,7 @@ new_reply = function(msg_type, parent_msg) {

header <- list(
msg_id = UUIDgenerate(),
username = parent_msg$header$username,
username = parent_msg$header$username,
session = parent_msg$header$session,
msg_type = msg_type,
version = '5.0')
Expand Down Expand Up @@ -129,6 +129,35 @@ handle_shell = function() {
print(c('Got unhandled msg_type:', msg$header$msg_type)))
},

abort_shell_msg = function(){
"Send an abort message for an incoming shell request"
# See https://github.com/ipython/ipykernel/blob/1d97cb2a04149387a0d2dbea1b3d0af691d8df6c/ipykernel/kernelbase.py#L623

parts <- zmq.recv.multipart(sockets$shell, unserialize = FALSE)
msg <- wire_to_msg(parts)
reply_type <- paste(strsplit(msg$header$msg_type, "_")[1], "_reply")
reply_content <- list(status = 'aborted')
send_response(reply_type, msg, "shell", reply_content)
},

abort_queued_messages = function(){
"Abort all already queued shell messages after an error"

while (TRUE) {
ret = zmq.poll(
c(sockets$shell), # only shell channel
c(.pbd_env$ZMQ.PO$POLLIN), # type
0) # no timeout, only what's there
if (ret != 1){
# no more messages received
break
}
if(bitwAnd(zmq.poll.get.revents(1), .pbd_env$ZMQ.PO$POLLIN)) {
abort_shell_msg()
}
}
},

is_complete = function(request) {
"Checks whether the code in the rest is complete"

Expand Down Expand Up @@ -264,8 +293,9 @@ initialize = function(connection_file) {
zmq.bind(sockets$control, url_with_port('control_port'))
zmq.bind(sockets$stdin, url_with_port('stdin_port'))
zmq.bind(sockets$shell, url_with_port('shell_port'))

executor <<- Executor$new(send_response = .self$send_response)

executor <<- Executor$new(send_response = .self$send_response,
abort_queued_messages = .self$abort_queued_messages)
},

run = function() {
Expand Down

0 comments on commit bb2a3d0

Please sign in to comment.