In [1]:
import time
import pyspark
from graphframes import *
from pyspark.sql.functions import *
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import hash
import os
from pyspark.ml.clustering import PowerIterationClustering
import numpy as np
from pyspark.sql import functions as F
from pyspark.sql.functions import collect_list
import networkx as nx
from connected_components import *
# Number of partitions the integer-id edge DataFrame is repartitioned into before
# running PIC in __main__ (also printed there as "N").
N = 4

In [2]:
def get_largest_cc(cc):
    """Return the id of the largest connected component in ``cc``.

    Args:
        cc: DataFrame with an ``id`` column (one row per vertex) and a ``component``
            column holding each vertex's connected-component id.

    Returns:
        The ``component`` value shared by the most ``id`` rows. Ties are not broken
        deterministically: only ``component_size`` is used for ordering.

    Raises:
        ValueError: if ``cc`` has no rows, so there is no component to choose.
    """
    largest_component = (
        cc.groupby("component")
        .agg(F.count("id").alias("component_size"))
        .orderBy(F.desc("component_size"))
        .first()
    )
    # .first() returns None for an empty DataFrame; fail with a clear message instead
    # of an opaque AttributeError on None.
    if largest_component is None:
        raise ValueError("cc is empty: no connected components to choose from")
    return largest_component["component"]
    
def get_new_vertices(cc):
    """Restrict the connected-components table to its largest component.

    Args:
        cc: DataFrame with an ``id`` column and a ``component`` column.

    Returns:
        ``(new_vertices, valid_vertex_ids)``:
          * ``new_vertices`` -- ``cc`` filtered to the largest component, with the
            ``component`` column dropped;
          * ``valid_vertex_ids`` -- Python set of those vertices' ``id`` values,
            collected to the driver.
    """
    largest_component = get_largest_cc(cc)
    # Compare with a column expression rather than a formatted SQL string, so that
    # non-numeric component ids (which would need quoting in SQL) also work.
    new_vertices = cc.filter(F.col("component") == largest_component).drop("component")
    valid_vertex_ids = {row["id"] for row in new_vertices.select("id").collect()}
    return new_vertices, valid_vertex_ids

def get_new_edges(sc, spark, valid_vertex_ids):
    """Load the full edge list and keep only edges that start at a valid vertex.

    Args:
        sc: SparkContext forwarded to ``pickle_to_dataframe``.
        spark: SparkSession forwarded to ``pickle_to_dataframe``.
        valid_vertex_ids: collection of vertex ids to keep, matched against ``src``.

    Returns:
        DataFrame of the kept edges with the ``weights`` column removed.
    """
    all_edges = pickle_to_dataframe(sc, spark, 'all_edges.pkl')
    source_is_valid = col('src').isin(valid_vertex_ids)
    kept_edges = all_edges.filter(source_is_valid)
    return kept_edges.drop("weights")

def get_communities_lpa(new_graph, max_iter=7):
    """Detect communities with GraphFrames label propagation (LPA).

    Args:
        new_graph: GraphFrame to cluster.
        max_iter: number of label-propagation iterations, passed as ``maxIter``
            (defaults to the previously hard-coded 7).

    Returns:
        ``(communities, diff)``: the DataFrame returned by ``labelPropagation`` (it
        carries a ``label`` column) and the seconds spent inside that call.
    """
    # perf_counter is monotonic, so it is the right clock for measuring durations.
    start = time.perf_counter()
    communities = new_graph.labelPropagation(maxIter=max_iter)
    diff = time.perf_counter() - start
    # Report how many distinct community labels were found.
    print(communities.select(countDistinct("label")).take(1))
    return communities, diff

def prepare_data_evaluation(valid_vertex_ids, new_edges, communities):
    """Build the networkx graph and partition that ``evaluate`` expects.

    Args:
        valid_vertex_ids: iterable of vertex ids. Every id becomes a node, including
            vertices with no incident edge.
        new_edges: Spark DataFrame whose collected rows are used directly as edge
            tuples, so it should hold only the two endpoint columns (e.g. with
            ``weights`` already dropped).
        communities: Spark DataFrame with ``id`` and ``cluster`` columns.

    Returns:
        ``(G, nodes_list)``: an undirected ``nx.Graph`` and a list of frozensets, one
        per cluster, holding that cluster's vertex ids.
    """
    G = nx.Graph()
    G.add_nodes_from(valid_vertex_ids)
    G.add_edges_from(new_edges.collect())

    # One row per cluster with its member ids. Collect once: each collect() re-runs
    # the whole Spark aggregation.
    cluster_rows = (
        communities.groupBy("cluster")
        .agg(collect_list("id").alias("nodes"))
        .collect()
    )
    nodes_list = [frozenset(row["nodes"]) for row in cluster_rows]
    return G, nodes_list

