In [1]:
import pandas as pd
import numpy as np
import networkx as nx
from scipy.io import mmread

In [2]:
# 비동기 lpa
import random
from collections import Counter

def async_lpa(G, max_iter=100, seed=None):
    """Detect communities with asynchronous label propagation.

    Every node starts with a unique label.  In each sweep the nodes are
    visited in random order and each node adopts the label carried by most
    of its neighbours, ties being broken uniformly at random.  A new label is
    written back immediately, so nodes visited later in the same sweep
    already see it (this is what makes the update asynchronous).

    Parameters
    ----------
    G : graph-like
        Object exposing ``G.nodes()`` and ``G[node]`` (the neighbours of
        ``node``), e.g. a ``networkx.Graph``.
    max_iter : int, default 100
        Maximum number of sweeps over the nodes.
    seed : int or None, default None
        When given, a private ``random.Random(seed)`` drives shuffling and
        tie-breaking.  When ``None`` the module-level ``random`` functions
        are used.

    Returns
    -------
    list of list
        One sorted list of nodes per community.  Nodes without neighbours
        keep their initial label and are returned as singleton communities.
    """
    # The ``random`` module itself offers shuffle()/choice(), so it can stand
    # in for a Random instance when no seed is requested.
    rng = random if seed is None else random.Random(seed)
    labels = {node: i for i, node in enumerate(G.nodes())}

    for _ in range(max_iter):
        changed = False
        nodes = list(G.nodes())
        rng.shuffle(nodes)

        for node in nodes:
            neighbors = G[node]
            if not neighbors:
                # Isolated node: nothing to adopt, it keeps its own label.
                continue
            # Tally the labels currently held by the neighbours.
            neighbor_labels = Counter(labels[nbr] for nbr in neighbors)
            max_freq = max(neighbor_labels.values())
            best_labels = [
                label for label, freq in neighbor_labels.items() if freq == max_freq
            ]
            new_label = rng.choice(best_labels)

            if new_label != labels[node]:
                # In-place write: later nodes in this sweep see the change.
                labels[node] = new_label
                changed = True

        if not changed:
            break

    # Group nodes by final label (plain dict keeps first-seen label order).
    communities = {}
    for node, label in labels.items():
        communities.setdefault(label, []).append(node)
    return [sorted(members) for members in communities.values()]



In [3]:
# 동기 병렬처리 lpa
import networkx as nx
from collections import Counter
from concurrent.futures import ThreadPoolExecutor

def update_label(args):
    """Compute the next label of one node from a frozen label snapshot.

    Parameters
    ----------
    args : tuple
        ``(node, labels, G)`` packed into a single argument so the function
        can be used with ``executor.map``.  ``labels`` maps node -> label and
        ``G`` must expose ``G.neighbors(node)``.

    Returns
    -------
    tuple
        ``(node, label)`` where ``label`` is the most frequent label among
        the node's neighbours.  On a tie the label encountered first while
        walking the neighbours wins.  A node without neighbours keeps its
        current label.
    """
    node, labels, G = args
    tally = Counter(labels[nbr] for nbr in G.neighbors(node))
    if not tally:
        # No neighbours: nothing to adopt.
        return node, labels[node]
    # max() returns the first key reaching the maximum, and Counter keeps
    # insertion order, so ties resolve to the first label seen.
    return node, max(tally, key=tally.__getitem__)

def parallel_sync_lpa(G, max_iter=100, n_jobs=4):
    """Detect communities with synchronous label propagation.

    Every node starts with a unique label.  In each iteration all nodes
    compute their next label from the same snapshot of the previous labels
    (via ``update_label``), and the whole labelling is replaced at once.
    Iteration stops when the labelling no longer changes or after
    ``max_iter`` rounds.

    Parameters
    ----------
    G : graph-like
        Object exposing ``G.nodes()`` and ``G.neighbors(node)``.
    max_iter : int, default 100
        Maximum number of synchronous rounds.
    n_jobs : int, default 4
        Number of worker threads used to evaluate ``update_label``.

    Returns
    -------
    list of list
        One sorted list of nodes per community.
    """
    labels = {node: i for i, node in enumerate(G.nodes())}

    # A single pool serves every round; it is shut down when the block exits.
    with ThreadPoolExecutor(max_workers=n_jobs) as executor:
        for _ in range(max_iter):
            # Each task receives the *previous* labels, so updates made in
            # this round cannot influence one another.
            new_labels = dict(
                executor.map(
                    update_label,
                    [(node, labels, G) for node in G.nodes()],
                )
            )
            if new_labels == labels:
                break
            labels = new_labels

    # Group nodes by final label without relying on names imported by
    # other notebook cells.
    communities = {}
    for node, label in labels.items():
        communities.setdefault(label, []).append(node)
    return [sorted(members) for members in communities.values()]

