In [1]:
import numpy as np
import random
import time
from collections import namedtuple
from tensorflow import keras
import Goban
import mctsPlayerNN

# Replay-Memory

On définit ici une classe permettant de stocker les transitions (état, action, état suivant, récompense) rencontrées par l'agent, afin d'entraîner les réseaux de neurones par la suite.

In [2]:
# One recorded step of a game: the encoded board before the move, the move
# played, the encoded board after the move, and the reward given to that move.
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))


class ReplayMemory(object):
    """Fixed-capacity circular buffer of ``Transition`` tuples.

    Once ``capacity`` transitions have been stored, each new one overwrites
    the oldest entry.

    Attributes:
        capacity: maximum number of transitions kept.
        memory: list holding the stored transitions.
        position: index of the slot the next ``push`` will write to.
    """

    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, *args):
        """Save a transition.

        Args:
            *args: the four ``Transition`` fields, in order
                (state, action, next_state, reward).

        Raises:
            TypeError: if the number of fields does not match ``Transition``.
                The buffer is left untouched in that case.
        """
        # Build the tuple before growing the list: if the arguments are
        # invalid, no ``None`` placeholder is left behind in the buffer.
        transition = Transition(*args)
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = transition
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        """Return ``batch_size`` distinct stored transitions chosen at random.

        Raises:
            ValueError: if ``batch_size`` exceeds the number of stored
                transitions (raised by ``random.sample``).
        """
        return random.sample(self.memory, batch_size)

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.memory)

# Fonction d'augmentation des données

In [3]:
def symetries_rotations(x):
    """Return the 8 rotated/flipped images of a board-shaped array.

    The list alternates a rotation and its up/down flip, for 0, 1, 2 and 3
    quarter turns: ``[x, flipud(x), rot90(x), flipud(rot90(x)), ...]``.
    ``np.rot90`` and ``np.flipud`` only act on the leading axes, so a trailing
    feature axis (e.g. an input of shape (9, 9, k)) is carried along as is.

    Args:
        x: array with at least two dimensions.

    Returns:
        List of 8 arrays; the first element is ``x`` itself.
    """
    images = [x, np.flipud(x)]
    for _ in range(3):
        # Rotate the previous un-flipped image by one more quarter turn.
        rotated = np.rot90(images[-2])
        images.append(rotated)
        images.append(np.flipud(rotated))
    return images


def _remap_action(action, index_grid, index_images):
    """Map a flat move index through each of the 8 board symmetries.

    ``index_images[k][p]`` is the original flat index of the point that
    symmetry ``k`` displays at position ``p``; the move therefore lands at the
    position where that image equals ``action``. Moves that are not on the
    grid (e.g. a pass) are returned unchanged.
    """
    remapped = []
    for image in index_images:
        landed = index_grid[image == action]
        remapped.append(int(landed[0]) if landed.size else action)
    return remapped


def data_augmentation(states, actions, next_states, rewards, flatten=None):
    """Multiply a recorded game by 8 using the board symmetries.

    Every state and next state is replaced by its 8 images from
    ``symetries_rotations``. The flat move index of each action is transformed
    by the same symmetry, so each augmented state stays paired with the move
    that was actually played on it. Rewards are repeated unchanged.

    Args:
        states: sequence of encoded boards (square in their first two axes).
        actions: sequence of flat move indices, one per state.
        next_states: sequence of encoded boards reached after each action.
        rewards: sequence of rewards, one per state.
        flatten: optional callable mapping an ``(x, y)`` array position to the
            flat move index. Defaults to ``Goban.Board().flatten``, the mapping
            used when the boards are encoded.

    Returns:
        Tuple ``(states, actions, next_states, rewards)`` of lists, each 8
        times as long as the input.
    """
    augmented_states = list()
    augmented_actions = list()
    augmented_next_states = list()
    augmented_rewards = list()

    if len(states) == 0:
        return augmented_states, augmented_actions, augmented_next_states, augmented_rewards

    if flatten is None:
        flatten = Goban.Board().flatten

    # Grid of flat indices laid out like an encoded board, plus its 8 images.
    # Built once: it only depends on the board size, not on the position.
    rows, cols = np.shape(states[0])[:2]
    index_grid = np.array([[flatten((x, y)) for y in range(cols)] for x in range(rows)])
    index_images = symetries_rotations(index_grid)

    for i in range(len(states)):
        augmented_states += symetries_rotations(states[i])
        augmented_next_states += symetries_rotations(next_states[i])
        augmented_actions += _remap_action(actions[i], index_grid, index_images)
        augmented_rewards += [rewards[i]] * 8

    return augmented_states, augmented_actions, augmented_next_states, augmented_rewards

