Merge remote-tracking branch 'upstream/master' into improve-global-limit-parallelism
viirya committed Oct 31, 2017
2 parents f2a7aac + 44c4003 commit 7598337
Showing 769 changed files with 29,377 additions and 6,140 deletions.
1 change: 1 addition & 0 deletions R/install-dev.sh
@@ -28,6 +28,7 @@

set -o pipefail
set -e
set -x

FWDIR="$(cd "`dirname "${BASH_SOURCE[0]}"`"; pwd)"
LIB_DIR="$FWDIR/lib"
2 changes: 1 addition & 1 deletion R/pkg/.lintr
@@ -1,2 +1,2 @@
linters: with_defaults(line_length_linter(100), multiple_dots_linter = NULL, camel_case_linter = NULL, open_curly_linter(allow_single_line = TRUE), closed_curly_linter(allow_single_line = TRUE))
linters: with_defaults(line_length_linter(100), multiple_dots_linter = NULL, object_name_linter = NULL, camel_case_linter = NULL, open_curly_linter(allow_single_line = TRUE), closed_curly_linter(allow_single_line = TRUE))
exclusions: list("inst/profile/general.R" = 1, "inst/profile/shell.R")
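For context, object_name_linter is disabled here because SparkR's API deliberately uses camelCase and dotted names, which that linter's default snake_case style would flag. A hedged illustration (dummy bodies only, not SparkR code):
# Names in the style used throughout SparkR; object_name_linter's default
# snake_case rules would flag both, hence the new exclusion.
printSchema <- function(x) invisible(NULL)                           # camelCase
spark.addFile <- function(path, recursive = FALSE) invisible(NULL)   # dotted name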
73 changes: 46 additions & 27 deletions R/pkg/R/DataFrame.R
@@ -986,10 +986,10 @@ setMethod("unique",
#' @param x A SparkDataFrame
#' @param withReplacement Sampling with replacement or not
#' @param fraction The (rough) sample target fraction
#' @param seed Randomness seed value
#' @param seed Randomness seed value. Default is a random seed.
#'
#' @family SparkDataFrame functions
#' @aliases sample,SparkDataFrame,logical,numeric-method
#' @aliases sample,SparkDataFrame-method
#' @rdname sample
#' @name sample
#' @export
@@ -998,33 +998,47 @@ setMethod("unique",
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' collect(sample(df, fraction = 0.5))
#' collect(sample(df, FALSE, 0.5))
#' collect(sample(df, TRUE, 0.5))
#' collect(sample(df, TRUE, 0.5, seed = 3))
#'}
#' @note sample since 1.4.0
setMethod("sample",
signature(x = "SparkDataFrame", withReplacement = "logical",
fraction = "numeric"),
function(x, withReplacement, fraction, seed) {
if (fraction < 0.0) stop(cat("Negative fraction value:", fraction))
signature(x = "SparkDataFrame"),
function(x, withReplacement = FALSE, fraction, seed) {
if (!is.numeric(fraction)) {
stop(paste("fraction must be numeric; however, got", class(fraction)))
}
if (!is.logical(withReplacement)) {
stop(paste("withReplacement must be logical; however, got", class(withReplacement)))
}

if (!missing(seed)) {
if (is.null(seed)) {
stop("seed must not be NULL or NA; however, got NULL")
}
if (is.na(seed)) {
stop("seed must not be NULL or NA; however, got NA")
}

# TODO : Figure out how to send integer as java.lang.Long to JVM so
# we can send seed as an argument through callJMethod
sdf <- callJMethod(x@sdf, "sample", withReplacement, fraction, as.integer(seed))
sdf <- handledCallJMethod(x@sdf, "sample", as.logical(withReplacement),
as.numeric(fraction), as.integer(seed))
} else {
sdf <- callJMethod(x@sdf, "sample", withReplacement, fraction)
sdf <- handledCallJMethod(x@sdf, "sample",
as.logical(withReplacement), as.numeric(fraction))
}
dataFrame(sdf)
})

#' @rdname sample
#' @aliases sample_frac,SparkDataFrame,logical,numeric-method
#' @aliases sample_frac,SparkDataFrame-method
#' @name sample_frac
#' @note sample_frac since 1.4.0
setMethod("sample_frac",
signature(x = "SparkDataFrame", withReplacement = "logical",
fraction = "numeric"),
function(x, withReplacement, fraction, seed) {
signature(x = "SparkDataFrame"),
function(x, withReplacement = FALSE, fraction, seed) {
sample(x, withReplacement, fraction, seed)
})
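A short usage sketch of the relaxed sample signature, mirroring the roxygen examples above (assumes an active SparkR session):
df <- createDataFrame(data.frame(id = 1:100))
collect(sample(df, fraction = 0.5))                      # withReplacement now defaults to FALSE
collect(sample(df, withReplacement = TRUE, fraction = 0.5, seed = 3))
try(sample(df, fraction = "0.5"))                        # fails fast: fraction must be numeric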

@@ -1177,6 +1191,9 @@ setMethod("collect",
vec <- do.call(c, col)
stopifnot(class(vec) != "list")
class(vec) <- PRIMITIVE_TYPES[[colType]]
if (is.character(vec) && stringsAsFactors) {
vec <- as.factor(vec)
}
df[[colIndex]] <- vec
} else {
df[[colIndex]] <- col
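The new branch converts character columns to factors when collect is called with stringsAsFactors = TRUE; a small sketch (assumes an active session):
df <- createDataFrame(data.frame(name = c("Andy", "Justin"), age = c(30, 19)))
class(collect(df)$name)                            # "character"
class(collect(df, stringsAsFactors = TRUE)$name)   # "factor"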
@@ -1909,13 +1926,15 @@ setMethod("[", signature(x = "SparkDataFrame"),
#' @param i,subset (Optional) a logical expression to filter on rows.
#' For extract operator [[ and replacement operator [[<-, the indexing parameter for
#' a single Column.
#' @param j,select expression for the single Column or a list of columns to select from the SparkDataFrame.
#' @param j,select expression for the single Column or a list of columns to select from the
#' SparkDataFrame.
#' @param drop if TRUE, a Column will be returned if the resulting dataset has only one column.
#' Otherwise, a SparkDataFrame will always be returned.
#' @param value a Column or an atomic vector in the length of 1 as literal value, or \code{NULL}.
#' If \code{NULL}, the specified Column is dropped.
#' @param ... currently not used.
#' @return A new SparkDataFrame containing only the rows that meet the condition with selected columns.
#' @return A new SparkDataFrame containing only the rows that meet the condition with selected
#' columns.
#' @export
#' @family SparkDataFrame functions
#' @aliases subset,SparkDataFrame-method
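For reference, a brief sketch of how the documented parameters combine (hedged, standard SparkR subsetting):
subset(df, df$age > 21, select = c("name", "age"))   # rows meeting the condition, two columns
df[df$age > 21, c("name", "age")]                    # equivalent `[` form
df[["age"]]                                          # `[[` extracts a single Column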
@@ -2594,12 +2613,12 @@ setMethod("merge",
} else {
# if by or both by.x and by.y have length 0, use Cartesian Product
joinRes <- crossJoin(x, y)
return (joinRes)
return(joinRes)
}

# sets alias for making colnames unique in dataframes 'x' and 'y'
colsX <- generateAliasesForIntersectedCols(x, by, suffixes[1])
colsY <- generateAliasesForIntersectedCols(y, by, suffixes[2])
colsX <- genAliasesForIntersectedCols(x, by, suffixes[1])
colsY <- genAliasesForIntersectedCols(y, by, suffixes[2])

# selects columns with their aliases from dataframes
# in case same column names are present in both data frames
@@ -2647,17 +2666,16 @@ setMethod("merge",
#' @param intersectedColNames a list of intersected column names of the SparkDataFrame
#' @param suffix a suffix for the column name
#' @return list of columns
#'
#' @note generateAliasesForIntersectedCols since 1.6.0
generateAliasesForIntersectedCols <- function (x, intersectedColNames, suffix) {
#' @noRd
genAliasesForIntersectedCols <- function(x, intersectedColNames, suffix) {
allColNames <- names(x)
# sets alias for making colnames unique in dataframe 'x'
cols <- lapply(allColNames, function(colName) {
col <- getColumn(x, colName)
if (colName %in% intersectedColNames) {
newJoin <- paste(colName, suffix, sep = "")
if (newJoin %in% allColNames){
stop ("The following column name: ", newJoin, " occurs more than once in the 'DataFrame'.",
stop("The following column name: ", newJoin, " occurs more than once in the 'DataFrame'.",
"Please use different suffixes for the intersected columns.")
}
col <- alias(col, newJoin)
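The renamed helper exists so that column names shared by both sides of a merge stay unique; a hedged usage sketch:
df1 <- createDataFrame(data.frame(id = 1:3, v1 = c(10, 20, 30)))
df2 <- createDataFrame(data.frame(id = 2:4, v2 = c(200, 300, 400)))
# Columns present in both inputs (here the join key "id") are kept from both
# sides, disambiguated with the suffixes, e.g. id_x and id_y.
merged <- merge(df1, df2, by = "id", suffixes = c("_x", "_y"))
printSchema(merged)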
@@ -3044,7 +3062,8 @@ setMethod("describe",
#' summary(select(df, "age", "height"))
#' }
#' @note summary(SparkDataFrame) since 1.5.0
#' @note The statistics provided by \code{summary} were change in 2.3.0 use \link{describe} for previous defaults.
#' @note The statistics provided by \code{summary} were changed in 2.3.0; use \link{describe} for
#' previous defaults.
#' @seealso \link{describe}
setMethod("summary",
signature(object = "SparkDataFrame"),
@@ -3751,8 +3770,8 @@ setMethod("checkpoint",
#'
#' Create a multi-dimensional cube for the SparkDataFrame using the specified columns.
#'
#' If grouping expression is missing \code{cube} creates a single global aggregate and is equivalent to
#' direct application of \link{agg}.
#' If grouping expression is missing \code{cube} creates a single global aggregate and is
#' equivalent to direct application of \link{agg}.
#'
#' @param x a SparkDataFrame.
#' @param ... character name(s) or Column(s) to group on.
@@ -3786,8 +3805,8 @@ setMethod("cube",
#'
#' Create a multi-dimensional rollup for the SparkDataFrame using the specified columns.
#'
#' If grouping expression is missing \code{rollup} creates a single global aggregate and is equivalent to
#' direct application of \link{agg}.
#' If grouping expression is missing \code{rollup} creates a single global aggregate and is
#' equivalent to direct application of \link{agg}.
#'
#' @param x a SparkDataFrame.
#' @param ... character name(s) or Column(s) to group on.
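A short sketch of the missing-grouping-expression case described for cube and rollup (hedged; assumes an active session):
df <- createDataFrame(mtcars)
collect(agg(cube(df, "cyl", "gear"), avg(df$mpg)))   # subtotals per grouping combination
collect(agg(rollup(df), avg(df$mpg)))                # no grouping columns: single global aggregate,
collect(agg(df, avg(df$mpg)))                        # ... equivalent to direct agg()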
6 changes: 3 additions & 3 deletions R/pkg/R/RDD.R
@@ -131,7 +131,7 @@ PipelinedRDD <- function(prev, func) {
# Return the serialization mode for an RDD.
setGeneric("getSerializedMode", function(rdd, ...) { standardGeneric("getSerializedMode") })
# For normal RDDs we can directly read the serializedMode
setMethod("getSerializedMode", signature(rdd = "RDD"), function(rdd) rdd@env$serializedMode )
setMethod("getSerializedMode", signature(rdd = "RDD"), function(rdd) rdd@env$serializedMode)
# For pipelined RDDs if jrdd_val is set then serializedMode should exist
# if not we return the defaultSerialization mode of "byte" as we don't know the serialization
# mode at this point in time.
@@ -145,7 +145,7 @@ setMethod("getSerializedMode", signature(rdd = "PipelinedRDD"),
})

# The jrdd accessor function.
setMethod("getJRDD", signature(rdd = "RDD"), function(rdd) rdd@jrdd )
setMethod("getJRDD", signature(rdd = "RDD"), function(rdd) rdd@jrdd)
setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
function(rdd, serializedMode = "byte") {
if (!is.null(rdd@env$jrdd_val)) {
@@ -893,7 +893,7 @@ setMethod("sampleRDD",
if (withReplacement) {
count <- stats::rpois(1, fraction)
if (count > 0) {
res[ (len + 1) : (len + count) ] <- rep(list(elem), count)
res[(len + 1) : (len + count)] <- rep(list(elem), count)
len <- len + count
}
} else {
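The with-replacement branch above keeps each element rpois(1, fraction) times; a minimal standalone sketch of that idea in plain R (illustration only, not SparkR internals):
elems <- list("a", "b", "c")
fraction <- 1.5
res <- list()
len <- 0
for (elem in elems) {
  count <- stats::rpois(1, fraction)   # expected number of copies of this element
  if (count > 0) {
    res[(len + 1):(len + count)] <- rep(list(elem), count)
    len <- len + count
  }
}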
2 changes: 1 addition & 1 deletion R/pkg/R/WindowSpec.R
@@ -73,7 +73,7 @@ setMethod("show", "WindowSpec",
setMethod("partitionBy",
signature(x = "WindowSpec"),
function(x, col, ...) {
stopifnot (class(col) %in% c("character", "Column"))
stopifnot(class(col) %in% c("character", "Column"))

if (class(col) == "character") {
windowSpec(callJMethod(x@sws, "partitionBy", col, list(...)))
2 changes: 2 additions & 0 deletions R/pkg/R/column.R
@@ -238,8 +238,10 @@ setMethod("between", signature(x = "Column"),
#' @param x a Column.
#' @param dataType a character object describing the target data type.
#' See
# nolint start
#' \href{https://spark.apache.org/docs/latest/sparkr.html#data-type-mapping-between-r-and-spark}{
#' Spark Data Types} for available data types.
# nolint end
#' @rdname cast
#' @name cast
#' @family colum_func
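For reference, a small usage sketch of cast (hedged; assumes an active session):
df <- createDataFrame(data.frame(age = c(21, 35)))
df$age_str <- cast(df$age, "string")   # cast the Column to the named Spark data type
printSchema(df)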
2 changes: 1 addition & 1 deletion R/pkg/R/context.R
@@ -329,7 +329,7 @@ spark.addFile <- function(path, recursive = FALSE) {
#' spark.getSparkFilesRootDirectory()
#'}
#' @note spark.getSparkFilesRootDirectory since 2.1.0
spark.getSparkFilesRootDirectory <- function() {
spark.getSparkFilesRootDirectory <- function() { # nolint
if (Sys.getenv("SPARKR_IS_RUNNING_ON_WORKER") == "") {
# Running on driver.
callJStatic("org.apache.spark.SparkFiles", "getRootDirectory")
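A brief sketch of the SparkFiles helpers around the function above (hedged; assumes an active session):
path <- tempfile(fileext = ".csv")
write.csv(data.frame(a = 1:3), path, row.names = FALSE)
spark.addFile(path)
spark.getSparkFilesRootDirectory()    # directory where files added via spark.addFile land
spark.getSparkFiles(basename(path))   # absolute path of the added file on this node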
2 changes: 1 addition & 1 deletion R/pkg/R/deserialize.R
@@ -43,7 +43,7 @@ readObject <- function(con) {
}

readTypedObject <- function(con, type) {
switch (type,
switch(type,
"i" = readInt(con),
"c" = readString(con),
"b" = readBoolean(con),
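The switch() call above dispatches on a one-character type tag; a minimal standalone sketch of the same pattern (illustrative names, not SparkR internals):
readTyped <- function(tag, raw) {
  switch(tag,
         "i" = as.integer(raw),
         "c" = as.character(raw),
         "b" = as.logical(raw),
         stop("Unsupported type tag: ", tag))   # unnamed last arg acts as the default branch
}
readTyped("i", "42")   # 42L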
