In [1]:
import pandas as pd
import numpy as np
from pyspark import SparkConf, SparkContext

In [2]:
sparkConf = SparkConf().setMaster('local').setAppName('marvelHero')

In [3]:
sc = SparkContext(conf=sparkConf)

In [4]:
dataPath = ('../')

In [5]:
marvel_names = 'Marvel-Names.txt'
marvel_graphs = 'Marvel-Graph.txt'

In [6]:
rdd_marvel_graph = sc.textFile('file://' + dataPath + marvel_graphs)
rdd_marvel_name  = sc.textFile('file://' + dataPath + marvel_names)

In [7]:
with open(dataPath + marvel_names) as f:
    for i, lines in enumerate(f):
        if i < 2 :
            print(lines)
        else:
            break

1 "24-HOUR MAN/EMMANUEL"

2 "3-D MAN/CHARLES CHAN"



In [8]:
def cooccurance(contents):
    lines = contents.split(' ')
    characterID = int(lines[0])
    cooccuranceNum = len(lines) - 1
    return (characterID, cooccuranceNum)

In [9]:
with open(dataPath + marvel_graphs) as f:
    for i, lines in enumerate(f):
        if i < 1 :
            print(cooccurance(lines))
        else:
            break

(5988, 49)


In [10]:
def characterName(contents):
    lines = contents.strip().split('\"')
    characterID = int(lines[0])
    name = lines[1]
    return (characterID, name)

In [11]:
with open(dataPath + marvel_names) as f:
    for i, lines in enumerate(f):
        if i < 1 :
            print(lines)
            print(characterName(lines))
        else:
            break

1 "24-HOUR MAN/EMMANUEL"

(1, '24-HOUR MAN/EMMANUEL')


In [12]:
marvel_graph = rdd_marvel_graph.map(cooccurance)
marvel_graph.take(3)

[(5988, 49), (5989, 41), (5982, 43)]

In [13]:
marvel_name = rdd_marvel_name.map(characterName)
marvel_name.take(3)

[(1, '24-HOUR MAN/EMMANUEL'),
 (2, '3-D MAN/CHARLES CHAN'),
 (3, '4-D MAN/MERCURIO')]

In [14]:
marvel_graph_pair = marvel_graph.reduceByKey(lambda x, y: x+y)
marvel_graph_pair.take(3)

[(1, 6), (2, 123), (3, 73)]

In [15]:
# flip the tuple and let order become:
# cooccurance, id
marvel_graph_pair_flip = marvel_graph_pair.map(lambda x: (x[1], x[0]))

In [16]:
# sort the data and find the max one
marvel_graph_pair_sort = marvel_graph_pair_flip.sortByKey(ascending=False)
marvel_graph_pair_sort.take(1)

# or use:
# marvel_graph_pair_flip.max()

[(1937, 859)]

# use broadcast variable

In [17]:
def broadcast_characterName(data):
    characterDict = {}
    with open(data) as f:
        for lines in f:
            line = lines.split('\"')
            characterID = int(line[0])
            characterName = line[1]
            characterDict[characterID] = characterName
    return characterDict

In [18]:
characterNameDict = sc.broadcast(broadcast_characterName(dataPath + marvel_names))

In [19]:
# marvel_graph_pair_sort_ToName: (cooccurance, id)
marvel_graph_pair_sort_ToName = marvel_graph_pair_sort.map(lambda content: (characterNameDict.value[content[1]], 
                                                                           content[0]))
marvel_graph_pair_sort_ToName.take(1)

[('CAPTAIN AMERICA', 1937)]

# use lookup function

In [20]:
# rdd.max() means find the pair with the maxmium key values
print(marvel_graph_pair_flip.max())
marvel_cooccurance_most = marvel_graph_pair_flip.max()

(1937, 859)


In [21]:
mostPopularName = marvel_name.lookup(marvel_cooccurance_most[1])[0]
mostPopularName

'CAPTAIN AMERICA'

# superhero degrees separation -- breadth first search

In [22]:
#startCharacterID = 5988
startCharacterID = 5306
interestCharacterID = 14
hitCounter = sc.accumulator(0)

In [24]:
rdd_marvel_graphs = sc.textFile('file://' + dataPath + marvel_graphs)
rdd_marvel_graphs.take(1)

['5988 748 1722 3752 4655 5743 1872 3413 5527 6368 6085 4319 4728 1636 2397 3364 4001 1614 1819 1585 732 2660 3952 2507 3891 2070 2239 2602 612 1352 5447 4548 1596 5488 1605 5517 11 479 2554 2043 17 865 4292 6312 473 534 1479 6375 4456 ']

function to convert the RDD to breadth first search:

output tuple: (id, (connection, distance, processIndicator))

Here processIndicator can be: yetProcess, processing and processed

