diff --git a/.travis.yml b/.travis.yml
index f330307..28ad4ae 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -6,7 +6,7 @@ jdk:
 scala:
   - 2.11.8
 
-script: sbt ++$TRAVIS_SCALA_VERSION clean compile
+script: sbt ++$TRAVIS_SCALA_VERSION clean coverageTest
 
 cache:
   directories:
@@ -14,6 +14,8 @@ cache:
   - $HOME/.sbt/boot/
   - $HOME/.zinc
 
+after_success: sbt ++$TRAVIS_SCALA_VERSION travis-report
+
 before_cache:
   # Tricks to avoid unnecessary cache updates
   - find $HOME/.ivy2 -name "ivydata-*.properties" -delete
diff --git a/README.md b/README.md
index 25997f4..6cf5789 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,17 @@
 # spark-commands
-[![Maven Central](https://img.shields.io/maven-central/v/org.hammerlab/spark-commands_2.11.svg?maxAge=600)](http://search.maven.org/#search%7Cga%7C1%7Cspark-commands)
+[![Build Status](https://travis-ci.org/hammerlab/spark-commands.svg?branch=master)](https://travis-ci.org/hammerlab/spark-commands)
+[![Coverage Status](https://coveralls.io/repos/github/hammerlab/spark-commands/badge.svg)](https://coveralls.io/github/hammerlab/spark-commands)
+
+Interfaces for creating CLI-runnable and testable commands/apps, with [Spark](http://spark.apache.org/)-based and non-Spark flavors.
+
+## args4j
+[![Maven Central](https://img.shields.io/maven-central/v/org.hammerlab.cli/args4j_2.11.svg?maxAge=600)](http://search.maven.org/#search%7Cga%7C1%7Chammerlab%20args4j)
+
+## case-app
+[![Maven Central](https://img.shields.io/maven-central/v/org.hammerlab.cli/case-app_2.11.svg?maxAge=600)](http://search.maven.org/#search%7Cga%7C1%7Chammerlab%20case-app)
+
 
-Interfaces for creating CLI-runnable and testable Spark commands/apps.
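
Once these modules are published, depending on them should look roughly like the sbt snippet below; the coordinates are inferred from the badges above, and the versions are the in-progress SNAPSHOTs from the `build.sbt` changes further down, so treat this as a sketch rather than released coordinates:

```scala
// Hypothetical sbt coordinates; versions taken from build.sbt below (not yet released).
libraryDependencies ++= Seq(
  "org.hammerlab.cli" %% "args4j"   % "1.1.0-SNAPSHOT",
  "org.hammerlab.cli" %% "case-app" % "1.0.0-SNAPSHOT"
)
```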
diff --git a/src/main/scala/org/hammerlab/commands/Args.scala b/args4j/src/main/scala/org/hammerlab/cli/args4j/Args.scala
similarity index 65%
rename from src/main/scala/org/hammerlab/commands/Args.scala
rename to args4j/src/main/scala/org/hammerlab/cli/args4j/Args.scala
index 3ed8ec8..722e5ea 100644
--- a/src/main/scala/org/hammerlab/commands/Args.scala
+++ b/args4j/src/main/scala/org/hammerlab/cli/args4j/Args.scala
@@ -1,8 +1,9 @@
-package org.hammerlab.commands
+package org.hammerlab.cli.args4j
 
 import org.apache.spark.SparkContext
 import org.bdgenomics.utils.cli.Args4jBase
 
-trait Args extends Args4jBase {
+trait Args
+  extends Args4jBase {
   def validate(sc: SparkContext): Unit = {}
 }
diff --git a/src/main/scala/org/hammerlab/commands/Command.scala b/args4j/src/main/scala/org/hammerlab/cli/args4j/Command.scala
similarity index 80%
rename from src/main/scala/org/hammerlab/commands/Command.scala
rename to args4j/src/main/scala/org/hammerlab/cli/args4j/Command.scala
index c588b41..0e8fddf 100644
--- a/src/main/scala/org/hammerlab/commands/Command.scala
+++ b/args4j/src/main/scala/org/hammerlab/cli/args4j/Command.scala
@@ -1,4 +1,4 @@
-package org.hammerlab.commands
+package org.hammerlab.cli.args4j
 
 import grizzled.slf4j.Logging
 import org.bdgenomics.utils.cli.Args4j
@@ -21,7 +21,12 @@ abstract class Command[T <: Args: Manifest]
    *
    * @param args the command line arguments.
    */
-  def run(args: Array[String]): Unit = run(Args4j[T](args))
+  def run(args: Array[String]): Unit =
+    Args4j[T](args) match {
+      case Left(args) ⇒ run(args)
+      case _ ⇒
+    }
+
   def run(args: String*): Unit = run(args.toArray)
 
   def run(args: T): Unit
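
For reference, a minimal command against this interface might look like the following (names hypothetical). The change above suggests `Args4j[T](args)` now returns an `Either`, with `Left` carrying successfully-parsed args and anything else (e.g. a help/usage exit) short-circuiting before `run` is invoked:

```scala
import org.hammerlab.cli.args4j.{ Args, Command }
import org.kohsuke.args4j.Argument

// Hypothetical example command.
class CountLinesArgs extends Args {
  @Argument(required = true)
  var inFile: String = _
}

object CountLines extends Command[CountLinesArgs] {
  override def name = "count-lines"
  override def description = "print the number of lines in a file"

  override def run(args: CountLinesArgs): Unit =
    println(scala.io.Source.fromFile(args.inFile).getLines().size)
}

// CountLines.run("input.txt")  // via the run(args: String*) convenience overload
```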
diff --git a/args4j/src/main/scala/org/hammerlab/cli/args4j/SparkCommand.scala b/args4j/src/main/scala/org/hammerlab/cli/args4j/SparkCommand.scala
new file mode 100644
index 0000000..e2684ca
--- /dev/null
+++ b/args4j/src/main/scala/org/hammerlab/cli/args4j/SparkCommand.scala
@@ -0,0 +1,39 @@
+package org.hammerlab.cli.args4j
+
+import org.apache.spark.SparkContext
+import org.hammerlab.spark.{ SparkConfBase, confs }
+
+abstract class SparkCommand[T <: Args: Manifest]
+  extends Command[T]
+    with SparkConfBase
+    with confs.Kryo {
+
+  override def run(args: T): Unit = {
+    val sc = createSparkContext()
+    try {
+      args.validate(sc)
+      run(args, sc)
+    } finally {
+      sc.stop()
+    }
+  }
+
+  def run(args: T, sc: SparkContext): Unit
+
+  /**
+   * Return a spark context.
+   *
+   * Typically, most properties are set through config file / cmd-line.
+   * @return
+   */
+  private def createSparkContext(): SparkContext = {
+    val conf = makeSparkConf
+
+    conf.getOption("spark.app.name") match {
+      case Some(cmdLineName) => conf.setAppName(s"$cmdLineName: $name")
+      case _ => conf.setAppName(name)
+    }
+
+    new SparkContext(conf)
+  }
+}
diff --git a/args4j/src/test/scala/org/hammerlab/cli/args4j/SparkCommandTest.scala b/args4j/src/test/scala/org/hammerlab/cli/args4j/SparkCommandTest.scala
new file mode 100644
index 0000000..943e40e
--- /dev/null
+++ b/args4j/src/test/scala/org/hammerlab/cli/args4j/SparkCommandTest.scala
@@ -0,0 +1,67 @@
+package org.hammerlab.cli.args4j
+
+import com.esotericsoftware.kryo.Kryo
+import org.apache.spark.SparkContext
+import org.apache.spark.serializer.KryoRegistrator
+import org.hammerlab.paths.Path
+import org.hammerlab.test.Suite
+import org.kohsuke.args4j.Argument
+
+class SparkCommandTest
+  extends Suite {
+  test("command") {
+    val outFile = tmpPath()
+
+    TestCommand.main(
+      Array(
+        outFile.toString
+      )
+    )
+
+    outFile
+      .lines
+      .toList should be(
+        List(
+          "8mb",
+          "org.hammerlab.cli.args4j.TestRegistrar"
+        )
+      )
+  }
+}
+
+class TestArgs
+  extends Args {
+  @Argument(required = true)
+  var outFile: String = _
+}
+
+class TestRegistrar
+  extends KryoRegistrator {
+  override def registerClasses(kryo: Kryo): Unit = {}
+}
+
+import org.hammerlab.spark.test.suite.TestConfs
+
+object TestCommand
+  extends SparkCommand[TestArgs]
+    with TestConfs {
+
+  override def name: String = "test"
+  override def description: String = "test command"
+
+  sparkConf(
+    "spark.kryoserializer.buffer" → "8mb"
+  )
+
+  override def registrar = classOf[TestRegistrar]
+
+  override def run(args: TestArgs, sc: SparkContext): Unit = {
+    val conf = sc.getConf
+    Path(args.outFile).writeLines(
+      List(
+        conf.get("spark.kryoserializer.buffer"),
+        conf.get("spark.kryo.registrator")
+      )
+    )
+  }
+}
diff --git a/build.sbt b/build.sbt
index 95da9be..aad3982 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,9 +1,33 @@
-name := "spark-commands"
-version := "1.0.4"
 
-providedDeps += spark.value
+val defaults = Seq(
+  organization := "org.hammerlab.cli",
+  deps ++= Seq(
+    slf4j,
+    spark_util % "1.3.0"
+  )
+) ++ addSparkDeps
 
-deps ++= Seq(
-  libs.value('bdg_utils_cli),
-  libs.value('slf4j)
+lazy val args4j = project.settings(
+  defaults,
+  version := "1.1.0-SNAPSHOT",
+  deps += bdg_utils_cli % "0.3.0",
+  testDeps += Parent.autoImport.args4j
 )
+
+lazy val case_app = project.settings(
+  defaults,
+  name := "case-app",
+  version := "1.0.0-SNAPSHOT",
+  deps ++= Seq(
+    Parent.autoImport.case_app,
+    io % "1.2.0",
+    paths % "1.2.0"
+  )
+)
+
+lazy val cli_root =
+  rootProject(
+    "cli-root",
+    args4j,
+    case_app
+  )
diff --git a/case_app/src/main/scala/org/hammerlab/cli/app/App.scala b/case_app/src/main/scala/org/hammerlab/cli/app/App.scala
new file mode 100644
index 0000000..d723355
--- /dev/null
+++ b/case_app/src/main/scala/org/hammerlab/cli/app/App.scala
@@ -0,0 +1,34 @@
+package org.hammerlab.cli.app
+
+import caseapp.core.Messages
+import caseapp.{ CaseApp, Parser, RemainingArgs }
+import grizzled.slf4j.Logging
+
+abstract class App[Args : Parser : Messages]
+  extends CaseApp[Args]
+    with Logging {
+
+  final override def run(options: Args, remainingArgs: RemainingArgs): Unit =
+    remainingArgs match {
+      case RemainingArgs(args, Nil) ⇒
+        run(
+          options,
+          args
+        )
+      case RemainingArgs(args, unparsed) ⇒
+        throw new IllegalArgumentException(
+          s"Unparsed arguments: ${unparsed.mkString(" ")}"
+        )
+    }
+
+  def done(): Unit = {}
+
+  final def run(options: Args, remainingArgs: Seq[String]): Unit =
+    try {
+      _run(options, remainingArgs)
+    } finally {
+      done()
+    }
+
+  protected def _run(options: Args, remainingArgs: Seq[String]): Unit
+}
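
A sketch of this case-app flavor in use, with a hypothetical `Toy` app (case-app derives the `Parser`/`Messages` instances for a case class of options): `_run` is the app body, and `done()` runs afterwards even if `_run` throws, which the Spark variant below uses for context teardown:

```scala
import caseapp.AppName
import org.hammerlab.cli.app.App

// Hypothetical options; case-app derives Parser/Messages for case classes.
@AppName("toy")
case class ToyArgs(verbose: Boolean = false)

object Toy extends App[ToyArgs] {
  override protected def _run(options: ToyArgs, args: Seq[String]): Unit = {
    if (options.verbose)
      info(s"received ${args.size} positional arg(s)")  // via the Logging mixin
    args.foreach(println)
  }
}

// Toy.main(Array("--verbose", "a", "b"))
```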
diff --git a/case_app/src/main/scala/org/hammerlab/cli/app/IndexingApp.scala b/case_app/src/main/scala/org/hammerlab/cli/app/IndexingApp.scala
new file mode 100644
index 0000000..e6256cf
--- /dev/null
+++ b/case_app/src/main/scala/org/hammerlab/cli/app/IndexingApp.scala
@@ -0,0 +1,36 @@
+package org.hammerlab.cli.app
+
+import caseapp.Parser
+import caseapp.core.Messages
+import org.hammerlab.io.Printer
+import org.hammerlab.paths.Path
+
+trait OutPathArgs {
+  def out: Option[Path]
+}
+
+/**
+ * Interface for apps that take a [[Path]] and "index" it in some way, generating an output file that is by default
+ * named by appending an extension to the input path.
+ *
+ * @param defaultSuffix if [[OutPathArgs.out]] is empty, construct an output path by appending this string to the
+ *                      argument value [[PathApp.path]].
+ */
+abstract class IndexingApp[Args <: OutPathArgs : Parser : Messages](defaultSuffix: String)
+  extends PathApp[Args] {
+
+  implicit var printer: Printer = _
+
+  override def init(options: Args): Unit = {
+    printer =
+      Printer(
+        options
+          .out
+          .getOrElse(
+            path + defaultSuffix
+          )
+      )
+  }
+
+  override def close(): Unit =
+    printer.close()
+}
diff --git a/case_app/src/main/scala/org/hammerlab/cli/app/PathApp.scala b/case_app/src/main/scala/org/hammerlab/cli/app/PathApp.scala
new file mode 100644
index 0000000..ac5816d
--- /dev/null
+++ b/case_app/src/main/scala/org/hammerlab/cli/app/PathApp.scala
@@ -0,0 +1,33 @@
+package org.hammerlab.cli.app
+
+import java.io.Closeable
+
+import caseapp.Parser
+import caseapp.core.Messages
+import org.hammerlab.paths.Path
+
+abstract class PathApp[Args : Parser : Messages]
+  extends App[Args]
+    with Closeable {
+
+  @transient implicit var path: Path = _
+
+  def init(options: Args): Unit = {}
+  def close(): Unit = {}
+
+  final protected override def _run(options: Args, args: Seq[String]): Unit = {
+    if (args.size != 1) {
+      throw new IllegalArgumentException(
+        s"Exactly one argument (a BAM file path) is required"
+      )
+    }
+
+    path = Path(args.head)
+
+    init(options)
+    run(options)
+    close()
+  }
+
+  protected def run(options: Args): Unit
+}
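
The `PathApp` lifecycle, sketched with a hypothetical `LineCount` app: the single required positional argument becomes `path`, then `init(options)`, `run(options)`, and `close()` fire in order (a minimal sketch, assuming case-app can derive a parser for an empty options case class):

```scala
import caseapp.AppName
import org.hammerlab.cli.app.PathApp

@AppName("line-count")
case class LineCountArgs()  // no flags; just the one positional path

object LineCount extends PathApp[LineCountArgs] {
  override protected def run(options: LineCountArgs): Unit =
    // `path` has already been populated from args.head by _run
    println(path.lines.size)
}

// LineCount.main(Array("numbers"))  // prints the file's line count
```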
diff --git a/case_app/src/main/scala/org/hammerlab/cli/app/SparkPathApp.scala b/case_app/src/main/scala/org/hammerlab/cli/app/SparkPathApp.scala
new file mode 100644
index 0000000..e20cbe0
--- /dev/null
+++ b/case_app/src/main/scala/org/hammerlab/cli/app/SparkPathApp.scala
@@ -0,0 +1,81 @@
+package org.hammerlab.cli.app
+
+import caseapp.Parser
+import caseapp.core.Messages
+import grizzled.slf4j.Logging
+import org.apache.spark.SparkContext
+import org.apache.spark.serializer.KryoRegistrator
+import org.hammerlab.cli.args.OutputArgs
+import org.hammerlab.hadoop.Configuration
+import org.hammerlab.io.{ Printer, SampleSize }
+import org.hammerlab.paths.Path
+import org.hammerlab.spark.{ SparkConfBase, confs }
+
+trait SparkPathAppArgs {
+  def output: OutputArgs
+}
+
+trait HasSparkConf
+  extends SparkConfBase
+    with confs.Kryo
+    with confs.DynamicAllocation
+    with confs.EventLog
+    with confs.Speculation
+
+trait SparkApp[Args]
+  extends HasSparkConf {
+
+  self: App[Args] with Logging ⇒
+
+  @transient private var _sc: SparkContext = _
+
+  implicit def sc: SparkContext = {
+    if (_sc == null) {
+      info("Creating SparkContext")
+      val conf = makeSparkConf
+      if (
+        conf.get("spark.eventLog.enabled", "true") == "true" &&
+        conf.get("spark.eventLog.dir", "") == "" &&
+        !Path("/tmp/spark-events").exists) {
+        conf.set("spark.eventLog.enabled", "false")
+        warn("Disabling event-logging because default destination /tmp/spark-events doesn't exist")
+      }
+      _sc = new SparkContext(conf)
+    }
+    _sc
+  }
+
+  implicit def conf: Configuration = sc.hadoopConfiguration
+
+  override def done(): Unit = {
+    if (_sc != null && !_sc.isStopped) {
+      info("Stopping SparkContext")
+      _sc.stop()
+    }
+    _sc = null
+  }
+}
+
+/**
+ * [[SparkApp]] that takes an input path and prints some information to stdout or a path, with optional truncation of
+ * such output.
+ */
+abstract class SparkPathApp[Args <: SparkPathAppArgs : Parser : Messages](override val registrar: Class[_ <: KryoRegistrator] = null)
+  extends PathApp[Args]
+    with SparkApp[Args] {
+
+  @transient implicit var printer: Printer = _
+  @transient implicit var printLimit: SampleSize = _
+
+  override def init(options: Args): Unit = {
+    val OutputArgs(printLim, path, overwrite) = options.output
+
+    if (path.exists(_.exists) && !overwrite)
+      throw new IllegalArgumentException(
+        s"Output path $path exists and overwrite (-f) not set"
+      )
+
+    printer = Printer(path)
+    printLimit = printLim
+  }
+}
diff --git a/case_app/src/main/scala/org/hammerlab/cli/args/OutputArgs.scala b/case_app/src/main/scala/org/hammerlab/cli/args/OutputArgs.scala
new file mode 100644
index 0000000..0034d0d
--- /dev/null
+++ b/case_app/src/main/scala/org/hammerlab/cli/args/OutputArgs.scala
@@ -0,0 +1,21 @@
+package org.hammerlab.cli.args
+
+import caseapp.{ ValueDescription, ExtraName ⇒ O, HelpMessage ⇒ M }
+import org.hammerlab.io.SampleSize
+import org.hammerlab.paths.Path
+
+case class OutputArgs(
+  @O("l")
+  @ValueDescription("num=1000000")
+  @M("When collecting samples of records/results for displaying to the user, limit to this many to avoid overloading the driver")
+  printLimit: SampleSize = SampleSize(1000000),
+
+  @O("o")
+  @ValueDescription("path")
+  @M("Print output to this file, otherwise to stdout")
+  outputPath: Option[Path] = None,
+
+  @O("f")
+  @M("Whether to overwrite the output file, if it already exists")
+  overwrite: Boolean = false
+)
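
Putting the flags together, an invocation of the `SumNumbersSpark` test app (defined below) might look like the following; invocation shape only, and it assumes an `ArgParser[SampleSize]` is in implicit scope, as `OutputArgs`'s default value suggests:

```scala
SumNumbersSpark.main(
  Array(
    "numbers",       // the single positional input path
    "-o", "out.txt", // -o/outputPath: write here instead of stdout
    "-f",            // -f/overwrite: clobber out.txt if it already exists
    "-l", "1000"     // -l/printLimit: cap sampled output at 1000 records
  )
)
```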
be("55\n") + } +} + +case class SparkArgs(@Recurse output: OutputArgs) + extends SparkPathAppArgs + +object SumNumbersSpark + extends SparkPathApp[SparkArgs] { + override protected def run(options: SparkArgs): Unit = { + import org.hammerlab.io.Printer._ + import cats.implicits.catsStdShowForInt + echo( + sc + .textFile(path.toString) + .map(_.toInt) + .reduce(_ + _) + ) + } +} diff --git a/project/plugins.sbt b/project/plugins.sbt index 1dd7897..c67154a 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1 +1 @@ -addSbtPlugin("org.hammerlab" % "sbt-parent" % "2.0.1") +addSbtPlugin("org.hammerlab" % "sbt-parent" % "3.2.0") diff --git a/src/main/scala/org/hammerlab/commands/SparkCommand.scala b/src/main/scala/org/hammerlab/commands/SparkCommand.scala deleted file mode 100644 index 78e184f..0000000 --- a/src/main/scala/org/hammerlab/commands/SparkCommand.scala +++ /dev/null @@ -1,77 +0,0 @@ -package org.hammerlab.commands - -import org.apache.spark.{SparkConf, SparkContext} - -import scala.collection.mutable - -abstract class SparkCommand[T <: Args: Manifest] - extends Command[T] { - - override def run(args: T): Unit = { - val sc = createSparkContext() - try { - args.validate(sc) - run(args, sc) - } finally { - sc.stop() - } - } - - def run(args: T, sc: SparkContext): Unit - - private val defaultConfs = mutable.HashMap[String, String]() - def setDefaultConf(key: String, value: String): Unit = { - defaultConfs.update(key, value) - } - - def defaultRegistrar: String = "" - - /** - * Return a spark context. - * - * Typically, most properties are set through config file / cmd-line. - * @return - */ - private def createSparkContext(): SparkContext = { - val config: SparkConf = new SparkConf() - - config.getOption("spark.app.name") match { - case Some(cmdLineName) => config.setAppName(s"$cmdLineName: $name") - case _ => config.setAppName(name) - } - - if (config.getOption("spark.master").isEmpty) { - val numProcessors = Runtime.getRuntime.availableProcessors() - config.setMaster(s"local[$numProcessors]") - info(s"Running in local mode with $numProcessors 'executors'") - } - - if (config.getOption("spark.serializer").isEmpty) { - config.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - } - - if (config.getOption("spark.kryo.registrator").isEmpty && defaultRegistrar.nonEmpty) { - config.set("spark.kryo.registrator", defaultRegistrar) - } - - if (config.getOption("spark.kryoserializer.buffer").isEmpty) { - config.set("spark.kryoserializer.buffer", "4mb") - } - - if (config.getOption("spark.kryo.referenceTracking").isEmpty) { - config.set("spark.kryo.referenceTracking", "true") - } - - if (config.getOption("spark.kryo.registrationRequired").isEmpty) { - config.set("spark.kryo.registrationRequired", "true") - } - - for { - (k, v) <- defaultConfs - } { - config.set(k, v) - } - - new SparkContext(config) - } -}