# Курсовая работа на тему «Анализ методов генерации признаков узлов в графах»

Все соответствующие результаты можно найти в пояснительной записке к работе.

## Прежде чем начать

Для трэкинга экспериментов используется сервер MLFlow. Чтобы запустить код ниже, настройте переменную окружения `MLFLOW_REMOTE_SERVER`:

- Для удалённого сервера MLFlow вставьте адрес сервера, например `http://mymlflow.com`
- Для локального запуска, запустите сервер с помощью `mlflow server` и вставьте `http://localhost:5000`

Ячейка ниже оставлена для изменения сервера MLFlow и некоторые его настройки.

In [1]:
import os

os.environ["MLFLOW_REMOTE_SERVER"] = "http://localhost:5000"

MLFLOW_SAVE_MODELS = False

Ниже - необходимые импорты, оставьте без изменения.

In [None]:
import logging
from time import time

import matplotlib.pyplot as plt
import mlflow
import networkx as nx
import torch
import torch.nn as nn
import numpy as np
import torch_geometric.nn as gnn
from sklearn.linear_model import LogisticRegression
from sklearn.manifold import TSNE
from sklearn.metrics import (
    accuracy_score,
    davies_bouldin_score,
    f1_score,
    precision_score,
    recall_score,
    silhouette_score,
)

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from torch_geometric.data import Data
from torch_geometric.datasets import Planetoid, FacebookPagePage
from torch_geometric.nn import GATConv, GCNConv, SAGEConv, Node2Vec, GINConv
from torch_geometric.utils import to_networkx
from tqdm import tqdm
import os
from dataclasses import dataclass
from copy import copy

logging.getLogger("mlflow.utils.requirements_utils").setLevel(logging.ERROR)
logging.getLogger("mlflow.models.model").setLevel(logging.ERROR)


MLFLOW_REMOTE_SERVER = os.environ["MLFLOW_REMOTE_SERVER"]

mlflow.set_tracking_uri(MLFLOW_REMOTE_SERVER)
mlflow.set_experiment("Graph Feature Analysis")
mlflow.config.enable_async_logging()

DEVICE = "cpu"

if torch.cuda.is_available():
    DEVICE = "cuda"
elif torch.backends.mps.is_available():
    DEVICE = "mps"

print(f"Device: {DEVICE}")
print(f"MLflow server: {MLFLOW_REMOTE_SERVER}")
print(f"Save models? - {MLFLOW_SAVE_MODELS}")

Device: mps
MLflow server: http://localhost:5000
Save models? - False


## Загрузка данных

Для нашей задачи достаточно загрузить встроенные в `pytorch_geometric` датасеты с графами. Тестирование методов будут производиться на задаче классификации узлов на следующих графах:

- `Cora`
    - Цитационная сеть научных статей, классификация тематики статьи (7 категорий).
- `PubMed`
    - Цитационная сеть медицинских статей, классификация типа статьи о диабете (3 категории).
- `FacebookPageToPage`
    - Граф взаимных ссылок страниц Facebook, классификация типа страницы (4 категории).

In [3]:
def load_datasets():
    cora = Planetoid(root="/tmp/Cora", name="Cora")[0]

    pubmed = Planetoid(root="/tmp/Pubmed", name="Pubmed")[0]

    facebook = FacebookPagePage("/tmp/Facebook")[0]

    facebook.train_mask = torch.zeros(facebook.num_nodes, dtype=torch.bool)
    facebook.test_mask = torch.zeros(facebook.num_nodes, dtype=torch.bool)

    train_idx, test_idx = train_test_split(np.arange(facebook.num_nodes), test_size=0.2)

    facebook.train_mask[train_idx] = True
    facebook.test_mask[test_idx] = True

    return {
        "cora": cora,
        "pubmed": pubmed,
        "facebook": facebook,
    }

In [4]:
datasets = load_datasets()

## Методы генерации признаков

Ячейка ниже - технического характера для хранения конфигураций процесса обучения и параметров моделей.

In [40]:
@dataclass
class TrainConfig:
    epochs: int
    lr: float

    def dump(self):
        return self.__dict__


