Инициализация

In [3]:
import numpy as np
import tensorflow as tf
import spektral
import random
import networkx as nx
import community

Cora - набор данных для обучения представлению графов (потом надо найти инфу по ней из видоса)
spektral позволяет нам быстро загружать и предварительно обрабатывать многие стандартные наботы данных для обучения представлению графов (к примеру, Cora). Она поставляется с полезными функциями загрузки, которые позволят нам прямой доступ к таким элементам, как:
adj - матрица смежности графа
features - матрица функций, которая дает нам функцию в каждом из узлов
labels - метки, обозначающие тему каждой статьи
train_mask - массив масок: какие узлы принадлежат обучающему набору (training set)
val_mask - массив масок: какие узлы принадлежат проверочному набору (validation set)
test_mask - массив масок: какие узлы принадлежат тестовому набору (test set)

Загружаем эти данные из набора данных 'Cora'

In [4]:
cora_dataset = spektral.datasets.citation.Citation(name='Cora')
test_mask = cora_dataset.mask_te
train_mask = cora_dataset.mask_tr
val_mask = cora_dataset.mask_va
graph = cora_dataset.graphs[0]
features = graph.x
adj = graph.a
labels = graph.y

#features = features.todense()
adj = adj + np.eye(adj.shape[0])
#features = features.astype('float32')
#adj = adj.astype('float32')

adj_np = np.asarray(adj, np.float32)
adj_tf = tf.convert_to_tensor(adj_np, np.float32)

features_np = np.asarray(features, np.float32)
features_tf = tf.convert_to_tensor(features_np, np.float32)

print("Type of adj:", type(adj_tf))
print("Type of features:", type(features_tf))

#Выведем размеры набора данных Cora
# print(graph)
# print(features_tf.shape)
print(adj_tf)
# print(labels.shape)

#Выведем количество узлов в каждом наборе
# print(np.sum(train_mask))
# print(np.sum(val_mask))
# print(np.sum(test_mask))

Type of adj: <class 'tensorflow.python.framework.ops.EagerTensor'>
Type of features: <class 'tensorflow.python.framework.ops.EagerTensor'>
tf.Tensor(
[[1. 0. 0. ... 0. 0. 0.]
 [0. 1. 1. ... 0. 0. 0.]
 [0. 1. 1. ... 0. 0. 0.]
 ...
 [0. 0. 0. ... 1. 0. 0.]
 [0. 0. 0. ... 0. 1. 1.]
 [0. 0. 0. ... 0. 1. 1.]], shape=(2708, 2708), dtype=float32)


In [5]:
# Create an example EagerTensor
x = tf.constant([[1, 2, 3], [4, 5, 6]])

# Set elements less than or equal to 2 to 0, and elements greater than 2 to 1
result = tf.where(x <= 2, 0, 1)
matrix_size = adj_tf.shape[0]
print(matrix_size)

2708


Опредем функцию потери перекрестной энтропии и посчета точности вычисления для конкретного набора (mask)

In [6]:
def masked_softmax_cross_entropy(logits, labels, mask):
    loss = tf.nn.softmax_cross_entropy_with_logits(logits=logits, labels=labels)
    mask = tf.cast(mask, dtype=tf.float32)
    mask /= tf.reduce_mean(mask)
    loss *= mask
    return tf.reduce_mean(loss)

def masked_accuracy(logits, labels, mask):
    corrent_prediction = tf.equal(tf.argmax(logits, 1), tf.argmax(labels, 1))
    accuracy_all = tf.cast(corrent_prediction, tf.float32)
    mask = tf.cast(mask, dtype=tf.float32)
    mask /= tf.reduce_mean(mask)
    accuracy_all *= mask
    return tf.reduce_mean(accuracy_all)

Определим слой GNN
fts - матрица признаков узлов
adj - матрица смежности
transform - некоторая трансформация, которая будет применяться к каждому узлу
activation - функция активации

