From 179fbdc7f93a929e00aaeeb575eb404038385252 Mon Sep 17 00:00:00 2001
From: Felix Cheung
Date: Fri, 17 Jun 2016 21:48:05 -0700
Subject: [PATCH 1/3] lapply should not need spark context

---
 R/pkg/R/context.R                        | 16 +++++++++++-----
 R/pkg/inst/tests/testthat/test_context.R |  6 +++---
 2 files changed, 14 insertions(+), 8 deletions(-)

diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
index 5c886030ff5c5..eb0841188603b 100644
--- a/R/pkg/R/context.R
+++ b/R/pkg/R/context.R
@@ -252,7 +252,6 @@ setCheckpointDir <- function(sc, dirName) {
 #' }
 #'
 #' @rdname spark.lapply
-#' @param sc Spark Context to use
 #' @param list the list of elements
 #' @param func a function that takes one argument.
 #' @return a list of results (the exact type being determined by the function)
@@ -262,7 +261,11 @@ setCheckpointDir <- function(sc, dirName) {
 #' sc <- sparkR.init()
 #' doubled <- spark.lapply(sc, 1:10, function(x){2 * x})
 #'}
-spark.lapply <- function(sc, list, func) {
+spark.lapply <- function(list, func) {
+  if (!exists(".sparkRjsc", envir = .sparkREnv)) {
+    stop("SparkR has not been initialized. Please call sparkR.session()")
+  }
+  sc <- get(".sparkRjsc", envir = .sparkREnv)
   rdd <- parallelize(sc, list, length(list))
   results <- map(rdd, func)
   local <- collect(results)
@@ -274,14 +277,17 @@
 #' Set new log level: "ALL", "DEBUG", "ERROR", "FATAL", "INFO", "OFF", "TRACE", "WARN"
 #'
 #' @rdname setLogLevel
-#' @param sc Spark Context to use
 #' @param level New log level
 #' @export
 #' @examples
 #'\dontrun{
-#' setLogLevel(sc, "ERROR")
+#' setLogLevel("ERROR")
 #'}
-setLogLevel <- function(sc, level) {
+setLogLevel <- function(level) {
+  if (!exists(".sparkRjsc", envir = .sparkREnv)) {
+    stop("SparkR has not been initialized. Please call sparkR.session()")
+  }
+  sc <- get(".sparkRjsc", envir = .sparkREnv)
   callJMethod(sc, "setLogLevel", level)
 }
 
diff --git a/R/pkg/inst/tests/testthat/test_context.R b/R/pkg/inst/tests/testthat/test_context.R
index f123187adf3ef..b149818ff46f6 100644
--- a/R/pkg/inst/tests/testthat/test_context.R
+++ b/R/pkg/inst/tests/testthat/test_context.R
@@ -107,8 +107,8 @@ test_that("job group functions can be called", {
 })
 
 test_that("utility function can be called", {
-  sc <- sparkR.sparkContext()
-  setLogLevel(sc, "ERROR")
+  sparkR.sparkContext()
+  setLogLevel("ERROR")
   sparkR.session.stop()
 })
 
@@ -161,7 +161,7 @@ test_that("sparkJars sparkPackages as comma-separated strings", {
 
 test_that("spark.lapply should perform simple transforms", {
   sc <- sparkR.sparkContext()
-  doubled <- spark.lapply(sc, 1:10, function(x) { 2 * x })
+  doubled <- spark.lapply(1:10, function(x) { 2 * x })
   expect_equal(doubled, as.list(2 * 1:10))
   sparkR.session.stop()
 })

From f5f401c42034396a45015cd558f62cbf83af1719 Mon Sep 17 00:00:00 2001
From: Felix Cheung
Date: Fri, 17 Jun 2016 22:16:47 -0700
Subject: [PATCH 2/3] update roxygen2 doc

---
 R/pkg/R/context.R | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
index eb0841188603b..71963524ea7ad 100644
--- a/R/pkg/R/context.R
+++ b/R/pkg/R/context.R
@@ -258,8 +258,7 @@ setCheckpointDir <- function(sc, dirName) {
 #' @export
 #' @examples
 #'\dontrun{
-#' sc <- sparkR.init()
-#' doubled <- spark.lapply(sc, 1:10, function(x){2 * x})
+#' doubled <- spark.lapply(1:10, function(x){2 * x})
 #'}
 spark.lapply <- function(list, func) {
   if (!exists(".sparkRjsc", envir = .sparkREnv)) {

From 978d1afb2b3f661d41abec79fe32187f6603a7f9 Mon Sep 17 00:00:00 2001
From: Felix Cheung
Date: Mon, 20 Jun 2016 11:34:23 -0700
Subject: [PATCH 3/3] doc update

---
 R/pkg/R/context.R | 1 +
 1 file changed, 1 insertion(+)

diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
index 71963524ea7ad..968a9d2251b18 100644
--- a/R/pkg/R/context.R
+++ b/R/pkg/R/context.R
@@ -258,6 +258,7 @@ setCheckpointDir <- function(sc, dirName) {
 #' @export
 #' @examples
 #'\dontrun{
+#' sparkR.session()
 #' doubled <- spark.lapply(1:10, function(x){2 * x})
 #'}
 spark.lapply <- function(list, func) {