In [1]:
import networkx as nx
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.svm import SVC
from collections import Counter

**Найти или сгенерировать набор данных для бинарной классификации графов**

In [2]:
def create_dataset(size = 100, nodes = 30):
    graph_list = [nx.cycle_graph(nodes) for i in range(10, size//2 + 10)]
    graph_list.extend([nx.path_graph(nodes) for i in range(size//2 + 10, size + 10)])
    y = [0 if i < (size//2 + 10) else 1 for i in range(size)]

    return graph_list, y

graph_list, y = create_dataset(500, 50)
train_graphs, test_graphs, y_train, y_test = train_test_split(graph_list, y, test_size=0.1, stratify=y)

**Реализовать функцию `shortest_path_kernel(train_graphs, test_graphs)`, которая принимает тренировочный и тестовые наборы, а возвращает пару `K_train, K_test`**
  - Опишите графы с помощью вектора из количества кратчайших путей различной длины
  - Для вычисления длин кратчайших путей можно использовать `nx.shortest_path_length(G)`
  - Ядровая функция для сравнения двух графов - скалярное произведение их двух векторов
  - `K_train` - матрица из ядровых функций для сравнения тренировочных графов между собой
  - `K_test` - матрица из ядровых функций для сравнения тестовых графов с тренировочными

In [5]:
def shortest_path_kernel(train_graphs, test_graphs, nodes = 50):

  # initialization
  train_vector = np.zeros((len(train_graphs), nodes * (nodes - 1)))
  test_vector = np.zeros((len(test_graphs), nodes * (nodes - 1)))

  # train_graphs
  for i, graph in enumerate(train_graphs):
    idx = 0
    # fill vector with shortest paths between each nodes
    for u in range(graph.number_of_nodes() - 1):
      for v in range(u + 1, graph.number_of_nodes()):
        train_vector[i][idx] = nx.shortest_path_length(graph, u, v)
        idx += 1

  # test_graphs
  for i, graph in enumerate(test_graphs):
    idx = 0
    for u in range(graph.number_of_nodes() - 1):
      for v in range(u + 1, graph.number_of_nodes()):
        test_vector[i][idx] = nx.shortest_path_length(graph, u, v)
        idx += 1

  # kernel functions matrixs
  K_train = np.dot(train_vector, train_vector.T)
  K_test = np.dot(test_vector, train_vector.T)
  return K_train, K_test

K_train, K_test = shortest_path_kernel(train_graphs, test_graphs)

**Используя реализованное ядро обучите модель SVC, подберите гиперпараметры, вычислите различные метрики качества**

In [6]:
# tune GridSearch params
param_grid = {
    'C': [0.01, 0.05, 0.1, 1, 10, 100, 1000],
    'cache_size': [50, 100, 200],
    'tol': [0.01, 0.001, 0.0001]
}
# search best params
model = SVC(kernel ='precomputed')
grid = GridSearchCV(model, param_grid)
grid.fit(K_train, y_train)
best_params = grid.best_params_

print(best_params)

# predict best model and compute metrics
y_pred = grid.predict(K_test)

print(f'Accuracy: {accuracy_score(y_test, y_pred)}')
print(f'Precision: {precision_score(y_test, y_pred)}')
print(f'Recall: {recall_score(y_test, y_pred)}')

{'C': 0.01, 'cache_size': 50, 'tol': 0.01}
Accuracy: 0.98
Precision: 0.96
Recall: 1.0


**Также реализовать Weisfeiler-Lehman Kernel и обучить классификатор с ним, сравнить результаты.**

In [3]:
def weisfeiler_lehman_kernel(train_graphs, test_graphs, iteration=5):
    K_train = np.zeros((len(train_graphs), len(train_graphs)))
    K_test = np.zeros((len(test_graphs), len(train_graphs)))
    # List with coloring results
    train_graphs_labels = [weisfeiler_lehman_labels(G, iteration) for G in train_graphs]
    test_graphs_labels = [weisfeiler_lehman_labels(G, iteration) for G in test_graphs]

    # Calculating each cell of K_train
    for i in range(len(train_graphs)):
        for j in range(i, len(train_graphs)):
            # Calculating the kernel value
            kernel_value = weisfeiler_lehman_kernel_value(train_graphs_labels[i], train_graphs_labels[j])
            K_train[i, j] = kernel_value
            # Mirror the kernel values along the diagonal
            K_train[j, i] = kernel_value

    # Calculating each cell of K_test
    for i in range(len(test_graphs)):
        for j in range(len(train_graphs)):
            # Calculating the kernel value
            kernel_value = weisfeiler_lehman_kernel_value(test_graphs_labels[i], train_graphs_labels[j])
            K_test[i, j] = kernel_value

    return K_train, K_test

def weisfeiler_lehman_labels(graph, iteration):
    # Initialize all colors with the degree of the vertex
    labels = {node: str(graph.degree(node)) for node in graph.nodes()}

    for _ in range(iteration):
        # Compute new colors based on neighbors
        new_labels = {}
        for node in graph.nodes():
            neighbors = graph.neighbors(node)
            # Node color + sorted colors of neighboring vertices
            new_label = [labels[node]] + sorted([labels[neighbor] for neighbor in neighbors])
            new_label = ",".join(new_label)

            # Store the new color of the vertex (using a hash to save memory)
            new_labels[node] = str(hash(new_label))
            # new_labels[node] = new_label
        # Update colors
        labels = new_labels
    return labels

def weisfeiler_lehman_kernel_value(labels_i, labels_j):
    # Count each color in each graph
    kernel_value = 0
    counts_i = {}
    counts_j = {}
    for label in labels_i.values():
        counts_i[label] = counts_i.get(label, 0) + 1
    for label in labels_j.values():
        counts_j[label] = counts_j.get(label, 0) + 1
    # Compute the vector as the dot product of color frequencies
    for label in set(counts_i.keys()).union(counts_j.keys()):
        kernel_value += counts_i.get(label, 0) * counts_j.get(label, 0)
    return kernel_value

K_train_wl, K_test_wl = weisfeiler_lehman_kernel(train_graphs, test_graphs)

In [4]:
# tune GridSearch params
param_grid = {
    'C': [0.01, 0.05, 0.1, 1, 10, 100, 1000],
    'cache_size': [50, 100, 200],
    'tol': [0.01, 0.001, 0.0001]
}
# search best params
model = SVC(kernel ='precomputed')
wl_grid = GridSearchCV(model, param_grid, refit = True)
wl_grid.fit(K_train_wl, y_train)
wl_best_params = wl_grid.best_params_

print(wl_best_params)

# predict best model and compute metrics
y_pred_wl = wl_grid.predict(K_test_wl)

print(f'Accuracy: {accuracy_score(y_test, y_pred_wl)}')
print(f'Precision: {precision_score(y_test, y_pred_wl)}')
print(f'Recall: {recall_score(y_test, y_pred_wl)}')

{'C': 0.01, 'cache_size': 50, 'tol': 0.01}
Accuracy: 0.98
Precision: 0.96
Recall: 1.0


**Выводы**:
* Оба метода показали хорошие результаты
* Выбор метода может зависеть от конкретных задач. Если важно избежать ложных срабатываний (например, минимизировать ложноположительные результаты), лучше использовать Shortest Path Kernel.
* Если же важнее находить все положительные примеры, даже с возможными ложными срабатываниями, стоит предпочесть Weisfeiler-Lehman Kernel