Skip to content

Commit

Permalink
Merge branch 'master' into pr4229
Browse files Browse the repository at this point in the history
  • Loading branch information
zsxwing committed Aug 1, 2015
2 parents 935615c + 8765665 commit abf5f18
Show file tree
Hide file tree
Showing 178 changed files with 7,559 additions and 1,781 deletions.
8 changes: 8 additions & 0 deletions R/pkg/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ exportMethods("arrange",
"count",
"crosstab",
"describe",
"dim",
"distinct",
"dropna",
"dtypes",
Expand All @@ -45,11 +46,16 @@ exportMethods("arrange",
"isLocal",
"join",
"limit",
"merge",
"names",
"ncol",
"nrow",
"orderBy",
"mutate",
"names",
"persist",
"printSchema",
"rbind",
"registerTempTable",
"rename",
"repartition",
Expand All @@ -64,8 +70,10 @@ exportMethods("arrange",
"show",
"showDF",
"summarize",
"summary",
"take",
"unionAll",
"unique",
"unpersist",
"where",
"withColumn",
Expand Down
116 changes: 116 additions & 0 deletions R/pkg/R/DataFrame.R
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,16 @@ setMethod("names",
columns(x)
})

#' @rdname columns
setMethod("names<-",
          signature(x = "DataFrame"),
          function(x, value) {
            # Replacement method for the column names of a DataFrame.
            #
            # x:     the DataFrame whose columns are being renamed.
            # value: the new column names; coerced with as.list() before
            #        being handed to the backing object's `toDF` method.
            #
            # R assigns whatever a replacement function returns back to the
            # variable in the caller, so every path must return a DataFrame.
            # A NULL `value` therefore leaves `x` untouched instead of
            # letting the call evaluate to NULL.
            if (is.null(value)) {
              return(x)
            }
            sdf <- callJMethod(x@sdf, "toDF", listToSeq(as.list(value)))
            dataFrame(sdf)
          })

#' Register Temporary Table
#'
#' Registers a DataFrame as a Temporary Table in the SQLContext
Expand Down Expand Up @@ -473,6 +483,18 @@ setMethod("distinct",
dataFrame(sdf)
})

#' Distinct rows in a DataFrame
#'
#' Returns a new DataFrame containing the distinct rows of this DataFrame.
#' This is a thin wrapper that delegates to \code{distinct}.
#'
#' @param x a SparkSQL DataFrame
#' @rdname unique
#' @aliases unique
setMethod("unique",
          signature(x = "DataFrame"),
          function(x) {
            distinct(x)
          })

#' Sample
#'
#' Return a sampled subset of this DataFrame using a random seed.
Expand Down Expand Up @@ -534,6 +556,58 @@ setMethod("count",
callJMethod(x@sdf, "count")
})

#' Number of rows in a DataFrame
#'
#' Returns the number of rows in a DataFrame. This is a thin wrapper that
#' delegates to \code{count}.
#'
#' @param x a SparkSQL DataFrame
#' @name nrow
#' @rdname nrow
#' @aliases count
setMethod("nrow",
          signature(x = "DataFrame"),
          function(x) {
            count(x)
          })

#' Number of columns in a DataFrame
#'
#' Returns the number of columns in a DataFrame, computed as the length of
#' the value returned by \code{columns}.
#'
#' @param x a SparkSQL DataFrame
#'
#' @rdname ncol
#' @export
#' @examples
#' \dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- jsonFile(sqlContext, path)
#' ncol(df)
#' }
setMethod("ncol",
          signature(x = "DataFrame"),
          function(x) {
            column_names <- columns(x)
            length(column_names)
          })

#' Dimensions of a DataFrame
#'
#' Returns the dimensions of a DataFrame as a two-element vector: the number
#' of rows (from \code{count}) followed by the number of columns (from
#' \code{ncol}).
#'
#' @param x a SparkSQL DataFrame
#'
#' @rdname dim
#' @export
#' @examples
#' \dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- jsonFile(sqlContext, path)
#' dim(df)
#' }
setMethod("dim",
          signature(x = "DataFrame"),
          function(x) {
            # Rows first, then columns, matching the order of the original
            # c(count(x), ncol(x)) expression.
            row_total <- count(x)
            column_total <- ncol(x)
            c(row_total, column_total)
          })

