In [None]:
!pip install torch_geometric
!pip install torch-scatter
!pip install gudhi
!pip install torchdiffeq
!pip install scikit-optimize
!pip install optuna

Main model

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import global_mean_pool
from torch_geometric.datasets import TUDataset
from torch_geometric.loader import DataLoader
from torch_geometric.utils import k_hop_subgraph

import gudhi as gd
import numpy as np

from torchdiffeq import odeint

import optuna
import random

# DeepSets-векторизация для персистентных диаграмм
class DeepSetPH(nn.Module):
    """DeepSets-style encoder that turns persistence diagrams into vectors.

    Every diagram is a collection of 2-element points. Each point is mapped
    by the MLP ``phi``, the mapped points are summed (so the result does not
    depend on the order of the points), and the sum is mapped by ``rho``.

    Args:
        phi_dim: Width of the per-point MLP ``phi``.
        out_dim: Size of the embedding produced for each diagram.
    """

    def __init__(self, phi_dim=64, out_dim=32):
        super().__init__()
        self.phi = nn.Sequential(
            nn.Linear(2, phi_dim),
            nn.ReLU(),
            nn.Linear(phi_dim, phi_dim),
            nn.ReLU()
        )
        self.rho = nn.Sequential(
            nn.Linear(phi_dim, out_dim),
            nn.ReLU()
        )

    def forward(self, diagrams):
        """Embed a sequence of diagrams.

        Args:
            diagrams: Sequence whose items are array-likes of shape
                ``(n_points, 2)``. An item of length 0 is embedded as a
                zero vector (it is NOT passed through ``rho``).

        Returns:
            Float tensor of shape ``(len(diagrams), out_dim)`` located on the
            same device as the module parameters. An empty ``diagrams``
            sequence yields a ``(0, out_dim)`` tensor instead of raising.
        """
        head = self.rho[0]
        device = head.weight.device
        out_dim = head.out_features

        embeddings = []
        for dgm in diagrams:
            if len(dgm) == 0:
                emb = torch.zeros(out_dim, device=device)
            else:
                # as_tensor accepts lists, NumPy arrays and tensors alike.
                points = torch.as_tensor(dgm, dtype=torch.float32, device=device)
                emb = self.rho(self.phi(points).sum(dim=0))
            embeddings.append(emb)

        # torch.stack / torch.cat reject an empty list, so handle it here.
        if not embeddings:
            return torch.zeros((0, out_dim), device=device)
        return torch.stack(embeddings, dim=0)