@dataclass
class Params:
    num_features: int
    layers: list[int]
    embedding_dim: int
    num_classes: int
    walk_length: int
    walks_per_node: int
    context_size: int

    model_global_type: str
    model_local_type: str

    def dump(self):
        return self.__dict__

Здесь задаём возможные статистические признаки для классического анализа графа - степень, PageRank, коэффициент кластеризации и другие. Этим способом мы удобно можем конфигурировать модель классического анализа, изменяя список из объектов класса `ClassicalFeature`.

In [41]:
class ClassicalFeature:
    def __init__(self) -> None:
        self.feature = None
        self.id = 0

    def compute_feature(self, graph): ...

    def __call__(self, graph, node, id=None):
        if not self.id == id:
            self.id = id
            self.feature = self.compute_feature(graph)

        return self.feature[node]


class Degree(ClassicalFeature):
    def compute_feature(self, graph):
        return graph.degree


class Clustering(ClassicalFeature):
    def compute_feature(self, graph):
        return nx.clustering(graph)


class PageRank(ClassicalFeature):
    def compute_feature(self, graph):
        return nx.pagerank(graph)


class EigenvectorCentrality(ClassicalFeature):
    def compute_feature(self, graph):
        return nx.eigenvector_centrality(graph)


class NumberOfTriangles(ClassicalFeature):
    def compute_feature(self, graph):
        return nx.triangles(graph)


class CoreNumber(ClassicalFeature):
    def compute_feature(self, graph):
        graph.remove_edges_from(nx.selfloop_edges(graph))
        return nx.core_number(graph)

В следующей ячейке описываются базовый класс с необходимыми методами и атрибутами, от которого будут наследоваться все остальные.

In [42]:
class BaseExtractor(nn.Module):
    def __init__(self, *args, **kwargs):
        super().__init__()
        self.is_trainable: bool = True
        self.method: str = "method_name"
        self.custom_train: bool = False

    def get_embeddings(self, graph):
        pass

Композитор (`Composer`) - особый класс для объединения эмбеддингов нескольких моделей в один. Позволяет гибко настраивать список моделей (и использование исходных данных), эмбеддинги которых необходимо объединить и передать дальше, используя одинаковый интерфейс.

