[SPARK-32036] Replace references to blacklist/whitelist language with more appropriate terminology, excluding the blacklisting feature

### What changes were proposed in this pull request?

This PR removes references to the terms "blacklist" and "whitelist", except for the blacklisting feature as a whole, which can be handled in a separate JIRA/PR.

This touches quite a few files, but the changes are straightforward (variable/method/etc. name changes), and most are quite self-contained.
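
For example, the `FsHistoryProvider` hunk further down renames an internal helper and flips its polarity without changing behavior. The sketch below is modeled on that hunk, with `Path` simplified to `String` so it compiles standalone:

```scala
import java.util.concurrent.ConcurrentHashMap

object RenameExample {
  // Before: term-based name, positive polarity.
  private val blacklist = new ConcurrentHashMap[String, Long]
  def isBlacklisted(name: String): Boolean = blacklist.containsKey(name)

  // After: neutral name; call sites negate the check, but the behavior is identical.
  private val inaccessibleList = new ConcurrentHashMap[String, Long]
  def isAccessible(name: String): Boolean = !inaccessibleList.containsKey(name)
}
```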

### Why are the changes needed?

As per discussion on the Spark dev list, it will be beneficial to remove references to problematic language that can alienate potential community members. Two such terms are "blacklist" and "whitelist". While it seems to me that there is some valid debate as to whether these terms have racist origins, the cultural connotations are inescapable in today's world.

### Does this PR introduce _any_ user-facing change?

In the test file `HiveQueryFileTest`, a developer can set the system property `spark.hive.whitelist` to specify a list of Hive query files that should be tested. This system property has been renamed to `spark.hive.includelist`. The old property is kept for compatibility but logs a warning if used. I am open to feedback on whether keeping a deprecated property here is unnecessary, given that this only affects developers running tests.
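
A minimal sketch of what such a compatibility shim could look like (hypothetical: the actual `HiveQueryFileTest` change is not included in the hunks below, and the object/method names here are illustrative, not taken from the patch):

```scala
object HiveIncludeList {
  // Prefer the new property; fall back to the deprecated one with a warning.
  def includeList(): Seq[String] = {
    val preferred = Option(System.getProperty("spark.hive.includelist"))
    val deprecated = Option(System.getProperty("spark.hive.whitelist"))
    if (preferred.isEmpty && deprecated.isDefined) {
      System.err.println(
        "spark.hive.whitelist is deprecated; please use spark.hive.includelist instead.")
    }
    preferred.orElse(deprecated).toSeq.flatMap(_.split(",")).map(_.trim).filter(_.nonEmpty)
  }
}
```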

### How was this patch tested?

Existing tests should be sufficient since no behavior changes are expected as a result of this PR.

Closes #28874 from xkrogen/xkrogen-SPARK-32036-rename-blacklists.

Authored-by: Erik Krogen <ekrogen@linkedin.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
xkrogen authored and tgravescs committed Jul 15, 2020
1 parent 8950dcb commit cf22d94
Showing 52 changed files with 231 additions and 219 deletions.
2 changes: 1 addition & 1 deletion R/pkg/tests/fulltests/test_context.R
@@ -139,7 +139,7 @@ test_that("utility function can be called", {
expect_true(TRUE)
})

test_that("getClientModeSparkSubmitOpts() returns spark-submit args from whitelist", {
test_that("getClientModeSparkSubmitOpts() returns spark-submit args from allowList", {
e <- new.env()
e[["spark.driver.memory"]] <- "512m"
ops <- getClientModeSparkSubmitOpts("sparkrmain", e)
8 changes: 4 additions & 4 deletions R/pkg/tests/fulltests/test_sparkSQL.R
@@ -3921,14 +3921,14 @@ test_that("No extra files are created in SPARK_HOME by starting session and maki
# before creating a SparkSession with enableHiveSupport = T at the top of this test file
# (filesBefore). The test here is to compare that (filesBefore) against the list of files before
# any test is run in run-all.R (sparkRFilesBefore).
# sparkRWhitelistSQLDirs is also defined in run-all.R, and should contain only 2 whitelisted dirs,
# sparkRAllowedSQLDirs is also defined in run-all.R, and should contain only 2 allowed dirs,
# here allow the first value, spark-warehouse, in the diff, everything else should be exactly the
# same as before any test is run.
compare_list(sparkRFilesBefore, setdiff(filesBefore, sparkRWhitelistSQLDirs[[1]]))
compare_list(sparkRFilesBefore, setdiff(filesBefore, sparkRAllowedSQLDirs[[1]]))
# third, ensure only spark-warehouse and metastore_db are created when enableHiveSupport = T
# note: as the note above, after running all tests in this file while enableHiveSupport = T, we
# check the list of files again. This time we allow both whitelisted dirs to be in the diff.
compare_list(sparkRFilesBefore, setdiff(filesAfter, sparkRWhitelistSQLDirs))
# check the list of files again. This time we allow both dirs to be in the diff.
compare_list(sparkRFilesBefore, setdiff(filesAfter, sparkRAllowedSQLDirs))
})

unlink(parquetPath)
4 changes: 2 additions & 2 deletions R/pkg/tests/run-all.R
@@ -35,8 +35,8 @@ if (identical(Sys.getenv("NOT_CRAN"), "true")) {
install.spark(overwrite = TRUE)

sparkRDir <- file.path(Sys.getenv("SPARK_HOME"), "R")
sparkRWhitelistSQLDirs <- c("spark-warehouse", "metastore_db")
invisible(lapply(sparkRWhitelistSQLDirs,
sparkRAllowedSQLDirs <- c("spark-warehouse", "metastore_db")
invisible(lapply(sparkRAllowedSQLDirs,
function(x) { unlink(file.path(sparkRDir, x), recursive = TRUE, force = TRUE)}))
sparkRFilesBefore <- list.files(path = sparkRDir, all.files = TRUE)

@@ -155,4 +155,4 @@ server will be able to understand. This will cause the server to close the conne
attacker tries to send any command to the server. The attacker can just hold the channel open for
some time, which will be closed when the server times out the channel. These issues could be
separately mitigated by adding a shorter timeout for the first message after authentication, and
potentially by adding host blacklists if a possible attack is detected from a particular host.
potentially by adding host reject-lists if a possible attack is detected from a particular host.
@@ -188,23 +188,24 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
processing.remove(path.getName)
}

private val blacklist = new ConcurrentHashMap[String, Long]
private val inaccessibleList = new ConcurrentHashMap[String, Long]

// Visible for testing
private[history] def isBlacklisted(path: Path): Boolean = {
blacklist.containsKey(path.getName)
private[history] def isAccessible(path: Path): Boolean = {
!inaccessibleList.containsKey(path.getName)
}

private def blacklist(path: Path): Unit = {
blacklist.put(path.getName, clock.getTimeMillis())
private def markInaccessible(path: Path): Unit = {
inaccessibleList.put(path.getName, clock.getTimeMillis())
}

/**
* Removes expired entries in the blacklist, according to the provided `expireTimeInSeconds`.
* Removes expired entries in the inaccessibleList, according to the provided
* `expireTimeInSeconds`.
*/
private def clearBlacklist(expireTimeInSeconds: Long): Unit = {
private def clearInaccessibleList(expireTimeInSeconds: Long): Unit = {
val expiredThreshold = clock.getTimeMillis() - expireTimeInSeconds * 1000
blacklist.asScala.retain((_, creationTime) => creationTime >= expiredThreshold)
inaccessibleList.asScala.retain((_, creationTime) => creationTime >= expiredThreshold)
}

private val activeUIs = new mutable.HashMap[(String, Option[String]), LoadedAppUI]()
@@ -470,7 +471,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")

val updated = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil)
.filter { entry => !isBlacklisted(entry.getPath) }
.filter { entry => isAccessible(entry.getPath) }
.filter { entry => !isProcessing(entry.getPath) }
.flatMap { entry => EventLogFileReader(fs, entry) }
.filter { reader =>
@@ -687,8 +688,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
case e: AccessControlException =>
// We don't have read permissions on the log file
logWarning(s"Unable to read log $rootPath", e)
blacklist(rootPath)
// SPARK-28157 We should remove this blacklisted entry from the KVStore
markInaccessible(rootPath)
// SPARK-28157 We should remove this inaccessible entry from the KVStore
// to handle permission-only changes with the same file sizes later.
listing.delete(classOf[LogInfo], rootPath.toString)
case e: Exception =>
@@ -956,8 +957,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}

// Clean the blacklist from the expired entries.
clearBlacklist(CLEAN_INTERVAL_S)
// Clean the inaccessibleList from the expired entries.
clearInaccessibleList(CLEAN_INTERVAL_S)
}

private def deleteAttemptLogs(
@@ -1334,7 +1335,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

private def deleteLog(fs: FileSystem, log: Path): Boolean = {
var deleted = false
if (isBlacklisted(log)) {
if (!isAccessible(log)) {
logDebug(s"Skipping deleting $log as we don't have permissions on it.")
} else {
try {
@@ -411,7 +411,7 @@ private[spark] object RestSubmissionClient {

// SPARK_HOME and SPARK_CONF_DIR are filtered out because they are usually wrong
// on the remote machine (SPARK-12345) (SPARK-25934)
private val BLACKLISTED_SPARK_ENV_VARS = Set("SPARK_ENV_LOADED", "SPARK_HOME", "SPARK_CONF_DIR")
private val EXCLUDED_SPARK_ENV_VARS = Set("SPARK_ENV_LOADED", "SPARK_HOME", "SPARK_CONF_DIR")
private val REPORT_DRIVER_STATUS_INTERVAL = 1000
private val REPORT_DRIVER_STATUS_MAX_TRIES = 10
val PROTOCOL_VERSION = "v1"
Expand All @@ -421,7 +421,7 @@ private[spark] object RestSubmissionClient {
*/
private[rest] def filterSystemEnvironment(env: Map[String, String]): Map[String, String] = {
env.filterKeys { k =>
(k.startsWith("SPARK_") && !BLACKLISTED_SPARK_ENV_VARS.contains(k)) || k.startsWith("MESOS_")
(k.startsWith("SPARK_") && !EXCLUDED_SPARK_ENV_VARS.contains(k)) || k.startsWith("MESOS_")
}.toMap
}

@@ -151,7 +151,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
logInfo(s"Task was denied committing, stage: $stage.$stageAttempt, " +
s"partition: $partition, attempt: $attemptNumber")
case _ =>
// Mark the attempt as failed to blacklist from future commit protocol
// Mark the attempt as failed to exclude from future commit protocol
val taskId = TaskIdentifier(stageAttempt, attemptNumber)
stageState.failures.getOrElseUpdate(partition, mutable.Set()) += taskId
if (stageState.authorizedCommitters(partition) == taskId) {
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -328,11 +328,11 @@ private[spark] object JsonProtocol {
("Accumulables" -> accumulablesToJson(taskInfo.accumulables))
}

private lazy val accumulableBlacklist = Set("internal.metrics.updatedBlockStatuses")
private lazy val accumulableExcludeList = Set("internal.metrics.updatedBlockStatuses")

def accumulablesToJson(accumulables: Iterable[AccumulableInfo]): JArray = {
JArray(accumulables
.filterNot(_.name.exists(accumulableBlacklist.contains))
.filterNot(_.name.exists(accumulableExcludeList.contains))
.toList.map(accumulableInfoToJson))
}

4 changes: 2 additions & 2 deletions core/src/test/scala/org/apache/spark/ThreadAudit.scala
@@ -26,7 +26,7 @@ import org.apache.spark.internal.Logging
*/
trait ThreadAudit extends Logging {

val threadWhiteList = Set(
val threadExcludeList = Set(
/**
* Netty related internal threads.
* These are excluded because their lifecycle is handled by the netty itself
@@ -108,7 +108,7 @@ trait ThreadAudit extends Logging {

if (threadNamesSnapshot.nonEmpty) {
val remainingThreadNames = runningThreadNames().diff(threadNamesSnapshot)
.filterNot { s => threadWhiteList.exists(s.matches(_)) }
.filterNot { s => threadExcludeList.exists(s.matches(_)) }
if (remainingThreadNames.nonEmpty) {
logWarning(s"\n\n===== POSSIBLE THREAD LEAK IN SUITE $shortSuiteName, " +
s"thread names: ${remainingThreadNames.mkString(", ")} =====\n")
22 changes: 11 additions & 11 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -1210,17 +1210,17 @@ class SparkSubmitSuite
testRemoteResources(enableHttpFs = true)
}

test("force download from blacklisted schemes") {
testRemoteResources(enableHttpFs = true, blacklistSchemes = Seq("http"))
test("force download from forced schemes") {
testRemoteResources(enableHttpFs = true, forceDownloadSchemes = Seq("http"))
}

test("force download for all the schemes") {
testRemoteResources(enableHttpFs = true, blacklistSchemes = Seq("*"))
testRemoteResources(enableHttpFs = true, forceDownloadSchemes = Seq("*"))
}

private def testRemoteResources(
enableHttpFs: Boolean,
blacklistSchemes: Seq[String] = Nil): Unit = {
forceDownloadSchemes: Seq[String] = Nil): Unit = {
val hadoopConf = new Configuration()
updateConfWithFakeS3Fs(hadoopConf)
if (enableHttpFs) {
@@ -1237,8 +1237,8 @@ class SparkSubmitSuite
val tmpHttpJar = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"), tmpDir)
val tmpHttpJarPath = s"http://${new File(tmpHttpJar.toURI).getAbsolutePath}"

val forceDownloadArgs = if (blacklistSchemes.nonEmpty) {
Seq("--conf", s"spark.yarn.dist.forceDownloadSchemes=${blacklistSchemes.mkString(",")}")
val forceDownloadArgs = if (forceDownloadSchemes.nonEmpty) {
Seq("--conf", s"spark.yarn.dist.forceDownloadSchemes=${forceDownloadSchemes.mkString(",")}")
} else {
Nil
}
@@ -1256,19 +1256,19 @@ class SparkSubmitSuite

val jars = conf.get("spark.yarn.dist.jars").split(",").toSet

def isSchemeBlacklisted(scheme: String) = {
blacklistSchemes.contains("*") || blacklistSchemes.contains(scheme)
def isSchemeForcedDownload(scheme: String) = {
forceDownloadSchemes.contains("*") || forceDownloadSchemes.contains(scheme)
}

if (!isSchemeBlacklisted("s3")) {
if (!isSchemeForcedDownload("s3")) {
assert(jars.contains(tmpS3JarPath))
}

if (enableHttpFs && blacklistSchemes.isEmpty) {
if (enableHttpFs && forceDownloadSchemes.isEmpty) {
// If Http FS is supported by yarn service, the URI of remote http resource should
// still be remote.
assert(jars.contains(tmpHttpJarPath))
} else if (!enableHttpFs || isSchemeBlacklisted("http")) {
} else if (!enableHttpFs || isSchemeForcedDownload("http")) {
// If Http FS is not supported by yarn service, or http scheme is configured to be force
// downloading, the URI of remote http resource should be changed to a local one.
val jarName = new File(tmpHttpJar.toURI).getName
@@ -1117,7 +1117,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
}
}

test("SPARK-24948: blacklist files we don't have read permission on") {
test("SPARK-24948: ignore files we don't have read permission on") {
val clock = new ManualClock(1533132471)
val provider = new FsHistoryProvider(createTestConf(), clock)
val accessDenied = newLogFile("accessDenied", None, inProgress = false)
@@ -1137,17 +1137,17 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
updateAndCheck(mockedProvider) { list =>
list.size should be(1)
}
// Doing 2 times in order to check the blacklist filter too
// Doing 2 times in order to check the inaccessibleList filter too
updateAndCheck(mockedProvider) { list =>
list.size should be(1)
}
val accessDeniedPath = new Path(accessDenied.getPath)
assert(mockedProvider.isBlacklisted(accessDeniedPath))
assert(!mockedProvider.isAccessible(accessDeniedPath))
clock.advance(24 * 60 * 60 * 1000 + 1) // add a bit more than 1d
isReadable = true
mockedProvider.cleanLogs()
updateAndCheck(mockedProvider) { list =>
assert(!mockedProvider.isBlacklisted(accessDeniedPath))
assert(mockedProvider.isAccessible(accessDeniedPath))
assert(list.exists(_.name == "accessDenied"))
assert(list.exists(_.name == "accessGranted"))
list.size should be(2)
14 changes: 9 additions & 5 deletions core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -48,24 +48,28 @@ import org.apache.spark.util.CallSite

private[spark] class SparkUICssErrorHandler extends DefaultCssErrorHandler {

private val cssWhiteList = List("bootstrap.min.css", "vis-timeline-graph2d.min.css")
/**
* Some libraries have warn/error messages that are too noisy for the tests; exclude them from
* normal error handling to avoid logging these.
*/
private val cssExcludeList = List("bootstrap.min.css", "vis-timeline-graph2d.min.css")

private def isInWhileList(uri: String): Boolean = cssWhiteList.exists(uri.endsWith)
private def isInExcludeList(uri: String): Boolean = cssExcludeList.exists(uri.endsWith)

override def warning(e: CSSParseException): Unit = {
if (!isInWhileList(e.getURI)) {
if (!isInExcludeList(e.getURI)) {
super.warning(e)
}
}

override def fatalError(e: CSSParseException): Unit = {
if (!isInWhileList(e.getURI)) {
if (!isInExcludeList(e.getURI)) {
super.fatalError(e)
}
}

override def error(e: CSSParseException): Unit = {
if (!isInWhileList(e.getURI)) {
if (!isInExcludeList(e.getURI)) {
super.error(e)
}
}
10 changes: 5 additions & 5 deletions dev/sparktestsupport/modules.py
@@ -32,7 +32,7 @@ class Module(object):
"""

def __init__(self, name, dependencies, source_file_regexes, build_profile_flags=(), environ={},
sbt_test_goals=(), python_test_goals=(), blacklisted_python_implementations=(),
sbt_test_goals=(), python_test_goals=(), excluded_python_implementations=(),
test_tags=(), should_run_r_tests=False, should_run_build_tests=False):
"""
Define a new module.
@@ -49,7 +49,7 @@ def __init__(self, name, dependencies, source_file_regexes, build_profile_flags=
module are changed.
:param sbt_test_goals: A set of SBT test goals for testing this module.
:param python_test_goals: A set of Python test goals for testing this module.
:param blacklisted_python_implementations: A set of Python implementations that are not
:param excluded_python_implementations: A set of Python implementations that are not
supported by this module's Python components. The values in this set should match
strings returned by Python's `platform.python_implementation()`.
:param test_tags A set of tags that will be excluded when running unit tests if the module
Expand All @@ -64,7 +64,7 @@ def __init__(self, name, dependencies, source_file_regexes, build_profile_flags=
self.build_profile_flags = build_profile_flags
self.environ = environ
self.python_test_goals = python_test_goals
self.blacklisted_python_implementations = blacklisted_python_implementations
self.excluded_python_implementations = excluded_python_implementations
self.test_tags = test_tags
self.should_run_r_tests = should_run_r_tests
self.should_run_build_tests = should_run_build_tests
@@ -524,7 +524,7 @@ def __hash__(self):
"pyspark.mllib.tests.test_streaming_algorithms",
"pyspark.mllib.tests.test_util",
],
blacklisted_python_implementations=[
excluded_python_implementations=[
"PyPy" # Skip these tests under PyPy since they require numpy and it isn't available there
]
)
@@ -565,7 +565,7 @@ def __hash__(self):
"pyspark.ml.tests.test_tuning",
"pyspark.ml.tests.test_wrapper",
],
blacklisted_python_implementations=[
excluded_python_implementations=[
"PyPy" # Skip these tests under PyPy since they require numpy and it isn't available there
]
)