In [4]:
# Synchronous parallel LPA over a maximal independent set (MIS)

import random
from collections import defaultdict, Counter
from typing import List, Dict, Set
import networkx as nx
from concurrent.futures import ThreadPoolExecutor, as_completed

def sync_lpa_independent_set_parallel(
    graph: nx.Graph, 
    seed: int = None, 
    max_iter: int = 100,
    n_jobs: int = 4
) -> List[List[int]]:
    """Label propagation that updates one maximal independent set per round.

    Every node starts with a unique label.  Each round draws a random
    maximal independent set (MIS), computes a new label for each of its
    nodes in worker threads, and applies the new labels together.  No two
    MIS nodes are adjacent, so these simultaneous updates cannot depend on
    one another.

    The run ends when the labelling is stable for the whole graph, i.e.
    every node with neighbours already carries one of the labels that is
    most frequent among its neighbours, or after ``max_iter`` rounds.
    Checking the whole graph matters because a round only touches the MIS:
    "no MIS node changed" says nothing about the remaining nodes.

    Parameters
    ----------
    graph : nx.Graph
        Graph to partition.
    seed : int, optional
        Passed to ``random.seed``.  NOTE(review): tie-breaking happens in
        ``_calc_label`` on worker threads, so the order in which random
        numbers are consumed may depend on thread scheduling -- confirm
        before relying on exact reproducibility.
    max_iter : int, default 100
        Maximum number of rounds.
    n_jobs : int, default 4
        Number of worker threads.

    Returns
    -------
    List[List[int]]
        Communities as produced by ``_group_communities``.
    """
    random.seed(seed)
    nodes = list(graph.nodes())
    labels = {node: idx for idx, node in enumerate(nodes)}

    def _labeling_is_stable() -> bool:
        """True when each node's label is a most-frequent neighbour label."""
        for node in nodes:
            counts = Counter(labels[nbr] for nbr in graph.neighbors(node))
            # Counter returns 0 for a label no neighbour carries.
            if counts and counts[labels[node]] < max(counts.values()):
                return False
        return True

    # One pool serves all rounds instead of being rebuilt every iteration.
    with ThreadPoolExecutor(max_workers=n_jobs) as executor:
        for _ in range(max_iter):
            mis = _find_maximal_independent_set(graph, nodes)

            # Compute new labels in parallel from the current snapshot.
            futures = [
                executor.submit(_calc_label, graph, labels, node)
                for node in mis
            ]
            new_labels = {}
            for future in as_completed(futures):
                node, new_label = future.result()
                # ``None`` marks a node without neighbours: keep its label.
                if new_label is not None:
                    new_labels[node] = new_label

            # Apply only after every worker has finished reading ``labels``.
            labels.update(new_labels)

            if _labeling_is_stable():
                break

    return _group_communities(labels)

def _find_maximal_independent_set(graph: nx.Graph, nodes: List[int]) -> Set[int]:
    """Greedily pick a maximal independent set from a random node order.

    The nodes are visited in a random permutation.  A node is taken unless
    it has already been excluded, and taking a node excludes the node itself
    and all of its neighbours.  The selection runs sequentially.
    """
    chosen = set()
    excluded = set()
    for candidate in random.sample(nodes, k=len(nodes)):
        if candidate in excluded:
            continue
        chosen.add(candidate)
        # Neither the node nor any neighbour may be picked again.
        excluded.add(candidate)
        excluded.update(graph.neighbors(candidate))
    return chosen

