Skip to content

Commit

Permalink
Adding additional wait for normal output end of the pipe shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
Dmitriy Lyubimov committed Nov 22, 2012
1 parent 96d78c8 commit a80fa13
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 6 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ For starters, you want to be running Linux-- we're not quite ready for OS X.

To use R Crunch, you need to have the following R packages installed locally:
* rJava
* doxygen
* roxygen2
* bitops
* RProtoBuf
* RProtoBuf (optional)

You will also need the protocol buffer compiler, `protoc`, version 2.4.1 installed on your path.

Expand Down
1 change: 1 addition & 0 deletions crunchR/src/main/Rpkg/R/crunchR.R
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#' @name crunchR
#' @exportPattern "^crunchR\\."
#' @import rJava
#' @import bitops
#'
#' @include zzzClasses.R
#' @include Pipeline.R
Expand Down
17 changes: 14 additions & 3 deletions crunchR/src/main/java/org/crunchr/io/TwoWayRPipe.java
Original file line number Diff line number Diff line change
Expand Up @@ -294,16 +294,27 @@ public boolean checkOutputQueue(boolean wait) throws IOException {

int secondsWaited = 0;
int waitTime = 20;
boolean threadExited = false;
while (null == (emitBuff = wait ? outQueue.poll(waitTime, TimeUnit.SECONDS) : outQueue.poll())) {
if (!wait)
return true;
secondsWaited += waitTime;
if (secondsWaited >= outputQueueMaxWait)
throw new IOException("R side is not responding in the maximum amount of time allowed.");
if (!rThread.isAlive()) {

throw lastErr != null ? new IOException("R side exited prematurely", lastErr)
: new IOException("R side exited prematurely (no errors were captured)");
/*
* if we see thread having exited it is still possible
* that a normal output closure message is pending. so
* we make sure we check again for the graceful message
* before stating it was unexpected R-side breakage.
*/
if (threadExited) {
throw lastErr != null ? new IOException("R side exited prematurely", lastErr)
: new IOException("R side exited prematurely (no errors were captured)");
} else {
threadExited = true;
/* and try to wait again for the normal shutdown. */
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion examples/src/main/R/Example1.R
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@ wordCountExample()
#doFn$callProcess(c("A","B","C"))
#
#line <- "this is a line"
#class(strsplit(tolower(line),"[^[:alnum:]]")[[1]])
#class(strsplit(tolower(line),"[^[:alnum:]]")[[1]])

0 comments on commit a80fa13

Please sign in to comment.