[SPARK-33084][CORE][SQL] Rename Unit test file and use fake ivy link
### What changes were proposed in this pull request?
As noted in apache/spark#29966 (comment), the test suite file was given the wrong name; this PR renames it to fix that.
It also changes the test to use a fake Ivy link.

### Why are the changes needed?
To follow the test file naming convention.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
No new tests; the change only renames a test file and switches it to a fake Ivy link.

Closes #31118 from AngersZhuuuu/SPARK-33084-FOLLOW-UP.

Authored-by: angerszhu <angers.zhu@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
a0x8o committed Jan 11, 2021
1 parent 4daea24 commit 1fded6e
Showing 214 changed files with 7,112 additions and 4,194 deletions.
2 changes: 1 addition & 1 deletion R/pkg/DESCRIPTION
@@ -11,7 +11,7 @@ Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"),
email = "felixcheung@apache.org"),
person(family = "The Apache Software Foundation", role = c("aut", "cph")))
License: Apache License (== 2.0)
URL: https://www.apache.org/ https://spark.apache.org/
URL: https://www.apache.org https://spark.apache.org
BugReports: https://spark.apache.org/contributing.html
SystemRequirements: Java (>= 8, < 12)
Depends:
2 changes: 1 addition & 1 deletion R/pkg/R/DataFrame.R
@@ -880,7 +880,7 @@ setMethod("toJSON",

#' Save the contents of SparkDataFrame as a JSON file
#'
#' Save the contents of a SparkDataFrame as a JSON file (\href{http://jsonlines.org/}{
#' Save the contents of a SparkDataFrame as a JSON file (\href{https://jsonlines.org/}{
#' JSON Lines text format or newline-delimited JSON}). Files written out
#' with this method can be read back in as a SparkDataFrame using read.json().
#'
2 changes: 1 addition & 1 deletion R/pkg/R/SQLContext.R
@@ -374,7 +374,7 @@ setMethod("toDF", signature(x = "RDD"),
#' Create a SparkDataFrame from a JSON file.
#'
#' Loads a JSON file, returning the result as a SparkDataFrame
#' By default, (\href{http://jsonlines.org/}{JSON Lines text format or newline-delimited JSON}
#' By default, (\href{https://jsonlines.org/}{JSON Lines text format or newline-delimited JSON}
#' ) is supported. For JSON (one record per file), set a named property \code{multiLine} to
#' \code{TRUE}.
#' It goes through the entire dataset once to determine the schema.
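For reference, a minimal sketch of the JSON Lines reader described in the roxygen text above. The file paths and the use of `multiLine` are illustrative assumptions, not part of this commit.

```r
# Hedged sketch (assumes an active SparkR session): read newline-delimited
# JSON, then a "one record per file" JSON file. Paths are made up.
library(SparkR)
sparkR.session()
people <- read.json("data/people.jsonl")                          # JSON Lines
pretty <- read.json("data/people_pretty.json", multiLine = TRUE)  # whole-file records
printSchema(people)
```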
6 changes: 3 additions & 3 deletions R/pkg/R/install.R
@@ -39,11 +39,11 @@
#' version number in the format of "x.y" where x and y are integers.
#' If \code{hadoopVersion = "without"}, "Hadoop free" build is installed.
#' See
#' \href{http://spark.apache.org/docs/latest/hadoop-provided.html}{
#' \href{https://spark.apache.org/docs/latest/hadoop-provided.html}{
#' "Hadoop Free" Build} for more information.
#' Other patched version names can also be used, e.g. \code{"cdh4"}
#' @param mirrorUrl base URL of the repositories to use. The directory layout should follow
#' \href{http://www.apache.org/dyn/closer.lua/spark/}{Apache mirrors}.
#' \href{https://www.apache.org/dyn/closer.lua/spark/}{Apache mirrors}.
#' @param localDir a local directory where Spark is installed. The directory contains
#' version-specific folders of Spark packages. Default is path to
#' the cache directory:
@@ -64,7 +64,7 @@
#'}
#' @note install.spark since 2.1.0
#' @seealso See available Hadoop versions:
#' \href{http://spark.apache.org/downloads.html}{Apache Spark}
#' \href{https://spark.apache.org/downloads.html}{Apache Spark}
install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL,
localDir = NULL, overwrite = FALSE) {
sparkHome <- Sys.getenv("SPARK_HOME")
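For context, a hedged sketch of the `install.spark` call documented above; the Hadoop version and mirror URL are illustrative, and any Apache mirror with the standard directory layout should work the same way.

```r
# Hedged sketch: install a Hadoop 2.7 build of Spark from an explicit mirror.
library(SparkR)
install.spark(hadoopVersion = "2.7",
              mirrorUrl = "https://archive.apache.org/dist/spark",
              overwrite = FALSE)
```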
4 changes: 2 additions & 2 deletions R/pkg/R/mllib_classification.R
@@ -425,7 +425,7 @@ setMethod("write.ml", signature(object = "LogisticRegressionModel", path = "char
#' predictions on new data, and \code{write.ml}/\code{read.ml} to save/load fitted models.
#' Only categorical data is supported.
#' For more details, see
#' \href{http://spark.apache.org/docs/latest/ml-classification-regression.html}{
#' \href{https://spark.apache.org/docs/latest/ml-classification-regression.html}{
#' Multilayer Perceptron}
#'
#' @param data a \code{SparkDataFrame} of observations and labels for model fitting.
@@ -574,7 +574,7 @@ setMethod("write.ml", signature(object = "MultilayerPerceptronClassificationMode
#' @rdname spark.naiveBayes
#' @aliases spark.naiveBayes,SparkDataFrame,formula-method
#' @name spark.naiveBayes
#' @seealso e1071: \url{https://cran.r-project.org/package=e1071}
#' @seealso e1071: \url{https://cran.r-project.org/web/packages/e1071/index.html}
#' @examples
#' \dontrun{
#' data <- as.data.frame(UCBAdmissions)
4 changes: 2 additions & 2 deletions R/pkg/R/mllib_clustering.R
@@ -204,7 +204,7 @@ setMethod("write.ml", signature(object = "BisectingKMeansModel", path = "charact
#' @return \code{spark.gaussianMixture} returns a fitted multivariate gaussian mixture model.
#' @rdname spark.gaussianMixture
#' @name spark.gaussianMixture
#' @seealso mixtools: \url{https://cran.r-project.org/package=mixtools}
#' @seealso mixtools: \url{https://cran.r-project.org/web/packages/mixtools/index.html}
#' @examples
#' \dontrun{
#' sparkR.session()
@@ -483,7 +483,7 @@ setMethod("write.ml", signature(object = "KMeansModel", path = "character"),
#' @return \code{spark.lda} returns a fitted Latent Dirichlet Allocation model.
#' @rdname spark.lda
#' @aliases spark.lda,SparkDataFrame-method
#' @seealso topicmodels: \url{https://cran.r-project.org/package=topicmodels}
#' @seealso topicmodels: \url{https://cran.r-project.org/web/packages/topicmodels/index.html}
#' @examples
#' \dontrun{
#' text <- read.df("data/mllib/sample_lda_libsvm_data.txt", source = "libsvm")
2 changes: 1 addition & 1 deletion R/pkg/R/mllib_recommendation.R
@@ -30,7 +30,7 @@ setClass("ALSModel", representation(jobj = "jobj"))
#' to make predictions on new data, and \code{write.ml}/\code{read.ml} to save/load fitted models.
#'
#' For more details, see
#' \href{http://spark.apache.org/docs/latest/ml-collaborative-filtering.html}{MLlib:
#' \href{https://spark.apache.org/docs/latest/ml-collaborative-filtering.html}{MLlib:
#' Collaborative Filtering}.
#'
#' @param data a SparkDataFrame for training.
2 changes: 1 addition & 1 deletion R/pkg/R/mllib_regression.R
@@ -475,7 +475,7 @@ setMethod("write.ml", signature(object = "IsotonicRegressionModel", path = "char
#' @param ... additional arguments passed to the method.
#' @return \code{spark.survreg} returns a fitted AFT survival regression model.
#' @rdname spark.survreg
#' @seealso survival: \url{https://cran.r-project.org/package=survival}
#' @seealso survival: \url{https://cran.r-project.org/web/packages/survival/index.html}
#' @examples
#' \dontrun{
#' df <- createDataFrame(ovarian)
2 changes: 1 addition & 1 deletion R/pkg/R/mllib_stat.R
@@ -49,7 +49,7 @@ setClass("KSTest", representation(jobj = "jobj"))
#' @rdname spark.kstest
#' @aliases spark.kstest,SparkDataFrame-method
#' @name spark.kstest
#' @seealso \href{http://spark.apache.org/docs/latest/mllib-statistics.html#hypothesis-testing}{
#' @seealso \href{https://spark.apache.org/docs/latest/mllib-statistics.html#hypothesis-testing}{
#' MLlib: Hypothesis Testing}
#' @examples
#' \dontrun{
12 changes: 6 additions & 6 deletions R/pkg/R/mllib_tree.R
@@ -127,9 +127,9 @@ print.summary.decisionTree <- function(x) {
#' \code{write.ml}/\code{read.ml} to save/load fitted models.
#' For more details, see
# nolint start
#' \href{http://spark.apache.org/docs/latest/ml-classification-regression.html#gradient-boosted-tree-regression}{
#' \href{https://spark.apache.org/docs/latest/ml-classification-regression.html#gradient-boosted-tree-regression}{
#' GBT Regression} and
#' \href{http://spark.apache.org/docs/latest/ml-classification-regression.html#gradient-boosted-tree-classifier}{
#' \href{https://spark.apache.org/docs/latest/ml-classification-regression.html#gradient-boosted-tree-classifier}{
#' GBT Classification}
# nolint end
#'
@@ -343,9 +343,9 @@ setMethod("write.ml", signature(object = "GBTClassificationModel", path = "chara
#' save/load fitted models.
#' For more details, see
# nolint start
#' \href{http://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-regression}{
#' \href{https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-regression}{
#' Random Forest Regression} and
#' \href{http://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-classifier}{
#' \href{https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-classifier}{
#' Random Forest Classification}
# nolint end
#'
@@ -568,9 +568,9 @@ setMethod("write.ml", signature(object = "RandomForestClassificationModel", path
#' save/load fitted models.
#' For more details, see
# nolint start
#' \href{http://spark.apache.org/docs/latest/ml-classification-regression.html#decision-tree-regression}{
#' \href{https://spark.apache.org/docs/latest/ml-classification-regression.html#decision-tree-regression}{
#' Decision Tree Regression} and
#' \href{http://spark.apache.org/docs/latest/ml-classification-regression.html#decision-tree-classifier}{
#' \href{https://spark.apache.org/docs/latest/ml-classification-regression.html#decision-tree-classifier}{
#' Decision Tree Classification}
# nolint end
#'
3 changes: 2 additions & 1 deletion R/pkg/R/stats.R
@@ -109,7 +109,8 @@ setMethod("corr",
#'
#' Finding frequent items for columns, possibly with false positives.
#' Using the frequent element count algorithm described in
#' \url{https://doi.org/10.1145/762471.762473}, proposed by Karp, Schenker, and Papadimitriou.
#' \url{https://dl.acm.org/doi/10.1145/762471.762473}, proposed by Karp, Schenker,
#' and Papadimitriou.
#'
#' @param x A SparkDataFrame.
#' @param cols A vector of column names to search for frequent items in.
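A short, hedged example of the `freqItems` API whose documentation is touched above; the toy data frame and the 0.3 support threshold are illustrative.

```r
# Hedged sketch (assumes an active SparkR session): items that appear in
# at least 30% of rows for columns "a" and "b".
df <- createDataFrame(data.frame(a = c(1, 1, 2, 1), b = c("x", "x", "y", "z")))
head(freqItems(df, c("a", "b"), support = 0.3))
```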
9 changes: 5 additions & 4 deletions R/pkg/inst/worker/worker.R
@@ -196,25 +196,26 @@ if (isEmpty != 0) {
outputs <- list()
for (i in seq_len(length(data))) {
# Timing reading input data for execution
inputElap <- elapsedSecs()
computeStart <- elapsedSecs()
output <- compute(mode, partition, serializer, deserializer, keys[[i]],
colNames, computeFunc, data[[i]])
computeElap <- elapsedSecs()
if (serializer == "arrow") {
outputs[[length(outputs) + 1L]] <- output
} else {
outputResult(serializer, output, outputCon)
outputComputeElapsDiff <- outputComputeElapsDiff + (elapsedSecs() - computeElap)
}
outputElap <- elapsedSecs()
computeInputElapsDiff <- computeInputElapsDiff + (computeElap - inputElap)
outputComputeElapsDiff <- outputComputeElapsDiff + (outputElap - computeElap)
computeInputElapsDiff <- computeInputElapsDiff + (computeElap - computeStart)
}

if (serializer == "arrow") {
# See https://stat.ethz.ch/pipermail/r-help/2010-September/252046.html
# rbind.fill might be an alternative to make it faster if plyr is installed.
outputStart <- elapsedSecs()
combined <- do.call("rbind", outputs)
SparkR:::writeSerializeInArrow(outputCon, combined)
outputComputeElapsDiff <- elapsedSecs() - outputStart
}
}
} else {
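The Arrow branch above combines per-partition outputs with base `rbind` and notes that `plyr::rbind.fill` might be a faster alternative. A hedged, standalone illustration of the two calls:

```r
# Hedged sketch: combining a list of data.frames as the worker does for
# Arrow output. plyr is optional; rbind.fill also tolerates missing columns.
outputs <- list(data.frame(a = 1, b = 2), data.frame(a = 3, b = 4))
combined_base <- do.call("rbind", outputs)
if (requireNamespace("plyr", quietly = TRUE)) {
  combined_plyr <- plyr::rbind.fill(outputs)
}
```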
2 changes: 1 addition & 1 deletion R/pkg/vignettes/sparkr-vignettes.Rmd
@@ -1007,7 +1007,7 @@ perplexity

#### Alternating Least Squares

`spark.als` learns latent factors in [collaborative filtering](https://en.wikipedia.org/wiki/Recommender_system#Collaborative_filtering) via [alternating least squares](https://dl.acm.org/citation.cfm?id=1608614).
`spark.als` learns latent factors in [collaborative filtering](https://en.wikipedia.org/wiki/Recommender_system#Collaborative_filtering) via [alternating least squares](https://dl.acm.org/doi/10.1109/MC.2009.263).

There are multiple options that can be configured in `spark.als`, including `rank`, `reg`, and `nonnegative`. For a complete list, refer to the help file.
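A hedged sketch of `spark.als` with the options named in the paragraph above; the toy ratings data and parameter values are illustrative.

```r
# Hedged sketch (assumes an active SparkR session): ALS on made-up ratings.
ratings <- createDataFrame(data.frame(
  user = c(0, 0, 1, 1, 2),
  item = c(0, 1, 1, 2, 2),
  rating = c(4.0, 2.0, 3.0, 4.0, 5.0)))
model <- spark.als(ratings, "rating", "user", "item",
                   rank = 10, reg = 0.1, nonnegative = TRUE)
head(predict(model, ratings))
```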

@@ -254,7 +254,7 @@ TransportClient createClient(InetSocketAddress address)
// Disable Nagle's Algorithm since we don't want packets to wait
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs())
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionCreationTimeoutMs())
.option(ChannelOption.ALLOCATOR, pooledAllocator);

if (conf.receiveBuf() > 0) {
@@ -280,9 +280,10 @@ public void initChannel(SocketChannel ch) {
// Connect to the remote server
long preConnect = System.nanoTime();
ChannelFuture cf = bootstrap.connect(address);
if (!cf.await(conf.connectionTimeoutMs())) {
if (!cf.await(conf.connectionCreationTimeoutMs())) {
throw new IOException(
String.format("Connecting to %s timed out (%s ms)", address, conf.connectionTimeoutMs()));
String.format("Connecting to %s timed out (%s ms)",
address, conf.connectionCreationTimeoutMs()));
} else if (cf.cause() != null) {
throw new IOException(String.format("Failed to connect to %s", address), cf.cause());
}
@@ -19,6 +19,7 @@

import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import com.google.common.primitives.Ints;
import io.netty.util.NettyRuntime;
@@ -31,6 +32,7 @@ public class TransportConf {
private final String SPARK_NETWORK_IO_MODE_KEY;
private final String SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY;
private final String SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY;
private final String SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY;
private final String SPARK_NETWORK_IO_BACKLOG_KEY;
private final String SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY;
private final String SPARK_NETWORK_IO_SERVERTHREADS_KEY;
@@ -54,6 +56,7 @@ public TransportConf(String module, ConfigProvider conf) {
SPARK_NETWORK_IO_MODE_KEY = getConfKey("io.mode");
SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY = getConfKey("io.preferDirectBufs");
SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY = getConfKey("io.connectionTimeout");
SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY = getConfKey("io.connectionCreationTimeout");
SPARK_NETWORK_IO_BACKLOG_KEY = getConfKey("io.backLog");
SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY = getConfKey("io.numConnectionsPerPeer");
SPARK_NETWORK_IO_SERVERTHREADS_KEY = getConfKey("io.serverThreads");
@@ -94,7 +97,7 @@ public boolean preferDirectBufs() {
return conf.getBoolean(SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY, true);
}

/** Connect timeout in milliseconds. Default 120 secs. */
/** Connection idle timeout in milliseconds. Default 120 secs. */
public int connectionTimeoutMs() {
long defaultNetworkTimeoutS = JavaUtils.timeStringAsSec(
conf.get("spark.network.timeout", "120s"));
@@ -103,6 +106,14 @@ public int connectionTimeoutMs() {
return (int) defaultTimeoutMs;
}

/** Connect creation timeout in milliseconds. Default 30 secs. */
public int connectionCreationTimeoutMs() {
long connectionTimeoutS = TimeUnit.MILLISECONDS.toSeconds(connectionTimeoutMs());
long defaultTimeoutMs = JavaUtils.timeStringAsSec(
conf.get(SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY, connectionTimeoutS + "s")) * 1000;
return (int) defaultTimeoutMs;
}

/** Number of concurrent connections between two nodes for fetching data. */
public int numConnectionsPerPeer() {
return conf.getInt(SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY, 1);
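The hunk above introduces a connection-creation timeout that is separate from the existing idle timeout. A hedged sketch of overriding both from SparkR follows; the full key name (`spark.<module>.io.connectionCreationTimeout`), the `shuffle` module, and the values are assumptions about how the key is composed, not something this diff spells out.

```r
# Hedged sketch: set the assumed shuffle-module timeouts at session start.
# Key names and values here are illustrative assumptions.
library(SparkR)
sparkR.session(sparkConfig = list(
  spark.shuffle.io.connectionCreationTimeout = "30s",
  spark.shuffle.io.connectionTimeout = "120s"))
```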
@@ -31,7 +31,6 @@
import scala.Tuple2;
import scala.collection.Iterator;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.Closeables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -178,8 +177,8 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
}
}

@VisibleForTesting
long[] getPartitionLengths() {
@Override
public long[] getPartitionLengths() {
return partitionLengths;
}

@@ -88,6 +88,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {

@Nullable private MapStatus mapStatus;
@Nullable private ShuffleExternalSorter sorter;
@Nullable private long[] partitionLengths;
private long peakMemoryUsedBytes = 0;

/** Subclass of ByteArrayOutputStream that exposes `buf` directly. */
@@ -219,7 +220,6 @@ void closeAndWriteOutput() throws IOException {
serOutputStream = null;
final SpillInfo[] spills = sorter.closeAndGetSpills();
sorter = null;
final long[] partitionLengths;
try {
partitionLengths = mergeSpills(spills);
} finally {
@@ -543,4 +543,9 @@ public void close() throws IOException {
channel.close();
}
}

@Override
public long[] getPartitionLengths() {
return partitionLengths;
}
}
@@ -104,11 +104,14 @@ public static UnsafeExternalSorter createWithExistingInMemorySorter(
int initialSize,
long pageSizeBytes,
int numElementsForSpillThreshold,
UnsafeInMemorySorter inMemorySorter) throws IOException {
UnsafeInMemorySorter inMemorySorter,
long existingMemoryConsumption) throws IOException {
UnsafeExternalSorter sorter = new UnsafeExternalSorter(taskMemoryManager, blockManager,
serializerManager, taskContext, recordComparatorSupplier, prefixComparator, initialSize,
pageSizeBytes, numElementsForSpillThreshold, inMemorySorter, false /* ignored */);
sorter.spill(Long.MAX_VALUE, sorter);
taskContext.taskMetrics().incMemoryBytesSpilled(existingMemoryConsumption);
sorter.totalSpillBytes += existingMemoryConsumption;
// The external sorter will be used to insert records, in-memory sorter is not needed.
sorter.inMemSorter = null;
return sorter;
@@ -496,6 +499,7 @@ public void insertKVRecord(Object keyBase, long keyOffset, int keyLen,
*/
public void merge(UnsafeExternalSorter other) throws IOException {
other.spill();
totalSpillBytes += other.totalSpillBytes;
spillWriters.addAll(other.spillWriters);
// remove them from `spillWriters`, or the files will be deleted in `cleanupResources`.
other.spillWriters.clear();
@@ -798,7 +798,11 @@ private[spark] class ExecutorAllocationManager(
}
if (taskEnd.taskInfo.speculative) {
stageAttemptToSpeculativeTaskIndices.get(stageAttempt).foreach {_.remove{taskIndex}}
stageAttemptToNumSpeculativeTasks(stageAttempt) -= 1
// If the previous task attempt succeeded first and it was the last task in a stage,
// the stage may have been removed before handling this speculative TaskEnd event.
if (stageAttemptToNumSpeculativeTasks.contains(stageAttempt)) {
stageAttemptToNumSpeculativeTasks(stageAttempt) -= 1
}
}

taskEnd.reason match {
@@ -244,7 +244,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
/* backlog */ 1,
InetAddress.getByName("localhost")))
// A call to accept() for ServerSocket shall block infinitely.
serverSocket.map(_.setSoTimeout(0))
serverSocket.foreach(_.setSoTimeout(0))
new Thread("accept-connections") {
setDaemon(true)

3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -47,7 +47,7 @@ import org.apache.spark.metrics.source.JVMCPUSource
import org.apache.spark.resource.ResourceInformation
import org.apache.spark.rpc.RpcTimeout
import org.apache.spark.scheduler._
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.shuffle.{FetchFailedException, ShuffleBlockPusher}
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
import org.apache.spark.util._
import org.apache.spark.util.io.ChunkedByteBuffer
@@ -325,6 +325,7 @@ private[spark] class Executor(
case NonFatal(e) =>
logWarning("Unable to stop heartbeater", e)
}
ShuffleBlockPusher.stop()
threadPool.shutdown()

// Notify plugins that executor is shutting down so they can terminate cleanly