def _calc_label(graph: nx.Graph, labels: Dict[int, int], node: int):
    """Compute the new label of a single node (meant to run in a worker).

    Returns ``(node, label)`` where ``label`` is drawn uniformly at random
    from the labels that are most frequent among the node's neighbours, or
    ``(node, None)`` when the node has no neighbours.
    """
    tally = Counter(labels[nbr] for nbr in graph.neighbors(node))
    if not tally:
        # No neighbours: signal "keep the current label".
        return node, None
    top = max(tally.values())
    return node, random.choice([label for label, count in tally.items() if count == top])

def _group_communities(labels: Dict[int, int]) -> List[List[int]]:
    """레이블을 기반으로 커뮤니티 그룹화"""
    communities = defaultdict(list)
    for node, label in labels.items():
        communities[label].append(node)
    return list(communities.values())


In [5]:
# 모듈러리티 비동기 lpa
import random
from collections import Counter, defaultdict
import networkx as nx

def async_lpa_with_modularity(G, max_iter=100, seed=None):
    """Asynchronous label propagation with modularity-based tie-breaking.

    Every node starts in its own community (label == node).  Nodes are
    visited in random order and updated in place:

    * if one label is strictly most frequent among the neighbours, the node
      adopts it;
    * if several labels tie, the node moves to the tied label with the
      largest positive modularity gain, or keeps its label when no move
      yields a positive gain.

    The modularity gain of moving node ``i`` (degree ``k_i``) from
    community ``C`` to community ``D`` is

        dQ = (k_i_in_D - k_i_in_C) / m
             - k_i * (sum_tot_D - sum_tot_C + k_i) / (2 * m**2)

    where ``k_i_in_X`` is the number of neighbours of ``i`` labelled ``X``,
    ``sum_tot_X`` is the total degree of community ``X`` (``sum_tot_C``
    still includes ``i``) and ``m`` is the number of edges.  This is the
    difference of Q = sum_c [L_c / m - (sum_tot_c / (2m))**2] before and
    after the move.  The formula assumes a simple undirected graph
    (no self-loops, unweighted).

    Parameters
    ----------
    G : graph-like
        Object exposing ``nodes()``, ``neighbors(node)``, ``degree(node)``
        and ``number_of_edges()``.
    max_iter : int, default 100
        Maximum number of sweeps.
    seed : int or None
        Passed to ``random.seed``.

    Returns
    -------
    list of list
        Node lists, one per community.  A graph without edges is returned
        as one community holding all nodes.
    """
    random.seed(seed)
    labels = {node: node for node in G.nodes()}
    m = G.number_of_edges()
    if m == 0:
        return [list(G.nodes())]

    # Total degree per community; keyed by label, which is a node id.
    sum_tot = {node: G.degree(node) for node in G.nodes()}

    for _ in range(max_iter):
        updated = False
        nodes = list(G.nodes())
        random.shuffle(nodes)

        for node in nodes:
            neighbors = list(G.neighbors(node))
            if not neighbors:
                continue

            current_label = labels[node]
            label_counts = Counter(labels[n] for n in neighbors)
            max_count = max(label_counts.values())
            candidates = [
                label for label, cnt in label_counts.items() if cnt == max_count
            ]
            current_degree = G.degree(node)

            if len(candidates) == 1:
                new_label = candidates[0]
            else:
                # Tie: accept only a move that strictly increases modularity.
                new_label = current_label
                best_delta = 0.0
                # Counter returns 0 when no neighbour carries the label.
                k_i_in_C = label_counts[current_label]
                for candidate in candidates:
                    if candidate == current_label:
                        continue  # staying put has a gain of exactly 0
                    delta_Q = (
                        (label_counts[candidate] - k_i_in_C) / m
                        - current_degree
                        * (sum_tot[candidate] - sum_tot[current_label] + current_degree)
                        / (2 * m ** 2)
                    )
                    if delta_Q > best_delta:
                        best_delta = delta_Q
                        new_label = candidate

            # Keep the community degree totals in sync with the move.
            if new_label != current_label:
                sum_tot[current_label] -= current_degree
                sum_tot[new_label] += current_degree
                labels[node] = new_label
                updated = True

        if not updated:
            break

    community_dict = defaultdict(list)
    for node, label in labels.items():
        community_dict[label].append(node)
    return list(community_dict.values())


In [6]:
# 모듈러리티 동기 병렬 lpa
import random
import networkx as nx
from collections import defaultdict, Counter
from concurrent.futures import ThreadPoolExecutor

