<a href="https://colab.research.google.com/github/LKSFDS/Projeto_IA/blob/master/Projeto_Final.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Este é um trabalho de reprodução de resultados de geração de grafos que contenham N vértices, com a maior quantidade possível de arestas, mas que não gere triângulos. Esse jogo tem como base o artigo 'Constructions in combinatorics via neural networks' de Adam Zsolt Wagner.

Esse artigo pode ser encontrado em: https://doi.org/10.48550/arXiv.2104.14516

O código utilizado pode ser encontrado no seguinte repositório:  https://github.com/dpaleka/cross-entropy-for-combinatorics.

Mais sobre o Deep Cross Entropy Method no seguinte repositório: https://github.com/yandexdataschool/Practical_RL/blob/master/week01_intro/deep_crossentropy_method.ipynb


In [1]:
import networkx as nx
import random
import numpy as np
from statistics import mean
import pickle
import time
import math
import matplotlib.pyplot as plt
import torch
from torch import nn
import itertools
from numba import njit

Primeiramente é preciso escolher os parâmetros que serão utilizados no algoritmo de Aprendizado por Reforço. É preciso levar em conta o valor da taxa de aprendizado. Uma taxa de aprendizado com um valor maior, pode encontrar problemas para localizar a solução otimizada.

In [2]:
N = 10   #Aqui definimos o numero vértices de um grafo
MYN = int(N*(N-1)/2) #Aqui definimos o grafo. Ele será o nosso ambiente.

LEARNING_RATE = 0.0003 #A taxa de aprendizagem, quando aumentada, converge de maneira mais rápida. Caso a aprendizagem fique presa em máximos locais, talvez seja melhor diminuir o valor.
n_sessions = 5000 #Número de sessão por cada iteração. Uma sessão corresponde a geração de uma String que representa um grafo.
percentile = 93 #top 100-X percentual do qual a IA aprende.
super_percentile = 94 #top 100-X percentual que sobrevive para ser utilizado na próxima iteração.

FIRST_LAYER_NEURONS = 32 #Numeros de neurônios contidas nas camadas escondidas.
SECOND_LAYER_NEURONS = 12
THIRD_LAYER_NEURONS = 4

n_actions = 2 #Esse é o tamanho do alfabeto.

observation_space = 2*MYN #Espaço total de observação

len_game = MYN #Número de passos de cada jogo.
state_dim = (observation_space,)

Função que irá criar uma Rede Neural Convolucional. Ela terá 3 camadas e será totalmente conectada. Mais sobre CNN no seguinte artigo: https://arxiv.org/pdf/1608.06993.pdf

Modelos pré-treinados podem ser encontrados em: https://github.com/liuzhuang13/DenseNet

In [3]:
class DenseNet(nn.Module):
    def __init__(self, widths):
        super().__init__()

        num_layers = len(widths)
        layers = [[nn.Linear(widths[i], widths[i+1]), nn.ReLU()] for i in range(num_layers-2)]
        self.layers = [nn.Flatten(1, -1),
                      *list(itertools.chain(*layers)),
                      nn.Linear(widths[-2], widths[-1]),
                      nn.Sigmoid()]

        print(self.layers)
        self.net = nn.Sequential(*self.layers)

    def forward(self, x):
        prob = self.net(x)
        return prob

model = DenseNet([2*MYN, FIRST_LAYER_NEURONS, SECOND_LAYER_NEURONS, THIRD_LAYER_NEURONS, 1])

optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)

[Flatten(start_dim=1, end_dim=-1), Linear(in_features=90, out_features=32, bias=True), ReLU(), Linear(in_features=32, out_features=12, bias=True), ReLU(), Linear(in_features=12, out_features=4, bias=True), ReLU(), Linear(in_features=4, out_features=1, bias=True), Sigmoid()]


Função que constrói o ambiente utilizando a CNN préviamente gerada e define a função que receberá a String de tamanho N*(N-1)/2 como entrada e devolverá, como saída, as seguintes formulações matriciais de grafos: matriz de adjacências, lista de vertices vizinhos, lista de grau dos vértices. Os estados gerados, serão utilizados pela função Score.

In [4]:
@njit
def state_to_graph(state):

    adjMatG = np.zeros((N,N),dtype=np.int8) #Matriz adjacência
    edgeListG = np.zeros((N,N),dtype=np.int8) #Lista de Vizinhos
    Gdeg = np.zeros(N,dtype=np.int8) #Sequência de grau
    count = 0
    for i in range(N):
        for j in range(i+1,N):
            if state[count] == 1:
                adjMatG[i][j] = 1
                adjMatG[j][i] = 1
                edgeListG[i][Gdeg[i]] = j
                edgeListG[j][Gdeg[j]] = i
                Gdeg[i] += 1
                Gdeg[j] += 1
            count += 1

    return adjMatG, edgeListG, Gdeg

@njit
def score_state(state):
    return score_graph(*state_to_graph(state))

