merge master and address comments
WeichenXu123 committed May 11, 2018
2 parents 54f73af + 75cf369 commit 46f4436
Showing 180 changed files with 2,921 additions and 1,082 deletions.
2 changes: 1 addition & 1 deletion LICENSE
@@ -263,7 +263,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
(New BSD license) Protocol Buffer Java API (org.spark-project.protobuf:protobuf-java:2.4.1-shaded - http://code.google.com/p/protobuf)
(The BSD License) Fortran to Java ARPACK (net.sourceforge.f2j:arpack_combined_all:0.1 - http://f2j.sourceforge.net)
(The BSD License) xmlenc Library (xmlenc:xmlenc:0.52 - http://xmlenc.sourceforge.net)
(The New BSD License) Py4J (net.sf.py4j:py4j:0.10.6 - http://py4j.sourceforge.net/)
(The New BSD License) Py4J (net.sf.py4j:py4j:0.10.7 - http://py4j.sourceforge.net/)
(Two-clause BSD-style license) JUnit-Interface (com.novocode:junit-interface:0.10 - http://github.com/szeiger/junit-interface/)
(BSD licence) sbt and sbt-launch-lib.bash
(BSD 3 Clause) d3.min.js (https://github.com/mbostock/d3/blob/master/LICENSE)
1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
@@ -204,6 +204,7 @@ exportMethods("%<=>%",
"array_max",
"array_min",
"array_position",
"array_sort",
"asc",
"ascii",
"asin",
4 changes: 2 additions & 2 deletions R/pkg/R/client.R
@@ -19,7 +19,7 @@

# Creates a SparkR client connection object
# if one doesn't already exist
connectBackend <- function(hostname, port, timeout) {
connectBackend <- function(hostname, port, timeout, authSecret) {
if (exists(".sparkRcon", envir = .sparkREnv)) {
if (isOpen(.sparkREnv[[".sparkRCon"]])) {
cat("SparkRBackend client connection already exists\n")
@@ -29,7 +29,7 @@ connectBackend <- function(hostname, port, timeout) {

con <- socketConnection(host = hostname, port = port, server = FALSE,
blocking = TRUE, open = "wb", timeout = timeout)

doServerAuth(con, authSecret)
assign(".sparkRCon", con, envir = .sparkREnv)
con
}
10 changes: 7 additions & 3 deletions R/pkg/R/deserialize.R
@@ -60,14 +60,18 @@ readTypedObject <- function(con, type) {
stop(paste("Unsupported type for deserialization", type)))
}

readString <- function(con) {
stringLen <- readInt(con)
raw <- readBin(con, raw(), stringLen, endian = "big")
readStringData <- function(con, len) {
raw <- readBin(con, raw(), len, endian = "big")
string <- rawToChar(raw)
Encoding(string) <- "UTF-8"
string
}

readString <- function(con) {
stringLen <- readInt(con)
readStringData(con, stringLen)
}

readInt <- function(con) {
readBin(con, integer(), n = 1, endian = "big")
}
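
For context, a minimal, self-contained sketch (not part of the diff) of the length-prefixed framing that readInt() and the new readStringData() consume, exercised against an in-memory rawConnection rather than the real backend socket; the payload string is a placeholder.

# Frame a message the way the R reader expects it: a 4-byte big-endian length
# followed by that many raw bytes of UTF-8 text.
payload <- charToRaw("backend-secret")                      # placeholder payload
buf <- rawConnection(raw(0), open = "wb")
writeBin(length(payload), buf, endian = "big")              # length prefix
writeBin(payload, buf)                                      # string bytes
msg <- rawConnectionValue(buf)
close(buf)

# Read it back exactly as readInt() and readStringData() do above.
con <- rawConnection(msg, open = "rb")
len <- readBin(con, integer(), n = 1, endian = "big")       # readInt()
str <- rawToChar(readBin(con, raw(), len, endian = "big"))  # readStringData()
close(con)
stopifnot(identical(str, "backend-secret"))
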
32 changes: 29 additions & 3 deletions R/pkg/R/functions.R
@@ -207,7 +207,7 @@ NULL
#' tmp <- mutate(df, v1 = create_array(df$mpg, df$cyl, df$hp))
#' head(select(tmp, array_contains(tmp$v1, 21), size(tmp$v1)))
#' head(select(tmp, array_max(tmp$v1), array_min(tmp$v1)))
#' head(select(tmp, array_position(tmp$v1, 21)))
#' head(select(tmp, array_position(tmp$v1, 21), array_sort(tmp$v1)))
#' head(select(tmp, flatten(tmp$v1)))
#' tmp2 <- mutate(tmp, v2 = explode(tmp$v1))
#' head(tmp2)
@@ -805,6 +805,8 @@ setMethod("factorial",
#'
#' The function by default returns the first values it sees. It will return the first non-missing
#' value it sees when na.rm is set to true. If all values are missing, then NA is returned.
#' Note: the function is non-deterministic because its result depends on the order of rows,
#' which may be non-deterministic after a shuffle.
#'
#' @param na.rm a logical value indicating whether NA values should be stripped
#' before the computation proceeds.
@@ -948,6 +950,8 @@ setMethod("kurtosis",
#'
#' The function by default returns the last values it sees. It will return the last non-missing
#' value it sees when na.rm is set to true. If all values are missing, then NA is returned.
#' Note: the function is non-deterministic because its result depends on the order of rows,
#' which may be non-deterministic after a shuffle.
#'
#' @param x column to compute on.
#' @param na.rm a logical value indicating whether NA values should be stripped
@@ -1201,6 +1205,7 @@ setMethod("minute",
#' 0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594.
#' This is equivalent to the MONOTONICALLY_INCREASING_ID function in SQL.
#' The method should be used with no argument.
#' Note: the function is non-deterministic because its result depends on partition IDs.
#'
#' @rdname column_nonaggregate_functions
#' @aliases monotonically_increasing_id monotonically_increasing_id,missing-method
@@ -2584,6 +2589,7 @@ setMethod("lpad", signature(x = "Column", len = "numeric", pad = "character"),
#' @details
#' \code{rand}: Generates a random column with independent and identically distributed (i.i.d.)
#' samples from U[0.0, 1.0].
#' Note: the function is non-deterministic in the general case.
#'
#' @rdname column_nonaggregate_functions
#' @param seed a random seed. Can be missing.
@@ -2612,6 +2618,7 @@ setMethod("rand", signature(seed = "numeric"),
#' @details
#' \code{randn}: Generates a column with independent and identically distributed (i.i.d.) samples
#' from the standard normal distribution.
#' Note: the function is non-deterministic in the general case.
#'
#' @rdname column_nonaggregate_functions
#' @aliases randn randn,missing-method
@@ -3036,6 +3043,20 @@ setMethod("array_position",
column(jc)
})

#' @details
#' \code{array_sort}: Sorts the input array in ascending order. The elements of the input array
#' must be orderable. NA elements will be placed at the end of the returned array.
#'
#' @rdname column_collection_functions
#' @aliases array_sort array_sort,Column-method
#' @note array_sort since 2.4.0
setMethod("array_sort",
signature(x = "Column"),
function(x) {
jc <- callJStatic("org.apache.spark.sql.functions", "array_sort", x@jc)
column(jc)
})
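
A usage sketch for the new method, mirroring the roxygen example above and the test added to test_sparkSQL.R further down (the data values are illustrative):

df <- createDataFrame(list(list(list(2L, 1L, 3L, NA)), list(list(NA, 6L, 5L, NA, 4L))))
head(select(df, array_sort(df[[1]])))
# ascending order with NA elements placed last:
#   1, 2, 3, NA   and   4, 5, 6, NA, NA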

#' @details
#' \code{flatten}: Transforms an array of arrays into a single array.
#'
@@ -3118,8 +3139,9 @@ setMethod("size",
})

#' @details
#' \code{sort_array}: Sorts the input array in ascending or descending order according
#' to the natural ordering of the array elements.
#' \code{sort_array}: Sorts the input array in ascending or descending order according to
#' the natural ordering of the array elements. NA elements will be placed at the beginning of
#' the returned array in ascending order or at the end of the returned array in descending order.
#'
#' @rdname column_collection_functions
#' @param asc a logical flag indicating the sorting order.
@@ -3188,6 +3210,8 @@ setMethod("create_map",

#' @details
#' \code{collect_list}: Creates a list of objects with duplicates.
#' Note: the function is non-deterministic because the order of collected results depends
#' on the order of rows, which may be non-deterministic after a shuffle.
#'
#' @rdname column_aggregate_functions
#' @aliases collect_list collect_list,Column-method
@@ -3207,6 +3231,8 @@ setMethod("collect_list",

#' @details
#' \code{collect_set}: Creates a list of objects with duplicate elements eliminated.
#' Note: the function is non-deterministic because the order of collected results depends
#' on the order of rows, which may be non-deterministic after a shuffle.
#'
#' @rdname column_aggregate_functions
#' @aliases collect_set collect_set,Column-method
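
To make the NA placement now documented for sort_array concrete, a short sketch whose values follow the updated tests further down:

df <- createDataFrame(list(list(list(2L, 1L, 3L, NA))))
collect(select(df, sort_array(df[[1]])))         # ascending:  NA, 1, 2, 3 (NA first)
collect(select(df, sort_array(df[[1]], FALSE)))  # descending: 3, 2, 1, NA (NA last)
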
4 changes: 4 additions & 0 deletions R/pkg/R/generics.R
@@ -769,6 +769,10 @@ setGeneric("array_min", function(x) { standardGeneric("array_min") })
#' @name NULL
setGeneric("array_position", function(x, value) { standardGeneric("array_position") })

#' @rdname column_collection_functions
#' @name NULL
setGeneric("array_sort", function(x) { standardGeneric("array_sort") })

#' @rdname column_string_functions
#' @name NULL
setGeneric("ascii", function(x) { standardGeneric("ascii") })
39 changes: 34 additions & 5 deletions R/pkg/R/sparkR.R
@@ -158,6 +158,10 @@ sparkR.sparkContext <- function(
" please use the --packages commandline instead", sep = ","))
}
backendPort <- existingPort
authSecret <- Sys.getenv("SPARKR_BACKEND_AUTH_SECRET")
if (nchar(authSecret) == 0) {
stop("Auth secret not provided in environment.")
}
} else {
path <- tempfile(pattern = "backend_port")
submitOps <- getClientModeSparkSubmitOpts(
@@ -186,16 +190,27 @@
monitorPort <- readInt(f)
rLibPath <- readString(f)
connectionTimeout <- readInt(f)

# Don't use readString() so that we can provide a useful
# error message if the R and Java versions are mismatched.
authSecretLen <- readInt(f)
if (length(authSecretLen) == 0 || authSecretLen == 0) {
stop("Unexpected EOF in JVM connection data. Mismatched versions?")
}
authSecret <- readStringData(f, authSecretLen)
close(f)
file.remove(path)
if (length(backendPort) == 0 || backendPort == 0 ||
length(monitorPort) == 0 || monitorPort == 0 ||
length(rLibPath) != 1) {
length(rLibPath) != 1 || length(authSecret) == 0) {
stop("JVM failed to launch")
}
assign(".monitorConn",
socketConnection(port = monitorPort, timeout = connectionTimeout),
envir = .sparkREnv)

monitorConn <- socketConnection(port = monitorPort, blocking = TRUE,
timeout = connectionTimeout, open = "wb")
doServerAuth(monitorConn, authSecret)

assign(".monitorConn", monitorConn, envir = .sparkREnv)
assign(".backendLaunched", 1, envir = .sparkREnv)
if (rLibPath != "") {
assign(".libPath", rLibPath, envir = .sparkREnv)
@@ -205,7 +220,7 @@

.sparkREnv$backendPort <- backendPort
tryCatch({
connectBackend("localhost", backendPort, timeout = connectionTimeout)
connectBackend("localhost", backendPort, timeout = connectionTimeout, authSecret = authSecret)
},
error = function(err) {
stop("Failed to connect JVM\n")
@@ -687,3 +702,17 @@ sparkCheckInstall <- function(sparkHome, master, deployMode) {
NULL
}
}

# Utility function for sending auth data over a socket and checking the server's reply.
doServerAuth <- function(con, authSecret) {
if (nchar(authSecret) == 0) {
stop("Auth secret not provided.")
}
writeString(con, authSecret)
flush(con)
reply <- readString(con)
if (reply != "ok") {
close(con)
stop("Unexpected reply from server.")
}
}
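
A hedged illustration of what the existing-backend path above now expects from the caller's environment. In practice the launcher exports these variables; the port and secret values below are placeholders, and EXISTING_SPARKR_BACKEND_PORT is assumed to be the variable sparkR.sparkContext() reads into existingPort.

Sys.setenv(EXISTING_SPARKR_BACKEND_PORT = "8880",           # placeholder port
           SPARKR_BACKEND_AUTH_SECRET = "shared-secret")    # placeholder secret
library(SparkR)
sparkR.session()  # sparkR.sparkContext() authenticates via connectBackend(..., authSecret = ...)
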
4 changes: 3 additions & 1 deletion R/pkg/inst/worker/daemon.R
@@ -28,7 +28,9 @@ suppressPackageStartupMessages(library(SparkR))

port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
inputCon <- socketConnection(
port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
port = port, open = "wb", blocking = TRUE, timeout = connectionTimeout)

SparkR:::doServerAuth(inputCon, Sys.getenv("SPARKR_WORKER_SECRET"))

# Waits indefinitely for a socket connection by default.
selectTimeout <- NULL
5 changes: 4 additions & 1 deletion R/pkg/inst/worker/worker.R
@@ -100,9 +100,12 @@ suppressPackageStartupMessages(library(SparkR))

port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
inputCon <- socketConnection(
port = port, blocking = TRUE, open = "rb", timeout = connectionTimeout)
port = port, blocking = TRUE, open = "wb", timeout = connectionTimeout)
SparkR:::doServerAuth(inputCon, Sys.getenv("SPARKR_WORKER_SECRET"))

outputCon <- socketConnection(
port = port, blocking = TRUE, open = "wb", timeout = connectionTimeout)
SparkR:::doServerAuth(outputCon, Sys.getenv("SPARKR_WORKER_SECRET"))

# read the index of the current partition inside the RDD
partition <- SparkR:::readInt(inputCon)
17 changes: 11 additions & 6 deletions R/pkg/tests/fulltests/test_sparkSQL.R
@@ -1479,8 +1479,7 @@ test_that("column functions", {
df5 <- createDataFrame(list(list(a = "010101")))
expect_equal(collect(select(df5, conv(df5$a, 2, 16)))[1, 1], "15")

# Test array_contains(), array_max(), array_min(), array_position(), element_at()
# and sort_array()
# Test array_contains(), array_max(), array_min(), array_position() and element_at()
df <- createDataFrame(list(list(list(1L, 2L, 3L)), list(list(6L, 5L, 4L))))
result <- collect(select(df, array_contains(df[[1]], 1L)))[[1]]
expect_equal(result, c(TRUE, FALSE))
@@ -1497,10 +1496,16 @@
result <- collect(select(df, element_at(df[[1]], 1L)))[[1]]
expect_equal(result, c(1, 6))

# Test array_sort() and sort_array()
df <- createDataFrame(list(list(list(2L, 1L, 3L, NA)), list(list(NA, 6L, 5L, NA, 4L))))

result <- collect(select(df, array_sort(df[[1]])))[[1]]
expect_equal(result, list(list(1L, 2L, 3L, NA), list(4L, 5L, 6L, NA, NA)))

result <- collect(select(df, sort_array(df[[1]], FALSE)))[[1]]
expect_equal(result, list(list(3L, 2L, 1L), list(6L, 5L, 4L)))
expect_equal(result, list(list(3L, 2L, 1L, NA), list(6L, 5L, 4L, NA, NA)))
result <- collect(select(df, sort_array(df[[1]])))[[1]]
expect_equal(result, list(list(1L, 2L, 3L), list(4L, 5L, 6L)))
expect_equal(result, list(list(NA, 1L, 2L, 3L), list(NA, NA, 4L, 5L, 6L)))

# Test flatten
df <- createDataFrame(list(list(list(list(1L, 2L), list(3L, 4L))),
@@ -2210,8 +2215,8 @@ test_that("join(), crossJoin() and merge() on a DataFrame", {
expect_equal(count(where(join(df, df2), df$name == df2$name)), 3)
# cartesian join
expect_error(tryCatch(count(join(df, df2)), error = function(e) { stop(e) }),
paste0(".*(org.apache.spark.sql.AnalysisException: Detected cartesian product for",
" INNER join between logical plans).*"))
paste0(".*(org.apache.spark.sql.AnalysisException: Detected implicit cartesian",
" product for INNER join between logical plans).*"))

joined <- crossJoin(df, df2)
expect_equal(names(joined), c("age", "name", "name", "test"))
6 changes: 3 additions & 3 deletions bin/pyspark
@@ -25,14 +25,14 @@ source "${SPARK_HOME}"/bin/load-spark-env.sh
export _SPARK_CMD_USAGE="Usage: ./bin/pyspark [options]"

# In Spark 2.0, IPYTHON and IPYTHON_OPTS are removed and pyspark fails to launch if either option
# is set in the user's environment. Instead, users should set PYSPARK_DRIVER_PYTHON=ipython
# is set in the user's environment. Instead, users should set PYSPARK_DRIVER_PYTHON=ipython
# to use IPython and set PYSPARK_DRIVER_PYTHON_OPTS to pass options when starting the Python driver
# (e.g. PYSPARK_DRIVER_PYTHON_OPTS='notebook'). This supports full customization of the IPython
# and executor Python executables.

# Fail noisily if removed options are set
if [[ -n "$IPYTHON" || -n "$IPYTHON_OPTS" ]]; then
echo "Error in pyspark startup:"
echo "Error in pyspark startup:"
echo "IPYTHON and IPYTHON_OPTS are removed in Spark 2.0+. Remove these from the environment and set PYSPARK_DRIVER_PYTHON and PYSPARK_DRIVER_PYTHON_OPTS instead."
exit 1
fi
@@ -57,7 +57,7 @@ export PYSPARK_PYTHON

# Add the PySpark classes to the Python path:
export PYTHONPATH="${SPARK_HOME}/python/:$PYTHONPATH"
export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.6-src.zip:$PYTHONPATH"
export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.7-src.zip:$PYTHONPATH"

# Load the PySpark shell.py script when ./pyspark is used interactively:
export OLD_PYTHONSTARTUP="$PYTHONSTARTUP"
2 changes: 1 addition & 1 deletion bin/pyspark2.cmd
@@ -30,7 +30,7 @@ if "x%PYSPARK_DRIVER_PYTHON%"=="x" (
)

set PYTHONPATH=%SPARK_HOME%\python;%PYTHONPATH%
set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.10.6-src.zip;%PYTHONPATH%
set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.10.7-src.zip;%PYTHONPATH%

set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
set PYTHONSTARTUP=%SPARK_HOME%\python\pyspark\shell.py
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -350,7 +350,7 @@
<dependency>
<groupId>net.sf.py4j</groupId>
<artifactId>py4j</artifactId>
<version>0.10.6</version>
<version>0.10.7</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
12 changes: 1 addition & 11 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -17,15 +17,10 @@

package org.apache.spark

import java.lang.{Byte => JByte}
import java.net.{Authenticator, PasswordAuthentication}
import java.nio.charset.StandardCharsets.UTF_8
import java.security.{KeyStore, SecureRandom}
import java.security.cert.X509Certificate
import javax.net.ssl._

import com.google.common.hash.HashCodes
import com.google.common.io.Files
import org.apache.hadoop.io.Text
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

@@ -365,13 +360,8 @@ private[spark] class SecurityManager(
return
}

val rnd = new SecureRandom()
val length = sparkConf.getInt("spark.authenticate.secretBitLength", 256) / JByte.SIZE
val secretBytes = new Array[Byte](length)
rnd.nextBytes(secretBytes)

secretKey = Utils.createSecret(sparkConf)
val creds = new Credentials()
secretKey = HashCodes.fromBytes(secretBytes).toString()
creds.addSecretKey(SECRET_LOOKUP_KEY, secretKey.getBytes(UTF_8))
UserGroupInformation.getCurrentUser().addCredentials(creds)
}
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -454,8 +454,9 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
*/
private[spark] def validateSettings() {
if (contains("spark.local.dir")) {
val msg = "In Spark 1.0 and later spark.local.dir will be overridden by the value set by " +
"the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN)."
val msg = "Note that spark.local.dir will be overridden by the value set by " +
"the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS" +
" in YARN)."
logWarning(msg)
}
