In [None]:
# Google Colab only: the snippet below is wrapped in a string literal, so it does
# nothing when this cell runs. To use it, remove the surrounding triple quotes and
# adjust the Drive path on the `%cd` line.
'''
from google.colab import drive

drive.mount('/content/drive/')

%cd /content/drive/My Drive/ENSC/3A/S9/AlphaGo/ #change path
!ls
'''



In [None]:
import os
import urllib
import gzip
import json
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import Conv2D, BatchNormalization, Activation, Dense, Flatten, Add, Input
from tensorflow.keras.activations import linear, relu, tanh
from keras.models import Model
from keras.layers.convolutional import Conv2D
from keras.layers.pooling import MaxPooling2D
from keras.layers.merge import concatenate

import numpy as np
import random
import Goban
import keras
import cv2
from sklearn.model_selection import train_test_split


In [None]:
def get_raw_data_go():
    '''Return the 9x9 Go samples, downloading the archive first if it is missing.

    The gzip-compressed JSON file ``samples-9x9.json.gz`` is looked up in the
    current working directory. When it is absent, it is fetched from the
    hard-coded URL below and stored under that same name.

    Returns:
        The object decoded from the JSON payload of the archive.
    '''
    # `import urllib` alone does not guarantee that the `request` submodule is
    # loaded, so import it explicitly right where it is needed.
    import urllib.request

    raw_samples_file = "samples-9x9.json.gz"
    samples_url = "https://www.labri.fr/perso/lsimon/ia-inge2/samples-9x9.json.gz"

    if not os.path.isfile(raw_samples_file):
        print("File", raw_samples_file, "not found, I am downloading it...", end="")
        urllib.request.urlretrieve(samples_url, raw_samples_file)
        print(" Done")

    # Text mode lets gzip decode UTF-8 on the fly instead of reading all bytes
    # and decoding them in a second step.
    with gzip.open(raw_samples_file, "rt", encoding="utf-8") as fz:
        return json.load(fz)

def fulfill_board(list_moves):
    '''Replay a list of moves on a fresh board and return that board.

    Args:
        list_moves: iterable of move names; each one is converted with the
            board's ``name_to_flat`` and then applied with ``push``.

    Returns:
        The ``Goban.Board`` obtained after pushing every move in order.
    '''
    board = Goban.Board()
    for move_name in list_moves:
        # Look the converter up on the board itself; no other board-like name
        # exists in this scope.
        board.push(board.name_to_flat(move_name))
    return board


def get_all_boards_from_game(list_moves):
    '''Replay a game and stack the board reached after every move.

    Returns a float array of shape (9, 9, len(list_moves)); slice ``k`` holds
    the board right after move ``k`` has been pushed. If replaying a move
    raises KeyError, 'ERROR' is printed, the replay stops, and that slice and
    every later one stay at zero.
    '''
    stacked = np.zeros([9, 9, len(list_moves)])
    replay = Goban.Board()
    for idx, move_name in enumerate(list_moves):
        # The same move sometimes shows up twice on one board, and Goban does
        # not seem to cope with that.
        try:
            replay.push(replay.name_to_flat(move_name))
        except KeyError:
            print('ERROR')
            break

        stacked[:, :, idx] = replay._board.reshape(9, 9)
    return stacked



