From 0645183f4bc548bba820d6ef669edc72e6cfc5b2 Mon Sep 17 00:00:00 2001
From: Dan Crankshaw
Date: Mon, 24 Mar 2014 21:34:00 +0000
Subject: [PATCH] Fixed IO bugs.

---
 .../scala/org/apache/spark/graphx/GraphLoader.scala  |  4 ++--
 .../apache/spark/graphx/WikiPipelineBenchmark.scala  | 12 ++++++++----
 2 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
index bca17241538f4..13224acfc47cd 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
@@ -87,11 +87,11 @@ object GraphLoader extends Logging {
 
     GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1)
   } // end of edgeListFile
 
-  def loadVertices(sc: SparkContext, vertexPath: String): RDD[(VertexId, String)] = {
+  def loadVertices(sc: SparkContext, vertexPath: String, delimiter: String = "\\s+"): RDD[(VertexId, String)] = {
     val vertices = sc.textFile(vertexPath, 128).mapPartitions( iter =>
       iter.filter(line => !line.isEmpty && line(0) != '#').map { line =>
-        val lineArray = line.split("\\s+")
+        val lineArray = line.split(delimiter)
         if(lineArray.length < 2) {
           println("Invalid line: " + line)
           assert(false)
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/WikiPipelineBenchmark.scala b/graphx/src/main/scala/org/apache/spark/graphx/WikiPipelineBenchmark.scala
index f28a1a3791b34..86e52ce20f5e4 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/WikiPipelineBenchmark.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/WikiPipelineBenchmark.scala
@@ -37,8 +37,10 @@ object WikiPipelineBenchmark extends Logging {
         val rawData = args(2)
         val outBase = args(3)
         val (vertices, edges) = extractLinkGraph(sc, rawData)
-        val rawEdges = edges.map(e => (e.srcId, e.dstId))
-        writeGraphAsText(outBase, vertices, rawEdges, 0)
+        val g = Graph(vertices, edges)
+        val cleanG = g.subgraph(x => true, (vid, vd) => vd != null).cache
+        val rawEdges = cleanG.edges.map(e => (e.srcId, e.dstId))
+        writeGraphAsText(outBase, cleanG.vertices, rawEdges, 0)
       }
 
       case "analyze" => {
@@ -113,9 +115,11 @@
   def writeGraphAsText[V](basePath: String,
                           vertices: RDD[(VertexId, V)],
                           edges: RDD[(VertexId, VertexId)],
+                          // graph: Graph[V, _],
                           iter: Int = 0) {
     val verticesToSave = vertices.map {v => s"${v._1}\t${v._2}"}
     val edgesToSave = edges.map {e => s"${e._1}\t${e._2}"}
+    logWarning(s"Writing ${verticesToSave.count} VERTICES, ${edgesToSave.count} EDGES to file")
     verticesToSave.saveAsTextFile(s"${basePath}_vertices_$iter")
     edgesToSave.saveAsTextFile(s"${basePath}_edges_$iter")
   }
@@ -123,7 +127,7 @@
   def readEdgesFromText(sc: SparkContext, path: String): RDD[(VertexId, VertexId)] = {
     sc.textFile(path, 128).map { line =>
       val lineSplits = line.split("\\s+")
-      (lineSplits(0).toInt, lineSplits(1).toInt)
+      (lineSplits(0).toLong, lineSplits(1).toLong)
     }
   }
 
@@ -155,7 +159,7 @@
   def pipelinePostProcessing(sc: SparkContext, basePath: String, iter: Int) {
     val pageranks = GraphLoader.loadVertices(sc, s"${basePath}_${iter}_prs*")
      .map {v => (v._1, v._2.toDouble) }
-    val connComponents = GraphLoader.loadVertices(sc, s"${basePath}_${iter}_ccs*")
+    val connComponents = GraphLoader.loadVertices(sc, s"${basePath}_${iter}_ccs*", ",")
      .map {v => (v._1, v._2.toLong) }
     val edges = readEdgesFromText(sc, s"${basePath}_edges_$iter")
     val artNames = GraphLoader.loadVertices(sc, s"${basePath}_vertices_$iter")
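
For reviewers, a minimal sketch of how the fixed IO paths are meant to be consumed,
assuming the patched GraphLoader above is on the classpath. The basePath value and
the WikiRoundTrip object are hypothetical stand-ins for illustration, not part of
the patch:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.graphx.{GraphLoader, VertexId}
    import org.apache.spark.rdd.RDD

    object WikiRoundTrip {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("wiki-round-trip").setMaster("local[2]"))

        // Hypothetical base path, mirroring the s"${basePath}_..." layout above.
        val basePath = "/tmp/wikigraph"

        // Connected-components output is comma-delimited, so it needs the new
        // delimiter argument; the default "\\s+" would leave each line unsplit
        // and trip loadVertices' length check.
        val connComponents: RDD[(VertexId, Long)] =
          GraphLoader.loadVertices(sc, s"${basePath}_0_ccs*", ",")
            .map { case (vid, cc) => (vid, cc.toLong) }

        // Vertex ids are 64-bit, which is why readEdgesFromText now parses
        // with .toLong instead of .toInt (large ids overflow Int).
        val edges: RDD[(VertexId, VertexId)] =
          sc.textFile(s"${basePath}_edges_0").map { line =>
            val parts = line.split("\\s+")
            (parts(0).toLong, parts(1).toLong)
          }

        connComponents.take(5).foreach(println)
        edges.take(5).foreach(println)
        sc.stop()
      }
    }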