Skip to content

Commit

Permalink
Merge pull request #67 from airbnb/flo_branch
Browse files Browse the repository at this point in the history
[core] Support overrides of resource reqs
  • Loading branch information
andykram committed Jul 15, 2013
2 parents 8a1bde1 + 17c1c39 commit faaa372
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 20 deletions.
14 changes: 7 additions & 7 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@
<name>Twitter Maven2 Repository</name>
<url>http://maven.twttr.com/</url>
</repository>
<repository>
<id>mesosphere-public-repo</id>
<name>Mesosphere Public Snapshot Repo</name>
<url>http://s3.amazonaws.com/mesosphere-maven-public/snapshot/</url>
</repository>
<repository>
<id>libs-folder</id>
<url>file://${basedir}/lib</url>
Expand Down Expand Up @@ -94,7 +99,7 @@
<dependency>
<groupId>org.apache.mesos</groupId>
<artifactId>mesos</artifactId>
<version>0.14.0</version>
<version>0.0.14-SNAPSHOT</version>
</dependency>

<!-- Everything else -->
Expand Down Expand Up @@ -151,12 +156,7 @@
<dependency>
<groupId>com.twitter.common.zookeeper</groupId>
<artifactId>candidate</artifactId>
<version>0.0.31</version>
</dependency>
<dependency>
<groupId>com.twitter.common.zookeeper</groupId>
<artifactId>group</artifactId>
<version>0.0.31</version>
<version>0.0.43</version>
</dependency>