# Функция для вычисления локальных Персистентных диаграмм (H_0 и H_1)
def compute_local_ph_diagrams(x, edge_index, k=1, maxdim=1, hom_dims=(0, 1), max_edge_length=10.0):
    """Compute one persistence diagram per node from its k-hop neighbourhood.

    For every node the features of the nodes in its ``k``-hop neighbourhood
    are treated as a point cloud, a Vietoris-Rips filtration is built on it
    with GUDHI, and the finite persistence intervals of the requested
    homology dimensions are collected into a single array.

    The computation runs on CPU on a detached copy of ``x``; no gradient
    flows from the diagrams back to ``x``.

    Args:
        x: Tensor with one row of features per node.
        edge_index: Edge index tensor accepted by ``k_hop_subgraph``.
        k: Number of hops defining the neighbourhood.
        maxdim: Highest homology dimension to compute. The Rips complex is
            expanded up to dimension ``maxdim + 1`` because intervals in
            dimension ``d`` need ``(d + 1)``-simplices to be born and die;
            GUDHI does not report the top dimension of the complex by
            default.
        hom_dims: Homology dimensions whose intervals are collected.
        max_edge_length: Rips filtration threshold.

    Returns:
        List with ``x.size(0)`` NumPy arrays of shape ``(n_intervals, 2)``.
        A neighbourhood with fewer than two points, with no finite interval,
        or whose computation failed is represented by ``[[0.0, 0.0]]``.
    """
    import warnings

    num_nodes = x.size(0)
    x_np = x.detach().cpu().numpy()
    edge_index = edge_index.cpu()

    diagrams = []
    for v in range(num_nodes):
        # num_nodes is passed explicitly: otherwise it is inferred from
        # edge_index and isolated trailing nodes index out of range.
        subset, _, _, _ = k_hop_subgraph(
            v, k, edge_index, relabel_nodes=True, num_nodes=num_nodes
        )
        pts = x_np[subset.numpy()]

        dgm = None
        if pts.shape[0] >= 2:
            try:
                rips = gd.RipsComplex(points=pts, max_edge_length=max_edge_length)
                st = rips.create_simplex_tree(max_dimension=maxdim + 1)
                st.persistence()
                parts = [
                    np.asarray(
                        st.persistence_intervals_in_dimension(h), dtype=float
                    ).reshape(-1, 2)
                    for h in hom_dims
                ]
                if parts:
                    intervals = np.concatenate(parts, axis=0)
                    # Essential classes (death == inf) are dropped.
                    finite = intervals[np.isfinite(intervals[:, 1])]
                    if len(finite) > 0:
                        dgm = finite
            except Exception as exc:
                # Best effort: keep going with a trivial diagram, but make
                # the failure visible instead of hiding it.
                warnings.warn(
                    f"persistent homology failed for node {v}: {exc!r}; "
                    "using a trivial diagram",
                    RuntimeWarning,
                )

        if dgm is None:
            dgm = np.array([[0.0, 0.0]])
        diagrams.append(dgm)
    return diagrams

# Стандартный слой TopNets
class TopNetsLayer(nn.Module):
    """Message-passing layer augmented with local persistent-homology features.

    Node features are linearly transformed and summed over the edges, a
    diagram embedding is computed for every node from the layer input, and
    both parts are concatenated and passed through a linear update + ReLU.

    Args:
        in_channels: Size of the incoming node features.
        out_channels: Size of the produced node features.
        phi_dim: Hidden width of the diagram encoder.
        ph_out_dim: Size of the diagram embedding.
    """

    def __init__(self, in_channels, out_channels, phi_dim=64, ph_out_dim=32):
        super().__init__()
        self.msg_linear = nn.Linear(in_channels, out_channels)
        self.ph_encoder = DeepSetPH(phi_dim=phi_dim, out_dim=ph_out_dim)
        self.update_linear = nn.Linear(out_channels + ph_out_dim, out_channels)

    def forward(self, x, edge_index):
        """Return updated node features of width ``out_channels``."""
        receivers, senders = edge_index
        messages = self.msg_linear(x)

        # Sum, for every receiver (first row of edge_index), the messages of
        # the nodes listed in the second row.
        neighbour_sum = torch.zeros_like(messages)
        neighbour_sum.index_add_(0, receivers, messages[senders])

        # Topological part: 1-hop diagrams built from the layer input x.
        local_diagrams = compute_local_ph_diagrams(x, edge_index, k=1, maxdim=1, hom_dims=[0, 1])
        topo_features = self.ph_encoder(local_diagrams)

        merged = torch.cat([neighbour_sum, topo_features], dim=-1)
        updated = self.update_linear(merged)
        return F.relu(updated)

