# Lab 2 - Social Network Analysis with GraphX
The following steps demonstrate how to use GraphX to build a base graph and apply operators to it.

Throughout this assignment we will use the following property graph, which models a small social network: users and their ages are vertices, and likes are directed edges.

<img src="figs/social_graph.png" width="500">

We begin by creating the property graph from arrays of vertices and edges. Later we will demonstrate how to load real data. Here we use the `Edge` class. Edges have a `srcId` and a `dstId` corresponding to the source and destination vertex identifiers. In addition, the `Edge` class has an `attr` member that stores the edge property (in this case the number of likes). Use `sc.parallelize` to construct RDDs from the `vertexArray` and `edgeArray` variables, and then build a property graph. The basic property graph constructor takes an RDD of vertices (with type `RDD[(VertexId, V)]`, where `VertexId` is an alias for `Long`) and an RDD of edges (with type `RDD[Edge[E]]`) and builds a graph (with type `Graph[V, E]`).

In [2]:
// TODO: Replace <FILL IN> with appropriate code
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
 
val vertexArray = Array(
  (1L, ("Alice", 28)),
  (2L, ("Bob", 27)),
  (3L, ("Charlie", 65)),
  (4L, ("David", 42)),
  (5L, ("Ed", 55)),
  (6L, ("Fran", 50))
  )
val edgeArray = Array(
  Edge(2L, 1L, 7),
  Edge(2L, 4L, 2),
  Edge(3L, 2L, 4),
  Edge(3L, 6L, 3),
  Edge(4L, 1L, 1),
  Edge(5L, 2L, 2),
  Edge(5L, 3L, 8),
  Edge(5L, 6L, 3)
  )

val vertexRDD: RDD[(Long, (String, Int))] = sc.parallelize(vertexArray)
val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)
val graph: Graph[(String, Int), Int] = Graph(vertexRDD, edgeRDD)

vertexArray = Array((1,(Alice,28)), (2,(Bob,27)), (3,(Charlie,65)), (4,(David,42)), (5,(Ed,55)), (6,(Fran,50)))
edgeArray = Array(Edge(2,1,7), Edge(2,4,2), Edge(3,2,4), Edge(3,6,3), Edge(4,1,1), Edge(5,2,2), Edge(5,3,8), Edge(5,6,3))
vertexRDD = ParallelCollectionRDD[14] at parallelize at <console>:59
edgeRDD = ParallelCollectionRDD[15] at parallelize at <console>:60
graph = org.apache.spark.graphx.impl.GraphImpl@25fea379


org.apache.spark.graphx.impl.GraphImpl@25fea379

In many cases we will want to extract the vertex and edge RDD views of a graph. For this purpose, the graph class contains members (`graph.vertices` and `graph.edges`) to access the vertices and edges of the graph. While these members extend `RDD[(VertexId, V)]` and `RDD[Edge[E]]`, they are actually backed by optimized representations that leverage the internal GraphX representation of graph data. Below, use `graph.vertices` to display the names of the users that are at least 30 years old. The output should contain the following lines (the order may vary):
```
David is 42
Fran is 50
Ed is 55
Charlie is 65
```

In [3]:
graph.vertices.filter {case (id, (name, age)) => age >= 30}.collect.foreach(v => println(s"${v._2._1} is ${v._2._2}"))

David is 42
Ed is 55
Fran is 50
Charlie is 65
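
The `graph.edges` view can be queried in the same way as `graph.vertices`. The following is a minimal sketch, not part of the original lab: it rebuilds the same small graph so that it runs on its own, and lists the edges that carry more than 3 likes. The names `ctx`, `g`, and `heavyEdges`, the local master setting, and the threshold of 3 are assumptions made for illustration; inside the notebook you could use the existing `sc` and `graph` instead.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Assumption: reuse the running context if there is one, otherwise start a local one.
val ctx = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("lab2-edges-sketch"))

// Same vertices and edges as in the lab's vertexArray and edgeArray.
val vertices = ctx.parallelize(Seq(
  (1L, ("Alice", 28)), (2L, ("Bob", 27)), (3L, ("Charlie", 65)),
  (4L, ("David", 42)), (5L, ("Ed", 55)), (6L, ("Fran", 50))))
val edges = ctx.parallelize(Seq(
  Edge(2L, 1L, 7), Edge(2L, 4L, 2), Edge(3L, 2L, 4), Edge(3L, 6L, 3),
  Edge(4L, 1L, 1), Edge(5L, 2L, 2), Edge(5L, 3L, 8), Edge(5L, 6L, 3)))
val g: Graph[(String, Int), Int] = Graph(vertices, edges)

// Filter on the edge attribute, copy the fields into plain tuples,
// bring them to the driver, and sort so the printed order is stable.
val heavyEdges = g.edges
  .filter(e => e.attr > 3)
  .map(e => (e.srcId, e.dstId, e.attr))
  .collect
  .sorted

heavyEdges.foreach { case (src, dst, likes) =>
  println(s"$src -> $dst ($likes likes)")
}
```

Note that the edge view only exposes vertex ids and the edge attribute; to print names instead of ids you need the triplet view introduced next.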


Now, use the `graph.triplets` view to display who likes whom. The output should look like:
```
Bob likes Alice
Bob likes David
Charlie likes Bob
Charlie likes Fran
David likes Alice
Ed likes Bob
Ed likes Charlie
Ed likes Fran
```

In [4]:
// TODO: Replace <FILL IN> with appropriate code
// Build the strings on the executors, then collect them so they print on the driver
graph.triplets
  .map(triplet => s"${triplet.srcAttr._1} likes ${triplet.dstAttr._1}")
  .collect
  .foreach(println)

If someone likes someone else more than 5 times, then that relationship is getting pretty serious. For extra credit, find the lovers.

In [5]:
// TODO: Replace <FILL IN> with appropriate code
graph.triplets
  .filter(triplet => triplet.attr > 5) // "more than 5 times" is a strict inequality
  .map(triplet => s"${triplet.srcAttr._1} loves ${triplet.dstAttr._1}")
  .collect
  .foreach(println)

You can compute the in-degree of each vertex using the `graph.inDegrees` operator, which returns a `VertexRDD[Int]`.

In [6]:
val inDegrees: VertexRDD[Int] = graph.inDegrees

inDegrees = VertexRDDImpl[39] at RDD at VertexRDD.scala:57


VertexRDDImpl[39] at RDD at VertexRDD.scala:57
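
The `VertexRDD` returned by `inDegrees` only contains vertices that have at least one incoming edge, which is why the join in the next step needs a default value. The following is a minimal sketch that makes this visible by collecting the in-degrees into a map. It rebuilds the lab's graph so that it runs on its own; `ctx`, `g`, and `inDegreeById` are illustrative names, and the local master setting is an assumption.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Assumption: reuse the running context if there is one, otherwise start a local one.
val ctx = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("lab2-indegree-sketch"))

// Same vertices and edges as in the lab's vertexArray and edgeArray.
val vertices = ctx.parallelize(Seq(
  (1L, ("Alice", 28)), (2L, ("Bob", 27)), (3L, ("Charlie", 65)),
  (4L, ("David", 42)), (5L, ("Ed", 55)), (6L, ("Fran", 50))))
val edges = ctx.parallelize(Seq(
  Edge(2L, 1L, 7), Edge(2L, 4L, 2), Edge(3L, 2L, 4), Edge(3L, 6L, 3),
  Edge(4L, 1L, 1), Edge(5L, 2L, 2), Edge(5L, 3L, 8), Edge(5L, 6L, 3)))
val g: Graph[(String, Int), Int] = Graph(vertices, edges)

// Collect the (vertexId, inDegree) pairs to the driver as a Map.
val inDegreeById: Map[VertexId, Int] = g.inDegrees.collect.toMap

inDegreeById.toSeq.sortBy(_._1).foreach { case (id, deg) =>
  println(s"Vertex $id has in-degree $deg")
}

// Vertex 5 (Ed) has no incoming edges, so it has no entry at all.
println(s"Entry for vertex 5 present: ${inDegreeById.contains(5L)}")
```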

Now, let's incorporate the in- and out-degree of each vertex into the vertex property. To do this, we first define a `User` class to better organize the vertex property and build a new graph with the user property. We initialize each vertex with an in- and out-degree of 0. Then, we join the in- and out-degree information with each vertex, building the new vertex property. Here we use the `outerJoinVertices` method of `Graph`, which takes two argument lists: (i) an RDD of vertex values, and (ii) a function from the id, the attribute, and the optional matching value in the RDD to a new vertex value. The `outerJoinVertices` method has the following type signature:
```scala
def outerJoinVertices[U, VD2](other: RDD[(VertexId, U)])
    (mapFunc: (VertexId, VD, Option[U]) => VD2): Graph[VD2, ED]
```

In [10]:
// TODO: Replace <FILL IN> with appropriate code
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Define a class to more clearly model the user property
case class User(name: String, age: Int, inDeg: Int, outDeg: Int)

// Create a user Graph
val initialUserGraph: Graph[User, Int] = graph.mapVertices{ case (id, (name, age)) => User(name, age, 0, 0) }

// Fill in the degree information
val userGraph = initialUserGraph.outerJoinVertices(initialUserGraph.inDegrees) {
  case (id, u, inDegOpt) => User(u.name, u.age, inDegOpt.getOrElse(0),u.outDeg)
}.outerJoinVertices(initialUserGraph.outDegrees) {
  case (id, u, outDegOpt) => User(u.name, u.age, u.inDeg, outDegOpt.getOrElse(0))
}


defined class User
initialUserGraph = org.apache.spark.graphx.impl.GraphImpl@641e9fec
userGraph = org.apache.spark.graphx.impl.GraphImpl@38e37fe0


org.apache.spark.graphx.impl.GraphImpl@38e37fe0

Using the `userGraph`, print the number of people who like each user (the order of the lines may vary):
```
User 1 is called Alice and is liked by 2 people.
User 2 is called Bob and is liked by 2 people.
User 3 is called Charlie and is liked by 1 people.
User 4 is called David and is liked by 1 people.
User 5 is called Ed and is liked by 0 people.
User 6 is called Fran and is liked by 2 people.
```

In [11]:
// TODO: Replace <FILL IN> with appropriate code
for ((id, property) <- userGraph.vertices.collect) {
     println(s"User $id is called ${property.name} and is liked by ${property.inDeg} people.")
}

User 4 is called David and is liked by 1 people.
User 1 is called Alice and is liked by 2 people.
User 5 is called Ed and is liked by 0 people.
User 6 is called Fran and is liked by 2 people.
User 2 is called Bob and is liked by 2 people.
User 3 is called Charlie and is liked by 1 people.


Print the names of the users who are liked by the same number of people they like.

In [12]:
// TODO: Replace <FILL IN> with appropriate code
userGraph.vertices.filter {
  case (id, u) => u.inDeg == u.outDeg
}.collect.foreach {
  case (id, property) => println(property.name)
}

David
Bob


Suppose we want to find the oldest follower of each user, using the above property graph. The `aggregateMessages` operator allows us to do this. This operator applies a user-defined `sendMsg` function to each edge triplet in the graph and then uses the `mergeMsg` function to aggregate those messages at their destination vertex.
```scala
class Graph[VD, ED] {
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[Msg]
}
```
Below, you can find the oldest follower of each user by sending a message containing the name and age of each follower and aggregating the messages by keeping the message from the older follower.

In [21]:
// TODO: Replace <FILL IN> with appropriate code
// Find the oldest follower for each user
val oldestFollower: VertexRDD[(String, Int)] = userGraph.aggregateMessages[(String, Int)](
  // sendMsg
  triplet => triplet.sendToDst((triplet.srcAttr.name, triplet.srcAttr.age)),
  // mergeMsg
  (a, b) => if (a._2 > b._2) a else b
)


oldestFollower = VertexRDDImpl[109] at RDD at VertexRDD.scala:57


VertexRDDImpl[109] at RDD at VertexRDD.scala:57
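
The same `sendMsg`/`mergeMsg` pattern works for any associative aggregation. The following is a minimal sketch, not part of the original lab, that sums the number of likes each user receives. Each edge sends its `attr` to the destination vertex, and the messages are merged by addition. Because only the edge attribute is read, the sketch passes `TripletFields.EdgeOnly` as the third argument. The names `ctx`, `g`, `likesReceived`, and `likesById` are illustrative, and the graph is rebuilt so that the block runs on its own.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Assumption: reuse the running context if there is one, otherwise start a local one.
val ctx = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("lab2-aggregate-sketch"))

// Same vertices and edges as in the lab's vertexArray and edgeArray.
val vertices = ctx.parallelize(Seq(
  (1L, ("Alice", 28)), (2L, ("Bob", 27)), (3L, ("Charlie", 65)),
  (4L, ("David", 42)), (5L, ("Ed", 55)), (6L, ("Fran", 50))))
val edges = ctx.parallelize(Seq(
  Edge(2L, 1L, 7), Edge(2L, 4L, 2), Edge(3L, 2L, 4), Edge(3L, 6L, 3),
  Edge(4L, 1L, 1), Edge(5L, 2L, 2), Edge(5L, 3L, 8), Edge(5L, 6L, 3)))
val g: Graph[(String, Int), Int] = Graph(vertices, edges)

val likesReceived: VertexRDD[Int] = g.aggregateMessages[Int](
  // sendMsg: forward the like count on this edge to the user being liked
  ec => ec.sendToDst(ec.attr),
  // mergeMsg: add up all like counts arriving at the same vertex
  _ + _,
  // only the edge attribute is needed, not the vertex attributes
  TripletFields.EdgeOnly)

val likesById: Map[VertexId, Int] = likesReceived.collect.toMap
likesById.toSeq.sortBy(_._1).foreach { case (id, likes) =>
  println(s"User $id received $likes likes")
}
```

As with `inDegrees`, vertices that receive no message (here vertex 5, Ed) do not appear in the result.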

Display the oldest follower of each user as shown below. Note that some users may have no messages.
```
David is the oldest follower of Alice.
Charlie is the oldest follower of Bob.
Ed is the oldest follower of Charlie.
Bob is the oldest follower of David.
Ed does not have any followers.
Charlie is the oldest follower of Fran.
```

In [22]:
userGraph.vertices.leftJoin(oldestFollower) { (id, user, optOldestFollower) =>
  optOldestFollower match {
    case None => s"${user.name} does not have any followers."
    case Some((name, age)) => s"${name} is the oldest follower of ${user.name}."
  }
}.collect.foreach { case (id, str) => println(str) }

Bob is the oldest follower of David.
David is the oldest follower of Alice.
Ed does not have any followers.
Charlie is the oldest follower of Fran.
Charlie is the oldest follower of Bob.
Ed is the oldest follower of Charlie.


Now, find the average age of the followers of each user.

In [26]:
// TODO: Replace <FILL IN> with appropriate code
val averageAge: VertexRDD[Double] = userGraph.aggregateMessages[(Int, Double)](
  // sendMsg: each follower sends (1, own age) to the user they like
  triplet => triplet.sendToDst((1, triplet.srcAttr.age.toDouble)),
  // mergeMsg: combine messages into (followerCount, sumOfAges)
  (a, b) => (a._1 + b._1, a._2 + b._2)
).mapValues((id, p) => p._2 / p._1) // average = sumOfAges / followerCount

// Display the results
userGraph.vertices.leftJoin(averageAge) { (id, user, optAverageAge) =>
  optAverageAge match {
    case None => s"${user.name} does not have any followers."
    case Some(avgAge) => s"The average age of ${user.name}\'s followers is $avgAge."
  }
}.collect.foreach { case (id, str) => println(str) }

The average age of David's followers is 27.0.
The average age of Alice's followers is 34.5.
Ed does not have any followers.
The average age of Fran's followers is 60.0.
The average age of Bob's followers is 60.0.
The average age of Charlie's followers is 55.0.


averageAge = VertexRDDImpl[125] at RDD at VertexRDD.scala:57


VertexRDDImpl[125] at RDD at VertexRDD.scala:57

Suppose you want to study the community structure of users that are 30 or older. To support this type of analysis, GraphX includes the `subgraph` operator, which takes vertex and edge predicates and returns the graph containing only the vertices that satisfy the vertex predicate (evaluate to true) and the edges that satisfy the edge predicate and connect vertices that satisfy the vertex predicate.

In the following example, restrict your graph to the users that are 30 or older, and examine the communities in this restricted graph. Connected components are labeled (numbered) by the lowest vertex id in that component. Notice that by examining the subgraph you have disconnected David from the rest of his community. Moreover, his connections to the rest of the graph are through younger users.

In [30]:
// TODO: Replace <FILL IN> with appropriate code
val olderGraph = userGraph.subgraph(vpred = (id, u) => u.age >= 30)

// compute the connected components
val cc = olderGraph.connectedComponents

// display the component id of each user:
olderGraph.vertices.leftJoin(cc.vertices) {
  case (id, user, comp) => s"${user.name} is in component ${comp.get}"
}.collect.foreach{ case (id, str) => println(str) }

David is in component 4
Ed is in component 3
Fran is in component 3
Charlie is in component 3


olderGraph = org.apache.spark.graphx.impl.GraphImpl@66d95869
cc = org.apache.spark.graphx.impl.GraphImpl@6f64cef8


org.apache.spark.graphx.impl.GraphImpl@6f64cef8
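
The example above only uses the vertex predicate. As a complementary sketch, not part of the original lab, the block below applies the edge predicate instead: it keeps every vertex but drops the edges with fewer than 3 likes, then recomputes the connected components. The threshold of 3 and the names `ctx`, `g`, `strongTies`, and `components` are assumptions made for illustration, and the graph is rebuilt so that the block runs on its own.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Assumption: reuse the running context if there is one, otherwise start a local one.
val ctx = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("lab2-subgraph-sketch"))

// Same vertices and edges as in the lab's vertexArray and edgeArray.
val vertices = ctx.parallelize(Seq(
  (1L, ("Alice", 28)), (2L, ("Bob", 27)), (3L, ("Charlie", 65)),
  (4L, ("David", 42)), (5L, ("Ed", 55)), (6L, ("Fran", 50))))
val edges = ctx.parallelize(Seq(
  Edge(2L, 1L, 7), Edge(2L, 4L, 2), Edge(3L, 2L, 4), Edge(3L, 6L, 3),
  Edge(4L, 1L, 1), Edge(5L, 2L, 2), Edge(5L, 3L, 8), Edge(5L, 6L, 3)))
val g: Graph[(String, Int), Int] = Graph(vertices, edges)

// Keep all vertices (default vpred) but only edges with at least 3 likes.
val strongTies = g.subgraph(epred = triplet => triplet.attr >= 3)

// Label every vertex with the lowest vertex id in its connected component.
val components: Map[VertexId, VertexId] =
  strongTies.connectedComponents().vertices.collect.toMap

components.toSeq.sortBy(_._1).foreach { case (id, comp) =>
  println(s"Vertex $id is in component $comp")
}
```

With this threshold David (vertex 4) again ends up in a component of his own, because both of his edges carry fewer than 3 likes.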