In [49]:
class Composer(BaseExtractor):
    def __init__(self, models, raw_data=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.models = models
        self.method = f"Compose of [{', '.join([model.method for model in self.models] + ['Raw Data'] if raw_data != None else [])}]"
        self.is_trainable = False
        self.raw_data = raw_data
        self.params = self.models[0].params

    def get_embeddings(self, graph):
        with torch.no_grad():
            embeddings = [] if not self.raw_data != None else [self.raw_data.to(DEVICE)]

            for model in self.models:
                embeddings.append(model.get_embeddings(graph).to(DEVICE))
            return torch.cat(embeddings, dim=1)

Модель классического анализа для получения информации о структуре графа и предоставления этого в виде эмбеддинга для каждого узла. Именно сюда необходимо передавать список фичей, необходимый для экстрагирования.

In [None]:
class ClassicalFeatureExtractor(BaseExtractor):
    def __init__(
        self, features: list[ClassicalFeature], params: Params, *args, **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.is_trainable = False
        self.method = "Classical"
        self.features = features
        self.computed_features = None
        self.params = params

    def get_embeddings(self, graph, recompute=False):
        if self.computed_features is not None and not recompute:
            return self.computed_features

        self.computed_features = []

        graph = to_networkx(graph, to_undirected=True)

        id = np.random.randint(100000)

        for node in tqdm(graph.nodes, desc="Извлечение признаков"):
            self.computed_features.append([f(graph, node, id) for f in self.features])

        self.computed_features = torch.tensor(
            self.computed_features, dtype=torch.float
        ).to(DEVICE)

        return self.computed_features

Следующие методы представлены в виде моделей глубого обучения на основе слоёв из `pytorch_geometric`. 

Для их обучения используется две части - `encoder` и `decoder`. Энкодер представляет собой модель графовой нейронной сети, использующийся как раз для генерации признаков узлов. Декодер же использовается лишь в процессе обучения модели и представляет собой крайне лёгкую голову классификации (два полносвязных слоя).

In [45]:
class GNNExtractor(BaseExtractor):
    def __init__(self, encoder: nn.Module, decoder: nn.Module, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.is_trainable = True
        self.method = "GNN"

    def forward(self, data):
        x, ei = data.x, data.edge_index
        ecnoded = self.encoder(x, ei)
        decoded = self.decoder(ecnoded)
        return decoded

    def get_embeddings(self, graph):
        with torch.no_grad():
            x, ei = graph.x, graph.edge_index
            return self.encoder(x, ei)


class GCN(GNNExtractor):
    def __init__(self, params: Params, *args, **kwargs):
        layers = [params.num_features] + params.layers + [params.embedding_dim]
        seq = []
        for i in range(len(layers) - 1):
            seq.extend(
                [
                    (
                        GCNConv(layers[i], layers[i + 1]),
                        "x, edge_index -> x",
                    ),
                    nn.ReLU(),
                ]
            )
        encoder = gnn.Sequential(
            "x, edge_index",
            seq,
        )

        decoder = nn.Sequential(
            nn.Linear(params.embedding_dim, 32),
            nn.ReLU(),
            nn.Linear(32, params.num_classes),
        )

        super().__init__(encoder, decoder, *args, **kwargs)
        self.method = "GCN"
        self.params = params


class GraphSAGE(GNNExtractor):
    def __init__(self, params: Params, *args, **kwargs):
        layers = [params.num_features] + params.layers + [params.embedding_dim]
        seq = []
        for i in range(len(layers) - 1):
            seq.extend(
                [
                    (
                        SAGEConv(layers[i], layers[i + 1]),
                        "x, edge_index -> x",
                    ),
                    nn.ReLU(),
                ]
            )
        encoder = gnn.Sequential(
            "x, edge_index",
            seq,
        )

        decoder = nn.Sequential(
            nn.Linear(params.embedding_dim, 32),
            nn.ReLU(),
            nn.Linear(32, params.num_classes),
        )

        super().__init__(encoder, decoder, *args, **kwargs)
        self.method = "GraphSAGE"
        self.params = params


class GAT(GNNExtractor):
    def __init__(self, params: Params, *args, **kwargs):
        layers = [params.num_features] + params.layers + [params.embedding_dim]
        seq = []
        for i in range(len(layers) - 1):
            seq.extend(
                [
                    (
                        GATConv(layers[i], layers[i + 1]),
                        "x, edge_index -> x",
                    ),
                    nn.ReLU(),
                ]
            )
        encoder = gnn.Sequential(
            "x, edge_index",
            seq,
        )

        decoder = nn.Sequential(
            nn.Linear(params.embedding_dim, 32),
            nn.ReLU(),
            nn.Linear(32, params.num_classes),
        )
        super().__init__(encoder, decoder, *args, **kwargs)
        self.method = "GAT"
        self.params = params


class GIN(GNNExtractor):
    def __init__(self, params: Params, *args, **kwargs):
        layers = [params.num_features] + params.layers + [params.embedding_dim]
        seq = []
        for i in range(len(layers) - 1):
            seq.extend(
                [
                    (
                        GINConv(
                            nn.Sequential(
                                nn.Linear(layers[i], layers[i + 1]),
                                nn.ReLU(),
                            )
                        ),
                        "x, edge_index -> x",
                    ),
                    nn.ReLU(),
                ]
            )
        encoder = gnn.Sequential(
            "x, edge_index",
            seq,
        )

        decoder = nn.Sequential(
            nn.Linear(params.embedding_dim, 32),
            nn.ReLU(),
            nn.Linear(32, params.num_classes),
        )
        super().__init__(encoder, decoder, *args, **kwargs)
        self.method = "GIN"
        self.params = params

Последняя модель `Node2Vec` основана на идеи матричной факторизации и случайного блуждания по графу.

Обучение такой модели не до конца совместима с используемым интерфейсом, поэтому было принято решение использовать `custom_train` для определения, использовать ли общий цикл обучения или использовать тот, что привязан к модели.

In [46]:
class NodeToVec(BaseExtractor):
    def __init__(self, params: Params, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.model = None
        self.is_trainable = True
        self.custom_train = True
        self.method = "Node2Vec"
        self.params = params

    def get_embeddings(self, graph):
        with torch.no_grad():
            if self.model is None:
                raise RuntimeError("Model not trained yet.")
            return self.model.embedding.weight.data.to(DEVICE)

    def train(self, data, config: TrainConfig):
        self.model = Node2Vec(
            data.edge_index,
            embedding_dim=self.params.embedding_dim,
            walk_length=self.params.walk_length,
            context_size=self.params.context_size,
            walks_per_node=self.params.walks_per_node,
        )
        loader = self.model.loader(batch_size=128, shuffle=True)
        optimizer = torch.optim.AdamW(self.model.parameters(), lr=config.lr)
        self.model.train()

        time_start = time()

        for epoch in tqdm(range(config.epochs), desc=f"Обучение {self.method}"):
            total_loss = 0
            for pos_rw, neg_rw in loader:
                optimizer.zero_grad()
                loss = self.model.loss(pos_rw, neg_rw)
                loss.backward()
                optimizer.step()
                total_loss += loss.item()

            avg_loss = total_loss / len(loader)
            mlflow.log_metric("train_loss", avg_loss, step=epoch)

        time_end = time()
        mlflow.log_metric("train_time", time_end - time_start)

        mlflow.log_params(config.dump(), synchronous=False)
        mlflow.log_params(self.params.dump(), synchronous=False)

        # TODO: пофиксить сейв node2vec
        # if MLFLOW_SAVE_MODELS:
        #     mlflow.pytorch.log_model(self.model, "model")

Следующие функции описывают процесс обучения и подсчёта метрик на разных выборках узлов графа. Эмбеддинги визуализируются с помощью проекции через `t-SNE`.

In [None]:
def _compute_metrics_by_mask(output, labels, mask):
    preds = output[mask].argmax(dim=1).cpu()
    labels = labels[mask].cpu()

    return {
        "accuracy": accuracy_score(labels, preds),
        "f1": f1_score(labels, preds, average="macro", zero_division=0),
        "precision": precision_score(labels, preds, average="macro", zero_division=0),
        "recall": recall_score(labels, preds, average="macro", zero_division=0),
    }


def compute_and_log_train_metrics(output, dataset, loss: float, step: int):
    train_metrics = _compute_metrics_by_mask(output, dataset.y, dataset.train_mask)
    test_metrics = _compute_metrics_by_mask(output, dataset.y, dataset.test_mask)

    metrics = {
        "train_loss": loss,
        **{f"train_{k}": v for k, v in train_metrics.items()},
        **{f"test_{k}": v for k, v in test_metrics.items()},
    }

    mlflow.log_metrics({k: v for k, v in metrics.items()}, step=step, synchronous=False)


def train(model, data, config: TrainConfig):
    mlflow.log_params(model.params.dump(), synchronous=False)
    mlflow.log_params(config.dump(), synchronous=False)

    if not model.is_trainable:
        return

    if model.custom_train:
        model.train(data, config)
        return

    model = model.to(DEVICE)
    data = data.to(DEVICE)

    optimizer = torch.optim.AdamW(model.parameters(), lr=config.lr)
    criterion = nn.CrossEntropyLoss()
    model.train()

    time_start = time()

    for i in tqdm(range(config.epochs), desc=f"Обучение {model.method}"):
        optimizer.zero_grad()
        out = model(data)
        loss = criterion(out[data.train_mask], data.y[data.train_mask])

        compute_and_log_train_metrics(out, data, loss.item(), i)

        loss.backward()
        optimizer.step()

    time_end = time()

    mlflow.log_metric("train_time", time_end - time_start, synchronous=False)
    # mlflow.pytorch.log_model(model, "model")


# Оценка качества
def evaluate(features, labels, train_mask, test_mask, classifier=LogisticRegression):
    features = features.to(DEVICE)
    X_train = features[train_mask].detach().cpu()
    y_train = labels[train_mask].detach().cpu()
    X_test = features[test_mask].detach().cpu()
    y_test = labels[test_mask].detach().cpu()

    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_test = scaler.transform(X_test)

    clf = classifier(max_iter=1000).fit(X_train, y_train)
    preds = clf.predict(X_test)

    silhouette = silhouette_score(X_train, y_train)
    db_index = davies_bouldin_score(X_test, y_test)

    metrics = {
        "accuracy": accuracy_score(y_test, preds),
        "f1": f1_score(y_test, preds, average="macro", zero_division=0),
        "precision": precision_score(y_test, preds, average="macro", zero_division=0),
        "recall": recall_score(y_test, preds, average="macro", zero_division=0),
        "silhouette_score": silhouette,
        "davies_bouldin_score": db_index,
    }

    mlflow.log_metrics({k: v for k, v in metrics.items()}, synchronous=False)


# Визуализация
def visualize(embeddings, labels, title="Embeddings with T-SNE", limit=100):
    reducer = TSNE(n_components=2)
    emb_2d = reducer.fit_transform(embeddings)

    fig = plt.figure(figsize=(8, 8), layout="tight")

    plt.scatter(emb_2d[:limit, 0], emb_2d[:limit, 1], c=labels[:limit], s=10)

    plt.grid(alpha=0.5, linestyle=":")
    plt.title(title)

    mlflow.log_figure(fig, "embeddings_visualization.png")

    plt.close(fig)


# Сравнение моделей
def benchmark(
    dataset: Data,
    train_config: TrainConfig,
    models: list[BaseExtractor] = [],
    run_only_decoder: bool = True,
    classifier=LogisticRegression,
    run_name: str | None = None,
):
    with mlflow.start_run(run_name=run_name):
        if run_only_decoder:
            with mlflow.start_run(run_name="DecoderOnly", nested=True):
                print(f"Evaluating {classifier.__name__}")
                time_start = time()
                evaluate(
                    dataset.x,
                    dataset.y,
                    dataset.train_mask,
                    dataset.test_mask,
                    classifier=classifier,
                )
                mlflow.log_metric(
                    "evaluation_time", time() - time_start, synchronous=False
                )

        for model in models:
            with mlflow.start_run(run_name=model.method, nested=True):
                train(model, dataset, train_config)
                time_start = time()
                emb = model.get_embeddings(dataset).to(DEVICE)
                mlflow.log_metric(
                    "embedding_time", time() - time_start, synchronous=False
                )
                print(
                    f"Evaluating {classifier.__name__} with {model.method} embeddings"
                )
                time_start = time()
                evaluate(
                    emb,
                    dataset.y,
                    dataset.train_mask,
                    dataset.test_mask,
                    classifier=classifier,
                )
                mlflow.log_metric(
                    "evaluation_time", time() - time_start, synchronous=False
                )
                visualize(
                    emb[dataset.test_mask].detach().cpu().numpy(),
                    dataset.y[dataset.test_mask].detach().cpu().numpy(),
                    f"{model.method} Features",
                )

## Эксперименты

### Оптимизация гиперпараметров

В этой секции прогоняются бенчмарки моделей с разным количеством слоёв, размерностью эмбеддингов и прочих параметров моделей, чтобы выявить наилучшие комбинации.

Для графовых нейронных сетей:

In [None]:
layers_ = [
    [],
    [16],
    [32],
    [64],
    [64, 32],
    [64, 64],
    [128, 64, 32],
]

embdims = [32, 64, 128, 256, 512]

models_list = [GCN, GraphSAGE, GAT, GIN]

repeat = 3

print(f"Total models: {len(models_list) * len(layers_) * len(embdims) * repeat}")

dataset = datasets["cora"]

for model in models_list:
    mlflow.set_experiment(f"Hyperparam {model.__name__}")

    models_gnn = []
    config = TrainConfig(
        epochs=50,
        lr=0.01,
    )
    for layers in layers_:
        for embdim in embdims:
            for _ in range(repeat):
                params = Params(
                    num_features=dataset.num_features,
                    layers=layers,
                    embedding_dim=embdim,
                    num_classes=len(dataset.y.unique()),
                    walk_length=0,
                    context_size=0,
                    walks_per_node=0,
                    model_global_type="gnn",
                    model_local_type=model.__name__,
                )

                models_gnn.append(model(params))

    classifier = LogisticRegression

    models_classical = []

    models_compose = []

    models = models_classical + models_compose + models_gnn

    only_decoder = False
    # only_decoder = True

    emb_models = [model.to(DEVICE) for model in models]

    benchmark(
        dataset,
        config,
        emb_models,
        run_only_decoder=only_decoder,
        classifier=classifier,
        run_name="cora",
    )

Для Node2Vec:

In [None]:
walk_lengths = [10, 20, 30, 50]

context_sizes = [5, 10]

walk_per_node = [5, 10]

embdims = [32, 64]

models_list = [NodeToVec]

repeat = 3

print(
    f"Total models: {len(models_list) * len(walk_lengths) * len(embdims) * repeat * len(context_sizes) * len(walk_per_node)}"
)

dataset = datasets["cora"]

for model in models_list:
    mlflow.set_experiment(f"Hyperparam {model.__name__}")

    models_gnn = []
    config = TrainConfig(
        epochs=50,
        lr=0.01,
    )
    for walk_length in walk_lengths:
        for context_size in context_sizes:
            for walks_per_node in walk_per_node:
                for embdim in embdims:
                    for _ in range(repeat):
                        params = Params(
                            num_features=dataset.num_features,
                            layers=[0],
                            embedding_dim=embdim,
                            num_classes=len(dataset.y.unique()),
                            walk_length=walk_length,
                            context_size=context_size,
                            walks_per_node=walks_per_node,
                            model_global_type="walk",
                            model_local_type=model.__name__,
                        )

                        models_gnn.append(model(params))

    classifier = LogisticRegression

    models_classical = []

    models_compose = []

    models = models_classical + models_compose + models_gnn

    only_decoder = False
    # only_decoder = True

    emb_models = [model.to(DEVICE) for model in models]

    benchmark(
        dataset,
        config,
        emb_models,
        run_only_decoder=only_decoder,
        classifier=classifier,
        run_name="cora",
    )

Судя по результатам, наилучшими параметрами в среднем оказались:

- Два слоя по 64 нейрона
- Размер эмбеддинга - 128
- walk_length: 20
- context_size: 10
- walks_per_node: 10

## Сравнительный анализ

Код ниже запускает обработку всех датасетов всеми логически возможными методами:

In [None]:
mlflow.set_experiment("Graph Feature Analysis")

for name, dataset in datasets.items():
    print(f"--- {name} ---")

    config = TrainConfig(
        epochs=50,
        lr=0.01,
    )

    params = Params(
        num_features=dataset.num_features,
        layers=[64, 64],
        embedding_dim=64,
        num_classes=len(dataset.y.unique()),
        walk_length=20,
        context_size=10,
        walks_per_node=10,
        model_global_type="classical",
        model_local_type=ClassicalFeatureExtractor.__name__,
    )

    classifier = LogisticRegression

    models_classical = [
        ClassicalFeatureExtractor(
            [
                Degree(),
                Clustering(),
                PageRank(),
                EigenvectorCentrality(),
                NumberOfTriangles(),
                CoreNumber(),
            ],
            params,
        )
    ]

    params = copy(params)
    params.model_global_type = "walk"
    params.model_local_type = NodeToVec.__name__

    models_classical.append(NodeToVec(params))

    gnn_models = [GCN, GAT, GraphSAGE, GIN]

    models_gnn = []

    params = copy(params)
    params.embedding_dim = 128
    params.model_global_type = "gnn"

    for model in gnn_models:
        params = copy(params)
        params.model_local_type = model.__name__
        models_gnn.append(model(params))

    models_compose = [
        Composer([models_classical[0]], raw_data=dataset.x),
        Composer([models_classical[1]], raw_data=dataset.x),
        Composer([models_gnn[0]], raw_data=dataset.x),
    ]

    models = models_classical + models_compose + models_gnn

    only_decoder = True

    emb_models = [model.to(DEVICE) for model in models]

    benchmark(
        dataset,
        config,
        emb_models,
        run_only_decoder=only_decoder,
        classifier=classifier,
        run_name=name,
    )