Commit
Merge branch 'master' of github.com:apache/spark into fix-local-page-size
Andrew Or committed Oct 22, 2015
2 parents 0e140c2 + 555b208 commit d0fc050
Showing 446 changed files with 14,406 additions and 6,090 deletions.
2 changes: 1 addition & 1 deletion LICENSE
@@ -265,7 +265,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
(New BSD license) Protocol Buffer Java API (org.spark-project.protobuf:protobuf-java:2.4.1-shaded - http://code.google.com/p/protobuf)
(The BSD License) Fortran to Java ARPACK (net.sourceforge.f2j:arpack_combined_all:0.1 - http://f2j.sourceforge.net)
(The BSD License) xmlenc Library (xmlenc:xmlenc:0.52 - http://xmlenc.sourceforge.net)
(The New BSD License) Py4J (net.sf.py4j:py4j:0.8.2.1 - http://py4j.sourceforge.net/)
(The New BSD License) Py4J (net.sf.py4j:py4j:0.9 - http://py4j.sourceforge.net/)
(Two-clause BSD-style license) JUnit-Interface (com.novocode:junit-interface:0.10 - http://github.com/szeiger/junit-interface/)
(BSD licence) sbt and sbt-launch-lib.bash
(BSD 3 Clause) d3.min.js (https://github.com/mbostock/d3/blob/master/LICENSE)
9 changes: 7 additions & 2 deletions R/pkg/NAMESPACE
@@ -23,6 +23,7 @@ export("setJobGroup",
exportClasses("DataFrame")

exportMethods("arrange",
"attach",
"cache",
"collect",
"columns",
@@ -40,6 +41,7 @@ exportMethods("arrange",
"fillna",
"filter",
"first",
"freqItems",
"group_by",
"groupBy",
"head",
@@ -63,6 +65,7 @@ exportMethods("arrange",
"repartition",
"sample",
"sample_frac",
"sampleBy",
"saveAsParquetFile",
"saveAsTable",
"saveDF",
@@ -106,6 +109,7 @@ exportMethods("%in%",
"cbrt",
"ceil",
"ceiling",
"column",
"concat",
"concat_ws",
"contains",
@@ -226,7 +230,8 @@ exportMethods("agg")
export("sparkRSQL.init",
"sparkRHive.init")

export("cacheTable",
export("as.DataFrame",
"cacheTable",
"clearCache",
"createDataFrame",
"createExternalTable",
@@ -250,4 +255,4 @@ export("structField",
"structType.structField",
"print.structType")

export("as.data.frame")
export("as.data.frame")
57 changes: 45 additions & 12 deletions R/pkg/R/DataFrame.R
@@ -1414,9 +1414,10 @@ setMethod("where",
#' @param x A Spark DataFrame
#' @param y A Spark DataFrame
#' @param joinExpr (Optional) The expression used to perform the join. joinExpr must be a
#' Column expression. If joinExpr is omitted, join() wil perform a Cartesian join
#' Column expression. If joinExpr is omitted, join() will perform a Cartesian join
#' @param joinType The type of join to perform. The following join types are available:
#' 'inner', 'outer', 'left_outer', 'right_outer', 'semijoin'. The default joinType is "inner".
#' 'inner', 'outer', 'full', 'fullouter', leftouter', 'left_outer', 'left',
#' 'right_outer', 'rightouter', 'right', and 'leftsemi'. The default joinType is "inner".
#' @return A DataFrame containing the result of the join operation.
#' @rdname join
#' @name join
@@ -1441,11 +1442,15 @@ setMethod("join",
if (is.null(joinType)) {
sdf <- callJMethod(x@sdf, "join", y@sdf, joinExpr@jc)
} else {
if (joinType %in% c("inner", "outer", "left_outer", "right_outer", "semijoin")) {
if (joinType %in% c("inner", "outer", "full", "fullouter",
"leftouter", "left_outer", "left",
"rightouter", "right_outer", "right", "leftsemi")) {
joinType <- gsub("_", "", joinType)
sdf <- callJMethod(x@sdf, "join", y@sdf, joinExpr@jc, joinType)
} else {
stop("joinType must be one of the following types: ",
"'inner', 'outer', 'left_outer', 'right_outer', 'semijoin'")
"'inner', 'outer', 'full', 'fullouter', 'leftouter', 'left_outer', 'left',
'rightouter', 'right_outer', 'right', 'leftsemi'")
}
}
}
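
As an aside (not part of this diff): the widened joinType handling can be exercised from SparkR roughly as below, assuming two DataFrames df1 and df2 that share an id column.

  # Illustrative only: underscores are stripped, so "left_outer" becomes "leftouter" before the JVM call
  joined <- join(df1, df2, df1$id == df2$id, "left_outer")
  semi   <- join(df1, df2, df1$id == df2$id, "leftsemi")    # one of the newly accepted type names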
@@ -1826,17 +1831,15 @@ setMethod("fillna",
if (length(colNames) == 0 || !all(colNames != "")) {
stop("value should be an a named list with each name being a column name.")
}

# Convert to the named list to an environment to be passed to JVM
valueMap <- new.env()
for (col in colNames) {
# Check each item in the named list is of valid type
v <- value[[col]]
# Check each item in the named list is of valid type
lapply(value, function(v) {
if (!(class(v) %in% c("integer", "numeric", "character"))) {
stop("Each item in value should be an integer, numeric or charactor.")
}
valueMap[[col]] <- v
}
})

# Convert to the named list to an environment to be passed to JVM
valueMap <- convertNamedListToEnv(value)

# When value is a named list, caller is expected not to pass in cols
if (!is.null(cols)) {
@@ -1881,3 +1884,33 @@ setMethod("as.data.frame",
}
collect(x)
})

#' The specified DataFrame is attached to the R search path. This means that
#' the DataFrame is searched by R when evaluating a variable, so columns in
#' the DataFrame can be accessed by simply giving their names.
#'
#' @rdname attach
#' @title Attach DataFrame to R search path
#' @param what (DataFrame) The DataFrame to attach
#' @param pos (integer) Specify position in search() where to attach.
#' @param name (character) Name to use for the attached DataFrame. Names
#' starting with package: are reserved for library.
#' @param warn.conflicts (logical) If TRUE, warnings are printed about conflicts
#' from attaching the database, unless that DataFrame contains an object
#' @examples
#' \dontrun{
#' attach(irisDf)
#' summary(Sepal_Width)
#' }
#' @seealso \link{detach}
setMethod("attach",
signature(what = "DataFrame"),
function(what, pos = 2, name = deparse(substitute(what)), warn.conflicts = TRUE) {
cols <- columns(what)
stopifnot(length(cols) > 0)
newEnv <- new.env()
for (i in 1:length(cols)) {
assign(x = cols[i], value = what[, cols[i]], envir = newEnv)
}
attach(newEnv, pos = pos, name = name, warn.conflicts = warn.conflicts)
})
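
As a reading aid (not part of this diff), a hedged sketch of the new attach() beyond the roxygen example above, assuming an existing sqlContext:

  # Illustrative only: attached columns resolve as Column objects on the R search path
  irisDf <- createDataFrame(sqlContext, iris)
  attach(irisDf)
  head(select(irisDf, Sepal_Width))   # Sepal_Width is found in the attached environment
  detach("irisDf")                    # clean up the search path when done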
39 changes: 25 additions & 14 deletions R/pkg/R/SQLContext.R
@@ -32,6 +32,7 @@ infer_type <- function(x) {
numeric = "double",
raw = "binary",
list = "array",
struct = "struct",
environment = "map",
Date = "date",
POSIXlt = "timestamp",
@@ -44,39 +45,42 @@ infer_type <- function(x) {
paste0("map<string,", infer_type(get(key, x)), ">")
} else if (type == "array") {
stopifnot(length(x) > 0)

paste0("array<", infer_type(x[[1]]), ">")
} else if (type == "struct") {
stopifnot(length(x) > 0)
names <- names(x)
if (is.null(names)) {
paste0("array<", infer_type(x[[1]]), ">")
} else {
# StructType
types <- lapply(x, infer_type)
fields <- lapply(1:length(x), function(i) {
structField(names[[i]], types[[i]], TRUE)
})
do.call(structType, fields)
}
stopifnot(!is.null(names))

type <- lapply(seq_along(x), function(i) {
paste0(names[[i]], ":", infer_type(x[[i]]), ",")
})
type <- Reduce(paste0, type)
type <- paste0("struct<", substr(type, 1, nchar(type) - 1), ">")
} else if (length(x) > 1) {
paste0("array<", infer_type(x[[1]]), ">")
} else {
type
}
}
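
As a reading aid (not part of this diff): per the new struct branch above, inferring the type of a named struct should yield a nested type string. A sketch using internal, non-exported helpers (listToStruct is referenced later in this commit):

  # Illustrative expectation only
  s <- SparkR:::listToStruct(list(a = 1L, b = "abc"))
  SparkR:::infer_type(s)   # expected along the lines of "struct<a:integer,b:string>"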

#' Create a DataFrame from an RDD
#' Create a DataFrame
#'
#' Converts an RDD to a DataFrame by infer the types.
#' Converts R data.frame or list into DataFrame.
#'
#' @param sqlContext A SQLContext
#' @param data An RDD or list or data.frame
#' @param schema a list of column names or named list (StructType), optional
#' @return an DataFrame
#' @rdname createDataFrame
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' rdd <- lapply(parallelize(sc, 1:10), function(x) list(a=x, b=as.character(x)))
#' df <- createDataFrame(sqlContext, rdd)
#' df1 <- as.DataFrame(sqlContext, iris)
#' df2 <- as.DataFrame(sqlContext, list(3,4,5,6))
#' df3 <- createDataFrame(sqlContext, iris)
#' }

# TODO(davies): support sampling and infer type from NA
@@ -149,6 +153,13 @@ createDataFrame <- function(sqlContext, data, schema = NULL, samplingRatio = 1.0
dataFrame(sdf)
}

#' @rdname createDataFrame
#' @aliases createDataFrame
#' @export
as.DataFrame <- function(sqlContext, data, schema = NULL, samplingRatio = 1.0) {
createDataFrame(sqlContext, data, schema, samplingRatio)
}
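
A brief usage note (not part of this diff): as.DataFrame is a thin alias, so the two calls below should be equivalent, assuming an existing sqlContext.

  # Illustrative only
  df_a <- as.DataFrame(sqlContext, iris)
  df_b <- createDataFrame(sqlContext, iris)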

# toDF
#
# Converts an RDD to a DataFrame by infer the types.
12 changes: 5 additions & 7 deletions R/pkg/R/column.R
@@ -36,13 +36,11 @@ setMethod("initialize", "Column", function(.Object, jc) {
.Object
})

column <- function(jc) {
new("Column", jc)
}

col <- function(x) {
column(callJStatic("org.apache.spark.sql.functions", "col", x))
}
setMethod("column",
signature(x = "jobj"),
function(x) {
new("Column", x)
})

#' @rdname show
#' @name show
10 changes: 10 additions & 0 deletions R/pkg/R/deserialize.R
@@ -51,6 +51,7 @@ readTypedObject <- function(con, type) {
"a" = readArray(con),
"l" = readList(con),
"e" = readEnv(con),
"s" = readStruct(con),
"n" = NULL,
"j" = getJobj(readString(con)),
stop(paste("Unsupported type for deserialization", type)))
@@ -135,6 +136,15 @@ readEnv <- function(con) {
env
}

# Read a field of StructType from DataFrame
# into a named list in R whose class is "struct"
readStruct <- function(con) {
names <- readObject(con)
fields <- readObject(con)
names(fields) <- names
listToStruct(fields)
}

readRaw <- function(con) {
dataLen <- readInt(con)
readBin(con, raw(), as.integer(dataLen), endian = "big")
35 changes: 31 additions & 4 deletions R/pkg/R/functions.R
@@ -18,16 +18,21 @@
#' @include generics.R column.R
NULL

#' Creates a \code{Column} of literal value.
#' lit
#'
#' The passed in object is returned directly if it is already a \linkS4class{Column}.
#' If the object is a Scala Symbol, it is converted into a \linkS4class{Column} also.
#' Otherwise, a new \linkS4class{Column} is created to represent the literal value.
#' A new \linkS4class{Column} is created to represent the literal value.
#' If the parameter is a \linkS4class{Column}, it is returned unchanged.
#'
#' @family normal_funcs
#' @rdname lit
#' @name lit
#' @export
#' @examples
#' \dontrun{
#' lit(df$name)
#' select(df, lit("x"))
#' select(df, lit("2015-01-01"))
#'}
setMethod("lit", signature("ANY"),
function(x) {
jc <- callJStatic("org.apache.spark.sql.functions",
@@ -233,6 +238,28 @@ setMethod("ceil",
column(jc)
})

#' Though scala functions has "col" function, we don't expose it in SparkR
#' because we don't want to conflict with the "col" function in the R base
#' package and we also have "column" function exported which is an alias of "col".
col <- function(x) {
column(callJStatic("org.apache.spark.sql.functions", "col", x))
}

#' column
#'
#' Returns a Column based on the given column name.
#'
#' @rdname col
#' @name column
#' @family normal_funcs
#' @export
#' @examples \dontrun{column(df)}
setMethod("column",
signature(x = "character"),
function(x) {
col(x)
})
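
As an aside (not part of this diff), a hedged sketch of the newly exported column() while col() stays internal, assuming a DataFrame df with a name column:

  # Illustrative only: column() builds a Column from a name and composes with the usual operators
  select(df, column("name"))
  filter(df, column("name") == "Alice")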

#' cos
#'
#' Computes the cosine of the given value.
16 changes: 16 additions & 0 deletions R/pkg/R/generics.R
@@ -63,6 +63,10 @@ setGeneric("countByValue", function(x) { standardGeneric("countByValue") })
# @export
setGeneric("crosstab", function(x, col1, col2) { standardGeneric("crosstab") })

# @rdname statfunctions
# @export
setGeneric("freqItems", function(x, cols, support = 0.01) { standardGeneric("freqItems") })

# @rdname distinct
# @export
setGeneric("distinct", function(x, numPartitions = 1) { standardGeneric("distinct") })
@@ -505,6 +509,10 @@ setGeneric("sample",
setGeneric("sample_frac",
function(x, withReplacement, fraction, seed) { standardGeneric("sample_frac") })

#' @rdname statfunctions
#' @export
setGeneric("sampleBy", function(x, col, fractions, seed) { standardGeneric("sampleBy") })

#' @rdname saveAsParquetFile
#' @export
setGeneric("saveAsParquetFile", function(x, path) { standardGeneric("saveAsParquetFile") })
@@ -682,6 +690,10 @@ setGeneric("cbrt", function(x) { standardGeneric("cbrt") })
#' @export
setGeneric("ceil", function(x) { standardGeneric("ceil") })

#' @rdname col
#' @export
setGeneric("column", function(x) { standardGeneric("column") })

#' @rdname concat
#' @export
setGeneric("concat", function(x, ...) { standardGeneric("concat") })
@@ -995,3 +1007,7 @@ setGeneric("rbind", signature = "...")
#' @rdname as.data.frame
#' @export
setGeneric("as.data.frame")

#' @rdname attach
#' @export
setGeneric("attach")
5 changes: 3 additions & 2 deletions R/pkg/R/mllib.R
@@ -45,11 +45,12 @@ setClass("PipelineModel", representation(model = "jobj"))
#' summary(model)
#'}
setMethod("glm", signature(formula = "formula", family = "ANY", data = "DataFrame"),
function(formula, family = c("gaussian", "binomial"), data, lambda = 0, alpha = 0) {
function(formula, family = c("gaussian", "binomial"), data, lambda = 0, alpha = 0,
solver = "auto") {
family <- match.arg(family)
model <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
"fitRModelFormula", deparse(formula), data@sdf, family, lambda,
alpha)
alpha, solver)
return(new("PipelineModel", model = model))
})
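
As an aside (not part of this diff), a hedged sketch of the widened glm() signature; the accepted solver values are resolved on the Scala side and not shown in this excerpt:

  # Illustrative only, assuming df was created from the iris dataset
  model <- glm(Sepal_Length ~ Sepal_Width, data = df, family = "gaussian", solver = "auto")
  summary(model)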

