Commit

Fixed compilation issues.

dcrankshaw committed Feb 22, 2014
1 parent d3bbfd0 commit 7a036cb
Showing 4 changed files with 170 additions and 148 deletions.
@@ -1,116 +1,116 @@
package org.apache.spark.examples.graphx

import org.apache.spark._
import org.apache.spark.graph._
import org.apache.spark.graph.algorithms._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.conf.Configuration
import org.apache.mahout.text.wikipedia._
import org.apache.spark.rdd.RDD
import java.util.Calendar
import scala.math.Ordering.Implicits._


object AnalyzeWikipedia extends Logging {

def main(args: Array[String]) = {




val host = args(0)
val fname = args(1)
// val numparts = {
// if (args.length >= 3) {
// args(2).toInt
// } else {
// 64
// }
// }
// val preformattedFname = args(2)

val serializer = "org.apache.spark.serializer.KryoSerializer"
System.setProperty("spark.serializer", serializer)
System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")

val sc = new SparkContext(host, "AnalyzeWikipedia")
// val top10 = sc.parallelize(1 to 1000, 10).map(x => (x.toString, x)).top(10)(Ordering.by(_._2))


// val conf = new Configuration
// conf.set("key.value.separator.in.input.line", " ");
// conf.set("xmlinput.start", "<page>");
// conf.set("xmlinput.end", "</page>");

// val xmlRDD = sc.newAPIHadoopFile(fname, classOf[XmlInputFormat], classOf[LongWritable], classOf[Text], conf)
// .map(stringify)

// println("XML pages: " + xmlRDD.count)
// // .repartition(numparts)

// val wikiRDD = xmlRDD.map { raw => new WikiArticle(raw) }
// .filter { art => art.relevant }

// println("Relevant pages: " + wikiRDD.count)

// val vertices: RDD[(Vid, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
// val justVids = wikiRDD.map { art => art.vertexID }
// // println("taking top vids")
// // val topvids = justVids.top(10)
// // sc.stop()
// // System.exit(0)

// // val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges }
// val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges }
// println("Edges: " + edges.count)
// println("Creating graph: " + Calendar.getInstance().getTime())

// val g = Graph(vertices, edges)
// val g = Graph.fromEdges(edges, 1)
// val g = Graph(edges, 1)
val g = GraphLoader.edgeListAndVertexListFiles(sc, fname + "_edges", fname + "_vertices",
minEdgePartitions = 128).cache()
println("Triplets: " + g.triplets.count)

println("starting pagerank " + Calendar.getInstance().getTime())
val startTime = System.currentTimeMillis
val pr = PageRank.run(g, 20)

println("PR numvertices: " + pr.vertices.count + "\tOriginal numVertices " + g.vertices.count)
println("Pagerank runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds")
val prAndTitle = g.outerJoinVertices(pr.vertices)({(id: Vid, title: String, rank: Option[Double]) => (title, rank.getOrElse(0.0))})
println("finished join.")

val topArticles = prAndTitle.vertices.top(30)(Ordering.by((entry: (Vid, (String, Double))) => entry._2._2))
println("Top articles:\n" + topArticles.deep.mkString("\n"))
// for(v <- topArticles) {
// println(v)
// }
val article_name = "JohnsHopkinsUniversity"
//
//Find relevant vertices
g.mapTriplets(e => {
if ((e.srcAttr contains article_name) || (e.dstAttr contains article_name)) { 1.0 }
else { e.attr }
})
val coarsenedGraph = g.contractEdges({ e => e.attr == 1.0 }, {et => et.srcAttr + " " + et.dstAttr },
{ (v1: String , v2: String) => v1 + "\n" + v2 })
// filter only vertices whose title contains JHU
val relevant = coarsenedGraph.vertices.filter( {case (vid: Vid, data: String) => data contains article_name}).collect
println("Articles matching " + article_name)
println(relevant.deep.mkString("New Article\n"))

sc.stop()
}


def stringify(tup: (org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text)): String = {
tup._2.toString
}



}
//package org.apache.spark.examples.graphx
//
//import org.apache.spark._
//import org.apache.spark.graph._
//import org.apache.spark.graph.algorithms._
//import org.apache.spark.rdd.NewHadoopRDD
//import org.apache.hadoop.io.LongWritable
//import org.apache.hadoop.io.Text
//import org.apache.hadoop.conf.Configuration
//import org.apache.mahout.text.wikipedia._
//import org.apache.spark.rdd.RDD
//import java.util.Calendar
//import scala.math.Ordering.Implicits._
//
//
//object AnalyzeWikipedia extends Logging {
//
// def main(args: Array[String]) = {
//
//
//
//
// val host = args(0)
// val fname = args(1)
// // val numparts = {
// // if (args.length >= 3) {
// // args(2).toInt
// // } else {
// // 64
// // }
// // }
// // val preformattedFname = args(2)
//
// val serializer = "org.apache.spark.serializer.KryoSerializer"
// System.setProperty("spark.serializer", serializer)
// System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")
//
// val sc = new SparkContext(host, "AnalyzeWikipedia")
// // val top10 = sc.parallelize(1 to 1000, 10).map(x => (x.toString, x)).top(10)(Ordering.by(_._2))
//
//
// // val conf = new Configuration
// // conf.set("key.value.separator.in.input.line", " ");
// // conf.set("xmlinput.start", "<page>");
// // conf.set("xmlinput.end", "</page>");
//
// // val xmlRDD = sc.newAPIHadoopFile(fname, classOf[XmlInputFormat], classOf[LongWritable], classOf[Text], conf)
// // .map(stringify)
//
// // println("XML pages: " + xmlRDD.count)
// // // .repartition(numparts)
//
// // val wikiRDD = xmlRDD.map { raw => new WikiArticle(raw) }
// // .filter { art => art.relevant }
//
// // println("Relevant pages: " + wikiRDD.count)
//
// // val vertices: RDD[(Vid, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
// // val justVids = wikiRDD.map { art => art.vertexID }
// // // println("taking top vids")
// // // val topvids = justVids.top(10)
// // // sc.stop()
// // // System.exit(0)
//
// // // val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges }
// // val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges }
// // println("Edges: " + edges.count)
// // println("Creating graph: " + Calendar.getInstance().getTime())
//
// // val g = Graph(vertices, edges)
// // val g = Graph.fromEdges(edges, 1)
// // val g = Graph(edges, 1)
// val g = GraphLoader.edgeListAndVertexListFiles(sc, fname + "_edges", fname + "_vertices",
// minEdgePartitions = 128).cache()
// println("Triplets: " + g.triplets.count)
//
// println("starting pagerank " + Calendar.getInstance().getTime())
// val startTime = System.currentTimeMillis
// val pr = PageRank.run(g, 20)
//
// println("PR numvertices: " + pr.vertices.count + "\tOriginal numVertices " + g.vertices.count)
// println("Pagerank runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds")
// val prAndTitle = g.outerJoinVertices(pr.vertices)({(id: Vid, title: String, rank: Option[Double]) => (title, rank.getOrElse(0.0))})
// println("finished join.")
//
// val topArticles = prAndTitle.vertices.top(30)(Ordering.by((entry: (Vid, (String, Double))) => entry._2._2))
// println("Top articles:\n" + topArticles.deep.mkString("\n"))
// // for(v <- topArticles) {
// // println(v)
// // }
// val article_name = "JohnsHopkinsUniversity"
// //
// //Find relevant vertices
// g.mapTriplets(e => {
// if ((e.srcAttr contains article_name) || (e.dstAttr contains article_name)) { 1.0 }
// else { e.attr }
// })
// val coarsenedGraph = g.contractEdges({ e => e.attr == 1.0 }, {et => et.srcAttr + " " + et.dstAttr },
// { (v1: String , v2: String) => v1 + "\n" + v2 })
//
// // filter only vertices whose title contains JHU
// val relevant = coarsenedGraph.vertices.filter( {case (vid: Vid, data: String) => data contains article_name}).collect
// println("Articles matching " + article_name)
// println(relevant.deep.mkString("New Article\n"))
//
// sc.stop()
// }
//
//
// def stringify(tup: (org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text)): String = {
// tup._2.toString
// }
//
//
//
//}
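
For reference, the core pipeline in AnalyzeWikipedia — load an edge list, run a fixed number of PageRank iterations, join the ranks back onto the vertex attributes, and report the highest-ranked vertices — can be sketched against the merged org.apache.spark.graphx API roughly as follows. This is a sketch, not code from the commit: the input path, app name, master, and iteration count are placeholder assumptions, and GraphLoader.edgeListFile stands in for the edgeListAndVertexListFiles helper used above, which comes from the older org.apache.spark.graph package.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{GraphLoader, VertexId}
import org.apache.spark.graphx.lib.PageRank

// Sketch only (not code from this commit): rank a graph and print the top vertices.
object PageRankSketch {
  def main(args: Array[String]): Unit = {
    // Local master and input path are placeholders for a quick test run.
    val sc = new SparkContext(new SparkConf().setAppName("PageRankSketch").setMaster("local[2]"))

    // Load a "srcId dstId" edge list; every vertex gets the default attribute 1.
    val g = GraphLoader.edgeListFile(sc, "hdfs:///wiki_edges").cache()

    // 20 fixed PageRank iterations, matching the example above.
    val pr = PageRank.run(g, 20)

    // Join the ranks back onto the original vertex attributes and take the top 30.
    val ranked = g.outerJoinVertices(pr.vertices) {
      (id: VertexId, attr: Int, rank: Option[Double]) => (attr, rank.getOrElse(0.0))
    }
    val top30 = ranked.vertices.top(30)(
      Ordering.by((entry: (VertexId, (Int, Double))) => entry._2._2))
    println(top30.mkString("\n"))

    sc.stop()
  }
}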
@@ -3,16 +3,12 @@ package org.apache.spark.examples.graphx
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._
import org.apache.spark.graph.algorithms._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.conf.Configuration
import org.apache.mahout.text.wikipedia._
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import java.util.Calendar
import scala.math.Ordering.Implicits._
import org.apache.spark.Logging
import scala.collection.mutable

@@ -46,15 +42,15 @@ object PrePostProcessWikipedia extends Logging {
case "graphx" => {
val rawData = args(2)
val result = graphx(sc, rawData)
logWarning(result)
// logWarning(result)
}
case "prep" => {
val rawData = args(2)
val outBase = args(3)
prep(sc, rawData, outBase)
}

case _ => throw new IllegalArgumentException("Please provide a valid process")
case _ => throw new IllegalArgumentException("Please proVertexIde a valid process")
}
logWarning(process + "\tTIMEX: " + (System.currentTimeMillis - start)/1000.0)
sc.stop()
@@ -77,36 +73,36 @@ object PrePostProcessWikipedia extends Logging {
.map(stringify)
val wikiRDD = xmlRDD.map { raw => new WikiArticle(raw) }
.filter { art => art.relevant }.repartition(128)
val vertices: RDD[(Vid, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
val vertices: RDD[(VertexId, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
val verticesToSave = vertices.map {v => v._1 + "\t"+ v._2}
verticesToSave.saveAsTextFile(vertPath)
val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges }
val g = Graph(vertices, edges, partitionStrategy = EdgePartition1D)
val pr = PageRank.runStandalone(g, 0.01)
val prToSave = pr.map {v => v._1 + "\t"+ v._2}
val g = Graph(vertices, edges) //TODO what to do about partitionStrategy???
val pr = PageRank.run(g, 20)
val prToSave = pr.vertices.map {v => v._1 + "\t"+ v._2}
prToSave.saveAsTextFile(rankPath)
}

def graphx(sc: SparkContext, rawData: String) {

val conf = new Configuration
conf.set("key.value.separator.in.input.line", " ");
conf.set("xmlinput.start", "<page>");
conf.set("xmlinput.end", "</page>");
conf.set("key.value.separator.in.input.line", " ")
conf.set("xmlinput.start", "<page>")
conf.set("xmlinput.end", "</page>")

val xmlRDD = sc.newAPIHadoopFile(rawData, classOf[XmlInputFormat], classOf[LongWritable], classOf[Text], conf)
.map(stringify)
val wikiRDD = xmlRDD.map { raw => new WikiArticle(raw) }
.filter { art => art.relevant }.repartition(128)
val vertices: RDD[(Vid, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
val vertices: RDD[(VertexId, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges }
val g = Graph(vertices, edges, partitionStrategy = EdgePartition1D)
val g = Graph(vertices, edges)
val resultG = pagerankConnComponentsAlt(4, g)
logWarning(s"Final graph has ${resultG.triplets.count} EDGES, ${resultG.vertices.count} VERTICES")
logWarning(s"Final graph has ${resultG.triplets.count()} EDGES, ${resultG.vertices.count()} VERTICES")
// val pr = PageRank.run(g, 20)
// val prAndTitle = g
// .outerJoinVertices(pr)({(id: Vid, title: String, rank: Option[Double]) => (title, rank.getOrElse(0.0))})
// val top20 = prAndTitle.vertices.top(20)(Ordering.by((entry: (Vid, (String, Double))) => entry._2._2))
// .outerJoinVertices(pr)({(id: VertexId, title: String, rank: Option[Double]) => (title, rank.getOrElse(0.0))})
// val top20 = prAndTitle.vertices.top(20)(Ordering.by((entry: (VertexId, (String, Double))) => entry._2._2))
// top20.mkString("\n")

}
@@ -115,15 +111,26 @@ object PrePostProcessWikipedia extends Logging {
var currentGraph = g
for (i <- 0 to numRepetitions) {
val pr = PageRank.run(currentGraph, 20)
val prAndTitle = currentGraph
.outerJoinVertices(pr)({(id: Vid, title: String, rank: Option[Double]) => (title, rank.getOrElse(0.0))})
val top20 = prAndTitle.vertices.top(20)(Ordering.by((entry: (Vid, (String, Double))) => entry._2._2))
val prAndTitle = currentGraph.outerJoinVertices(pr.vertices)({(id: VertexId, title: String, rank: Option[Double]) => (title, rank.getOrElse(0.0))})
val top20 = prAndTitle.vertices.top(20)(Ordering.by((entry: (VertexId, (String, Double))) => entry._2._2))
logWarning(s"Top20 for iteration $i:\n${top20.mkString("\n")}")
val top20verts = top20.map(_._1).toSet
// filter out top 20 vertices
val newGraph = currentGraph.subgraph(vpred = ((v, d) => !top20Verts.contains(v)))
val filterTop20 = {(v: VertexId, d: String) =>
!top20verts.contains(v)
}
val newGraph = currentGraph.subgraph(x => true, filterTop20)
val ccGraph = ConnectedComponents.run(newGraph)
val numCCs = ccGraph.vertices.aggregate(new mutable.HashSet())(((set, vtuple) => set += vtuple._2), ((set1, set2) => set1 union set2)).size
val zeroVal = new mutable.HashSet[VertexId]()
val seqOp = (s: mutable.HashSet[VertexId], vtuple: (VertexId, VertexId)) => {
s.add(vtuple._2)
s
}
val combOp = (s1: mutable.HashSet[VertexId], s2: mutable.HashSet[VertexId]) => { s1 union s2}
val numCCs = ccGraph.vertices.aggregate(zeroVal)(seqOp, combOp)
//(new mutable.HashSet[Int]())((s: mutable.HashSet[Int], vtuple: (VertexId, Int)) => { s.add(vtuple._2); s },(s1: mutable.HashSet[Int], s2: mutable.HashSet[Int]) => { s1 union s2})

//(((set, vtuple) => set.add(vtuple._2)), ((set1, set2) => set1 union set2)).size
logWarning(s"Number of connected components for iteration $i: $numCCs")
// TODO will this result in too much memory overhead???
currentGraph = newGraph
@@ -143,7 +150,7 @@ object PrePostProcessWikipedia extends Logging {
.map(stringify)
val wikiRDD = xmlRDD.map { raw => new WikiArticle(raw) }
.filter { art => art.relevant }.repartition(128)
val vertices: RDD[(Vid, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
val vertices: RDD[(VertexId, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
val verticesToSave = vertices.map {v => v._1 + "\t"+ v._2}
verticesToSave.saveAsTextFile(outBase + "_vertices")
val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges }
@@ -159,12 +166,10 @@ object PrePostProcessWikipedia extends Logging {

// slightly cheating, but not really
val ranksAndAttrs = ranks.join(attrs)
val top20 = ranksAndAttrs.top(20)(Ordering.by((entry: (Vid, (Double, String))) => entry._2._1))
val top20 = ranksAndAttrs.top(20)(Ordering.by((entry: (VertexId, (Double, String))) => entry._2._1))
top20.mkString("\n")
}



def stringify(tup: (org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text)): String = {
tup._2.toString
}
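
The aggregate call in pagerankConnComponentsAlt counts distinct connected-component ids by folding each partition's ids into a mutable HashSet (seqOp) and unioning the partial sets (combOp). A minimal self-contained sketch of that pattern — with a hand-built (vertexId, componentId) RDD standing in for ccGraph.vertices, and all names and values placeholders rather than code from this commit — might look like:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.VertexId
import scala.collection.mutable

// Sketch only (not code from this commit): count distinct component ids with aggregate.
object CountComponentsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CountComponentsSketch").setMaster("local[2]"))

    // Stand-in for ccGraph.vertices: (vertexId, componentId) pairs, three components here.
    val ccVertices = sc.parallelize(Seq[(VertexId, VertexId)](
      (1L, 1L), (2L, 1L), (3L, 3L), (4L, 3L), (5L, 5L)))

    val zeroVal = new mutable.HashSet[VertexId]()
    // Fold each partition's component ids into a local set...
    val seqOp = (s: mutable.HashSet[VertexId], v: (VertexId, VertexId)) => { s.add(v._2); s }
    // ...then union the per-partition sets into one.
    val combOp = (s1: mutable.HashSet[VertexId], s2: mutable.HashSet[VertexId]) => s1 union s2

    // .size of the aggregated set is the number of distinct components (3 for this data).
    val numCCs = ccVertices.aggregate(zeroVal)(seqOp, combOp).size
    println("Distinct components: " + numCCs)

    sc.stop()
  }
}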