In [1]:
import findspark
findspark.init()

In [2]:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("SuperheroDegreesOfSeparation")
sc = SparkContext(conf = conf)

In [132]:
sh_id_1=1
sh_id_2=6

In [133]:
def extract_names(superhero_row):
    import re
    superhero=superhero_row.split(' ',1)
    superhero_id=int(superhero[0])
    superhero_name=re.sub('^[^a-zA-Z0-9\s]*|[^a-zA-Z0-9\s]*$','',superhero[1].strip())
    return (superhero_id,superhero_name)

superhero_names_raw=sc.textFile("/home/ggomarr/Documents/Education/Udemy_Spark/23_Marvel-Names.txt")
superhero_names=superhero_names_raw.map(extract_names)

In [134]:
print('{:4d} - {}'.format(sh_id_1,superhero_names.lookup(sh_id_1)[0]))
print('{:4d} - {}'.format(sh_id_2,superhero_names.lookup(sh_id_2)[0]))

   1 - 24-HOUR MAN/EMMANUEL
   6 - A'YIN


In [145]:
def extract_record(superhero_row):
    superhero=superhero_row.split()
    sh_id=int(superhero[0])
    sh_friends=[int(sh_friend_id) for sh_friend_id in superhero[1:]]
    sh_distance=99
    sh_path=[]
    sh_status=0 # 0: not seen, 1: in the frontier, 2: already processed
    return sh_id,(sh_friends,sh_distance,sh_path,sh_status)

superhero_net_raw=sc.textFile("/home/ggomarr/Documents/Education/Udemy_Spark/23_Marvel-Graph.txt")
superhero_net=(superhero_net_raw.map(extract_record)
                                .reduceByKey(lambda x, y: (x[0] + y[0],x[1],x[2],x[3])))

In [146]:
print('{:4d} - {}'.format(sh_id_1,superhero_net.lookup(sh_id_1)[0]))
print('{:4d} - {}'.format(sh_id_2,superhero_net.lookup(sh_id_2)[0]))

   1 - ([1999, 6471, 6463, 6464, 6459], 99, [], 0)
   6 - ([4366, 5017, 1207, 1127, 3329, 5587, 4539, 5317, 6283, 346, 1299, 6360, 6300, 5002, 3601, 3950, 4466, 4540, 3359, 867, 1381], 99, [], 0)


In [147]:
def initialize_search(superhero):
    if superhero[0]==sh_id_1:
        return superhero[0],(superhero[1][0],0,[],1)
    else:
        return superhero

superhero_net_initalized=superhero_net.map(initialize_search)

In [148]:
print('{:4d} - {}'.format(sh_id_1,superhero_net_initalized.lookup(sh_id_1)[0]))
print('{:4d} - {}'.format(sh_id_2,superhero_net_initalized.lookup(sh_id_2)[0]))

   1 - ([1999, 6471, 6463, 6464, 6459], 0, [], 1)
   6 - ([4366, 5017, 1207, 1127, 3329, 5587, 4539, 5317, 6283, 346, 1299, 6360, 6300, 5002, 3601, 3950, 4466, 4540, 3359, 867, 1381], 99, [], 0)


In [153]:
def expand_frontier(superhero):
    if superhero[1][3]==1:
        aux_out=[]
        for sh_id in superhero[1][0]:
            aux_out.append((sh_id,([],superhero[1][1]+1,superhero[1][2]+[superhero[0]],1)))
        aux_out.append((superhero[0],(superhero[1][0],superhero[1][1],superhero[1][2],2)))
        return aux_out
    else:
        return [ superhero ]

superhero_net_expanded=superhero_net_initalized.flatMap(expand_frontier)

In [154]:
print('{:4d} - {}'.format(sh_id_1,superhero_net_expanded.lookup(sh_id_1)[0]))
print('{:4d} - {}'.format(sh_id_2,superhero_net_expanded.lookup(sh_id_2)[0]))
for sh_id in superhero_net_expanded.lookup(sh_id_1)[0][0]:
    for superhero in superhero_net_expanded.lookup(sh_id):
        print('{:4d} - {}'.format(sh_id,superhero))

   1 - ([1999, 6471, 6463, 6464, 6459], 0, [], 2)
   6 - ([4366, 5017, 1207, 1127, 3329, 5587, 4539, 5317, 6283, 346, 1299, 6360, 6300, 5002, 3601, 3950, 4466, 4540, 3359, 867, 1381], 99, [], 0)
