Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pull changes to forked master #7

Merged
merged 30 commits into from
Nov 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
6da8ade
[SPARK-33045][SQL][FOLLOWUP] Fix build failure with Scala 2.13
sunchao Nov 19, 2020
883a213
[MINOR] Structured Streaming statistics page indent fix
gaborgsomogyi Nov 19, 2020
02d410a
[MINOR][DOCS] Document 'without' value for HADOOP_VERSION in pip inst…
HyukjinKwon Nov 20, 2020
8218b48
[SPARK-32919][SHUFFLE][TEST-MAVEN][TEST-HADOOP2.7] Driver side change…
venkata91 Nov 20, 2020
2289389
[SPARK-33441][BUILD][FOLLOWUP] Make unused-imports check for SBT spec…
LuciferYang Nov 20, 2020
870d409
[SPARK-32512][SQL][TESTS][FOLLOWUP] Remove duplicate tests for ALTER …
MaxGekk Nov 20, 2020
cbc8be2
[SPARK-33422][DOC] Fix the correct display of left menu item
liucht-inspur Nov 20, 2020
3384bda
[SPARK-33468][SQL] ParseUrl in ANSI mode should fail if input string…
ulysses-you Nov 20, 2020
47326ac
[SPARK-28704][SQL][TEST] Add back Skiped HiveExternalCatalogVersionsS…
AngersZhuuuu Nov 20, 2020
116b7b7
[SPARK-33466][ML][PYTHON] Imputer support mode(most_frequent) strategy
zhengruifeng Nov 20, 2020
a1a3d5c
[MINOR][TESTS][DOCS] Use fully-qualified class name in docker integra…
huaxingao Nov 20, 2020
2479778
[SPARK-33492][SQL] DSv2: Append/Overwrite/ReplaceTable should invalid…
sunchao Nov 20, 2020
de0f50a
[SPARK-32670][SQL] Group exception messages in Catalyst Analyzer in o…
anchovYu Nov 20, 2020
67c6ed9
[SPARK-33223][SS][FOLLOWUP] Clarify the meaning of "number of rows dr…
HeartSaVioR Nov 21, 2020
530c0a8
[SPARK-33505][SQL][TESTS] Fix adding new partitions by INSERT INTO `I…
MaxGekk Nov 21, 2020
b623c03
[SPARK-32381][CORE][FOLLOWUP][TEST-HADOOP2.7] Don't remove Serializab…
sunchao Nov 21, 2020
cf74901
Revert "[SPARK-28704][SQL][TEST] Add back Skiped HiveExternalCatalogV…
dongjoon-hyun Nov 21, 2020
517b810
[SPARK-33463][SQL] Keep Job Id during incremental collect in Spark Th…
gumartinm Nov 21, 2020
d7f4b2a
[SPARK-28704][SQL][TEST] Add back Skiped HiveExternalCatalogVersionsS…
AngersZhuuuu Nov 22, 2020
d338af3
[SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options w…
cchighman Nov 22, 2020
6d625cc
[SPARK-33469][SQL] Add current_timezone function
ulysses-you Nov 22, 2020
df4a1c2
[SPARK-33512][BUILD] Upgrade test libraries
dongjoon-hyun Nov 23, 2020
a459238
[MINOR][INFRA] Suppress warning in check-license
williamhyun Nov 23, 2020
aa78c05
[SPARK-33427][SQL][FOLLOWUP] Put key and value into IdentityHashMap s…
viirya Nov 23, 2020
0bb911d
[SPARK-33143][PYTHON] Add configurable timeout to python server and c…
gaborgsomogyi Nov 23, 2020
84e7036
[SPARK-33510][BUILD] Update SBT to 1.4.4
williamhyun Nov 23, 2020
c891e02
Revert "[SPARK-32481][CORE][SQL] Support truncate table to move data …
gatorsmile Nov 23, 2020
60f3a73
[SPARK-33515][SQL] Improve exception messages while handling Unresolv…
imback82 Nov 23, 2020
23e9920
[SPARK-33511][SQL] Respect case sensitivity while resolving V2 partit…
MaxGekk Nov 23, 2020
f83fcb1
[SPARK-33278][SQL][FOLLOWUP] Improve OptimizeWindowFunctions to avoid…
beliefer Nov 23, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions core/src/main/scala/org/apache/spark/Dependency.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{ShuffleHandle, ShuffleWriteProcessor}
import org.apache.spark.storage.BlockManagerId

/**
* :: DeveloperApi ::
Expand Down Expand Up @@ -95,6 +96,20 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, this)

/**
* Stores the location of the list of chosen external shuffle services for handling the
* shuffle merge requests from mappers in this shuffle map stage.
*/
private[spark] var mergerLocs: Seq[BlockManagerId] = Nil

def setMergerLocs(mergerLocs: Seq[BlockManagerId]): Unit = {
if (mergerLocs != null) {
this.mergerLocs = mergerLocs
}
}

def getMergerLocs: Seq[BlockManagerId] = mergerLocs

_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
_rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](

private val conf = SparkEnv.get.conf
protected val bufferSize: Int = conf.get(BUFFER_SIZE)
protected val authSocketTimeout = conf.get(PYTHON_AUTH_SOCKET_TIMEOUT)
private val reuseWorker = conf.get(PYTHON_WORKER_REUSE)
protected val simplifiedTraceback: Boolean = false

Expand Down Expand Up @@ -139,6 +140,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
if (workerMemoryMb.isDefined) {
envVars.put("PYSPARK_EXECUTOR_MEMORY_MB", workerMemoryMb.get.toString)
}
envVars.put("SPARK_AUTH_SOCKET_TIMEOUT", authSocketTimeout.toString)
envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString)
val worker: Socket = env.createPythonWorker(pythonExec, envVars.asScala.toMap)
// Whether is the worker released into idle pool or closed. When any codes try to release or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,8 @@ private[spark] object PythonUtils {
def getBroadcastThreshold(sc: JavaSparkContext): Long = {
sc.conf.get(org.apache.spark.internal.config.BROADCAST_FOR_UDF_COMPRESSION_THRESHOLD)
}

def getPythonAuthSocketTimeout(sc: JavaSparkContext): Long = {
sc.conf.get(org.apache.spark.internal.config.Python.PYTHON_AUTH_SOCKET_TIMEOUT)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,10 @@ private[spark] object Python {
.version("2.4.0")
.bytesConf(ByteUnit.MiB)
.createOptional

val PYTHON_AUTH_SOCKET_TIMEOUT = ConfigBuilder("spark.python.authenticate.socketTimeout")
.internal()
.version("3.1.0")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("15s")
}
47 changes: 47 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1945,4 +1945,51 @@ package object config {
.version("3.0.1")
.booleanConf
.createWithDefault(false)

private[spark] val PUSH_BASED_SHUFFLE_ENABLED =
ConfigBuilder("spark.shuffle.push.enabled")
.doc("Set to 'true' to enable push-based shuffle on the client side and this works in " +
"conjunction with the server side flag spark.shuffle.server.mergedShuffleFileManagerImpl " +
"which needs to be set with the appropriate " +
"org.apache.spark.network.shuffle.MergedShuffleFileManager implementation for push-based " +
"shuffle to be enabled")
.version("3.1.0")
.booleanConf
.createWithDefault(false)

private[spark] val SHUFFLE_MERGER_MAX_RETAINED_LOCATIONS =
ConfigBuilder("spark.shuffle.push.maxRetainedMergerLocations")
.doc("Maximum number of shuffle push merger locations cached for push based shuffle. " +
"Currently, shuffle push merger locations are nothing but external shuffle services " +
"which are responsible for handling pushed blocks and merging them and serving " +
"merged blocks for later shuffle fetch.")
.version("3.1.0")
.intConf
.createWithDefault(500)

private[spark] val SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO =
ConfigBuilder("spark.shuffle.push.mergersMinThresholdRatio")
.doc("The minimum number of shuffle merger locations required to enable push based " +
"shuffle for a stage. This is specified as a ratio of the number of partitions in " +
"the child stage. For example, a reduce stage which has 100 partitions and uses the " +
"default value 0.05 requires at least 5 unique merger locations to enable push based " +
"shuffle. Merger locations are currently defined as external shuffle services.")
.version("3.1.0")
.doubleConf
.createWithDefault(0.05)

private[spark] val SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD =
ConfigBuilder("spark.shuffle.push.mergersMinStaticThreshold")
.doc(s"The static threshold for number of shuffle push merger locations should be " +
"available in order to enable push based shuffle for a stage. Note this config " +
s"works in conjunction with ${SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO.key}. " +
"Maximum of spark.shuffle.push.mergersMinStaticThreshold and " +
s"${SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO.key} ratio number of mergers needed to " +
"enable push based shuffle for a stage. For eg: with 1000 partitions for the child " +
"stage with spark.shuffle.push.mergersMinStaticThreshold as 5 and " +
s"${SHUFFLE_MERGER_LOCATIONS_MIN_THRESHOLD_RATIO.key} set to 0.05, we would need " +
"at least 50 mergers to enable push based shuffle for that stage.")
.version("3.1.0")
.doubleConf
.createWithDefault(5)
}
40 changes: 40 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,8 @@ private[spark] class DAGScheduler(
private[spark] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
taskScheduler.setDAGScheduler(this)

private val pushBasedShuffleEnabled = Utils.isPushBasedShuffleEnabled(sc.getConf)

/**
* Called by the TaskSetManager to report task's starting.
*/
Expand Down Expand Up @@ -1252,6 +1254,33 @@ private[spark] class DAGScheduler(
execCores.map(cores => properties.setProperty(EXECUTOR_CORES_LOCAL_PROPERTY, cores))
}

/**
* If push based shuffle is enabled, set the shuffle services to be used for the given
* shuffle map stage for block push/merge.
*
* Even with dynamic resource allocation kicking in and significantly reducing the number
* of available active executors, we would still be able to get sufficient shuffle service
* locations for block push/merge by getting the historical locations of past executors.
*/
private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage): Unit = {
// TODO(SPARK-32920) Handle stage reuse/retry cases separately as without finalize
// TODO changes we cannot disable shuffle merge for the retry/reuse cases
val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations(
stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId)

if (mergerLocs.nonEmpty) {
stage.shuffleDep.setMergerLocs(mergerLocs)
logInfo(s"Push-based shuffle enabled for $stage (${stage.name}) with" +
s" ${stage.shuffleDep.getMergerLocs.size} merger locations")

logDebug("List of shuffle push merger locations " +
s"${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}")
} else {
logInfo("No available merger locations." +
s" Push-based shuffle disabled for $stage (${stage.name})")
}
}

/** Called when stage's parents are available and we can now do its task. */
private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
logDebug("submitMissingTasks(" + stage + ")")
Expand Down Expand Up @@ -1281,6 +1310,12 @@ private[spark] class DAGScheduler(
stage match {
case s: ShuffleMapStage =>
outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
// Only generate merger location for a given shuffle dependency once. This way, even if
// this stage gets retried, it would still be merging blocks using the same set of
// shuffle services.
if (pushBasedShuffleEnabled) {
prepareShuffleServicesForShuffleMapStage(s)
}
case s: ResultStage =>
outputCommitCoordinator.stageStart(
stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
Expand Down Expand Up @@ -2027,6 +2062,11 @@ private[spark] class DAGScheduler(
if (!executorFailureEpoch.contains(execId) || executorFailureEpoch(execId) < currentEpoch) {
executorFailureEpoch(execId) = currentEpoch
logInfo(s"Executor lost: $execId (epoch $currentEpoch)")
if (pushBasedShuffleEnabled) {
// Remove fetchFailed host in the shuffle push merger list for push based shuffle
hostToUnregisterOutputs.foreach(
host => blockManagerMaster.removeShufflePushMergerLocation(host))
}
blockManagerMaster.removeExecutor(execId)
clearCacheLocs()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.scheduler

import org.apache.spark.resource.ResourceProfile
import org.apache.spark.storage.BlockManagerId

/**
* A backend interface for scheduling systems that allows plugging in different ones under
Expand Down Expand Up @@ -92,4 +93,16 @@ private[spark] trait SchedulerBackend {
*/
def maxNumConcurrentTasks(rp: ResourceProfile): Int

/**
* Get the list of host locations for push based shuffle
*
* Currently push based shuffle is disabled for both stage retry and stage reuse cases
* (for eg: in the case where few partitions are lost due to failure). Hence this method
* should be invoked only once for a ShuffleDependency.
* @return List of external shuffle services locations
*/
def getShufflePushMergerLocations(
numPartitions: Int,
resourceProfileId: Int): Seq[BlockManagerId] = Nil

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.apache.spark.util.Utils
*
* There's no secrecy, so this relies on the sockets being either local or somehow encrypted.
*/
private[spark] class SocketAuthHelper(conf: SparkConf) {
private[spark] class SocketAuthHelper(val conf: SparkConf) {

val secret = Utils.createSecret(conf)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import scala.concurrent.duration.Duration
import scala.util.Try

import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Python.PYTHON_AUTH_SOCKET_TIMEOUT
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.util.{ThreadUtils, Utils}

Expand All @@ -34,31 +36,38 @@ import org.apache.spark.util.{ThreadUtils, Utils}
* handling one batch of data, with authentication and error handling.
*
* The socket server can only accept one connection, or close if no connection
* in 15 seconds.
* in configurable amount of seconds (default 15).
*/
private[spark] abstract class SocketAuthServer[T](
authHelper: SocketAuthHelper,
threadName: String) {
threadName: String) extends Logging {

def this(env: SparkEnv, threadName: String) = this(new SocketAuthHelper(env.conf), threadName)
def this(threadName: String) = this(SparkEnv.get, threadName)

private val promise = Promise[T]()

private def startServer(): (Int, String) = {
logTrace("Creating listening socket")
val serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))
// Close the socket if no connection in 15 seconds
serverSocket.setSoTimeout(15000)
// Close the socket if no connection in the configured seconds
val timeout = authHelper.conf.get(PYTHON_AUTH_SOCKET_TIMEOUT).toInt
logTrace(s"Setting timeout to $timeout sec")
serverSocket.setSoTimeout(timeout * 1000)

new Thread(threadName) {
setDaemon(true)
override def run(): Unit = {
var sock: Socket = null
try {
logTrace(s"Waiting for connection on port ${serverSocket.getLocalPort}")
sock = serverSocket.accept()
logTrace(s"Connection accepted from address ${sock.getRemoteSocketAddress}")
authHelper.authClient(sock)
logTrace("Client authenticated")
promise.complete(Try(handleConnection(sock)))
} finally {
logTrace("Closing server")
JavaUtils.closeQuietly(serverSocket)
JavaUtils.closeQuietly(sock)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,6 @@ private[spark] object BlockManagerId {
def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId = {
blockManagerIdCache.get(id)
}

private[spark] val SHUFFLE_MERGER_IDENTIFIER = "shuffle-push-merger"
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,26 @@ class BlockManagerMaster(
driverEndpoint.askSync[Seq[BlockManagerId]](GetPeers(blockManagerId))
}

/**
* Get a list of unique shuffle service locations where an executor is successfully
* registered in the past for block push/merge with push based shuffle.
*/
def getShufflePushMergerLocations(
numMergersNeeded: Int,
hostsToFilter: Set[String]): Seq[BlockManagerId] = {
driverEndpoint.askSync[Seq[BlockManagerId]](
GetShufflePushMergerLocations(numMergersNeeded, hostsToFilter))
}

/**
* Remove the host from the candidate list of shuffle push mergers. This can be
* triggered if there is a FetchFailedException on the host
* @param host
*/
def removeShufflePushMergerLocation(host: String): Unit = {
driverEndpoint.askSync[Seq[BlockManagerId]](RemoveShufflePushMergerLocation(host))
}

def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
driverEndpoint.askSync[Option[RpcEndpointRef]](GetExecutorEndpointRef(executorId))
}
Expand Down
Loading