# **Distributed Graph Processing using Apache Spark**

**Divyesh Ramani - 202201241**

**Manan Patel - 202201310**

In [1]:
!pip install -q pyspark

In [2]:
from pyspark import SparkContext, SparkConf

# Application name and master URL used for this notebook's Spark context.
conf = SparkConf().setAppName("appName").setMaster("local")
# getOrCreate() returns the already-running context if there is one, so
# re-executing this cell does not fail because a SparkContext already exists.
sc = SparkContext.getOrCreate(conf=conf)

In [3]:
# Mount Google Drive at /content/drive; the graph and query input files used
# later in this notebook are read from paths under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
from pyspark.sql.types import *

def read_nodes_from_file(file_path):
    """Parse a node file into a dictionary.

    Every meaningful line has the form ``<key>_<value>``. Blank lines and
    lines that are exactly ``Exit`` are ignored. Lines without an underscore
    are reported on stdout and skipped.

    The line is split on the FIRST underscore only, so a value that itself
    contains underscores (``2_B_C`` -> ``{"2": "B_C"}``) is kept intact
    instead of raising ``ValueError`` during unpacking.

    Args:
        file_path: Path of the text file to read. It is decoded as
            ``utf-8-sig`` so a byte-order mark at the start is dropped.

    Returns:
        dict mapping the text before the first underscore to the text after
        it. If a key occurs more than once, the last occurrence wins.
    """
    node = {}
    with open(file_path, 'r', encoding='utf-8-sig') as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line or line == "Exit":
                continue
            node_key, separator, node_value = line.partition("_")
            if not separator:
                print(f"Skipping invalid line: {line}")
                continue
            node[node_key] = node_value
    return node

def read_edges_from_file(file_path, temp_graph):
    """Read edges from a file and record them in ``temp_graph`` in place.

    Every meaningful line has the form ``<source>_<target>_<label>``. Blank
    lines and lines that are exactly ``Exit`` are ignored. Lines with fewer
    than three underscore-separated fields are reported on stdout and skipped
    instead of raising ``IndexError``. Edges whose source or target is not a
    key of ``temp_graph`` are silently ignored.

    For each accepted edge two entries are appended:

    * ``temp_graph[source][1][(label, 'o')]`` receives ``int(target)``
    * ``temp_graph[target][1][(label, 'i')]`` receives ``int(source)``

    Args:
        file_path: Path of the edge file; decoded as ``utf-8-sig``.
        temp_graph: dict keyed by the node key strings used in the file.
            Each value must be a sequence whose item at index 1 is a dict;
            that dict is mutated.

    Returns:
        None. ``temp_graph`` is updated in place.

    Raises:
        ValueError: if the source or target of an accepted edge cannot be
            converted with ``int()``.
    """
    with open(file_path, 'r', encoding='utf-8-sig') as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line or line == "Exit":
                continue
            # Drop a BOM character that is still at the front of the line.
            if line.startswith('\ufeff'):
                line = line[1:]
            fields = line.split("_")
            if len(fields) < 3:
                print(f"Skipping invalid line: {line}")
                continue
            source, target, label = fields[0], fields[1], fields[2]
            if source not in temp_graph or target not in temp_graph:
                continue
            # Convert both ids before touching the graph so a bad id cannot
            # leave a half-recorded edge behind.
            source_id = int(source)
            target_id = int(target)
            temp_graph[source][1].setdefault((label, 'o'), []).append(target_id)
            temp_graph[target][1].setdefault((label, 'i'), []).append(source_id)


# --- Data graph -------------------------------------------------------------
GraphNode_path = "/content/drive/MyDrive/GraphNode.txt"
GraphEdge_path = '/content/drive/MyDrive/GraphEdge.txt'  # Point this at your edge file

node = read_nodes_from_file(GraphNode_path)
# Each node starts as [node_type, {}]; the empty dict is filled with edges below.
temp_graph = {node_key: [node_type, {}] for node_key, node_type in node.items()}
read_edges_from_file(GraphEdge_path, temp_graph)