# Fonction qui convertit un Goban.Board en numpy.array

In [4]:
def board_encoding(board, liberties=0):
    """Encode a board as a (9, 9, 3 + liberties) array of 0/1 planes.

    Plane layout (last axis):
        0: points whose stone equals ``board._BLACK``
        1: points whose stone equals ``board._WHITE``
        2 .. liberties + 1: one-hot of ``board._stringLiberties`` at the point,
            capped at ``liberties - 1`` (only written when ``liberties > 0``)
        last: filled with ones when ``board._nextPlayer`` is not black

    Args:
        board: object exposing ``_board``, ``_stringLiberties``, ``flatten``,
            ``_BLACK``, ``_WHITE`` and ``_nextPlayer``.
        liberties: number of liberty planes to add.

    Returns:
        The encoded ``numpy`` array.
    """
    planes = np.zeros((9, 9, 3 + liberties))
    liberty_cap = liberties - 1

    for x in range(9):
        for y in range(9):
            point = board.flatten((x, y))

            stone = board._board[point]
            if stone == board._BLACK:
                planes[x, y, 0] = 1
            elif stone == board._WHITE:
                planes[x, y, 1] = 1

            if liberties > 0:
                # NOTE(review): if _stringLiberties can hold a negative value
                # for some points, the index below would fall on a stone plane
                # instead of a liberty plane - confirm against Goban.
                capped = min(board._stringLiberties[point], liberty_cap)
                planes[x, y, capped + 2] = 1

    # The final plane flags the side to move.
    if board._nextPlayer != board._BLACK:
        planes[:, :, -1] = 1
    return planes

# Fonction qui réalise une partie

In [5]:
def play():
    """Play one game between two ``mctsPlayerNN.myPlayer`` instances and record it.

    The first player is started as black and moves first; players then
    alternate until ``Goban.Board.is_game_over()`` is true. Before each move
    the board is encoded with ``board_encoding(b, liberties=3)`` and stored as
    the state; the board after the move is stored as the next state.

    Returns:
        ``(states, actions, next_states, rewards)``: four lists with one entry
        per move played. ``actions`` holds flat move indices. Rewards are +1
        or -1 per move: when the result is ``"1-0"`` the odd-indexed moves
        (second player) get +1 and the even-indexed ones -1; ``"0-1"`` is the
        reverse; any other result gives 0 everywhere.
        If a player proposes a move that is not in ``legal_moves()``, the game
        is abandoned and ``(None, None, None, None)`` is returned.
    """
    player1 = mctsPlayerNN.myPlayer()
    player2 = mctsPlayerNN.myPlayer()

    player1.newGame(Goban.Board._BLACK)
    player2.newGame(Goban.Board._WHITE)
    players = [player1, player2]

    b = Goban.Board()
    nextplayer = 0

    states = list()
    next_states = list()
    actions = list()

    while not b.is_game_over():
        # save the board as state
        states.append(board_encoding(b, liberties=3))
        legals = b.legal_moves()
        move = players[nextplayer].getPlayerMove()
        # convert once; the flat index is both the recorded action and the move pushed
        flat_move = Goban.Board.name_to_flat(move)
        if flat_move not in legals:
            # illegal move: discard the whole game
            return None, None, None, None
        actions.append(flat_move)
        b.push(flat_move)
        next_states.append(board_encoding(b, liberties=3))

        otherplayer = (nextplayer + 1) % 2
        players[otherplayer].playOpponentMove(move)
        nextplayer = otherplayer

    result = b.result()
    if result == "1-0":
        winner = 1
    elif result == "0-1":
        winner = 0
    else:
        winner = -1

    # give rewards: moves alternate between the players, so the parity of the
    # move index tells which side played it
    if winner == -1:
        rewards = [0] * len(actions)
    else:
        rewards = [1 if (n + winner) % 2 == 0 else -1 for n in range(len(actions))]

    return states, actions, next_states, rewards

# Apprentissage du Réseau de Neurones

