Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/apache/spark into spark-3…
Browse files Browse the repository at this point in the history
…3600

Signed-off-by: Karen Feng <karen.feng@databricks.com>
  • Loading branch information
karenfeng committed Mar 1, 2021
2 parents d004f5c + 70f6267 commit 21bfce8
Show file tree
Hide file tree
Showing 221 changed files with 5,826 additions and 2,076 deletions.
12 changes: 4 additions & 8 deletions .github/workflows/build_and_test.yml
Expand Up @@ -99,12 +99,11 @@ jobs:
if: ${{ github.event.inputs.target != '' }}
run: git merge --progress --ff-only origin/${{ github.event.inputs.target }}
# Cache local repositories. Note that GitHub Actions cache has a 2G limit.
- name: Cache Scala, SBT, Maven and Zinc
- name: Cache Scala, SBT and Maven
uses: actions/cache@v2
with:
path: |
build/apache-maven-*
build/zinc-*
build/scala-*
build/*.jar
~/.sbt
Expand Down Expand Up @@ -186,12 +185,11 @@ jobs:
if: ${{ github.event.inputs.target != '' }}
run: git merge --progress --ff-only origin/${{ github.event.inputs.target }}
# Cache local repositories. Note that GitHub Actions cache has a 2G limit.
- name: Cache Scala, SBT, Maven and Zinc
- name: Cache Scala, SBT and Maven
uses: actions/cache@v2
with:
path: |
build/apache-maven-*
build/zinc-*
build/scala-*
build/*.jar
~/.sbt
Expand Down Expand Up @@ -254,12 +252,11 @@ jobs:
if: ${{ github.event.inputs.target != '' }}
run: git merge --progress --ff-only origin/${{ github.event.inputs.target }}
# Cache local repositories. Note that GitHub Actions cache has a 2G limit.
- name: Cache Scala, SBT, Maven and Zinc
- name: Cache Scala, SBT and Maven
uses: actions/cache@v2
with:
path: |
build/apache-maven-*
build/zinc-*
build/scala-*
build/*.jar
~/.sbt
Expand Down Expand Up @@ -297,12 +294,11 @@ jobs:
- name: Checkout Spark repository
uses: actions/checkout@v2
# Cache local repositories. Note that GitHub Actions cache has a 2G limit.
- name: Cache Scala, SBT, Maven and Zinc
- name: Cache Scala, SBT and Maven
uses: actions/cache@v2
with:
path: |
build/apache-maven-*
build/zinc-*
build/scala-*
build/*.jar
~/.sbt
Expand Down
1 change: 0 additions & 1 deletion .gitignore
Expand Up @@ -30,7 +30,6 @@ R/pkg/tests/fulltests/Rplots.pdf
build/*.jar
build/apache-maven*
build/scala*
build/zinc*
cache
checkpoint
conf/*.cmd
Expand Down
4 changes: 4 additions & 0 deletions assembly/pom.xml
Expand Up @@ -136,6 +136,10 @@
<artifactId>spark-yarn_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-web-proxy</artifactId>
</dependency>
</dependencies>
</profile>
<profile>
Expand Down
47 changes: 1 addition & 46 deletions build/mvn
Expand Up @@ -91,27 +91,6 @@ install_mvn() {
fi
}

# Install zinc under the build/ folder
install_zinc() {
local ZINC_VERSION=0.3.15
ZINC_BIN="$(command -v zinc)"
if [ "$ZINC_BIN" ]; then
local ZINC_DETECTED_VERSION="$(zinc -version | head -n1 | awk '{print $5}')"
fi

if [ $(version $ZINC_DETECTED_VERSION) -lt $(version $ZINC_VERSION) ]; then
local zinc_path="zinc-${ZINC_VERSION}/bin/zinc"
[ ! -f "${_DIR}/${zinc_path}" ] && ZINC_INSTALL_FLAG=1
local TYPESAFE_MIRROR=${TYPESAFE_MIRROR:-https://downloads.lightbend.com}

install_app \
"${TYPESAFE_MIRROR}/zinc/${ZINC_VERSION}" \
"zinc-${ZINC_VERSION}.tgz" \
"${zinc_path}"
ZINC_BIN="${_DIR}/${zinc_path}"
fi
}

# Determine the Scala version from the root pom.xml file, set the Scala URL,
# and, with that, download the specific version of Scala necessary under
# the build/ folder
Expand All @@ -131,44 +110,20 @@ install_scala() {
SCALA_LIBRARY="$(cd "$(dirname "${scala_bin}")/../lib" && pwd)/scala-library.jar"
}

# Setup healthy defaults for the Zinc port if none were provided from
# the environment
ZINC_PORT=${ZINC_PORT:-"3030"}

# Install the proper version of Scala, Zinc and Maven for the build
if [ "$(uname -m)" != 'aarch64' ]; then
install_zinc
fi
install_scala
install_mvn

# Reset the current working directory
cd "${_CALLING_DIR}"

# Now that zinc is ensured to be installed, check its status and, if its
# not running or just installed, start it
if [ "$(uname -m)" != 'aarch64' ] && [ -n "${ZINC_INSTALL_FLAG}" -o -z "`"${ZINC_BIN}" -status -port ${ZINC_PORT}`" ]; then
export ZINC_OPTS=${ZINC_OPTS:-"$_COMPILE_JVM_OPTS"}
"${ZINC_BIN}" -shutdown -port ${ZINC_PORT}
"${ZINC_BIN}" -start -port ${ZINC_PORT} \
-server 127.0.0.1 -idle-timeout 3h \
-scala-compiler "${SCALA_COMPILER}" \
-scala-library "${SCALA_LIBRARY}" &>/dev/null
fi

# Set any `mvn` options if not already present
export MAVEN_OPTS=${MAVEN_OPTS:-"$_COMPILE_JVM_OPTS"}

echo "Using \`mvn\` from path: $MVN_BIN" 1>&2

# call the `mvn` command as usual
# SPARK-25854
"${MVN_BIN}" -DzincPort=${ZINC_PORT} "$@"
"${MVN_BIN}" "$@"
MVN_RETCODE=$?

# Try to shut down zinc explicitly if the server is still running.
if [ "$(uname -m)" != 'aarch64' ]; then
"${ZINC_BIN}" -shutdown -port ${ZINC_PORT}
fi

exit $MVN_RETCODE
7 changes: 3 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
Expand Up @@ -314,7 +314,7 @@ object SparkEnv extends Logging {
}
}

val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
val broadcastManager = new BroadcastManager(isDriver, conf)

val mapOutputTracker = if (isDriver) {
new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
Expand Down Expand Up @@ -397,14 +397,13 @@ object SparkEnv extends Logging {
// Don't start metrics system right now for Driver.
// We need to wait for the task scheduler to give us an app ID.
// Then we can start the metrics system.
MetricsSystem.createMetricsSystem(MetricsSystemInstances.DRIVER, conf, securityManager)
MetricsSystem.createMetricsSystem(MetricsSystemInstances.DRIVER, conf)
} else {
// We need to set the executor ID before the MetricsSystem is created because sources and
// sinks specified in the metrics configuration file will want to incorporate this executor's
// ID into the metrics they report.
conf.set(EXECUTOR_ID, executorId)
val ms = MetricsSystem.createMetricsSystem(MetricsSystemInstances.EXECUTOR, conf,
securityManager)
val ms = MetricsSystem.createMetricsSystem(MetricsSystemInstances.EXECUTOR, conf)
ms.start(conf.get(METRICS_STATIC_SOURCES_ENABLED))
ms
}
Expand Down
Expand Up @@ -19,7 +19,6 @@ package org.apache.spark.broadcast

import scala.reflect.ClassTag

import org.apache.spark.SecurityManager
import org.apache.spark.SparkConf

/**
Expand All @@ -29,7 +28,7 @@ import org.apache.spark.SparkConf
*/
private[spark] trait BroadcastFactory {

def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager): Unit
def initialize(isDriver: Boolean, conf: SparkConf): Unit

