Skip to content

Commit

Permalink
Fixed IO bugs.
Browse files Browse the repository at this point in the history
  • Loading branch information
dcrankshaw committed Mar 24, 2014
1 parent a13f3b9 commit 0645183
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,11 @@ object GraphLoader extends Logging {
GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1)
} // end of edgeListFile

def loadVertices(sc: SparkContext, vertexPath: String): RDD[(VertexId, String)] = {
def loadVertices(sc: SparkContext, vertexPath: String, delimiter: String = "\\s+"): RDD[(VertexId, String)] = {

val vertices = sc.textFile(vertexPath, 128).mapPartitions( iter =>
iter.filter(line => !line.isEmpty && line(0) != '#').map { line =>
val lineArray = line.split("\\s+")
val lineArray = line.split(delimiter)
if(lineArray.length < 2) {
println("Invalid line: " + line)
assert(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ object WikiPipelineBenchmark extends Logging {
val rawData = args(2)
val outBase = args(3)
val (vertices, edges) = extractLinkGraph(sc, rawData)
val rawEdges = edges.map(e => (e.srcId, e.dstId))
writeGraphAsText(outBase, vertices, rawEdges, 0)
val g = Graph(vertices, edges)
val cleanG = g.subgraph(x => true, (vid, vd) => vd != null).cache
val rawEdges = cleanG.edges.map(e => (e.srcId, e.dstId))
writeGraphAsText(outBase, cleanG.vertices, rawEdges, 0)
}

case "analyze" => {
Expand Down Expand Up @@ -113,17 +115,19 @@ object WikiPipelineBenchmark extends Logging {
/**
 * Writes the given vertices and edges out as tab-separated text files.
 *
 * Each vertex becomes a line `id<TAB>attribute` (the attribute is rendered through
 * string interpolation) and each edge becomes a line `srcId<TAB>dstId`. Vertices are
 * saved under `basePath + "_vertices_" + iter` and edges under
 * `basePath + "_edges_" + iter`.
 *
 * @param basePath prefix shared by both output paths
 * @param vertices (vertex id, attribute) pairs to write
 * @param edges (source id, destination id) pairs to write
 * @param iter suffix appended to both output paths; defaults to 0
 */
def writeGraphAsText[V](basePath: String,
    vertices: RDD[(VertexId, V)],
    edges: RDD[(VertexId, VertexId)],
    iter: Int = 0): Unit = {
  val verticesToSave = vertices.map { v => s"${v._1}\t${v._2}" }
  val edgesToSave = edges.map { e => s"${e._1}\t${e._2}" }
  // The counts are only used for this log message. NOTE(review): neither RDD is
  // cached here, so counting presumably evaluates each one an extra time before the save.
  logWarning(s"Writing ${verticesToSave.count} VERTICES, ${edgesToSave.count} EDGES to file")
  verticesToSave.saveAsTextFile(s"${basePath}_vertices_$iter")
  edgesToSave.saveAsTextFile(s"${basePath}_edges_$iter")
}

/**
 * Reads an edge list from text, one edge per line.
 *
 * Every line is split on runs of whitespace and its first two fields are parsed as
 * Long values, giving a (source id, destination id) pair. Ids are parsed with `toLong`
 * rather than `toInt` so that values outside the Int range do not fail to parse.
 *
 * A line with fewer than two fields, or with a non-numeric field, makes the
 * corresponding task throw (`ArrayIndexOutOfBoundsException` / `NumberFormatException`).
 *
 * @param sc the SparkContext used to read the file(s)
 * @param path path handed to `sc.textFile`
 * @return one (source id, destination id) pair per input line
 */
def readEdgesFromText(sc: SparkContext, path: String): RDD[(VertexId, VertexId)] = {
  // 128 is passed as the second argument of textFile (its partition-count hint).
  sc.textFile(path, 128).map { line =>
    val lineSplits = line.split("\\s+")
    (lineSplits(0).toLong, lineSplits(1).toLong)
  }
}

Expand Down Expand Up @@ -155,7 +159,7 @@ object WikiPipelineBenchmark extends Logging {
def pipelinePostProcessing(sc: SparkContext, basePath: String, iter: Int) {
val pageranks = GraphLoader.loadVertices(sc, s"${basePath}_${iter}_prs*")
.map {v => (v._1, v._2.toDouble) }
val connComponents = GraphLoader.loadVertices(sc, s"${basePath}_${iter}_ccs*")
val connComponents = GraphLoader.loadVertices(sc, s"${basePath}_${iter}_ccs*", ",")
.map {v => (v._1, v._2.toLong) }
val edges = readEdgesFromText(sc, s"${basePath}_edges_$iter")
val artNames = GraphLoader.loadVertices(sc, s"${basePath}_vertices_$iter")
Expand Down

0 comments on commit 0645183

Please sign in to comment.