Skip to content

Commit

Permalink
Merge pull request #12 from apache/master
Browse files Browse the repository at this point in the history
merge lastest spark
  • Loading branch information
pzzs committed Apr 29, 2015
2 parents f12fa50 + f98773a commit f03fe7f
Show file tree
Hide file tree
Showing 380 changed files with 13,450 additions and 3,668 deletions.
22 changes: 13 additions & 9 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
## Contributing to Spark

Contributions via GitHub pull requests are gladly accepted from their original
author. Along with any pull requests, please state that the contribution is
your original work and that you license the work to the project under the
project's open source license. Whether or not you state this explicitly, by
submitting any copyrighted material via pull request, email, or other means
you agree to license the material under the project's open source license and
warrant that you have the legal authority to do so.
*Before opening a pull request*, review the
[Contributing to Spark wiki](https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark).
It lists steps that are required before creating a PR. In particular, consider:

- Is the change important and ready enough to ask the community to spend time reviewing?
- Have you searched for existing, related JIRAs and pull requests?
- Is this a new feature that can stand alone as a package on http://spark-packages.org ?
- Is the change being proposed clearly explained and motivated?

Please see the [Contributing to Spark wiki page](https://cwiki.apache.org/SPARK/Contributing+to+Spark)
for more information.
When you contribute code, you affirm that the contribution is your original work and that you
license the work to the project under the project's open source license. Whether or not you
state this explicitly, by submitting any copyrighted material via pull request, email, or
other means you agree to license the material under the project's open source license and
warrant that you have the legal authority to do so.
1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ exportMethods(
"unpersist",
"value",
"values",
"zipPartitions",
"zipRDD",
"zipWithIndex",
"zipWithUniqueId"
Expand Down
8 changes: 7 additions & 1 deletion R/pkg/R/DataFrame.R
Original file line number Diff line number Diff line change
Expand Up @@ -790,9 +790,12 @@ setMethod("$", signature(x = "DataFrame"),

setMethod("$<-", signature(x = "DataFrame"),
function(x, name, value) {
stopifnot(class(value) == "Column")
stopifnot(class(value) == "Column" || is.null(value))
cols <- columns(x)
if (name %in% cols) {
if (is.null(value)) {
cols <- Filter(function(c) { c != name }, cols)
}
cols <- lapply(cols, function(c) {
if (c == name) {
alias(value, name)
Expand All @@ -802,6 +805,9 @@ setMethod("$<-", signature(x = "DataFrame"),
})
nx <- select(x, cols)
} else {
if (is.null(value)) {
return(x)
}
nx <- withColumn(x, name, value)
}
x@sdf <- nx@sdf
Expand Down
89 changes: 70 additions & 19 deletions R/pkg/R/RDD.R
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ setMethod("initialize", "RDD", function(.Object, jrdd, serializedMode,
.Object
})

setMethod("show", "RDD",
function(.Object) {
cat(paste(callJMethod(.Object@jrdd, "toString"), "\n", sep=""))
})

setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val) {
.Object@env <- new.env()
.Object@env$isCached <- FALSE
Expand All @@ -91,8 +96,8 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)
# NOTE: We use prev_serializedMode to track the serialization mode of prev_JRDD
# prev_serializedMode is used during the delayed computation of JRDD in getJRDD
} else {
pipelinedFunc <- function(split, iterator) {
func(split, prev@func(split, iterator))
pipelinedFunc <- function(partIndex, part) {
func(partIndex, prev@func(partIndex, part))
}
.Object@func <- cleanClosure(pipelinedFunc)
.Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline
Expand Down Expand Up @@ -306,7 +311,7 @@ setMethod("numPartitions",
signature(x = "RDD"),
function(x) {
jrdd <- getJRDD(x)
partitions <- callJMethod(jrdd, "splits")
partitions <- callJMethod(jrdd, "partitions")
callJMethod(partitions, "size")
})

Expand Down Expand Up @@ -452,8 +457,8 @@ setMethod("countByValue",
setMethod("lapply",
signature(X = "RDD", FUN = "function"),
function(X, FUN) {
func <- function(split, iterator) {
lapply(iterator, FUN)
func <- function(partIndex, part) {
lapply(part, FUN)
}
lapplyPartitionsWithIndex(X, func)
})
Expand Down Expand Up @@ -538,8 +543,8 @@ setMethod("mapPartitions",
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10, 5L)
#' prod <- lapplyPartitionsWithIndex(rdd, function(split, part) {
#' split * Reduce("+", part) })
#' prod <- lapplyPartitionsWithIndex(rdd, function(partIndex, part) {
#' partIndex * Reduce("+", part) })
#' collect(prod, flatten = FALSE) # 0, 7, 22, 45, 76
#'}
#' @rdname lapplyPartitionsWithIndex
Expand Down Expand Up @@ -813,7 +818,7 @@ setMethod("distinct",
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10) # ensure each num is in its own split
#' rdd <- parallelize(sc, 1:10)
#' collect(sampleRDD(rdd, FALSE, 0.5, 1618L)) # ~5 distinct elements
#' collect(sampleRDD(rdd, TRUE, 0.5, 9L)) # ~5 elements possibly with duplicates
#'}
Expand All @@ -825,14 +830,14 @@ setMethod("sampleRDD",
function(x, withReplacement, fraction, seed) {

# The sampler: takes a partition and returns its sampled version.
samplingFunc <- function(split, part) {
samplingFunc <- function(partIndex, part) {
set.seed(seed)
res <- vector("list", length(part))
len <- 0

# Discards some random values to ensure each partition has a
# different random seed.
runif(split)
runif(partIndex)

for (elem in part) {
if (withReplacement) {
Expand Down Expand Up @@ -967,7 +972,7 @@ setMethod("keyBy",
setMethod("repartition",
signature(x = "RDD", numPartitions = "numeric"),
function(x, numPartitions) {
coalesce(x, numToInt(numPartitions), TRUE)
coalesce(x, numPartitions, TRUE)
})

#' Return a new RDD that is reduced into numPartitions partitions.
Expand All @@ -989,8 +994,8 @@ setMethod("coalesce",
function(x, numPartitions, shuffle = FALSE) {
numPartitions <- numToInt(numPartitions)
if (shuffle || numPartitions > SparkR::numPartitions(x)) {
func <- function(s, part) {
set.seed(s) # split as seed
func <- function(partIndex, part) {
set.seed(partIndex) # partIndex as seed
start <- as.integer(sample(numPartitions, 1) - 1)
lapply(seq_along(part),
function(i) {
Expand Down Expand Up @@ -1035,7 +1040,7 @@ setMethod("saveAsObjectFile",
#' Save this RDD as a text file, using string representations of elements.
#'
#' @param x The RDD to save
#' @param path The directory where the splits of the text file are saved
#' @param path The directory where the partitions of the text file are saved
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
Expand Down Expand Up @@ -1335,10 +1340,10 @@ setMethod("zipWithUniqueId",
function(x) {
n <- numPartitions(x)

partitionFunc <- function(split, part) {
partitionFunc <- function(partIndex, part) {
mapply(
function(item, index) {
list(item, (index - 1) * n + split)
list(item, (index - 1) * n + partIndex)
},
part,
seq_along(part),
Expand Down Expand Up @@ -1382,11 +1387,11 @@ setMethod("zipWithIndex",
startIndices <- Reduce("+", nums, accumulate = TRUE)
}

partitionFunc <- function(split, part) {
if (split == 0) {
partitionFunc <- function(partIndex, part) {
if (partIndex == 0) {
startIndex <- 0
} else {
startIndex <- startIndices[[split]]
startIndex <- startIndices[[partIndex]]
}

mapply(
Expand Down Expand Up @@ -1590,3 +1595,49 @@ setMethod("intersection",

keys(filterRDD(cogroup(rdd1, rdd2, numPartitions = numPartitions), filterFunction))
})

#' Zips an RDD's partitions with one (or more) RDD(s).
#' Same as zipPartitions in Spark.
#'
#' @param ... RDDs to be zipped.
#' @param func A function to transform zipped partitions.
#' @return A new RDD by applying a function to the zipped partitions.
#' Assumes that all the RDDs have the *same number of partitions*, but
#' does *not* require them to have the same number of elements in each partition.
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd1 <- parallelize(sc, 1:2, 2L) # 1, 2
#' rdd2 <- parallelize(sc, 1:4, 2L) # 1:2, 3:4
#' rdd3 <- parallelize(sc, 1:6, 2L) # 1:3, 4:6
#' collect(zipPartitions(rdd1, rdd2, rdd3,
#' func = function(x, y, z) { list(list(x, y, z))} ))
#' # list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6)))
#'}
#' @rdname zipRDD
#' @aliases zipPartitions,RDD
setMethod("zipPartitions",
"RDD",
function(..., func) {
rrdds <- list(...)
if (length(rrdds) == 1) {
return(rrdds[[1]])
}
nPart <- sapply(rrdds, numPartitions)
if (length(unique(nPart)) != 1) {
stop("Can only zipPartitions RDDs which have the same number of partitions.")
}

rrdds <- lapply(rrdds, function(rdd) {
mapPartitionsWithIndex(rdd, function(partIndex, part) {
print(length(part))
list(list(partIndex, part))
})
})
union.rdd <- Reduce(unionRDD, rrdds)
zipped.rdd <- values(groupByKey(union.rdd, numPartitions = nPart[1]))
res <- mapPartitions(zipped.rdd, function(plist) {
do.call(func, plist[[1]])
})
res
})
20 changes: 10 additions & 10 deletions R/pkg/R/context.R
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

# context.R: SparkContext driven functions

getMinSplits <- function(sc, minSplits) {
if (is.null(minSplits)) {
getMinPartitions <- function(sc, minPartitions) {
if (is.null(minPartitions)) {
defaultParallelism <- callJMethod(sc, "defaultParallelism")
minSplits <- min(defaultParallelism, 2)
minPartitions <- min(defaultParallelism, 2)
}
as.integer(minSplits)
as.integer(minPartitions)
}

#' Create an RDD from a text file.
Expand All @@ -33,7 +33,7 @@ getMinSplits <- function(sc, minSplits) {
#'
#' @param sc SparkContext to use
#' @param path Path of file to read. A vector of multiple paths is allowed.
#' @param minSplits Minimum number of splits to be created. If NULL, the default
#' @param minPartitions Minimum number of partitions to be created. If NULL, the default
#' value is chosen based on available parallelism.
#' @return RDD where each item is of type \code{character}
#' @export
Expand All @@ -42,13 +42,13 @@ getMinSplits <- function(sc, minSplits) {
#' sc <- sparkR.init()
#' lines <- textFile(sc, "myfile.txt")
#'}
textFile <- function(sc, path, minSplits = NULL) {
textFile <- function(sc, path, minPartitions = NULL) {
# Allow the user to have a more flexible definiton of the text file path
path <- suppressWarnings(normalizePath(path))
#' Convert a string vector of paths to a string containing comma separated paths
path <- paste(path, collapse = ",")

jrdd <- callJMethod(sc, "textFile", path, getMinSplits(sc, minSplits))
jrdd <- callJMethod(sc, "textFile", path, getMinPartitions(sc, minPartitions))
# jrdd is of type JavaRDD[String]
RDD(jrdd, "string")
}
Expand All @@ -60,7 +60,7 @@ textFile <- function(sc, path, minSplits = NULL) {
#'
#' @param sc SparkContext to use
#' @param path Path of file to read. A vector of multiple paths is allowed.
#' @param minSplits Minimum number of splits to be created. If NULL, the default
#' @param minPartitions Minimum number of partitions to be created. If NULL, the default
#' value is chosen based on available parallelism.
#' @return RDD containing serialized R objects.
#' @seealso saveAsObjectFile
Expand All @@ -70,13 +70,13 @@ textFile <- function(sc, path, minSplits = NULL) {
#' sc <- sparkR.init()
#' rdd <- objectFile(sc, "myfile")
#'}
objectFile <- function(sc, path, minSplits = NULL) {
objectFile <- function(sc, path, minPartitions = NULL) {
# Allow the user to have a more flexible definiton of the text file path
path <- suppressWarnings(normalizePath(path))
#' Convert a string vector of paths to a string containing comma separated paths
path <- paste(path, collapse = ",")

jrdd <- callJMethod(sc, "objectFile", path, getMinSplits(sc, minSplits))
jrdd <- callJMethod(sc, "objectFile", path, getMinPartitions(sc, minPartitions))
# Assume the RDD contains serialized R objects.
RDD(jrdd, "byte")
}
Expand Down
17 changes: 11 additions & 6 deletions R/pkg/R/generics.R
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ setGeneric("countByValue", function(x) { standardGeneric("countByValue") })

#' @rdname distinct
#' @export
setGeneric("distinct", function(x, numPartitions = 1L) { standardGeneric("distinct") })
setGeneric("distinct", function(x, numPartitions = 1) { standardGeneric("distinct") })

#' @rdname filterRDD
#' @export
Expand Down Expand Up @@ -182,7 +182,7 @@ setGeneric("setName", function(x, name) { standardGeneric("setName") })
#' @rdname sortBy
#' @export
setGeneric("sortBy",
function(x, func, ascending = TRUE, numPartitions = 1L) {
function(x, func, ascending = TRUE, numPartitions = 1) {
standardGeneric("sortBy")
})

Expand Down Expand Up @@ -217,6 +217,11 @@ setGeneric("unpersist", function(x, ...) { standardGeneric("unpersist") })
#' @export
setGeneric("zipRDD", function(x, other) { standardGeneric("zipRDD") })

#' @rdname zipRDD
#' @export
setGeneric("zipPartitions", function(..., func) { standardGeneric("zipPartitions") },
signature = "...")

#' @rdname zipWithIndex
#' @seealso zipWithUniqueId
#' @export
Expand Down Expand Up @@ -244,7 +249,7 @@ setGeneric("flatMapValues", function(X, FUN) { standardGeneric("flatMapValues")

#' @rdname intersection
#' @export
setGeneric("intersection", function(x, other, numPartitions = 1L) {
setGeneric("intersection", function(x, other, numPartitions = 1) {
standardGeneric("intersection") })

#' @rdname keys
Expand Down Expand Up @@ -346,21 +351,21 @@ setGeneric("rightOuterJoin", function(x, y, numPartitions) { standardGeneric("ri
#' @rdname sortByKey
#' @export
setGeneric("sortByKey",
function(x, ascending = TRUE, numPartitions = 1L) {
function(x, ascending = TRUE, numPartitions = 1) {
standardGeneric("sortByKey")
})

#' @rdname subtract
#' @export
setGeneric("subtract",
function(x, other, numPartitions = 1L) {
function(x, other, numPartitions = 1) {
standardGeneric("subtract")
})

#' @rdname subtractByKey
#' @export
setGeneric("subtractByKey",
function(x, other, numPartitions = 1L) {
function(x, other, numPartitions = 1) {
standardGeneric("subtractByKey")
})

Expand Down
Loading

0 comments on commit f03fe7f

Please sign in to comment.