def evaluate(G, nodes_list):
    """Score a partition of ``G`` with networkx community-quality metrics.

    Args:
        G: networkx graph the partition refers to.
        nodes_list: sequence of node collections (e.g. frozensets), one per community.

    Returns:
        ``(modularity, conductance_val, performance, coverage)`` where
        ``conductance_val`` is the smallest conductance over all communities.
    """
    community = nx.algorithms.community
    partition_modularity = community.modularity(G, nodes_list)
    cov, perf = community.quality.partition_quality(G, nodes_list)

    # Conductance is computed per community; only the minimum is reported.
    per_cluster_conductance = np.array(
        [nx.algorithms.cuts.conductance(G, list(members)) for members in nodes_list]
    )
    return partition_modularity, np.min(per_cluster_conductance), perf, cov



In [3]:
def filter_weights(edges, threshold):
    """Keep only edges whose ``weights`` value is strictly greater than ``threshold``.

    Args:
        edges: DataFrame with a ``weights`` column.
        threshold: value interpolated into the SQL filter expression.

    Returns:
        The filtered DataFrame.
    """
    condition = f"weights > {threshold}"
    return edges.filter(condition)

def get_new_edges_PIC(sc, spark, valid_vertex_ids, min_degree):
    """Load the weighted edge list used for Power Iteration Clustering.

    Args:
        sc: SparkContext forwarded to ``pickle_to_dataframe``.
        spark: SparkSession forwarded to ``pickle_to_dataframe``.
        valid_vertex_ids: collection of vertex ids to keep, matched against ``src``.
        min_degree: despite its name, a threshold on the ``weights`` column. Only
            edges with ``weights`` strictly above it are kept.

    Returns:
        DataFrame of the kept edges, still including ``weights`` (PIC needs it).
    """
    all_edges = pickle_to_dataframe(sc, spark, 'all_edges.pkl')
    heavy_edges = filter_weights(all_edges, min_degree)
    source_is_valid = col('src').isin(valid_vertex_ids)
    return heavy_edges.filter(source_is_valid)

def map_string_to_ints(vertices):
    """Assign consecutive integer indices (0, 1, 2, ...) to vertex ids.

    Args:
        vertices: DataFrame with an ``id`` column. Its ids are collected to the driver.

    Returns:
        ``(node_to_index, index_to_node)``: forward and reverse lookup dicts. Indices
        follow the order in which ``collect()`` returns the rows.
    """
    ordered_ids = [record['id'] for record in vertices.select('id').collect()]
    node_to_index = {vertex: position for position, vertex in enumerate(ordered_ids)}
    index_to_node = dict(enumerate(ordered_ids))
    return node_to_index, index_to_node

def save_to_gephi(nodes, edges, filename):
    """Write a directed graph to a GEXF file that Gephi can open.

    Args:
        nodes: iterable accepted by ``nx.DiGraph.add_nodes_from``: plain node ids or
            ``(node_id, attr_dict)`` pairs. This is not a DataFrame; ``__main__`` passes
            a collected list of ``(name, {'label': cluster})`` tuples.
        edges: Spark DataFrame of edges. If it has a ``weights`` column, its ``src``,
            ``dst`` and ``weights`` columns are used and the weight is stored as the
            edge's ``weight`` attribute. Otherwise each collected row is used as an
            edge tuple as-is, so it should have exactly the two endpoint columns.
        filename: output path of the ``.gexf`` file.
    """
    G = nx.DiGraph()
    G.add_nodes_from(nodes)
    if "weights" in edges.columns:
        # A 3-column Row would be misread by add_edges_from as (u, v, attr_dict), so
        # pass the weight explicitly as an edge attribute instead.
        G.add_weighted_edges_from(
            (row["src"], row["dst"], row["weights"])
            for row in edges.select("src", "dst", "weights").collect()
        )
    else:
        G.add_edges_from(edges.collect())
    nx.write_gexf(G, filename)
    

In [4]:
def generate_random_graph(num_nodes, mean_cluster_size=150, cluster_size_shape=50,
                          p_in=0.25, p_out=0.1, seed=None):
    """Generate a random graph with planted communities and return its edge list.

    Wraps ``nx.gaussian_random_partition_graph``. Defaults reproduce the previously
    hard-coded arguments (150, 50, 0.25, 0.1).

    Args:
        num_nodes: number of nodes in the graph.
        mean_cluster_size: mean cluster size (networkx parameter ``s``).
        cluster_size_shape: shape parameter ``v``; per the networkx docs the
            cluster-size variance is ``s / v``.
        p_in: probability of an edge between two nodes of the same cluster.
        p_out: probability of an edge between nodes of different clusters.
        seed: random seed forwarded to networkx; ``None`` means unseeded.

    Returns:
        A pandas DataFrame of edges, as produced by ``nx.to_pandas_edgelist``.
    """
    G = nx.gaussian_random_partition_graph(
        num_nodes, mean_cluster_size, cluster_size_shape, p_in, p_out, seed=seed
    )
    # Return the pandas DataFrame; pandas has no Spark-style .show() method.
    return nx.to_pandas_edgelist(G)

