diff --git a/pkg/NAMESPACE b/pkg/NAMESPACE
index 812e85238e9c6..b629b01ab2140 100644
--- a/pkg/NAMESPACE
+++ b/pkg/NAMESPACE
@@ -49,7 +49,9 @@ exportMethods(
               "sortBy",
               "sortByKey",
               "take",
+              "takeOrdered",
               "takeSample",
+              "top",
               "unionRDD",
               "unpersist",
               "value",
diff --git a/pkg/R/RDD.R b/pkg/R/RDD.R
index 3f6d96251365b..3ffe017e15182 100644
--- a/pkg/R/RDD.R
+++ b/pkg/R/RDD.R
@@ -1294,6 +1294,91 @@ setMethod("sortBy",
             values(sortByKey(keyBy(rdd, func), ascending, numPartitions))
           })
 
+# Helper function to get first N elements from an RDD in the specified order.
+# Param:
+#   rdd An RDD.
+#   num Number of elements to return.
+#   ascending A flag to indicate whether the sorting is ascending or descending.
+# Return:
+#   A list of the first N elements from the RDD in the specified order. If the
+#   RDD holds fewer than N elements, all of them are returned in order.
+#
+takeOrderedElem <- function(rdd, num, ascending = TRUE) {
+  if (num <= 0L) {
+    return(list())
+  }
+
+  # Sorts a list of elements and keeps at most the first `num` of them.
+  firstN <- function(elems) {
+    # R limitation: order works only on primitive types!
+    ord <- order(unlist(elems, recursive = FALSE), decreasing = !ascending)
+    # Cap the count at the number of available elements: ord[1:num] would pad
+    # the index with NA and the result with NULL entries when num is larger.
+    elems[ord[seq_len(min(num, length(ord)))]]
+  }
+
+  # Returns the sorted first N elements of a partition, wrapped in a list so
+  # that they are handed to reduceFunc as one value. Short partitions are
+  # sorted as well, so the result is ordered even if reduceFunc never runs.
+  partitionFunc <- function(part) {
+    list(firstN(part))
+  }
+
+  # Merges two partial results and keeps the first N elements of the union.
+  reduceFunc <- function(elems, part) {
+    firstN(append(elems, part))
+  }
+
+  newRdd <- mapPartitions(rdd, partitionFunc)
+  reduce(newRdd, reduceFunc)
+}
+
+#' Returns the first N elements from an RDD in ascending order.
+#'
+#' @param rdd An RDD.
+#' @param num Number of elements to return.
+#' @return The first N elements from the RDD in ascending order.
+#' @rdname takeOrdered
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7))
+#' takeOrdered(rdd, 6L) # list(1, 2, 3, 4, 5, 6)
+#'}
+setGeneric("takeOrdered", function(rdd, num) { standardGeneric("takeOrdered") })
+
+#' @rdname takeOrdered
+#' @aliases takeOrdered,RDD,integer-method
+setMethod("takeOrdered",
+          signature(rdd = "RDD", num = "integer"),
+          function(rdd, num) {
+            takeOrderedElem(rdd, num)
+          })
+
+#' Returns the top N elements from an RDD.
+#'
+#' @param rdd An RDD.
+#' @param num Number of elements to return.
+#' @return The top N elements from the RDD.
+#' @rdname top
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7))
+#' top(rdd, 6L) # list(10, 9, 7, 6, 5, 4)
+#'}
+setGeneric("top", function(rdd, num) { standardGeneric("top") })
+
+#' @rdname top
+#' @aliases top,RDD,integer-method
+setMethod("top",
+          signature(rdd = "RDD", num = "integer"),
+          function(rdd, num) {
+            takeOrderedElem(rdd, num, FALSE)
+          })
+
 ############ Shuffle Functions ############
 
 #' Partition an RDD by key
diff --git a/pkg/inst/tests/test_rdd.R b/pkg/inst/tests/test_rdd.R
index 2f48db61020fd..fa6c112ba7a51 100644
--- a/pkg/inst/tests/test_rdd.R
+++ b/pkg/inst/tests/test_rdd.R
@@ -278,6 +278,30 @@ test_that("sortBy() on RDDs", {
   expect_equal(actual, as.list(nums))
 })
 
+test_that("takeOrdered() on RDDs", {
+  l <- list(10, 1, 2, 9, 3, 4, 5, 6, 7)
+  rdd <- parallelize(sc, l)
+  actual <- takeOrdered(rdd, 6L)
+  expect_equal(actual, as.list(sort(unlist(l)))[1:6])
+
+  l <- list("e", "d", "c", "d", "a")
+  rdd <- parallelize(sc, l)
+  actual <- takeOrdered(rdd, 3L)
+  expect_equal(actual, as.list(sort(unlist(l)))[1:3])
+})
+
+test_that("top() on RDDs", {
+  l <- list(10, 1, 2, 9, 3, 4, 5, 6, 7)
+  rdd <- parallelize(sc, l)
+  actual <- top(rdd, 6L)
+  expect_equal(actual, as.list(sort(unlist(l), decreasing = TRUE))[1:6])
+
+  l <- list("e", "d", "c", "d", "a")
+  rdd <- parallelize(sc, l)
+  actual <- top(rdd, 3L)
+  expect_equal(actual, as.list(sort(unlist(l), decreasing = TRUE))[1:3])
+})
+
 test_that("keys() on RDDs", {
   keys <- keys(intRdd)
   actual <- collect(keys)
diff --git a/pkg/man/takeOrdered.Rd b/pkg/man/takeOrdered.Rd
new file mode 100644
index 0000000000000..9ae2137abed21
--- /dev/null
+++ b/pkg/man/takeOrdered.Rd
@@ -0,0 +1,30 @@
+% Generated by roxygen2 (4.0.2): do not edit by hand
+\docType{methods}
+\name{takeOrdered}
+\alias{takeOrdered}
+\alias{takeOrdered,RDD,integer-method}
+\title{Returns the first N elements from an RDD in ascending order.}
+\usage{
+takeOrdered(rdd, num)
+
+\S4method{takeOrdered}{RDD,integer}(rdd, num)
+}
+\arguments{
+\item{rdd}{An RDD.}
+
+\item{num}{Number of elements to return.}
+}
+\value{
+The first N elements from the RDD in ascending order.
+}
+\description{
+Returns the first N elements from an RDD in ascending order.
+}
+\examples{
+\dontrun{
+sc <- sparkR.init()
+rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7))
+takeOrdered(rdd, 6L) # list(1, 2, 3, 4, 5, 6)
+}
+}
+
diff --git a/pkg/man/top.Rd b/pkg/man/top.Rd
new file mode 100644
index 0000000000000..627a43fd4ff71
--- /dev/null
+++ b/pkg/man/top.Rd
@@ -0,0 +1,30 @@
+% Generated by roxygen2 (4.0.2): do not edit by hand
+\docType{methods}
+\name{top}
+\alias{top}
+\alias{top,RDD,integer-method}
+\title{Returns the top N elements from an RDD.}
+\usage{
+top(rdd, num)
+
+\S4method{top}{RDD,integer}(rdd, num)
+}
+\arguments{
+\item{rdd}{An RDD.}
+
+\item{num}{Number of elements to return.}
+}
+\value{
+The top N elements from the RDD.
+}
+\description{
+Returns the top N elements from an RDD.
+}
+\examples{
+\dontrun{
+sc <- sparkR.init()
+rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7))
+top(rdd, 6L) # list(10, 9, 7, 6, 5, 4)
+}
+}
+