In [25]:
def convertBFS(contents):
    lines = contents.split()
    currentCharacterID = int(lines[0])
    
    # rest nodes are treated as connections
    connections = []
    for element in lines[1:]:
        connections.append(int(element))
    
    # distance is initially set to a 'infinitely' large value
    distance = 10000
    
    # set the processIndicator to indicate whether the node is processed or not, white means yet to process/consider
    processIndicator = 'yetProcess'
    
    # if the current character is what we interested in
    if (currentCharacterID == startCharacterID):
        # mean the current character is under considered to expand its network connection
        processIndicator = 'processing'
        # distance to itself is zero
        distance = 0
        
    return (currentCharacterID, (connections, distance, processIndicator))

In [27]:
iterationRDD = rdd_marvel_graphs.map(convertBFS)

In [28]:
iterationRDD.take(1)

[(5988,
  ([748,
    1722,
    3752,
    4655,
    5743,
    1872,
    3413,
    5527,
    6368,
    6085,
    4319,
    4728,
    1636,
    2397,
    3364,
    4001,
    1614,
    1819,
    1585,
    732,
    2660,
    3952,
    2507,
    3891,
    2070,
    2239,
    2602,
    612,
    1352,
    5447,
    4548,
    1596,
    5488,
    1605,
    5517,
    11,
    479,
    2554,
    2043,
    17,
    865,
    4292,
    6312,
    473,
    534,
    1479,
    6375,
    4456],
   10000,
   'yetProcess'))]

create new vertices are needed to process or reduce distances in the reduce stage. If we encounter the node we'are looking for as a processing node, increment our accumulator to singal that we are done.

In [29]:
def bfsMap(node):
    # break down the tuple to the individual element we need
    currentCharacterID = node[0]
    
    # elements: node[1]: (connection, distance, color)
    elements = node[1]
    
    # further break down
    connections = elements[0]
    distance   = elements[1]
    processIndicator = elements[2]
    
    # list to store results
    currentNodeResults = []
    
    # if the current character is under processing
    if (processIndicator == 'processing'):
        # we loop through its connection
        for connection in connections:
            # we set the connection to be the next node for exploring
            nextCharacterID = connection
            nextDistance = distance + 1
            nextProcessIndicator = 'processing'
            
            # if connections of current node contains the interest character,
            # then we increase the hitCounter -- spark accumulator which is used to imply when we find
            # the target/interest character during breadth first search traversal.
            if (connection == interestCharacterID):
                hitCounter.add(1)
                
            # we set the set of information to new node, as the connections might be different
            # from node to node, so we set it to blank list
            nextNode = (nextCharacterID, ([], nextDistance, nextProcessIndicator))
            
            # set the results to currentNodeResults, what we really want is the 'nextDistance' which will
            # be iteratively increased until we found the target character
            currentNodeResults.append(nextNode)
        
        # set the processIndicator of cuurent node to processed
        processIndicator = 'processed'
    
    # Emit the input node such that we don't lost it
    currentNodeResults.append( (currentCharacterID, (connections, distance, processIndicator)) )
    
    return currentNodeResults

In [30]:
mapped = iterationRDD.flatMap(bfsMap)
mapped.take(1)

[(5988,
  ([748,
    1722,
    3752,
    4655,
    5743,
    1872,
    3413,
    5527,
    6368,
    6085,
    4319,
    4728,
    1636,
    2397,
    3364,
    4001,
    1614,
    1819,
    1585,
    732,
    2660,
    3952,
    2507,
    3891,
    2070,
    2239,
    2602,
    612,
    1352,
    5447,
    4548,
    1596,
    5488,
    1605,
    5517,
    11,
    479,
    2554,
    2043,
    17,
    865,
    4292,
    6312,
    473,
    534,
    1479,
    6375,
    4456],
   10000,
   'yetProcess'))]

In [31]:
def bfsReduce(data1, data2):
    # edges is the connection of key (i.e. a character ID)
    edges1 = data1[0]
    edges2 = data2[0]
    
    # 
    distance1 = data1[1]
    distance2 = data2[1]
    
    # processIndicator
    processIndicator1 = data1[2]
    processIndicator2 = data2[2]
    
    
    
    distance = 10000
    processIndicator = processIndicator1
    edges = []
    
    # see if one is the original node with its connections.
    # if yes, we store all its connections
    # here: x = [1, 2, 3]; x.append([4, 5]) --> x: [1, 2, 3, [4, 5]]
    #       x = [1, 2, 3]; x.extend([4, 5]) --> x: [1, 2, 3, 4, 5]
    if (len(edges1) > 0):
        edges.extend(edges1)
    if (len(edges2) > 0):
        edges.extend(edges2)
        
    # store the minimum distance
    distance = min(distance1, distance2)
    
    # store the processIndicator information
    '''
    if ((processIndicator1 == 'yetProcess') and (processIndicator2 == 'processing' or processIndicator2 == 'processed')):
        processIndicator = processIndicator2
    if(processIndicator1 == 'processing' and processIndicator2 == 'processed'):
        processIndicator = processIndicator2
    if(processIndicator2 == 'yetProcess' and (processIndicator1 == 'processing' or processIndicator1 == 'processed')):
        processIndicator = processIndicator1
    if(processIndicator2 == 'processing' and processIndicator1 == 'processed'):
        processIndicator = processIndicator1
    '''    
    
    
    impDict = {'processed':3, 'processing':2, 'yetProcess':1}
    processIndicatorArr = np.array([processIndicator1, processIndicator2])
    impIndicator = pd.Series(data = processIndicatorArr)
    moreImpIndicator = max(impIndicator.map(impDict))
    
    for procIndic, impIndic in impDict.items():
        if impIndic == moreImpIndicator:
            processIndicator = procIndic
    
    return(edges, distance, processIndicator)

