In [1]:
import collections
from datetime import time
import os

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget https://downloads.apache.org/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz
!tar xf spark-3.0.1-bin-hadoop3.2.tgz
!pip install pyspark
!pip install -q findspark
!pip install graphframes
!wget http://dl.bintray.com/spark-packages/maven/graphframes/graphframes/0.8.1-spark3.0-s_2.12/graphframes-0.8.1-spark3.0-s_2.12.jar

In [15]:
import findspark
import pyspark
import findspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop3.2" 

findspark.init()

SUBMIT_ARGS = "--packages graphframes:graphframes:0.8.1-spark3.0-s_2.12 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS

conf = pyspark.SparkConf()
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)
print(sc._conf.getAll())

ValueError: ignored

In [20]:
from pyspark.sql.types import *

spark = SparkSession(sc)


In [308]:


findspark.init()

print(pyspark.__version__)

spark = SparkSession.builder.master("local[*]").getOrCreate() 
sc=spark.sparkContext

3.0.1


In [6]:
from graphframes import *


# Classic graph

In [9]:
class Graph:
    adjacency_list = {}
    def __init__(self, adjacency_list=None):
        """ initializes a graph object 
            If no dictionary or None is given, 
            an empty dictionary will be used
        """
        if(adjacency_list == None):
            adjacency_list = {}
        else:
            self.adjacency_list = adjacency_list

    def nodes(self):
        """ returns the vertices of a graph """
        return list(self.adjacency_list.keys())

    def edges(self):
        """ returns the edges of a graph """
        return self.generate_edges()

    def add_vertex(self, vertex):
        """ If the vertex "vertex" is not in 
            self.__adjacency_list, a key "vertex" with an empty
            list as a value is added to the dictionary. 
            Otherwise nothing has to be done. 
        """
        if vertex not in self.adjacency_list:
            self.adjacency_list[vertex] = []

    def add_edge(self, edge):
        """ assumes that edge is of type set, tuple or list; 
            between two vertices can be multiple edges! 
        """
        (vertex1, vertex2) = edge
        if vertex2 not in self.adjacency_list:
          self.adjacency_list[vertex2] = []
        if vertex1 in self.adjacency_list:
            self.adjacency_list[vertex1].append(vertex2)
        else:
            self.adjacency_list[vertex1] = [vertex2]

    def generate_edges(self):
        """ A static method generating the edges of the 
            graph "graph". Edges are represented as sets 
            with one (a loop back to the vertex) or two 
            vertices 
        """
        edges = []
        for vertex in self.adjacency_list:
            for neighbour in self.adjacency_list[vertex]:
                if {neighbour, vertex} not in edges:
                    edges.append({vertex, neighbour})
        return edges

    def bfs(self, root):
      visited, queue = set(), collections.deque([root])
      visited.add(root)

      while queue:

          # Dequeue a node from queue
          node = queue.popleft()
          print(str(node) + " -> ", end="")

          # If not visited, mark it as visited, and
          # enqueue it
          for neighbour in self.adjacency_list[node]:
              if neighbour not in visited:
                  visited.add(neighbour)
                  queue.append(neighbour)

    def dfs(self, node, visited={}):
        if node not in visited:
            print(str(node) + " -> ", end="")
            visited.add(node)
            for neighbour in self.adjacency_list[node]:
                self.dfs(neighbour, visited)

In [11]:
g = Graph()

In [12]:
import csv
nodes = set()
with open('/content/artist_edges.csv') as csv_file:
    csv_reader = csv.reader(csv_file, delimiter=',')
    next(csv_reader, None)  # skip the headers
    for row in csv_reader:
        g.add_edge((row[0], row[1]))
        nodes.add(row[0])
        nodes.add(row[1])

In [None]:
import time
root = list(g.adjacency_list.keys())[0]
start = time.time()
g.bfs(root)
end = time.time()
print("\nBSF standard graph time elapsed:" + str(end - start))

In [None]:
start = time.time()
visited = set()
g.dfs(root, visited)
end = time.time()
print("\nBSF standard graph time elapsed:" + str(end - start))

# Spark

In [None]:
import pandas as pd
colnames=['src', 'dst']
edgesDF = pd.read_csv('/content/artist_edges.csv', names=colnames, header=1)
nodesDF = pd.DataFrame(list(nodes))

In [None]:
rddEdges = sc.parallelize(edgesDF)
rddNodes = sc.parallelize(nodesDF)
edgesSparkDf = spark.createDataFrame(rddEdges, StructType([
    StructField("src", StringType(), True),
    StructField("dst", StringType(), True)
]))
nodesSparkDf = spark.createDataFrame(rddNodes, StructType([
    StructField("id", StringType(), True),
]))

