In [None]:
# Install GraphFrames by running the following command with your virtual environment activated and pyspark already installed:
#   pyspark --packages graphframes:graphframes:0.6.0-spark2.3-s_2.11
from itertools import islice

import pyspark

from les_mis import LES_MIS_GRAPH

# Reuse the active SparkContext when there is one, otherwise start a new one.
sc = pyspark.SparkContext.getOrCreate()

# Distribute the LES_MIS_GRAPH records across 32 slices; later cells index each
# record at positions 0, 1 and 2.
graph = sc.parallelize(LES_MIS_GRAPH, numSlices=32)

# Remember the partition count so later cells can rebuild RDDs with the same layout.
NUM_PARTITIONS = graph.getNumPartitions()

In [None]:
# Every distinct value found at position 0 or 1 of any record, pulled to the driver.
# A node's position in this list becomes its integer id.
nodes = graph.flatMap(lambda edge: (edge[0], edge[1])).distinct().collect()

# Two-way lookup between node values and integer ids.
id_to_node = dict(enumerate(nodes))
node_to_id = {name: idx for idx, name in id_to_node.items()}

# Every node starts out alone in a cluster labelled after its own id.
node_id_to_cluster = {idx: f"cluster:{idx}" for idx in id_to_node}

len(nodes)

In [None]:
# One entry per cluster label, holding {node_id: ("move", 0)} for the node mapped to it.
# If several node ids share a label in node_id_to_cluster, only the last one survives
# because later keys overwrite earlier ones in a dict comprehension.
CLUSTERS: dict[str, dict[int, tuple[str, int]]] = {
    cluster_label: {node_id: ("move", 0)}
    for node_id, cluster_label in node_id_to_cluster.items()
}

# Preview the first three entries only.
list(islice(CLUSTERS.items(), 3))

In [None]:
def get_node_attributes(graph: "pyspark.rdd.RDD", node_to_id: dict[str, int]) -> dict[int, tuple[list[int], list]]:
    """
    Build a driver-side adjacency lookup for every node id in ``node_to_id``.

    Args:
        graph: RDD of edge records; each record is indexed as
            ``(node_a, node_b, weight)``. The whole RDD is collected to the driver.
        node_to_id: Mapping from the node values used in ``graph`` to integer ids.

    Returns:
        ``{node_id: (neighbour_ids, weights)}`` where the two lists are parallel:
        ``weights[i]`` is the weight of the edge to ``neighbour_ids[i]``. Every edge
        is recorded for both of its endpoints. Ids with no edges map to ``([], [])``.

    Raises:
        KeyError: If an edge endpoint is missing from ``node_to_id``.
    """
    # Seed the result from the mapping that was passed in rather than from a
    # notebook global, so the function only depends on its own arguments.
    node_attributes = {node_id: ([], []) for node_id in node_to_id.values()}

    for edge in graph.collect():
        source_id = node_to_id[edge[0]]
        target_id = node_to_id[edge[1]]
        weight = edge[2]

        # Record the edge on both endpoints.
        source_neighbours, source_weights = node_attributes[source_id]
        source_neighbours.append(target_id)
        source_weights.append(weight)

        target_neighbours, target_weights = node_attributes[target_id]
        target_neighbours.append(source_id)
        target_weights.append(weight)

    return node_attributes

# Adjacency lookup keyed by node id, built once on the driver.
node_id_to_attrs = get_node_attributes(graph=graph, node_to_id=node_to_id)

# Preview the first three entries only.
list(islice(node_id_to_attrs.items(), 3))

In [None]:
def update_cluster_map(node_id: int, new_cluster_id: str):
    """Point ``node_id`` at ``new_cluster_id`` in the module-level ``node_id_to_cluster`` map."""
    node_id_to_cluster.update({node_id: new_cluster_id})

def generate_test_clusters(nodes: list[int]):
    """
    Produce a throwaway clustering for testing by merging neighbours into clusters.

    For each node id in the first half of ``nodes`` (in order), the first neighbour
    listed for that node in ``node_id_to_attrs`` is reassigned to the node's current
    cluster. Mutates the module-level ``node_id_to_cluster`` in place and returns
    nothing. Updates are applied sequentially, so a node whose cluster was changed
    by an earlier iteration passes that new cluster on.
    """
    half = len(nodes) // 2
    for node_id in nodes[:half]:
        neighbour_ids = node_id_to_attrs[node_id][0]
        first_neighbour = neighbour_ids[0]
        # Pull the first neighbour into this node's cluster.
        node_id_to_cluster[first_neighbour] = node_id_to_cluster[node_id]

