Skip to content

Commit

Permalink
Merge pull request #3153 from broadinstitute/ks_backend_dir_first_draft
Browse files Browse the repository at this point in the history
Backend secondary file and directory updates.
  • Loading branch information
kshakir committed Jan 19, 2018
2 parents 2347c14 + 8884a9f commit 8380b54
Show file tree
Hide file tree
Showing 19 changed files with 313 additions and 182 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,26 @@ import cats.syntax.traverse._
import cats.syntax.validated._
import common.validation.ErrorOr._
import common.validation.Validation._
import cromwell.backend.BackendJobDescriptor
import cromwell.backend.io.DirectoryFunctions._
import cromwell.core.path.{Path, PathFactory}
import wom.expression.IoFunctionSet
import wom.graph.TaskCallNode
import wom.values.{WomFile, WomGlobFile, WomMaybeListedDirectory, WomMaybePopulatedFile, WomSingleFile, WomUnlistedDirectory}

import scala.concurrent.Future

trait DirectoryFunctions extends IoFunctionSet with PathFactory {

def findDirectoryOutputs(call: TaskCallNode,
jobDescriptor: BackendJobDescriptor): ErrorOr[List[WomUnlistedDirectory]] = {
call.callable.outputs.flatTraverse[ErrorOr, WomUnlistedDirectory] { outputDefinition =>
outputDefinition.expression.evaluateFiles(jobDescriptor.localInputs, this, outputDefinition.womType) map {
_.toList.flatMap(_.flattenFiles) collect { case unlistedDirectory: WomUnlistedDirectory => unlistedDirectory }
}
}
}

override def listAllFilesUnderDirectory(dirPath: String): Future[Seq[String]] = {
temporaryImplListPaths(dirPath)
}
Expand All @@ -30,15 +42,26 @@ trait DirectoryFunctions extends IoFunctionSet with PathFactory {
object DirectoryFunctions {
def ensureSlashed(dir: String): String = if (dir.endsWith("/")) dir else s"$dir/"

def ensureUnslashed(dir: String): String = dir.stripSuffix("/")

def listFiles(path: Path): ErrorOr[List[Path]] = {
if (path.isDirectory) {
for {
pathListing <- validate(path.list.toList)
pathsPerListing <- pathListing.traverse(listFiles)
} yield pathsPerListing.flatten
} else {
List(path).valid
def listPaths(path: Path, checkedPaths: Set[Path]): ErrorOr[Set[Path]] = {
val newCheckedPaths = checkedPaths ++ Set(path)
if (path.isDirectory) {
for {
pathListing <- validate(path.list.toSet)
uncheckedPaths = pathListing -- newCheckedPaths
pathsPerListing <-
uncheckedPaths.toList.traverse[ErrorOr, List[Path]](listPaths(_, newCheckedPaths).map(_.toList))
} yield checkedPaths ++ pathsPerListing.flatten.toSet
} else {
newCheckedPaths.valid
}
}

val allPaths = listPaths(path, Set.empty).map(_.toList)
val allFiles = allPaths.map(_.filterNot(_.isDirectory))
allFiles
}

def listWomSingleFiles(womFile: WomFile, pathFactory: PathFactory, pathPatcher: String => String): ErrorOr[List[WomSingleFile]] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ trait GlobFunctions extends IoFunctionSet with AsyncIoFunctions {
def findGlobOutputs(call: TaskCallNode, jobDescriptor: BackendJobDescriptor): ErrorOr[List[WomGlobFile]] = {
def fromOutputs = call.callable.outputs.flatTraverse[ErrorOr, WomGlobFile] { outputDefinition =>
outputDefinition.expression.evaluateFiles(jobDescriptor.localInputs, this, outputDefinition.womType) map {
_.toList collect { case glob: WomGlobFile => glob }
_.toList.flatMap(_.flattenFiles) collect { case glob: WomGlobFile => glob }
}
}
fromOutputs.map(_ ++ call.callable.additionalGlob)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import cromwell.backend.BackendLifecycleActor.AbortJobCommand
import cromwell.backend._
import cromwell.backend.async.AsyncBackendJobExecutionActor._
import cromwell.backend.async.{AbortedExecutionHandle, AsyncBackendJobExecutionActor, ExecutionHandle, FailedNonRetryableExecutionHandle, FailedRetryableExecutionHandle, PendingExecutionHandle, ReturnCodeIsNotAnInt, StderrNonEmpty, SuccessfulExecutionHandle, WrongReturnCode}
import cromwell.backend.io.DirectoryFunctions
import cromwell.backend.validation._
import cromwell.backend.wdl.OutputEvaluator._
import cromwell.backend.wdl.{Command, OutputEvaluator}
Expand Down Expand Up @@ -136,7 +137,7 @@ trait StandardAsyncExecutionActor extends AsyncBackendJobExecutionActor with Sta
*
*/
def mapCommandLineWomFile(womFile: WomFile): WomFile =
WomSingleFile(workflowPaths.buildPath(womFile.value).pathAsString)
womFile.mapFile(workflowPaths.buildPath(_).pathAsString)

/** @see [[Command.instantiate]] */
final lazy val commandLineValueMapper: WomValue => WomValue = {
Expand All @@ -148,6 +149,30 @@ trait StandardAsyncExecutionActor extends AsyncBackendJobExecutionActor with Sta
*/
lazy val commandDirectory: Path = jobPaths.callExecutionRoot

/**
* Returns the shell scripting for finding all files listed within a directory.
*
* @param directoryFiles The directories.
* @return The shell scripting.
*/
def directoryScripts(directoryFiles: Traversable[WomUnlistedDirectory]): String =
directoryFiles map directoryScript mkString "\n"

/**
* Returns the shell scripting for finding all files listed within a directory.
*
* @param unlistedDirectory The directory.
* @return The shell scripting.
*/
def directoryScript(unlistedDirectory: WomUnlistedDirectory): String = {
val directoryPath = DirectoryFunctions.ensureUnslashed(unlistedDirectory.value)
val directoryList = directoryPath + ".list"

s"""|# list all the files that match the directory into a file called directory.list
|find $directoryPath -type f > $directoryList
|""".stripMargin
}

/**
* The local parent directory of the glob file. By default this is the same as the commandDirectory.
*
Expand Down Expand Up @@ -212,15 +237,18 @@ trait StandardAsyncExecutionActor extends AsyncBackendJobExecutionActor with Sta
val stderrPath = cwd./(jobPaths.stderrFilename)
val rcTmpPath = rcPath.plusExt("tmp")

val globFiles: ErrorOr[List[WomGlobFile]] =
val errorOrDirectoryOutputs: ErrorOr[List[WomUnlistedDirectory]] =
backendEngineFunctions.findDirectoryOutputs(call, jobDescriptor)

val errorOrGlobFiles: ErrorOr[List[WomGlobFile]] =
backendEngineFunctions.findGlobOutputs(call, jobDescriptor)

lazy val environmentVariables = instantiatedCommand.environmentVariables map { case (k, v) => s"""$k="$v"""" } mkString("", "\n", "\n")

// The `tee` trickery below is to be able to redirect to known filenames for CWL while also streaming
// stdout and stderr for PAPI to periodically upload to cloud storage.
// https://stackoverflow.com/questions/692000/how-do-i-write-stderr-to-a-file-while-using-tee-with-a-pipe
globFiles.map(globFiles =>
(errorOrDirectoryOutputs, errorOrGlobFiles).mapN((directoryOutputs, globFiles) =>
s"""|#!/bin/bash
|tmpDir=$$(
| set -e
Expand All @@ -245,6 +273,7 @@ trait StandardAsyncExecutionActor extends AsyncBackendJobExecutionActor with Sta
|cd $cwd
|SCRIPT_EPILOGUE
|${globScripts(globFiles)}
|${directoryScripts(directoryOutputs)}
|)
|mv $rcTmpPath $rcPath
|""".stripMargin
Expand All @@ -268,9 +297,9 @@ trait StandardAsyncExecutionActor extends AsyncBackendJobExecutionActor with Sta
case other => s"Ad-hoc file creation expression invalidly created a ${other.womType.toDisplayString} result.".invalidNel
}

val adHocFileCreations: ErrorOr[List[WomFile]] = jobDescriptor.taskCall.callable.adHocFileCreation.toList.traverse {
val adHocFileCreations: ErrorOr[List[WomFile]] = jobDescriptor.taskCall.callable.adHocFileCreation.toList.traverse(
_.evaluateValue(adHocFileCreationInputs, backendEngineFunctions).flatMap(validateAdHocFile)
}.map(_.flatten)
).map(_.flatten)

val adHocFileCreationSideEffectFiles: ErrorOr[List[CommandSetupSideEffectFile]] = adHocFileCreations map { _ map {
f => CommandSetupSideEffectFile(f, Option(adHocFileLocalization(f)))
Expand Down Expand Up @@ -651,7 +680,7 @@ trait StandardAsyncExecutionActor extends AsyncBackendJobExecutionActor with Sta
private def executeOrRecoverSuccess(executionHandle: ExecutionHandle): Future[ExecutionHandle] = {
executionHandle match {
case handle: PendingExecutionHandle[StandardAsyncJob@unchecked, StandardAsyncRunInfo@unchecked, StandardAsyncRunStatus@unchecked] =>
tellKvJobId(handle.pendingJob).map { case _ =>
tellKvJobId(handle.pendingJob) map { _ =>
jobLogger.info(s"job id: ${handle.pendingJob.jobId}")
tellMetadata(Map(CallMetadataKeys.JobId -> handle.pendingJob.jobId))
/*
Expand Down
34 changes: 18 additions & 16 deletions cwl/src/main/scala/cwl/CommandLineTool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import wom.callable.{Callable, CallableTaskDefinition}
import wom.executable.Executable
import wom.expression.{IoFunctionSet, ValueAsAnExpression, WomExpression}
import wom.graph.GraphNodePort.OutputPort
import wom.types.{WomOptionalType, WomStringType, WomType}
import wom.types.{WomFileType, WomOptionalType, WomStringType, WomType}
import wom.values.{WomArray, WomEvaluatedCallInputs, WomFile, WomGlobFile, WomString, WomValue}
import wom.{CommandPart, RuntimeAttributes}

Expand Down Expand Up @@ -209,7 +209,7 @@ case class CommandLineTool private(
s"Could not evaluate environment variable expressions defined in the call hierarchy of tool $id: ${xs.mkString(", ")}.".invalidNel
}
}

/*
* Custom evaluation of the outputs of the command line tool (custom as in bypasses the engine provided default implementation).
* This is needed because the output of a CWL tool might be defined by the presence of a cwl.output.json file in the output directory.
Expand All @@ -218,7 +218,7 @@ case class CommandLineTool private(
* It ensures all the output ports of the corresponding WomCallNode get a WomValue which is needed for the engine to keep running the workflow properly.
* If the json file happens to be missing values for one or more output ports, it's a failure.
* If the file is not present, an empty value is returned and the engine will execute its own output evaluation logic.
*
*
* TODO: use IO instead of Future ?
*/
final private def outputEvaluationJsonFunction(outputPorts: Set[OutputPort],
Expand All @@ -239,15 +239,15 @@ case class CommandLineTool private(
.getOrElse(s"Cannot find a value for output ${outputPort.name} in output json $json".invalidNel)
}).toEither
}

// Parse content as json and return output values for each output port
def parseContent(content: String): EvaluatedOutputs = {
for {
parsed <- yaml.parser.parse(content).flatMap(_.as[Map[String, Json]]).leftMap(error => NonEmptyList.one(error.getMessage))
jobOutputsMap <- jsonToOutputs(parsed)
} yield jobOutputsMap.toMap
}

for {
// Glob for "cwl.output.json"
outputJsonGlobs <- OptionT.liftF { ioFunctionSet.glob(CwlOutputJson) }
Expand All @@ -259,7 +259,7 @@ case class CommandLineTool private(
outputs = parseContent(content)
} yield outputs
}


def buildTaskDefinition(validator: RequirementsValidator, parentExpressionLib: ExpressionLib): ErrorOr[CallableTaskDefinition] = {
for {
Expand Down Expand Up @@ -365,7 +365,7 @@ case class CommandLineTool private(

object CommandLineTool {
val CwlOutputJson = "cwl.output.json"

private val DefaultPosition = Coproduct[StringOrInt](0)
// Elements of the sorting key can be either Strings or Ints
type StringOrInt = String :+: Int :+: CNil
Expand Down Expand Up @@ -612,27 +612,29 @@ object CommandLineTool {
* Returns the list of secondary files for the primary file.
*/
def secondaryFiles(primaryWomFile: WomFile,
stringWomFileType: WomFileType,
secondaryFilesOption: Option[SecondaryFiles],
parameterContext: ParameterContext,
expressionLib: ExpressionLib): ErrorOr[List[WomFile]] = {
secondaryFilesOption
.map(secondaryFiles(primaryWomFile, _, parameterContext, expressionLib))
.map(secondaryFiles(primaryWomFile, stringWomFileType, _, parameterContext, expressionLib))
.getOrElse(Nil.valid)
}

/**
* Returns the list of secondary files for the primary file.
*/
def secondaryFiles(primaryWomFile: WomFile,
stringWomFileType: WomFileType,
secondaryFiles: SecondaryFiles,
parameterContext: ParameterContext,
expressionLib: ExpressionLib): ErrorOr[List[WomFile]] = {
secondaryFiles
.fold(CommandLineTool.CommandOutputParameter.SecondaryFilesPoly)
.apply(primaryWomFile, parameterContext, expressionLib)
.apply(primaryWomFile, stringWomFileType, parameterContext, expressionLib)
}

type SecondaryFilesFunction = (WomFile, ParameterContext, ExpressionLib) => ErrorOr[List[WomFile]]
type SecondaryFilesFunction = (WomFile, WomFileType, ParameterContext, ExpressionLib) => ErrorOr[List[WomFile]]

object SecondaryFilesPoly extends Poly1 {
implicit def caseStringOrExpression: Case.Aux[StringOrExpression, SecondaryFilesFunction] = {
Expand All @@ -644,25 +646,25 @@ object CommandLineTool {
implicit def caseExpression: Case.Aux[Expression, SecondaryFilesFunction] = {
at {
expression =>
(primaryWomFile, parameterContext, expressionLib) =>
File.secondaryExpressionFiles(primaryWomFile, expression, parameterContext, expressionLib)
(primaryWomFile, stringWomFileType, parameterContext, expressionLib) =>
File.secondaryExpressionFiles(primaryWomFile, stringWomFileType, expression, parameterContext, expressionLib)
}
}

implicit def caseString: Case.Aux[String, SecondaryFilesFunction] = {
at {
string =>
(primaryWomFile, _, _) =>
File.secondaryStringFile(primaryWomFile, string).map(List(_))
(primaryWomFile, stringWomFileType, _, _) =>
File.secondaryStringFile(primaryWomFile, stringWomFileType, string).map(List(_))
}
}

implicit def caseArray: Case.Aux[Array[StringOrExpression], SecondaryFilesFunction] = {
at {
array =>
(primaryWomFile, parameterContext, expressionLib) =>
(primaryWomFile, stringWomFileType, parameterContext, expressionLib) =>
val functions: List[SecondaryFilesFunction] = array.toList.map(_.fold(this))
functions.flatTraverse(_ (primaryWomFile, parameterContext, expressionLib))
functions.flatTraverse(_ (primaryWomFile, stringWomFileType, parameterContext, expressionLib))
}
}
}
Expand Down

0 comments on commit 8380b54

Please sign in to comment.