Merge remote-tracking branch 'upstream/master' into SPARK-25271-2
viirya committed Dec 4, 2018
2 parents 9629175 + 0889fba commit e04812d
Showing 291 changed files with 6,836 additions and 5,345 deletions.
2 changes: 2 additions & 0 deletions R/pkg/NAMESPACE
@@ -351,6 +351,8 @@ exportMethods("%<=>%",
"row_number",
"rpad",
"rtrim",
"schema_of_csv",
"schema_of_json",
"second",
"sha1",
"sha2",
22 changes: 16 additions & 6 deletions R/pkg/R/DataFrame.R
@@ -766,7 +766,6 @@ setMethod("repartition",
#' \item{2.} {Return a new SparkDataFrame range partitioned by the given column(s),
#' using \code{spark.sql.shuffle.partitions} as number of partitions.}
#'}
#'
#' At least one partition-by expression must be specified.
#' When no explicit sort order is specified, "ascending nulls first" is assumed.
#'
@@ -828,7 +827,6 @@ setMethod("repartitionByRange",
#' toJSON
#'
#' Converts a SparkDataFrame into a SparkDataFrame of JSON string.
#'
#' Each row is turned into a JSON document with columns as different fields.
#' The returned SparkDataFrame has a single character column with the name \code{value}
#'
@@ -2732,13 +2730,25 @@ setMethod("union",
dataFrame(unioned)
})

#' Return a new SparkDataFrame containing the union of rows
#' Return a new SparkDataFrame containing the union of rows.
#'
#' This is an alias for `union`.
#' This is an alias for \code{union}.
#'
#' @rdname union
#' @name unionAll
#' @param x a SparkDataFrame.
#' @param y a SparkDataFrame.
#' @return A SparkDataFrame containing the result of the unionAll operation.
#' @family SparkDataFrame functions
#' @aliases unionAll,SparkDataFrame,SparkDataFrame-method
#' @rdname unionAll
#' @name unionAll
#' @seealso \link{union}
#' @examples
#'\dontrun{
#' sparkR.session()
#' df1 <- read.json(path)
#' df2 <- read.json(path2)
#' unionAllDF <- unionAll(df1, df2)
#' }
#' @note unionAll since 1.4.0
setMethod("unionAll",
signature(x = "SparkDataFrame", y = "SparkDataFrame"),
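The roxygen block above documents unionAll on its own page and notes that it is an alias for union. For reference, an illustrative SparkR sketch of that behavior (not taken from the diff; it assumes a running SparkR session, and df1/df2 are made-up data):

library(SparkR)
sparkR.session()

# Two small SparkDataFrames with the same schema (illustrative data only).
df1 <- createDataFrame(data.frame(name = c("Bob", "Alice"), age = c(32L, 30L)))
df2 <- createDataFrame(data.frame(name = "Bob", age = 32L))

# unionAll() is an alias for union(): columns are resolved by position and
# duplicate rows are kept; apply distinct() afterwards to drop duplicates.
count(union(df1, df2))                # 3 rows
count(unionAll(df1, df2))             # 3 rows, same result
count(distinct(unionAll(df1, df2)))   # 2 rows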
77 changes: 70 additions & 7 deletions R/pkg/R/functions.R
@@ -205,11 +205,18 @@ NULL
#' also supported for the schema.
#' \item \code{from_csv}: a DDL-formatted string
#' }
#' @param ... additional argument(s). In \code{to_json}, \code{to_csv} and \code{from_json},
#' this contains additional named properties to control how it is converted, accepts
#' the same options as the JSON/CSV data source. Additionally \code{to_json} supports
#' the "pretty" option which enables pretty JSON generation. In \code{arrays_zip},
#' this contains additional Columns of arrays to be merged.
#' @param ... additional argument(s).
#' \itemize{
#' \item \code{to_json}, \code{from_json} and \code{schema_of_json}: this contains
#' additional named properties to control how it is converted and accepts the
#' same options as the JSON data source.
#' \item \code{to_json}: it supports the "pretty" option which enables pretty
#' JSON generation.
#' \item \code{to_csv}, \code{from_csv} and \code{schema_of_csv}: this contains
#' additional named properties to control how it is converted and accepts the
#' same options as the CSV data source.
#' \item \code{arrays_zip}: this contains additional Columns of arrays to be merged.
#' }
#' @name column_collection_functions
#' @rdname column_collection_functions
#' @family collection functions
@@ -1771,12 +1778,16 @@ setMethod("to_date",
#' df2 <- mutate(df2, people_json = to_json(df2$people))
#'
#' # Converts a map into a JSON object
#' df2 <- sql("SELECT map('name', 'Bob')) as people")
#' df2 <- sql("SELECT map('name', 'Bob') as people")
#' df2 <- mutate(df2, people_json = to_json(df2$people))
#'
#' # Converts an array of maps into a JSON array
#' df2 <- sql("SELECT array(map('name', 'Bob'), map('name', 'Alice')) as people")
#' df2 <- mutate(df2, people_json = to_json(df2$people))}
#' df2 <- mutate(df2, people_json = to_json(df2$people))
#'
#' # Converts a map into a pretty JSON object
#' df2 <- sql("SELECT map('name', 'Bob') as people")
#' df2 <- mutate(df2, people_json = to_json(df2$people, pretty = TRUE))}
#' @note to_json since 2.2.0
setMethod("to_json", signature(x = "Column"),
function(x, ...) {
@@ -2285,6 +2296,32 @@ setMethod("from_json", signature(x = "Column", schema = "characterOrstructType")
column(jc)
})

#' @details
#' \code{schema_of_json}: Parses a JSON string and infers its schema in DDL format.
#'
#' @rdname column_collection_functions
#' @aliases schema_of_json schema_of_json,characterOrColumn-method
#' @examples
#'
#' \dontrun{
#' json <- "{\"name\":\"Bob\"}"
#' df <- sql("SELECT * FROM range(1)")
#' head(select(df, schema_of_json(json)))}
#' @note schema_of_json since 3.0.0
setMethod("schema_of_json", signature(x = "characterOrColumn"),
function(x, ...) {
if (class(x) == "character") {
col <- callJStatic("org.apache.spark.sql.functions", "lit", x)
} else {
col <- x@jc
}
options <- varargsToStrEnv(...)
jc <- callJStatic("org.apache.spark.sql.functions",
"schema_of_json",
col, options)
column(jc)
})

#' @details
#' \code{from_csv}: Parses a column containing a CSV string into a Column of \code{structType}
#' with the specified \code{schema}.
@@ -2315,6 +2352,32 @@ setMethod("from_csv", signature(x = "Column", schema = "characterOrColumn"),
column(jc)
})

#' @details
#' \code{schema_of_csv}: Parses a CSV string and infers its schema in DDL format.
#'
#' @rdname column_collection_functions
#' @aliases schema_of_csv schema_of_csv,characterOrColumn-method
#' @examples
#'
#' \dontrun{
#' csv <- "Amsterdam,2018"
#' df <- sql("SELECT * FROM range(1)")
#' head(select(df, schema_of_csv(csv)))}
#' @note schema_of_csv since 3.0.0
setMethod("schema_of_csv", signature(x = "characterOrColumn"),
function(x, ...) {
if (class(x) == "character") {
col <- callJStatic("org.apache.spark.sql.functions", "lit", x)
} else {
col <- x@jc
}
options <- varargsToStrEnv(...)
jc <- callJStatic("org.apache.spark.sql.functions",
"schema_of_csv",
col, options)
column(jc)
})

#' @details
#' \code{from_utc_timestamp}: This is a common function for databases supporting TIMESTAMP WITHOUT
#' TIMEZONE. This function takes a timestamp which is timezone-agnostic, and interprets it as a
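An illustrative SparkR sketch of the schema_of_json and schema_of_csv helpers added above (not taken from the diff; it assumes a SparkR session built with this change, and the sample strings, column names, and the sep option are illustrative):

library(SparkR)
sparkR.session()

# A one-row frame holding a sample JSON string and a sample CSV string.
df <- sql("SELECT '{\"name\":\"Bob\",\"age\":32}' AS j, 'Amsterdam,2018' AS c")

# Infer a schema in DDL format from an example string; a plain character
# value or a lit() Column is accepted.
head(select(df, schema_of_json('{"name":"Bob","age":32}')))
head(select(df, schema_of_csv("Amsterdam,2018")))

# DDL schema strings also work with from_json()/from_csv(); named options
# such as sep are forwarded through ... to the underlying data source.
head(select(df, from_json(df$j, "name STRING, age INT")))
head(select(df, from_csv(df$c, "city STRING, year INT", sep = ",")))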
10 changes: 9 additions & 1 deletion R/pkg/R/generics.R
@@ -631,7 +631,7 @@ setGeneric("toRDD", function(x) { standardGeneric("toRDD") })
#' @rdname union
setGeneric("union", function(x, y) { standardGeneric("union") })

#' @rdname union
#' @rdname unionAll
setGeneric("unionAll", function(x, y) { standardGeneric("unionAll") })

#' @rdname unionByName
@@ -1206,6 +1206,14 @@ setGeneric("rpad", function(x, len, pad) { standardGeneric("rpad") })
#' @name NULL
setGeneric("rtrim", function(x, trimString) { standardGeneric("rtrim") })

#' @rdname column_collection_functions
#' @name NULL
setGeneric("schema_of_csv", function(x, ...) { standardGeneric("schema_of_csv") })

#' @rdname column_collection_functions
#' @name NULL
setGeneric("schema_of_json", function(x, ...) { standardGeneric("schema_of_json") })

#' @rdname column_aggregate_functions
#' @name NULL
setGeneric("sd", function(x, na.rm = FALSE) { standardGeneric("sd") })
16 changes: 14 additions & 2 deletions R/pkg/tests/fulltests/test_sparkSQL.R
@@ -1620,14 +1620,20 @@ test_that("column functions", {
expect_equal(collect(select(df, bround(df$x, 0)))[[1]][1], 2)
expect_equal(collect(select(df, bround(df$x, 0)))[[1]][2], 4)

# Test from_csv()
# Test from_csv(), schema_of_csv()
df <- as.DataFrame(list(list("col" = "1")))
c <- collect(select(df, alias(from_csv(df$col, "a INT"), "csv")))
expect_equal(c[[1]][[1]]$a, 1)
c <- collect(select(df, alias(from_csv(df$col, lit("a INT")), "csv")))
expect_equal(c[[1]][[1]]$a, 1)

# Test to_json(), from_json()
df <- as.DataFrame(list(list("col" = "1")))
c <- collect(select(df, schema_of_csv("Amsterdam,2018")))
expect_equal(c[[1]], "struct<_c0:string,_c1:int>")
c <- collect(select(df, schema_of_csv(lit("Amsterdam,2018"))))
expect_equal(c[[1]], "struct<_c0:string,_c1:int>")

# Test to_json(), from_json(), schema_of_json()
df <- sql("SELECT array(named_struct('name', 'Bob'), named_struct('name', 'Alice')) as people")
j <- collect(select(df, alias(to_json(df$people), "json")))
expect_equal(j[order(j$json), ][1], "[{\"name\":\"Bob\"},{\"name\":\"Alice\"}]")
@@ -1654,6 +1660,12 @@ test_that("column functions", {
expect_true(any(apply(s, 1, function(x) { x[[1]]$age == 16 })))
}

df <- as.DataFrame(list(list("col" = "1")))
c <- collect(select(df, schema_of_json('{"name":"Bob"}')))
expect_equal(c[[1]], "struct<name:string>")
c <- collect(select(df, schema_of_json(lit('{"name":"Bob"}'))))
expect_equal(c[[1]], "struct<name:string>")

# Test to_json() supports arrays of primitive types and arrays
df <- sql("SELECT array(19, 42, 70) as age")
j <- collect(select(df, alias(to_json(df$age), "json")))
17 changes: 14 additions & 3 deletions bin/docker-image-tool.sh
@@ -107,6 +107,7 @@ function create_dev_build_context {(
"$PYSPARK_CTX/kubernetes/dockerfiles"
mkdir "$PYSPARK_CTX/python"
cp -r "python/lib" "$PYSPARK_CTX/python/lib"
cp -r "python/pyspark" "$PYSPARK_CTX/python/pyspark"

local R_CTX="$CTX_DIR/sparkr"
mkdir -p "$R_CTX/kubernetes"
@@ -146,6 +147,12 @@ function build {
fi

local BUILD_ARGS=(${BUILD_PARAMS})

# If a custom SPARK_UID was set, add it to the build arguments
if [ -n "$SPARK_UID" ]; then
BUILD_ARGS+=(--build-arg spark_uid=$SPARK_UID)
fi

local BINDING_BUILD_ARGS=(
${BUILD_PARAMS}
--build-arg
@@ -207,8 +214,10 @@ Options:
-t tag Tag to apply to the built image, or to identify the image to be pushed.
-m Use minikube's Docker daemon.
-n Build docker image with --no-cache
-b arg Build arg to build or push the image. For multiple build args, this option needs to
be used separately for each build arg.
-u uid UID to use in the USER directive to set the user the main Spark process runs as inside the
resulting container
-b arg Build arg to build or push the image. For multiple build args, this option needs to
be used separately for each build arg.
Using minikube when building images will do so directly into minikube's Docker daemon.
There is no need to push the images into minikube in that case, they'll be automatically
@@ -243,7 +252,8 @@ PYDOCKERFILE=
RDOCKERFILE=
NOCACHEARG=
BUILD_PARAMS=
while getopts f:p:R:mr:t:nb: option
SPARK_UID=
while getopts f:p:R:mr:t:nb:u: option
do
case "${option}"
in
@@ -263,6 +273,7 @@ do
fi
eval $(minikube docker-env)
;;
u) SPARK_UID=${OPTARG};;
esac
done

13 changes: 13 additions & 0 deletions core/pom.xml
@@ -408,6 +408,19 @@
<scope>provided</scope>
</dependency>

<!--
The following Kafka dependency is used to obtain delegation tokens.
In order to prevent spark-core from depending on Kafka, these deps have been placed in the
"provided" scope, rather than the "compile" scope, and NoClassDefFoundError exceptions are
handled when the user explicitly uses neither the spark-streaming-kafka nor the spark-sql-kafka modules.
-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
<scope>provided</scope>
</dependency>

</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
10 changes: 8 additions & 2 deletions core/src/main/resources/org/apache/spark/ui/static/stagepage.js
@@ -221,7 +221,10 @@ function createDataTableForTaskSummaryMetricsTable(taskSummaryMetricsTable) {
"searching": false,
"order": [[0, "asc"]],
"bSort": false,
"bAutoWidth": false
"bAutoWidth": false,
"oLanguage": {
"sEmptyTable": "No tasks have reported metrics yet"
}
};
taskSummaryMetricsDataTable = $(taskMetricsTable).DataTable(taskConf);
}
@@ -426,7 +429,10 @@ $(document).ready(function () {
}
],
"order": [[0, "asc"]],
"bAutoWidth": false
"bAutoWidth": false,
"oLanguage": {
"sEmptyTable": "No data to show yet"
}
}
var executorSummaryTableSelector =
$("#summary-executor-table").DataTable(executorSummaryConf);
29 changes: 25 additions & 4 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -660,6 +660,7 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
with Logging {

private var encryptionServer: PythonServer[Unit] = null
private var decryptionServer: PythonServer[Unit] = null

/**
* Read data from disks, then copy it to `out`
@@ -708,16 +709,36 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
override def handleConnection(sock: Socket): Unit = {
val env = SparkEnv.get
val in = sock.getInputStream()
val dir = new File(Utils.getLocalDir(env.conf))
val file = File.createTempFile("broadcast", "", dir)
path = file.getAbsolutePath
val out = env.serializerManager.wrapForEncryption(new FileOutputStream(path))
val abspath = new File(path).getAbsolutePath
val out = env.serializerManager.wrapForEncryption(new FileOutputStream(abspath))
DechunkedInputStream.dechunkAndCopyToOutput(in, out)
}
}
Array(encryptionServer.port, encryptionServer.secret)
}

def setupDecryptionServer(): Array[Any] = {
decryptionServer = new PythonServer[Unit]("broadcast-decrypt-server-for-driver") {
override def handleConnection(sock: Socket): Unit = {
val out = new DataOutputStream(new BufferedOutputStream(sock.getOutputStream()))
Utils.tryWithSafeFinally {
val in = SparkEnv.get.serializerManager.wrapForEncryption(new FileInputStream(path))
Utils.tryWithSafeFinally {
Utils.copyStream(in, out, false)
} {
in.close()
}
out.flush()
} {
JavaUtils.closeQuietly(out)
}
}
}
Array(decryptionServer.port, decryptionServer.secret)
}

def waitTillBroadcastDataSent(): Unit = decryptionServer.getResult()

def waitTillDataReceived(): Unit = encryptionServer.getResult()
}
// scalastyle:on no.finalize
