Merge branch 'master' into SPARK-20629-copy-shuffle-data-when-nodes-are-being-shutdown-cleaned-up
holdenk committed Jul 17, 2020
2 parents 2467732 + 0678afe commit 16b7376
Showing 210 changed files with 6,158 additions and 3,514 deletions.
32 changes: 18 additions & 14 deletions .github/workflows/master.yml
@@ -117,38 +117,42 @@ jobs:
java-version: ${{ matrix.java }}
# PySpark
- name: Install PyPy3
# SQL component also has Python related tests, for example, IntegratedUDFTestUtils.
# Note that order of Python installations here matters because default python3 is
# overridden by pypy3.
uses: actions/setup-python@v2
if: contains(matrix.modules, 'pyspark') || (contains(matrix.modules, 'sql') && !contains(matrix.modules, 'sql-'))
if: contains(matrix.modules, 'pyspark')
with:
python-version: pypy3
architecture: x64
- name: Install Python 2.7
- name: Install Python 3.6
uses: actions/setup-python@v2
if: contains(matrix.modules, 'pyspark') || (contains(matrix.modules, 'sql') && !contains(matrix.modules, 'sql-'))
if: contains(matrix.modules, 'pyspark')
with:
python-version: 2.7
python-version: 3.6
architecture: x64
- name: Install Python 3.6
- name: Install Python 3.8
uses: actions/setup-python@v2
# Yarn has a Python specific test too, for example, YarnClusterSuite.
# We should install one Python that is higher then 3+ for SQL and Yarn because:
# - SQL component also has Python related tests, for example, IntegratedUDFTestUtils.
# - Yarn has a Python specific test too, for example, YarnClusterSuite.
if: contains(matrix.modules, 'yarn') || contains(matrix.modules, 'pyspark') || (contains(matrix.modules, 'sql') && !contains(matrix.modules, 'sql-'))
with:
python-version: 3.6
python-version: 3.8
architecture: x64
- name: Install Python packages
if: contains(matrix.modules, 'pyspark') || (contains(matrix.modules, 'sql') && !contains(matrix.modules, 'sql-'))
- name: Install Python packages (Python 3.6 and PyPy3)
if: contains(matrix.modules, 'pyspark')
# PyArrow is not supported in PyPy yet, see ARROW-2651.
# TODO(SPARK-32247): scipy installation with PyPy fails for an unknown reason.
run: |
python3 -m pip install numpy pyarrow pandas scipy
python3 -m pip list
python2 -m pip install numpy pyarrow pandas scipy
python2 -m pip list
python3.6 -m pip install numpy pyarrow pandas scipy
python3.6 -m pip list
pypy3 -m pip install numpy pandas
pypy3 -m pip list
- name: Install Python packages (Python 3.8)
if: contains(matrix.modules, 'pyspark') || (contains(matrix.modules, 'sql') && !contains(matrix.modules, 'sql-'))
run: |
python3.8 -m pip install numpy pyarrow pandas scipy
python3.8 -m pip list
# SparkR
- name: Install R 3.6
uses: r-lib/actions/setup-r@v1
2 changes: 1 addition & 1 deletion LICENSE
@@ -229,7 +229,7 @@ BSD 3-Clause
------------

python/lib/py4j-*-src.zip
python/pyspark/cloudpickle.py
python/pyspark/cloudpickle/*.py
python/pyspark/join.py
core/src/main/resources/org/apache/spark/ui/static/d3.min.js

2 changes: 1 addition & 1 deletion R/pkg/tests/fulltests/test_context.R
@@ -139,7 +139,7 @@ test_that("utility function can be called", {
expect_true(TRUE)
})

test_that("getClientModeSparkSubmitOpts() returns spark-submit args from whitelist", {
test_that("getClientModeSparkSubmitOpts() returns spark-submit args from allowList", {
e <- new.env()
e[["spark.driver.memory"]] <- "512m"
ops <- getClientModeSparkSubmitOpts("sparkrmain", e)
8 changes: 4 additions & 4 deletions R/pkg/tests/fulltests/test_sparkSQL.R
@@ -3921,14 +3921,14 @@ test_that("No extra files are created in SPARK_HOME by starting session and maki
# before creating a SparkSession with enableHiveSupport = T at the top of this test file
# (filesBefore). The test here is to compare that (filesBefore) against the list of files before
# any test is run in run-all.R (sparkRFilesBefore).
# sparkRWhitelistSQLDirs is also defined in run-all.R, and should contain only 2 whitelisted dirs,
# sparkRAllowedSQLDirs is also defined in run-all.R, and should contain only 2 allowed dirs,
# here allow the first value, spark-warehouse, in the diff, everything else should be exactly the
# same as before any test is run.
compare_list(sparkRFilesBefore, setdiff(filesBefore, sparkRWhitelistSQLDirs[[1]]))
compare_list(sparkRFilesBefore, setdiff(filesBefore, sparkRAllowedSQLDirs[[1]]))
# third, ensure only spark-warehouse and metastore_db are created when enableHiveSupport = T
# note: as the note above, after running all tests in this file while enableHiveSupport = T, we
# check the list of files again. This time we allow both whitelisted dirs to be in the diff.
compare_list(sparkRFilesBefore, setdiff(filesAfter, sparkRWhitelistSQLDirs))
# check the list of files again. This time we allow both dirs to be in the diff.
compare_list(sparkRFilesBefore, setdiff(filesAfter, sparkRAllowedSQLDirs))
})

unlink(parquetPath)
4 changes: 2 additions & 2 deletions R/pkg/tests/run-all.R
@@ -35,8 +35,8 @@ if (identical(Sys.getenv("NOT_CRAN"), "true")) {
install.spark(overwrite = TRUE)

sparkRDir <- file.path(Sys.getenv("SPARK_HOME"), "R")
sparkRWhitelistSQLDirs <- c("spark-warehouse", "metastore_db")
invisible(lapply(sparkRWhitelistSQLDirs,
sparkRAllowedSQLDirs <- c("spark-warehouse", "metastore_db")
invisible(lapply(sparkRAllowedSQLDirs,
function(x) { unlink(file.path(sparkRDir, x), recursive = TRUE, force = TRUE)}))
sparkRFilesBefore <- list.files(path = sparkRDir, all.files = TRUE)

@@ -155,4 +155,4 @@ server will be able to understand. This will cause the server to close the conne
attacker tries to send any command to the server. The attacker can just hold the channel open for
some time, which will be closed when the server times out the channel. These issues could be
separately mitigated by adding a shorter timeout for the first message after authentication, and
potentially by adding host blacklists if a possible attack is detected from a particular host.
potentially by adding host reject-lists if a possible attack is detected from a particular host.
@@ -15,13 +15,18 @@
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming.continuous.shuffle
package org.apache.spark.status.api.v1;

import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.util.EnumUtil;

/**
* Trait for writing to a continuous processing shuffle.
*/
trait ContinuousShuffleWriter {
def write(epoch: Iterator[UnsafeRow]): Unit
public enum TaskStatus {
RUNNING,
KILLED,
FAILED,
SUCCESS,
UNKNOWN;

public static TaskStatus fromString(String str) {
return EnumUtil.parseIgnoreCase(TaskStatus.class, str);
}
}
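
For context, a minimal Scala sketch of how the new enum's case-insensitive parser could be exercised. The wrapper object, input strings, and expected-output comments are illustrative only, and it assumes the compiled org.apache.spark.status.api.v1.TaskStatus class is on the classpath.

```scala
import org.apache.spark.status.api.v1.TaskStatus

object TaskStatusParseSketch {
  def main(args: Array[String]): Unit = {
    // EnumUtil.parseIgnoreCase accepts any casing of the constant names.
    println(TaskStatus.fromString("killed"))    // KILLED
    println(TaskStatus.fromString("Success"))   // SUCCESS
    println(TaskStatus.values().mkString(", ")) // RUNNING, KILLED, FAILED, SUCCESS, UNKNOWN
  }
}
```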
@@ -108,6 +108,13 @@ private[deploy] object DeployMessages

case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage

/**
* Used by the MasterWebUI to request the master to decommission all workers that are active on
* any of the given hostnames.
* @param hostnames: A list of hostnames without the ports. Like "localhost", "foo.bar.com" etc
*/
case class DecommissionWorkersOnHosts(hostnames: Seq[String])

// Master to Worker

sealed trait RegisterWorkerResponse
@@ -188,23 +188,24 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
processing.remove(path.getName)
}

private val blacklist = new ConcurrentHashMap[String, Long]
private val inaccessibleList = new ConcurrentHashMap[String, Long]

// Visible for testing
private[history] def isBlacklisted(path: Path): Boolean = {
blacklist.containsKey(path.getName)
private[history] def isAccessible(path: Path): Boolean = {
!inaccessibleList.containsKey(path.getName)
}

private def blacklist(path: Path): Unit = {
blacklist.put(path.getName, clock.getTimeMillis())
private def markInaccessible(path: Path): Unit = {
inaccessibleList.put(path.getName, clock.getTimeMillis())
}

/**
* Removes expired entries in the blacklist, according to the provided `expireTimeInSeconds`.
* Removes expired entries in the inaccessibleList, according to the provided
* `expireTimeInSeconds`.
*/
private def clearBlacklist(expireTimeInSeconds: Long): Unit = {
private def clearInaccessibleList(expireTimeInSeconds: Long): Unit = {
val expiredThreshold = clock.getTimeMillis() - expireTimeInSeconds * 1000
blacklist.asScala.retain((_, creationTime) => creationTime >= expiredThreshold)
inaccessibleList.asScala.retain((_, creationTime) => creationTime >= expiredThreshold)
}

private val activeUIs = new mutable.HashMap[(String, Option[String]), LoadedAppUI]()
@@ -470,7 +471,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")

val updated = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil)
.filter { entry => !isBlacklisted(entry.getPath) }
.filter { entry => isAccessible(entry.getPath) }
.filter { entry => !isProcessing(entry.getPath) }
.flatMap { entry => EventLogFileReader(fs, entry) }
.filter { reader =>
@@ -687,8 +688,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
case e: AccessControlException =>
// We don't have read permissions on the log file
logWarning(s"Unable to read log $rootPath", e)
blacklist(rootPath)
// SPARK-28157 We should remove this blacklisted entry from the KVStore
markInaccessible(rootPath)
// SPARK-28157 We should remove this inaccessible entry from the KVStore
// to handle permission-only changes with the same file sizes later.
listing.delete(classOf[LogInfo], rootPath.toString)
case e: Exception =>
@@ -956,8 +957,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}

// Clean the blacklist from the expired entries.
clearBlacklist(CLEAN_INTERVAL_S)
// Clean the inaccessibleList from the expired entries.
clearInaccessibleList(CLEAN_INTERVAL_S)
}

private def deleteAttemptLogs(
@@ -1334,7 +1335,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

private def deleteLog(fs: FileSystem, log: Path): Boolean = {
var deleted = false
if (isBlacklisted(log)) {
if (!isAccessible(log)) {
logDebug(s"Skipping deleting $log as we don't have permissions on it.")
} else {
try {
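
As a side note, here is a self-contained sketch (Scala 2.12) of the expiry pattern behind markInaccessible / clearInaccessibleList above. The object name, method signatures, millisecond clock values, and example path are stand-ins for this illustration, not FsHistoryProvider internals.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

object InaccessibleListSketch {
  // Path name -> time (ms) at which it was marked inaccessible.
  private val inaccessibleList = new ConcurrentHashMap[String, Long]

  def markInaccessible(path: String, nowMs: Long): Unit =
    inaccessibleList.put(path, nowMs)

  // Drop entries older than expireTimeInSeconds, mirroring clearInaccessibleList above.
  def clearExpired(nowMs: Long, expireTimeInSeconds: Long): Unit = {
    val expiredThreshold = nowMs - expireTimeInSeconds * 1000
    inaccessibleList.asScala.retain((_, creationTime) => creationTime >= expiredThreshold)
  }

  def isAccessible(path: String): Boolean = !inaccessibleList.containsKey(path)

  def main(args: Array[String]): Unit = {
    markInaccessible("app-1.log", nowMs = 0L)
    clearExpired(nowMs = 120 * 1000, expireTimeInSeconds = 60)
    println(isAccessible("app-1.log")) // true: the entry expired and was cleared
  }
}
```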
37 changes: 37 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -22,7 +22,9 @@ import java.util.{Date, Locale}
import java.util.concurrent.{ScheduledFuture, TimeUnit}

import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.collection.mutable
import scala.util.Random
import scala.util.control.NonFatal

import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState, SparkHadoopUtil}
@@ -525,6 +527,13 @@ private[deploy] class Master(
case KillExecutors(appId, executorIds) =>
val formattedExecutorIds = formatExecutorIds(executorIds)
context.reply(handleKillExecutors(appId, formattedExecutorIds))

case DecommissionWorkersOnHosts(hostnames) =>
if (state != RecoveryState.STANDBY) {
context.reply(decommissionWorkersOnHosts(hostnames))
} else {
context.reply(0)
}
}

override def onDisconnected(address: RpcAddress): Unit = {
@@ -863,6 +872,34 @@ private[deploy] class Master(
true
}

/**
* Decommission all workers that are active on any of the given hostnames. The decommissioning is
* asynchronously done by enqueueing WorkerDecommission messages to self. No checks are done about
* the prior state of the worker. So an already decommissioned worker will match as well.
*
* @param hostnames: A list of hostnames without the ports. Like "localhost", "foo.bar.com" etc
*
* Returns the number of workers that matched the hostnames.
*/
private def decommissionWorkersOnHosts(hostnames: Seq[String]): Integer = {
val hostnamesSet = hostnames.map(_.toLowerCase(Locale.ROOT)).toSet
val workersToRemove = addressToWorker
.filterKeys(addr => hostnamesSet.contains(addr.host.toLowerCase(Locale.ROOT)))
.values

val workersToRemoveHostPorts = workersToRemove.map(_.hostPort)
logInfo(s"Decommissioning the workers with host:ports ${workersToRemoveHostPorts}")

// The workers are removed async to avoid blocking the receive loop for the entire batch
workersToRemove.foreach(wi => {
logInfo(s"Sending the worker decommission to ${wi.id} and ${wi.endpoint}")
self.send(WorkerDecommission(wi.id, wi.endpoint))
})

// Return the count of workers actually removed
workersToRemove.size
}

private def decommissionWorker(worker: WorkerInfo): Unit = {
if (worker.state != WorkerState.DECOMMISSIONED) {
logInfo("Decommissioning worker %s on %s:%d".format(worker.id, worker.host, worker.port))
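
For illustration only, a toy version of the case-insensitive hostname matching that decommissionWorkersOnHosts performs. The worker-host map and hostname list below are fabricated stand-ins for addressToWorker and the request payload.

```scala
import java.util.Locale

object HostMatchSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for addressToWorker keyed by RpcAddress; here just worker id -> host.
    val workerHosts = Map("worker-1" -> "Foo.Bar.Com", "worker-2" -> "other.host")
    val hostnames = Seq("foo.bar.com")

    val hostnamesSet = hostnames.map(_.toLowerCase(Locale.ROOT)).toSet
    val matched = workerHosts.filter { case (_, host) =>
      hostnamesSet.contains(host.toLowerCase(Locale.ROOT))
    }
    println(matched.keys.mkString(", ")) // worker-1
  }
}
```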
@@ -17,9 +17,14 @@

package org.apache.spark.deploy.master.ui

import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import java.net.{InetAddress, NetworkInterface, SocketException}
import java.util.Locale
import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}

import org.apache.spark.deploy.DeployMessages.{DecommissionWorkersOnHosts, MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.master.Master
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.UI.MASTER_UI_DECOMMISSION_ALLOW_MODE
import org.apache.spark.internal.config.UI.UI_KILL_ENABLED
import org.apache.spark.ui.{SparkUI, WebUI}
import org.apache.spark.ui.JettyUtils._
@@ -36,6 +41,7 @@ class MasterWebUI(

val masterEndpointRef = master.self
val killEnabled = master.conf.get(UI_KILL_ENABLED)
val decommissionAllowMode = master.conf.get(MASTER_UI_DECOMMISSION_ALLOW_MODE)

initialize()

@@ -49,6 +55,27 @@
"/app/kill", "/", masterPage.handleAppKillRequest, httpMethods = Set("POST")))
attachHandler(createRedirectHandler(
"/driver/kill", "/", masterPage.handleDriverKillRequest, httpMethods = Set("POST")))
attachHandler(createServletHandler("/workers/kill", new HttpServlet {
override def doPost(req: HttpServletRequest, resp: HttpServletResponse): Unit = {
val hostnames: Seq[String] = Option(req.getParameterValues("host"))
.getOrElse(Array[String]()).toSeq
if (!isDecommissioningRequestAllowed(req)) {
resp.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED)
} else {
val removedWorkers = masterEndpointRef.askSync[Integer](
DecommissionWorkersOnHosts(hostnames))
logInfo(s"Decommissioning of hosts $hostnames decommissioned $removedWorkers workers")
if (removedWorkers > 0) {
resp.setStatus(HttpServletResponse.SC_OK)
} else if (removedWorkers == 0) {
resp.sendError(HttpServletResponse.SC_NOT_FOUND)
} else {
// We shouldn't even see this case.
resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR)
}
}
}
}, ""))
}

def addProxy(): Unit = {
@@ -64,6 +91,25 @@
maybeWorkerUiAddress.orElse(maybeAppUiAddress)
}

private def isLocal(address: InetAddress): Boolean = {
if (address.isAnyLocalAddress || address.isLoopbackAddress) {
return true
}
try {
NetworkInterface.getByInetAddress(address) != null
} catch {
case _: SocketException => false
}
}

private def isDecommissioningRequestAllowed(req: HttpServletRequest): Boolean = {
decommissionAllowMode match {
case "ALLOW" => true
case "LOCAL" => isLocal(InetAddress.getByName(req.getRemoteAddr))
case _ => false
}
}

}

private[master] object MasterWebUI {
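
A hypothetical client-side sketch of exercising the new /workers/kill handler above. The master UI address, port, and host values are made up, and whether the request is honoured depends on the allow mode configured via MASTER_UI_DECOMMISSION_ALLOW_MODE.

```scala
import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets

object WorkersKillRequestSketch {
  def main(args: Array[String]): Unit = {
    // Repeated "host" parameters become req.getParameterValues("host") in the servlet above.
    val url = new URL("http://localhost:8080/workers/kill")
    val conn = url.openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("POST")
    conn.setDoOutput(true)
    conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded")
    conn.getOutputStream.write(
      "host=localhost&host=foo.bar.com".getBytes(StandardCharsets.UTF_8))
    // 200: at least one worker matched; 404: none matched; 405: blocked by the allow mode.
    println(s"HTTP ${conn.getResponseCode}")
    conn.disconnect()
  }
}
```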
@@ -411,7 +411,7 @@ private[spark] object RestSubmissionClient {

// SPARK_HOME and SPARK_CONF_DIR are filtered out because they are usually wrong
// on the remote machine (SPARK-12345) (SPARK-25934)
private val BLACKLISTED_SPARK_ENV_VARS = Set("SPARK_ENV_LOADED", "SPARK_HOME", "SPARK_CONF_DIR")
private val EXCLUDED_SPARK_ENV_VARS = Set("SPARK_ENV_LOADED", "SPARK_HOME", "SPARK_CONF_DIR")
private val REPORT_DRIVER_STATUS_INTERVAL = 1000
private val REPORT_DRIVER_STATUS_MAX_TRIES = 10
val PROTOCOL_VERSION = "v1"
@@ -421,7 +421,7 @@ private[spark] object RestSubmissionClient {
*/
private[rest] def filterSystemEnvironment(env: Map[String, String]): Map[String, String] = {
env.filterKeys { k =>
(k.startsWith("SPARK_") && !BLACKLISTED_SPARK_ENV_VARS.contains(k)) || k.startsWith("MESOS_")
(k.startsWith("SPARK_") && !EXCLUDED_SPARK_ENV_VARS.contains(k)) || k.startsWith("MESOS_")
}.toMap
}

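
To make the filtering rule concrete, a standalone sketch that mirrors filterSystemEnvironment above. The sample environment map and object name are fabricated for the example.

```scala
object EnvFilterSketch {
  private val EXCLUDED_SPARK_ENV_VARS = Set("SPARK_ENV_LOADED", "SPARK_HOME", "SPARK_CONF_DIR")

  def filterSystemEnvironment(env: Map[String, String]): Map[String, String] = {
    env.filterKeys { k =>
      (k.startsWith("SPARK_") && !EXCLUDED_SPARK_ENV_VARS.contains(k)) || k.startsWith("MESOS_")
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    val env = Map(
      "SPARK_HOME" -> "/opt/spark",                           // excluded: usually wrong on the remote machine
      "SPARK_LOCAL_IP" -> "10.0.0.1",                         // kept: SPARK_ prefixed and not excluded
      "MESOS_NATIVE_JAVA_LIBRARY" -> "/usr/lib/libmesos.so",  // kept: MESOS_ prefixed
      "PATH" -> "/usr/bin")                                   // dropped: neither prefix
    println(filterSystemEnvironment(env).keys.toList.sorted)
    // List(MESOS_NATIVE_JAVA_LIBRARY, SPARK_LOCAL_IP)
  }
}
```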