diff --git a/backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala b/backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala index bbb796220c4..35649cf1634 100644 --- a/backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala +++ b/backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala @@ -24,6 +24,7 @@ import cromwell.backend._ import cromwell.backend.async.AsyncBackendJobExecutionActor._ import cromwell.backend.async._ import cromwell.backend.standard.StandardAdHocValue._ +import cromwell.backend.standard.retry.memory.MemoryRetryResult import cromwell.backend.validation._ import cromwell.core.io.{AsyncIoActorClient, DefaultIoCommandBuilder, IoCommandBuilder} import cromwell.core.path.Path @@ -896,6 +897,16 @@ trait StandardAsyncExecutionActor /** * Returns true if the status represents a completion. * + * Select meanings by backend: + * - TES: + * `cromwell.backend.impl.tes.Complete` derived from "state": "COMPLETE" + * - Life Sciences: + * `com.google.api.services.genomics.v2alpha1.model.Operation.getDone` is true + * -- AND -- + * `com.google.api.services.genomics.v2alpha1.model.Operation#getError` is empty + * - GCP Batch: + * `com.google.cloud.batch.v1.JobStatus.State` is `SUCCEEDED` + * * @param runStatus The run status. * @return True if the job is done. */ @@ -1054,7 +1065,7 @@ trait StandardAsyncExecutionActor * @return The execution handle. */ def retryElseFail(backendExecutionStatus: Future[ExecutionHandle], - retryWithMoreMemory: Boolean = false + memoryRetry: MemoryRetryResult = MemoryRetryResult.none ): Future[ExecutionHandle] = backendExecutionStatus flatMap { case failedRetryableOrNonRetryable: FailedExecutionHandle => @@ -1069,35 +1080,48 @@ trait StandardAsyncExecutionActor case None => Map.empty[String, KvPair] } - val maxRetriesNotReachedYet = previousFailedRetries < maxRetries failedRetryableOrNonRetryable match { - case failed: FailedNonRetryableExecutionHandle if maxRetriesNotReachedYet => - (retryWithMoreMemory, memoryRetryFactor, previousMemoryMultiplier) match { - case (true, Some(retryFactor), Some(previousMultiplier)) => - val nextMemoryMultiplier = previousMultiplier * retryFactor.value - saveAttrsAndRetry(failed, - kvsFromPreviousAttempt, - kvsForNextAttempt, - incFailedCount = true, - Option(nextMemoryMultiplier) - ) - case (true, Some(retryFactor), None) => - saveAttrsAndRetry(failed, - kvsFromPreviousAttempt, - kvsForNextAttempt, - incFailedCount = true, - Option(retryFactor.value) - ) - case (_, _, _) => - saveAttrsAndRetry(failed, kvsFromPreviousAttempt, kvsForNextAttempt, incFailedCount = true) - } - case failedNonRetryable: FailedNonRetryableExecutionHandle => Future.successful(failedNonRetryable) + case failedNonRetryable: FailedNonRetryableExecutionHandle if previousFailedRetries < maxRetries => + // The user asked us to retry finitely for them, possibly with a memory modification + evaluateFailureRetry(failedNonRetryable, kvsFromPreviousAttempt, kvsForNextAttempt, memoryRetry) + case failedNonRetryable: FailedNonRetryableExecutionHandle => + // No reason to retry + Future.successful(failedNonRetryable) case failedRetryable: FailedRetryableExecutionHandle => + // Retry infinitely and unconditionally (!) saveAttrsAndRetry(failedRetryable, kvsFromPreviousAttempt, kvsForNextAttempt, incFailedCount = false) } case _ => backendExecutionStatus } + private def evaluateFailureRetry(handle: FailedNonRetryableExecutionHandle, + kvsFromPreviousAttempt: Map[String, KvPair], + kvsForNextAttempt: Map[String, KvPair], + memoryRetry: MemoryRetryResult + ): Future[FailedRetryableExecutionHandle] = + (memoryRetry.oomDetected, memoryRetry.factor, memoryRetry.previousMultiplier) match { + case (true, Some(retryFactor), Some(previousMultiplier)) => + // Subsequent memory retry attempt + val nextMemoryMultiplier = previousMultiplier * retryFactor.value + saveAttrsAndRetry(handle, + kvsFromPreviousAttempt, + kvsForNextAttempt, + incFailedCount = true, + Option(nextMemoryMultiplier) + ) + case (true, Some(retryFactor), None) => + // First memory retry attempt + saveAttrsAndRetry(handle, + kvsFromPreviousAttempt, + kvsForNextAttempt, + incFailedCount = true, + Option(retryFactor.value) + ) + case (_, _, _) => + // Not an OOM + saveAttrsAndRetry(handle, kvsFromPreviousAttempt, kvsForNextAttempt, incFailedCount = true) + } + private def saveAttrsAndRetry(failedExecHandle: FailedExecutionHandle, kvPrev: Map[String, KvPair], kvNext: Map[String, KvPair], @@ -1400,7 +1424,9 @@ trait StandardAsyncExecutionActor None ) ) - retryElseFail(executionHandle, outOfMemoryDetected) + retryElseFail(executionHandle, + MemoryRetryResult(outOfMemoryDetected, memoryRetryFactor, previousMemoryMultiplier) + ) case Success(returnCodeAsInt) if isAbort(returnCodeAsInt) => Future.successful(AbortedExecutionHandle) case Success(returnCodeAsInt) => @@ -1430,7 +1456,9 @@ trait StandardAsyncExecutionActor None ) ) - retryElseFail(executionHandle, outOfMemoryDetected) + retryElseFail(executionHandle, + MemoryRetryResult(outOfMemoryDetected, memoryRetryFactor, previousMemoryMultiplier) + ) case _ => val failureStatus = handleExecutionFailure(status, tryReturnCodeAsInt.toOption) retryElseFail(failureStatus) diff --git a/backend/src/main/scala/cromwell/backend/standard/retry/memory/MemoryRetryResult.scala b/backend/src/main/scala/cromwell/backend/standard/retry/memory/MemoryRetryResult.scala new file mode 100644 index 00000000000..51b0f9d096e --- /dev/null +++ b/backend/src/main/scala/cromwell/backend/standard/retry/memory/MemoryRetryResult.scala @@ -0,0 +1,20 @@ +package cromwell.backend.standard.retry.memory + +import common.validation.Validation.MemoryRetryMultiplierRefined + +/** + * Result of evaluating an attempt as a memory-retry candidate, encapsulating instructions + * for configuring the next attempt if applicable. + * + * @param oomDetected Did the previous attempt OOM? + * @param factor User-configured factor + * @param previousMultiplier Multiplier used for the previous attempt + */ +case class MemoryRetryResult(oomDetected: Boolean, + factor: Option[MemoryRetryMultiplierRefined], + previousMultiplier: Option[Double] +) + +case object MemoryRetryResult { + def none: MemoryRetryResult = MemoryRetryResult(oomDetected = false, None, None) +} diff --git a/supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/errors/package.scala b/supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/errors/package.scala new file mode 100644 index 00000000000..ade5f84e00f --- /dev/null +++ b/supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/errors/package.scala @@ -0,0 +1,16 @@ +package cromwell.backend.google.pipelines.common + +package object errors { + + private def quotaMessages = List( + "A resource limit has delayed the operation", + "usage too high", + "no available zones", + "resource_exhausted", + "quota too low" + ) + + def isQuotaMessage(msg: String): Boolean = + quotaMessages.exists(msg.contains) + +} diff --git a/supportedBackends/google/pipelines/v2beta/src/main/scala/cromwell/backend/google/pipelines/v2beta/api/request/GetRequestHandler.scala b/supportedBackends/google/pipelines/v2beta/src/main/scala/cromwell/backend/google/pipelines/v2beta/api/request/GetRequestHandler.scala index 0473aecaa36..e87f3c580c4 100644 --- a/supportedBackends/google/pipelines/v2beta/src/main/scala/cromwell/backend/google/pipelines/v2beta/api/request/GetRequestHandler.scala +++ b/supportedBackends/google/pipelines/v2beta/src/main/scala/cromwell/backend/google/pipelines/v2beta/api/request/GetRequestHandler.scala @@ -17,6 +17,7 @@ import cromwell.backend.google.pipelines.common.api.RunStatus.{ Success, UnsuccessfulRunStatus } +import cromwell.backend.google.pipelines.common.errors.isQuotaMessage import cromwell.backend.google.pipelines.v2beta.PipelinesConversions._ import cromwell.backend.google.pipelines.v2beta.api.Deserialization._ import cromwell.backend.google.pipelines.v2beta.api.request.ErrorReporter._ @@ -197,16 +198,9 @@ trait GetRequestHandler { this: RequestHandler => private def isQuotaDelayed(events: List[Event]): Boolean = events.sortBy(_.getTimestamp).reverse.headOption match { case Some(event) => - quotaMessages.exists(event.getDescription.contains) + isQuotaMessage(event.getDescription) case None => // If the events list is empty, we're not waiting for quota yet false } - - private val quotaMessages = List( - "A resource limit has delayed the operation", - "usage too high", - "no available zones", - "resource_exhausted" - ) }