Merge remote-tracking branch 'origin/master' into column-vector
cloud-fan committed Jan 19, 2018
2 parents eccdca1 + 6121e91 commit ab4a625
Showing 124 changed files with 5,009 additions and 1,583 deletions.
2 changes: 1 addition & 1 deletion R/pkg/R/DataFrame.R
@@ -2853,7 +2853,7 @@ setMethod("intersect",
#' except
#'
#' Return a new SparkDataFrame containing rows in this SparkDataFrame
#' but not in another SparkDataFrame. This is equivalent to \code{EXCEPT} in SQL.
#' but not in another SparkDataFrame. This is equivalent to \code{EXCEPT DISTINCT} in SQL.
#'
#' @param x a SparkDataFrame.
#' @param y a SparkDataFrame.
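Side note, not part of this commit: the Scala Dataset API's except has the same EXCEPT DISTINCT semantics the updated doc describes. A minimal Scala sketch, assuming a SparkSession named spark:

// Sketch: rows in `left` but not in `right`, with duplicates removed (EXCEPT DISTINCT).
import spark.implicits._
val left = Seq(1, 2, 2, 3).toDF("id")
val right = Seq(3).toDF("id")
left.except(right).show()   // shows ids 1 and 2, each exactly once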
1 change: 0 additions & 1 deletion R/pkg/R/mllib_utils.R
@@ -130,4 +130,3 @@ read.ml <- function(path) {
stop("Unsupported model: ", jobj)
}
}

6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -640,9 +640,9 @@ private[spark] object SparkConf extends Logging {
translation = s => s"${s.toLong * 10}s")),
"spark.reducer.maxSizeInFlight" -> Seq(
AlternateConfig("spark.reducer.maxMbInFlight", "1.4")),
"spark.kryoserializer.buffer" ->
Seq(AlternateConfig("spark.kryoserializer.buffer.mb", "1.4",
translation = s => s"${(s.toDouble * 1000).toInt}k")),
"spark.kryoserializer.buffer" -> Seq(
AlternateConfig("spark.kryoserializer.buffer.mb", "1.4",
translation = s => s"${(s.toDouble * 1000).toInt}k")),
"spark.kryoserializer.buffer.max" -> Seq(
AlternateConfig("spark.kryoserializer.buffer.max.mb", "1.4")),
"spark.shuffle.file.buffer" -> Seq(
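Side note, not part of this commit: the translation lambda shown above rewrites a legacy fractional-MB value into the new unit-suffixed form. A standalone Scala sketch of just that lambda, with an illustrative input:

// Sketch of the translation above, kept outside the private AlternateConfig machinery.
val translate: String => String = s => s"${(s.toDouble * 1000).toInt}k"
assert(translate("2") == "2000k")   // legacy spark.kryoserializer.buffer.mb=2 maps to 2000k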
47 changes: 27 additions & 20 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -38,10 +38,13 @@ package object config {
ConfigBuilder("spark.driver.userClassPathFirst").booleanConf.createWithDefault(false)

private[spark] val DRIVER_MEMORY = ConfigBuilder("spark.driver.memory")
.doc("Amount of memory to use for the driver process, in MiB unless otherwise specified.")
.bytesConf(ByteUnit.MiB)
.createWithDefaultString("1g")

private[spark] val DRIVER_MEMORY_OVERHEAD = ConfigBuilder("spark.driver.memoryOverhead")
.doc("The amount of off-heap memory to be allocated per driver in cluster mode, " +
"in MiB unless otherwise specified.")
.bytesConf(ByteUnit.MiB)
.createOptional
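Usage note, not part of this commit: these entries back ordinary user-facing settings, so they accept size strings through SparkConf. A hedged sketch (the values are illustrative):

// Sketch: setting the user-facing keys that the entries above read.
import org.apache.spark.SparkConf
val conf = new SparkConf()
  .set("spark.driver.memory", "4g")            // interpreted as MiB when no unit is given
  .set("spark.driver.memoryOverhead", "512m")  // optional; applies in cluster mode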

@@ -62,6 +65,7 @@ package object config {
.createWithDefault(false)

private[spark] val EVENT_LOG_OUTPUT_BUFFER_SIZE = ConfigBuilder("spark.eventLog.buffer.kb")
.doc("Buffer size to use when writing to output streams, in KiB unless otherwise specified.")
.bytesConf(ByteUnit.KiB)
.createWithDefaultString("100k")

@@ -81,10 +85,13 @@ package object config {
ConfigBuilder("spark.executor.userClassPathFirst").booleanConf.createWithDefault(false)

private[spark] val EXECUTOR_MEMORY = ConfigBuilder("spark.executor.memory")
.doc("Amount of memory to use per executor process, in MiB unless otherwise specified.")
.bytesConf(ByteUnit.MiB)
.createWithDefaultString("1g")

private[spark] val EXECUTOR_MEMORY_OVERHEAD = ConfigBuilder("spark.executor.memoryOverhead")
.doc("The amount of off-heap memory to be allocated per executor in cluster mode, " +
"in MiB unless otherwise specified.")
.bytesConf(ByteUnit.MiB)
.createOptional

@@ -353,7 +360,7 @@ package object config {
private[spark] val BUFFER_WRITE_CHUNK_SIZE =
ConfigBuilder("spark.buffer.write.chunkSize")
.internal()
.doc("The chunk size during writing out the bytes of ChunkedByteBuffer.")
.doc("The chunk size in bytes during writing out the bytes of ChunkedByteBuffer.")
.bytesConf(ByteUnit.BYTE)
.checkValue(_ <= Int.MaxValue, "The chunk size during writing out the bytes of" +
" ChunkedByteBuffer should not larger than Int.MaxValue.")
@@ -368,9 +375,9 @@ package object config {

private[spark] val SHUFFLE_ACCURATE_BLOCK_THRESHOLD =
ConfigBuilder("spark.shuffle.accurateBlockThreshold")
.doc("When we compress the size of shuffle blocks in HighlyCompressedMapStatus, we will " +
"record the size accurately if it's above this config. This helps to prevent OOM by " +
"avoiding underestimating shuffle block size when fetch shuffle blocks.")
.doc("Threshold in bytes above which the size of shuffle blocks in " +
"HighlyCompressedMapStatus is accurately recorded. This helps to prevent OOM " +
"by avoiding underestimating shuffle block size when fetch shuffle blocks.")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(100 * 1024 * 1024)
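To make the reworded doc concrete, an illustrative standalone sketch of the thresholding it describes (not the actual HighlyCompressedMapStatus code):

// Illustrative only: sizes at or above the threshold are recorded exactly;
// smaller blocks are summarized, which is where underestimation can occur.
val accurateBlockThreshold: Long = 100L * 1024 * 1024   // matches the default above
def reportedSize(actualSize: Long, summarizedSize: Long): Long =
  if (actualSize >= accurateBlockThreshold) actualSize else summarizedSize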

@@ -389,23 +396,23 @@ package object config {

private[spark] val REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS =
ConfigBuilder("spark.reducer.maxBlocksInFlightPerAddress")
.doc("This configuration limits the number of remote blocks being fetched per reduce task" +
" from a given host port. When a large number of blocks are being requested from a given" +
" address in a single fetch or simultaneously, this could crash the serving executor or" +
" Node Manager. This is especially useful to reduce the load on the Node Manager when" +
" external shuffle is enabled. You can mitigate the issue by setting it to a lower value.")
.doc("This configuration limits the number of remote blocks being fetched per reduce task " +
"from a given host port. When a large number of blocks are being requested from a given " +
"address in a single fetch or simultaneously, this could crash the serving executor or " +
"Node Manager. This is especially useful to reduce the load on the Node Manager when " +
"external shuffle is enabled. You can mitigate the issue by setting it to a lower value.")
.intConf
.checkValue(_ > 0, "The max no. of blocks in flight cannot be non-positive.")
.createWithDefault(Int.MaxValue)

private[spark] val MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM =
ConfigBuilder("spark.maxRemoteBlockSizeFetchToMem")
.doc("Remote block will be fetched to disk when size of the block is " +
"above this threshold. This is to avoid a giant request takes too much memory. We can " +
"enable this config by setting a specific value(e.g. 200m). Note this configuration will " +
"affect both shuffle fetch and block manager remote block fetch. For users who " +
"enabled external shuffle service, this feature can only be worked when external shuffle" +
" service is newer than Spark 2.2.")
.doc("Remote block will be fetched to disk when size of the block is above this threshold " +
"in bytes. This is to avoid a giant request takes too much memory. We can enable this " +
"config by setting a specific value(e.g. 200m). Note this configuration will affect " +
"both shuffle fetch and block manager remote block fetch. For users who enabled " +
"external shuffle service, this feature can only be worked when external shuffle" +
"service is newer than Spark 2.2.")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(Long.MaxValue)
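Following the example value in the doc itself, a minimal sketch of enabling the setting (illustrative, not from this diff):

// Sketch: fetch any remote block larger than 200m to disk rather than into memory.
import org.apache.spark.SparkConf
val conf = new SparkConf().set("spark.maxRemoteBlockSizeFetchToMem", "200m")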

@@ -419,9 +426,9 @@ package object config {

private[spark] val SHUFFLE_FILE_BUFFER_SIZE =
ConfigBuilder("spark.shuffle.file.buffer")
.doc("Size of the in-memory buffer for each shuffle file output stream. " +
"These buffers reduce the number of disk seeks and system calls made " +
"in creating intermediate shuffle files.")
.doc("Size of the in-memory buffer for each shuffle file output stream, in KiB unless " +
"otherwise specified. These buffers reduce the number of disk seeks and system calls " +
"made in creating intermediate shuffle files.")
.bytesConf(ByteUnit.KiB)
.checkValue(v => v > 0 && v <= Int.MaxValue / 1024,
s"The file buffer size must be greater than 0 and less than ${Int.MaxValue / 1024}.")
@@ -430,15 +437,15 @@ package object config {
private[spark] val SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE =
ConfigBuilder("spark.shuffle.unsafe.file.output.buffer")
.doc("The file system for this buffer size after each partition " +
"is written in unsafe shuffle writer.")
"is written in unsafe shuffle writer. In KiB unless otherwise specified.")
.bytesConf(ByteUnit.KiB)
.checkValue(v => v > 0 && v <= Int.MaxValue / 1024,
s"The buffer size must be greater than 0 and less than ${Int.MaxValue / 1024}.")
.createWithDefaultString("32k")

private[spark] val SHUFFLE_DISK_WRITE_BUFFER_SIZE =
ConfigBuilder("spark.shuffle.spill.diskWriteBufferSize")
.doc("The buffer size to use when writing the sorted records to an on-disk file.")
.doc("The buffer size, in bytes, to use when writing the sorted records to an on-disk file.")
.bytesConf(ByteUnit.BYTE)
.checkValue(v => v > 0 && v <= Int.MaxValue,
s"The buffer size must be greater than 0 and less than ${Int.MaxValue}.")
@@ -676,7 +676,7 @@ private[ui] class TaskDataSource(

private var _tasksToShow: Seq[TaskData] = null

override def dataSize: Int = stage.numCompleteTasks + stage.numFailedTasks + stage.numKilledTasks
override def dataSize: Int = stage.numTasks

override def sliceData(from: Int, to: Int): Seq[TaskData] = {
if (_tasksToShow == null) {
@@ -17,7 +17,6 @@

package org.apache.spark.launcher;

import java.time.Duration;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.HashMap;
@@ -26,13 +25,13 @@
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.junit.Ignore;
import org.junit.Test;
import static org.junit.Assert.*;
import static org.junit.Assume.*;
import static org.mockito.Mockito.*;

import org.apache.spark.SparkContext;
import org.apache.spark.SparkContext$;
import org.apache.spark.internal.config.package$;
import org.apache.spark.util.Utils;

@@ -122,7 +121,8 @@ public void testChildProcLauncher() throws Exception {
assertEquals(0, app.waitFor());
}

@Test
// TODO: [SPARK-23020] Re-enable this
@Ignore
public void testInProcessLauncher() throws Exception {
// Because this test runs SparkLauncher in process and in client mode, it pollutes the system
// properties, and that can cause test failures down the test pipeline. So restore the original
@@ -139,9 +139,7 @@ public void testInProcessLauncher() throws Exception {
// Here DAGScheduler is stopped, while SparkContext.clearActiveContext may not be called yet.
// Wait for a reasonable amount of time to avoid creating two active SparkContext in JVM.
// See SPARK-23019 and SparkContext.stop() for details.
eventually(Duration.ofSeconds(5), Duration.ofMillis(10), () -> {
assertTrue("SparkContext is still alive.", SparkContext$.MODULE$.getActive().isEmpty());
});
TimeUnit.MILLISECONDS.sleep(500);
}
}

@@ -150,35 +148,26 @@ private void inProcessLauncherTestImpl() throws Exception {
SparkAppHandle.Listener listener = mock(SparkAppHandle.Listener.class);
doAnswer(invocation -> {
SparkAppHandle h = (SparkAppHandle) invocation.getArguments()[0];
synchronized (transitions) {
transitions.add(h.getState());
}
transitions.add(h.getState());
return null;
}).when(listener).stateChanged(any(SparkAppHandle.class));

SparkAppHandle handle = null;
try {
handle = new InProcessLauncher()
.setMaster("local")
.setAppResource(SparkLauncher.NO_RESOURCE)
.setMainClass(InProcessTestApp.class.getName())
.addAppArgs("hello")
.startApplication(listener);

waitFor(handle);
assertEquals(SparkAppHandle.State.FINISHED, handle.getState());

// Matches the behavior of LocalSchedulerBackend.
List<SparkAppHandle.State> expected = Arrays.asList(
SparkAppHandle.State.CONNECTED,
SparkAppHandle.State.RUNNING,
SparkAppHandle.State.FINISHED);
assertEquals(expected, transitions);
} finally {
if (handle != null) {
handle.kill();
}
}
SparkAppHandle handle = new InProcessLauncher()
.setMaster("local")
.setAppResource(SparkLauncher.NO_RESOURCE)
.setMainClass(InProcessTestApp.class.getName())
.addAppArgs("hello")
.startApplication(listener);

waitFor(handle);
assertEquals(SparkAppHandle.State.FINISHED, handle.getState());

// Matches the behavior of LocalSchedulerBackend.
List<SparkAppHandle.State> expected = Arrays.asList(
SparkAppHandle.State.CONNECTED,
SparkAppHandle.State.RUNNING,
SparkAppHandle.State.FINISHED);
assertEquals(expected, transitions);
}

public static class SparkLauncherTestApp {
34 changes: 34 additions & 0 deletions core/src/test/scala/org/apache/spark/SparkFunSuite.scala
@@ -27,19 +27,53 @@ import org.apache.spark.util.AccumulatorContext

/**
* Base abstract class for all unit tests in Spark for handling common functionality.
*
* Thread auditing happens here automatically when a new test suite is created.
* The only prerequisite for this is that the test class must extend [[SparkFunSuite]].
*
* It is possible to override the default thread audit behavior by setting enableAutoThreadAudit
* to false and manually calling the audit methods, if desired. For example:
*
* class MyTestSuite extends SparkFunSuite {
*
* override val enableAutoThreadAudit = false
*
* protected override def beforeAll(): Unit = {
* doThreadPreAudit()
* super.beforeAll()
* }
*
* protected override def afterAll(): Unit = {
* super.afterAll()
* doThreadPostAudit()
* }
* }
*/
abstract class SparkFunSuite
extends FunSuite
with BeforeAndAfterAll
with ThreadAudit
with Logging {
// scalastyle:on

protected val enableAutoThreadAudit = true

protected override def beforeAll(): Unit = {
if (enableAutoThreadAudit) {
doThreadPreAudit()
}
super.beforeAll()
}

protected override def afterAll(): Unit = {
try {
// Avoid leaking map entries in tests that use accumulators without SparkContext
AccumulatorContext.clear()
} finally {
super.afterAll()
if (enableAutoThreadAudit) {
doThreadPostAudit()
}
}
}

(Diffs for the remaining changed files are not shown.)
