# Activity: Superhero Popularity

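Task: Given a file of superhero networks (each line lists a hero ID followed by the IDs of the heroes they co-appear with), determine which superhero is the most popular, that is, has the most co-appearances.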

In [1]:
spark

Intitializing Scala interpreter ...

Spark Web UI available at http://192.168.1.19:4040
SparkContext available as 'sc' (version = 2.4.5, master = local[*], app id = local-1589051986641)
SparkSession available as 'spark'


res0: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@2d7fd53d


# Import Data

In [2]:
// Hero Names
val names = sc.textFile("../../data/marvel-names.txt")

names: org.apache.spark.rdd.RDD[String] = ../../data/marvel-names.txt MapPartitionsRDD[1] at textFile at <console>:25


In [4]:
// Look at a few entries
names.take(10).foreach(println)

1 "24-HOUR MAN/EMMANUEL"
2 "3-D MAN/CHARLES CHAN"
3 "4-D MAN/MERCURIO"
4 "8-BALL/"
5 "A"
6 "A'YIN"
7 "ABBOTT, JACK"
8 "ABCISSA"
9 "ABEL"
10 "ABOMINATION/EMIL BLO"


In [5]:
// Hero graph
val graphs = sc.textFile("../../data/marvel-graph.txt")

graphs: org.apache.spark.rdd.RDD[String] = ../../data/marvel-graph.txt MapPartitionsRDD[3] at textFile at <console>:26


In [7]:
// Look at a few entries
graphs.take(5).foreach(println)

5988 748 1722 3752 4655 5743 1872 3413 5527 6368 6085 4319 4728 1636 2397 3364 4001 1614 1819 1585 732 2660 3952 2507 3891 2070 2239 2602 612 1352 5447 4548 1596 5488 1605 5517 11 479 2554 2043 17 865 4292 6312 473 534 1479 6375 4456 
5989 4080 4264 4446 3779 2430 2297 6169 3530 3272 4282 6432 2548 4140 185 105 3878 2429 1334 4595 2767 3956 3877 4776 4946 3407 128 269 5775 5121 481 5516 4758 4053 1044 1602 3889 1535 6038 533 3986 
5982 217 595 1194 3308 2940 1815 794 1503 5197 859 5096 6039 2664 651 2244 528 284 1449 1097 1172 1092 108 3405 5204 387 4607 4545 3705 4930 1805 4712 4404 247 4754 4427 1845 536 5795 5978 533 3984 6056 
5983 1165 3836 4361 1282 716 4289 4646 6300 5084 2397 4454 1913 5861 5485 
5980 2731 3712 1587 6084 2472 2546 6313 875 859 323 2664 1469 522 2506 2919 2423 3624 5736 5046 1787 5776 3245 3840 2399 


# Find the Superhero with the most co-appearances

## Helper Functions

The function `parseNames` extracts `hero ID -> hero name` tuples (or None in case of failure).

In [10]:
def parseNames(line: String): Option[(Int, String)] = {
    // Each line looks like: 1 "24-HOUR MAN/EMMANUEL"
    val fields = line.split("\"")

    if (fields.length > 1) {
      // Build the tuple explicitly rather than relying on argument adaptation
      Some((fields(0).trim.toInt, fields(1)))
    } else {
      None // flatMap will just discard None results, and extract data from Some results.
    }
}

parseNames: (line: String)Option[(Int, String)]
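
To see why the quote-based split works, here is a minimal sketch on plain Scala collections (no Spark needed). The function is restated so the snippet is self-contained, and the sample lines are taken from the output above plus one empty line added as an assumption to show the failure case.

```scala
// Restated here so the snippet runs on its own.
def parseNames(line: String): Option[(Int, String)] = {
  val fields = line.split("\"")
  if (fields.length > 1) Some((fields(0).trim.toInt, fields(1))) else None
}

// Two well-formed lines and one empty line (the empty line is a made-up edge case).
val sampleLines = Seq("1 \"24-HOUR MAN/EMMANUEL\"", "", "4 \"8-BALL/\"")

// flatMap keeps the contents of each Some and silently drops each None.
val parsed = sampleLines.flatMap(line => parseNames(line))
parsed.foreach(println)
```

The empty line splits into a single field, so `parseNames` returns `None` and it never reaches the result.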


The method `countCoOccurences` extracts the hero ID and the number of connections from each line.

In [9]:
def countCoOccurences(line: String) = {
    val elements = line.split("\\s+") // Split on runs of whitespace
    // The first element is the hero ID; everything after it is a connection
    (elements(0).toInt, elements.length - 1)
}

countCoOccurences: (line: String)(Int, Int)
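
As a quick check, the sketch below applies the same logic to the `5983` line shown earlier, once as-is and once with a trailing space like the longer lines in the file have. It runs on plain Scala with no Spark; the function is restated only to keep the snippet self-contained.

```scala
// Restated here so the snippet runs on its own.
def countCoOccurences(line: String): (Int, Int) = {
  val elements = line.split("\\s+")
  (elements(0).toInt, elements.length - 1)
}

val line = "5983 1165 3836 4361 1282 716 4289 4646 6300 5084 2397 4454 1913 5861 5485"

val counted = countCoOccurences(line)
// String.split drops trailing empty strings, so a trailing space does not inflate the count.
val countedWithTrailingSpace = countCoOccurences(line + " ")
println(counted)
```

Note that a line with leading whitespace would behave differently: `split` would produce an empty first element and `toInt` would fail, so trimming the line first is a reasonable precaution if the input is not known to be clean.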


## Processing

Build up a `heroID -> name` RDD.

In [17]:
val namesRDD = names.flatMap(parseNames)

namesRDD: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[7] at flatMap at <console>:28


In [18]:
namesRDD.take(5).foreach(println)

(1,24-HOUR MAN/EMMANUEL)
(2,3-D MAN/CHARLES CHAN)
(3,4-D MAN/MERCURIO)
(4,8-BALL/)
(5,A)


Convert the graph lines to a `(heroID, number of connections)` RDD.

In [11]:
val pairings = graphs.map(countCoOccurences)

pairings: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[4] at map at <console>:28


In [12]:
pairings.take(5).foreach(println)

(5988,48)
(5989,40)
(5982,42)
(5983,14)
(5980,24)


A hero's connections can span more than one line, so sum the counts for each hero ID.

In [13]:
val totalFriendsByCharacter = pairings.reduceByKey((x,y) => x + y)

totalFriendsByCharacter: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[5] at reduceByKey at <console>:26
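
What `reduceByKey` does here can be mimicked locally with `groupBy` and `sum`. This is only an illustrative sketch on a plain `Seq` with made-up pairs; it is not how Spark implements the shuffle.

```scala
// (heroID, connections on one line); hero 42 appears on two lines (made-up data).
val samplePairings = Seq((42, 3), (7, 5), (42, 4))

// Local equivalent of samplePairings.reduceByKey((x, y) => x + y):
// group rows by hero ID, then add up the per-line counts.
val totals: Map[Int, Int] =
  samplePairings.groupBy(_._1).map { case (heroID, rows) => (heroID, rows.map(_._2).sum) }

println(totals)
```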


Flip it to `(# of connections, heroID)`.

In [14]:
val flipped = totalFriendsByCharacter.map( x => (x._2, x._1))

flipped: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[6] at map at <console>:26


Find the entry with the maximum number of connections. Tuples are ordered by their first element, which is why we flipped the pairs.

In [15]:
val mostPopular = flipped.max()

mostPopular: (Int, Int) = (1933,859)
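
The flip matters because the default tuple ordering compares the first element and only falls back to the second on a tie. The sketch below shows this on a plain `Seq`; apart from `(859, 1933)`, which comes from the result above, the pairs are made up for illustration.

```scala
// (heroID, total connections); only (859, 1933) is taken from the notebook output.
val sampleTotals = Seq((859, 1933), (100, 12), (200, 1933))

// Flip to (connections, heroID) so that max compares connection counts first.
val sampleFlipped = sampleTotals.map { case (heroID, count) => (count, heroID) }
val best = sampleFlipped.max

// Alternative without flipping: supply an ordering on the second element.
val direct = sampleTotals.max(Ordering.by[(Int, Int), Int](_._2))

println(best)
```

With the flipped pairs, a tie on the count is broken by the larger hero ID, which is an accident of the ordering rather than a meaningful choice. RDD `max` also accepts an `Ordering`, so the second form should carry over to Spark, though that is not what this notebook does.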


Look up the name (`lookup` returns a sequence of all values stored under the key, so we take the first result with `(0)`).

In [21]:
val mostPopularName = namesRDD.lookup(mostPopular._2)(0)

mostPopularName: String = CAPTAIN AMERICA
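
The need for `(0)` is easier to see with a local stand-in for `lookup`. The helper `lookupAll` below is hypothetical (it is not a Spark API) and works on a plain `Seq` of pairs taken from the outputs above.

```scala
val sampleNames = Seq((1, "24-HOUR MAN/EMMANUEL"), (859, "CAPTAIN AMERICA"))

// Hypothetical helper: collect every value stored under the given key.
def lookupAll(pairs: Seq[(Int, String)], key: Int): Seq[String] =
  pairs.collect { case (k, v) if k == key => v }

val found = lookupAll(sampleNames, 859)   // a sequence, possibly with several entries
val firstName = found(0)                  // fine here, but throws if nothing matched
val missing = lookupAll(sampleNames, -1).headOption  // safer when the key may be absent

println(firstName)
```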


# Degree of Separation: Breadth-First-Search

**Task**: Find the degrees of separation between two Marvel comic book characters, based on co-appearances in a comic.

Here, we represent each line of the network file as a node with connections, a distance, and a color.

For example:
```
        5983 1165 3836 4361 1282
```
becomes
```
    (5983, ((1165, 3836, 4361, 1282), 9999, WHITE))
```
Initially, every node is white and infinitely distant (represented by `9999`), except the starting character, which is gray with distance 0.

Generally speaking, the *node* is of the form
```
    node = (characterID, (connections, distance, color))
```

To do this, we use a *mapper* and a *reducer*:

The mapper:
* Creates new nodes for each connection of gray nodes, with a distance incremented by one, color gray, and no connections.
* Colors the gray node we just processed black.
* Copies the node itself into the results.


The reducer:
* Combines together all nodes for the same hero ID.
* Preserves the shortest distance, and the darkest color found.
* Preserves the list of connections from the original node.

Each breadth-first-search iteration is therefore one map step followed by one reduce step.

**How do we know when we are done?**

* An accumulator allows many executors to increment a shared variable.
* For example, `val hitCounter = sc.longAccumulator("Hit Counter")` sets up a shared accumulator named "Hit Counter" with an initial value of 0.
* In each iteration, if the character we are interested in is hit, we increment the `hitCounter` accumulator.
* After each iteration, we check whether `hitCounter` is greater than zero. If so, we are done.
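
The mapper, the reducer, and the stopping rule can be tried out on a three-node chain without Spark. This is a minimal sketch under stated assumptions: it uses plain Scala lists instead of RDDs, `List` instead of `Array` for the connections so values compare with `==`, a local counter in place of the accumulator, and hypothetical names (`toyMap`, `toyReduce`, `rank`) that do not appear in the notebook.

```scala
type Data = (List[Int], Int, String)   // (connections, distance, color)
type Node = (Int, Data)                // (characterID, data)

// Darkness ranking used to pick the darkest color.
val rank = Map("WHITE" -> 0, "GRAY" -> 1, "BLACK" -> 2)

// Mapper: expand a gray node into gray children plus a blackened copy of itself.
def toyMap(node: Node): List[Node] = {
  val (id, (connections, distance, color)) = node
  if (color == "GRAY") {
    val children: List[Node] =
      connections.map(c => (c, (List.empty[Int], distance + 1, "GRAY")))
    children :+ ((id, (connections, distance, "BLACK")))
  } else List(node)
}

// Reducer: keep the connections, the shortest distance, and the darkest color.
def toyReduce(a: Data, b: Data): Data = {
  val darkest = if (rank(a._3) >= rank(b._3)) a._3 else b._3
  (a._1 ++ b._1, math.min(a._2, b._2), darkest)
}

// Chain 1 - 2 - 3: start at node 1 and search for node 3.
val target = 3
var nodes: List[Node] = List(
  (1, (List(2), 0, "GRAY")),
  (2, (List(1, 3), 9999, "WHITE")),
  (3, (List(2), 9999, "WHITE")))

var hits = 0        // plays the role of the accumulator
var iterations = 0
while (hits == 0 && iterations < 10) {
  iterations += 1
  val mapped = nodes.flatMap(toyMap)
  // A hit is a freshly created gray child (no connections) carrying the target's ID.
  hits += mapped.count { case (id, (conns, _, color)) =>
    id == target && color == "GRAY" && conns.isEmpty
  }
  nodes = mapped.groupBy(_._1).map { case (id, group) =>
    (id, group.map(_._2).reduce((a, b) => toyReduce(a, b)))
  }.toList
}

println(s"Found target after $iterations iteration(s), hits = $hits")
```

In this toy run the iteration in which the target is first hit equals its distance from the start, which is the quantity the BFS is after.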

## Setup

In [54]:
import org.apache.spark.rdd._
import org.apache.spark.SparkContext
import org.apache.spark.util.LongAccumulator
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.rdd._
import org.apache.spark.SparkContext
import org.apache.spark.util.LongAccumulator
import scala.collection.mutable.ArrayBuffer


Suppose we want to find the separation between these two heroes.

In [55]:
val startCharacterID = 5306 // Spiderman
val targetCharacterID = 14 // ADAM 3031 (lol, who?)

startCharacterID: Int = 5306
targetCharacterID: Int = 14


We make our accumulator a "global" `Option` so we can reference it in the mapper. It stays `None` until we create the accumulator just before running the BFS.

In [56]:
var hitCounter: Option[LongAccumulator] = None

hitCounter: Option[org.apache.spark.util.LongAccumulator] = None


Some custom data types:

In [57]:
// BFSData - contains an array of heroID connections, the distance, and color
type BFSData = (Array[Int], Int, String)

// BFSNode has a heroID and the BFSData associated with it.
type BFSNode = (Int, BFSData)

defined type alias BFSData
defined type alias BFSNode


We define a helper function `convertToBFS` that converts each line of raw input into a `BFSNode`.

In [58]:
def convertToBFS(line: String): BFSNode = {
    
    //Split up the line into fields
    val fields = line.split("\\s+")
    
    // Extract this heroID from the first field
    val heroID = fields(0).toInt
    
    // Extract subsequent heroID into the connections array
    var connections: ArrayBuffer[Int] = ArrayBuffer()     // Initialize array
    for (connection <- 1 to (fields.length -1)){
        connections += fields(connection).toInt
    }
    // Set the default distance and color as 9999 and white, respectively.
    var color:String = "WHITE"
    var distance:Int = 9999
    
    // Unless this is the character we are starting from
    if (heroID == startCharacterID){
        color = "GRAY"
        distance = 0
    }
    
    return (heroID, (connections.toArray, distance, color))
    
}

convertToBFS: (line: String)BFSNode
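
For comparison, the same conversion can be written without mutable state. The sketch below is a compact variant under a hypothetical name (`convertToBFSCompact`), not a replacement for the cell above; the type aliases and `startCharacterID` are repeated so it runs on its own, and the extra `trim` is an added precaution.

```scala
type BFSData = (Array[Int], Int, String)
type BFSNode = (Int, BFSData)

val startCharacterID = 5306

// Compact variant: head is the hero ID, tail holds the connections.
def convertToBFSCompact(line: String): BFSNode = {
  val fields = line.trim.split("\\s+")
  val heroID = fields.head.toInt
  val connections = fields.tail.map(_.toInt)
  if (heroID == startCharacterID) (heroID, (connections, 0, "GRAY"))
  else (heroID, (connections, 9999, "WHITE"))
}

val (sampleID, (sampleConnections, sampleDistance, sampleColor)) =
  convertToBFSCompact("5983 1165 3836 4361 1282")

// Arrays print as object references, so convert to a List for display.
println((sampleID, sampleConnections.toList, sampleDistance, sampleColor))
```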


Define another method, `createStartingRDD`, that creates "iteration 0" of our RDD of BFSNodes.

In [59]:
def createStartingRDD(sc:SparkContext): RDD[BFSNode] ={
    val inputFile = sc.textFile("../../data/marvel-graph.txt")
    return inputFile.map(convertToBFS)
}

createStartingRDD: (sc: org.apache.spark.SparkContext)org.apache.spark.rdd.RDD[BFSNode]


Define a method `bfsMap` that expands a `BFSNode` into this node and its children.

In [60]:
def bfsMap(node:BFSNode): Array[BFSNode] = {
    // Extract data from the BFSNode
    val characterID: Int = node._1
    val data: BFSData = node._2
    
    val connections:Array[Int] = data._1  
    val distance:Int = data._2
    var color:String = data._3
    
    var results:ArrayBuffer[BFSNode] =ArrayBuffer()
    //------------------
    //-- BFS Algorithm
    //------------------
    // Gray nodes are flagged for expansion, and create
    // gray nodes for each connection
    if (color =="GRAY"){
        for (connection <- connections){
            val newCharacterID = connection
            val newDistance = distance + 1
            val newColor = "GRAY"
            
            // Have we stumbled across the character we are looking for?
            // If so increment our accumulator so the driver script knows.
            if (targetCharacterID == connection){
                if (hitCounter.isDefined){
                    hitCounter.get.add(1)
                }
            }
            // Create our new Gray node for this connection and add it
            // to the results
            val newEntry: BFSNode = (newCharacterID, (Array(), newDistance, newColor))
            results += newEntry
        }
        
        // Color this node as black, indicating it has been processed already.
        color = "BLACK"
        
    }
    // Add the original node back in, so its connections can get merged
    // with the gray nodes in the reducer.
    val thisEntry: BFSNode = (characterID, (connections, distance, color))
    results += thisEntry
    
    return results.toArray
}

bfsMap: (node: BFSNode)Array[BFSNode]


Define a method `bfsReduce` that combines nodes for the same `heroID`, preserving the shortest distance and the darkest color.

In [61]:
def bfsReduce(data1: BFSData, data2: BFSData): BFSData = {
    
    // Extract data that we are combining
    val edges1: Array[Int] = data1._1
    val edges2: Array[Int] = data2._1
    val distance1: Int = data1._2
    val distance2: Int = data2._2
    val color1:String = data1._3
    val color2:String = data2._3
    
    // Default node values
    var distance:Int = 9999
    var color: String = "WHITE"
    var edges:ArrayBuffer[Int] = ArrayBuffer()
    
    // See if one is the original node with its connections.
    // If so, preserve them.
    if (edges1.length > 0){
        edges ++= edges1
    }
    if (edges2.length > 0){
        edges ++= edges2
    }
    
    // Preserve minimum distance
    if (distance1 < distance){
        distance = distance1
    }
    if (distance2 < distance){
        distance = distance2
    }
    
    //Preserve darkest color
    if (color1 == "WHITE" && (color2 == "GRAY" || color2 == "BLACK")){
        color = color2
    }
    if (color1 == "GRAY" && color2 == "BLACK"){
        color = color2
    }
    if (color2 == "WHITE" && (color1 == "GRAY" || color1 == "BLACK")) {
      color = color1
    }
    if (color2 == "GRAY" && color1 == "BLACK") {
      color = color1
    }
    if (color1 == "GRAY" && color2 == "GRAY") {
        color = color1
    }
    if (color1 == "BLACK" && color2 == "BLACK") {
        color = color1
    }
    return (edges.toArray, distance, color)
}

bfsReduce: (data1: BFSData, data2: BFSData)BFSData


## Perform BFS

First define our accumulator, used to signal when we find the target character in our BFS traversal.

In [62]:
hitCounter = Some(sc.longAccumulator("Hit Counter"))

hitCounter: Option[org.apache.spark.util.LongAccumulator] = Some(LongAccumulator(id: 3607, name: Some(Hit Counter), value: 0))


In [67]:
import scala.util.control.Breaks._

import scala.util.control.Breaks._


In [68]:

var iterationRdd = createStartingRDD(sc)
var iteration: Int = 0

breakable {
    for (iteration <- 1 to 10) {
        println("Running BFS Iteration# " + iteration)

        // Create new vertices as needed to darken or reduce distances
        // in the reduce stage. If we encounter the node we are looking for
        // as a GRAY node, increment our accumulator to signal that we are done.
        val mapped = iterationRdd.flatMap(bfsMap)

        // Note that mapped.count() action here forces the RDD to be evaluated,
        // and that is the only reason our accumulator is actually updated.
        println("Processing " + mapped.count() + " values.")

        if (hitCounter.isDefined) {
            val hitCount = hitCounter.get.value
            println(s"HitCount: $hitCount")
            if (hitCount > 0) {
                println("Hit the target character! From " + hitCount + " different direction(s).")
                break
            }
        }
        // Reducer combines data for each characterID, preserving the darkest
        // color and shortest path.
        iterationRdd = mapped.reduceByKey(bfsReduce)
    }
}


Running BFS Iteration# 1
Processing 8330 values.
HitCount: 30
Hit the target character! From 30 different direction(s).


iterationRdd: org.apache.spark.rdd.RDD[BFSNode] = MapPartitionsRDD[200] at map at <console>:48
iteration: Int = 0
