[SNAP-2231] Limit maximum cores for a job to physical cores on a node #972

Open
wants to merge 1 commit into base: master

Conversation

@sumwale (Contributor) commented Feb 27, 2018

See details in the JIRA issue: https://jira.snappydata.io/browse/SNAP-2231

These changes limit the maximum cores given to a single job to the number of physical cores
on a machine. With the default of (2 * physical cores) per node in the cluster, this leaves the
remaining cores free for other concurrent jobs, which is especially important for short
point-lookup queries.

Additionally, these changes improve performance for disk-intensive queries. For example,
a 30-50% performance improvement was measured in TPC-H load and some queries when cores
were limited to physical cores and a lot of data had overflowed to disk.

Question: should the default cores in ExecutorInitiator be increased to (4 * physical cores)
to allow for more concurrency?

Changes proposed in this pull request

  • overrides in SnappyTaskSchedulerImpl to track the per-executor cores used by a job
    and cap them to the number of physical cores on a node
  • combined some maps in TaskSchedulerImpl to recover the performance lost to the above
    and improve further over the base TaskSchedulerImpl
  • the property "spark.scheduler.limitJobCores=false" can be set to revert to the previous
    behaviour (see the usage sketch after this list)
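
For reference, a minimal sketch of setting the opt-out property; only the property name
comes from this PR, while the SparkConf setup around it is purely illustrative:

import org.apache.spark.SparkConf

// Sketch: revert to the previous scheduling behaviour (no per-job core cap).
// Only the property name comes from this PR; the rest is illustrative.
val conf = new SparkConf()
  .setAppName("example-app")
  .set("spark.scheduler.limitJobCores", "false")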

Patch testing

precheckin -Pstore -Pspark

TODO: working on porting Spark's TaskScheduler unit tests

ReleaseNotes.txt changes

Document the new property and behaviour.

Other PRs

TIBCOSoftware/snappy-spark#96

@rishitesh (Contributor) left a comment:

Some comments and clarifications sought.

}
sc.taskScheduler.asInstanceOf[SnappyTaskSchedulerImpl].addBlockId(executorId, blockId)
@rishitesh (Contributor) commented:

The SnappyTaskSchedulerImpl.addBlockId() method has the condition blockId.numProcessors < blockId.executorCores, which will never be satisfied when called from here.

@sumwale (Contributor, Author) replied:

The "case None" is for a corner one where blockManager gets added before executor. For normal cases onExecutorAdded will be invoked first where number of physical cores have been properly initialized so addBlockId will work fine. Will add the handling for that case in onExecutorAdded and invoke addBlockId from the Some() match case there.

@sumwale (Contributor, Author) added:

Will also add removal in onExecutorRemoved.
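
A rough, self-contained sketch of the registration ordering described in these two replies; only the method names onExecutorAdded, onExecutorRemoved and addBlockId are taken from the discussion, and everything else below (PendingBlockTracker, register, the String block ids, the map types) is an illustrative assumption rather than the actual PR code:

import java.util.concurrent.ConcurrentHashMap

// Sketch only: names and types here are assumptions, not the PR code.
class PendingBlockTracker {
  // physical core count per executor, recorded when the executor is added
  private val physicalCores = new ConcurrentHashMap[String, Integer]()
  // block ids that arrived before their executor (the corner case above)
  private val pendingBlockIds = new ConcurrentHashMap[String, String]()

  def addBlockId(executorId: String, blockId: String): Unit =
    physicalCores.get(executorId) match {
      case null => pendingBlockIds.put(executorId, blockId) // executor not seen yet
      case cores => register(executorId, blockId, cores)
    }

  def onExecutorAdded(executorId: String, cores: Int): Unit = {
    physicalCores.put(executorId, cores)
    // deferred registration for the corner case handled above
    Option(pendingBlockIds.remove(executorId)) match {
      case Some(blockId) => register(executorId, blockId, cores)
      case None => // normal case: executor seen before its block manager
    }
  }

  def onExecutorRemoved(executorId: String): Unit = {
    physicalCores.remove(executorId)
    pendingBlockIds.remove(executorId)
  }

  private def register(executorId: String, blockId: String, cores: Int): Unit =
    println(s"registering block $blockId on $executorId capped at $cores cores")
}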

private val lookupExecutorCores = new ToLongFunction[String] {
  override def applyAsLong(executorId: String): Long = {
    maxExecutorTaskCores.get(executorId) match {
      case null => Int.MaxValue // no restriction
@rishitesh (Contributor) commented:

Wouldn't defaultParallelism be better than Int.MaxValue?

@sumwale (Contributor, Author) replied:

Null means that the cores configured for the executor are less than or equal to the physical cores on the machine, or that the job core limit has been explicitly disabled. Both cases imply the same thing: don't put any limit on tasks on the node, so this essentially falls back to Spark's TaskSchedulerImpl behaviour.
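
For context, a plausible complete form of the lookup shown above; the non-null branch is an assumption based on this explanation, not the actual PR code:

import java.util.concurrent.ConcurrentHashMap
import java.util.function.ToLongFunction

// Sketch only: the non-null branch is assumed from the explanation above.
val maxExecutorTaskCores = new ConcurrentHashMap[String, java.lang.Long]()

val lookupExecutorCores = new ToLongFunction[String] {
  override def applyAsLong(executorId: String): Long =
    maxExecutorTaskCores.get(executorId) match {
      // no restriction: fall back to Spark's TaskSchedulerImpl behaviour
      case null => Int.MaxValue
      // otherwise cap the job's tasks to the node's physical cores
      case cores => cores.longValue()
    }
}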

val manager = createTaskSetManager(taskSet, maxTaskFailures)
val stage = taskSet.stageId
val (stageAvailableCores, stageTaskSets) = stageCoresAndAttempts.computeIfAbsent(
  stage, createNewStageMap)
@rishitesh (Contributor) commented:

Should we not be setting the manager for the stageTaskSets? I can see
stageTaskSets(taskSet.stageAttemptId) = manager in the original TaskSchedulerImpl.

@sumwale (Contributor, Author) replied:

Yes, that is done below at line 112.

taskIdExecutorAndManager.justPut(tid, execId -> taskSet)
executorIdToRunningTaskIds(execId).add(tid)
if (availableCores ne null) {
  availableCores.addValue(execId, -CPUS_PER_TASK)
@rishitesh (Contributor) commented:

Can we put an assertion similar to assert(availableCpus(i) >= 0)? We might catch some erroneous updates.

@sumwale (Contributor, Author) replied:

Will add.
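
A self-contained sketch of the suggested check; the map used below is an illustrative stand-in for the per-executor available-core bookkeeping, not the collection actually used in the PR:

import scala.collection.mutable

// Sketch only: a stand-in map for per-executor available cores.
val CPUS_PER_TASK = 1
val availableCores = mutable.Map("exec-1" -> 8) // e.g. 8 cores currently free

def onTaskLaunched(execId: String): Unit = {
  availableCores(execId) -= CPUS_PER_TASK
  // Suggested assertion: catch erroneous updates that would drive the
  // executor's available core count below zero.
  assert(availableCores(execId) >= 0,
    s"negative available cores for executor $execId: ${availableCores(execId)}")
}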
