[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/goosekeeper233/Spark-Graph-Algorithms/blob/main/csc502_project.ipynb)

Our project implements, in Spark, the MapReduce graph algorithms described in the article "Graph Twiddling in a MapReduce World". We validated our implementation against NetworkX using the "Zachary's karate club" dataset.

# Env Setup

## Spark Setup

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop2.tgz
!tar xf spark-3.3.2-bin-hadoop2.tgz

In [None]:
!pip install -q findspark
from collections import Counter
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.2-bin-hadoop2"

import findspark
findspark.init("spark-3.3.2-bin-hadoop2")# SPARK_HOME

import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

sc = SparkContext.getOrCreate()
spark = SparkSession.builder.getOrCreate()

## NetworkX Setup

In [None]:
import networkx as nx

G = nx.karate_club_graph()

## Database Setup

In [None]:
# Flatten the NetworkX graph G into a plain Python edge list.
# Each item is a (source, target) tuple built from the first two fields of
# the edge.
edge_list = [(e[0], e[1]) for e in G.edges()]

# Distribute the edge list as an RDD; later cells pass edges_rdd to the
# Spark implementations.
edges_rdd = sc.parallelize(edge_list)

# Algorithms

## Augmenting Edges with Degrees

In [None]:
def augment_edges_with_degrees_spark(edge_rdd):
    """Attach the degree of both endpoints to every edge.

    The degree of a vertex is the number of edge records binned under it,
    so repeated edges are counted once per occurrence.

    Args:
        edge_rdd: RDD of ``(src, dst)`` edge tuples.

    Returns:
        RDD of ``((src, dst), (src_degree, dst_degree))`` records, one per
        distinct edge key. The order of the records is not specified.
    """
    def emit_edges(edge):
        # Bin the edge under each of its two endpoints.
        src, dst = edge
        return [(src, edge), (dst, edge)]

    def tag_with_degree(vertex_bin):
        # The size of a vertex's bin is its degree. Each edge in the bin is
        # re-emitted with that degree in the slot of the endpoint it belongs
        # to and 0 in the other slot, so the two half-records of an edge can
        # later be merged by addition.
        vertex, edges = vertex_bin
        degree = len(edges)
        return [
            (edge, (degree, 0) if vertex == edge[0] else (0, degree))
            for edge in edges
        ]

    def merge_degrees(left, right):
        # Element-wise addition of two (src_degree, dst_degree) half-records.
        return (left[0] + right[0], left[1] + right[1])

    return (
        edge_rdd.flatMap(emit_edges)
        .groupByKey()
        .mapValues(list)
        .flatMap(tag_with_degree)
        # reduceByKey sums the half-records without first materialising the
        # full list of values per edge, unlike groupByKey.
        .reduceByKey(merge_degrees)
    )

augmented_edges_rdd = augment_edges_with_degrees_spark(edges_rdd)


def augment_edges_with_degrees_networkx(graph):
    """Reference implementation: pair every edge with its endpoint degrees.

    Args:
        graph: Object exposing ``edges()`` and a ``degree`` lookup indexed
            by vertex (e.g. a NetworkX graph).

    Returns:
        List of ``((u, v), (degree_u, degree_v))`` in ``graph.edges()`` order.
    """
    return [
        (
            (edge[0], edge[1]),
            (graph.degree[edge[0]], graph.degree[edge[1]]),
        )
        for edge in graph.edges()
    ]
# Reference result from NetworkX on a fresh copy of the karate-club graph.
G = nx.karate_club_graph()
augmented_edges_networkx = augment_edges_with_degrees_networkx(G)

# Bring the Spark result to the driver. Both lists are sorted before being
# compared so the check does not depend on the order in which either side
# happens to produce its records.
augmented_edges_spark = augmented_edges_rdd.collect()
augmented_edges_spark_sorted = sorted(augmented_edges_spark, key=lambda x: (x[0], x[1]))
augmented_edges_networkx_sorted = sorted(augmented_edges_networkx, key=lambda x: (x[0], x[1]))