In [None]:
sparkGraph = GraphFrame(nodesSparkDf, edgesSparkDf)

In [None]:
sparkGraph.bfs()

In [62]:
sparkGraph.bfs()

1794 -> 12439 -> 14164 -> 17527 -> 10649 -> 6144 -> 40276 -> 46876 -> 15989 -> 34436 -> 49618 -> 31717 -> 16926 -> 18139 -> 10854 -> 42003 -> 14137 -> 33879 -> 43486 -> 8697 -> 36561 -> 20175 -> 47689 -> 17615 -> 47712 -> 34944 -> 26978 -> 30471 -> 38605 -> 39591 -> 27956 -> 47984 -> 12916 -> 45152 -> 45580 -> 18877 -> 32982 -> 13014 -> 38793 -> 36880 -> 38357 -> 46402 -> 32687 -> 38213 -> 41859 -> 40667 -> 2359 -> 22417 -> 30967 -> 48275 -> 28141 -> 18534 -> 33995 -> 35706 -> 45196 -> 18296 -> 33946 -> 32298 -> 39683 -> 32141 -> 36363 -> 26099 -> 32807 -> 32516 -> 7599 -> 40251 -> 39341 -> 37933 -> 34597 -> 39731 -> 33230 -> 31867 -> 50200 -> 7185 -> 40186 -> 41050 -> 20168 -> 38481 -> 38852 -> 3304 -> 25981 -> 49167 -> 44207 -> 23951 -> 39158 -> 30151 -> 34315 -> 31358 -> 37821 -> 45712 -> 33422 -> 44340 -> 25841 -> 29854 -> 33859 -> 11501 -> 48619 -> 38510 -> 18861 -> 37680 -> 31336 -> 15212 -> 28794 -> 25160 -> 17782 -> 47323 -> 34342 -> 31832 -> 40666 -> 31055 -> 22831 -> 22523 ->

In [63]:
start = time.time()
visited = set()
g.dfs(root, visited)
end = time.time()
print("\nBSF standard graph time elapsed:" + str(end - start))

1794 -> 12439 -> 47712 -> 34944 -> 41877 -> 28095 -> 48082 -> 50192 -> 50129 -> 47501 -> 50014 -> 48160 -> 49017 -> 50150 -> 49884 -> 50410 -> 50442 -> 50493 -> 50456 -> 49547 -> 50015 -> 42232 -> 49697 -> 49792 -> 49612 -> 15288 -> 49615 -> 49370 -> 49405 -> 49856 -> 49503 -> 49718 -> 49973 -> 50043 -> 49956 -> 49979 -> 50077 -> 50083 -> 50417 -> 50301 -> 50382 -> 50254 -> 50438 -> 50218 -> 50091 -> 49835 -> 49651 -> 49640 -> 43923 -> 49867 -> 42800 -> 49833 -> 49966 -> 50123 -> 50467 -> 50037 -> 50243 -> 48905 -> 49475 -> 50054 -> 49805 -> 50441 -> 49056 -> 49530 -> 49917 -> 50468 -> 49817 -> 50111 -> 49436 -> 49776 -> 17003 -> 49594 -> 48538 -> 50407 -> 44674 -> 50334 -> 40968 -> 49609 -> 49860 -> 50385 -> 50237 -> 50349 -> 45432 -> 49572 -> 45975 -> 39964 -> 48806 -> 50312 -> 49650 -> 47045 -> 48398 -> 49581 -> 50165 -> 50290 -> 50076 -> 49723 -> 49834 -> 48749 -> 49955 -> 47469 -> 48908 -> 48845 -> 46090 -> 48724 -> 36947 -> 50328 -> 49085 -> 49954 -> 47124 -> 49576 -> 48824 -> 49

# Spark

In [13]:
import pandas as pd
colnames=['src', 'dst']
edgesDF = pd.read_csv('/content/artist_edges.csv', names=colnames, header=1)
nodesDF = pd.DataFrame(list(nodes))

In [21]:
rddEdges = sc.parallelize(edgesDF)
rddNodes = sc.parallelize(nodesDF)
edgesSparkDf = spark.createDataFrame(rddEdges, StructType([
    StructField("src", StringType(), True),
    StructField("dst", StringType(), True)
]))
nodesSparkDf = spark.createDataFrame(rddNodes, StructType([
    StructField("id", StringType(), True),
]))

In [22]:
sparkGraph = GraphFrame(nodesSparkDf, edgesSparkDf)

In [None]:
sparkGraph.bfs()