Skip to content

Commit

Permalink
Resolve conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
HyukjinKwon committed Mar 2, 2016
2 parents 4ac73d9 + b4d096d commit b7fc39e
Show file tree
Hide file tree
Showing 557 changed files with 11,472 additions and 4,490 deletions.
2 changes: 1 addition & 1 deletion .github/PULL_REQUEST_TEMPLATE
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
(Please fill in changes proposed in this fix)


## How was the this patch tested?
## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)

Expand Down
5 changes: 4 additions & 1 deletion R/pkg/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ export("print.jobj")
# MLlib integration
exportMethods("glm",
"predict",
"summary")
"summary",
"kmeans",
"fitted")

# Job group lifecycle management methods
export("setJobGroup",
Expand Down Expand Up @@ -109,6 +111,7 @@ exportMethods("%in%",
"add_months",
"alias",
"approxCountDistinct",
"approxQuantile",
"array_contains",
"asc",
"ascii",
Expand Down
15 changes: 15 additions & 0 deletions R/pkg/R/generics.R
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ setGeneric("crosstab", function(x, col1, col2) { standardGeneric("crosstab") })
# @export
setGeneric("freqItems", function(x, cols, support = 0.01) { standardGeneric("freqItems") })

# @rdname statfunctions
# @export
setGeneric("approxQuantile", function(x, col, probabilities, relativeError) {
  standardGeneric("approxQuantile")
})

# @rdname distinct
# @export
setGeneric("distinct", function(x, numPartitions = 1) { standardGeneric("distinct") })
Expand Down Expand Up @@ -1160,3 +1167,11 @@ setGeneric("predict", function(object, ...) { standardGeneric("predict") })
#' @rdname rbind
#' @export
setGeneric("rbind", signature = "...")

#' @rdname kmeans
#' @export
setGeneric("kmeans")

#' @rdname fitted
#' @export
setGeneric("fitted")
74 changes: 70 additions & 4 deletions R/pkg/R/mllib.R
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,11 @@ setMethod("predict", signature(object = "PipelineModel"),
setMethod("summary", signature(object = "PipelineModel"),
function(object, ...) {
modelName <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
"getModelName", object@model)
"getModelName", object@model)
features <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
"getModelFeatures", object@model)
"getModelFeatures", object@model)
coefficients <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
"getModelCoefficients", object@model)
"getModelCoefficients", object@model)
if (modelName == "LinearRegressionModel") {
devianceResiduals <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
"getModelDevianceResiduals", object@model)
Expand All @@ -119,10 +119,76 @@ setMethod("summary", signature(object = "PipelineModel"),
colnames(coefficients) <- c("Estimate", "Std. Error", "t value", "Pr(>|t|)")
rownames(coefficients) <- unlist(features)
return(list(devianceResiduals = devianceResiduals, coefficients = coefficients))
} else {
} else if (modelName == "LogisticRegressionModel") {
coefficients <- as.matrix(unlist(coefficients))
colnames(coefficients) <- c("Estimate")
rownames(coefficients) <- unlist(features)
return(list(coefficients = coefficients))
} else if (modelName == "KMeansModel") {
modelSize <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
"getKMeansModelSize", object@model)
cluster <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
"getKMeansCluster", object@model, "classes")
k <- unlist(modelSize)[1]
size <- unlist(modelSize)[-1]
coefficients <- t(matrix(coefficients, ncol = k))
colnames(coefficients) <- unlist(features)
rownames(coefficients) <- 1:k
return(list(coefficients = coefficients, size = size, cluster = dataFrame(cluster)))
} else {
stop(paste("Unsupported model", modelName, sep = " "))
}
})

