## Finding and Evaluating Communities

In [1]:
import networkx as nx
from networkx.algorithms import community
from neo4j import GraphDatabase
import concurrent.futures
import os
import warnings

# Hide FutureWarning messages so they do not clutter the cell outputs.
warnings.filterwarnings("ignore", category=FutureWarning)

# Connection settings are read from the environment so that real credentials
# never have to be saved inside the notebook. The literals are only fallbacks
# (same values the notebook used before) for a local development database.
URI = os.environ.get("NEO4J_URI", "bolt://localhost:7687")

password = os.environ.get("NEO4J_PASSWORD", "12345678") # CHANGE (or export NEO4J_PASSWORD)
AUTH = (os.environ.get("NEO4J_USER", "neo4j"), password)

In [2]:
def get_graph_from_neo4j(driver):
    """Load every node and relationship from Neo4j into a NetworkX ``DiGraph``.

    Parameters
    ----------
    driver
        Object whose ``session()`` is a context manager exposing ``run(query)``;
        the records it yields are indexed by column name.

    Returns
    -------
    networkx.DiGraph
        Nodes are keyed by the value Cypher's ``id(n)`` returns and carry the
        node's properties plus a ``labels`` attribute holding ``labels(n)``.
        Edges carry a single ``type`` attribute (the relationship type).

    Notes
    -----
    * A ``DiGraph`` stores one edge per ordered node pair, so when two nodes are
      linked by several relationships only the ``type`` read last is kept.
    * NOTE(review): ``id()`` is deprecated in Neo4j 5 in favour of
      ``elementId()``; switching would change node keys from int to str, so it
      is left as is here.
    """
    G = nx.DiGraph()
    with driver.session() as session:
        nodes_result = session.run("""
            MATCH (n)
            RETURN id(n) AS id, labels(n) AS labels, properties(n) AS properties
        """)
        for record in nodes_result:
            # Build the attribute dict explicitly instead of splatting the
            # properties as keyword arguments: a property named "labels" (or
            # matching add_node's own parameter name) would otherwise raise
            # TypeError. The Neo4j labels win over a same-named property
            # because the rest of the notebook filters nodes on them.
            attrs = dict(record["properties"] or {})
            attrs["labels"] = record["labels"]
            node_id = record["id"]
            G.add_node(node_id)
            G.nodes[node_id].update(attrs)

        rels_result = session.run("""
            MATCH (n)-[r]->(m)
            RETURN id(n) AS source, id(m) AS target, type(r) AS type
        """)
        for record in rels_result:
            G.add_edge(record["source"], record["target"], type=record["type"])
    return G

In [3]:
def prepare_graph_for_gexf(graph):
    """Flatten node labels and rename edge ``type`` attributes, in place.

    For every node the ``labels`` attribute is replaced by a string: a list is
    joined with commas, anything else goes through ``str()`` (a missing
    attribute becomes the empty string). For every edge that has a ``type``
    attribute, it is moved to ``label``.

    Returns the same ``graph`` object that was passed in.
    """
    for _, attrs in graph.nodes(data=True):
        raw_labels = attrs.get('labels', [])
        attrs['labels'] = (
            ','.join(raw_labels) if isinstance(raw_labels, list) else str(raw_labels)
        )

    for _, _, attrs in graph.edges(data=True):
        if 'type' not in attrs:
            continue
        attrs['label'] = attrs.pop('type')

    return graph