# Apply the test clustering to every known node id (mutates node_id_to_cluster).
generate_test_clusters(nodes=list(id_to_node))

# Preview the first three assignments only.
list(islice(node_id_to_cluster.items(), 3))

In [None]:
def init_input_c(graph: pyspark.rdd.RDD):
    """
    Build the initial vertex-to-cluster RDD with every vertex in its own cluster.

    Args:
        graph: RDD of edge records whose positions 0 and 1 hold the two endpoints.

    Returns:
        RDD of ``(node_id, "cluster:<node_id>")`` pairs, one per distinct endpoint,
        with ids looked up in the module-level ``node_to_id`` mapping.
    """
    # The label is derived from the node's own id instead of zipWithIndex():
    # element order after distinct() (a shuffle) is not guaranteed, so a
    # positional index could disagree with the ids stored in node_to_id.
    return (
        graph.flatMap(lambda edge: (edge[0], edge[1]))
        .distinct()
        .map(lambda node: (node_to_id[node], f"cluster:{node_to_id[node]}"))
    )

def update_input_c(graph: pyspark.rdd.RDD):
    """
    Refresh the cluster label of every record from ``node_id_to_cluster``.

    Each input record's first element is treated as a node id; the result is an RDD
    of ``(node_id, node_id_to_cluster[node_id])`` pairs sorted by node id.
    """
    def relabel(record):
        node_id = record[0]
        return node_id, node_id_to_cluster[node_id]

    relabelled = graph.map(relabel)
    return relabelled.sortByKey()

def init_input_g(graph: pyspark.rdd.RDD):
    """
    Build the per-vertex adjacency RDD for the input graph.

    Every edge record ``(a, b, w)`` is emitted once for each endpoint, node values
    are translated to integer ids through the module-level ``node_to_id``, and the
    results are grouped by vertex id.

    Returns:
        RDD sorted ascending by vertex id with records of the form
        ``(v, ([(u, node_id_to_attrs[u]), ...], [w, ...]))``. The two lists are
        parallel: the weight at position ``i`` belongs to the edge to the
        neighbour at position ``i``.
    """
    from_first = graph.map(lambda edge: (edge[0], (edge[1], edge[2])))
    from_second = graph.map(lambda edge: (edge[1], (edge[0], edge[2])))

    def encode(record):
        # record is (vertex, (neighbour, weight)) with raw node values.
        vertex, (neighbour, weight) = record
        vertex_id = node_to_id[vertex]
        neighbour_id = node_to_id[neighbour]
        return vertex_id, ((neighbour_id, node_id_to_attrs[neighbour_id]), weight)

    def split(entries):
        # Turn [(neighbour_entry, weight), ...] into two parallel lists.
        return [entry[0] for entry in entries], [entry[1] for entry in entries]

    return (
        from_first.union(from_second)
        .map(encode)
        .groupByKey()
        .mapValues(split)
        # sortByKey's first positional parameter is `ascending`; a lambda passed
        # there only acts as a truthy flag, so the default ascending sort is the
        # explicit equivalent.
        .sortByKey()
    )

# Initial vertex -> cluster assignment.
input_c = init_input_c(graph=graph)

# Peek at two records to check the layout.
input_c.take(2)

In [None]:
# Re-read each vertex's label from node_id_to_cluster (currently the test clustering).
input_c = update_input_c(graph=input_c)

# Peek at two records to check the layout.
input_c.take(2)

In [None]:
# Prepare input G, part 1: per-vertex adjacency records.
input_g = init_input_g(graph=graph)

# Peek at one record to check the layout.
input_g.take(1)

In [None]:
# Zipping input_c and input_g directly did not work after the transformations
# applied to each of them, so both are rebuilt here from their collected contents
# with the same partition count and sorted by key (the node id).
# sortByKey takes `ascending` as its first positional parameter; a lambda passed
# there only acts as a truthy flag, so the default ascending sort is used explicitly.
input_c = sc.parallelize(input_c.collect(), NUM_PARTITIONS).sortByKey()
input_g = sc.parallelize(input_g.collect(), NUM_PARTITIONS).sortByKey()

In [None]:
# Pair the i-th record of input_c, (node_id, cluster), with the i-th record of
# input_g, (node_id, adjacency), and re-key the result by cluster:
#   (cluster, [(node_id, adjacency)])
# NOTE(review): zip pairs records purely by position, so this relies on both RDDs
# being identically partitioned and ordered by node id — confirm this still holds
# if the inputs change.
first_zip = (
    input_c.zip(input_g)
    .map(lambda pair: (pair[0][1], [(pair[0][0], pair[1][1])]))
    .sortByKey()
)