#' Fit a k-means model
#'
#' Fit a k-means model, similarly to R's kmeans().
#'
#' @param x DataFrame for training
#' @param centers Number of centers
#' @param iter.max Maximum iteration number
#' @param algorithm Algorithm chosen to fit the model
#' @return A fitted k-means model
#' @rdname kmeans
#' @export
#' @examples
#'\dontrun{
#' model <- kmeans(x, centers = 2, algorithm = "random")
#'}
setMethod("kmeans", signature(x = "DataFrame"),
          function(x, centers, iter.max = 10, algorithm = c("random", "k-means||")) {
            # Reject anything other than the two supported initialization modes.
            algorithm <- match.arg(algorithm)
            featureNames <- as.array(colnames(x))
            fittedModel <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", "fitKMeans",
                                       x@sdf, algorithm, iter.max, centers, featureNames)
            new("PipelineModel", model = fittedModel)
          })

#' Get fitted result from a model
#'
#' Get fitted result from a model, similarly to R's fitted().
#'
#' @param object A fitted MLlib model
#' @return DataFrame containing fitted values
#' @rdname fitted
#' @export
#' @examples
#'\dontrun{
#' model <- kmeans(trainingData, 2)
#' fitted.model <- fitted(model)
#' showDF(fitted.model)
#'}
setMethod("fitted", signature(object = "PipelineModel"),
          function(object, method = c("centers", "classes"), ...) {
            # Ask the JVM wrapper which concrete model backs this pipeline.
            modelType <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
                                     "getModelName", object@model)
            if (modelType != "KMeansModel") {
              stop(paste("Unsupported model", modelType, sep = " "))
            }
            # Validate `method` only after confirming the model type, so an
            # unsupported model fails with the model error rather than an
            # argument error.
            method <- match.arg(method)
            clusterResult <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
                                         "getKMeansCluster", object@model, method)
            dataFrame(clusterResult)
          })
39 changes: 39 additions & 0 deletions R/pkg/R/stats.R
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,45 @@ setMethod("freqItems", signature(x = "DataFrame", cols = "character"),
collect(dataFrame(sct))
})

#' approxQuantile
#'
#' Calculates the approximate quantiles of a numerical column of a DataFrame.
#'
#' The result of this algorithm has the following deterministic bound:
#' if the DataFrame has N elements and the quantile is requested at probability `p` up to
#' error `err`, the algorithm returns a sample `x` from the DataFrame whose *exact* rank
#' is close to (p * N). More precisely,
#'   floor((p - err) * N) <= rank(x) <= ceil((p + err) * N).
#' This method implements a variation of the Greenwald-Khanna algorithm (with some speed
#' optimizations), first presented in [[http://dx.doi.org/10.1145/375663.375670
#' Space-efficient Online Computation of Quantile Summaries]] by Greenwald and Khanna.
#'
#' @param x A SparkSQL DataFrame.
#' @param col The name of the numerical column.
#' @param probabilities A list of quantile probabilities. Each number must belong to [0, 1].
#'                      For example 0 is the minimum, 0.5 is the median, 1 is the maximum.
#' @param relativeError The relative target precision to achieve (>= 0). If set to zero,
#'                      the exact quantiles are computed, which could be very expensive.
#'                      Note that values greater than 1 are accepted but give the same result as 1.
#' @return The approximate quantiles at the given probabilities.
#'
#' @rdname statfunctions
#' @name approxQuantile
#' @export
#' @examples
#' \dontrun{
#' df <- jsonFile(sqlContext, "/path/to/file.json")
#' quantiles <- approxQuantile(df, "key", c(0.5, 0.8), 0.0)
#' }
setMethod("approxQuantile",
          signature(x = "DataFrame", col = "character",
                    probabilities = "numeric", relativeError = "numeric"),
          function(x, col, probabilities, relativeError) {
            # Delegate to the JVM-side DataFrameStatFunctions object; the
            # probabilities vector is converted to a list for the JVM call.
            statFns <- callJMethod(x@sdf, "stat")
            callJMethod(statFns, "approxQuantile", col,
                        as.list(probabilities), relativeError)
          })

