Renamed prepostprocess
dcrankshaw committed Mar 23, 2014
1 parent a262b07 commit 0a04cb9
Showing 2 changed files with 26 additions and 44 deletions.
@@ -10,49 +10,9 @@ import org.apache.mahout.text.wikipedia._
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.spark.Logging
// import scala.collection.mutable
import java.util.{HashSet => JHashSet, TreeSet => JTreeSet}
// import org.apache.spark.graphx.MakeString

class TrackCounts extends Serializable {

  var red: Long = 0
  var stub: Long = 0
  var disambig: Long = 0
  var notFound: Long = 0
  var titleNull: Long = 0
  var relevant: Long = 0
  var total: Long = 0

  def update(o: TrackCounts) {
    red += o.red
    stub += o.stub
    disambig += o.disambig
    notFound += o.notFound
    titleNull += o.titleNull
    relevant += o.relevant
    total += o.total
  }

  def addArticle(art: WikiArticle) {
    if (art.redirect) red += 1
    if (art.stub) stub += 1
    if (art.disambig) disambig += 1
    if (art.title == WikiArticle.notFoundString) notFound += 1
    if (art.title == null) titleNull += 1
    if (art.relevant) relevant += 1
    total += 1
  }

  override def toString: String = {
    s"Redirects: $red, Stubs: $stub, Disambig: $disambig, Not Found: $notFound, Null: $titleNull, RELEVANT: $relevant, TOTAL: $total"
  }
}
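
A counter like this is normally folded across an RDD of parsed articles: addArticle accumulates one article into a per-partition accumulator, and update merges two accumulators, which is exactly the pair of functions RDD.aggregate expects. A minimal usage sketch (not part of this commit), assuming a wikiRDD: RDD[WikiArticle] like the one the code further down builds:

// Hypothetical usage sketch: fold the per-article flags into one TrackCounts.
val counts = wikiRDD.aggregate(new TrackCounts)(
  (acc, art) => { acc.addArticle(art); acc },  // fold one article into the accumulator
  (a, b)     => { a.update(b); a }             // merge two partition-level accumulators
)
logWarning(counts.toString)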


object PrePostProcessWikipedia extends Logging {
object WikiPipelineBenchmark extends Logging {


def main(args: Array[String]) = {
@@ -127,6 +87,28 @@ object PrePostProcessWikipedia extends Logging {
prToSave.saveAsTextFile(rankPath)
}
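
The two lines above are the tail of a PageRank stage whose earlier lines are collapsed in this view. A hedged reconstruction of its likely shape, using GraphX's PageRank.run and assuming a graph and numIters already in scope (everything except prToSave and rankPath is a guess, not shown in this diff):

// Hypothetical shape of the collapsed code: run PageRank, format the
// per-vertex ranks as tab-separated text, and write them out.
val pr = org.apache.spark.graphx.lib.PageRank.run(graph, numIters)
val prToSave = pr.vertices.map { case (id, rank) => s"$id\t$rank" }
prToSave.saveAsTextFile(rankPath)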

// def extractLinkGraph(sc: SparkContext, rawData: String): (RDD[(VertexId, String)], RDD[Edge[Double]]) = {
// val conf = new Configuration
// conf.set("key.value.separator.in.input.line", " ")
// conf.set("xmlinput.start", "<page>")
// conf.set("xmlinput.end", "</page>")
//
// logWarning("about to load xml rdd")
// val xmlRDD = sc.newAPIHadoopFile(rawData, classOf[XmlInputFormat], classOf[LongWritable], classOf[Text], conf)
// .map(t => t._2.toString)
// // xmlRDD.count
// logWarning(s"XML RDD counted. Found ${xmlRDD.count} raw articles.")
// val repartXMLRDD = xmlRDD.repartition(128)
// logWarning(s"XML RDD repartitioned. Found ${repartXMLRDD.count} raw articles.")
//
// val allArtsRDD = repartXMLRDD.map { raw => new WikiArticle(raw) }.cache
// logWarning(s"Total articles: Found ${allArtsRDD.count} UNPARTITIONED articles.")
//
// val wikiRDD = allArtsRDD.filter { art => art.relevant }.cache //.repartition(128)
// logWarning(s"wikiRDD counted. Found ${wikiRDD.count} relevant articles in ${wikiRDD.partitions.size} partitions")
//
// }
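// Note: as committed, this commented-out helper never builds the
// (RDD[(VertexId, String)], RDD[Edge[Double]]) pair its signature declares.
// A hypothetical completion for its final lines (vertexID and edges are
// guessed member names on WikiArticle, not confirmed by this diff):
// val vertices: RDD[(VertexId, String)] = wikiRDD.map(art => (art.vertexID, art.title))
// val edges: RDD[Edge[Double]] = wikiRDD.flatMap(art => art.edges)
// (vertices, edges)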

def benchmarkGraphx(sc: SparkContext, rawData: String, numIters: Int) {

val conf = new Configuration
project/SparkBuild.scala (6 changes: 3 additions & 3 deletions)
@@ -315,11 +315,11 @@ object SparkBuild extends Build {
"com.codahale.metrics" % "metrics-graphite" % "3.0.0",
"com.twitter" %% "chill" % "0.3.1" excludeAll(excludeAsm),
"com.twitter" % "chill-java" % "0.3.1" excludeAll(excludeAsm),
"com.clearspring.analytics" % "stream" % "2.5.1"
"com.clearspring.analytics" % "stream" % "2.5.1",
// Added for GraphX benchmarking
"org.apache.hbase" % "hbase" % "0.94.6" excludeAll(excludeNetty, excludeAsm),
"org.apache.hbase" % "hbase" % HBASE_VERSION excludeAll(excludeNetty, excludeAsm),
"org.apache.mahout" % "mahout-integration" % "0.8",
"org.apache.mahout" % "mahout-integration" % "0.8"
),
libraryDependencies ++= maybeAvro
)
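
The hbase change above swaps the "0.94.6" literal for the HBASE_VERSION constant, which implies the constant resolves to the same value at this point in history. Assumed definition elsewhere in project/SparkBuild.scala (not shown in this diff):

// Assumed: the version constant the new dependency line relies on.
val HBASE_VERSION = "0.94.6"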
@@ -340,7 +340,6 @@
libraryDependencies ++= Seq(
"com.twitter" %% "algebird-core" % "0.1.11",
"org.apache.hbase" % "hbase" % HBASE_VERSION excludeAll(excludeNetty, excludeAsm, excludeOldAsm, excludeCommonsLogging),
"org.apache.mahout" % "mahout-integration" % "0.8", // added for GraphX benchmarking
"org.apache.cassandra" % "cassandra-all" % "1.2.6"
exclude("com.google.guava", "guava")
exclude("com.googlecode.concurrentlinkedhashmap", "concurrentlinkedhashmap-lru")
@@ -360,6 +359,7 @@
name := "spark-graphx",
libraryDependencies ++= Seq(
"org.jblas" % "jblas" % "1.2.3"
// "org.apache.mahout" % "mahout-integration" % "0.8" // added for GraphX benchmarking
)
)
