This repository has been archived by the owner on Jan 9, 2020. It is now read-only.

Support HDFS rack locality #350

Merged
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.kubernetes

import java.net.InetAddress

/**
* Gets full host names of given IP addresses from DNS.
*/
private[kubernetes] trait InetAddressUtil {

def getFullHostName(ipAddress: String): String
}

private[kubernetes] object InetAddressUtilImpl extends InetAddressUtil {

// NOTE: This does issue a network call to DNS. Caching is done internally by the InetAddress
// class for both hits and misses.
override def getFullHostName(ipAddress: String): String = {
InetAddress.getByName(ipAddress).getCanonicalHostName
}
}
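
A note on the caching mentioned above: InetAddress caches both hits and misses JVM-wide, and the TTLs can be tuned through standard JDK security properties. A minimal sketch, with illustrative TTL values and address:

import java.security.Security

// Set before the first lookup; the JDK networking layer reads these properties
// when its address cache is initialized.
Security.setProperty("networkaddress.cache.ttl", "60")           // successful lookups: 60s
Security.setProperty("networkaddress.cache.negative.ttl", "10")  // failed lookups: 10s

// The first call issues a reverse DNS query; repeat calls within the TTL
// are served from the cache.
val fqdn = InetAddressUtilImpl.getFullHostName("10.0.0.5")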
@@ -16,12 +16,59 @@
*/
package org.apache.spark.scheduler.cluster.kubernetes

import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, TaskSet, TaskSetManager}
import org.apache.spark.util.Utils
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{TaskSchedulerImpl, TaskSet, TaskSetManager}

private[spark] class KubernetesTaskSchedulerImpl(sc: SparkContext) extends TaskSchedulerImpl(sc) {
private[spark] class KubernetesTaskSchedulerImpl(
sc: SparkContext,
rackResolverUtil: RackResolverUtil,
inetAddressUtil: InetAddressUtil = InetAddressUtilImpl) extends TaskSchedulerImpl(sc) {

var kubernetesSchedulerBackend: KubernetesClusterSchedulerBackend = null

def this(sc: SparkContext) = this(sc, new RackResolverUtilImpl(sc.hadoopConfiguration))

override def initialize(backend: SchedulerBackend): Unit = {
super.initialize(backend)
kubernetesSchedulerBackend = this.backend.asInstanceOf[KubernetesClusterSchedulerBackend]
}
override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = {
new KubernetesTaskSetManager(this, taskSet, maxTaskFailures)
}

override def getRackForHost(hostPort: String): Option[String] = {
if (!rackResolverUtil.isConfigured) {
Review comment: nice work on keeping this speedy for non-HDFS users

// Only calls resolver when it is configured to avoid sending DNS queries for cluster nodes.
// See InetAddressUtil for details.
None
} else {
getRackForDatanodeOrExecutor(hostPort)
}
}

private def getRackForDatanodeOrExecutor(hostPort: String): Option[String] = {
val host = Utils.parseHostPort(hostPort)._1
val executorPod = kubernetesSchedulerBackend.getExecutorPodByIP(host)
val hadoopConfiguration = sc.hadoopConfiguration
executorPod.map(
pod => {
val clusterNodeName = pod.getSpec.getNodeName
val rackByNodeName = rackResolverUtil.resolveRack(hadoopConfiguration, clusterNodeName)
rackByNodeName.orElse({
val clusterNodeIP = pod.getStatus.getHostIP
val rackByNodeIP = rackResolverUtil.resolveRack(hadoopConfiguration, clusterNodeIP)
rackByNodeIP.orElse({
if (conf.get(KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED)) {
val clusterNodeFullName = inetAddressUtil.getFullHostName(clusterNodeIP)
rackResolverUtil.resolveRack(hadoopConfiguration, clusterNodeFullName)
} else {
Option.empty
}
})
})
}
).getOrElse(rackResolverUtil.resolveRack(hadoopConfiguration, host))
}
}
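
To summarize the lookup order above, a hedged usage sketch, assuming sc is an active SparkContext and backend is the running KubernetesClusterSchedulerBackend (hosts and ports are illustrative):

val sched = new KubernetesTaskSchedulerImpl(sc)
sched.initialize(backend)

// Executor host: resolution tries the pod's cluster node name first, then the
// node's host IP, then (only if KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED
// is set) the node's full DNS name.
val executorRack: Option[String] = sched.getRackForHost("10.0.1.1:7079")

// Host with no matching executor pod (e.g. an HDFS datanode): resolved directly.
val datanodeRack: Option[String] = sched.getRackForHost("kube-node2:50010")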
@@ -16,8 +16,6 @@
*/
package org.apache.spark.scheduler.cluster.kubernetes

import java.net.InetAddress

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.deploy.kubernetes.config._
@@ -27,7 +25,7 @@ private[spark] class KubernetesTaskSetManager(
sched: TaskSchedulerImpl,
taskSet: TaskSet,
maxTaskFailures: Int,
inetAddressUtil: InetAddressUtil = new InetAddressUtil)
inetAddressUtil: InetAddressUtil = InetAddressUtilImpl)
extends TaskSetManager(sched, taskSet, maxTaskFailures) {

private val conf = sched.sc.conf
@@ -83,12 +81,3 @@ private[spark] class KubernetesTaskSetManager(
}
}

// To support mocks in unit tests.
private[kubernetes] class InetAddressUtil {

// NOTE: This does issue a network call to DNS. Caching is done internally by the InetAddress
// class for both hits and misses.
def getFullHostName(ipAddress: String): String = {
InetAddress.getByName(ipAddress).getCanonicalHostName
}
}
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.kubernetes

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.CommonConfigurationKeysPublic
import org.apache.hadoop.net.{NetworkTopology, ScriptBasedMapping, TableMapping}
import org.apache.hadoop.yarn.util.RackResolver
import org.apache.log4j.{Level, Logger}

/**
* Finds rack names that cluster nodes belong to in order to support HDFS rack locality.
*/
private[kubernetes] trait RackResolverUtil {

def isConfigured() : Boolean

def resolveRack(hadoopConfiguration: Configuration, host: String): Option[String]
}

private[kubernetes] class RackResolverUtilImpl(hadoopConfiguration: Configuration)
extends RackResolverUtil {

val scriptPlugin : String = classOf[ScriptBasedMapping].getCanonicalName
val tablePlugin : String = classOf[TableMapping].getCanonicalName
val isResolverConfigured : Boolean = checkConfigured(hadoopConfiguration)

// RackResolver logs an INFO message whenever it resolves a rack, which is way too often.
if (Logger.getLogger(classOf[RackResolver]).getLevel == null) {
Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN)
}

override def isConfigured() : Boolean = isResolverConfigured

def checkConfigured(hadoopConfiguration: Configuration): Boolean = {
val plugin = hadoopConfiguration.get(
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, scriptPlugin)
val scriptName = hadoopConfiguration.get(
CommonConfigurationKeysPublic.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY, "")
val tableName = hadoopConfiguration.get(
CommonConfigurationKeysPublic.NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, "")
plugin == scriptPlugin && scriptName.nonEmpty ||
plugin == tablePlugin && tableName.nonEmpty ||
plugin != scriptPlugin && plugin != tablePlugin
}

override def resolveRack(hadoopConfiguration: Configuration, host: String): Option[String] = {
val rack = Option(RackResolver.resolve(hadoopConfiguration, host).getNetworkLocation)
if (rack.nonEmpty && rack.get != NetworkTopology.DEFAULT_RACK) {
rack
} else {
None
}
}
}
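
To make checkConfigured concrete: the resolver counts as configured when the script plugin has a script file set, the table plugin has a mapping file set, or any custom mapping class is configured. A sketch of those three cases (file paths and the custom class name are placeholders):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.CommonConfigurationKeysPublic
import org.apache.hadoop.net.TableMapping

// (a) Script-based mapping (the default plugin) with a topology script set.
val scriptConf = new Configuration()
scriptConf.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
  "/etc/hadoop/topology.sh")
assert(new RackResolverUtilImpl(scriptConf).isConfigured())

// (b) Table-based mapping with a mapping file set.
val tableConf = new Configuration()
tableConf.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
  classOf[TableMapping].getCanonicalName)
tableConf.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY,
  "/etc/hadoop/topology.table")
assert(new RackResolverUtilImpl(tableConf).isConfigured())

// (c) Any custom DNSToSwitchMapping implementation counts as configured.
val customConf = new Configuration()
customConf.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
  "com.example.MyTopologyMapping")  // hypothetical class
assert(new RackResolverUtilImpl(customConf).isConfigured())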
@@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.kubernetes

import io.fabric8.kubernetes.api.model.{Pod, PodSpec, PodStatus}
import org.mockito.Matchers._
import org.mockito.Mockito._
import org.apache.spark.{SparkContext, SparkFunSuite}
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.scheduler.FakeTask
import org.scalatest.BeforeAndAfter

class KubernetesTaskSchedulerImplSuite extends SparkFunSuite with BeforeAndAfter {

SparkContext.clearActiveContext()
val sc = new SparkContext("local", "test")
val backend = mock(classOf[KubernetesClusterSchedulerBackend])

before {
sc.conf.remove(KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED)
}

test("Create a k8s task set manager") {
val sched = new KubernetesTaskSchedulerImpl(sc)
sched.kubernetesSchedulerBackend = backend
val taskSet = FakeTask.createTaskSet(0)

val manager = sched.createTaskSetManager(taskSet, maxTaskFailures = 3)
assert(manager.isInstanceOf[KubernetesTaskSetManager])
}

test("Gets racks for datanodes") {
val rackResolverUtil = mock(classOf[RackResolverUtil])
@mccheah mccheah Jul 14, 2017

It seems unusual to me to be mocking the RackResolverUtil, since it is effectively part of the core KubernetesTaskSchedulerImpl (RackResolverUtil is a nested private class).

If we indeed want to test these separately then the architecture should reflect that:

  • Extract RackResolverUtil to another class,
  • Put a trait on top of the RackResolverUtil,
  • Inject an instance into the KubernetesTaskSchedulerImpl when we create it,
  • Write a separate test for RackResolverUtil.

If we don't want to test these separately then we should create a real RackResolverUtil and test the top-level methods accordingly.

Member Author

Good point. It started as a pure wrapper of RackResolver, then I ended up adding a bit of business logic. I like the suggestion, but I'll have to think about this a little bit.

Member Author

Fixed. New code looks better. Thanks.
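
The suggested shape, as a minimal sketch matching what this diff ends up doing: extract the collaborator behind a trait, inject it through the constructor, and hand tests a Mockito mock (names mirror the surrounding code):

import org.apache.hadoop.conf.Configuration
import org.mockito.Mockito.{mock, when}

// Trait over the collaborator so a test double can stand in for it.
trait RackResolverUtil {
  def isConfigured(): Boolean
  def resolveRack(conf: Configuration, host: String): Option[String]
}

// Tests inject a mock rather than running a real topology script:
val resolver = mock(classOf[RackResolverUtil])
when(resolver.isConfigured).thenReturn(true)
val sched = new KubernetesTaskSchedulerImpl(sc, resolver)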

when(rackResolverUtil.isConfigured).thenReturn(true)
when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "kube-node1"))
.thenReturn(Option("/rack1"))
when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "kube-node2"))
.thenReturn(Option("/rack2"))
val sched = new KubernetesTaskSchedulerImpl(sc, rackResolverUtil)
sched.kubernetesSchedulerBackend = backend
when(backend.getExecutorPodByIP("kube-node1")).thenReturn(None)
when(backend.getExecutorPodByIP("kube-node2")).thenReturn(None)

assert(sched.getRackForHost("kube-node1:60010") == Option("/rack1"))
assert(sched.getRackForHost("kube-node2:60010") == Option("/rack2"))
}

test("Gets racks for executor pods") {
sc.conf.set(KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED, true)
val rackResolverUtil = mock(classOf[RackResolverUtil])
when(rackResolverUtil.isConfigured).thenReturn(true)
when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "kube-node1"))
.thenReturn(Option("/rack1"))
when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "kube-node2.mydomain.com"))
.thenReturn(Option("/rack2"))
when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "kube-node2"))
.thenReturn(None)
when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "192.168.1.5"))
.thenReturn(None)
val inetAddressUtil = mock(classOf[InetAddressUtil])
Review comment: Any reason this can't be a real InetAddressUtil?

If it can't be, then move InetAddressUtil to its own file and place a trait over it. The real InetAddressUtil could probably be an object that extends the trait in this case.

Member Author

I wanted to mock it so unit tests don't call real DNS and potentially get influenced by the responses. The trait approach sounds good; I'll probably try it in the next patch.

Member Author

Fixed.

val sched = new KubernetesTaskSchedulerImpl(sc, rackResolverUtil, inetAddressUtil)
sched.kubernetesSchedulerBackend = backend

val spec1 = mock(classOf[PodSpec])
when(spec1.getNodeName).thenReturn("kube-node1")
val status1 = mock(classOf[PodStatus])
when(status1.getHostIP).thenReturn("192.168.1.4")
val pod1 = mock(classOf[Pod])
when(pod1.getSpec).thenReturn(spec1)
when(pod1.getStatus).thenReturn(status1)
when(backend.getExecutorPodByIP("10.0.0.1")).thenReturn(Some(pod1))

val spec2 = mock(classOf[PodSpec])
when(spec2.getNodeName).thenReturn("kube-node2")
val status2 = mock(classOf[PodStatus])
when(status2.getHostIP).thenReturn("192.168.1.5")
val pod2 = mock(classOf[Pod])
when(pod2.getSpec).thenReturn(spec2)
when(pod2.getStatus).thenReturn(status2)
when(inetAddressUtil.getFullHostName("192.168.1.5")).thenReturn("kube-node2.mydomain.com")
when(backend.getExecutorPodByIP("10.0.1.1")).thenReturn(Some(pod2))

assert(sched.getRackForHost("10.0.0.1:7079") == Option("/rack1"))
assert(sched.getRackForHost("10.0.1.1:7079") == Option("/rack2"))

verify(inetAddressUtil, times(1)).getFullHostName(anyString())
}

test("Gets racks for executor pods while disabling DNS lookup ") {
sc.conf.set(KUBERNETES_DRIVER_CLUSTER_NODENAME_DNS_LOOKUP_ENABLED, false)
val rackResolverUtil = mock(classOf[RackResolverUtil])
when(rackResolverUtil.isConfigured).thenReturn(true)
when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "kube-node1"))
.thenReturn(Option("/rack1"))
when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "kube-node2.mydomain.com"))
.thenReturn(Option("/rack2"))
when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "kube-node2"))
.thenReturn(None)
when(rackResolverUtil.resolveRack(sc.hadoopConfiguration, "192.168.1.5"))
.thenReturn(None)
val inetAddressUtil = mock(classOf[InetAddressUtil])
val sched = new KubernetesTaskSchedulerImpl(sc, rackResolverUtil, inetAddressUtil)
sched.kubernetesSchedulerBackend = backend

val spec1 = mock(classOf[PodSpec])
when(spec1.getNodeName).thenReturn("kube-node1")
val status1 = mock(classOf[PodStatus])
when(status1.getHostIP).thenReturn("192.168.1.4")
val pod1 = mock(classOf[Pod])
when(pod1.getSpec).thenReturn(spec1)
when(pod1.getStatus).thenReturn(status1)
when(backend.getExecutorPodByIP("10.0.0.1")).thenReturn(Some(pod1))

val spec2 = mock(classOf[PodSpec])
when(spec2.getNodeName).thenReturn("kube-node2")
val status2 = mock(classOf[PodStatus])
when(status2.getHostIP).thenReturn("192.168.1.5")
val pod2 = mock(classOf[Pod])
when(pod2.getSpec).thenReturn(spec2)
when(pod2.getStatus).thenReturn(status2)
when(inetAddressUtil.getFullHostName("192.168.1.5")).thenReturn("kube-node2.mydomain.com")
when(backend.getExecutorPodByIP("10.0.1.1")).thenReturn(Some(pod2))

assert(sched.getRackForHost("10.0.0.1:7079") == Option("/rack1"))
assert(sched.getRackForHost("10.0.1.1:7079") == None)

verify(inetAddressUtil, never).getFullHostName(anyString())
}

test("Does not get racks if plugin is not configured") {
val rackResolverUtil = mock(classOf[RackResolverUtil])
when(rackResolverUtil.isConfigured()).thenReturn(false)
val sched = new KubernetesTaskSchedulerImpl(sc, rackResolverUtil)
sched.kubernetesSchedulerBackend = backend
when(backend.getExecutorPodByIP("kube-node1")).thenReturn(None)

val spec1 = mock(classOf[PodSpec])
when(spec1.getNodeName).thenReturn("kube-node1")
val status1 = mock(classOf[PodStatus])
when(status1.getHostIP).thenReturn("192.168.1.4")
val pod1 = mock(classOf[Pod])
when(pod1.getSpec).thenReturn(spec1)
when(pod1.getStatus).thenReturn(status1)
when(backend.getExecutorPodByIP("10.0.0.1")).thenReturn(Some(pod1))

assert(sched.getRackForHost("kube-node1:60010").isEmpty)
assert(sched.getRackForHost("10.0.0.1:7079").isEmpty)
}
}
@@ -28,6 +28,7 @@ import org.apache.spark.scheduler.{FakeTask, FakeTaskScheduler, HostTaskLocation

class KubernetesTaskSetManagerSuite extends SparkFunSuite with BeforeAndAfter {

SparkContext.clearActiveContext()
val sc = new SparkContext("local", "test")
val sched = new FakeTaskScheduler(sc,
("execA", "10.0.0.1"), ("execB", "10.0.0.2"), ("execC", "10.0.0.3"))