
Commit

Merge branch 'master' of https://github.com/apache/spark into SPARK-1…
Fokko committed Jun 27, 2020
2 parents 533dd8d + 9c134b5 commit 9fba69e
Showing 298 changed files with 6,659 additions and 3,116 deletions.
4 changes: 2 additions & 2 deletions R/WINDOWS.md
@@ -22,8 +22,8 @@ To build SparkR on Windows, the following steps are required

1. Make sure `bash` is available and in `PATH` if you already have a built-in `bash` on Windows. If you do not, install [Cygwin](https://www.cygwin.com/).

2. Install R (>= 3.1) and [Rtools](https://cloud.r-project.org/bin/windows/Rtools/). Make sure to
include Rtools and R in `PATH`. Note that support for R prior to version 3.4 is deprecated as of Spark 3.0.0.
2. Install R (>= 3.5) and [Rtools](https://cloud.r-project.org/bin/windows/Rtools/). Make sure to
include Rtools and R in `PATH`.

3. Install a JDK that SparkR supports (see `R/pkg/DESCRIPTION`), and set `JAVA_HOME` in the system environment variables.

8 changes: 7 additions & 1 deletion R/install-dev.bat
@@ -24,7 +24,13 @@ set SPARK_HOME=%~dp0..

MKDIR %SPARK_HOME%\R\lib

R.exe CMD INSTALL --library="%SPARK_HOME%\R\lib" %SPARK_HOME%\R\pkg\
rem When you pass the package path directly as an argument to R CMD INSTALL,
rem R 4.0 takes the path as, for example, 'C:\projects\spark\R\..\R\pkg"'.
rem To work around this, go directly to the directory and install it there.
rem See also SPARK-32074
pushd %SPARK_HOME%\R\pkg\
R.exe CMD INSTALL --library="%SPARK_HOME%\R\lib" .
popd

rem Zip the SparkR package so that it can be distributed to worker nodes on YARN
pushd %SPARK_HOME%\R\lib
2 changes: 1 addition & 1 deletion R/pkg/DESCRIPTION
@@ -15,7 +15,7 @@ URL: https://www.apache.org/ https://spark.apache.org/
BugReports: https://spark.apache.org/contributing.html
SystemRequirements: Java (>= 8, < 12)
Depends:
R (>= 3.1),
R (>= 3.5),
methods
Suggests:
knitr,
5 changes: 3 additions & 2 deletions R/pkg/R/mllib_fpm.R
@@ -122,11 +122,12 @@ setMethod("spark.freqItemsets", signature(object = "FPGrowthModel"),
# Get association rules.

#' @return A \code{SparkDataFrame} with association rules.
#' The \code{SparkDataFrame} contains four columns:
#' The \code{SparkDataFrame} contains five columns:
#' \code{antecedent} (an array of the same type as the input column),
#' \code{consequent} (an array of the same type as the input column),
#' \code{confidence} (confidence for the rule)
#' and \code{lift} (lift for the rule)
#' \code{lift} (lift for the rule)
#' and \code{support} (support for the rule)
#' @rdname spark.fpGrowth
#' @aliases associationRules,FPGrowthModel-method
#' @note spark.associationRules(FPGrowthModel) since 2.2.0
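As a usage aside (not part of the patch): a minimal SparkR sketch, assuming a running SparkR session and a fitted FP-growth model named `model` (both hypothetical here), showing that the association-rules SparkDataFrame now carries the documented fifth column alongside confidence and lift.

# Hedged sketch: select the five documented columns of the rules DataFrame.
rules <- spark.associationRules(model)
head(select(rules, "antecedent", "consequent", "confidence", "lift", "support"))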
5 changes: 4 additions & 1 deletion R/pkg/R/utils.R
@@ -529,7 +529,10 @@ processClosure <- function(node, oldEnv, defVars, checkedFuncs, newEnv) {
# Namespaces other than "SparkR" will not be searched.
if (!isNamespace(func.env) ||
(getNamespaceName(func.env) == "SparkR" &&
!(nodeChar %in% getNamespaceExports("SparkR")))) {
!(nodeChar %in% getNamespaceExports("SparkR")) &&
# Note that generic S4 methods should not be set to the environment of
# cleaned closure. It does not work with R 4.0.0+. See also SPARK-31918.
nodeChar != "" && !methods::isGeneric(nodeChar, func.env))) {
# Only include SparkR internals.

# Set parameter 'inherits' to FALSE since we do not need to search in
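As an aside, a small plain-R illustration (independent of SparkR's closure cleaner) of the check being added: `methods::isGeneric` is what separates S4 generics, which must not be copied into the cleaned closure's environment on R 4.0.0+, from ordinary functions that remain eligible for capture.

library(methods)
isGeneric("show")    # TRUE: an S4 generic, skipped by the new condition
isGeneric("lapply")  # FALSE: an ordinary base function, still eligible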
4 changes: 0 additions & 4 deletions R/pkg/inst/profile/general.R
@@ -16,10 +16,6 @@
#

.First <- function() {
if (utils::compareVersion(paste0(R.version$major, ".", R.version$minor), "3.4.0") == -1) {
warning("Support for R prior to version 3.4 is deprecated since Spark 3.0.0")
}

packageDir <- Sys.getenv("SPARKR_PACKAGE_DIR")
dirs <- strsplit(packageDir, ",")[[1]]
.libPaths(c(dirs, .libPaths()))
4 changes: 0 additions & 4 deletions R/pkg/inst/profile/shell.R
@@ -16,10 +16,6 @@
#

.First <- function() {
if (utils::compareVersion(paste0(R.version$major, ".", R.version$minor), "3.4.0") == -1) {
warning("Support for R prior to version 3.4 is deprecated since Spark 3.0.0")
}

home <- Sys.getenv("SPARK_HOME")
.libPaths(c(file.path(home, "R", "lib"), .libPaths()))
Sys.setenv(NOAWT = 1)
4 changes: 3 additions & 1 deletion R/pkg/tests/fulltests/test_context.R
@@ -26,7 +26,9 @@ test_that("Check masked functions", {
"colnames", "colnames<-", "intersect", "rank", "rbind", "sample", "subset",
"summary", "transform", "drop", "window", "as.data.frame", "union", "not")
version <- packageVersion("base")
if (as.numeric(version$major) >= 3 && as.numeric(version$minor) >= 3) {
is33Above <- as.numeric(version$major) >= 3 && as.numeric(version$minor) >= 3
is40Above <- as.numeric(version$major) >= 4
if (is33Above || is40Above) {
namesOfMasked <- c("endsWith", "startsWith", namesOfMasked)
}
masked <- conflicts(detail = TRUE)$`package:SparkR`
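For context, a standalone plain-R sketch (values hard-coded for illustration) of why the old single condition misfires on R 4.0.0, whose base package version has major 4 but minor 0, even though startsWith and endsWith have been in base R since 3.3:

major <- 4
minor <- 0
major >= 3 && minor >= 3                  # FALSE under the old check
(major >= 3 && minor >= 3) || major >= 4  # TRUE with the added is40Above branch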
18 changes: 9 additions & 9 deletions R/pkg/tests/fulltests/test_mllib_classification.R
@@ -34,7 +34,7 @@ test_that("spark.svmLinear", {
summary <- summary(model)

# test summary coefficients return matrix type
expect_true(class(summary$coefficients) == "matrix")
expect_true(any(class(summary$coefficients) == "matrix"))
expect_true(class(summary$coefficients[, 1]) == "numeric")

coefs <- summary$coefficients[, "Estimate"]
@@ -130,7 +130,7 @@ test_that("spark.logit", {
summary <- summary(model)

# test summary coefficients return matrix type
expect_true(class(summary$coefficients) == "matrix")
expect_true(any(class(summary$coefficients) == "matrix"))
expect_true(class(summary$coefficients[, 1]) == "numeric")

versicolorCoefsR <- c(1.52, 0.03, -0.53, 0.04, 0.00)
@@ -242,8 +242,8 @@ test_that("spark.logit", {
# Test binomial logistic regression against two classes with upperBoundsOnCoefficients
# and upperBoundsOnIntercepts
u <- matrix(c(1.0, 0.0, 1.0, 0.0), nrow = 1, ncol = 4)
model <- spark.logit(training, Species ~ ., upperBoundsOnCoefficients = u,
upperBoundsOnIntercepts = 1.0)
model <- suppressWarnings(spark.logit(training, Species ~ ., upperBoundsOnCoefficients = u,
upperBoundsOnIntercepts = 1.0))
summary <- summary(model)
coefsR <- c(-11.13331, 1.00000, 0.00000, 1.00000, 0.00000)
coefs <- summary$coefficients[, "Estimate"]
@@ -255,8 +255,8 @@
# Test binomial logistic regression against two classes with lowerBoundsOnCoefficients
# and lowerBoundsOnIntercepts
l <- matrix(c(0.0, -1.0, 0.0, -1.0), nrow = 1, ncol = 4)
model <- spark.logit(training, Species ~ ., lowerBoundsOnCoefficients = l,
lowerBoundsOnIntercepts = 0.0)
model <- suppressWarnings(spark.logit(training, Species ~ ., lowerBoundsOnCoefficients = l,
lowerBoundsOnIntercepts = 0.0))
summary <- summary(model)
coefsR <- c(0, 0, -1, 0, 1.902192)
coefs <- summary$coefficients[, "Estimate"]
@@ -268,9 +268,9 @@
# Test multinomial logistic regression with lowerBoundsOnCoefficients
# and lowerBoundsOnIntercepts
l <- matrix(c(0.0, -1.0, 0.0, -1.0, 0.0, -1.0, 0.0, -1.0), nrow = 2, ncol = 4)
model <- spark.logit(training, Species ~ ., family = "multinomial",
lowerBoundsOnCoefficients = l,
lowerBoundsOnIntercepts = as.array(c(0.0, 0.0)))
model <- suppressWarnings(spark.logit(training, Species ~ ., family = "multinomial",
lowerBoundsOnCoefficients = l,
lowerBoundsOnIntercepts = as.array(c(0.0, 0.0))))
summary <- summary(model)
versicolorCoefsR <- c(42.639465, 7.258104, 14.330814, 16.298243, 11.716429)
virginicaCoefsR <- c(0.0002970796, 4.79274, 7.65047, 25.72793, 30.0021)
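The `any(class(...) == "matrix")` changes throughout these tests all come from the same R 4.0 behavior change, sketched here in plain R:

m <- matrix(1:4, nrow = 2)
class(m)                   # R < 4.0: "matrix"; R 4.0+: c("matrix", "array")
class(m) == "matrix"       # a length-2 logical on R 4.0+, so the old assertion breaks
any(class(m) == "matrix")  # TRUE on both old and new R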
2 changes: 1 addition & 1 deletion R/pkg/tests/fulltests/test_mllib_clustering.R
@@ -171,7 +171,7 @@ test_that("spark.kmeans", {
expect_equal(sort(collect(distinct(select(cluster, "prediction")))$prediction), c(0, 1))

# test summary coefficients return matrix type
expect_true(class(summary.model$coefficients) == "matrix")
expect_true(any(class(summary.model$coefficients) == "matrix"))
expect_true(class(summary.model$coefficients[1, ]) == "numeric")

# Test model save/load
3 changes: 2 additions & 1 deletion R/pkg/tests/fulltests/test_mllib_fpm.R
@@ -45,7 +45,8 @@ test_that("spark.fpGrowth", {
antecedent = I(list(list("2"), list("3"))),
consequent = I(list(list("1"), list("1"))),
confidence = c(1, 1),
lift = c(1, 1)
lift = c(1, 1),
support = c(0.75, 0.5)
)

expect_equivalent(expected_association_rules, collect(spark.associationRules(model)))
2 changes: 1 addition & 1 deletion R/pkg/tests/fulltests/test_mllib_regression.R
@@ -116,7 +116,7 @@ test_that("spark.glm summary", {
rStats <- summary(glm(Sepal.Width ~ Sepal.Length + Species, data = dataset))

# test summary coefficients return matrix type
expect_true(class(stats$coefficients) == "matrix")
expect_true(any(class(stats$coefficients) == "matrix"))
expect_true(class(stats$coefficients[, 1]) == "numeric")

coefs <- stats$coefficients
@@ -19,8 +19,10 @@

import java.io.File;
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -64,6 +66,13 @@ public class LevelDB implements KVStore {
private final ConcurrentMap<String, byte[]> typeAliases;
private final ConcurrentMap<Class<?>, LevelDBTypeInfo> types;

/**
* Trying to close a JNI LevelDB handle with a closed DB causes JVM crashes. This is used to
 * ensure that all iterators are correctly closed before LevelDB is closed. Soft references
 * are used so that an iterator can still be GCed when it is only referenced here.
*/
private final ConcurrentLinkedQueue<SoftReference<LevelDBIterator<?>>> iteratorTracker;

public LevelDB(File path) throws Exception {
this(path, new KVStoreSerializer());
}
@@ -94,6 +103,8 @@ public LevelDB(File path, KVStoreSerializer serializer) throws Exception {
aliases = new HashMap<>();
}
typeAliases = new ConcurrentHashMap<>(aliases);

iteratorTracker = new ConcurrentLinkedQueue<>();
}

@Override
@@ -189,7 +200,9 @@ public <T> KVStoreView<T> view(Class<T> type) throws Exception {
@Override
public Iterator<T> iterator() {
try {
return new LevelDBIterator<>(type, LevelDB.this, this);
LevelDBIterator<T> it = new LevelDBIterator<>(type, LevelDB.this, this);
iteratorTracker.add(new SoftReference<>(it));
return it;
} catch (Exception e) {
throw Throwables.propagate(e);
}
@@ -238,6 +251,14 @@ public void close() throws IOException {
}

try {
if (iteratorTracker != null) {
for (SoftReference<LevelDBIterator<?>> ref: iteratorTracker) {
LevelDBIterator<?> it = ref.get();
if (it != null) {
it.close();
}
}
}
_db.close();
} catch (IOException ioe) {
throw ioe;
@@ -252,6 +273,7 @@
* with a closed DB can cause JVM crashes, so this ensures that situation does not happen.
*/
void closeIterator(LevelDBIterator<?> it) throws IOException {
notifyIteratorClosed(it);
synchronized (this._db) {
DB _db = this._db.get();
if (_db != null) {
@@ -260,6 +282,14 @@ void closeIterator(LevelDBIterator<?> it) throws IOException {
}
}

/**
 * Remove the iterator from the iterator tracker. `LevelDBIterator` calls this to signal
 * that it has been closed.
*/
void notifyIteratorClosed(LevelDBIterator<?> it) {
iteratorTracker.removeIf(ref -> it.equals(ref.get()));
}

/** Returns metadata about indices for the given type. */
LevelDBTypeInfo getTypeInfo(Class<?> type) throws Exception {
LevelDBTypeInfo ti = types.get(type);
@@ -185,6 +185,7 @@ public boolean skip(long n) {

@Override
public synchronized void close() throws IOException {
db.notifyIteratorClosed(this);
if (!closed) {
it.close();
closed = true;
@@ -19,6 +19,7 @@

import java.io.File;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
@@ -276,6 +277,41 @@ public void testNegativeIndexValues() throws Exception {
assertEquals(expected, results);
}

@Test
public void testCloseLevelDBIterator() throws Exception {
// SPARK-31929: test that when LevelDB.close() is called, the related LevelDBIterators
// are closed as well, along with the files those iterators had opened.
File dbPathForCloseTest = File
.createTempFile(
"test_db_close.",
".ldb");
dbPathForCloseTest.delete();
LevelDB dbForCloseTest = new LevelDB(dbPathForCloseTest);
for (int i = 0; i < 8192; i++) {
dbForCloseTest.write(createCustomType1(i));
}
String key = dbForCloseTest
.view(CustomType1.class).iterator().next().key;
assertEquals("key0", key);
Iterator<CustomType1> it0 = dbForCloseTest
.view(CustomType1.class).max(1).iterator();
while (it0.hasNext()) {
it0.next();
}
System.gc();
Iterator<CustomType1> it1 = dbForCloseTest
.view(CustomType1.class).iterator();
assertEquals("key0", it1.next().key);
try (KVStoreIterator<CustomType1> it2 = dbForCloseTest
.view(CustomType1.class).closeableIterator()) {
assertEquals("key0", it2.next().key);
}
dbForCloseTest.close();
assertTrue(dbPathForCloseTest.exists());
FileUtils.deleteQuietly(dbPathForCloseTest);
assertTrue(!dbPathForCloseTest.exists());
}

private CustomType1 createCustomType1(int i) {
CustomType1 t = new CustomType1();
t.key = "key" + i;
@@ -20,6 +20,7 @@
import java.io.IOException;

import org.apache.spark.annotation.Private;
import org.apache.spark.shuffle.api.metadata.MapOutputCommitMessage;

/**
* :: Private ::
@@ -60,10 +61,15 @@ public interface ShuffleMapOutputWriter {
* <p>
* This can also close any resources and clean up temporary state if necessary.
* <p>
* The returned array should contain, for each partition from (0) to (numPartitions - 1), the
* number of bytes written by the partition writer for that partition id.
* The returned commit message is a structure with two components:
* <p>
* 1) An array of longs, which should contain, for each partition from (0) to
* (numPartitions - 1), the number of bytes written by the partition writer
* for that partition id.
* <p>
* 2) An optional metadata blob that can be used by shuffle readers.
*/
long[] commitAllPartitions() throws IOException;
MapOutputCommitMessage commitAllPartitions() throws IOException;

/**
* Abort all of the writes done by any writers returned by {@link #getPartitionWriter(int)}.
