From 2d01ac30bce82c41bdee756c57c13b395d324261 Mon Sep 17 00:00:00 2001 From: Stavros Kontopoulos Date: Fri, 29 Sep 2017 14:49:23 +0300 Subject: [PATCH] fix dynamic reservations --- dev/deps/spark-deps-hadoop-2.6 | 2 +- dev/deps/spark-deps-hadoop-2.7 | 2 +- resource-managers/mesos/pom.xml | 2 +- .../cluster/mesos/MesosClusterScheduler.scala | 1 - .../MesosCoarseGrainedSchedulerBackend.scala | 17 +++- .../cluster/mesos/MesosSchedulerUtils.scala | 99 ++++++++++++------- 6 files changed, 80 insertions(+), 43 deletions(-) diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6 index 76fcbd15869f1..c98e197606887 100644 --- a/dev/deps/spark-deps-hadoop-2.6 +++ b/dev/deps/spark-deps-hadoop-2.6 @@ -138,7 +138,7 @@ lz4-java-1.4.0.jar machinist_2.11-0.6.1.jar macro-compat_2.11-1.1.1.jar mail-1.4.7.jar -mesos-1.3.0-shaded-protobuf.jar +mesos-1.4.0-shaded-protobuf.jar metrics-core-3.1.5.jar metrics-graphite-3.1.5.jar metrics-json-3.1.5.jar diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7 index cb20072bf8b30..9760a8e7e2b12 100644 --- a/dev/deps/spark-deps-hadoop-2.7 +++ b/dev/deps/spark-deps-hadoop-2.7 @@ -139,7 +139,7 @@ lz4-java-1.4.0.jar machinist_2.11-0.6.1.jar macro-compat_2.11-1.1.1.jar mail-1.4.7.jar -mesos-1.3.0-shaded-protobuf.jar +mesos-1.4.0-shaded-protobuf.jar metrics-core-3.1.5.jar metrics-graphite-3.1.5.jar metrics-json-3.1.5.jar diff --git a/resource-managers/mesos/pom.xml b/resource-managers/mesos/pom.xml index de8f1c913651d..70d0c1750b14e 100644 --- a/resource-managers/mesos/pom.xml +++ b/resource-managers/mesos/pom.xml @@ -29,7 +29,7 @@ Spark Project Mesos mesos - 1.3.0 + 1.4.0 shaded-protobuf diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala index ec533f91474f2..06ea9df586473 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala @@ -37,7 +37,6 @@ import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KillSubmissionRes import org.apache.spark.metrics.MetricsSystem import org.apache.spark.util.Utils - /** * Tracks the current state of a Mesos Task that runs a Spark driver. 
* @param driverDescription Submitted driver description from diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index 26699873145b4..43062007afccd 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -349,13 +349,20 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( val offerMem = getResource(offer.getResourcesList, "mem") val offerCpus = getResource(offer.getResourcesList, "cpus") val offerPorts = getRangeResource(offer.getResourcesList, "ports") + val offerReservationInfo = offer + .getResourcesList + .asScala + .find { r => r.getReservation != null } val id = offer.getId.getValue if (tasks.contains(offer.getId)) { // accept val offerTasks = tasks(offer.getId) logDebug(s"Accepting offer: $id with attributes: $offerAttributes " + - s"mem: $offerMem cpu: $offerCpus ports: $offerPorts." + + offerReservationInfo.map(resInfo => + s"reservation info: ${resInfo.getReservation.toString}").getOrElse("") + + s"mem: $offerMem cpu: $offerCpus ports: $offerPorts " + + s"resources: ${offer.getResourcesList.asScala.mkString(",")}." + s" Launching ${offerTasks.size} Mesos tasks.") for (task <- offerTasks) { @@ -365,7 +372,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( val ports = getRangeResource(task.getResourcesList, "ports").mkString(",") logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus" + - s" ports: $ports") + s" ports: $ports" + s" on slave with slave id: ${task.getSlaveId.getValue} ") } driver.launchTasks( @@ -380,7 +387,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( } else { declineOffer( driver, - offer) + offer, + Some("Offer was declined due to unmet task launch constraints.")) } } } @@ -446,6 +454,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( totalGpusAcquired += taskGPUs gpusByTaskId(taskId) = taskGPUs } + } else { + logDebug(s"Cannot launch a task for offer with id: $offerId on slave " + + s"with id: $slaveId. 
Requirements were not met for this offer.") } } } diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala index 6fcb30af8a733..e75450369ad85 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala @@ -28,7 +28,8 @@ import com.google.common.base.Splitter import org.apache.mesos.{MesosSchedulerDriver, Protos, Scheduler, SchedulerDriver} import org.apache.mesos.Protos.{TaskState => MesosTaskState, _} import org.apache.mesos.Protos.FrameworkInfo.Capability -import org.apache.mesos.protobuf.{ByteString, GeneratedMessage} +import org.apache.mesos.Protos.Resource.ReservationInfo +import org.apache.mesos.protobuf.{ByteString, GeneratedMessageV3} import org.apache.spark.{SparkConf, SparkContext, SparkException} import org.apache.spark.TaskState @@ -36,8 +37,6 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.util.Utils - - /** * Shared trait for implementing a Mesos Scheduler. This holds common state and helper * methods and Mesos scheduler will use. @@ -46,6 +45,8 @@ trait MesosSchedulerUtils extends Logging { // Lock used to wait for scheduler to be registered private final val registerLatch = new CountDownLatch(1) + private final val ANY_ROLE = "*" + /** * Creates a new MesosSchedulerDriver that communicates to the Mesos master. * @@ -175,17 +176,36 @@ trait MesosSchedulerUtils extends Logging { registerLatch.countDown() } - def createResource(name: String, amount: Double, role: Option[String] = None): Resource = { + private def setReservationInfo( + reservationInfo: Option[ReservationInfo], + role: Option[String], + builder: Resource.Builder): Unit = { + if (!role.contains(ANY_ROLE)) { + reservationInfo.foreach { res => builder.setReservation(res) } + } + } + + def createResource( + name: String, + amount: Double, + role: Option[String] = None, + reservationInfo: Option[ReservationInfo] = None): Resource = { val builder = Resource.newBuilder() .setName(name) .setType(Value.Type.SCALAR) .setScalar(Value.Scalar.newBuilder().setValue(amount).build()) - role.foreach { r => builder.setRole(r) } - + setReservationInfo(reservationInfo, role, builder) builder.build() } + private def getReservation(resource: Resource): Option[ReservationInfo] = { + if (resource.hasReservation) { + Some(resource.getReservation) + } else { + None + } + } /** * Partition the existing set of resources into two groups, those remaining to be * scheduled and those requested to be used for a new task. 
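For context on the change above: when Mesos offers dynamically reserved resources (a role other than "*" with ReservationInfo attached), any resource the framework sends back in a task launch must carry the same reservation, otherwise the master rejects the launch. The new createResource/getReservation/setReservationInfo helpers propagate that information when scalar resources are split between a task and the remaining offer. A minimal standalone sketch of the idea follows; splitReserved and the literal "*" check are illustrative, not code from the patch:

import org.apache.mesos.Protos.{Resource, Value}
import org.apache.mesos.Protos.Resource.ReservationInfo

// Split a scalar offer resource into a "used" part and a "remaining" part,
// keeping any dynamic reservation attached to both halves.
def splitReserved(offered: Resource, want: Double): (Resource, Resource) = {
  val reservation: Option[ReservationInfo] =
    if (offered.hasReservation) Some(offered.getReservation) else None
  val used = math.min(want, offered.getScalar.getValue)

  def build(amount: Double): Resource = {
    val builder = Resource.newBuilder()
      .setName(offered.getName)
      .setType(Value.Type.SCALAR)
      .setScalar(Value.Scalar.newBuilder().setValue(amount).build())
      .setRole(offered.getRole)
    // Reservation info is only meaningful for resources offered under a
    // non-default role, so skip it for "*".
    if (offered.getRole != "*") {
      reservation.foreach { r => builder.setReservation(r) }
    }
    builder.build()
  }

  (build(used), build(offered.getScalar.getValue - used))
}
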
@@ -203,14 +223,17 @@ trait MesosSchedulerUtils extends Logging { var requestedResources = new ArrayBuffer[Resource] val remainingResources = resources.asScala.map { case r => + val reservation = getReservation(r) if (remain > 0 && r.getType == Value.Type.SCALAR && r.getScalar.getValue > 0.0 && r.getName == resourceName) { val usage = Math.min(remain, r.getScalar.getValue) - requestedResources += createResource(resourceName, usage, Some(r.getRole)) + requestedResources += createResource(resourceName, usage, + Option(r.getRole), reservation) remain -= usage - createResource(resourceName, r.getScalar.getValue - usage, Some(r.getRole)) + createResource(resourceName, r.getScalar.getValue - usage, + Option(r.getRole), reservation) } else { r } @@ -228,16 +251,6 @@ trait MesosSchedulerUtils extends Logging { (attr.getName, attr.getText.getValue.split(',').toSet) } - - /** Build a Mesos resource protobuf object */ - protected def createResource(resourceName: String, quantity: Double): Protos.Resource = { - Resource.newBuilder() - .setName(resourceName) - .setType(Value.Type.SCALAR) - .setScalar(Value.Scalar.newBuilder().setValue(quantity).build()) - .build() - } - /** * Converts the attributes from the resource offer into a Map of name to Attribute Value * The attribute values are the mesos attribute types and they are @@ -245,7 +258,8 @@ trait MesosSchedulerUtils extends Logging { * @param offerAttributes the attributes offered * @return */ - protected def toAttributeMap(offerAttributes: JList[Attribute]): Map[String, GeneratedMessage] = { + protected def toAttributeMap(offerAttributes: JList[Attribute]) + : Map[String, GeneratedMessageV3] = { offerAttributes.asScala.map { attr => val attrValue = attr.getType match { case Value.Type.SCALAR => attr.getScalar @@ -266,7 +280,7 @@ trait MesosSchedulerUtils extends Logging { */ def matchesAttributeRequirements( slaveOfferConstraints: Map[String, Set[String]], - offerAttributes: Map[String, GeneratedMessage]): Boolean = { + offerAttributes: Map[String, GeneratedMessageV3]): Boolean = { slaveOfferConstraints.forall { // offer has the required attribute and subsumes the required values for that attribute case (name, requiredValues) => @@ -427,10 +441,10 @@ trait MesosSchedulerUtils extends Logging { // partition port offers val (resourcesWithoutPorts, portResources) = filterPortResources(offeredResources) - val portsAndRoles = requestedPorts. - map(x => (x, findPortAndGetAssignedRangeRole(x, portResources))) + val portsAndResourceInfo = requestedPorts. + map { x => (x, findPortAndGetAssignedResourceInfo(x, portResources)) } - val assignedPortResources = createResourcesFromPorts(portsAndRoles) + val assignedPortResources = createResourcesFromPorts(portsAndResourceInfo) // ignore non-assigned port resources, they will be declined implicitly by mesos // no need for splitting port resources. @@ -450,16 +464,25 @@ trait MesosSchedulerUtils extends Logging { managedPortNames.map(conf.getLong(_, 0)).filter( _ != 0) } + private case class RoleResourceInfo( + role: String, + resInfo: Option[ReservationInfo]) + /** Creates a mesos resource for a specific port number. 
*/ - private def createResourcesFromPorts(portsAndRoles: List[(Long, String)]) : List[Resource] = { - portsAndRoles.flatMap{ case (port, role) => - createMesosPortResource(List((port, port)), Some(role))} + private def createResourcesFromPorts( + portsAndResourcesInfo: List[(Long, RoleResourceInfo)]) + : List[Resource] = { + portsAndResourcesInfo.flatMap { case (port, rInfo) => + createMesosPortResource(List((port, port)), Option(rInfo.role), rInfo.resInfo)} } /** Helper to create mesos resources for specific port ranges. */ private def createMesosPortResource( ranges: List[(Long, Long)], - role: Option[String] = None): List[Resource] = { + role: Option[String] = None, + reservationInfo: Option[ReservationInfo] = None): List[Resource] = { + // for ranges we are going to use (user defined ports fall in there) create mesos resources + // for each range there is a role associated with it. ranges.map { case (rangeStart, rangeEnd) => val rangeValue = Value.Range.newBuilder() .setBegin(rangeStart) @@ -468,7 +491,8 @@ trait MesosSchedulerUtils extends Logging { .setName("ports") .setType(Value.Type.RANGES) .setRanges(Value.Ranges.newBuilder().addRange(rangeValue)) - role.foreach(r => builder.setRole(r)) + role.foreach { r => builder.setRole(r) } + setReservationInfo(reservationInfo, role, builder) builder.build() } } @@ -477,19 +501,21 @@ trait MesosSchedulerUtils extends Logging { * Helper to assign a port to an offered range and get the latter's role * info to use it later on. */ - private def findPortAndGetAssignedRangeRole(port: Long, portResources: List[Resource]) - : String = { + private def findPortAndGetAssignedResourceInfo(port: Long, portResources: List[Resource]) + : RoleResourceInfo = { val ranges = portResources. - map(resource => - (resource.getRole, resource.getRanges.getRangeList.asScala - .map(r => (r.getBegin, r.getEnd)).toList)) + map { resource => + val reservation = getReservation(resource) + (RoleResourceInfo(resource.getRole, reservation), + resource.getRanges.getRangeList.asScala.map(r => (r.getBegin, r.getEnd)).toList) + } - val rangePortRole = ranges - .find { case (role, rangeList) => rangeList + val rangePortResourceInfo = ranges + .find { case (resourceInfo, rangeList) => rangeList .exists{ case (rangeStart, rangeEnd) => rangeStart <= port & rangeEnd >= port}} // this is safe since we have previously checked about the ranges (see checkPorts method) - rangePortRole.map{ case (role, rangeList) => role}.get + rangePortResourceInfo.map{ case (resourceInfo, rangeList) => resourceInfo}.get } /** Retrieves the port resources from a list of mesos offered resources */ @@ -564,3 +590,4 @@ trait MesosSchedulerUtils extends Logging { } } } +
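The port handling follows the same rule: each requested port is matched to the offered range that contains it, and the role plus any ReservationInfo of that range (the RoleResourceInfo pair above) is copied onto the single-port RANGES resource sent back with the task. A standalone sketch of that step, assuming the role and reservation have already been looked up; portResource is an illustrative name, not a method in the patch:

import org.apache.mesos.Protos.{Resource, Value}
import org.apache.mesos.Protos.Resource.ReservationInfo

// Build a one-port RANGES resource that keeps the role and dynamic
// reservation of the offered range the port was taken from.
def portResource(
    port: Long,
    role: String,
    reservation: Option[ReservationInfo]): Resource = {
  val range = Value.Range.newBuilder().setBegin(port).setEnd(port)
  val builder = Resource.newBuilder()
    .setName("ports")
    .setType(Value.Type.RANGES)
    .setRanges(Value.Ranges.newBuilder().addRange(range))
    .setRole(role)
  // As with scalars, only attach the reservation for non-default roles.
  if (role != "*") {
    reservation.foreach { r => builder.setReservation(r) }
  }
  builder.build()
}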