Função que ativa o modelo interativo. Essa parte de código transforma o array em uma matriz, o que permitirá que o melhor grafo de cada iteração, seja printado.

In [5]:
plt.ion()

def display_graph(adjMatG):
    print("Melhor matriz de adjacencia no passo atual:")
    print(adjMatG)

    G = nx.convert_matrix.from_numpy_array(adjMatG)

    plt.clf()
    nx.draw_circular(G)

    plt.axis('equal')
    plt.draw()
    plt.pause(0.001)
    plt.show()

Nesse trecho do código, serão geradas saídas que representam as probabilidades de inserção de uma nova aresta, ou seja, aqui é onde joga-se o jogo. Inicialmente é fornecida uma entrada que constrói, de maneira parcial, a rede. É, também, nesse trecho de código que é definido se uma nova aresta é ou não inserida no grafo, dada uma probabilidade. Por exemplo, caso haja uma saída com valor de 0.4, a probabilidade de que uma nova aresta seja inserida, será de 0.4

In [6]:
@njit
def play_game(n_sessions, actions,state_next,states,prob, step, total_score):

    for i in range(n_sessions):
        if np.random.rand() < prob[i]:
            action = 1
        else:
            action = 0
        actions[i][step-1] = action
        state_next[i] = states[i,:,step-1]

        if (action > 0):
            state_next[i][step-1] = action
        state_next[i][MYN + step-1] = 0
        if (step < MYN):
            state_next[i][MYN + step] = 1
        #É calculado o resultado final
        terminal = step == MYN
        if terminal:
            total_score[i] = score_state(state_next[i])

        # Grava-se as sessões
        if not terminal:
            states[i,:,step] = state_next[i]

    return actions, state_next,states, total_score, terminal


def generate_session(agent, n_sessions, verbose = 1):

    states =  np.zeros([n_sessions, observation_space, len_game], dtype=int)
    actions = np.zeros([n_sessions, len_game], dtype = int)
    state_next = np.zeros([n_sessions,observation_space], dtype = int)
    prob = np.zeros(n_sessions)
    states[:,MYN,0] = 1
    step = 0
    total_score = np.zeros([n_sessions])
    pred_time = 0
    play_time = 0

    while (True):
        step += 1
        tic = time.time()
        prob = agent(torch.from_numpy(states[:,:,step-1]).to(torch.float))
        prob = prob.detach().cpu().numpy()

        pred_time += time.time()-tic
        tic = time.time()
        actions, state_next, states, total_score, terminal = play_game(
                n_sessions, actions,state_next, states,prob, step, total_score)
        play_time += time.time()-tic

        if terminal:
            break
    if (verbose):
        print("Tempo previsto: "+str(pred_time)+", Tempo jogado: " + str(play_time))
    return states, actions, total_score

Nesse trecho de código começa de fato o treino da IA. Existem seleções das elites, ou seja, os melhores pares de ação-estado, dentro de um percentual pré-determinado, são selecionados para a próxima interação. Lembrando que cada iteração produz 5000 sessões, ou seja, produz 5000 cadeias de palavras, onde cada uma deles é transformada em uma matriz de adjacência representando um grafo e recebe uma pontuação da função score.

In [7]:
def select_elites(states_batch, actions_batch, rewards_batch, percentile=50):
    """
    Os estados e as ações são selecionados de todos as iterações que tenham um valor de recompensa >= ao percentual definido no argumento da função. Nesse caso, percentile = 50.
    O parâmetro states_batch é uma lista de listas de estados de uma dada sessão;
    O parâmetro actions_batch é uma lista de listas de ações de uma dada sessão;
    O parâmetro rewards_batch é uma lista de recompensas de uma dada sessão;
    :returns: essa função retorna ambas as listas com os estados e suas respectivas ações, das sessões de elite.

    """
    counter = n_sessions * (100.0 - percentile) / 100.0
    reward_threshold = np.percentile(rewards_batch,percentile)

    elite_states = []
    elite_actions = []
    elite_rewards = []
    for i in range(len(states_batch)):
        if rewards_batch[i] >= reward_threshold-0.0000001:
            if (counter > 0) or (rewards_batch[i] >= reward_threshold+0.0000001):
                for item in states_batch[i]:
                    elite_states.append(item.tolist())
                for item in actions_batch[i]:
                    elite_actions.append(item)
            counter -= 1
    elite_states = np.array(elite_states, dtype = int)
    elite_actions = np.array(elite_actions, dtype = int)
    return elite_states, elite_actions

A função select_super_sessions seleciona sessões que irão sobreviver para serem reutilizadas na próxima geração do jogo. Ela funciona de maneira muito similar à função definida acima, porém ao salvar uma construção interia, pode representar uma maior diversidade nas construções posteriores.