In [7]:
def gnn(fts, adj, transform, activation):
    seq_fts = transform(fts) #эквивалент матрицы весов W
    #if isinstance(adj, tf.sparse.SparseTensor):
    #    ret_fts = tf.sparse.sparse_dense_matmul(adj, seq_fts)
    #else:
    ret_fts = tf.matmul(adj, seq_fts)
    return activation(ret_fts)

# def gnn(fts, adj, weights, activation):
    
#     # Check if adj is a sparse matrix
#     if isinstance(adj, tf.sparse.SparseTensor):
#         print("adj is a sparse matrix")
#     else:
#         print("adj is not a sparse matrix")
        
#     # Check if seq_fts is a sparse matrix
#     if isinstance(fts, tf.sparse.SparseTensor):
#         print("fts is a sparse matrix")
#     else:
#         print("fts is not a sparse matrix")

#     if isinstance(adj, tf.sparse.SparseTensor):
#         ret_fts = tf.sparse.sparse_dense_matmul(adj, fts)
#     else:
#         ret_fts = tf.matmul(adj, fts)

#     print("Type of ret_fts = ", type(ret_fts))
#     print("Type of weights = ", type(weights))
#     return activation(tf.matmul(ret_fts, weights))

Определим двуслойную GNN для обучения на данных Cora
fts - матрица признаков узлов
adj - матрица смежности
gnn_fn - некоторая модель GNN 
units - количество единиц, которые будет вычислять gnn в каждом узле (сколько изменений в скрытых функциях)
epochs - количество эпох обучения
lr - скорость обучения


In [8]:
def train_cora(fts, adj, gnn_fn, units, epochs, lr):
    layer_1 = tf.keras.layers.Dense(units, activation=tf.nn.relu) #скрытый слой равный числу units
    layer_2 = tf.keras.layers.Dense(7, activation=tf.nn.relu) #определяет классификацию каждого узла

    def cora_gnn(fts, adj):
        hidden = gnn_fn(fts, adj, layer_1, tf.nn.relu) #tf.nn.relu - нелинейная функция активации
        logits = gnn_fn(hidden, adj, layer_2, tf.identity)
        return logits
    
    optimazer = tf.keras.optimizers.Adam(learning_rate=lr)

    best_accuracy = 0.0
    for ep in range(epochs + 1):
        with tf.GradientTape() as tape:
            logits = cora_gnn(fts, adj)
            loss = masked_softmax_cross_entropy(logits, labels, train_mask)
        
        variables = tape.watched_variables()
        grads = tape.gradient(loss, variables)
        optimazer.apply_gradients(zip(grads, variables))

        logits = cora_gnn(fts, adj)
        val_accuracy = masked_accuracy(logits, labels, val_mask)
        test_accuracy = masked_accuracy(logits, labels, test_mask)

        if val_accuracy > best_accuracy:
            best_accuracy = val_accuracy
            print('Epoch', ep, '| Training loss:', loss.numpy(), '| Val accuracy:', val_accuracy.numpy(), '| Test accuracy:', test_accuracy.numpy())
    return best_accuracy

Главный вызов обучения GNN

In [9]:
a = train_cora(features_tf, adj_tf, gnn, 8, 500, 0.001)
print("Best score = ", a)

Немного изменим матрицу смежности графа, чтобы получить более устойчивую модель обучения

In [10]:
# deg = tf.reduce_sum(adj_tf, axis=-1) #матрица степеней вершин графа
# train_cora(features_tf, adj_tf / deg, gnn, 16, 500, 0.001)

Воспользуемся методом нормализации матрицы смежности, предложенную Томасом Кипфом в сетевой модели свертки графов

In [11]:
# norm_deg = tf.linalg.diag(1.0 / tf.sqrt(deg))
# norm_adj = tf.matmul(norm_deg, tf.matmul(adj_tf, norm_deg))
# train_cora(features_tf, norm_adj, gnn, 8, 500, 0.001)

