SPARK-1707. Remove unnecessary 3 second sleep in YarnClusterScheduler #634

Closed · wants to merge 4 commits
Changes from all commits
--------------------------------------------------------------------------------
@@ -255,10 +255,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
           sparkContext.getConf)
         }
       }
-    } finally {
-      // in case of exceptions, etc - ensure that count is atleast ALLOCATOR_LOOP_WAIT_COUNT :
-      // so that the loop (in ApplicationMaster.sparkContextInitialized) breaks
-      ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
     }
   }

@@ -277,13 +273,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
         }
         yarnAllocator.allocateContainers(
           math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
-        ApplicationMaster.incrementAllocatorLoop(1)
         Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
       }
-    } finally {
-      // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
-      // so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
-      ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
     }
     logInfo("All executors have launched.")

@@ -411,24 +402,10 @@
   }
 
 object ApplicationMaster extends Logging {
-  // Number of times to wait for the allocator loop to complete.
-  // Each loop iteration waits for 100ms, so maximum of 3 seconds.
-  // This is to ensure that we have reasonable number of containers before we start
-  // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
-  // optimal as more containers are available. Might need to handle this better.
-  private val ALLOCATOR_LOOP_WAIT_COUNT = 30
   private val ALLOCATE_HEARTBEAT_INTERVAL = 100
 
-  def incrementAllocatorLoop(by: Int) {
-    val count = yarnAllocatorLoop.getAndAdd(by)
-    if (count >= ALLOCATOR_LOOP_WAIT_COUNT) {
-      yarnAllocatorLoop.synchronized {
-        // to wake threads off wait ...
-        yarnAllocatorLoop.notifyAll()
-      }
-    }
-  }
-
   private val applicationMasters = new CopyOnWriteArrayList[ApplicationMaster]()
 
   def register(master: ApplicationMaster) {
@@ -437,7 +414,6 @@

   val sparkContextRef: AtomicReference[SparkContext] =
     new AtomicReference[SparkContext](null /* initialValue */)
-  val yarnAllocatorLoop: AtomicInteger = new AtomicInteger(0)
 
   def sparkContextInitialized(sc: SparkContext): Boolean = {
     var modified = false
@@ -472,21 +448,6 @@
     modified
   }
-
-
-  /**
-   * Returns when we've either
-   * 1) received all the requested executors,
-   * 2) waited ALLOCATOR_LOOP_WAIT_COUNT * ALLOCATE_HEARTBEAT_INTERVAL ms,
-   * 3) hit an error that causes us to terminate trying to get containers.
-   */
-  def waitForInitialAllocations() {
-    yarnAllocatorLoop.synchronized {
-      while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) {
-        yarnAllocatorLoop.wait(1000L)
-      }
-    }
-  }
 
   def main(argStrings: Array[String]) {
     SignalLogger.register(log)
     val args = new ApplicationMasterArguments(argStrings)
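The pieces deleted above form a single handshake spread across the file: the allocator loop bumps a shared counter once per heartbeat, and waitForInitialAllocations() blocks the scheduler until the counter passes ALLOCATOR_LOOP_WAIT_COUNT. A condensed, self-contained sketch of that pattern (names simplified; a paraphrase of the removed code, not an exact copy):

    import java.util.concurrent.atomic.AtomicInteger

    // Condensed sketch of the handshake this PR removes: the allocator thread
    // increments a shared counter each 100 ms heartbeat, and the scheduler
    // thread waits until the counter passes a fixed threshold (30 ticks,
    // hence the ~3 second cap the PR title refers to).
    object AllocatorHandshake {
      private val WAIT_COUNT = 30
      private val counter = new AtomicInteger(0)

      // Called once per allocation heartbeat; error paths call it with
      // WAIT_COUNT to force waiters past the threshold.
      def increment(by: Int): Unit = {
        if (counter.getAndAdd(by) >= WAIT_COUNT) {
          counter.synchronized {
            counter.notifyAll() // wake any thread parked in waitForInitialAllocations
          }
        }
      }

      def waitForInitialAllocations(): Unit = {
        counter.synchronized {
          while (counter.get() <= WAIT_COUNT) {
            counter.wait(1000L) // re-check every second in case a notify was missed
          }
        }
      }
    }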
--------------------------------------------------------------------------------
@@ -37,14 +37,4 @@ private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configur
     val retval = YarnAllocationHandler.lookupRack(conf, host)
     if (retval != null) Some(retval) else None
   }
-
-  override def postStartHook() {
-
-    super.postStartHook()
-    // The yarn application is running, but the executor might not yet ready
-    // Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
-    // TODO It needn't after waitBackendReady
-    Thread.sleep(2000L)
-    logInfo("YarnClientClusterScheduler.postStartHook done")
-  }
 }
--------------------------------------------------------------------------------
@@ -30,6 +30,11 @@ private[spark] class YarnClientSchedulerBackend(
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
   with Logging {
 
+  if (conf.getOption("spark.scheduler.minRegisteredExecutorsRatio").isEmpty) {
+    minRegisteredRatio = 0.8

[Inline review thread on minRegisteredRatio = 0.8]

Contributor: Why is minRegisteredRatio 0.8 and not 1.0?

Contributor: No real reason other than it might take longer to get to 100%.
It's just a number we chose to hopefully give the user a good experience
without having to wait too long if the cluster is really busy. The user can
change it if they want.

+    ready = false
+  }
+
   var client: Client = null
   var appId: ApplicationId = null
 
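As the review thread above notes, the 0.8 default is only a heuristic and users can override it. A hedged usage sketch (the property key comes from this diff; the app name and the 1.0 value are illustrative):

    import org.apache.spark.{SparkConf, SparkContext}

    // Require every requested executor to register before the scheduler
    // starts placing tasks, instead of the 0.8 default introduced here.
    val conf = new SparkConf()
      .setAppName("MinRegisteredRatioExample") // illustrative name
      .set("spark.scheduler.minRegisteredExecutorsRatio", "1.0")
    val sc = new SparkContext(conf)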
--------------------------------------------------------------------------------
@@ -47,14 +47,8 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
   }
 
   override def postStartHook() {
-    val sparkContextInitialized = ApplicationMaster.sparkContextInitialized(sc)
+    ApplicationMaster.sparkContextInitialized(sc)
     super.postStartHook()
-    if (sparkContextInitialized){
-      ApplicationMaster.waitForInitialAllocations()
-      // Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
-      // TODO It needn't after waitBackendReady
-      Thread.sleep(3000L)
-    }
     logInfo("YarnClusterScheduler.postStartHook done")
   }
 }
--------------------------------------------------------------------------------
@@ -27,6 +27,11 @@ private[spark] class YarnClusterSchedulerBackend(
     sc: SparkContext)
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
 
+  if (conf.getOption("spark.scheduler.minRegisteredExecutorsRatio").isEmpty) {
+    minRegisteredRatio = 0.8
+    ready = false
+  }
+
   override def start() {
     super.start()
     var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
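The added block only seeds two flags (minRegisteredRatio and ready) that the parent CoarseGrainedSchedulerBackend consults. A minimal sketch of the gating idea, assuming the backend tracks expected and registered executor counts and falls back to a wait timeout; the names ReadinessGate, isReady, waitBackendReady, and maxRegisteredWaitMs are illustrative, not taken from this diff:

    // Sketch of a registered-ratio readiness gate: the scheduler polls
    // isReady() instead of sleeping a fixed 3 seconds, so startup takes only
    // as long as actual executor registration, bounded by maxRegisteredWaitMs.
    class ReadinessGate(
        totalExpected: Int,
        minRegisteredRatio: Double,
        maxRegisteredWaitMs: Long) {
      private val createTime = System.currentTimeMillis()
      @volatile var registeredExecutors = 0

      def isReady(): Boolean =
        registeredExecutors >= (totalExpected * minRegisteredRatio).toInt ||
          System.currentTimeMillis() - createTime >= maxRegisteredWaitMs

      def waitBackendReady(): Unit = {
        while (!isReady()) {
          Thread.sleep(100) // heartbeat-sized poll
        }
      }
    }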
--------------------------------------------------------------------------------
@@ -234,10 +234,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
           sparkContext.getConf)
         }
       }
-    } finally {
-      // In case of exceptions, etc - ensure that the loop in
-      // ApplicationMaster#sparkContextInitialized() breaks.
-      ApplicationMaster.doneWithInitialAllocations()
     }
   }

@@ -254,16 +250,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
       checkNumExecutorsFailed()
       allocateMissingExecutor()
       yarnAllocator.allocateResources()
-      if (iters == ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT) {
-        ApplicationMaster.doneWithInitialAllocations()
-      }
       Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
-      iters += 1
     }
-  } finally {
-    // In case of exceptions, etc - ensure that the loop in
-    // ApplicationMaster#sparkContextInitialized() breaks.
-    ApplicationMaster.doneWithInitialAllocations()
-  }
   logInfo("All executors have launched.")
 }
@@ -365,33 +354,15 @@
   }
 
 object ApplicationMaster extends Logging {
-  // Number of times to wait for the allocator loop to complete.
-  // Each loop iteration waits for 100ms, so maximum of 3 seconds.
-  // This is to ensure that we have reasonable number of containers before we start
-  // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
-  // optimal as more containers are available. Might need to handle this better.
-  private val ALLOCATOR_LOOP_WAIT_COUNT = 30
   private val ALLOCATE_HEARTBEAT_INTERVAL = 100
 
   private val applicationMasters = new CopyOnWriteArrayList[ApplicationMaster]()
 
   val sparkContextRef: AtomicReference[SparkContext] =
     new AtomicReference[SparkContext](null)
 
-  // Variable used to notify the YarnClusterScheduler that it should stop waiting
-  // for the initial set of executors to be started and get on with its business.
-  val doneWithInitialAllocationsMonitor = new Object()
-
-  @volatile var isDoneWithInitialAllocations = false
-
-  def doneWithInitialAllocations() {
-    isDoneWithInitialAllocations = true
-    doneWithInitialAllocationsMonitor.synchronized {
-      // to wake threads off wait ...
-      doneWithInitialAllocationsMonitor.notifyAll()
-    }
-  }
-
   def register(master: ApplicationMaster) {
     applicationMasters.add(master)
   }
@@ -434,20 +405,6 @@
     modified
   }
-
-  /**
-   * Returns when we've either
-   * 1) received all the requested executors,
-   * 2) waited ALLOCATOR_LOOP_WAIT_COUNT * ALLOCATE_HEARTBEAT_INTERVAL ms,
-   * 3) hit an error that causes us to terminate trying to get containers.
-   */
-  def waitForInitialAllocations() {
-    doneWithInitialAllocationsMonitor.synchronized {
-      while (!isDoneWithInitialAllocations) {
-        doneWithInitialAllocationsMonitor.wait(1000L)
-      }
-    }
-  }
 
   def getApplicationAttemptId(): ApplicationAttemptId = {
     val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
     val containerId = ConverterUtils.toContainerId(containerIdString)