In [8]:
def select_super_sessions(states_batch, actions_batch, rewards_batch, percentile=90):

    counter = n_sessions * (100.0 - percentile) / 100.0
    reward_threshold = np.percentile(rewards_batch,percentile)

    super_states = []
    super_actions = []
    super_rewards = []
    for i in range(len(states_batch)):
        if rewards_batch[i] >= reward_threshold-0.0000001:
            if (counter > 0) or (rewards_batch[i] >= reward_threshold+0.0000001):
                super_states.append(states_batch[i])
                super_actions.append(actions_batch[i])
                super_rewards.append(rewards_batch[i])
                counter -= 1
    super_states = np.array(super_states, dtype = int)
    super_actions = np.array(super_actions, dtype = int)
    super_rewards = np.array(super_rewards)
    return super_states, super_actions, super_rewards


Aqui é realizado o treino da rede com os grafos construídos nas gerações passadas e que foram selecionados para sobreviverem.

In [9]:
def train_network(model, optimizer, train_loader,
                  num_epochs=1, pbar_update_interval=200, print_logs=False):

    criterion = nn.BCELoss()
    pbar = trange(num_epochs) if print_logs else range(num_epochs)

    for i in pbar:
        for k, batch_data in enumerate(train_loader):
            batch_x = batch_data[:, :-1]
            batch_y = batch_data[:, -1]
            model.zero_grad()
            y_pred = model(batch_x)
            loss = criterion(y_pred, batch_y.unsqueeze(1))
            loss.backward()
            optimizer.step()

            if print_logs and k % pbar_update_interval == 0:
                acc = (y_pred.round() == batch_y).sum().float()/(len(batch_y))
                pbar.set_postfix(loss=loss.item(), acc=acc.item())

Aqui está a Função Score. É ela quem sabe qual a regra do jogo, ou seja, essa é a definição da conjectura a ser trabalhada, dentro do código. Para o teste do maior número de aresta possíveis para N vértices, sem que haja a formação de triângulos em suas construções, a pontuação dada é simples: para cada aresta no grafo é dado um ponto e para cada triângulo encontrado é retirado um ponto. Dessa maneira a IA aprende que ela deve inserir arestas sem formar triângulos.

In [10]:
@njit
def score_graph(adjMatG, edgeListG, Gdeg):

    N = Gdeg.size

    edge_count = np.sum(Gdeg)/2
    tri_count = 0

    for i in range(N):
      for j in range(i+1,N):
        for k in range(j+1,N):
          if adjMatG[i,j]==1 and adjMatG[i,k]==1 and adjMatG[j,k]==1:
            tri_count += 1

    return edge_count - tri_count

Aqui está o main loop. Para um grafo que contenham 10 vértices, o maior valor possível de arestas é 25. O input inicial, ou seja, as entradas primeira matriz de adjacências, é dado de maneira aleatória com valores de 0 e 1.

In [None]:
super_states =  np.empty((0,len_game,observation_space), dtype = int)
super_actions = np.array([], dtype = int)
super_rewards = np.array([])

for i in range(1000000): #Numero de gerações a serem criadas. Lembrando que cada geração 5000 sessões.

    sessions = generate_session(model,n_sessions,1)

    states_batch = np.array(sessions[0], dtype = int)
    actions_batch = np.array(sessions[1], dtype = int)
    rewards_batch = np.array(sessions[2])
    states_batch = np.transpose(states_batch,axes=[0,2,1])

    states_batch = np.append(states_batch,super_states,axis=0)

    if i>0:
        actions_batch = np.append(actions_batch,np.array(super_actions),axis=0)
    rewards_batch = np.append(rewards_batch,super_rewards)


    elite_states, elite_actions = select_elites(states_batch, actions_batch, rewards_batch, percentile=percentile) #Aqui é feita a escolha de quais sessões serão utilizadas para ensinar a IA.

    super_sessions = select_super_sessions(states_batch, actions_batch, rewards_batch, percentile=super_percentile) #Aqui são escolhidas quais sessões irão sobreviver.

    super_sessions = [(super_sessions[0][i], super_sessions[1][i], super_sessions[2][i]) for i in range(len(super_sessions[2]))]
    super_sessions.sort(key=lambda super_sessions: super_sessions[2],reverse=True)


    train_data = torch.from_numpy(np.column_stack((elite_states, elite_actions)))
    train_data = train_data.to(torch.float)
    train_loader = torch.utils.data.DataLoader(train_data, shuffle=True, batch_size=32)
    train_network(model, optimizer, train_loader)


    super_states = [super_sessions[i][0] for i in range(len(super_sessions))]
    super_actions = [super_sessions[i][1] for i in range(len(super_sessions))]
    super_rewards = [super_sessions[i][2] for i in range(len(super_sessions))]

    rewards_batch.sort()
    mean_all_reward = np.mean(rewards_batch[-100:])
    mean_best_reward = np.mean(super_rewards)


    print("\n" + str(i) +  ". Os melhores individuos: " + str(np.flip(np.sort(super_rewards))[:10]))

    display_graph(state_to_graph(super_actions[0])[0])
