[SPARK-13232][YARN] Fix executor node label #11129

Closed · wants to merge 1 commit
YarnAllocator.scala:
@@ -307,8 +307,14 @@ private[yarn] class YarnAllocator(
       nodes: Array[String],
       racks: Array[String]): ContainerRequest = {
     nodeLabelConstructor.map { constructor =>
+      val labelExp = if ((racks != null && (!racks.isEmpty))
+          || (nodes != null && (!nodes.isEmpty))) {
+        null
+      } else {
+        labelExpression.orNull
+      }
       constructor.newInstance(resource, nodes, racks, RM_REQUEST_PRIORITY, true: java.lang.Boolean,
-        labelExpression.orNull)
+        labelExp)
Contributor:
From my understanding, in your current implementation the label expression will not take effect if nodes or racks is non-empty, even when it is explicitly set through configuration.

IMO I would instead set nodes and racks to null when a label expression is configured; otherwise users will be confused about why their explicitly set label expression has no effect.
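A minimal sketch of that alternative, reusing the values already in scope in the method shown above (the effectiveNodes/effectiveRacks names are purely illustrative):

    // Reviewer's suggestion: a configured label expression wins over
    // locality hints, instead of locality hints silently disabling it.
    val (effectiveNodes, effectiveRacks) =
      if (labelExpression.isDefined) (null, null) // label set: drop locality hints
      else (nodes, racks)                         // no label: keep locality hints
    constructor.newInstance(resource, effectiveNodes, effectiveRacks,
      RM_REQUEST_PRIORITY, true: java.lang.Boolean, labelExpression.orNull)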

Author:

Yeah, I had this idea before, but IMO completely disregarding data locality may cause a lot of network overhead.

Contributor:
Normally, if users set this label expression configuration, they obviously want label-based scheduling. But your implementation silently disables it, which will confuse users. Also, since label-based scheduling cannot work together with locality preferences on the YARN side (I think that is intentional), it would be better to ignore the locality information here.
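For reference, the configuration under discussion is the one exercised by the new test below; a minimal way to set it (the "gpu" label value is purely illustrative):

    import org.apache.spark.SparkConf

    // Ask YARN to place executors only on nodes carrying the "gpu" label.
    val conf = new SparkConf()
      .set("spark.yarn.executor.nodeLabelExpression", "gpu")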

     }.getOrElse(new ContainerRequest(resource, nodes, racks, RM_REQUEST_PRIORITY))
   }
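For readers unfamiliar with the reflective newInstance call, a rough equivalent of what nodeLabelConstructor resolves to, assuming a Hadoop 2.6+ client where AMRMClient.ContainerRequest has the label-aware overload (the reflection presumably exists so Spark still compiles against older Hadoop versions that lack it):

    import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest

    // Direct form of the reflective call above; the parameters are
    // (capability, nodes, racks, priority, relaxLocality, nodeLabelsExpression).
    val request = new ContainerRequest(resource, nodes, racks,
      RM_REQUEST_PRIORITY, true, labelExp)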

YarnAllocatorSuite.scala:
@@ -87,14 +87,17 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     override def equals(other: Any): Boolean = false
   }
 
-  def createAllocator(maxExecutors: Int = 5): YarnAllocator = {
+  def createAllocator(
+      maxExecutors: Int = 5,
+      executorNodeLabel: Option[String] = None): YarnAllocator = {
     val args = Array(
       "--executor-cores", "5",
       "--executor-memory", "2048",
       "--jar", "somejar.jar",
       "--class", "SomeClass")
     val sparkConfClone = sparkConf.clone()
     sparkConfClone.set("spark.executor.instances", maxExecutors.toString)
+    executorNodeLabel.foreach(sparkConfClone.set("spark.yarn.executor.nodeLabelExpression", _))
     new YarnAllocator(
       "not used",
       mock(classOf[RpcEndpointRef]),
@@ -272,4 +275,22 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     assert(vmemMsg.contains("5.8 GB of 4.2 GB virtual memory used."))
     assert(pmemMsg.contains("2.1 MB of 2 GB physical memory used."))
   }
+
+  test("request executors with locality") {
+    val handler = createAllocator(1, Some("label"))
+    handler.updateResourceRequests()
+    handler.getNumExecutorsRunning should be (0)
+    handler.getPendingAllocate.size should be (1)
+
+    handler.requestTotalExecutorsWithPreferredLocalities(3, 20, Map(("host1", 10), ("host2", 20)))
+    handler.updateResourceRequests()
+    handler.getPendingAllocate.size should be (3)
+
+    val container = createContainer("host1")
+    handler.handleAllocatedContainers(Array(container))
+
+    handler.getNumExecutorsRunning should be (1)
+    handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
+    handler.allocatedHostToContainersMap.get("host1").get should contain (container.getId)
+  }
 }