Merge branch 'master' of github.com:apache/spark into SPARK-23901
mgaido91 committed May 23, 2018
2 parents ca9caa0 + 00c13cf commit fca3af8
Showing 256 changed files with 5,246 additions and 1,763 deletions.
1 change: 1 addition & 0 deletions R/pkg/DESCRIPTION
@@ -13,6 +13,7 @@ Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"),
License: Apache License (== 2.0)
URL: http://www.apache.org/ http://spark.apache.org/
BugReports: http://spark.apache.org/contributing.html
SystemRequirements: Java (== 8)
Depends:
R (>= 3.0),
methods
1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
@@ -352,6 +352,7 @@ exportMethods("%<=>%",
"sinh",
"size",
"skewness",
"slice",
"sort_array",
"soundex",
"spark_partition_id",
36 changes: 36 additions & 0 deletions R/pkg/R/client.R
@@ -60,13 +60,49 @@ generateSparkSubmitArgs <- function(args, sparkHome, jars, sparkSubmitOpts, pack
combinedArgs
}

checkJavaVersion <- function() {
javaBin <- "java"
javaHome <- Sys.getenv("JAVA_HOME")
javaReqs <- utils::packageDescription(utils::packageName(), fields = c("SystemRequirements"))
sparkJavaVersion <- as.numeric(tail(strsplit(javaReqs, "[(=)]")[[1]], n = 1L))
if (javaHome != "") {
javaBin <- file.path(javaHome, "bin", javaBin)
}

# If java is missing from PATH, we get an error in Unix and a warning in Windows
javaVersionOut <- tryCatch(
launchScript(javaBin, "-version", wait = TRUE, stdout = TRUE, stderr = TRUE),
error = function(e) {
stop("Java version check failed. Please make sure Java is installed",
" and set JAVA_HOME to point to the installation directory.", e)
},
warning = function(w) {
stop("Java version check failed. Please make sure Java is installed",
" and set JAVA_HOME to point to the installation directory.", w)
})
javaVersionFilter <- Filter(
function(x) {
grepl(" version", x)
}, javaVersionOut)

javaVersionStr <- strsplit(javaVersionFilter[[1]], "[\"]")[[1L]][2]
# javaVersionStr is of the form 1.8.0_92.
# Extract 8 from it to compare to sparkJavaVersion
javaVersionNum <- as.integer(strsplit(javaVersionStr, "[.]")[[1L]][2])
if (javaVersionNum != sparkJavaVersion) {
stop(paste("Java version", sparkJavaVersion, "is required for this package; found version:",
javaVersionStr))
}
}

launchBackend <- function(args, sparkHome, jars, sparkSubmitOpts, packages) {
sparkSubmitBinName <- determineSparkSubmitBin()
if (sparkHome != "") {
sparkSubmitBin <- file.path(sparkHome, "bin", sparkSubmitBinName)
} else {
sparkSubmitBin <- sparkSubmitBinName
}

combinedArgs <- generateSparkSubmitArgs(args, sparkHome, jars, sparkSubmitOpts, packages)
cat("Launching java with spark-submit command", sparkSubmitBin, combinedArgs, "\n")
invisible(launchScript(sparkSubmitBin, combinedArgs))
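The string handling in the new checkJavaVersion() can be traced with a small standalone sketch; the version strings below are illustrative, not taken from the diff:

javaReqs <- "Java (== 8)"                        # value of the new SystemRequirements field in DESCRIPTION
sparkJavaVersion <- as.numeric(tail(strsplit(javaReqs, "[(=)]")[[1]], n = 1L))   # 8
javaVersionStr <- "1.8.0_92"                     # e.g. the quoted part of a `java -version` line
javaVersionNum <- as.integer(strsplit(javaVersionStr, "[.]")[[1L]][2])           # 8L
javaVersionNum == sparkJavaVersion               # TRUE, so no error is raised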
58 changes: 40 additions & 18 deletions R/pkg/R/functions.R
@@ -208,16 +208,20 @@ NULL
#' head(select(tmp, array_contains(tmp$v1, 21), size(tmp$v1)))
#' head(select(tmp, array_max(tmp$v1), array_min(tmp$v1)))
#' head(select(tmp, array_position(tmp$v1, 21), array_sort(tmp$v1)))
#' head(select(tmp, flatten(tmp$v1)))
#' head(select(tmp, flatten(tmp$v1), reverse(tmp$v1)))
#' tmp2 <- mutate(tmp, v2 = explode(tmp$v1))
#' head(tmp2)
#' head(select(tmp, posexplode(tmp$v1)))
#' head(select(tmp, slice(tmp$v1, 2L, 2L)))
#' head(select(tmp, sort_array(tmp$v1)))
#' head(select(tmp, sort_array(tmp$v1, asc = FALSE)))
#' tmp3 <- mutate(df, v3 = create_map(df$model, df$cyl))
#' head(select(tmp3, map_keys(tmp3$v3)))
#' head(select(tmp3, map_values(tmp3$v3)))
#' head(select(tmp3, element_at(tmp3$v3, "Valiant")))}
#' head(select(tmp3, element_at(tmp3$v3, "Valiant")))
#' tmp4 <- mutate(df, v4 = create_array(df$mpg, df$cyl), v5 = create_array(df$hp))
#' head(select(tmp4, concat(tmp4$v4, tmp4$v5)))
#' head(select(tmp, concat(df$mpg, df$cyl, df$hp)))}
NULL

#' Window functions for Column operations
@@ -1259,9 +1263,9 @@ setMethod("quarter",
})

#' @details
#' \code{reverse}: Reverses the string column and returns it as a new string column.
#' \code{reverse}: Returns a reversed string or an array with reverse order of elements.
#'
#' @rdname column_string_functions
#' @rdname column_collection_functions
#' @aliases reverse reverse,Column-method
#' @note reverse since 1.5.0
setMethod("reverse",
@@ -1912,6 +1916,7 @@ setMethod("atan2", signature(y = "Column"),

#' @details
#' \code{datediff}: Returns the number of days from \code{y} to \code{x}.
#' If \code{y} is later than \code{x} then the result is positive.
#'
#' @rdname column_datetime_diff_functions
#' @aliases datediff datediff,Column-method
@@ -1971,7 +1976,10 @@ setMethod("levenshtein", signature(y = "Column"),
})

#' @details
#' \code{months_between}: Returns number of months between dates \code{y} and \code{x}.
#' \code{months_between}: Returns number of months between dates \code{y} and \code{x}.
#' If \code{y} is later than \code{x}, then the result is positive. If \code{y} and \code{x}
#' are on the same day of month, or both are the last day of month, time of day will be ignored.
#' Otherwise, the difference is calculated based on 31 days per month, and rounded to 8 digits.
#'
#' @rdname column_datetime_diff_functions
#' @aliases months_between months_between,Column-method
@@ -2050,20 +2058,10 @@ setMethod("countDistinct",

#' @details
#' \code{concat}: Concatenates multiple input columns together into a single column.
#' If all inputs are binary, concat returns an output as binary. Otherwise, it returns as string.
#' The function works with strings, binary and compatible array columns.
#'
#' @rdname column_string_functions
#' @rdname column_collection_functions
#' @aliases concat concat,Column-method
#' @examples
#'
#' \dontrun{
#' # concatenate strings
#' tmp <- mutate(df, s1 = concat(df$Class, df$Sex),
#' s2 = concat(df$Class, df$Sex, df$Age),
#' s3 = concat(df$Class, df$Sex, df$Age, df$Class),
#' s4 = concat_ws("_", df$Class, df$Sex),
#' s5 = concat_ws("+", df$Class, df$Sex, df$Age, df$Survived))
#' head(tmp)}
#' @note concat since 1.5.0
setMethod("concat",
signature(x = "Column"),
@@ -2404,6 +2402,13 @@ setMethod("shiftRightUnsigned", signature(y = "Column", x = "numeric"),
#' @param sep separator to use.
#' @rdname column_string_functions
#' @aliases concat_ws concat_ws,character,Column-method
#' @examples
#'
#' \dontrun{
#' # concatenate strings
#' tmp <- mutate(df, s1 = concat_ws("_", df$Class, df$Sex),
#' s2 = concat_ws("+", df$Class, df$Sex, df$Age, df$Survived))
#' head(tmp)}
#' @note concat_ws since 1.5.0
setMethod("concat_ws", signature(sep = "character", x = "Column"),
function(sep, x, ...) {
@@ -3058,7 +3063,8 @@ setMethod("array_sort",
})

#' @details
#' \code{flatten}: Transforms an array of arrays into a single array.
#' \code{flatten}: Creates a single array from an array of arrays.
#' If a structure of nested arrays is deeper than two levels, only one level of nesting is removed.
#'
#' @rdname column_collection_functions
#' @aliases flatten flatten,Column-method
@@ -3138,6 +3144,22 @@ setMethod("size",
column(jc)
})

#' @details
#' \code{slice}: Returns an array containing all the elements in x from the index start
#' (or starting from the end if start is negative) with the specified length.
#'
#' @rdname column_collection_functions
#' @param start an index indicating the first element occurring in the result.
#' @param length the number of consecutive elements chosen for the result.
#' @aliases slice slice,Column-method
#' @note slice since 2.4.0
setMethod("slice",
signature(x = "Column"),
function(x, start, length) {
jc <- callJStatic("org.apache.spark.sql.functions", "slice", x@jc, start, length)
column(jc)
})
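A quick sketch of the negative-start behaviour described above, assuming an active SparkR session and a hypothetical one-row SparkDataFrame holding the array [1, 2, 3]:

df <- createDataFrame(list(list(list(1L, 2L, 3L))))
head(select(df, slice(df[[1]], 2L, 2L)))    # two elements starting at index 2 -> [2, 3]
head(select(df, slice(df[[1]], -2L, 2L)))   # a negative start counts from the end -> [2, 3]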

#' @details
#' \code{sort_array}: Sorts the input array in ascending or descending order according to
#' the natural ordering of the array elements. NA elements will be placed at the beginning of
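The sign conventions added to the datediff() and months_between() documentation can be checked with a short sketch; the dates are arbitrary and assume an active SparkR session:

df <- createDataFrame(data.frame(d1 = as.Date("2018-05-01"), d2 = as.Date("2018-02-01")))
head(select(df, datediff(df$d1, df$d2)))          # 89: positive, because d1 is later than d2
head(select(df, months_between(df$d1, df$d2)))    # 3: same day of month, so time of day is ignored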
10 changes: 7 additions & 3 deletions R/pkg/R/generics.R
@@ -624,7 +624,7 @@ setGeneric("summarize", function(x, ...) { standardGeneric("summarize") })
#' @rdname summary
setGeneric("summary", function(object, ...) { standardGeneric("summary") })

setGeneric("toJSON", function(x) { standardGeneric("toJSON") })
setGeneric("toJSON", function(x, ...) { standardGeneric("toJSON") })

setGeneric("toRDD", function(x) { standardGeneric("toRDD") })

@@ -817,7 +817,7 @@ setGeneric("collect_set", function(x) { standardGeneric("collect_set") })
#' @rdname column
setGeneric("column", function(x) { standardGeneric("column") })

#' @rdname column_string_functions
#' @rdname column_collection_functions
#' @name NULL
setGeneric("concat", function(x, ...) { standardGeneric("concat") })

@@ -1134,7 +1134,7 @@ setGeneric("regexp_replace",
#' @name NULL
setGeneric("repeat_string", function(x, n) { standardGeneric("repeat_string") })

#' @rdname column_string_functions
#' @rdname column_collection_functions
#' @name NULL
setGeneric("reverse", function(x) { standardGeneric("reverse") })

@@ -1194,6 +1194,10 @@ setGeneric("size", function(x) { standardGeneric("size") })
#' @name NULL
setGeneric("skewness", function(x) { standardGeneric("skewness") })

#' @rdname column_collection_functions
#' @name NULL
setGeneric("slice", function(x, start, length) { standardGeneric("slice") })

#' @rdname column_collection_functions
#' @name NULL
setGeneric("sort_array", function(x, asc = TRUE) { standardGeneric("sort_array") })
3 changes: 2 additions & 1 deletion R/pkg/R/sparkR.R
@@ -167,6 +167,7 @@ sparkR.sparkContext <- function(
submitOps <- getClientModeSparkSubmitOpts(
Sys.getenv("SPARKR_SUBMIT_ARGS", "sparkr-shell"),
sparkEnvirMap)
checkJavaVersion()
launchBackend(
args = path,
sparkHome = sparkHome,
@@ -193,7 +194,7 @@

# Don't use readString() so that we can provide a useful
# error message if the R and Java versions are mismatched.
authSecretLen = readInt(f)
authSecretLen <- readInt(f)
if (length(authSecretLen) == 0 || authSecretLen == 0) {
stop("Unexpected EOF in JVM connection data. Mismatched versions?")
}
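With checkJavaVersion() now invoked before launchBackend(), an unsupported JVM is reported at session startup rather than surfacing later as a backend connection failure; roughly (the error text is illustrative):

library(SparkR)
sparkR.session()
# Error: Java version 8 is required for this package; found version: 10.0.1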
4 changes: 2 additions & 2 deletions R/pkg/R/utils.R
@@ -746,7 +746,7 @@ varargsToJProperties <- function(...) {
props
}

launchScript <- function(script, combinedArgs, wait = FALSE) {
launchScript <- function(script, combinedArgs, wait = FALSE, stdout = "", stderr = "") {
if (.Platform$OS.type == "windows") {
scriptWithArgs <- paste(script, combinedArgs, sep = " ")
# on Windows, intern = F seems to mean output to the console. (documentation on this is missing)
@@ -756,7 +756,7 @@ launchScript <- function(script, combinedArgs, wait = FALSE) {
# stdout = F means discard output
# stdout = "" means to its console (default)
# Note that the console of this child process might not be the same as the running R process.
system2(script, combinedArgs, stdout = "", wait = wait)
system2(script, combinedArgs, stdout = stdout, wait = wait, stderr = stderr)
}
}

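A minimal sketch of what the new stdout/stderr pass-through enables on the non-Windows path; the printed line is illustrative:

out <- system2("java", "-version", stdout = TRUE, stderr = TRUE, wait = TRUE)
out[grepl(" version", out)]   # e.g. 'java version "1.8.0_92"' -- the line checkJavaVersion() filters on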
22 changes: 20 additions & 2 deletions R/pkg/tests/fulltests/test_sparkSQL.R
@@ -1479,7 +1479,7 @@ test_that("column functions", {
df5 <- createDataFrame(list(list(a = "010101")))
expect_equal(collect(select(df5, conv(df5$a, 2, 16)))[1, 1], "15")

# Test array_contains(), array_max(), array_min(), array_position() and element_at()
# Test array_contains(), array_max(), array_min(), array_position(), element_at() and reverse()
df <- createDataFrame(list(list(list(1L, 2L, 3L)), list(list(6L, 5L, 4L))))
result <- collect(select(df, array_contains(df[[1]], 1L)))[[1]]
expect_equal(result, c(TRUE, FALSE))
@@ -1496,6 +1496,13 @@
result <- collect(select(df, element_at(df[[1]], 1L)))[[1]]
expect_equal(result, c(1, 6))

result <- collect(select(df, reverse(df[[1]])))[[1]]
expect_equal(result, list(list(3L, 2L, 1L), list(4L, 5L, 6L)))

df2 <- createDataFrame(list(list("abc")))
result <- collect(select(df2, reverse(df2[[1]])))[[1]]
expect_equal(result, "cba")

# Test array_sort() and sort_array()
df <- createDataFrame(list(list(list(2L, 1L, 3L, NA)), list(list(NA, 6L, 5L, NA, 4L))))

@@ -1507,7 +1514,18 @@
result <- collect(select(df, sort_array(df[[1]])))[[1]]
expect_equal(result, list(list(NA, 1L, 2L, 3L), list(NA, NA, 4L, 5L, 6L)))

# Test flattern
# Test slice()
df <- createDataFrame(list(list(list(1L, 2L, 3L)), list(list(4L, 5L))))
result <- collect(select(df, slice(df[[1]], 2L, 2L)))[[1]]
expect_equal(result, list(list(2L, 3L), list(5L)))

# Test concat()
df <- createDataFrame(list(list(list(1L, 2L, 3L), list(4L, 5L, 6L)),
list(list(7L, 8L, 9L), list(10L, 11L, 12L))))
result <- collect(select(df, concat(df[[1]], df[[2]])))[[1]]
expect_equal(result, list(list(1L, 2L, 3L, 4L, 5L, 6L), list(7L, 8L, 9L, 10L, 11L, 12L)))

# Test flatten()
df <- createDataFrame(list(list(list(list(1L, 2L), list(3L, 4L))),
list(list(list(5L, 6L), list(7L, 8L)))))
result <- collect(select(df, flatten(df[[1]])))[[1]]
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -178,4 +178,6 @@ private[spark] class TaskContextImpl(

private[spark] def fetchFailed: Option[FetchFailedException] = _fetchFailedException

// TODO: shall we publish it and define it in `TaskContext`?
private[spark] def getLocalProperties(): Properties = localProperties
}
8 changes: 7 additions & 1 deletion core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -212,9 +212,15 @@ case object TaskResultLost extends TaskFailedReason {
* Task was killed intentionally and needs to be rescheduled.
*/
@DeveloperApi
case class TaskKilled(reason: String) extends TaskFailedReason {
case class TaskKilled(
reason: String,
accumUpdates: Seq[AccumulableInfo] = Seq.empty,
private[spark] val accums: Seq[AccumulatorV2[_, _]] = Nil)
extends TaskFailedReason {

override def toErrorString: String = s"TaskKilled ($reason)"
override def countTowardsTaskFailures: Boolean = false

}

/**
core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -310,6 +310,7 @@ private[spark] class SparkSubmit extends Logging {
val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
val isStandAloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER
val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER
val isMesosClient = clusterManager == MESOS && deployMode == CLIENT

if (!isMesosCluster && !isStandAloneCluster) {
// Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
@@ -337,7 +338,7 @@
val targetDir = Utils.createTempDir()

// assure a keytab is available from any place in a JVM
if (clusterManager == YARN || clusterManager == LOCAL || clusterManager == MESOS) {
if (clusterManager == YARN || clusterManager == LOCAL || isMesosClient) {
if (args.principal != null) {
if (args.keytab != null) {
require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -36,7 +36,6 @@ import org.apache.spark.launcher.SparkSubmitArgumentsParser
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.util.Utils


/**
* Parses and encapsulates arguments from the spark-submit script.
* The env argument is used for testing.
@@ -76,6 +75,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
var proxyUser: String = null
var principal: String = null
var keytab: String = null
private var dynamicAllocationEnabled: Boolean = false

// Standalone cluster mode only
var supervise: Boolean = false
@@ -198,6 +198,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
queue = Option(queue).orElse(sparkProperties.get("spark.yarn.queue")).orNull
keytab = Option(keytab).orElse(sparkProperties.get("spark.yarn.keytab")).orNull
principal = Option(principal).orElse(sparkProperties.get("spark.yarn.principal")).orNull
dynamicAllocationEnabled =
sparkProperties.get("spark.dynamicAllocation.enabled").exists("true".equalsIgnoreCase)

// Try to set main class from JAR if no --class argument is given
if (mainClass == null && !isPython && !isR && primaryResource != null) {
@@ -274,7 +276,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
if (totalExecutorCores != null && Try(totalExecutorCores.toInt).getOrElse(-1) <= 0) {
error("Total executor cores must be a positive number")
}
if (numExecutors != null && Try(numExecutors.toInt).getOrElse(-1) <= 0) {
if (!dynamicAllocationEnabled &&
numExecutors != null && Try(numExecutors.toInt).getOrElse(-1) <= 0) {
error("Number of executors must be a positive number")
}
if (pyFiles != null && !isPython) {