Skip to content

Commit

Permalink
WX-1625 Refactor ahead of quota retry (#7432)
Browse files Browse the repository at this point in the history
  • Loading branch information
aednichols committed May 15, 2024
1 parent 9db0661 commit 0e21d72
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import cromwell.backend._
import cromwell.backend.async.AsyncBackendJobExecutionActor._
import cromwell.backend.async._
import cromwell.backend.standard.StandardAdHocValue._
import cromwell.backend.standard.retry.memory.MemoryRetryResult
import cromwell.backend.validation._
import cromwell.core.io.{AsyncIoActorClient, DefaultIoCommandBuilder, IoCommandBuilder}
import cromwell.core.path.Path
Expand Down Expand Up @@ -896,6 +897,16 @@ trait StandardAsyncExecutionActor
/**
* Returns true if the status represents a completion.
*
* Select meanings by backend:
* - TES:
* `cromwell.backend.impl.tes.Complete` derived from "state": "COMPLETE"
* - Life Sciences:
* `com.google.api.services.genomics.v2alpha1.model.Operation.getDone` is true
* -- AND --
* `com.google.api.services.genomics.v2alpha1.model.Operation#getError` is empty
* - GCP Batch:
* `com.google.cloud.batch.v1.JobStatus.State` is `SUCCEEDED`
*
* @param runStatus The run status.
* @return True if the job is done.
*/
Expand Down Expand Up @@ -1054,7 +1065,7 @@ trait StandardAsyncExecutionActor
* @return The execution handle.
*/
def retryElseFail(backendExecutionStatus: Future[ExecutionHandle],
retryWithMoreMemory: Boolean = false
memoryRetry: MemoryRetryResult = MemoryRetryResult.none
): Future[ExecutionHandle] =
backendExecutionStatus flatMap {
case failedRetryableOrNonRetryable: FailedExecutionHandle =>
Expand All @@ -1069,35 +1080,48 @@ trait StandardAsyncExecutionActor
case None => Map.empty[String, KvPair]
}

val maxRetriesNotReachedYet = previousFailedRetries < maxRetries
failedRetryableOrNonRetryable match {
case failed: FailedNonRetryableExecutionHandle if maxRetriesNotReachedYet =>
(retryWithMoreMemory, memoryRetryFactor, previousMemoryMultiplier) match {
case (true, Some(retryFactor), Some(previousMultiplier)) =>
val nextMemoryMultiplier = previousMultiplier * retryFactor.value
saveAttrsAndRetry(failed,
kvsFromPreviousAttempt,
kvsForNextAttempt,
incFailedCount = true,
Option(nextMemoryMultiplier)
)
case (true, Some(retryFactor), None) =>
saveAttrsAndRetry(failed,
kvsFromPreviousAttempt,
kvsForNextAttempt,
incFailedCount = true,
Option(retryFactor.value)
)
case (_, _, _) =>
saveAttrsAndRetry(failed, kvsFromPreviousAttempt, kvsForNextAttempt, incFailedCount = true)
}
case failedNonRetryable: FailedNonRetryableExecutionHandle => Future.successful(failedNonRetryable)
case failedNonRetryable: FailedNonRetryableExecutionHandle if previousFailedRetries < maxRetries =>
// The user asked us to retry finitely for them, possibly with a memory modification
evaluateFailureRetry(failedNonRetryable, kvsFromPreviousAttempt, kvsForNextAttempt, memoryRetry)
case failedNonRetryable: FailedNonRetryableExecutionHandle =>
// No reason to retry
Future.successful(failedNonRetryable)
case failedRetryable: FailedRetryableExecutionHandle =>
// Retry infinitely and unconditionally (!)
saveAttrsAndRetry(failedRetryable, kvsFromPreviousAttempt, kvsForNextAttempt, incFailedCount = false)
}
case _ => backendExecutionStatus
}

/**
  * Schedules another attempt for a failed job, scaling the memory multiplier when the
  * previous attempt was detected as out-of-memory and a retry factor is available.
  *
  * The failed-attempt count is incremented in every case.
  *
  * @param handle                 The failed attempt to retry.
  * @param kvsFromPreviousAttempt Key/value pairs carried over from the previous attempt.
  * @param kvsForNextAttempt      Key/value pairs to store for the next attempt.
  * @param memoryRetry            OOM detection result, retry factor and previous multiplier.
  * @return The retryable handle produced by `saveAttrsAndRetry`.
  */