# The two implementations agree when the sorted lists are identical.
comparison = augmented_edges_spark_sorted == augmented_edges_networkx_sorted
print(f"The outputs are the same: {comparison}")

The outputs are the same: True


## Simplifying the Graph

In [None]:
# The karate-club example has neither repeated edges nor self-loops, so the
# simplification step from the paper is exercised on a small hand-made case:
# an undirected, unweighted edge list containing repeated edges and a self-loop.
simple_list = [
    (6, 7),
    (1, 2), (1, 2),
    (3, 4), (3, 4), (3, 4),
    (3, 3),
    (4, 5),
]

# Spark input.
edges_rdd_simplify = sc.parallelize(simple_list)

# NetworkX input, loaded into a MultiGraph.
G = nx.MultiGraph()
G.add_edges_from(simple_list)
#the input, output are the same as edges_rdd
def simplify_spark(edge_rdd):
    """Reduce an undirected multigraph edge RDD to a simple graph.

    Self-loops are dropped, each remaining edge is put in canonical
    ``(smaller, larger)`` orientation, and repeated edges are collapsed so
    every undirected edge appears exactly once.

    Args:
        edge_rdd: RDD of ``(src, dst)`` edge tuples; may contain self-loops
            and repeated edges in either orientation.

    Returns:
        RDD of distinct ``(u, v)`` tuples with ``u < v``. The order of the
        records is not specified.
    """
    return (
        # An edge whose two endpoints coincide is a self-loop.
        edge_rdd.filter(lambda edge: edge[0] != edge[1])
        # Sorting the endpoints makes (a, b) and (b, a) the same key.
        .map(lambda edge: tuple(sorted(edge)))
        # distinct() keeps one copy of every canonical edge.
        .distinct()
    )

simplified = simplify_spark(edges_rdd_simplify)
print(simplified.collect())
simplified_sort = simplified.sortBy(lambda x: (x[0], x[1]))
# Collect the sorted result once and reuse it for both the printout and the
# comparison, instead of running the same Spark job twice.
simplified_sorted_edges = simplified_sort.collect()
print(simplified_sorted_edges)

# Converting the MultiGraph to a Graph collapses repeated edges.
G = nx.Graph(G)
# Self-loops survive that conversion and are removed explicitly. They are
# materialised into a list first so the graph is not mutated while it is
# still being iterated.
G.remove_edges_from(list(nx.selfloop_edges(G)))
# Put every edge in (smaller, larger) orientation before sorting so the
# comparison does not depend on the orientation in which NetworkX happens to
# report an undirected edge.
G_sort = sorted(tuple(sorted(edge)) for edge in G.edges())
print(G_sort)

print(f"The outputs are the same: {simplified_sorted_edges == G_sort}")

#2nd test; checking if this part is consitent with the original dataset
#print(simplify_spark(edges_rdd).collect())
#print(edges_rdd.collect())

[(6, 7), (1, 2), (3, 4), (4, 5)]
[(1, 2), (3, 4), (4, 5), (6, 7)]
[(1, 2), (3, 4), (4, 5), (6, 7)]
The outputs are the same: True


## Enumerating Triangles

In [None]:
# Triangle enumeration.
# Author's note: the output shown in the paper appears to be flipped
# (unverified).
# The input MUST be the degree-augmented edge list, whose records have the
# shape ((u, v), (degree_u, degree_v)), e.g. ((0, 1), (16, 9)).
augumented_rdd = sc.parallelize(augmented_edges_spark_sorted)
print(augumented_rdd.collect())
# (An earlier draft that keyed each edge by min(degree1, degree2) was
# removed; the lowest-degree endpoint is selected inside
# enumerating_triangle.)

