From 0a04cb9a826082fd9ccbdf9ccd8a945a11721356 Mon Sep 17 00:00:00 2001 From: Dan Crankshaw Date: Sun, 23 Mar 2014 20:57:08 +0000 Subject: [PATCH] Renamed prepostprocess --- ...Wiki.scala => WikiPipelineBenchmark.scala} | 64 +++++++------------ project/SparkBuild.scala | 6 +- 2 files changed, 26 insertions(+), 44 deletions(-) rename graphx/src/main/scala/org/apache/spark/graphx/{PrePostProcessWiki.scala => WikiPipelineBenchmark.scala} (88%) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PrePostProcessWiki.scala b/graphx/src/main/scala/org/apache/spark/graphx/WikiPipelineBenchmark.scala similarity index 88% rename from graphx/src/main/scala/org/apache/spark/graphx/PrePostProcessWiki.scala rename to graphx/src/main/scala/org/apache/spark/graphx/WikiPipelineBenchmark.scala index a88344eab3b9b..da8312b8fbef6 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/PrePostProcessWiki.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/WikiPipelineBenchmark.scala @@ -10,49 +10,9 @@ import org.apache.mahout.text.wikipedia._ import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ import org.apache.spark.Logging -// import scala.collection.mutable import java.util.{HashSet => JHashSet, TreeSet => JTreeSet} -// import org.apache.spark.graphx.MakeString - -class TrackCounts extends Serializable { - - var red: Long = 0 - var stub: Long = 0 - var disambig: Long = 0 - var notFound: Long = 0 - var titleNull: Long = 0 - var relevant: Long = 0 - var total: Long = 0 - - def update(o: TrackCounts) { - red += o.red - stub += o.stub - disambig += o.disambig - notFound += o.notFound - titleNull += o.titleNull - relevant += o.relevant - total += o.total - } - - def addArticle(art: WikiArticle) { - if (art.redirect) red += 1 - if (art.stub) stub += 1 - if (art.disambig) disambig += 1 - if (art.title == WikiArticle.notFoundString) notFound += 1 - if (art.title == null) titleNull += 1 - if (art.relevant) relevant += 1 - total += 1 - } - - override def 
toString: String = { - s"Redirects: $red, Stubs: $stub, Disambig: $disambig, Not Found: $notFound, Null: $titleNull, RELEVANT: $relevant, TOTAL: $total" - - } - -} - -object PrePostProcessWikipedia extends Logging { +object WikiPipelineBenchmark extends Logging { def main(args: Array[String]) = { @@ -127,6 +87,28 @@ object PrePostProcessWikipedia extends Logging { prToSave.saveAsTextFile(rankPath) } +// def extractLinkGraph(sc: SparkContext, rawData: String): (RDD[(VertexId, String)], RDD[Edge[Double]]) = { +// val conf = new Configuration +// conf.set("key.value.separator.in.input.line", " ") +// conf.set("xmlinput.start", "<page>") +// conf.set("xmlinput.end", "</page>") +// +// logWarning("about to load xml rdd") +// val xmlRDD = sc.newAPIHadoopFile(rawData, classOf[XmlInputFormat], classOf[LongWritable], classOf[Text], conf) +// .map(t => t._2.toString) +// // xmlRDD.count +// logWarning(s"XML RDD counted. Found ${xmlRDD.count} raw articles.") +// val repartXMLRDD = xmlRDD.repartition(128) +// logWarning(s"XML RDD repartitioned. Found ${repartXMLRDD.count} raw articles.") +// +// val allArtsRDD = repartXMLRDD.map { raw => new WikiArticle(raw) }.cache +// logWarning(s"Total articles: Found ${allArtsRDD.count} UNPARTITIONED articles.") +// +// val wikiRDD = allArtsRDD.filter { art => art.relevant }.cache //.repartition(128) +// logWarning(s"wikiRDD counted. 
Found ${wikiRDD.count} relevant articles in ${wikiRDD.partitions.size} partitions") +// +// } + def benchmarkGraphx(sc: SparkContext, rawData: String, numIters: Int) { val conf = new Configuration diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 99ab58c88f8ce..665e27df4680e 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -315,11 +315,11 @@ object SparkBuild extends Build { "com.codahale.metrics" % "metrics-graphite" % "3.0.0", "com.twitter" %% "chill" % "0.3.1" excludeAll(excludeAsm), "com.twitter" % "chill-java" % "0.3.1" excludeAll(excludeAsm), - "com.clearspring.analytics" % "stream" % "2.5.1" + "com.clearspring.analytics" % "stream" % "2.5.1", // Added for GraphX benchmarking "org.apache.hbase" % "hbase" % "0.94.6" excludeAll(excludeNetty, excludeAsm), "org.apache.hbase" % "hbase" % HBASE_VERSION excludeAll(excludeNetty, excludeAsm), - "org.apache.mahout" % "mahout-integration" % "0.8", + "org.apache.mahout" % "mahout-integration" % "0.8" ), libraryDependencies ++= maybeAvro ) @@ -340,7 +340,6 @@ object SparkBuild extends Build { libraryDependencies ++= Seq( "com.twitter" %% "algebird-core" % "0.1.11", "org.apache.hbase" % "hbase" % HBASE_VERSION excludeAll(excludeNetty, excludeAsm, excludeOldAsm, excludeCommonsLogging), - "org.apache.mahout" % "mahout-integration" % "0.8", // added for GraphX benchmarking "org.apache.cassandra" % "cassandra-all" % "1.2.6" exclude("com.google.guava", "guava") exclude("com.googlecode.concurrentlinkedhashmap", "concurrentlinkedhashmap-lru") @@ -360,6 +359,7 @@ object SparkBuild extends Build { name := "spark-graphx", libraryDependencies ++= Seq( "org.jblas" % "jblas" % "1.2.3" + // "org.apache.mahout" % "mahout-integration" % "0.8" // added for GraphX benchmarking ) )