From 16aa40086f8e2e58f7e3d7c3ec95a2e4d5967e5b Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Tue, 15 Nov 2016 02:01:38 -0800 Subject: [PATCH 1/8] SparkR running in yarn-cluster mode should not download Spark package. --- R/pkg/R/sparkR.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R index 6b4a2f2fdc85c..f204b95d17433 100644 --- a/R/pkg/R/sparkR.R +++ b/R/pkg/R/sparkR.R @@ -558,7 +558,7 @@ sparkCheckInstall <- function(sparkHome, master) { message(msg) NULL } else { - if (!nzchar(master) || isMasterLocal(master)) { + if (isMasterLocal(master)) { msg <- paste0("Spark not found in SPARK_HOME: ", sparkHome) message(msg) From 2729b9c447cf31065b3e9b129185918c666aa542 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Tue, 15 Nov 2016 02:31:45 -0800 Subject: [PATCH 2/8] handle yarn-client and yarn-cluster different --- R/pkg/R/sparkR.R | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R index f204b95d17433..5af7bb3e94531 100644 --- a/R/pkg/R/sparkR.R +++ b/R/pkg/R/sparkR.R @@ -564,10 +564,12 @@ sparkCheckInstall <- function(sparkHome, master) { message(msg) packageLocalDir <- install.spark() packageLocalDir - } else { + } else if (master == "yarn-client") { msg <- paste0("Spark not found in SPARK_HOME: ", sparkHome, "\n", installInstruction("remote")) stop(msg) + } else { + NULL } } } else { From 4d4c6b6f4b5e4569770e319092019fe95261fdb6 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Wed, 16 Nov 2016 08:05:52 -0800 Subject: [PATCH 3/8] Compatibility for yarn-client and mesos-client. --- R/pkg/R/sparkR.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R index 5af7bb3e94531..bc55814042fc8 100644 --- a/R/pkg/R/sparkR.R +++ b/R/pkg/R/sparkR.R @@ -564,7 +564,7 @@ sparkCheckInstall <- function(sparkHome, master) { message(msg) packageLocalDir <- install.spark() packageLocalDir - } else if (master == "yarn-client") { + } else if (endsWith(master, "client")) { msg <- paste0("Spark not found in SPARK_HOME: ", sparkHome, "\n", installInstruction("remote")) stop(msg) From 24b41264c2229ca1917e0c6f78a85f24654864ec Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Thu, 17 Nov 2016 07:56:19 -0800 Subject: [PATCH 4/8] Handle deployMode --- R/pkg/R/sparkR.R | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R index bc55814042fc8..5059427107325 100644 --- a/R/pkg/R/sparkR.R +++ b/R/pkg/R/sparkR.R @@ -373,8 +373,13 @@ sparkR.session <- function( overrideEnvs(sparkConfigMap, paramMap) } + deployMode <- "" + if (exists("spark.submit.deployMode", envir = sparkConfigMap)) { + deployMode <- sparkConfigMap[["spark.submit.deployMode"]] + } + if (!exists(".sparkRjsc", envir = .sparkREnv)) { - retHome <- sparkCheckInstall(sparkHome, master) + retHome <- sparkCheckInstall(sparkHome, master, deployMode) if (!is.null(retHome)) sparkHome <- retHome sparkExecutorEnvMap <- new.env() sparkR.sparkContext(master, appName, sparkHome, sparkConfigMap, sparkExecutorEnvMap, @@ -550,8 +555,10 @@ processSparkPackages <- function(packages) { # # @param sparkHome directory to find Spark package. # @param master the Spark master URL, used to check local or remote mode. +# @param deployMode whether to deploy your driver on the worker nodes (cluster) +# or locally as an external client (client). # @return NULL if no need to update sparkHome, and new sparkHome otherwise. -sparkCheckInstall <- function(sparkHome, master) { +sparkCheckInstall <- function(sparkHome, master, deployMode) { if (!isSparkRShell()) { if (!is.na(file.info(sparkHome)$isdir)) { msg <- paste0("Spark package found in SPARK_HOME: ", sparkHome) @@ -564,7 +571,7 @@ sparkCheckInstall <- function(sparkHome, master) { message(msg) packageLocalDir <- install.spark() packageLocalDir - } else if (endsWith(master, "client")) { + } else if (endsWith(master, "client") || deployMode == "client") { msg <- paste0("Spark not found in SPARK_HOME: ", sparkHome, "\n", installInstruction("remote")) stop(msg) From db60ad651d6520ec744a67ab020b1f7b72da905f Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Sat, 19 Nov 2016 02:42:32 -0800 Subject: [PATCH 5/8] Add test for sparkCheckInstall. --- R/pkg/inst/tests/testthat/test_sparkR.R | 36 +++++++++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 R/pkg/inst/tests/testthat/test_sparkR.R diff --git a/R/pkg/inst/tests/testthat/test_sparkR.R b/R/pkg/inst/tests/testthat/test_sparkR.R new file mode 100644 index 0000000000000..b7fc3881ccbf0 --- /dev/null +++ b/R/pkg/inst/tests/testthat/test_sparkR.R @@ -0,0 +1,36 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +context("functions in sparkR.R") + +test_that("sparkCheckInstall", { + # "local, yarn-client, mesos-client" mode, SPARK_HOME was set correctly, + # and the SparkR job was submitted by "spark-submit" + sparkHome <- paste0(tempdir(), "/", "sparkHome") + dir.create(sparkHome) + master <- "" + deployMode <- "" + expect_true(is.null(sparkCheckInstall(sparkHome, master, deployMode))) + unlink(sparkHome, recursive = TRUE) + + # "yarn-cluster, mesos-cluster" mode, SPARK_HOME was not set, + # and the SparkR job was submitted by "spark-submit" + sparkHome <- "" + master <- "" + deployMode <- "" + expect_true(is.null(sparkCheckInstall(sparkHome, master, deployMode))) +}) \ No newline at end of file From eba070795e71f2f9c08346213fef5425e677855f Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Sat, 19 Nov 2016 02:51:59 -0800 Subject: [PATCH 6/8] Fix typo. --- R/pkg/inst/tests/testthat/test_sparkR.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/pkg/inst/tests/testthat/test_sparkR.R b/R/pkg/inst/tests/testthat/test_sparkR.R index b7fc3881ccbf0..1eff279c3067d 100644 --- a/R/pkg/inst/tests/testthat/test_sparkR.R +++ b/R/pkg/inst/tests/testthat/test_sparkR.R @@ -33,4 +33,4 @@ test_that("sparkCheckInstall", { master <- "" deployMode <- "" expect_true(is.null(sparkCheckInstall(sparkHome, master, deployMode))) -}) \ No newline at end of file +}) From 64c2c8f786ed8e6835054a8c0e4ece5d0faa72b8 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Sat, 19 Nov 2016 04:20:11 -0800 Subject: [PATCH 7/8] Replace endsWith since it was masked before R version 3.3. --- R/pkg/R/sparkR.R | 5 ++--- R/pkg/R/utils.R | 4 ++++ 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R index 5059427107325..a7152b4313993 100644 --- a/R/pkg/R/sparkR.R +++ b/R/pkg/R/sparkR.R @@ -566,12 +566,11 @@ sparkCheckInstall <- function(sparkHome, master, deployMode) { NULL } else { if (isMasterLocal(master)) { - msg <- paste0("Spark not found in SPARK_HOME: ", - sparkHome) + msg <- paste0("Spark not found in SPARK_HOME: ", sparkHome) message(msg) packageLocalDir <- install.spark() packageLocalDir - } else if (endsWith(master, "client") || deployMode == "client") { + } else if (isClientMode(master) || deployMode == "client") { msg <- paste0("Spark not found in SPARK_HOME: ", sparkHome, "\n", installInstruction("remote")) stop(msg) diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R index 20004549cc037..098c0e3e31e95 100644 --- a/R/pkg/R/utils.R +++ b/R/pkg/R/utils.R @@ -777,6 +777,10 @@ isMasterLocal <- function(master) { grepl("^local(\\[([0-9]+|\\*)\\])?$", master, perl = TRUE) } +isClientMode <- function(master) { + grepl("([a-z]+)-client$", master, perl = TRUE) +} + isSparkRShell <- function() { grepl(".*shell\\.R$", Sys.getenv("R_PROFILE_USER"), perl = TRUE) } From 04e375eb9a1ebfa72831430e193c88f3f12699af Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Sun, 20 Nov 2016 23:18:09 -0800 Subject: [PATCH 8/8] Add more tests. --- R/pkg/inst/tests/testthat/test_sparkR.R | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/R/pkg/inst/tests/testthat/test_sparkR.R b/R/pkg/inst/tests/testthat/test_sparkR.R index 1eff279c3067d..f73fc6baeccef 100644 --- a/R/pkg/inst/tests/testthat/test_sparkR.R +++ b/R/pkg/inst/tests/testthat/test_sparkR.R @@ -33,4 +33,14 @@ test_that("sparkCheckInstall", { master <- "" deployMode <- "" expect_true(is.null(sparkCheckInstall(sparkHome, master, deployMode))) + + # "yarn-client, mesos-client" mode, SPARK_HOME was not set + sparkHome <- "" + master <- "yarn-client" + deployMode <- "" + expect_error(sparkCheckInstall(sparkHome, master, deployMode)) + sparkHome <- "" + master <- "" + deployMode <- "client" + expect_error(sparkCheckInstall(sparkHome, master, deployMode)) })