Merge branch 'master' into fork-sparksession
kunalkhamar committed Feb 16, 2017
2 parents f423f74 + dcc2d54 commit b1371d8
Showing 732 changed files with 24,382 additions and 9,894 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -42,6 +42,7 @@ dependency-reduced-pom.xml
derby.log
dev/create-release/*final
dev/create-release/*txt
+dev/pr-deps/
dist/
docs/_site
docs/api
2 changes: 1 addition & 1 deletion .travis.yml
@@ -44,7 +44,7 @@ notifications:
# 5. Run maven install before running lint-java.
install:
- export MAVEN_SKIP_RC=1
-  - build/mvn -T 4 -q -DskipTests -Pmesos -Pyarn -Phadoop-2.3 -Pkinesis-asl -Phive -Phive-thriftserver install
+  - build/mvn -T 4 -q -DskipTests -Pmesos -Pyarn -Pkinesis-asl -Phive -Phive-thriftserver install

# 6. Run lint-java.
script:
24 changes: 23 additions & 1 deletion R/pkg/NAMESPACE
@@ -1,3 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
# Imports from base R
# Do not include stats:: "rpois", "runif" - causes error at runtime
importFrom("methods", "setGeneric", "setMethod", "setOldClass")
@@ -47,7 +64,9 @@ exportMethods("glm",
              "spark.kstest",
              "spark.logit",
              "spark.randomForest",
-              "spark.gbt")
+              "spark.gbt",
+              "spark.bisectingKmeans",
+              "spark.svmLinear")

# Job group lifecycle management methods
export("setJobGroup",
@@ -63,6 +82,7 @@ exportMethods("arrange",
              "as.data.frame",
              "attach",
              "cache",
+              "coalesce",
"collect",
"colnames",
"colnames<-",
@@ -94,6 +114,7 @@ exportMethods("arrange",
              "freqItems",
              "gapply",
              "gapplyCollect",
+              "getNumPartitions",
"group_by",
"groupBy",
"head",
@@ -306,6 +327,7 @@ exportMethods("%in%",
              "toDegrees",
              "toRadians",
              "to_date",
+              "to_timestamp",
"to_utc_timestamp",
"translate",
"trim",
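Example: the newly exported ML generics above (spark.bisectingKmeans, spark.svmLinear) follow SparkR's usual formula interface. A minimal sketch assuming that interface; the iris data, formula, and hyperparameters are illustrative and not part of this commit:

library(SparkR)
sparkR.session()
df <- createDataFrame(iris)  # note: "." in column names is converted to "_"
# bisecting k-means with 4 clusters on two numeric columns
km <- spark.bisectingKmeans(df, Sepal_Length ~ Sepal_Width, k = 4)
summary(km)
# linear SVM on a two-class subset of the data
bin <- createDataFrame(iris[iris$Species != "setosa", ])
svm <- spark.svmLinear(bin, Species ~ ., regParam = 0.01)
head(predict(svm, bin))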
91 changes: 82 additions & 9 deletions R/pkg/R/DataFrame.R
@@ -323,10 +323,8 @@ setMethod("names",
setMethod("names<-",
signature(x = "SparkDataFrame"),
function(x, value) {
-            if (!is.null(value)) {
-              sdf <- callJMethod(x@sdf, "toDF", as.list(value))
-              dataFrame(sdf)
-            }
+            colnames(x) <- value
+            x
})

#' @rdname columns
@@ -417,7 +415,7 @@ setMethod("coltypes",
type <- PRIMITIVE_TYPES[[specialtype]]
}
}
-                type
+                type[[1]]
})

# Find which types don't have mapping to R
@@ -680,14 +678,53 @@ setMethod("storageLevel",
storageLevelToString(callJMethod(x@sdf, "storageLevel"))
})

+#' Coalesce
+#'
+#' Returns a new SparkDataFrame that has exactly \code{numPartitions} partitions.
+#' This operation results in a narrow dependency, e.g. if you go from 1000 partitions to 100
+#' partitions, there will not be a shuffle, instead each of the 100 new partitions will claim 10 of
+#' the current partitions. If a larger number of partitions is requested, it will stay at the
+#' current number of partitions.
+#'
+#' However, if you're doing a drastic coalesce on a SparkDataFrame, e.g. to numPartitions = 1,
+#' this may result in your computation taking place on fewer nodes than
+#' you like (e.g. one node in the case of numPartitions = 1). To avoid this,
+#' call \code{repartition}. This will add a shuffle step, but means the
+#' current upstream partitions will be executed in parallel (per whatever
+#' the current partitioning is).
+#'
+#' @param numPartitions the number of partitions to use.
+#'
+#' @family SparkDataFrame functions
+#' @rdname coalesce
+#' @name coalesce
+#' @aliases coalesce,SparkDataFrame-method
+#' @seealso \link{repartition}
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' path <- "path/to/file.json"
+#' df <- read.json(path)
+#' newDF <- coalesce(df, 1L)
+#'}
+#' @note coalesce(SparkDataFrame) since 2.1.1
+setMethod("coalesce",
+          signature(x = "SparkDataFrame"),
+          function(x, numPartitions) {
+            stopifnot(is.numeric(numPartitions))
+            sdf <- callJMethod(x@sdf, "coalesce", numToInt(numPartitions))
+            dataFrame(sdf)
+          })
+
#' Repartition
#'
#' The following options for repartition are possible:
#' \itemize{
-#'   \item{1.} {Return a new SparkDataFrame partitioned by
+#'   \item{1.} {Return a new SparkDataFrame that has exactly \code{numPartitions}.}
+#'   \item{2.} {Return a new SparkDataFrame hash partitioned by
#'           the given columns into \code{numPartitions}.}
-#'   \item{2.} {Return a new SparkDataFrame that has exactly \code{numPartitions}.}
-#'   \item{3.} {Return a new SparkDataFrame partitioned by the given column(s),
+#'   \item{3.} {Return a new SparkDataFrame hash partitioned by the given column(s),
#' using \code{spark.sql.shuffle.partitions} as number of partitions.}
#'}
#' @param x a SparkDataFrame.
@@ -699,6 +736,7 @@ setMethod("storageLevel",
#' @rdname repartition
#' @name repartition
#' @aliases repartition,SparkDataFrame-method
+#' @seealso \link{coalesce}
#' @export
#' @examples
#'\dontrun{
@@ -1138,6 +1176,7 @@ setMethod("collect",
if (!is.null(PRIMITIVE_TYPES[[colType]]) && colType != "binary") {
vec <- do.call(c, col)
stopifnot(class(vec) != "list")
+              class(vec) <- PRIMITIVE_TYPES[[colType]]
df[[colIndex]] <- vec
} else {
df[[colIndex]] <- col
@@ -1831,6 +1870,8 @@ setMethod("[", signature(x = "SparkDataFrame"),
#' Return subsets of SparkDataFrame according to given conditions
#' @param x a SparkDataFrame.
#' @param i,subset (Optional) a logical expression to filter on rows.
+#'          For extract operator [[ and replacement operator [[<-, the indexing parameter for
+#'          a single Column.
#' @param j,select expression for the single Column or a list of columns to select from the SparkDataFrame.
#' @param drop if TRUE, a Column will be returned if the resulting dataset has only one column.
#' Otherwise, a SparkDataFrame will always be returned.
@@ -1841,6 +1882,7 @@
#' @export
#' @family SparkDataFrame functions
#' @aliases subset,SparkDataFrame-method
+#' @seealso \link{withColumn}
#' @rdname subset
#' @name subset
#' @family subsetting functions
@@ -1858,6 +1900,10 @@
#' subset(df, df$age %in% c(19, 30), 1:2)
#' subset(df, df$age %in% c(19), select = c(1,2))
#' subset(df, select = c(1,2))
+#' # Columns can be selected and set
+#' df[["age"]] <- 23
+#' df[[1]] <- df$age
+#' df[[2]] <- NULL # drop column
#' }
#' @note subset since 1.5.0
setMethod("subset", signature(x = "SparkDataFrame"),
@@ -1982,7 +2028,7 @@ setMethod("selectExpr",
#' @aliases withColumn,SparkDataFrame,character-method
#' @rdname withColumn
#' @name withColumn
-#' @seealso \link{rename} \link{mutate}
+#' @seealso \link{rename} \link{mutate} \link{subset}
#' @export
#' @examples
#'\dontrun{
@@ -1993,6 +2039,10 @@ setMethod("selectExpr",
#' # Replace an existing column
#' newDF2 <- withColumn(newDF, "newCol", newDF$col1)
#' newDF3 <- withColumn(newDF, "newCol", 42)
+#' # Use extract operator to set an existing or new column
+#' df[["age"]] <- 23
+#' df[[2]] <- df$col1
+#' df[[2]] <- NULL # drop column
#' }
#' @note withColumn since 1.4.0
setMethod("withColumn",
@@ -3428,3 +3478,26 @@ setMethod("randomSplit",
}
sapply(sdfs, dataFrame)
})

+#' getNumPartitions
+#'
+#' Return the number of partitions
+#'
+#' @param x A SparkDataFrame
+#' @family SparkDataFrame functions
+#' @aliases getNumPartitions,SparkDataFrame-method
+#' @rdname getNumPartitions
+#' @name getNumPartitions
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df <- createDataFrame(cars, numPartitions = 2)
+#' getNumPartitions(df)
+#' }
+#' @note getNumPartitions since 2.1.1
+setMethod("getNumPartitions",
+          signature(x = "SparkDataFrame"),
+          function(x) {
+            callJMethod(callJMethod(x@sdf, "rdd"), "getNumPartitions")
+          })
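Example: the user-facing additions to DataFrame.R above (coalesce, getNumPartitions, and [[<- column assignment) compose as in this sketch. Partition counts follow the narrow-dependency semantics documented in the coalesce roxygen block; the cars data and counts are illustrative:

sparkR.session()
df <- createDataFrame(cars, numPartitions = 4)
getNumPartitions(df)        # 4
df2 <- coalesce(df, 2L)     # narrow dependency: no shuffle
getNumPartitions(df2)       # 2
df3 <- coalesce(df, 8L)     # more than current: count stays at 4
getNumPartitions(df3)       # 4
df4 <- repartition(df, 8L)  # adds a shuffle, so the count can grow
getNumPartitions(df4)       # 8
df[["dist2"]] <- df$dist * 2  # extract-operator assignment adds a column
df[[3]] <- NULL               # and can drop one again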
34 changes: 17 additions & 17 deletions R/pkg/R/RDD.R
@@ -313,7 +313,7 @@ setMethod("checkpoint",
#' @rdname getNumPartitions
#' @aliases getNumPartitions,RDD-method
#' @noRd
-setMethod("getNumPartitions",
+setMethod("getNumPartitionsRDD",
signature(x = "RDD"),
function(x) {
callJMethod(getJRDD(x), "getNumPartitions")
@@ -329,7 +329,7 @@ setMethod("numPartitions",
signature(x = "RDD"),
function(x) {
.Deprecated("getNumPartitions")
-            getNumPartitions(x)
+            getNumPartitionsRDD(x)
})

#' Collect elements of an RDD
@@ -460,7 +460,7 @@ setMethod("countByValue",
signature(x = "RDD"),
function(x) {
ones <- lapply(x, function(item) { list(item, 1L) })
-            collectRDD(reduceByKey(ones, `+`, getNumPartitions(x)))
+            collectRDD(reduceByKey(ones, `+`, getNumPartitionsRDD(x)))
})

#' Apply a function to all elements
@@ -780,7 +780,7 @@ setMethod("takeRDD",
resList <- list()
index <- -1
jrdd <- getJRDD(x)
-            numPartitions <- getNumPartitions(x)
+            numPartitions <- getNumPartitionsRDD(x)
serializedModeRDD <- getSerializedMode(x)

# TODO(shivaram): Collect more than one partition based on size
@@ -846,7 +846,7 @@ setMethod("firstRDD",
#' @noRd
setMethod("distinctRDD",
signature(x = "RDD"),
-          function(x, numPartitions = SparkR:::getNumPartitions(x)) {
+          function(x, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
identical.mapped <- lapply(x, function(x) { list(x, NULL) })
reduced <- reduceByKey(identical.mapped,
function(x, y) { x },
@@ -1028,7 +1028,7 @@ setMethod("repartitionRDD",
signature(x = "RDD"),
function(x, numPartitions) {
if (!is.null(numPartitions) && is.numeric(numPartitions)) {
-              coalesce(x, numPartitions, TRUE)
+              coalesceRDD(x, numPartitions, TRUE)
} else {
stop("Please, specify the number of partitions")
}
@@ -1049,11 +1049,11 @@ setMethod("repartitionRDD",
#' @rdname coalesce
#' @aliases coalesce,RDD
#' @noRd
-setMethod("coalesce",
+setMethod("coalesceRDD",
signature(x = "RDD", numPartitions = "numeric"),
function(x, numPartitions, shuffle = FALSE) {
numPartitions <- numToInt(numPartitions)
-            if (shuffle || numPartitions > SparkR:::getNumPartitions(x)) {
+            if (shuffle || numPartitions > SparkR:::getNumPartitionsRDD(x)) {
func <- function(partIndex, part) {
set.seed(partIndex) # partIndex as seed
start <- as.integer(base::sample(numPartitions, 1) - 1)
@@ -1143,7 +1143,7 @@ setMethod("saveAsTextFile",
#' @noRd
setMethod("sortBy",
signature(x = "RDD", func = "function"),
-          function(x, func, ascending = TRUE, numPartitions = SparkR:::getNumPartitions(x)) {
+          function(x, func, ascending = TRUE, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
values(sortByKey(keyBy(x, func), ascending, numPartitions))
})

Expand Down Expand Up @@ -1175,7 +1175,7 @@ takeOrderedElem <- function(x, num, ascending = TRUE) {
resList <- list()
index <- -1
jrdd <- getJRDD(newRdd)
-  numPartitions <- getNumPartitions(newRdd)
+  numPartitions <- getNumPartitionsRDD(newRdd)
serializedModeRDD <- getSerializedMode(newRdd)

while (TRUE) {
@@ -1407,7 +1407,7 @@ setMethod("setName",
setMethod("zipWithUniqueId",
signature(x = "RDD"),
function(x) {
-            n <- getNumPartitions(x)
+            n <- getNumPartitionsRDD(x)

partitionFunc <- function(partIndex, part) {
mapply(
@@ -1450,7 +1450,7 @@ setMethod("zipWithUniqueId",
setMethod("zipWithIndex",
signature(x = "RDD"),
function(x) {
-            n <- getNumPartitions(x)
+            n <- getNumPartitionsRDD(x)
if (n > 1) {
nums <- collectRDD(lapplyPartition(x,
function(part) {
@@ -1566,8 +1566,8 @@ setMethod("unionRDD",
setMethod("zipRDD",
signature(x = "RDD", other = "RDD"),
function(x, other) {
-            n1 <- getNumPartitions(x)
-            n2 <- getNumPartitions(other)
+            n1 <- getNumPartitionsRDD(x)
+            n2 <- getNumPartitionsRDD(other)
if (n1 != n2) {
stop("Can only zip RDDs which have the same number of partitions.")
}
@@ -1637,7 +1637,7 @@ setMethod("cartesian",
#' @noRd
setMethod("subtract",
signature(x = "RDD", other = "RDD"),
-          function(x, other, numPartitions = SparkR:::getNumPartitions(x)) {
+          function(x, other, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
mapFunction <- function(e) { list(e, NA) }
rdd1 <- map(x, mapFunction)
rdd2 <- map(other, mapFunction)
@@ -1671,7 +1671,7 @@ setMethod("subtract",
#' @noRd
setMethod("intersection",
signature(x = "RDD", other = "RDD"),
-          function(x, other, numPartitions = SparkR:::getNumPartitions(x)) {
+          function(x, other, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
rdd1 <- map(x, function(v) { list(v, NA) })
rdd2 <- map(other, function(v) { list(v, NA) })

@@ -1714,7 +1714,7 @@ setMethod("zipPartitions",
if (length(rrdds) == 1) {
return(rrdds[[1]])
}
-  nPart <- sapply(rrdds, getNumPartitions)
+  nPart <- sapply(rrdds, getNumPartitionsRDD)
if (length(unique(nPart)) != 1) {
stop("Can only zipPartitions RDDs which have the same number of partitions.")
}
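Note: the RDD.R half of the diff is a mechanical rename: the internal RDD methods become getNumPartitionsRDD and coalesceRDD so that the plain names can be exported for SparkDataFrame above. A sketch of the internal API after the rename; these methods are not exported, so ::: is required, and the context lookup below is an assumption about SparkR internals:

sparkR.session()
sc <- get(".sparkRjsc", envir = SparkR:::.sparkREnv)  # assumed internal context handle
rdd <- SparkR:::parallelize(sc, 1:100, 4L)
SparkR:::getNumPartitionsRDD(rdd)      # 4; was getNumPartitions() before this change
rdd2 <- SparkR:::coalesceRDD(rdd, 2L)  # was coalesce() before this change
SparkR:::getNumPartitionsRDD(rdd2)     # 2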
