In [1]:
import random
import networkx as nx
import numpy as np
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, precision_score, recall_score
from sklearn.model_selection import train_test_split
from collections import Counter

In [13]:
def create_graphs(total=100, min_verts=4, max_verts=100, random_seed=0):
    if random_seed is not None:
        random.seed(random_seed)

    graph_list = []
    target_list = []

    for idx in range(total):
        node_count = random.randint(min_verts, max_verts)
        if idx < total // 2:
            G = nx.path_graph(node_count)
            target = 0
        else:
            G = nx.cycle_graph(node_count)
            target = 1

        graph_list.append(G)
        target_list.append(target)

    return graph_list, target_list

def compute_shortest_path_histogram(graph, max_path=1):
    path_lengths = []
    for start_node in graph.nodes():
        paths = nx.single_source_shortest_path_length(graph, start_node, cutoff=max_path)
        for end_node, length in paths.items():
            if 1 <= length <= max_path and start_node < end_node:
                path_lengths.append(length)

    histogram = np.zeros(max_path)
    for length in path_lengths:
        histogram[length-1] += 1
    return histogram

def compute_kernel(train_data, test_data, max_path=1):
    features_train = np.array([compute_shortest_path_histogram(g, max_path=max_path) for g in train_data])
    features_test = np.array([compute_shortest_path_histogram(g, max_path=max_path) for g in test_data])

    K_tr = np.dot(features_train, features_train.T)
    K_te = np.dot(features_test, features_train.T)
    return K_tr, K_te

In [11]:
graphs, labels = create_graphs(100)

for i in range(0, 100, 25):
    g = graphs[i]
    print(f'LOG: Graph #{i+1}, class: {labels[i]}, nodes: {g.number_of_nodes()}, edges: {g.number_of_edges()}')

graph_data, targets = create_graphs(total=400, min_verts=10, max_verts=80, random_seed=0)
train_graphs, test_graphs, y_tr, y_te = train_test_split(graph_data, targets, test_size=0.1, stratify=targets)

LOG: Graph #1, class: 0, nodes: 53, edges: 52
LOG: Graph #26, class: 0, nodes: 16, edges: 15
LOG: Graph #51, class: 1, nodes: 94, edges: 94
LOG: Graph #76, class: 1, nodes: 42, edges: 42


In [14]:
K_tr1, K_te1 = compute_kernel(train_graphs, test_graphs)
K_tr2, K_te2 = compute_kernel(train_graphs, test_graphs, max_path=2)

print('--- Single feature ---')
clf = SVC(kernel='precomputed', random_state=42)
clf.fit(K_tr1, y_tr)
preds = clf.predict(K_te1)
print(f"Accuracy: {accuracy_score(y_te, preds):.3f}")
print(f"Precision: {precision_score(y_te, preds):.3f}")
print(f"Recall: {recall_score(y_te, preds):.3f}")

print('\n--- Two features ---')
clf = SVC(kernel='precomputed', random_state=42)
clf.fit(K_tr2, y_tr)
preds = clf.predict(K_te2)
print(f"Accuracy: {accuracy_score(y_te, preds):.3f}")
print(f"Precision: {precision_score(y_te, preds):.3f}")
print(f"Recall: {recall_score(y_te, preds):.3f}")

--- Single feature ---
Accuracy: 0.475
Precision: 0.478
Recall: 0.550

--- Two features ---
Accuracy: 1.000
Precision: 1.000
Recall: 1.000


1. С одной фичей (max_path=1) метод показывает низкую эффективность (Accuracy: 0.475), что близко к случайному угадыванию

2. С двумя фичами (max_path=2) достигается идеальная классификация (Accuracy: 1.000)

3. Это свидетельствует о том, что гистограмма распределения путей длины 2 содержит достаточную информацию для идеального разделения цепочек и циклов (простая задача)

## Weisfeiler-Lehman Kernel

