Skip to content

Commit

Permalink
Fixed compile errors
Browse files Browse the repository at this point in the history
  • Loading branch information
dcrankshaw committed Mar 24, 2014
1 parent e6fc93b commit a13f3b9
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,6 @@ class GraphKryoRegistrator extends KryoRegistrator {
kryo.register(classOf[WikiArticle])
// kryo.register(classOf[JHashSet[VertexId]])
kryo.register(classOf[JTreeSet[VertexId]])
kryo.register(classOf[TrackCounts])
// kryo.register(classOf[MakeString])
// kryo.register(classOf[PrePostProcessWikipedia])
// kryo.register(classOf[(LongWritable, Text)])

// This avoids a large number of hash table lookups.
kryo.setReferences(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,21 @@ object WikiPipelineBenchmark extends Logging {
case "graphx" => {
val rawData = args(2)
val numIters = args(3).toInt
val result = benchmarkGraphx(sc, rawData, numIters)
logWarning(result)
benchmarkGraphx(sc, rawData, numIters)
}

case "extract" => {
val rawData = args(2)
val outBase = args(3)
val (vertices, edges) = extractLinkGraph(sc, rawData)
writeGraphAsText(outBase, vertices, edges, 0)
val rawEdges = edges.map(e => (e.srcId, e.dstId))
writeGraphAsText(outBase, vertices, rawEdges, 0)
}

case "analyze" => {
val outBase = args(2)
val iter = args(3).toInt
pipelinePostProcessing(sc, outBase, iter)

}

case _ => throw new IllegalArgumentException("Please provide a valid process")
Expand Down Expand Up @@ -111,18 +110,20 @@ object WikiPipelineBenchmark extends Logging {
currentGraph
}

/**
 * Writes a graph out as two text datasets, one for vertices and one for edges.
 *
 * Each vertex is rendered as "id<TAB>attribute" and each edge as
 * "source<TAB>destination". The output locations are built from `basePath`
 * with the suffixes `_vertices_<iter>` and `_edges_<iter>`.
 *
 * NOTE(review): this span carries two versions of the signature and of several
 * body lines back to back, with no +/- markers. Presumably they are the removed
 * and added sides of a diff whose markers were lost -- confirm against the
 * repository before treating the span as compilable code.
 *
 * @param basePath prefix shared by both output paths
 * @param vertices (vertex id, attribute) pairs; the attribute is rendered through
 *                 string interpolation
 * @param edges    edges to write: `RDD[Edge[Double]]` in the first signature,
 *                 (source id, destination id) pairs in the second
 * @param iter     iteration number appended to both output paths; defaults to 0
 */
// NOTE(review): this one-line signature declares type parameter V but uses U in
// the type of `vertices`; U is not declared anywhere in this signature.
def writeGraphAsText[V](basePath: String, vertices: RDD[(VertexId, U)], edges: RDD[Edge[Double]], iter: Int = 0) {
// Multi-line signature: vertex attributes are typed V and edges are plain id pairs.
def writeGraphAsText[V](basePath: String,
vertices: RDD[(VertexId, V)],
edges: RDD[(VertexId, VertexId)],
iter: Int = 0) {
// Format every vertex as "id<TAB>attribute".
val verticesToSave = vertices.map {v => s"${v._1}\t${v._2}"}
// Edge-typed form: reads the srcId/dstId members of each element.
val edgesToSave = edges.map {e => s"${e.srcId}\t${e.dstId}"}
// These two calls save the input RDDs themselves, not the formatted strings built above.
vertices.saveAsTextFile(s"${basePath}_vertices_$iter")
edges.saveAsTextFile(s"${basePath}_edges_$iter")
// Tuple-typed form: reads the two tuple components of each element.
val edgesToSave = edges.map {e => s"${e._1}\t${e._2}"}
// These two calls save the formatted string RDDs.
verticesToSave.saveAsTextFile(s"${basePath}_vertices_$iter")
edgesToSave.saveAsTextFile(s"${basePath}_edges_$iter")
}

// assumes vertex attr is string, can be parsed afterwards
/**
 * Reads an edge list from text into (source id, destination id) pairs.
 *
 * Each input line is split on runs of whitespace. The first two fields are
 * parsed as the source and destination vertex ids; any further fields on the
 * line are ignored.
 *
 * Ids are parsed with `toLong` because the declared element type is `VertexId`,
 * which GraphX defines as a 64-bit Long. Parsing with `toInt` would throw
 * NumberFormatException for any id outside the Int range.
 *
 * @param sc   Spark context used to open the text data
 * @param path location of the edge text data
 * @return an RDD of (source, destination) vertex-id pairs
 * @throws NumberFormatException (at job execution) if either of the first two
 *                               fields of a line is not a valid Long
 */
def readEdgesFromText(sc: SparkContext, path: String): RDD[(VertexId, VertexId)] = {
  sc.textFile(path, 128).map { line =>
    val fields = line.split("\\s+")
    // A line with fewer than two fields fails here with an index error, as before.
    (fields(0).toLong, fields(1).toLong)
  }
}

Expand Down Expand Up @@ -152,9 +153,9 @@ object WikiPipelineBenchmark extends Logging {
}

def pipelinePostProcessing(sc: SparkContext, basePath: String, iter: Int) {
val pageranks = GraphLoader.loadVertices(sc, s"${basePath}_prs")
val pageranks = GraphLoader.loadVertices(sc, s"${basePath}_${iter}_prs*")
.map {v => (v._1, v._2.toDouble) }
val connComponents = GraphLoader.loadVertices(sc, s"${basePath}_ccs")
val connComponents = GraphLoader.loadVertices(sc, s"${basePath}_${iter}_ccs*")
.map {v => (v._1, v._2.toLong) }
val edges = readEdgesFromText(sc, s"${basePath}_edges_$iter")
val artNames = GraphLoader.loadVertices(sc, s"${basePath}_vertices_$iter")
Expand All @@ -174,7 +175,7 @@ object WikiPipelineBenchmark extends Logging {
logWarning(s"Number of connected components for iteration $iter: $numCCs")
val top20verts = top20.map(_._1).toSet
val newVertices = artNames.filter { case (v, d) => !top20verts.contains(v) }
val newEdges = edges.filter { case (s, d) => !(top20verts.contains(v) || top20verts.contains(d)) }
val newEdges = edges.filter { case (s, d) => !(top20verts.contains(s) || top20verts.contains(d)) }
writeGraphAsText(basePath, newVertices, newEdges, iter + 1)
}
}

0 comments on commit a13f3b9

Please sign in to comment.