# Chapter 4: Transforming and Shaping Up Graphs to Your Needs


- Use property operators to modify vertex or edge properties
- Use structural operators to modify the shape of a graph
- Join additional RDD collections with a property graph

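## Section 1: Transforming the vertex and edge attributes

- GraphX provides three property operators, mapVertices, mapEdges, and mapTriplets. Each returns a new graph with the same structure but transformed attributes.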
```
class Graph[VD, ED] {
    def mapVertices[VD2](mapFun: (VertexId, VD) => VD2): Graph[VD2, ED]
    def mapEdges[ED2](mapFun: Edge[ED] => ED2): Graph[VD, ED2]
    def mapTriplets[ED2](mapFun: EdgeTriplet[VD, ED] => ED2):  Graph[VD, ED2]
}
```

- Each of these operators takes a user-defined mapping function mapFun that does one of the following:

    - For mapVertices, mapFun takes a pair of (VertexId, VD) as input and returns a transformed vertex attribute of type VD2.
    - For mapEdges, mapFun takes an Edge object as input and returns a transformed edge attribute of type ED2.
    - For mapTriplets, mapFun takes an EdgeTriplet object as input and returns a transformed edge attribute of type ED2.
    
### mapVertices

```
case class Person(first: String, last: String, age: Int)
case class Link(relationship: String, duration: Float)

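// people: RDD[(VertexId, Person)] and links: RDD[Edge[Link]] are assumed to exist
val inputGraph: Graph[Person, Link] = Graph(people, links)

// Replace each Person attribute with the person's full name
val outputGraph: Graph[String, Link] = inputGraph.mapVertices((_, person) => person.first + " " + person.last)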
```

### mapEdges
```
// mapFun receives an Edge[Link], so the Link attribute is read through .attr
val outputGraph: Graph[Person, String] = inputGraph.mapEdges(link => link.attr.relationship)
```

### mapTriplets
```
// age is an Int and duration is a Float, so each difference is a Float
val outputGraph: Graph[Person, (Float, Float)] = inputGraph.mapTriplets(t =>
    (t.srcAttr.age - t.attr.duration, t.dstAttr.age - t.attr.duration)
)
```
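
The three snippets above depend on `people` and `links` collections that are not defined in these notes. The following is a minimal runnable sketch under stated assumptions: the sample people and the single link are hypothetical, and `SparkContext.getOrCreate` is used so the block reuses the `sc` of a running spark-shell or starts a local context otherwise.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

case class Person(first: String, last: String, age: Int)
case class Link(relationship: String, duration: Float)

// Reuses the running context in spark-shell, or starts a local one otherwise
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("property-operators-sketch"))

// Hypothetical sample data: two people connected by one link
val people = sc.parallelize(Seq(
  (1L, Person("Jane", "Doe", 36)),
  (2L, Person("John", "Smith", 41))))
val links = sc.parallelize(Seq(Edge(1L, 2L, Link("colleague", 3.5f))))
val inputGraph: Graph[Person, Link] = Graph(people, links)

// mapVertices: the vertex attribute changes from Person to String
val names: Graph[String, Link] =
  inputGraph.mapVertices((_, p) => p.first + " " + p.last)

// mapEdges: mapFun receives the whole Edge, so the Link is read through .attr
val relations: Graph[Person, String] =
  inputGraph.mapEdges(e => e.attr.relationship)

// mapTriplets: mapFun also sees the attributes of both endpoints
val agesAtStart: Graph[Person, (Float, Float)] =
  inputGraph.mapTriplets(t =>
    (t.srcAttr.age - t.attr.duration, t.dstAttr.age - t.attr.duration))

// Bring the small results back to the driver for inspection
val nameById = names.vertices.collect().toMap
val relationList = relations.edges.collect().map(_.attr).toList
val ageList = agesAtStart.edges.collect().map(_.attr).toList
```

In all three cases the set of vertices and edges is unchanged; only the attribute types differ between the input and output graphs.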



## Section 2: Modifying graph structures

- The GraphX library also comes with four useful methods for changing the structure of graphs.
```
class Graph[VD, ED] {
    def reverse: Graph[VD, ED]
    
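    // Both predicates are optional and default to accepting everything
    def subgraph(epred: EdgeTriplet[VD,ED] => Boolean = (x => true), vpred: (VertexId, VD) => Boolean = ((v, d) => true)): Graph[VD, ED]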
    
    def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
    
    def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
}
```

### The reverse operator
- The reverse operator returns a new graph with all edge directions reversed. Vertex and edge attributes are left unchanged.
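
As a small illustration of reverse, the sketch below builds a hypothetical two-edge graph and compares the edge endpoints before and after the call. The edge data and the default vertex value are made up for the example.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Reuses the running context in spark-shell, or starts a local one otherwise
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("reverse-sketch"))

// Hypothetical "follows" relationships: 1 -> 2 -> 3
val follows = sc.parallelize(Seq(
  Edge(1L, 2L, "follows"),
  Edge(2L, 3L, "follows")))
val graph = Graph.fromEdges(follows, defaultValue = "user")

val reversed = graph.reverse

// Collect (source, destination) pairs on the driver
val before = graph.edges.collect().map(e => (e.srcId, e.dstId)).toSet
val after = reversed.edges.collect().map(e => (e.srcId, e.dstId)).toSet
```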

### The subgraph operator
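- The subgraph operator is useful for filtering graphs.
- It takes two predicate functions as arguments, each returning a Boolean value:
     - The first predicate, epred, takes an EdgeTriplet and returns true when the triplet satisfies the condition.
     - The second predicate, vpred, takes a pair of (VertexId, VD) and returns true when the vertex satisfies the condition.
- The resulting graph keeps only the vertices that satisfy vpred and the edges that satisfy epred and connect two retained vertices.
     
- For example, subgraph can help answer the question "Which people in my friends' list of friends are not yet my friends?":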

```
// Given a social network
type Name = String
case class Person(name: Name, friends: List[Name])
val socialNetwork: Graph[Person, Int] = ...

// that I am part of (myName and myFriends are placeholders for my own data)
val me = Person(myName, myFriends)

// I want to know my friends' friends that are not yet my friends
val potentialFriends = socialNetwork.subgraph(vpred = (_, p: Person) => !(me.friends contains p.name))
```
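
Because the social network above is elided, here is a separate minimal sketch of subgraph on a hypothetical three-vertex graph. It shows the effect of passing only vpred versus passing both predicates; the names, ages, and edge labels are invented for the example.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Reuses the running context in spark-shell, or starts a local one otherwise
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("subgraph-sketch"))

// Hypothetical users: (name, age)
val users = sc.parallelize(Seq(
  (1L, ("alice", 34)),
  (2L, ("bob", 15)),
  (3L, ("carol", 27))))
val rels = sc.parallelize(Seq(
  Edge(1L, 2L, "friend"),
  Edge(1L, 3L, "friend"),
  Edge(3L, 1L, "blocked")))
val graph = Graph(users, rels)

// Vertex predicate only: edges touching a removed vertex disappear as well
val adults = graph.subgraph(vpred = (_, attr) => attr._2 >= 18)

// Both predicates: additionally keep only the "friend" edges
val adultFriends = graph.subgraph(
  epred = t => t.attr == "friend",
  vpred = (_, attr) => attr._2 >= 18)

val adultIds = adults.vertices.collect().map(_._1).toSet
val adultEdges = adults.edges.collect().map(e => (e.srcId, e.dstId, e.attr)).toSet
val adultFriendEdges =
  adultFriends.edges.collect().map(e => (e.srcId, e.dstId, e.attr)).toSet
```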

### The mask operator
- The mask operator also filters the graph on which it is invoked.
- The expression graph.mask(anotherGraph) returns the subgraph of graph that contains only the vertices and edges also found in anotherGraph. The attributes of the result come from graph, not from anotherGraph.

```
// Run Connected Components
val ccGraph = graph.connectedComponents()

// Remove vertices with missing attribute values and the edges connected to them
val validGraph = graph.subgraph(vpred = (_, attr) => attr.info != "NA")

// Restrict the resulting components to the valid subgraph
val validCCGraph = ccGraph.mask(validGraph)
```
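
The snippet above refers to a `graph` whose attributes have an `info` field that is not defined in these notes. The following sketch reproduces the same three steps on a hypothetical four-vertex chain whose attributes are plain strings, with "NA" marking a missing value.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Reuses the running context in spark-shell, or starts a local one otherwise
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("mask-sketch"))

// Hypothetical chain 1 -> 2 -> 3 -> 4, where vertex 2 has a missing attribute
val vertices = sc.parallelize(Seq((1L, "a"), (2L, "NA"), (3L, "c"), (4L, "d")))
val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(3L, 4L, 1)))
val graph = Graph(vertices, edges)

// Components are computed on the full graph, so all four vertices share one ID
val ccGraph = graph.connectedComponents()

// Drop vertex 2 and the two edges connected to it
val validGraph = graph.subgraph(vpred = (_, attr) => attr != "NA")

// Keep the component IDs, but only for vertices and edges of validGraph
val validCCGraph = ccGraph.mask(validGraph)

val componentById = validCCGraph.vertices.collect().toMap
val keptEdges = validCCGraph.edges.collect().map(e => (e.srcId, e.dstId)).toSet
```

Note that vertex 1 keeps the component ID it received in the full graph even though, after masking, it is no longer connected to vertices 3 and 4.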

### The groupEdges operator
- The groupEdges operator is another structural operator. It merges duplicate edges between each pair of nodes into a single edge.
- groupEdges requires one function argument named merge, which takes a pair of edge attributes of type ED and combines them into a single attribute value of the same type.
- For correct results, the graph must first be repartitioned with partitionBy so that duplicate edges end up in the same partition.
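
A minimal sketch of groupEdges, assuming a hypothetical multigraph in which every edge carries a weight of 1, so that merging with addition counts the parallel edges:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Reuses the running context in spark-shell, or starts a local one otherwise
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("groupEdges-sketch"))

// Hypothetical call log: three calls from 1 to 2, one call from 2 to 3
val calls = sc.parallelize(Seq(
  Edge(1L, 2L, 1), Edge(1L, 2L, 1), Edge(1L, 2L, 1), Edge(2L, 3L, 1)))
val multigraph = Graph.fromEdges(calls, defaultValue = "phone")

// partitionBy colocates parallel edges before they are merged
val weighted = multigraph
  .partitionBy(PartitionStrategy.EdgePartition2D)
  .groupEdges(_ + _)

val weights = weighted.edges.collect()
  .map(e => ((e.srcId, e.dstId), e.attr)).toMap
```

Edges are grouped by direction: an edge from 2 to 1 would not be merged with the edges from 1 to 2.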

## Section 3: Joining graph datasets

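### joinVertices

- joinVertices matches the graph's vertices against the entries of the table RDD and applies map to each match to compute the new vertex attribute. Vertices without a matching entry keep their original attribute, and the attribute type VD cannot change.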
```
def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD): Graph[VD, ED]
```

### outerJoinVertices

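- outerJoinVertices is a more general method than joinVertices: map is applied to every vertex, receiving None when the vertex has no entry in the table RDD, and it may change the vertex attribute type to VD2.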
```
def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2): Graph[VD2, ED]
```

### Example – Hollywood movie graph

- Start the Spark shell:

```
bin/spark-shell  --driver-memory 2g  --master local[4]
```

```
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Build a small graph of movie actors and actresses
val actors: RDD[(VertexId, String)] = sc.parallelize(List(
(1L, "George Clooney"),
(2L, "Julia Stiles"),
(3L, "Will Smith"), 
(4L, "Matt Damon"),
(5L, "Salma Hayek")))
    
val movies: RDD[Edge[String]] = sc.parallelize(List(
Edge(1L, 4L,"Ocean's Eleven"),
Edge(2L, 4L, "Bourne Ultimatum"),
Edge(3L, 5L, "Wild Wild West"),
Edge(1L, 5L, "From Dusk Till Dawn"),
Edge(3L, 4L, "The Legend of Bagger Vance")))
                                               
val movieGraph = Graph(actors, movies) 
                                               
movieGraph.triplets.foreach(t => println(
  t.srcAttr + " & " + t.dstAttr + " appeared in " + t.attr)
)

movieGraph.vertices.foreach(println)
```

```
// Suppose we have access to a dataset of actor biographies.
// Quickly load one such dataset into a vertex RDD:

case class Biography(birthname: String, hometown: String)

val bio: RDD[(VertexId, Biography)] = sc.parallelize(List(
(2L, Biography("Julia O'Hara Stiles", "NY City, NY, USA")),
(3L, Biography("Willard Christopher Smith Jr.", "Philadelphia, PA, USA")),
(4L, Biography("Matthew Paige Damon", "Boston, MA, USA")),
(5L, Biography("Salma Valgarma Hayek-Jimenez", "Coatzacoalcos, Veracruz, Mexico")),
(6L, Biography("José Antonio Domínguez Banderas", "Málaga, Andalucía, Spain")),
(7L, Biography("Paul William Walker IV", "Glendale, CA, USA"))
))
```

```
// Use joinVertices to join this information to our movie graph.
def appendHometown(id: VertexId, name: String, bio: Biography) : String = {
    name + ":"+ bio.hometown
}
    
val movieJoinedGraph = movieGraph.joinVertices(bio)(appendHometown)
movieJoinedGraph.vertices.foreach(println)
```

```
// Let's use outerJoinVertices to see the difference
val movieOuterJoinedGraph = movieGraph.outerJoinVertices(bio)((_,name, bio) => (name,bio))
movieOuterJoinedGraph.vertices.foreach(println)
```

```
// We can use the getOrElse method defined on Option[T] and
// provide a default new attribute value for the vertices that are not present in the
// passed vertex RDD:

val movieOuterJoinedGraph = movieGraph.outerJoinVertices(bio)(
    (_, name, bio) => (name,bio.getOrElse(Biography("NA","NA")))
)
movieOuterJoinedGraph.vertices.foreach(println)
```


```
// It is possible to create a new return type for the joined vertices.
// We can create a type Actor to generate a new graph of type
// Graph[Actor,String] as follows:

case class Actor(name: String, birthname: String, hometown: String)
val movieOuterJoinedGraph = movieGraph.outerJoinVertices(bio)((_,name, b) => b match {
    case Some(bio) => Actor(name, bio.birthname, bio.hometown)
    case None => Actor(name, "", "")
})
movieOuterJoinedGraph.vertices.foreach(println)
```


## Section 4: Data operations on VertexRDD and EdgeRDD

- This section introduces operations that transform VertexRDD and EdgeRDD collections.
- These collection types are subtypes of RDD[(VertexId, VD)] and RDD[Edge[ED]], respectively.

### Mapping VertexRDD and EdgeRDD

```
def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
def mapValues[VD2](map: (VertexId, VD) => VD2): VertexRDD[VD2]

def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2]
```

```
val actorsBio = movieJoinedGraph.vertices
actorsBio.foreach(println)

actorsBio.mapValues(s => s.split(':')(0)).foreach(println)
actorsBio.mapValues((vid,s) => s.split(':')(0)).foreach(println)
```

### Filtering VertexRDDs

- Using the filter method, we can also filter VertexRDD collections. The predicate receives each vertex as a (VertexId, VD) pair, and the result is again a VertexRDD:
```
def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD]
```

- The diff operation compares a VertexRDD with another VertexRDD passed as input. According to the VertexRDD API documentation, for each vertex ID present in both sets it returns only the vertices whose values differ, keeping the value from the input set:
```
def diff(other: VertexRDD[VD]): VertexRDD[VD]
```
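
As an illustration of filter, the following sketch keeps only some vertices of a hypothetical graph whose vertex attribute is an age. The data is invented, and the pattern-matching form `{ case (_, age) => ... }` is used so the predicate accepts each vertex as a pair.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Reuses the running context in spark-shell, or starts a local one otherwise
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("vertexrdd-filter-sketch"))

// Hypothetical vertices whose attribute is an age
val ages = sc.parallelize(Seq((1L, 34), (2L, 15), (3L, 27)))
val knows = sc.parallelize(Seq(Edge(1L, 2L, "knows"), Edge(2L, 3L, "knows")))
val graph = Graph(ages, knows)

// The result is still a VertexRDD, not a plain RDD of pairs
val adults: VertexRDD[Int] =
  graph.vertices.filter { case (_, age) => age >= 18 }

val adultAges = adults.collect().toMap
```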

### Joining VertexRDDs

```
def innerJoin[U, VD2](other: RDD[(VertexId, U)])(f: (VertexId, VD, U) => VD2): VertexRDD[VD2]

def leftJoin[U, VD2](other: RDD[(VertexId, U)])(f: (VertexId, VD, Option[U]) => VD2): VertexRDD[VD2]
```

```
val actors = movieGraph.vertices
actors.innerJoin(bio)(
    (vid, name, b) => name + " is from " + b.hometown
).foreach(println)

actors.leftJoin(bio)((vid, name, b) => b match {
    case Some(bio) => name + " is from " + bio.hometown
    case None => name + "'s hometown is unknown"
}).foreach( println )
```

### Joining EdgeRDDs

```
def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]
```
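
The notes give no example for joining EdgeRDDs, so here is a minimal sketch. It assumes both edge sets come from the same graph and therefore share the same partitioning, which this join expects; the edge labels are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Reuses the running context in spark-shell, or starts a local one otherwise
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("edgerdd-join-sketch"))

// Hypothetical labelled edges
val graph = Graph.fromEdges(
  sc.parallelize(Seq(Edge(1L, 2L, "ab"), Edge(2L, 3L, "cde"))),
  defaultValue = 0)

val labels: EdgeRDD[String] = graph.edges

// A second EdgeRDD derived from the first, so the partitioning is identical
val lengths: EdgeRDD[Int] = labels.mapValues(e => e.attr.length)

// Combine the attributes of edges that share source and destination IDs
val joined: EdgeRDD[(String, Int)] =
  labels.innerJoin(lengths)((src, dst, label, len) => (label, len))

val result = joined.collect().map(e => (e.srcId, e.dstId, e.attr)).toSet
```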

### Reversing edge directions

```
def reverse: EdgeRDD[ED]
```

```
val movies = movieGraph.edges
movies.foreach(println)

val bidirectedGraph = Graph(actors, movies union movies.reverse)
bidirectedGraph.edges.foreach(println)
```

### Collecting neighboring information

```
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
```

- The edgeDirection parameter is an EdgeDirection, which defines four values:
    - EdgeDirection.In: each vertex collects the IDs or attributes of only the neighbors that have an edge pointing to it
    - EdgeDirection.Out: each vertex collects the IDs or attributes of only the neighbors that it links to
    - EdgeDirection.Either: each vertex collects the IDs or attributes of all its neighbors
    - EdgeDirection.Both: each vertex collects the IDs or attributes of the neighbors with which it has both an incoming edge and an outgoing one
- In recent Spark releases, these two methods reject EdgeDirection.Both with an exception and suggest EdgeDirection.Either instead.
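
To make the edge directions concrete, the sketch below calls collectNeighborIds on a hypothetical four-vertex graph and inspects what vertex 2 collects in each case. The neighbor arrays are converted to sets because their order is not something the example relies on.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Reuses the running context in spark-shell, or starts a local one otherwise
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("neighbors-sketch"))

// Hypothetical graph: 1 -> 2, 3 -> 2, 2 -> 4
val graph = Graph.fromEdges(
  sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(3L, 2L, 1), Edge(2L, 4L, 1))),
  defaultValue = "v")

// Helper (introduced for this example) that collects neighbor IDs as sets
def neighborSets(direction: EdgeDirection): Map[VertexId, Set[VertexId]] =
  graph.collectNeighborIds(direction).collect()
    .map { case (id, nbrs) => (id, nbrs.toSet) }.toMap

val inNbrs = neighborSets(EdgeDirection.In)
val outNbrs = neighborSets(EdgeDirection.Out)
val eitherNbrs = neighborSets(EdgeDirection.Either)
```

Vertices with no neighbor in the requested direction still appear in the result, with an empty array.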


### Example – from food network to flavor pairing

```
import scala.io.Source
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Peek at the first lines of each input file
Source.fromFile("./data/ingr_info.tsv").getLines().take(7).foreach(println)
Source.fromFile("./data/comp_info.tsv").getLines().take(7).foreach(println)
Source.fromFile("./data/ingr_comp.tsv").getLines().take(7).foreach(println)

// Ingredients and compounds share a common vertex attribute type
class FNNode(val name: String) extends Serializable
case class Ingredient(override val name: String, category: String) extends FNNode(name)
case class Compound(override val name: String, cas: String) extends FNNode(name)

// Load the ingredient vertices, skipping comment lines
val ingredients: RDD[(VertexId, FNNode)] =
sc.textFile("./data/ingr_info.tsv").
    filter(! _.startsWith("#")).
    map {line =>
        val row = line split '\t'
        (row(0).toLong, Ingredient(row(1), row(2)))
}

// Load the compound vertices; IDs are offset by 10000 so they do not collide with ingredient IDs
val compounds: RDD[(VertexId, FNNode)] =
sc.textFile("./data/comp_info.tsv").
    filter(! _.startsWith("#")).
    map {line =>
        val row = line split '\t'
        (10000L + row(0).toLong, Compound(row(1), row(2)))
}

// One edge from each ingredient to each compound it contains
val links: RDD[Edge[Int]] =
sc.textFile("./data/ingr_comp.tsv").
    filter(! _.startsWith("#")).
    map {line =>
        val row = line split '\t'
        Edge(row(0).toLong, 10000L + row(1).toLong, 1)
}

val nodes = ingredients ++ compounds

val foodNetwork = Graph(nodes, links)

// For each compound, collect the IDs of the ingredients that link to it
val similarIngr: RDD[(VertexId, Array[VertexId])] =
    foodNetwork.collectNeighborIds(EdgeDirection.In)

// Create an edge between every ordered pair of ingredients sharing a compound
def pairIngredients(ingPerComp: (VertexId, Array[VertexId])): Seq[Edge[Int]] =
    for {
        x <- ingPerComp._2
        y <- ingPerComp._2
        if x != y
    } yield Edge(x,y,1)

val flavorPairsRDD: RDD[Edge[Int]] = similarIngr flatMap pairIngredients

val flavorNetwork = Graph(ingredients, flavorPairsRDD).cache

flavorNetwork.triplets.take(20).foreach(println)

// Merge parallel edges so that each edge weight counts the shared compounds
val flavorWeightedNetwork =
    flavorNetwork.partitionBy(PartitionStrategy.EdgePartition2D).groupEdges((x,y) => x+y)

flavorWeightedNetwork.triplets.sortBy(t => t.attr, false).take(20).foreach(
    t => println(
        t.srcAttr.name + " and " + t.dstAttr.name + " share " + t.attr + " compounds."
    )
)
```