Scala 2.12 support for Cromwell. Closes #1635 #2412

Merged
merged 1 commit into from Jul 5, 2017
Jump to file or symbol
Failed to load files and symbols.
+128 −133
Split
View
@@ -2,7 +2,7 @@ sudo: required
dist: trusty
language: scala
scala:
- - 2.11.8
+ - 2.12.2
jdk:
- oraclejdk8
cache:
View
@@ -120,13 +120,13 @@ There is a [Cromwell gitter channel](https://gitter.im/broadinstitute/cromwell)
The following is the toolchain used for development of Cromwell. Other versions may work, but these are recommended.
-* [Scala 2.11.8](http://www.scala-lang.org/news/2.11.8/)
+* [Scala 2.12.2](http://www.scala-lang.org/news/2.12.2)
* [SBT 0.13.12](https://github.com/sbt/sbt/releases/tag/v0.13.12)
* [Java 8](http://www.oracle.com/technetwork/java/javase/overview/java8-2100321.html)
# Building
-`sbt assembly` will build a runnable JAR in `target/scala-2.11/`
+`sbt assembly` will build a runnable JAR in `target/scala-2.12/`
Tests are run via `sbt test`. Note that the tests do require Docker to be running. To test this out while downloading the Ubuntu image that is required for tests, run `docker pull ubuntu:latest` prior to running `sbt test`
@@ -3788,7 +3788,7 @@ Essentially run `sbt doc` then commit the generated code into the `gh-pages` bra
```
$ sbt doc
$ git co gh-pages
-$ mv target/scala-2.11/api scaladoc
+$ mv target/scala-2.12/api scaladoc
$ git add scaladoc
$ git commit -m "API Docs"
$ git push origin gh-pages
@@ -251,9 +251,7 @@ trait StandardAsyncExecutionActor extends AsyncBackendJobExecutionActor with Sta
*
* @return the execution handle for the job.
*/
- def executeAsync()(implicit ec: ExecutionContext): Future[ExecutionHandle] = {
- Future.fromTry(Try(execute()))
- }
+ def executeAsync(): Future[ExecutionHandle] = Future.fromTry(Try(execute()))
/**
* Recovers the specified job id, or starts a new job. The default implementation simply calls execute().
@@ -269,9 +267,7 @@ trait StandardAsyncExecutionActor extends AsyncBackendJobExecutionActor with Sta
* @param jobId The previously recorded job id.
* @return the execution handle for the job.
*/
- def recoverAsync(jobId: StandardAsyncJob)(implicit ec: ExecutionContext): Future[ExecutionHandle] = {
- Future.fromTry(Try(recover(jobId)))
- }
+ def recoverAsync(jobId: StandardAsyncJob): Future[ExecutionHandle] = Future.fromTry(Try(recover(jobId)))
/**
* Returns the run status for the job.
@@ -289,10 +285,7 @@ trait StandardAsyncExecutionActor extends AsyncBackendJobExecutionActor with Sta
* @param handle The handle of the running job.
* @return The status of the job.
*/
- def pollStatusAsync(handle: StandardAsyncPendingExecutionHandle)
- (implicit ec: ExecutionContext): Future[StandardAsyncRunStatus] = {
- Future.fromTry(Try(pollStatus(handle)))
- }
+ def pollStatusAsync(handle: StandardAsyncPendingExecutionHandle): Future[StandardAsyncRunStatus] = Future.fromTry(Try(pollStatus(handle)))
@mcovarr

mcovarr Jul 5, 2017

Contributor

the Try => Future thing happens three times just in this file, perhaps some sugar is in order

@geoffjentry

geoffjentry Jul 5, 2017

Member

I don't even understand why it's doing it this way in the first place instead of just a future.

that said, i'll add it to the list of preexisting conditions i claim i'm going to fix from my akka http list

@mcovarr

mcovarr Jul 5, 2017

Contributor

That is a solid point.

@geoffjentry

geoffjentry Jul 5, 2017

Member

Of course then I'd need to rollback my removal of the ECs, oh the tangled webs we weave

/**
* Adds custom behavior invoked when polling fails due to some exception. By default adds nothing.
@@ -10,7 +10,6 @@ import cromwell.core.Dispatcher
import cromwell.services.keyvalue.KeyValueServiceActor._
import scala.concurrent.{Future, Promise}
-import scala.language.existentials
trait StandardSyncExecutionActorParams extends StandardJobExecutionActorParams {
/** The class for creating an async backend. */
@@ -7,15 +7,15 @@ import cats.syntax.validated._
import com.typesafe.config.{Config, ConfigException, ConfigValue}
import org.slf4j.LoggerFactory
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.reflect.{ClassTag, classTag}
object ConfigUtil {
val validationLogger = LoggerFactory.getLogger("ConfigurationValidation")
implicit class EnhancedConfig(val config: Config) extends AnyVal {
- def keys = config.entrySet().toSet map { v: java.util.Map.Entry[String, ConfigValue] => v.getKey }
+ def keys = config.entrySet().asScala.toSet map { v: java.util.Map.Entry[String, ConfigValue] => v.getKey }
@cjllanwarne

cjllanwarne Jul 5, 2017

Contributor

Is this right? Won't the asScala remove any Java-y types from the map?

@geoffjentry

geoffjentry Jul 5, 2017

Member

I believe it just converts to a scala collection.

The key here is that the JavaConversions are gone now, so JavaConverters is the only thing available w/o doing it by hand.

@geoffjentry

geoffjentry Jul 5, 2017

Member

w/o asScala: util.Set[util.Map.Entry[String, ConfigValue]]
w/ asScala: mutable.Set[util.Map.Entry[String, ConfigValue]]

Note that the trailing toSet sets it to be immutable

@cjllanwarne

cjllanwarne Jul 5, 2017

Contributor

Oh I think I see... the config.entrySet is a (pardon the syntax) JavaSet[JavaMap.Entry] and asScala makes it a ScalaSet[JavaMap.Entry]. LGTM

/**
* For keys that are in the configuration but not in the reference keySet, log a warning.
@@ -214,11 +214,11 @@ trait BetterFileMethods {
betterFile.bufferedReader(codec)
final def newBufferedWriter(implicit codec: Codec, openOptions: OpenOptions = OpenOptions.default): BufferedWriter =
- betterFile.newBufferedWriter(codec)
+ betterFile.newBufferedWriter(codec, openOptions)
final def bufferedWriter(implicit codec: Codec,
openOptions: OpenOptions = OpenOptions.default): ManagedResource[BufferedWriter] =
- betterFile.bufferedWriter(codec)
+ betterFile.bufferedWriter(codec, openOptions)
final def newFileReader: FileReader = betterFile.newFileReader
@@ -34,7 +34,7 @@ class RetrySpec extends TestKitSuite("retry-spec") with FlatSpecLike with Matche
isFatal: Throwable => Boolean = Retry.throwableToFalse): Future[Int] = {
withRetry(
- f = work.doIt,
+ f = () => work.doIt(),
maxRetries = Option(retries),
isTransient = isTransient,
isFatal = isFatal
@@ -128,7 +128,7 @@ class CromwellClient(val cromwellUrl: URL, val apiVersion: String)(implicit acto
private def decodeResponse(response: HttpResponse): Try[HttpResponse] = {
decoders.get(response.encoding) map { decoder =>
- Try(decoder.decode(response))
+ Try(decoder.decodeMessage(response))
} getOrElse Failure(UnsuccessfulRequestException(s"No decoder for ${response.encoding}", response))
}
}
@@ -1,6 +1,7 @@
package cromwell.api.model
import spray.json.{DefaultJsonProtocol, JsObject, JsString, JsValue, RootJsonFormat}
+import scala.language.postfixOps
object LabelsJsonFormatter extends DefaultJsonProtocol {
implicit object LabelJsonFormat extends RootJsonFormat[List[Label]] {
@@ -118,7 +118,8 @@ trait BatchedTaskChange extends MigrationTaskChange {
resultBatch.close()
val progress = Math.min((page + 1) * 100 / pageCount, 100)
- logger.info(s"[$migrationName] $progress%")
+ val progressMessage = s"[$migrationName] $progress%"
+ logger.info(progressMessage)
}
if (batchMigrationCounter != 0) {
@@ -9,8 +9,7 @@ trait SummaryStatusSlickDatabase {
import dataAccess.driver.api._
- private[slick] def getSummaryStatusEntryMaximumId(summaryTableName: String, summarizedTableName: String)
- (implicit ec: ExecutionContext): DBIO[Option[Long]] = {
+ private[slick] def getSummaryStatusEntryMaximumId(summaryTableName: String, summarizedTableName: String): DBIO[Option[Long]] = {
dataAccess.
maximumIdForSummaryTableNameSummarizedTableName((summaryTableName, summarizedTableName)).
result.headOption
@@ -4,7 +4,7 @@ import java.util.concurrent.TimeoutException
import akka.actor.Scheduler
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition}
-import akka.stream.{ActorMaterializer, FlowShape}
+import akka.stream.FlowShape
import cromwell.docker.DockerHashActor._
import cromwell.docker.{DockerFlow, DockerHashActor, DockerHashResult, DockerImageIdentifierWithoutHash}
@@ -15,8 +15,7 @@ import scala.util.{Failure, Success}
/**
* A docker flow using the CLI to return docker hashes.
*/
-class DockerCliFlow(implicit ec: ExecutionContext, materializer: ActorMaterializer, scheduler: Scheduler)
- extends DockerFlow {
+class DockerCliFlow(implicit ec: ExecutionContext, scheduler: Scheduler) extends DockerFlow {
// If the docker cli hangs it would be difficult to debug. So timeout the first request after a short duration.
// https://github.com/docker/docker/issues/18279
@@ -13,7 +13,6 @@ import org.slf4j.LoggerFactory
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
-import scala.language.postfixOps
import scala.util.Try
object HttpFlowWithRetry {
@@ -37,8 +36,6 @@ object HttpFlowWithRetry {
}
}
- def defaultRequestBackoff(): Backoff = SimpleExponentialBackoff(1 second, 2 minutes, 3D)
-
/**
* In order to allow for retries, the http context object needs to encapsulate the original request,
* so that it can be re-submitted if necessary.
@@ -69,7 +66,7 @@ object HttpFlowWithRetry {
case class HttpFlowWithRetry[T](
httpClientFlow: RetryableHttpFlow[T],
retryBufferSize: Int = 100,
- requestBackoff: () => Backoff = defaultRequestBackoff,
+ requestBackoff: () => Backoff = () => SimpleExponentialBackoff(1 second, 2 minutes, 3D),
maxAttempts: Int = 3
)(implicit val scheduler: Scheduler, ec: ExecutionContext, mat: ActorMaterializer) {
@@ -2,8 +2,6 @@ package cromwell.engine.io.gcs
import cromwell.engine.io.IoActor._
-import scala.language.existentials
-
/**
* ADT used only inside the batch stream
* @tparam T final type of the result of the Command
@@ -570,7 +570,7 @@ class EngineJobExecutionActor(replyTo: ActorRef,
case AbortedResponse(_: BackendJobDescriptorKey) =>
log.debug("{}: Won't save aborted job response to JobStore", jobTag)
forwardAndStop(updatedData.response)
- case JobFailedNonRetryableResponse(jobKey: BackendJobDescriptorKey, throwable: Throwable, returnCode: Option[Int]) =>
+ case JobFailedNonRetryableResponse(jobKey, throwable: Throwable, returnCode: Option[Int]) =>
publishHashesToMetadata(updatedData.hashes)
writeToMetadata(Map(callCachingAllowReuseMetadataKey -> false))
saveUnsuccessfulJobResults(jobKey, returnCode, throwable, retryable = false)
@@ -41,8 +41,7 @@ class CallCache(database: CallCachingSqlDatabase) {
}
private def buildCallCachingJoin(callCachingEntry: CallCachingEntry, callCacheHashes: CallCacheHashes,
- result: Iterable[WdlValueSimpleton], jobDetritus: Map[String, Path])
- (implicit ec: ExecutionContext): CallCachingJoin = {
+ result: Iterable[WdlValueSimpleton], jobDetritus: Map[String, Path]): CallCachingJoin = {
val hashesToInsert: Iterable[CallCachingHashEntry] = {
callCacheHashes.hashes map { hash => CallCachingHashEntry(hash.hashKey.key, hash.hashValue.value) }
@@ -94,7 +94,7 @@ import scala.language.postfixOps
lazy val googleFlow = new GoogleFlow(dockerHttpPool, dockerConf.gcrApiQueriesPer100Seconds)(ioEc, materializer, system.scheduler)
lazy val dockerHubFlow = new DockerHubFlow(dockerHttpPool)(ioEc, materializer, system.scheduler)
lazy val quayFlow = new QuayFlow(dockerHttpPool)(ioEc, materializer, system.scheduler)
- lazy val dockerCliFlow = new DockerCliFlow()(ioEc, materializer, system.scheduler)
+ lazy val dockerCliFlow = new DockerCliFlow()(ioEc, system.scheduler)
lazy val dockerFlows = dockerConf.method match {
case DockerLocalLookup => Seq(dockerCliFlow)
case DockerRemoteLookup => Seq(dockerHubFlow, googleFlow, quayFlow)
@@ -89,7 +89,7 @@ trait CromwellApiService {
} ~
encodeResponseWith(Gzip, Deflate, NoCoding) {
path("workflows" / Segment / Segment / "metadata") { (version, possibleWorkflowId) =>
- parameters('includeKey.*, 'excludeKey.*, 'expandSubWorkflows.as[Boolean].?) { (includeKeys, excludeKeys, expandSubWorkflowsOption) =>
+ parameters(('includeKey.*, 'excludeKey.*, 'expandSubWorkflows.as[Boolean].?)) { (includeKeys, excludeKeys, expandSubWorkflowsOption) =>
val includeKeysOption = NonEmptyList.fromList(includeKeys.toList)
val excludeKeysOption = NonEmptyList.fromList(excludeKeys.toList)
val expandSubWorkflows = expandSubWorkflowsOption.getOrElse(false)
@@ -34,7 +34,7 @@ object LabelsManagerActor {
}
def metadataEventsToLabels(events: Iterable[MetadataEvent]): Map[String, String] = {
- events map { case MetadataEvent(MetadataKey(_, _, key), Some(MetadataValue(value, _)), _) => key.split("\\:").last -> value } toMap
+ events collect { case MetadataEvent(MetadataKey(_, _, key), Some(MetadataValue(value, _)), _) => key.split("\\:").last -> value } toMap
}
def labelsToMetadataEvents(labels: Labels, workflowId: WorkflowId): Iterable[MetadataEvent] = {
@@ -29,7 +29,7 @@ class CromwellApiServiceSpec extends AsyncFlatSpec with ScalatestRouteTest with
val akkaHttpService = new MockApiService()
val version = "v1"
- implicit def default(implicit system: ActorSystem) = RouteTestTimeout(5.seconds)
+ implicit def default = RouteTestTimeout(5.seconds)
behavior of "REST API /status endpoint"
@@ -322,7 +322,7 @@ class CromwellApiServiceSpec extends AsyncFlatSpec with ScalatestRouteTest with
check {
status should be(StatusCodes.OK)
val decoder: Decoder = Gzip
- val result = Await.result(Unmarshal(decoder.decode(response)).to[JsObject], 1.second)
+ val result = Await.result(Unmarshal(decoder.decodeMessage(response)).to[JsObject], 1.second)
result.fields.keys should contain allOf("testKey1", "testKey2")
result.fields.keys shouldNot contain("testKey3")
result.fields("testKey1") should be(JsString("myValue1"))
@@ -336,7 +336,7 @@ class CromwellApiServiceSpec extends AsyncFlatSpec with ScalatestRouteTest with
check {
status should be(StatusCodes.OK)
val decoder: Decoder = Gzip
- val result = Await.result(Unmarshal(decoder.decode(response)).to[JsObject], 1.second)
+ val result = Await.result(Unmarshal(decoder.decodeMessage(response)).to[JsObject], 1.second)
@cjllanwarne

cjllanwarne Jul 5, 2017

Contributor

I think this fits the pattern that lets you just say val result = responseAs[JsObject], avoiding the whole convoluted Await.result and decoding boilerplate 🤞

(and the examples below too, if it works here)

@cjllanwarne

cjllanwarne Jul 5, 2017

Contributor

cf the comment near the end of my lightbend support ticket:

AKKA HTTP: responding using a Stream in high-level DSL

@geoffjentry

geoffjentry Jul 5, 2017

Member

IIRC there was a reason why I couldn't do that in the first place (I think I used responseAs elsewhere in the file, I definitely played around with it). And by "couldn't", i probably mean "that was going to be a giant pain in the ass for some reason I didn't want to deal with it at the time".

I'll take another poke.

@geoffjentry

geoffjentry Jul 5, 2017

Member

oh duh. it's because i couldn't see a way to use responseAs with gzip

result.fields.keys should contain allOf("testKey1a", "testKey1b", "testKey2a")
result.fields.keys should contain noneOf("testKey2b", "testKey3")
result.fields("testKey1a") should be(JsString("myValue1a"))
@@ -351,7 +351,7 @@ class CromwellApiServiceSpec extends AsyncFlatSpec with ScalatestRouteTest with
check {
status should be(StatusCodes.OK)
val decoder: Decoder = Gzip
- val result = Await.result(Unmarshal(decoder.decode(response)).to[JsObject], 1.second)
+ val result = Await.result(Unmarshal(decoder.decodeMessage(response)).to[JsObject], 1.second)
result.fields.keys should contain allOf("testKey1a", "testKey1b", "testKey2a")
result.fields.keys should contain noneOf("testKey2b", "testKey3")
result.fields("testKey1a") should be(JsString("myValue1a"))
@@ -369,7 +369,7 @@ class CromwellApiServiceSpec extends AsyncFlatSpec with ScalatestRouteTest with
}
val decoder: Decoder = Gzip
- Unmarshal(decoder.decode(response)).to[String] map { r =>
+ Unmarshal(decoder.decodeMessage(response)).to[String] map { r =>
assertResult(
s"""{
| "status": "fail",
@@ -512,6 +512,7 @@ object CromwellApiServiceSpec {
events.head.key.workflowId match {
case CromwellApiServiceSpec.ExistingWorkflowId => sender ! MetadataWriteSuccess(events)
case CromwellApiServiceSpec.AbortedWorkflowId => sender ! MetadataWriteFailure(new Exception("mock exception of db failure"), events)
+ case WorkflowId(_) => throw new Exception("Something untoward happened, this situation is not believed to be possible at this time")
}
}
}
@@ -524,10 +525,10 @@ object CromwellApiServiceSpec {
sender ! response
case AbortWorkflow(id, manager) =>
val message = id match {
- case ExistingWorkflowId =>
- WorkflowStoreEngineActor.WorkflowAborted(id)
+ case ExistingWorkflowId => WorkflowStoreEngineActor.WorkflowAborted(id)
case AbortedWorkflowId =>
WorkflowAbortFailed(id, new IllegalStateException(s"Workflow ID '$id' is in terminal state 'Aborted' and cannot be aborted."))
+ case WorkflowId(_) => throw new Exception("Something untoward happened")
@cjllanwarne

cjllanwarne Jul 5, 2017

Contributor

ToL: I like test exceptions to be obviously distinguishable between "don't worry, I wanted this to happen for the test" vs "your test has broken". It's not obvious to me which this falls into.

@geoffjentry

geoffjentry Jul 5, 2017

Member

This is "clearly the test author didn't think this could possibly happen, so this should probably blow up" :)

}
sender ! message
}
@@ -57,7 +57,7 @@ object GoogleConfiguration {
private val log = LoggerFactory.getLogger("GoogleConfiguration")
- case class GoogleConfigurationException(errorMessages: List[String]) extends MessageAggregation {
+ final case class GoogleConfigurationException(errorMessages: List[String]) extends MessageAggregation {
@cjllanwarne

cjllanwarne Jul 5, 2017

Contributor

🎉

override val exceptionContext = "Google configuration"
}
Oops, something went wrong.