Commit bc4472d
Merge d0b4a53 into 06a80d0
Horneth committed May 23, 2017
2 parents 06a80d0 + d0b4a53
Showing 6 changed files with 79 additions and 17 deletions.
core/src/main/resources/reference.conf (7 changes: 5 additions & 2 deletions)

@@ -166,11 +166,14 @@ engine {
   # }
   # You will need to provide the engine with a gcs filesystem
   # Note that the default filesystem (local) is always available.
-  #filesystems {
+  filesystems {
   # gcs {
   #   auth = "application-default"
   # }
-  #}
+    local {
+      enabled: true
+    }
+  }
 }

 backend {
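
Taken together, this change activates the filesystems block with the local filesystem explicitly enabled, while gcs stays commented out. A deployment that wants to opt out of the local filesystem could, as a sketch in the same HOCON syntax, override the new key (the false value is a hypothetical example; the commit itself only introduces enabled: true):

engine {
  filesystems {
    local {
      enabled: false
    }
  }
}

The EngineFilesystems change at the bottom of this commit is what reads this flag.
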
core/src/main/scala/cromwell/core/callcaching/docker/registryv2/DockerRegistryV2AbstractFlow.scala

@@ -16,11 +16,14 @@
 import cromwell.core.callcaching.docker.registryv2.flows.{FlowUtils, HttpFlowWithRetry}
 import cromwell.core.callcaching.docker.{DockerFlow, DockerHashResult, DockerImageIdentifierWithoutHash}
 import spray.json._

+import scala.concurrent.duration._
 import scala.concurrent.{ExecutionContext, Future}
+import scala.language.postfixOps
 import scala.util.{Failure, Success, Try}

 object DockerRegistryV2AbstractFlow {
   type HttpDockerFlow = Flow[(HttpRequest, ContextWithRequest[DockerHashContext]), (Try[HttpResponse], ContextWithRequest[DockerHashContext]), NotUsed]
+  val StrictTimeout = 30 seconds

   val DigestHeaderName = "Docker-Content-Digest".toLowerCase
   val AcceptHeader = HttpHeader.parse("Accept", "application/vnd.docker.distribution.manifest.v2+json") match {

@@ -41,7 +44,36 @@ object DockerRegistryV2AbstractFlow {
  */
 abstract class DockerRegistryV2AbstractFlow(httpClientFlow: HttpDockerFlow)(implicit ec: ExecutionContext, materializer: ActorMaterializer, scheduler: Scheduler) extends DockerFlow {
   // Wraps the Http flow in a retryable flow to enable auto retries
-  final private val httpFlowWithRetry = new HttpFlowWithRetry[DockerHashContext](httpClientFlow).flow
+  final private val httpFlowWithRetry = GraphDSL.create() { implicit builder =>
+    import GraphDSL.Implicits._
+    val retryHttpFlow = builder.add(new HttpFlowWithRetry[DockerHashContext](httpClientFlow).flow)
+
+    // Force the response entity to be strict. This makes sure that, whatever happens to the response later, we
+    // won't leave it hanging and potentially lock the pool.
+    // See http://doc.akka.io/docs/akka-http/10.0.5/scala/http/client-side/request-level.html#using-the-future-based-api-in-actors
+    // Note that in this particular case it's ok to force the loading of the entity in memory,
+    // because we use the HEAD Http method when we only care about the headers. Therefore there's no unnecessary memory usage.
+    /* Returns a (Try[HttpResponse], DockerHashContext) */
+    val strictHttpResponse = retryHttpFlow.out0.mapAsync(1) {
+      case (response, context) => response.toStrict(StrictTimeout) map { Success(_) -> context } recoverWith {
+        case failure => Future.successful(Failure(failure) -> context)
+      }
+    }
+
+    // Splits successfully `toStrict`ed responses from failures
+    val partitionStrictResponse = builder.add(FlowUtils.fanOutTry[HttpResponse, DockerHashContext])
+
+    // Merge failures from retryHttpFlow.out1 (failed http responses)
+    // and partitionStrictResponse.out1 (failures to `toStrict` the response)
+    val mergeFailures = builder.add(Merge[(Throwable, DockerHashContext)](2))
+
+    strictHttpResponse.outlet ~> partitionStrictResponse.in
+
+    retryHttpFlow.out1 ~> mergeFailures
+    partitionStrictResponse.out1 ~> mergeFailures
+
+    new FanOutShape2(retryHttpFlow.in, partitionStrictResponse.out0, mergeFailures.out)
+  }

   final private val tokenFlow = {
     val responseHandlerFlow = Flow[(HttpResponse, DockerHashContext)].mapAsync(1)(Function.tupled(tokenResponseHandler))

@@ -62,8 +94,8 @@ abstract class DockerRegistryV2AbstractFlow(httpClientFlow: HttpDockerFlow)(impl
     // Decouple the token flow from the manifest flow with ".async"
     // this way they can run in parallel from each other
     // while still maintaining the final ordering
-    val token = builder.add(tokenFlow.async)
-    val manifest = builder.add(manifestFlow.async)
+    val token = builder.add(tokenFlow)
+    val manifest = builder.add(manifestFlow)
     val mergeResponses = builder.add(Merge[(DockerHashResponse, DockerHashContext)](3))

     token.out0 ~> manifest.in

@@ -116,7 +148,7 @@ abstract class DockerRegistryV2AbstractFlow(httpClientFlow: HttpDockerFlow)(impl
   /**
    * Http method used for the manifest request
    */
-  protected def manifestRequestHttpMethod: HttpMethod = HttpMethods.GET
+  protected def manifestRequestHttpMethod: HttpMethod = HttpMethods.HEAD

   /**
    * Generic method to build a flow that creates a request, sends it,
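
A note on the pattern above: toStrict drains the response entity into memory within StrictTimeout, so the pooled connection is released no matter what downstream does with the response, and the GET-to-HEAD switch for manifest requests keeps that in-memory entity limited to headers. A standalone sketch of the same step outside the GraphDSL wiring, against the akka-http 10.0.x API (the helper name strictify is hypothetical):

import akka.http.scaladsl.model.HttpResponse
import akka.stream.ActorMaterializer

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

// Force the entity into memory within `timeout`; a failed toStrict becomes a
// Failure value rather than a failed Future, so the surrounding stream keeps running.
def strictify[C](response: HttpResponse, context: C, timeout: FiniteDuration)
                (implicit ec: ExecutionContext, mat: ActorMaterializer): Future[(Try[HttpResponse], C)] = {
  response.toStrict(timeout) map { strict => Success(strict) -> context } recover {
    case failure => Failure(failure) -> context
  }
}
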
core/src/main/scala/cromwell/core/callcaching/docker/registryv2/flows/HttpFlowWithRetry.scala

@@ -3,19 +3,22 @@ package cromwell.core.callcaching.docker.registryv2.flows
 import akka.NotUsed
 import akka.actor.Scheduler
 import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCodes}
-import akka.stream.FanOutShape2
+import akka.stream.{ActorMaterializer, FanOutShape2}
 import akka.stream.javadsl.MergePreferred
 import akka.stream.scaladsl.{Flow, GraphDSL, Partition}
 import cromwell.core.callcaching.docker.registryv2.flows.FlowUtils._
 import cromwell.core.callcaching.docker.registryv2.flows.HttpFlowWithRetry._
 import cromwell.core.retry.{Backoff, SimpleExponentialBackoff}
+import org.slf4j.LoggerFactory

 import scala.concurrent.duration._
 import scala.concurrent.{ExecutionContext, Future}
 import scala.language.postfixOps
 import scala.util.Try

 object HttpFlowWithRetry {
+  val Logger = LoggerFactory.getLogger("HttpLogger")
+
   def isRetryable(response: HttpResponse) = {
     response.status match {
       case StatusCodes.InternalServerError => true

@@ -68,7 +71,7 @@ case class HttpFlowWithRetry[T](
   retryBufferSize: Int = 100,
   requestBackoff: () => Backoff = defaultRequestBackoff,
   maxAttempts: Int = 3
-)(implicit val scheduler: Scheduler, ec: ExecutionContext) {
+)(implicit val scheduler: Scheduler, ec: ExecutionContext, mat: ActorMaterializer) {

   lazy val flow = GraphDSL.create() { implicit builder =>
     import GraphDSL.Implicits._

@@ -143,12 +146,24 @@ case class HttpFlowWithRetry[T](
    * Create a re-submittable request from a failed retryable
    * @return a future that will complete after the appropriate backoff time
    */
-  private def toRetryableRequest(value: (Any, ContextWithRequest[T])) = value match {
-    case (_, contextWithRequest) =>
+  private def toRetryableRequest(value: (HttpResponse, ContextWithRequest[T])) = value match {
+    case (response, contextWithRequest) =>
       val nextRetryIn = contextWithRequest.retryIn
       val nextRequest = (contextWithRequest.request, contextWithRequest.withNextAttempt)
-      akka.pattern.after(nextRetryIn, scheduler) {
-        Future.successful(nextRequest)
+      // This response will never be consumed by anyone, so discard its content here to avoid pool freeze
+      // http://doc.akka.io/docs/akka-http/10.0.5/scala/http/client-side/request-level.html#using-the-future-based-api-in-actors
+      // https://github.com/akka/akka/issues/19538
+      // https://github.com/akka/akka-http/issues/183
+      // https://github.com/akka/akka-http/issues/117
+      response.discardEntityBytes().future() flatMap { _ =>
+        akka.pattern.after(nextRetryIn, scheduler) {
+          Future.successful(nextRequest)
+        }
+      } recoverWith {
+        case failure =>
+          // Can't do much here except log the error and keep going with the next request
+          Logger.error(s"Failed to discard entity bytes for response $response", failure)
+          Future.successful(nextRequest)
       }
   }
 }
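
The other half of the fix: a response that is about to be retried is never read, and an unconsumed entity keeps its pooled connection checked out, which is exactly the freeze described in the issues linked above. A minimal sketch of the drain-then-continue step in isolation (the helper name drainAndContinue is hypothetical):

import akka.http.scaladsl.model.HttpResponse
import akka.stream.ActorMaterializer

import scala.concurrent.{ExecutionContext, Future}

// Drain a response that will never be read so its connection returns to the pool,
// then continue with `next` whether or not the drain succeeded.
def drainAndContinue[A](response: HttpResponse, next: => A)
                       (implicit ec: ExecutionContext, mat: ActorMaterializer): Future[A] = {
  response.discardEntityBytes().future() map { _ => next } recover {
    case _ => next // a failed drain is logged in the real code; we still move on
  }
}
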
core/src/main/scala/cromwell/core/callcaching/docker/registryv2/flows/dockerhub/DockerHubFlow.scala

@@ -2,14 +2,13 @@ package cromwell.core.callcaching.docker.registryv2.flows.dockerhub

 import akka.actor.Scheduler
 import akka.http.scaladsl.model.headers.{Authorization, BasicHttpCredentials}
-import akka.http.scaladsl.model.{HttpMethod, HttpMethods}
 import akka.stream.ActorMaterializer
 import cromwell.core.DockerCredentials
 import cromwell.core.callcaching.docker.DockerHashActor.DockerHashContext
 import cromwell.core.callcaching.docker.DockerImageIdentifierWithoutHash
-import cromwell.core.callcaching.docker.registryv2.flows.dockerhub.DockerHubFlow._
 import cromwell.core.callcaching.docker.registryv2.DockerRegistryV2AbstractFlow
 import cromwell.core.callcaching.docker.registryv2.DockerRegistryV2AbstractFlow.HttpDockerFlow
+import cromwell.core.callcaching.docker.registryv2.flows.dockerhub.DockerHubFlow._

 import scala.concurrent.ExecutionContext

@@ -45,6 +44,4 @@ class DockerHubFlow(httpClientFlow: HttpDockerFlow)(implicit ec: ExecutionContex
     case _ => false
   }
 }
-
-  override def manifestRequestHttpMethod: HttpMethod = HttpMethods.HEAD
 }
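
With the DockerHub-specific override gone, Docker Hub now inherits the abstract flow's HEAD manifest method. Conceptually, the manifest lookup is a registry v2 request whose answer carries the digest in the Docker-Content-Digest response header, so no body needs to be fetched. A hedged illustration of such a request (the Docker Hub registry host and image below are example values, not taken from this commit):

import akka.http.scaladsl.model.headers.RawHeader
import akka.http.scaladsl.model.{HttpMethods, HttpRequest}

// HEAD /v2/<name>/manifests/<reference>: headers only, including Docker-Content-Digest.
val manifestRequest = HttpRequest(
  method = HttpMethods.HEAD,
  uri = "https://registry-1.docker.io/v2/library/ubuntu/manifests/latest",
  headers = List(RawHeader("Accept", "application/vnd.docker.distribution.manifest.v2+json"))
)
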
core/src/test/scala/cromwell/core/callcaching/docker/DockerHashActorSpec.scala

@@ -115,5 +115,16 @@ class DockerHashActorSpec extends TestKitSuite with FlatSpecLike with Matchers w
     expectMsg(responseFailure)
     mockHttpFlow.count() shouldBe 2
   }
+
+
+  it should "not deadlock" taggedAs IntegrationTest in {
+    lazy val dockerActorScale = system.actorOf(DockerHashActor.props(registryFlows, 1000, 20.minutes, 0)(materializer))
+    0 until 400 foreach { _ =>
+      dockerActorScale ! makeRequest("gcr.io/google-containers/alpine-with-bash:1.0")
+    }
+
+    val received = receiveN(400, 1 minute)
+    received foreach { _ shouldBe a[DockerHashResponseSuccess] }
+  }

 }
EngineFilesystems.scala

@@ -26,8 +26,12 @@ case class EngineFilesystems(actorSystem: ActorSystem) {
   private val gcsPathBuilderFactory = googleAuthMode map { mode =>
     GcsPathBuilderFactory(mode, googleConf.applicationName)
   }
+
+  private val defaultFileSystem = if (config.as[Boolean]("engine.filesystems.local.enabled")) {
+    Option(DefaultPathBuilder)
+  } else None

   def pathBuildersForWorkflow(workflowOptions: WorkflowOptions): List[PathBuilder] = {
-    List(gcsPathBuilderFactory map { _.withOptions(workflowOptions)(actorSystem) }, Option(DefaultPathBuilder)).flatten
+    List(gcsPathBuilderFactory map { _.withOptions(workflowOptions)(actorSystem) }, defaultFileSystem).flatten
   }
 }
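
The new defaultFileSystem toggle reads the engine.filesystems.local.enabled key added to reference.conf earlier in this commit (config.as[Boolean] comes from a config enrichment such as Ficus). The same lookup with plain Typesafe Config, as a sketch (the val names and the "local" string stand-in are illustrative):

import com.typesafe.config.ConfigFactory

// Resolve the flag the engine now consults before offering the local path builder.
val config = ConfigFactory.load()
val localFilesystemEnabled = config.getBoolean("engine.filesystems.local.enabled")

// Mirrors pathBuildersForWorkflow: optional builders are flattened away when disabled.
// Here "local" stands in for the real DefaultPathBuilder.
val pathBuilders = List(if (localFilesystemEnabled) Some("local") else None).flatten
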