def edge_wanted(grouped):
    """Build every open triad formed by two edges that share one vertex.

    Args:
        grouped: ``(apex, edges)`` where ``edges`` is an iterable of
            ``(u, v)`` tuples binned under the vertex ``apex``.

    Returns:
        List of ``((a, b), (edge_i, edge_j))`` for every pair of positions
        ``i < j`` in the bin. ``(a, b)`` is the sorted pair of far ends,
        i.e. the edge that would close the triad into a triangle.
    """
    apex, incident = grouped
    incident = list(incident)

    def far_end(edge):
        # The endpoint that is not the apex; falls back to edge[1] when the
        # first endpoint is the apex.
        return edge[1] if edge[0] == apex else edge[0]

    triads = []
    for position, first in enumerate(incident):
        for second in incident[position + 1:]:
            closing_edge = tuple(sorted((far_end(first), far_end(second))))
            triads.append((closing_edge, (first, second)))
    return triads

#first, refine the structure so the pair is now (lowest_degree_vertext, edge)
#second, group them by key and ignore the solo edge
#for any 2 of the pair, generate a open traid pair looking for the third edge
#to close them. The paper talks about reduce, however we can't use reducebykey
#in that case; reducebykey will aggregate all value,not what we want
def enumerating_triangle(edges_rdd):
    """Enumerate triangles from a degree-augmented edge RDD.

    Args:
        edges_rdd: RDD of ``((u, v), (degree_u, degree_v))`` records.

    Returns:
        RDD of ``(edge_a, edge_b, edge_c)`` tuples: two edges meeting at a
        shared vertex plus the edge that closes them into a triangle.
    """
    def bin_by_lower_degree_vertex(record):
        # Key the edge by its lower-degree endpoint; ties go to the first
        # endpoint.
        edge, degrees = record[0], record[1]
        apex = edge[0] if degrees[0] <= degrees[1] else edge[1]
        return (apex, edge)

    def has_several_edges(vertex_bin):
        # A bin holding a single edge cannot form a triad.
        return len(list(vertex_bin[1])) > 1

    def key_by_sorted_endpoints(record):
        # Key every edge by its sorted endpoints so it can be matched
        # against the closing edge an open triad is looking for.
        edge = record[0]
        if edge[0] <= edge[1]:
            key = (edge[0], edge[1])
        else:
            key = (edge[1], edge[0])
        return (key, edge)

    def as_triangle(joined):
        # joined is (closing_key, ((edge_a, edge_b), closing_edge)).
        _, (triad, closing_edge) = joined
        return (triad[0], triad[1], closing_edge)

    open_triads_rdd = (
        edges_rdd.map(bin_by_lower_degree_vertex)
        .groupByKey()
        .filter(has_several_edges)
        .flatMap(edge_wanted)
    )
    closing_edges_rdd = edges_rdd.map(key_by_sorted_endpoints)

    return open_triads_rdd.join(closing_edges_rdd).map(as_triangle)
# Run the Spark triangle enumeration and show the triangles and their count.
# collect() and count() are two separate actions on the same RDD.
test = enumerating_triangle(augumented_rdd)
print(test.collect())
print(test.count())
#te = third_edge_rdd.collect()
#print(te)

# Reference result from NetworkX: keep the 3-vertex cliques produced by
# enumerate_all_cliques, i.e. the triangles.
G = nx.karate_club_graph()
triangles = [triangle for triangle in nx.enumerate_all_cliques(G) if len(triangle) == 3]
print(triangles)
# Number of triangles found by NetworkX, to compare with the Spark count above.
print(len(triangles))