/**
* Creates a new broadcast variable.
Expand Down
Expand Up @@ -24,15 +24,12 @@ import scala.reflect.ClassTag

import org.apache.commons.collections.map.{AbstractReferenceMap, ReferenceMap}

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.SparkConf
import org.apache.spark.api.python.PythonBroadcast
import org.apache.spark.internal.Logging

private[spark] class BroadcastManager(
val isDriver: Boolean,
conf: SparkConf,
securityManager: SecurityManager)
extends Logging {
val isDriver: Boolean, conf: SparkConf) extends Logging {

private var initialized = false
private var broadcastFactory: BroadcastFactory = null
Expand All @@ -44,7 +41,7 @@ private[spark] class BroadcastManager(
synchronized {
if (!initialized) {
broadcastFactory = new TorrentBroadcastFactory
broadcastFactory.initialize(isDriver, conf, securityManager)
broadcastFactory.initialize(isDriver, conf)
initialized = true
}
}
Expand Down
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.broadcast

import scala.reflect.ClassTag

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.SparkConf

/**
* A [[org.apache.spark.broadcast.Broadcast]] implementation that uses a BitTorrent-like
Expand All @@ -28,8 +28,7 @@ import org.apache.spark.{SecurityManager, SparkConf}
*/
private[spark] class TorrentBroadcastFactory extends BroadcastFactory {

override def initialize(isDriver: Boolean, conf: SparkConf,
securityMgr: SecurityManager): Unit = { }
override def initialize(isDriver: Boolean, conf: SparkConf): Unit = { }

override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long): Broadcast[T] = {
new TorrentBroadcast[T](value_, id)
Expand Down
Expand Up @@ -44,8 +44,7 @@ private[deploy]
class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityManager)
extends Logging {
protected val masterMetricsSystem =
MetricsSystem.createMetricsSystem(MetricsSystemInstances.SHUFFLE_SERVICE,
sparkConf, securityManager)
MetricsSystem.createMetricsSystem(MetricsSystemInstances.SHUFFLE_SERVICE, sparkConf)

private val enabled = sparkConf.get(config.SHUFFLE_SERVICE_ENABLED)
private val port = sparkConf.get(config.SHUFFLE_SERVICE_PORT)
Expand Down
101 changes: 51 additions & 50 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
Expand Up @@ -87,9 +87,9 @@ private[deploy] class Master(
Utils.checkHost(address.host)

private val masterMetricsSystem =
MetricsSystem.createMetricsSystem(MetricsSystemInstances.MASTER, conf, securityMgr)
MetricsSystem.createMetricsSystem(MetricsSystemInstances.MASTER, conf)
private val applicationMetricsSystem =
MetricsSystem.createMetricsSystem(MetricsSystemInstances.APPLICATIONS, conf, securityMgr)
MetricsSystem.createMetricsSystem(MetricsSystemInstances.APPLICATIONS, conf)
private val masterSource = new MasterSource(this)

// After onStart, webUi will be set
Expand Down Expand Up @@ -310,54 +310,6 @@ private[deploy] class Master(
schedule()
}

case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
execOption match {
case Some(exec) =>
val appInfo = idToApp(appId)
val oldState = exec.state
exec.state = state

if (state == ExecutorState.RUNNING) {
assert(oldState == ExecutorState.LAUNCHING,
s"executor $execId state transfer from $oldState to RUNNING is illegal")
appInfo.resetRetryCount()
}

exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, None))

if (ExecutorState.isFinished(state)) {
// Remove this executor from the worker and app
logInfo(s"Removing executor ${exec.fullId} because it is $state")
// If an application has already finished, preserve its
// state to display its information properly on the UI
if (!appInfo.isFinished) {
appInfo.removeExecutor(exec)
}
exec.worker.removeExecutor(exec)

val normalExit = exitStatus == Some(0)
// Only retry certain number of times so we don't go into an infinite loop.
// Important note: this code path is not exercised by tests, so be very careful when
// changing this `if` condition.
// We also don't count failures from decommissioned workers since they are "expected."
if (!normalExit
&& oldState != ExecutorState.DECOMMISSIONED
&& appInfo.incrementRetryCount() >= maxExecutorRetries
&& maxExecutorRetries >= 0) { // < 0 disables this application-killing path
val execs = appInfo.executors.values
if (!execs.exists(_.state == ExecutorState.RUNNING)) {
logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
s"${appInfo.retryCount} times; removing it")
removeApplication(appInfo, ApplicationState.FAILED)
}
}
}
schedule()
case None =>
logWarning(s"Got status update for unknown executor $appId/$execId")
}

