diff --git a/R/execution.r b/R/execution.r index b7684971..46e0fedc 100644 --- a/R/execution.r +++ b/R/execution.r @@ -62,6 +62,7 @@ Executor <- setRefClass( Class = 'Executor', fields = list( send_response = 'function', + handle_stdin = 'function', abort_queued_messages = 'function', execution_count = 'integer', payload = 'list', @@ -120,6 +121,25 @@ quit = function(save = 'default', status = 0, runLast = TRUE) { payload <<- c(.self$payload, list(list(source = 'ask_exit', keepkernel = FALSE))) }, +# noninteractive +readline = function(prompt = '') { + log_debug('entering custom readline') + send_response('input_request', current_request, 'stdin', + list(prompt = prompt, password = FALSE)) + # wait for 'input_reply' response message + input <- handle_stdin() +}, + +# noninteractive 5.0 protocol: +readpass = function(prompt = '') { + log_debug('entering custom readpass') + send_response('input_request', current_request, 'stdin', + list(prompt = prompt, password = TRUE)) + # wait for 'input_reply' response message + log_debug('exiting custom readpass') + input = handle_stdin() +}, + handle_error = function(e) tryCatch({ log_debug('Error output: %s', toString(e)) calls <- head(sys.calls()[-seq_len(nframe + 1L)], -3) @@ -223,6 +243,12 @@ execute = function(request) { payload <<- list() err <<- list() + # shade base::readline + add_to_user_searchpath('readline', .self$readline) + + # shade 'readpass' if it appears in other packages + add_to_user_searchpath('readpass', .self$readpass) + # shade base::quit add_to_user_searchpath('quit', .self$quit) add_to_user_searchpath('q', .self$quit) diff --git a/R/kernel.r b/R/kernel.r index 9059b335..88d9bbe1 100644 --- a/R/kernel.r +++ b/R/kernel.r @@ -123,7 +123,7 @@ handle_shell = function() { is_complete_request = is_complete(msg), inspect_request = inspect(msg), shutdown_request = shutdown(msg), - print(c('Got unhandled msg_type:', msg$header$msg_type))) + log_debug(c('Got unhandled msg_type:', msg$header$msg_type))) send_response('status', msg, 'iopub', list( execution_state = 'idle')) @@ -166,6 +166,27 @@ abort_queued_messages = function() { }, +handle_stdin = function() { + "React to a stdin message coming in" + + # wait for 'input_reply' response message + while (TRUE) { + log_debug('stdin loop: beginning') + zmq.poll(c(sockets$stdin), # only stdin channel + c(.pbd_env$ZMQ.PO$POLLIN)) # type + + if (bitwAnd(zmq.poll.get.revents(1), .pbd_env$ZMQ.PO$POLLIN)) { + log_debug('stdin loop: found msg') + parts <- zmq.recv.multipart(sockets$stdin, unserialize = FALSE) + msg <- wire_to_msg(parts) + return(msg$content$value) + } else { + # else shouldn't be possible + log_error('stdin loop: zmq.poll returned but no message found?') + } + } +}, + is_complete = function(request) { "Checks whether the code in the rest is complete" @@ -336,7 +357,7 @@ handle_control = function() { log_debug('Control: shutdown...') shutdown(msg) } else { - print(paste('Unhandled control message, msg_type:', msg$header$msg_type)) + log_debug(paste('Unhandled control message, msg_type:', msg$header$msg_type)) } }, @@ -371,8 +392,11 @@ initialize = function(connection_file) { zmq.bind(sockets$stdin, url_with_port('stdin_port')) zmq.bind(sockets$shell, url_with_port('shell_port')) - executor <<- Executor$new(send_response = .self$send_response, + executor <<- Executor$new( + send_response = .self$send_response, + handle_stdin = .self$handle_stdin, abort_queued_messages = .self$abort_queued_messages) + comm_manager <<- CommManager$new(send_response = .self$send_response) runtime_env$comm_manager <- comm_manager }, @@ -386,7 +410,7 @@ run = function() { rep(.pbd_env$ZMQ.PO$POLLIN, 3)) log_debug('main loop: after poll') - # It's important that these messages are handler one by one in each + # It's important that these messages are handled one by one in each # look. The problem is that during the handler, a new zmq.poll could be # done (and is done in case of errors in a execution request) and this # invalidates the zmq.poll.get.revents call leading to "funny" results