def compute_communities_metrics(G, labels):
    """Compute per-community edge and degree totals.

    Parameters
    ----------
    G : graph-like
        Object exposing ``degree(node)`` and ``edges()``.
    labels : dict
        Mapping node -> community label.

    Returns
    -------
    (sum_in, sum_tot) : tuple of defaultdict(int)
        ``sum_in[label]`` counts the edges yielded by ``G.edges()`` whose two
        endpoints carry ``label``; ``sum_tot[label]`` is the sum of the
        degrees of the nodes carrying ``label``.
    """
    sum_in = defaultdict(int)
    sum_tot = defaultdict(int)

    # Accumulate degrees label by label in a single pass over the nodes.
    for node, label in labels.items():
        sum_tot[label] += G.degree(node)

    # An edge is internal when both endpoints share a label.
    for u, v in G.edges():
        label_u = labels[u]
        if label_u == labels[v]:
            sum_in[label_u] += 1

    return sum_in, sum_tot

def node_label_update(args):
    """Compute one node's next label for the synchronous modularity LPA.

    Parameters
    ----------
    args : tuple
        ``(node, G, prev_labels, prev_sum_in, prev_sum_tot, m)`` packed into
        one argument for use with ``executor.map``.  ``prev_labels`` maps
        node -> label, ``prev_sum_tot`` maps label -> total community degree
        and ``m`` is the number of edges.  ``prev_sum_in`` is accepted for
        interface compatibility; the modularity gain does not depend on it.

    Returns
    -------
    tuple
        ``(node, label)``.  A node without neighbours keeps its label.  If a
        single label is most frequent among the neighbours it is adopted.
        On a tie the node moves to the tied label with the largest positive
        modularity gain, or keeps its label if no gain is positive.

    Notes
    -----
    Gain of moving node ``i`` (degree ``k_i``) from community ``C`` to ``D``:

        dQ = (k_i_in_D - k_i_in_C) / m
             - k_i * (sum_tot_D - sum_tot_C + k_i) / (2 * m**2)

    with ``sum_tot_C`` still including ``i``.  The formula assumes a simple
    undirected graph.
    """
    node, G, prev_labels, prev_sum_in, prev_sum_tot, m = args
    neighbors = list(G.neighbors(node))
    if not neighbors:
        return node, prev_labels[node]

    label_counts = Counter(prev_labels[n] for n in neighbors)
    max_count = max(label_counts.values())
    candidates = [label for label, cnt in label_counts.items() if cnt == max_count]
    if len(candidates) == 1:
        return node, candidates[0]

    # Tie: accept only a move that strictly increases modularity.
    current_label = prev_labels[node]
    k_i = G.degree(node)
    k_i_in_C = label_counts.get(current_label, 0)
    sum_tot_C = prev_sum_tot.get(current_label, 0)

    best_label = current_label
    best_delta = 0.0
    for candidate in candidates:
        if candidate == current_label:
            continue  # staying put has a gain of exactly 0
        k_i_in_D = label_counts.get(candidate, 0)
        sum_tot_D = prev_sum_tot.get(candidate, 0)
        delta_Q = (
            (k_i_in_D - k_i_in_C) / m
            - k_i * (sum_tot_D - sum_tot_C + k_i) / (2 * m ** 2)
        )
        if delta_Q > best_delta:
            best_delta = delta_Q
            best_label = candidate
    return node, best_label