On sélectionne un batch de manière aléatoire depuis la mémoire, puis on entraîne les deux réseaux (priors et valeurs) à partir des récompenses associées.

In [6]:
BATCH_SIZE = 64  # number of transitions sampled from the replay memory per update
GAMMA = 0.99  # discount applied to the best next-state prior in the update target
# Both networks are loaded from disk when this cell runs; optimize_model()
# writes them back to the same paths after every update.
model_priors = keras.models.load_model('model/model_priors.h5')
model_values = keras.models.load_model('model/model_values.h5')

def optimize_model(memory):
    """Run one training step of both networks on a random batch of transitions.

    Does nothing while ``memory`` holds fewer than ``BATCH_SIZE`` transitions.
    Otherwise a batch is sampled and:

    * ``model_priors`` is fitted on its own predictions for the batch states,
      with the entry of the played action replaced by
      ``reward + GAMMA * max(model_priors(next_state))``;
    * ``model_values`` is fitted on the rewards;
    * both models are saved back to ``model/``.

    Predictions are made for the whole batch at once (two ``predict`` calls)
    rather than sample by sample.

    Args:
        memory: object providing ``__len__`` and ``sample(batch_size)``, the
            latter returning (state, action, next_state, reward) tuples.

    Returns:
        None.
    """
    if len(memory) < BATCH_SIZE:
        return
    batch = memory.sample(BATCH_SIZE)

    # Split the transitions into one sequence per field.
    states, actions, next_states, rewards = zip(*batch)
    train_state = np.array(states)
    train_values = np.array(rewards)

    # Copy so the targets can be edited in place whatever predict() returns.
    train_priors = np.array(model_priors.predict(train_state))
    next_priors = model_priors.predict(np.array(next_states))

    # Overwrite, row by row, the prior of the action that was actually played.
    rows = np.arange(len(batch))
    train_priors[rows, np.asarray(actions)] = train_values + GAMMA * np.amax(next_priors, axis=1)

    model_priors.fit(train_state, train_priors, epochs=1, verbose=0)
    model_values.fit(train_state, train_values, epochs=1, verbose=0)
    # the models are saved so that the next players can use them
    model_priors.save('model/model_priors.h5')
    model_values.save('model/model_values.h5')

# Boucle d'entrainement

In [7]:
# hyperparameters
N_EPISODES = 10  # number of games played (and training steps attempted) by the loop below
memory = ReplayMemory(5000)  # replay buffer; 5000 is its capacity in transitions

In [8]:
for i in range(N_EPISODES):
    start = time.time()

    # play one game between two players
    print(f"Episode {i}: playing...", end=" ")
    states, actions, next_states, rewards = play()
    if states is None:
        # play() reports an illegal move with all-None results: skip the episode
        print("error! (illegal move)")
        continue
    print(f"done ({len(states)} states)")

    # augment the data
    print(f"Episode {i}: data augmentation...", end=" ")
    states, actions, next_states, rewards = data_augmentation(states, actions, next_states, rewards)
    print(f"done ({len(states)} states)")

    # store the boards/actions/rewards collected during the game in the replay memory
    for state, action, next_state, reward in zip(states, actions, next_states, rewards):
        memory.push(state, action, next_state, reward)

    # update the neural networks
    print(f"Episode {i}: optimizing models...")
    optimize_model(memory)

    print(f"Episode {i} complete! ({round(time.time() - start)}s)")
    print("-------------------------------------")

Episode 0: playing... done (158 states)
Episode 0: data augmentation... done (1264 states)
Episode 0: optimizing models...
Episode 0 complete! (579s)
-------------------------------------
Episode 1: playing... done (72 states)
Episode 1: data augmentation... done (576 states)
Episode 1: optimizing models...
Episode 1 complete! (287s)
-------------------------------------
Episode 2: playing... done (108 states)
Episode 2: data augmentation... done (864 states)
Episode 2: optimizing models...
Episode 2 complete! (414s)
-------------------------------------
Episode 3: playing... done (74 states)
Episode 3: data augmentation... done (592 states)
Episode 3: optimizing models...
Episode 3 complete! (295s)
-------------------------------------
Episode 4: playing... done (136 states)
Episode 4: data augmentation... done (1088 states)
Episode 4: optimizing models...
Episode 4 complete! (486s)
-------------------------------------
Episode 5: playing... done (89 states)
Episode 5: data augmentati