<dependency>
Expand Down
9 changes: 9 additions & 0 deletions src/main/scala/com/airbnb/scheduler/api/JobSerializer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ class JobSerializer extends JsonSerializer[BaseJob] {
json.writeFieldName("lastError")
json.writeString(baseJob.lastError)

json.writeFieldName("cpus")
json.writeNumber(baseJob.cpus)

json.writeFieldName("disk")
json.writeNumber(baseJob.disk)

json.writeFieldName("mem")
json.writeNumber(baseJob.mem)

json.writeFieldName("disabled")
json.writeBoolean(baseJob.disabled)
if (baseJob.isInstanceOf[DependencyBasedJob]) {
Expand Down
16 changes: 14 additions & 2 deletions src/main/scala/com/airbnb/scheduler/api/JobsDeserializer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,18 @@ class JobsDeserializer extends JsonDeserializer[BaseJob] {
if (node.has("lastError") && node.get("lastError") != null) node.get("lastError").asText
else ""

val cpus =
if (node.has("cpus") && node.get("cpus") != null) node.get("cpus").asDouble
else 0

val disks =
if (node.has("disk") && node.get("disk") != null) node.get("disk").asInt
else 0

val mem =
if (node.has("mem") && node.get("mem") != null) node.get("mem").asInt
else 0

var parentList = scala.collection.mutable.ListBuffer[String]()
if (node.has("parents")) {
for (parent <- node.path("parents")) {
Expand All @@ -71,12 +83,12 @@ class JobsDeserializer extends JsonDeserializer[BaseJob] {
new DependencyBasedJob(parents = parentList.toSet,
name = name, command = command, epsilon = epsilon, successCount = successCount, errorCount = errorCount,
executor = executor, executorFlags = executorFlags, retries = retries, owner = owner, lastError = lastError,
lastSuccess = lastSuccess, async = async, disabled = disabled)
lastSuccess = lastSuccess, async = async, cpus = cpus, disk = disks, mem = mem, disabled = disabled)
} else if (node.has("schedule")) {
new ScheduleBasedJob(node.get("schedule").asText, name = name, command = command,
epsilon = epsilon, successCount = successCount, errorCount = errorCount, executor = executor,
executorFlags = executorFlags, retries = retries, owner = owner, lastError = lastError,
lastSuccess = lastSuccess, async = async, disabled = disabled)
lastSuccess = lastSuccess, async = async, cpus = cpus, disk = disks, mem = mem, disabled = disabled)
} else {
throw new IllegalStateException("The job found was neither schedule based nor dependency based.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package com.airbnb.scheduler.api
object PathConstants {
final val iso8601JobPath = "/iso8601"
final val dependentJobPath = "/dependency"
final val infoPath = "/info"

final val jobBasePath = "/"
final val jobPatternPath = "job/{jobName}"
Expand Down
4 changes: 0 additions & 4 deletions src/main/scala/com/airbnb/scheduler/config/MainModule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,11 @@ class MainModule(val config: SchedulerConfiguration) extends AbstractModule {
bind(classOf[JobGraph]).asEagerSingleton()
}

@Inject
@Singleton
@Provides
def provideMesosSchedulerDriverFactory(mesosScheduler: Scheduler, frameworkInfo: FrameworkInfo): MesosDriverFactory =
new MesosDriverFactory(mesosScheduler, frameworkInfo, config)

@Inject
@Singleton
@Provides
def provideTaskScheduler(
Expand All @@ -64,7 +62,6 @@ class MainModule(val config: SchedulerConfiguration) extends AbstractModule {
config.failureRetryDelay)
}

@Inject
@Singleton
@Provides
def provideMailClient(): Option[MailClient] = {
Expand All @@ -79,7 +76,6 @@ class MainModule(val config: SchedulerConfiguration) extends AbstractModule {
}
}

@Inject
@Singleton
@Provides
def provideListeningExecutorService(): ListeningScheduledExecutorService = {
Expand Down
14 changes: 12 additions & 2 deletions src/main/scala/com/airbnb/scheduler/jobs/JobScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import annotation.tailrec
import collection.mutable.ListBuffer
import java.util.concurrent.{Future, Executors}
import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean}
import java.util.logging.Logger
import java.util.logging.{Level, Logger}

import com.airbnb.notification.MailClient
import com.airbnb.scheduler.graph.JobGraph
Expand Down Expand Up @@ -56,7 +56,17 @@ class JobScheduler @Inject()(val scheduleHorizon: Period,

def isLeader: Boolean = leader.get

def getLeader: String = new String(candidate.getLeaderData)
def getLeader: String = {
try {
new String(candidate.getLeaderData.get())
} catch {
case e : Exception => {
log.log(Level.SEVERE, "Error trying to talk to zookeeper. Exiting.", e)
System.exit(1)
null
}
}
}

def sendNotification(job: BaseJob, subject: String, message: Option[String] = None) {
if (!mailClient.isEmpty) {
Expand Down
9 changes: 9 additions & 0 deletions src/main/scala/com/airbnb/scheduler/jobs/Jobs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ trait BaseJob {
def lastSuccess: String = ""
def lastError: String = ""
def async: Boolean = false
def cpus: Double = 0
def disk: Int = 0
def mem: Int = 0
def disabled: Boolean = false
}

Expand All @@ -53,6 +56,9 @@ case class ScheduleBasedJob(
@JsonProperty override val lastSuccess: String = "",
@JsonProperty override val lastError: String = "",
@JsonProperty override val async: Boolean = false,
@JsonProperty override val cpus: Double = 0,
@JsonProperty override val disk: Int = 0,
@JsonProperty override val mem: Int = 0,
@JsonProperty override val disabled: Boolean = false)
extends BaseJob

Expand All @@ -72,5 +78,8 @@ case class DependencyBasedJob(
@JsonProperty override val lastSuccess: String = "",
@JsonProperty override val lastError: String = "",
@JsonProperty override val async: Boolean = false,
@JsonProperty override val cpus: Double = 0,
@JsonProperty override val disk: Int = 0,
@JsonProperty override val mem: Int = 0,
@JsonProperty override val disabled: Boolean = false)
extends BaseJob
19 changes: 14 additions & 5 deletions src/main/scala/com/airbnb/scheduler/mesos/MesosJobFramework.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class MesosJobFramework @Inject()(
offer => buildTask(x, j, offer) }.find(_._1)) match {
case Some((sufficient, taskBuilder, offer)) =>
processTask(x, j, offer, taskBuilder)
getNextTask(offers.filter( x => x.getId != offer.getId))
getNextTask(offers.filter(x => x.getId != offer.getId))
case _ =>
log.warning("No sufficient offers found for task '%s', will append to queue".format(x))
offers.foreach ( offer => mesosDriver.get().declineOffer(offer.getId) )
Expand Down Expand Up @@ -128,6 +128,14 @@ class MesosJobFramework @Inject()(
scheduler.stop()
}

/**
* Builds a task
* @param taskId
* @param job
* @param offer
* @return and returns a tuple containing a boolean indicating if sufficient
* resources where offered, the TaskBuilder and the offer.
*/
def buildTask(taskId: String, job: BaseJob, offer: Offer) : (Boolean, TaskInfo.Builder, Offer) = {
val taskInfoTemplate = MesosUtils.getMesosTaskInfoBuilder(taskId, job)
log.fine("Job %s ready for launch at time: %d".format(taskInfoTemplate.getTaskId.getValue,
Expand All @@ -141,11 +149,11 @@ class MesosJobFramework @Inject()(
case Value.Type.SCALAR =>
(x.getName match {
case "mem" =>
config.mesosTaskMem
if (job.mem == 0) config.mesosTaskMem else job.mem
case "cpus" =>
config.mesosTaskCpu
if (job.cpus == 0) config.mesosTaskCpu else job.cpus
case "disk" =>
config.mesosTaskDisk
if (job.disk == 0) config.mesosTaskDisk else job.disk
case _ =>
x.getScalar.getValue / math.max(x.getScalar.getValue, 1)
}) match {
Expand All @@ -155,7 +163,8 @@ class MesosJobFramework @Inject()(
Protos.Value.Scalar.newBuilder()
.setValue(value)).setName(x.getName))
case value =>
log.warning("Insufficient offer, needed %s offered %s: ".format(value.toString, x.getScalar.getValue.toString) + offer)
log.warning("Insufficient offer, needed %s offered %s: "
.format(value.toString, x.getScalar.getValue.toString) + offer)
sufficient = false
}
case _ =>
Expand Down

0 comments on commit faaa372

Please sign in to comment.