This repository has been archived by the owner on Jan 9, 2020. It is now read-only.

Support HDFS rack locality #350

Conversation

kimoonkim
Member

Closes #349 and #206.

@ash211 @foxish

Supports HDFS rack locality by implementing getRackForHost in KubernetesTaskSchedulerImpl.

Added unit tests.

Also did manual testing using a dummy topology script that always returns a dummy rack name, "/rack-0".

The driver log shows a small number of RACK_LOCAL tasks, which used to be ANY tasks. (The majority of tasks are still NODE_LOCAL.)

2017-06-16 16:56:38 INFO KubernetesTaskSetManager:54 - Starting task 34.0 in stage 0.0 (TID 57, 10.44.0.5, executor 9, partition 34, RACK_LOCAL, 6718 bytes)
2017-06-16 16:56:39 INFO KubernetesTaskSetManager:54 - Starting task 40.0 in stage 0.0 (TID 59, 10.44.0.4, executor 7, partition 40, RACK_LOCAL, 6718 bytes)
2017-06-16 16:56:39 INFO KubernetesTaskSetManager:54 - Starting task 50.0 in stage 0.0 (TID 63, 10.46.0.4, executor 5, partition 50, RACK_LOCAL, 6719 bytes)

The job was HdfsTest.

/usr/local/spark-on-k8s/bin/spark-submit --class org.apache.spark.examples.HdfsTest --conf spark.app.name=spark-hdfstest --conf spark.dynamicAllocation.enabled=false --conf spark.shuffle.service.enabled=false --conf spark.executor.instances=10 --conf spark.kubernetes.shuffle.labels="app=spark-shuffle-service,spark-version=2.1.0" local:///opt/spark/examples/jars/spark-examples_2.11-2.1.0-k8s-0.2.0-SNAPSHOT.jar 10GB-txt

spark-defaults.conf specified the dummy topology script:

spark.hadoop.fs.defaultFS hdfs://hdfs-namenode-0.hdfs-namenode.default.svc.cluster.local:8020
spark.hadoop.net.topology.script.file.name /tmp/print_rack.sh

The dummy script print_rack.sh:

#!/bin/bash

echo /rack-0

I added the script to the driver docker image manually.
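
For context, here is a minimal sketch of the scheduler-side flow this PR adds (names such as rackResolverUtil and getRackForDatanodeOrExecutor come from the review excerpts below; this is an illustration, not necessarily the exact merged code):

// In KubernetesTaskSchedulerImpl: rack lookup is a no-op unless a topology
// plugin is configured, so non-HDFS users pay no extra cost per task.
override def getRackForHost(hostPort: String): Option[String] = {
  if (!rackResolverUtil.isConfigured) {
    None
  } else {
    getRackForDatanodeOrExecutor(hostPort)
  }
}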

@kimoonkim
Member Author

FYI, the unit test failures seem genuine. I'm looking into them.

@foxish
Member

foxish commented Jun 21, 2017

Thanks @kimoonkim! This looks awesome. Looking into this in detail shortly.

@ash211 ash211 left a comment

Nice work @kimoonkim !

isConfigured = checkConfigured(hadoopConfiguration)
// RackResolver logs an INFO message whenever it resolves a rack, which is way too often.
if (Logger.getLogger(classOf[RackResolver]).getLevel == null) {
Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN)

haha yes, I've seen this too -- I think Hadoop 2.8.0 lowered this log level. What version are you testing against?

Member Author

I was using 2.7. Good to know 2.8 fixed this (YARN-3350)

private[spark] class KubernetesTaskSchedulerImpl(sc: SparkContext) extends TaskSchedulerImpl(sc) {
private[spark] class KubernetesTaskSchedulerImpl(
sc: SparkContext,
rackResolverUtil: RackResolverUtil = new RackResolverUtil,

Can you put sc as a parameter into the RackResolverUtil constructor? I'm hoping to get rid of the rackResolverUtil.init method, since that class currently has a two-step initialization (instantiate the class, then call .init).

Member Author

Fixed. Thanks for the suggestion. I wanted to do this earlier but didn't try hard enough :-)
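
For illustration, the constructor-injected shape could look roughly like this (a sketch; the isConfigured check and the secondary constructor are assumptions, and details may differ from the merged code):

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.TaskSchedulerImpl

// The resolver receives its configuration at construction time, so the
// two-step instantiate-then-init() pattern goes away.
private[spark] class RackResolverUtil(hadoopConfiguration: Configuration) {
  val isConfigured: Boolean =
    hadoopConfiguration.get("net.topology.script.file.name") != null
}

private[spark] class KubernetesTaskSchedulerImpl(
    sc: SparkContext,
    rackResolverUtil: RackResolverUtil)
  extends TaskSchedulerImpl(sc) {

  // Secondary constructor for production use; tests can inject a stub resolver.
  def this(sc: SparkContext) = this(sc, new RackResolverUtil(sc.hadoopConfiguration))
}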


private def getRackForDatanodeOrExecutor(hostPort: String): Option[String] = {
val host = Utils.parseHostPort(hostPort)._1
val backend = this.backend.asInstanceOf[KubernetesClusterSchedulerBackend]

Do this cast once in the constructor (outside this method)? Although if this is cheap in the JVM JIT, then maybe there's no need to avoid the frequent casts.

Member Author

Fixed. FYI it appears this has to be done in the separate init method.

val host = Utils.parseHostPort(hostPort)._1
val backend = this.backend.asInstanceOf[KubernetesClusterSchedulerBackend]
val executorPod = backend.getExecutorPodByIP(host)
if (executorPod.isEmpty) {

This if/else could be turned into a match, which might be cleaner in Scala.

Member Author

Fixed.


override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = {
new KubernetesTaskSetManager(this, taskSet, maxTaskFailures)
}

override def getRackForHost(hostPort: String): Option[String] = {
if (!rackResolverUtil.isConfigured) {

nice work on keeping this speedy for non-HDFS users

import org.apache.spark.scheduler.{FakeTask, FakeTaskScheduler, HostTaskLocation, TaskLocation}

class KubernetesTaskSetManagerSuite extends SparkFunSuite {

val sc = new SparkContext("local", "test")
val sc = new SparkContext(master = "local", appName = "test",
new SparkConf().set("spark.driver.allowMultipleContexts", "true"))

Why is this change needed? Are we parallelizing tests and running multiple drivers in the same JVM at once?

Member Author

In this project, we have KubernetesTaskSetManagerSuite and KubernetesTaskSchedulerImplSuite. The two suites run inside a single JVM, one at a time, but each creates its own SparkContext. Without this option, the second suite fails because only one active SparkContext is allowed per JVM. I was hoping to prevent that.

But I think there is a better option, which is just to call SparkContext.clearActiveContext(). Switched to that.
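
For illustration, the per-suite setup then looks like this (a sketch; the test bodies are omitted):

class KubernetesTaskSetManagerSuite extends SparkFunSuite {

  // Clear any SparkContext left active by another suite in the same JVM,
  // so creating this suite's context does not trip the
  // "only one SparkContext per JVM" check.
  SparkContext.clearActiveContext()
  val sc = new SparkContext("local", "test")

  // ... tests ...
}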

@ash211 ash211 requested a review from mccheah June 26, 2017 20:54
@foxish
Member

foxish commented Jul 7, 2017

LGTM! Thanks @kimoonkim, this looks good and appears to handle both HDFS nodes and executor pods well. Question: for the executor pods, how do you see the rack-resolution script working? Could it have access to more than just the pod IP to find which rack it belongs to?

@kimoonkim
Member Author

@foxish Good question. My understanding is that executors do not call the topology plugin. Only the driver consults the topology plugin, to decide which executor, hopefully a rack-local one, should receive a new task.

When an executor reads an HDFS block for the new task, it simply uses Hadoop library code that sends an RPC request to the namenode. The namenode considers the list of datanodes that have copies of the block and asks the topology plugin which datanodes are better. When the namenode returns the list of datanodes, it sorts them in locality order (node-local, then rack-local, then remote).

FYI, our HDFS-on-Kubernetes setup does not yet support configuring the topology plugin in the namenode helm chart, but I intend to add that soon.

The pod IP question on the namenode side is not as important as we originally thought, because most k8s network plugins do NAT and the namenode sees the k8s cluster node IPs. (The pod IP issue on the namenode side only manifests with kubenet on GKE.) For details, please see kubernetes-HDFS/topology/README.md.

@kimoonkim
Member Author

@ash211 @foxish @mccheah Thanks for the reviews so far. Are there any more questions or comments on this? Maybe it's ready to be merged soon?

@mccheah mccheah left a comment

Sorry this took a while to review. I have some suggestions.

private def getRackForDatanodeOrExecutor(hostPort: String): Option[String] = {
val host = Utils.parseHostPort(hostPort)._1
val executorPod = kubernetesSchedulerBackend.getExecutorPodByIP(host)
executorPod.isEmpty match {

Use Option.map.getOrElse instead of matching on isEmpty.

Member Author

Fixed. Thanks for the suggestion. Found and fixed a minor bug thanks to that!
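
For illustration, the isEmpty match above collapses into Option combinators roughly like this (a sketch; resolveRackForExecutorPod is a hypothetical stand-in for whatever the executor-pod branch does in the real code):

private def getRackForDatanodeOrExecutor(hostPort: String): Option[String] = {
  val host = Utils.parseHostPort(hostPort)._1
  // Some(pod): an executor pod IP; None: treat the address as a datanode host.
  kubernetesSchedulerBackend.getExecutorPodByIP(host)
    .map(pod => resolveRackForExecutorPod(pod))
    .getOrElse(rackResolverUtil.resolveRack(sc.hadoopConfiguration, host))
}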


class KubernetesTaskSchedulerImplSuite extends SparkFunSuite {

SparkContext.clearActiveContext()

Should this be done in a before block?

Member Author

It could be put in a before block, but then I'd also need to put val sc = new SparkContext(...) inside the block, because SparkContext.clearActiveContext() should be called before a new SparkContext is created. Please let me know what you think.

class KubernetesTaskSchedulerImplSuite extends SparkFunSuite {

SparkContext.clearActiveContext()
val sc = new SparkContext("local", "test")

Can we just use a mock SparkContext here?

Member Author

Yeah, I thought about that possibility and tried a bit, but then I realized SparkContext is not friendly to mock. It just has too many methods returning too many objects. I'd have to mock many of the ones the class under test happens to interact with, and if any of that interaction changes, the test will break.

I think the test maintenance cost becomes too high. I found that other Spark core unit tests avoid mocks, probably for the same reason.

}

test("Gets racks for datanodes") {
val rackResolverUtil = mock(classOf[RackResolverUtil])
@mccheah mccheah Jul 14, 2017

It seems unusual to me to be mocking the RackResolverUtil. It seems to be part of the core KubernetesTaskSchedulerImpl because the RackResolverUtil is a nested private class.

If we indeed want to test these separately, then the architecture should reflect that:

  • Extract RackResolverUtil to another class,
  • Put a trait on top of the RackResolverUtil,
  • Inject an instance into the KubernetesTaskSchedulerImpl when we create it,
  • Write a separate test for RackResolverUtil.

If we don't want to test these separately, then we should create a real RackResolverUtil and test the top-level methods accordingly.

Member Author

Good point. It started as a pure wrapper around RackResolver, and then I ended up adding a bit of business logic. I like the suggestion, but I'll have to think about this a little.

Member Author

Fixed. New code looks better. Thanks.
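
For reference, a sketch of the trait-based extraction discussed above (isConfigured and resolveRack come from the review excerpts; the implementation class name and the exact isConfigured check are assumptions):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.util.RackResolver

// Trait boundary so the scheduler can be tested with a stub resolver.
private[spark] trait RackResolverUtil {
  def isConfigured: Boolean
  def resolveRack(hadoopConfiguration: Configuration, host: String): Option[String]
}

// Default implementation backed by Hadoop's RackResolver.
private[spark] class HadoopRackResolverUtil(hadoopConfiguration: Configuration)
  extends RackResolverUtil {

  override val isConfigured: Boolean =
    hadoopConfiguration.get("net.topology.script.file.name") != null

  override def resolveRack(conf: Configuration, host: String): Option[String] =
    Option(RackResolver.resolve(conf, host).getNetworkLocation)
}

Tests can then pass a Mockito mock of the trait into the scheduler constructor, as in the suite excerpt above.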

.thenReturn(None)
when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "192.168.1.5"))
.thenReturn(None)
val inetAddressUtil = mock(classOf[InetAddressUtil])

Any reason this can't be a real InetAddressUtil?


If it can't be then move InetAddressUtil to its own file and place a trait over it. The real InetAddressUtil could probably be an object that extends the trait in this case.

Member Author

I wanted to mock it so unit tests don't call real DNS and potentially get influenced by the responses. The trait approach sounds good; I'll probably try it in the next patch.

Member Author

Fixed.
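
A sketch of the corresponding InetAddressUtil split (the method and object names are illustrative assumptions; the point is that tests can stub DNS):

import java.net.InetAddress

// Trait boundary so unit tests can stub out reverse-DNS lookups.
private[spark] trait InetAddressUtil {
  def getFullHostName(ipAddress: String): String
}

// Real implementation used outside of tests.
private[spark] object DefaultInetAddressUtil extends InetAddressUtil {
  override def getFullHostName(ipAddress: String): String =
    InetAddress.getByName(ipAddress).getCanonicalHostName
}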

@@ -26,6 +26,7 @@ import org.apache.spark.scheduler.{FakeTask, FakeTaskScheduler, HostTaskLocation

class KubernetesTaskSetManagerSuite extends SparkFunSuite {

SparkContext.clearActiveContext()

Again, should this be done in a before block? Also, can we use a mock SparkContext?

Member Author

Please see the answer above.

@kimoonkim
Member Author

@mccheah Thanks for the review. Addressed comments. PTAL.

@kimoonkim kimoonkim changed the base branch from branch-2.1-kubernetes to branch-2.2-kubernetes July 26, 2017 18:35
@kimoonkim kimoonkim changed the base branch from branch-2.2-kubernetes to branch-2.1-kubernetes July 26, 2017 18:36
@kimoonkim
Member Author

I was trying to retarget this to branch-2.2-kubernetes using the GitHub UI, then realized that would leave too many diffs in this PR.

Maybe I should merge changes from branch-2.2-kubernetes into my branch and then retarget? @foxish Any better suggestion?

@kimoonkim kimoonkim changed the base branch from branch-2.1-kubernetes to branch-2.2-kubernetes August 1, 2017 16:17
@kimoonkim
Member Author

Ok. Retargeted to branch-2.2-kubernetes after rebasing my branch.

@kimoonkim
Member Author

Merged with branch-2.2-kubernetes. While at it, I also incorporated the flag from #412 to avoid the expensive DNS lookup.

@kimoonkim
Member Author

@mccheah Can you please take a look at this PR? Perhaps it's ready to merge after another look.

@kimoonkim
Member Author

@foxish @ash211 @mccheah Thanks for the reviews. Maybe this can be merged now?

@ash211 ash211 merged commit c457f10 into apache-spark-on-k8s:branch-2.2-kubernetes Aug 17, 2017
ifilonenko pushed a commit to ifilonenko/spark that referenced this pull request Feb 26, 2019
puneetloya pushed a commit to puneetloya/spark that referenced this pull request Mar 11, 2019
* Support HDFS rack locality

* Fix unit tests

* Address review comments

* Address some review comments

* Use traits for InetAddress and RackResolver util classes

* Disables expensive DNS lookup by default