Skip to content

Commit

Permalink
Merge pull request #3079 from broadinstitute/ks_wom_dir
Browse files Browse the repository at this point in the history
WOM (only!!) updates to add directories and additional files.
  • Loading branch information
kshakir committed Jan 9, 2018
2 parents 6ed1f11 + 1313334 commit 1450991
Show file tree
Hide file tree
Showing 25 changed files with 944 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,5 +111,5 @@ class TestReadLikeFunctions(sizeResult: Try[Double]) extends IoFunctionSet {

override def size(params: Seq[Try[WomValue]]): Future[WomFloat] = Future.fromTry(sizeResult.map(WomFloat.apply))

override def copyFile(pathFrom: String, targetName: String): Future[WomFile] = ???
override def copyFile(pathFrom: String, targetName: String): Future[WomSingleFile] = ???
}
18 changes: 13 additions & 5 deletions core/src/main/scala/cromwell/core/NoIoFunctionSet.scala
Original file line number Diff line number Diff line change
@@ -1,21 +1,29 @@
package cromwell.core

import wom.expression.IoFunctionSet
import wom.values.{WomFile, WomFloat, WomValue}
import wom.values.{WomFloat, WomSingleFile, WomValue}

import scala.concurrent.Future
import scala.util.{Failure, Try}

