diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/AnalyzeWikipedia.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/AnalyzeWikipedia.scala
index 20f893d73b1d5..3888f13fcb9f4 100644
--- a/examples/src/main/scala/org/apache/spark/examples/graphx/AnalyzeWikipedia.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/AnalyzeWikipedia.scala
@@ -1,116 +1,116 @@
-package org.apache.spark.examples.graphx
-
-import org.apache.spark._
-import org.apache.spark.graph._
-import org.apache.spark.graph.algorithms._
-import org.apache.spark.rdd.NewHadoopRDD
-import org.apache.hadoop.io.LongWritable
-import org.apache.hadoop.io.Text
-import org.apache.hadoop.conf.Configuration
-import org.apache.mahout.text.wikipedia._
-import org.apache.spark.rdd.RDD
-import java.util.Calendar
-import scala.math.Ordering.Implicits._
-
-
-object AnalyzeWikipedia extends Logging {
-
-  def main(args: Array[String]) = {
-
-
-
-
-    val host = args(0)
-    val fname = args(1)
-    // val numparts = {
-    //   if (args.length >= 3) {
-    //     args(2).toInt
-    //   } else {
-    //     64
-    //   }
-    // }
-    // val preformattedFname = args(2)
-
-    val serializer = "org.apache.spark.serializer.KryoSerializer"
-    System.setProperty("spark.serializer", serializer)
-    System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")
-
-    val sc = new SparkContext(host, "AnalyzeWikipedia")
-    // val top10 = sc.parallelize(1 to 1000, 10).map(x => (x.toString, x)).top(10)(Ordering.by(_._2))
-
-
-    // val conf = new Configuration
-    // conf.set("key.value.separator.in.input.line", " ");
-    // conf.set("xmlinput.start", "<page>");
-    // conf.set("xmlinput.end", "</page>");
-
-    // val xmlRDD = sc.newAPIHadoopFile(fname, classOf[XmlInputFormat], classOf[LongWritable], classOf[Text], conf)
-    //   .map(stringify)
-
-    // println("XML pages: " + xmlRDD.count)
-    // // .repartition(numparts)
-
-    // val wikiRDD = xmlRDD.map { raw => new WikiArticle(raw) }
-    //   .filter { art => art.relevant }
-
-    // println("Relevant pages: " + wikiRDD.count)
-
-    // val vertices: RDD[(Vid, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
-    // val justVids = wikiRDD.map { art => art.vertexID }
-    // // println("taking top vids")
-    // // val topvids = justVids.top(10)
-    // // sc.stop()
-    // // System.exit(0)
-
-    // // val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges }
-    // val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges }
-    // println("Edges: " + edges.count)
-    // println("Creating graph: " + Calendar.getInstance().getTime())
-
-    // val g = Graph(vertices, edges)
-    // val g = Graph.fromEdges(edges, 1)
-    // val g = Graph(edges, 1)
-    val g = GraphLoader.edgeListAndVertexListFiles(sc, fname + "_edges", fname + "_vertices",
-      minEdgePartitions = 128).cache()
-    println("Triplets: " + g.triplets.count)
-
-    println("starting pagerank " + Calendar.getInstance().getTime())
-    val startTime = System.currentTimeMillis
-    val pr = PageRank.run(g, 20)
-
-    println("PR numvertices: " + pr.vertices.count + "\tOriginal numVertices " + g.vertices.count)
-    println("Pagerank runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds")
-    val prAndTitle = g.outerJoinVertices(pr.vertices)({(id: Vid, title: String, rank: Option[Double]) => (title, rank.getOrElse(0.0))})
-    println("finished join.")
-
-    val topArticles = prAndTitle.vertices.top(30)(Ordering.by((entry: (Vid, (String, Double))) => entry._2._2))
-    println("Top articles:\n" + topArticles.deep.mkString("\n"))
-    // for(v <- topArticles) {
-    //   println(v)
-    // }
-    val article_name = "JohnsHopkinsUniversity"
-    //
-    //Find relevant vertices
-    g.mapTriplets(e => {
-      if ((e.srcAttr contains article_name) || (e.dstAttr contains article_name)) { 1.0 }
-      else { e.attr }
-    })
-    val coarsenedGraph = g.contractEdges({ e => e.attr == 1.0 }, {et => et.srcAttr + " " + et.dstAttr },
-      { (v1: String , v2: String) => v1 + "\n" + v2 })
-
-    // filter only vertices whose title contains JHU
-    val relevant = coarsenedGraph.vertices.filter( {case (vid: Vid, data: String) => data contains article_name}).collect
-    println("Articles matching " + article_name)
-    println(relevant.deep.mkString("New Article\n"))
-
-    sc.stop()
-  }
-
-
-  def stringify(tup: (org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text)): String = {
-    tup._2.toString
-  }
-
-
-
-}
+//package org.apache.spark.examples.graphx
+//
+//import org.apache.spark._
+//import org.apache.spark.graph._
+//import org.apache.spark.graph.algorithms._
+//import org.apache.spark.rdd.NewHadoopRDD
+//import org.apache.hadoop.io.LongWritable
+//import org.apache.hadoop.io.Text
+//import org.apache.hadoop.conf.Configuration
+//import org.apache.mahout.text.wikipedia._
+//import org.apache.spark.rdd.RDD
+//import java.util.Calendar
+//import scala.math.Ordering.Implicits._
+//
+//
+//object AnalyzeWikipedia extends Logging {
+//
+//  def main(args: Array[String]) = {
+//
+//
+//
+//
+//    val host = args(0)
+//    val fname = args(1)
+//    // val numparts = {
+//    //   if (args.length >= 3) {
+//    //     args(2).toInt
+//    //   } else {
+//    //     64
+//    //   }
+//    // }
+//    // val preformattedFname = args(2)
+//
+//    val serializer = "org.apache.spark.serializer.KryoSerializer"
+//    System.setProperty("spark.serializer", serializer)
+//    System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")
+//
+//    val sc = new SparkContext(host, "AnalyzeWikipedia")
+//    // val top10 = sc.parallelize(1 to 1000, 10).map(x => (x.toString, x)).top(10)(Ordering.by(_._2))
+//
+//
+//    // val conf = new Configuration
+//    // conf.set("key.value.separator.in.input.line", " ");
+//    // conf.set("xmlinput.start", "<page>");
+//    // conf.set("xmlinput.end", "</page>");
+//
+//    // val xmlRDD = sc.newAPIHadoopFile(fname, classOf[XmlInputFormat], classOf[LongWritable], classOf[Text], conf)
+//    //   .map(stringify)
+//
+//    // println("XML pages: " + xmlRDD.count)
+//    // // .repartition(numparts)
+//
+//    // val wikiRDD = xmlRDD.map { raw => new WikiArticle(raw) }
+//    //   .filter { art => art.relevant }
+//
+//    // println("Relevant pages: " + wikiRDD.count)
+//
+//    // val vertices: RDD[(Vid, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
+//    // val justVids = wikiRDD.map { art => art.vertexID }
+//    // // println("taking top vids")
+//    // // val topvids = justVids.top(10)
+//    // // sc.stop()
+//    // // System.exit(0)
+//
+//    // // val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges }
+//    // val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges }
+//    // println("Edges: " + edges.count)
+//    // println("Creating graph: " + Calendar.getInstance().getTime())
+//
+//    // val g = Graph(vertices, edges)
+//    // val g = Graph.fromEdges(edges, 1)
+//    // val g = Graph(edges, 1)
+//    val g = GraphLoader.edgeListAndVertexListFiles(sc, fname + "_edges", fname + "_vertices",
+//      minEdgePartitions = 128).cache()
+//    println("Triplets: " + g.triplets.count)
+//
+//    println("starting pagerank " + Calendar.getInstance().getTime())
+//    val startTime = System.currentTimeMillis
+//    val pr = PageRank.run(g, 20)
+//
numvertices: " + pr.vertices.count + "\tOriginal numVertices " + g.vertices.count) +// println("Pagerank runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds") +// val prAndTitle = g.outerJoinVertices(pr.vertices)({(id: Vid, title: String, rank: Option[Double]) => (title, rank.getOrElse(0.0))}) +// println("finished join.") +// +// val topArticles = prAndTitle.vertices.top(30)(Ordering.by((entry: (Vid, (String, Double))) => entry._2._2)) +// println("Top articles:\n" + topArticles.deep.mkString("\n")) +// // for(v <- topArticles) { +// // println(v) +// // } +// val article_name = "JohnsHopkinsUniversity" +// // +// //Find relevant vertices +// g.mapTriplets(e => { +// if ((e.srcAttr contains article_name) || (e.dstAttr contains article_name)) { 1.0 } +// else { e.attr } +// }) +// val coarsenedGraph = g.contractEdges({ e => e.attr == 1.0 }, {et => et.srcAttr + " " + et.dstAttr }, +// { (v1: String , v2: String) => v1 + "\n" + v2 }) +// +// // filter only vertices whose title contains JHU +// val relevant = coarsenedGraph.vertices.filter( {case (vid: Vid, data: String) => data contains article_name}).collect +// println("Articles matching " + article_name) +// println(relevant.deep.mkString("New Article\n")) +// +// sc.stop() +// } +// +// +// def stringify(tup: (org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text)): String = { +// tup._2.toString +// } +// +// +// +//} diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/PrePostProcessWiki.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/PrePostProcessWiki.scala index a8cdba65b1cef..60f8dd8f24c6f 100644 --- a/examples/src/main/scala/org/apache/spark/examples/graphx/PrePostProcessWiki.scala +++ b/examples/src/main/scala/org/apache/spark/examples/graphx/PrePostProcessWiki.scala @@ -3,16 +3,12 @@ package org.apache.spark.examples.graphx import org.apache.spark._ import org.apache.spark.graphx._ import org.apache.spark.graphx.lib._ -import org.apache.spark.graph.algorithms._ -import org.apache.spark.rdd.NewHadoopRDD import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.conf.Configuration import org.apache.mahout.text.wikipedia._ import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ -import java.util.Calendar -import scala.math.Ordering.Implicits._ import org.apache.spark.Logging import scala.collection.mutable @@ -46,7 +42,7 @@ object PrePostProcessWikipedia extends Logging { case "graphx" => { val rawData = args(2) val result = graphx(sc, rawData) - logWarning(result) +// logWarning(result) } case "prep" => { val rawData = args(2) @@ -54,7 +50,7 @@ object PrePostProcessWikipedia extends Logging { prep(sc, rawData, outBase) } - case _ => throw new IllegalArgumentException("Please provide a valid process") + case _ => throw new IllegalArgumentException("Please proVertexIde a valid process") } logWarning(process + "\tTIMEX: " + (System.currentTimeMillis - start)/1000.0) sc.stop() @@ -77,36 +73,36 @@ object PrePostProcessWikipedia extends Logging { .map(stringify) val wikiRDD = xmlRDD.map { raw => new WikiArticle(raw) } .filter { art => art.relevant }.repartition(128) - val vertices: RDD[(Vid, String)] = wikiRDD.map { art => (art.vertexID, art.title) } + val vertices: RDD[(VertexId, String)] = wikiRDD.map { art => (art.vertexID, art.title) } val verticesToSave = vertices.map {v => v._1 + "\t"+ v._2} verticesToSave.saveAsTextFile(vertPath) val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges } - val g = 
-    val g = Graph(vertices, edges, partitionStrategy = EdgePartition1D)
-    val pr = PageRank.runStandalone(g, 0.01)
-    val prToSave = pr.map {v => v._1 + "\t"+ v._2}
+    val g = Graph(vertices, edges) //TODO what to do about partitionStrategy???
+    val pr = PageRank.run(g, 20)
+    val prToSave = pr.vertices.map {v => v._1 + "\t"+ v._2}
    prToSave.saveAsTextFile(rankPath)
  }
 
  def graphx(sc: SparkContext, rawData: String) {
 
    val conf = new Configuration
-    conf.set("key.value.separator.in.input.line", " ");
-    conf.set("xmlinput.start", "<page>");
-    conf.set("xmlinput.end", "</page>");
+    conf.set("key.value.separator.in.input.line", " ")
+    conf.set("xmlinput.start", "<page>")
+    conf.set("xmlinput.end", "</page>")
    val xmlRDD = sc.newAPIHadoopFile(rawData, classOf[XmlInputFormat], classOf[LongWritable], classOf[Text], conf)
      .map(stringify)
    val wikiRDD = xmlRDD.map { raw => new WikiArticle(raw) }
      .filter { art => art.relevant }.repartition(128)
-    val vertices: RDD[(Vid, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
+    val vertices: RDD[(VertexId, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
    val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges }
-    val g = Graph(vertices, edges, partitionStrategy = EdgePartition1D)
+    val g = Graph(vertices, edges)
    val resultG = pagerankConnComponentsAlt(4, g)
-    logWarning(s"Final graph has ${resultG.triplets.count} EDGES, ${resultG.vertices.count} VERTICES")
+    logWarning(s"Final graph has ${resultG.triplets.count()} EDGES, ${resultG.vertices.count()} VERTICES")
 
    // val pr = PageRank.run(g, 20)
    // val prAndTitle = g
-//      .outerJoinVertices(pr)({(id: Vid, title: String, rank: Option[Double]) => (title, rank.getOrElse(0.0))})
-//    val top20 = prAndTitle.vertices.top(20)(Ordering.by((entry: (Vid, (String, Double))) => entry._2._2))
+//      .outerJoinVertices(pr)({(id: VertexId, title: String, rank: Option[Double]) => (title, rank.getOrElse(0.0))})
+//    val top20 = prAndTitle.vertices.top(20)(Ordering.by((entry: (VertexId, (String, Double))) => entry._2._2))
//    top20.mkString("\n")
  }
 
@@ -115,15 +111,26 @@ object PrePostProcessWikipedia extends Logging {
    var currentGraph = g
    for (i <- 0 to numRepetitions) {
      val pr = PageRank.run(currentGraph, 20)
-      val prAndTitle = currentGraph
-        .outerJoinVertices(pr)({(id: Vid, title: String, rank: Option[Double]) => (title, rank.getOrElse(0.0))})
-      val top20 = prAndTitle.vertices.top(20)(Ordering.by((entry: (Vid, (String, Double))) => entry._2._2))
+      val prAndTitle = currentGraph.outerJoinVertices(pr.vertices)({(id: VertexId, title: String, rank: Option[Double]) => (title, rank.getOrElse(0.0))})
+      val top20 = prAndTitle.vertices.top(20)(Ordering.by((entry: (VertexId, (String, Double))) => entry._2._2))
      logWarning(s"Top20 for iteration $i:\n${top20.mkString("\n")}")
      val top20verts = top20.map(_._1).toSet
      // filter out top 20 vertices
-      val newGraph = currentGraph.subgraph(vpred = ((v, d) => !top20Verts.contains(v)))
+      val filterTop20 = {(v: VertexId, d: String) =>
+        !top20verts.contains(v)
+      }
+      val newGraph = currentGraph.subgraph(x => true, filterTop20)
      val ccGraph = ConnectedComponents.run(newGraph)
-      val numCCs = ccGraph.vertices.aggregate(new mutable.HashSet())(((set, vtuple) => set += vtuple._2), ((set1, set2) => set1 union set2)).size
+      val zeroVal = new mutable.HashSet[VertexId]()
+      val seqOp = (s: mutable.HashSet[VertexId], vtuple: (VertexId, VertexId)) => {
+        s.add(vtuple._2)
+        s
+      }
+      val combOp = (s1: mutable.HashSet[VertexId], s2: mutable.HashSet[VertexId]) => { s1 union s2}
+      val numCCs = ccGraph.vertices.aggregate(zeroVal)(seqOp, combOp)
+      //(new mutable.HashSet[Int]())((s: mutable.HashSet[Int], vtuple: (VertexId, Int)) => { s.add(vtuple._2); s },(s1: mutable.HashSet[Int], s2: mutable.HashSet[Int]) => { s1 union s2})
+
+        //(((set, vtuple) => set.add(vtuple._2)), ((set1, set2) => set1 union set2)).size
      logWarning(s"Number of connected components for iteration $i: $numCCs")
      // TODO will this result in too much memory overhead???
      currentGraph = newGraph
@@ -143,7 +150,7 @@ object PrePostProcessWikipedia extends Logging {
      .map(stringify)
    val wikiRDD = xmlRDD.map { raw => new WikiArticle(raw) }
      .filter { art => art.relevant }.repartition(128)
-    val vertices: RDD[(Vid, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
+    val vertices: RDD[(VertexId, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
    val verticesToSave = vertices.map {v => v._1 + "\t"+ v._2}
    verticesToSave.saveAsTextFile(outBase + "_vertices")
    val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges }
@@ -159,12 +166,10 @@ object PrePostProcessWikipedia extends Logging {
 
    // slightly cheating, but not really
    val ranksAndAttrs = ranks.join(attrs)
-    val top20 = ranksAndAttrs.top(20)(Ordering.by((entry: (Vid, (Double, String))) => entry._2._1))
+    val top20 = ranksAndAttrs.top(20)(Ordering.by((entry: (VertexId, (Double, String))) => entry._2._1))
    top20.mkString("\n")
  }
 
-
-
  def stringify(tup: (org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text)): String = {
    tup._2.toString
  }
diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/WikiArticle.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/WikiArticle.scala
index e182462ae4cde..4036c6aff1693 100644
--- a/examples/src/main/scala/org/apache/spark/examples/graphx/WikiArticle.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/WikiArticle.scala
@@ -1,7 +1,7 @@
 package org.apache.spark.examples.graphx
 
 import java.util.regex.Pattern
-import org.apache.spark.graph._
+import org.apache.spark.graphx._
 import java.util.regex.Matcher
 import scala.util.matching.Regex
 import scala.collection.mutable
@@ -28,15 +28,15 @@ class WikiArticle(wtext: String) extends Serializable {
    }
  }
  val relevant: Boolean = !(redirect || stub || disambig || title == null)
-  val vertexID: Vid = WikiArticle.titleHash(title)
+  val vertexID: VertexId = WikiArticle.titleHash(title)
  val edges: HashSet[Edge[Double]] = {
    val temp = neighbors.map { n => Edge(vertexID, n, 1.0) }
    val set = new HashSet[Edge[Double]]() ++ temp
    set
  }
-  // val edges: HashSet[(Vid, Vid)] = {
+  // val edges: HashSet[(VertexId, VertexId)] = {
  //   val temp = neighbors.map { n => (vertexID, n) }
-  //   val set = new HashSet[(Vid, Vid)]() ++ temp
+  //   val set = new HashSet[(VertexId, VertexId)]() ++ temp
  //   set
  // }
 }
@@ -70,7 +70,7 @@ object WikiArticle {
 
  // Hash of the canonical article name. Used for vertex ID.
  // TODO this should be a 64bit hash
-  private def titleHash(title: String): Vid = { math.abs(WikiArticle.myHashcode(canonicalize(title))) }
+  private def titleHash(title: String): VertexId = { math.abs(WikiArticle.myHashcode(canonicalize(title))) }
 
  private def myHashcode(s: String): Long = {
    // var h: Long = 1125899906842597L // prime
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
index 18858466db27b..bca17241538f4 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
@@ -19,6 +19,7 @@ package org.apache.spark.graphx
 import org.apache.spark.{Logging, SparkContext}
 import org.apache.spark.graphx.impl.{EdgePartitionBuilder, GraphImpl}
+import org.apache.spark.rdd.RDD
 
 /**
  * Provides utilities for loading [[Graph]]s from files.
  */
@@ -86,4 +87,20 @@ object GraphLoader extends Logging {
    GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1)
  } // end of edgeListFile
 
+  def loadVertices(sc: SparkContext, vertexPath: String): RDD[(VertexId, String)] = {
+
+    val vertices = sc.textFile(vertexPath, 128).mapPartitions( iter =>
+      iter.filter(line => !line.isEmpty && line(0) != '#').map { line =>
+        val lineArray = line.split("\\s+")
+        if(lineArray.length < 2) {
+          println("Invalid line: " + line)
+          assert(false)
+        }
+        val id = lineArray(0).trim.toLong
+        val attr = lineArray.slice(1,lineArray.length).mkString(" ")
+        (id, attr)
+      })
+    vertices
+  }
+
 }
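Reviewer note (not part of the patch): a minimal sketch of how the GraphLoader.loadVertices helper added above could be combined with the existing GraphLoader.edgeListFile to rebuild a titled graph from the files written by the prep step. The paths, the local master, and the outerJoinVertices step are illustrative assumptions, not code from this change.

import org.apache.spark.SparkContext
import org.apache.spark.graphx._

object LoadVerticesExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "LoadVerticesExample")
    // Parse whitespace-separated "id title..." lines into (VertexId, String) pairs (path is hypothetical).
    val titles = GraphLoader.loadVertices(sc, "wiki_vertices")
    // Load the edge structure from a plain "srcId dstId" edge list (path is hypothetical).
    val structure = GraphLoader.edgeListFile(sc, "wiki_edges")
    // Attach the loaded titles as vertex attributes; vertices without a title get a placeholder.
    val titled = structure.outerJoinVertices(titles) {
      (id, oldAttr, titleOpt) => titleOpt.getOrElse("(missing)")
    }
    println(titled.vertices.take(5).mkString("\n"))
    sc.stop()
  }
}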