In [4]:
def evaluate_partition(G, communities, method_name):
    """Print modularity and partition-quality scores for a community partition.

    Parameters
    ----------
    G
        Graph the partition refers to; passed straight to the networkx metrics.
    communities
        Iterable of node collections. Lists, tuples and one-shot iterables
        (e.g. generators) are all accepted; ``None`` or an empty iterable is
        reported as "no communities".
    method_name
        Label used in the printed report.

    Returns
    -------
    None
        Results are only printed. A failure in either metric is reported and
        does not stop the other one from being computed.
    """
    # Materialise before testing for emptiness: calling len() on a generator
    # raises TypeError, and a generator is always truthy even when it is empty.
    partition = [set(c) for c in communities] if communities else []
    if not partition:
        print(f"No communities found by {method_name}.")
        return

    print(f"Evaluating {method_name} ---")
    try:
        mod = community.modularity(G, partition)
        print(f"Modularity: {mod:.4f}")
    except Exception as e:
        # Best effort: report the failure and still try the next metric.
        print(f"Could not calculate Modularity: {e}")

    try:
        qual = community.partition_quality(G, partition)
        print(f"Partition Quality (Coverage, Performance): ({qual[0]:.4f}, {qual[1]:.4f})")
    except Exception as e:
        print(f"Could not calculate Partition Quality: {e}")
    print(f"Found {len(partition)} communities.")
    print("-" * 30)

In [5]:
def stamp_partition_to_graph(G, partition, attribute_name):
    """Write each node's community index onto ``G`` as a node attribute.

    Every node first gets ``attribute_name`` set to ``None``; then each member
    of the i-th community in ``partition`` gets the value ``i``. Members that
    are not nodes of ``G`` are reported with a printed message and skipped.
    """
    # Reset first, so nodes that appear in no community end up with None.
    nx.set_node_attributes(G, None, attribute_name)

    for community_index, members in enumerate(partition):
        for member in members:
            if member not in G.nodes:
                print(f"Node {member} from partition not in graph.")
                continue
            G.nodes[member][attribute_name] = community_index

def stamp_overlapping_partition_to_graph(G, partition, attribute_name):
    """Write overlapping community memberships onto ``G`` as string attributes.

    Every node first gets ``attribute_name`` set to the string ``"[]"``. Nodes
    that belong to one or more communities then get ``str()`` of the list of
    their community indices (e.g. ``"[0, 2]"``). Members that are not nodes of
    ``G`` are reported with a printed message and skipped.
    """
    nx.set_node_attributes(G, "[]", attribute_name)

    # node -> indices of every community that contains it
    memberships = {}
    for community_index, members in enumerate(partition):
        for member in members:
            memberships.setdefault(member, []).append(community_index)

    stamped = {}
    for member, indices in memberships.items():
        if member not in G.nodes:
            print(f"Node {member} from partition not in graph.")
            continue
        stamped[member] = str(indices)

    nx.set_node_attributes(G, stamped, attribute_name)

def stamp_id_group_to_graph(G):
    """Ensure every node has a string-valued ``id_group_anonymous`` attribute.

    Existing values are converted with ``str()``; nodes that lack the attribute
    get the string ``"None"``. The graph is modified in place.

    The previous version first overwrote the attribute on *every* node with
    ``"None"``, which discarded the real values before they could be
    stringified.
    """
    for node, data in G.nodes(data=True):
        G.nodes[node]["id_group_anonymous"] = str(data.get("id_group_anonymous", "None"))

In [6]:
def worker_girvan_newman(G):
    """Run Girvan-Newman on ``G`` and keep only the first partition it yields.

    Returns a 4-tuple ``(name, partition, kind, error)``:
    ``("girvan_newman", partition, "partition", None)`` on success, or
    ``("girvan_newman", None, None, exception)`` if anything raised.
    """
    algo = "girvan_newman"
    try:
        print("  [Thread] Starting Girvan-Newman...")
        # The generator is consumed exactly once: only the first level is used.
        first_level = next(community.girvan_newman(G))
        print("  [Thread] Finished Girvan-Newman.")
    except Exception as err:
        return (algo, None, None, err)
    return (algo, first_level, "partition", None)

