[SPARK-23770][R] Exposes repartitionByRange in SparkR
## What changes were proposed in this pull request?

This PR proposes to expose `repartitionByRange`.

```R
> df <- createDataFrame(iris)
...
> getNumPartitions(repartitionByRange(df, 3, col = df$Species))
[1] 3
```
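
When `numPartitions` is omitted, the partition count falls back to the `spark.sql.shuffle.partitions` setting (200 by default), as the added R docs note. A sketch of the column-only form:

```R
> df <- createDataFrame(iris)
> # Partition count here comes from spark.sql.shuffle.partitions
> df2 <- repartitionByRange(df, col = df$Species)
> getNumPartitions(df2)
```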

## How was this patch tested?

Manually tested, and unit tests were added. The difference from `repartition` can be checked as below:

```R
> df <- createDataFrame(mtcars)
> take(repartition(df, 10, df$wt), 3)
   mpg cyl  disp  hp drat    wt  qsec vs am gear carb
1 14.3   8 360.0 245 3.21 3.570 15.84  0  0    3    4
2 10.4   8 460.0 215 3.00 5.424 17.82  0  0    3    4
3 32.4   4  78.7  66 4.08 2.200 19.47  1  1    4    1
> take(repartitionByRange(df, 10, df$wt), 3)
   mpg cyl disp hp drat    wt  qsec vs am gear carb
1 30.4   4 75.7 52 4.93 1.615 18.52  1  1    4    2
2 33.9   4 71.1 65 4.22 1.835 19.90  1  1    4    1
3 27.3   4 79.0 66 4.08 1.935 18.90  1  1    4    1
```
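
The row orderings differ because `repartition` with columns hash-partitions the rows, while `repartitionByRange` places them into contiguous sorted ranges of `wt`. One way to inspect the assignment is to tag each row with SparkR's `spark_partition_id()` (a sketch; output omitted):

```R
> ranged <- repartitionByRange(df, 10, df$wt)
> # With range partitioning, smaller wt values land in lower-numbered partitions
> head(select(withColumn(ranged, "pid", spark_partition_id()), "wt", "pid"))
```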

Author: hyukjinkwon <gurwls223@apache.org>

Closes #20902 from HyukjinKwon/r-repartitionByRange.
HyukjinKwon committed Mar 29, 2018
1 parent 641aec6 commit 505480c
Showing 4 changed files with 112 additions and 2 deletions.
1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
@@ -151,6 +151,7 @@ exportMethods("arrange",
"registerTempTable",
"rename",
"repartition",
"repartitionByRange",
"rollup",
"sample",
"sample_frac",
65 changes: 63 additions & 2 deletions R/pkg/R/DataFrame.R
@@ -687,7 +687,7 @@ setMethod("storageLevel",
#' @rdname coalesce
#' @name coalesce
#' @aliases coalesce,SparkDataFrame-method
#' @seealso \link{repartition}
#' @seealso \link{repartition}, \link{repartitionByRange}
#' @examples
#'\dontrun{
#' sparkR.session()
@@ -723,7 +723,7 @@ setMethod("coalesce",
#' @rdname repartition
#' @name repartition
#' @aliases repartition,SparkDataFrame-method
#' @seealso \link{coalesce}
#' @seealso \link{coalesce}, \link{repartitionByRange}
#' @examples
#'\dontrun{
#' sparkR.session()
@@ -759,6 +759,67 @@ setMethod("repartition",
dataFrame(sdf)
})


#' Repartition by range
#'
#' The following options for repartition by range are possible:
#' \itemize{
#' \item{1.} {Return a new SparkDataFrame range partitioned by
#' the given columns into \code{numPartitions}.}
#' \item{2.} {Return a new SparkDataFrame range partitioned by the given column(s),
#' using \code{spark.sql.shuffle.partitions} as number of partitions.}
#'}
#'
#' @param x a SparkDataFrame.
#' @param numPartitions the number of partitions to use.
#' @param col the column by which the range partitioning will be performed.
#' @param ... additional column(s) to be used in the range partitioning.
#'
#' @family SparkDataFrame functions
#' @rdname repartitionByRange
#' @name repartitionByRange
#' @aliases repartitionByRange,SparkDataFrame-method
#' @seealso \link{repartition}, \link{coalesce}
#' @examples
#'\dontrun{
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' newDF <- repartitionByRange(df, col = df$col1, df$col2)
#' newDF <- repartitionByRange(df, 3L, col = df$col1, df$col2)
#'}
#' @note repartitionByRange since 2.4.0
setMethod("repartitionByRange",
signature(x = "SparkDataFrame"),
function(x, numPartitions = NULL, col = NULL, ...) {
if (!is.null(numPartitions) && !is.null(col)) {
# number of partitions and columns both are specified
if (is.numeric(numPartitions) && class(col) == "Column") {
cols <- list(col, ...)
jcol <- lapply(cols, function(c) { c@jc })
sdf <- callJMethod(x@sdf, "repartitionByRange", numToInt(numPartitions), jcol)
} else {
stop(paste("numPartitions and col must be numeric and Column; however, got",
class(numPartitions), "and", class(col)))
}
} else if (!is.null(col)) {
# only columns are specified
if (class(col) == "Column") {
cols <- list(col, ...)
jcol <- lapply(cols, function(c) { c@jc })
sdf <- callJMethod(x@sdf, "repartitionByRange", jcol)
} else {
stop(paste("col must be Column; however, got", class(col)))
}
} else if (!is.null(numPartitions)) {
# only numPartitions is specified
stop("At least one partition-by column must be specified.")
} else {
stop("Please, specify a column(s) or the number of partitions with a column(s)")
}
dataFrame(sdf)
})
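
# A usage sketch for the dispatch above (assumes an active SparkR session and
# df <- createDataFrame(mtcars)); two supported call shapes, plus the rejected one:
#   repartitionByRange(df, 3L, df$wt)    # explicit numPartitions plus column(s)
#   repartitionByRange(df, col = df$wt)  # column(s) only; the partition count
#                                        # comes from spark.sql.shuffle.partitions
#   repartitionByRange(df, 3)            # error: at least one partition-by
#                                        # column must be specified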

#' toJSON
#'
#' Converts a SparkDataFrame into a SparkDataFrame of JSON string.
3 changes: 3 additions & 0 deletions R/pkg/R/generics.R
@@ -531,6 +531,9 @@ setGeneric("rename", function(x, ...) { standardGeneric("rename") })
#' @rdname repartition
setGeneric("repartition", function(x, ...) { standardGeneric("repartition") })

#' @rdname repartitionByRange
setGeneric("repartitionByRange", function(x, ...) { standardGeneric("repartitionByRange") })

#' @rdname sample
setGeneric("sample",
function(x, withReplacement = FALSE, fraction, seed) {
45 changes: 45 additions & 0 deletions R/pkg/tests/fulltests/test_sparkSQL.R
@@ -3104,6 +3104,51 @@ test_that("repartition by columns on DataFrame", {
})
})

test_that("repartitionByRange on a DataFrame", {
# The tasks here launch R workers with shuffles, so we decrease the number of shuffle
# partitions to reduce the number of tasks and speed up the test. This is particularly
# slow on Windows because R workers cannot be forked there. See also SPARK-21693.
conf <- callJMethod(sparkSession, "conf")
shufflepartitionsvalue <- callJMethod(conf, "get", "spark.sql.shuffle.partitions")
callJMethod(conf, "set", "spark.sql.shuffle.partitions", "5")
tryCatch({
df <- createDataFrame(mtcars)
expect_error(repartitionByRange(df, "haha", df$mpg),
"numPartitions and col must be numeric and Column.*")
expect_error(repartitionByRange(df),
".*specify a column.*or the number of partitions with a column.*")
expect_error(repartitionByRange(df, col = "haha"),
"col must be Column; however, got.*")
expect_error(repartitionByRange(df, 3),
"At least one partition-by column must be specified.")

# The order of rows should differ from that of a normal repartition.
actual <- repartitionByRange(df, 3, df$mpg)
expect_equal(getNumPartitions(actual), 3)
expect_false(identical(collect(actual), collect(repartition(df, 3, df$mpg))))

actual <- repartitionByRange(df, col = df$mpg)
expect_false(identical(collect(actual), collect(repartition(df, col = df$mpg))))

# They should contain the same data.
actual <- collect(repartitionByRange(df, 3, df$mpg))
actual <- actual[order(actual$mpg), ]
expected <- collect(repartition(df, 3, df$mpg))
expected <- expected[order(expected$mpg), ]
expect_true(all(actual == expected))

actual <- collect(repartitionByRange(df, col = df$mpg))
actual <- actual[order(actual$mpg), ]
expected <- collect(repartition(df, col = df$mpg))
expected <- expected[order(expected$mpg), ]
expect_true(all(actual == expected))
},
finally = {
# Reset the conf back to its original value
callJMethod(conf, "set", "spark.sql.shuffle.partitions", shufflepartitionsvalue)
})
})

test_that("coalesce, repartition, numPartitions", {
df <- as.DataFrame(cars, numPartitions = 5)
expect_equal(getNumPartitions(df), 5)