def sync_lpa_modularity_parallel(G, max_iter=100, seed=None, n_jobs=4):
    """Synchronous label propagation with modularity tie-breaking.

    Every node starts with a unique label.  In each round all nodes compute
    their next label with ``node_label_update`` from the same snapshot of
    labels and community metrics; the snapshot is then replaced as a whole
    and the metrics are recomputed.  The run ends when a round leaves the
    labelling unchanged or after ``max_iter`` rounds.

    Parameters
    ----------
    G : graph-like
        Object exposing ``nodes()``, ``number_of_edges()`` and whatever
        ``node_label_update`` / ``compute_communities_metrics`` require.
    max_iter : int, default 100
        Maximum number of rounds.
    seed : int or None
        Passed to ``random.seed``.
    n_jobs : int, default 4
        Number of worker threads per round.

    Returns
    -------
    list of list
        Node lists, one per community.  A graph without edges is returned
        as one community holding all nodes.
    """
    random.seed(seed)
    edge_count = G.number_of_edges()
    if edge_count == 0:
        return [list(G.nodes())]

    current = {vertex: idx for idx, vertex in enumerate(G.nodes())}
    intra_edges, total_degree = compute_communities_metrics(G, current)

    for _ in range(max_iter):
        # Every task sees the same frozen snapshot of labels and metrics.
        jobs = [
            (vertex, G, current, intra_edges, total_degree, edge_count)
            for vertex in G.nodes()
        ]
        with ThreadPoolExecutor(max_workers=n_jobs) as pool:
            proposed = dict(pool.map(node_label_update, jobs))

        if proposed == current:
            break
        current = proposed
        intra_edges, total_degree = compute_communities_metrics(G, current)

    grouped = defaultdict(list)
    for vertex, label in current.items():
        grouped[label].append(vertex)
    return list(grouped.values())


In [7]:
# 모듈러리티 최대 독립집합 동기 병렬 lpa
import random
import networkx as nx
from collections import defaultdict, Counter
from concurrent.futures import ThreadPoolExecutor

def parallel_mis(graph, nodes, n_jobs=4):
    """Select a maximal independent set with random-priority rounds.

    In every round each remaining node draws a random priority.  A node is
    selected when its priority is strictly greater than that of each of its
    remaining neighbours; selected nodes and their neighbours are then
    removed, and the process repeats until no node remains.  The
    per-node comparison is evaluated on a thread pool.

    Self-loops are ignored when comparing priorities.  Otherwise a node
    adjacent to itself could never beat "its neighbour" and the loop would
    not terminate.

    Parameters
    ----------
    graph : graph-like
        Object exposing ``neighbors(node)``.
    nodes : iterable
        Nodes to choose from.
    n_jobs : int, default 4
        Number of worker threads.

    Returns
    -------
    set
        The selected nodes.
    """
    remaining = set(nodes)
    mis = set()
    if not remaining:
        return mis

    # One pool serves every round instead of being rebuilt per round.
    with ThreadPoolExecutor(max_workers=n_jobs) as executor:
        while remaining:
            # Priorities are drawn on the calling thread, so their order is
            # fixed by the iteration order of ``remaining``.
            priorities = {node: random.random() for node in remaining}

            def wins(node):
                """True when ``node`` outranks all its remaining neighbours."""
                own = priorities[node]
                return all(
                    own > priorities[neigh]
                    for neigh in graph.neighbors(node)
                    if neigh != node and neigh in priorities
                )

            order = list(remaining)
            selected = {
                node
                for node, ok in zip(order, executor.map(wins, order))
                if ok
            }
            mis.update(selected)

            # Drop the selected nodes together with their neighbours.
            to_remove = set(selected)
            for node in selected:
                to_remove.update(graph.neighbors(node))
            remaining -= to_remove
    return mis

def modularity_delta(graph, labels, node, old_label, new_label, sum_in, sum_tot, m):
    """Modularity change caused by moving ``node`` from ``old_label`` to ``new_label``.

    With Q = sum_c [L_c / m - (sum_tot_c / (2m))**2], moving node ``i`` of
    degree ``k_i`` from community ``C`` to community ``D`` changes Q by

        dQ = (k_i_in_D - k_i_in_C) / m
             - k_i * (sum_tot_D - sum_tot_C + k_i) / (2 * m**2)

    where ``k_i_in_X`` is the number of neighbours of ``i`` labelled ``X``
    and ``sum_tot_C`` still includes ``i``.  The formula assumes a simple
    undirected graph; a self-loop is not counted as a neighbour.

    Parameters
    ----------
    graph : graph-like
        Object exposing ``degree(node)`` and ``neighbors(node)``.
    labels : dict
        Current node -> label mapping (used to count neighbours per label).
    node : hashable
        Node being moved.  NOTE(review): the caller is expected to pass the
        node's current label as ``old_label``.
    old_label, new_label : hashable
        Source and target community labels.
    sum_in : mapping
        Accepted for interface compatibility; the gain does not depend on it.
    sum_tot : mapping
        Label -> total degree of that community.
    m : int
        Number of edges in the graph.

    Returns
    -------
    float
        The modularity gain; ``0.0`` when the label does not change.
    """
    if old_label == new_label:
        return 0.0

    k_i = graph.degree(node)
    k_i_in_old = 0
    k_i_in_new = 0
    for nbr in graph.neighbors(node):
        if nbr == node:
            continue
        nbr_label = labels[nbr]
        if nbr_label == old_label:
            k_i_in_old += 1
        elif nbr_label == new_label:
            k_i_in_new += 1

    sum_tot_old = sum_tot[old_label]
    sum_tot_new = sum_tot[new_label]
    return (
        (k_i_in_new - k_i_in_old) / m
        - k_i * (sum_tot_new - sum_tot_old + k_i) / (2 * m ** 2)
    )

