
Commit

Merge remote-tracking branch 'spark/master' into datacopy
ConeyLiu committed Jan 21, 2018
2 parents b412b54 + 793841c commit b5ab25a
Showing 704 changed files with 25,655 additions and 6,985 deletions.
3 changes: 2 additions & 1 deletion R/pkg/DESCRIPTION
@@ -1,6 +1,6 @@
Package: SparkR
Type: Package
Version: 2.3.0
Version: 2.4.0
Title: R Frontend for Apache Spark
Description: Provides an R Frontend for Apache Spark.
Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"),
@@ -59,3 +59,4 @@ Collate:
'window.R'
RoxygenNote: 5.0.1
VignetteBuilder: knitr
NeedsCompilation: no
9 changes: 8 additions & 1 deletion R/pkg/NAMESPACE
@@ -76,7 +76,9 @@ exportMethods("glm",
export("setJobGroup",
"clearJobGroup",
"cancelJobGroup",
"setJobDescription")
"setJobDescription",
"setLocalProperty",
"getLocalProperty")

# Export Utility methods
export("setLogLevel")
@@ -133,6 +135,7 @@ exportMethods("arrange",
"isStreaming",
"join",
"limit",
"localCheckpoint",
"merge",
"mutate",
"na.omit",
@@ -176,6 +179,7 @@ exportMethods("arrange",
"with",
"withColumn",
"withColumnRenamed",
"withWatermark",
"write.df",
"write.jdbc",
"write.json",
@@ -225,11 +229,14 @@ exportMethods("%<=>%",
"crc32",
"create_array",
"create_map",
"current_date",
"current_timestamp",
"hash",
"cume_dist",
"date_add",
"date_format",
"date_sub",
"date_trunc",
"datediff",
"dayofmonth",
"dayofweek",
143 changes: 132 additions & 11 deletions R/pkg/R/DataFrame.R
@@ -2297,6 +2297,7 @@ setClassUnion("characterOrColumn", c("character", "Column"))
#' @param ... additional sorting fields
#' @param decreasing a logical argument indicating sorting order for columns when
#' a character vector is specified for col
#' @param withinPartitions a logical argument indicating whether to sort only within each partition
#' @return A SparkDataFrame where all elements are sorted.
#' @family SparkDataFrame functions
#' @aliases arrange,SparkDataFrame,Column-method
@@ -2312,16 +2313,21 @@ setClassUnion("characterOrColumn", c("character", "Column"))
#' arrange(df, asc(df$col1), desc(abs(df$col2)))
#' arrange(df, "col1", decreasing = TRUE)
#' arrange(df, "col1", "col2", decreasing = c(TRUE, FALSE))
#' arrange(df, "col1", "col2", withinPartitions = TRUE)
#' }
#' @note arrange(SparkDataFrame, Column) since 1.4.0
setMethod("arrange",
signature(x = "SparkDataFrame", col = "Column"),
function(x, col, ...) {
function(x, col, ..., withinPartitions = FALSE) {
jcols <- lapply(list(col, ...), function(c) {
c@jc
})

sdf <- callJMethod(x@sdf, "sort", jcols)
if (withinPartitions) {
sdf <- callJMethod(x@sdf, "sortWithinPartitions", jcols)
} else {
sdf <- callJMethod(x@sdf, "sort", jcols)
}
dataFrame(sdf)
})

@@ -2332,7 +2338,7 @@ setMethod("arrange",
#' @note arrange(SparkDataFrame, character) since 1.4.0
setMethod("arrange",
signature(x = "SparkDataFrame", col = "character"),
function(x, col, ..., decreasing = FALSE) {
function(x, col, ..., decreasing = FALSE, withinPartitions = FALSE) {

# all sorting columns
by <- list(col, ...)
@@ -2356,7 +2362,7 @@ setMethod("arrange",
}
})

do.call("arrange", c(x, jcols))
do.call("arrange", c(x, jcols, withinPartitions = withinPartitions))
})

#' @rdname arrange
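
A brief usage sketch (hypothetical local data; partition count chosen arbitrarily) of the withinPartitions argument added above, ordering rows inside each partition rather than globally:

    df <- repartition(createDataFrame(mtcars), 4L)
    # Global sort (the default) versus a per-partition sort
    head(arrange(df, "mpg"))
    head(arrange(df, "mpg", withinPartitions = TRUE))
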
@@ -2847,7 +2853,7 @@ setMethod("intersect",
#' except
#'
#' Return a new SparkDataFrame containing rows in this SparkDataFrame
#' but not in another SparkDataFrame. This is equivalent to \code{EXCEPT} in SQL.
#' but not in another SparkDataFrame. This is equivalent to \code{EXCEPT DISTINCT} in SQL.
#'
#' @param x a SparkDataFrame.
#' @param y a SparkDataFrame.
@@ -3048,10 +3054,10 @@ setMethod("describe",
#' \item stddev
#' \item min
#' \item max
#' \item arbitrary approximate percentiles specified as a percentage (eg, "75%")
#' \item arbitrary approximate percentiles specified as a percentage (eg, "75\%")
#' }
#' If no statistics are given, this function computes count, mean, stddev, min,
#' approximate quartiles (percentiles at 25%, 50%, and 75%), and max.
#' approximate quartiles (percentiles at 25\%, 50\%, and 75\%), and max.
#' This function is meant for exploratory data analysis, as we make no guarantee about the
#' backward compatibility of the schema of the resulting Dataset. If you want to
#' programmatically compute summary statistics, use the \code{agg} function instead.
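
A hedged usage sketch (assuming an active SparkSession and local data) of the statistics described above:

    df <- createDataFrame(mtcars)
    # Default statistics: count, mean, stddev, min, approximate quartiles, max
    showDF(summary(df))
    # Request specific statistics, including an arbitrary approximate percentile
    showDF(summary(df, "min", "75%", "max"))
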
@@ -3655,7 +3661,8 @@ setMethod("getNumPartitions",
#' isStreaming
#'
#' Returns TRUE if this SparkDataFrame contains one or more sources that continuously return data
#' as it arrives.
#' as it arrives. A dataset that reads data from a streaming source must be executed as a
#' \code{StreamingQuery} using \code{write.stream}.
#'
#' @param x A SparkDataFrame
#' @return TRUE if this SparkDataFrame is from a streaming source
@@ -3701,7 +3708,17 @@ setMethod("isStreaming",
#' @param df a streaming SparkDataFrame.
#' @param source a name for external data source.
#' @param outputMode one of 'append', 'complete', 'update'.
#' @param ... additional argument(s) passed to the method.
#' @param partitionBy a name or a list of names of columns to partition the output by on the file
#' system. If specified, the output is laid out on the file system similar to Hive's
#' partitioning scheme.
#' @param trigger.processingTime a processing time interval as a string, e.g. '5 seconds',
#' '1 minute'. This is a trigger that runs a query periodically based on the processing
#' time. If the value is '0 seconds', the query will run as fast as possible; this is the
#' default. Only one trigger can be set.
#' @param trigger.once a logical, must be set to \code{TRUE}. This is a trigger that processes only
#' one batch of data in a streaming query then terminates the query. Only one trigger can be
#' set.
#' @param ... additional external data source specific named options.
#'
#' @family SparkDataFrame functions
#' @seealso \link{read.stream}
@@ -3719,7 +3736,8 @@ setMethod("isStreaming",
#' # console
#' q <- write.stream(wordCounts, "console", outputMode = "complete")
#' # text stream
#' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp")
#' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp"
#' partitionBy = c("year", "month"), trigger.processingTime = "30 seconds")
#' # memory stream
#' q <- write.stream(wordCounts, "memory", queryName = "outs", outputMode = "complete")
#' head(sql("SELECT * from outs"))
@@ -3731,7 +3749,8 @@ setMethod("isStreaming",
#' @note experimental
setMethod("write.stream",
signature(df = "SparkDataFrame"),
function(df, source = NULL, outputMode = NULL, ...) {
function(df, source = NULL, outputMode = NULL, partitionBy = NULL,
trigger.processingTime = NULL, trigger.once = NULL, ...) {
if (!is.null(source) && !is.character(source)) {
stop("source should be character, NULL or omitted. It is the data source specified ",
"in 'spark.sql.sources.default' configuration by default.")
@@ -3742,12 +3761,43 @@ setMethod("write.stream",
if (is.null(source)) {
source <- getDefaultSqlSource()
}
cols <- NULL
if (!is.null(partitionBy)) {
if (!all(sapply(partitionBy, function(c) { is.character(c) }))) {
stop("All partitionBy column names should be characters.")
}
cols <- as.list(partitionBy)
}
jtrigger <- NULL
if (!is.null(trigger.processingTime) && !is.na(trigger.processingTime)) {
if (!is.null(trigger.once)) {
stop("Multiple triggers not allowed.")
}
interval <- as.character(trigger.processingTime)
if (nchar(interval) == 0) {
stop("Value for trigger.processingTime must be a non-empty string.")
}
jtrigger <- handledCallJStatic("org.apache.spark.sql.streaming.Trigger",
"ProcessingTime",
interval)
} else if (!is.null(trigger.once) && !is.na(trigger.once)) {
if (!is.logical(trigger.once) || !trigger.once) {
stop("Value for trigger.once must be TRUE.")
}
jtrigger <- callJStatic("org.apache.spark.sql.streaming.Trigger", "Once")
}
options <- varargsToStrEnv(...)
write <- handledCallJMethod(df@sdf, "writeStream")
write <- callJMethod(write, "format", source)
if (!is.null(outputMode)) {
write <- callJMethod(write, "outputMode", outputMode)
}
if (!is.null(cols)) {
write <- callJMethod(write, "partitionBy", cols)
}
if (!is.null(jtrigger)) {
write <- callJMethod(write, "trigger", jtrigger)
}
write <- callJMethod(write, "options", options)
ssq <- handledCallJMethod(write, "start")
streamingQuery(ssq)
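
A hedged end-to-end sketch (hypothetical paths and schema) of the partitionBy and trigger.processingTime options wired up above; only one trigger may be set per query:

    schema <- structType(structField("year", "integer"),
                         structField("month", "integer"),
                         structField("value", "double"))
    events <- read.stream("json", path = "/tmp/stream/in", schema = schema)
    q <- write.stream(events, "parquet",
                      path = "/tmp/stream/out",
                      checkpointLocation = "/tmp/stream/cp",
                      partitionBy = c("year", "month"),
                      trigger.processingTime = "30 seconds")
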
@@ -3782,6 +3832,33 @@ setMethod("checkpoint",
dataFrame(df)
})

#' localCheckpoint
#'
#' Returns a locally checkpointed version of this SparkDataFrame. Checkpointing can be used to
#' truncate the logical plan, which is especially useful in iterative algorithms where the plan
#' may grow exponentially. Local checkpoints are stored in the executors using the caching
#' subsystem and therefore they are not reliable.
#'
#' @param x A SparkDataFrame
#' @param eager whether to locally checkpoint this SparkDataFrame immediately
#' @return a new locally checkpointed SparkDataFrame
#' @family SparkDataFrame functions
#' @aliases localCheckpoint,SparkDataFrame-method
#' @rdname localCheckpoint
#' @name localCheckpoint
#' @export
#' @examples
#'\dontrun{
#' df <- localCheckpoint(df)
#' }
#' @note localCheckpoint since 2.3.0
setMethod("localCheckpoint",
signature(x = "SparkDataFrame"),
function(x, eager = TRUE) {
df <- callJMethod(x@sdf, "localCheckpoint", as.logical(eager))
dataFrame(df)
})
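
A short sketch (hypothetical local data) of the new localCheckpoint method; with eager = FALSE the checkpoint is only materialized by the first action:

    df <- createDataFrame(mtcars)
    df <- localCheckpoint(df, eager = FALSE)
    count(df)   # triggers the local checkpoint and truncates the lineage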

#' cube
#'
#' Create a multi-dimensional cube for the SparkDataFrame using the specified columns.
@@ -3934,3 +4011,47 @@ setMethod("broadcast",
sdf <- callJStatic("org.apache.spark.sql.functions", "broadcast", x@sdf)
dataFrame(sdf)
})

#' withWatermark
#'
#' Defines an event time watermark for this streaming SparkDataFrame. A watermark tracks a point in
#' time before which we assume no more late data is going to arrive.
#'
#' Spark will use this watermark for several purposes:
#' \itemize{
#' \item To know when a given time window aggregation can be finalized and thus can be emitted
#' when using output modes that do not allow updates.
#' \item To minimize the amount of state that we need to keep for on-going aggregations.
#' }
#' The current watermark is computed by looking at the \code{MAX(eventTime)} seen across
#' all of the partitions in the query minus a user specified \code{delayThreshold}. Due to the cost
#' of coordinating this value across partitions, the actual watermark used is only guaranteed
#' to be at least \code{delayThreshold} behind the actual event time. In some cases we may still
#' process records that arrive more than \code{delayThreshold} late.
#'
#' @param x a streaming SparkDataFrame
#' @param eventTime a string specifying the name of the Column that contains the event time of the
#' row.
#' @param delayThreshold a string specifying the minimum delay to wait for data to arrive late,
#' relative to the latest record that has been processed in the form of an
#' interval (e.g. "1 minute" or "5 hours"). NOTE: This should not be negative.
#' @return a SparkDataFrame.
#' @aliases withWatermark,SparkDataFrame,character,character-method
#' @family SparkDataFrame functions
#' @rdname withWatermark
#' @name withWatermark
#' @export
#' @examples
#' \dontrun{
#' sparkR.session()
#' schema <- structType(structField("time", "timestamp"), structField("value", "double"))
#' df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1)
#' df <- withWatermark(df, "time", "10 minutes")
#' }
#' @note withWatermark since 2.3.0
setMethod("withWatermark",
signature(x = "SparkDataFrame", eventTime = "character", delayThreshold = "character"),
function(x, eventTime, delayThreshold) {
sdf <- callJMethod(x@sdf, "withWatermark", eventTime, delayThreshold)
dataFrame(sdf)
})
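
A hedged streaming sketch (hypothetical paths) that uses the watermark defined above to bound the state kept for a windowed aggregation:

    schema <- structType(structField("time", "timestamp"), structField("value", "double"))
    events <- read.stream("json", path = "/tmp/events", schema = schema)
    events <- withWatermark(events, "time", "10 minutes")
    counts <- count(groupBy(events, window(events$time, "5 minutes")))
    # Rows arriving more than ~10 minutes late may be dropped from the aggregation state
    q <- write.stream(counts, "console", outputMode = "append")
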
4 changes: 3 additions & 1 deletion R/pkg/R/SQLContext.R
@@ -727,7 +727,9 @@ read.jdbc <- function(url, tableName,
#' @param schema The data schema defined in structType or a DDL-formatted string, this is
#' required for file-based streaming data source
#' @param ... additional external data source specific named options, for instance \code{path} for
#' file-based streaming data source
#' file-based streaming data source. \code{timeZone} indicates a timezone to be used to
#' parse timestamps in the JSON/CSV data sources or partition values; if it isn't set, the
#' session local timezone is used by default.
#' @return SparkDataFrame
#' @rdname read.stream
#' @name read.stream
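
A brief sketch (hypothetical path) of the timeZone option mentioned above, parsing timestamps in the JSON files as UTC instead of the session local timezone:

    schema <- structType(structField("time", "timestamp"), structField("value", "double"))
    df <- read.stream("json", path = "/tmp/in", schema = schema, timeZone = "UTC")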
