Skip to content

Commit

Permalink
Add a byte limit to the read API (#3113)
Browse files Browse the repository at this point in the history
* read limit
  • Loading branch information
Horneth committed Jan 9, 2018
1 parent be2ea6a commit d31000c
Show file tree
Hide file tree
Showing 19 changed files with 89 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,7 @@ trait StandardAsyncExecutionActor extends AsyncBackendJobExecutionActor with Sta
lazy val stderrAsOption: Option[Path] = Option(jobPaths.stderr)

val stderrSizeAndReturnCode = for {
returnCodeAsString <- asyncIo.contentAsStringAsync(jobPaths.returnCode)
returnCodeAsString <- asyncIo.contentAsStringAsync(jobPaths.returnCode, None, failOnOverflow = false)
// Only check stderr size if we need to, otherwise this results in a lot of unnecessary I/O that
// may fail due to race conditions on quickly-executing jobs.
stderrSize <- if (failOnStdErr) asyncIo.sizeAsync(jobPaths.stderr) else Future.successful(0L)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import scala.util.{Success, Try}
trait ReadLikeFunctions extends PathFactory with IoFunctionSet with AsyncIoFunctions {

// TODO WOM: https://github.com/broadinstitute/cromwell/issues/2611
val fileSizeLimitationConfig = FileSizeLimitationConfig.fileSizeLimitationConfig
val fileSizeLimitationConfig = FileSizeLimitationConfig.fileSizeLimitationConfig

override def readFile(path: String): Future[String] = asyncIo.contentAsStringAsync(buildPath(path))
override def readFile(path: String, maxBytes: Option[Int], failOnOverflow: Boolean): Future[String] = asyncIo.contentAsStringAsync(buildPath(path), maxBytes, failOnOverflow)

protected def size(file: WomValue): Future[Double] = asyncIo.sizeAsync(buildPath(file.valueString)).map(_.toDouble)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ class ReadLikeFunctionsSpec extends AsyncFlatSpec with Matchers {

// TODO WOM: Fix
class TestReadLikeFunctions(sizeResult: Try[Double]) extends IoFunctionSet {
override def readFile(path: String): Future[String] = ???
override def readFile(path: String, maxBytes: Option[Int] = None, failOnOverflow: Boolean = false): Future[String] = ???

override def writeFile(path: String, content: String): Future[WomSingleFile] = ???

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/cromwell/core/NoIoFunctionSet.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import scala.concurrent.Future
import scala.util.{Failure, Try}

case object NoIoFunctionSet extends IoFunctionSet {
override def readFile(path: String): Future[String] = Future.failed(new NotImplementedError("readFile is not available here"))
override def readFile(path: String, maxBytes: Option[Int] = None, failOnOverflow: Boolean = false): Future[String] = Future.failed(new NotImplementedError("readFile is not available here"))

override def writeFile(path: String, content: String): Future[WomFile] = Future.failed(new NotImplementedError("writeFile is not available here"))

Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/cromwell/core/io/AsyncIo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ class AsyncIo(ioEndpoint: ActorRef, ioCommandBuilder: IoCommandBuilder) {
* IMPORTANT: Unless maxBytes is specified, this loads the entire content of the file into memory!
* Only read without a byte limit for small files!
*/
def contentAsStringAsync(path: Path): Future[String] = {
asyncCommand(ioCommandBuilder.contentAsStringCommand(path))
def contentAsStringAsync(path: Path, maxBytes: Option[Int], failOnOverflow: Boolean): Future[String] = {
  // Let the (possibly customized) command builder create the read command carrying the
  // byte limit and overflow policy, then submit it through asyncCommand.
  val readCommand = ioCommandBuilder.contentAsStringCommand(path, maxBytes, failOnOverflow)
  asyncCommand(readCommand)
}

def writeAsync(path: Path, content: String, options: OpenOptions): Future[Unit] = {
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/cromwell/core/io/DefaultIoCommand.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cromwell.core.io

import better.files.File.OpenOptions
import cromwell.core.io.IoContentAsStringCommand.IoReadOptions
import cromwell.core.path.Path

object DefaultIoCommand {
Expand All @@ -9,7 +10,7 @@ object DefaultIoCommand {
override val overwrite: Boolean) extends IoCopyCommand(
source, destination, overwrite
)
case class DefaultIoContentAsStringCommand(override val file: Path) extends IoContentAsStringCommand(file)
case class DefaultIoContentAsStringCommand(override val file: Path, override val options: IoReadOptions) extends IoContentAsStringCommand(file, options)
case class DefaultIoSizeCommand(override val file: Path) extends IoSizeCommand(file)
case class DefaultIoWriteCommand(override val file: Path,
override val content: String,
Expand Down
13 changes: 12 additions & 1 deletion core/src/main/scala/cromwell/core/io/IoCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cromwell.core.io

import better.files.File.OpenOptions
import com.google.api.client.util.ExponentialBackOff
import cromwell.core.io.IoContentAsStringCommand.IoReadOptions
import cromwell.core.path.Path
import cromwell.core.retry.SimpleExponentialBackoff

Expand Down Expand Up @@ -52,11 +53,21 @@ abstract class IoCopyCommand(val source: Path, val destination: Path, val overwr
override def toString = s"copy ${source.pathAsString} to ${destination.pathAsString} with overwrite = $overwrite"
override lazy val name = "copy"
}

object IoContentAsStringCommand {

  /**
    * Options to customize reading of a file.
    *
    * @param maxBytes       If specified, only reads up to maxBytes Bytes from the file.
    *                       If None, no limit is requested.
    * @param failOnOverflow If this is true, maxBytes is specified, and the file is larger than maxBytes,
    *                       fail the command. Only meaningful together with maxBytes.
    */
  // Marked final: case classes are plain immutable data and are not meant to be extended.
  final case class IoReadOptions(maxBytes: Option[Int], failOnOverflow: Boolean)
}

/**
  * Read file as a string.
  * With the default options (no maxBytes, failOnOverflow = false) no byte limit is requested,
  * so the entire content is loaded in memory. A maxBytes value in `options` asks for only the
  * first maxBytes Bytes of the file to be read.
  */
abstract class IoContentAsStringCommand(val file: Path) extends SingleFileIoCommand[String] {
abstract class IoContentAsStringCommand(val file: Path, val options: IoReadOptions = IoReadOptions(None, failOnOverflow = false)) extends SingleFileIoCommand[String] {
// Human-readable description of the command; note that it does not mention the read options.
override def toString = s"read content of ${file.pathAsString}"
override lazy val name = "read"
}
Expand Down
7 changes: 4 additions & 3 deletions core/src/main/scala/cromwell/core/io/IoCommandBuilder.scala
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package cromwell.core.io

import cromwell.core.io.DefaultIoCommand._
import cromwell.core.io.IoContentAsStringCommand.IoReadOptions
import cromwell.core.path.BetterFileMethods.OpenOptions
import cromwell.core.path.Path

/**
* Can be used to customize IoCommands for the desired I/O operations
*/
abstract class PartialIoCommandBuilder {
def contentAsStringCommand: PartialFunction[Path, IoContentAsStringCommand] = PartialFunction.empty
def contentAsStringCommand: PartialFunction[(Path, Option[Int], Boolean), IoContentAsStringCommand] = PartialFunction.empty
def writeCommand: PartialFunction[(Path, String, OpenOptions), IoWriteCommand] = PartialFunction.empty
def sizeCommand: PartialFunction[Path, IoSizeCommand] = PartialFunction.empty
def deleteCommand: PartialFunction[(Path, Boolean), IoDeleteCommand] = PartialFunction.empty
Expand Down Expand Up @@ -49,8 +50,8 @@ class IoCommandBuilder(partialBuilders: List[PartialIoCommandBuilder] = List.emp
}).getOrElse(default)
}

def contentAsStringCommand(path: Path): IoContentAsStringCommand = {
buildOrDefault(_.contentAsStringCommand, path, DefaultIoContentAsStringCommand(path))
def contentAsStringCommand(path: Path, maxBytes: Option[Int], failOnOverflow: Boolean): IoContentAsStringCommand = {
  // Command used when none of the partial builders is defined for these arguments.
  val readOptions = IoReadOptions(maxBytes, failOnOverflow)
  val fallbackCommand = DefaultIoContentAsStringCommand(path, readOptions)

  // Partial builders receive the raw (path, maxBytes, failOnOverflow) triple.
  buildOrDefault(_.contentAsStringCommand, (path, maxBytes, failOnOverflow), fallbackCommand)
}

def writeCommand(path: Path, content: String, options: OpenOptions): IoWriteCommand = {
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/cromwell/core/io/AsyncIoSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class AsyncIoSpec extends TestKitSuite with AsyncFlatSpecLike with Matchers with
val testPath = DefaultPathBuilder.createTempFile()
testPath.write("hello")

testActor.underlyingActor.asyncIo.contentAsStringAsync(testPath) map { result =>
testActor.underlyingActor.asyncIo.contentAsStringAsync(testPath, None, failOnOverflow = false) map { result =>
assert(result == "hello")
}
}
Expand Down
2 changes: 1 addition & 1 deletion cwl/src/main/scala/cwl/CommandOutputBinding.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ case class CommandOutputBinding(

object CommandOutputBinding {
  /** A glob is one of: an Expression, a single String, or an Array of Strings. */
  type Glob = Expression :+: String :+: Array[String] :+: CNil

  /** 64 KiB (65536), wrapped in an Option so it can be passed directly as an optional limit. */
  val ReadLimit: Option[Int] = Some(64 * 1024)
}
8 changes: 2 additions & 6 deletions cwl/src/main/scala/cwl/CommandOutputExpression.scala
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,9 @@ case class CommandOutputExpression(outputBinding: CommandOutputBinding,
}

private def load64KiB(path: String, ioFunctionSet: IoFunctionSet): String = {
// This suggests the IoFunctionSet should have a length-limited read API as both CWL and WDL support this concept.
// ChrisL: But remember that they are different (WDL => failure, CWL => truncate)
val content = ioFunctionSet.readFile(path)
val content = ioFunctionSet.readFile(path, CommandOutputBinding.ReadLimit, failOnOverflow = false)

// TODO: propagate IO, Try, or Future or something all the way out via "commandOutputBindingtoWomValue" signature
// TODO: Stream only the first 64 KiB, this "read everything then ignore most of it" method will be very inefficient
val initialResult = Await.result(content, 60 seconds)
initialResult.substring(0, Math.min(initialResult.length, 64 * 1024))
Await.result(content, 60 seconds)
}
}
2 changes: 1 addition & 1 deletion cwl/src/test/scala/cwl/CommandOutputExpressionSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class CommandOutputExpressionSpec extends FlatSpec with Matchers {

def ioFunctionSet(data: String) =
new IoFunctionSet {
override def readFile(path: String) = Future.successful(data)
override def readFile(path: String, maxBytes: Option[Int] = None, failOnOverflow: Boolean = false) = Future.successful(data)
override def writeFile(path: String, content: String) = throw new Exception("writeFile should not be used in this test")
override def stdout(params: Seq[Try[WomValue]]) = throw new Exception("stdout should not be used in this test")
override def stderr(params: Seq[Try[WomValue]]) = throw new Exception("stderr should not be used in this test")
Expand Down
16 changes: 14 additions & 2 deletions engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package cromwell.engine.io.nio

import java.io.IOException
import java.nio.charset.StandardCharsets

import akka.actor.{ActorSystem, Scheduler}
import akka.stream.scaladsl.Flow
import cromwell.core.io._
Expand Down Expand Up @@ -73,8 +76,17 @@ class NioFlow(parallelism: Int,
()
}

private def readAsString(read: IoContentAsStringCommand) = Future {
read.file.contentAsString
/**
  * Reads the content of the command's file as a UTF-8 String, honoring the command's read options.
  *
  *  - no maxBytes: the whole file is read through contentAsString;
  *  - maxBytes = limit, failOnOverflow = false: at most the first `limit` bytes are decoded;
  *  - maxBytes = limit, failOnOverflow = true: the Future fails with an IOException if, and only if,
  *    the file holds more than `limit` bytes. A file of at most `limit` bytes is returned in full.
  *
  * A negative limit is treated as 0. The content is cut on a byte boundary, so a multi-byte UTF-8
  * character straddling the limit will not be decoded as the original character.
  */
private def readAsString(read: IoContentAsStringCommand) = {
  read.options.maxBytes match {
    case Some(limit) =>
      Future {
        val cap = math.max(limit, 0)
        // Read one byte past the cap: this is the only way to tell "exactly cap bytes" apart from
        // "more than cap bytes". Checking hasNext on the iterator returned by take(limit) is true
        // for any non-empty file, which made every non-empty file look like an overflow.
        // When cap is Int.MaxValue there is no room for the extra byte, so overflow cannot be detected.
        val probeLength = if (cap < Int.MaxValue) cap + 1 else cap
        // NOTE(review): the stream behind `bytes` is presumably not released when the iterator is
        // only partially consumed - confirm against the Path implementation.
        val bytesRead = read.file.bytes.take(probeLength).toArray

        if (read.options.failOnOverflow && bytesRead.length > cap)
          throw new IOException(s"File ${read.file.pathAsString} is larger than $limit Bytes")
        else
          new String(bytesRead.take(cap), StandardCharsets.UTF_8)
      }
    case _ => Future(read.file.contentAsString)
  }
}

private def size(size: IoSizeCommand) = Future {
Expand Down
39 changes: 38 additions & 1 deletion engine/src/test/scala/cromwell/engine/io/IoActorSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import better.files.File.OpenOptions
import com.google.cloud.storage.StorageException
import cromwell.core.TestKitSuite
import cromwell.core.io.DefaultIoCommand._
import cromwell.core.io.IoContentAsStringCommand.IoReadOptions
import cromwell.core.io._
import cromwell.core.path.{DefaultPathBuilder, Path}
import cromwell.engine.io.gcs.GcsBatchFlow.BatchFailedException
Expand Down Expand Up @@ -87,7 +88,7 @@ class IoActorSpec extends TestKitSuite with FlatSpecLike with Matchers with Impl
val src = DefaultPathBuilder.createTempFile()
src.write("hello")

val readCommand = DefaultIoContentAsStringCommand(src)
val readCommand = DefaultIoContentAsStringCommand(src, IoReadOptions(None, failOnOverflow = false))

testActor ! readCommand
expectMsgPF(5 seconds) {
Expand All @@ -100,6 +101,42 @@ class IoActorSpec extends TestKitSuite with FlatSpecLike with Matchers with Impl
src.delete()
}

it should "read only the first bytes of file" in {
  val ioActor = TestActorRef(new IoActor(1, None, TestProbe().ref))

  // Temporary file holding 5 bytes of content
  val tempFile = DefaultPathBuilder.createTempFile()
  tempFile.write("hello")

  // Limit the read to 2 bytes and ask for truncation rather than failure on overflow
  val truncatingOptions = IoReadOptions(Option(2), failOnOverflow = false)
  ioActor ! DefaultIoContentAsStringCommand(tempFile, truncatingOptions)

  expectMsgPF(5 seconds) {
    case success: IoSuccess[_] =>
      success.command.isInstanceOf[IoContentAsStringCommand] shouldBe true
      // Only the first 2 bytes of "hello" are expected back
      success.result.asInstanceOf[String] shouldBe "he"
    case failed: IoFailure[_] => fail("Expected an IoSuccess", failed.failure)
  }

  tempFile.delete()
}

it should "fail if the file is larger than the read limit" in {
  val ioActor = TestActorRef(new IoActor(1, None, TestProbe().ref))

  // Temporary file holding 5 bytes of content, i.e. more than the 2 byte limit used below
  val tempFile = DefaultPathBuilder.createTempFile()
  tempFile.write("hello")

  // Same 2 byte limit as the truncating test, but this time overflow must fail the command
  val strictOptions = IoReadOptions(Option(2), failOnOverflow = true)
  ioActor ! DefaultIoContentAsStringCommand(tempFile, strictOptions)

  expectMsgPF(5 seconds) {
    case failed: IoFailure[_] =>
      failed.failure.getMessage shouldBe s"java.io.IOException: File ${tempFile.pathAsString} is larger than 2 Bytes"
    case _: IoSuccess[_] =>
      fail("Command should have failed because the read limit was < file size and failOnOverflow was true")
  }

  tempFile.delete()
}

it should "return a file size" in {
val testActor = TestActorRef(new IoActor(1, None, TestProbe().ref))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class NioFlowSpec extends TestKitSuite with AsyncFlatSpecLike with Matchers with
val testPath = DefaultPathBuilder.createTempFile()
testPath.write("hello")

val context = DefaultCommandContext(contentAsStringCommand(testPath), replyTo)
val context = DefaultCommandContext(contentAsStringCommand(testPath, None, failOnOverflow = false), replyTo)
val testSource = Source.single(context)

val stream = testSource.via(flow).toMat(readSink)(Keep.right)
Expand Down Expand Up @@ -193,7 +193,7 @@ class NioFlowSpec extends TestKitSuite with AsyncFlatSpecLike with Matchers with
it should "retry on retryable exceptions" in {
val testPath = DefaultPathBuilder.build("does/not/matter").get

val context = DefaultCommandContext(contentAsStringCommand(testPath), replyTo)
val context = DefaultCommandContext(contentAsStringCommand(testPath, None, failOnOverflow = false), replyTo)

val testSource = Source.single(context)

Expand Down
4 changes: 2 additions & 2 deletions wdl/src/main/scala/wdl/WdlExpression.scala
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,9 @@ final case class WdlWomExpression(wdlExpression: WdlExpression, from: Scope) ext
// case in the brave new WOM-world.
wdlExpression.evaluateType(inputTypes.apply, new WdlStandardLibraryFunctionsType, Option(from)).toErrorOr

override def evaluateFiles(inputTypes: Map[String, WomValue], ioFunctionSet: IoFunctionSet, coerceTo: WomType): ErrorOr[Set[WomFile]] ={
override def evaluateFiles(inputTypes: Map[String, WomValue], ioFunctionSet: IoFunctionSet, coerceTo: WomType): ErrorOr[Set[WomFile]] = {
lazy val wdlFunctions = new WdlStandardLibraryFunctions {
override def readFile(path: String): String = Await.result(ioFunctionSet.readFile(path), Duration.Inf)
override def readFile(path: String): String = Await.result(ioFunctionSet.readFile(path, None, failOnOverflow = false), Duration.Inf)

override def writeFile(path: String, content: String): Try[WomFile] = Try(Await.result(ioFunctionSet.writeFile(path, content), Duration.Inf))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ trait WdlStandardLibraryFunctions extends WdlFunctions[WomValue] {

object WdlStandardLibraryFunctions {
def fromIoFunctionSet(ioFunctionSet: IoFunctionSet) = new WdlStandardLibraryFunctions {
override def readFile(path: String): String = Await.result(ioFunctionSet.readFile(path), Duration.Inf)
override def readFile(path: String): String = Await.result(ioFunctionSet.readFile(path, None, failOnOverflow = false), Duration.Inf)

override def writeFile(path: String, content: String): Try[WomFile] = Try(Await.result(ioFunctionSet.writeFile(path, content), Duration.Inf))

Expand Down
2 changes: 1 addition & 1 deletion wom/src/main/scala/wom/expression/WomExpression.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ final case class ValueAsAnExpression(value: WomValue) extends WomExpression {

// TODO: Flesh this out (https://github.com/broadinstitute/cromwell/issues/2521)
trait IoFunctionSet {
def readFile(path: String): Future[String]
def readFile(path: String, maxBytes: Option[Int], failOnOverflow: Boolean): Future[String]
def writeFile(path: String, content: String): Future[WomFile]
def stdout(params: Seq[Try[WomValue]]): Try[WomFile]
def stderr(params: Seq[Try[WomValue]]): Try[WomFile]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ final case class PlaceholderWomExpression(inputs: Set[String], fixedWomType: Wom
}

case object PlaceholderIoFunctionSet extends IoFunctionSet {
override def readFile(path: String): Future[String] = ???
override def readFile(path: String, maxBytes: Option[Int] = None, failOnOverflow: Boolean = false): Future[String] = ???
override def writeFile(path: String, content: String): Future[WomFile] = ???
override def stdout(params: Seq[Try[WomValue]]) = ???
override def stderr(params: Seq[Try[WomValue]]) = ???
Expand Down

0 comments on commit d31000c

Please sign in to comment.