In [15]:
def extract_wl_features(graph_list, num_hops):
    all_graph_labels = []
    for G in graph_list:
        labels = {n: str(G.degree(n)) for n in G.nodes()}
        all_graph_labels.append(labels)

    unique_labels = set()
    graph_histories = []

    for idx, G in enumerate(graph_list):
        current_labels = all_graph_labels[idx].copy()
        history = []

        for hop in range(num_hops + 1):
            freq_count = Counter(current_labels.values())
            history.append(freq_count.copy())
            unique_labels.update(freq_count.keys())

            if hop < num_hops:
                new_labels = {}
                for node in G.nodes():
                    neighbor_labels = sorted([current_labels[neigh] for neigh in G.neighbors(node)])
                    new_label = current_labels[node] + "_" + "_".join(neighbor_labels)
                    new_labels[node] = new_label
                current_labels = new_labels

        graph_histories.append(history)

    label_mapping = {lab: i for i, lab in enumerate(sorted(unique_labels))}

    feature_vectors = []
    for history in graph_histories:
        combined_hist = Counter()
        for hist in history:
            combined_hist.update(hist)

        vec = np.zeros(len(label_mapping))
        for label, count in combined_hist.items():
            vec[label_mapping[label]] = count
        feature_vectors.append(vec)

    return feature_vectors, label_mapping

def compute_wl_kernel(train_graphs, test_graphs, num_hops=2):
    train_features, label_map = extract_wl_features(train_graphs, num_hops)

    def get_test_features(test_graphs):
        test_labels = []
        for G in test_graphs:
            labels = {n: str(G.degree(n)) for n in G.nodes()}
            test_labels.append(labels)

        test_histories = []

        for idx, G in enumerate(test_graphs):
            current_labels = test_labels[idx].copy()
            history = []

            for hop in range(num_hops + 1):
                freq_count = Counter(current_labels.values())
                history.append(freq_count.copy())

                if hop < num_hops:
                    new_labels = {}
                    for node in G.nodes():
                        neighbor_labels = sorted([current_labels[neigh] for neigh in G.neighbors(node)])
                        new_label = current_labels[node] + "_" + "_".join(neighbor_labels)
                        new_labels[node] = new_label
                    current_labels = new_labels

            test_histories.append(history)

        test_vectors = []
        for history in test_histories:
            combined_hist = Counter()
            for hist in history:
                combined_hist.update(hist)

            vec = np.zeros(len(label_map))
            for label, count in combined_hist.items():
                if label in label_map:
                    vec[label_map[label]] = count
            test_vectors.append(vec)

        return test_vectors

    test_features = get_test_features(test_graphs)
    train_features = np.array(train_features)
    test_features = np.array(test_features)

    K_train = train_features @ train_features.T
    K_test = test_features @ train_features.T

    return K_train, K_test

In [16]:
print('-- 0 hops --')
K_train, K_test = compute_wl_kernel(train_graphs, test_graphs, num_hops=0)

model = SVC(kernel='precomputed', random_state=0)
model.fit(K_train, y_tr)
preds = model.predict(K_test)
print(f"WL Accuracy: {accuracy_score(y_te, preds):.3f}")
print(f"WL Precision: {precision_score(y_te, preds):.3f}")
print(f"WL Recall: {recall_score(y_te, preds):.3f}")

print('\n-- 1 hop --')
K_train1, K_test1 = compute_wl_kernel(train_graphs, test_graphs, num_hops=1)

model = SVC(kernel='precomputed', random_state=0)
model.fit(K_train1, y_tr)
preds = model.predict(K_test1)
print(f"WL Accuracy: {accuracy_score(y_te, preds):.3f}")
print(f"WL Precision: {precision_score(y_te, preds):.3f}")
print(f"WL Recall: {recall_score(y_te, preds):.3f}")

-- 0 hops --
WL Accuracy: 1.000
WL Precision: 1.000
WL Recall: 1.000

-- 1 hop --
WL Accuracy: 1.000
WL Precision: 1.000
WL Recall: 1.000


1. Уже на 0 итерациях (только по степеням вершин) достигается идеальная классификация

2. Добавление 1 хопа не улучшает результат, так как он уже максимален

3. Это указывает на то, что распределение степеней вершин достаточно для различения цепочек и циклов

## Общие выводы

- Оба метода способны достигать 100% точности на данной задаче (**очень простая задача классификации**)

- WL метод более эффективен - требует только начальных меток (степеней вершин)

- Метод кратчайших путей требует анализа путей длины 2 для достижения максимальной точности

- WL метод демонстрирует лучшую "разрешающую способность" на простых структурах графов