def worker_edge_betweenness(G, k):
    """Run Girvan-Newman until the partition holds at least ``k`` communities.

    Parameters
    ----------
    G
        Graph handed to ``community.girvan_newman``.
    k
        Target number of communities; must be at least 2.

    Returns
    -------
    tuple
        ``("edge_betweenness", partition, "partition", None)`` on success, or
        ``("edge_betweenness", None, None, exception)`` when ``k`` is too
        small, the generator runs out before reaching ``k`` communities, or
        the algorithm itself raises.
    """
    try:
        print(f"  [Thread] Starting Edge Betweenness (for k={k})...")
        if k < 2:
            # Previously this fell through to an unbound `partition` variable.
            return ("edge_betweenness", None, None,
                    ValueError(f"k ({k}) must be at least 2"))
        # Stop on the community count rather than after a fixed k-1 steps:
        # the first level of a disconnected graph already holds more than two
        # communities, so counting iterations overshoots the target.
        partition = next(
            (level for level in community.girvan_newman(G) if len(level) >= k),
            None,
        )
        if partition is None:
            return ("edge_betweenness", None, None,
                    ValueError(f"could not split graph into {k} communities"))
        print(f"  [Thread] Finished Edge Betweenness (for k={k}).")
        return ("edge_betweenness", partition, "partition", None)
    except Exception as e:
        return ("edge_betweenness", None, None, e)

# Disabled worker, kept for reference: the definition below sits inside a bare
# string literal, so it is never executed and `worker_edge_current_flow` is not
# defined. NOTE(review): it calls `edge_current_flow_betweenness_partition`,
# which none of the visible cells import.
"""def worker_edge_current_flow(G, k):
    try:
        print(f"  [Thread] Starting Edge Current Flow (for k={k})...")
        partition = edge_current_flow_betweenness_partition(G, k)
        print(f"  [Thread] Finished Edge Current Flow (for k={k}).")
        return ("edge_current_flow", partition, "partition", None)
    except Exception as e:
        return ("edge_current_flow", None, None, e)"""

def worker_greedy_modularity(G):
    """Run ``community.greedy_modularity_communities`` on ``G``.

    Returns ``("greedy_modularity", communities, "partition", None)`` on
    success and ``("greedy_modularity", None, None, exception)`` on failure.
    """
    try:
        print("  [Thread] Starting Greedy Modularity...")
        found = community.greedy_modularity_communities(G)
        print("  [Thread] Finished Greedy Modularity.")
    except Exception as exc:
        outcome = ("greedy_modularity", None, None, exc)
    else:
        outcome = ("greedy_modularity", found, "partition", None)
    return outcome

def worker_naive_greedy_modularity(G):
    """Run ``community.naive_greedy_modularity_communities`` on ``G``.

    Returns ``("naive_greedy_modularity", communities, "partition", None)`` on
    success and ``("naive_greedy_modularity", None, None, exception)`` if the
    call raised.
    """
    tag = "naive_greedy_modularity"
    try:
        print("  [Thread] Starting Naive Greedy Modularity...")
        result = (tag, community.naive_greedy_modularity_communities(G), "partition", None)
        print("  [Thread] Finished Naive Greedy Modularity.")
        return result
    except Exception as failure:
        return (tag, None, None, failure)

def worker_lukes(G, max_size=2000):
    """Run ``community.lukes_partitioning`` on ``G``.

    Parameters
    ----------
    G
        Graph to partition.
    max_size
        Forwarded as the ``max_size`` argument of ``lukes_partitioning``.
        Defaults to 2000, the value that used to be hard-coded.

    Returns
    -------
    tuple
        ``("lukes", partition, "partition", None)`` on success, or
        ``("lukes", None, None, exception)`` if the call raised.
    """
    try:
        print("  [Thread] Starting Lukes Partitioning...")
        partition = community.lukes_partitioning(G, max_size=max_size)
        print("  [Thread] Finished Lukes Partitioning.")
        return ("lukes", partition, "partition", None)
    except Exception as e:
        return ("lukes", None, None, e)