In [32]:
impDict = {'processed':3, 'processing':2, 'yetProcess':1}
arr = np.array(['processed', 'processing'])
impIndic = pd.Series(data = arr)
moreImpIndic = max(impIndic.map(impDict))


for ind, imp in impDict.items():
    if imp == moreImpIndic:
        indc = ind
indc

'processed'

In [33]:
iterationRdd = mapped.reduceByKey(bfsReduce)
iterationRdd.take(1)

[(1, ([1999, 6471, 6463, 6464, 6459], 10000, 'yetProcess'))]

# main loop

In [34]:
#Main program here:
# The characters we wish to find the degree of separation between:
startCharacterID = 4555 #SpiderMan
targetCharacterID = 100  #ADAM 3,031 (who?)

# Our accumulator, used to signal when we find the target character during
# our BFS traversal.
hitCounter = sc.accumulator(0)

iterationRdd = rdd_marvel_graphs.map(convertBFS)

for iteration in range(0, 10):
    print("Running BFS iteration# " + str(iteration+1))

    # Create new vertices as needed to darken or reduce distances in the
    # reduce stage. If we encounter the node we're looking for as a GRAY
    # node, increment our accumulator to signal that we're done.
    mapped = iterationRdd.flatMap(bfsMap)

    # Note that mapped.count() action here forces the RDD to be evaluated, and
    # that's the only reason our accumulator is actually updated.
    print("Processing " + str(mapped.count()) + " values.")

    if (hitCounter.value > 0):
        print("Hit the target character! From " + str(hitCounter.value) \
            + " different direction(s).")
        break

    # Reducer combines data for each character ID, preserving the darkest
    # color and shortest path.
    iterationRdd = mapped.reduceByKey(bfsReduce)

Running BFS iteration# 1
Processing 6662 values.
Running BFS iteration# 2
Processing 43293 values.
Hit the target character! From 1 different direction(s).


# create social network data

In [36]:
def getConnection(contents):
    lines = contents.split()
    currentID = int(lines[0])
    connections = []
    for connection in lines[1:]:
        connections.append(int(connection))
    return (currentID, connections)

In [38]:
marvel_connection = rdd_marvel_graphs.map(getConnection)

In [39]:
marvel_connection_reduce = marvel_connection.reduceByKey(lambda x,y: set(x).intersection(y))

In [40]:
marvel_connection_reduce.take(1)

[(1, [1999, 6471, 6463, 6464, 6459])]

In [41]:
def unique_character_connection(contents):
    marvel_id = contents[0]
    connections = contents[1]
    lst = []
    for connection in connections:
        lst.append((marvel_id, connection))
    return lst

In [42]:
marvel_pair_connections = marvel_connection_reduce.map(unique_character_connection)
marvel_pair_connections.take(1)

[[(1, 1999), (1, 6471), (1, 6463), (1, 6464), (1, 6459)]]

In [43]:
completeResult = marvel_pair_connections.collect()

In [44]:
marvel_connection_df = pd.DataFrame(np.zeros((1,2)).astype(np.int), columns=['Source', 'Target'])

In [47]:
numMarvel = len(completeResult)
for n in np.arange(numMarvel):
    numMarvel_i_connection = len(completeResult[n])
    for i in (np.arange(numMarvel_i_connection)):
        if (n == 0) & (i == 0):
            marvel_connection_arr = list(completeResult[0][0])
        else:
            marvel_connection_arr = np.vstack((marvel_connection_arr, completeResult[n][i]))

In [48]:
marvel_connection_df = pd.DataFrame(data = marvel_connection_arr, columns=['Source', 'Target'])

In [49]:
marvel_connection_df.head(3)

Unnamed: 0,Source,Target
0,1,1999
1,1,6471
2,1,6463


In [50]:
marvel_connection_df.to_csv('marvel_connection.csv', sep=',')

In [51]:
marvel_name = rdd_marvel_name.map(characterName)
marvel_name_completeResult = marvel_name.collect()

In [52]:
marvel_name_df = pd.DataFrame(data = marvel_name_completeResult, columns=['id', 'label'])

In [53]:
marvel_name_df.head(3)

Unnamed: 0,id,label
0,1,24-HOUR MAN/EMMANUEL
1,2,3-D MAN/CHARLES CHAN
2,3,4-D MAN/MERCURIO


In [54]:
marvel_name_df.to_csv('marvel_name.csv', sep=',')