In [None]:
def generate_single_data(dataset):
    '''Draw one random training sample from the games in ``dataset``.

    A game is picked at random, a position is picked at random inside it, and
    the 4 boards leading up to that position are split into per-colour planes.

    Args:
        dataset: sequence of game dicts providing 'list_of_moves',
            'black_wins' and 'white_wins'.

    Returns:
        A tuple ``(input_data, value, next_move)`` where

        * ``input_data`` is a (9, 9, 9) float array: planes 0-3 hold the
          cells equal to 2 (white, kept as 2), planes 4-7 the cells equal to 1
          (black), and plane 8 is all zeros when the selected player is
          'white' and all ones otherwise;
        * ``value`` is that player's win count divided by 100;
        * ``next_move`` is the class index (0..81) of the move played from the
          sampled position; 81 is used for any move that does not map to one
          of the 81 board points.

    Raises:
        ValueError: if ``dataset`` is empty or no usable game could be drawn.
    '''
    # Number of consecutive boards stacked into one sample.
    nb_boards_of_interest = 4
    # 9x9 intersections; class index 81 is the extra "not on the board" class.
    nb_board_points = 81
    # Upper bound on re-draws so a dataset without usable games cannot hang.
    max_draws = 1000

    for _ in range(max_draws):
        first_game = dataset[random.randrange(len(dataset))]
        list_moves = first_game.get('list_of_moves')

        # A usable game needs nb_boards_of_interest boards of history plus one
        # further move to serve as the policy label.
        if len(list_moves) <= nb_boards_of_interest:
            continue

        # num_board_end = number of moves already played in the sampled
        # position; the upper bound keeps list_moves[num_board_end] valid.
        num_board_end = random.randint(nb_boards_of_interest, len(list_moves) - 1)

        try:
            flat_move = Goban.Board().name_to_flat(list_moves[num_board_end])
        except KeyError:
            # Same failure mode get_all_boards_from_game guards against.
            continue
        break
    else:
        raise ValueError("no game with more than %d usable moves could be drawn"
                         % nb_boards_of_interest)

    boards = get_all_boards_from_game(list_moves)
    boards_interest = boards[:, :, num_board_end - nb_boards_of_interest:num_board_end]

    # Per-colour planes: 1 marks black stones, 2 marks white stones.
    black_boards = np.where(boards_interest == 1, 1.0, 0.0)
    white_boards = np.where(boards_interest == 2, 2.0, 0.0)

    # NOTE(review): an even move count is treated as "white to play", i.e. as
    # if white opened the game - confirm against Goban which colour moves first.
    if num_board_end % 2 == 0:
        next_player = np.zeros([9, 9])
        value = first_game.get('white_wins') / 100  # value = winrate
    else:
        next_player = np.ones([9, 9])
        value = first_game.get('black_wins') / 100  # value = winrate

    # Policy label: the move actually played from this position.
    # NOTE(review): Goban appears to encode a pass as a negative flat index -
    # confirm; anything outside 0..80 is mapped to the extra class.
    next_move = flat_move if 0 <= flat_move < nb_board_points else nb_board_points

    # Plane layout: white history first, then black history, then the
    # side-to-move plane.
    input_data = np.zeros([9, 9, 9])
    input_data[:, :, 0:nb_boards_of_interest] = white_boards
    input_data[:, :, nb_boards_of_interest:nb_boards_of_interest * 2] = black_boards
    input_data[:, :, nb_boards_of_interest * 2] = next_player

    return (input_data, value, next_move)


# Load the raw games once (downloaded on first use).
dataset = get_raw_data_go()

# Pre-allocate one row per sample for the features and both label vectors.
nb_samples = 100000
tab_features_multiple = np.zeros((nb_samples, 9, 9, 9))
tab_labels_value = np.zeros((nb_samples,))
tab_labels_next_move = np.zeros((nb_samples,))

# Each iteration draws an independent random sample from the dataset.
for i in range(nb_samples):
    sample, value, next_move = generate_single_data(dataset)
    tab_features_multiple[i] = sample
    tab_labels_value[i] = value
    tab_labels_next_move[i] = next_move

# Persist features and labels as .npy files so training can reload them.
np.save('Features_multiple.npy', tab_features_multiple)
np.save('Labels_value.npy', tab_labels_value)
np.save('Labels_next_move.npy', tab_labels_next_move)

In [None]:
# Reload the arrays written by the generation cell and show their shapes.
features = np.load('Features_multiple.npy')
labels_value = np.load('Labels_value.npy')
labels_next_move = np.load('Labels_next_move.npy')
print(features.shape)
print(labels_next_move.shape)
print(labels_value.shape)

# One-hot encode the move labels over 82 classes: row k gets a single 1 in the
# column given by the (integer-truncated) label k.
final_label_moves = np.zeros([labels_next_move.shape[0], 82])
final_label_moves[np.arange(final_label_moves.shape[0]), labels_next_move.astype(int)] = 1

