
Added a new config called MAX_FETCH_FAILURES_PER_NODE
This config defines the number of fetch failures after which a node should be blacklisted.
ankuriitg committed Mar 26, 2019
1 parent 9751318 commit a4c77c7
Showing 4 changed files with 10 additions and 4 deletions.
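
As a rough usage sketch (an editorial addition, not part of this commit): the new
key can be set on a SparkConf together with the existing fetch-failure toggle.
Both string keys appear verbatim in the diffs below; the values are only an example.

  import org.apache.spark.SparkConf

  // Hypothetical usage: enable fetch-failure blacklisting and lower the new
  // per-node fetch-failure threshold from its default of 2 down to 1.
  val conf = new SparkConf()
    .set("spark.blacklist.application.fetchFailure.enabled", "true")
    .set("spark.blacklist.application.maxFetchFailuresPerNode", "1")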
@@ -495,6 +495,11 @@ package object config {
     ConfigBuilder("spark.blacklist.application.fetchFailure.enabled")
       .booleanConf
       .createWithDefault(true)
 
+  private[spark] val MAX_FETCH_FAILURES_PER_NODE =
+    ConfigBuilder("spark.blacklist.application.maxFetchFailuresPerNode")
+      .intConf
+      .createWithDefault(2)
+
   // End blacklist confs
 
   private[spark] val UNREGISTER_OUTPUT_ON_HOST_ON_FETCH_FAILURE =
@@ -60,6 +60,7 @@ private[scheduler] class BlacklistTracker (
   BlacklistTracker.validateBlacklistConfs(conf)
   private val MAX_FAILURES_PER_EXEC = conf.get(config.MAX_FAILURES_PER_EXEC)
   private val MAX_FAILED_EXEC_PER_NODE = conf.get(config.MAX_FAILED_EXEC_PER_NODE)
+  private val MAX_FETCH_FAILURES_PER_NODE = conf.get(config.MAX_FETCH_FAILURES_PER_NODE)
   val BLACKLIST_TIMEOUT_MILLIS = BlacklistTracker.getBlacklistTimeout(conf)
   private val BLACKLIST_FETCH_FAILURE_ENABLED = conf.get(config.BLACKLIST_FETCH_FAILURE_ENABLED)

@@ -220,7 +221,7 @@ private[scheduler] class BlacklistTracker (
       fetchFailuresOnHost.dropFailuresWithTimeoutBefore(now)
       val totalFailures = fetchFailuresOnHost.numUniqueTaskFailures
 
-      if (totalFailures >= MAX_FAILED_EXEC_PER_NODE &&
+      if (totalFailures >= MAX_FETCH_FAILURES_PER_NODE &&
         !nodeIdToBlacklistExpiryTime.contains(host)) {
         logInfo(s"Blacklisting node $host due to fetch failure of external shuffle service")
         nodeIdToBlacklistExpiryTime.put(host, expiryTimeForNewBlacklists)
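
For intuition, a stripped-down model of the check above (a hypothetical helper,
not code from this commit; it assumes the per-host count has already been pruned
of expired failures, as dropFailuresWithTimeoutBefore does in the diff):

  // Hypothetical simplification: a host is newly blacklisted once its count
  // of unique-task fetch failures reaches the configured threshold
  // (spark.blacklist.application.maxFetchFailuresPerNode, default 2).
  def shouldBlacklistHost(
      uniqueTaskFetchFailures: Int,
      alreadyBlacklisted: Boolean,
      maxFetchFailuresPerNode: Int = 2): Boolean = {
    uniqueTaskFetchFailures >= maxFetchFailuresPerNode && !alreadyBlacklisted
  }

Before this change the same condition reused MAX_FAILED_EXEC_PER_NODE, conflating
the executor-failure and fetch-failure thresholds; the new config decouples them
while keeping the same default of 2.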
@@ -565,7 +565,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with MockitoSugar
     })
 
     conf.set(config.BLACKLIST_FETCH_FAILURE_ENABLED, true)
-    conf.set(config.MAX_FAILED_EXEC_PER_NODE, 1)
+    conf.set(config.MAX_FETCH_FAILURES_PER_NODE, 1)
     blacklist = new BlacklistTracker(listenerBusMock, conf, Some(allocationClientMock), clock)
 
     // Disable auto-kill. Blacklist an executor and make sure killExecutors is not called.

@@ -610,7 +610,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with MockitoSugar
     assert(blacklist.nextExpiryTime === 1000 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
 
     // Ensure MAX_FETCH_FAILURES_PER_NODE config is used for blacklisting
-    conf.set(config.MAX_FAILED_EXEC_PER_NODE, 2)
+    conf.set(config.MAX_FETCH_FAILURES_PER_NODE, 2)
     blacklist = new BlacklistTracker(listenerBusMock, conf, Some(allocationClientMock), clock)
     clock.advance(1000)
     blacklist.updateBlacklistForFetchFailure(1, 1, 1, "hostB", exec = "2")
@@ -1265,7 +1265,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logging
       .set(config.BLACKLIST_ENABLED, true)
       .set(config.SHUFFLE_SERVICE_ENABLED, true)
       .set(config.BLACKLIST_FETCH_FAILURE_ENABLED, true)
-      .set(config.MAX_FAILED_EXEC_PER_NODE, 1)
+      .set(config.MAX_FETCH_FAILURES_PER_NODE, 1)
     sc = new SparkContext("local", "test", conf)
     sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
     val taskSet = FakeTask.createTaskSet(4)
