In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
!tar xf spark-3.2.0-bin-hadoop3.2.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "spark-3.2.0-bin-hadoop3.2"

In [3]:
import findspark
findspark.init()

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext
sc

# PART B

In [5]:
# Reading the graph data
adj_mat = spark.read.csv('/content/graph data.csv')

In [6]:
#Changing the column names of the data frame to the nodes names from 1 ... 25
adj_mat_nodes = adj_mat.toDF(*[str(i) for i in range(1,26)])
adj_mat_nodes.show()

+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|  1|  2|  3|  4|  5|  6|  7|  8|  9| 10| 11| 12| 13| 14| 15| 16| 17| 18| 19| 20| 21| 22| 23| 24| 25|
+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|  0|  1|  0|  1|  1|  0|  1|  1|  1|  0|  0|  1|  0|  1|  1|  0|  1|  1|  0|  1|  0|  1|  0|  1|  1|
|  1|  0|  0|  1|  0|  0|  1|  0|  1|  0|  0|  1|  0|  0|  1|  0|  1|  0|  1|  0|  0|  1|  0|  1|  0|
|  0|  1|  0|  1|  1|  0|  0|  0|  1|  0|  0|  0|  0|  1|  1|  0|  0|  1|  0|  0|  0|  1|  1|  0|  1|
|  0|  1|  1|  0|  0|  0|  1|  0|  0|  0|  1|  1|  0|  1|  0|  0|  1|  1|  0|  0|  1|  0|  1|  1|  0|
|  0|  0|  1|  1|  0|  1|  1|  1|  0|  0|  0|  1|  0|  1|  0|  0|  1|  1|  1|  0|  0|  0|  0|  1|  1|
|  1|  0|  1|  1|  0|  0|  1|  0|  1|  0|  1|  1|  0|  1|  0|  0|  1|  1|  0|  0|  1|  1|  1|  0|  0|
|  0|  0|  0|  1|  0|  1|  0|  1|  1|  0|  1|  1|  0|  0|  1|  0|  0|  1|  1|  0| 

In [7]:
# question 1 - Find all self-loops (i.e. edge between a node onto itself)

# Selecting the diagonal values and checking whether they are 1 to identify self loops
for i in range(25):
  if adj_mat.select(adj_mat.columns[i]).collect()[i][0] == 1:
    print(i+1,i+1)

In [8]:
# question 2 - Node with the largest out-degree

# Converting the data frame to rdd and zipping with the index values as node names 1...25
adj_mat_index = adj_mat_nodes.rdd.zipWithIndex().map(lambda x: (x[0],x[1]+1))

# mapping each row converting the values to integers and summing them up
row_sum = adj_mat_index.map(lambda x: ([int(y) for y in x[0]],x[1])).map(lambda x: (sum(x[0]),x[1]))

# sorting the sum values by the key and identifying the node with the highest number of out-degree
row_sort = row_sum.sortByKey(ascending=False)
row_sort.collect()


[(15, 1),
 (13, 6),
 (13, 10),
 (13, 21),
 (12, 5),
 (12, 7),
 (12, 8),
 (12, 11),
 (12, 25),
 (11, 4),
 (11, 9),
 (11, 12),
 (11, 13),
 (11, 14),
 (11, 15),
 (11, 16),
 (11, 22),
 (10, 2),
 (10, 3),
 (10, 18),
 (10, 20),
 (10, 23),
 (9, 17),
 (9, 24),
 (7, 19)]

In [9]:
print('Node with the highest out going edges is',row_sort.collect()[0][1], 'with', row_sort.collect()[0][0], 'out going edges')

Node with the highest out going edges is 1 with 15 out going edges


In [10]:
# question 3 - Node with the largest in-degree

#initiating an empty list to append the tuple value of  in-degree and node
in_degree = []

# looping over every column mapping over the columns and summing up the values
for i in range(len(adj_mat_nodes.columns)):
  in_degree.append((sum(adj_mat_index.map(lambda x: x[0][i]).map(lambda x: int(x)).collect()),i+1))

# Converting the list to a rdd and sorting by the in-degree values
column_sort = sc.parallelize(in_degree).sortByKey(ascending=False)
column_sort.collect()

[(20, 4),
 (18, 22),
 (16, 24),
 (15, 9),
 (15, 14),
 (14, 2),
 (14, 11),
 (14, 12),
 (13, 17),
 (12, 19),
 (11, 7),
 (11, 15),
 (11, 16),
 (11, 23),
 (10, 3),
 (10, 18),
 (9, 21),
 (9, 25),
 (8, 1),
 (8, 5),
 (7, 8),
 (6, 10),
 (6, 13),
 (5, 20),
 (4, 6)]

In [11]:
print('Node with the highest in coming edges is',column_sort.collect()[0][1],'with',column_sort.collect()[0][0],'incoming edges')

Node with the highest in coming edges is 4 with 20 incoming edges


In [12]:
# question 4 - Find the distribution of vertices in-degrees

# grouping by the key i.e is the in-degree values to list all the nodes with the same values
column_sort.groupByKey().map(lambda x: (x[0],list(x[1]))).collect()


[(20, [4]),
 (18, [22]),
 (16, [24]),
 (14, [2, 11, 12]),
 (12, [19]),
 (10, [3, 18]),
 (8, [1, 5]),
 (6, [10, 13]),
 (4, [6]),
 (15, [9, 14]),
 (13, [17]),
 (11, [7, 15, 16, 23]),
 (9, [21, 25]),
 (7, [8]),
 (5, [20])]

In [13]:
# question 5 - Find a path between node 1 to node 9 [output: a list of nodes that connects 1 and 9]

# function to find a path between the given edges
def find_path(start_node, end_node, matrix):

    # setting visiting to nodes to empty set
    visited_nodes = set()

    #initiating queque with the tuple of 1st node given and an empty path
    queue = [(start_node, [])]

    while queue:
        # assigning the present node as the start node and path as the intial path
        present_node, path = queue.pop(0)

        # if present node is the end node then returning the path
        if present_node == end_node:
            return path + [present_node]

        # checking if the present node is not in visited nodes
        if present_node not in visited_nodes:
            visited_nodes.add(present_node)

            # Get all the nodes with out going edges from present node
            row = matrix.filter(lambda x: x[1] == present_node).first()[0]
            neighbors = [col for col in range(1, 26) if row[str(col)] == '1']

            # for every out going edge checking if the edge is visited and running the loop for the unvisited nodes
            for neighbor in neighbors:
                if neighbor not in visited_nodes:
                    queue.append((neighbor, path + [present_node]))

    return None

# Finding a path from node 1 to node 9
path = find_path(1, 9, adj_mat_index)

print(path)



[1, 9]