# --- Query graph ------------------------------------------------------------
QueryNode_path = "/content/drive/MyDrive/NewQueryNode.txt"
QueryEdge_path = '/content/drive/MyDrive/NewQueryEdge.txt'

query_node = read_nodes_from_file(QueryNode_path)
# Same [node_type, {}] layout as the data graph.
temp_query = {node_key: [node_type, {}] for node_key, node_type in query_node.items()}
read_edges_from_file(QueryEdge_path, temp_query)



# Flatten each dictionary entry into an (int node id, node type, edge dict)
# tuple and distribute the resulting list as a persisted RDD.
graph = [(int(node_key), entry[0], entry[1]) for node_key, entry in temp_graph.items()]
GraphRDD = sc.parallelize(graph).persist()

# The query graph gets exactly the same treatment.
query = [(int(node_key), entry[0], entry[1]) for node_key, entry in temp_query.items()]
QueryRDD = sc.parallelize(query).persist()



# Bring the query to the driver once and derive its size from the result,
# instead of launching a Spark count() job on every loop iteration.
data = QueryRDD.collect()
NumQuery = len(data)

# Build one segment per query record (a record is (id, node label, edge dict)):
#   * first and last record -> [node_label, [key1, key2, ...]]
#   * every other record    -> [node_label, [key1], [key2], ...]
# where each key is a key of the record's edge dict.
segments = []
for i, record in enumerate(data):
    node_label = record[1]
    edge_list = list(record[2].keys())

    if i == 0 or i == NumQuery - 1:
        segments.append([node_label, edge_list])
    else:
        segments.append([node_label] + [[edge_key] for edge_key in edge_list])

# Initialise validRDD with the whole GraphRDD. Every record is
# [graph_node, path], where path is the list of node ids collected so far.
validRDD = GraphRDD.map(lambda x: [x, []])

# Iterate through the segments of the query.
for i in range(NumQuery):
    node_label = segments[i][0]  # Node label required by the current segment.
    edge_list = segments[i][1:]  # Edge groups of the current segment.

    # Only the first key of each group is used; a key is (edge label, direction).
    # Empty groups (a query node without edges) are skipped instead of raising.
    e_lab = [group[0][0] for group in edge_list if group]
    e_dir = [group[0][1] for group in edge_list if group]
    required_edges = list(zip(e_lab, e_dir))

    # Keep candidates that carry the required node label AND every required
    # edge key. RDD transformations are lazy, so stacking one filter per loop
    # index made every filter see the final index and only the last edge was
    # enforced. A single filter with values bound as default arguments avoids
    # that late-binding problem.
    shortlistedRDD = validRDD.filter(
        lambda x, label=node_label, required=required_edges:
            x[0][1] == label and all(edge in x[0][2] for edge in required)
    )

    # Collect the shortlist once and broadcast it for use in the closures below.
    shortlisted_data = shortlistedRDD.collect()
    shortlisted_broadcast = sc.broadcast(shortlisted_data)

    # Build the candidate set for the next segment: the neighbours of every
    # shortlisted node along each required edge, with the path extended by
    # the shortlisted node's id.
    newRDD = sc.parallelize([])
    for j, record in enumerate(shortlisted_data):
        current_label = record[0][0]  # First field of the node tuple (its id).
        for edge in required_edges:
            # j, edge, the id and the broadcast handle are bound explicitly so
            # the result does not depend on when Spark serializes the lambdas.
            newRDD = newRDD.union(
                GraphRDD
                .filter(lambda x, j=j, edge=edge, bc=shortlisted_broadcast:
                        x[0] in bc.value[j][0][2].get(edge, []))
                .map(lambda x, j=j, node_id=current_label, bc=shortlisted_broadcast:
                     [x, bc.value[j][1] + [node_id]])
            )

    newRDD.persist()
    validRDD = newRDD

# Print the accumulated path of every record that survived all segments.
for element in validRDD.collect():
    print(element[1])

[16, 2, 1, 3, 4, 14]


In [6]:
# Stop the SparkContext created at the top of the notebook.
sc.stop()