#' Collects all the elements of a Spark DataFrame and coerces them into an R data.frame.
#'
#' @param x A SparkSQL DataFrame
Expand Down Expand Up @@ -1205,6 +1279,15 @@ setMethod("join",
dataFrame(sdf)
})

#' Merge two DataFrames
#'
#' Thin wrapper that forwards \code{x}, \code{y}, \code{joinExpr} and
#' \code{joinType} positionally to \code{join}.
#'
#' @param x a SparkSQL DataFrame
#' @param y a SparkSQL DataFrame
#' @param joinExpr passed unchanged to \code{join}; defaults to \code{NULL}
#' @param joinType passed unchanged to \code{join}; defaults to \code{NULL}
#' @param ... accepted but not forwarded to \code{join}
#' @rdname merge
#' @aliases join
setMethod("merge",
          signature(x = "DataFrame", y = "DataFrame"),
          function(x, y, joinExpr = NULL, joinType = NULL, ...) {
            join(x, y, joinExpr, joinType)
          })


#' UnionAll
#'
#' Return a new DataFrame containing the union of rows in this DataFrame
Expand All @@ -1231,6 +1314,22 @@ setMethod("unionAll",
dataFrame(unioned)
})

#' @title Union two or more DataFrames
#'
#' @description Returns a new DataFrame containing rows of all parameters.
#' The inputs are combined pairwise with \code{unionAll}, from left to right.
#' When only one DataFrame is supplied it is returned as is.
#'
#' @param x the first DataFrame
#' @param ... further DataFrames whose rows are appended, in the order given
#' @param deparse.level present to match the \code{rbind} generic; not used
#' @rdname rbind
#' @aliases unionAll
setMethod("rbind",
          signature(... = "DataFrame"),
          function(x, ..., deparse.level = 1) {
            # Fold over the inputs rather than branching on nargs() and
            # recursing with Recall(): the fold does not depend on how many
            # arguments the dispatch machinery passes along, and it is
            # well-defined for a single DataFrame (Reduce returns it as is).
            Reduce(unionAll, list(x, ...))
          })

#' Intersect
#'
#' Return a new DataFrame containing rows only in both this DataFrame
Expand Down Expand Up @@ -1322,9 +1421,11 @@ setMethod("write.df",
"org.apache.spark.sql.parquet")
}
allModes <- c("append", "overwrite", "error", "ignore")
# nolint start
if (!(mode %in% allModes)) {
stop('mode should be one of "append", "overwrite", "error", "ignore"')
}
# nolint end
jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode)
options <- varargsToEnv(...)
if (!is.null(path)) {
Expand Down Expand Up @@ -1384,9 +1485,11 @@ setMethod("saveAsTable",
"org.apache.spark.sql.parquet")
}
allModes <- c("append", "overwrite", "error", "ignore")
# nolint start
if (!(mode %in% allModes)) {
stop('mode should be one of "append", "overwrite", "error", "ignore"')
}
# nolint end
jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode)
options <- varargsToEnv(...)
callJMethod(df@sdf, "saveAsTable", tableName, source, jmode, options)
Expand Down Expand Up @@ -1430,6 +1533,19 @@ setMethod("describe",
dataFrame(sdf)
})

#' Summary
#'
#' Computes statistics for numeric columns of the DataFrame. This is a thin
#' wrapper that delegates to \code{describe}.
#'
#' @param x a SparkSQL DataFrame
#' @rdname summary
#' @aliases describe
setMethod("summary",
          signature(x = "DataFrame"),
          function(x) {
            describe(x)
          })


#' dropna
#'
#' Returns a new DataFrame omitting rows with null values.
Expand Down
13 changes: 8 additions & 5 deletions R/pkg/R/RDD.R
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)

isPipelinable <- function(rdd) {
e <- rdd@env
# nolint start
!(e$isCached || e$isCheckpointed)
# nolint end
}