In [None]:
# residual_tower: the entry stage of the network, a single
# 3x3 convolution (256 filters) -> batch normalisation -> ReLU on the (9, 9, 9) input.
tower_input = Input(shape=(9, 9, 9))
tower_features = Conv2D(256, (3, 3), strides=(1, 1), input_shape=(9, 9, 9), padding='same')(tower_input)
tower_features = BatchNormalization()(tower_features)
tower_features = Activation(relu)(tower_features)

residual_tower = Model(inputs=tower_input, outputs=tower_features)

In [None]:
# residual block: consumes a tensor shaped like the residual_tower output.
block_shape = residual_tower.output_shape[1:]
block_input = Input(shape=block_shape)

# First half: two conv + batch-norm stages, each followed by a ReLU.
stage = Conv2D(256, (3, 3), strides=(1, 1), input_shape=block_shape, padding='same')(block_input)
stage = BatchNormalization()(stage)
stage = Activation(relu)(stage)
stage = Conv2D(256, (3, 3), strides=(1, 1), padding='same')(stage)
stage = BatchNormalization()(stage)

# Residual part: the skip connection starts here (after the second ReLU),
# not at the block input.
skip = Activation(relu, trainable=False)(stage)
branch = Conv2D(256, (3, 3), strides=(1, 1), padding='same')(skip)
branch = Activation(relu)(branch)
branch = Conv2D(256, (3, 3), strides=(1, 1), padding='same')(branch)
merged = Add()([branch, skip])
block_output = Activation(relu)(merged)

residual_block = Model(inputs=block_input, outputs=block_output)

print(residual_block.output_shape)

In [None]:
# value head: 1x1 convolution down to one channel, then a 256-unit dense layer
# and a single tanh output.
value_in_shape = residual_block.output_shape[1:]
value_input = Input(shape=value_in_shape)
value_features = Conv2D(1, (1, 1), strides=(1, 1), input_shape=value_in_shape, padding='same')(value_input)
value_features = BatchNormalization()(value_features)
value_features = Activation(relu)(value_features)
value_features = Flatten()(value_features)
value_features = Dense(256, activation='relu')(value_features)
value_output = Dense(1, activation='tanh')(value_features)

value_head = Model(inputs=value_input, outputs=value_output)
print(value_head.output_shape)

In [None]:
# policy_head: 1x1 convolution down to two channels, flattened into a softmax
# over 82 classes.
policy_in_shape = residual_block.output_shape[1:]
policy_input = Input(shape=policy_in_shape)
policy_features = Conv2D(2, (1, 1), strides=(1, 1), input_shape=policy_in_shape, padding='same')(policy_input)
policy_features = BatchNormalization()(policy_features)
policy_features = Activation(relu)(policy_features)
policy_features = Flatten()(policy_features)
policy_output = Dense(82, activation='softmax')(policy_features)

policy_head = Model(inputs=policy_input, outputs=policy_output)

print(policy_head.output_shape)


In [None]:
# Global model with 2 outputs: [value, policy].
visible = Input(shape=(9, 9, 9))
output_residual_tower = residual_tower(visible)

# The first block consumes the residual tower output; each later block consumes
# the previous block's output.
# NOTE(review): `residual_block` is a single Model instance called 19 times, so
# every application should reuse the same layers and weights - confirm that
# this weight sharing is intended.
output_residual_block = output_residual_tower
for i in range(19):
    output_residual_block = residual_block(output_residual_block)

output_value = value_head(output_residual_block)
output_policy = policy_head(output_residual_block)
global_model = Model(inputs=visible, outputs=[output_value, output_policy])
print(global_model.output_shape)

# Save the weights through a checkpoint callback during training.
checkpoint_path = "training_1/cp.ckpt"
checkpoint_dir = os.path.dirname(checkpoint_path)

cp_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_path,
    save_weights_only=True,
    verbose=1,
)

# One loss per output, in output order: MSE for the value, categorical
# cross-entropy for the policy.
global_model.compile(
    loss=['mean_squared_error', 'categorical_crossentropy'],
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001),
    # There are 2 distinct outputs, so watching the evolution of the losses is
    # more relevant than this metric.
    metrics=['accuracy'],
)

global_model.fit(
    features,
    [labels_value, final_label_moves],
    batch_size=64,
    epochs=30,
    callbacks=[cp_callback],
)
global_model.save('models/current_model.h5')