Merge remote-tracking branch 'origin/master' into SPARK-6152
JoshRosen committed Nov 11, 2015
2 parents 002d1ea + 1dde39d commit 01f5cca
Showing 332 changed files with 13,186 additions and 5,913 deletions.
1 change: 1 addition & 0 deletions R/pkg/DESCRIPTION
@@ -34,4 +34,5 @@ Collate:
'serialize.R'
'sparkR.R'
'stats.R'
'types.R'
'utils.R'
6 changes: 3 additions & 3 deletions R/pkg/NAMESPACE
@@ -23,9 +23,11 @@ export("setJobGroup",
exportClasses("DataFrame")

exportMethods("arrange",
"as.data.frame",
"attach",
"cache",
"collect",
"coltypes",
"columns",
"count",
"cov",
@@ -262,6 +264,4 @@ export("structField",
"structType",
"structType.jobj",
"structType.structField",
"print.structType")

export("as.data.frame")
"print.structType")
55 changes: 52 additions & 3 deletions R/pkg/R/DataFrame.R
@@ -1944,9 +1944,9 @@
#' @rdname summary
#' @name summary
setMethod("summary",
signature(x = "DataFrame"),
function(x) {
describe(x)
signature(object = "DataFrame"),
function(object, ...) {
describe(object)
})
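The summary() signature changes from summary(x) to summary(object, ...) so that it agrees with base R's summary(object, ...) generic; the body still simply delegates to describe(). A quick sketch under the same sqlContext assumption:

    irisDF <- createDataFrame(sqlContext, iris)
    collect(summary(irisDF))   # equivalent to collect(describe(irisDF))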


@@ -2152,3 +2152,52 @@ setMethod("with",
newEnv <- assignNewEnv(data)
eval(substitute(expr), envir = newEnv, enclos = newEnv)
})

#' Returns the column types of a DataFrame.
#'
#' @name coltypes
#' @title Get column types of a DataFrame
#' @family dataframe_funcs
#' @param x (DataFrame)
#' @return value (character) A character vector with the column types of the given DataFrame
#' @rdname coltypes
#' @examples \dontrun{
#' irisDF <- createDataFrame(sqlContext, iris)
#' coltypes(irisDF)
#' }
setMethod("coltypes",
signature(x = "DataFrame"),
function(x) {
# Get the data types of the DataFrame by invoking dtypes() function
types <- sapply(dtypes(x), function(x) {x[[2]]})

# Map Spark data types into R's data types using DATA_TYPES environment
rTypes <- sapply(types, USE.NAMES=F, FUN=function(x) {

# Check for primitive types
type <- PRIMITIVE_TYPES[[x]]

if (is.null(type)) {
# Check for complex types
for (t in names(COMPLEX_TYPES)) {
if (substring(x, 1, nchar(t)) == t) {
type <- COMPLEX_TYPES[[t]]
break
}
}

if (is.null(type)) {
stop(paste("Unsupported data type: ", x))
}
}
type
})

# Find which types don't have mapping to R
naIndices <- which(is.na(rTypes))

# Assign the original scala data types to the unmatched ones
rTypes[naIndices] <- types[naIndices]

rTypes
})
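A usage sketch for the new coltypes() method; the expected output follows the test added in test_sparkSQL.R below. Complex types such as map<string,string> have no direct R equivalent, so their original Scala type string is returned unchanged:

    irisDF <- createDataFrame(sqlContext, iris)
    coltypes(irisDF)
    # [1] "numeric"   "numeric"   "numeric"   "numeric"   "character"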
2 changes: 1 addition & 1 deletion R/pkg/R/functions.R
@@ -1339,7 +1339,7 @@ setMethod("pmod", signature(y = "Column"),
#' @export
setMethod("approxCountDistinct",
signature(x = "Column"),
function(x, rsd = 0.95) {
function(x, rsd = 0.05) {
jc <- callJStatic("org.apache.spark.sql.functions", "approxCountDistinct", x@jc, rsd)
column(jc)
})
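The default rsd (the maximum relative standard deviation allowed in the estimate) drops from 0.95 to 0.05, which matches the default of the Scala org.apache.spark.sql.functions.approxCountDistinct API. A usage sketch under the same assumptions as above:

    df <- createDataFrame(sqlContext, faithful)
    collect(select(df, approxCountDistinct(df$eruptions)))        # default rsd = 0.05
    collect(select(df, approxCountDistinct(df$eruptions, 0.01)))  # explicitly tighter bound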
6 changes: 5 additions & 1 deletion R/pkg/R/generics.R
@@ -561,7 +561,7 @@ setGeneric("summarize", function(x,...) { standardGeneric("summarize") })

#' @rdname summary
#' @export
setGeneric("summary", function(x, ...) { standardGeneric("summary") })
setGeneric("summary", function(object, ...) { standardGeneric("summary") })

# @rdname tojson
# @export
@@ -1047,3 +1047,7 @@ setGeneric("attach")
#' @rdname with
#' @export
setGeneric("with")

#' @rdname coltypes
#' @export
setGeneric("coltypes", function(x) { standardGeneric("coltypes") })
33 changes: 24 additions & 9 deletions R/pkg/R/mllib.R
@@ -48,8 +48,9 @@ setMethod("glm", signature(formula = "formula", family = "ANY", data = "DataFram
function(formula, family = c("gaussian", "binomial"), data, lambda = 0, alpha = 0,
standardize = TRUE, solver = "auto") {
family <- match.arg(family)
formula <- paste(deparse(formula), collapse="")
model <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
"fitRModelFormula", deparse(formula), data@sdf, family, lambda,
"fitRModelFormula", formula, data@sdf, family, lambda,
alpha, standardize, solver)
return(new("PipelineModel", model = model))
})
@@ -88,14 +89,28 @@ setMethod("predict", signature(object = "PipelineModel"),
#' model <- glm(y ~ x, trainingData)
#' summary(model)
#'}
setMethod("summary", signature(x = "PipelineModel"),
function(x, ...) {
setMethod("summary", signature(object = "PipelineModel"),
function(object, ...) {
modelName <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
"getModelName", object@model)
features <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
"getModelFeatures", x@model)
"getModelFeatures", object@model)
coefficients <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
"getModelCoefficients", x@model)
coefficients <- as.matrix(unlist(coefficients))
colnames(coefficients) <- c("Estimate")
rownames(coefficients) <- unlist(features)
return(list(coefficients = coefficients))
"getModelCoefficients", object@model)
if (modelName == "LinearRegressionModel") {
devianceResiduals <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
"getModelDevianceResiduals", object@model)
devianceResiduals <- matrix(devianceResiduals, nrow = 1)
colnames(devianceResiduals) <- c("Min", "Max")
rownames(devianceResiduals) <- rep("", times = 1)
coefficients <- matrix(coefficients, ncol = 4)
colnames(coefficients) <- c("Estimate", "Std. Error", "t value", "Pr(>|t|)")
rownames(coefficients) <- unlist(features)
return(list(devianceResiduals = devianceResiduals, coefficients = coefficients))
} else {
coefficients <- as.matrix(unlist(coefficients))
colnames(coefficients) <- c("Estimate")
rownames(coefficients) <- unlist(features)
return(list(coefficients = coefficients))
}
})
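For a LinearRegressionModel, summary() now returns the min/max deviance residuals and a four-column coefficient matrix rather than estimates alone; the updated tests switch to solver = "normal", which is what makes these statistics available. A sketch mirroring those tests:

    training <- createDataFrame(sqlContext, iris)
    model <- glm(Sepal_Width ~ Sepal_Length + Species, data = training, solver = "normal")
    stats <- summary(model)
    stats$devianceResiduals   # 1 x 2 matrix with columns Min and Max
    stats$coefficients        # Estimate, Std. Error, t value, Pr(>|t|)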
15 changes: 1 addition & 14 deletions R/pkg/R/schema.R
@@ -115,20 +115,7 @@ structField.jobj <- function(x) {
}

checkType <- function(type) {
primtiveTypes <- c("byte",
"integer",
"float",
"double",
"numeric",
"character",
"string",
"binary",
"raw",
"logical",
"boolean",
"timestamp",
"date")
if (type %in% primtiveTypes) {
if (!is.null(PRIMITIVE_TYPES[[type]])) {
return()
} else {
# Check complex types
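checkType() now validates primitive types against the shared PRIMITIVE_TYPES environment defined in the new R/pkg/R/types.R below, instead of a locally duplicated vector, so schema validation and coltypes() draw on a single mapping. Indexing an environment with a missing key yields NULL, which is what the check relies on:

    is.null(PRIMITIVE_TYPES[["timestamp"]])   # FALSE: a known primitive type
    is.null(PRIMITIVE_TYPES[["foo"]])         # TRUE: falls through to the complex-type checks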
43 changes: 43 additions & 0 deletions R/pkg/R/types.R
@@ -0,0 +1,43 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# types.R. This file handles the data type mapping between Spark and R

# The primitive data types, where names(PRIMITIVE_TYPES) are Scala types whereas
# values are equivalent R types. This is stored in an environment to allow for
# more efficient look up (environments use hashmaps).
PRIMITIVE_TYPES <- as.environment(list(
"byte"="integer",
"tinyint"="integer",
"smallint"="integer",
"integer"="integer",
"bigint"="numeric",
"float"="numeric",
"double"="numeric",
"decimal"="numeric",
"string"="character",
"binary"="raw",
"boolean"="logical",
"timestamp"="POSIXct",
"date"="Date"))

# The complex data types. These do not have any direct mapping to R's types.
COMPLEX_TYPES <- list(
"map"=NA,
"array"=NA,
"struct"=NA)

# The full list of data types.
DATA_TYPES <- as.environment(c(as.list(PRIMITIVE_TYPES), COMPLEX_TYPES))
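Because PRIMITIVE_TYPES and DATA_TYPES are environments rather than named lists, each lookup is a hash-table probe instead of a linear scan over names. A few illustrative lookups:

    PRIMITIVE_TYPES[["bigint"]]   # "numeric"
    PRIMITIVE_TYPES[["date"]]     # "Date"
    DATA_TYPES[["array"]]         # NA: complex type with no direct R mapping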
38 changes: 31 additions & 7 deletions R/pkg/inst/tests/test_mllib.R
@@ -33,6 +33,18 @@ test_that("glm and predict", {
expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double")
})

test_that("glm should work with long formula", {
training <- createDataFrame(sqlContext, iris)
training$LongLongLongLongLongName <- training$Sepal_Width
training$VeryLongLongLongLonLongName <- training$Sepal_Length
training$AnotherLongLongLongLongName <- training$Species
model <- glm(LongLongLongLongLongName ~ VeryLongLongLongLonLongName + AnotherLongLongLongLongName,
data = training)
vals <- collect(select(predict(model, training), "prediction"))
rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris)
expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
})

test_that("predictions match with native glm", {
training <- createDataFrame(sqlContext, iris)
model <- glm(Sepal_Width ~ Sepal_Length + Species, data = training)
@@ -59,12 +71,18 @@ test_that("feature interaction vs native glm", {

test_that("summary coefficients match with native glm", {
training <- createDataFrame(sqlContext, iris)
stats <- summary(glm(Sepal_Width ~ Sepal_Length + Species, data = training, solver = "l-bfgs"))
coefs <- as.vector(stats$coefficients)
rCoefs <- as.vector(coef(glm(Sepal.Width ~ Sepal.Length + Species, data = iris)))
expect_true(all(abs(rCoefs - coefs) < 1e-6))
stats <- summary(glm(Sepal_Width ~ Sepal_Length + Species, data = training, solver = "normal"))
coefs <- unlist(stats$coefficients)
devianceResiduals <- unlist(stats$devianceResiduals)

rStats <- summary(glm(Sepal.Width ~ Sepal.Length + Species, data = iris))
rCoefs <- unlist(rStats$coefficients)
rDevianceResiduals <- c(-0.95096, 0.72918)

expect_true(all(abs(rCoefs - coefs) < 1e-5))
expect_true(all(abs(rDevianceResiduals - devianceResiduals) < 1e-5))
expect_true(all(
as.character(stats$features) ==
rownames(stats$coefficients) ==
c("(Intercept)", "Sepal_Length", "Species_versicolor", "Species_virginica")))
})

@@ -73,14 +91,20 @@ test_that("summary coefficients match with native glm of family 'binomial'", {
training <- filter(df, df$Species != "setosa")
stats <- summary(glm(Species ~ Sepal_Length + Sepal_Width, data = training,
family = "binomial"))
coefs <- as.vector(stats$coefficients)
coefs <- as.vector(stats$coefficients[,1])

rTraining <- iris[iris$Species %in% c("versicolor","virginica"),]
rCoefs <- as.vector(coef(glm(Species ~ Sepal.Length + Sepal.Width, data = rTraining,
family = binomial(link = "logit"))))

expect_true(all(abs(rCoefs - coefs) < 1e-4))
expect_true(all(
as.character(stats$features) ==
rownames(stats$coefficients) ==
c("(Intercept)", "Sepal_Length", "Sepal_Width")))
})

test_that("summary works on base GLM models", {
baseModel <- stats::glm(Sepal.Width ~ Sepal.Length + Species, data = iris)
baseSummary <- summary(baseModel)
expect_true(abs(baseSummary$deviance - 12.19313) < 1e-4)
})
36 changes: 29 additions & 7 deletions R/pkg/inst/tests/test_sparkSQL.R
@@ -647,11 +647,11 @@ test_that("sample on a DataFrame", {
sampled <- sample(df, FALSE, 1.0)
expect_equal(nrow(collect(sampled)), count(df))
expect_is(sampled, "DataFrame")
sampled2 <- sample(df, FALSE, 0.1)
sampled2 <- sample(df, FALSE, 0.1, 0) # set seed for predictable result
expect_true(count(sampled2) < 3)

# Also test sample_frac
sampled3 <- sample_frac(df, FALSE, 0.1)
sampled3 <- sample_frac(df, FALSE, 0.1, 0) # set seed for predictable result
expect_true(count(sampled3) < 3)
})
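Both calls now pass the optional seed argument (sample(x, withReplacement, fraction, seed)) so the sampled subset is predictable and the count expectations cannot flake. A sketch of the reproducibility this buys, on the same data within one session:

    df <- createDataFrame(sqlContext, faithful)
    s1 <- sample(df, FALSE, 0.1, 0)
    s2 <- sample(df, FALSE, 0.1, 0)
    count(s1) == count(s2)   # TRUE: a fixed seed yields the same sample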

@@ -875,9 +875,9 @@ test_that("column binary mathfunctions", {
expect_equal(collect(select(df, shiftRight(df$b, 1)))[4, 1], 4)
expect_equal(collect(select(df, shiftRightUnsigned(df$b, 1)))[4, 1], 4)
expect_equal(class(collect(select(df, rand()))[2, 1]), "numeric")
expect_equal(collect(select(df, rand(1)))[1, 1], 0.45, tolerance = 0.01)
expect_equal(collect(select(df, rand(1)))[1, 1], 0.134, tolerance = 0.01)
expect_equal(class(collect(select(df, randn()))[2, 1]), "numeric")
expect_equal(collect(select(df, randn(1)))[1, 1], -0.0111, tolerance = 0.01)
expect_equal(collect(select(df, randn(1)))[1, 1], -1.03, tolerance = 0.01)
})

test_that("string operators", {
@@ -1458,17 +1458,18 @@ test_that("sampleBy() on a DataFrame", {
fractions <- list("0" = 0.1, "1" = 0.2)
sample <- sampleBy(df, "key", fractions, 0)
result <- collect(orderBy(count(groupBy(sample, "key")), "key"))
expect_identical(as.list(result[1, ]), list(key = "0", count = 2))
expect_identical(as.list(result[2, ]), list(key = "1", count = 10))
expect_identical(as.list(result[1, ]), list(key = "0", count = 3))
expect_identical(as.list(result[2, ]), list(key = "1", count = 7))
})

test_that("SQL error message is returned from JVM", {
retError <- tryCatch(sql(sqlContext, "select * from blah"), error = function(e) e)
expect_equal(grepl("Table not found: blah", retError), TRUE)
})

irisDF <- createDataFrame(sqlContext, iris)

test_that("Method as.data.frame as a synonym for collect()", {
irisDF <- createDataFrame(sqlContext, iris)
expect_equal(as.data.frame(irisDF), collect(irisDF))
irisDF2 <- irisDF[irisDF$Species == "setosa", ]
expect_equal(as.data.frame(irisDF2), collect(irisDF2))
@@ -1503,6 +1504,27 @@ test_that("with() on a DataFrame", {
expect_equal(nrow(sum2), 35)
})

test_that("Method coltypes() to get R's data types of a DataFrame", {
expect_equal(coltypes(irisDF), c(rep("numeric", 4), "character"))

data <- data.frame(c1=c(1,2,3),
c2=c(T,F,T),
c3=c("2015/01/01 10:00:00", "2015/01/02 10:00:00", "2015/01/03 10:00:00"))

schema <- structType(structField("c1", "byte"),
structField("c3", "boolean"),
structField("c4", "timestamp"))

# Test primitive types
DF <- createDataFrame(sqlContext, data, schema)
expect_equal(coltypes(DF), c("integer", "logical", "POSIXct"))

# Test complex types
x <- createDataFrame(sqlContext, list(list(as.environment(
list("a"="b", "c"="d", "e"="f")))))
expect_equal(coltypes(x), "map<string,string>")
})

unlink(parquetPath)
unlink(jsonPath)
unlink(jsonPathNa)
