
Commit

Merge remote-tracking branch 'upstream/master' into nondeter-join
viirya committed Aug 8, 2017
2 parents b80d2fc + f763d84 commit abf51f7
Showing 433 changed files with 10,563 additions and 3,305 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -47,6 +47,8 @@ dev/pr-deps/
dist/
docs/_site
docs/api
sql/docs
sql/site
lib_managed/
lint-r-report.log
log/
2 changes: 2 additions & 0 deletions R/pkg/NAMESPACE
@@ -286,6 +286,8 @@ exportMethods("%<=>%",
"lower",
"lpad",
"ltrim",
"map_keys",
"map_values",
"max",
"md5",
"mean",
33 changes: 32 additions & 1 deletion R/pkg/R/functions.R
@@ -195,7 +195,10 @@ NULL
#' head(tmp2)
#' head(select(tmp, posexplode(tmp$v1)))
#' head(select(tmp, sort_array(tmp$v1)))
#' head(select(tmp, sort_array(tmp$v1, asc = FALSE)))}
#' head(select(tmp, sort_array(tmp$v1, asc = FALSE)))
#' tmp3 <- mutate(df, v3 = create_map(df$model, df$cyl))
#' head(select(tmp3, map_keys(tmp3$v3)))
#' head(select(tmp3, map_values(tmp3$v3)))}
NULL

#' Window functions for Column operations
@@ -3055,6 +3058,34 @@ setMethod("array_contains",
column(jc)
})

#' @details
#' \code{map_keys}: Returns an unordered array containing the keys of the map.
#'
#' @rdname column_collection_functions
#' @aliases map_keys map_keys,Column-method
#' @export
#' @note map_keys since 2.3.0
setMethod("map_keys",
signature(x = "Column"),
function(x) {
jc <- callJStatic("org.apache.spark.sql.functions", "map_keys", x@jc)
column(jc)
})

#' @details
#' \code{map_values}: Returns an unordered array containing the values of the map.
#'
#' @rdname column_collection_functions
#' @aliases map_values map_values,Column-method
#' @export
#' @note map_values since 2.3.0
setMethod("map_values",
signature(x = "Column"),
function(x) {
jc <- callJStatic("org.apache.spark.sql.functions", "map_values", x@jc)
column(jc)
})
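
For context, a minimal usage sketch of the two new methods, mirroring the roxygen example above (assumes an active SparkR session; the mtcars-derived frame is illustrative):

# Build a map column from two existing columns, then pull it apart again.
df <- createDataFrame(cbind(model = rownames(mtcars), mtcars))
tmp <- mutate(df, v = create_map(df$model, df$cyl))
head(select(tmp, map_keys(tmp$v)))    # unordered array of keys
head(select(tmp, map_values(tmp$v)))  # unordered array of values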

#' @details
#' \code{explode}: Creates a new row for each element in the given array or map column.
#'
10 changes: 10 additions & 0 deletions R/pkg/R/generics.R
@@ -1213,6 +1213,16 @@ setGeneric("lpad", function(x, len, pad) { standardGeneric("lpad") })
#' @name NULL
setGeneric("ltrim", function(x) { standardGeneric("ltrim") })

#' @rdname column_collection_functions
#' @export
#' @name NULL
setGeneric("map_keys", function(x) { standardGeneric("map_keys") })

#' @rdname column_collection_functions
#' @export
#' @name NULL
setGeneric("map_values", function(x) { standardGeneric("map_values") })

#' @rdname column_misc_functions
#' @export
#' @name NULL
49 changes: 40 additions & 9 deletions R/pkg/R/mllib_classification.R
@@ -69,6 +69,11 @@ setClass("NaiveBayesModel", representation(jobj = "jobj"))
#' @param aggregationDepth The depth for treeAggregate (greater than or equal to 2). If the dimensions of features
#' or the number of partitions are large, this param could be adjusted to a larger size.
#' This is an expert parameter. Default value should be good for most cases.
#' @param handleInvalid How to handle invalid data (unseen labels or NULL values) in features and label
#' column of string type.
#' Supported options: "skip" (filter out rows with invalid data),
#' "error" (throw an error), "keep" (put invalid data in a special additional
#' bucket, at index numLabels). Default is "error".
#' @param ... additional arguments passed to the method.
#' @return \code{spark.svmLinear} returns a fitted linear SVM model.
#' @rdname spark.svmLinear
@@ -98,7 +103,8 @@ setClass("NaiveBayesModel", representation(jobj = "jobj"))
#' @note spark.svmLinear since 2.2.0
setMethod("spark.svmLinear", signature(data = "SparkDataFrame", formula = "formula"),
function(data, formula, regParam = 0.0, maxIter = 100, tol = 1E-6, standardization = TRUE,
threshold = 0.0, weightCol = NULL, aggregationDepth = 2) {
threshold = 0.0, weightCol = NULL, aggregationDepth = 2,
handleInvalid = c("error", "keep", "skip")) {
formula <- paste(deparse(formula), collapse = "")

if (!is.null(weightCol) && weightCol == "") {
@@ -107,10 +113,12 @@ setMethod("spark.svmLinear", signature(data = "SparkDataFrame", formula = "formu
weightCol <- as.character(weightCol)
}

handleInvalid <- match.arg(handleInvalid)

jobj <- callJStatic("org.apache.spark.ml.r.LinearSVCWrapper", "fit",
data@sdf, formula, as.numeric(regParam), as.integer(maxIter),
as.numeric(tol), as.logical(standardization), as.numeric(threshold),
weightCol, as.integer(aggregationDepth))
weightCol, as.integer(aggregationDepth), handleInvalid)
new("LinearSVCModel", jobj = jobj)
})
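
For context, a hedged sketch of the new handleInvalid argument on spark.svmLinear (the iris-based data is illustrative, not from this commit; LinearSVC is a binary classifier, hence the two-class subset):

# "keep" sends invalid/unseen string labels to an extra bucket at index numLabels.
df <- createDataFrame(iris)
train <- filter(df, df$Species != "setosa")
model <- spark.svmLinear(train, Species ~ ., regParam = 0.01,
                         handleInvalid = "keep")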

@@ -218,6 +226,11 @@ function(object, path, overwrite = FALSE) {
#' @param upperBoundsOnIntercepts The upper bounds on intercepts if fitting under bound constrained optimization.
#' The bound vector size must be equal to 1 for binomial regression, or the number
#' of classes for multinomial regression.
#' @param handleInvalid How to handle invalid data (unseen labels or NULL values) in features and label
#' column of string type.
#' Supported options: "skip" (filter out rows with invalid data),
#' "error" (throw an error), "keep" (put invalid data in a special additional
#' bucket, at index numLabels). Default is "error".
#' @param ... additional arguments passed to the method.
#' @return \code{spark.logit} returns a fitted logistic regression model.
#' @rdname spark.logit
@@ -257,7 +270,8 @@ setMethod("spark.logit", signature(data = "SparkDataFrame", formula = "formula")
tol = 1E-6, family = "auto", standardization = TRUE,
thresholds = 0.5, weightCol = NULL, aggregationDepth = 2,
lowerBoundsOnCoefficients = NULL, upperBoundsOnCoefficients = NULL,
lowerBoundsOnIntercepts = NULL, upperBoundsOnIntercepts = NULL) {
lowerBoundsOnIntercepts = NULL, upperBoundsOnIntercepts = NULL,
handleInvalid = c("error", "keep", "skip")) {
formula <- paste(deparse(formula), collapse = "")
row <- 0
col <- 0
@@ -304,6 +318,8 @@ setMethod("spark.logit", signature(data = "SparkDataFrame", formula = "formula")
upperBoundsOnCoefficients <- as.array(as.vector(upperBoundsOnCoefficients))
}

handleInvalid <- match.arg(handleInvalid)

jobj <- callJStatic("org.apache.spark.ml.r.LogisticRegressionWrapper", "fit",
data@sdf, formula, as.numeric(regParam),
as.numeric(elasticNetParam), as.integer(maxIter),
@@ -312,7 +328,8 @@ setMethod("spark.logit", signature(data = "SparkDataFrame", formula = "formula")
weightCol, as.integer(aggregationDepth),
as.integer(row), as.integer(col),
lowerBoundsOnCoefficients, upperBoundsOnCoefficients,
lowerBoundsOnIntercepts, upperBoundsOnIntercepts)
lowerBoundsOnIntercepts, upperBoundsOnIntercepts,
handleInvalid)
new("LogisticRegressionModel", jobj = jobj)
})
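
The same option on spark.logit; a hedged sketch with illustrative data (a multinomial fit over the three iris species):

# "skip" silently filters out rows whose label or string features are invalid.
df <- createDataFrame(iris)
model <- spark.logit(df, Species ~ ., regParam = 0.5, handleInvalid = "skip")
summary(model)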

@@ -394,7 +411,12 @@ setMethod("write.ml", signature(object = "LogisticRegressionModel", path = "char
#' @param stepSize stepSize parameter.
#' @param seed seed parameter for weights initialization.
#' @param initialWeights initialWeights parameter for weights initialization, it should be a
#' numeric vector.
#' @param handleInvalid How to handle invalid data (unseen labels or NULL values) in features and label
#' column of string type.
#' Supported options: "skip" (filter out rows with invalid data),
#' "error" (throw an error), "keep" (put invalid data in a special additional
#' bucket, at index numLabels). Default is "error".
#' @param ... additional arguments passed to the method.
#' @return \code{spark.mlp} returns a fitted Multilayer Perceptron Classification Model.
#' @rdname spark.mlp
@@ -426,7 +448,8 @@ setMethod("write.ml", signature(object = "LogisticRegressionModel", path = "char
#' @note spark.mlp since 2.1.0
setMethod("spark.mlp", signature(data = "SparkDataFrame", formula = "formula"),
function(data, formula, layers, blockSize = 128, solver = "l-bfgs", maxIter = 100,
tol = 1E-6, stepSize = 0.03, seed = NULL, initialWeights = NULL) {
tol = 1E-6, stepSize = 0.03, seed = NULL, initialWeights = NULL,
handleInvalid = c("error", "keep", "skip")) {
formula <- paste(deparse(formula), collapse = "")
if (is.null(layers)) {
stop ("layers must be a integer vector with length > 1.")
@@ -441,10 +464,11 @@ setMethod("spark.mlp", signature(data = "SparkDataFrame", formula = "formula"),
if (!is.null(initialWeights)) {
initialWeights <- as.array(as.numeric(na.omit(initialWeights)))
}
handleInvalid <- match.arg(handleInvalid)
jobj <- callJStatic("org.apache.spark.ml.r.MultilayerPerceptronClassifierWrapper",
"fit", data@sdf, formula, as.integer(blockSize), as.array(layers),
as.character(solver), as.integer(maxIter), as.numeric(tol),
as.numeric(stepSize), seed, initialWeights)
as.numeric(stepSize), seed, initialWeights, handleInvalid)
new("MultilayerPerceptronClassificationModel", jobj = jobj)
})
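
A hedged spark.mlp sketch with illustrative data; layers = c(4, 5, 3) matches iris's four features and three classes:

# "error" is the default: fail fast when invalid data is encountered.
df <- createDataFrame(iris)
model <- spark.mlp(df, Species ~ ., layers = c(4, 5, 3), maxIter = 50,
                   handleInvalid = "error")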

@@ -514,6 +538,11 @@ setMethod("write.ml", signature(object = "MultilayerPerceptronClassificationMode
#' @param formula a symbolic description of the model to be fitted. Currently only a few formula
#' operators are supported, including '~', '.', ':', '+', and '-'.
#' @param smoothing smoothing parameter.
#' @param handleInvalid How to handle invalid data (unseen labels or NULL values) in features and label
#' column of string type.
#' Supported options: "skip" (filter out rows with invalid data),
#' "error" (throw an error), "keep" (put invalid data in a special additional
#' bucket, at index numLabels). Default is "error".
#' @param ... additional argument(s) passed to the method. Currently only \code{smoothing}.
#' @return \code{spark.naiveBayes} returns a fitted naive Bayes model.
#' @rdname spark.naiveBayes
@@ -543,10 +572,12 @@ setMethod("write.ml", signature(object = "MultilayerPerceptronClassificationMode
#' }
#' @note spark.naiveBayes since 2.0.0
setMethod("spark.naiveBayes", signature(data = "SparkDataFrame", formula = "formula"),
function(data, formula, smoothing = 1.0) {
function(data, formula, smoothing = 1.0,
handleInvalid = c("error", "keep", "skip")) {
formula <- paste(deparse(formula), collapse = "")
handleInvalid <- match.arg(handleInvalid)
jobj <- callJStatic("org.apache.spark.ml.r.NaiveBayesWrapper", "fit",
formula, data@sdf, smoothing)
formula, data@sdf, smoothing, handleInvalid)
new("NaiveBayesModel", jobj = jobj)
})
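
And the spark.naiveBayes variant, again with illustrative data; note that match.arg validates the argument against c("error", "keep", "skip") before it reaches the JVM wrapper:

df <- createDataFrame(iris)
model <- spark.naiveBayes(df, Species ~ ., smoothing = 1.0,
                          handleInvalid = "keep")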

22 changes: 18 additions & 4 deletions R/pkg/R/mllib_regression.R
@@ -76,6 +76,8 @@ setClass("IsotonicRegressionModel", representation(jobj = "jobj"))
#' "frequencyDesc", "frequencyAsc", "alphabetDesc", and "alphabetAsc".
#' The default value is "frequencyDesc". When the ordering is set to
#' "alphabetDesc", this drops the same category as R when encoding strings.
#' @param offsetCol the offset column name. If this is not set or empty, we treat all instance offsets
#' as 0.0. The feature specified as offset has a constant coefficient of 1.0.
#' @param ... additional arguments passed to the method.
#' @aliases spark.glm,SparkDataFrame,formula-method
#' @return \code{spark.glm} returns a fitted generalized linear model.
@@ -127,7 +129,8 @@ setMethod("spark.glm", signature(data = "SparkDataFrame", formula = "formula"),
function(data, formula, family = gaussian, tol = 1e-6, maxIter = 25, weightCol = NULL,
regParam = 0.0, var.power = 0.0, link.power = 1.0 - var.power,
stringIndexerOrderType = c("frequencyDesc", "frequencyAsc",
"alphabetDesc", "alphabetAsc")) {
"alphabetDesc", "alphabetAsc"),
offsetCol = NULL) {

stringIndexerOrderType <- match.arg(stringIndexerOrderType)
if (is.character(family)) {
@@ -159,12 +162,19 @@ setMethod("spark.glm", signature(data = "SparkDataFrame", formula = "formula"),
weightCol <- as.character(weightCol)
}

if (!is.null(offsetCol)) {
offsetCol <- as.character(offsetCol)
if (nchar(offsetCol) == 0) {
offsetCol <- NULL
}
}

# For known families, Gamma is upper-cased
jobj <- callJStatic("org.apache.spark.ml.r.GeneralizedLinearRegressionWrapper",
"fit", formula, data@sdf, tolower(family$family), family$link,
tol, as.integer(maxIter), weightCol, regParam,
as.double(var.power), as.double(link.power),
stringIndexerOrderType)
stringIndexerOrderType, offsetCol)
new("GeneralizedLinearRegressionModel", jobj = jobj)
})
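
A hedged sketch of the new offsetCol argument for a Poisson rate model; the claims/exposure data and column names are hypothetical:

# The offset column enters the linear predictor with a fixed coefficient of 1.0,
# the usual way to model counts per unit of exposure.
df <- withColumn(df, "logExposure", log(df$exposure))
model <- spark.glm(df, claims ~ age + region, family = poisson(link = "log"),
                   offsetCol = "logExposure")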

@@ -192,6 +202,8 @@ setMethod("spark.glm", signature(data = "SparkDataFrame", formula = "formula"),
#' "frequencyDesc", "frequencyAsc", "alphabetDesc", and "alphabetAsc".
#' The default value is "frequencyDesc". When the ordering is set to
#' "alphabetDesc", this drops the same category as R when encoding strings.
#' @param offsetCol the offset column name. If this is not set or empty, we treat all instance offsets
#' as 0.0. The feature specified as offset has a constant coefficient of 1.0.
#' @return \code{glm} returns a fitted generalized linear model.
#' @rdname glm
#' @export
@@ -209,10 +221,12 @@ setMethod("glm", signature(formula = "formula", family = "ANY", data = "SparkDat
function(formula, family = gaussian, data, epsilon = 1e-6, maxit = 25, weightCol = NULL,
var.power = 0.0, link.power = 1.0 - var.power,
stringIndexerOrderType = c("frequencyDesc", "frequencyAsc",
"alphabetDesc", "alphabetAsc")) {
"alphabetDesc", "alphabetAsc"),
offsetCol = NULL) {
spark.glm(data, formula, family, tol = epsilon, maxIter = maxit, weightCol = weightCol,
var.power = var.power, link.power = link.power,
stringIndexerOrderType = stringIndexerOrderType)
stringIndexerOrderType = stringIndexerOrderType,
offsetCol = offsetCol)
})
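
The R-flavored glm alias simply forwards offsetCol to spark.glm, so the same hypothetical example reads:

model <- glm(claims ~ age + region, family = poisson, data = df,
             offsetCol = "logExposure")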

# Returns the summary of a model produced by glm() or spark.glm(), similarly to R's summary().
