From d0f64205a116853aa471dd1361a635167da15fcc Mon Sep 17 00:00:00 2001 From: pferrel Date: Sat, 27 Dec 2014 15:43:41 -0800 Subject: [PATCH 1/3] simplified driver and made required changes to all, note: left job assembly untouched --- .../apache/mahout/drivers/MahoutDriver.scala | 2 +- spark/pom.xml | 13 ++-- spark/src/main/assembly/job.xml | 17 ++++- .../mahout/drivers/ItemSimilarityDriver.scala | 12 +--- .../mahout/drivers/MahoutSparkDriver.scala | 20 +++--- .../mahout/drivers/RowSimilarityDriver.scala | 8 +-- .../apache/mahout/drivers/TestNBDriver.scala | 64 +++++++------------ .../apache/mahout/drivers/TrainNBDriver.scala | 18 ------ 8 files changed, 61 insertions(+), 93 deletions(-) diff --git a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala index 8c1f8cf92b..3d9d4e19cb 100644 --- a/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala +++ b/math-scala/src/main/scala/org/apache/mahout/drivers/MahoutDriver.scala @@ -25,7 +25,7 @@ abstract class MahoutDriver { implicit protected var mc: DistributedContext = _ - protected var parser: MahoutOptionParser = _ + implicit protected var parser: MahoutOptionParser = _ var _useExistingContext: Boolean = false // used in the test suite to reuse one context per suite diff --git a/spark/pom.xml b/spark/pom.xml index f61f988dc5..bcf9e30ced 100644 --- a/spark/pom.xml +++ b/spark/pom.xml @@ -157,8 +157,8 @@ - - + + org.apache.maven.plugins maven-assembly-plugin @@ -171,13 +171,14 @@ - src/main/assembly/job.xml + ../spark/src/main/assembly/job.xml + - - com.github.scopt - scopt_2.10 - 3.2.0 - - diff --git a/spark/src/main/assembly/job.xml b/spark/src/main/assembly/job.xml index 0c41f3d3ae..2bdb3ce3d1 100644 --- a/spark/src/main/assembly/job.xml +++ b/spark/src/main/assembly/job.xml @@ -42,5 +42,20 @@ + + + ${basedir}/target/classes + / + + *.jar + + + + ${basedir}/target/classes + / + + driver.classes.default.props + 
+ + - \ No newline at end of file diff --git a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala index 01a18c916c..36ba6efa33 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/ItemSimilarityDriver.scala @@ -117,15 +117,9 @@ object ItemSimilarityDriver extends MahoutSparkDriver { } } - override def start(masterUrl: String = parser.opts("master").asInstanceOf[String], - appName: String = parser.opts("appName").asInstanceOf[String]): - Unit = { + override protected def start() : Unit = { - if (parser.opts("sparkExecutorMem").asInstanceOf[String] != "") - sparkConf.set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String]) - //else leave as set in Spark config - - super.start(masterUrl, appName) + super.start readSchema1 = new Schema("delim" -> parser.opts("inDelim").asInstanceOf[String], "filter" -> parser.opts("filter1").asInstanceOf[String], @@ -208,7 +202,7 @@ object ItemSimilarityDriver extends MahoutSparkDriver { } override def process: Unit = { - start() + start val indexedDatasets = readIndexedDatasets val idss = SimilarityAnalysis.cooccurrencesIDSs(indexedDatasets, parser.opts("randomSeed").asInstanceOf[Int], diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala index e6299fd3ab..ab40c3a919 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala @@ -34,7 +34,6 @@ import org.apache.mahout.sparkbindings._ * * override def main(args: Array[String]): Unit = { * - * * val parser = new MahoutOptionParser(programName = "shortname") { * head("somedriver", "Mahout 1.0-SNAPSHOT") * @@ -55,7 +54,7 @@ import org.apache.mahout.sparkbindings._ * } * * override 
def process: Unit = { - * start() + * start // override to change the default Kryo or SparkConf before the distributed context is created * // do the work here * stop * } @@ -70,15 +69,18 @@ abstract class MahoutSparkDriver extends MahoutDriver { /** Creates a Spark context to run the job inside. * Override to set the SparkConf values specific to the job, * these must be set before the context is created. - * @param masterUrl Spark master URL - * @param appName Name to display in Spark UI * */ - protected def start(masterUrl: String, appName: String) : Unit = { - sparkConf.set("spark.kryo.referenceTracking", "false") - .set("spark.kryoserializer.buffer.mb", "200")// this is default for Mahout optimizer, change it with -D option - + protected def start() : Unit = { if (!_useExistingContext) { - mc = mahoutSparkContext(masterUrl, appName, sparkConf = sparkConf) + sparkConf.set("spark.kryo.referenceTracking", "false") + .set("spark.kryoserializer.buffer.mb", "200")// this is default for Mahout optimizer, change it with -D option + + if (parser.opts("sparkExecutorMem").asInstanceOf[String] != "") + sparkConf.set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String]) + //else leave as set in Spark config + mc = mahoutSparkContext(masterUrl = parser.opts("master").asInstanceOf[String], + appName = parser.opts("appName").asInstanceOf[String], + sparkConf = sparkConf) } } diff --git a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala index 9b44b95039..8c1bce4985 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala @@ -106,11 +106,9 @@ object RowSimilarityDriver extends MahoutSparkDriver { } } - override def start(masterUrl: String = parser.opts("master").asInstanceOf[String], - appName: String = parser.opts("appName").asInstanceOf[String]): - 
Unit = { + override protected def start() : Unit = { - super.start(masterUrl, appName) + super.start readWriteSchema = new Schema( "rowKeyDelim" -> parser.opts("rowKeyDelim").asInstanceOf[String], @@ -135,7 +133,7 @@ object RowSimilarityDriver extends MahoutSparkDriver { } override def process: Unit = { - start() + start val indexedDataset = readIndexedDataset diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala index 7d0738c232..368ee892c7 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/TestNBDriver.scala @@ -78,54 +78,36 @@ object TestNBDriver extends MahoutSparkDriver { } } - override def start(masterUrl: String = parser.opts("master").asInstanceOf[String], - appName: String = parser.opts("appName").asInstanceOf[String]): - Unit = { - - // will be only specific to this job. - // Note: set a large spark.kryoserializer.buffer.mb if using DSL MapBlock else leave as default - - if (parser.opts("sparkExecutorMem").asInstanceOf[String] != "") - sparkConf.set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String]) - - // Note: set a large akka frame size for DSL NB (20) - //sparkConf.set("spark.akka.frameSize","20") // don't need this for Spark optimized NaiveBayes.. 
- //else leave as set in Spark config - - super.start(masterUrl, appName) - - } - - /** Read the test set from inputPath/part-x-00000 sequence file of form */ - private def readTestSet: DrmLike[_] = { - val inputPath = parser.opts("input").asInstanceOf[String] - val trainingSet= drm.drmDfsRead(inputPath) - trainingSet - } +/** Read the test set from inputPath/part-x-00000 sequence file of form */ +private def readTestSet: DrmLike[_] = { + val inputPath = parser.opts("input").asInstanceOf[String] + val trainingSet= drm.drmDfsRead(inputPath) + trainingSet +} - /** read the model from pathToModel using NBModel.DfsRead(...) */ - private def readModel: NBModel = { - val inputPath = parser.opts("pathToModel").asInstanceOf[String] - val model= NBModel.dfsRead(inputPath) - model - } +/** read the model from pathToModel using NBModel.DfsRead(...) */ +private def readModel: NBModel = { + val inputPath = parser.opts("pathToModel").asInstanceOf[String] + val model= NBModel.dfsRead(inputPath) + model +} - override def process: Unit = { - start() +override def process: Unit = { + start() - val testComplementary = parser.opts("testComplementary").asInstanceOf[Boolean] - val outputPath = parser.opts("output").asInstanceOf[String] + val testComplementary = parser.opts("testComplementary").asInstanceOf[Boolean] + val outputPath = parser.opts("output").asInstanceOf[String] - // todo: get the -ow option in to check for a model in the path and overwrite if flagged. + // todo: get the -ow option in to check for a model in the path and overwrite if flagged. 
- val testSet = readTestSet - val model = readModel - val analyzer= NaiveBayes.test(model, testSet, testComplementary) + val testSet = readTestSet + val model = readModel + val analyzer= NaiveBayes.test(model, testSet, testComplementary) - println(analyzer) + println(analyzer) - stop - } + stop +} } diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala index 35ff90befe..3d03c1d013 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala @@ -72,24 +72,6 @@ object TrainNBDriver extends MahoutSparkDriver { } } - override def start(masterUrl: String = parser.opts("master").asInstanceOf[String], - appName: String = parser.opts("appName").asInstanceOf[String]): - Unit = { - - // will be only specific to this job. - // Note: set a large spark.kryoserializer.buffer.mb if using DSL MapBlock else leave as default - - if (parser.opts("sparkExecutorMem").asInstanceOf[String] != "") - sparkConf.set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String]) - - // Note: set a large akka frame size for DSL NB (20) - // sparkConf.set("spark.akka.frameSize","20") // don't need this for Spark optimized NaiveBayes.. 
- // else leave as set in Spark config - - super.start(masterUrl, appName) - - } - /** Read the training set from inputPath/part-x-00000 sequence file of form */ private def readTrainingSet: DrmLike[_]= { val inputPath = parser.opts("input").asInstanceOf[String] From 5e32612b4ca9400287c43e4130478f04207fbcfa Mon Sep 17 00:00:00 2001 From: pferrel Date: Mon, 29 Dec 2014 08:49:10 -0800 Subject: [PATCH 2/3] creating a trimmed down all-deps dependencies.jar for spark drivers --- spark/pom.xml | 2 +- spark/src/main/assembly/{job.xml => dependencies.xml} | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) rename spark/src/main/assembly/{job.xml => dependencies.xml} (92%) diff --git a/spark/pom.xml b/spark/pom.xml index bcf9e30ced..8cc67c59d3 100644 --- a/spark/pom.xml +++ b/spark/pom.xml @@ -171,7 +171,7 @@ - ../spark/src/main/assembly/job.xml + src/main/assembly/dependencies.xml diff --git a/spark/src/main/assembly/job.xml b/spark/src/main/assembly/dependencies.xml similarity index 92% rename from spark/src/main/assembly/job.xml rename to spark/src/main/assembly/dependencies.xml index 2bdb3ce3d1..b9605c27bb 100644 --- a/spark/src/main/assembly/job.xml +++ b/spark/src/main/assembly/dependencies.xml @@ -20,7 +20,7 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd"> - job + dependencies jar @@ -39,6 +39,9 @@ true org.apache.hadoop:hadoop-core + + org.apache.spark:spark-core_${scala.major} + org.scala-lang:scala-library From 4be69312af16d2016c9754443943c8b486ab59d9 Mon Sep 17 00:00:00 2001 From: pferrel Date: Mon, 29 Dec 2014 11:55:47 -0800 Subject: [PATCH 3/3] added all projects that can be safely excluded from the dependencies.jar for spark drivers --- spark/src/main/assembly/dependencies.xml | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/spark/src/main/assembly/dependencies.xml 
b/spark/src/main/assembly/dependencies.xml index b9605c27bb..0b03a8e1ee 100644 --- a/spark/src/main/assembly/dependencies.xml +++ b/spark/src/main/assembly/dependencies.xml @@ -38,10 +38,17 @@ / true - org.apache.hadoop:hadoop-core + + org.apache.hadoop:hadoop-core org.apache.spark:spark-core_${scala.major} org.scala-lang:scala-library + jackson-core-asl + jackson-mapper-asl + xstream + lucene-core + lucene-analyzers-common