From c2f3bdd0d6c3b946a566c138e0a84fe00a3bbb00 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Mon, 10 Jul 2017 21:01:15 +0000 Subject: [PATCH 1/8] upgrade plugin --- build.sbt | 8 ++++---- project/plugins.sbt | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/build.sbt b/build.sbt index 95da9be..d0e7ea6 100644 --- a/build.sbt +++ b/build.sbt @@ -1,9 +1,9 @@ name := "spark-commands" -version := "1.0.4" +version := "1.0.5-SNAPSHOT" -providedDeps += spark.value +providedDeps += spark deps ++= Seq( - libs.value('bdg_utils_cli), - libs.value('slf4j) + bdg_utils_cli % "0.2.15", + slf4j ) diff --git a/project/plugins.sbt b/project/plugins.sbt index 1dd7897..ba64db2 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1 +1 @@ -addSbtPlugin("org.hammerlab" % "sbt-parent" % "2.0.1") +addSbtPlugin("org.hammerlab" % "sbt-parent" % "3.0.0-SNAPSHOT") From cd79d03101a605f52fd945f14cb04e1cc02ead89 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Mon, 17 Jul 2017 15:39:00 +0000 Subject: [PATCH 2/8] use SparkConf helper --- build.sbt | 3 ++- src/main/scala/org/hammerlab/commands/SparkCommand.scala | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/build.sbt b/build.sbt index d0e7ea6..5f55826 100644 --- a/build.sbt +++ b/build.sbt @@ -5,5 +5,6 @@ providedDeps += spark deps ++= Seq( bdg_utils_cli % "0.2.15", - slf4j + slf4j, + spark_util % "1.2.0-SNAPSHOT" ) diff --git a/src/main/scala/org/hammerlab/commands/SparkCommand.scala b/src/main/scala/org/hammerlab/commands/SparkCommand.scala index 78e184f..118820f 100644 --- a/src/main/scala/org/hammerlab/commands/SparkCommand.scala +++ b/src/main/scala/org/hammerlab/commands/SparkCommand.scala @@ -1,6 +1,7 @@ package org.hammerlab.commands -import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.{ SparkConf, SparkContext } +import org.hammerlab.spark.Conf import scala.collection.mutable @@ -33,7 +34,7 @@ abstract class SparkCommand[T <: Args: Manifest] * @return */ private def createSparkContext(): SparkContext = { - val config: SparkConf = new SparkConf() + val config: SparkConf = Conf() config.getOption("spark.app.name") match { case Some(cmdLineName) => config.setAppName(s"$cmdLineName: $name") From d378bc849050f07ac82525ae4912eddd466886f2 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Tue, 8 Aug 2017 15:07:28 +0000 Subject: [PATCH 3/8] upgrade plugin, deps --- build.sbt | 4 ++-- project/plugins.sbt | 2 +- src/main/scala/org/hammerlab/commands/Args.scala | 3 ++- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/build.sbt b/build.sbt index 5f55826..725d91e 100644 --- a/build.sbt +++ b/build.sbt @@ -4,7 +4,7 @@ version := "1.0.5-SNAPSHOT" providedDeps += spark deps ++= Seq( - bdg_utils_cli % "0.2.15", + bdg_utils_cli % "0.2.16", slf4j, - spark_util % "1.2.0-SNAPSHOT" + spark_util % "1.2.1" ) diff --git a/project/plugins.sbt b/project/plugins.sbt index ba64db2..c022d7b 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1 +1 @@ -addSbtPlugin("org.hammerlab" % "sbt-parent" % "3.0.0-SNAPSHOT") +addSbtPlugin("org.hammerlab" % "sbt-parent" % "3.1.0") diff --git a/src/main/scala/org/hammerlab/commands/Args.scala b/src/main/scala/org/hammerlab/commands/Args.scala index 3ed8ec8..73acc1c 100644 --- a/src/main/scala/org/hammerlab/commands/Args.scala +++ b/src/main/scala/org/hammerlab/commands/Args.scala @@ -3,6 +3,7 @@ package org.hammerlab.commands import org.apache.spark.SparkContext import org.bdgenomics.utils.cli.Args4jBase -trait Args extends Args4jBase { +trait Args + 
extends Args4jBase { def validate(sc: SparkContext): Unit = {} } From 735290d40e75dbc5b0634bf8c9390383a234c994 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Tue, 8 Aug 2017 19:06:47 +0000 Subject: [PATCH 4/8] use spark-util add tests --- README.md | 2 + build.sbt | 4 +- .../org/hammerlab/commands/Command.scala | 7 +- .../org/hammerlab/commands/SparkCommand.scala | 65 +++++++------------ .../hammerlab/commands/SparkCommandTest.scala | 56 ++++++++++++++++ 5 files changed, 91 insertions(+), 43 deletions(-) create mode 100644 src/test/scala/org/hammerlab/commands/SparkCommandTest.scala diff --git a/README.md b/README.md index 25997f4..72bd0cc 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,7 @@ # spark-commands +[![Build Status](https://travis-ci.org/hammerlab/spark-commands.svg?branch=master)](https://travis-ci.org/hammerlab/spark-commands) +[![Coverage Status](https://coveralls.io/repos/github/hammerlab/spark-commands/badge.svg)](https://coveralls.io/github/hammerlab/spark-commands) [![Maven Central](https://img.shields.io/maven-central/v/org.hammerlab/spark-commands_2.11.svg?maxAge=600)](http://search.maven.org/#search%7Cga%7C1%7Cspark-commands) Interfaces for creating CLI-runnable and testable Spark commands/apps. diff --git a/build.sbt b/build.sbt index 725d91e..633f30a 100644 --- a/build.sbt +++ b/build.sbt @@ -1,10 +1,10 @@ name := "spark-commands" -version := "1.0.5-SNAPSHOT" +version := "1.1.0-SNAPSHOT" providedDeps += spark deps ++= Seq( - bdg_utils_cli % "0.2.16", + bdg_utils_cli % "0.3.0", slf4j, spark_util % "1.2.1" ) diff --git a/src/main/scala/org/hammerlab/commands/Command.scala b/src/main/scala/org/hammerlab/commands/Command.scala index c588b41..ec92358 100644 --- a/src/main/scala/org/hammerlab/commands/Command.scala +++ b/src/main/scala/org/hammerlab/commands/Command.scala @@ -21,7 +21,12 @@ abstract class Command[T <: Args: Manifest] * * @param args the command line arguments. */ - def run(args: Array[String]): Unit = run(Args4j[T](args)) + def run(args: Array[String]): Unit = + Args4j[T](args) match { + case Left(args) ⇒ run(args) + case _ ⇒ + } + def run(args: String*): Unit = run(args.toArray) def run(args: T): Unit diff --git a/src/main/scala/org/hammerlab/commands/SparkCommand.scala b/src/main/scala/org/hammerlab/commands/SparkCommand.scala index 118820f..6de3cee 100644 --- a/src/main/scala/org/hammerlab/commands/SparkCommand.scala +++ b/src/main/scala/org/hammerlab/commands/SparkCommand.scala @@ -1,12 +1,14 @@ package org.hammerlab.commands +import org.apache.spark.serializer.KryoRegistrator import org.apache.spark.{ SparkConf, SparkContext } -import org.hammerlab.spark.Conf +import org.hammerlab.spark.{ Conf, SparkConfBase } import scala.collection.mutable abstract class SparkCommand[T <: Args: Manifest] - extends Command[T] { + extends Command[T] + with SparkConfBase { override def run(args: T): Unit = { val sc = createSparkContext() @@ -25,7 +27,22 @@ abstract class SparkCommand[T <: Args: Manifest] defaultConfs.update(key, value) } - def defaultRegistrar: String = "" + def registrar: Class[_ <: KryoRegistrator] = null + + sparkConf( + "spark.master" → "local[*]", + "spark.serializer" → "org.apache.spark.serializer.KryoSerializer", + "spark.kryoserializer.buffer" → "4mb", + "spark.kryo.referenceTracking" → "true", + "spark.kryo.registrationRequired" → "true" + ) + + Option(registrar).foreach( + clz ⇒ + sparkConf( + "spark.kryo.registrator" → clz.getCanonicalName + ) + ) /** * Return a spark context. 
@@ -34,45 +51,13 @@ abstract class SparkCommand[T <: Args: Manifest] * @return */ private def createSparkContext(): SparkContext = { - val config: SparkConf = Conf() - - config.getOption("spark.app.name") match { - case Some(cmdLineName) => config.setAppName(s"$cmdLineName: $name") - case _ => config.setAppName(name) - } - - if (config.getOption("spark.master").isEmpty) { - val numProcessors = Runtime.getRuntime.availableProcessors() - config.setMaster(s"local[$numProcessors]") - info(s"Running in local mode with $numProcessors 'executors'") - } - - if (config.getOption("spark.serializer").isEmpty) { - config.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - } - - if (config.getOption("spark.kryo.registrator").isEmpty && defaultRegistrar.nonEmpty) { - config.set("spark.kryo.registrator", defaultRegistrar) - } - - if (config.getOption("spark.kryoserializer.buffer").isEmpty) { - config.set("spark.kryoserializer.buffer", "4mb") - } - - if (config.getOption("spark.kryo.referenceTracking").isEmpty) { - config.set("spark.kryo.referenceTracking", "true") - } - - if (config.getOption("spark.kryo.registrationRequired").isEmpty) { - config.set("spark.kryo.registrationRequired", "true") - } + val conf = makeSparkConf - for { - (k, v) <- defaultConfs - } { - config.set(k, v) + conf.getOption("spark.app.name") match { + case Some(cmdLineName) => conf.setAppName(s"$cmdLineName: $name") + case _ => conf.setAppName(name) } - new SparkContext(config) + new SparkContext(conf) } } diff --git a/src/test/scala/org/hammerlab/commands/SparkCommandTest.scala b/src/test/scala/org/hammerlab/commands/SparkCommandTest.scala new file mode 100644 index 0000000..ecf9f18 --- /dev/null +++ b/src/test/scala/org/hammerlab/commands/SparkCommandTest.scala @@ -0,0 +1,56 @@ +package org.hammerlab.commands + +import com.esotericsoftware.kryo.Kryo +import org.apache.spark.SparkContext +import org.apache.spark.serializer.KryoRegistrator +import org.hammerlab.paths.Path +import org.hammerlab.test.Suite +import org.kohsuke.args4j.Argument + +class SparkCommandTest + extends Suite { + test("command") { + val outFile = tmpPath() + TestCommand.main(Array(outFile.toString())) + outFile.lines.toList should be( + List( + "8mb", + "org.hammerlab.commands.TestRegistrar" + ) + ) + } +} + +class TestArgs + extends Args { + @Argument(required = true) + var outFile: String = _ +} + +class TestRegistrar + extends KryoRegistrator { + override def registerClasses(kryo: Kryo): Unit = {} +} + +object TestCommand + extends SparkCommand[TestArgs] { + override def name: String = "test" + override def description: String = "test command" + + sparkConf( + "spark.kryoserializer.buffer" → "8mb" + ) + + + override def registrar = classOf[TestRegistrar] + + override def run(args: TestArgs, sc: SparkContext): Unit = { + val conf = sc.getConf + Path(args.outFile).writeLines( + List( + conf.get("spark.kryoserializer.buffer"), + conf.get("spark.kryo.registrator") + ) + ) + } +} From 1a0c7b77fd1589cd832310736b8d19a129d0bb1c Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Thu, 14 Sep 2017 18:13:28 +0000 Subject: [PATCH 5/8] re-pkg as o.h.cli, add case_app module --- .../org/hammerlab/cli/args4j}/Args.scala | 2 +- .../org/hammerlab/cli/args4j}/Command.scala | 2 +- .../hammerlab/cli/args4j/SparkCommand.scala | 39 ++++++++++ .../cli/args4j}/SparkCommandTest.scala | 23 ++++-- build.sbt | 36 ++++++++-- .../scala/org/hammerlab/cli/app/App.scala | 34 +++++++++ .../org/hammerlab/cli/app/IndexingApp.scala | 36 ++++++++++ 
.../scala/org/hammerlab/cli/app/PathApp.scala | 33 +++++++++ .../org/hammerlab/cli/app/SparkPathApp.scala | 72 +++++++++++++++++++ .../org/hammerlab/cli/args/OutputArgs.scala | 21 ++++++ .../hammerlab/cli/app/IndexingAppTest.scala | 41 +++++++++++ project/plugins.sbt | 2 +- .../org/hammerlab/commands/SparkCommand.scala | 63 ---------------- 13 files changed, 325 insertions(+), 79 deletions(-) rename {src/main/scala/org/hammerlab/commands => args4j/src/main/scala/org/hammerlab/cli/args4j}/Args.scala (83%) rename {src/main/scala/org/hammerlab/commands => args4j/src/main/scala/org/hammerlab/cli/args4j}/Command.scala (96%) create mode 100644 args4j/src/main/scala/org/hammerlab/cli/args4j/SparkCommand.scala rename {src/test/scala/org/hammerlab/commands => args4j/src/test/scala/org/hammerlab/cli/args4j}/SparkCommandTest.scala (76%) create mode 100644 case_app/src/main/scala/org/hammerlab/cli/app/App.scala create mode 100644 case_app/src/main/scala/org/hammerlab/cli/app/IndexingApp.scala create mode 100644 case_app/src/main/scala/org/hammerlab/cli/app/PathApp.scala create mode 100644 case_app/src/main/scala/org/hammerlab/cli/app/SparkPathApp.scala create mode 100644 case_app/src/main/scala/org/hammerlab/cli/args/OutputArgs.scala create mode 100644 case_app/src/test/scala/org/hammerlab/cli/app/IndexingAppTest.scala delete mode 100644 src/main/scala/org/hammerlab/commands/SparkCommand.scala diff --git a/src/main/scala/org/hammerlab/commands/Args.scala b/args4j/src/main/scala/org/hammerlab/cli/args4j/Args.scala similarity index 83% rename from src/main/scala/org/hammerlab/commands/Args.scala rename to args4j/src/main/scala/org/hammerlab/cli/args4j/Args.scala index 73acc1c..722e5ea 100644 --- a/src/main/scala/org/hammerlab/commands/Args.scala +++ b/args4j/src/main/scala/org/hammerlab/cli/args4j/Args.scala @@ -1,4 +1,4 @@ -package org.hammerlab.commands +package org.hammerlab.cli.args4j import org.apache.spark.SparkContext import org.bdgenomics.utils.cli.Args4jBase diff --git a/src/main/scala/org/hammerlab/commands/Command.scala b/args4j/src/main/scala/org/hammerlab/cli/args4j/Command.scala similarity index 96% rename from src/main/scala/org/hammerlab/commands/Command.scala rename to args4j/src/main/scala/org/hammerlab/cli/args4j/Command.scala index ec92358..0e8fddf 100644 --- a/src/main/scala/org/hammerlab/commands/Command.scala +++ b/args4j/src/main/scala/org/hammerlab/cli/args4j/Command.scala @@ -1,4 +1,4 @@ -package org.hammerlab.commands +package org.hammerlab.cli.args4j import grizzled.slf4j.Logging import org.bdgenomics.utils.cli.Args4j diff --git a/args4j/src/main/scala/org/hammerlab/cli/args4j/SparkCommand.scala b/args4j/src/main/scala/org/hammerlab/cli/args4j/SparkCommand.scala new file mode 100644 index 0000000..e2684ca --- /dev/null +++ b/args4j/src/main/scala/org/hammerlab/cli/args4j/SparkCommand.scala @@ -0,0 +1,39 @@ +package org.hammerlab.cli.args4j + +import org.apache.spark.SparkContext +import org.hammerlab.spark.{ SparkConfBase, confs } + +abstract class SparkCommand[T <: Args: Manifest] + extends Command[T] + with SparkConfBase + with confs.Kryo { + + override def run(args: T): Unit = { + val sc = createSparkContext() + try { + args.validate(sc) + run(args, sc) + } finally { + sc.stop() + } + } + + def run(args: T, sc: SparkContext): Unit + + /** + * Return a spark context. + * + * Typically, most properties are set through config file / cmd-line. 
+ * @return + */ + private def createSparkContext(): SparkContext = { + val conf = makeSparkConf + + conf.getOption("spark.app.name") match { + case Some(cmdLineName) => conf.setAppName(s"$cmdLineName: $name") + case _ => conf.setAppName(name) + } + + new SparkContext(conf) + } +} diff --git a/src/test/scala/org/hammerlab/commands/SparkCommandTest.scala b/args4j/src/test/scala/org/hammerlab/cli/args4j/SparkCommandTest.scala similarity index 76% rename from src/test/scala/org/hammerlab/commands/SparkCommandTest.scala rename to args4j/src/test/scala/org/hammerlab/cli/args4j/SparkCommandTest.scala index ecf9f18..943e40e 100644 --- a/src/test/scala/org/hammerlab/commands/SparkCommandTest.scala +++ b/args4j/src/test/scala/org/hammerlab/cli/args4j/SparkCommandTest.scala @@ -1,4 +1,4 @@ -package org.hammerlab.commands +package org.hammerlab.cli.args4j import com.esotericsoftware.kryo.Kryo import org.apache.spark.SparkContext @@ -11,11 +11,19 @@ class SparkCommandTest extends Suite { test("command") { val outFile = tmpPath() - TestCommand.main(Array(outFile.toString())) - outFile.lines.toList should be( + + TestCommand.main( + Array( + outFile.toString + ) + ) + + outFile + .lines + .toList should be( List( "8mb", - "org.hammerlab.commands.TestRegistrar" + "org.hammerlab.cli.args4j.TestRegistrar" ) ) } @@ -32,8 +40,12 @@ class TestRegistrar override def registerClasses(kryo: Kryo): Unit = {} } +import org.hammerlab.spark.test.suite.TestConfs + object TestCommand - extends SparkCommand[TestArgs] { + extends SparkCommand[TestArgs] + with TestConfs { + override def name: String = "test" override def description: String = "test command" @@ -41,7 +53,6 @@ object TestCommand "spark.kryoserializer.buffer" → "8mb" ) - override def registrar = classOf[TestRegistrar] override def run(args: TestArgs, sc: SparkContext): Unit = { diff --git a/build.sbt b/build.sbt index 633f30a..03a4989 100644 --- a/build.sbt +++ b/build.sbt @@ -1,10 +1,32 @@ -name := "spark-commands" -version := "1.1.0-SNAPSHOT" -providedDeps += spark +lazy val cli = + rootProject( + args4j, + case_app + ) -deps ++= Seq( - bdg_utils_cli % "0.3.0", - slf4j, - spark_util % "1.2.1" +val defaults = Seq( + organization := "org.hammerlab.cli", + deps ++= Seq( + slf4j, + spark_util % "1.3.0" + ) +) ++ addSparkDeps + +lazy val args4j = project.settings( + defaults, + version := "1.1.0-SNAPSHOT", + deps += bdg_utils_cli % "0.3.0", + testDeps += Parent.autoImport.args4j +) + +lazy val case_app = project.settings( + defaults, + name := "case-app", + version := "1.0.0-SNAPSHOT", + deps ++= Seq( + Parent.autoImport.case_app, + io % "1.2.0", + paths % "1.2.0" + ) ) diff --git a/case_app/src/main/scala/org/hammerlab/cli/app/App.scala b/case_app/src/main/scala/org/hammerlab/cli/app/App.scala new file mode 100644 index 0000000..d723355 --- /dev/null +++ b/case_app/src/main/scala/org/hammerlab/cli/app/App.scala @@ -0,0 +1,34 @@ +package org.hammerlab.cli.app + +import caseapp.core.Messages +import caseapp.{ CaseApp, Parser, RemainingArgs } +import grizzled.slf4j.Logging + +abstract class App[Args : Parser : Messages] + extends CaseApp[Args] + with Logging { + + final override def run(options: Args, remainingArgs: RemainingArgs): Unit = + remainingArgs match { + case RemainingArgs(args, Nil) ⇒ + run( + options, + args + ) + case RemainingArgs(args, unparsed) ⇒ + throw new IllegalArgumentException( + s"Unparsed arguments: ${unparsed.mkString(" ")}" + ) + } + + def done(): Unit = {} + + final def run(options: Args, remainingArgs: Seq[String]): Unit = + try { + 
_run(options, remainingArgs) + } finally { + done() + } + + protected def _run(options: Args, remainingArgs: Seq[String]): Unit +} diff --git a/case_app/src/main/scala/org/hammerlab/cli/app/IndexingApp.scala b/case_app/src/main/scala/org/hammerlab/cli/app/IndexingApp.scala new file mode 100644 index 0000000..e6256cf --- /dev/null +++ b/case_app/src/main/scala/org/hammerlab/cli/app/IndexingApp.scala @@ -0,0 +1,36 @@ +package org.hammerlab.cli.app + +import caseapp.Parser +import caseapp.core.Messages +import org.hammerlab.io.Printer +import org.hammerlab.paths.Path + +trait OutPathArgs { + def out: Option[Path] +} + +/** + * Interface for apps that take a [[Path]] and "index" it in some way, generating an output file that is by default + * named by appending an extension to the input path. + * + * @param defaultSuffix if [[OutPathArgs.out]] is empty, construct an output path by appending this string to the argument + * value [[PathApp.path]]. + */ +abstract class IndexingApp[Args <: OutPathArgs : Parser : Messages](defaultSuffix: String) + extends PathApp[Args] { + implicit var printer: Printer = _ + + override def init(options: Args): Unit = { + printer = + Printer( + options + .out + .getOrElse( + path + defaultSuffix + ) + ) + } + + override def close(): Unit = + printer.close() +} diff --git a/case_app/src/main/scala/org/hammerlab/cli/app/PathApp.scala b/case_app/src/main/scala/org/hammerlab/cli/app/PathApp.scala new file mode 100644 index 0000000..ac5816d --- /dev/null +++ b/case_app/src/main/scala/org/hammerlab/cli/app/PathApp.scala @@ -0,0 +1,33 @@ +package org.hammerlab.cli.app + +import java.io.Closeable + +import caseapp.Parser +import caseapp.core.Messages +import org.hammerlab.paths.Path + +abstract class PathApp[Args : Parser : Messages] + extends App[Args] + with Closeable { + + @transient implicit var path: Path = _ + + def init(options: Args): Unit = {} + def close(): Unit = {} + + final protected override def _run(options: Args, args: Seq[String]): Unit = { + if (args.size != 1) { + throw new IllegalArgumentException( + s"Exactly one argument (a BAM file path) is required" + ) + } + + path = Path(args.head) + + init(options) + run(options) + close() + } + + protected def run(options: Args): Unit +} diff --git a/case_app/src/main/scala/org/hammerlab/cli/app/SparkPathApp.scala b/case_app/src/main/scala/org/hammerlab/cli/app/SparkPathApp.scala new file mode 100644 index 0000000..f8a4d3d --- /dev/null +++ b/case_app/src/main/scala/org/hammerlab/cli/app/SparkPathApp.scala @@ -0,0 +1,72 @@ +package org.hammerlab.cli.app + +import caseapp.Parser +import caseapp.core.Messages +import grizzled.slf4j.Logging +import org.apache.spark.SparkContext +import org.apache.spark.serializer.KryoRegistrator +import org.hammerlab.cli.args.OutputArgs +import org.hammerlab.hadoop.Configuration +import org.hammerlab.io.{ Printer, SampleSize } +import org.hammerlab.spark.{ SparkConfBase, confs } + +trait SparkPathAppArgs { + def output: OutputArgs +} + +trait HasSparkConf + extends SparkConfBase + with confs.Kryo + with confs.DynamicAllocation + with confs.EventLog + with confs.Speculation + +trait SparkApp[Args] + extends HasSparkConf { + + self: App[Args] with Logging ⇒ + + @transient private var _sc: SparkContext = _ + + implicit def sc: SparkContext = { + if (_sc == null) { + info("Creating SparkContext") + _sc = new SparkContext(makeSparkConf) + } + _sc + } + + implicit def conf: Configuration = sc.hadoopConfiguration + + override def done(): Unit = { + if (_sc != null && !_sc.isStopped) { + 
info("Stopping SparkContext") + _sc.stop() + } + _sc = null + } +} + +/** + * [[SparkApp]] that takes an input path and prints some information to stdout or a path, with optional truncation of + * such output. + */ +abstract class SparkPathApp[Args <: SparkPathAppArgs : Parser : Messages](override val registrar: Class[_ <: KryoRegistrator]) + extends PathApp[Args] + with SparkApp[Args] { + + @transient implicit var printer: Printer = _ + @transient implicit var printLimit: SampleSize = _ + + override def init(options: Args): Unit = { + val OutputArgs(printLim, path, overwrite) = options.output + + if (path.exists(_.exists) && !overwrite) + throw new IllegalArgumentException( + s"Output path $path exists and overwrite (-f) not set" + ) + + printer = Printer(path) + printLimit = printLim + } +} diff --git a/case_app/src/main/scala/org/hammerlab/cli/args/OutputArgs.scala b/case_app/src/main/scala/org/hammerlab/cli/args/OutputArgs.scala new file mode 100644 index 0000000..0034d0d --- /dev/null +++ b/case_app/src/main/scala/org/hammerlab/cli/args/OutputArgs.scala @@ -0,0 +1,21 @@ +package org.hammerlab.cli.args + +import caseapp.{ ValueDescription, ExtraName ⇒ O, HelpMessage ⇒ M } +import org.hammerlab.io.SampleSize +import org.hammerlab.paths.Path + +case class OutputArgs( + @O("l") + @ValueDescription("num=1000000") + @M("When collecting samples of records/results for displaying to the user, limit to this many to avoid overloading the driver") + printLimit: SampleSize = SampleSize(1000000), + + @O("o") + @ValueDescription("path") + @M("Print output to this file, otherwise to stdout") + outputPath: Option[Path] = None, + + @O("f") + @M("Whether to overwrite the output file, if it already exists") + overwrite: Boolean = false +) diff --git a/case_app/src/test/scala/org/hammerlab/cli/app/IndexingAppTest.scala b/case_app/src/test/scala/org/hammerlab/cli/app/IndexingAppTest.scala new file mode 100644 index 0000000..e4db763 --- /dev/null +++ b/case_app/src/test/scala/org/hammerlab/cli/app/IndexingAppTest.scala @@ -0,0 +1,41 @@ +package org.hammerlab.cli.app + +import caseapp.{ AppName, ExtraName ⇒ O } +import org.hammerlab.paths.Path +import org.hammerlab.test.Suite + +class IndexingAppTest + extends Suite { + + test("sample app") { + val in = tmpPath() + in.writeLines(1 to 10 map(_.toString)) + SumNumbers.main( + Array( + in.toString() + ) + ) + (in + ".sum").read should be("55\n") + } + +} + +@AppName("Add up numbers from an input file, write result to a sibling file with extension '.sum'") +case class Args(@O("o") out: Option[Path] = None) + extends OutPathArgs + +object SumNumbers + extends IndexingApp[Args](".sum") { + override protected def run(options: Args): Unit = { + + import org.hammerlab.io.Printer._ + import cats.implicits.catsStdShowForInt + + echo( + path + .lines + .map(_.toInt) + .sum + ) + } +} diff --git a/project/plugins.sbt b/project/plugins.sbt index c022d7b..33635f8 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1 +1 @@ -addSbtPlugin("org.hammerlab" % "sbt-parent" % "3.1.0") +addSbtPlugin("org.hammerlab" % "sbt-parent" % "3.1.3-SNAPSHOT") diff --git a/src/main/scala/org/hammerlab/commands/SparkCommand.scala b/src/main/scala/org/hammerlab/commands/SparkCommand.scala deleted file mode 100644 index 6de3cee..0000000 --- a/src/main/scala/org/hammerlab/commands/SparkCommand.scala +++ /dev/null @@ -1,63 +0,0 @@ -package org.hammerlab.commands - -import org.apache.spark.serializer.KryoRegistrator -import org.apache.spark.{ SparkConf, SparkContext } -import 
org.hammerlab.spark.{ Conf, SparkConfBase } - -import scala.collection.mutable - -abstract class SparkCommand[T <: Args: Manifest] - extends Command[T] - with SparkConfBase { - - override def run(args: T): Unit = { - val sc = createSparkContext() - try { - args.validate(sc) - run(args, sc) - } finally { - sc.stop() - } - } - - def run(args: T, sc: SparkContext): Unit - - private val defaultConfs = mutable.HashMap[String, String]() - def setDefaultConf(key: String, value: String): Unit = { - defaultConfs.update(key, value) - } - - def registrar: Class[_ <: KryoRegistrator] = null - - sparkConf( - "spark.master" → "local[*]", - "spark.serializer" → "org.apache.spark.serializer.KryoSerializer", - "spark.kryoserializer.buffer" → "4mb", - "spark.kryo.referenceTracking" → "true", - "spark.kryo.registrationRequired" → "true" - ) - - Option(registrar).foreach( - clz ⇒ - sparkConf( - "spark.kryo.registrator" → clz.getCanonicalName - ) - ) - - /** - * Return a spark context. - * - * Typically, most properties are set through config file / cmd-line. - * @return - */ - private def createSparkContext(): SparkContext = { - val conf = makeSparkConf - - conf.getOption("spark.app.name") match { - case Some(cmdLineName) => conf.setAppName(s"$cmdLineName: $name") - case _ => conf.setAppName(name) - } - - new SparkContext(conf) - } -} From f84e7301498bc57b4e89e8623330664897c1bfd1 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Tue, 19 Sep 2017 18:40:24 +0000 Subject: [PATCH 6/8] add spark default eventLog guard --- build.sbt | 13 +++++++------ .../scala/org/hammerlab/cli/app/SparkPathApp.scala | 11 ++++++++++- project/plugins.sbt | 2 +- 3 files changed, 18 insertions(+), 8 deletions(-) diff --git a/build.sbt b/build.sbt index 03a4989..aad3982 100644 --- a/build.sbt +++ b/build.sbt @@ -1,10 +1,4 @@ -lazy val cli = - rootProject( - args4j, - case_app - ) - val defaults = Seq( organization := "org.hammerlab.cli", deps ++= Seq( @@ -30,3 +24,10 @@ lazy val case_app = project.settings( paths % "1.2.0" ) ) + +lazy val cli_root = + rootProject( + "cli-root", + args4j, + case_app + ) diff --git a/case_app/src/main/scala/org/hammerlab/cli/app/SparkPathApp.scala b/case_app/src/main/scala/org/hammerlab/cli/app/SparkPathApp.scala index f8a4d3d..68a873e 100644 --- a/case_app/src/main/scala/org/hammerlab/cli/app/SparkPathApp.scala +++ b/case_app/src/main/scala/org/hammerlab/cli/app/SparkPathApp.scala @@ -8,6 +8,7 @@ import org.apache.spark.serializer.KryoRegistrator import org.hammerlab.cli.args.OutputArgs import org.hammerlab.hadoop.Configuration import org.hammerlab.io.{ Printer, SampleSize } +import org.hammerlab.paths.Path import org.hammerlab.spark.{ SparkConfBase, confs } trait SparkPathAppArgs { @@ -31,7 +32,15 @@ trait SparkApp[Args] implicit def sc: SparkContext = { if (_sc == null) { info("Creating SparkContext") - _sc = new SparkContext(makeSparkConf) + val conf = makeSparkConf + if ( + conf.get("spark.eventLog.enabled", "true") == "true" && + conf.get("spark.eventLog.dir", "") == "" && + !Path("/tmp/spark-events").exists) { + conf.set("spark.eventLog.enabled", "false") + warn("Disabling event-logging because default destination /tmp/spark-events doesn't exist") + } + _sc = new SparkContext(conf) } _sc } diff --git a/project/plugins.sbt b/project/plugins.sbt index 33635f8..c67154a 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1 +1 @@ -addSbtPlugin("org.hammerlab" % "sbt-parent" % "3.1.3-SNAPSHOT") +addSbtPlugin("org.hammerlab" % "sbt-parent" % "3.2.0") From 
901927a84490900d44e1e6704c37d26d1200971d Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Tue, 19 Sep 2017 18:41:17 +0000 Subject: [PATCH 7/8] travis config --- .travis.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index f330307..28ad4ae 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,7 +6,7 @@ jdk: scala: - 2.11.8 -script: sbt ++$TRAVIS_SCALA_VERSION clean compile +script: sbt ++$TRAVIS_SCALA_VERSION clean coverageTest cache: directories: @@ -14,6 +14,8 @@ cache: - $HOME/.sbt/boot/ - $HOME/.zinc +after_success: sbt ++$TRAVIS_SCALA_VERSION travis-report + before_cache: # Tricks to avoid unnecessary cache updates - find $HOME/.ivy2 -name "ivydata-*.properties" -delete From 050835e0dbd9eb21f248202f0889896932cad130 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Tue, 19 Sep 2017 18:58:15 +0000 Subject: [PATCH 8/8] add SparkPathApp test --- README.md | 14 ++++++- .../org/hammerlab/cli/app/SparkPathApp.scala | 2 +- case_app/src/test/resources/numbers | 10 +++++ .../hammerlab/cli/app/IndexingAppTest.scala | 9 +---- .../hammerlab/cli/app/SparkPathAppTest.scala | 37 +++++++++++++++++++ 5 files changed, 62 insertions(+), 10 deletions(-) create mode 100644 case_app/src/test/resources/numbers create mode 100644 case_app/src/test/scala/org/hammerlab/cli/app/SparkPathAppTest.scala diff --git a/README.md b/README.md index 72bd0cc..6cf5789 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,16 @@ [![Build Status](https://travis-ci.org/hammerlab/spark-commands.svg?branch=master)](https://travis-ci.org/hammerlab/spark-commands) [![Coverage Status](https://coveralls.io/repos/github/hammerlab/spark-commands/badge.svg)](https://coveralls.io/github/hammerlab/spark-commands) -[![Maven Central](https://img.shields.io/maven-central/v/org.hammerlab/spark-commands_2.11.svg?maxAge=600)](http://search.maven.org/#search%7Cga%7C1%7Cspark-commands) -Interfaces for creating CLI-runnable and testable Spark commands/apps. +Interfaces for creating CLI-runnable and testable commands/apps, with [Spark](http://spark.apache.org/)-based and non-Spark flavors. + +## args4j +[![Maven Central](https://img.shields.io/maven-central/v/org.hammerlab.cli/args4j_2.11.svg?maxAge=600)](http://search.maven.org/#search%7Cga%7C1%7Chammerlab%20args4j) + + +## case-app +[![Maven Central](https://img.shields.io/maven-central/v/org.hammerlab.cli/case-app_2.11.svg?maxAge=600)](http://search.maven.org/#search%7Cga%7C1%7Chammerlab%20case-app) + + + + diff --git a/case_app/src/main/scala/org/hammerlab/cli/app/SparkPathApp.scala b/case_app/src/main/scala/org/hammerlab/cli/app/SparkPathApp.scala index 68a873e..e20cbe0 100644 --- a/case_app/src/main/scala/org/hammerlab/cli/app/SparkPathApp.scala +++ b/case_app/src/main/scala/org/hammerlab/cli/app/SparkPathApp.scala @@ -60,7 +60,7 @@ trait SparkApp[Args] * [[SparkApp]] that takes an input path and prints some information to stdout or a path, with optional truncation of * such output. 
*/ -abstract class SparkPathApp[Args <: SparkPathAppArgs : Parser : Messages](override val registrar: Class[_ <: KryoRegistrator]) +abstract class SparkPathApp[Args <: SparkPathAppArgs : Parser : Messages](override val registrar: Class[_ <: KryoRegistrator] = null) extends PathApp[Args] with SparkApp[Args] { diff --git a/case_app/src/test/resources/numbers b/case_app/src/test/resources/numbers new file mode 100644 index 0000000..f00c965 --- /dev/null +++ b/case_app/src/test/resources/numbers @@ -0,0 +1,10 @@ +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 diff --git a/case_app/src/test/scala/org/hammerlab/cli/app/IndexingAppTest.scala b/case_app/src/test/scala/org/hammerlab/cli/app/IndexingAppTest.scala index e4db763..a4e9a61 100644 --- a/case_app/src/test/scala/org/hammerlab/cli/app/IndexingAppTest.scala +++ b/case_app/src/test/scala/org/hammerlab/cli/app/IndexingAppTest.scala @@ -6,10 +6,8 @@ import org.hammerlab.test.Suite class IndexingAppTest extends Suite { - - test("sample app") { - val in = tmpPath() - in.writeLines(1 to 10 map(_.toString)) + test("SumNumbers") { + val in = fileCopy(path("numbers"), tmpPath()) SumNumbers.main( Array( in.toString() @@ -17,7 +15,6 @@ class IndexingAppTest ) (in + ".sum").read should be("55\n") } - } @AppName("Add up numbers from an input file, write result to a sibling file with extension '.sum'") @@ -27,10 +24,8 @@ case class Args(@O("o") out: Option[Path] = None) object SumNumbers extends IndexingApp[Args](".sum") { override protected def run(options: Args): Unit = { - import org.hammerlab.io.Printer._ import cats.implicits.catsStdShowForInt - echo( path .lines diff --git a/case_app/src/test/scala/org/hammerlab/cli/app/SparkPathAppTest.scala b/case_app/src/test/scala/org/hammerlab/cli/app/SparkPathAppTest.scala new file mode 100644 index 0000000..d7f76c1 --- /dev/null +++ b/case_app/src/test/scala/org/hammerlab/cli/app/SparkPathAppTest.scala @@ -0,0 +1,37 @@ +package org.hammerlab.cli.app + +import caseapp.Recurse +import org.hammerlab.cli.args.OutputArgs +import org.hammerlab.spark.test.suite.MainSuite +import org.hammerlab.test.Suite + +class SparkPathAppTest + extends MainSuite { + test("SumNumbersSpark") { + val out = tmpPath() + SumNumbersSpark.main( + Array( + path("numbers").toString, + "-o", out.toString + ) + ) + out.read should be("55\n") + } +} + +case class SparkArgs(@Recurse output: OutputArgs) + extends SparkPathAppArgs + +object SumNumbersSpark + extends SparkPathApp[SparkArgs] { + override protected def run(options: SparkArgs): Unit = { + import org.hammerlab.io.Printer._ + import cats.implicits.catsStdShowForInt + echo( + sc + .textFile(path.toString) + .map(_.toInt) + .reduce(_ + _) + ) + } +}
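
For context on the `case_app` API this series introduces, here is a minimal downstream sketch in the style of the `SumNumbers` test above. The names `CountLines`, `CountArgs`, and the `.count` suffix are hypothetical, invented for illustration; every API used (`IndexingApp`, `OutPathArgs`, `PathApp`'s implicit `path`, `Printer`'s `echo`) comes from the patches themselves, so treat this as a sketch, not part of the series:

package org.hammerlab.cli.app

import caseapp.{ AppName, ExtraName ⇒ O }
import org.hammerlab.paths.Path

@AppName("Count the lines in an input file; write the total to <input>.count unless -o is given")
case class CountArgs(@O("o") out: Option[Path] = None)
  extends OutPathArgs

object CountLines
  extends IndexingApp[CountArgs](".count") {
  override protected def run(options: CountArgs): Unit = {
    import org.hammerlab.io.Printer._
    import cats.implicits.catsStdShowForInt
    // `path` is bound to the sole positional argument by PathApp; `echo` writes
    // through the implicit Printer that IndexingApp opens on the output path.
    echo(path.lines.size)
  }
}

Run against the `numbers` test resource, `CountLines.main(Array("numbers"))` would write "10\n" to `numbers.count`; passing `-o some.path` redirects the output there instead, per IndexingApp's `options.out.getOrElse(path + defaultSuffix)`.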