Commit cafd104

Merge remote-tracking branch 'upstream/master'

gweidner committed May 9, 2015
2 parents 9bea1eb + 1c78f68
Showing 274 changed files with 15,390 additions and 3,595 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -65,6 +65,7 @@ scalastyle.txt
scalastyle-output.xml
R-unit-tests.log
R/unit-tests.out
python/lib/pyspark.zip

# For Hive
metastore_db/
8 changes: 7 additions & 1 deletion .rat-excludes
@@ -36,7 +36,6 @@ graphlib-dot.min.js
sorttable.js
vis.min.js
vis.min.css
vis.map
.*avsc
.*txt
.*json
@@ -74,5 +73,12 @@ logs
.*scalastyle-output.xml
.*dependency-reduced-pom.xml
known_translations
json_expectation
local-1422981759269/*
local-1422981780767/*
local-1425081759269/*
local-1426533911241/*
local-1426633911242/*
local-1430917381534/*
DESCRIPTION
NAMESPACE
2 changes: 1 addition & 1 deletion R/pkg/DESCRIPTION
@@ -15,11 +15,11 @@ Suggests:
Description: R frontend for Spark
License: Apache License (== 2.0)
Collate:
'schema.R'
'generics.R'
'jobj.R'
'RDD.R'
'pairRDD.R'
'schema.R'
'column.R'
'group.R'
'DataFrame.R'
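
In an R package DESCRIPTION, the Collate field fixes the order in which the package's source files are parsed, which matters when a later file registers S4 methods for generics or classes declared in an earlier file. The hunk above moves 'schema.R' after the RDD sources. The snippet below is a minimal, hypothetical illustration of that general rule; none of the names in it come from this commit.

library(methods)

# The generic must exist before any file defines a method for it
# (in SparkR this is the role generics.R plays for the files collated after it).
setGeneric("describeMe", function(x) standardGeneric("describeMe"))

# A class defined in a "later" file, per the Collate order.
setClass("Thing", representation(label = "character"))

# A method can now be registered against the generic and class above.
setMethod("describeMe", signature(x = "Thing"),
          function(x) paste("Thing:", x@label))

describeMe(new("Thing", label = "example"))  # returns "Thing: example"
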
4 changes: 0 additions & 4 deletions R/pkg/NAMESPACE
@@ -26,7 +26,6 @@ exportMethods("cache",
"intersect",
"isLocal",
"join",
"length",
"limit",
"orderBy",
"names",
@@ -101,9 +100,6 @@ export("cacheTable",
"tables",
"uncacheTable")

export("sparkRSQL.init",
"sparkRHive.init")

export("structField",
"structField.jobj",
"structField.character",
95 changes: 50 additions & 45 deletions R/pkg/R/DataFrame.R
@@ -45,6 +45,9 @@ setMethod("initialize", "DataFrame", function(.Object, sdf, isCached) {

#' @rdname DataFrame
#' @export
#'
#' @param sdf A Java object reference to the backing Scala DataFrame
#' @param isCached TRUE if the dataFrame is cached
dataFrame <- function(sdf, isCached = FALSE) {
new("DataFrame", sdf, isCached)
}
@@ -244,7 +247,7 @@ setMethod("columns",
})

#' @rdname columns
#' @export
#' @aliases names,DataFrame,function-method
setMethod("names",
signature(x = "DataFrame"),
function(x) {
@@ -399,23 +402,23 @@ setMethod("repartition",
dataFrame(sdf)
})

#' toJSON
#'
#' Convert the rows of a DataFrame into JSON objects and return an RDD where
#' each element contains a JSON string.
#'
#' @param x A SparkSQL DataFrame
#' @return A StringRRDD of JSON objects
#' @rdname tojson
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- jsonFile(sqlCtx, path)
#' newRDD <- toJSON(df)
#'}
# toJSON
#
# Convert the rows of a DataFrame into JSON objects and return an RDD where
# each element contains a JSON string.
#
#@param x A SparkSQL DataFrame
# @return A StringRRDD of JSON objects
# @rdname tojson
# @export
# @examples
#\dontrun{
# sc <- sparkR.init()
# sqlCtx <- sparkRSQL.init(sc)
# path <- "path/to/file.json"
# df <- jsonFile(sqlCtx, path)
# newRDD <- toJSON(df)
#}
setMethod("toJSON",
signature(x = "DataFrame"),
function(x) {
@@ -578,8 +581,8 @@ setMethod("limit",
dataFrame(res)
})

# Take the first NUM rows of a DataFrame and return the results as a data.frame

#' Take the first NUM rows of a DataFrame and return the results as a data.frame
#'
#' @rdname take
#' @export
#' @examples
Expand Down Expand Up @@ -644,22 +647,22 @@ setMethod("first",
take(x, 1)
})

#' toRDD()
#'
#' Converts a Spark DataFrame to an RDD while preserving column names.
#'
#' @param x A Spark DataFrame
#'
#' @rdname DataFrame
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- jsonFile(sqlCtx, path)
#' rdd <- toRDD(df)
#' }
# toRDD()
#
# Converts a Spark DataFrame to an RDD while preserving column names.
#
# @param x A Spark DataFrame
#
# @rdname DataFrame
# @export
# @examples
#\dontrun{
# sc <- sparkR.init()
# sqlCtx <- sparkRSQL.init(sc)
# path <- "path/to/file.json"
# df <- jsonFile(sqlCtx, path)
# rdd <- toRDD(df)
# }
setMethod("toRDD",
signature(x = "DataFrame"),
function(x) {
@@ -706,6 +709,7 @@ setMethod("groupBy",
#'
#' Compute aggregates by specifying a list of columns
#'
#' @param x a DataFrame
#' @rdname DataFrame
#' @export
setMethod("agg",
@@ -721,53 +725,53 @@ setMethod("agg",
# the requested map function. #
###################################################################################

#' @rdname lapply
# @rdname lapply
setMethod("lapply",
signature(X = "DataFrame", FUN = "function"),
function(X, FUN) {
rdd <- toRDD(X)
lapply(rdd, FUN)
})

#' @rdname lapply
# @rdname lapply
setMethod("map",
signature(X = "DataFrame", FUN = "function"),
function(X, FUN) {
lapply(X, FUN)
})

#' @rdname flatMap
# @rdname flatMap
setMethod("flatMap",
signature(X = "DataFrame", FUN = "function"),
function(X, FUN) {
rdd <- toRDD(X)
flatMap(rdd, FUN)
})

#' @rdname lapplyPartition
# @rdname lapplyPartition
setMethod("lapplyPartition",
signature(X = "DataFrame", FUN = "function"),
function(X, FUN) {
rdd <- toRDD(X)
lapplyPartition(rdd, FUN)
})

#' @rdname lapplyPartition
# @rdname lapplyPartition
setMethod("mapPartitions",
signature(X = "DataFrame", FUN = "function"),
function(X, FUN) {
lapplyPartition(X, FUN)
})

#' @rdname foreach
# @rdname foreach
setMethod("foreach",
signature(x = "DataFrame", func = "function"),
function(x, func) {
rdd <- toRDD(x)
foreach(rdd, func)
})

#' @rdname foreach
# @rdname foreach
setMethod("foreachPartition",
signature(x = "DataFrame", func = "function"),
function(x, func) {
Expand All @@ -788,6 +792,7 @@ setMethod("$", signature(x = "DataFrame"),
getColumn(x, name)
})

#' @rdname select
setMethod("$<-", signature(x = "DataFrame"),
function(x, name, value) {
stopifnot(class(value) == "Column" || is.null(value))
@@ -1009,7 +1014,7 @@ setMethod("sortDF",
})

#' @rdname sortDF
#' @export
#' @aliases orderBy,DataFrame,function-method
setMethod("orderBy",
signature(x = "DataFrame", col = "characterOrColumn"),
function(x, col) {
@@ -1046,7 +1051,7 @@ setMethod("filter",
})

#' @rdname filter
#' @export
#' @aliases where,DataFrame,function-method
setMethod("where",
signature(x = "DataFrame", condition = "characterOrColumn"),
function(x, condition) {
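
The doc comments in the hunks above each carry \dontrun examples for the SparkR DataFrame API. The sketch below strings several of them together into one minimal usage flow, assuming SparkR is installed and a local Spark is available; the JSON path and the "age" column are placeholder assumptions for illustration, not values taken from this commit.

library(SparkR)

sc <- sparkR.init()                            # start a SparkR context
sqlCtx <- sparkRSQL.init(sc)                   # SQL context on top of it
df <- jsonFile(sqlCtx, "path/to/file.json")    # DataFrame from a JSON file (placeholder path)
names(df)                                      # column names, via the names() alias above
firstRows <- take(df, 5)                       # first 5 rows as a local data.frame
adults <- where(df, df$age > 21)               # filter rows (assumes an "age" column)
sorted <- orderBy(adults, "age")               # sort by a column name
jsonRDD <- toJSON(sorted)                      # back to an RDD of JSON strings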
