# Who's the most popular superhero?

When I say 'popular', I don't mean popular among comic readers. Here, we define popularity by how many other superheroes a given superhero appears with in comics. We'll imagine that if, for example, Spiderman appears in a comic with Captain America, they are friends. So the superhero with the most friends (the one who appears with the most other superheroes) will be considered the most 'popular'.

We'll be using two different files: 

* Marvel-Graph.txt: each line starts with a superhero's ID, followed by the IDs of all the superheroes they have appeared with in comic books.
* Marvel-Names.txt: contains the ID and name of every superhero.
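To make the two formats concrete, here is a minimal sketch in plain Python (no Spark needed). The sample lines and the snake_case helper names are made up for illustration; they only assume the layout described above, with the name wrapped in double quotes.

```python
# Hypothetical sample lines; the real files are not reproduced here.
graph_line = "1 2 3 4"       # hero 1 appeared with heroes 2, 3 and 4
names_line = '1 "HERO ONE"'  # ID followed by a quoted name

def count_co_occurrences(line):
    # The first token is the hero's ID; every other token is a co-appearing hero.
    elements = line.split()
    return (int(elements[0]), len(elements) - 1)

def parse_name(line):
    # Splitting on the double quote leaves the ID before it and the name after it.
    fields = line.split('"')
    return (int(fields[0]), fields[1])

print(count_co_occurrences(graph_line))  # (1, 3)
print(parse_name(names_line))            # (1, 'HERO ONE')
```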

In [13]:
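def countCoOccurences(line):
    # First token is the hero's ID; the remaining tokens are co-appearing heroes.
    elements = line.split()
    return (int(elements[0]), len(elements) - 1)

def parseNames(line):
    # Splitting on the double quote separates the ID from the quoted name.
    fields = line.split('\"')
    # In Python 3, encode() returns bytes, which is why the name prints as b'...' below.
    return (int(fields[0]), fields[1].encode("utf8"))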
    

names = sc.textFile('/Users/jacquesthibodeau/big-data-datasets/Marvel-Names.txt')
namesRDD = names.map(parseNames)

lines = sc.textFile('/Users/jacquesthibodeau/big-data-datasets/Marvel-Graph.txt')
pairings = lines.map(countCoOccurences)

totalFriendsBySuperhero = pairings.reduceByKey(lambda x, y : x + y)
flipped = totalFriendsBySuperhero.map(lambda x : (x[1], x[0]))

mostPopularHeroID = flipped.max()

mostPopularHeroName = namesRDD.lookup(mostPopularHeroID[1])[0]

print(str(mostPopularHeroName) + " is the most popular superhero, with " + \
     str(mostPopularHeroID[0]) + " co-appearances.")

b'CAPTAIN AMERICA' is the most popular superhero, with 1933 co-appearances.


There we have it: Captain America is the most popular superhero.
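The aggregation above can be mimicked on a few made-up pairs in plain Python. This is only a sketch of what `reduceByKey`, the flip and `max()` do together; it assumes, as the `reduceByKey` step suggests, that a hero's ID can show up on more than one line of the graph file. The sample data is hypothetical.

```python
from collections import defaultdict

# Hypothetical (heroID, count) pairs, as produced by the parsing step.
pairings = [(1, 3), (2, 5), (1, 4), (3, 2)]

# Stand-in for reduceByKey(lambda x, y: x + y): sum the counts per hero ID.
totals = defaultdict(int)
for hero_id, count in pairings:
    totals[hero_id] += count

# Stand-in for flipping to (count, heroID) and calling max(): tuples compare
# element by element, so the pair with the largest count wins.
flipped = [(count, hero_id) for hero_id, count in totals.items()]
most_popular = max(flipped)
print(most_popular)  # (7, 1)
```

Flipping the pair is what lets `max()` rank by count rather than by ID.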

---

Now we will use breadth-first search (BFS) to find the degrees of separation between superheroes.

The point of this section is to show that Spark can solve problems that don't obviously look like Spark problems. With some creativity, you can frame a more complicated problem, such as a graph traversal, as a series of map and reduce steps and so take advantage of Spark's ability to scale to big data.
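For comparison, on a graph small enough to fit in memory, degrees of separation can be computed with an ordinary queue-based BFS. The sketch below is not the Spark approach; it runs on a made-up adjacency dictionary, and the function name `degrees_of_separation` is hypothetical.

```python
from collections import deque

def degrees_of_separation(graph, start, target):
    """Return the number of hops from start to target, or None if unreachable."""
    distances = {start: 0}
    queue = deque([start])
    while queue:
        node = queue.popleft()
        if node == target:
            return distances[node]
        for neighbour in graph.get(node, []):
            if neighbour not in distances:  # not visited yet
                distances[neighbour] = distances[node] + 1
                queue.append(neighbour)
    return None

# Hypothetical toy graph: hero ID -> list of heroes they appeared with.
toy_graph = {1: [2, 3], 2: [1, 4], 3: [1, 4], 4: [2, 3], 5: []}
print(degrees_of_separation(toy_graph, 1, 4))  # 2
print(degrees_of_separation(toy_graph, 1, 5))  # None
```

The Spark version cannot rely on a single shared queue, so it expands the whole frontier of 'GRAY' nodes once per iteration instead.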

In [20]:
# The superheroes we wish to find the degrees of separation between:
startSuperheroID = 5306 # Spiderman
targetSuperheroID = 3006 # Jinx

# We use what is called an accumulator. It signals us when we have
# found the target we are looking for during our BFS traversal.
hitCounter = sc.accumulator(0)

def convertToBFS(line):
    fields = line.split()
    heroID = int(fields[0])
    connections = []
    for connection in fields[1:]:
        connections.append(int(connection))
    
    # initial conditions for BFS
    color = 'WHITE'
    distance = 9999
    
    if (heroID == startSuperheroID):
        color = 'GRAY'
        distance = 0
    
    return (heroID, (connections, distance, color)) # key/value pair

def createStartingRDD():
    inputFile = sc.textFile('/Users/jacquesthibodeau/big-data-datasets/Marvel-Graph.txt')
    return inputFile.map(convertToBFS)

def bfsMap(node):
    superheroID = node[0]
    data = node[1]
    connections = data[0]
    distance = data[1]
    color = data[2]
    
    results = []
    
    # If this node needs to be expanded...
    if (color == 'GRAY'):
        for connection in connections:
            newSuperheroID = connection
            newDistance = distance + 1
            newColor = 'GRAY'
            if (targetSuperheroID == connection):
                hitCounter.add(1)
                
            newEntry = (newSuperheroID, ([], newDistance, newColor))
            results.append(newEntry)
            
        # We've processed this node, so we color it black
        color = 'BLACK'
        
    # Emit the input node so we don't lose it.
    results.append( (superheroID, (connections, distance, color)) )
    return results

def bfsReduce(data1, data2):
    edges1 = data1[0]
    edges2 = data2[0]
    distance1 = data1[1]
    distance2 = data2[1]
    color1 = data1[2]
    color2 = data2[2]

    distance = 9999
    color = color1
    edges = []

    # See if one is the original node with its connections.
    # If so preserve them.
    if (len(edges1) > 0):
        edges.extend(edges1)
    if (len(edges2) > 0):
        edges.extend(edges2)

    # Preserve minimum distance
    if (distance1 < distance):
        distance = distance1

    if (distance2 < distance):
        distance = distance2

    # Preserve darkest color
    if (color1 == 'WHITE' and (color2 == 'GRAY' or color2 == 'BLACK')):
        color = color2

    if (color1 == 'GRAY' and color2 == 'BLACK'):
        color = color2

    if (color2 == 'WHITE' and (color1 == 'GRAY' or color1 == 'BLACK')):
        color = color1

    if (color2 == 'GRAY' and color1 == 'BLACK'):
        color = color1

    return (edges, distance, color)
    
iterationRDD = createStartingRDD()    
    
for iteration in range(0, 10):
    print("Running BFS iteration #" + str(iteration+1))
    
    # Create new vertices as needed to darken or reduce distances in the
    # reduce stage. If we encounter the node we're looking for as a GRAY
    # node, increment our accumulator to signal that we're done.
    mapped = iterationRDD.flatMap(bfsMap)
    
    # Note that the mapped.count() action here forces the RDD to be evaluated,
    # and that's the only reason our accumulator is actually updated.
    print("Processing " + str(mapped.count()) + " values.")
    
    if (hitCounter.value > 0):
        print("Hit the target superhero! From " + str(hitCounter.value) + " different direction(s)")
        break
    
    # Reducer combines data for each superhero ID, preserving the darkest
    # color and shortest path.
    iterationRDD = mapped.reduceByKey(bfsReduce)

Running BFS iteration #1
Processing 8330 values.
Running BFS iteration #2
Processing 220615 values.
Hit the target superhero! From 2 different direction(s)


It looks like Spiderman and Jinx are 2 degrees of separation apart, connected through 2 different superheroes (directions).
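To see how the map and reduce steps interact, the sketch below replays two iterations on a made-up three-node chain in plain Python, with a dictionary standing in for `reduceByKey`. It is a condensed re-implementation for illustration, not the code above: the `RANK` table and the `run_iteration` helper are assumptions, and the accumulator is left out.

```python
INF = 9999  # same "unreached" sentinel distance as in the Spark code
RANK = {'WHITE': 0, 'GRAY': 1, 'BLACK': 2}  # darker colors rank higher

def bfs_map(node):
    hero_id, (connections, distance, color) = node
    results = []
    if color == 'GRAY':
        # Expand: each neighbour becomes a GRAY candidate one hop further out.
        for connection in connections:
            results.append((connection, ([], distance + 1, 'GRAY')))
        color = 'BLACK'  # this node is now fully processed
    results.append((hero_id, (connections, distance, color)))
    return results

def bfs_reduce(a, b):
    # Keep the edge list, the smaller distance and the darker color.
    edges = a[0] + b[0]
    distance = min(a[1], b[1])
    color = a[2] if RANK[a[2]] >= RANK[b[2]] else b[2]
    return (edges, distance, color)

def run_iteration(nodes):
    merged = {}  # plays the role of reduceByKey
    for node in nodes:
        for key, value in bfs_map(node):
            merged[key] = bfs_reduce(merged[key], value) if key in merged else value
    return sorted(merged.items())

# Hypothetical toy graph: 1 - 2 - 3, with hero 1 as the start node.
start = [
    (1, ([2], 0, 'GRAY')),
    (2, ([1, 3], INF, 'WHITE')),
    (3, ([2], INF, 'WHITE')),
]
first = run_iteration(start)
second = run_iteration(first)
print(first)
print(second)
```

After the first iteration hero 2 is 'GRAY' at distance 1; after the second, hero 3 is 'GRAY' at distance 2 while heroes 1 and 2 are 'BLACK' and keep their original edge lists.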