Skip to content

Commit

Permalink
Merge pull request #264 from JanSchulz/fix_232
Browse files Browse the repository at this point in the history
Abort queued messages if an error occurs
  • Loading branch information
takluyver committed Feb 23, 2016
2 parents fdee8b0 + 61f0ca4 commit 819439f
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 4 deletions.
9 changes: 8 additions & 1 deletion R/execution.r
Expand Up @@ -63,6 +63,7 @@ Executor <- setRefClass(
'Executor',
fields = list(
send_response = 'function',
abort_queued_messages = 'function',
execution_count = 'integer',
payload = 'list',
err = 'list',
Expand Down Expand Up @@ -258,7 +259,13 @@ execute = function(request) {
}

send_response('execute_reply', request, 'shell', reply_content)


if (interrupted || !is.null(err$ename)){
# errors or interrupts should interrupt all currently queued messages,
# not only the currently running one...
abort_queued_messages()
}

if (!silent) {
execution_count <<- execution_count + 1L
}
Expand Down
36 changes: 33 additions & 3 deletions R/kernel.r
Expand Up @@ -91,7 +91,7 @@ new_reply = function(msg_type, parent_msg) {

header <- list(
msg_id = UUIDgenerate(),
username = parent_msg$header$username,
username = parent_msg$header$username,
session = parent_msg$header$session,
msg_type = msg_type,
version = '5.0')
Expand Down Expand Up @@ -129,6 +129,35 @@ handle_shell = function() {
print(c('Got unhandled msg_type:', msg$header$msg_type)))
},

abort_shell_msg = function(){
"Send an abort message for an incoming shell request"
# See https://github.com/ipython/ipykernel/blob/1d97cb2a04149387a0d2dbea1b3d0af691d8df6c/ipykernel/kernelbase.py#L623

parts <- zmq.recv.multipart(sockets$shell, unserialize = FALSE)
msg <- wire_to_msg(parts)
reply_type <- paste(strsplit(msg$header$msg_type, "_")[1], "_reply")
reply_content <- list(status = 'aborted')
send_response(reply_type, msg, "shell", reply_content)
},

abort_queued_messages = function(){
"Abort all already queued shell messages after an error"

while (TRUE) {
ret = zmq.poll(
c(sockets$shell), # only shell channel
c(.pbd_env$ZMQ.PO$POLLIN), # type
0) # zero timeout, only what's already there

if(bitwAnd(zmq.poll.get.revents(1), .pbd_env$ZMQ.PO$POLLIN)) {
abort_shell_msg()
} else {
# no more messages...
break
}
}
},

is_complete = function(request) {
"Checks whether the code in the rest is complete"

Expand Down Expand Up @@ -264,8 +293,9 @@ initialize = function(connection_file) {
zmq.bind(sockets$control, url_with_port('control_port'))
zmq.bind(sockets$stdin, url_with_port('stdin_port'))
zmq.bind(sockets$shell, url_with_port('shell_port'))

executor <<- Executor$new(send_response = .self$send_response)

executor <<- Executor$new(send_response = .self$send_response,
abort_queued_messages = .self$abort_queued_messages)
},

run = function() {
Expand Down

0 comments on commit 819439f

Please sign in to comment.