# Эквивариантный слой TopNets
class EquivariantTopNetsLayer(nn.Module):
    """Variant of the TopNets layer whose diagrams come from separate coordinates.

    Message passing operates on ``x`` while the persistence diagrams are
    computed from ``x_coord``, which stays the same tensor the caller passes
    in (the layer never updates it).

    Args:
        in_channels: Size of the incoming node features.
        out_channels: Size of the produced node features.
        phi_dim: Hidden width of the diagram encoder.
        ph_out_dim: Size of the diagram embedding.
    """

    def __init__(self, in_channels, out_channels, phi_dim=64, ph_out_dim=32):
        super().__init__()
        self.msg_linear = nn.Linear(in_channels, out_channels)
        self.ph_encoder = DeepSetPH(phi_dim=phi_dim, out_dim=ph_out_dim)
        self.update_linear = nn.Linear(out_channels + ph_out_dim, out_channels)

    def forward(self, x, x_coord, edge_index):
        """Return updated node features of width ``out_channels``."""
        receivers, senders = edge_index
        messages = self.msg_linear(x)

        # Sum the transformed features over the edges.
        neighbour_sum = torch.zeros_like(messages)
        neighbour_sum.index_add_(0, receivers, messages[senders])

        # Diagrams are built from x_coord, not from the hidden features.
        local_diagrams = compute_local_ph_diagrams(x_coord, edge_index, k=1, maxdim=1, hom_dims=[0, 1])
        topo_features = self.ph_encoder(local_diagrams)

        merged = torch.cat([neighbour_sum, topo_features], dim=-1)
        updated = self.update_linear(merged)
        return F.relu(updated)

# Continuous TopNets (Neural ODE)
class ODEFunc(nn.Module):
    """Right-hand side of the ODE: a two-layer MLP that keeps the feature size.

    Args:
        feature_dim: Size of the state vector (input and output width).
    """

    def __init__(self, feature_dim):
        super().__init__()
        stages = [
            nn.Linear(feature_dim, feature_dim),
            nn.ReLU(),
            nn.Linear(feature_dim, feature_dim),
        ]
        self.f = nn.Sequential(*stages)

    def forward(self, t, x):
        """Return the derivative of the state ``x``; ``t`` is not used."""
        derivative = self.f(x)
        return derivative

class ODEBlock(nn.Module):
    """Integrate ``odefunc`` from ``t0`` to ``t1`` and return the final state.

    Args:
        odefunc: Module called by the solver as ``odefunc(t, x)``.
        t0: Start of the integration interval.
        t1: End of the integration interval.
        tol: Value used for both the relative and absolute solver tolerance.
    """

    def __init__(self, odefunc, t0=0.0, t1=1.0, tol=1e-3):
        super().__init__()
        self.odefunc = odefunc
        self.tol = tol
        # Plain attribute (not a buffer); forward() casts it to match x.
        self.integration_time = torch.tensor([t0, t1], dtype=torch.float32)

    def forward(self, x):
        """Return the state at ``t1`` for the initial state ``x``."""
        times = self.integration_time.type_as(x)
        trajectory = odeint(self.odefunc, x, times, rtol=self.tol, atol=self.tol)
        # trajectory[0] is the initial state, trajectory[1] the state at t1.
        return trajectory[1]

