Skip to content

Commit

Permalink
Merge pull request apache#159 from sun-rui/SPARKR-150_2
Browse files Browse the repository at this point in the history
[SPARKR-150] phase 2: implement takeOrdered() and top().
  • Loading branch information
shivaram committed Feb 11, 2015
2 parents bd6705b + f4573c1 commit 7972858
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 0 deletions.
2 changes: 2 additions & 0 deletions pkg/NAMESPACE
Expand Up @@ -49,7 +49,9 @@ exportMethods(
"sortBy",
"sortByKey",
"take",
"takeOrdered",
"takeSample",
"top",
"unionRDD",
"unpersist",
"value",
Expand Down
80 changes: 80 additions & 0 deletions pkg/R/RDD.R
Expand Up @@ -1294,6 +1294,86 @@ setMethod("sortBy",
values(sortByKey(keyBy(rdd, func), ascending, numPartitions))
})

# Helper function to get first N elements from an RDD in the specified order.
# Param:
#   rdd An RDD.
#   num Number of elements to return.
#   ascending A flag to indicate whether the sorting is ascending or descending.
# Return:
#   A list of the first N elements from the RDD in the specified order. When
#   the RDD holds fewer than N elements, all of them are returned (ordered);
#   the result is never padded with NULL entries.
#
takeOrderedElem <- function(rdd, num, ascending = TRUE) {
  if (num <= 0L) {
    return(list())
  }

  # Per-partition step: keep at most `num` leading elements of the partition.
  partitionFunc <- function(part) {
    # order(NULL) raises "argument 1 is not a vector", so an empty partition
    # is passed through untouched.
    if (length(part) == 0L) {
      return(list(part))
    }
    # Sort even when the partition has no more than `num` elements, so the
    # output is ordered when the combine step below is never applied.
    # NOTE(review): a single-partition RDD presumably skips reduceFunc; this
    # depends on reduce() semantics, which are not visible here.
    # R limitation: order works only on primitive types!
    ord <- order(unlist(part, recursive = FALSE), decreasing = !ascending)
    # seq_len(min(...)) never indexes past the end, unlike 1:num.
    list(part[ord[seq_len(min(num, length(ord)))]])
  }

  # Combine step: merge two partial results and keep the leading `num`.
  reduceFunc <- function(elems, part) {
    newElems <- append(elems, part)
    # Both inputs may be empty; order() cannot handle the resulting NULL.
    if (length(newElems) == 0L) {
      return(newElems)
    }
    # R limitation: order works only on primitive types!
    ord <- order(unlist(newElems, recursive = FALSE), decreasing = !ascending)
    # Indexing with 1:num would yield NA indices (and NULL list entries)
    # whenever fewer than `num` elements are available.
    newElems[ord[seq_len(min(num, length(ord)))]]
  }

  newRdd <- mapPartitions(rdd, partitionFunc)
  reduce(newRdd, reduceFunc)
}

#' Returns the first N elements from an RDD in ascending order.
#'
#' The RDD method is registered for the signature (RDD, integer), so
#' \code{num} has to be supplied as an integer value such as \code{6L}.
#' Ordering is done with \code{order()} on the unlisted elements, which
#' works only on primitive types.
#'
#' @param rdd An RDD.
#' @param num Number of elements to return, given as an integer
#'   (e.g. \code{6L}).
#' @return A list of the first N elements from the RDD in ascending order.
#' @rdname takeOrdered
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7))
#' takeOrdered(rdd, 6L) # list(1, 2, 3, 4, 5, 6)
#'}
setGeneric("takeOrdered", function(rdd, num) { standardGeneric("takeOrdered") })

#' @rdname takeOrdered
#' @aliases takeOrdered,RDD,integer-method
setMethod(
  f = "takeOrdered",
  signature = signature(rdd = "RDD", num = "integer"),
  definition = function(rdd, num) {
    # Ascending is the helper's default; it is spelled out here for clarity.
    takeOrderedElem(rdd, num, ascending = TRUE)
  }
)

