Merge branch 'master' into SPARK-24063
gaborgsomogyi committed Jan 24, 2019
2 parents b0c5056 + 9813b1d commit 357f834
Showing 1,015 changed files with 20,954 additions and 11,447 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -94,3 +94,6 @@ spark-warehouse/
*.Rproj.*

.Rproj.user

# For SBT
.jvmopts
10 changes: 1 addition & 9 deletions R/README.md
@@ -39,15 +39,7 @@ To set other options like driver memory, executor memory etc. you can pass in the

#### Using SparkR from RStudio

If you wish to use SparkR from RStudio or other R frontends you will need to set some environment variables which point SparkR to your Spark installation. For example
```R
# Set this to where Spark is installed
Sys.setenv(SPARK_HOME="/Users/username/spark")
# This line loads SparkR from the installed directory
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)
sparkR.session()
```
If you wish to use SparkR from RStudio, please refer to the [SparkR documentation](https://spark.apache.org/docs/latest/sparkr.html#starting-up-from-rstudio).

#### Making changes to SparkR

3 changes: 2 additions & 1 deletion R/pkg/NAMESPACE
@@ -67,7 +67,8 @@ exportMethods("glm",
"spark.fpGrowth",
"spark.freqItemsets",
"spark.associationRules",
"spark.findFrequentSequentialPatterns")
"spark.findFrequentSequentialPatterns",
"spark.assignClusters")

# Job group lifecycle management methods
export("setJobGroup",
2 changes: 1 addition & 1 deletion R/pkg/R/context.R
@@ -87,7 +87,7 @@ objectFile <- function(sc, path, minPartitions = NULL) {
#' in the list are split into \code{numSlices} slices and distributed to nodes
#' in the cluster.
#'
#' If size of serialized slices is larger than spark.r.maxAllocationLimit or (200MB), the function
#' If size of serialized slices is larger than spark.r.maxAllocationLimit or (200MiB), the function
#' will write it to disk and send the file name to JVM. Also to make sure each slice is not
#' larger than that limit, number of slices may be increased.
#'
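Editor's note (not part of this commit): the 200MiB threshold mentioned above is the `spark.r.maxAllocationLimit` configuration. A minimal, hedged sketch of raising it at session start follows; the 512MiB value is illustrative, and the assumption that the limit is read as a byte count should be verified against your Spark version.

```R
library(SparkR)

# Hedged sketch: raise the serialized-slice limit before distributing a large local
# object. Assumption: the value is interpreted as a byte count.
sparkR.session(sparkConfig = list(spark.r.maxAllocationLimit = toString(512 * 1024 * 1024)))

# createDataFrame() on a local data.frame goes through the same slicing path described above.
df <- createDataFrame(as.data.frame(matrix(runif(1e5), ncol = 10)))
```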
68 changes: 44 additions & 24 deletions R/pkg/R/functions.R
@@ -202,8 +202,9 @@ NULL
#' \itemize{
#' \item \code{from_json}: a structType object to use as the schema to use
#' when parsing the JSON string. Since Spark 2.3, the DDL-formatted string is
#' also supported for the schema.
#' \item \code{from_csv}: a DDL-formatted string
#' also supported for the schema. Since Spark 3.0, \code{schema_of_json} or
#' the DDL-formatted string literal can also be accepted.
#' \item \code{from_csv}: a structType object, DDL-formatted string or \code{schema_of_csv}
#' }
#' @param ... additional argument(s).
#' \itemize{
@@ -1723,7 +1724,7 @@ setMethod("radians",
#' @details
#' \code{to_date}: Converts the column into a DateType. You may optionally specify
#' a format according to the rules in:
#' \url{http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html}.
#' \url{https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html}.
#' If the string cannot be parsed according to the specified format (or default),
#' the value of the column will be null.
#' By default, it follows casting rules to a DateType if the format is omitted
@@ -1819,7 +1820,7 @@ setMethod("to_csv", signature(x = "Column"),
#' @details
#' \code{to_timestamp}: Converts the column into a TimestampType. You may optionally specify
#' a format according to the rules in:
#' \url{http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html}.
#' \url{https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html}.
#' If the string cannot be parsed according to the specified format (or default),
#' the value of the column will be null.
#' By default, it follows casting rules to a TimestampType if the format is omitted
@@ -2240,7 +2241,7 @@ setMethod("n", signature(x = "Column"),
#' \code{date_format}: Converts a date/timestamp/string to a value of string in the format
#' specified by the date format given by the second argument. A pattern could be for instance
#' \code{dd.MM.yyyy} and could return a string like '18.03.1993'. All
#' pattern letters of \code{java.text.SimpleDateFormat} can be used.
#' pattern letters of \code{java.time.format.DateTimeFormatter} can be used.
#' Note: Use when ever possible specialized functions like \code{year}. These benefit from a
#' specialized implementation.
#'
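Editor's note (illustrative only, not part of this commit): a small sketch of the pattern-based formatter alongside the specialized `year()` function that the note above recommends; the column name is an assumption.

```R
df <- sql("SELECT current_timestamp() AS ts")
# date_format accepts DateTimeFormatter-style pattern letters; year() uses a
# specialized implementation and is preferred when it fits.
head(select(df, date_format(df$ts, "dd.MM.yyyy"), year(df$ts)))
```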
@@ -2254,40 +2255,54 @@ setMethod("date_format", signature(y = "Column", x = "character"),
column(jc)
})

setClassUnion("characterOrstructTypeOrColumn", c("character", "structType", "Column"))

#' @details
#' \code{from_json}: Parses a column containing a JSON string into a Column of \code{structType}
#' with the specified \code{schema} or array of \code{structType} if \code{as.json.array} is set
#' to \code{TRUE}. If the string is unparseable, the Column will contain the value NA.
#'
#' @rdname column_collection_functions
#' @param as.json.array indicating if input string is JSON array of objects or a single object.
#' @aliases from_json from_json,Column,characterOrstructType-method
#' @aliases from_json from_json,Column,characterOrstructTypeOrColumn-method
#' @examples
#'
#' \dontrun{
#' df2 <- sql("SELECT named_struct('date', cast('2000-01-01' as date)) as d")
#' df2 <- mutate(df2, d2 = to_json(df2$d, dateFormat = 'dd/MM/yyyy'))
#' schema <- structType(structField("date", "string"))
#' head(select(df2, from_json(df2$d2, schema, dateFormat = 'dd/MM/yyyy')))

#' df2 <- sql("SELECT named_struct('name', 'Bob') as people")
#' df2 <- mutate(df2, people_json = to_json(df2$people))
#' schema <- structType(structField("name", "string"))
#' head(select(df2, from_json(df2$people_json, schema)))
#' head(select(df2, from_json(df2$people_json, "name STRING")))}
#' head(select(df2, from_json(df2$people_json, "name STRING")))
#' head(select(df2, from_json(df2$people_json, schema_of_json(head(df2)$people_json))))}
#' @note from_json since 2.2.0
setMethod("from_json", signature(x = "Column", schema = "characterOrstructType"),
setMethod("from_json", signature(x = "Column", schema = "characterOrstructTypeOrColumn"),
function(x, schema, as.json.array = FALSE, ...) {
if (is.character(schema)) {
schema <- structType(schema)
jschema <- structType(schema)$jobj
} else if (class(schema) == "structType") {
jschema <- schema$jobj
} else {
jschema <- schema@jc
}

if (as.json.array) {
jschema <- callJStatic("org.apache.spark.sql.types.DataTypes",
"createArrayType",
schema$jobj)
} else {
jschema <- schema$jobj
# This case is specific to R. Unlike the Scala and Python sides, the R side
# has an 'as.json.array' option to indicate whether the schema should be
# treated as a struct or as the element type of an array, which makes it
# more R-friendly.
if (class(schema) == "Column") {
jschema <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
"createArrayType",
jschema)
} else {
jschema <- callJStatic("org.apache.spark.sql.types.DataTypes",
"createArrayType",
jschema)
}
}
options <- varargsToStrEnv(...)
jc <- callJStatic("org.apache.spark.sql.functions",
@@ -2328,22 +2343,27 @@ setMethod("schema_of_json", signature(x = "characterOrColumn"),
#' If the string is unparseable, the Column will contain the value NA.
#'
#' @rdname column_collection_functions
#' @aliases from_csv from_csv,Column,character-method
#' @aliases from_csv from_csv,Column,characterOrstructTypeOrColumn-method
#' @examples
#'
#' \dontrun{
#' df <- sql("SELECT 'Amsterdam,2018' as csv")
#' csv <- "Amsterdam,2018"
#' df <- sql(paste0("SELECT '", csv, "' as csv"))
#' schema <- "city STRING, year INT"
#' head(select(df, from_csv(df$csv, schema)))}
#' head(select(df, from_csv(df$csv, schema)))
#' head(select(df, from_csv(df$csv, structType(schema))))
#' head(select(df, from_csv(df$csv, schema_of_csv(csv))))}
#' @note from_csv since 3.0.0
setMethod("from_csv", signature(x = "Column", schema = "characterOrColumn"),
setMethod("from_csv", signature(x = "Column", schema = "characterOrstructTypeOrColumn"),
function(x, schema, ...) {
if (class(schema) == "Column") {
jschema <- schema@jc
} else if (is.character(schema)) {
if (class(schema) == "structType") {
schema <- callJMethod(schema$jobj, "toDDL")
}

if (is.character(schema)) {
jschema <- callJStatic("org.apache.spark.sql.functions", "lit", schema)
} else {
stop("schema argument should be a column or character")
jschema <- schema@jc
}
options <- varargsToStrEnv(...)
jc <- callJStatic("org.apache.spark.sql.functions",
Expand Down Expand Up @@ -2666,7 +2686,7 @@ setMethod("format_string", signature(format = "character", x = "Column"),
#' \code{from_unixtime}: Converts the number of seconds from unix epoch (1970-01-01 00:00:00 UTC)
#' to a string representing the timestamp of that moment in the current system time zone in the JVM
#' in the given format.
#' See \href{http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html}{
#' See \href{https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html}{
#' Customizing Formats} for available options.
#'
#' @rdname column_datetime_functions
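Editor's note: taken together, the functions.R changes above let `from_json` and `from_csv` accept a schema as a DDL string, a `structType`, or a Column produced by `schema_of_json`/`schema_of_csv`. A hedged end-to-end sketch, not part of this commit (column names and literals are illustrative):

```R
library(SparkR)
sparkR.session()

df <- sql("SELECT '{\"name\":\"Bob\"}' AS people_json, 'Amsterdam,2018' AS csv")

# from_json: DDL string, structType, or a schema inferred with schema_of_json()
head(select(df, from_json(df$people_json, "name STRING")))
head(select(df, from_json(df$people_json, structType(structField("name", "string")))))
head(select(df, from_json(df$people_json, schema_of_json('{"name":"Bob"}'))))

# from_csv: the same three schema forms after this change
head(select(df, from_csv(df$csv, "city STRING, year INT")))
head(select(df, from_csv(df$csv, structType(structField("city", "string"),
                                            structField("year", "integer")))))
head(select(df, from_csv(df$csv, schema_of_csv("Amsterdam,2018"))))
```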
4 changes: 4 additions & 0 deletions R/pkg/R/generics.R
@@ -1479,6 +1479,10 @@ setGeneric("spark.associationRules", function(object) { standardGeneric("spark.a
setGeneric("spark.findFrequentSequentialPatterns",
function(data, ...) { standardGeneric("spark.findFrequentSequentialPatterns") })

#' @rdname spark.powerIterationClustering
setGeneric("spark.assignClusters",
function(data, ...) { standardGeneric("spark.assignClusters") })

#' @param object a fitted ML model object.
#' @param path the directory where the model is saved.
#' @param ... additional argument(s) passed to the method.
59 changes: 59 additions & 0 deletions R/pkg/R/mllib_clustering.R
@@ -41,6 +41,12 @@ setClass("KMeansModel", representation(jobj = "jobj"))
#' @note LDAModel since 2.1.0
setClass("LDAModel", representation(jobj = "jobj"))

#' S4 class that represents a PowerIterationClustering
#'
#' @param jobj a Java object reference to the backing Scala PowerIterationClustering
#' @note PowerIterationClustering since 3.0.0
setClass("PowerIterationClustering", slots = list(jobj = "jobj"))

#' Bisecting K-Means Clustering Model
#'
#' Fits a bisecting k-means clustering model against a SparkDataFrame.
@@ -610,3 +616,56 @@ setMethod("write.ml", signature(object = "LDAModel", path = "character"),
function(object, path, overwrite = FALSE) {
write_internal(object, path, overwrite)
})

#' PowerIterationClustering
#'
#' A scalable graph clustering algorithm. Users can call \code{spark.assignClusters} to
#' run the PIC algorithm and return a cluster assignment for each input vertex.
#' @param data a SparkDataFrame.
#' @param k the number of clusters to create.
#' @param initMode the initialization algorithm; "random" or "degree"
#' @param maxIter the maximum number of iterations.
#' @param sourceCol the name of the input column for source vertex IDs.
#' @param destinationCol the name of the input column for destination vertex IDs
#' @param weightCol weight column name. If this is not set or \code{NULL},
#' we treat all instance weights as 1.0.
#' @param ... additional argument(s) passed to the method.
#' @return A dataset that contains columns of vertex id and the corresponding cluster for the id.
#' The schema of it will be: \code{id: integer}, \code{cluster: integer}
#' @rdname spark.powerIterationClustering
#' @aliases spark.assignClusters,SparkDataFrame-method
#' @examples
#' \dontrun{
#' df <- createDataFrame(list(list(0L, 1L, 1.0), list(0L, 2L, 1.0),
#' list(1L, 2L, 1.0), list(3L, 4L, 1.0),
#' list(4L, 0L, 0.1)),
#' schema = c("src", "dst", "weight"))
#' clusters <- spark.assignClusters(df, initMode = "degree", weightCol = "weight")
#' showDF(clusters)
#' }
#' @note spark.assignClusters(SparkDataFrame) since 3.0.0
setMethod("spark.assignClusters",
signature(data = "SparkDataFrame"),
function(data, k = 2L, initMode = c("random", "degree"), maxIter = 20L,
sourceCol = "src", destinationCol = "dst", weightCol = NULL) {
if (!is.integer(k) || k < 1) {
stop("k should be a number with value >= 1.")
}
if (!is.integer(maxIter) || maxIter <= 0) {
stop("maxIter should be a number with value > 0.")
}
initMode <- match.arg(initMode)
if (!is.null(weightCol) && weightCol == "") {
weightCol <- NULL
} else if (!is.null(weightCol)) {
weightCol <- as.character(weightCol)
}
jobj <- callJStatic("org.apache.spark.ml.r.PowerIterationClusteringWrapper",
"getPowerIterationClustering",
as.integer(k), initMode,
as.integer(maxIter), as.character(sourceCol),
as.character(destinationCol), weightCol)
object <- new("PowerIterationClustering", jobj = jobj)
dataFrame(callJMethod(object@jobj, "assignClusters", data@sdf))
})
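Editor's note (illustrative only, not part of this commit): a small follow-on sketch of the new API, joining the returned id/cluster assignments back onto the source vertices of the edge list. The edge data is reused from the example above; the join itself is an assumption about how one might inspect the result.

```R
edges <- createDataFrame(list(list(0L, 1L, 1.0), list(0L, 2L, 1.0),
                              list(1L, 2L, 1.0), list(3L, 4L, 1.0),
                              list(4L, 0L, 0.1)),
                         schema = c("src", "dst", "weight"))
assignments <- spark.assignClusters(edges, k = 2L, initMode = "degree",
                                    maxIter = 20L, weightCol = "weight")
# Attach each edge's source vertex to its assigned cluster.
head(join(edges, assignments, edges$src == assignments$id, "left_outer"))
```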
11 changes: 6 additions & 5 deletions R/pkg/R/mllib_fpm.R
@@ -183,16 +183,17 @@ setMethod("write.ml", signature(object = "FPGrowthModel", path = "character"),
#' @return A complete set of frequent sequential patterns in the input sequences of itemsets.
#' The returned \code{SparkDataFrame} contains columns of sequence and corresponding
#' frequency. The schema of it will be:
#' \code{sequence: ArrayType(ArrayType(T))} (T is the item type)
#' \code{freq: Long}
#' \code{sequence: ArrayType(ArrayType(T))}, \code{freq: integer}
#' where T is the item type
#' @rdname spark.prefixSpan
#' @aliases findFrequentSequentialPatterns,PrefixSpan,SparkDataFrame-method
#' @examples
#' \dontrun{
#' df <- createDataFrame(list(list(list(list(1L, 2L), list(3L))),
#' list(list(list(1L), list(3L, 2L), list(1L, 2L))),
#' list(list(list(1L, 2L), list(5L))),
#' list(list(list(6L)))), schema = c("sequence"))
#' list(list(list(1L), list(3L, 2L), list(1L, 2L))),
#' list(list(list(1L, 2L), list(5L))),
#' list(list(list(6L)))),
#' schema = c("sequence"))
#' frequency <- spark.findFrequentSequentialPatterns(df, minSupport = 0.5, maxPatternLength = 5L,
#' maxLocalProjDBSize = 32000000L)
#' showDF(frequency)
6 changes: 3 additions & 3 deletions R/pkg/R/mllib_tree.R
@@ -157,7 +157,7 @@ print.summary.decisionTree <- function(x) {
#' @param checkpointInterval Param for set checkpoint interval (>= 1) or disable checkpoint (-1).
#' Note: this setting will be ignored if the checkpoint directory is not
#' set.
#' @param maxMemoryInMB Maximum memory in MB allocated to histogram aggregation.
#' @param maxMemoryInMB Maximum memory in MiB allocated to histogram aggregation.
#' @param cacheNodeIds If FALSE, the algorithm will pass trees to executors to match instances with
#' nodes. If TRUE, the algorithm will cache node IDs for each instance. Caching
#' can speed up training of deeper trees. Users can set how often should the
@@ -382,7 +382,7 @@ setMethod("write.ml", signature(object = "GBTClassificationModel", path = "chara
#' @param checkpointInterval Param for set checkpoint interval (>= 1) or disable checkpoint (-1).
#' Note: this setting will be ignored if the checkpoint directory is not
#' set.
#' @param maxMemoryInMB Maximum memory in MB allocated to histogram aggregation.
#' @param maxMemoryInMB Maximum memory in MiB allocated to histogram aggregation.
#' @param cacheNodeIds If FALSE, the algorithm will pass trees to executors to match instances with
#' nodes. If TRUE, the algorithm will cache node IDs for each instance. Caching
#' can speed up training of deeper trees. Users can set how often should the
@@ -588,7 +588,7 @@ setMethod("write.ml", signature(object = "RandomForestClassificationModel", path
#' @param checkpointInterval Param for set checkpoint interval (>= 1) or disable checkpoint (-1).
#' Note: this setting will be ignored if the checkpoint directory is not
#' set.
#' @param maxMemoryInMB Maximum memory in MB allocated to histogram aggregation.
#' @param maxMemoryInMB Maximum memory in MiB allocated to histogram aggregation.
#' @param cacheNodeIds If FALSE, the algorithm will pass trees to executors to match instances with
#' nodes. If TRUE, the algorithm will cache node IDs for each instance. Caching
#' can speed up training of deeper trees. Users can set how often should the
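Editor's note (not part of this commit): a hedged sketch of passing the two tuning parameters documented above to `spark.decisionTree`; the dataset and values are illustrative, and the same parameters appear on the other tree learners in this file.

```R
df <- createDataFrame(longley)
# maxMemoryInMB bounds the memory used for histogram aggregation; cacheNodeIds
# trades memory for faster training of deeper trees.
model <- spark.decisionTree(df, Employed ~ ., type = "regression",
                            maxDepth = 5, maxMemoryInMB = 512, cacheNodeIds = TRUE)
summary(model)
```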
16 changes: 8 additions & 8 deletions R/pkg/inst/profile/shell.R
@@ -33,19 +33,19 @@
sc <- SparkR:::callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", spark)
assign("sc", sc, envir = .GlobalEnv)
sparkVer <- SparkR:::callJMethod(sc, "version")
cat("\n Welcome to")
cat("\nWelcome to")
cat("\n")
cat(" ____ __", "\n")
cat(" / __/__ ___ _____/ /__", "\n")
cat(" _\\ \\/ _ \\/ _ `/ __/ '_/", "\n")
cat(" /___/ .__/\\_,_/_/ /_/\\_\\")
cat(" ____ __", "\n")
cat(" / __/__ ___ _____/ /__", "\n")
cat(" _\\ \\/ _ \\/ _ `/ __/ '_/", "\n")
cat(" /___/ .__/\\_,_/_/ /_/\\_\\")
if (nchar(sparkVer) == 0) {
cat("\n")
} else {
cat(" version ", sparkVer, "\n")
cat(" version", sparkVer, "\n")
}
cat(" /_/", "\n")
cat(" /_/", "\n")
cat("\n")

cat("\n SparkSession available as 'spark'.\n")
cat("\nSparkSession available as 'spark'.\n")
}
13 changes: 13 additions & 0 deletions R/pkg/tests/fulltests/test_mllib_clustering.R
@@ -319,4 +319,17 @@ test_that("spark.posterior and spark.perplexity", {
expect_equal(length(local.posterior), sum(unlist(local.posterior)))
})

test_that("spark.assignClusters", {
df <- createDataFrame(list(list(0L, 1L, 1.0), list(0L, 2L, 1.0),
list(1L, 2L, 1.0), list(3L, 4L, 1.0),
list(4L, 0L, 0.1)),
schema = c("src", "dst", "weight"))
clusters <- spark.assignClusters(df, initMode = "degree", weightCol = "weight")
expected_result <- createDataFrame(list(list(4L, 1L), list(0L, 0L),
list(1L, 0L), list(3L, 1L),
list(2L, 0L)),
schema = c("id", "cluster"))
expect_equivalent(expected_result, clusters)
})

sparkR.session.stop()