case object NoIoFunctionSet extends IoFunctionSet {
override def readFile(path: String, maxBytes: Option[Int] = None, failOnOverflow: Boolean = false): Future[String] = Future.failed(new NotImplementedError("readFile is not available here"))

override def writeFile(path: String, content: String): Future[WomFile] = Future.failed(new NotImplementedError("writeFile is not available here"))
override def writeFile(path: String, content: String): Future[WomSingleFile] = {
Future.failed(new NotImplementedError("writeFile is not available here"))
}

override def copyFile(pathFrom: String, targetName: String): Future[WomFile] = throw new Exception("copyFile is not available here")
override def copyFile(pathFrom: String, targetName: String): Future[WomSingleFile] = {
throw new Exception("copyFile is not available here")
}

override def stdout(params: Seq[Try[WomValue]]): Try[WomFile] = Failure(new NotImplementedError("stdout is not available here"))
override def stdout(params: Seq[Try[WomValue]]): Try[WomSingleFile] = {
Failure(new NotImplementedError("stdout is not available here"))
}

override def stderr(params: Seq[Try[WomValue]]): Try[WomFile] = Failure(new NotImplementedError("stderr is not available here"))
override def stderr(params: Seq[Try[WomValue]]): Try[WomSingleFile] = {
Failure(new NotImplementedError("stderr is not available here"))
}

override def glob(pattern: String): Future[Seq[String]] = throw new NotImplementedError("glob is not available here")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,20 @@ object WomValueSimpleton {
case WomArray(_, arrayValue) => arrayValue.zipWithIndex flatMap { case (arrayItem, index) => arrayItem.simplify(s"$name[$index]") }
case WomMap(_, mapValue) => mapValue flatMap { case (key, value) => value.simplify(s"$name:${key.valueString.escapeMeta}") }
case WomPair(left, right) => left.simplify(s"$name:left") ++ right.simplify(s"$name:right")
case wdlObject: WomObjectLike => wdlObject.value flatMap { case (key, value) => value.simplify(s"$name:${key.escapeMeta}") }
case womObjectLike: WomObjectLike => womObjectLike.value flatMap {
case (key, value) => value.simplify(s"$name:${key.escapeMeta}")
}
// TODO: WOM: WOMFILE: Better simplification of listed dirs / populated files
case womMaybeListedDirectory: WomMaybeListedDirectory =>
womMaybeListedDirectory
.valueOption
.map(value => WomUnlistedDirectory(value).simplify(name))
.getOrElse(Seq.empty)
case womMaybePopulatedFile: WomMaybePopulatedFile =>
womMaybePopulatedFile
.valueOption
.map(value => WomSingleFile(value).simplify(name))
.getOrElse(Seq.empty)
case other => throw new Exception(s"Cannot simplify wdl value $other of type ${other.womType}")
}
}
Expand Down
3 changes: 2 additions & 1 deletion cwl/src/test/scala/cwl/CommandOutputExpressionSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ class CommandOutputExpressionSpec extends FlatSpec with Matchers {
new IoFunctionSet {
override def readFile(path: String, maxBytes: Option[Int] = None, failOnOverflow: Boolean = false) = Future.successful(data)
override def writeFile(path: String, content: String) = throw new Exception("writeFile should not be used in this test")
override def copyFile(pathFrom: String, pathTo: String): Future[WomFile] = throw new Exception("copyFile should not be used in this test")
override def copyFile(pathFrom: String, pathTo: String): Future[WomSingleFile] =
throw new Exception("copyFile should not be used in this test")
override def stdout(params: Seq[Try[WomValue]]) = throw new Exception("stdout should not be used in this test")
override def stderr(params: Seq[Try[WomValue]]) = throw new Exception("stderr should not be used in this test")
override def glob(pattern: String): Future[Seq[String]] = throw new Exception("glob should not be used in this test")
Expand Down
11 changes: 6 additions & 5 deletions engine/src/main/scala/cromwell/engine/EngineIoFunctions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,21 @@ package cromwell.engine
import cromwell.backend.wdl.ReadLikeFunctions
import cromwell.core.io.AsyncIo
import cromwell.core.path.PathBuilder
import wom.values.{WomFile, WomValue}
import wom.values.{WomSingleFile, WomValue}

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Try}

class EngineIoFunctions(val pathBuilders: List[PathBuilder], override val asyncIo: AsyncIo, override val ec: ExecutionContext) extends ReadLikeFunctions {
private def fail(name: String) = Failure(new NotImplementedError(s"$name() not supported at the workflow level yet"))

override def stdout(params: Seq[Try[WomValue]]): Try[WomFile] = fail("stdout")
override def stderr(params: Seq[Try[WomValue]]): Try[WomFile] = fail("stderr")
override def stdout(params: Seq[Try[WomValue]]): Try[WomSingleFile] = fail("stdout")
override def stderr(params: Seq[Try[WomValue]]): Try[WomSingleFile] = fail("stderr")
override def glob(pattern: String): Future[Seq[String]] = throw new NotImplementedError(s"glob(path, pattern) not implemented yet")

override def writeFile(path: String, content: String): Future[WomFile] =
override def writeFile(path: String, content: String): Future[WomSingleFile] =
Future.failed(new Exception("Cromwell does not support writing files from a workflow context"))
override def copyFile(pathFrom: String, targetName: String): Future[WomFile] =

override def copyFile(pathFrom: String, targetName: String): Future[WomSingleFile] =
Future.failed(new Exception("Cromwell does not support copying files from a workflow context"))
}
Original file line number Diff line number Diff line change
Expand Up @@ -547,9 +547,9 @@ class MaterializeWorkflowDescriptorActor(serviceRegistryActor: ActorRef,
def prefix(port: OutputPort) = s"Invalid value for File input '${port.fullyQualifiedName}':"

val failedFiles = workflowInputs collect {
case (port , WomSingleFile(value)) if value.startsWith("\"gs://") =>
s"""${prefix(port)} $value starts with a '"'"""
case (port , WomSingleFile(value)) if value.isEmpty =>
case (port, womSingleFile: WomSingleFile) if womSingleFile.value.startsWith("\"gs://") =>
s"""${prefix(port)} ${womSingleFile.value} starts with a '"'"""
case (port, womSingleFile: WomSingleFile) if womSingleFile.value.isEmpty =>
s"${prefix(port)} empty value"
}

Expand Down
4 changes: 2 additions & 2 deletions engine/src/test/scala/cromwell/ScatterWorkflowSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class ScatterWorkflowSpec extends CromwellTestKitWordSpec {
"sc_test.do_gather.sum" -> WomInteger(11),
"sc_test.do_prepare.split_files" -> WomArray(
WomArrayType(WomSingleFileType),
Seq("temp_aa", "temp_ab", "temp_ac", "temp_ad").map(WomSingleFile)
Seq("temp_aa", "temp_ab", "temp_ac", "temp_ad").map(WomSingleFile(_))
),
"sc_test.do_scatter.count_file" -> WomArray(
WomArrayType(WomSingleFileType),
Expand All @@ -83,7 +83,7 @@ class ScatterWorkflowSpec extends CromwellTestKitWordSpec {
"sc_test.do_gather.sum" -> WomInteger(11),
"sc_test.do_prepare.split_files" -> WomArray(
WomArrayType(WomSingleFileType),
Seq("temp_aa", "temp_ab", "temp_ac", "temp_ad").map(WomSingleFile)
Seq("temp_aa", "temp_ab", "temp_ac", "temp_ad").map(WomSingleFile(_))
),
"sc_test.do_scatter.count_file" -> WomArray(
WomArrayType(WomSingleFileType),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ final class BcsAsyncBackendJobExecutionActor(override val standardParams: Standa
womFile match {
case singleFile: WomSingleFile => List(generateBcsSingleFileOutput(singleFile))
case _: WomGlobFile => throw new RuntimeException(s"glob output not supported currently")
case unsupported: WomFile =>
// TODO: WOM: WOMFILE: Add support for directories.
throw new NotImplementedError(s"$unsupported is not supported yet.")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import cats.data.Validated.{Invalid, Valid}
import com.google.api.client.googleapis.json.GoogleJsonResponseException
import com.google.api.services.genomics.model.RunPipelineRequest
import com.google.cloud.storage.contrib.nio.CloudStorageOptions
import common.validation.ErrorOr.ErrorOr
import cromwell.backend._
import cromwell.backend.async.{AbortedExecutionHandle, ExecutionHandle, FailedNonRetryableExecutionHandle, FailedRetryableExecutionHandle, PendingExecutionHandle}
import cromwell.backend.impl.jes.RunStatus.TerminalRunStatus
Expand All @@ -24,7 +25,6 @@ import cromwell.filesystems.gcs.GcsPath
import cromwell.filesystems.gcs.batch.GcsBatchCommandBuilder
import cromwell.services.keyvalue.KeyValueServiceActor._
import cromwell.services.keyvalue.KvClient
import common.validation.ErrorOr.ErrorOr
import org.slf4j.LoggerFactory
import wom.CommandSetupSideEffectFile
import wom.callable.Callable.OutputDefinition
Expand Down Expand Up @@ -157,7 +157,7 @@ class JesAsyncBackendJobExecutionActor(override val standardParams: StandardAsyn
* relativeLocalizationPath("gs://some/bucket/foo.txt") -> "some/bucket/foo.txt"
*/
private def relativeLocalizationPath(file: WomFile): WomFile = {
WomFile.mapFile(file, value =>
file.mapFile(value =>
getPath(value) match {
case Success(path) => path.pathWithoutScheme
case _ => value
Expand Down Expand Up @@ -232,6 +232,9 @@ class JesAsyncBackendJobExecutionActor(override val standardParams: StandardAsyn
womFile match {
case singleFile: WomSingleFile => List(generateJesSingleFileOutputs(singleFile))
case globFile: WomGlobFile => generateJesGlobFileOutputs(globFile)
case unsupported: WomFile =>
// TODO: WOM: WOMFILE: Add support for directories.
throw new NotImplementedError(s"$unsupported is not supported yet.")
}
}

Expand Down Expand Up @@ -532,7 +535,7 @@ class JesAsyncBackendJobExecutionActor(override val standardParams: StandardAsyn
}

override def mapCommandLineWomFile(womFile: WomFile): WomFile = {
WomFile.mapFile(womFile, value =>
womFile.mapFile(value =>
getPath(value) match {
case Success(gcsPath: GcsPath) => workingDisk.mountPoint.resolve(gcsPath.pathWithoutScheme).pathAsString
case _ => value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ final case class TesTask(jobDescriptor: BackendJobDescriptor,
`type` = Option("FILE")
)
)
case (unsupported: WomFile, _) =>
// TODO: WOM: WOMFILE: Add support for directories.
throw new NotImplementedError(s"$unsupported is not supported yet.")
}

val outputs: Seq[Output] = womOutputs ++ standardOutputs ++ Seq(commandScriptOut)
Expand Down
28 changes: 18 additions & 10 deletions wdl/src/main/scala/wdl/AstTools.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package wdl
import java.nio.file.Path

import better.files._
import wdl4s.parser.WdlParser
import wdl4s.parser.WdlParser._
import wdl.WdlExpression.{AstForExpressions, AstNodeForExpressions}
import wdl.expression.ValueEvaluator.InterpolationTagPattern
import wdl4s.parser.WdlParser
import wdl4s.parser.WdlParser._
import wom.core._
import wom.types._
import wom.values._
Expand Down Expand Up @@ -111,6 +111,7 @@ object AstTools {
astNode match {
case t: Terminal =>
t.getSourceString match {
case WomUnlistedDirectoryType.`toDisplayString` => WomUnlistedDirectoryType
case WomSingleFileType.`toDisplayString` => WomSingleFileType
case WomStringType.`toDisplayString` => WomStringType
case WomIntegerType.`toDisplayString` => WomIntegerType
Expand All @@ -126,16 +127,19 @@ object AstTools {
val typeTerminal = a.getAttribute("name").asInstanceOf[Terminal]
a.getAttribute("name").sourceString match {
case "Pair" =>
if (subtypes.size != 2) throw new SyntaxError(wdlSyntaxErrorFormatter.pairMustHaveExactlyTwoTypeParameters(typeTerminal))
if (subtypes.lengthCompare(2) != 0)
throw new SyntaxError(wdlSyntaxErrorFormatter.pairMustHaveExactlyTwoTypeParameters(typeTerminal))
val leftType = subtypes.head.womType(wdlSyntaxErrorFormatter)
val rightType = subtypes.tail.head.womType(wdlSyntaxErrorFormatter)
WomPairType(leftType, rightType)
case "Array" =>
if (subtypes.size != 1) throw new SyntaxError(wdlSyntaxErrorFormatter.arrayMustHaveOnlyOneTypeParameter(typeTerminal))
if (subtypes.lengthCompare(1) != 0)
throw new SyntaxError(wdlSyntaxErrorFormatter.arrayMustHaveOnlyOneTypeParameter(typeTerminal))
val member = subtypes.head.womType(wdlSyntaxErrorFormatter)
WomArrayType(member)
case "Map" =>
if (subtypes.size != 2) throw new SyntaxError(wdlSyntaxErrorFormatter.mapMustHaveExactlyTwoTypeParameters(typeTerminal))
if (subtypes.lengthCompare(2) != 0)
throw new SyntaxError(wdlSyntaxErrorFormatter.mapMustHaveExactlyTwoTypeParameters(typeTerminal))
val keyType = subtypes.head.womType(wdlSyntaxErrorFormatter)
val valueType = subtypes.tail.head.womType(wdlSyntaxErrorFormatter)
WomMapType(keyType, valueType)
Expand Down Expand Up @@ -181,10 +185,10 @@ object AstTools {

def astTupleToValue(a: Ast): WomValue = {
val subElements = a.getAttribute("values").astListAsVector
if (subElements.size == 1) {
if (subElements.lengthCompare(1) == 0) {
// Tuple 1 is equivalent to the value inside it. Enables nesting parens, e.g. (1 + 2) + 3
a.womValue(womType, wdlSyntaxErrorFormatter)
} else if (subElements.size == 2 && womType.isInstanceOf[WomPairType]) {
} else if (subElements.lengthCompare(2) == 0 && womType.isInstanceOf[WomPairType]) {
val pairType = womType.asInstanceOf[WomPairType]
WomPair(subElements.head.womValue(pairType.leftType, wdlSyntaxErrorFormatter), subElements(1).womValue(pairType.rightType, wdlSyntaxErrorFormatter))
} else {
Expand All @@ -194,8 +198,12 @@ object AstTools {

astNode match {
case t: Terminal if t.getTerminalStr == "string" && womType == WomStringType => WomString(t.getSourceString)
case t: Terminal if t.getTerminalStr == "string" && womType == WomUnlistedDirectoryType =>
WomUnlistedDirectory(t.getSourceString)
case t: Terminal if t.getTerminalStr == "string" && womType == WomSingleFileType =>
WomSingleFile(t.getSourceString)
case t: Terminal if t.getTerminalStr == "string" && womType == WomGlobFileType =>
WomGlobFile(t.getSourceString)
case t: Terminal if t.getTerminalStr == "integer" && womType == WomIntegerType => WomInteger(t.getSourceString.toInt)
case t: Terminal if t.getTerminalStr == "float" && womType == WomFloatType => WomFloat(t.getSourceString.toDouble)
case t: Terminal if t.getTerminalStr == "boolean" && womType == WomBooleanType => t.getSourceString.toLowerCase match {
Expand All @@ -219,7 +227,7 @@ object AstTools {

implicit class EnhancedAstSeq(val astSeq: Seq[Ast]) extends AnyVal {
def duplicatesByName: Seq[Ast] = {
astSeq.groupBy(_.getAttribute("name").sourceString).collect({case (_ ,v) if v.size > 1 => v.head}).toVector
astSeq.groupBy(_.getAttribute("name").sourceString).collect({case (_ ,v) if v.lengthCompare(1) > 0 => v.head}).toVector
}
}

Expand Down Expand Up @@ -406,7 +414,7 @@ object AstTools {

/* Then, make sure there is at most one 'input' section defined, then return the a Seq of IOMapping(key=<terminal>, value=<expression>) ASTs*/
callInputSections match {
case asts: Seq[Ast] if asts.size == 1 => asts.head.getAttribute("map").findAsts(AstNodeName.IOMapping)
case asts: Seq[Ast] if asts.lengthCompare(1) == 0 => asts.head.getAttribute("map").findAsts(AstNodeName.IOMapping)
case asts: Seq[Ast] if asts.isEmpty => Seq.empty[Ast]
case asts: Seq[Ast] =>
/* Uses of .head here are assumed by the above code that ensures that there are no empty maps */
Expand All @@ -421,7 +429,7 @@ object AstTools {
def wdlSectionToStringMap(ast: Ast, node: String, wdlSyntaxErrorFormatter: WdlSyntaxErrorFormatter): Map[String, String] = {
ast.findAsts(node) match {
case a if a.isEmpty => Map.empty[String, String]
case a if a.size == 1 =>
case a if a.lengthCompare(1) == 0 =>
// Yes, even 'meta {}' and 'parameter_meta {}' sections have RuntimeAttribute ASTs.
// In hindsight, this was a poor name for the AST.
a.head.findAsts(AstNodeName.RuntimeAttribute).map({ ast =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ import cats.instances.try_._
import cats.syntax.apply._
import common.exception.AggregatedException
import common.util.TryUtil
import wom.WomExpressionException
import wdl.expression.WdlStandardLibraryFunctions.{crossProduct => stdLibCrossProduct, _}
import wom.TsvSerializable
import wom.{TsvSerializable, WomExpressionException}
import wom.expression.IoFunctionSet
import wom.types._
import wom.values.WomArray.WomArrayLike
Expand Down Expand Up @@ -98,15 +97,17 @@ trait WdlStandardLibraryFunctions extends WdlFunctions[WomValue] {
womString <- WomStringType.coerceRawValue(pattern)
patternString = womString.valueString
filePaths <- Try(globHelper(patternString))
} yield WomArray(WomArrayType(WomSingleFileType), filePaths.map(WomSingleFile))
} yield WomArray(WomArrayType(WomSingleFileType), filePaths.map(WomSingleFile(_)))
}

def basename(params: Seq[Try[WomValue]]): Try[WomString] = {

val arguments: Try[(WomValue, WomValue)] = params.toList match {
case Success(f) :: Nil => Success((f, WomString("")))
case Success(f) :: Success(s) :: Nil => Success((f, s))
case s if s.size > 2 || s.size < 1 => Failure(new IllegalArgumentException(s"Bad number of arguments to basename(filename, suffixToStrip = ''): ${params.size}"))
case s if s.lengthCompare(2) > 0 || s.lengthCompare(1) < 0 =>
val message = s"Bad number of arguments to basename(filename, suffixToStrip = ''): ${params.size}"
Failure(new IllegalArgumentException(message))
case _ =>
val failures = params collect {
case Failure(e) => e
Expand Down Expand Up @@ -338,15 +339,15 @@ object WdlStandardLibraryFunctions {
} yield (a, b)

def extractTwoParams[A](params: Seq[Try[A]], badArgsFailure: Failure[Nothing]): Try[(A, A)] = {
if (params.size != 2) { badArgsFailure }
if (params.lengthCompare(2) != 0) { badArgsFailure }
else for {
left <- params.head
right <- params(1)
} yield (left, right)
}

def assertEquallySizedArrays[A](values: (WomValue, WomValue), badArgsFailure: Failure[Nothing] ): Try[(WomArray, WomArray)] = values match {
case (leftArray: WomArray, rightArray: WomArray) if leftArray.value.size == rightArray.value.size => Success((leftArray, rightArray))
case (leftArray: WomArray, rightArray: WomArray) if leftArray.value.lengthCompare(rightArray.value.size) == 0 => Success((leftArray, rightArray))
case _ => badArgsFailure
}

Expand Down
10 changes: 10 additions & 0 deletions wom/src/main/scala/wom/WomFileMapper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,16 @@ import wom.values._
import scala.util.{Success, Try}

object WomFileMapper {
/**
* Loops over a WomValue applying the supplied mapper function whenever a WomFile is encountered.
*
* Where WomValue.collectAsSeq applies a partial function that collects into a Seq, this method applies a function to
* all encountered WomFile within a WomMap, WomPair, WomArray, etc. Similarly WomFile.mapFile only traverses
* the passed in WomFile plus any additional files referenced within the WomFile.
*
* @see [[wom.values.WomValue.collectAsSeq]]
* @see [[wom.values.WomFile.mapFile]]
*/
def mapWomFiles(mapper: (WomFile => WomFile))(womValue: WomValue): Try[WomValue] = {
womValue match {
case file: WomFile => Try(mapper(file))
Expand Down

0 comments on commit 1450991

Please sign in to comment.