Skip to content
This repository has been archived by the owner on Oct 8, 2020. It is now read-only.

Commit

Permalink
Fix problems in GraphX building code for issue #4
Browse files Browse the repository at this point in the history
  • Loading branch information
nilesh-c committed May 15, 2017
1 parent 33545bb commit f6c8d34
Showing 1 changed file with 5 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,24 +55,24 @@ trait GraphXGraphOps[Rdf <: SparkGraphX{ type Blah = Rdf }]
}

def makeGraph(triples: RDD[Rdf#Triple]): Rdf#Graph = {
val spo: RDD[(Rdf#Node, (Rdf#URI, Rdf#Node))] = triples.map {
case Triple(s, p, o) => (s, (p, o))
val spo: RDD[(Rdf#Node, (Rdf#URI, Rdf#Node))] = triples.map(fromTriple).map {
case (s, p, o) => (s, (p, o))
}

val vertexIDs = spo.flatMap {
case (s: Rdf#Node, (p: Rdf#URI, o: Rdf#Node)) =>
case (s, (p, o)) =>
Seq(s, p.asInstanceOf[Rdf#Node], o)
}.zipWithUniqueId()

val vertices: RDD[(VertexId, Rdf#Node)] = vertexIDs.map(v => (v._2, v._1))

val subjectMappedEdges = spo.join(vertexIDs).map {
case (s: Rdf#Node, ((p: Rdf#URI, o: Rdf#Node), sid: Long)) =>
case (s, ((p, o), sid)) =>
(o, (sid, p))
}

val subjectObjectMappedEdges: RDD[Edge[Rdf#URI]] = subjectMappedEdges.join(vertexIDs).map {
case (o: Rdf#Node, ((sid: Long, p: Rdf#URI), oid: Long)) =>
case (o, ((sid, p), oid)) =>
Edge(sid, oid, p)
}

Expand Down

0 comments on commit f6c8d34

Please sign in to comment.