def worker_label_propagation(G):
    """Run ``community.label_propagation_communities`` on ``G``.

    The communities produced by the call are collected into a list of sets.

    Returns ``("label_propagation", communities, "partition", None)`` on
    success and ``("label_propagation", None, None, exception)`` on failure.
    """
    try:
        print("  [Thread] Starting Label Propagation...")
        groups = []
        for members in community.label_propagation_communities(G):
            groups.append(set(members))
        print("  [Thread] Finished Label Propagation.")
        return ("label_propagation", groups, "partition", None)
    except Exception as err:
        return ("label_propagation", None, None, err)

def worker_fast_label_propagation(G, seed=None):
    """Run asynchronous label propagation (``community.asyn_lpa_communities``).

    Parameters
    ----------
    G
        Graph to analyse.
    seed
        Forwarded to ``asyn_lpa_communities`` so a run can be reproduced.
        ``None`` (the default) keeps the previous unseeded behaviour.

    Returns
    -------
    tuple
        ``("fast_label_propagation", communities, "partition", None)`` with the
        communities as a list of sets, or
        ``("fast_label_propagation", None, None, exception)`` on failure.
    """
    try:
        print("  [Thread] Starting Fast Label (asyn_lpa)...")
        comp_generator = community.asyn_lpa_communities(G, seed=seed)
        partition = [set(c) for c in comp_generator]
        print("  [Thread] Finished Fast Label (asyn_lpa).")
        return ("fast_label_propagation", partition, "partition", None)
    except Exception as e:
        return ("fast_label_propagation", None, None, e)

def worker_louvain(G, seed=None):
    """Run ``community.louvain_communities`` on ``G``.

    Parameters
    ----------
    G
        Graph to analyse.
    seed
        Forwarded to ``louvain_communities`` so a run can be reproduced.
        ``None`` (the default) keeps the previous unseeded behaviour.

    Returns
    -------
    tuple
        ``("louvain", communities, "partition", None)`` on success, or
        ``("louvain", None, None, exception)`` if the call raised.
    """
    try:
        print("  [Thread] Starting Louvain...")
        partition = community.louvain_communities(G, seed=seed)
        print("  [Thread] Finished Louvain.")
        return ("louvain", partition, "partition", None)
    except Exception as e:
        return ("louvain", None, None, e)

def worker_fluidc(G, k, seed=None):
    """Run ``community.asyn_fluidc`` on ``G`` asking for ``k`` communities.

    Parameters
    ----------
    G
        Graph to analyse.
    k
        Number of communities to request; rejected up front when it exceeds
        the number of nodes in ``G``.
    seed
        Forwarded to ``asyn_fluidc`` so a run can be reproduced. ``None``
        (the default) keeps the previous unseeded behaviour.

    Returns
    -------
    tuple
        ``("fluidc", communities, "partition", None)`` with the communities as
        a list of sets, or ``("fluidc", None, None, exception)`` on failure.
    """
    try:
        print(f"  [Thread] Starting Fluidc (k={k})...")
        if k > G.number_of_nodes():
            return ("fluidc", None, None, Exception(f"k ({k}) > node count"))
        comp_generator = community.asyn_fluidc(G, k, seed=seed)
        partition = [set(c) for c in comp_generator]
        print(f"  [Thread] Finished Fluidc (k={k}).")
        return ("fluidc", partition, "partition", None)
    except Exception as e:
        return ("fluidc", None, None, e)

def worker_k_clique(G, k):
    """Run ``community.k_clique_communities`` on ``G`` with clique size ``k``.

    The result is tagged ``"overlapping"`` rather than ``"partition"``.

    Returns ``("k_clique", communities, "overlapping", None)`` with the
    communities as a list of sets, or ``("k_clique", None, None, exception)``
    if anything raised.
    """
    try:
        print(f"  [Thread] Starting K-Clique (k={k})...")
        cliques = list(map(set, community.k_clique_communities(G, k)))
        print(f"  [Thread] Finished K-Clique (k={k}).")
    except Exception as err:
        return ("k_clique", None, None, err)
    return ("k_clique", cliques, "overlapping", None)

