
[SPARK-26867][YARN] Spark Support of YARN Placement Constraint #32804

Closed
wants to merge 6 commits

Conversation

AngersZhuuuu
Contributor

@AngersZhuuuu AngersZhuuuu commented Jun 7, 2021

What changes were proposed in this pull request?

Support YARN Placement Constraints. In this PR:

  • Add LocalityPreferredSchedulingRequestContainerPlacementStrategy to compute locality preferences for SchedulingRequests
  • Add ContainerImpl for unit tests, since the Container interface's common API does not support setAllocationRequestId and getAllocationRequestId
  • Add YarnSchedulingRequestAllocator as the allocator implementation used when Placement Constraints are enabled (see the sketch after this list)
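
For context, here is a minimal sketch of what a constrained request could look like with the Hadoop 3.2+ SchedulingRequest / PlacementConstraints API; the allocation id, tag, resource sizing, and attribute values are placeholders, not code from this PR.

import java.util.Collections

import org.apache.hadoop.yarn.api.records.{ExecutionType, ExecutionTypeRequest, Priority, Resource, ResourceSizing, SchedulingRequest}
import org.apache.hadoop.yarn.api.resource.PlacementConstraints
import org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets

// Resource shape of a single executor container (placeholder values).
val executorResource = Resource.newInstance(1024, 1)

// Hard constraint: only nodes whose attribute "attr1" has value "1".
val attrConstraint = PlacementConstraints.build(
  PlacementConstraints.targetIn(
    PlacementConstraints.NODE,
    PlacementTargets.nodeAttribute("attr1", "1")))

// One SchedulingRequest asking for one executor under that constraint.
val request = SchedulingRequest.newBuilder()
  .allocationRequestId(1L)                                  // id used to match allocations back to this request
  .priority(Priority.newInstance(1))
  .executionType(ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
  .allocationTags(Collections.singleton("spark-executor"))  // tag that affinity/cardinality constraints can target
  .placementConstraintExpression(attrConstraint)
  .resourceSizing(ResourceSizing.newInstance(1, executorResource))
  .build()

Such requests would then go to the RM through AMRMClient#addSchedulingRequests rather than the classic addContainerRequest path.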

Why are the changes needed?

Spark can allow users to configure Placement Constraints so that they have more control over where executors are placed. For example (a rough sketch of the first three in the YARN constraint DSL follows this list):

  1. A Spark job wants to run only on machines where the Python version is x or the Java version is y (Node Attributes).
  2. A Spark job needs / does not need its executors to be placed on machines where an HBase RegionServer, ZooKeeper, or any other service is running (Affinity / Anti-Affinity).
  3. A Spark job wants no more than 2 of its executors on the same node (Cardinality).
  4. Job A's executors want / do not want to run where containers of Spark job B, or any other job B, run (Application Tag Namespace).
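
As an illustration only (the tag and attribute names below are made up, not from this PR), the first three examples map onto the PlacementConstraints DSL roughly like this:

import org.apache.hadoop.yarn.api.resource.PlacementConstraints._
import org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets._

// 1. Node attributes: only nodes where attribute "python" has value "3".
val byAttribute = targetIn(NODE, nodeAttribute("python", "3"))

// 2. Anti-affinity: avoid nodes already running containers tagged "hbase-rs".
val antiAffinity = targetNotIn(NODE, allocationTag("hbase-rs"))

// 3. Cardinality: at most 2 containers tagged "spark-executor" per node.
val atMostTwoPerNode = cardinality(NODE, 0, 2, "spark-executor")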

Does this PR introduce any user-facing change?

Users can set spark.yarn.schedulingRequestEnabled to use YARN placement constraints, and spark.yarn.executor.nodeAttributes to set the constraint.

How was this patch tested?

Added unit tests and manually tested this in a YARN cluster:

List the configured node attributes:

/usr/share/yarn-3/bin/yarn nodeattributes -attributestonodes -attributes attr1
Hostname    Attribute-value
rm.yarn.io/attr1:
    ip-xxxx    1

/usr/share/yarn-3/bin/yarn nodeattributes -attributestonodes -attributes attr2
Hostname    Attribute-value
rm.yarn.io/attr2:
    ip-xxx

Spark command:

export SPARK_CONF_DIR=/tmp/spark-conf-3.1.1
export SPARK_HOME=/tmp/spark-3.1.1
/tmp/spark-3.1.1/bin/spark-sql --queue infra \
--executor-memory 1g --executor-cores 1  \
--conf spark.yarn.schedulingRequestEnabled=true  \
--conf spark.executor.instances=1 \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.initialExecutors=1 \
--conf spark.dynamicAllocation.maxExecutors=2 \
--conf spark.dynamicAllocation.minExecutors=0    \
--conf spark.dynamicAllocation.executorIdleTimeout=10s \
--conf spark.yarn.executor.nodeAttribute='attr1=1'

Under dynamic allocation this works as expected: all allocated containers land on NM ip-xxx. If we set the node attribute to attr1=2, no executor containers can be allocated.

@github-actions github-actions bot added the YARN label Jun 7, 2021
@SparkQA

SparkQA commented Jun 7, 2021

Test build #139410 has finished for PR 32804 at commit c80b66f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 7, 2021

Test build #139414 has finished for PR 32804 at commit 4ef8f35.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 7, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43932/

@SparkQA

SparkQA commented Jun 7, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43936/

@SparkQA

SparkQA commented Jun 7, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43932/

@SparkQA

SparkQA commented Jun 7, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43936/

@mridulm
Contributor

mridulm commented Jun 7, 2021

+CC @otterc, @venkata91

@SparkQA

SparkQA commented Jun 8, 2021

Test build #139445 has finished for PR 32804 at commit ec9e127.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 8, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43968/

@SparkQA

SparkQA commented Jun 8, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43968/

}

val expression = (nodeAttributes.isDefined, locality.isDefined) match {
case (true, true) => Some(or(and(nodeAttributes.get, locality.get), nodeAttributes.get))
Contributor Author


Using the constraint below would be better here:

delayedOr(
  timedClockConstraint(and(nodeAttributes, locality), delayedOrIntervalMilliseconds, TimeUnit.MILLISECONDS),
  timedClockConstraint(nodeAttributes, delayedOrIntervalMilliseconds * 2, TimeUnit.MILLISECONDS))

But it always gets stuck when tested on a YARN 3.3.0 cluster. I'd appreciate help with this part; it's hard to balance the node attribute constraint against the locality requirement.
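
For reference, here is a self-contained sketch of how that delayed-or expression could slot into the match shown above; the two fallback cases and the interval parameter are assumptions, not code from this PR.

import java.util.concurrent.TimeUnit

import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint
import org.apache.hadoop.yarn.api.resource.PlacementConstraints._

def buildExpression(
    nodeAttributes: Option[AbstractConstraint],
    locality: Option[AbstractConstraint],
    delayedOrIntervalMilliseconds: Long): Option[AbstractConstraint] =
  (nodeAttributes.isDefined, locality.isDefined) match {
    case (true, true) =>
      // Try "node attribute AND locality" first; after the interval, relax to the attribute alone.
      Some(delayedOr(
        timedClockConstraint(and(nodeAttributes.get, locality.get),
          delayedOrIntervalMilliseconds, TimeUnit.MILLISECONDS),
        timedClockConstraint(nodeAttributes.get,
          delayedOrIntervalMilliseconds * 2, TimeUnit.MILLISECONDS)))
    case (true, false) => nodeAttributes
    case (false, true) => locality
    case _ => None
  }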

@SparkQA

SparkQA commented Jun 9, 2021

Test build #139553 has finished for PR 32804 at commit 2cd9e69.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 9, 2021

Kubernetes integration test unable to build dist.

exiting with code: 1
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44079/

@tgravescs
Contributor

Thanks for working on this, it looks very interesting.

Add LocalityPreferredSchedulingRequestContainerPlacementStrategy for compute locality for SchedulingRequest

Can you add more description about this? This seems like a lot of changes and not what I expected from the description. I was expecting us to just pass the node attributes along to YARN, but this is much more than that, so please describe in detail how this is working. How does this work exactly with constraints vs. data locality? Is there some sort of timed wait or error handling if a matching node never becomes available or the cluster doesn't have a node with that attribute? You gave 4 examples of things you could do, but are there more, and what dependencies does it need? How does an attribute allow you to get cardinality or affinity?

Are there any constraints on the Hadoop version? (I thought this was only introduced in 3.2.0.)

Eventually the documentation .md file would need to be updated.

@@ -27,6 +27,24 @@ import org.apache.spark.network.util.ByteUnit
package object config extends Logging {

/* Common app configuration. */
private[spark] val SCHEDULING_REQUEST_ENABLED =
ConfigBuilder("spark.yarn.schedulingRequestEnabled")
Contributor


I think we need to come up with a better name and more description. From just reading this, I have no idea what it does. Spark always requests containers from YARN to schedule on, so why would it be off? (That is what I read from the config name.)
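
For what it's worth, here is a sketch of a more self-describing entry; the doc text, version, and default below are placeholders, and the config name itself is what's under discussion.

private[spark] val SCHEDULING_REQUEST_ENABLED =
  ConfigBuilder("spark.yarn.schedulingRequestEnabled")
    .doc("When true, request executor containers through YARN SchedulingRequests, " +
      "which can carry placement constraints such as node attributes, instead of " +
      "the classic ContainerRequest path. Requires a YARN version that supports " +
      "placement constraints.")
    .version("3.2.0")
    .booleanConf
    .createWithDefault(false)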



/**
* This strategy is calculating the optimal locality preferences of YARN containers by considering
Contributor


This looks like a direct copy of the ContainerLocalityPreferences javadoc. I'm assuming this is different, so please update the description; if I missed the difference, I think we need to pull it up to the top to make clear how it's different.

hostToLocalTaskCount: Map[String, Int],
allocatedHostToContainersMap: HashMap[String, Set[ContainerId]],
localityMatchedPendingAllocations: Seq[SchedulingRequest],
schedulingRequestToNodes: mutable.HashMap[Long, Array[String]],
Contributor


not documented
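
For example, scaladoc along these lines would help; the descriptions below are guesses at the semantics and would need to be checked against the actual code.

/**
 * @param hostToLocalTaskCount              preferred hostname -> number of pending tasks that could
 *                                          run locally on that host (assumed meaning)
 * @param allocatedHostToContainersMap      host -> containers already allocated on that host
 * @param localityMatchedPendingAllocations pending SchedulingRequests whose locality preferences
 *                                          match the current pending tasks (assumed meaning)
 * @param schedulingRequestToNodes          allocationRequestId -> preferred node names for that
 *                                          outstanding SchedulingRequest (assumed meaning)
 */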

@github-actions

github-actions bot commented Oct 1, 2021

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Oct 1, 2021
@github-actions github-actions bot closed this Oct 2, 2021
@zuston
Member

zuston commented Aug 17, 2023

Thanks for your effort. Any update on this? @AngersZhuuuu
