From 2079018ebaba4cc7378186092308a4d227f260c4 Mon Sep 17 00:00:00 2001
From: Felix Cheung
Date: Thu, 23 Jun 2016 23:23:55 -0700
Subject: [PATCH 1/4] Conf API in R

---
 R/pkg/NAMESPACE                               |  1 +
 R/pkg/R/SQLContext.R                          | 42 +++++++++++++++++--
 R/pkg/inst/tests/testthat/test_sparkSQL.R     | 11 +++--
 .../org/apache/spark/sql/api/r/SQLUtils.scala |  4 ++
 4 files changed, 48 insertions(+), 10 deletions(-)

diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 2272d8bdd52c2..74840d0333be1 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -10,6 +10,7 @@ export("sparkR.session")
 export("sparkR.init")
 export("sparkR.stop")
 export("sparkR.session.stop")
+export("conf")
 export("print.jobj")
 
 export("sparkRSQL.init",
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index ee3a41cacbee6..e9f95b35d3933 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -110,11 +110,45 @@ infer_type <- function(x) {
   }
 }
 
-getDefaultSqlSource <- function() {
+#' Get Runtime Config from the current active SparkSession
+#'
+#' Get Runtime Config from the current active SparkSession.
+#'
+#' @param key (optional) The key of the config to get; if omitted, all config values are returned
+#' @param defaultValue (optional) The default value of the config to return if the config is not
+#' set; if omitted, the call fails if the config key is not set
+#' @return a list of config values with keys as their names
+#' @rdname conf
+#' @name conf
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' allConfigs <- conf()
+#' masterValue <- unlist(conf("spark.master"))
+#' namedConfig <- conf("spark.executor.memory", "0g")
+#' }
+#' @note conf since 2.0.0
+conf <- function(key, defaultValue) {
   sparkSession <- getSparkSession()
-  conf <- callJMethod(sparkSession, "conf")
-  source <- callJMethod(conf, "get", "spark.sql.sources.default", "org.apache.spark.sql.parquet")
-  source
+  if (missing(key)) {
+    m <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getSessionConf", sparkSession)
+    as.list(m, all.names = TRUE, sorted = TRUE)
+  } else {
+    conf <- callJMethod(sparkSession, "conf")
+    value <- if (missing(defaultValue)) {
+      callJMethod(conf, "get", key) # throws if key not found
+    } else {
+      callJMethod(conf, "get", key, defaultValue)
+    }
+    l <- setNames(list(value), key)
+    l
+  }
+}
+
+getDefaultSqlSource <- function() {
+  l <- conf("spark.sql.sources.default", "org.apache.spark.sql.parquet")
+  l[["spark.sql.sources.default"]]
 }
 
 #' Create a SparkDataFrame
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 9378c7afac8bd..7871a9d4b6830 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2365,7 +2365,7 @@ test_that("randomSplit", {
   expect_true(all(sapply(abs(counts / num - weights / sum(weights)), function(e) { e < 0.05 })))
 })
 
-test_that("Change config on SparkSession", {
+test_that("Setting and getting config on SparkSession", {
   # first, set it to a random but known value
   conf <- callJMethod(sparkSession, "conf")
   property <- paste0("spark.testing.", as.character(runif(1)))
@@ -2378,15 +2378,14 @@
   names(l) <- property
   sparkR.session(sparkConfig = l)
 
-  conf <- callJMethod(sparkSession, "conf")
-  newValue <- callJMethod(conf, "get", property, "")
+  newValue <- unlist(conf(property, ""), use.names = FALSE)
   expect_equal(value2, newValue)
 
   value <- as.character(runif(1))
   sparkR.session(spark.app.name = "sparkSession test", spark.testing.r.session.r = value)
-  conf <- callJMethod(sparkSession, "conf")
-  appNameValue <- callJMethod(conf, "get", "spark.app.name", "")
-  testValue <- callJMethod(conf, "get", "spark.testing.r.session.r", "")
+  allconf <- conf()
+  appNameValue <- allconf[["spark.app.name"]]
+  testValue <- allconf[["spark.testing.r.session.r"]]
   expect_equal(appNameValue, "sparkSession test")
   expect_equal(testValue, value)
 })
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
index 0a995d2e9d180..7d8ea03a27910 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
@@ -71,6 +71,10 @@ private[sql] object SQLUtils extends Logging {
     }
   }
 
+  def getSessionConf(spark: SparkSession): JMap[String, String] = {
+    spark.conf.getAll.asJava
+  }
+
   def getJavaSparkContext(spark: SparkSession): JavaSparkContext = {
     new JavaSparkContext(spark.sparkContext)
   }

From e9be83db843fbb2c5d1867d5cde5b20995a4b910 Mon Sep 17 00:00:00 2001
From: Felix Cheung
Date: Fri, 24 Jun 2016 11:42:54 -0700
Subject: [PATCH 2/4] name change

---
 R/pkg/NAMESPACE                           |  2 +-
 R/pkg/R/SQLContext.R                      | 15 ++++++++-------
 R/pkg/inst/tests/testthat/test_sparkSQL.R |  4 ++--
 3 files changed, 11 insertions(+), 10 deletions(-)

diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 74840d0333be1..e0ffde922dacf 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -10,7 +10,7 @@ export("sparkR.session")
 export("sparkR.init")
 export("sparkR.stop")
 export("sparkR.session.stop")
-export("conf")
+export("sparkR.conf")
 export("print.jobj")
 
 export("sparkRSQL.init",
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index e9f95b35d3933..91c1a87358bbb 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -113,23 +113,24 @@ infer_type <- function(x) {
 #' Get Runtime Config from the current active SparkSession
 #'
 #' Get Runtime Config from the current active SparkSession.
+#' To change SparkSession Runtime Config, please see `sparkR.session()`.
 #'
 #' @param key (optional) The key of the config to get; if omitted, all config values are returned
 #' @param defaultValue (optional) The default value of the config to return if the config is not
 #' set; if omitted, the call fails if the config key is not set
 #' @return a list of config values with keys as their names
-#' @rdname conf
-#' @name conf
+#' @rdname sparkR.conf
+#' @name sparkR.conf
 #' @export
 #' @examples
 #'\dontrun{
 #' sparkR.session()
-#' allConfigs <- conf()
-#' masterValue <- unlist(conf("spark.master"))
-#' namedConfig <- conf("spark.executor.memory", "0g")
+#' allConfigs <- sparkR.conf()
+#' masterValue <- unlist(sparkR.conf("spark.master"))
+#' namedConfig <- sparkR.conf("spark.executor.memory", "0g")
 #' }
-#' @note conf since 2.0.0
-conf <- function(key, defaultValue) {
+#' @note sparkR.conf since 2.0.0
+sparkR.conf <- function(key, defaultValue) {
   sparkSession <- getSparkSession()
   if (missing(key)) {
     m <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getSessionConf", sparkSession)
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 7871a9d4b6830..efad1c4fa88a0 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2378,12 +2378,12 @@ test_that("Setting and getting config on SparkSession", {
   names(l) <- property
   sparkR.session(sparkConfig = l)
 
-  newValue <- unlist(conf(property, ""), use.names = FALSE)
+  newValue <- unlist(sparkR.conf(property, ""), use.names = FALSE)
   expect_equal(value2, newValue)
 
   value <- as.character(runif(1))
   sparkR.session(spark.app.name = "sparkSession test", spark.testing.r.session.r = value)
-  allconf <- conf()
+  allconf <- sparkR.conf()
   appNameValue <- allconf[["spark.app.name"]]
   testValue <- allconf[["spark.testing.r.session.r"]]
   expect_equal(appNameValue, "sparkSession test")

From bdd290c54ff094089a7d04e29c2ff65c43ac13e7 Mon Sep 17 00:00:00 2001
From: Felix Cheung
Date: Fri, 24 Jun 2016 13:19:24 -0700
Subject: [PATCH 3/4] missed one line

---
 R/pkg/R/SQLContext.R | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index 91c1a87358bbb..9d6ed95369e4c 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -148,7 +148,7 @@ sparkR.conf <- function(key, defaultValue) {
 }
 
 getDefaultSqlSource <- function() {
-  l <- conf("spark.sql.sources.default", "org.apache.spark.sql.parquet")
+  l <- sparkR.conf("spark.sql.sources.default", "org.apache.spark.sql.parquet")
   l[["spark.sql.sources.default"]]
 }
 

From 385645b9db9eb9468f07ac39144ef0a88af4830f Mon Sep 17 00:00:00 2001
From: Felix Cheung
Date: Sat, 25 Jun 2016 18:51:21 -0700
Subject: [PATCH 4/4] add R side error message for no such element exception

---
 R/pkg/R/SQLContext.R                      | 9 ++++++++-
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 1 +
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index 9d6ed95369e4c..8df73db36e956 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -138,7 +138,14 @@ sparkR.conf <- function(key, defaultValue) {
   } else {
     conf <- callJMethod(sparkSession, "conf")
     value <- if (missing(defaultValue)) {
-      callJMethod(conf, "get", key) # throws if key not found
+      tryCatch(callJMethod(conf, "get", key),
+               error = function(e) {
+                 if (any(grep("java.util.NoSuchElementException", as.character(e)))) {
+                   stop(paste0("Config '", key, "' is not set"))
+                 } else {
+                   stop(paste0("Unknown error: ", as.character(e)))
+                 }
+               })
     } else {
       callJMethod(conf, "get", key, defaultValue)
     }
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index efad1c4fa88a0..74def5ce4245d 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2388,6 +2388,7 @@ test_that("Setting and getting config on SparkSession", {
   testValue <- allconf[["spark.testing.r.session.r"]]
   expect_equal(appNameValue, "sparkSession test")
   expect_equal(testValue, value)
+  expect_error(sparkR.conf("completely.dummy"), "Config 'completely.dummy' is not set")
 })
 
 test_that("enableHiveSupport on SparkSession", {
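
Taken together, the four patches leave SparkR with a `sparkR.conf` getter backed by the JVM-side `SQLUtils.getSessionConf` helper. Below is a minimal sketch of the resulting user-facing behavior; it is not part of the patches, and it assumes a local Spark 2.0 build with the patched SparkR package on the library path ("conf-demo" and "made.up.key" are invented for illustration):

library(SparkR)
sparkR.session(spark.app.name = "conf-demo")

# No arguments: every conf entry, as a sorted named list via getSessionConf()
all <- sparkR.conf()
all[["spark.app.name"]]                   # "conf-demo"

# Single key: a one-element named list, so unlist() recovers the bare value
unlist(sparkR.conf("spark.master"))       # e.g. "local[*]" for a local session

# Key plus default: the default comes back instead of an error
sparkR.conf("made.up.key", "fallback")    # list(made.up.key = "fallback")

# Key with no default: PATCH 4/4 turns the JVM NoSuchElementException
# into a readable R error
tryCatch(sparkR.conf("made.up.key"),
         error = function(e) conditionMessage(e))
# "Config 'made.up.key' is not set"

sparkR.session.stop()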
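
One design note on PATCH 4/4: callJMethod() surfaces JVM exceptions as R error conditions whose text embeds the Java exception class, so the `any(grep(...))` test on `as.character(e)` is what separates a missing key from any other backend failure. A self-contained sketch of just that check, using a fabricated condition shaped like what the backend produces (a real one would also carry a stack trace):

# Stand-in for the condition raised when the JVM-side conf.get(key) fails
e <- simpleError("java.util.NoSuchElementException: made.up.key")
as.character(e)  # "Error: java.util.NoSuchElementException: made.up.key\n"

# grep() returns the indices of matching elements; any() maps that to TRUE/FALSE
any(grep("java.util.NoSuchElementException", as.character(e)))       # TRUE
any(grep("java.util.NoSuchElementException", "some other failure"))  # FALSE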