In [7]:
# noinspection PyUnresolvedReferences
def analyze_graph_communities_parallel(G, graph_name, k_target, output_dir="../visualization"):
    """Run the enabled community workers on ``G``, stamp the results, export GEXF.

    Each worker is submitted to a thread pool. As results complete, partitions
    are scored with ``evaluate_partition`` and written onto ``G`` as a
    ``"<name>_community"`` node attribute. Finally a copy of ``G`` is written
    to ``<output_dir>/<graph_name>_with_communities.gexf``.

    Parameters
    ----------
    G
        Graph to analyse; it is modified in place (community attributes and
        ``id_group_anonymous`` are stamped on its nodes).
    graph_name
        Label used in log output and in the export file name.
    k_target
        Community count passed to the workers that take one.
    output_dir
        Directory for the GEXF file, created if missing. Defaults to the
        previously hard-coded ``"../visualization"``.

    Returns
    -------
    None
        Graphs without edges are skipped before any work is submitted.
        Worker and export failures are printed, not raised.
    """
    print(f"\n{'='*25} PROCESSING GRAPH: {graph_name} {'='*25}")
    print(f"Graph has {G.number_of_nodes()} nodes and {G.number_of_edges()} edges.")

    if G.number_of_edges() == 0:
        print("Skipping graph, no edges.")
        return

    # Workers read from a private snapshot. Results are stamped onto G as soon
    # as each future completes, i.e. while slower workers are still running;
    # sharing one object would mean mutating it while other threads read it.
    G_snapshot = G.copy()

    # (display name, worker, extra positional args). Commented-out entries are
    # workers that are currently disabled.
    tasks = [
        ("Girvan-Newman", worker_girvan_newman, ()),
        # ("Edge Betweenness", worker_edge_betweenness, (k_target,)),
        # ("Edge Current Flow", worker_edge_current_flow, (k_target,)),
        ("Greedy Modularity", worker_greedy_modularity, ()),
        # ("Naive Greedy Modularity", worker_naive_greedy_modularity, ()),
        ("Lukes Partitioning", worker_lukes, ()),
        ("Label Propagation", worker_label_propagation, ()),
        ("Fast Label Propagation", worker_fast_label_propagation, ()),
        ("Louvain", worker_louvain, ()),
        ("Asynchronous Fluidc", worker_fluidc, (k_target,)),
        # ("K-Clique", worker_k_clique, (k_target,)),
    ]

    futures = {}

    with concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
        print(f"Submitting all community tasks for {graph_name} to ThreadPool...")

        for task_name, worker, extra_args in tasks:
            futures[executor.submit(worker, G_snapshot, *extra_args)] = task_name

        print("All tasks submitted. Waiting for results...")

        for future in concurrent.futures.as_completed(futures):
            task_name = futures[future]
            try:
                name, data, partition_type, error = future.result()

                if error:
                    print(f"\n--- ERROR running {task_name} ---")
                    print(f"  > {error}")
                    print("-" * 30)
                    continue

                print(f"\nProcessing result for {task_name}...")

                if partition_type == "partition":
                    evaluate_partition(G, data, name)
                    stamp_partition_to_graph(G, data, f"{name}_community")

                elif partition_type == "overlapping":
                    print(f"--- Evaluating {name} (Overlapping) ---")
                    print(f"  > Found {len(data)} overlapping {k_target}-clique communities.")
                    print("  > Modularity/Quality metrics not applicable.")
                    print("-" * 30)
                    stamp_overlapping_partition_to_graph(G, data, f"{name}_community")

            except Exception as e:
                # One failing result must not prevent the others from being handled.
                print(f"\n--- CRITICAL ERROR processing result for {task_name} ---")
                print(f"  > {e}")
                print("-" * 30)

    os.makedirs(output_dir, exist_ok=True)
    export_filename = os.path.join(output_dir, f"{graph_name}_with_communities.gexf")

    try:
        print(f"\nExporting {graph_name} to GEXF...")
        stamp_id_group_to_graph(G)
        # Export a copy so the GEXF-specific attribute rewrites do not touch G.
        G_exportable = prepare_graph_for_gexf(G.copy())

        nx.write_gexf(G_exportable, export_filename)
        print(f"\n>>> Successfully exported graph to {export_filename}")
    except Exception as e:
        print(f"\n>>> FAILED to export GEXF file: {e}")