private def evaluateFailureRetry(handle: FailedNonRetryableExecutionHandle,
                                 kvsFromPreviousAttempt: Map[String, KvPair],
                                 kvsForNextAttempt: Map[String, KvPair],
                                 memoryRetry: MemoryRetryResult
): Future[FailedRetryableExecutionHandle] =
  memoryRetry match {
    case MemoryRetryResult(true, Some(retryFactor), previousMultiplier) =>
      // OOM with a factor available: the first memory retry uses the factor as-is,
      // subsequent ones compound it onto the multiplier of the previous attempt.
      val scaledMultiplier = previousMultiplier.fold(retryFactor.value)(_ * retryFactor.value)
      saveAttrsAndRetry(handle,
                        kvsFromPreviousAttempt,
                        kvsForNextAttempt,
                        incFailedCount = true,
                        Option(scaledMultiplier)
      )
    case _ =>
      // No OOM detected, or no factor to apply: retry without passing a memory multiplier
      saveAttrsAndRetry(handle, kvsFromPreviousAttempt, kvsForNextAttempt, incFailedCount = true)
  }

private def saveAttrsAndRetry(failedExecHandle: FailedExecutionHandle,
kvPrev: Map[String, KvPair],
kvNext: Map[String, KvPair],
Expand Down Expand Up @@ -1400,7 +1424,9 @@ trait StandardAsyncExecutionActor
None
)
)
retryElseFail(executionHandle, outOfMemoryDetected)
retryElseFail(executionHandle,
MemoryRetryResult(outOfMemoryDetected, memoryRetryFactor, previousMemoryMultiplier)
)
case Success(returnCodeAsInt) if isAbort(returnCodeAsInt) =>
Future.successful(AbortedExecutionHandle)
case Success(returnCodeAsInt) =>
Expand Down Expand Up @@ -1430,7 +1456,9 @@ trait StandardAsyncExecutionActor
None
)
)
retryElseFail(executionHandle, outOfMemoryDetected)
retryElseFail(executionHandle,
MemoryRetryResult(outOfMemoryDetected, memoryRetryFactor, previousMemoryMultiplier)
)
case _ =>
val failureStatus = handleExecutionFailure(status, tryReturnCodeAsInt.toOption)
retryElseFail(failureStatus)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package cromwell.backend.standard.retry.memory

import common.validation.Validation.MemoryRetryMultiplierRefined

/**
  * Result of evaluating an attempt as a memory-retry candidate, encapsulating instructions
  * for configuring the next attempt if applicable.
  *
  * @param oomDetected        Did the previous attempt OOM?
  * @param factor             User-configured factor
  * @param previousMultiplier Multiplier used for the previous attempt
  */
final case class MemoryRetryResult(oomDetected: Boolean,
                                   factor: Option[MemoryRetryMultiplierRefined],
                                   previousMultiplier: Option[Double]
)

// A plain `object` is the idiomatic companion for a case class; a `case object` companion
// additionally becomes a Product with its own synthesized members, which nothing here needs.
object MemoryRetryResult {

  /**
    * The "nothing to do" result: no OOM detected, no factor and no previous multiplier.
    *
    * Held as a `val` because the value is immutable, so a single shared instance suffices.
    */
  val none: MemoryRetryResult =
    MemoryRetryResult(oomDetected = false, factor = None, previousMultiplier = None)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package cromwell.backend.google.pipelines.common

package object errors {

  /**
    * Substrings that mark a message as quota-related. Matching is a case-sensitive
    * substring check. Stored as a `val` so the list is built once rather than per call.
    */
  private val quotaMessages: List[String] = List(
    "A resource limit has delayed the operation",
    "usage too high",
    "no available zones",
    "resource_exhausted",
    "quota too low"
  )

  /**
    * Tells whether a message contains any of the known quota-related substrings.
    *
    * @param msg The message text to inspect. A `null` message (possible when the value
    *            comes from a Java getter) is treated as not quota-related instead of
    *            throwing a NullPointerException.
    * @return True if `msg` contains at least one of the quota substrings.
    */
  def isQuotaMessage(msg: String): Boolean =
    Option(msg).exists(text => quotaMessages.exists(fragment => text.contains(fragment)))

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import cromwell.backend.google.pipelines.common.api.RunStatus.{
Success,
UnsuccessfulRunStatus
}
import cromwell.backend.google.pipelines.common.errors.isQuotaMessage
import cromwell.backend.google.pipelines.v2beta.PipelinesConversions._
import cromwell.backend.google.pipelines.v2beta.api.Deserialization._
import cromwell.backend.google.pipelines.v2beta.api.request.ErrorReporter._
Expand Down Expand Up @@ -197,16 +198,9 @@ trait GetRequestHandler { this: RequestHandler =>
private def isQuotaDelayed(events: List[Event]): Boolean =
events.sortBy(_.getTimestamp).reverse.headOption match {
case Some(event) =>
quotaMessages.exists(event.getDescription.contains)
isQuotaMessage(event.getDescription)
case None =>
// If the events list is empty, we're not waiting for quota yet
false
}

private val quotaMessages = List(
"A resource limit has delayed the operation",
"usage too high",
"no available zones",
"resource_exhausted"
)
}

0 comments on commit 0e21d72

Please sign in to comment.