Skip to content

Commit

Permalink
Treat a message 13 as preemptible when the VM is preemptible (#3162)
Browse files Browse the repository at this point in the history
* Treat a message 13 as preemptible when the VM is preemptible
  • Loading branch information
geoffjentry committed Jan 19, 2018
1 parent dc5e8ce commit 2347c14
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,17 @@ object RunStatus {
eventList: Seq[ExecutionEvent],
machineType: Option[String],
zone: Option[String],
instanceName: Option[String]): UnsuccessfulRunStatus = {
instanceName: Option[String],
wasPreemptible: Boolean): UnsuccessfulRunStatus = {
val jesCode: Option[Int] = errorMessage flatMap { em => Try(em.substring(0, em.indexOf(':')).toInt).toOption }

/*
Because of Reasons, errors which aren't indicative of preemptions are sometimes treated as preemptions. The belief
is that PAPI v2 will get rid of this, so we should look into removing this special-casing in the future.
*/
val unsuccessfulStatusBuilder = errorCode match {
case Status.ABORTED if jesCode.contains(JesAsyncBackendJobExecutionActor.JesPreemption) => Preempted.apply _
case Status.ABORTED if jesCode.contains(JesAsyncBackendJobExecutionActor.JesUnexpectedTermination) && wasPreemptible => Preempted.apply _
case Status.UNKNOWN if errorMessage.exists(_.contains(JesAsyncBackendJobExecutionActor.FailedToStartDueToPreemptionSubstring)) => Preempted.apply _
case Status.CANCELLED => Cancelled.apply _
case _ => Failed.apply _
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,25 @@ private[statuspolling] object StatusPolling {
runtimeMetadata <- op.getMetadata.asScala.get("runtimeMetadata")
computeEngine <- runtimeMetadata.asInstanceOf[GArrayMap[String, Object]].asScala.get("computeEngine")
} yield computeEngine.asInstanceOf[GArrayMap[String, String]].asScala

lazy val machineType = computeEngineOption.flatMap(_.get("machineType"))
lazy val instanceName = computeEngineOption.flatMap(_.get("instanceName"))
lazy val zone = computeEngineOption.flatMap(_.get("zone"))

// Whether the operation's request asked for a preemptible VM. The flag is read from the
// operation metadata by walking request -> pipelineArgs -> resources -> preemptible; if any
// level of that path is absent the result is false.
val preemptible = {
  // Looks up `key` in a metadata node, treating the node as a nested string-keyed map.
  def child(node: Any, key: String): Option[Object] =
    node.asInstanceOf[GArrayMap[String, Object]].asScala.get(key)

  val preemptibleFlag =
    op.getMetadata.asScala.get("request")
      .flatMap(request => child(request, "pipelineArgs"))
      .flatMap(pipelineArgs => child(pipelineArgs, "resources"))
      .flatMap(resources => child(resources, "preemptible"))
      .map(flag => flag.asInstanceOf[Boolean])

  preemptibleFlag.getOrElse(false)
}

// If there's an error, generate an unsuccessful status. Otherwise, we were successful!
Option(op.getError) match {
case Some(error) =>
val errorCode = Status.fromCodeValue(error.getCode)
UnsuccessfulRunStatus(errorCode, Option(error.getMessage), eventList, machineType, zone, instanceName)
UnsuccessfulRunStatus(errorCode, Option(error.getMessage), eventList, machineType, zone, instanceName, preemptible)
case None => Success(eventList, machineType, zone, instanceName)
}
} else if (op.hasStarted) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ class JesAsyncBackendJobExecutionActorSpec extends TestKitSuite("JesAsyncBackend

private def runAndFail(previousPreemptions: Int, previousUnexpectedRetries: Int, preemptible: Int, errorCode: Status, innerErrorMessage: String, expectPreemptible: Boolean): BackendJobExecutionResponse = {

val runStatus = UnsuccessfulRunStatus(errorCode, Option(innerErrorMessage), Seq.empty, Option("fakeMachine"), Option("fakeZone"), Option("fakeInstance"))
val runStatus = UnsuccessfulRunStatus(errorCode, Option(innerErrorMessage), Seq.empty, Option("fakeMachine"), Option("fakeZone"), Option("fakeInstance"), expectPreemptible)
val statusPoller = TestProbe()

val promise = Promise[BackendJobExecutionResponse]()
Expand Down Expand Up @@ -244,10 +244,13 @@ class JesAsyncBackendJobExecutionActorSpec extends TestKitSuite("JesAsyncBackend
(0, 0, 0, Status.ABORTED, "15: other error", false, false),
(0, 0, 0, Status.OUT_OF_RANGE, "13: unexpected error", false, false),
(0, 0, 0, Status.OUT_OF_RANGE, "14: test error msg", false, false),
// These commented out tests should be uncommented if/when we stop mapping 13 to 14 in preemption mode
// 1 preemptible attempt allowed, but not all failures represent preemptions.
// (0, 0, 1, Status.ABORTED, "13: retryable error", true, true),
// (0, 1, 1, Status.ABORTED, "13: retryable error", true, true),
// (0, 2, 1, Status.ABORTED, "13: retryable error", true, false),
// The following 13-based tests should be removed if/when we stop mapping 13 to 14 in preemption mode
(0, 0, 1, Status.ABORTED, "13: retryable error", true, true),
(0, 1, 1, Status.ABORTED, "13: retryable error", true, true),
(0, 2, 1, Status.ABORTED, "13: retryable error", true, false),
(0, 0, 1, Status.ABORTED, "14: preempted", true, true),
(0, 0, 1, Status.UNKNOWN, "Instance failed to start due to preemption.", true, true),
(0, 0, 1, Status.ABORTED, "15: other error", true, false),
Expand All @@ -263,7 +266,7 @@ class JesAsyncBackendJobExecutionActorSpec extends TestKitSuite("JesAsyncBackend
(1, 0, 1, Status.ABORTED, "15: other error", false, false),
(1, 0, 1, Status.OUT_OF_RANGE, "13: retryable error", false, false),
(1, 0, 1, Status.OUT_OF_RANGE, "14: preempted", false, false),
(1, 0, 1, Status.OUT_OF_RANGE, "Instance failed to start due to preemption.", false, false),
(1, 0, 1, Status.OUT_OF_RANGE, "Instance failed to start due to preemption.", false, false)
)

expectations foreach { case (previousPreemptions, previousUnexpectedRetries, preemptible, errorCode, innerErrorMessage, shouldBePreemptible, shouldRetry) =>
Expand All @@ -286,7 +289,7 @@ class JesAsyncBackendJobExecutionActorSpec extends TestKitSuite("JesAsyncBackend
val runId = StandardAsyncJob(UUID.randomUUID().toString)
val handle = new JesPendingExecutionHandle(null, runId, None, None)

val failedStatus = UnsuccessfulRunStatus(Status.ABORTED, Option("14: VM XXX shut down unexpectedly."), Seq.empty, Option("fakeMachine"), Option("fakeZone"), Option("fakeInstance"))
val failedStatus = UnsuccessfulRunStatus(Status.ABORTED, Option("14: VM XXX shut down unexpectedly."), Seq.empty, Option("fakeMachine"), Option("fakeZone"), Option("fakeInstance"), true)
val executionResult = jesBackend.handleExecutionResult(failedStatus, handle)
val result = Await.result(executionResult, timeout)
result.isInstanceOf[FailedNonRetryableExecutionHandle] shouldBe true
Expand All @@ -300,7 +303,7 @@ class JesAsyncBackendJobExecutionActorSpec extends TestKitSuite("JesAsyncBackend
val runId = StandardAsyncJob(UUID.randomUUID().toString)
val handle = new JesPendingExecutionHandle(null, runId, None, None)

val failedStatus = UnsuccessfulRunStatus(Status.ABORTED, Option("14: VM XXX shut down unexpectedly."), Seq.empty, Option("fakeMachine"), Option("fakeZone"), Option("fakeInstance"))
val failedStatus = UnsuccessfulRunStatus(Status.ABORTED, Option("14: VM XXX shut down unexpectedly."), Seq.empty, Option("fakeMachine"), Option("fakeZone"), Option("fakeInstance"), true)
val executionResult = jesBackend.handleExecutionResult(failedStatus, handle)
val result = Await.result(executionResult, timeout)
result.isInstanceOf[FailedRetryableExecutionHandle] shouldBe true
Expand All @@ -315,7 +318,22 @@ class JesAsyncBackendJobExecutionActorSpec extends TestKitSuite("JesAsyncBackend
val runId = StandardAsyncJob(UUID.randomUUID().toString)
val handle = new JesPendingExecutionHandle(null, runId, None, None)

val failedStatus = UnsuccessfulRunStatus(Status.ABORTED, Option("14: VM XXX shut down unexpectedly."), Seq.empty, Option("fakeMachine"), Option("fakeZone"), Option("fakeInstance"))
val failedStatus = UnsuccessfulRunStatus(Status.ABORTED, Option("14: VM XXX shut down unexpectedly."), Seq.empty, Option("fakeMachine"), Option("fakeZone"), Option("fakeInstance"), true)
val executionResult = jesBackend.handleExecutionResult(failedStatus, handle)
val result = Await.result(executionResult, timeout)
result.isInstanceOf[FailedRetryableExecutionHandle] shouldBe true
val retryableHandle = result.asInstanceOf[FailedRetryableExecutionHandle]
retryableHandle.returnCode shouldBe None
retryableHandle.throwable.getMessage should include("will be restarted with another preemptible VM")
}

it should "treat a JES message 13 as preemptible if the VM was preemptible" in {
val actorRef = buildPreemptibleTestActorRef(1, 2)
val jesBackend = actorRef.underlyingActor
val runId = StandardAsyncJob(UUID.randomUUID().toString)
val handle = new JesPendingExecutionHandle(null, runId, None, None)

val failedStatus = UnsuccessfulRunStatus(Status.ABORTED, Option("13: Retryable Error."), Seq.empty, Option("fakeMachine"), Option("fakeZone"), Option("fakeInstance"), true)
val executionResult = jesBackend.handleExecutionResult(failedStatus, handle)
val result = Await.result(executionResult, timeout)
result.isInstanceOf[FailedRetryableExecutionHandle] shouldBe true
Expand All @@ -331,7 +349,7 @@ class JesAsyncBackendJobExecutionActorSpec extends TestKitSuite("JesAsyncBackend
val handle = new JesPendingExecutionHandle(null, runId, None, None)

def checkFailedResult(errorCode: Status, errorMessage: Option[String]): ExecutionHandle = {
val failed = UnsuccessfulRunStatus(errorCode, errorMessage, Seq.empty, Option("fakeMachine"), Option("fakeZone"), Option("fakeInstance"))
val failed = UnsuccessfulRunStatus(errorCode, errorMessage, Seq.empty, Option("fakeMachine"), Option("fakeZone"), Option("fakeInstance"), true)
Await.result(jesBackend.handleExecutionResult(failed, handle), timeout)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class JesPollingActorSpec extends TestKitSuite("JesPollingActor") with FlatSpecL
jpActor.underlyingActor.callbackResponses :+= CallbackFailure

val successStatus = RunStatus.Success(Seq.empty[ExecutionEvent], None, None, None)
val failureStatus = RunStatus.UnsuccessfulRunStatus(Status.UNIMPLEMENTED, Option.empty[String], Seq.empty[ExecutionEvent], None, None, None)
val failureStatus = RunStatus.UnsuccessfulRunStatus(Status.UNIMPLEMENTED, Option.empty[String], Seq.empty[ExecutionEvent], None, None, None, false)
jpActor.underlyingActor.operationStatusResponses :+= successStatus
jpActor.underlyingActor.operationStatusResponses :+= failureStatus

Expand Down

0 comments on commit 2347c14

Please sign in to comment.