In [8]:
import traceback

# One undirected user-to-user graph per relationship type of interest.
G_shares = nx.Graph()
G_viral = nx.Graph()
G_misinfo = nx.Graph()

# Relationship type (as stored on the imported edges) -> graph that receives it.
graphs_by_edge_type = {
    'SHARES': G_shares,
    'VIRAL_SHARES': G_viral,
    'SHARES_MISINFORMATION': G_misinfo,
}

# Community count handed to the workers that take a target k.
K_COMMUNITIES = 4

try:
    print("Connecting to Neo4j and building NetworkX graph...")

    # The driver is only needed while importing the graph, so it is closed as
    # soon as the import finishes (or fails) instead of staying open for the
    # whole long-running analysis.
    with GraphDatabase.driver(URI, auth=AUTH) as driver:
        networkx_graph = get_graph_from_neo4j(driver)
    print("Connection closed.")

    print("Graph imported into NetworkX")

    user_nodes = {n for n, d in networkx_graph.nodes(data=True) if 'User' in d.get('labels', [])}

    # Every graph starts with all User nodes; the ones that end up without any
    # edge are dropped again below.
    for u_node in user_nodes:
        for typed_graph in graphs_by_edge_type.values():
            typed_graph.add_node(u_node, **networkx_graph.nodes[u_node])

    for u, v, data in networkx_graph.edges(data=True):
        if u in user_nodes and v in user_nodes:
            edge_type = data.get('type')
            typed_graph = graphs_by_edge_type.get(edge_type)
            # Relationship types outside the mapping are ignored.
            if typed_graph is not None:
                typed_graph.add_edge(u, v, **data)

    for typed_graph in graphs_by_edge_type.values():
        typed_graph.remove_nodes_from(list(nx.isolates(typed_graph)))

    graphs_to_process = [
        ("Shares", G_shares),
        ("Viral", G_viral),
        ("Misinfo", G_misinfo)
    ]

    for graph_name, G in graphs_to_process:
        analyze_graph_communities_parallel(G, graph_name, k_target=K_COMMUNITIES)

    print("\n\nAnalysis complete.")

except Exception:
    # Keep the notebook cell from aborting, but show the full traceback.
    traceback.print_exc()

Connecting to Neo4j and building NetworkX graph...




Graph imported into NetworkX

Graph has 3506 nodes and 72911 edges.
Submitting all community tasks for Shares to ThreadPool...
  [Thread] Starting Girvan-Newman...
  [Thread] Starting Greedy Modularity...
  [Thread] Starting Lukes Partitioning...
  [Thread] Starting Fast Label (asyn_lpa)...
  [Thread] Starting Louvain...
  [Thread] Starting Label Propagation...
  [Thread] Starting Fluidc (k=4)...
All tasks submitted. Waiting for results...

--- ERROR running Lukes Partitioning ---
  > lukes_partitioning works only on trees
------------------------------

--- ERROR running Asynchronous Fluidc ---
  > Fluid Communities require connected Graphs.
------------------------------
  [Thread] Finished Label Propagation.

Processing result for Label Propagation...
Evaluating label_propagation ---
Modularity: 0.0428
Partition Quality (Coverage, Performance): (0.9920, 0.3701)
Found 195 communities.
------------------------------
  [Thread] Finished Fast Label (asyn_lpa).

Processing result for Fas