From f20f1b379f4405bbbe21315ffd8166827132fe64 Mon Sep 17 00:00:00 2001 From: Jongyoul Lee Date: Fri, 21 Nov 2014 09:07:33 +0900 Subject: [PATCH 1/2] [SPARK-4525] MesosSchedulerBackend.resourceOffers cannot decline unused offers from acceptedOffers - Added code for declining unused offers among acceptedOffers - Edited testCase for checking declining unused offers --- .../cluster/mesos/MesosSchedulerBackend.scala | 4 +++ .../mesos/MesosSchedulerBackendSuite.scala | 29 +++++++++++++------ 2 files changed, 24 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index d13795186c48e..7d4fd9389f42b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -257,6 +257,10 @@ private[spark] class MesosSchedulerBackend( d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters) } + for (o <- acceptedOffers if !slaveIdsWithExecutors.contains(o.getSlaveId.getValue)) { + d.declineOffer(o.getId) + } + declinedOffers.foreach(o => d.declineOffer(o.getId)) } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala index bef8d3a58ba63..28197e6a84bbd 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala @@ -25,11 +25,13 @@ import org.apache.mesos.SchedulerDriver import org.apache.mesos.Protos._ import org.scalatest.mock.EasyMockSugar import org.apache.mesos.Protos.Value.Scalar -import org.easymock.{Capture, EasyMock} +import org.easymock.{IAnswer, Capture, EasyMock} import 
java.nio.ByteBuffer import java.util.Collections import java.util import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.util.Random class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with EasyMockSugar { test("mesos resource offer is launching tasks") { @@ -43,8 +45,8 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea .setName("cpus") .setType(Value.Type.SCALAR) .setScalar(Scalar.newBuilder().setValue(cpu)) - builder.setId(OfferID.newBuilder().setValue(id.toString).build()).setFrameworkId(FrameworkID.newBuilder().setValue("f1")) - .setSlaveId(SlaveID.newBuilder().setValue("s1")).setHostname("localhost").build() + builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build()).setFrameworkId(FrameworkID.newBuilder().setValue("f1")) + .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}")).setHostname(s"host${id.toString}").build() } val driver = EasyMock.createMock(classOf[SchedulerDriver]) @@ -61,11 +63,18 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea val minCpu = 4 val offers = new java.util.ArrayList[Offer] offers.add(createOffer(1, minMem, minCpu)) - offers.add(createOffer(1, minMem - 1, minCpu)) + offers.add(createOffer(2, minMem - 1, minCpu)) + offers.add(createOffer(3, minMem, minCpu)) val backend = new MesosSchedulerBackend(taskScheduler, sc, "master") - val workerOffers = Seq(offers.get(0)).map(o => new WorkerOffer( - o.getSlaveId.getValue, - o.getHostname, + val workerOffers = new ArrayBuffer[WorkerOffer](2) + workerOffers.append(new WorkerOffer( + offers.get(0).getSlaveId.getValue, + offers.get(0).getHostname, + 2 + )) + workerOffers.append(new WorkerOffer( + offers.get(2).getSlaveId.getValue, + offers.get(2).getHostname, 2 )) val taskDesc = new TaskDescription(1L, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0))) @@ -79,10 +88,12 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with 
Ea EasyMock.capture(capture), EasyMock.anyObject(classOf[Filters]) ) - ).andReturn(Status.valueOf(1)) - EasyMock.expect(driver.declineOffer(offers.get(1).getId)).andReturn(Status.valueOf(1)) + ).andReturn(Status.valueOf(1)).once + EasyMock.expect(driver.declineOffer(offers.get(1).getId)).andReturn(Status.valueOf(1)).times(1) + EasyMock.expect(driver.declineOffer(offers.get(2).getId)).andReturn(Status.valueOf(1)).times(1) EasyMock.replay(driver) backend.resourceOffers(driver, offers) + EasyMock.verify(driver) assert(capture.getValue.size() == 1) val taskInfo = capture.getValue.iterator().next() assert(taskInfo.getName.equals("n1")) From 63855bf5c9327f863d6535c17e4c1053ed2a3c06 Mon Sep 17 00:00:00 2001 From: Jongyoul Lee Date: Tue, 25 Nov 2014 14:07:10 +0900 Subject: [PATCH 2/2] [SPARK-4525] MesosSchedulerBackend.resourceOffers cannot decline unused offers from acceptedOffers - Fix a case where an unused node cannot be declined when slaveIdsWithExecutors already contains that node. --- .../cluster/mesos/MesosSchedulerBackend.scala | 5 +- .../mesos/MesosSchedulerBackendSuite.scala | 76 ++++++++++++++----- 2 files changed, 60 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index 7d4fd9389f42b..e648ad5958965 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -255,11 +255,10 @@ private[spark] class MesosSchedulerBackend( mesosTasks.foreach { case (slaveId, tasks) => d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters) + acceptedOffers -= slaveIdToOffer(slaveId) } - for (o <- acceptedOffers if !slaveIdsWithExecutors.contains(o.getSlaveId.getValue)) { - d.declineOffer(o.getId) - } + acceptedOffers.foreach(o => 
d.declineOffer(o.getId)) declinedOffers.foreach(o => d.declineOffer(o.getId)) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala index 28197e6a84bbd..d89e5690f75a8 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala @@ -18,37 +18,37 @@ package org.apache.spark.scheduler.mesos import org.scalatest.FunSuite -import org.apache.spark.{scheduler, SparkConf, SparkContext, LocalSparkContext} +import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext} import org.apache.spark.scheduler.{TaskDescription, WorkerOffer, TaskSchedulerImpl} import org.apache.spark.scheduler.cluster.mesos.{MemoryUtils, MesosSchedulerBackend} -import org.apache.mesos.SchedulerDriver +import org.apache.mesos.{Protos, SchedulerDriver} import org.apache.mesos.Protos._ import org.scalatest.mock.EasyMockSugar import org.apache.mesos.Protos.Value.Scalar -import org.easymock.{IAnswer, Capture, EasyMock} +import org.easymock.{Capture, EasyMock} import java.nio.ByteBuffer +import java.util.{ArrayList => JArrayList, List => JList} import java.util.Collections import java.util import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import scala.util.Random class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with EasyMockSugar { - test("mesos resource offer is launching tasks") { - def createOffer(id: Int, mem: Int, cpu: Int) = { - val builder = Offer.newBuilder() - builder.addResourcesBuilder() - .setName("mem") - .setType(Value.Type.SCALAR) - .setScalar(Scalar.newBuilder().setValue(mem)) - builder.addResourcesBuilder() - .setName("cpus") - .setType(Value.Type.SCALAR) - .setScalar(Scalar.newBuilder().setValue(cpu)) - 
builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build()).setFrameworkId(FrameworkID.newBuilder().setValue("f1")) - .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}")).setHostname(s"host${id.toString}").build() - } + def createOffer(id: Int, mem: Int, cpu: Int) = { + val builder = Offer.newBuilder() + builder.addResourcesBuilder() + .setName("mem") + .setType(Value.Type.SCALAR) + .setScalar(Scalar.newBuilder().setValue(mem)) + builder.addResourcesBuilder() + .setName("cpus") + .setType(Value.Type.SCALAR) + .setScalar(Scalar.newBuilder().setValue(cpu)) + builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build()).setFrameworkId(FrameworkID.newBuilder().setValue("f1")) + .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}")).setHostname(s"host${id.toString}").build() + } + test("mesos resource offer is launching tasks") { val driver = EasyMock.createMock(classOf[SchedulerDriver]) val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl]) @@ -102,4 +102,44 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea assert(cpus.getScalar.getValue.equals(2.0)) assert(taskInfo.getSlaveId.getValue.equals("s1")) } + + test("control with multiple task in one slave") { + val driver = EasyMock.createMock(classOf[SchedulerDriver]) + val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl]) + + val sc = EasyMock.createMock(classOf[SparkContext]) + + EasyMock.expect(sc.executorMemory).andReturn(1000).anyTimes + EasyMock.expect(sc.getSparkHome).andReturn(Option("/path")).anyTimes + EasyMock.expect(sc.executorEnvs).andReturn(new mutable.HashMap).anyTimes + EasyMock.expect(sc.conf).andReturn(new SparkConf).anyTimes + EasyMock.replay(sc) + + val minMem = MemoryUtils.calculateTotalMemory(sc).toInt + val offers = new JArrayList[Offer] + offers.add(createOffer(0, minMem, 16)) + + val backend = new MesosSchedulerBackend(taskScheduler, sc, "master") + val workerOffersFirst = new 
ArrayBuffer[WorkerOffer](1) + workerOffersFirst.append(new WorkerOffer(offers.get(0).getSlaveId.getValue, offers.get(0).getHostname, 14)) + val workerOffersSecond = new ArrayBuffer[WorkerOffer](1) + workerOffersSecond.append(new WorkerOffer(offers.get(0).getSlaveId.getValue, offers.get(0).getHostname, 16)) + val taskDesc = new TaskDescription(1L, "s0", "n1", 0, ByteBuffer.wrap(new Array[Byte](0))) + EasyMock.expect(taskScheduler.resourceOffers(EasyMock.eq(workerOffersFirst))).andReturn(Seq(Seq(taskDesc))).once + EasyMock.expect(taskScheduler.resourceOffers(EasyMock.eq(workerOffersSecond))).andReturn(Seq(Nil)).once + EasyMock.expect(taskScheduler.CPUS_PER_TASK).andReturn(2).anyTimes + EasyMock.replay(taskScheduler) + + EasyMock.expect( + driver.launchTasks( + EasyMock.anyObject(classOf[util.Collection[Protos.OfferID]]), + EasyMock.anyObject(classOf[util.Collection[Protos.TaskInfo]]), + EasyMock.anyObject(classOf[Filters])) + ).andReturn(Status.valueOf(1)).once + EasyMock.expect(driver.declineOffer(offers.get(0).getId)).andReturn(Status.valueOf(1)).times(1) + EasyMock.replay(driver) + backend.resourceOffers(driver, offers) + backend.resourceOffers(driver, offers) + EasyMock.verify(driver) + } }