Skip to content

Commit

Permalink
Fixed partitioning issues.
Browse files: browse the repository at this point in the history
  • Loading branch information
dcrankshaw committed Mar 2, 2014
1 parent fad630f commit 3342751
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 56 deletions.
2 changes: 1 addition & 1 deletion conf/core-site.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

<property>
<name>fs.default.name</name>
<value>hdfs://ec2-54-80-197-211.compute-1.amazonaws.com:9000</value>
<value>hdfs://ec2-54-196-98-120.compute-1.amazonaws.com:9000</value>
</property>

<property>
Expand Down
32 changes: 16 additions & 16 deletions conf/slaves
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
ec2-54-81-55-13.compute-1.amazonaws.com
ec2-54-80-42-122.compute-1.amazonaws.com
ec2-107-20-119-33.compute-1.amazonaws.com
ec2-54-224-98-126.compute-1.amazonaws.com
ec2-54-80-27-215.compute-1.amazonaws.com
ec2-54-221-106-106.compute-1.amazonaws.com
ec2-23-21-15-4.compute-1.amazonaws.com
ec2-50-16-37-65.compute-1.amazonaws.com
ec2-54-80-199-27.compute-1.amazonaws.com
ec2-54-227-141-97.compute-1.amazonaws.com
ec2-54-80-200-145.compute-1.amazonaws.com
ec2-23-22-155-180.compute-1.amazonaws.com
ec2-54-197-154-251.compute-1.amazonaws.com
ec2-54-227-50-5.compute-1.amazonaws.com
ec2-54-197-117-246.compute-1.amazonaws.com
ec2-54-196-135-53.compute-1.amazonaws.com
ec2-54-205-1-88.compute-1.amazonaws.com
ec2-54-211-158-212.compute-1.amazonaws.com
ec2-54-81-44-29.compute-1.amazonaws.com
ec2-107-22-38-86.compute-1.amazonaws.com
ec2-54-80-214-130.compute-1.amazonaws.com
ec2-54-80-215-61.compute-1.amazonaws.com
ec2-54-197-175-71.compute-1.amazonaws.com
ec2-54-205-212-89.compute-1.amazonaws.com
ec2-23-21-29-224.compute-1.amazonaws.com
ec2-54-227-58-187.compute-1.amazonaws.com
ec2-54-197-99-10.compute-1.amazonaws.com
ec2-50-19-13-180.compute-1.amazonaws.com
ec2-184-72-163-63.compute-1.amazonaws.com
ec2-54-196-86-139.compute-1.amazonaws.com
ec2-50-16-47-48.compute-1.amazonaws.com
ec2-54-227-45-168.compute-1.amazonaws.com
Original file line number Diff line number Diff line change
Expand Up @@ -139,35 +139,28 @@ object PrePostProcessWikipedia extends Logging {
.map(t => t._2.toString)
// xmlRDD.count
logWarning(s"XML RDD counted. Found ${xmlRDD.count} raw articles.")
val repartXMLRDD = xmlRDD.repartition(128)
logWarning(s"XML RDD repartitioned. Found ${repartXMLRDD.count} raw articles.")

val allArtsRDD = xmlRDD.map { raw => new WikiArticle(raw) }.cache
// val numRedirects = allArtsRDD.filter { art => art.redirect }.count
// val numStubs = allArtsRDD.filter { art => art.stub }.count
// val numDisambig = allArtsRDD.filter { art => art.disambig }.count
// val numTitleNotFound = allArtsRDD.filter { art => art.title == WikiArticle.notFoundString }.count
// logWarning(s"Filter results:\tRedirects: $numRedirects \tStubs: $numStubs \tDisambiguations: $numDisambig \t Title not found: $numTitleNotFound")
val allArtsRDD = repartXMLRDD.map { raw => new WikiArticle(raw) }.cache
logWarning(s"Total articles: Found ${allArtsRDD.count} UNPARTITIONED articles.")

val wikiRDD = allArtsRDD.filter { art => art.relevant }.cache //.repartition(128)
wikiRDD.repartition(128)
logWarning(s"wikiRDD counted. Found ${wikiRDD.count} relevant articles in ${wikiRDD.partitions.size} partitions")


// val repartAllArtsRDD = allArtsRDD.repartition(128)
// logWarning(s"Total articles: Found ${repartAllArtsRDD.count} PARTITIONED articles.")
// val wikiRDD = unpartWikiRDD.repartition(128).cache
// val wikiRDD = unpartWikiRDD.coalesce(128, false).cache
// logWarning(s"WikiRDD partitions size: ${wikiRDD.partitions.size}")

// val wikiRDD = allArtsRDD.filter { art => art.relevant }.repartition(128)
val wikiRDDCount = wikiRDD.count
logWarning(s"wikiRDD counted. Found ${wikiRDDCount} relevant articles.")

// val wikiRDDCount = wikiRDD.count
// logWarning(s"wikiRDD counted. Found ${wikiRDDCount} relevant articles.")
// logWarning("Counting differently")

// count: redirects, stubs, disambigs, titlenotfound, titlenull, relevant, total
// val zeroCount = new TrackCounts
// val countSeqOp = (curCount: TrackCounts, art: WikiArticle) => {
// curCount.addArticle(art)
// curCount
// }
// val countCombOp = (c1: TrackCounts, c2: TrackCounts) => {
// c1.update(c2)
// c1
// }
//
// val cr = allArtsRDD.aggregate(zeroCount)(countSeqOp, countCombOp)
// logWarning(s"Different count results: $cr")
// System.exit(0)

val vertices: RDD[(VertexId, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges }
Expand All @@ -179,11 +172,6 @@ object PrePostProcessWikipedia extends Logging {
val resultG = pagerankConnComponentsAlt(numIters, cleanG)
logWarning(s"ORIGINAL graph has ${cleanG.triplets.count()} EDGES, ${cleanG.vertices.count()} VERTICES")
logWarning(s"FINAL graph has ${resultG.triplets.count()} EDGES, ${resultG.vertices.count()} VERTICES")
// val pr = PageRank.run(g, 20)
// val prAndTitle = g
// .outerJoinVertices(pr)({(id: VertexId, title: String, rank: Option[Double]) => (title, rank.getOrElse(0.0))})
// val top20 = prAndTitle.vertices.top(20)(Ordering.by((entry: (VertexId, (String, Double))) => entry._2._2))
// top20.mkString("\n")

}

Expand Down
22 changes: 11 additions & 11 deletions graphx/src/main/scala/org/apache/spark/graphx/WikiArticle.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ import java.nio.ByteBuffer


class WikiArticle(wtext: String) extends Serializable {
@transient val links: Array[String] = WikiArticle.parseLinks(wtext)
@transient val neighbors = links.map(WikiArticle.titleHash).distinct
@transient lazy val redirect: Boolean = !WikiArticle.redirectPattern.findFirstIn(wtext).isEmpty
@transient lazy val stub: Boolean = !WikiArticle.stubPattern.findFirstIn(wtext).isEmpty
@transient lazy val disambig: Boolean = !WikiArticle.disambigPattern.findFirstIn(wtext).isEmpty
@transient lazy val tiXML = WikiArticle.titlePattern.findFirstIn(wtext).getOrElse("")
val links: Array[String] = WikiArticle.parseLinks(wtext)
val neighbors = links.map(WikiArticle.titleHash).distinct
val redirect: Boolean = !WikiArticle.redirectPattern.findFirstIn(wtext).isEmpty
val stub: Boolean = !WikiArticle.stubPattern.findFirstIn(wtext).isEmpty
val disambig: Boolean = !WikiArticle.disambigPattern.findFirstIn(wtext).isEmpty
val tiXML = WikiArticle.titlePattern.findFirstIn(wtext).getOrElse("")
val title: String = {
try {
XML.loadString(tiXML).text
Expand All @@ -42,11 +42,11 @@ class WikiArticle(wtext: String) extends Serializable {
}

object WikiArticle {
@transient val titlePattern = "<title>(.*)<\\/title>".r
@transient val redirectPattern = "#REDIRECT\\s+\\[\\[(.*?)\\]\\]".r
@transient val disambigPattern = "\\{\\{disambig\\}\\}".r
@transient val stubPattern = "\\-stub\\}\\}".r
@transient val linkPattern = Pattern.compile("\\[\\[(.*?)\\]\\]", Pattern.MULTILINE)
val titlePattern = "<title>(.*)<\\/title>".r
val redirectPattern = "#REDIRECT\\s+\\[\\[(.*?)\\]\\]".r
val disambigPattern = "\\{\\{disambig\\}\\}".r
val stubPattern = "\\-stub\\}\\}".r
val linkPattern = Pattern.compile("\\[\\[(.*?)\\]\\]", Pattern.MULTILINE)

val notFoundString = "NOTFOUND"

Expand Down

0 comments on commit 3342751

Please sign in to comment.