Ниже будет описание и реализация генетического алгоритма

Функция инициализаци популяции. Создает список хромосом случайных весов (генов) с нормальным распределением

In [12]:
# Функция инициализации популяции
def initialize_population(size, matrix_size):
    return [np.random.randint(0, 2, (matrix_size, matrix_size)) for _ in range(size)]

def initialize_population_from_CORA(size):
    numpy_array = adj_tf.numpy()
    return [numpy_array for _ in range(size)]

Фитнесс-функция. Она будет оценивать работу нейронной сети с данными весами. На вход получает хромосому - набор весов нейросети

In [13]:
# Функция оценки приспособленности
def fitness(adjacency_matrix):
    # Здесь должна быть функция, оценивающая работу нейронной сети с данными весами
    # Например, можно использовать точность классификации или ошибку предсказания
        return np.sum(adjacency_matrix) # Пример простой функции для демонстрации

In [14]:
score = [70, 85, 90, 65, 80]
population = ['A', 'B', 'C', 'D', 'E']
k = 3

print(type(population))

# Step 1: Zip the score and population arrays
zipped_data = list(zip(score, population))

# Step 2: Sort the zipped array based on the score values
sorted_data = sorted(zipped_data, key=lambda x: x[0], reverse=True)

# Step 3: Extract the first k elements of the sorted array
top_k_elements = sorted_data[:k]

# Step 4: Unzip the sorted array to get the desired result
sorted_score, sorted_population = zip(*top_k_elements)

np_sorted_score = np.array(sorted_score)
np_sorted_population = np.array(sorted_population)
print(np_sorted_score, type(np_sorted_score))
print(np_sorted_population, type(np_sorted_population))

<class 'list'>
[90 85 80] <class 'numpy.ndarray'>
['C' 'B' 'E'] <class 'numpy.ndarray'>


Генетические операторы:
Функция селекции. Отбор лучших индивидуумов. Использует оценки фитнеса для выбора k лучших хромосом.
Функция скрещивания (кроссовера). Скрещивает двух родителей для создания двух потомков. Точка кроссовера выбирается случайно.
Функция мутации. Мутирует хромосому, изменяя каждый ген с заданной вероятностью на значение, взятое из нормального распределения.

In [15]:
# Функция селекции
def selection(population, scores, k):
    selected_indices = np.argsort(scores)[:k]
    # sorted_scores = np.sort(scores)[::-1]
    # selected_indices = sorted_scores[:k]

    return [population[i] for i in selected_indices]


# def selection_connectivity(population, scores, k):
#     # Step 1: Zip the score and population arrays
#     zipped_data = list(zip(scores, population))

#     # Step 2: Sort the zipped array based on the score values
#     sorted_data = sorted(zipped_data, key=lambda x: x[0], reverse=True)

#     # Step 3: Extract the first k elements of the sorted array
#     top_k_elements = sorted_data[:k]

#     # Step 4: Unzip the sorted array to get the desired result
#     sorted_score, sorted_population = zip(*top_k_elements)

#     np_sorted_score = list(sorted_score)
#     np_sorted_population = list(sorted_population)
#     #print(sorted_score, type(np_sorted_score))
#     #print(sorted_population, type(np_sorted_population))
#     return np_sorted_population


# Функция кроссовера
# def crossover(parent1, parent2):
#     crossover_point = random.randint(1, len(parent1) - 1)
#     child1 = np.vstack((parent1[:crossover_point], parent2[crossover_point:]))
#     child2 = np.vstack((parent2[:crossover_point], parent1[crossover_point:]))
#     return child1, child2
def crossover(parent1, parent2):
    crossover_point = random.randint(1, len(parent1) - 1)
    child1 = np.concatenate((parent1[:crossover_point], parent2[crossover_point:]))
    child2 = np.concatenate((parent2[:crossover_point], parent1[crossover_point:]))
    return child1, child2

