Skip to content

Commit

Permalink
Added a SharedFileSystem.cacheCopy method.
Browse files Browse the repository at this point in the history
  • Loading branch information
kshakir committed Sep 13, 2016
1 parent 03478cf commit a0c8e93
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 23 deletions.
8 changes: 6 additions & 2 deletions core/src/main/scala/cromwell/core/PathCopier.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@ import java.nio.file.Path
import better.files._

object PathCopier {
def getDestinationFilePath(sourceContextPath: Path, sourceFilePath: Path, destinationDirPath: Path): Path = {
val relativeFileString = sourceContextPath.toAbsolutePath.relativize(sourceFilePath.toAbsolutePath).toString
destinationDirPath.resolve(relativeFileString)
}

def copy(sourceContextPath: Path, sourceFilePath: Path, destinationDirPath: Path): Unit = {
val relativeFileString: String = sourceContextPath.toAbsolutePath.relativize(sourceFilePath.toAbsolutePath).toString
val destinationFilePath: Path = destinationDirPath.resolve(relativeFileString)
val destinationFilePath = getDestinationFilePath(sourceContextPath, sourceFilePath, destinationDirPath)
copy(sourceFilePath, destinationFilePath)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ object SharedFileSystem {
}

type PathsPair = (Path, Path)
type LocalizationStrategy = (Path, Path) => Try[Unit]
type DuplicationStrategy = (Path, Path) => Try[Unit]

/**
* Return a `Success` result if the file has already been localized, otherwise `Failure`.
Expand Down Expand Up @@ -65,6 +65,16 @@ object SharedFileSystem {
Try(Files.createSymbolicLink(executionPath, originalPath.toAbsolutePath))
}
}

private def duplicate(description: String, source: Path, dest: Path, strategies: Stream[DuplicationStrategy]) = {
strategies map {
_ (source, dest)
} find {
_.isSuccess
} getOrElse {
Failure(new UnsupportedOperationException(s"Could not $description $source -> $dest"))
}
}
}

trait SharedFileSystem extends PathFactory {
Expand All @@ -73,19 +83,37 @@ trait SharedFileSystem extends PathFactory {

def sharedFileSystemConfig: Config

lazy val LocalizationStrategies = sharedFileSystemConfig.getStringList("localization").asScala
lazy val Localizers = localizePathAlreadyLocalized _ +: (LocalizationStrategies map {
case "hard-link" => localizePathViaHardLink _
case "soft-link" => localizePathViaSymbolicLink _
case "copy" => localizePathViaCopy _
case unsupported => throw new UnsupportedOperationException(s"Localization strategy $unsupported is not recognized")
})
lazy val DefaultStrategies = Seq("hard-link", "soft-link", "copy")

lazy val LocalizationStrategies = getConfigStrategies("localization")
lazy val Localizers = createStrategies(LocalizationStrategies, docker = false)
lazy val DockerLocalizers = createStrategies(LocalizationStrategies, docker = true)

// Note that any unrecognized configuration will be raised when Localizers (just above) gets resolved.
lazy val DockerLocalizers = localizePathAlreadyLocalized _ +: (LocalizationStrategies collect {
case "hard-link" => localizePathViaHardLink _
case "copy" => localizePathViaCopy _
})
lazy val CachingStrategies = getConfigStrategies("caching")
lazy val Cachers = createStrategies(CachingStrategies, docker = false)
lazy val DockerCachers = createStrategies(CachingStrategies, docker = true)

private def getConfigStrategies(configPath: String): Seq[String] = {
if (sharedFileSystemConfig.hasPath(configPath)) {
sharedFileSystemConfig.getStringList(configPath).asScala
} else {
DefaultStrategies
}
}

private def createStrategies(configStrategies: Seq[String], docker: Boolean): Seq[DuplicationStrategy] = {
localizePathAlreadyLocalized _ +: {
LocalizationStrategies filter {
case "soft-link" if docker => false
case _ => true
} map {
case "hard-link" => localizePathViaHardLink _
case "soft-link" => localizePathViaSymbolicLink _
case "copy" => localizePathViaCopy _
case unsupported => throw new UnsupportedOperationException(s"Strategy $unsupported is not recognized")
}
}
}

private def hostAbsoluteFilePath(callRoot: Path, pathString: String): File = {
val wdlPath = Paths.get(pathString)
Expand All @@ -105,6 +133,12 @@ trait SharedFileSystem extends PathFactory {
}
}

def cacheCopy(sourceContextPath: Path, sourceFilePath: Path, destinationDirPath: Path, docker: Boolean): Try[Unit] = {
val strategies = if (docker) DockerCachers else Cachers
val destinationFilePath = PathCopier.getDestinationFilePath(sourceContextPath, sourceFilePath, destinationDirPath)
duplicate("cache", sourceFilePath, destinationFilePath, strategies.toStream)
}

/**
* Return a possibly altered copy of inputs reflecting any localization of input file paths that might have
* been performed for this `Backend` implementation.
Expand Down Expand Up @@ -161,11 +195,7 @@ trait SharedFileSystem extends PathFactory {
* @param wdlValue WdlValue to localize
* @return localized wdlValue
*/
private def localizeWdlValue(toDestPath: (String => PathsPair), strategies: Stream[LocalizationStrategy])(wdlValue: WdlValue): Try[WdlValue] = {

def localize(source: Path, dest: Path) = strategies map { _(source, dest) } find { _.isSuccess } getOrElse {
Failure(new UnsupportedOperationException(s"Could not localize $source -> $dest"))
}
private def localizeWdlValue(toDestPath: (String => PathsPair), strategies: Stream[DuplicationStrategy])(wdlValue: WdlValue): Try[WdlValue] = {

def adjustArray(t: WdlArrayType, inputArray: Seq[WdlValue]): Try[WdlArray] = {
val tryAdjust = inputArray map localizeWdlValue(toDestPath, strategies)
Expand All @@ -185,7 +215,7 @@ trait SharedFileSystem extends PathFactory {

def adjustFile(path: String) = {
val (src, dst) = toDestPath(path)
localize(src, dst) map { Unit => WdlFile(dst.toString) }
duplicate("localize", src, dst, strategies) map { Unit => WdlFile(dst.toString) }
}

wdlValue match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,7 @@ trait SharedFileSystemAsyncJobExecutionActor
private val sharedFileSystem = new SharedFileSystem {
override lazy val sharedFileSystemConfig = {
import lenthall.config.ScalaConfig._
val config = configurationDescriptor.backendConfig.getConfigOption("filesystems.local")
config.getOrElse(ConfigFactory.parseString("""localization: [ "hard-link", "soft-link", "copy" ]"""))
configurationDescriptor.backendConfig.getConfigOr("filesystems.local")
}
}
}

0 comments on commit a0c8e93

Please sign in to comment.