[SPARKR][BACKPORT-2.1] backporting package and test changes
## What changes were proposed in this pull request?

Cherry-pick or manually port package and test changes to 2.1.

## How was this patch tested?

Jenkins

Author: Felix Cheung <felixcheung_m@hotmail.com>
Author: hyukjinkwon <gurwls223@gmail.com>
Author: Wayne Zhang <actuaryzhang@uber.com>

Closes #19165 from felixcheung/rbackportpkg21.
felixcheung authored and Felix Cheung committed Sep 10, 2017
1 parent 6a8a726 commit ae4e8ae
Showing 29 changed files with 140 additions and 26 deletions.
2 changes: 2 additions & 0 deletions R/pkg/.Rbuildignore
@@ -6,3 +6,5 @@
^README\.Rmd$
^src-native$
^html$
^tests/fulltests/*

2 changes: 1 addition & 1 deletion R/pkg/DESCRIPTION
@@ -2,7 +2,7 @@ Package: SparkR
Type: Package
Version: 2.1.2
Title: R Frontend for Apache Spark
Description: The SparkR package provides an R Frontend for Apache Spark.
Description: Provides an R Frontend for Apache Spark.
Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"),
email = "shivaram@cs.berkeley.edu"),
person("Xiangrui", "Meng", role = "aut",
6 changes: 5 additions & 1 deletion R/pkg/R/install.R
@@ -270,7 +270,11 @@ sparkCachePath <- function() {
if (.Platform$OS.type == "windows") {
winAppPath <- Sys.getenv("LOCALAPPDATA", unset = NA)
if (is.na(winAppPath)) {
stop(paste("%LOCALAPPDATA% not found.",
message("%LOCALAPPDATA% not found. Falling back to %USERPROFILE%.")
winAppPath <- Sys.getenv("USERPROFILE", unset = NA)
}
if (is.na(winAppPath)) {
stop(paste("%LOCALAPPDATA% and %USERPROFILE% not found.",
"Please define the environment variable",
"or restart and enter an installation path in localDir."))
} else {
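
For context, a minimal standalone sketch of the Windows cache-path fallback introduced above (illustrative only; `windowsCacheRoot` is a hypothetical helper name, not part of the patch):

```r
# Sketch of the fallback order added to sparkCachePath(): try %LOCALAPPDATA%,
# then %USERPROFILE%, and stop only if neither is defined.
windowsCacheRoot <- function() {
  winAppPath <- Sys.getenv("LOCALAPPDATA", unset = NA)
  if (is.na(winAppPath)) {
    message("%LOCALAPPDATA% not found. Falling back to %USERPROFILE%.")
    winAppPath <- Sys.getenv("USERPROFILE", unset = NA)
  }
  if (is.na(winAppPath)) {
    stop(paste("%LOCALAPPDATA% and %USERPROFILE% not found.",
               "Please define the environment variable",
               "or restart and enter an installation path in localDir."))
  }
  winAppPath
}
```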
72 changes: 72 additions & 0 deletions R/pkg/inst/tests/testthat/test_basic.R
@@ -0,0 +1,72 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

context("basic tests for CRAN")

test_that("create DataFrame from list or data.frame", {
sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)

i <- 4
df <- createDataFrame(data.frame(dummy = 1:i))
expect_equal(count(df), i)

l <- list(list(a = 1, b = 2), list(a = 3, b = 4))
df <- createDataFrame(l)
expect_equal(columns(df), c("a", "b"))

a <- 1:3
b <- c("a", "b", "c")
ldf <- data.frame(a, b)
df <- createDataFrame(ldf)
expect_equal(columns(df), c("a", "b"))
expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
expect_equal(count(df), 3)
ldf2 <- collect(df)
expect_equal(ldf$a, ldf2$a)

mtcarsdf <- createDataFrame(mtcars)
expect_equivalent(collect(mtcarsdf), mtcars)

bytes <- as.raw(c(1, 2, 3))
df <- createDataFrame(list(list(bytes)))
expect_equal(collect(df)[[1]][[1]], bytes)

sparkR.session.stop()
})

test_that("spark.glm and predict", {
sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)

training <- suppressWarnings(createDataFrame(iris))
# gaussian family
model <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species)
prediction <- predict(model, training)
expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double")
vals <- collect(select(prediction, "prediction"))
rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris)
expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)

# Gamma family
x <- runif(100, -1, 1)
y <- rgamma(100, rate = 10 / exp(0.5 + 1.2 * x), shape = 10)
df <- as.DataFrame(as.data.frame(list(x = x, y = y)))
model <- glm(y ~ x, family = Gamma, df)
out <- capture.output(print(summary(model)))
expect_true(any(grepl("Dispersion parameter for gamma family", out)))

sparkR.session.stop()
})
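
If you want to exercise just this new CRAN smoke-test file outside the full harness, something like the following should work (a sketch, not part of the patch; it assumes the working directory is the Spark source root, that SparkR and testthat are installed, and that a Spark distribution is available to `sparkR.session()`):

```r
library(SparkR)
library(testthat)

# run-all.R defines this global before the tests are sourced; mirror it here.
sparkRTestMaster <- "local[1]"

# Run only the CRAN-safe basic tests added in this commit.
test_file("R/pkg/inst/tests/testthat/test_basic.R")
```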
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
@@ -89,6 +89,10 @@ mockLinesComplexType <-
complexTypeJsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
writeLines(mockLinesComplexType, complexTypeJsonPath)

if (.Platform$OS.type == "windows") {
Sys.setenv(TZ = "GMT")
}

test_that("calling sparkRSQL.init returns existing SQL context", {
sqlContext <- suppressWarnings(sparkRSQL.init(sc))
expect_equal(suppressWarnings(sparkRSQL.init(sc)), sqlContext)
File renamed without changes.
File renamed without changes.
File renamed without changes.
22 changes: 20 additions & 2 deletions R/pkg/tests/run-all.R
@@ -21,13 +21,31 @@ library(SparkR)
# Turn all warnings into errors
options("warn" = 2)

install.spark()
if (.Platform$OS.type == "windows") {
Sys.setenv(TZ = "GMT")
}

# Setup global test environment
# Install Spark first to set SPARK_HOME
install.spark()

sparkRDir <- file.path(Sys.getenv("SPARK_HOME"), "R")
sparkRFilesBefore <- list.files(path = sparkRDir, all.files = TRUE)
sparkRWhitelistSQLDirs <- c("spark-warehouse", "metastore_db")
invisible(lapply(sparkRWhitelistSQLDirs,
function(x) { unlink(file.path(sparkRDir, x), recursive = TRUE, force = TRUE)}))
sparkRFilesBefore <- list.files(path = sparkRDir, all.files = TRUE)

sparkRTestMaster <- "local[1]"
if (identical(Sys.getenv("NOT_CRAN"), "true")) {
sparkRTestMaster <- ""
}

test_package("SparkR")

if (identical(Sys.getenv("NOT_CRAN"), "true")) {
# for testthat 1.0.2 later, change reporter from "summary" to default_reporter()
testthat:::run_tests("SparkR",
file.path(sparkRDir, "pkg", "tests", "fulltests"),
NULL,
"summary")
}
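
As a usage note, the full (non-CRAN) suite is opted into via the `NOT_CRAN` environment variable, which this patch sets in `R/run-tests.sh` and `appveyor.yml`. A small sketch of the decision `run-all.R` now makes (illustrative only):

```r
# With NOT_CRAN unset (the CRAN case), only the package tests under
# tests/testthat run on a "local[1]" master; with NOT_CRAN=true the relocated
# tests under tests/fulltests also run with the "summary" reporter.
notCran <- identical(Sys.getenv("NOT_CRAN"), "true")
sparkRTestMaster <- if (notCran) "" else "local[1]"  # "" presumably defers to spark-submit's master
if (notCran) {
  message("CRAN tests plus the relocated tests/fulltests suite will run")
} else {
  message("only the CRAN-safe tests under tests/testthat will run")
}
```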
47 changes: 29 additions & 18 deletions R/pkg/vignettes/sparkr-vignettes.Rmd
@@ -27,6 +27,17 @@ vignette: >
limitations under the License.
-->

```{r setup, include=FALSE}
library(knitr)
opts_hooks$set(eval = function(options) {
# override eval to FALSE only on windows
if (.Platform$OS.type == "windows") {
options$eval = FALSE
}
options
})
```

## Overview

SparkR is an R package that provides a light-weight frontend to use Apache Spark from R. With Spark `r packageVersion("SparkR")`, SparkR provides a distributed data frame implementation that supports data processing operations like selection, filtering, aggregation etc. and distributed machine learning using [MLlib](http://spark.apache.org/mllib/).
@@ -65,7 +76,7 @@ We can view the first few rows of the `SparkDataFrame` by `head` or `showDF` fun
head(carsDF)
```

Common data processing operations such as `filter`, `select` are supported on the `SparkDataFrame`.
Common data processing operations such as `filter` and `select` are supported on the `SparkDataFrame`.
```{r}
carsSubDF <- select(carsDF, "model", "mpg", "hp")
carsSubDF <- filter(carsSubDF, carsSubDF$hp >= 200)
@@ -364,7 +375,7 @@ out <- dapply(carsSubDF, function(x) { x <- cbind(x, x$mpg * 1.61) }, schema)
head(collect(out))
```

Like `dapply`, apply a function to each partition of a `SparkDataFrame` and collect the result back. The output of function should be a `data.frame`, but no schema is required in this case. Note that `dapplyCollect` can fail if the output of UDF run on all the partition cannot be pulled to the driver and fit in driver memory.
Like `dapply`, `dapplyCollect` can apply a function to each partition of a `SparkDataFrame` and collect the result back. The output of the function should be a `data.frame`, but no schema is required in this case. Note that `dapplyCollect` can fail if the output of the UDF on all partitions cannot be pulled into the driver's memory.

```{r}
out <- dapplyCollect(
@@ -390,7 +401,7 @@ result <- gapply(
head(arrange(result, "max_mpg", decreasing = TRUE))
```

Like gapply, `gapplyCollect` applies a function to each partition of a `SparkDataFrame` and collect the result back to R `data.frame`. The output of the function should be a `data.frame` but no schema is required in this case. Note that `gapplyCollect` can fail if the output of UDF run on all the partition cannot be pulled to the driver and fit in driver memory.
Like `gapply`, `gapplyCollect` can apply a function to each partition of a `SparkDataFrame` and collect the result back to R `data.frame`. The output of the function should be a `data.frame` but no schema is required in this case. Note that `gapplyCollect` can fail if the output of the UDF on all partitions cannot be pulled into the driver's memory.

```{r}
result <- gapplyCollect(
@@ -443,20 +454,20 @@ options(ops)


### SQL Queries
A `SparkDataFrame` can also be registered as a temporary view in Spark SQL and that allows you to run SQL queries over its data. The sql function enables applications to run SQL queries programmatically and returns the result as a `SparkDataFrame`.
A `SparkDataFrame` can also be registered as a temporary view in Spark SQL so that one can run SQL queries over its data. The sql function enables applications to run SQL queries programmatically and returns the result as a `SparkDataFrame`.

```{r}
people <- read.df(paste0(sparkR.conf("spark.home"),
"/examples/src/main/resources/people.json"), "json")
```

Register this SparkDataFrame as a temporary view.
Register this `SparkDataFrame` as a temporary view.

```{r}
createOrReplaceTempView(people, "people")
```

SQL statements can be run by using the sql method.
SQL statements can be run using the sql method.
```{r}
teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
head(teenagers)
@@ -718,7 +729,7 @@ head(predict(isoregModel, newDF))
`spark.gbt` fits a [gradient-boosted tree](https://en.wikipedia.org/wiki/Gradient_boosting) classification or regression model on a `SparkDataFrame`.
Users can call `summary` to get a summary of the fitted model, `predict` to make predictions, and `write.ml`/`read.ml` to save/load fitted models.

Similar to the random forest example above, we use the `longley` dataset to train a gradient-boosted tree and make predictions:
We use the `longley` dataset to train a gradient-boosted tree and make predictions:

```{r, warning=FALSE}
df <- createDataFrame(longley)
@@ -745,7 +756,7 @@ predictions <- predict(rfModel, df)

`spark.gaussianMixture` fits multivariate [Gaussian Mixture Model](https://en.wikipedia.org/wiki/Mixture_model#Multivariate_Gaussian_mixture_model) (GMM) against a `SparkDataFrame`. [Expectation-Maximization](https://en.wikipedia.org/wiki/Expectation%E2%80%93maximization_algorithm) (EM) is used to approximate the maximum likelihood estimator (MLE) of the model.

We use a simulated example to demostrate the usage.
We use a simulated example to demonstrate the usage.
```{r}
X1 <- data.frame(V1 = rnorm(4), V2 = rnorm(4))
X2 <- data.frame(V1 = rnorm(6, 3), V2 = rnorm(6, 4))
@@ -776,9 +787,9 @@ head(select(kmeansPredictions, "model", "mpg", "hp", "wt", "prediction"), n = 20

* Topics and documents both exist in a feature space, where feature vectors are vectors of word counts (bag of words).

* Rather than estimating a clustering using a traditional distance, LDA uses a function based on a statistical model of how text documents are generated.
* Rather than clustering using a traditional distance, LDA uses a function based on a statistical model of how text documents are generated.

To use LDA, we need to specify a `features` column in `data` where each entry represents a document. There are two type options for the column:
To use LDA, we need to specify a `features` column in `data` where each entry represents a document. There are two options for the column:

* character string: This can be a string of the whole document. It will be parsed automatically. Additional stop words can be added in `customizedStopWords`.

@@ -826,7 +837,7 @@ perplexity

`spark.als` learns latent factors in [collaborative filtering](https://en.wikipedia.org/wiki/Recommender_system#Collaborative_filtering) via [alternating least squares](http://dl.acm.org/citation.cfm?id=1608614).

There are multiple options that can be configured in `spark.als`, including `rank`, `reg`, `nonnegative`. For a complete list, refer to the help file.
There are multiple options that can be configured in `spark.als`, including `rank`, `reg`, and `nonnegative`. For a complete list, refer to the help file.

```{r}
ratings <- list(list(0, 0, 4.0), list(0, 1, 2.0), list(1, 1, 3.0), list(1, 2, 4.0),
@@ -875,7 +886,7 @@ testSummary


### Model Persistence
The following example shows how to save/load an ML model by SparkR.
The following example shows how to save/load an ML model in SparkR.
```{r, warning=FALSE}
irisDF <- createDataFrame(iris)
gaussianGLM <- spark.glm(irisDF, Sepal_Length ~ Sepal_Width + Species, family = "gaussian")
@@ -906,19 +917,19 @@ There are three main object classes in SparkR you may be working with.
+ `sdf` stores a reference to the corresponding Spark Dataset in the Spark JVM backend.
+ `env` saves the meta-information of the object such as `isCached`.

It can be created by data import methods or by transforming an existing `SparkDataFrame`. We can manipulate `SparkDataFrame` by numerous data processing functions and feed that into machine learning algorithms.
It can be created by data import methods or by transforming an existing `SparkDataFrame`. We can manipulate `SparkDataFrame` by numerous data processing functions and feed that into machine learning algorithms.

* `Column`: an S4 class representing column of `SparkDataFrame`. The slot `jc` saves a reference to the corresponding Column object in the Spark JVM backend.
* `Column`: an S4 class representing a column of `SparkDataFrame`. The slot `jc` saves a reference to the corresponding `Column` object in the Spark JVM backend.

It can be obtained from a `SparkDataFrame` by `$` operator, `df$col`. More often, it is used together with other functions, for example, with `select` to select particular columns, with `filter` and constructed conditions to select rows, with aggregation functions to compute aggregate statistics for each group.
It can be obtained from a `SparkDataFrame` by `$` operator, e.g., `df$col`. More often, it is used together with other functions, for example, with `select` to select particular columns, with `filter` and constructed conditions to select rows, with aggregation functions to compute aggregate statistics for each group.

* `GroupedData`: an S4 class representing grouped data created by `groupBy` or by transforming other `GroupedData`. Its `sgd` slot saves a reference to a RelationalGroupedDataset object in the backend.
* `GroupedData`: an S4 class representing grouped data created by `groupBy` or by transforming other `GroupedData`. Its `sgd` slot saves a reference to a `RelationalGroupedDataset` object in the backend.

This is often an intermediate object with group information and followed up by aggregation operations.
This is often an intermediate object with group information and followed up by aggregation operations.

### Architecture

A complete description of architecture can be seen in reference, in particular the paper *SparkR: Scaling R Programs with Spark*.
A complete description of architecture can be seen in the references, in particular the paper *SparkR: Scaling R Programs with Spark*.

Under the hood of SparkR is Spark SQL engine. This avoids the overheads of running interpreted R code, and the optimized SQL execution engine in Spark uses structural information about data and computation flow to perform a bunch of optimizations to speed up the computation.

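
For readers skimming only this diff, the vignette's `dapplyCollect` chunk is truncated above; a minimal self-contained version of the pattern it describes looks roughly like this (a sketch using `mtcars`; the `mpgKm` column name is illustrative and not taken from the vignette):

```r
library(SparkR)
sparkR.session(master = "local[1]", enableHiveSupport = FALSE)

carsSubDF <- select(createDataFrame(mtcars), "mpg", "hp")

# Apply an R function to each partition and collect a plain data.frame back;
# no schema is needed, but the combined output must fit in driver memory.
out <- dapplyCollect(
  carsSubDF,
  function(x) { cbind(x, mpgKm = x$mpg * 1.61) })
head(out)

sparkR.session.stop()
```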
2 changes: 1 addition & 1 deletion R/run-tests.sh
@@ -23,7 +23,7 @@ FAILED=0
LOGFILE=$FWDIR/unit-tests.out
rm -f $LOGFILE

SPARK_TESTING=1 $FWDIR/../bin/spark-submit --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" --conf spark.hadoop.fs.default.name="file:///" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
SPARK_TESTING=1 NOT_CRAN=true $FWDIR/../bin/spark-submit --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" --conf spark.hadoop.fs.default.name="file:///" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
FAILED=$((PIPESTATUS[0]||$FAILED))

NUM_TEST_WARNING="$(grep -c -e 'Warnings ----------------' $LOGFILE)"
3 changes: 3 additions & 0 deletions appveyor.yml
@@ -45,6 +45,9 @@ install:
build_script:
- cmd: mvn -DskipTests -Phadoop-2.6 -Psparkr -Phive -Phive-thriftserver package

environment:
NOT_CRAN: true

test_script:
- cmd: .\bin\spark-submit2.cmd --conf spark.hadoop.fs.default.name="file:///" R\pkg\tests\run-all.R

@@ -428,8 +428,8 @@ class SparkSubmitSuite
assume(RUtils.isSparkRInstalled, "SparkR is not installed in this build.")
val main = MavenCoordinate("my.great.lib", "mylib", "0.1")
val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
val rScriptDir =
Seq(sparkHome, "R", "pkg", "inst", "tests", "packageInAJarTest.R").mkString(File.separator)
val rScriptDir = Seq(
sparkHome, "R", "pkg", "tests", "fulltests", "packageInAJarTest.R").mkString(File.separator)
assert(new File(rScriptDir).exists)
IvyTestUtils.withRepository(main, None, None, withR = true) { repo =>
val args = Seq(
@@ -450,7 +450,7 @@
// Check if the SparkR package is installed
assume(RUtils.isSparkRInstalled, "SparkR is not installed in this build.")
val rScriptDir =
Seq(sparkHome, "R", "pkg", "inst", "tests", "testthat", "jarTest.R").mkString(File.separator)
Seq(sparkHome, "R", "pkg", "tests", "fulltests", "jarTest.R").mkString(File.separator)
assert(new File(rScriptDir).exists)

// compile a small jar containing a class that will be called from R code.