#' Returns the top N elements from an RDD.
#'
#' "Top" means the largest elements, returned in descending order. The RDD
#' method is registered for the signature (RDD, integer), so \code{num} has
#' to be supplied as an integer value such as \code{6L}. Ordering is done
#' with \code{order()} on the unlisted elements, which works only on
#' primitive types.
#'
#' @param rdd An RDD.
#' @param num Number of elements to return, given as an integer
#'   (e.g. \code{6L}).
#' @return A list of the top N elements from the RDD, in descending order.
#' @rdname top
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7))
#' top(rdd, 6L) # list(10, 9, 7, 6, 5, 4)
#'}
setGeneric("top", function(rdd, num) { standardGeneric("top") })

#' @rdname top
#' @aliases top,RDD,integer-method
setMethod(
  f = "top",
  signature = signature(rdd = "RDD", num = "integer"),
  definition = function(rdd, num) {
    # Descending order: the largest elements come first.
    takeOrderedElem(rdd, num, ascending = FALSE)
  }
)

############ Shuffle Functions ############

#' Partition an RDD by key
Expand Down
24 changes: 24 additions & 0 deletions pkg/inst/tests/test_rdd.R
Expand Up @@ -278,6 +278,30 @@ test_that("sortBy() on RDDs", {
expect_equal(actual, as.list(nums))
})

test_that("takeOrdered() on RDDs", {
  # Numeric elements: the six smallest values, ascending.
  nums <- list(10, 1, 2, 9, 3, 4, 5, 6, 7)
  numRdd <- parallelize(sc, nums)
  expectedNums <- as.list(sort(unlist(nums)))[1:6]
  expect_equal(takeOrdered(numRdd, 6L), expectedNums)

  # Character elements (with a duplicate): the three smallest, ascending.
  strs <- list("e", "d", "c", "d", "a")
  strRdd <- parallelize(sc, strs)
  expectedStrs <- as.list(sort(unlist(strs)))[1:3]
  expect_equal(takeOrdered(strRdd, 3L), expectedStrs)
})

test_that("top() on RDDs", {
  # Numeric elements: the six largest values, descending.
  nums <- list(10, 1, 2, 9, 3, 4, 5, 6, 7)
  numRdd <- parallelize(sc, nums)
  expectedNums <- as.list(sort(unlist(nums), decreasing = TRUE))[1:6]
  expect_equal(top(numRdd, 6L), expectedNums)

  # Character elements (with a duplicate): the three largest, descending.
  strs <- list("e", "d", "c", "d", "a")
  strRdd <- parallelize(sc, strs)
  expectedStrs <- as.list(sort(unlist(strs), decreasing = TRUE))[1:3]
  expect_equal(top(strRdd, 3L), expectedStrs)
})

test_that("keys() on RDDs", {
keys <- keys(intRdd)
actual <- collect(keys)
Expand Down
31 changes: 31 additions & 0 deletions pkg/man/takeOrdered.Rd
@@ -0,0 +1,31 @@
% Generated by roxygen2 (4.0.2): do not edit by hand
\docType{methods}
\name{takeOrdered}
\alias{takeOrdered}
\alias{takeOrdered,RDD,RDD-method}
\alias{takeOrdered,RDD,integer-method}
\title{Returns the first N elements from an RDD in ascending order.}
\usage{
takeOrdered(rdd, num)

\S4method{takeOrdered}{RDD,integer}(rdd, num)
}
\arguments{
\item{rdd}{An RDD.}

\item{num}{Number of elements to return.}
}
\value{
The first N elements from the RDD in ascending order.
}
\description{
Returns the first N elements from an RDD in ascending order.
}
\examples{
\dontrun{
sc <- sparkR.init()
rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7))
takeOrdered(rdd, 6L) # list(1, 2, 3, 4, 5, 6)
}
}

31 changes: 31 additions & 0 deletions pkg/man/top.Rd
@@ -0,0 +1,31 @@
% Generated by roxygen2 (4.0.2): do not edit by hand
\docType{methods}
\name{top}
\alias{top}
\alias{top,RDD,RDD-method}
\alias{top,RDD,integer-method}
\title{Returns the top N elements from an RDD.}
\usage{
top(rdd, num)

\S4method{top}{RDD,integer}(rdd, num)
}
\arguments{
\item{rdd}{An RDD.}

\item{num}{Number of elements to return.}
}
\value{
The top N elements from the RDD.
}
\description{
Returns the top N elements from an RDD.
}
\examples{
\dontrun{
sc <- sparkR.init()
rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7))
top(rdd, 6L) # list(10, 9, 7, 6, 5, 4)
}
}

0 comments on commit 7972858

Please sign in to comment.