def compute_communities_metrics(G, labels):
    """Compute per-community edge and degree totals.

    An identical definition appears in an earlier cell of this notebook;
    this one rebinds the same name.

    Returns ``(sum_in, sum_tot)``, two ``defaultdict(int)`` objects:
    ``sum_in[label]`` counts the edges yielded by ``G.edges()`` whose two
    endpoints carry ``label``, and ``sum_tot[label]`` is the sum of the
    degrees of the nodes carrying ``label``.
    """
    sum_in = defaultdict(int)
    sum_tot = defaultdict(int)
    # Degree totals, accumulated in one pass over the labelling.
    for vertex, community in labels.items():
        sum_tot[community] += G.degree(vertex)
    # Edges whose endpoints share a community.
    for left, right in G.edges():
        community = labels[left]
        if community == labels[right]:
            sum_in[community] += 1
    return sum_in, sum_tot

def sync_lpa_mis_modularity_parallel(
    graph: nx.Graph,
    max_iter: int = 100,
    seed: int = None,
    n_jobs: int = 4
):
    """MIS-based synchronous label propagation with modularity tie-breaking.

    Every node starts with a unique label.  Each round:

    1. ``parallel_mis`` draws a maximal independent set (MIS);
    2. the MIS nodes compute their next label in worker threads from the
       current snapshot.  The most frequent neighbour label wins; on a tie
       the node moves to the tied label with the largest positive
       ``modularity_delta`` or keeps its label;
    3. the new labels and the community metrics ``sum_in`` / ``sum_tot`` are
       applied on the calling thread.

    A round in which no MIS node moved does not prove convergence, because
    only the MIS was examined.  In that case the update rule is evaluated
    for every node and the run stops only if no node would move.  The run
    also stops after ``max_iter`` rounds.

    Parameters
    ----------
    graph : nx.Graph
        Graph to partition.
    max_iter : int, default 100
        Maximum number of rounds.
    seed : int, optional
        Passed to ``random.seed`` (drives the MIS priorities).
    n_jobs : int, default 4
        Number of worker threads.

    Returns
    -------
    list of list
        Node lists, one per community.
    """
    random.seed(seed)
    nodes = list(graph.nodes())
    labels = {node: idx for idx, node in enumerate(nodes)}
    m = graph.number_of_edges()
    sum_in, sum_tot = compute_communities_metrics(graph, labels)

    def update_label(node):
        """Return ``(node, proposed_label)`` from the current snapshot."""
        current_label = labels[node]
        counts = Counter(labels[nbr] for nbr in graph.neighbors(node))
        if not counts:
            return node, current_label
        max_count = max(counts.values())
        candidates = [label for label, cnt in counts.items() if cnt == max_count]
        if len(candidates) == 1:
            return node, candidates[0]
        # Tie: accept only a move that strictly increases modularity.
        best_label = current_label
        best_delta = 0.0
        for candidate in candidates:
            if candidate == current_label:
                continue  # staying put is not a move; its gain is 0
            delta = modularity_delta(
                graph, labels, node, current_label, candidate, sum_in, sum_tot, m
            )
            if delta > best_delta:
                best_delta = delta
                best_label = candidate
        return node, best_label

    def apply_updates(proposals):
        """Apply proposed labels and keep the metrics in sync.

        Returns True when at least one label changed.  The proposals come
        from an independent set, so a moving node's neighbours keep their
        labels during this pass.
        """
        changed = False
        for node, new_label in proposals:
            old_label = labels[node]
            if new_label == old_label:
                continue
            for nbr in graph.neighbors(node):
                if labels[nbr] == old_label:
                    sum_in[old_label] -= 1
                if labels[nbr] == new_label:
                    sum_in[new_label] += 1
            degree = graph.degree(node)
            sum_tot[old_label] -= degree
            sum_tot[new_label] += degree
            labels[node] = new_label
            changed = True
        return changed

    # One pool serves the label updates of every round.
    with ThreadPoolExecutor(max_workers=n_jobs) as executor:
        for _ in range(max_iter):
            # 1. Draw the independent set for this round.
            mis = parallel_mis(graph, nodes, n_jobs)
            # 2. Collect all proposals before touching the shared state.
            proposals = list(executor.map(update_label, mis))
            # 3. Apply them together.
            if apply_updates(proposals):
                continue
            # Nothing moved inside the MIS: stop only at a global fixed point.
            verdicts = list(executor.map(update_label, nodes))
            if all(label == labels[node] for node, label in verdicts):
                break

    # Group nodes by final label.
    communities = defaultdict(list)
    for node, label in labels.items():
        communities[label].append(node)
    return list(communities.values())