# TopNets model with the 'standard', 'equivariant' and 'continuous' modes
class UnifiedTopNets(nn.Module):
    """Graph classifier with three interchangeable TopNets variants.

    Modes:
        * ``'standard'``: a stack of ``TopNetsLayer``.
        * ``'equivariant'``: a stack of ``EquivariantTopNetsLayer``; diagrams
          are computed from ``data.pos`` when present, otherwise from the
          input node features.
        * ``'continuous'``: linear encoder, one diagram embedding computed
          from the encoded features, then an ODE block. ``num_layers`` is
          not used in this mode.

    In every mode node features are mean-pooled per graph and classified by
    a linear layer.

    Args:
        in_channels: Number of input node features. Values below 1 (datasets
            without node features) are treated as 1; ``forward`` then feeds a
            constant column of ones.
        hidden_channels: Width of the hidden node representation.
        num_layers: Number of layers in the 'standard'/'equivariant' stacks.
        num_classes: Number of output classes.
        mode: One of 'standard', 'equivariant', 'continuous'.
        phi_dim: Hidden width of the diagram encoder.
        ph_out_dim: Size of the diagram embedding.

    Raises:
        ValueError: If ``mode`` is not one of the supported values.
    """

    def __init__(self, in_channels, hidden_channels, num_layers, num_classes, mode='standard', phi_dim=64, ph_out_dim=32):
        super().__init__()
        # Fail early: an unknown mode used to build a model without layers
        # whose forward() silently returned None.
        if mode not in ('standard', 'equivariant', 'continuous'):
            raise ValueError(
                f"Unknown mode {mode!r}; expected 'standard', 'equivariant' or 'continuous'"
            )
        self.mode = mode
        self.readout = global_mean_pool
        # Featureless datasets report 0 node features; reserve one channel
        # for the constant feature substituted in forward().
        self.in_channels = max(in_channels, 1)

        if mode == 'continuous':
            self.encoder = nn.Linear(self.in_channels, hidden_channels)
            self.ph_encoder = DeepSetPH(phi_dim=phi_dim, out_dim=ph_out_dim)
            self.odefunc = ODEFunc(hidden_channels + ph_out_dim)
            self.odeblock = ODEBlock(self.odefunc)
            self.classifier = nn.Linear(hidden_channels + ph_out_dim, num_classes)
        else:
            layer_cls = TopNetsLayer if mode == 'standard' else EquivariantTopNetsLayer
            self.layers = nn.ModuleList()
            self.layers.append(layer_cls(self.in_channels, hidden_channels, phi_dim, ph_out_dim))
            for _ in range(num_layers - 1):
                self.layers.append(layer_cls(hidden_channels, hidden_channels, phi_dim, ph_out_dim))
            self.classifier = nn.Linear(hidden_channels, num_classes)

    def forward(self, data):
        """Return class logits, one row per graph in ``data``.

        ``data`` must expose ``x``, ``edge_index`` and ``batch`` (and
        ``num_nodes`` when ``x`` is ``None``).
        """
        x, edge_index, batch = data.x, data.edge_index, data.batch
        if x is None:
            # No node features: use a constant feature for every node.
            x = torch.ones((data.num_nodes, self.in_channels), device=edge_index.device)

        if self.mode == 'standard':
            for layer in self.layers:
                x = layer(x, edge_index)
            x = self.readout(x, batch)
            return self.classifier(x)

        if self.mode == 'equivariant':
            if hasattr(data, "pos") and data.pos is not None:
                x_coord = data.pos
            else:
                # Fall back to the input features as "coordinates".
                x_coord = x
            for layer in self.layers:
                x = layer(x, x_coord, edge_index)
            x = self.readout(x, batch)
            return self.classifier(x)

        # 'continuous' (the mode was validated in __init__)
        x = self.encoder(x)
        diagrams = compute_local_ph_diagrams(x, edge_index, k=1, maxdim=1, hom_dims=[0, 1])
        ph_embed = self.ph_encoder(diagrams)
        x = torch.cat([x, ph_embed], dim=-1)
        x = self.odeblock(x)
        x = self.readout(x, batch)
        return self.classifier(x)

# Функции тренировки и тестирования
def train_epoch(model, optimizer, criterion, loader, device):
    """Run one optimisation pass over ``loader``.

    Returns the sum of per-batch losses, each weighted by the batch's
    ``num_graphs``, divided by ``len(loader.dataset)``.
    """
    model.train()
    weighted_losses = []
    for batch in loader:
        batch = batch.to(device)
        optimizer.zero_grad()
        batch_loss = criterion(model(batch), batch.y)
        batch_loss.backward()
        optimizer.step()
        # Weight by graph count so the result is a per-graph average.
        weighted_losses.append(batch_loss.item() * batch.num_graphs)
    return sum(weighted_losses) / len(loader.dataset)

@torch.no_grad()
def test_epoch(model, loader, device):
    """Return the fraction of graphs in ``loader`` that ``model`` classifies correctly.

    The model is switched to eval mode and gradients are disabled.
    """
    model.eval()
    hits = 0
    for batch in loader:
        batch = batch.to(device)
        predicted = model(batch).argmax(dim=1)
        hits += int(predicted.eq(batch.y).sum())
    return hits / len(loader.dataset)