case DriverStateChanged(driverId, state, exception) =>
state match {
case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
Expand Down Expand Up @@ -550,6 +502,55 @@ private[deploy] class Master(
} else {
context.reply(0)
}

case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
execOption match {
case Some(exec) =>
val appInfo = idToApp(appId)
val oldState = exec.state
exec.state = state

if (state == ExecutorState.RUNNING) {
assert(oldState == ExecutorState.LAUNCHING,
s"executor $execId state transfer from $oldState to RUNNING is illegal")
appInfo.resetRetryCount()
}

exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, None))

if (ExecutorState.isFinished(state)) {
// Remove this executor from the worker and app
logInfo(s"Removing executor ${exec.fullId} because it is $state")
// If an application has already finished, preserve its
// state to display its information properly on the UI
if (!appInfo.isFinished) {
appInfo.removeExecutor(exec)
}
exec.worker.removeExecutor(exec)

val normalExit = exitStatus == Some(0)
// Only retry certain number of times so we don't go into an infinite loop.
// Important note: this code path is not exercised by tests, so be very careful when
// changing this `if` condition.
// We also don't count failures from decommissioned workers since they are "expected."
if (!normalExit
&& oldState != ExecutorState.DECOMMISSIONED
&& appInfo.incrementRetryCount() >= maxExecutorRetries
&& maxExecutorRetries >= 0) { // < 0 disables this application-killing path
val execs = appInfo.executors.values
if (!execs.exists(_.state == ExecutorState.RUNNING)) {
logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
s"${appInfo.retryCount} times; removing it")
removeApplication(appInfo, ApplicationState.FAILED)
}
}
}
schedule()
case None =>
logWarning(s"Got status update for unknown executor $appId/$execId")
}
context.reply(true)
}

override def onDisconnected(address: RpcAddress): Unit = {
Expand Down

0 comments on commit 21bfce8

Please sign in to comment.