#' sampleBy
#'
#' Returns a stratified sample without replacement based on the fraction given on each stratum.
Expand Down
28 changes: 28 additions & 0 deletions R/pkg/inst/tests/testthat/test_mllib.R
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,31 @@ test_that("summary works on base GLM models", {
baseSummary <- summary(baseModel)
expect_true(abs(baseSummary$deviance - 12.19313) < 1e-4)
})

test_that("kmeans", {
  # Use the iris data with the categorical label column removed, leaving
  # only the four numeric feature columns.
  newIris <- iris
  newIris$Species <- NULL
  training <- suppressWarnings(createDataFrame(sqlContext, newIris))

  # Cache the DataFrame here to work around the bug SPARK-13178.
  cache(training)
  take(training, 1)

  # predict() on a Spark k-means model should yield integer cluster ids.
  model <- kmeans(x = training, centers = 2)
  sample <- take(select(predict(model, training), "prediction"), 1)
  expect_equal(typeof(sample$prediction), "integer")
  expect_equal(sample$prediction, 1)

  # Test stats::kmeans is working
  statsModel <- kmeans(x = newIris, centers = 2)
  expect_equal(sort(unique(statsModel$cluster)), c(1, 2))

  # Test fitted works on KMeans
  fitted.model <- fitted(model)
  expect_equal(sort(collect(distinct(select(fitted.model, "prediction")))$prediction), c(0, 1))

  # Test summary works on KMeans
  summary.model <- summary(model)
  cluster <- summary.model$cluster
  expect_equal(sort(collect(distinct(select(cluster, "prediction")))$prediction), c(0, 1))
})
8 changes: 8 additions & 0 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
Original file line number Diff line number Diff line change
Expand Up @@ -1785,6 +1785,14 @@ test_that("sampleBy() on a DataFrame", {
expect_identical(as.list(result[2, ]), list(key = "1", count = 7))
})

test_that("approxQuantile() on a DataFrame", {
  # A single-column DataFrame holding the integers 0..99.
  l <- as.list(0:99)
  df <- createDataFrame(sqlContext, l, "key")
  quantiles <- approxQuantile(df, "key", c(0.5, 0.8), 0.0)
  # With relativeError = 0 the returned quantiles are exact.
  expect_equal(quantiles[[1]], 50)
  expect_equal(quantiles[[2]], 80)
})

