Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merge master #8

Merged
merged 38 commits into from
Nov 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
6da8ade
[SPARK-33045][SQL][FOLLOWUP] Fix build failure with Scala 2.13
sunchao Nov 19, 2020
883a213
[MINOR] Structured Streaming statistics page indent fix
gaborgsomogyi Nov 19, 2020
02d410a
[MINOR][DOCS] Document 'without' value for HADOOP_VERSION in pip inst…
HyukjinKwon Nov 20, 2020
8218b48
[SPARK-32919][SHUFFLE][TEST-MAVEN][TEST-HADOOP2.7] Driver side change…
venkata91 Nov 20, 2020
2289389
[SPARK-33441][BUILD][FOLLOWUP] Make unused-imports check for SBT spec…
LuciferYang Nov 20, 2020
870d409
[SPARK-32512][SQL][TESTS][FOLLOWUP] Remove duplicate tests for ALTER …
MaxGekk Nov 20, 2020
cbc8be2
[SPARK-33422][DOC] Fix the correct display of left menu item
liucht-inspur Nov 20, 2020
3384bda
[SPARK-33468][SQL] ParseUrl in ANSI mode should fail if input string…
ulysses-you Nov 20, 2020
47326ac
[SPARK-28704][SQL][TEST] Add back Skiped HiveExternalCatalogVersionsS…
AngersZhuuuu Nov 20, 2020
116b7b7
[SPARK-33466][ML][PYTHON] Imputer support mode(most_frequent) strategy
zhengruifeng Nov 20, 2020
a1a3d5c
[MINOR][TESTS][DOCS] Use fully-qualified class name in docker integra…
huaxingao Nov 20, 2020
2479778
[SPARK-33492][SQL] DSv2: Append/Overwrite/ReplaceTable should invalid…
sunchao Nov 20, 2020
de0f50a
[SPARK-32670][SQL] Group exception messages in Catalyst Analyzer in o…
anchovYu Nov 20, 2020
67c6ed9
[SPARK-33223][SS][FOLLOWUP] Clarify the meaning of "number of rows dr…
HeartSaVioR Nov 21, 2020
530c0a8
[SPARK-33505][SQL][TESTS] Fix adding new partitions by INSERT INTO `I…
MaxGekk Nov 21, 2020
b623c03
[SPARK-32381][CORE][FOLLOWUP][TEST-HADOOP2.7] Don't remove Serializab…
sunchao Nov 21, 2020
cf74901
Revert "[SPARK-28704][SQL][TEST] Add back Skiped HiveExternalCatalogV…
dongjoon-hyun Nov 21, 2020
517b810
[SPARK-33463][SQL] Keep Job Id during incremental collect in Spark Th…
gumartinm Nov 21, 2020
d7f4b2a
[SPARK-28704][SQL][TEST] Add back Skiped HiveExternalCatalogVersionsS…
AngersZhuuuu Nov 22, 2020
d338af3
[SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options w…
cchighman Nov 22, 2020
6d625cc
[SPARK-33469][SQL] Add current_timezone function
ulysses-you Nov 22, 2020
df4a1c2
[SPARK-33512][BUILD] Upgrade test libraries
dongjoon-hyun Nov 23, 2020
a459238
[MINOR][INFRA] Suppress warning in check-license
williamhyun Nov 23, 2020
aa78c05
[SPARK-33427][SQL][FOLLOWUP] Put key and value into IdentityHashMap s…
viirya Nov 23, 2020
0bb911d
[SPARK-33143][PYTHON] Add configurable timeout to python server and c…
gaborgsomogyi Nov 23, 2020
84e7036
[SPARK-33510][BUILD] Update SBT to 1.4.4
williamhyun Nov 23, 2020
c891e02
Revert "[SPARK-32481][CORE][SQL] Support truncate table to move data …
gatorsmile Nov 23, 2020
60f3a73
[SPARK-33515][SQL] Improve exception messages while handling Unresolv…
imback82 Nov 23, 2020
23e9920
[SPARK-33511][SQL] Respect case sensitivity while resolving V2 partit…
MaxGekk Nov 23, 2020
f83fcb1
[SPARK-33278][SQL][FOLLOWUP] Improve OptimizeWindowFunctions to avoid…
beliefer Nov 23, 2020
1bd897c
[SPARK-32918][SHUFFLE] RPC implementation to support control plane co…
zhouyejoe Nov 23, 2020
0592181
[SPARK-33479][DOC][FOLLOWUP] DocSearch: Support filtering search resu…
gengliangwang Nov 24, 2020
3ce4ab5
[SPARK-33513][BUILD] Upgrade to Scala 2.13.4 to improve exhaustivity
dongjoon-hyun Nov 24, 2020
8380e00
[SPARK-33524][SQL][TESTS] Change `InMemoryTable` not to use Tuple.has…
dongjoon-hyun Nov 24, 2020
f35e28f
[SPARK-33523][SQL][TEST] Add predicate related benchmark to SubExprEl…
viirya Nov 24, 2020
a6555ee
[SPARK-33521][SQL] Universal type conversion in resolving V2 partitio…
MaxGekk Nov 24, 2020
fdd6c73
[SPARK-33514][SQL] Migrate TRUNCATE TABLE command to use UnresolvedTa…
imback82 Nov 24, 2020
048a982
[SPARK-33535][INFRA][TESTS] Export LANG to en_US.UTF-8 in run-tests-j…
LuciferYang Nov 24, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ public void onFailure(Throwable t) {
* @param blockIds block ids to be pushed
* @param buffers buffers to be pushed
* @param listener the listener to receive block push status.
*
* @since 3.1.0
*/
public void pushBlocks(
String host,
Expand All @@ -156,4 +158,24 @@ public void pushBlocks(
BlockFetchingListener listener) {
throw new UnsupportedOperationException();
}

/**
* Invoked by Spark driver to notify external shuffle services to finalize the shuffle merge
* for a given shuffle. This allows the driver to start the shuffle reducer stage after properly
* finishing the shuffle merge process associated with the shuffle mapper stage.
*
* @param host host of shuffle server
* @param port port of shuffle server.
* @param shuffleId shuffle ID of the shuffle to be finalized
* @param listener the listener to receive MergeStatuses
*
* @since 3.1.0
*/
public void finalizeShuffleMerge(
String host,
int port,
int shuffleId,
MergeFinalizerListener listener) {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,35 @@ public void pushBlocks(
}
}

@Override
public void finalizeShuffleMerge(
String host,
int port,
int shuffleId,
MergeFinalizerListener listener) {
checkInit();
try {
TransportClient client = clientFactory.createClient(host, port);
ByteBuffer finalizeShuffleMerge = new FinalizeShuffleMerge(appId, shuffleId).toByteBuffer();
client.sendRpc(finalizeShuffleMerge, new RpcResponseCallback() {
@Override
public void onSuccess(ByteBuffer response) {
listener.onShuffleMergeSuccess(
(MergeStatuses) BlockTransferMessage.Decoder.fromByteBuffer(response));
}

@Override
public void onFailure(Throwable e) {
listener.onShuffleMergeFailure(e);
}
});
} catch (Exception e) {
logger.error("Exception while sending finalizeShuffleMerge request to {}:{}",
host, port, e);
listener.onShuffleMergeFailure(e);
}
}

@Override
public MetricSet shuffleMetrics() {
checkInit();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.shuffle;

import java.util.EventListener;

import org.apache.spark.network.shuffle.protocol.MergeStatuses;

/**
* :: DeveloperApi ::
*
* Listener providing a callback function to invoke when driver receives the response for the
* finalize shuffle merge request sent to remote shuffle service.
*
* @since 3.1.0
*/
public interface MergeFinalizerListener extends EventListener {
/**
* Called once upon successful response on finalize shuffle merge on a remote shuffle service.
* The returned {@link MergeStatuses} is passed to the listener for further processing
*/
void onShuffleMergeSuccess(MergeStatuses statuses);

/**
* Called once upon failure response on finalize shuffle merge on a remote shuffle service.
*/
void onShuffleMergeFailure(Throwable e);
}
15 changes: 15 additions & 0 deletions core/src/main/scala/org/apache/spark/Dependency.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{ShuffleHandle, ShuffleWriteProcessor}
import org.apache.spark.storage.BlockManagerId

/**
* :: DeveloperApi ::
Expand Down Expand Up @@ -95,6 +96,20 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, this)

/**
* Stores the location of the list of chosen external shuffle services for handling the
* shuffle merge requests from mappers in this shuffle map stage.
*/
private[spark] var mergerLocs: Seq[BlockManagerId] = Nil

def setMergerLocs(mergerLocs: Seq[BlockManagerId]): Unit = {
if (mergerLocs != null) {
this.mergerLocs = mergerLocs
}
}

def getMergerLocs: Seq[BlockManagerId] = mergerLocs

_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
_rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](

private val conf = SparkEnv.get.conf
protected val bufferSize: Int = conf.get(BUFFER_SIZE)
protected val authSocketTimeout = conf.get(PYTHON_AUTH_SOCKET_TIMEOUT)
private val reuseWorker = conf.get(PYTHON_WORKER_REUSE)
protected val simplifiedTraceback: Boolean = false

Expand Down Expand Up @@ -139,6 +140,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
if (workerMemoryMb.isDefined) {
envVars.put("PYSPARK_EXECUTOR_MEMORY_MB", workerMemoryMb.get.toString)
}
envVars.put("SPARK_AUTH_SOCKET_TIMEOUT", authSocketTimeout.toString)
envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString)
val worker: Socket = env.createPythonWorker(pythonExec, envVars.asScala.toMap)
// Whether is the worker released into idle pool or closed. When any codes try to release or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,8 @@ private[spark] object PythonUtils {
def getBroadcastThreshold(sc: JavaSparkContext): Long = {
sc.conf.get(org.apache.spark.internal.config.BROADCAST_FOR_UDF_COMPRESSION_THRESHOLD)
}

def getPythonAuthSocketTimeout(sc: JavaSparkContext): Long = {
sc.conf.get(org.apache.spark.internal.config.Python.PYTHON_AUTH_SOCKET_TIMEOUT)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,10 @@ private[spark] object Python {
.version("2.4.0")
.bytesConf(ByteUnit.MiB)
.createOptional

val PYTHON_AUTH_SOCKET_TIMEOUT = ConfigBuilder("spark.python.authenticate.socketTimeout")
.internal()
.version("3.1.0")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("15s")
}
47 changes: 47 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1945,4 +1945,51 @@ package object config {
.version("3.0.1")
.booleanConf
.createWithDefault(false)

private[spark] val PUSH_BASED_SHUFFLE_ENABLED =
ConfigBuilder("spark.shuffle.push.enabled")
.doc("Set to 'true' to enable push-based shuffle on the client side and this works in " +
"conjunction with the server side flag spark.shuffle.server.mergedShuffleFileManagerImpl " +
"which needs to be set with the appropriate " +
"org.apache.spark.network.shuffle.MergedShuffleFileManager implementation for push-based " +
"shuffle to be enabled")
.version("3.1.0")
.booleanConf
.createWithDefault(false)

private[spark] val SHUFFLE_MERGER_MAX_RETAINED_LOCATIONS =
ConfigBuilder("spark.shuffle.push.maxRetainedMergerLocations")
.doc("Maximum number of shuffle push merger locations cached for push based shuffle. " +
"Currently, shuffle push merger locations are nothing but external shuffle services " +
"which are responsible for handling pushed blocks and merging them and serving " +
"merged blocks for later shuffle fetch.")
.version("3.1.0")
.intConf
.createWithDefault(500)

private[spark] val SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO =
ConfigBuilder("spark.shuffle.push.mergersMinThresholdRatio")
.doc("The minimum number of shuffle merger locations required to enable push based " +
"shuffle for a stage. This is specified as a ratio of the number of partitions in " +
"the child stage. For example, a reduce stage which has 100 partitions and uses the " +
"default value 0.05 requires at least 5 unique merger locations to enable push based " +
"shuffle. Merger locations are currently defined as external shuffle services.")
.version("3.1.0")
.doubleConf
.createWithDefault(0.05)

private[spark] val SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD =
ConfigBuilder("spark.shuffle.push.mergersMinStaticThreshold")
.doc(s"The static threshold for number of shuffle push merger locations should be " +
"available in order to enable push based shuffle for a stage. Note this config " +
s"works in conjunction with ${SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO.key}. " +
"Maximum of spark.shuffle.push.mergersMinStaticThreshold and " +
s"${SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO.key} ratio number of mergers needed to " +
"enable push based shuffle for a stage. For eg: with 1000 partitions for the child " +
"stage with spark.shuffle.push.mergersMinStaticThreshold as 5 and " +
s"${SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO.key} set to 0.05, we would need " +
"at least 50 mergers to enable push based shuffle for that stage.")
.version("3.1.0")
.doubleConf
.createWithDefault(5)
}
40 changes: 40 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,8 @@ private[spark] class DAGScheduler(
private[spark] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
taskScheduler.setDAGScheduler(this)

private val pushBasedShuffleEnabled = Utils.isPushBasedShuffleEnabled(sc.getConf)

/**
* Called by the TaskSetManager to report task's starting.
*/
Expand Down Expand Up @@ -1252,6 +1254,33 @@ private[spark] class DAGScheduler(
execCores.map(cores => properties.setProperty(EXECUTOR_CORES_LOCAL_PROPERTY, cores))
}

/**
* If push based shuffle is enabled, set the shuffle services to be used for the given
* shuffle map stage for block push/merge.
*
* Even with dynamic resource allocation kicking in and significantly reducing the number
* of available active executors, we would still be able to get sufficient shuffle service
* locations for block push/merge by getting the historical locations of past executors.
*/
private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage): Unit = {
// TODO(SPARK-32920) Handle stage reuse/retry cases separately as without finalize
// TODO changes we cannot disable shuffle merge for the retry/reuse cases
val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations(
stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId)

if (mergerLocs.nonEmpty) {
stage.shuffleDep.setMergerLocs(mergerLocs)
logInfo(s"Push-based shuffle enabled for $stage (${stage.name}) with" +
s" ${stage.shuffleDep.getMergerLocs.size} merger locations")

logDebug("List of shuffle push merger locations " +
s"${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}")
} else {
logInfo("No available merger locations." +
s" Push-based shuffle disabled for $stage (${stage.name})")
}
}

/** Called when stage's parents are available and we can now do its task. */
private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
logDebug("submitMissingTasks(" + stage + ")")
Expand Down Expand Up @@ -1281,6 +1310,12 @@ private[spark] class DAGScheduler(
stage match {
case s: ShuffleMapStage =>
outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
// Only generate merger location for a given shuffle dependency once. This way, even if
// this stage gets retried, it would still be merging blocks using the same set of
// shuffle services.
if (pushBasedShuffleEnabled) {
prepareShuffleServicesForShuffleMapStage(s)
}
case s: ResultStage =>
outputCommitCoordinator.stageStart(
stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
Expand Down Expand Up @@ -2027,6 +2062,11 @@ private[spark] class DAGScheduler(
if (!executorFailureEpoch.contains(execId) || executorFailureEpoch(execId) < currentEpoch) {
executorFailureEpoch(execId) = currentEpoch
logInfo(s"Executor lost: $execId (epoch $currentEpoch)")
if (pushBasedShuffleEnabled) {
// Remove fetchFailed host in the shuffle push merger list for push based shuffle
hostToUnregisterOutputs.foreach(
host => blockManagerMaster.removeShufflePushMergerLocation(host))
}
blockManagerMaster.removeExecutor(execId)
clearCacheLocs()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.scheduler

import org.apache.spark.resource.ResourceProfile
import org.apache.spark.storage.BlockManagerId

/**
* A backend interface for scheduling systems that allows plugging in different ones under
Expand Down Expand Up @@ -92,4 +93,16 @@ private[spark] trait SchedulerBackend {
*/
def maxNumConcurrentTasks(rp: ResourceProfile): Int

/**
* Get the list of host locations for push based shuffle
*
* Currently push based shuffle is disabled for both stage retry and stage reuse cases
* (for eg: in the case where few partitions are lost due to failure). Hence this method
* should be invoked only once for a ShuffleDependency.
* @return List of external shuffle services locations
*/
def getShufflePushMergerLocations(
numPartitions: Int,
resourceProfileId: Int): Seq[BlockManagerId] = Nil

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.apache.spark.util.Utils
*
* There's no secrecy, so this relies on the sockets being either local or somehow encrypted.
*/
private[spark] class SocketAuthHelper(conf: SparkConf) {
private[spark] class SocketAuthHelper(val conf: SparkConf) {

val secret = Utils.createSecret(conf)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import scala.concurrent.duration.Duration
import scala.util.Try

import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Python.PYTHON_AUTH_SOCKET_TIMEOUT
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.util.{ThreadUtils, Utils}

Expand All @@ -34,31 +36,38 @@ import org.apache.spark.util.{ThreadUtils, Utils}
* handling one batch of data, with authentication and error handling.
*
* The socket server can only accept one connection, or close if no connection
* in 15 seconds.
* in configurable amount of seconds (default 15).
*/
private[spark] abstract class SocketAuthServer[T](
authHelper: SocketAuthHelper,
threadName: String) {
threadName: String) extends Logging {

def this(env: SparkEnv, threadName: String) = this(new SocketAuthHelper(env.conf), threadName)
def this(threadName: String) = this(SparkEnv.get, threadName)

private val promise = Promise[T]()

private def startServer(): (Int, String) = {
logTrace("Creating listening socket")
val serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))
// Close the socket if no connection in 15 seconds
serverSocket.setSoTimeout(15000)
// Close the socket if no connection in the configured seconds
val timeout = authHelper.conf.get(PYTHON_AUTH_SOCKET_TIMEOUT).toInt
logTrace(s"Setting timeout to $timeout sec")
serverSocket.setSoTimeout(timeout * 1000)

new Thread(threadName) {
setDaemon(true)
override def run(): Unit = {
var sock: Socket = null
try {
logTrace(s"Waiting for connection on port ${serverSocket.getLocalPort}")
sock = serverSocket.accept()
logTrace(s"Connection accepted from address ${sock.getRemoteSocketAddress}")
authHelper.authClient(sock)
logTrace("Client authenticated")
promise.complete(Try(handleConnection(sock)))
} finally {
logTrace("Closing server")
JavaUtils.closeQuietly(serverSocket)
JavaUtils.closeQuietly(sock)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,6 @@ private[spark] object BlockManagerId {
def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId = {
blockManagerIdCache.get(id)
}

private[spark] val SHUFFLE_MERGER_IDENTIFIER = "shuffle-push-merger"
}