[((0, 1), (16, 9)), ((0, 2), (16, 10)), ((0, 3), (16, 6)), ((0, 4), (16, 3)), ((0, 5), (16, 4)), ((0, 6), (16, 4)), ((0, 7), (16, 4)), ((0, 8), (16, 5)), ((0, 10), (16, 3)), ((0, 11), (16, 1)), ((0, 12), (16, 2)), ((0, 13), (16, 5)), ((0, 17), (16, 2)), ((0, 19), (16, 3)), ((0, 21), (16, 2)), ((0, 31), (16, 6)), ((1, 2), (9, 10)), ((1, 3), (9, 6)), ((1, 7), (9, 4)), ((1, 13), (9, 5)), ((1, 17), (9, 2)), ((1, 19), (9, 3)), ((1, 21), (9, 2)), ((1, 30), (9, 4)), ((2, 3), (10, 6)), ((2, 7), (10, 4)), ((2, 8), (10, 5)), ((2, 9), (10, 2)), ((2, 13), (10, 5)), ((2, 27), (10, 4)), ((2, 28), (10, 3)), ((2, 32), (10, 12)), ((3, 7), (6, 4)), ((3, 12), (6, 2)), ((3, 13), (6, 5)), ((4, 6), (3, 4)), ((4, 10), (3, 3)), ((5, 6), (4, 4)), ((5, 10), (4, 3)), ((5, 16), (4, 2)), ((6, 16), (4, 2)), ((8, 30), (5, 4)), ((8, 32), (5, 12)), ((8, 33), (5, 17)), ((9, 33), (2, 17)), ((13, 33), (5, 17)), ((14, 32), (2, 12)), ((14, 33), (2, 17)), ((15, 32), (2, 12)), ((15, 33), (2, 17)), ((18, 32), (2, 12)), ((18, 

## Enumerating Rectangles (4-cycles)

In [None]:
def enumerating_rectangles_spark(edges_rdd):
  """Enumerate the rectangles (4-cycles) of a graph.

  Args:
    edges_rdd: RDD of ``(src, dst)`` edge tuples. The edges are assumed to
      be stored in one consistent orientation; they are never re-sorted here.

  Returns:
    RDD of distinct rectangles. Each rectangle is a tuple of its four edges
    in sorted order.
  """
  # Step 1: Bin every edge by both its high and low vertex, marking each output record as high or low, producing a binned edge file.
  augmented_edges_rdd = augment_edges_with_degrees_spark(edges_rdd) # [((0, 6), (16, 4)), ((4, 6), (3, 4)), ((5, 6), (4, 4)), ...
  def transform_edge(edge):
    # Emit the edge once per endpoint. In an endpoint's bin the record is
    # labelled 'H' when that endpoint has the higher degree and 'L' otherwise;
    # on a tie src is treated as the low vertex and dst as the high one.
    ((src, dst), (src_deg, dst_deg)) = edge
    if src_deg > dst_deg:
        return [(src, ((src, dst), 'H')), (dst, ((src, dst), 'L'))]
    else:
        return [(src, ((src, dst), 'L')), (dst, ((src, dst), 'H'))]
  labeled_edges_rdd = augmented_edges_rdd.flatMap(transform_edge)
  grouped_labeled_edges_rdd = labeled_edges_rdd.groupByKey()

  # Step 2: Go through each bin in the binned edge file,
  #         exporting every pair of distinct low records in the bin and every pair of a low record with a high record in the bin to produce a triad file.
  #         Each record in the triad file is binned by the pair of vertices the triad connects, that pair ordered lexicographically
  # Example bin: (8, [((8, 30), 'H'), ((0, 8), 'L'), ((2, 8), 'L'), ((8, 32), 'L'), ((8, 33), 'L')])
  def process_bin(bin_data):
      # bin_data is (common_vertex, iterable of ((src, dst), label)).
      common_vertex, edges = bin_data
      lows = []
      highs = []
      for edge in edges:
          (src, dst), label = edge
          if label == 'L':
              lows.append((src, dst))
          else:
              highs.append((src, dst))
      # Generate pairs of low-low and low-high, excluding the common vertex
      triads = []
      for low in lows:
          # Endpoint of `low` that is not the common vertex.
          other_vertex_low = low[1] if low[0] == common_vertex else low[0]
          # Low-Low pairs (each unordered pair is emitted in both orders)
          for other_low in lows:
              if low != other_low:
                  other_vertex_low_2 = other_low[1] if other_low[0] == common_vertex else other_low[0]
                  triad_key = tuple(sorted([other_vertex_low, other_vertex_low_2]))
                  triads.append((triad_key, (low, other_low)))
          # Low-High pairs
          for high in highs:
              other_vertex_high = high[1] if high[0] == common_vertex else high[0]
              triad_key = tuple(sorted([other_vertex_low, other_vertex_high]))
              triads.append((triad_key, (low, high)))
      return triads
  triad_edges_rdd = grouped_labeled_edges_rdd.flatMap(process_bin)
  triad_grouped_rdd = triad_edges_rdd.groupByKey().mapValues(list)

  # Step 3: Go through each bin in the triad file bin, exporting a rectangle for every triad pair in the bin
  # Example bin: ((0, 6), [((4, 6), (0, 4)), ((0, 4), (4, 6)), ((5, 6), (0, 5)), ((0, 5), (5, 6))])
  def find_rectangles(bin_data):
      # The bin key (the two vertices the triads connect) is not needed here.
      _, triads = bin_data
      rectangles = []
      # Remove duplicate triads (exact duplicates only; the same two edges in
      # the opposite order remain, but such a pair spans just 3 vertices and
      # is rejected by the 4-vertex test below)
      unique_triads = list(set(triads))
      # Compare each pair of unique triads to find rectangles
      for i in range(len(unique_triads)):
          for j in range(i + 1, len(unique_triads)):
              triad1 = unique_triads[i]
              triad2 = unique_triads[j]
              # Combine the edges of both triads and find all unique vertices
              all_edges = triad1 + triad2
              all_vertices = set([vertex for edge in all_edges for vertex in edge])
              # A rectangle is formed if there are exactly 4 unique vertices
              if len(all_vertices) == 4:
                  # Sort edges within the rectangle to remove redundancy
                  # sorted_rectangle = tuple(sorted([tuple(sorted(edge)) for edge in all_edges]))
                  sorted_rectangle = tuple(sorted([edge for edge in all_edges]))  # the vertices inside each edge are not re-sorted: edges are assumed to be pre-sorted in this project and are kept intact everywhere
                  rectangles.append(sorted_rectangle)
      return rectangles
  # Map each bin through the find_rectangles function; distinct() drops
  # rectangles that were produced by more than one bin.
  flattened_rectangles_rdd = triad_grouped_rdd.flatMap(find_rectangles).distinct()
  return flattened_rectangles_rdd


# Enumerate the rectangles of the karate-club edge list with Spark.
rectangles_rdd = enumerating_rectangles_spark(edges_rdd)
# collect() and count() are two separate actions on the same RDD.
print(rectangles_rdd.collect())
print(f"Spark rectangle count: {rectangles_rdd.count()}")

[((0, 8), (0, 31), (8, 32), (31, 32)), ((0, 2), (0, 31), (2, 32), (31, 32)), ((0, 2), (0, 8), (2, 32), (8, 32)), ((0, 2), (0, 31), (2, 28), (28, 31)), ((2, 8), (2, 32), (8, 30), (30, 32)), ((2, 28), (2, 32), (28, 31), (31, 32)), ((0, 4), (0, 5), (4, 6), (5, 6)), ((0, 4), (0, 5), (4, 10), (5, 10)), ((4, 6), (4, 10), (5, 6), (5, 10)), ((0, 6), (0, 10), (4, 6), (4, 10)), ((0, 5), (0, 6), (5, 16), (6, 16)), ((0, 8), (0, 13), (2, 8), (2, 13)), ((0, 1), (0, 8), (1, 2), (2, 8)), ((0, 3), (0, 8), (2, 3), (2, 8)), ((0, 7), (0, 8), (2, 7), (2, 8)), ((0, 1), (0, 13), (1, 2), (2, 13)), ((0, 3), (0, 13), (2, 3), (2, 13)), ((0, 7), (0, 13), (2, 7), (2, 13)), ((0, 1), (0, 3), (1, 2), (2, 3)), ((0, 1), (0, 7), (1, 2), (2, 7)), ((0, 3), (0, 7), (2, 3), (2, 7)), ((0, 1), (0, 8), (1, 30), (8, 30)), ((1, 2), (1, 30), (2, 8), (8, 30)), ((28, 31), (28, 33), (31, 32), (32, 33)), ((23, 27), (23, 29), (27, 33), (29, 33)), ((23, 29), (23, 32), (29, 33), (32, 33)), ((23, 27), (23, 32), (27, 33), (32, 33)), ((26,

In [None]:
from itertools import combinations

def find_rectangles_networkx(G):
    """Brute-force reference: find every rectangle (4-cycle) in ``G``.

    Every combination of four nodes is tested with
    ``rectangle_configuration`` and the rectangles it reports are gathered
    into a set, so each rectangle appears once in the result.

    Args:
        G: Graph exposing ``nodes()`` and ``has_edge(u, v)``.

    Returns:
        List of rectangles, each a tuple of four edges.
    """
    found = set()
    for quad in combinations(list(G.nodes()), 4):
        # An empty result adds nothing, so no emptiness check is needed.
        found.update(rectangle_configuration(G, quad))
    return list(found)

def rectangle_configuration(G, quad):
    """Return the rectangles (4-cycles) that four given vertices form in ``G``.

    With the vertices sorted as ``v1 < v2 < v3 < v4`` there are three
    distinct ways to arrange them in a cycle; each is reported when all four
    of its edges exist.

    Args:
        G: Graph exposing ``has_edge(u, v)``.
        quad: Iterable of four vertices.

    Returns:
        List of rectangles, each a sorted tuple of its four edges, with every
        edge written smaller vertex first.
    """
    v1, v2, v3, v4 = sorted(quad)
    candidate_cycles = (
        [(v1, v2), (v1, v4), (v2, v3), (v3, v4)],  # v1-v2-v3-v4-v1
        [(v1, v3), (v1, v4), (v2, v3), (v2, v4)],  # v1-v3-v2-v4-v1
        [(v1, v2), (v1, v3), (v2, v4), (v3, v4)],  # v1-v2-v4-v3-v1
    )
    return [
        tuple(sorted(cycle))
        for cycle in candidate_cycles
        if all(G.has_edge(u, v) for u, v in cycle)
    ]

G = nx.karate_club_graph()
rectangles_networkx = find_rectangles_networkx(G)
print(rectangles_networkx)
print(f"Found {len(rectangles_networkx)} rectangles")

[((24, 27), (24, 31), (27, 33), (31, 33)), ((0, 3), (0, 13), (1, 3), (1, 13)), ((0, 5), (0, 6), (5, 16), (6, 16)), ((1, 3), (1, 13), (2, 3), (2, 13)), ((0, 3), (0, 8), (2, 3), (2, 8)), ((18, 32), (18, 33), (30, 32), (30, 33)), ((0, 2), (0, 3), (1, 2), (1, 3)), ((1, 19), (1, 30), (19, 33), (30, 33)), ((23, 25), (23, 27), (24, 25), (24, 27)), ((1, 2), (1, 7), (2, 3), (3, 7)), ((0, 12), (0, 13), (3, 12), (3, 13)), ((15, 32), (15, 33), (31, 32), (31, 33)), ((1, 7), (1, 13), (3, 7), (3, 13)), ((23, 29), (23, 33), (29, 32), (32, 33)), ((18, 32), (18, 33), (23, 32), (23, 33)), ((20, 32), (20, 33), (23, 32), (23, 33)), ((26, 29), (26, 33), (29, 32), (32, 33)), ((23, 27), (23, 29), (27, 33), (29, 33)), ((2, 9), (2, 27), (9, 33), (27, 33)), ((0, 2), (0, 3), (2, 13), (3, 13)), ((18, 32), (18, 33), (20, 32), (20, 33)), ((18, 32), (18, 33), (29, 32), (29, 33)), ((0, 2), (0, 17), (1, 2), (1, 17)), ((2, 9), (2, 13), (9, 33), (13, 33)), ((22, 32), (22, 33), (23, 32), (23, 33)), ((30, 32), (30, 33), (3

In [None]:
print(set(rectangles_rdd.collect()) == set(rectangles_networkx))

True


## Finding Trusses

In [None]:
# Example usage:
triangles_test =  [[0, 1, 2], [0, 1, 3], [0, 2, 3] , [1, 2, 3], [1, 3, 4]]
def find_trusses(sc, triangles_list, k):
    """Return the edges of the k-truss spanned by the given triangles.

    An edge belongs to the k-truss when it lies in at least ``k - 2``
    triangles whose edges all belong to the truss as well. Dropping a weak
    edge destroys every triangle through it, which can push further edges
    below the threshold, so the count-and-prune pass is repeated until a
    pass drops nothing.

    Args:
        sc: SparkContext used to distribute the triangles.
        triangles_list: List of triangles, each a sequence of three
            vertices. Edges that occur in no triangle are unknown to this
            function and are never reported.
        k: Truss order; the support threshold is ``k - 2``.

    Returns:
        List of ``((u, v), support)`` pairs with ``u <= v``, where
        ``support`` is the number of surviving triangles that contain the
        edge.
    """
    min_support = k - 2

    def triangle_edges(triangle):
        # The three edges of a triangle, each written smaller vertex first.
        v = sorted(triangle)
        return [(v[0], v[1]), (v[0], v[2]), (v[1], v[2])]

    all_triangles_rdd = sc.parallelize(triangles_list)
    dropped = set()
    while True:
        # Always filter the ORIGINAL triangles against the cumulative set of
        # dropped edges. The set is frozen and bound as a default argument so
        # the closure that Spark serialises cannot see a later mutation.
        banned = frozenset(dropped)
        surviving_rdd = all_triangles_rdd.filter(
            lambda tri, banned=banned: banned.isdisjoint(triangle_edges(tri))
        )

        # Count, for every edge, the surviving triangles that contain it.
        edge_counts = (
            surviving_rdd
            .flatMap(lambda tri: [(edge, 1) for edge in triangle_edges(tri)])
            .reduceByKey(lambda a, b: a + b)
            .collect()
        )

        weak = {edge for edge, support in edge_counts if support < min_support}
        if not weak:
            # Fixed point: every remaining edge has enough support.
            return edge_counts
        # Every weak edge came from at least one surviving triangle, so each
        # extra pass removes at least one triangle and the loop terminates.
        dropped |= weak

# Example usage on the small hand-written triangle list `triangles_test`;
# `sc` is the SparkContext.
k = 3  # Adjust k according to your truss requirement
trusses = find_trusses(sc, triangles_test , k)
print("Edges that form trusses:", trusses)



k = 5  # Or any other value for k-trusses

# Same computation on `triangles`, the karate-club triangle list built with
# NetworkX in the triangle-enumeration cell.
trusses = find_trusses(sc,triangles , k)
print("Edges that form trusses:", trusses)

Edges that form trusses: [((0, 2), 2), ((1, 3), 3), ((0, 1), 2), ((1, 2), 2), ((0, 3), 2), ((2, 3), 2), ((1, 4), 1), ((3, 4), 1)]
Edges that form trusses: [((0, 2), 5), ((1, 3), 4), ((1, 7), 3), ((1, 13), 3), ((3, 7), 3), ((3, 13), 3), ((8, 32), 3), ((23, 33), 3), ((29, 33), 3), ((0, 1), 7), ((1, 2), 4), ((0, 3), 5), ((0, 7), 3), ((0, 13), 3), ((2, 3), 4), ((2, 7), 3), ((2, 13), 3), ((32, 33), 10)]