# Функция мутации
# def mutate(adjacency_matrix, rate):
#     for i in range(len(adjacency_matrix)):
#         for j in range(len(adjacency_matrix[i])):
#             if random.random() < rate:
#                 adjacency_matrix[i][j] = 1 - adjacency_matrix[i][j]  # Flip the value
#     return adjacency_matrix
def mutate(adjacency_matrix, rate):
    mask = np.random.choice([0, 1], size=adjacency_matrix.shape, p=[1-rate, rate])
    adjacency_matrix = np.logical_xor(adjacency_matrix, mask).astype(int)
    return adjacency_matrix

Ниже представлены некоторые фитнесс функции для различных тестов ген алгоритма

In [16]:
# 1. Подключения
# Фитнес-функция может оценивать матрицу смежности по количеству связей между узлами графа. Чем больше связей, тем лучше. 

# def connectivity_fitness(adj_matrix):
#     graph = nx.from_numpy_array(adj_matrix)
#     num_components = nx.number_connected_components(graph)
#     return num_components


def connectivity_fitness(adj_matrix):
    graph = nx.from_numpy_array(adj_matrix)
    num_components = nx.number_connected_components(graph)
    return (10000 - num_components)


# 2. Диаметр
#Цель: свести к минимуму самый длинный и короткий путь между вершинами графа, что указывает на более компактную структуру.
def diameter_fitness(adj_matrix):
    # Convert adjacency matrix to a graph
    graph = nx.from_numpy_array(adj_matrix)
    # Calculate the diameter of the graph
    diameter = nx.diameter(graph)
    return diameter

# 3. Разреженность
#Цель: Поощрять разреженные графы, в которых отсутствует много ребер, что потенциально может привести к упрощению структуры.

def sparsity_fitness(adj_matrix):
    sparsity = np.count_nonzero(adj_matrix == 0) / adj_matrix.size
    return 10 - sparsity

# 4. Коэффициент кластеризации
# Цель: Максимально увеличить средний коэффициент кластеризации по графику, что указывает на более высокую степень локальной связности.

def clustering_coefficient_fitness(adj_matrix):
    # Calculate the average clustering coefficient directly from the adjacency matrix
    avg_clustering_coefficient = nx.average_clustering(nx.Graph(adj_matrix))
    return avg_clustering_coefficient

# 5. Модульность
# Цель: Максимально повысить модульность, являющуюся показателем структуры сетей, для поощрения сообществ в рамках graph.
def modularity_fitness(adj_matrix):
    # Calculate the modularity of the graph directly from the adjacency matrix
    graph = nx.Graph(adj_matrix)  # Create a graph from the adjacency matrix
    partition = community.best_partition(graph)
    modularity = community.modularity(partition, graph)
    return modularity

In [25]:
def neuro_fitness(adj_matrix):
    print(adj_matrix)
    print(adj_matrix.shape)
    new_adj_genetic = tf.constant(adj_matrix, dtype='float32')
    print(type(new_adj_genetic), new_adj_genetic, new_adj_genetic.shape)
    print("Fitness start")
    score = train_cora(features_tf, new_adj_genetic, gnn, 32, 200, 0.01)
    print("Fitness end")
    return score

Главная функция генетического алгоритма. Основная функция, которая управляет процессом эволюции. Инициализирует популяцию, затем в цикле для каждого поколения вычисляет фитнес, проводит отбор, скрещивание и мутацию, пока не будет достигнуто заданное количество поколений. Выводит лучший результат фитнесс-функции для каждого поколения.

In [26]:
# Параметры алгоритма
population_size = 4 # Размер популяции (количество хромосом в популяции)
matrix_size = adj_tf.shape[0]   # Допустим, у нас 20 весов  # Assuming a 5x5 adjacency matrix
num_generations = 10 # Количество поколений
mutation_rate = 0.01 # Вероятность мутации в каждом гене
selection_rate = 0.5 # 50% лучших переходят в следующее поколение

