# Задача 9. Hand-crafted graph features

* **Дедлайн**: 16.05.2025, 23:59
* Основной полный балл: 5
* Максимум баллов: 10


## Задача

- [x] Найти или сгенерировать набор данных для бинарной классификации графов.
- [x] Реализовать функцию `shortest_path_kernel(train_graphs, test_graphs)`, которая принимает тренировочный и тестовые наборы, а возвращает пару `K_train, K_test`
  - Опишите графы с помощью вектора из количества кратчайших путей различной длины
  - Для вычисления длин кратчайших путей можно использовать `nx.shortest_path_length(G)`
  - Ядровая функция для сравнения двух графов - скалярное произведение их двух векторов
  - `K_train` - матрица из ядровых функций для сравнения тренировочных графов между собой
  - `K_test` - матрица из ядровых функций для сравнения тестовых графов с тренировочными
- [x] Используя реализованное ядро обучите модель SVC, подберите гиперпараметры, вычислите различные метрики качества
- [x] (+5 баллов) Также реализовать Weisfeiler-Lehman Kernel и обучить классификатор с ним, сравнить результаты.


In [24]:
import networkx as nx
import numpy as np
from collections import defaultdict
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix
from sklearn.base import BaseEstimator, TransformerMixin

## Генерация данных ("с центральным узлом" vs. "без центрального узла")

In [25]:
rng = np.random.default_rng(seed=7)

def generate_star_graph_variant(n_nodes):
    if rng.random() > 0.5:
        return nx.star_graph(n_nodes - 1)
    else:
        G = nx.star_graph(n_nodes - 1)
        u, v = rng.choice(n_nodes, 2, replace=False)
        if not G.has_edge(u, v):
            G.add_edge(u, v)
        return G

def generate_uniform_graph_variant(n_nodes):
    if rng.random() > 0.5:
        return nx.path_graph(n_nodes)
    else:
        return nx.grid_2d_graph(int(np.sqrt(n_nodes)), int(np.sqrt(n_nodes)))

def generate_center_dataset(n_samples=300, max_nodes=20):
    data, targets = [], []
    for _ in range(n_samples // 2):
        nodes = rng.integers(5, max_nodes)
        data.append(generate_star_graph_variant(nodes))
        targets.append(1)
        data.append(generate_uniform_graph_variant(nodes))
        targets.append(0)
    return data, np.array(targets)


## Shortest path kernel

In [26]:
def extract_shortest_path_vector(graph, max_len=7):
    sp_lengths = dict(nx.shortest_path_length(graph))
    features = np.zeros(max_len, dtype=np.float32)
    num_nodes = graph.number_of_nodes()

    for src in sp_lengths:
        for dst, dist in sp_lengths[src].items():
            if src != dst and dist <= max_len:
                features[dist - 1] += 1

    if num_nodes > 1:
        features /= (num_nodes * (num_nodes - 1))
    return features

def shortest_path_kernel(train_g, test_g, max_len=7):
    train_feats = np.array([extract_shortest_path_vector(g, max_len) for g in train_g])
    test_feats = np.array([extract_shortest_path_vector(g, max_len) for g in test_g])
    return train_feats @ train_feats.T, test_feats @ train_feats.T


## Weisfeiler-Lehman Kernel

In [27]:
class WLKernel(BaseEstimator, TransformerMixin):
    def __init__(self, h=3, norm=True):
        self.h = h
        self.norm = norm
        self.label_map = {}
        self._fit_graphs = []

    def _wl_step(self, G, labels):
        new = {}
        for node in G:
            neigh = sorted(labels[nei] for nei in G.neighbors(node))
            key = (labels[node], tuple(neigh))
            if key not in self.label_map:
                self.label_map[key] = len(self.label_map) + 1
            new[node] = self.label_map[key]
        return new

    def _graph_feature_vector(self, G):
        labels = {n: 1 for n in G}
        feature_counts = defaultdict(int)
        for l in labels.values():
            feature_counts[l] += 1
        for _ in range(self.h):
            labels = self._wl_step(G, labels)
            for l in labels.values():
                feature_counts[l] += 1
        return feature_counts

    def fit(self, graphs, y=None):
        self.label_map.clear()
        self._fit_graphs = graphs
        for g in graphs:
            self._graph_feature_vector(g)
        return self

    def transform(self, graphs):
        base_vecs = [self._graph_feature_vector(g) for g in self._fit_graphs]
        target_vecs = [self._graph_feature_vector(g) for g in graphs]
        K = np.zeros((len(graphs), len(base_vecs)))
        for i, vec1 in enumerate(target_vecs):
            for j, vec2 in enumerate(base_vecs):
                keys = set(vec1) & set(vec2)
                val = sum(vec1[k] * vec2[k] for k in keys)
                if self.norm:
                    norm1 = np.sqrt(sum(v ** 2 for v in vec1.values()))
                    norm2 = np.sqrt(sum(v ** 2 for v in vec2.values()))
                    val /= (norm1 * norm2 + 1e-10)
                K[i, j] = val
        return K

## Обучение и оценка

In [28]:
graphs, labels = generate_center_dataset(1000, max_nodes=16)
X_tr, X_te, y_tr, y_te = train_test_split(graphs, labels, test_size=0.25, random_state=1)

# Shortest Path Kernel
K_tr_sp, K_te_sp = shortest_path_kernel(X_tr, X_te)
clf_sp = GridSearchCV(SVC(kernel='precomputed'), {'C': [0.01, 0.1, 1, 10]}, cv=5)
clf_sp.fit(K_tr_sp, y_tr)
y_pred_sp = clf_sp.predict(K_te_sp)

print("SPK Accuracy:", accuracy_score(y_te, y_pred_sp))
print("SPK F1:", f1_score(y_te, y_pred_sp))

# Weisfeiler-Lehman Kernel
wlk = WLKernel(h=4)
wlk.fit(X_tr)
K_tr_wl = wlk.transform(X_tr)
K_te_wl = wlk.transform(X_te)

clf_wl = GridSearchCV(SVC(kernel='precomputed'), {'C': [0.01, 1, 100]}, cv=5)
clf_wl.fit(K_tr_wl, y_tr)
y_pred_wl = clf_wl.predict(K_te_wl)

print("WLK Accuracy:", accuracy_score(y_te, y_pred_wl))
print("WLK F1:", f1_score(y_te, y_pred_wl))


SPK Accuracy: 1.0
SPK F1: 1.0
WLK Accuracy: 1.0
WLK F1: 1.0


## Сравнение Weisfeiler-Lehman Kernel и Shortest Path Kernel

In [29]:
print("\nСравнение:")
print(f"Shortest Path: acc={accuracy_score(y_te, y_pred_sp):.3f}, f1={f1_score(y_te, y_pred_sp):.3f}")
print(f"Weisfeiler-Lehman: acc={accuracy_score(y_te, y_pred_wl):.3f}, f1={f1_score(y_te, y_pred_wl):.3f}")


Сравнение:
Shortest Path: acc=1.000, f1=1.000
Weisfeiler-Lehman: acc=1.000, f1=1.000