1999 - ([], 1, [1], 1)
1999 - ([1487, 6457, 6456, 6459, 6458, 6479, 6478, 6473, 6472, 6471, 6470, 6477, 1778, 6475, 6474, 1, 6480, 6468, 6469, 13, 14, 6461, 6463, 6464, 6467, 6460, 6455, 2399], 99, [], 0)
6471 - ([], 1, [1], 1)
6471 - ([6455, 6457, 6456, 6459, 6458, 6479, 6478, 6473, 6472, 6470, 6477, 1778, 6475, 6474, 1, 6480, 6468, 6469, 13, 14, 1999, 6461, 6463, 6464, 6465, 6467, 6460, 1487, 2399], 99, [], 0)
6463 - ([], 1, [1], 1)
6463 - ([1778, 6455, 6454, 6457, 6456, 6459, 6458, 6479, 6478, 6473, 6472, 6471, 6470, 6477, 6476, 6475, 6474, 1, 1319, 6480, 6468, 6469, 13, 14, 5184, 1999, 6461, 6462, 6464, 6465, 6466, 6467, 6460, 1487, 2399], 99, [], 0)
6464 - ([], 1, [1], 1)
6464 - ([1778, 6455, 6454, 6457, 6456, 6459, 6458, 6479, 6478, 6473, 6472, 6471, 6470, 6477, 6476, 6475, 6474, 1, 6480,

In [155]:
def reduce_frontier(sh_1,sh_2):
    return max(sh_1[0],sh_2[0]),min(sh_1[1],sh_2[1]),sh_1[2] if sh_1[1]<sh_2[1] else sh_2[2],max(sh_1[3],sh_2[3])

superhero_net_reduced=superhero_net_expanded.reduceByKey(reduce_frontier)

In [156]:
print('{:4d} - {}'.format(sh_id_1,superhero_net_reduced.lookup(sh_id_1)[0]))
print('{:4d} - {}'.format(sh_id_2,superhero_net_reduced.lookup(sh_id_2)[0]))
for sh_id in superhero_net_reduced.lookup(sh_id_1)[0][0]:
    for superhero in superhero_net_reduced.lookup(sh_id):
        print('{:4d} - {}'.format(sh_id,superhero))

   1 - ([1999, 6471, 6463, 6464, 6459], 0, [], 2)
   6 - ([4366, 5017, 1207, 1127, 3329, 5587, 4539, 5317, 6283, 346, 1299, 6360, 6300, 5002, 3601, 3950, 4466, 4540, 3359, 867, 1381], 99, [], 0)
1999 - ([1487, 6457, 6456, 6459, 6458, 6479, 6478, 6473, 6472, 6471, 6470, 6477, 1778, 6475, 6474, 1, 6480, 6468, 6469, 13, 14, 6461, 6463, 6464, 6467, 6460, 6455, 2399], 1, [1], 1)
6471 - ([6455, 6457, 6456, 6459, 6458, 6479, 6478, 6473, 6472, 6470, 6477, 1778, 6475, 6474, 1, 6480, 6468, 6469, 13, 14, 1999, 6461, 6463, 6464, 6465, 6467, 6460, 1487, 2399], 1, [1], 1)
6463 - ([1778, 6455, 6454, 6457, 6456, 6459, 6458, 6479, 6478, 6473, 6472, 6471, 6470, 6477, 6476, 6475, 6474, 1, 1319, 6480, 6468, 6469, 13, 14, 5184, 1999, 6461, 6462, 6464, 6465, 6466, 6467, 6460, 1487, 2399], 1, [1], 1)
6464 - ([1778, 6455, 6454, 6457, 6456, 6459, 6458, 6479, 6478, 6473, 6472, 6471, 6470, 6477, 6476, 6475, 6474, 1, 6480, 6468, 6469, 13, 14, 5184, 1999, 6461, 6462, 6463, 6465, 6466, 6467, 6460, 1487, 2399], 1, [

In [157]:
sh_id_1=1
sh_id_2=6
max_cnt=10
sh_id_2_found=sc.accumulator(0)
superhero_net=(sc.textFile("/home/ggomarr/Documents/Education/Udemy_Spark/23_Marvel-Graph.txt")
                 .map(extract_record)
                 .reduceByKey(lambda x, y: (x[0] + y[0],x[1],x[2],x[3]))
                 .map(initialize_search))
cnt=0
while not sh_id_2_found.value and cnt<max_cnt:
    cnt=cnt+1
    print('Iteration {}'.format(cnt))
    superhero_net=superhero_net.flatMap(expand_frontier)
    superhero_net=superhero_net.reduceByKey(reduce_frontier)
    if superhero_net.lookup(sh_id_2)[0][1]<99:
        sh_id_2_found.add(1)

Iteration 1
Iteration 2
Iteration 3
Iteration 4


In [158]:
superhero_net.lookup(sh_id_2)[0]

([4366,
  5017,
  1207,
  1127,
  3329,
  5587,
  4539,
  5317,
  6283,
  346,
  1299,
  6360,
  6300,
  5002,
  3601,
  3950,
  4466,
  4540,
  3359,
  867,
  1381],
 4,
 [1, 6471, 2399, 6300],
 1)