In [8]:
from sklearn.metrics import normalized_mutual_info_score

def calculate_nmi(true_labels, graph, communities):
    """Normalized mutual information between ground truth and a partition.

    Parameters
    ----------
    true_labels : sequence of int
        Ground-truth label per node; its length must equal ``len(graph)``.
    graph : object with ``__len__``
        Only ``len(graph)`` is used, to size the prediction vector.
    communities : iterable of iterables
        Each inner iterable lists the node ids of one community.  Node ids
        are used directly as positions, so they must be integers in
        ``range(len(graph))``.

    Returns
    -------
    float
        ``normalized_mutual_info_score(true_labels, pred_labels)``.

    Notes
    -----
    A node that appears in no community is treated as a singleton community
    of its own rather than being merged into community 0.
    """
    pred_labels = [None] * len(graph)

    # Community index -> predicted label of each member.
    n_communities = 0
    for i, com in enumerate(communities):
        n_communities = i + 1
        for node in com:
            pred_labels[node] = i

    # Unassigned nodes get fresh labels after the last community index.
    next_label = n_communities
    for position, label in enumerate(pred_labels):
        if label is None:
            pred_labels[position] = next_label
            next_label += 1

    return normalized_mutual_info_score(true_labels, pred_labels)


In [9]:
graph = nx.karate_club_graph()
# Binary ground truth in node order: 1 when the node's 'club' attribute is
# 'Officer', 0 for any other value.
true_labels_karate = [
    1 if graph.nodes[node]['club'] == 'Officer' else 0
    for node in graph.nodes
]

In [10]:
# Dolphins dataset: read the adjacency data from a Matrix Market file.
matrix = mmread("./soc-dolphins/soc-dolphins.mtx")
# Convert the SciPy sparse matrix into a NetworkX graph.

dolphin_graph = nx.from_scipy_sparse_array(matrix)

# Hard-coded ground-truth community (0 or 1) per node, 62 entries.
# NOTE(review): assumed to follow the node order of the .mtx file --
# confirm against the data source.
true_labels_dolphins = [
    0,0,0,0,0,0,0,0,0,0,
    0,1,1,0,0,0,0,0,1,0,
    0,0,0,0,0,0,0,0,0,0,
    1,0,0,0,0,0,0,0,0,0,
    1,1,1,1,1,1,1,1,1,1,
    1,1,1,1,1,1,1,1,1,1,
    1,1
]

In [11]:
from torch_geometric.datasets import Planetoid
from torch_geometric.utils import to_networkx

dataset = Planetoid(root='/tmp/Cora', name='Cora')
data = dataset[0]
# to_networkx() builds a directed graph unless asked otherwise.  The LPA
# variants in this notebook use degree(), edges() and number_of_edges() as
# undirected quantities, so request an undirected graph explicitly.
cora_graph = to_networkx(data, to_undirected=True)
# Plain Python ints, matching the label lists of the other datasets.
true_labels_cora = data.y.tolist()

In [None]:
import time
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
from statistics import mean, stdev
from collections import defaultdict

# Benchmark inputs: dataset name -> (graph, ground-truth labels).
# The Cora graph and labels are loaded in an earlier cell.
datasets = dict(
    karate=(nx.karate_club_graph(), true_labels_karate),
    dolphins=(dolphin_graph, true_labels_dolphins),
    cora=(cora_graph, true_labels_cora),
)