# Optuna objective for the hyperparameter search
def objective(trial, dataset_name, device):
    """Optuna objective: train a ``UnifiedTopNets`` and return its best accuracy.

    The trial samples the model mode, layer sizes, learning rate, number of
    epochs and batch size. The dataset is shuffled and split 80/20 on every
    call; the model is evaluated on the 20 % part after each epoch and the
    accuracy is reported to Optuna so the trial can be pruned.

    Args:
        trial: Optuna trial used to sample hyperparameters and report values.
        dataset_name: Name of the TUDataset to load (cached under ``/tmp``).
        device: Device the model and batches are moved to.

    Returns:
        The highest held-out accuracy observed over the epochs.

    Raises:
        optuna.exceptions.TrialPruned: When the pruner stops the trial.
    """
    mode = trial.suggest_categorical("mode", ["standard", "equivariant", "continuous"])
    hidden_channels = trial.suggest_int("hidden_channels", 32, 128, step=16)
    num_layers = trial.suggest_int("num_layers", 1, 5)
    phi_dim = trial.suggest_int("phi_dim", 32, 128, step=16)
    ph_out_dim = trial.suggest_int("ph_out_dim", 16, 64, step=8)
    # suggest_loguniform is deprecated; suggest_float(log=True) samples the
    # same log-uniform distribution.
    lr = trial.suggest_float("lr", 1e-3, 1e-2, log=True)
    epochs = trial.suggest_int("epochs", 20, 50)
    batch_size = trial.suggest_categorical("batch_size", [16, 32, 64])

    dataset = TUDataset(root='/tmp/' + dataset_name, name=dataset_name)
    # NOTE(review): the split is redrawn for every trial and the same
    # held-out part drives both pruning and the returned score, so the
    # reported accuracy is optimistic - confirm this is intended.
    dataset = dataset.shuffle()
    split_idx = int(0.8 * len(dataset))
    train_dataset = dataset[:split_idx]
    test_dataset = dataset[split_idx:]
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size)

    model = UnifiedTopNets(
        in_channels=dataset.num_node_features,
        hidden_channels=hidden_channels,
        num_layers=num_layers,
        num_classes=dataset.num_classes,
        mode=mode,
        phi_dim=phi_dim,
        ph_out_dim=ph_out_dim
    ).to(device)

    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()

    best_acc = 0.0
    for epoch in range(epochs):
        train_epoch(model, optimizer, criterion, train_loader, device)
        acc = test_epoch(model, test_loader, device)
        best_acc = max(best_acc, acc)
        # Report the per-epoch accuracy so the pruner can stop weak trials.
        trial.report(acc, epoch)
        if trial.should_prune():
            raise optuna.exceptions.TrialPruned()
    return best_acc

# Run the experiments on several datasets with hyperparameter search
if __name__ == "__main__":
    # Seed the Python, NumPy and PyTorch generators.
    random.seed(42)
    np.random.seed(42)
    torch.manual_seed(42)
    device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

    dataset_names = ["MUTAG", "PROTEINS", "IMDB-BINARY", "NCI1"]
    results = {}
    for ds in dataset_names:
        print(f"\n=== Запуск оптимизации для датасета: {ds} ===")
        # One independent study (20 trials, accuracy maximised) per dataset.
        study = optuna.create_study(direction="maximize")
        study.optimize(lambda trial: objective(trial, ds, device), n_trials=20)
        results[ds] = dict(best_value=study.best_value, best_params=study.best_params)
        print(f"Лучшее значение для {ds}: {study.best_value:.4f}")
        print(f"Лучшие гиперпараметры для {ds}: {study.best_params}")

    # Summary table; results preserves the order of dataset_names.
    print("\n=== Итоговые результаты по датасетам ===")
    for ds in results:
        print(f"{ds}: Acc = {results[ds]['best_value']:.4f}, Params = {results[ds]['best_params']}")

My MLP model

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.datasets import TUDataset
from torch_geometric.loader import DataLoader
from torch_geometric.nn import global_mean_pool

import optuna
import numpy as np
import random