In [5]:
def __main__(num_partitions=None, k=65, max_iter=75, min_weight=1):
    """Cluster the largest connected component with Power Iteration Clustering (PIC).

    Steps:
      1. Start (or reuse) a SparkSession.
      2. Load ``connected_components.pkl`` and keep only the largest component. Load
         that component's edges whose ``weights`` exceed ``min_weight``.
      3. Map string vertex ids to consecutive integers (PIC needs numeric vertex
         ids). Repartition the integer edge list and run PIC, timing ``assignClusters``.
      4. Export the clustered graph to ``PIC_communities.gexf`` for Gephi.
      5. Map cluster assignments back to the string ids. Print modularity,
         conductance, performance and coverage.

    Every argument defaults to the value that used to be hard-coded, so ``__main__()``
    produces the same run. Loop over the arguments to sweep settings, e.g.
    ``for n in (2, 4, 8): __main__(num_partitions=n)``.

    Args:
        num_partitions: number of partitions for the integer edge DataFrame. ``None``
            means the module-level ``N``, read at call time.
        k: number of clusters requested from PIC.
        max_iter: maximum number of PIC iterations.
        min_weight: edges with ``weights`` <= ``min_weight`` are dropped before clustering.
    """
    if num_partitions is None:
        num_partitions = N

    # NOTE(review): per the Spark configuration docs, spark.driver.memory set through
    # the builder has no effect in client mode, because the driver JVM is already
    # running. Set it via spark-submit / spark-defaults.conf if it matters.
    spark = (SparkSession.builder
             .config("spark.memory.offHeap.enabled", "true")
             .config("spark.memory.offHeap.size", "100g")
             .config("spark.executor.memory", "100g")
             .config("spark.driver.memory", "100g")
             .appName("Reddit Community Detection")
             .getOrCreate())
    sc = spark.sparkContext

    # Restrict the graph to its largest connected component.
    cc = pickle_to_dataframe(sc, spark, 'connected_components.pkl')
    new_vertices, valid_vertex_ids = get_new_vertices(cc)
    weighted_edges = get_new_edges_PIC(sc, spark, valid_vertex_ids, min_weight)

    # PIC needs integer vertex ids, so build string <-> int lookup tables.
    n_i, i_n = map_string_to_ints(new_vertices)
    # String-id edges without weights, used for the Gephi export and for evaluation.
    string_edges = weighted_edges.drop('weights')
    # Assumes the edge columns are ordered (src, dst, weights) -- TODO confirm.
    int_edges = (weighted_edges.rdd
                 .map(lambda row: (n_i[row[0]], n_i[row[1]], row[2]))
                 .toDF(['src', 'dst', 'weights'])
                 .repartition(num_partitions))

    pic = PowerIterationClustering(k=k, srcCol='src', dstCol='dst', weightCol="weights")
    pic.setMaxIter(max_iter)
    start = time.time()
    assignments = pic.assignClusters(int_edges)  # columns: id, cluster
    diff = time.time() - start

    # Assumes new_vertices' first column is the string id and its second a name -- TODO confirm.
    int_vertices = new_vertices.rdd.map(lambda row: (n_i[row[0]], row[1])).toDF(['src', 'name'])
    result = int_vertices.join(assignments, int_vertices.src == assignments.id, "inner").drop('id')
    # result columns are (src, name, cluster): each Gephi node is `name`, labelled with its cluster.
    # NOTE(review): string_edges reference the original `id` values. If `name` differs
    # from `id`, the exported nodes and edges will not line up -- confirm.
    nodes = result.rdd.map(lambda row: (row[1], {'label': row[2]})).collect()
    save_to_gephi(nodes, string_edges, 'PIC_communities.gexf')

    print("For N: ", num_partitions, ", Iter: ", max_iter)
    print("PIC ran for {} seconds.".format(diff))

    # Translate integer ids back to the original string ids before evaluating.
    assignments = assignments.rdd.map(lambda row: (i_n[row[0]], row[1])).toDF(['id', 'cluster'])
    G, nodes_list = prepare_data_evaluation(valid_vertex_ids, string_edges, assignments)
    modularity, conductance, performance, coverage = evaluate(G, nodes_list)
    print("PIC Evaluation Metrics :")
    print("Modularity score = ", modularity)
    print("Conductance = ", conductance)
    print("Performance = ", performance)
    print("Coverage = ", coverage)
    
    
    

In [6]:
# Run the end-to-end PIC community-detection pipeline defined in the previous cell.
__main__()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/04/26 20:24:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

For N:  4 , Iter:  75
PIC ran for 167.84344053268433 seconds.


                                                                                

PIC Evaluation Metrics :
Modularity score =  0.4082763021073951
Conductance =  0.056
Performance =  0.89349694010456
Coverage =  0.5684957355272096
