In [1]:
## Iterative Breadth-First-Search (BFS) ##

# If you have 3  nodes: A,B and C
# Distance between A and B is so 1 Degree of Seperation
# Distance between A and C is 2 (B+1) so 2 Degrees of Seperation

## Accumulators: Allows many executors to increment a shared variable


In [2]:
# Superhero Degrees of Seperation
from google.colab import files
uploaded = files.upload()

Saving Marvel-graph.txt to Marvel-graph.txt
Saving Marvel-names.txt to Marvel-names.txt


In [3]:
# Installing Spark 3.1.1 with dependencies
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
print('JDK 8 installed')
!wget -q https://www-us.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz
print('Spark 3.1.1 installer downloaded')
!tar xf spark-3.1.1-bin-hadoop2.7.tgz
print('Spark 3.1.1 installed')
!pip install -q findspark
print('findspark installed')
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop2.7"
print('Environment Variables Set')
import findspark
findspark.init()
print('Install Complete')

JDK 8 installed
Spark 3.1.1 installer downloaded
Spark 3.1.1 installed
findspark installed
Environment Variables Set
Install Complete


In [4]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, functions as func
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Create Spark Session
conf = SparkConf().setMaster('local').setAppName('DegreesofSeperation')
sc = SparkContext.getOrCreate(conf = conf)

# Find Degrees between Spider Man and Adam 3031
startCharacterID = 5306
targetCharacterID = 14

# Accumulator to signal when we reach target character during BFS traversal
hitCounter = sc.accumulator(0)

In [5]:
# Convert data to BFS
def convertToBFS(line):
  # Split file into fields
  fields = line.split()
  # Extract ID column
  heroID = int(fields[0])
  connections = []
  # Take all from row 1 onwards
  for connection in fields[1:]:
    connections.append(int(connection))

  # Non-target heroes are grey. Distance is 9999 (infinity)
  colour = 'WHITE'
  dist = 9999

  # Starting hero is grey, distance is 0 as first
  if (heroID == startCharacterID):
    colour = 'GRAY'
    dist = 0
  # Key and Value output
  return (heroID,(connections,dist,colour))

In [6]:
# Load Marvel Graph into RDD (nodes for BFS)
def startingRDD():
  input = sc.textFile('Marvel-graph.txt')
  return input.map(convertToBFS)

In [7]:
# Extract node info
def bfsMap(node):
  characterID = node[0]
  data = node[1]
  connections = data[0]
  dist = data[1]
  colour = data[2]
  results = []

  # If this node needs to be expanded
  if (colour == 'GRAY'):
    for connection in connections:
      newCharacterID = connection
      newDist = dist + 1
      newColour = 'GRAY'
      # if we find target, increment counter
      if (targetCharacterID == connection):
        hitCounter.add(1)
      
      newEntry = (newCharacterID, ([], newDist, newColour))
    colour = 'BLACK'
  results.append((characterID,(connections,dist,colour)))
  return results

In [8]:
# For each character ID preserve darkest colour and shortest path
def bfsReduce(data1,data2):
  edges1 = data1[0]
  edges2 = data2[0]
  dist1 = data1[1]
  dist2 = data2[1]
  colour1 = data1[2]
  colour2 = data2[2]

  dist = 9999
  colour = 'WHITE'
  edges = []

  # If node is original preserve connections
  if (len(edges1) > 0):
    edges = edges1
  elif (len(edges2) > 0):
    edges = edges2

  # Preserve shortest distance
  if (dist1 < dist):
    dist = dist1
  if (dist2 < dist):
    dist = dist2

  # Preserve darkest colour
  if (colour1 == 'WHITE' and (colour2 == 'GRAY' or colour2 == 'BLACK')):
    colour = colour2
  if (colour1 == 'GRAY' and colour2 == 'BLACK'):
    colour = colour2
  return (edges,dist,colour)

In [9]:
iterationRDD = startingRDD()

# Assume no more than 10 degrees away so run 10 times
for iteration in range(0,10):
  print('Running BFS iterations: '+ str(iteration+1))
  mapped = iterationRDD.flatMap(bfsMap)
  # Map functions are transforms, nothing happens so need action
  # mapped.count forces RDD to be evaluated, updated accumulator
  print('Processing ' + str(mapped.count()) + ' Values')

  if (hitCounter.value > 0):
    print('Hit the target counter from ' + str(hitCounter.value) \
          + ' different direction(s)')
    break

  # Reducer combines data for each character ID
  # Keeping darkest colour and shortest path
  iterationRDD = mapped.reduceByKey(bfsReduce)

Running BFS iterations: 1
Processing 6589 Values
Running BFS iterations: 2
Processing 6486 Values
Running BFS iterations: 3
Processing 6486 Values
Running BFS iterations: 4
Processing 6486 Values
Running BFS iterations: 5
Processing 6486 Values
Running BFS iterations: 6
Processing 6486 Values
Running BFS iterations: 7
Processing 6486 Values
Running BFS iterations: 8
Processing 6486 Values
Running BFS iterations: 9
Processing 6486 Values
Running BFS iterations: 10
Processing 6486 Values


In [None]:
### STOPED AT 6:50

In [11]:
# Stop session
sc.stop()