test_that("SQL error message is returned from JVM", {
retError <- tryCatch(sql(sqlContext, "select * from blah"), error = function(e) e)
expect_equal(grepl("Table not found: blah", retError), TRUE)
Expand Down
2 changes: 1 addition & 1 deletion bin/spark-submit.cmd
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ rem
rem This is the entry point for running Spark submit. To avoid polluting the
rem environment, it just launches a new cmd to do the real work.

cmd /V /E /C spark-submit2.cmd %*
cmd /V /E /C "%~dp0spark-submit2.cmd" %*
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
/**
* A {@link ManagedBuffer} backed by {@link ByteBuffer}.
*/
public final class NioManagedBuffer extends ManagedBuffer {
public class NioManagedBuffer extends ManagedBuffer {
private final ByteBuffer buf;

public NioManagedBuffer(ByteBuffer buf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ public int ioRetryWaitTimeMs() {
* memory mapping has high overhead for blocks close to or below the page size of the OS.
*/
public int memoryMapBytes() {
  // The threshold is configured as a byte string (e.g. "2m"). Ints.checkedCast
  // rejects values that do not fit in an int instead of silently truncating.
  return Ints.checkedCast(JavaUtils.byteStringAsBytes(
    conf.get("spark.storage.memoryMapThreshold", "2m")));
}

/**
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
*
* This implementation is largely based on the {@code CountMinSketch} class from stream-lib.
*/
abstract public class CountMinSketch {
public abstract class CountMinSketch {

public enum Version {
/**
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion tags/pom.xml → common/tags/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
<relativePath>../../pom.xml</relativePath>
</parent>

<groupId>org.apache.spark</groupId>
Expand Down
2 changes: 1 addition & 1 deletion unsafe/pom.xml → common/unsafe/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
<relativePath>../../pom.xml</relativePath>
</parent>

<groupId>org.apache.spark</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
<tbody>
{{#applications}}
<tr>
<td class="rowGroupColumn"><a href="/history/{{id}}/{{num}}/jobs/">{{id}}</a></td>
<td class="rowGroupColumn"><span title="{{id}}"><a href="/history/{{id}}/{{num}}/jobs/">{{id}}</a></span></td>
<td class="rowGroupColumn">{{name}}</td>
{{#attempts}}
<td class="attemptIDSpan"><a href="/history/{{id}}/{{attemptId}}/jobs/">{{attemptId}}</a></td>
Expand Down
36 changes: 34 additions & 2 deletions core/src/main/resources/org/apache/spark/ui/static/historypage.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,22 @@ function formatDuration(milliseconds) {
return hours.toFixed(1) + " h";
}

// Normalize an application id such as "app_20160302_7" so that plain string
// comparison orders ids by their trailing sequence number: the third
// underscore-separated field is left-padded with zeros to 10 characters.
// Ids with fewer than three fields are returned unchanged.
function makeIdNumeric(id) {
  var parts = id.split("_");
  if (parts.length < 3) {
    return id;
  }
  var seq = parts[2];
  var padded = "";
  // Left-pad the sequence number with zeros up to 10 characters.
  for (var i = seq.length; i < 10; i++) {
    padded += "0";
  }
  return parts[0] + "_" + parts[1] + "_" + padded + seq;
}

// Render an ISO-like timestamp for display: drop everything from the first
// "." onward (fractional seconds and any suffix) and replace the "T"
// separator with a space.
function formatDate(date) {
  var withoutFraction = date.split(".")[0];
  return withoutFraction.replace("T", " ");
}
Expand All @@ -62,6 +78,21 @@ jQuery.extend( jQuery.fn.dataTableExt.oSort, {
}
} );

// Register a custom DataTables sort type ("appid-numeric") so application id
// cells sort by their numeric sequence number rather than lexicographically.
jQuery.extend(jQuery.fn.dataTableExt.oSort, {
  // Pull the id out of the cell's title attribute and normalize it so that
  // ordinary string comparison matches numeric ordering.
  "appid-numeric-pre": function (a) {
    var appId = a.match(/title="*(-?[0-9a-zA-Z\-\_]+)/)[1];
    return makeIdNumeric(appId);
  },

  "appid-numeric-asc": function (a, b) {
    if (a < b) {
      return -1;
    }
    return (a > b) ? 1 : 0;
  },

  "appid-numeric-desc": function (a, b) {
    if (a > b) {
      return -1;
    }
    return (a < b) ? 1 : 0;
  }
});

$(document).ajaxStop($.unblockUI);
$(document).ajaxStart(function(){
$.blockUI({ message: '<h3>Loading history summary...</h3>'});
Expand Down Expand Up @@ -109,7 +140,7 @@ $(document).ready(function() {
var selector = "#history-summary-table";
var conf = {
"columns": [
{name: 'first'},
{name: 'first', type: "appid-numeric"},
{name: 'second'},
{name: 'third'},
{name: 'fourth'},
Expand All @@ -118,7 +149,8 @@ $(document).ready(function() {
{name: 'seventh'},
{name: 'eighth'},
],
"autoWidth": false
"autoWidth": false,
"order": [[ 0, "desc" ]]
};

var rowGroupConf = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,11 @@ function renderDagVizForJob(svgContainer) {
var attemptId = 0
var stageLink = d3.select("#stage-" + stageId + "-" + attemptId)
.select("a.name-link")
.attr("href") + "&expandDagViz=true";
.attr("href");
container = svgContainer
.append("a")
.attr("xlink:href", stageLink)
.attr("onclick", "window.localStorage.setItem(expandDagVizArrowKey(false), true)")
.append("g")
.attr("id", containerId);
}
Expand Down
Loading

0 comments on commit b7fc39e

Please sign in to comment.