
Commit

Merge remote-tracking branch 'origin/master' into SPARK-21040-speculate-decommission-exec-tasks
prakharjain09 committed Jun 12, 2020
2 parents 43ba62e + 9b098f1 commit 4affa58
Showing 94 changed files with 2,688 additions and 1,003 deletions.
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -334,7 +334,7 @@
</dependency>
<dependency>
<groupId>org.seleniumhq.selenium</groupId>
<artifactId>selenium-htmlunit-driver</artifactId>
<artifactId>htmlunit-driver</artifactId>
<scope>test</scope>
</dependency>
<!-- Coerce sbt into honoring these dependency updates: -->
3 changes: 2 additions & 1 deletion core/src/main/resources/org/apache/spark/ui/static/webui.js
@@ -87,7 +87,8 @@ $(function() {
collapseTablePageLoad('collapse-aggregated-poolActiveStages','aggregated-poolActiveStages');
collapseTablePageLoad('collapse-aggregated-tasks','aggregated-tasks');
collapseTablePageLoad('collapse-aggregated-rdds','aggregated-rdds');
collapseTablePageLoad('collapse-aggregated-activeBatches','aggregated-activeBatches');
collapseTablePageLoad('collapse-aggregated-waitingBatches','aggregated-waitingBatches');
collapseTablePageLoad('collapse-aggregated-runningBatches','aggregated-runningBatches');
collapseTablePageLoad('collapse-aggregated-completedBatches','aggregated-completedBatches');
collapseTablePageLoad('collapse-aggregated-runningExecutions','aggregated-runningExecutions');
collapseTablePageLoad('collapse-aggregated-completedExecutions','aggregated-completedExecutions');
16 changes: 14 additions & 2 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -74,13 +74,25 @@ private[spark] class PythonRDD(
* runner.
*/
private[spark] case class PythonFunction(
command: Array[Byte],
command: Seq[Byte],
envVars: JMap[String, String],
pythonIncludes: JList[String],
pythonExec: String,
pythonVer: String,
broadcastVars: JList[Broadcast[PythonBroadcast]],
accumulator: PythonAccumulatorV2)
accumulator: PythonAccumulatorV2) {

def this(
command: Array[Byte],
envVars: JMap[String, String],
pythonIncludes: JList[String],
pythonExec: String,
pythonVer: String,
broadcastVars: JList[Broadcast[PythonBroadcast]],
accumulator: PythonAccumulatorV2) = {
this(command.toSeq, envVars, pythonIncludes, pythonExec, pythonVer, broadcastVars, accumulator)
}
}
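
For context, here is a minimal, self-contained sketch of the pattern this hunk introduces (the names below are illustrative, not Spark's): the primary constructor now takes a Seq[Byte], while an auxiliary constructor keeps existing Array[Byte] call sites compiling.

// Illustrative only -- a case class holding bytes as Seq[Byte], plus an
// auxiliary constructor that accepts the old Array[Byte] form.
case class BinaryPayload(command: Seq[Byte]) {
  def this(command: Array[Byte]) = this(command.toSeq)
}

object BinaryPayloadDemo {
  def main(args: Array[String]): Unit = {
    val viaSeq   = BinaryPayload(Seq[Byte](1, 2, 3))       // companion apply -> primary constructor
    val viaArray = new BinaryPayload(Array[Byte](1, 2, 3)) // `new` is needed to reach the auxiliary constructor
    println(viaSeq == viaArray)                            // true: both hold the same bytes
  }
}

Call sites that still need raw bytes convert back with .toArray, as the PythonRunner hunk below does before writing to the DataOutputStream.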

/**
* A wrapper for chained Python functions (from bottom to top).
@@ -613,7 +613,7 @@ private[spark] class PythonRunner(funcs: Seq[ChainedPythonFunctions])
protected override def writeCommand(dataOut: DataOutputStream): Unit = {
val command = funcs.head.funcs.head.command
dataOut.writeInt(command.length)
dataOut.write(command)
dataOut.write(command.toArray)
}

protected override def writeIteratorToStream(dataOut: DataOutputStream): Unit = {
94 changes: 68 additions & 26 deletions core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -17,6 +17,8 @@

package org.apache.spark.deploy

import java.util.concurrent.TimeUnit

import scala.collection.mutable.HashSet
import scala.concurrent.ExecutionContext
import scala.reflect.ClassTag
@@ -27,6 +29,7 @@ import org.apache.log4j.Logger
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.deploy.master.DriverState.DriverState
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config.Network.RPC_ASK_TIMEOUT
import org.apache.spark.resource.ResourceUtils
@@ -61,6 +64,11 @@ private class ClientEndpoint(

private val lostMasters = new HashSet[RpcAddress]
private var activeMasterEndpoint: RpcEndpointRef = null
private val waitAppCompletion = conf.get(config.STANDALONE_SUBMIT_WAIT_APP_COMPLETION)
private val REPORT_DRIVER_STATUS_INTERVAL = 10000
private var submittedDriverID = ""
private var driverStatusReported = false


private def getProperty(key: String, conf: SparkConf): Option[String] = {
sys.props.get(key).orElse(conf.getOption(key))
@@ -107,8 +115,13 @@

case "kill" =>
val driverId = driverArgs.driverId
submittedDriverID = driverId
asyncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
}
logInfo("... waiting before polling master for driver state")
forwardMessageThread.scheduleAtFixedRate(() => Utils.tryLogNonFatalError {
monitorDriverStatus()
}, 5000, REPORT_DRIVER_STATUS_INTERVAL, TimeUnit.MILLISECONDS)
}
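
As a rough, self-contained illustration of the polling loop wired up here (using only the JDK scheduler; the real code reuses Spark's forwardMessageThread and the REPORT_DRIVER_STATUS_INTERVAL above):

import java.util.concurrent.{Executors, TimeUnit}

object DriverStatusPollSketch {
  def main(args: Array[String]): Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    // First poll after 5 seconds, then every 10 seconds, mirroring the
    // 5000 ms initial delay and 10000 ms interval used above.
    scheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = println("... polling master for driver state")
    }, 5000, 10000, TimeUnit.MILLISECONDS)
  }
}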

/**
@@ -124,58 +137,87 @@
}
}

/* Find out driver status then exit the JVM */
def pollAndReportStatus(driverId: String): Unit = {
// Since ClientEndpoint is the only RpcEndpoint in the process, blocking the event loop thread
// is fine.
logInfo("... waiting before polling master for driver state")
Thread.sleep(5000)
logInfo("... polling master for driver state")
val statusResponse =
activeMasterEndpoint.askSync[DriverStatusResponse](RequestDriverStatus(driverId))
if (statusResponse.found) {
logInfo(s"State of $driverId is ${statusResponse.state.get}")
// Worker node, if present
(statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match {
case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) =>
logInfo(s"Driver running on $hostPort ($id)")
case _ =>
private def monitorDriverStatus(): Unit = {
if (submittedDriverID != "") {
asyncSendToMasterAndForwardReply[DriverStatusResponse](RequestDriverStatus(submittedDriverID))
}
}

/**
* Processes and reports the driver status, then exits the JVM if
* waitAppCompletion is set to false; otherwise it only reports the
* driver status when debug logs are enabled.
*/

def reportDriverStatus(
found: Boolean,
state: Option[DriverState],
workerId: Option[String],
workerHostPort: Option[String],
exception: Option[Exception]): Unit = {
if (found) {
// Use driverStatusReported so the following logs are written only once
// when waitAppCompletion is set to true
if (!driverStatusReported) {
driverStatusReported = true
logInfo(s"State of $submittedDriverID is ${state.get}")
// Worker node, if present
(workerId, workerHostPort, state) match {
case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) =>
logInfo(s"Driver running on $hostPort ($id)")
case _ =>
}
}
// Exception, if present
statusResponse.exception match {
exception match {
case Some(e) =>
logError(s"Exception from cluster was: $e")
e.printStackTrace()
System.exit(-1)
case _ =>
System.exit(0)
state.get match {
case DriverState.FINISHED | DriverState.FAILED |
DriverState.ERROR | DriverState.KILLED =>
logInfo(s"State of driver $submittedDriverID is ${state.get}, " +
s"exiting spark-submit JVM.")
System.exit(0)
case _ =>
if (!waitAppCompletion) {
logInfo(s"spark-submit not configured to wait for completion, " +
s"exiting spark-submit JVM.")
System.exit(0)
} else {
logDebug(s"State of driver $submittedDriverID is ${state.get}, " +
s"continue monitoring driver status.")
}
}
}
} else {
logError(s"ERROR: Cluster master did not recognize $submittedDriverID")
System.exit(-1)
}
} else {
logError(s"ERROR: Cluster master did not recognize $driverId")
System.exit(-1)
}
}

override def receive: PartialFunction[Any, Unit] = {

case SubmitDriverResponse(master, success, driverId, message) =>
logInfo(message)
if (success) {
activeMasterEndpoint = master
pollAndReportStatus(driverId.get)
submittedDriverID = driverId.get
} else if (!Utils.responseFromBackup(message)) {
System.exit(-1)
}


case KillDriverResponse(master, driverId, success, message) =>
logInfo(message)
if (success) {
activeMasterEndpoint = master
pollAndReportStatus(driverId)
} else if (!Utils.responseFromBackup(message)) {
System.exit(-1)
}

case DriverStatusResponse(found, state, workerId, workerHostPort, exception) =>
reportDriverStatus(found, state, workerId, workerHostPort, exception)
}

override def onDisconnected(remoteAddress: RpcAddress): Unit = {
@@ -715,7 +715,9 @@ private[deploy] class Master(
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(canLaunchExecutor(_, app.desc))
.sortBy(_.coresFree).reverse
if (waitingApps.length == 1 && usableWorkers.isEmpty) {
val appMayHang = waitingApps.length == 1 &&
waitingApps.head.executors.isEmpty && usableWorkers.isEmpty
if (appMayHang) {
logWarning(s"App ${app.id} requires more resource than any of Workers could have.")
}
val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
12 changes: 4 additions & 8 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -401,9 +401,7 @@ private[spark] class Executor(
}

override def run(): Unit = {

setMDCForTask(taskName, mdcProperties)

threadId = Thread.currentThread.getId
Thread.currentThread.setName(threadName)
val threadMXBean = ManagementFactory.getThreadMXBean
@@ -703,11 +701,11 @@
}

private def setMDCForTask(taskName: String, mdc: Seq[(String, String)]): Unit = {
// make sure we run the task with the user-specified mdc properties only
MDC.clear()
mdc.foreach { case (key, value) => MDC.put(key, value) }
// avoid the taskName being overridden by the user
MDC.put("taskName", taskName)

mdc.foreach { case (key, value) =>
MDC.put(key, value)
}
}
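
A small sketch of why the ordering matters, assuming the slf4j MDC API (its import is not shown in this hunk): clearing first drops properties left over from a previous task on the same thread, and putting taskName last means a user-supplied entry cannot override it.

import org.slf4j.MDC

object MdcOrderingSketch {
  def main(args: Array[String]): Unit = {
    MDC.clear()
    Seq("user" -> "alice", "taskName" -> "spoofed").foreach { case (k, v) => MDC.put(k, v) }
    MDC.put("taskName", "task 3.0 in stage 1.0 (TID 7)") // hypothetical task name
    // With an MDC-capable logging binding on the classpath, this prints the
    // real task name rather than "spoofed".
    println(MDC.get("taskName"))
  }
}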

/**
@@ -750,9 +748,7 @@
private[this] val takeThreadDump: Boolean = conf.get(TASK_REAPER_THREAD_DUMP)

override def run(): Unit = {

setMDCForTask(taskRunner.taskName, taskRunner.mdcProperties)

val startTimeNs = System.nanoTime()
def elapsedTimeNs = System.nanoTime() - startTimeNs
def timeoutExceeded(): Boolean = killTimeoutNs > 0 && elapsedTimeNs > killTimeoutNs
@@ -1875,4 +1875,13 @@ package object config {
.version("3.1.0")
.booleanConf
.createWithDefault(false)

private[spark] val STANDALONE_SUBMIT_WAIT_APP_COMPLETION =
ConfigBuilder("spark.standalone.submit.waitAppCompletion")
.doc("In standalone cluster mode, controls whether the client waits to exit until the " +
"application completes. If set to true, the client process will stay alive polling " +
"the driver's status. Otherwise, the client process will exit after submission.")
.version("3.1.0")
.booleanConf
.createWithDefault(false)
}
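
For reference, a hedged sketch of how this flag is consumed: the ClientEndpoint change above reads it via conf.get(config.STANDALONE_SUBMIT_WAIT_APP_COMPLETION); an equivalent plain-string lookup with the same key and default looks like this.

import org.apache.spark.SparkConf

object WaitAppCompletionSketch {
  def main(args: Array[String]): Unit = {
    // Equivalent plain-string read of the flag; defaults to false as declared above.
    val conf = new SparkConf()
    val waitAppCompletion = conf.getBoolean("spark.standalone.submit.waitAppCompletion", defaultValue = false)
    println(s"waitAppCompletion = $waitAppCompletion")
  }
}

In practice a user would set it at submission time, e.g. --conf spark.standalone.submit.waitAppCompletion=true on spark-submit or in spark-defaults.conf.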
@@ -42,7 +42,7 @@ private[spark] class AppStatusStore(
store.view(classOf[ApplicationInfoWrapper]).max(1).iterator().next().info
} catch {
case _: NoSuchElementException =>
throw new SparkException("Failed to get the application information. " +
throw new NoSuchElementException("Failed to get the application information. " +
"If you are starting up Spark, please wait a while until it's ready.")
}
}
7 changes: 6 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -23,6 +23,7 @@ import javax.servlet.DispatcherType
import javax.servlet.http._

import scala.language.implicitConversions
import scala.util.Try
import scala.xml.Node

import org.eclipse.jetty.client.HttpClient
@@ -500,7 +501,11 @@
threadPool match {
case pool: QueuedThreadPool =>
// Workaround for SPARK-30385 to avoid Jetty's acceptor thread shrink.
pool.setIdleTimeout(0)
// As of Jetty 9.4.21, the implementation of QueuedThreadPool#setIdleTimeout
// changed, and an IllegalStateException is thrown if we try to set the idle
// timeout after the server has started. Wrapping the call in Try keeps the
// workaround effective on Jetty 9.4.28 because the exception is ignored.
Try(pool.setIdleTimeout(0))
case _ =>
}
server.stop()
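
A tiny sketch of why wrapping the call in Try is sufficient: Try captures non-fatal exceptions such as the IllegalStateException newer Jetty versions throw, so the server.stop() call that follows it in the code above still runs.

import scala.util.Try

object TryIgnoreSketch {
  def main(args: Array[String]): Unit = {
    val attempt = Try(throw new IllegalStateException("setIdleTimeout after start"))
    println(attempt.isFailure) // true: the exception is captured, not propagated
  }
}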
@@ -24,6 +24,7 @@ import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
import scala.io.Source
import scala.xml.Node

import com.gargoylesoftware.css.parser.CSSParseException
import com.gargoylesoftware.htmlunit.DefaultCssErrorHandler
import org.json4s._
import org.json4s.jackson.JsonMethods
@@ -33,7 +34,6 @@ import org.scalatest._
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
import org.scalatestplus.selenium.WebBrowser
import org.w3c.css.sac.CSSParseException

import org.apache.spark._
import org.apache.spark.LocalSparkContext._
14 changes: 3 additions & 11 deletions dev/create-release/do-release.sh
@@ -17,8 +17,6 @@
# limitations under the License.
#

set -e

SELF=$(cd $(dirname $0) && pwd)
. "$SELF/release-util.sh"

@@ -54,6 +52,9 @@ function should_build {
if should_build "tag" && [ $SKIP_TAG = 0 ]; then
run_silent "Creating release tag $RELEASE_TAG..." "tag.log" \
"$SELF/release-tag.sh"
echo "It may take some time for the tag to be synchronized to github."
echo "Press enter when you've verified that the new tag ($RELEASE_TAG) is available."
read
else
echo "Skipping tag creation for $RELEASE_TAG."
fi
@@ -78,12 +79,3 @@ if should_build "publish"; then
else
echo "Skipping publish step."
fi

if should_build "tag" && [ $SKIP_TAG = 0 ]; then
git push origin $RELEASE_TAG
if [[ $RELEASE_TAG != *"preview"* ]]; then
git push origin HEAD:$GIT_BRANCH
else
echo "It's preview release. We only push $RELEASE_TAG to remote."
fi
fi
7 changes: 2 additions & 5 deletions dev/create-release/release-build.sh
@@ -92,12 +92,9 @@ BASE_DIR=$(pwd)
init_java
init_maven_sbt

# Only clone repo fresh if not present, otherwise use checkout from the tag step
if [ ! -d spark ]; then
git clone "$ASF_REPO"
fi
rm -rf spark
git clone "$ASF_REPO"
cd spark
git fetch
git checkout $GIT_REF
git_hash=`git rev-parse --short HEAD`
echo "Checked out Spark git hash $git_hash"
16 changes: 13 additions & 3 deletions dev/create-release/release-tag.sh
@@ -25,7 +25,6 @@ function exit_with_usage {
cat << EOF
usage: $NAME
Tags a Spark release on a particular branch.
You must push the tags after.
Inputs are specified with the following environment variables:
ASF_USERNAME - Apache Username
@@ -106,8 +105,19 @@ sed -i".tmp7" 's/SPARK_VERSION_SHORT:.*$/SPARK_VERSION_SHORT: '"$R_NEXT_VERSION"'

git commit -a -m "Preparing development version $NEXT_VERSION"

cd ..
if is_dry_run; then
if ! is_dry_run; then
# Push changes
git push origin $RELEASE_TAG
if [[ $RELEASE_VERSION != *"preview"* ]]; then
git push origin HEAD:$GIT_BRANCH
else
echo "It's preview release. We only push $RELEASE_TAG to remote."
fi

cd ..
rm -rf spark
else
cd ..
mv spark spark.tag
echo "Clone with version changes and tag available as spark.tag in the output directory."
fi

0 comments on commit 4affa58

Please sign in to comment.