In [None]:
# Expand every cluster member into one (node_id, neighbour_entry, weights) triple
# per neighbour entry. `weights` is the member's full weight list, repeated on
# each of its triples.
first_agg = first_zip.mapValues(
    lambda members: [
        (member[0], neighbour, member[1][1])
        for member in members
        for neighbour in member[1][0]
    ]
)

# Peek at four records to check the layout.
first_agg.take(4)

In [None]:
# Hand-written sample records for experimenting with the cluster computations.
# Inner pairs such as (46, ([...], [...])) look like (node id, (neighbour ids,
# edge weights)) — presumably the same layout as node_id_to_attrs; TODO confirm.
# NOTE(review): the two entries do not share a shape. The first is a 2-tuple
# (key, (pairs, [1, 2])); the second is a 4-tuple with two extra trailing lists.
# Confirm which layout is intended before relying on this fixture.
sample_cluster = [
    (0, # key of the first entry — presumably a node or cluster id; TODO confirm
        (
            [
                (46, ([27, 75, 25, 52, 28, 35, 33, 4, 20, 56, 48, 22, 17, 0, 18], [3, 3, 3, 3, 4, 4, 4, 2, 9, 2, 1, 5, 1, 1, 2])),
                (18, ([0, 20], [2, 3]))
            ],
            [1, 2] 
        )
     ),
    (46,
        (
            [  
                (46, ([0, 27, 75, 25], [1, 3, 3, 3])),
                (18, ([0, 20], [2, 3]))
            ],
            [1, 2]
        ),
        # Two extra trailing lists, identical to the pair of lists attached to 46 in the first entry.
        [27, 75, 25, 52, 28, 35, 33, 4, 20, 56, 48, 22, 17, 0, 18], [3, 3, 3, 3, 4, 4, 4, 2, 9, 2, 1, 5, 1, 1, 2]
    )
]



In [None]:
# Paper definitions from page 690 of "Distributed Graph Clustering Using Modularity and Map Equation":
# deg(v) - The weighted degree of a node v: the sum of the weights of all outgoing edges (v, u) of v.
# vol(C) - The volume of a set of nodes C: the sum of the weighted degrees of all nodes in C.
# cut(v, C) - The sum of the weights of all edges (v, u) where u is in C.

# cluster 0 expected outputs:
# 
# vol(C \ v) = 0
# cut(v, C \ v) = sum([3, 3, 3, 3, 4, 4, 4, 2, 9]) + sum([1, 2])
# cut 

In [None]:
class Node:
    """
    A graph vertex with its cluster label and weighted adjacency list.

    Could try using this and serializing the object in each stage for the graph...

    Attributes:
        id: Identifier supplied by the caller, or ``None`` when not given.
        cluster_id: Label of the cluster the node currently belongs to.
        neighbors: Neighbour identifiers, in the order they were supplied.
        weights: Edge weights, parallel to ``neighbors``.
    """

    def __init__(self, cluster_id: str, neighbors: list[tuple[str, int]], node_id=None):
        """
        Args:
            cluster_id: Initial cluster label.
            neighbors: ``(neighbour, weight)`` pairs describing the node's edges.
            node_id: Optional identifier for this node. Defaults to ``None``.
        """
        # Store the caller's identifier; the constructor used to assign the
        # builtin `id` function here because it had no id parameter.
        self.id = node_id
        self.cluster_id = cluster_id
        self.neighbors = [neighbor for neighbor, _ in neighbors]
        self.weights = [weight for _, weight in neighbors]

    def update_cluster_id(self, cluster_id: str):
        """Move the node to ``cluster_id``."""
        self.cluster_id = cluster_id

    def get_volume(self, node_id: int):
        """
        Return the weight of the first edge to ``node_id``, or ``None`` if there is none.

        NOTE(review): despite the name, this returns a single edge weight rather
        than a sum of weighted degrees — confirm the intended meaning.
        """
        for neighbor, weight in zip(self.neighbors, self.weights):
            if neighbor == node_id:
                return weight
        return None

    def get_degree(self):
        """Return the weighted degree: the sum of all edge weights of this node."""
        return sum(self.weights)

    def get_neighbours(self):
        """Return the list of neighbour identifiers (the stored list, not a copy)."""
        return self.neighbors

    def get_cut(self, other):
        """
        Return how many distinct neighbours this node shares with ``other``.

        NOTE(review): this counts common neighbours and ignores edge weights —
        confirm this is the intended "cut" quantity.
        """
        return len(set(self.neighbors).intersection(set(other.neighbors)))
