From 35be8ffbeee916bdc893d6670a23d7d3349edf37 Mon Sep 17 00:00:00 2001 From: Andrew Palumbo Date: Mon, 29 Jun 2015 17:53:39 -0400 Subject: [PATCH 1/9] Map RDD[(K,Vector)] to RDD[(KWritable,VectorWritable)] explicitly --- pom.xml | 4 ++-- .../drm/CheckpointedDrmSpark.scala | 19 +++++++++++-------- .../mahout/sparkbindings/drm/package.scala | 2 +- 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/pom.xml b/pom.xml index 25cd27d533..2fa6548670 100644 --- a/pom.xml +++ b/pom.xml @@ -120,7 +120,7 @@ 1.7.10 2.10 2.10.4 - 1.2.2 + 1.3.1 0.1.25 @@ -774,7 +774,7 @@ distribution math-scala spark - spark-shell + h2o diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala index 41efc2726c..033c49788d 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala @@ -29,7 +29,6 @@ import scala.util.Random import org.apache.hadoop.io.{LongWritable, Text, IntWritable, Writable} import org.apache.mahout.math.drm._ import org.apache.mahout.sparkbindings._ -import org.apache.spark.SparkContext._ /** ==Spark-specific optimizer-checkpointed DRM.== * @@ -158,14 +157,18 @@ class CheckpointedDrmSpark[K: ClassTag]( def dfsWrite(path: String) = { val ktag = implicitly[ClassTag[K]] - implicit val k2wFunc: (K) => Writable = - if (ktag.runtimeClass == classOf[Int]) (x: K) => new IntWritable(x.asInstanceOf[Int]) - else if (ktag.runtimeClass == classOf[String]) (x: K) => new Text(x.asInstanceOf[String]) - else if (ktag.runtimeClass == classOf[Long]) (x: K) => new LongWritable(x.asInstanceOf[Long]) - else if (classOf[Writable].isAssignableFrom(ktag.runtimeClass)) (x: K) => x.asInstanceOf[Writable] - else throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(ktag)) + // Map backing RDD[(K,Vector)] to RDD[(K)Writable,VectorWritable)] and save. 
+ if (ktag.runtimeClass == classOf[Int]) { + rddInput.toDrmRdd() + .map( x =>(new IntWritable(x._1.asInstanceOf[Int]), new VectorWritable(x._2))).saveAsSequenceFile(path) + } else if (ktag.runtimeClass == classOf[String]){ + rddInput.toDrmRdd() + .map( x =>(new Text(x._1.asInstanceOf[String]), new VectorWritable(x._2))).saveAsSequenceFile(path) + } else if (ktag.runtimeClass == classOf[Long]) { + rddInput.toDrmRdd() + .map( x =>(new LongWritable(x._1.asInstanceOf[Long]), new VectorWritable(x._2))).saveAsSequenceFile(path) + } else throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(ktag)) - rddInput.toDrmRdd().saveAsSequenceFile(path) } protected def computeNRow = { diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala index 0de5ff8063..e18d6da766 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala @@ -23,7 +23,7 @@ import scala.collection.JavaConversions._ import org.apache.hadoop.io.{LongWritable, Text, IntWritable, Writable} import org.apache.log4j.Logger import java.lang.Math -import org.apache.spark.rdd.{FilteredRDD, RDD} +import org.apache.spark.rdd.RDD import scala.reflect.ClassTag import org.apache.mahout.math.scalabindings._ import RLikeOps._ From 15cb3618b0e1495ad1fd962dffca12d1a904fcc9 Mon Sep 17 00:00:00 2001 From: Andrew Palumbo Date: Wed, 1 Jul 2015 22:30:30 -0400 Subject: [PATCH 2/9] inital commit of shell fix ugly.. starts shell but cant reach implicit loopToMain() funtction need to get ClassServerUri --- pom.xml | 2 +- .../shell/MahoutSparkILoop.scala | 204 ++++++++--------- .../mahout/sparkbindings/shell/Main.scala | 210 +++++++++++++++++- 3 files changed, 308 insertions(+), 108 deletions(-) diff --git a/pom.xml b/pom.xml index 2fa6548670..68e528636d 100644 --- a/pom.xml +++ b/pom.xml @@ -774,7 +774,7 @@ distribution math-scala spark - + spark-shell h2o diff --git a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala index 4d0615afec..22b8c47ff3 100644 --- a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala +++ b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala @@ -1,102 +1,102 @@ -package org.apache.mahout.sparkbindings.shell - -import org.apache.spark.repl.SparkILoop -import org.apache.spark.{SparkConf, SparkContext} -import scala.tools.nsc.Properties -import scala.Some -import org.apache.mahout.sparkbindings._ - -class MahoutSparkILoop extends SparkILoop { - - log.info("Mahout spark shell waking up.") - - private val postInitScript = - "import org.apache.mahout.math._" :: - "import scalabindings._" :: - "import RLikeOps._" :: - "import drm._" :: - "import RLikeDrmOps._" :: - "import decompositions._" :: - "import org.apache.mahout.sparkbindings._" :: - "import collection.JavaConversions._" :: - Nil - - override protected def postInitialization() { - super.postInitialization() - val intp: MahoutSparkILoop = this - intp.beQuietDuring { - postInitScript.foreach(command(_)) - } - } - - override def createSparkContext(): SparkContext = { - val execUri = System.getenv("SPARK_EXECUTOR_URI") - val master = this.master match { - case Some(m) => m - case None => { - val prop = System.getenv("MASTER") - if (prop != null) prop 
else "local" - } - } - val jars = SparkILoop.getAddedJars.map(new java.io.File(_).getAbsolutePath) - val conf = new SparkConf() - .set("spark.repl.class.uri", intp.classServer.uri) - - if (execUri != null) { - conf.set("spark.executor.uri", execUri) - } - - conf.set("spark.executor.memory", "1g") - - sparkContext = mahoutSparkContext( - masterUrl = master, - appName = "Mahout Spark Shell", - customJars = jars, - sparkConf = conf - ) - - echo("Created spark context..") - sparkContext - } - - override def initializeSpark() { - intp.beQuietDuring { - command(""" - - @transient implicit val sdc: org.apache.mahout.math.drm.DistributedContext = - new org.apache.mahout.sparkbindings.SparkDistributedContext( - org.apache.spark.repl.Main.interp.createSparkContext()) - - """) - command("import org.apache.spark.SparkContext._") - echo("Mahout distributed context is available as \"implicit val sdc\".") - } - } - - override def sparkCleanUp() { - echo("Stopping Spark context.") - intp.beQuietDuring { - command("sdc.stop()") - } - } - - override def prompt: String = "mahout> " - - override def printWelcome(): Unit = { - echo( - """ - _ _ - _ __ ___ __ _| |__ ___ _ _| |_ - | '_ ` _ \ / _` | '_ \ / _ \| | | | __| - | | | | | | (_| | | | | (_) | |_| | |_ - |_| |_| |_|\__,_|_| |_|\___/ \__,_|\__| version 0.10.0 - - """) - import Properties._ - val welcomeMsg = "Using Scala %s (%s, Java %s)".format( - versionString, javaVmName, javaVersion) - echo(welcomeMsg) - echo("Type in expressions to have them evaluated.") - echo("Type :help for more information.") - } -} +//package org.apache.mahout.sparkbindings.shell +// +//import org.apache.spark.repl.SparkILoop +//import org.apache.spark.{SparkConf, SparkContext} +//import scala.tools.nsc.Properties +//import scala.Some +//import org.apache.mahout.sparkbindings._ +// +//class MahoutSparkILoop extends SparkILoop { +// +// log.info("Mahout spark shell waking up.") +// +// private val postInitScript = +// "import org.apache.mahout.math._" :: +// "import scalabindings._" :: +// "import RLikeOps._" :: +// "import drm._" :: +// "import RLikeDrmOps._" :: +// "import decompositions._" :: +// "import org.apache.mahout.sparkbindings._" :: +// "import collection.JavaConversions._" :: +// Nil +// +// override protected def postInitialization() { +// super.postInitialization() +// val intp: MahoutSparkILoop = this +// intp.beQuietDuring { +// postInitScript.foreach(command(_)) +// } +// } +// +// override def createSparkContext(): SparkContext = { +// val execUri = System.getenv("SPARK_EXECUTOR_URI") +// val master = this.master match { +// case Some(m) => m +// case None => { +// val prop = System.getenv("MASTER") +// if (prop != null) prop else "local" +// } +// } +// val jars = SparkILoop.getAddedJars.map(new java.io.File(_).getAbsolutePath) +// val conf = new SparkConf() +// .set("spark.repl.class.uri", intp.classServer.uri) +// +// if (execUri != null) { +// conf.set("spark.executor.uri", execUri) +// } +// +// conf.set("spark.executor.memory", "1g") +// +// sparkContext = mahoutSparkContext( +// masterUrl = master, +// appName = "Mahout Spark Shell", +// customJars = jars, +// sparkConf = conf +// ) +// +// echo("Created spark context..") +// sparkContext +// } +// +// override def initializeSpark() { +// intp.beQuietDuring { +// command(""" +// +// @transient implicit val sdc: org.apache.mahout.math.drm.DistributedContext = +// new org.apache.mahout.sparkbindings.SparkDistributedContext( +// org.apache.spark.repl.Main.interp.createSparkContext()) +// +// """) +// 
command("import org.apache.spark.SparkContext._") +// echo("Mahout distributed context is available as \"implicit val sdc\".") +// } +// } +// +// override def sparkCleanUp() { +// echo("Stopping Spark context.") +// intp.beQuietDuring { +// command("sdc.stop()") +// } +// } +// +// override def prompt: String = "mahout> " +// +// override def printWelcome(): Unit = { +// echo( +// """ +// _ _ +// _ __ ___ __ _| |__ ___ _ _| |_ +// | '_ ` _ \ / _` | '_ \ / _ \| | | | __| +// | | | | | | (_| | | | | (_) | |_| | |_ +// |_| |_| |_|\__,_|_| |_|\___/ \__,_|\__| version 0.10.0 +// +// """) +// import Properties._ +// val welcomeMsg = "Using Scala %s (%s, Java %s)".format( +// versionString, javaVmName, javaVersion) +// echo(welcomeMsg) +// echo("Type in expressions to have them evaluated.") +// echo("Type :help for more information.") +// } +//} diff --git a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala index b59c9a77d4..da5d89df55 100644 --- a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala +++ b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala @@ -19,24 +19,224 @@ package org.apache.mahout.sparkbindings.shell import org.apache.mahout.sparkbindings._ import org.apache.log4j.PropertyConfigurator +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.repl.{SparkIMain, SparkILoop} +import scala.tools.nsc.Properties + +class MahoutLoop extends SparkILoop{ + private var _interp: SparkILoop = _ + + private val postInitImports = + "org.apache.mahout.math._" :: + "scalabindings._" :: + "RLikeOps._" :: + "drm._" :: + "RLikeDrmOps._" :: + "decompositions._" :: + "org.apache.mahout.sparkbindings._" :: + "collection.JavaConversions._" :: + Nil -object Main { - private var _interp: MahoutSparkILoop = _ - def main(args:Array[String]) { // Hack: for some very unclear reason, log4j is not picking up log4j.properties in Spark conf/ even // though the latter is added to the classpath. So we force it to pick it. 
PropertyConfigurator.configure(getMahoutHome() + "/conf/log4j.properties") System.setProperty("scala.usejavacp", "true") - _interp = new MahoutSparkILoop() + _interp = this // It looks like we need to initialize this too, since some Spark shell initilaization code // expects it org.apache.spark.repl.Main.interp = _interp - _interp.process(args) + + // _interp.reset() + + _interp.setPrompt("mahout> ") + + + + // createSparkContext() + + // implicit + + // add imports + // postInitImports.foreach(_interp.interpret(_)) + // _interp.process(args) + +def getClassServerUrl(iMain: SparkIMain): String = { + iMain.classServerUri +} + + override def createSparkContext(): SparkContext = { + val execUri = System.getenv("SPARK_EXECUTOR_URI") + val master = _interp.master match { + case Some(m) => m + case None => { + val prop = System.getenv("MASTER") + if (prop != null) prop else "local" + } + } + + val iMain: SparkIMain = _interp.asInstanceOf[SparkIMain] + print("\n Imain: "+iMain.getClass.getName) + System.exit(0) + + val classServerUri_ : String = iMain.classServerUri.toString + System.out.println("!!!!!!!"+classServerUri_) + + val jars = SparkILoop.getAddedJars.map(new java.io.File(_).getAbsolutePath) + val conf = new SparkConf().set("spark.repl.class.uri", iMain.classServerUri) + + if (execUri != null) { + conf.set("spark.executor.uri", execUri) + } + + conf.set("spark.executor.memory", "1g") + + _interp.sparkContext= mahoutSparkContext( + masterUrl = master, + appName = "Mahout Spark Shell", + customJars = jars, + sparkConf = conf + ) + + // _interp.echo("Created spark context..") + sparkContext } + override def initializeSpark() { + _interp.beQuietDuring { + _interp.interpret(""" + + @transient implicit val sdc: org.apache.mahout.math.drm.DistributedContext = + new org.apache.mahout.sparkbindings.SparkDistributedContext( + org.apache.spark.repl.Main.interp.createSparkContext()) + + """) + _interp.interpret("import org.apache.spark.SparkContext._") + //echo("Mahout distributed context is available as \"implicit val sdc\".") + } + } + + def sparkCleanUp() { + //echo("Stopping Spark context.") + _interp.beQuietDuring { + _interp.interpret("sdc.stop()") + } + } + +} + +object Main { + private var _interp: SparkILoop = _ + + def main(args: Array[String]) { + PropertyConfigurator.configure(getMahoutHome() + "/conf/log4j.properties") + + System.setProperty("scala.usejavacp", "true") + _interp = new MahoutLoop() + // It looks like we need to initialize this too, since some Spark shell initilaization code + // expects it + org.apache.spark.repl.Main.interp = _interp + _interp.process(args) + + + } } + + + + +//log.info("Mahout spark shell waking up.") + +//private val postInitScript = +//"import org.apache.mahout.math._" :: +//"import scalabindings._" :: +//"import RLikeOps._" :: +//"import drm._" :: +//"import RLikeDrmOps._" :: +//"import decompositions._" :: +//"import org.apache.mahout.sparkbindings._" :: +//"import collection.JavaConversions._" :: +//Nil +// +//override protected def postInitialization() { +//super.postInitialization() +//val intp: MahoutSparkILoop = this +//intp.beQuietDuring { +//postInitScript.foreach(command(_)) +//} +//} +// +//override def createSparkContext(): SparkContext = { +//val execUri = System.getenv("SPARK_EXECUTOR_URI") +//val master = this.master match { +//case Some(m) => m +//case None => { +//val prop = System.getenv("MASTER") +//if (prop != null) prop else "local" +//} +//} +//val jars = SparkILoop.getAddedJars.map(new java.io.File(_).getAbsolutePath) +//val 
conf = new SparkConf() +//.set("spark.repl.class.uri", intp.classServer.uri) +// +//if (execUri != null) { +//conf.set("spark.executor.uri", execUri) +//} +// +//conf.set("spark.executor.memory", "1g") +// +//sparkContext = mahoutSparkContext( +//masterUrl = master, +//appName = "Mahout Spark Shell", +//customJars = jars, +//sparkConf = conf +//) +// +//echo("Created spark context..") +//sparkContext +//} +// +//override def initializeSpark() { +//intp.beQuietDuring { +//command(""" +// +// @transient implicit val sdc: org.apache.mahout.math.drm.DistributedContext = +// new org.apache.mahout.sparkbindings.SparkDistributedContext( +// org.apache.spark.repl.Main.interp.createSparkContext()) +// +// """) +//command("import org.apache.spark.SparkContext._") +//echo("Mahout distributed context is available as \"implicit val sdc\".") +//} +//} +// +//override def sparkCleanUp() { +//echo("Stopping Spark context.") +//intp.beQuietDuring { +//command("sdc.stop()") +//} +//} +// +//override def prompt: String = "mahout> " +// +//override def printWelcome(): Unit = { +//echo( +//""" +// _ _ +// _ __ ___ __ _| |__ ___ _ _| |_ +// | '_ ` _ \ / _` | '_ \ / _ \| | | | __| +// | | | | | | (_| | | | | (_) | |_| | |_ +// |_| |_| |_|\__,_|_| |_|\___/ \__,_|\__| version 0.10.0 +// +//""") +//import Properties._ +//val welcomeMsg = "Using Scala %s (%s, Java %s)".format( +//versionString, javaVmName, javaVersion) +//echo(welcomeMsg) +//echo("Type in expressions to have them evaluated.") +//echo("Type :help for more information.") +//} From df36d249c08b0d8501d9f080d4e46b23e1bd2dc3 Mon Sep 17 00:00:00 2001 From: Andrew Palumbo Date: Thu, 2 Jul 2015 00:15:56 -0400 Subject: [PATCH 3/9] working now. still getting a sc.stop() error when closing.. added in a echoToShell method. 
compatible with 1.3+ only --- .../mahout/sparkbindings/shell/Main.scala | 86 +++++++++++-------- 1 file changed, 50 insertions(+), 36 deletions(-) diff --git a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala index da5d89df55..4bb70b3a2a 100644 --- a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala +++ b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala @@ -17,25 +17,25 @@ package org.apache.mahout.sparkbindings.shell -import org.apache.mahout.sparkbindings._ import org.apache.log4j.PropertyConfigurator import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.repl.{SparkIMain, SparkILoop} - +import org.apache.spark.repl.SparkILoop import scala.tools.nsc.Properties +import org.apache.mahout.sparkbindings._ + class MahoutLoop extends SparkILoop{ private var _interp: SparkILoop = _ private val postInitImports = - "org.apache.mahout.math._" :: - "scalabindings._" :: - "RLikeOps._" :: - "drm._" :: - "RLikeDrmOps._" :: - "decompositions._" :: - "org.apache.mahout.sparkbindings._" :: - "collection.JavaConversions._" :: + "import org.apache.mahout.math._" :: + "import scalabindings._" :: + "import RLikeOps._" :: + "import drm._" :: + "import RLikeDrmOps._" :: + "import decompositions._" :: + "import org.apache.mahout.sparkbindings._" :: + "import collection.JavaConversions._" :: Nil @@ -51,24 +51,10 @@ class MahoutLoop extends SparkILoop{ // expects it org.apache.spark.repl.Main.interp = _interp - // _interp.reset() _interp.setPrompt("mahout> ") - - // createSparkContext() - - // implicit - - // add imports - // postInitImports.foreach(_interp.interpret(_)) - // _interp.process(args) - -def getClassServerUrl(iMain: SparkIMain): String = { - iMain.classServerUri -} - override def createSparkContext(): SparkContext = { val execUri = System.getenv("SPARK_EXECUTOR_URI") val master = _interp.master match { @@ -79,15 +65,15 @@ def getClassServerUrl(iMain: SparkIMain): String = { } } - val iMain: SparkIMain = _interp.asInstanceOf[SparkIMain] - print("\n Imain: "+iMain.getClass.getName) - System.exit(0) - - val classServerUri_ : String = iMain.classServerUri.toString - System.out.println("!!!!!!!"+classServerUri_) +// val iMain: SparkIMain = SparkILoop.loopToInterpreter(_interp) +// print("\n Imain: "+iMain.getClass.getName) +// //System.exit(0) +// +// val classServerUri_ : String = iMain.classServerUri.toString +// System.out.println("!!!!!!!"+classServerUri_) val jars = SparkILoop.getAddedJars.map(new java.io.File(_).getAbsolutePath) - val conf = new SparkConf().set("spark.repl.class.uri", iMain.classServerUri) + val conf = new SparkConf().set("spark.repl.class.uri", _interp.classServerUri) if (execUri != null) { conf.set("spark.executor.uri", execUri) @@ -102,7 +88,7 @@ def getClassServerUrl(iMain: SparkIMain): String = { sparkConf = conf ) - // _interp.echo("Created spark context..") + echoToShell("Created spark context..") sparkContext } @@ -116,17 +102,46 @@ def getClassServerUrl(iMain: SparkIMain): String = { """) _interp.interpret("import org.apache.spark.SparkContext._") - //echo("Mahout distributed context is available as \"implicit val sdc\".") + echoToShell("Mahout distributed context is available as \"implicit val sdc\".") } } def sparkCleanUp() { - //echo("Stopping Spark context.") + echoToShell("Stopping Spark context.") _interp.beQuietDuring { _interp.interpret("sdc.stop()") } } + override protected def 
postInitialization() { + super.postInitialization() + //val intp: MahoutSparkILoop = this + _interp.beQuietDuring { + postInitImports.foreach(_interp.interpret(_)) + } + } + + def echoToShell(str: String): Unit = { + _interp.out.println(str) + } + + override def printWelcome(): Unit = { + echoToShell( + """ + _ _ + _ __ ___ __ _| |__ ___ _ _| |_ + | '_ ` _ \ / _` | '_ \ / _ \| | | | __| + | | | | | | (_| | | | | (_) | |_| | |_ + |_| |_| |_|\__,_|_| |_|\___/ \__,_|\__| version 0.11.0 + """) + import Properties._ + val welcomeMsg = "Using Scala %s (%s, Java %s)".format( + versionString, javaVmName, javaVersion) + echoToShell(welcomeMsg) + echoToShell("Type in expressions to have them evaluated.") + echoToShell("Type :help for more information.") + } + } object Main { @@ -142,7 +157,6 @@ object Main { org.apache.spark.repl.Main.interp = _interp _interp.process(args) - } } From 94d766cdf60fe305fa435928b70c3cda9befd668 Mon Sep 17 00:00:00 2001 From: Andrew Palumbo Date: Thu, 2 Jul 2015 01:16:16 -0400 Subject: [PATCH 4/9] Cleanup Conflicts: spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala --- .../shell/MahoutSparkILoop.scala | 232 ++++++++++-------- .../mahout/sparkbindings/shell/Main.scala | 220 +---------------- 2 files changed, 132 insertions(+), 320 deletions(-) diff --git a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala index 22b8c47ff3..189e800546 100644 --- a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala +++ b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala @@ -1,102 +1,130 @@ -//package org.apache.mahout.sparkbindings.shell -// -//import org.apache.spark.repl.SparkILoop -//import org.apache.spark.{SparkConf, SparkContext} -//import scala.tools.nsc.Properties -//import scala.Some -//import org.apache.mahout.sparkbindings._ -// -//class MahoutSparkILoop extends SparkILoop { -// -// log.info("Mahout spark shell waking up.") -// -// private val postInitScript = -// "import org.apache.mahout.math._" :: -// "import scalabindings._" :: -// "import RLikeOps._" :: -// "import drm._" :: -// "import RLikeDrmOps._" :: -// "import decompositions._" :: -// "import org.apache.mahout.sparkbindings._" :: -// "import collection.JavaConversions._" :: -// Nil -// -// override protected def postInitialization() { -// super.postInitialization() -// val intp: MahoutSparkILoop = this -// intp.beQuietDuring { -// postInitScript.foreach(command(_)) -// } -// } -// -// override def createSparkContext(): SparkContext = { -// val execUri = System.getenv("SPARK_EXECUTOR_URI") -// val master = this.master match { -// case Some(m) => m -// case None => { -// val prop = System.getenv("MASTER") -// if (prop != null) prop else "local" -// } -// } -// val jars = SparkILoop.getAddedJars.map(new java.io.File(_).getAbsolutePath) -// val conf = new SparkConf() -// .set("spark.repl.class.uri", intp.classServer.uri) -// -// if (execUri != null) { -// conf.set("spark.executor.uri", execUri) -// } -// -// conf.set("spark.executor.memory", "1g") -// -// sparkContext = mahoutSparkContext( -// masterUrl = master, -// appName = "Mahout Spark Shell", -// customJars = jars, -// sparkConf = conf -// ) -// -// echo("Created spark context..") -// sparkContext -// } -// -// override def initializeSpark() { -// intp.beQuietDuring { -// command(""" -// -// @transient implicit val sdc: 
org.apache.mahout.math.drm.DistributedContext = -// new org.apache.mahout.sparkbindings.SparkDistributedContext( -// org.apache.spark.repl.Main.interp.createSparkContext()) -// -// """) -// command("import org.apache.spark.SparkContext._") -// echo("Mahout distributed context is available as \"implicit val sdc\".") -// } -// } -// -// override def sparkCleanUp() { -// echo("Stopping Spark context.") -// intp.beQuietDuring { -// command("sdc.stop()") -// } -// } -// -// override def prompt: String = "mahout> " -// -// override def printWelcome(): Unit = { -// echo( -// """ -// _ _ -// _ __ ___ __ _| |__ ___ _ _| |_ -// | '_ ` _ \ / _` | '_ \ / _ \| | | | __| -// | | | | | | (_| | | | | (_) | |_| | |_ -// |_| |_| |_|\__,_|_| |_|\___/ \__,_|\__| version 0.10.0 -// -// """) -// import Properties._ -// val welcomeMsg = "Using Scala %s (%s, Java %s)".format( -// versionString, javaVmName, javaVersion) -// echo(welcomeMsg) -// echo("Type in expressions to have them evaluated.") -// echo("Type :help for more information.") -// } -//} + +package org.apache.mahout.sparkbindings.shell + +import org.apache.log4j.PropertyConfigurator +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.repl.SparkILoop +import scala.tools.nsc.Properties +import org.apache.mahout.sparkbindings._ + + +class MahoutSparkILoop extends SparkILoop { + private var _interp: SparkILoop = _ + + private val postInitImports = + "import org.apache.mahout.math._" :: + "import scalabindings._" :: + "import RLikeOps._" :: + "import drm._" :: + "import RLikeDrmOps._" :: + "import decompositions._" :: + "import org.apache.mahout.sparkbindings._" :: + "import collection.JavaConversions._" :: + Nil + + // Hack: for some very unclear reason, log4j is not picking up log4j.properties in Spark conf/ even + // though the latter is added to the classpath. So we force it to pick it. + PropertyConfigurator.configure(getMahoutHome() + "/conf/log4j.properties") + + System.setProperty("scala.usejavacp", "true") + _interp = this + // It looks like we need to initialize this too, since some Spark shell initilaization code + // expects it + org.apache.spark.repl.Main.interp = _interp + + + _interp.setPrompt("mahout> ") + + + override def createSparkContext(): SparkContext = { + val execUri = System.getenv("SPARK_EXECUTOR_URI") + val master = _interp.master match { + case Some(m) => m + case None => { + val prop = System.getenv("MASTER") + if (prop != null) prop else "local" + } + } + + // val classServverURI = if (sc.version().startsWith("1.1")) { + // _interp.class + // } else if (sc.version().startsWith("1.2")) { + // intp.interpret("import sqlContext._"); + // } else if (sc.version().startsWith("1.3")) { + + val jars = SparkILoop.getAddedJars.map(new java.io.File(_).getAbsolutePath) + val conf = new SparkConf().set("spark.repl.class.uri", _interp.classServerUri) + + if (execUri != null) { + conf.set("spark.executor.uri", execUri) + } + + conf.set("spark.executor.memory", "1g") + + _interp.sparkContext= mahoutSparkContext( + masterUrl = master, + appName = "Mahout Spark Shell", + customJars = jars, + sparkConf = conf + ) + + echoToShell("Created spark context..") + sparkContext + } + + // need to change our SparkDistributedContext name to 'sc' since we cannot override the + // private sparkCleanUp() method. 
+ override def initializeSpark() { + _interp.beQuietDuring { + _interp.interpret(""" + + @transient implicit val sc: org.apache.mahout.math.drm.DistributedContext = + new org.apache.mahout.sparkbindings.SparkDistributedContext( + org.apache.spark.repl.Main.interp.createSparkContext()) + + """) + _interp.interpret("import org.apache.spark.SparkContext._") + echoToShell("Mahout distributed context is available as \"implicit val sc\".") + } + } + + // not part of the Spark REPL Developer API + // def sparkCleanUp() { + // echoToShell("Stopping Spark context.") + // _interp.beQuietDuring { + // _interp.interpret("sdc.stop()") + // } + // } + + override protected def postInitialization() { + super.postInitialization() + //val intp: MahoutSparkILoop = this + _interp.beQuietDuring { + postInitImports.foreach(_interp.interpret(_)) + } + } + + // sparkILoop.echo(...) is private + def echoToShell(str: String): Unit = { + _interp.out.println(str) + } + + override def printWelcome(): Unit = { + echoToShell( + """ + _ _ + _ __ ___ __ _| |__ ___ _ _| |_ + | '_ ` _ \ / _` | '_ \ / _ \| | | | __| + | | | | | | (_| | | | | (_) | |_| | |_ + |_| |_| |_|\__,_|_| |_|\___/ \__,_|\__| version 0.11.0 + """) + import Properties._ + val welcomeMsg = "Using Scala %s (%s, Java %s)".format( + versionString, javaVmName, javaVersion) + echoToShell(welcomeMsg) + echoToShell("Type in expressions to have them evaluated.") + echoToShell("Type :help for more information.") + } + +} + diff --git a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala index 4bb70b3a2a..32bba32617 100644 --- a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala +++ b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala @@ -18,132 +18,10 @@ package org.apache.mahout.sparkbindings.shell import org.apache.log4j.PropertyConfigurator -import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.repl.SparkILoop -import scala.tools.nsc.Properties import org.apache.mahout.sparkbindings._ -class MahoutLoop extends SparkILoop{ - private var _interp: SparkILoop = _ - - private val postInitImports = - "import org.apache.mahout.math._" :: - "import scalabindings._" :: - "import RLikeOps._" :: - "import drm._" :: - "import RLikeDrmOps._" :: - "import decompositions._" :: - "import org.apache.mahout.sparkbindings._" :: - "import collection.JavaConversions._" :: - Nil - - - - - // Hack: for some very unclear reason, log4j is not picking up log4j.properties in Spark conf/ even - // though the latter is added to the classpath. So we force it to pick it. 
- PropertyConfigurator.configure(getMahoutHome() + "/conf/log4j.properties") - - System.setProperty("scala.usejavacp", "true") - _interp = this - // It looks like we need to initialize this too, since some Spark shell initilaization code - // expects it - org.apache.spark.repl.Main.interp = _interp - - - _interp.setPrompt("mahout> ") - - - override def createSparkContext(): SparkContext = { - val execUri = System.getenv("SPARK_EXECUTOR_URI") - val master = _interp.master match { - case Some(m) => m - case None => { - val prop = System.getenv("MASTER") - if (prop != null) prop else "local" - } - } - -// val iMain: SparkIMain = SparkILoop.loopToInterpreter(_interp) -// print("\n Imain: "+iMain.getClass.getName) -// //System.exit(0) -// -// val classServerUri_ : String = iMain.classServerUri.toString -// System.out.println("!!!!!!!"+classServerUri_) - - val jars = SparkILoop.getAddedJars.map(new java.io.File(_).getAbsolutePath) - val conf = new SparkConf().set("spark.repl.class.uri", _interp.classServerUri) - - if (execUri != null) { - conf.set("spark.executor.uri", execUri) - } - - conf.set("spark.executor.memory", "1g") - - _interp.sparkContext= mahoutSparkContext( - masterUrl = master, - appName = "Mahout Spark Shell", - customJars = jars, - sparkConf = conf - ) - - echoToShell("Created spark context..") - sparkContext - } - - override def initializeSpark() { - _interp.beQuietDuring { - _interp.interpret(""" - - @transient implicit val sdc: org.apache.mahout.math.drm.DistributedContext = - new org.apache.mahout.sparkbindings.SparkDistributedContext( - org.apache.spark.repl.Main.interp.createSparkContext()) - - """) - _interp.interpret("import org.apache.spark.SparkContext._") - echoToShell("Mahout distributed context is available as \"implicit val sdc\".") - } - } - - def sparkCleanUp() { - echoToShell("Stopping Spark context.") - _interp.beQuietDuring { - _interp.interpret("sdc.stop()") - } - } - - override protected def postInitialization() { - super.postInitialization() - //val intp: MahoutSparkILoop = this - _interp.beQuietDuring { - postInitImports.foreach(_interp.interpret(_)) - } - } - - def echoToShell(str: String): Unit = { - _interp.out.println(str) - } - - override def printWelcome(): Unit = { - echoToShell( - """ - _ _ - _ __ ___ __ _| |__ ___ _ _| |_ - | '_ ` _ \ / _` | '_ \ / _ \| | | | __| - | | | | | | (_| | | | | (_) | |_| | |_ - |_| |_| |_|\__,_|_| |_|\___/ \__,_|\__| version 0.11.0 - """) - import Properties._ - val welcomeMsg = "Using Scala %s (%s, Java %s)".format( - versionString, javaVmName, javaVersion) - echoToShell(welcomeMsg) - echoToShell("Type in expressions to have them evaluated.") - echoToShell("Type :help for more information.") - } - -} - object Main { private var _interp: SparkILoop = _ @@ -151,7 +29,8 @@ object Main { PropertyConfigurator.configure(getMahoutHome() + "/conf/log4j.properties") System.setProperty("scala.usejavacp", "true") - _interp = new MahoutLoop() + _interp = new MahoutSparkILoop() + // It looks like we need to initialize this too, since some Spark shell initilaization code // expects it org.apache.spark.repl.Main.interp = _interp @@ -159,98 +38,3 @@ object Main { } } - - - - -//log.info("Mahout spark shell waking up.") - -//private val postInitScript = -//"import org.apache.mahout.math._" :: -//"import scalabindings._" :: -//"import RLikeOps._" :: -//"import drm._" :: -//"import RLikeDrmOps._" :: -//"import decompositions._" :: -//"import org.apache.mahout.sparkbindings._" :: -//"import collection.JavaConversions._" :: -//Nil -// 
-//override protected def postInitialization() { -//super.postInitialization() -//val intp: MahoutSparkILoop = this -//intp.beQuietDuring { -//postInitScript.foreach(command(_)) -//} -//} -// -//override def createSparkContext(): SparkContext = { -//val execUri = System.getenv("SPARK_EXECUTOR_URI") -//val master = this.master match { -//case Some(m) => m -//case None => { -//val prop = System.getenv("MASTER") -//if (prop != null) prop else "local" -//} -//} -//val jars = SparkILoop.getAddedJars.map(new java.io.File(_).getAbsolutePath) -//val conf = new SparkConf() -//.set("spark.repl.class.uri", intp.classServer.uri) -// -//if (execUri != null) { -//conf.set("spark.executor.uri", execUri) -//} -// -//conf.set("spark.executor.memory", "1g") -// -//sparkContext = mahoutSparkContext( -//masterUrl = master, -//appName = "Mahout Spark Shell", -//customJars = jars, -//sparkConf = conf -//) -// -//echo("Created spark context..") -//sparkContext -//} -// -//override def initializeSpark() { -//intp.beQuietDuring { -//command(""" -// -// @transient implicit val sdc: org.apache.mahout.math.drm.DistributedContext = -// new org.apache.mahout.sparkbindings.SparkDistributedContext( -// org.apache.spark.repl.Main.interp.createSparkContext()) -// -// """) -//command("import org.apache.spark.SparkContext._") -//echo("Mahout distributed context is available as \"implicit val sdc\".") -//} -//} -// -//override def sparkCleanUp() { -//echo("Stopping Spark context.") -//intp.beQuietDuring { -//command("sdc.stop()") -//} -//} -// -//override def prompt: String = "mahout> " -// -//override def printWelcome(): Unit = { -//echo( -//""" -// _ _ -// _ __ ___ __ _| |__ ___ _ _| |_ -// | '_ ` _ \ / _` | '_ \ / _ \| | | | __| -// | | | | | | (_| | | | | (_) | |_| | |_ -// |_| |_| |_|\__,_|_| |_|\___/ \__,_|\__| version 0.10.0 -// -//""") -//import Properties._ -//val welcomeMsg = "Using Scala %s (%s, Java %s)".format( -//versionString, javaVmName, javaVersion) -//echo(welcomeMsg) -//echo("Type in expressions to have them evaluated.") -//echo("Type :help for more information.") -//} From f80cf79e0798977138c6457cde2e420f8add9d3e Mon Sep 17 00:00:00 2001 From: Andrew Palumbo Date: Fri, 3 Jul 2015 13:56:18 -0400 Subject: [PATCH 5/9] More Cleanup, Comments, License --- .../shell/MahoutSparkILoop.scala | 53 +++++++++++-------- 1 file changed, 31 insertions(+), 22 deletions(-) diff --git a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala index 189e800546..eee37780cc 100644 --- a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala +++ b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.mahout.sparkbindings.shell @@ -9,6 +25,7 @@ import org.apache.mahout.sparkbindings._ class MahoutSparkILoop extends SparkILoop { + private var _interp: SparkILoop = _ private val postInitImports = @@ -27,14 +44,19 @@ class MahoutSparkILoop extends SparkILoop { PropertyConfigurator.configure(getMahoutHome() + "/conf/log4j.properties") System.setProperty("scala.usejavacp", "true") + _interp = this + // It looks like we need to initialize this too, since some Spark shell initilaization code // expects it org.apache.spark.repl.Main.interp = _interp - _interp.setPrompt("mahout> ") + // sparkILoop.echo(...) is private so we create our own here. + def echoToShell(str: String): Unit = { + _interp.out.println(str) + } override def createSparkContext(): SparkContext = { val execUri = System.getenv("SPARK_EXECUTOR_URI") @@ -46,12 +68,6 @@ class MahoutSparkILoop extends SparkILoop { } } - // val classServverURI = if (sc.version().startsWith("1.1")) { - // _interp.class - // } else if (sc.version().startsWith("1.2")) { - // intp.interpret("import sqlContext._"); - // } else if (sc.version().startsWith("1.3")) { - val jars = SparkILoop.getAddedJars.map(new java.io.File(_).getAbsolutePath) val conf = new SparkConf().set("spark.repl.class.uri", _interp.classServerUri) @@ -61,7 +77,7 @@ class MahoutSparkILoop extends SparkILoop { conf.set("spark.executor.memory", "1g") - _interp.sparkContext= mahoutSparkContext( + _interp.sparkContext = mahoutSparkContext( masterUrl = master, appName = "Mahout Spark Shell", customJars = jars, @@ -74,6 +90,8 @@ class MahoutSparkILoop extends SparkILoop { // need to change our SparkDistributedContext name to 'sc' since we cannot override the // private sparkCleanUp() method. + // this is technically not part of Sparks explicitly defined Developer API though + // nothing in the SparkILoopInit.scala file is marked as such. override def initializeSpark() { _interp.beQuietDuring { _interp.interpret(""" @@ -88,27 +106,17 @@ class MahoutSparkILoop extends SparkILoop { } } - // not part of the Spark REPL Developer API - // def sparkCleanUp() { - // echoToShell("Stopping Spark context.") - // _interp.beQuietDuring { - // _interp.interpret("sdc.stop()") - // } - // } - + // this is technically not part of Sparks explicitly defined Developer API though + // nothing in the SparkILoopInit.scala file is marked as such. override protected def postInitialization() { super.postInitialization() - //val intp: MahoutSparkILoop = this _interp.beQuietDuring { postInitImports.foreach(_interp.interpret(_)) } } - // sparkILoop.echo(...) is private - def echoToShell(str: String): Unit = { - _interp.out.println(str) - } - + // this is technically not part of Sparks explicitly defined Developer API though + // nothing in the SparkILoopInit.scala file is marked as such. override def printWelcome(): Unit = { echoToShell( """ @@ -117,6 +125,7 @@ class MahoutSparkILoop extends SparkILoop { | '_ ` _ \ / _` | '_ \ / _ \| | | | __| | | | | | | (_| | | | | (_) | |_| | |_ |_| |_| |_|\__,_|_| |_|\___/ \__,_|\__| version 0.11.0 + """) import Properties._ val welcomeMsg = "Using Scala %s (%s, Java %s)".format( From 27cbb377d5c735a563b3f06f77f7d3c3b96eaf28 Mon Sep 17 00:00:00 2001 From: Andrew Palumbo Date: Mon, 6 Jul 2015 22:24:23 -0400 Subject: [PATCH 6/9] Changed Mahout Distributed Context back to sdc. 
Added a SparkContext val sc set to sdc's public spark context field --- .../shell/MahoutSparkILoop.scala | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala index eee37780cc..7d4ec01c85 100644 --- a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala +++ b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala @@ -88,25 +88,29 @@ class MahoutSparkILoop extends SparkILoop { sparkContext } - // need to change our SparkDistributedContext name to 'sc' since we cannot override the - // private sparkCleanUp() method. - // this is technically not part of Sparks explicitly defined Developer API though + // this is technically not part of Spark's explicitly defined Developer API though // nothing in the SparkILoopInit.scala file is marked as such. override def initializeSpark() { _interp.beQuietDuring { _interp.interpret(""" - @transient implicit val sc: org.apache.mahout.math.drm.DistributedContext = + @transient implicit val sdc: org.apache.mahout.sparkbindings.SparkDistributedContext = new org.apache.mahout.sparkbindings.SparkDistributedContext( org.apache.spark.repl.Main.interp.createSparkContext()) """) + echoToShell("Mahout distributed context is available as \"implicit val sdc\".") + _interp.interpret("import org.apache.spark.SparkContext._") - echoToShell("Mahout distributed context is available as \"implicit val sc\".") + + // get the spark context from the mahout distributed context + _interp.interpret("@transient val sc: org.apache.spark.SparkContext = sdc.sc") + echoToShell("Spark context is available as \"val sc\".") + } } - // this is technically not part of Sparks explicitly defined Developer API though + // this is technically not part of Spark's explicitly defined Developer API though // nothing in the SparkILoopInit.scala file is marked as such. override protected def postInitialization() { super.postInitialization() @@ -115,8 +119,8 @@ class MahoutSparkILoop extends SparkILoop { } } - // this is technically not part of Sparks explicitly defined Developer API though - // nothing in the SparkILoopInit.scala file is marked as such. + // this is technically not part of Spark's explicitly defined Developer API though + // nothing in the SparkILoopInit.scala file is marked as such.. override def printWelcome(): Unit = { echoToShell( """ From 4c6199173c991fb36106fc85b658f87cdd2e0b11 Mon Sep 17 00:00:00 2001 From: Andrew Palumbo Date: Mon, 6 Jul 2015 22:49:07 -0400 Subject: [PATCH 7/9] add in a Spark sqlContext --- .../sparkbindings/shell/MahoutSparkILoop.scala | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala index 7d4ec01c85..3f72c710eb 100644 --- a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala +++ b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala @@ -92,6 +92,9 @@ class MahoutSparkILoop extends SparkILoop { // nothing in the SparkILoopInit.scala file is marked as such. 
override def initializeSpark() { _interp.beQuietDuring { + + // initalize the Mahout Context as a SparkDistributedContext since so that we can access + // the SparkContext from it. _interp.interpret(""" @transient implicit val sdc: org.apache.mahout.sparkbindings.SparkDistributedContext = @@ -101,12 +104,23 @@ class MahoutSparkILoop extends SparkILoop { """) echoToShell("Mahout distributed context is available as \"implicit val sdc\".") + // get the spark context from the mahout distributed context. _interp.interpret("import org.apache.spark.SparkContext._") - - // get the spark context from the mahout distributed context _interp.interpret("@transient val sc: org.apache.spark.SparkContext = sdc.sc") echoToShell("Spark context is available as \"val sc\".") + // create a SQL Context. + _interp.interpret(""" + @transient val sqlContext = { + val _sqlContext = org.apache.spark.repl.Main.interp.createSQLContext() + _sqlContext + } + """) + _interp.interpret("import sqlContext.implicits._") + _interp.interpret("import sqlContext.sql") + _interp.interpret("import org.apache.spark.sql.functions._") + echoToShell("SQL context available as \"val sqlContext\".") + } } From 24a44148344a1127318a9071fd0824d52c2bed32 Mon Sep 17 00:00:00 2001 From: Andrew Palumbo Date: Tue, 7 Jul 2015 21:15:25 -0400 Subject: [PATCH 8/9] Fix redundant instantiation of SparkDistributedContext. Some cleanup --- .../shell/MahoutSparkILoop.scala | 36 ++++++++++++------- .../drm/CheckpointedDrmSpark.scala | 6 ++-- 2 files changed, 27 insertions(+), 15 deletions(-) diff --git a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala index 3f72c710eb..2a4f17d13c 100644 --- a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala +++ b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala @@ -28,6 +28,8 @@ class MahoutSparkILoop extends SparkILoop { private var _interp: SparkILoop = _ + private var sdc: SparkDistributedContext = _ + private val postInitImports = "import org.apache.mahout.math._" :: "import scalabindings._" :: @@ -39,6 +41,8 @@ class MahoutSparkILoop extends SparkILoop { "import collection.JavaConversions._" :: Nil + def getSparkDistributedContext: SparkDistributedContext = sdc + // Hack: for some very unclear reason, log4j is not picking up log4j.properties in Spark conf/ even // though the latter is added to the classpath. So we force it to pick it. PropertyConfigurator.configure(getMahoutHome() + "/conf/log4j.properties") @@ -58,6 +62,8 @@ class MahoutSparkILoop extends SparkILoop { _interp.out.println(str) } + // create a spark context as a mahout SparkDistributedContext. + // store the SparkDistributedContext for decleration in the intreperer session. 
override def createSparkContext(): SparkContext = { val execUri = System.getenv("SPARK_EXECUTOR_URI") val master = _interp.master match { @@ -77,13 +83,15 @@ class MahoutSparkILoop extends SparkILoop { conf.set("spark.executor.memory", "1g") - _interp.sparkContext = mahoutSparkContext( + sdc = mahoutSparkContext( masterUrl = master, appName = "Mahout Spark Shell", customJars = jars, sparkConf = conf ) + _interp.sparkContext = sdc + echoToShell("Created spark context..") sparkContext } @@ -91,31 +99,35 @@ class MahoutSparkILoop extends SparkILoop { // this is technically not part of Spark's explicitly defined Developer API though // nothing in the SparkILoopInit.scala file is marked as such. override def initializeSpark() { + _interp.beQuietDuring { - // initalize the Mahout Context as a SparkDistributedContext since so that we can access - // the SparkContext from it. + // get the spark context, at the same time create and store a mahout distributed context. _interp.interpret(""" + @transient val sc = { + val _sc = org.apache.spark.repl.Main.interp.createSparkContext() + _sc + } + """) + echoToShell("Spark context is available as \"val sc\".") + // retrieve the stored mahout SparkDistributedContext. + _interp.interpret(""" @transient implicit val sdc: org.apache.mahout.sparkbindings.SparkDistributedContext = - new org.apache.mahout.sparkbindings.SparkDistributedContext( - org.apache.spark.repl.Main.interp.createSparkContext()) - + org.apache.spark.repl.Main.interp + .asInstanceOf[org.apache.mahout.sparkbindings.shell.MahoutSparkILoop] + .getSparkDistributedContext """) echoToShell("Mahout distributed context is available as \"implicit val sdc\".") - // get the spark context from the mahout distributed context. - _interp.interpret("import org.apache.spark.SparkContext._") - _interp.interpret("@transient val sc: org.apache.spark.SparkContext = sdc.sc") - echoToShell("Spark context is available as \"val sc\".") - // create a SQL Context. _interp.interpret(""" @transient val sqlContext = { val _sqlContext = org.apache.spark.repl.Main.interp.createSQLContext() _sqlContext } - """) + """) + _interp.interpret("import org.apache.spark.SparkContext._") _interp.interpret("import sqlContext.implicits._") _interp.interpret("import sqlContext.sql") _interp.interpret("import org.apache.spark.sql.functions._") diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala index 033c49788d..2f5d6008f2 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala @@ -160,13 +160,13 @@ class CheckpointedDrmSpark[K: ClassTag]( // Map backing RDD[(K,Vector)] to RDD[(K)Writable,VectorWritable)] and save. 
if (ktag.runtimeClass == classOf[Int]) { rddInput.toDrmRdd() - .map( x =>(new IntWritable(x._1.asInstanceOf[Int]), new VectorWritable(x._2))).saveAsSequenceFile(path) + .map( x => (new IntWritable(x._1.asInstanceOf[Int]), new VectorWritable(x._2))).saveAsSequenceFile(path) } else if (ktag.runtimeClass == classOf[String]){ rddInput.toDrmRdd() - .map( x =>(new Text(x._1.asInstanceOf[String]), new VectorWritable(x._2))).saveAsSequenceFile(path) + .map( x => (new Text(x._1.asInstanceOf[String]), new VectorWritable(x._2))).saveAsSequenceFile(path) } else if (ktag.runtimeClass == classOf[Long]) { rddInput.toDrmRdd() - .map( x =>(new LongWritable(x._1.asInstanceOf[Long]), new VectorWritable(x._2))).saveAsSequenceFile(path) + .map( x => (new LongWritable(x._1.asInstanceOf[Long]), new VectorWritable(x._2))).saveAsSequenceFile(path) } else throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(ktag)) } From 718d71a4e258c91c6190ee001b3ef6d08bf4adb4 Mon Sep 17 00:00:00 2001 From: Andrew Palumbo Date: Fri, 10 Jul 2015 11:09:01 -0400 Subject: [PATCH 9/9] Changed hard-coded spark.executor.memory=1G to default only if it is not already set as to not override values set in MAHOUT_OPTS --- .../apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala index 2a4f17d13c..8df93bde9d 100644 --- a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala +++ b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala @@ -81,7 +81,10 @@ class MahoutSparkILoop extends SparkILoop { conf.set("spark.executor.uri", execUri) } - conf.set("spark.executor.memory", "1g") + // set default value of spark.executor.memory to 1g + if(!conf.contains("spark.executor.memory")) { + conf.set("spark.executor.memory", "1g") + } sdc = mahoutSparkContext( masterUrl = master,
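
As a standalone illustration of the conditional default introduced in PATCH 9/9 above, the short Scala sketch below applies the same check to a plain SparkConf: the 1g fallback is used only when spark.executor.memory has not already been supplied by the caller (for example via MAHOUT_OPTS or --conf). The helper and object names here are hypothetical and are not part of the patches themselves.

    import org.apache.spark.SparkConf

    object ExecutorMemoryDefaultSketch {

      // Minimal sketch of the PATCH 9/9 behaviour: fall back to a default
      // executor memory only when the key was not already set by the user.
      def withExecutorMemoryDefault(conf: SparkConf, default: String = "1g"): SparkConf = {
        if (!conf.contains("spark.executor.memory")) {
          conf.set("spark.executor.memory", default)
        }
        conf
      }

      def main(args: Array[String]): Unit = {
        // No explicit setting: the default is applied.
        val unset = withExecutorMemoryDefault(new SparkConf(loadDefaults = false))
        println(unset.get("spark.executor.memory"))   // prints 1g

        // Explicit setting wins: the default must not override it.
        val preset = new SparkConf(loadDefaults = false).set("spark.executor.memory", "4g")
        println(withExecutorMemoryDefault(preset).get("spark.executor.memory"))   // prints 4g
      }
    }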