# Installation

In [None]:
! pip install -q pyspark

[K     |████████████████████████████████| 281.4 MB 35 kB/s 
[K     |████████████████████████████████| 198 kB 48.1 MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [None]:
! apt-get install openjdk-8-jdk-headless -qq > /dev/null
! wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
! tar xf spark-3.2.1-bin-hadoop3.2.tgz

In [None]:
import os

# Point Spark at the JDK installed above and at the distribution that `tar`
# extracted into the current working directory (/content on Colab), rather than
# hardcoding the Colab path.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = os.path.join(os.getcwd(), "spark-3.2.1-bin-hadoop3.2")

In [None]:
! pip install -q findspark

In [None]:
import findspark

# Make the Spark installation named by SPARK_HOME importable (spark_home=None
# means "read it from the environment"), then display the path findspark resolved.
findspark.init(spark_home=None)
findspark.find()

'/content/spark-3.2.1-bin-hadoop3.2'

In [None]:
# First Method
from pyspark.sql import SparkSession

# "local[*]" starts one worker thread per core. Plain "local" runs a single
# thread, which would execute the "parallel" BFS below with no parallelism.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Colab")
    .getOrCreate()
)

sc = spark.sparkContext

In [None]:
# Show which Spark version the running session reports.
spark.sparkContext.version

'3.2.1'

# Q5

##### Dataset

In [None]:
!wget "https://raw.githubusercontent.com/optimopium/PySpark-walkthrough/main/data/Marvel-graph.txt" "marvel"
!wget "https://raw.githubusercontent.com/optimopium/PySpark-walkthrough/main/data/Marvel-names.txt"

--2022-04-07 14:48:09--  https://raw.githubusercontent.com/optimopium/PySpark-walkthrough/main/data/Marvel-graph.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1666954 (1.6M) [text/plain]
Saving to: ‘Marvel-graph.txt.1’


2022-04-07 14:48:09 (80.9 MB/s) - ‘Marvel-graph.txt.1’ saved [1666954/1666954]

--2022-04-07 14:48:09--  http://marvel/
Resolving marvel (marvel)... failed: Name or service not known.
wget: unable to resolve host address ‘marvel’
FINISHED --2022-04-07 14:48:09--
Total wall clock time: 0.2s
Downloaded: 1 files, 1.6M in 0.02s (80.9 MB/s)
--2022-04-07 14:48:09--  https://raw.githubusercontent.com/optimopium/PySpark-walkthrough/main/data/Marvel-names.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 18

##### Spark (parallel BFS)

The time complexity of parallel BFS is $O(\beta \log n)$ when $O(n^{1+1/\beta})$ processors are used.


We divide the BFS into three parts:
1.   Initialization
> In this stage, we set every node's status to "NOT READY", except the start node, which is set to "READY". Every node starts at distance infinity (represented by 10000), except the start node, which starts at distance 0.


2.   Map
> Here we take every node whose status is "READY" and explore its connections. Each connected node is emitted with its id as the key, status "READY", and a distance one greater than the current node's. If a connected node is the target, we increment an accumulator, so the accumulator counts the connections to the target found at that level. Finally, the ready node itself is emitted again with its status set to "SEARCHED".


3.   Reduce
> In the reduce step, all records for the same node are merged: we keep its connections, its minimum distance, and its most advanced status (NOT READY < READY < SEARCHED).


In [None]:
# Reuse the session created at the top of the notebook: the builder returns the
# active session instead of hand-constructing a second SparkSession wrapper.
spark = SparkSession.builder.getOrCreate()

In [None]:
def mapNode(node):
    """Expand one BFS record.

    ``node`` is ``(hero_id, (connections, distance, searchStatus))``.

    A READY node emits a READY record at ``distance + 1`` for each of its
    connections (with an empty connection list), and then re-emits itself as
    SEARCHED. Any other node is passed through unchanged. Each connection equal
    to the global ``targetB`` adds 1 to the global accumulator ``counter``.

    Returns a list of ``(hero_id, (connections, distance, searchStatus))`` pairs.
    """
    hero_id, record = node[0], node[1]
    connections, distance, searchStatus = record[0], record[1], record[2]

    if searchStatus != 'READY':
        # Not on the frontier this round: pass the record through untouched.
        return [(hero_id, (connections, distance, searchStatus))]

    expanded = []
    for neighbour in connections:
        if neighbour == targetB:
            counter.add(1)
        expanded.append((neighbour, ([], distance + 1, 'READY')))
    # The node's neighbours have now been emitted, so it is done.
    expanded.append((hero_id, (connections, distance, 'SEARCHED')))
    return expanded

In [None]:
def reduceNode(data1, data2):
    """Merge two BFS records for the same hero id (used with ``reduceByKey``).

    Each record is ``(connections, distance, searchStatus)``. The merged record:

    - keeps the connections of both inputs;
    - keeps the smaller distance, capped at the 10000 "not reached" value that
      ``toNode`` assigns;
    - keeps the more advanced status, in the order NOT READY < READY < SEARCHED.
      When both statuses are equal, that status is kept.
    """
    # Rank statuses so that "most advanced" is a plain max(); unknown values rank lowest.
    status_rank = {'NOT READY': 0, 'READY': 1, 'SEARCHED': 2}

    connections1, distance1, searchStatus1 = data1[0], data1[1], data1[2]
    connections2, distance2, searchStatus2 = data2[0], data2[1], data2[2]

    # Records produced by mapNode carry an empty connection list; the record
    # parsed from the input file carries the real adjacency list. Either one may
    # arrive as data1 or data2, so keep whatever both sides have.
    connections = list(connections1) + list(connections2)

    # Preserve the minimum distance.
    distance = min(distance1, distance2, 10000)

    # Preserve the most advanced status. On a tie, max() returns its first
    # argument, so two equal statuses stay that status rather than falling back
    # to an unrelated default.
    searchStatus = max(searchStatus1, searchStatus2,
                       key=lambda status: status_rank.get(status, -1))

    return (connections, distance, searchStatus)

In [None]:
def toNode(line):
    """Parse one adjacency line ``"<hero id> <connected ids...>"`` into a BFS record.

    Returns ``(hero_id, (connections, distance, searchStatus))``. The hero equal
    to the global ``targetA`` starts READY at distance 0; every other hero starts
    NOT READY at the "not reached" distance 10000.
    """
    tokens = line.split()
    hero_id = tokens[0]
    connections = tokens[1:]
    if hero_id == targetA:
        return (hero_id, (connections, 0, 'READY'))
    return (hero_id, (connections, 10000, 'NOT READY'))


def transformInput(text_path):
    """Load the adjacency file at ``text_path`` as an RDD of BFS records.

    Blank or whitespace-only lines are skipped: they carry no hero id and would
    make ``toNode`` fail.
    """
    lines = sc.textFile(text_path)
    return lines.filter(lambda line: line.strip()).map(toNode)

In [None]:
path_to_text = "Marvel-graph.txt"
targetA = "90"  # hero id the search starts from (read by toNode)
targetB = "98"  # hero id being searched for (read by mapNode)
max_iterations = 20  # give up after this many BFS levels

# mapNode adds 1 here for every edge from a READY node into targetB.
# Accumulator updates made inside a transformation may be applied again if Spark
# recomputes that transformation. Each level's `mapped` RDD is therefore cached,
# so the next level's shuffle reads it instead of re-running mapNode.
counter = sc.accumulator(0)

iteratingRDD = transformInput(path_to_text)
previous_level = None
found = False
for iteration in range(max_iterations):
    mapped = iteratingRDD.flatMap(mapNode).cache()
    # count() is enough to force mapNode to run (and update `counter`);
    # collect() would also ship the entire graph to the driver just to discard it.
    mapped.count()
    if previous_level is not None:
        # The previous level was only needed as input to the reduceByKey
        # shuffle that count() has just executed.
        previous_level.unpersist()
    previous_level = mapped
    if counter.value > 0:
        print("Target B was found at " + str(iteration + 1) +
              " levels(degree of separation) from Target A and was connected to " +
              str(counter.value) + " super heros at that level.")
        found = True
        break
    iteratingRDD = mapped.reduceByKey(reduceNode)

if not found:
    print("Target B was not found within " + str(max_iterations) +
          " levels of Target A.")

Target B was found 4 levels(degree of separation) from Target A and was connected to 3 super heros at that level.


##### Most popular heroes

In [None]:
from pyspark.sql.types import StringType, IntegerType
from pyspark.sql.functions import udf
import pyspark.sql.functions as F

In [None]:
def _friend_count(line):
    # Every token after the leading hero id is one connected hero.
    return len(line.split()) - 1


def _hero_id(line):
    # The first whitespace-separated token is the hero id.
    return line.split()[0]


def _hero_name(line):
    # The name is the rest of the line after the id, wrapped in double quotes,
    # and may contain spaces. Splitting on every space would keep only the first
    # word (e.g. `"ERIC`). Returns None when the line has no name part.
    parts = line.split(None, 1)
    return parts[1].strip().strip('"') if len(parts) > 1 else None


# IntegerType (not StringType), so that max()/sum() over this column are numeric
# rather than lexicographic ("99" > "1933" as strings).
num_of_friends = udf(_friend_count, IntegerType())
extract_node = udf(_hero_id, StringType())
extract_name = udf(_hero_name, StringType())

In [None]:
# Each graph line is "<hero id> <connected hero ids...>". The same hero id can
# head more than one line in this dataset, so the per-line counts are summed per
# hero. The cast makes the sum numeric even if the UDF returns strings.
marvel_df = (
    spark.read.text("Marvel-graph.txt")
    .withColumn("num_of_friends", num_of_friends('value'))
    .withColumn("source_node", extract_node('value'))
    .groupBy("source_node")
    .agg(F.sum(F.col("num_of_friends").cast("int")).alias("num_of_friends"))
)

names_df = (
    spark.read.text("Marvel-names.txt")
    .withColumn("source_node", extract_node('value'))
    .withColumn("name", extract_name('value'))
)

# Joining on the column name keeps a single source_node column in the result.
joined_marvel_df = marvel_df.join(names_df, "source_node", "inner")

In [None]:
# Cast before comparing: if num_of_friends is a string column, max() would be
# lexicographic (e.g. "99" > "1933") and pick the wrong heroes.
friend_count = F.col("num_of_friends").cast("long")
popular = marvel_df.agg(F.max(friend_count).alias("best")).collect()
most_friends = popular[0]['best']
# Several heroes can share the top count, so print every match.
heros_with_most_friends = joined_marvel_df.filter(friend_count == most_friends).collect()
for hero in heros_with_most_friends:
    print(hero["name"])

"HUSSAR"
"ERIC
"SCARFE,
"CARDINAL/CLEMDENON"


##### Ordinary BFS

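The time complexity of BFS is $O(V + E)$ when the graph is stored as an adjacency list. This requires $O(1)$ queue operations and $O(1)$ visited-set membership checks.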

In [None]:
from collections import deque

graph = {
  '5' : ['3','7'],
  '3' : ['2', '4'],
  '7' : ['8'],
  '2' : [],
  '4' : ['8'],
  '8' : []
}

visited = []  # nodes in the order the BFS discovers them


def bfs(visited, graph, node):
    """Breadth-first traversal of ``graph`` from ``node``.

    Prints each node, space-separated, as it is dequeued. Appends nodes to the
    caller's ``visited`` list in discovery order; nodes already in ``visited``
    are treated as seen. A node that appears only as a neighbour (not as a key
    of ``graph``) is treated as having no outgoing edges.

    A set gives O(1) membership checks and a deque gives O(1) pops from the
    front, keeping the traversal O(V + E).
    """
    seen = set(visited)
    seen.add(node)
    visited.append(node)
    queue = deque([node])

    while queue:
        current = queue.popleft()
        print(current, end=" ")
        for neighbour in graph.get(current, ()):
            if neighbour not in seen:
                seen.add(neighbour)
                visited.append(neighbour)
                queue.append(neighbour)


# Driver Code
print("Following is the Breadth-First Search")
bfs(visited, graph, '5')