Permalink
Browse files

Scala 2.12 support for Cromwell (#2412)

  • Loading branch information...
1 parent 176de92 commit 3e9808241969ae1d13d09827ec09430648d00e23 @geoffjentry geoffjentry committed on GitHub Jul 5, 2017
Showing with 128 additions and 133 deletions.
  1. +1 −1 .travis.yml
  2. +3 −3 README.md
  3. +3 −10 backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala
  4. +0 −1 backend/src/main/scala/cromwell/backend/standard/StandardSyncExecutionActor.scala
  5. +2 −2 core/src/main/scala/cromwell/core/ConfigUtil.scala
  6. +2 −2 core/src/main/scala/cromwell/core/path/BetterFileMethods.scala
  7. +1 −1 core/src/test/scala/cromwell/core/retry/RetrySpec.scala
  8. +1 −1 cromwellApiClient/src/main/scala/cromwell/api/CromwellClient.scala
  9. +1 −0 cromwellApiClient/src/main/scala/cromwell/api/model/Label.scala
  10. +2 −1 database/migration/src/main/scala/cromwell/database/migration/custom/BatchedTaskChange.scala
  11. +1 −2 database/sql/src/main/scala/cromwell/database/slick/SummaryStatusSlickDatabase.scala
  12. +2 −3 dockerHashing/src/main/scala/cromwell/docker/local/DockerCliFlow.scala
  13. +1 −4 dockerHashing/src/main/scala/cromwell/docker/registryv2/flows/HttpFlowWithRetry.scala
  14. +0 −2 engine/src/main/scala/cromwell/engine/io/gcs/GcsResponse.scala
  15. +1 −1 engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/EngineJobExecutionActor.scala
  16. +1 −2 engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/CallCache.scala
  17. +1 −1 engine/src/main/scala/cromwell/server/CromwellRootActor.scala
  18. +1 −1 engine/src/main/scala/cromwell/webservice/CromwellApiService.scala
  19. +1 −1 engine/src/main/scala/cromwell/webservice/LabelsManagerActor.scala
  20. +8 −7 engine/src/test/scala/cromwell/webservice/CromwellApiServiceSpec.scala
  21. +1 −1 filesystems/gcs/src/main/scala/cromwell/filesystems/gcs/GoogleConfiguration.scala
  22. +37 −36 project/Dependencies.scala
  23. +32 −15 project/Settings.scala
  24. +1 −1 project/build.properties
  25. +3 −3 release/release_workflow.wdl
  26. +1 −0 services/src/main/scala/cromwell/services/metadata/impl/MetadataDatabaseAccess.scala
  27. +1 −3 services/src/test/scala/cromwell/services/ServicesStoreSpec.scala
  28. +2 −2 src/bin/travis/testCentaurJes.sh
  29. +1 −1 src/bin/travis/testCentaurLocal.sh
  30. +1 −1 src/bin/travis/testCentaurTes.sh
  31. +4 −13 ...ortedBackends/jes/src/main/scala/cromwell/backend/impl/jes/JesAsyncBackendJobExecutionActor.scala
  32. +2 −2 supportedBackends/jes/src/main/scala/cromwell/backend/impl/jes/JesAttributes.scala
  33. +1 −2 supportedBackends/jes/src/main/scala/cromwell/backend/impl/jes/JesJobPaths.scala
  34. +2 −1 ...ortedBackends/jes/src/main/scala/cromwell/backend/impl/jes/statuspolling/JesApiQueryManager.scala
  35. +2 −2 supportedBackends/jes/src/main/scala/cromwell/backend/impl/jes/statuspolling/JesPollingActor.scala
  36. +4 −4 ...ortedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesAsyncBackendJobExecutionActor.scala
View
@@ -2,7 +2,7 @@ sudo: required
dist: trusty
language: scala
scala:
- - 2.11.8
+ - 2.12.2
jdk:
- oraclejdk8
cache:
View
@@ -120,13 +120,13 @@ There is a [Cromwell gitter channel](https://gitter.im/broadinstitute/cromwell)
The following is the toolchain used for development of Cromwell. Other versions may work, but these are recommended.
-* [Scala 2.11.8](http://www.scala-lang.org/news/2.11.8/)
+* [Scala 2.12.2](http://www.scala-lang.org/news/2.12.2)
* [SBT 0.13.12](https://github.com/sbt/sbt/releases/tag/v0.13.12)
* [Java 8](http://www.oracle.com/technetwork/java/javase/overview/java8-2100321.html)
# Building
-`sbt assembly` will build a runnable JAR in `target/scala-2.11/`
+`sbt assembly` will build a runnable JAR in `target/scala-2.12/`
Tests are run via `sbt test`. Note that the tests do require Docker to be running. To test this out while downloading the Ubuntu image that is required for tests, run `docker pull ubuntu:latest` prior to running `sbt test`
@@ -3788,7 +3788,7 @@ Essentially run `sbt doc` then commit the generated code into the `gh-pages` bra
```
$ sbt doc
$ git co gh-pages
-$ mv target/scala-2.11/api scaladoc
+$ mv target/scala-2.12/api scaladoc
$ git add scaladoc
$ git commit -m "API Docs"
$ git push origin gh-pages
@@ -251,9 +251,7 @@ trait StandardAsyncExecutionActor extends AsyncBackendJobExecutionActor with Sta
*
* @return the execution handle for the job.
*/
- def executeAsync()(implicit ec: ExecutionContext): Future[ExecutionHandle] = {
- Future.fromTry(Try(execute()))
- }
+ def executeAsync(): Future[ExecutionHandle] = Future.fromTry(Try(execute()))
/**
* Recovers the specified job id, or starts a new job. The default implementation simply calls execute().
@@ -269,9 +267,7 @@ trait StandardAsyncExecutionActor extends AsyncBackendJobExecutionActor with Sta
* @param jobId The previously recorded job id.
* @return the execution handle for the job.
*/
- def recoverAsync(jobId: StandardAsyncJob)(implicit ec: ExecutionContext): Future[ExecutionHandle] = {
- Future.fromTry(Try(recover(jobId)))
- }
+ def recoverAsync(jobId: StandardAsyncJob): Future[ExecutionHandle] = Future.fromTry(Try(recover(jobId)))
/**
* Returns the run status for the job.
@@ -289,10 +285,7 @@ trait StandardAsyncExecutionActor extends AsyncBackendJobExecutionActor with Sta
* @param handle The handle of the running job.
* @return The status of the job.
*/
- def pollStatusAsync(handle: StandardAsyncPendingExecutionHandle)
- (implicit ec: ExecutionContext): Future[StandardAsyncRunStatus] = {
- Future.fromTry(Try(pollStatus(handle)))
- }
+ def pollStatusAsync(handle: StandardAsyncPendingExecutionHandle): Future[StandardAsyncRunStatus] = Future.fromTry(Try(pollStatus(handle)))
/**
* Adds custom behavior invoked when polling fails due to some exception. By default adds nothing.
@@ -10,7 +10,6 @@ import cromwell.core.Dispatcher
import cromwell.services.keyvalue.KeyValueServiceActor._
import scala.concurrent.{Future, Promise}
-import scala.language.existentials
trait StandardSyncExecutionActorParams extends StandardJobExecutionActorParams {
/** The class for creating an async backend. */
@@ -7,15 +7,15 @@ import cats.syntax.validated._
import com.typesafe.config.{Config, ConfigException, ConfigValue}
import org.slf4j.LoggerFactory
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.reflect.{ClassTag, classTag}
object ConfigUtil {
val validationLogger = LoggerFactory.getLogger("ConfigurationValidation")
implicit class EnhancedConfig(val config: Config) extends AnyVal {
- def keys = config.entrySet().toSet map { v: java.util.Map.Entry[String, ConfigValue] => v.getKey }
+ def keys = config.entrySet().asScala.toSet map { v: java.util.Map.Entry[String, ConfigValue] => v.getKey }
/**
* For keys that are in the configuration but not in the reference keySet, log a warning.
@@ -214,11 +214,11 @@ trait BetterFileMethods {
betterFile.bufferedReader(codec)
final def newBufferedWriter(implicit codec: Codec, openOptions: OpenOptions = OpenOptions.default): BufferedWriter =
- betterFile.newBufferedWriter(codec)
+ betterFile.newBufferedWriter(codec, openOptions)
final def bufferedWriter(implicit codec: Codec,
openOptions: OpenOptions = OpenOptions.default): ManagedResource[BufferedWriter] =
- betterFile.bufferedWriter(codec)
+ betterFile.bufferedWriter(codec, openOptions)
final def newFileReader: FileReader = betterFile.newFileReader
@@ -34,7 +34,7 @@ class RetrySpec extends TestKitSuite("retry-spec") with FlatSpecLike with Matche
isFatal: Throwable => Boolean = Retry.throwableToFalse): Future[Int] = {
withRetry(
- f = work.doIt,
+ f = () => work.doIt(),
maxRetries = Option(retries),
isTransient = isTransient,
isFatal = isFatal
@@ -128,7 +128,7 @@ class CromwellClient(val cromwellUrl: URL, val apiVersion: String)(implicit acto
private def decodeResponse(response: HttpResponse): Try[HttpResponse] = {
decoders.get(response.encoding) map { decoder =>
- Try(decoder.decode(response))
+ Try(decoder.decodeMessage(response))
} getOrElse Failure(UnsuccessfulRequestException(s"No decoder for ${response.encoding}", response))
}
}
@@ -1,6 +1,7 @@
package cromwell.api.model
import spray.json.{DefaultJsonProtocol, JsObject, JsString, JsValue, RootJsonFormat}
+import scala.language.postfixOps
object LabelsJsonFormatter extends DefaultJsonProtocol {
implicit object LabelJsonFormat extends RootJsonFormat[List[Label]] {
@@ -118,7 +118,8 @@ trait BatchedTaskChange extends MigrationTaskChange {
resultBatch.close()
val progress = Math.min((page + 1) * 100 / pageCount, 100)
- logger.info(s"[$migrationName] $progress%")
+ val progressMessage = s"[$migrationName] $progress%"
+ logger.info(progressMessage)
}
if (batchMigrationCounter != 0) {
@@ -9,8 +9,7 @@ trait SummaryStatusSlickDatabase {
import dataAccess.driver.api._
- private[slick] def getSummaryStatusEntryMaximumId(summaryTableName: String, summarizedTableName: String)
- (implicit ec: ExecutionContext): DBIO[Option[Long]] = {
+ private[slick] def getSummaryStatusEntryMaximumId(summaryTableName: String, summarizedTableName: String): DBIO[Option[Long]] = {
dataAccess.
maximumIdForSummaryTableNameSummarizedTableName((summaryTableName, summarizedTableName)).
result.headOption
@@ -4,7 +4,7 @@ import java.util.concurrent.TimeoutException
import akka.actor.Scheduler
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition}
-import akka.stream.{ActorMaterializer, FlowShape}
+import akka.stream.FlowShape
import cromwell.docker.DockerHashActor._
import cromwell.docker.{DockerFlow, DockerHashActor, DockerHashResult, DockerImageIdentifierWithoutHash}
@@ -15,8 +15,7 @@ import scala.util.{Failure, Success}
/**
* A docker flow using the CLI to return docker hashes.
*/
-class DockerCliFlow(implicit ec: ExecutionContext, materializer: ActorMaterializer, scheduler: Scheduler)
- extends DockerFlow {
+class DockerCliFlow(implicit ec: ExecutionContext, scheduler: Scheduler) extends DockerFlow {
// If the docker cli hangs it would be difficult to debug. So timeout the first request after a short duration.
// https://github.com/docker/docker/issues/18279
@@ -13,7 +13,6 @@ import org.slf4j.LoggerFactory
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
-import scala.language.postfixOps
import scala.util.Try
object HttpFlowWithRetry {
@@ -37,8 +36,6 @@ object HttpFlowWithRetry {
}
}
- def defaultRequestBackoff(): Backoff = SimpleExponentialBackoff(1 second, 2 minutes, 3D)
-
/**
* In order to allow for retries, the http context object needs to encapsulate the original request,
* so that it can be re-submitted if necessary.
@@ -69,7 +66,7 @@ object HttpFlowWithRetry {
case class HttpFlowWithRetry[T](
httpClientFlow: RetryableHttpFlow[T],
retryBufferSize: Int = 100,
- requestBackoff: () => Backoff = defaultRequestBackoff,
+ requestBackoff: () => Backoff = () => SimpleExponentialBackoff(1 second, 2 minutes, 3D),
maxAttempts: Int = 3
)(implicit val scheduler: Scheduler, ec: ExecutionContext, mat: ActorMaterializer) {
@@ -2,8 +2,6 @@ package cromwell.engine.io.gcs
import cromwell.engine.io.IoActor._
-import scala.language.existentials
-
/**
* ADT used only inside the batch stream
* @tparam T final type of the result of the Command
@@ -570,7 +570,7 @@ class EngineJobExecutionActor(replyTo: ActorRef,
case AbortedResponse(_: BackendJobDescriptorKey) =>
log.debug("{}: Won't save aborted job response to JobStore", jobTag)
forwardAndStop(updatedData.response)
- case JobFailedNonRetryableResponse(jobKey: BackendJobDescriptorKey, throwable: Throwable, returnCode: Option[Int]) =>
+ case JobFailedNonRetryableResponse(jobKey, throwable: Throwable, returnCode: Option[Int]) =>
publishHashesToMetadata(updatedData.hashes)
writeToMetadata(Map(callCachingAllowReuseMetadataKey -> false))
saveUnsuccessfulJobResults(jobKey, returnCode, throwable, retryable = false)
@@ -41,8 +41,7 @@ class CallCache(database: CallCachingSqlDatabase) {
}
private def buildCallCachingJoin(callCachingEntry: CallCachingEntry, callCacheHashes: CallCacheHashes,
- result: Iterable[WdlValueSimpleton], jobDetritus: Map[String, Path])
- (implicit ec: ExecutionContext): CallCachingJoin = {
+ result: Iterable[WdlValueSimpleton], jobDetritus: Map[String, Path]): CallCachingJoin = {
val hashesToInsert: Iterable[CallCachingHashEntry] = {
callCacheHashes.hashes map { hash => CallCachingHashEntry(hash.hashKey.key, hash.hashValue.value) }
@@ -94,7 +94,7 @@ import scala.language.postfixOps
lazy val googleFlow = new GoogleFlow(dockerHttpPool, dockerConf.gcrApiQueriesPer100Seconds)(ioEc, materializer, system.scheduler)
lazy val dockerHubFlow = new DockerHubFlow(dockerHttpPool)(ioEc, materializer, system.scheduler)
lazy val quayFlow = new QuayFlow(dockerHttpPool)(ioEc, materializer, system.scheduler)
- lazy val dockerCliFlow = new DockerCliFlow()(ioEc, materializer, system.scheduler)
+ lazy val dockerCliFlow = new DockerCliFlow()(ioEc, system.scheduler)
lazy val dockerFlows = dockerConf.method match {
case DockerLocalLookup => Seq(dockerCliFlow)
case DockerRemoteLookup => Seq(dockerHubFlow, googleFlow, quayFlow)
@@ -89,7 +89,7 @@ trait CromwellApiService {
} ~
encodeResponseWith(Gzip, Deflate, NoCoding) {
path("workflows" / Segment / Segment / "metadata") { (version, possibleWorkflowId) =>
- parameters('includeKey.*, 'excludeKey.*, 'expandSubWorkflows.as[Boolean].?) { (includeKeys, excludeKeys, expandSubWorkflowsOption) =>
+ parameters(('includeKey.*, 'excludeKey.*, 'expandSubWorkflows.as[Boolean].?)) { (includeKeys, excludeKeys, expandSubWorkflowsOption) =>
val includeKeysOption = NonEmptyList.fromList(includeKeys.toList)
val excludeKeysOption = NonEmptyList.fromList(excludeKeys.toList)
val expandSubWorkflows = expandSubWorkflowsOption.getOrElse(false)
@@ -34,7 +34,7 @@ object LabelsManagerActor {
}
def metadataEventsToLabels(events: Iterable[MetadataEvent]): Map[String, String] = {
- events map { case MetadataEvent(MetadataKey(_, _, key), Some(MetadataValue(value, _)), _) => key.split("\\:").last -> value } toMap
+ events collect { case MetadataEvent(MetadataKey(_, _, key), Some(MetadataValue(value, _)), _) => key.split("\\:").last -> value } toMap
}
def labelsToMetadataEvents(labels: Labels, workflowId: WorkflowId): Iterable[MetadataEvent] = {
@@ -29,7 +29,7 @@ class CromwellApiServiceSpec extends AsyncFlatSpec with ScalatestRouteTest with
val akkaHttpService = new MockApiService()
val version = "v1"
- implicit def default(implicit system: ActorSystem) = RouteTestTimeout(5.seconds)
+ implicit def default = RouteTestTimeout(5.seconds)
behavior of "REST API /status endpoint"
@@ -322,7 +322,7 @@ class CromwellApiServiceSpec extends AsyncFlatSpec with ScalatestRouteTest with
check {
status should be(StatusCodes.OK)
val decoder: Decoder = Gzip
- val result = Await.result(Unmarshal(decoder.decode(response)).to[JsObject], 1.second)
+ val result = Await.result(Unmarshal(decoder.decodeMessage(response)).to[JsObject], 1.second)
result.fields.keys should contain allOf("testKey1", "testKey2")
result.fields.keys shouldNot contain("testKey3")
result.fields("testKey1") should be(JsString("myValue1"))
@@ -336,7 +336,7 @@ class CromwellApiServiceSpec extends AsyncFlatSpec with ScalatestRouteTest with
check {
status should be(StatusCodes.OK)
val decoder: Decoder = Gzip
- val result = Await.result(Unmarshal(decoder.decode(response)).to[JsObject], 1.second)
+ val result = Await.result(Unmarshal(decoder.decodeMessage(response)).to[JsObject], 1.second)
result.fields.keys should contain allOf("testKey1a", "testKey1b", "testKey2a")
result.fields.keys should contain noneOf("testKey2b", "testKey3")
result.fields("testKey1a") should be(JsString("myValue1a"))
@@ -351,7 +351,7 @@ class CromwellApiServiceSpec extends AsyncFlatSpec with ScalatestRouteTest with
check {
status should be(StatusCodes.OK)
val decoder: Decoder = Gzip
- val result = Await.result(Unmarshal(decoder.decode(response)).to[JsObject], 1.second)
+ val result = Await.result(Unmarshal(decoder.decodeMessage(response)).to[JsObject], 1.second)
result.fields.keys should contain allOf("testKey1a", "testKey1b", "testKey2a")
result.fields.keys should contain noneOf("testKey2b", "testKey3")
result.fields("testKey1a") should be(JsString("myValue1a"))
@@ -369,7 +369,7 @@ class CromwellApiServiceSpec extends AsyncFlatSpec with ScalatestRouteTest with
}
val decoder: Decoder = Gzip
- Unmarshal(decoder.decode(response)).to[String] map { r =>
+ Unmarshal(decoder.decodeMessage(response)).to[String] map { r =>
assertResult(
s"""{
| "status": "fail",
@@ -512,6 +512,7 @@ object CromwellApiServiceSpec {
events.head.key.workflowId match {
case CromwellApiServiceSpec.ExistingWorkflowId => sender ! MetadataWriteSuccess(events)
case CromwellApiServiceSpec.AbortedWorkflowId => sender ! MetadataWriteFailure(new Exception("mock exception of db failure"), events)
+ case WorkflowId(_) => throw new Exception("Something untoward happened, this situation is not believed to be possible at this time")
}
}
}
@@ -524,10 +525,10 @@ object CromwellApiServiceSpec {
sender ! response
case AbortWorkflow(id, manager) =>
val message = id match {
- case ExistingWorkflowId =>
- WorkflowStoreEngineActor.WorkflowAborted(id)
+ case ExistingWorkflowId => WorkflowStoreEngineActor.WorkflowAborted(id)
case AbortedWorkflowId =>
WorkflowAbortFailed(id, new IllegalStateException(s"Workflow ID '$id' is in terminal state 'Aborted' and cannot be aborted."))
+ case WorkflowId(_) => throw new Exception("Something untoward happened")
}
sender ! message
}
@@ -57,7 +57,7 @@ object GoogleConfiguration {
private val log = LoggerFactory.getLogger("GoogleConfiguration")
- case class GoogleConfigurationException(errorMessages: List[String]) extends MessageAggregation {
+ final case class GoogleConfigurationException(errorMessages: List[String]) extends MessageAggregation {
override val exceptionContext = "Google configuration"
}
Oops, something went wrong.

0 comments on commit 3e98082

Please sign in to comment.