# MLP модель
class MLPGraph(nn.Module):
    """Structure-agnostic baseline: per-node MLP, mean pooling, linear classifier.

    Args:
        in_channels: Number of input node features.
        hidden_channels: Width of every hidden layer.
        num_layers: Number of Linear+ReLU pairs (at least one is always built).
        num_classes: Number of output classes.
    """

    def __init__(self, in_channels, hidden_channels, num_layers, num_classes):
        super().__init__()
        # Input width of each Linear: the raw features first, then hidden.
        input_widths = [in_channels] + [hidden_channels] * max(num_layers - 1, 0)
        stages = []
        for width in input_widths:
            stages.extend([nn.Linear(width, hidden_channels), nn.ReLU()])
        self.mlp = nn.Sequential(*stages)
        self.classifier = nn.Linear(hidden_channels, num_classes)

    def forward(self, data):
        """Return class logits, one row per graph in ``data``."""
        features = data.x
        if features is None:
            # Featureless graph: every node gets the constant feature 1.
            features = torch.ones((data.num_nodes, 1), device=data.edge_index.device)
        hidden = self.mlp(features)
        pooled = global_mean_pool(hidden, data.batch)
        return self.classifier(pooled)

# Тренировка и тест
def train_epoch(model, optimizer, criterion, loader, device):
    """Run one optimisation pass over ``loader``.

    Returns the sum of per-batch losses, each weighted by the batch's
    ``num_graphs``, divided by ``len(loader.dataset)``.
    """
    model.train()
    weighted_losses = []
    for batch in loader:
        batch = batch.to(device)
        optimizer.zero_grad()
        batch_loss = criterion(model(batch), batch.y)
        batch_loss.backward()
        optimizer.step()
        # Weight by graph count so the result is a per-graph average.
        weighted_losses.append(batch_loss.item() * batch.num_graphs)
    return sum(weighted_losses) / len(loader.dataset)

@torch.no_grad()
def test_epoch(model, loader, device):
    """Return the fraction of graphs in ``loader`` that ``model`` classifies correctly.

    The model is switched to eval mode and gradients are disabled.
    """
    model.eval()
    hits = 0
    for batch in loader:
        batch = batch.to(device)
        predicted = model(batch).argmax(dim=1)
        hits += int(predicted.eq(batch.y).sum())
    return hits / len(loader.dataset)

# Optuna objective for the hyperparameter search (MLP baseline)
def objective_mlp(trial, dataset_name, device):
    """Optuna objective: train an ``MLPGraph`` baseline and return its best accuracy.

    The trial samples the hidden width, depth, learning rate, number of
    epochs and batch size. The dataset is shuffled and split 80/20 on every
    call; the model is evaluated on the 20 % part after each epoch and the
    accuracy is reported to Optuna so the trial can be pruned.

    Args:
        trial: Optuna trial used to sample hyperparameters and report values.
        dataset_name: Name of the TUDataset to load (cached under ``/tmp``).
        device: Device the model and batches are moved to.

    Returns:
        The highest held-out accuracy observed over the epochs.

    Raises:
        optuna.exceptions.TrialPruned: When the pruner stops the trial.
    """
    hidden_channels = trial.suggest_int("hidden_channels", 32, 128, step=16)
    num_layers = trial.suggest_int("num_layers", 1, 5)
    # suggest_loguniform is deprecated; suggest_float(log=True) samples the
    # same log-uniform distribution.
    lr = trial.suggest_float("lr", 1e-3, 1e-2, log=True)
    epochs = trial.suggest_int("epochs", 20, 50)
    batch_size = trial.suggest_categorical("batch_size", [16, 32, 64])

    dataset = TUDataset(root='/tmp/' + dataset_name, name=dataset_name)
    # NOTE(review): the split is redrawn for every trial and the same
    # held-out part drives both pruning and the returned score, so the
    # reported accuracy is optimistic - confirm this is intended.
    dataset = dataset.shuffle()
    split_idx = int(0.8 * len(dataset))
    train_dataset = dataset[:split_idx]
    test_dataset = dataset[split_idx:]
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size)

    # Datasets without node features get one constant input channel,
    # matching the fallback in MLPGraph.forward.
    in_channels = dataset.num_node_features
    if in_channels == 0:
        in_channels = 1

    model = MLPGraph(
        in_channels=in_channels,
        hidden_channels=hidden_channels,
        num_layers=num_layers,
        num_classes=dataset.num_classes
    ).to(device)

    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()

    best_acc = 0.0
    for epoch in range(epochs):
        train_epoch(model, optimizer, criterion, train_loader, device)
        acc = test_epoch(model, test_loader, device)
        best_acc = max(best_acc, acc)
        # Report the per-epoch accuracy so the pruner can stop weak trials.
        trial.report(acc, epoch)
        if trial.should_prune():
            raise optuna.exceptions.TrialPruned()
    return best_acc