# Algorithms under test, keyed by their function name.
algorithms = {
    func.__name__: func
    for func in (
        async_lpa,
        parallel_sync_lpa,
        sync_lpa_independent_set_parallel,
        async_lpa_with_modularity,
        sync_lpa_modularity_parallel,
        sync_lpa_mis_modularity_parallel,
    )
}

def benchmark_algorithm(algo_func, graph, true_labels, n_runs=100):
    """Run one algorithm repeatedly and summarise quality and runtime.

    Parameters
    ----------
    algo_func : callable
        Called as ``algo_func(graph)``; must return the communities.
    graph : graph
        Input handed to ``algo_func`` and to ``calculate_nmi``.
    true_labels : sequence
        Ground-truth labels handed to ``calculate_nmi``.
    n_runs : int, default 100
        Number of repetitions.

    Returns
    -------
    dict
        ``avg_nmi``, ``std_nmi`` (0 when fewer than two runs), ``avg_time``
        and ``total_time``.  Only the call to ``algo_func`` is timed, in
        seconds as measured by ``time.perf_counter``.
    """
    nmi_scores = []
    durations = []

    for _ in range(n_runs):
        started = time.perf_counter()
        communities = algo_func(graph)
        durations.append(time.perf_counter() - started)
        # Scoring happens outside the timed section.
        nmi_scores.append(calculate_nmi(true_labels, graph, communities))

    # stdev() needs at least two samples.
    nmi_spread = stdev(nmi_scores) if len(nmi_scores) > 1 else 0
    return {
        'avg_nmi': mean(nmi_scores),
        'std_nmi': nmi_spread,
        'avg_time': mean(durations),
        'total_time': sum(durations)
    }

def run_benchmark(n_runs=100, n_jobs=4):
    """Benchmark every algorithm on every dataset and save the results.

    Reads the module-level ``datasets`` and ``algorithms`` dictionaries,
    runs ``benchmark_algorithm`` for each combination, prints progress,
    writes the table to ``lpa_benchmark_results.csv`` and returns it.

    The combinations run one after another.  Timing several algorithms at
    once in threads of the same process makes each measurement include time
    spent waiting on the others, and the algorithms would also share the
    module-level ``random`` state.

    Parameters
    ----------
    n_runs : int, default 100
        Repetitions per (dataset, algorithm) pair.
    n_jobs : int, default 4
        Accepted for backward compatibility; no longer used, because the
        benchmarks are executed sequentially.

    Returns
    -------
    pandas.DataFrame
        One row per (dataset, algorithm) pair.
    """
    benchmark_results = []

    for ds_name, (G, true_labels) in datasets.items():
        print(f"Processing dataset: {ds_name}")

        for algo_name, algo in algorithms.items():
            # Each algorithm works on its own copy of the graph.
            result = benchmark_algorithm(algo, G.copy(), true_labels, n_runs)
            benchmark_results.append({
                'Dataset': ds_name,
                'Algorithm': algo_name,
                'Avg_NMI': result['avg_nmi'],
                'Std_NMI': result['std_nmi'],
                'Avg_Time(s)': result['avg_time'],
                'Total_Time(s)': result['total_time']
            })
            print(f"  {algo_name} completed")

    # Save as CSV.
    df = pd.DataFrame(benchmark_results)
    df.to_csv('lpa_benchmark_results.csv', index=False)
    return df

# Run the benchmark (adjust n_jobs to the number of CPU cores) and print
# the resulting table.
results_df = run_benchmark(n_runs=100, n_jobs=16)
print(results_df)


Processing dataset: karate
  async_lpa completed
  parallel_sync_lpa completed
  sync_lpa_independent_set_parallel completed
  async_lpa_with_modularity completed
  sync_lpa_modularity_parallel completed
  sync_lpa_mis_modularity_parallel completed
Processing dataset: dolphins
  async_lpa completed
  parallel_sync_lpa completed
  sync_lpa_independent_set_parallel completed
  async_lpa_with_modularity completed
  sync_lpa_modularity_parallel completed
  sync_lpa_mis_modularity_parallel completed
Processing dataset: cora
