Merge pull request #17 from apache/master
merge latest spark
pzzs committed Jun 10, 2015
2 parents 0ba5f42 + 778f3ca commit 7bc7d28
Showing 1,019 changed files with 18,312 additions and 12,201 deletions.
3 changes: 3 additions & 0 deletions .rat-excludes
@@ -80,5 +80,8 @@ local-1425081759269/*
local-1426533911241/*
local-1426633911242/*
local-1430917381534/*
local-1430917381535_1
local-1430917381535_2
DESCRIPTION
NAMESPACE
test_support/*
46 changes: 46 additions & 0 deletions LICENSE
@@ -853,6 +853,52 @@ and

Vis.js may be distributed under either license.

========================================================================
For dagre-d3 (core/src/main/resources/org/apache/spark/ui/static/dagre-d3.min.js):
========================================================================
Copyright (c) 2013 Chris Pettitt

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

========================================================================
For graphlib-dot (core/src/main/resources/org/apache/spark/ui/static/graphlib-dot.min.js):
========================================================================
Copyright (c) 2012-2013 Chris Pettitt

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

========================================================================
BSD-style licenses
========================================================================
8 changes: 4 additions & 4 deletions R/create-docs.sh
@@ -23,14 +23,14 @@
# After running this script the html docs can be found in
# $SPARK_HOME/R/pkg/html

set -o pipefail
set -e

# Figure out where the script is
export FWDIR="$(cd "`dirname "$0"`"; pwd)"
pushd $FWDIR

-# Generate Rd file
-Rscript -e 'library(devtools); devtools::document(pkg="./pkg", roclets=c("rd"))'

-# Install the package
+# Install the package (this will also generate the Rd files)
./install-dev.sh

# Now create HTML files
11 changes: 10 additions & 1 deletion R/install-dev.sh
@@ -26,11 +26,20 @@
# NOTE(shivaram): Right now we use $SPARK_HOME/R/lib to be the installation directory
# to load the SparkR package on the worker nodes.

set -o pipefail
set -e

FWDIR="$(cd `dirname $0`; pwd)"
LIB_DIR="$FWDIR/lib"

mkdir -p $LIB_DIR

-# Install R
pushd $FWDIR

# Generate Rd files if devtools is installed
Rscript -e ' if("devtools" %in% rownames(installed.packages())) { library(devtools); devtools::document(pkg="./pkg", roclets=c("rd")) }'

# Install SparkR to $LIB_DIR
R CMD INSTALL --library=$LIB_DIR $FWDIR/pkg/

popd
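
The one-line Rscript call above is dense; an equivalent, more readable form of the same devtools check (a sketch for illustration, not part of the script) is:

# Generate Rd files only when devtools is available, so the script
# still succeeds under `set -e` on machines without devtools.
if ("devtools" %in% rownames(installed.packages())) {
  library(devtools)
  devtools::document(pkg = "./pkg", roclets = c("rd"))
}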
2 changes: 2 additions & 0 deletions R/pkg/NAMESPACE
@@ -19,9 +19,11 @@ exportMethods("arrange",
"count",
"describe",
"distinct",
"dropna",
"dtypes",
"except",
"explain",
"fillna",
"filter",
"first",
"group_by",
135 changes: 129 additions & 6 deletions R/pkg/R/DataFrame.R
@@ -1314,9 +1314,8 @@ setMethod("except",
#' write.df(df, "myfile", "parquet", "overwrite")
#' }
setMethod("write.df",
signature(df = "DataFrame", path = 'character', source = 'character',
mode = 'character'),
function(df, path = NULL, source = NULL, mode = "append", ...){
signature(df = "DataFrame", path = 'character'),
function(df, path, source = NULL, mode = "append", ...){
if (is.null(source)) {
sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv)
source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default",
@@ -1338,9 +1337,8 @@ setMethod("write.df",
#' @aliases saveDF
#' @export
setMethod("saveDF",
signature(df = "DataFrame", path = 'character', source = 'character',
mode = 'character'),
function(df, path = NULL, source = NULL, mode = "append", ...){
signature(df = "DataFrame", path = 'character'),
function(df, path, source = NULL, mode = "append", ...){
write.df(df, path, source, mode, ...)
})
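
With the relaxed signatures above, only df and path are required: source falls back to the spark.sql.sources.default configuration (Parquet unless overridden) and mode defaults to "append". A minimal usage sketch, assuming an initialized SparkR session, an existing DataFrame df, and hypothetical paths:

# source and mode are now optional
write.df(df, "people.parquet")
write.df(df, "people.json", source = "json", mode = "overwrite")
saveDF(df, "people.parquet", mode = "overwrite")  # saveDF delegates to write.df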

@@ -1431,3 +1429,128 @@ setMethod("describe",
sdf <- callJMethod(x@sdf, "describe", listToSeq(colList))
dataFrame(sdf)
})

#' dropna
#'
#' Returns a new DataFrame omitting rows with null values.
#'
#' @param x A SparkSQL DataFrame.
#' @param how "any" or "all".
#' if "any", drop a row if it contains any nulls.
#' if "all", drop a row only if all its values are null.
#' if minNonNulls is specified, how is ignored.
#' @param minNonNulls If specified, drop rows that have less than
#' minNonNulls non-null values.
#' This overwrites the how parameter.
#' @param cols Optional list of column names to consider.
#' @return A DataFrame
#'
#' @rdname nafunctions
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- jsonFile(sqlCtx, path)
#' dropna(df)
#' }
setMethod("dropna",
signature(x = "DataFrame"),
function(x, how = c("any", "all"), minNonNulls = NULL, cols = NULL) {
how <- match.arg(how)
if (is.null(cols)) {
cols <- columns(x)
}
if (is.null(minNonNulls)) {
minNonNulls <- if (how == "any") { length(cols) } else { 1 }
}

naFunctions <- callJMethod(x@sdf, "na")
sdf <- callJMethod(naFunctions, "drop",
as.integer(minNonNulls), listToSeq(as.list(cols)))
dataFrame(sdf)
})

#' @aliases dropna
#' @export
setMethod("na.omit",
signature(x = "DataFrame"),
function(x, how = c("any", "all"), minNonNulls = NULL, cols = NULL) {
dropna(x, how, minNonNulls, cols)
})
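
As the defaulting logic above shows, an explicit minNonNulls overrides how: when minNonNulls is NULL, it becomes length(cols) for "any" and 1 for "all". A short sketch, assuming a DataFrame df with hypothetical columns "age" and "name":

dropna(df)                    # how = "any": drop rows containing any null
dropna(df, how = "all")       # drop rows only when every value is null
dropna(df, minNonNulls = 2)   # keep rows with at least 2 non-null values; how is ignored
dropna(df, cols = c("age"))   # consider nulls in "age" only
na.omit(df)                   # alias for dropna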

#' fillna
#'
#' Replace null values.
#'
#' @param x A SparkSQL DataFrame.
#' @param value Value to replace null values with.
#' Should be an integer, numeric, character or named list.
#' If the value is a named list, then cols is ignored and
#' value must be a mapping from column name (character) to
#' replacement value. The replacement value must be an
#' integer, numeric or character.
#' @param cols Optional list of column names to consider.
#' Columns specified in cols that do not have a matching data
#' type are ignored. For example, if value is a character, and
#' cols contains a non-character column, then the non-character
#' column is simply ignored.
#' @return A DataFrame
#'
#' @rdname nafunctions
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- jsonFile(sqlCtx, path)
#' fillna(df, 1)
#' fillna(df, list("age" = 20, "name" = "unknown"))
#' }
setMethod("fillna",
signature(x = "DataFrame"),
function(x, value, cols = NULL) {
if (!(class(value) %in% c("integer", "numeric", "character", "list"))) {
stop("value should be an integer, numeric, charactor or named list.")
}

if (class(value) == "list") {
# Check column names in the named list
colNames <- names(value)
if (length(colNames) == 0 || !all(colNames != "")) {
stop("value should be an a named list with each name being a column name.")
}

# Convert the named list to an environment to be passed to the JVM
valueMap <- new.env()
for (col in colNames) {
# Check each item in the named list is of valid type
v <- value[[col]]
if (!(class(v) %in% c("integer", "numeric", "character"))) {
stop("Each item in value should be an integer, numeric or charactor.")
}
valueMap[[col]] <- v
}

# When value is a named list, caller is expected not to pass in cols
if (!is.null(cols)) {
warning("When value is a named list, cols is ignored!")
cols <- NULL
}

value <- valueMap
} else if (is.integer(value)) {
# Cast an integer to a numeric
value <- as.numeric(value)
}

naFunctions <- callJMethod(x@sdf, "na")
sdf <- if (length(cols) == 0) {
callJMethod(naFunctions, "fill", value)
} else {
callJMethod(naFunctions, "fill", value, listToSeq(as.list(cols)))
}
dataFrame(sdf)
})
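
A hedged sketch of the type-matching behavior described above (column names hypothetical): a scalar value fills only columns of a compatible type, while a named list supplies per-column replacements and makes cols irrelevant.

fillna(df, 0)                                      # fills numeric columns; character columns are left untouched
fillna(df, "unknown", cols = c("name"))            # restrict the fill to the "name" column
fillna(df, list("age" = 20, "name" = "unknown"))   # per-column values; do not pass cols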
19 changes: 15 additions & 4 deletions R/pkg/R/SQLContext.R
@@ -452,20 +452,31 @@ dropTempTable <- function(sqlContext, tableName) {
#' df <- read.df(sqlContext, "path/to/file.json", source = "json")
#' }

-read.df <- function(sqlContext, path = NULL, source = NULL, ...) {
+read.df <- function(sqlContext, path = NULL, source = NULL, schema = NULL, ...) {
options <- varargsToEnv(...)
if (!is.null(path)) {
options[['path']] <- path
}
-  sdf <- callJMethod(sqlContext, "load", source, options)
if (is.null(source)) {
sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv)
source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default",
"org.apache.spark.sql.parquet")
}
if (!is.null(schema)) {
stopifnot(class(schema) == "structType")
sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "loadDF", sqlContext, source,
schema$jobj, options)
} else {
sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "loadDF", sqlContext, source, options)
}
dataFrame(sdf)
}

#' @aliases loadDF
#' @export

-loadDF <- function(sqlContext, path = NULL, source = NULL, ...) {
-  read.df(sqlContext, path, source, ...)
+loadDF <- function(sqlContext, path = NULL, source = NULL, schema = NULL, ...) {
+  read.df(sqlContext, path, source, schema, ...)
}
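
A sketch of the new schema argument, assuming an initialized sqlContext and SparkR's structType/structField helpers (the path is hypothetical). Supplying a schema skips inference over the data:

schema <- structType(structField("name", "string"),
                     structField("age", "integer"))
df <- read.df(sqlContext, "path/to/people.json", source = "json", schema = schema)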

#' Create an external table
22 changes: 20 additions & 2 deletions R/pkg/R/generics.R
@@ -396,6 +396,20 @@ setGeneric("columns", function(x) {standardGeneric("columns") })
#' @export
setGeneric("describe", function(x, col, ...) { standardGeneric("describe") })

#' @rdname nafunctions
#' @export
setGeneric("dropna",
function(x, how = c("any", "all"), minNonNulls = NULL, cols = NULL) {
standardGeneric("dropna")
})

#' @rdname nafunctions
#' @export
setGeneric("na.omit",
function(x, how = c("any", "all"), minNonNulls = NULL, cols = NULL) {
standardGeneric("na.omit")
})

#' @rdname schema
#' @export
setGeneric("dtypes", function(x) { standardGeneric("dtypes") })
@@ -408,6 +422,10 @@ setGeneric("explain", function(x, ...) { standardGeneric("explain") })
#' @export
setGeneric("except", function(x, y) { standardGeneric("except") })

#' @rdname nafunctions
#' @export
setGeneric("fillna", function(x, value, cols = NULL) { standardGeneric("fillna") })

#' @rdname filter
#' @export
setGeneric("filter", function(x, condition) { standardGeneric("filter") })
@@ -482,11 +500,11 @@ setGeneric("saveAsTable", function(df, tableName, source, mode, ...) {

#' @rdname write.df
#' @export
setGeneric("write.df", function(df, path, source, mode, ...) { standardGeneric("write.df") })
setGeneric("write.df", function(df, path, ...) { standardGeneric("write.df") })

#' @rdname write.df
#' @export
setGeneric("saveDF", function(df, path, source, mode, ...) { standardGeneric("saveDF") })
setGeneric("saveDF", function(df, path, ...) { standardGeneric("saveDF") })

#' @rdname schema
#' @export
18 changes: 17 additions & 1 deletion R/pkg/R/serialize.R
@@ -37,6 +37,14 @@ writeObject <- function(con, object, writeType = TRUE) {
# passing in vectors as arrays and instead require arrays to be passed
# as lists.
type <- class(object)[[1]] # class of POSIXlt is c("POSIXlt", "POSIXt")
# Checking types is needed here, since ‘is.na’ only handles atomic vectors,
# lists and pairlists
if (type %in% c("integer", "character", "logical", "double", "numeric")) {
if (is.na(object)) {
object <- NULL
type <- "NULL"
}
}
if (writeType) {
writeType(con, type)
}
@@ -160,6 +168,14 @@ writeList <- function(con, arr) {
}
}

# Used to pass arrays where the elements can be of different types
writeGenericList <- function(con, list) {
writeInt(con, length(list))
for (elem in list) {
writeObject(con, elem)
}
}

# Used to pass in hash maps required on Java side.
writeEnv <- function(con, env) {
len <- length(env)
@@ -168,7 +184,7 @@ writeEnv <- function(con, env) {
if (len > 0) {
writeList(con, as.list(ls(env)))
vals <- lapply(ls(env), function(x) { env[[x]] })
-    writeList(con, as.list(vals))
+    writeGenericList(con, as.list(vals))
}
}
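
A small illustration of why writeEnv now uses writeGenericList: writeList assumes all elements share one type, while writeGenericList tags each element with its own type — which matters once callers such as fillna pass environments mixing numeric and character values. (A hypothetical, self-contained sketch using an in-memory connection.)

con <- rawConnection(raw(0), "w")
env <- new.env()
env[["age"]] <- 20           # numeric value
env[["name"]] <- "unknown"   # character value
writeEnv(con, env)           # each value is written with its own type tag
close(con)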