if __name__ == "__main__":
    # Seed the Python, NumPy and PyTorch generators.
    random.seed(42)
    np.random.seed(42)
    torch.manual_seed(42)
    device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

    dataset_names = ["MUTAG", "PROTEINS", "IMDB-BINARY", "NCI1"]
    results = {}
    for ds in dataset_names:
        print(f"\n=== Оптимизация для датасета: {ds} (MLP) ===")
        # One independent study (20 trials, accuracy maximised) per dataset.
        study = optuna.create_study(direction="maximize")
        study.optimize(lambda trial: objective_mlp(trial, ds, device), n_trials=20)
        results[ds] = dict(best_value=study.best_value, best_params=study.best_params)
        print(f"Лучшее значение для {ds}: {study.best_value:.4f}")
        print(f"Лучшие гиперпараметры для {ds}: {study.best_params}")

    # Summary table; results preserves the order of dataset_names.
    print("\n=== Итоговые результаты MLP ===")
    for ds in results:
        print(f"{ds}: Acc = {results[ds]['best_value']:.4f}, Params = {results[ds]['best_params']}")


My GCN

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.datasets import TUDataset
from torch_geometric.loader import DataLoader
from torch_geometric.nn import GCNConv, global_mean_pool

import optuna
import numpy as np
import random

# GCN модель
class GCNGraphClassifier(nn.Module):
    """GCN baseline: stacked ``GCNConv`` + ReLU, mean pooling, linear classifier.

    Args:
        in_channels: Number of input node features.
        hidden_channels: Width of every convolution output.
        num_layers: Number of convolutions (at least one is always built).
        num_classes: Number of output classes.
    """

    def __init__(self, in_channels, hidden_channels, num_layers, num_classes):
        super().__init__()
        # Input width of each convolution: raw features first, then hidden.
        input_widths = [in_channels] + [hidden_channels] * max(num_layers - 1, 0)
        self.convs = nn.ModuleList(
            [GCNConv(width, hidden_channels) for width in input_widths]
        )
        self.classifier = nn.Linear(hidden_channels, num_classes)

    def forward(self, data):
        """Return class logits, one row per graph in ``data``."""
        features = data.x
        if features is None:
            # Featureless graph: every node gets the constant feature 1.
            features = torch.ones((data.num_nodes, 1), device=data.edge_index.device)
        for conv in self.convs:
            features = F.relu(conv(features, data.edge_index))
        pooled = global_mean_pool(features, data.batch)
        return self.classifier(pooled)

# Тренировка и тест
def train_epoch(model, optimizer, criterion, loader, device):
    """Run one optimisation pass over ``loader``.

    Returns the sum of per-batch losses, each weighted by the batch's
    ``num_graphs``, divided by ``len(loader.dataset)``.
    """
    model.train()
    weighted_losses = []
    for batch in loader:
        batch = batch.to(device)
        optimizer.zero_grad()
        batch_loss = criterion(model(batch), batch.y)
        batch_loss.backward()
        optimizer.step()
        # Weight by graph count so the result is a per-graph average.
        weighted_losses.append(batch_loss.item() * batch.num_graphs)
    return sum(weighted_losses) / len(loader.dataset)