if (!inherits(prev, "PipelinedRDD") || !isPipelinable(prev)) {
Expand All @@ -97,7 +99,8 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)
# prev_serializedMode is used during the delayed computation of JRDD in getJRDD
} else {
pipelinedFunc <- function(partIndex, part) {
func(partIndex, prev@func(partIndex, part))
f <- prev@func
func(partIndex, f(partIndex, part))
}
.Object@func <- cleanClosure(pipelinedFunc)
.Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline
Expand Down Expand Up @@ -841,7 +844,7 @@ setMethod("sampleRDD",
if (withReplacement) {
count <- rpois(1, fraction)
if (count > 0) {
res[(len + 1):(len + count)] <- rep(list(elem), count)
res[ (len + 1) : (len + count) ] <- rep(list(elem), count)
len <- len + count
}
} else {
Expand Down Expand Up @@ -1261,12 +1264,12 @@ setMethod("pipeRDD",
signature(x = "RDD", command = "character"),
function(x, command, env = list()) {
func <- function(part) {
trim.trailing.func <- function(x) {
trim_trailing_func <- function(x) {
sub("[\r\n]*$", "", toString(x))
}
input <- unlist(lapply(part, trim.trailing.func))
input <- unlist(lapply(part, trim_trailing_func))
res <- system2(command, stdout = TRUE, input = input, env = env)
lapply(res, trim.trailing.func)
lapply(res, trim_trailing_func)
}
lapplyPartition(x, func)
})
Expand Down
2 changes: 1 addition & 1 deletion R/pkg/R/column.R
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ functions <- c("min", "max", "sum", "avg", "mean", "count", "abs", "sqrt",
"acos", "asin", "atan", "cbrt", "ceiling", "cos", "cosh", "exp",
"expm1", "floor", "log", "log10", "log1p", "rint", "sign",
"sin", "sinh", "tan", "tanh", "toDegrees", "toRadians")
binary_mathfunctions<- c("atan2", "hypot")
binary_mathfunctions <- c("atan2", "hypot")

createOperator <- function(op) {
setMethod(op,
Expand Down
2 changes: 1 addition & 1 deletion R/pkg/R/context.R
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ parallelize <- function(sc, coll, numSlices = 1) {
numSlices <- length(coll)

sliceLen <- ceiling(length(coll) / numSlices)
slices <- split(coll, rep(1:(numSlices + 1), each = sliceLen)[1:length(coll)])
slices <- split(coll, rep(1: (numSlices + 1), each = sliceLen)[1:length(coll)])

# Serialize each slice: obtain a list of raws, or a list of lists (slices) of
# 2-tuples of raws
Expand Down
12 changes: 12 additions & 0 deletions R/pkg/R/generics.R
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,10 @@ setGeneric("isLocal", function(x) { standardGeneric("isLocal") })
#' @export
setGeneric("limit", function(x, num) {
  standardGeneric("limit")
})

# No function definition is supplied, so setGeneric() derives the generic
# from the function named "merge" that is already visible.
#' @rdname merge
#' @export
setGeneric("merge")

#' @rdname withColumn
#' @export
setGeneric("mutate", function(x, ...) {
  standardGeneric("mutate")
})
Expand Down Expand Up @@ -531,6 +535,10 @@ setGeneric("showDF", function(x,...) { standardGeneric("showDF") })
#' @export
setGeneric("summarize", function(x, ...) {
  standardGeneric("summarize")
})

#' @rdname summary
#' @export
setGeneric("summary", function(x, ...) {
  standardGeneric("summary")
})

# @rdname tojson
# @export
setGeneric("toJSON", function(x) {
  standardGeneric("toJSON")
})
Expand Down Expand Up @@ -669,3 +677,7 @@ setGeneric("upper", function(x) { standardGeneric("upper") })
# No function definition is supplied, so setGeneric() derives the generic
# from the function named "glm" that is already visible.
#' @rdname glm
#' @export
setGeneric("glm")

# No function definition is supplied, so the generic is derived from the
# existing rbind(); signature = "..." makes method dispatch happen on the
# `...` arguments.
#' @rdname rbind
#' @export
setGeneric("rbind", signature = "...")
8 changes: 4 additions & 4 deletions R/pkg/R/mllib.R
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,12 @@ setMethod("predict", signature(object = "PipelineModel"),
#' model <- glm(y ~ x, trainingData)
#' summary(model)
#'}
setMethod("summary", signature(object = "PipelineModel"),
function(object) {
setMethod("summary", signature(x = "PipelineModel"),
function(x, ...) {
features <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
"getModelFeatures", object@model)
"getModelFeatures", x@model)
weights <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
"getModelWeights", object@model)
"getModelWeights", x@model)
coefficients <- as.matrix(unlist(weights))
colnames(coefficients) <- c("Estimate")
rownames(coefficients) <- unlist(features)
Expand Down
2 changes: 1 addition & 1 deletion R/pkg/R/pairRDD.R
Original file line number Diff line number Diff line change
Expand Up @@ -879,7 +879,7 @@ setMethod("sampleByKey",
if (withReplacement) {
count <- rpois(1, frac)
if (count > 0) {
res[(len + 1):(len + count)] <- rep(list(elem), count)
res[ (len + 1) : (len + count) ] <- rep(list(elem), count)
len <- len + count
}
} else {
Expand Down
4 changes: 2 additions & 2 deletions R/pkg/R/utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ convertJListToRList <- function(jList, flatten, logicalUpperBound = NULL,
}

results <- if (arrSize > 0) {
lapply(0:(arrSize - 1),
lapply(0 : (arrSize - 1),
function(index) {
obj <- callJMethod(jList, "get", as.integer(index))

Expand Down Expand Up @@ -572,7 +572,7 @@ mergePartitions <- function(rdd, zip) {
keys <- list()
}
if (lengthOfValues > 1) {
values <- part[(lengthOfKeys + 1) : (len - 1)]
values <- part[ (lengthOfKeys + 1) : (len - 1) ]
} else {
values <- list()
}
Expand Down
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/test_binary_function.R
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ test_that("union on two RDDs", {
expect_equal(actual, c(as.list(nums), mockFile))
expect_equal(getSerializedMode(union.rdd), "byte")

rdd<- map(text.rdd, function(x) {x})
rdd <- map(text.rdd, function(x) {x})
union.rdd <- unionRDD(rdd, text.rdd)
actual <- collect(union.rdd)
expect_equal(actual, as.list(c(mockFile, mockFile)))
Expand Down
6 changes: 3 additions & 3 deletions R/pkg/inst/tests/test_rdd.R
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ test_that("flatMapValues() on pairwise RDDs", {
expect_equal(actual, list(list(1,1), list(1,2), list(2,3), list(2,4)))

# Generate x to x+1 for every value
actual <- collect(flatMapValues(intRdd, function(x) { x:(x + 1) }))
actual <- collect(flatMapValues(intRdd, function(x) { x: (x + 1) }))
expect_equal(actual,
list(list(1L, -1), list(1L, 0), list(2L, 100), list(2L, 101),
list(2L, 1), list(2L, 2), list(1L, 200), list(1L, 201)))
Expand Down Expand Up @@ -293,7 +293,7 @@ test_that("sumRDD() on RDDs", {
})

test_that("keyBy on RDDs", {
func <- function(x) { x*x }
func <- function(x) { x * x }
keys <- keyBy(rdd, func)
actual <- collect(keys)
expect_equal(actual, lapply(nums, function(x) { list(func(x), x) }))
Expand All @@ -311,7 +311,7 @@ test_that("repartition/coalesce on RDDs", {
r2 <- repartition(rdd, 6)
expect_equal(numPartitions(r2), 6L)
count <- length(collectPartition(r2, 0L))
expect_true(count >=0 && count <= 4)
expect_true(count >= 0 && count <= 4)

# coalesce
r3 <- coalesce(rdd, 1)
Expand Down
Loading

0 comments on commit abf5f18

Please sign in to comment.