[SPARK-7033][SPARKR] Clean usage of split. Use partition instead where applicable. #5628

Closed · wants to merge 2 commits
36 changes: 18 additions & 18 deletions R/pkg/R/RDD.R
@@ -91,8 +91,8 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)
 # NOTE: We use prev_serializedMode to track the serialization mode of prev_JRDD
 # prev_serializedMode is used during the delayed computation of JRDD in getJRDD
 } else {
-  pipelinedFunc <- function(split, iterator) {
-    func(split, prev@func(split, iterator))
+  pipelinedFunc <- function(partIndex, part) {
+    func(partIndex, prev@func(partIndex, part))
 }
 .Object@func <- cleanClosure(pipelinedFunc)
 .Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline
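For reviewers, a minimal plain-R sketch of what the pipelined closure does (the helper names prevFunc, thisFunc and pipelined are hypothetical, not SparkR API): the same partition index is threaded through both stages and their outputs are chained.

# Stand-ins for prev@func and func; both take (partIndex, part).
prevFunc <- function(partIndex, part) lapply(part, function(x) x * 2)
thisFunc <- function(partIndex, part) lapply(part, function(x) x + partIndex)

# Mirrors pipelinedFunc above: same index, chained partition contents.
pipelined <- function(partIndex, part) thisFunc(partIndex, prevFunc(partIndex, part))
pipelined(1, list(10, 20))  # list(21, 41)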
@@ -306,7 +306,7 @@ setMethod("numPartitions",
 signature(x = "RDD"),
 function(x) {
   jrdd <- getJRDD(x)
-  partitions <- callJMethod(jrdd, "splits")
+  partitions <- callJMethod(jrdd, "partitions")
   callJMethod(partitions, "size")
 })
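A quick usage sketch, assuming a running SparkR context as in the package examples: numPartitions should simply report the partition count the RDD was created with.

sc <- sparkR.init()
rdd <- parallelize(sc, 1:100, 4L)
numPartitions(rdd)  # 4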

@@ -452,8 +452,8 @@ setMethod("countByValue",
 setMethod("lapply",
           signature(X = "RDD", FUN = "function"),
           function(X, FUN) {
-            func <- function(split, iterator) {
-              lapply(iterator, FUN)
+            func <- function(partIndex, part) {
+              lapply(part, FUN)
             }
             lapplyPartitionsWithIndex(X, func)
           })
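Usage sketch (same assumptions as above): the element-wise lapply simply ignores the partition index that the wrapped per-partition function receives.

sc <- sparkR.init()
rdd <- parallelize(sc, 1:4, 2L)
collect(lapply(rdd, function(x) x * 10))  # list(10, 20, 30, 40)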
@@ -538,8 +538,8 @@ setMethod("mapPartitions",
 #'\dontrun{
 #' sc <- sparkR.init()
 #' rdd <- parallelize(sc, 1:10, 5L)
-#' prod <- lapplyPartitionsWithIndex(rdd, function(split, part) {
-#'                  split * Reduce("+", part) })
+#' prod <- lapplyPartitionsWithIndex(rdd, function(partIndex, part) {
+#'                  partIndex * Reduce("+", part) })
 #' collect(prod, flatten = FALSE) # 0, 7, 22, 45, 76
 #'}
 #' @rdname lapplyPartitionsWithIndex
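Sanity check for the example above: 1:10 across 5 partitions gives {1,2}, {3,4}, {5,6}, {7,8}, {9,10} with indices 0 through 4, so the per-partition products are 0*3 = 0, 1*7 = 7, 2*11 = 22, 3*15 = 45 and 4*19 = 76, matching the documented output.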
@@ -813,7 +813,7 @@ setMethod("distinct",
 #' @examples
 #'\dontrun{
 #' sc <- sparkR.init()
-#' rdd <- parallelize(sc, 1:10) # ensure each num is in its own split
+#' rdd <- parallelize(sc, 1:10)
 #' collect(sampleRDD(rdd, FALSE, 0.5, 1618L)) # ~5 distinct elements
 #' collect(sampleRDD(rdd, TRUE, 0.5, 9L)) # ~5 elements possibly with duplicates
 #'}
@@ -825,14 +825,14 @@ setMethod("sampleRDD",
 function(x, withReplacement, fraction, seed) {

   # The sampler: takes a partition and returns its sampled version.
-  samplingFunc <- function(split, part) {
+  samplingFunc <- function(partIndex, part) {
     set.seed(seed)
     res <- vector("list", length(part))
     len <- 0

     # Discards some random values to ensure each partition has a
     # different random seed.
-    runif(split)
+    runif(partIndex)

     for (elem in part) {
       if (withReplacement) {
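The seeding trick is easiest to see outside SparkR. A simplified, self-contained sketch (sampleOnePartition is a hypothetical helper that uses plain Bernoulli sampling, not the exact loop above): every partition starts from the same seed, but discarding partIndex draws de-correlates the partitions while keeping results reproducible.

sampleOnePartition <- function(partIndex, part, fraction = 0.5, seed = 1618L) {
  set.seed(seed)
  runif(partIndex)                      # burn partIndex values from the RNG stream
  part[runif(length(part)) < fraction]  # keep each element with probability `fraction`
}
sampleOnePartition(0, as.list(1:5))
sampleOnePartition(1, as.list(6:10))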
@@ -989,8 +989,8 @@ setMethod("coalesce",
 function(x, numPartitions, shuffle = FALSE) {
   numPartitions <- numToInt(numPartitions)
   if (shuffle || numPartitions > SparkR::numPartitions(x)) {
-    func <- function(s, part) {
-      set.seed(s) # split as seed
+    func <- function(partIndex, part) {
+      set.seed(partIndex) # partIndex as seed
       start <- as.integer(sample(numPartitions, 1) - 1)
       lapply(seq_along(part),
              function(i) {
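Usage sketch (assuming a running context): shrinking the partition count needs no shuffle, while growing it requires shuffle = TRUE, in which case each source partition scatters its elements starting at a random offset seeded by its own partIndex.

sc <- sparkR.init()
rdd <- parallelize(sc, 1:100, 10L)
numPartitions(coalesce(rdd, 2L))                   # 2, no shuffle needed
numPartitions(coalesce(rdd, 20L, shuffle = TRUE))  # 20, data is redistributed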
@@ -1035,7 +1035,7 @@ setMethod("saveAsObjectFile",
 #' Save this RDD as a text file, using string representations of elements.
 #'
 #' @param x The RDD to save
-#' @param path The directory where the splits of the text file are saved
+#' @param path The directory where the partitions of the text file are saved
 #' @examples
 #'\dontrun{
 #' sc <- sparkR.init()
@@ -1335,10 +1335,10 @@ setMethod("zipWithUniqueId",
 function(x) {
   n <- numPartitions(x)

-  partitionFunc <- function(split, part) {
+  partitionFunc <- function(partIndex, part) {
     mapply(
       function(item, index) {
-        list(item, (index - 1) * n + split)
+        list(item, (index - 1) * n + partIndex)
       },
       part,
       seq_along(part),
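A worked example of the id formula (assuming parallelize splits the four elements evenly over the two partitions): with n = 2 partitions, the j-th element (0-based) of partition p gets id j * n + p.

sc <- sparkR.init()
rdd <- parallelize(sc, list("a", "b", "c", "d"), 2L)
collect(zipWithUniqueId(rdd))
# list(list("a", 0), list("b", 2), list("c", 1), list("d", 3))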
@@ -1382,11 +1382,11 @@ setMethod("zipWithIndex",
   startIndices <- Reduce("+", nums, accumulate = TRUE)
 }

-partitionFunc <- function(split, part) {
-  if (split == 0) {
+partitionFunc <- function(partIndex, part) {
+  if (partIndex == 0) {
     startIndex <- 0
   } else {
-    startIndex <- startIndices[[split]]
+    startIndex <- startIndices[[partIndex]]
   }

   mapply(
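Worked example (same assumptions as above): startIndices holds the cumulative sizes of the preceding partitions, so with partitions {"a", "b"} and {"c", "d"} the global indices come out consecutive across partitions.

sc <- sparkR.init()
rdd <- parallelize(sc, list("a", "b", "c", "d"), 2L)
collect(zipWithIndex(rdd))
# list(list("a", 0), list("b", 1), list("c", 2), list("d", 3))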
20 changes: 10 additions & 10 deletions R/pkg/R/context.R
@@ -17,12 +17,12 @@

 # context.R: SparkContext driven functions

-getMinSplits <- function(sc, minSplits) {
-  if (is.null(minSplits)) {
+getMinPartitions <- function(sc, minPartitions) {
+  if (is.null(minPartitions)) {
     defaultParallelism <- callJMethod(sc, "defaultParallelism")
-    minSplits <- min(defaultParallelism, 2)
+    minPartitions <- min(defaultParallelism, 2)
   }
-  as.integer(minSplits)
+  as.integer(minPartitions)
 }

 #' Create an RDD from a text file.
@@ -33,7 +33,7 @@ getMinSplits <- function(sc, minSplits) {
 #'
 #' @param sc SparkContext to use
 #' @param path Path of file to read. A vector of multiple paths is allowed.
-#' @param minSplits Minimum number of splits to be created. If NULL, the default
+#' @param minPartitions Minimum number of partitions to be created. If NULL, the default
 #' value is chosen based on available parallelism.
 #' @return RDD where each item is of type \code{character}
 #' @export
@@ -42,13 +42,13 @@ getMinSplits <- function(sc, minSplits) {
 #' sc <- sparkR.init()
 #' lines <- textFile(sc, "myfile.txt")
 #'}
-textFile <- function(sc, path, minSplits = NULL) {
+textFile <- function(sc, path, minPartitions = NULL) {
   # Allow the user to have a more flexible definiton of the text file path
   path <- suppressWarnings(normalizePath(path))
   #' Convert a string vector of paths to a string containing comma separated paths
   path <- paste(path, collapse = ",")

-  jrdd <- callJMethod(sc, "textFile", path, getMinSplits(sc, minSplits))
+  jrdd <- callJMethod(sc, "textFile", path, getMinPartitions(sc, minPartitions))
   # jrdd is of type JavaRDD[String]
   RDD(jrdd, "string")
 }
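Usage sketch with the renamed argument (the file path is hypothetical): Hadoop may still create more than the requested minimum, hence "minPartitions" rather than an exact count.

sc <- sparkR.init()
lines <- textFile(sc, "myfile.txt", minPartitions = 4L)
numPartitions(lines)  # at least 4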
@@ -60,7 +60,7 @@ textFile <- function(sc, path, minSplits = NULL) {
 #'
 #' @param sc SparkContext to use
 #' @param path Path of file to read. A vector of multiple paths is allowed.
-#' @param minSplits Minimum number of splits to be created. If NULL, the default
+#' @param minPartitions Minimum number of partitions to be created. If NULL, the default
 #' value is chosen based on available parallelism.
 #' @return RDD containing serialized R objects.
 #' @seealso saveAsObjectFile
@@ -70,13 +70,13 @@ textFile <- function(sc, path, minSplits = NULL) {
 #' sc <- sparkR.init()
 #' rdd <- objectFile(sc, "myfile")
 #'}
-objectFile <- function(sc, path, minSplits = NULL) {
+objectFile <- function(sc, path, minPartitions = NULL) {
   # Allow the user to have a more flexible definiton of the text file path
   path <- suppressWarnings(normalizePath(path))
   #' Convert a string vector of paths to a string containing comma separated paths
   path <- paste(path, collapse = ",")

-  jrdd <- callJMethod(sc, "objectFile", path, getMinSplits(sc, minSplits))
+  jrdd <- callJMethod(sc, "objectFile", path, getMinPartitions(sc, minPartitions))
   # Assume the RDD contains serialized R objects.
   RDD(jrdd, "byte")
 }
8 changes: 4 additions & 4 deletions R/pkg/R/pairRDD.R
@@ -206,8 +206,8 @@ setMethod("partitionBy",
   get(name, .broadcastNames) })
 jrdd <- getJRDD(x)

-# We create a PairwiseRRDD that extends RDD[(Array[Byte],
-# Array[Byte])], where the key is the hashed split, the value is
+# We create a PairwiseRRDD that extends RDD[(Int, Array[Byte])],
+# where the key is the target partition number, the value is
 # the content (key-val pairs).
 pairwiseRRDD <- newJObject("org.apache.spark.api.r.PairwiseRRDD",
                            callJMethod(jrdd, "rdd"),
@@ -866,8 +866,8 @@ setMethod("sampleByKey",
 }

 # The sampler: takes a partition and returns its sampled version.
-samplingFunc <- function(split, part) {
-  set.seed(bitwXor(seed, split))
+samplingFunc <- function(partIndex, part) {
+  set.seed(bitwXor(seed, partIndex))
   res <- vector("list", length(part))
   len <- 0

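A minimal sketch of just the seed derivation (plain base R, no SparkR needed): XOR-ing the user-supplied seed with the partition index gives each partition a distinct but reproducible random stream.

seed <- 1618L
for (partIndex in 0:2) {
  set.seed(bitwXor(seed, partIndex))  # distinct seed per partition
  print(runif(1))
}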
2 changes: 1 addition & 1 deletion R/pkg/R/utils.R
@@ -501,7 +501,7 @@ appendPartitionLengths <- function(x, other) {
 # A result RDD.
 mergePartitions <- function(rdd, zip) {
   serializerMode <- getSerializedMode(rdd)
-  partitionFunc <- function(split, part) {
+  partitionFunc <- function(partIndex, part) {
     len <- length(part)
     if (len > 0) {
       if (serializerMode == "byte") {
12 changes: 6 additions & 6 deletions R/pkg/inst/tests/test_rdd.R
@@ -105,8 +105,8 @@ test_that("several transformations on RDD (a benchmark on PipelinedRDD)", {
   rdd2 <- rdd
   for (i in 1:12)
     rdd2 <- lapplyPartitionsWithIndex(
-              rdd2, function(split, part) {
-                part <- as.list(unlist(part) * split + i)
+              rdd2, function(partIndex, part) {
+                part <- as.list(unlist(part) * partIndex + i)
               })
   rdd2 <- lapply(rdd2, function(x) x + x)
   actual <- collect(rdd2)
@@ -121,8 +121,8 @@ test_that("PipelinedRDD support actions: cache(), persist(), unpersist(), checkp
   # PipelinedRDD
   rdd2 <- lapplyPartitionsWithIndex(
             rdd2,
-            function(split, part) {
-              part <- as.list(unlist(part) * split)
+            function(partIndex, part) {
+              part <- as.list(unlist(part) * partIndex)
             })

   cache(rdd2)
@@ -174,13 +174,13 @@ test_that("lapply with dependency", {
 })

 test_that("lapplyPartitionsWithIndex on RDDs", {
-  func <- function(splitIndex, part) { list(splitIndex, Reduce("+", part)) }
+  func <- function(partIndex, part) { list(partIndex, Reduce("+", part)) }
   actual <- collect(lapplyPartitionsWithIndex(rdd, func), flatten = FALSE)
   expect_equal(actual, list(list(0, 15), list(1, 40)))

   pairsRDD <- parallelize(sc, list(list(1, 2), list(3, 4), list(4, 8)), 1L)
   partitionByParity <- function(key) { if (key %% 2 == 1) 0 else 1 }
-  mkTup <- function(splitIndex, part) { list(splitIndex, part) }
+  mkTup <- function(partIndex, part) { list(partIndex, part) }
   actual <- collect(lapplyPartitionsWithIndex(
     partitionBy(pairsRDD, 2L, partitionByParity),
     mkTup),