@torch.no_grad()
def test_epoch(model, loader, device):
    """Return the fraction of graphs in ``loader`` that ``model`` classifies correctly.

    The model is switched to eval mode and gradients are disabled.
    """
    model.eval()
    hits = 0
    for batch in loader:
        batch = batch.to(device)
        predicted = model(batch).argmax(dim=1)
        hits += int(predicted.eq(batch.y).sum())
    return hits / len(loader.dataset)

# Optuna objective for the hyperparameter search (GCN baseline)
def objective_gcn(trial, dataset_name, device):
    """Optuna objective: train a ``GCNGraphClassifier`` and return its best accuracy.

    The trial samples the hidden width, depth, learning rate, number of
    epochs and batch size. The dataset is shuffled and split 80/20 on every
    call; the model is evaluated on the 20 % part after each epoch and the
    accuracy is reported to Optuna so the trial can be pruned.

    Args:
        trial: Optuna trial used to sample hyperparameters and report values.
        dataset_name: Name of the TUDataset to load (cached under ``/tmp``).
        device: Device the model and batches are moved to.

    Returns:
        The highest held-out accuracy observed over the epochs.

    Raises:
        optuna.exceptions.TrialPruned: When the pruner stops the trial.
    """
    hidden_channels = trial.suggest_int("hidden_channels", 32, 128, step=16)
    num_layers = trial.suggest_int("num_layers", 1, 5)
    # suggest_loguniform is deprecated; suggest_float(log=True) samples the
    # same log-uniform distribution.
    lr = trial.suggest_float("lr", 1e-3, 1e-2, log=True)
    epochs = trial.suggest_int("epochs", 20, 50)
    batch_size = trial.suggest_categorical("batch_size", [16, 32, 64])

    dataset = TUDataset(root='/tmp/' + dataset_name, name=dataset_name)
    # NOTE(review): the split is redrawn for every trial and the same
    # held-out part drives both pruning and the returned score, so the
    # reported accuracy is optimistic - confirm this is intended.
    dataset = dataset.shuffle()
    split_idx = int(0.8 * len(dataset))
    train_dataset = dataset[:split_idx]
    test_dataset = dataset[split_idx:]
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size)

    # Datasets without node features get one constant input channel,
    # matching the fallback in GCNGraphClassifier.forward.
    in_channels = dataset.num_node_features
    if in_channels == 0:
        in_channels = 1

    model = GCNGraphClassifier(
        in_channels=in_channels,
        hidden_channels=hidden_channels,
        num_layers=num_layers,
        num_classes=dataset.num_classes
    ).to(device)

    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()

    best_acc = 0.0
    for epoch in range(epochs):
        train_epoch(model, optimizer, criterion, train_loader, device)
        acc = test_epoch(model, test_loader, device)
        best_acc = max(best_acc, acc)
        # Report the per-epoch accuracy so the pruner can stop weak trials.
        trial.report(acc, epoch)
        if trial.should_prune():
            raise optuna.exceptions.TrialPruned()
    return best_acc

if __name__ == "__main__":
    # Seed the Python, NumPy and PyTorch generators.
    random.seed(42)
    np.random.seed(42)
    torch.manual_seed(42)
    device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

    dataset_names = ["MUTAG", "PROTEINS", "IMDB-BINARY", "NCI1"]
    results = {}
    for ds in dataset_names:
        print(f"\n=== Оптимизация для датасета: {ds} (GCN) ===")
        # One independent study (20 trials, accuracy maximised) per dataset.
        study = optuna.create_study(direction="maximize")
        study.optimize(lambda trial: objective_gcn(trial, ds, device), n_trials=20)
        results[ds] = dict(best_value=study.best_value, best_params=study.best_params)
        print(f"Лучшее значение для {ds}: {study.best_value:.4f}")
        print(f"Лучшие гиперпараметры для {ds}: {study.best_params}")

    # Summary table; results preserves the order of dataset_names.
    print("\n=== Итоговые результаты GCN ===")
    for ds in results:
        print(f"{ds}: Acc = {results[ds]['best_value']:.4f}, Params = {results[ds]['best_params']}")
