Merge branch 'master' of github.com:apache/spark into str_index
Davies Liu committed Jul 31, 2015
2 parents 515519b + 3fc0cb9 commit f2d29a1
Showing 592 changed files with 26,124 additions and 9,451 deletions.
10 changes: 9 additions & 1 deletion R/pkg/NAMESPACE
@@ -12,7 +12,8 @@ export("print.jobj")

# MLlib integration
exportMethods("glm",
"predict")
"predict",
"summary")

# Job group lifecycle management methods
export("setJobGroup",
@@ -26,7 +27,9 @@ exportMethods("arrange",
"collect",
"columns",
"count",
"crosstab",
"describe",
"dim",
"distinct",
"dropna",
"dtypes",
@@ -43,11 +46,15 @@ exportMethods("arrange",
"isLocal",
"join",
"limit",
"names",
"ncol",
"nrow",
"orderBy",
"mutate",
"names",
"persist",
"printSchema",
"rbind",
"registerTempTable",
"rename",
"repartition",
@@ -64,6 +71,7 @@ exportMethods("arrange",
"summarize",
"take",
"unionAll",
"unique",
"unpersist",
"where",
"withColumn",
122 changes: 122 additions & 0 deletions R/pkg/R/DataFrame.R
@@ -255,6 +255,16 @@ setMethod("names",
columns(x)
})

#' @rdname columns
setMethod("names<-",
signature(x = "DataFrame"),
function(x, value) {
if (!is.null(value)) {
sdf <- callJMethod(x@sdf, "toDF", listToSeq(as.list(value)))
dataFrame(sdf)
}
})
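A brief usage sketch for the new `names<-` replacement method (assumes an initialized SparkR session; the data and column names are illustrative):

sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)
df <- createDataFrame(sqlContext, data.frame(a = 1:3, b = c("x", "y", "z")))
names(df) <- c("id", "label")  # dispatches to the `names<-` method above
columns(df)                    # "id" "label"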

#' Register Temporary Table
#'
#' Registers a DataFrame as a Temporary Table in the SQLContext
@@ -473,6 +483,18 @@ setMethod("distinct",
dataFrame(sdf)
})

#' @title Distinct rows in a DataFrame
#'
#' @description Returns a new DataFrame containing the distinct rows of this DataFrame
#'
#' @rdname unique
#' @aliases unique
setMethod("unique",
signature(x = "DataFrame"),
function(x) {
distinct(x)
})
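A one-line sketch showing that `unique` is a base-R-flavored alias for `distinct` (assumes a DataFrame `df` from a SparkR session):

df2 <- unique(df)  # identical result to distinct(df); duplicate rows removed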

#' Sample
#'
#' Return a sampled subset of this DataFrame using a random seed.
@@ -534,6 +556,58 @@ setMethod("count",
callJMethod(x@sdf, "count")
})

#' @title Number of rows for a DataFrame
#' @description Returns the number of rows in a DataFrame
#'
#' @name nrow
#'
#' @rdname nrow
#' @aliases count
setMethod("nrow",
signature(x = "DataFrame"),
function(x) {
count(x)
})
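The `nrow` roxygen block has no @examples; a minimal sketch in the style of the neighboring ones (the path is illustrative):

df <- jsonFile(sqlContext, "path/to/file.json")
nrow(df)  # same value as count(df); nrow simply delegates to count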

#' Returns the number of columns in a DataFrame
#'
#' @param x a SparkSQL DataFrame
#'
#' @rdname ncol
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- jsonFile(sqlContext, path)
#' ncol(df)
#' }
setMethod("ncol",
signature(x = "DataFrame"),
function(x) {
length(columns(x))
})

#' Returns the dimensions (number of rows and columns) of a DataFrame
#' @param x a SparkSQL DataFrame
#'
#' @rdname dim
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- jsonFile(sqlContext, path)
#' dim(df)
#' }
setMethod("dim",
signature(x = "DataFrame"),
function(x) {
c(count(x), ncol(x))
})

#' Collects all the elements of a Spark DataFrame and coerces them into an R data.frame.
#'
#' @param x A SparkSQL DataFrame
@@ -1231,6 +1305,22 @@ setMethod("unionAll",
dataFrame(unioned)
})

#' @title Union two or more DataFrames
#'
#' @description Returns a new DataFrame containing the rows of all of its arguments.
#'
#' @rdname rbind
#' @aliases unionAll
setMethod("rbind",
signature(... = "DataFrame"),
function(x, ..., deparse.level = 1) {
if (nargs() == 3) {
unionAll(x, ...)
} else {
unionAll(x, Recall(..., deparse.level = 1))
}
})
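Because the method recurses through Recall, rbind accepts any number of DataFrames and reduces them pairwise with unionAll. A hedged sketch (assumes df1, df2, and df3 share one schema):

combined <- rbind(df1, df2, df3)
# equivalent to unionAll(df1, unionAll(df2, df3)); like SQL UNION ALL,
# duplicates are kept, so the counts add up:
count(combined) == count(df1) + count(df2) + count(df3)  # TRUE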

#' Intersect
#'
#' Return a new DataFrame containing rows only in both this DataFrame
@@ -1322,9 +1412,11 @@ setMethod("write.df",
"org.apache.spark.sql.parquet")
}
allModes <- c("append", "overwrite", "error", "ignore")
# nolint start
if (!(mode %in% allModes)) {
stop('mode should be one of "append", "overwrite", "error", "ignore"')
}
# nolint end
jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode)
options <- varargsToEnv(...)
if (!is.null(path)) {
@@ -1384,9 +1476,11 @@ setMethod("saveAsTable",
"org.apache.spark.sql.parquet")
}
allModes <- c("append", "overwrite", "error", "ignore")
# nolint start
if (!(mode %in% allModes)) {
stop('mode should be one of "append", "overwrite", "error", "ignore"')
}
# nolint end
jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode)
options <- varargsToEnv(...)
callJMethod(df@sdf, "saveAsTable", tableName, source, jmode, options)
@@ -1554,3 +1648,31 @@ setMethod("fillna",
}
dataFrame(sdf)
})

#' crosstab
#'
#' Computes a pair-wise frequency table of the given columns. Also known as a contingency
#' table. The number of distinct values for each column should be less than 1e4. At most 1e6
#' non-zero pair frequencies will be returned.
#'
#' @param col1 name of the first column. Distinct items will make the first item of each row.
#' @param col2 name of the second column. Distinct items will make the column names of the output.
#' @return a local R data.frame representing the contingency table. The first column of each row
#' will be the distinct values of `col1` and the column names will be the distinct values
#' of `col2`. The name of the first column will be `$col1_$col2`. Pairs that have no
#' occurrences will have zero as their counts.
#'
#' @rdname statfunctions
#' @export
#' @examples
#' \dontrun{
#' df <- jsonFile(sqlCtx, "/path/to/file.json")
#' ct <- crosstab(df, "title", "gender")
#' }
setMethod("crosstab",
signature(x = "DataFrame", col1 = "character", col2 = "character"),
function(x, col1, col2) {
statFunctions <- callJMethod(x@sdf, "stat")
sct <- callJMethod(statFunctions, "crosstab", col1, col2)
collect(dataFrame(sct))
})
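A sketch of the result shape described above (the values are invented; requires a Spark session and a df with string columns "title" and "gender"):

ct <- crosstab(df, "title", "gender")
# ct is a local R data.frame whose first column is named "title_gender":
#   title_gender female male
# 1          Dr.      2    5
# 2          Mr.      0   10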
13 changes: 8 additions & 5 deletions R/pkg/R/RDD.R
@@ -85,7 +85,9 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)

isPipelinable <- function(rdd) {
e <- rdd@env
# nolint start
!(e$isCached || e$isCheckpointed)
# nolint end
}

if (!inherits(prev, "PipelinedRDD") || !isPipelinable(prev)) {
@@ -97,7 +99,8 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)
# prev_serializedMode is used during the delayed computation of JRDD in getJRDD
} else {
pipelinedFunc <- function(partIndex, part) {
func(partIndex, prev@func(partIndex, part))
f <- prev@func
func(partIndex, f(partIndex, part))
}
.Object@func <- cleanClosure(pipelinedFunc)
.Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline
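The two-line change above copies prev@func into a local f before building the closure, so cleanClosure captures only the function rather than the whole prev RDD. A SparkR-free illustration of the pattern:

compose_stage <- function(outer, holder) {
  f <- holder$func                   # capture just the function...
  function(i, x) outer(i, f(i, x))   # ...so the closure does not retain `holder`
}
stage <- compose_stage(function(i, x) x * 2, list(func = function(i, x) x + i))
stage(1, 10)  # 22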
@@ -841,7 +844,7 @@ setMethod("sampleRDD",
if (withReplacement) {
count <- rpois(1, fraction)
if (count > 0) {
res[(len + 1):(len + count)] <- rep(list(elem), count)
res[ (len + 1) : (len + count) ] <- rep(list(elem), count)
len <- len + count
}
} else {
@@ -1261,12 +1264,12 @@ setMethod("pipeRDD",
signature(x = "RDD", command = "character"),
function(x, command, env = list()) {
func <- function(part) {
trim.trailing.func <- function(x) {
trim_trailing_func <- function(x) {
sub("[\r\n]*$", "", toString(x))
}
input <- unlist(lapply(part, trim.trailing.func))
input <- unlist(lapply(part, trim_trailing_func))
res <- system2(command, stdout = TRUE, input = input, env = env)
lapply(res, trim.trailing.func)
lapply(res, trim_trailing_func)
}
lapplyPartition(x, func)
})
4 changes: 3 additions & 1 deletion R/pkg/R/backend.R
@@ -110,6 +110,8 @@ invokeJava <- function(isStatic, objId, methodName, ...) {

# TODO: check the status code to output error information
returnStatus <- readInt(conn)
stopifnot(returnStatus == 0)
if (returnStatus != 0) {
stop(readString(conn))
}
readObject(conn)
}
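The replacement turns an opaque stopifnot() failure into an R error carrying the JVM's own message. A standalone mock of the status-then-payload handshake (readBin stands in for SparkR's readInt; the message read is elided):

con <- rawConnection(writeBin(1L, raw(), endian = "big"))
returnStatus <- readBin(con, integer(), n = 1, endian = "big")
if (returnStatus != 0) {
  # invokeJava would call stop(readString(conn)) here to surface the JVM error
  cat("non-zero status: the JVM-side error message would be read next\n")
}
close(con)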
2 changes: 1 addition & 1 deletion R/pkg/R/client.R
@@ -48,7 +48,7 @@ generateSparkSubmitArgs <- function(args, sparkHome, jars, sparkSubmitOpts, pack
jars <- paste("--jars", jars)
}

if (packages != "") {
if (!identical(packages, "")) {
packages <- paste("--packages", packages)
}

2 changes: 1 addition & 1 deletion R/pkg/R/column.R
@@ -65,7 +65,7 @@ functions <- c("min", "max", "sum", "avg", "mean", "count", "abs", "sqrt",
"acos", "asin", "atan", "cbrt", "ceiling", "cos", "cosh", "exp",
"expm1", "floor", "log", "log10", "log1p", "rint", "sign",
"sin", "sinh", "tan", "tanh", "toDegrees", "toRadians")
binary_mathfunctions<- c("atan2", "hypot")
binary_mathfunctions <- c("atan2", "hypot")
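These names are turned into two-argument Column methods elsewhere in this file; a hedged usage sketch (assumes a SparkR session and a DataFrame df with numeric columns x and y):

head(select(df, atan2(df$y, df$x), hypot(df$x, df$y)))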

createOperator <- function(op) {
setMethod(op,
2 changes: 1 addition & 1 deletion R/pkg/R/context.R
@@ -121,7 +121,7 @@ parallelize <- function(sc, coll, numSlices = 1) {
numSlices <- length(coll)

sliceLen <- ceiling(length(coll) / numSlices)
slices <- split(coll, rep(1:(numSlices + 1), each = sliceLen)[1:length(coll)])
slices <- split(coll, rep(1: (numSlices + 1), each = sliceLen)[1:length(coll)])

# Serialize each slice: obtain a list of raws, or a list of lists (slices) of
# 2-tuples of raws
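The split/rep slicing expression is plain R and can be verified without Spark; a worked example with ten elements in three slices:

coll <- 1:10
numSlices <- 3
sliceLen <- ceiling(length(coll) / numSlices)  # 4
rep(1:(numSlices + 1), each = sliceLen)[1:length(coll)]
# 1 1 1 1 2 2 2 2 3 3  -> split() then yields slices 1:4, 5:8, 9:10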
4 changes: 2 additions & 2 deletions R/pkg/R/deserialize.R
@@ -102,11 +102,11 @@ readList <- function(con) {

readRaw <- function(con) {
dataLen <- readInt(con)
data <- readBin(con, raw(), as.integer(dataLen), endian = "big")
readBin(con, raw(), as.integer(dataLen), endian = "big")
}

readRawLen <- function(con, dataLen) {
data <- readBin(con, raw(), as.integer(dataLen), endian = "big")
readBin(con, raw(), as.integer(dataLen), endian = "big")
}

readDeserialize <- function(con) {
22 changes: 15 additions & 7 deletions R/pkg/R/generics.R
@@ -59,6 +59,10 @@ setGeneric("count", function(x) { standardGeneric("count") })
# @export
setGeneric("countByValue", function(x) { standardGeneric("countByValue") })

# @rdname statfunctions
# @export
setGeneric("crosstab", function(x, col1, col2) { standardGeneric("crosstab") })

# @rdname distinct
# @export
setGeneric("distinct", function(x, numPartitions = 1) { standardGeneric("distinct") })
@@ -250,8 +254,10 @@ setGeneric("flatMapValues", function(X, FUN) { standardGeneric("flatMapValues")

# @rdname intersection
# @export
setGeneric("intersection", function(x, other, numPartitions = 1) {
standardGeneric("intersection") })
setGeneric("intersection",
function(x, other, numPartitions = 1) {
standardGeneric("intersection")
})

# @rdname keys
# @export
@@ -485,9 +491,7 @@ setGeneric("sample",
#' @rdname sample
#' @export
setGeneric("sample_frac",
function(x, withReplacement, fraction, seed) {
standardGeneric("sample_frac")
})
function(x, withReplacement, fraction, seed) { standardGeneric("sample_frac") })

#' @rdname saveAsParquetFile
#' @export
@@ -549,8 +553,8 @@ setGeneric("withColumn", function(x, colName, col) { standardGeneric("withColumn

#' @rdname withColumnRenamed
#' @export
setGeneric("withColumnRenamed", function(x, existingCol, newCol) {
standardGeneric("withColumnRenamed") })
setGeneric("withColumnRenamed",
function(x, existingCol, newCol) { standardGeneric("withColumnRenamed") })


###################### Column Methods ##########################
@@ -665,3 +669,7 @@ setGeneric("upper", function(x) { standardGeneric("upper") })
#' @rdname glm
#' @export
setGeneric("glm")

#' @rdname rbind
#' @export
setGeneric("rbind", signature = "...")