Commit 2a319ff: Merge branch 'master' into SPARK-32492

yaooqinn committed Jul 31, 2020
2 parents cc8c90d + 9d7b1d9
Showing 17 changed files with 942 additions and 380 deletions.
23 changes: 14 additions & 9 deletions .github/workflows/master.yml
@@ -154,15 +154,18 @@ jobs:
         python3.8 -m pip install numpy pyarrow pandas scipy
         python3.8 -m pip list
     # SparkR
-    - name: Install R 3.6
-      uses: r-lib/actions/setup-r@v1
+    - name: Install R 4.0
       if: contains(matrix.modules, 'sparkr')
-      with:
-        r-version: 3.6
+      run: |
+        sudo sh -c "echo 'deb https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/' >> /etc/apt/sources.list"
+        curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0xE298A3A825C0D65DFD57CBB651716619E084DAB9" | sudo apt-key add
+        sudo apt-get update
+        sudo apt-get install -y r-base r-base-dev libcurl4-openssl-dev
     - name: Install R packages
       if: contains(matrix.modules, 'sparkr')
       run: |
-        sudo apt-get install -y libcurl4-openssl-dev
+        # qpdf is required to reduce the size of PDFs to make CRAN check pass. See SPARK-32497.
+        sudo apt-get install -y libcurl4-openssl-dev qpdf
         sudo Rscript -e "install.packages(c('knitr', 'rmarkdown', 'testthat', 'devtools', 'e1071', 'survival', 'arrow', 'roxygen2'), repos='https://cloud.r-project.org/')"
         # Show installed packages in R.
         sudo Rscript -e 'pkg_list <- as.data.frame(installed.packages()[, c(1,3:4)]); pkg_list[is.na(pkg_list$Priority), 1:2, drop = FALSE]'
@@ -203,10 +206,12 @@ jobs:
         # TODO(SPARK-32407): Sphinx 3.1+ does not correctly index nested classes.
         # See also https://github.com/sphinx-doc/sphinx/issues/7551.
         pip3 install flake8 'sphinx<3.1.0' numpy pydata_sphinx_theme
-    - name: Install R 3.6
-      uses: r-lib/actions/setup-r@v1
-      with:
-        r-version: 3.6
+    - name: Install R 4.0
+      run: |
+        sudo sh -c "echo 'deb https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/' >> /etc/apt/sources.list"
+        curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0xE298A3A825C0D65DFD57CBB651716619E084DAB9" | sudo apt-key add
+        sudo apt-get update
+        sudo apt-get install -y r-base r-base-dev libcurl4-openssl-dev
     - name: Install R linter dependencies and SparkR
       run: |
         sudo apt-get install -y libcurl4-openssl-dev
11 changes: 6 additions & 5 deletions bin/load-spark-env.cmd
@@ -21,18 +21,14 @@ rem This script loads spark-env.cmd if it exists, and ensures it is only loaded
 rem spark-env.cmd is loaded from SPARK_CONF_DIR if set, or within the current directory's
 rem conf\ subdirectory.

-set SPARK_ENV_CMD=spark-env.cmd
 if not defined SPARK_ENV_LOADED (
   set SPARK_ENV_LOADED=1

   if [%SPARK_CONF_DIR%] == [] (
     set SPARK_CONF_DIR=%~dp0..\conf
   )

-  set SPARK_ENV_CMD=%SPARK_CONF_DIR%\%SPARK_ENV_CMD%
-  if exist %SPARK_ENV_CMD% (
-    call %SPARK_ENV_CMD%
-  )
+  call :LoadSparkEnv
 )

 rem Setting SPARK_SCALA_VERSION if not already set.

@@ -59,3 +55,8 @@ if not defined SPARK_SCALA_VERSION (
   )
 )
 exit /b 0
+
+:LoadSparkEnv
+if exist "%SPARK_CONF_DIR%\spark-env.cmd" (
+  call "%SPARK_CONF_DIR%\spark-env.cmd"
+)
19 changes: 14 additions & 5 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1821,10 +1821,19 @@ private[spark] class DAGScheduler(

         // TODO: mark the executor as failed only if there were lots of fetch failures on it
         if (bmAddress != null) {
-          val hostToUnregisterOutputs = if (env.blockManager.externalShuffleServiceEnabled &&
-            unRegisterOutputOnHostOnFetchFailure) {
-            // We had a fetch failure with the external shuffle service, so we
-            // assume all shuffle data on the node is bad.
+          val externalShuffleServiceEnabled = env.blockManager.externalShuffleServiceEnabled
+          val isHostDecommissioned = taskScheduler
+            .getExecutorDecommissionInfo(bmAddress.executorId)
+            .exists(_.isHostDecommissioned)
+
+          // Shuffle output of all executors on host `bmAddress.host` may be lost if:
+          // - External shuffle service is enabled, so we assume that all shuffle data on node is
+          //   bad.
+          // - Host is decommissioned, thus all executors on that host will die.
+          val shuffleOutputOfEntireHostLost = externalShuffleServiceEnabled ||
+            isHostDecommissioned
+          val hostToUnregisterOutputs = if (shuffleOutputOfEntireHostLost
+              && unRegisterOutputOnHostOnFetchFailure) {
             Some(bmAddress.host)
           } else {
             // Unregister shuffle data just for one executor (we don't have any

@@ -2339,7 +2348,7 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler

     case ExecutorLost(execId, reason) =>
       val workerLost = reason match {
-        case ExecutorProcessLost(_, true) => true
+        case ExecutorProcessLost(_, true, _) => true
         case _ => false
       }
       dagScheduler.handleExecutorLost(execId, workerLost)
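The refactor above folds host decommissioning into the old external-shuffle-service case: either condition means shuffle output for the entire host is suspect. Below is a minimal standalone sketch of that decision, with the real inputs (block-manager flag, scheduler lookup, config flag) replaced by plain parameters; it is an illustration, not Spark code.

// Two-field shape matching how decommission info is used in this diff.
case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boolean)

// Sketch: decide whether to unregister map outputs for a whole host after a fetch failure.
def hostToUnregisterOutputs(
    host: String,
    externalShuffleServiceEnabled: Boolean,
    decomInfo: Option[ExecutorDecommissionInfo],
    unRegisterOutputOnHostOnFetchFailure: Boolean): Option[String] = {
  val isHostDecommissioned = decomInfo.exists(_.isHostDecommissioned)
  // Either condition implies every executor on the host may have lost its shuffle data.
  val shuffleOutputOfEntireHostLost = externalShuffleServiceEnabled || isHostDecommissioned
  if (shuffleOutputOfEntireHostLost && unRegisterOutputOnHostOnFetchFailure) Some(host)
  else None
}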
core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
@@ -54,9 +54,14 @@ private [spark] object LossReasonPending extends ExecutorLossReason("Pending los
 /**
  * @param _message human readable loss reason
  * @param workerLost whether the worker is confirmed lost too (i.e. including shuffle service)
+ * @param causedByApp whether the loss of the executor is the fault of the running app.
+ *                    (assumed true by default unless known explicitly otherwise)
  */
 private[spark]
-case class ExecutorProcessLost(_message: String = "Worker lost", workerLost: Boolean = false)
+case class ExecutorProcessLost(
+    _message: String = "Executor Process Lost",
+    workerLost: Boolean = false,
+    causedByApp: Boolean = true)
   extends ExecutorLossReason(_message)

 /**
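Because causedByApp defaults to true, existing two-argument call sites keep their old behavior; callers pass false only when they positively know the app is not at fault. A small sketch of consuming the new field, assuming the ExecutorProcessLost definition above is in scope (the sample reason value is illustrative); the TaskSetManager hunk further down adds exactly this pattern.

// Sketch: classify a loss reason the way a task-failure handler might.
val reason: ExecutorLossReason =
  ExecutorProcessLost("host decommissioned", workerLost = true, causedByApp = false)

val exitCausedByApp = reason match {
  case ExecutorProcessLost(_, _, false) => false // not the app's fault; don't count the failure
  case _ => true                                 // conservative default
}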
core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -103,6 +103,11 @@ private[spark] trait TaskScheduler {
    */
   def executorDecommission(executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit

+  /**
+   * If an executor is decommissioned, return its corresponding decommission info
+   */
+  def getExecutorDecommissionInfo(executorId: String): Option[ExecutorDecommissionInfo]
+
   /**
    * Process a lost executor
    */
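Returning an Option keeps the "never decommissioned" case in ordinary combinator territory. A one-line sketch of the lookup pattern the DAGScheduler hunk above relies on, assuming the trait is in scope (the helper name is illustrative, not part of Spark):

// Sketch: a fetch failure is host-wide if the executor's host is being decommissioned.
def isHostDecommissioned(scheduler: TaskScheduler, executorId: String): Boolean =
  scheduler.getExecutorDecommissionInfo(executorId).exists(_.isHostDecommissioned)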
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -136,6 +136,8 @@ private[spark] class TaskSchedulerImpl(
   // IDs of the tasks running on each executor
   private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]

+  private val executorsPendingDecommission = new HashMap[String, ExecutorDecommissionInfo]
+
   def runningTasksByExecutors: Map[String, Int] = synchronized {
     executorIdToRunningTaskIds.toMap.mapValues(_.size).toMap
   }

@@ -939,12 +941,43 @@

   override def executorDecommission(
       executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit = {
+    synchronized {
+      // Don't bother noting decommissioning for executors that we don't know about
+      if (executorIdToHost.contains(executorId)) {
+        // The scheduler can get multiple decommission updates from multiple sources,
+        // and some of those can have isHostDecommissioned false. We merge them such that
+        // if we heard isHostDecommissioned ever true, then we keep that one since it is
+        // most likely coming from the cluster manager and thus authoritative
+        val oldDecomInfo = executorsPendingDecommission.get(executorId)
+        if (oldDecomInfo.isEmpty || !oldDecomInfo.get.isHostDecommissioned) {
+          executorsPendingDecommission(executorId) = decommissionInfo
+        }
+      }
+    }
     rootPool.executorDecommission(executorId)
     backend.reviveOffers()
   }

-  override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {
+  override def getExecutorDecommissionInfo(executorId: String)
+    : Option[ExecutorDecommissionInfo] = synchronized {
+      executorsPendingDecommission.get(executorId)
+  }
+
+  override def executorLost(executorId: String, givenReason: ExecutorLossReason): Unit = {
     var failedExecutor: Option[String] = None
+    val reason = givenReason match {
+      // Handle executor process loss due to decommissioning
+      case ExecutorProcessLost(message, origWorkerLost, origCausedByApp) =>
+        val executorDecommissionInfo = getExecutorDecommissionInfo(executorId)
+        ExecutorProcessLost(
+          message,
+          // Also mark the worker lost if we know that the host was decommissioned
+          origWorkerLost || executorDecommissionInfo.exists(_.isHostDecommissioned),
+          // Executor loss is certainly not caused by app if we knew that this executor is being
+          // decommissioned
+          causedByApp = executorDecommissionInfo.isEmpty && origCausedByApp)
+      case e => e
+    }

     synchronized {
       if (executorIdToRunningTaskIds.contains(executorId)) {

@@ -1033,6 +1066,8 @@
       }
     }

+    executorsPendingDecommission -= executorId
+
     if (reason != LossReasonPending) {
       executorIdToHost -= executorId
       rootPool.executorLost(executorId, host, reason)
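The bookkeeping added to executorDecommission is deliberately monotonic: once a notice with isHostDecommissioned = true is stored, a later notice with false cannot downgrade it, and executorLost eventually drops the entry. A self-contained sketch of just the merge rule; the two-field ExecutorDecommissionInfo shape mirrors its use in this diff, and the local map stands in for executorsPendingDecommission.

import scala.collection.mutable

case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boolean)

val pending = mutable.HashMap[String, ExecutorDecommissionInfo]()

// Merge a new decommission notice: host-level decommission, once seen, is sticky.
def merge(executorId: String, incoming: ExecutorDecommissionInfo): Unit = {
  val old = pending.get(executorId)
  if (old.isEmpty || !old.get.isHostDecommissioned) {
    pending(executorId) = incoming
  }
}

merge("exec-1", ExecutorDecommissionInfo("executor draining", isHostDecommissioned = false))
merge("exec-1", ExecutorDecommissionInfo("host draining", isHostDecommissioned = true))
merge("exec-1", ExecutorDecommissionInfo("late update", isHostDecommissioned = false))
assert(pending("exec-1").isHostDecommissioned) // the host-level notice wins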
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -985,6 +985,7 @@ private[spark] class TaskSetManager(
     val exitCausedByApp: Boolean = reason match {
       case exited: ExecutorExited => exited.exitCausedByApp
       case ExecutorKilled => false
+      case ExecutorProcessLost(_, _, false) => false
       case _ => true
     }
     handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure(info.executorId, exitCausedByApp,
core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala
@@ -104,10 +104,10 @@ private[v1] class AbstractApplicationResource extends BaseAppResource {
     val resourceProfileInfo = ui.store.resourceProfileInfo()
     new v1.ApplicationEnvironmentInfo(
       envInfo.runtime,
-      Utils.redact(ui.conf, envInfo.sparkProperties),
-      Utils.redact(ui.conf, envInfo.hadoopProperties),
-      Utils.redact(ui.conf, envInfo.systemProperties),
-      envInfo.classpathEntries,
+      Utils.redact(ui.conf, envInfo.sparkProperties).sortBy(_._1),
+      Utils.redact(ui.conf, envInfo.hadoopProperties).sortBy(_._1),
+      Utils.redact(ui.conf, envInfo.systemProperties).sortBy(_._1),
+      envInfo.classpathEntries.sortBy(_._1),
       resourceProfileInfo)
   }
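sortBy(_._1) orders each key/value list by key, so the environment REST endpoint now returns properties in a deterministic order regardless of how the store produced them. A tiny sketch of the transformation (sample data is illustrative):

// Sketch: sorting (key, value) pairs by key, as the handler above now does.
val props = Seq(("spark.executor.memory", "4g"), ("spark.app.name", "demo"))
val sorted = props.sortBy(_._1)
// sorted == Seq(("spark.app.name", "demo"), ("spark.executor.memory", "4g"))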