def genetic_algorithm():
    population = initialize_population(population_size, matrix_size)
    #population = initialize_population_from_CORA(population_size)
    for generation in range(num_generations):
        #scores = [modularity_fitness(matrix) for matrix in population]
        #scores = [sparsity_fitness(matrix) for matrix in population]
        #scores = [clustering_coefficient_fitness(matrix) for matrix in population]
        #scores = [connectivity_fitness(matrix) for matrix in population]
        #scores = [diameter_fitness(matrix) for matrix in population]
        scores = [neuro_fitness(matrix) for matrix in population]
        print("score = ", scores)
        selected = selection(population, scores, int(selection_rate * population_size))
        next_generation = []
        while len(next_generation) < population_size:
            parent1, parent2 = random.sample(selected, 2)
            child1, child2 = crossover(parent1, parent2)
            next_generation.append(mutate(child1, mutation_rate))
            #next_generation.append(child1)
            if len(next_generation) < population_size:
                next_generation.append(mutate(child2, mutation_rate))
                #next_generation.append(child2)
        population = next_generation
        print(f"Generation {generation + 1}, Best Score: {max(scores)}")
    return population

# Главная функция генетического алгоритма
# def genetic_algorithm():
#     population = initialize_population(population_size, matrix_size)
#     for generation in range(num_generations):
#         #scores = [fitness(matrix) for matrix in population] #самая простая фитнесс-функция - через сумму смежностей
#         #scores = [connectivity_fitness(matrix) for matrix in population]
#         #scores = [diameter_fitness(matrix) for matrix in population]
#         scores = [sparsity_fitness(matrix) for matrix in population]
#         selected = selection(population, scores, int(selection_rate * population_size))
#         next_generation = []
#         while len(next_generation) < population_size:
#             parent1, parent2 = random.sample(selected, 2)
#             child1, child2 = crossover(parent1, parent2)
#             next_generation.append(mutate(child1, mutation_rate))
#             if len(next_generation) < population_size:
#                 next_generation.append(mutate(child2, mutation_rate))
#         population = next_generation
#         print("population ", population)
#         print(f"Generation {generation + 1}, Best Score: {max(scores)}")
#     return population

Главный запуск генетического алгоритма

In [27]:
# Запуск генетического алгоритма
best_population = genetic_algorithm()

# Определяем самую лучшую матрицу
scores = [neuro_fitness(matrix) for matrix in best_population]
max_score = max(scores)
max_index = scores.index(max_score)
print(max_index)
best_adj_matrix_genetic = best_population[max_index]

[[0 1 0 ... 0 1 0]
 [1 0 0 ... 1 0 1]
 [0 0 1 ... 0 0 0]
 ...
 [1 1 1 ... 1 0 0]
 [0 0 0 ... 1 0 1]
 [1 1 0 ... 1 1 0]]
(2708, 2708)
<class 'tensorflow.python.framework.ops.EagerTensor'> tf.Tensor(
[[0. 1. 0. ... 0. 1. 0.]
 [1. 0. 0. ... 1. 0. 1.]
 [0. 0. 1. ... 0. 0. 0.]
 ...
 [1. 1. 1. ... 1. 0. 0.]
 [0. 0. 0. ... 1. 0. 1.]
 [1. 1. 0. ... 1. 1. 0.]], shape=(2708, 2708), dtype=float32) (2708, 2708)
Fitness start


AttributeError: module 'keras.src.activations' has no attribute 'get'

In [None]:
print(best_adj_matrix_genetic)
print(best_adj_matrix_genetic.shape)
new_adj_genetic = tf.constant(best_adj_matrix_genetic, dtype='float32')
print(type(new_adj_genetic), new_adj_genetic, new_adj_genetic.shape)

In [None]:
# Запуск нейросети на новой матрице смежности
train_cora(features_tf, new_adj_genetic, gnn, 8, 500, 0.001)

In [None]:
#train_cora(features_tf, new_adj_genetic, gnn, 32, 5000, 0.01)