Fixed pipeline. Now seems to work well.
dcrankshaw committed Feb 25, 2014
1 parent 7799b3e commit 75b2da7
Showing 8 changed files with 67 additions and 55 deletions.
2 changes: 1 addition & 1 deletion conf/core-site.xml
@@ -12,7 +12,7 @@

<property>
<name>fs.default.name</name>
<value>hdfs://ec2-54-227-199-7.compute-1.amazonaws.com:9000</value>
<value>hdfs://ec2-184-72-130-69.compute-1.amazonaws.com:9000</value>
</property>

<property>
19 changes: 16 additions & 3 deletions conf/slaves
@@ -1,3 +1,16 @@
ec2-54-204-136-27.compute-1.amazonaws.com
ec2-54-204-61-198.compute-1.amazonaws.com
ec2-54-204-153-59.compute-1.amazonaws.com
ec2-54-80-151-251.compute-1.amazonaws.com
ec2-54-242-226-224.compute-1.amazonaws.com
ec2-54-205-35-26.compute-1.amazonaws.com
ec2-184-72-164-33.compute-1.amazonaws.com
ec2-107-20-33-174.compute-1.amazonaws.com
ec2-54-80-2-210.compute-1.amazonaws.com
ec2-50-17-77-137.compute-1.amazonaws.com
ec2-174-129-164-255.compute-1.amazonaws.com
ec2-23-20-81-32.compute-1.amazonaws.com
ec2-54-80-236-6.compute-1.amazonaws.com
ec2-54-226-129-134.compute-1.amazonaws.com
ec2-54-221-94-96.compute-1.amazonaws.com
ec2-23-22-81-129.compute-1.amazonaws.com
ec2-23-23-43-146.compute-1.amazonaws.com
ec2-54-196-107-67.compute-1.amazonaws.com
ec2-23-20-48-31.compute-1.amazonaws.com
2 changes: 0 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -68,9 +68,7 @@ class NewHadoopRDD[K, V](
@transient private val jobId = new JobID(jobtrackerId, id)

override def getPartitions: Array[Partition] = {
logWarning("About to instantiate inputformat.")
val inputFormat = inputFormatClass.newInstance
logWarning("Inputformat instantiated successfully")
if (inputFormat.isInstanceOf[Configurable]) {
inputFormat.asInstanceOf[Configurable].setConf(conf)
}
@@ -26,6 +26,7 @@ import org.apache.spark.util.collection.BitSet
import org.apache.spark.util.BoundedPriorityQueue
import scala.collection.mutable
import org.apache.hadoop.io.{LongWritable, Text}
import java.util.{HashSet => JHashSet, TreeSet => JTreeSet}
// import org.apache.hadoop.conf.Configuration
import org.apache.mahout.text.wikipedia._

@@ -51,6 +52,8 @@ class GraphKryoRegistrator extends KryoRegistrator {
kryo.register(classOf[LongWritable])
kryo.register(classOf[Text])
kryo.register(classOf[WikiArticle])
kryo.register(classOf[JHashSet[VertexId]])
kryo.register(classOf[JTreeSet[VertexId]])
// kryo.register(classOf[MakeString])
// kryo.register(classOf[PrePostProcessWikipedia])
// kryo.register(classOf[(LongWritable, Text)])
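The two registrations added here cover the java.util collections that the reworked connected-components aggregation further down ships across the cluster. For context, a minimal sketch of how a KryoRegistrator like this one is typically enabled on the driver, assuming a Spark 0.9-era SparkConf and assuming the class's fully qualified name is org.apache.spark.graphx.GraphKryoRegistrator:

import org.apache.spark.SparkConf

// Sketch: turn on Kryo and point it at the registrator so the
// JHashSet[VertexId]/JTreeSet[VertexId] accumulators serialize cleanly.
// The app name and registrator path are illustrative assumptions.
val sparkConf = new SparkConf()
  .setAppName("WikiPipeline")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")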
@@ -10,7 +10,8 @@ import org.apache.mahout.text.wikipedia._
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.spark.Logging
import scala.collection.mutable
// import scala.collection.mutable
import java.util.{HashSet => JHashSet, TreeSet => JTreeSet}
// import org.apache.spark.graphx.MakeString


@@ -74,7 +75,6 @@ object PrePostProcessWikipedia extends Logging {
val rankPath = outBase + "_ranks"

val xmlRDD = sc.newAPIHadoopFile(rawData, classOf[XmlInputFormat], classOf[LongWritable], classOf[Text], hadoopconf)
// .map(t => (new MakeString(t)).str)
.map(t => t._2.toString)

val wikiRDD = xmlRDD.map { raw => new WikiArticle(raw) }
@@ -96,30 +96,24 @@
conf.set("xmlinput.start", "<page>")
conf.set("xmlinput.end", "</page>")

logWarning("Trying to instantiate XmlInputFormat")
val xx = classOf[XmlInputFormat].newInstance
logWarning(s"XmlInputFOrmat instance: $xx")

logWarning(s"classOf[String]: ${classOf[String]}")
logWarning(s"classOf[XmlInputFormat]: ${classOf[XmlInputFormat]}")
logWarning(s"classOf[LongWritable]: ${classOf[LongWritable]}")
logWarning(s"classOf[Text]: ${classOf[Text]}")
logWarning("about to load xml rdd")
val xmlRDD = sc.newAPIHadoopFile(rawData, classOf[XmlInputFormat], classOf[LongWritable], classOf[Text], conf)
.map(t => t._2.toString)
// .map(t => (new MakeString(t)).str)
xmlRDD.count
logWarning("XML RDD counted")
// xmlRDD.count
logWarning(s"XML RDD counted. Found ${xmlRDD.count} raw articles.")
val wikiRDD = xmlRDD.map { raw => new WikiArticle(raw) }
.filter { art => art.relevant }.repartition(128)
logWarning(s"wikiRDD counted. Found ${wikiRDD.count} relevant articles.")
val vertices: RDD[(VertexId, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges }
logWarning("creating graph")
val g = Graph(vertices, edges)
g.triplets.count
logWarning("Graph triplets counted.")
val resultG = pagerankConnComponentsAlt(numIters, g)
logWarning(s"Final graph has ${resultG.triplets.count()} EDGES, ${resultG.vertices.count()} VERTICES")
val cleanG = g.subgraph(x => true, (vid, vd) => vd != null)
logWarning(s"DIRTY graph has ${g.triplets.count()} EDGES, ${g.vertices.count()} VERTICES")
logWarning(s"CLEAN graph has ${cleanG.triplets.count()} EDGES, ${cleanG.vertices.count()} VERTICES")
val resultG = pagerankConnComponentsAlt(numIters, cleanG)
logWarning(s"ORIGINAL graph has ${cleanG.triplets.count()} EDGES, ${cleanG.vertices.count()} VERTICES")
logWarning(s"FINAL graph has ${resultG.triplets.count()} EDGES, ${resultG.vertices.count()} VERTICES")
// val pr = PageRank.run(g, 20)
// val prAndTitle = g
// .outerJoinVertices(pr)({(id: VertexId, title: String, rank: Option[Double]) => (title, rank.getOrElse(0.0))})
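A note on the new cleanG step: when Graph(vertices, edges) assembles the graph, any vertex id that appears only as an edge endpoint receives the default vertex attribute, which is null for these String titles, so the subgraph with predicate vd != null strips those dangling link targets before PageRank and connected components run. A toy sketch of the same idiom, assuming an existing SparkContext sc:

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Vertex 3L is referenced by an edge but never defined, so Graph fills
// its attribute with null; the subgraph call below then drops it along
// with its incident edge. (Toy data purely for illustration.)
val verts: RDD[(VertexId, String)] = sc.parallelize(Seq((1L, "a"), (2L, "b")))
val edgs: RDD[Edge[Double]] = sc.parallelize(Seq(Edge(1L, 2L, 1.0), Edge(2L, 3L, 1.0)))
val toyG = Graph(verts, edgs)
val toyClean = toyG.subgraph(x => true, (vid, vd) => vd != null)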
@@ -148,13 +142,27 @@
}
val newGraph = currentGraph.subgraph(x => true, filterTop20)
val ccGraph = ConnectedComponents.run(newGraph)
val zeroVal = new mutable.HashSet[VertexId]()
val seqOp = (s: mutable.HashSet[VertexId], vtuple: (VertexId, VertexId)) => {
// val zeroVal = new mutable.HashSet[VertexId]()
// val seqOp = (s: mutable.HashSet[VertexId], vtuple: (VertexId, VertexId)) => {
// s.add(vtuple._2)
// s
// }
// val combOp = (s1: mutable.HashSet[VertexId], s2: mutable.HashSet[VertexId]) => { s1 union s2}
// val numCCs = ccGraph.vertices.aggregate(zeroVal)(seqOp, combOp)


val zeroVal = new JTreeSet[VertexId]()
val seqOp = (s: JTreeSet[VertexId], vtuple: (VertexId, VertexId)) => {
s.add(vtuple._2)
s
}
val combOp = (s1: mutable.HashSet[VertexId], s2: mutable.HashSet[VertexId]) => { s1 union s2}
val numCCs = ccGraph.vertices.aggregate(zeroVal)(seqOp, combOp)
val combOp = (s1: JTreeSet[VertexId], s2: JTreeSet[VertexId]) => {
s1.addAll(s2)
s1
}
val numCCs = ccGraph.vertices.aggregate(zeroVal)(seqOp, combOp).size()


//(new mutable.HashSet[Int]())((s: mutable.HashSet[Int], vtuple: (VertexId, Int)) => { s.add(vtuple._2); s },(s1: mutable.HashSet[Int], s2: mutable.HashSet[Int]) => { s1 union s2})

//(((set, vtuple) => set.add(vtuple._2)), ((set1, set2) => set1 union set2)).size
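The switch from scala.collection.mutable.HashSet to java.util.TreeSet also changes the merge style: addAll mutates and reuses the accumulator where HashSet's union allocated a new set on every combine, and the distinct-component count now comes from .size() on the aggregated set. A self-contained sketch of the same aggregation, using a hypothetical countComponents helper name:

import java.util.{TreeSet => JTreeSet}
import org.apache.spark.graphx.VertexId
import org.apache.spark.rdd.RDD

// Hypothetical helper mirroring the hunk above: given (vertexId, componentId)
// pairs from ConnectedComponents.run(...).vertices, count distinct components.
def countComponents(ccVertices: RDD[(VertexId, VertexId)]): Int = {
  val zero = new JTreeSet[VertexId]()
  val seq = (s: JTreeSet[VertexId], vtuple: (VertexId, VertexId)) => {
    s.add(vtuple._2)   // record this vertex's component id
    s
  }
  val comb = (s1: JTreeSet[VertexId], s2: JTreeSet[VertexId]) => {
    s1.addAll(s2)      // in-place merge of partition-level sets
    s1
  }
  ccVertices.aggregate(zero)(seq, comb).size()
}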
@@ -173,7 +181,6 @@
conf.set("xmlinput.start", "<page>");
conf.set("xmlinput.end", "</page>");
val xmlRDD = sc.newAPIHadoopFile(rawData, classOf[XmlInputFormat], classOf[LongWritable], classOf[Text], conf)
// .map(t => (new MakeString(t)).str)
.map(t => t._2.toString)
val wikiRDD = xmlRDD.map { raw => new WikiArticle(raw) }
.filter { art => art.relevant }.repartition(128)
6 changes: 0 additions & 6 deletions graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -119,20 +119,14 @@ object Pregel extends Logging {
mergeMsg: (A, A) => A)
: Graph[VD, ED] =
{
logError("In pregel apply")
var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
logError("aaaa")
// compute the messages
var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
logError("bbbbb")
var activeMessages = messages.count()
logError("ccccc")
// Loop
var prevG: Graph[VD, ED] = null.asInstanceOf[Graph[VD, ED]]
logError("ddddd")
var i = 0
while (activeMessages > 0 && i < maxIterations) {
logWarning(s"In pregel iteration $i")
// Receive the messages. Vertices that didn't get any messages do not appear in newVerts.
val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
// Update the graph with the new vertices.
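With the debug logging gone, the loop reads plainly: seed every vertex with vprog(initialMsg), compute messages with mapReduceTriplets, and iterate while any messages remain. For orientation, the classic single-source shortest-paths use of this API, sketched on an assumed Graph[Double, Double] named initialGraph whose source vertex attribute is 0.0 and all others Double.PositiveInfinity:

import org.apache.spark.graphx._

// Sketch only: initialGraph is assumed to exist as described above.
val sssp = Pregel(initialGraph, Double.PositiveInfinity)(
  (id, dist, newDist) => math.min(dist, newDist),              // vprog
  triplet =>                                                    // sendMsg
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    else
      Iterator.empty,
  (a, b) => math.min(a, b)                                      // mergeMsg
)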
38 changes: 17 additions & 21 deletions graphx/src/main/scala/org/apache/spark/graphx/WikiArticle.scala
@@ -24,10 +24,10 @@ class WikiArticle(wtext: String) extends Serializable {
try {
XML.loadString(tiXML).text
} catch {
case e => "" // don't use null because we get null pointer exceptions
case e => "NOTFOUND" // don't use null because we get null pointer exceptions
}
}
val relevant: Boolean = !(redirect || stub || disambig || title == null)
val relevant: Boolean = !(redirect || stub || disambig || title == "NOTFOUND" || title == null)
val vertexID: VertexId = WikiArticle.titleHash(title)
val edges: HashSet[Edge[Double]] = {
val temp = neighbors.map { n => Edge(vertexID, n, 1.0) }
@@ -55,7 +55,7 @@ object WikiArticle {
val temp: Array[String] = matcher.group(1).split("\\|")
if (temp != null && temp.length > 0) {
val link: String = temp(0)
if (link.contains(":") == false) {
if (!link.contains(":")) {
linkBuilder += link
}
}
@@ -73,24 +73,20 @@
private def titleHash(title: String): VertexId = { math.abs(WikiArticle.myHashcode(canonicalize(title))) }

private def myHashcode(s: String): Long = {
// var h: Long = 1125899906842597L // prime
// var h: Long = 4294967291L // prime
// var h = 29
// val len: Int = s.length
// for (i<- 0 until len) {
// h = 31*h + s.charAt(i)
// }
// h
// // s.hashCode()
// }

val md: MessageDigest = MessageDigest.getInstance("MD5")
md.update(s.getBytes)
val result: Array[Byte] = md.digest()
val longResult = ByteBuffer.wrap(result).getLong
// shift result by 2
val retval = longResult >> 10
retval
var h: Long = 1125899906842597L // prime
// var h = 29
val len: Int = s.length
for (i <- 0 until len) {
h = 31*h + s.charAt(i)
}
h
// val md: MessageDigest = MessageDigest.getInstance("MD5")
// md.update(s.getBytes)
// val result: Array[Byte] = md.digest()
// val longResult = ByteBuffer.wrap(result).getLong
// // shift result by 2
// val retval = longResult >> 10
// retval
}

}
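The MD5-based id is replaced with the textbook 31-multiplier polynomial hash accumulated in a Long, which avoids creating a MessageDigest per title; titleHash still wraps it in math.abs, so distinct titles can in principle collide on the same VertexId. A standalone sketch of the same function with an illustrative call (the example title is arbitrary):

// Same polynomial rolling hash as myHashcode above, widened to Long.
def stringHash64(s: String): Long = {
  var h: Long = 1125899906842597L   // large prime seed
  for (i <- 0 until s.length) {
    h = 31 * h + s.charAt(i)
  }
  h
}

val exampleId: Long = math.abs(stringHash64("Graph_(abstract_data_type)"))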
@@ -330,6 +330,7 @@ object GraphImpl {
GraphImpl(vertexRDD, edgeRDD)
}

// NOTE(crankshaw) this is the constructor the wiki pipeline uses
def apply[VD: ClassTag, ED: ClassTag](
vertices: VertexRDD[VD],
edges: EdgeRDD[ED]): GraphImpl[VD, ED] = {