<a href="https://colab.research.google.com/github/TheoBoyer/AI-Connect4/blob/master/Power4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
 %tensorflow_version 2.x
!pip install pyyaml h5py
import tensorflow as tf

import matplotlib.pyplot as plt
import numpy as np

from random import randint, random
from sklearn.utils import shuffle

from os import system
from copy import deepcopy

print(tf.__version__)

TensorFlow 2.x selected.
2.1.0-rc1


#**Jeu du puissance 4**

In [0]:
# Utils function
class Power4:
  @staticmethod
  def get_diags(array, n=4):
      w, h = array.shape
      l_max = min(w,h)
      for i in range(n-1, w+h-n):
          n_arr = []
          for j in range(i+1):
              if j<h and i-j <w:
                  n_arr.append((i-j, j))
          yield np.array(list(map(lambda x:array[x], n_arr)))

  @staticmethod
  def alignement_4(array):
      for i in range(len(array)-3):
          p_sum = array[i:i+4].sum()
          if abs(p_sum)==4:
              return int(p_sum/abs(p_sum))
      return 0

  @staticmethod
  def who_play(grid):
    return [1, -1][grid.sum()]

  @staticmethod
  def get_playing_mask(grid):
    return (np.abs(grid).sum(axis=1) < 6).astype(np.uint8)

  @staticmethod
  def get_winner(grid):
    for c in grid:
        p_winner = Power4.alignement_4(c)
        if p_winner!=0:
            return p_winner
    for c in grid.T:
        p_winner = Power4.alignement_4(c)
        if p_winner!=0:
            return p_winner
    for c in Power4.get_diags(grid):
        p_winner = Power4.alignement_4(c)
        if p_winner!=0:
            return p_winner
    for c in Power4.get_diags(np.flip(grid, 0)):
        p_winner = Power4.alignement_4(c)
        if p_winner!=0:
            return p_winner
    return 0
    
  @staticmethod
  def get_state(grid):
    return np.append((grid==1).reshape((6, 7, 1)), (grid==-1).reshape((6, 7, 1)), axis=2).astype(np.uint8)

In [0]:
class Power4GameWrapper:
    def __init__(self, grid=None):
      if grid is not None:
        self.grid = grid
      else:
        self.grid = np.zeros((7,6)).astype(np.int8)

    def who_play(self):
        return Power4.who_play(self.grid)

    def play(self, move):
        assert self.get_winner()==0, "Un joueur a déjà gagné"
        assert move in range(7), "'move' should be an integer between 0 and 7"
        stack = np.abs(self.grid[move]).sum()
        assert stack < 6, "This column is filled !"
        self.grid[move, stack] = self.who_play()
        return self.get_winner()

    def get_playing_mask(self):
      return Power4.get_playing_mask(self.grid)

    def get_winner(self):
        return Power4.get_winner(self.grid)

    def get_state(self):
      return Power4.get_state(self.grid)

    def __str__(self):
        #print(self.grid)
        grid = ""
        for c in reversed(self.grid.T):
            grid += "|"
            for l in c:
                grid+="{}|".format(["X", " ", "O"][l+1])
            grid+="\n"
        grid += ' '
        for i in range(7):
            grid += '{} '.format(i)
        grid += "\nNext Move: {}\n".format(['X', 'O'][min(1, self.who_play()+1)])
        return grid

# **Agent**

In [0]:
# Agent constants
INITIAL_CONV = 64
RES_LAYERS = 5
OUTPUT_POLICY_SIZE = 7

In [0]:
class ResnetIdentityBlock(tf.keras.Model):
  def __init__(self, kernel_size, filters):
    super(ResnetIdentityBlock, self).__init__(name='')
    filters1, filters2, filters3 = filters

    self.conv2a = tf.keras.layers.Conv2D(filters1, (1, 1))
    self.bn2a = tf.keras.layers.BatchNormalization()

    self.conv2b = tf.keras.layers.Conv2D(filters2, kernel_size, padding='same')
    self.bn2b = tf.keras.layers.BatchNormalization()

    self.conv2c = tf.keras.layers.Conv2D(filters3, (1, 1))
    self.bn2c = tf.keras.layers.BatchNormalization()

  def call(self, input_tensor, training=False):
    x = self.conv2a(input_tensor)
    x = self.bn2a(x, training=training)
    x = tf.nn.relu(x)

    x = self.conv2b(x)
    x = self.bn2b(x, training=training)
    x = tf.nn.relu(x)

    x = self.conv2c(x)
    x = self.bn2c(x, training=training)

    x += input_tensor
    return tf.nn.relu(x)


def get_model(input_shape):
  t_input = tf.keras.layers.Input(input_shape)
  x = tf.keras.layers.Conv2D(INITIAL_CONV, 3, padding='same')(t_input)
  x = tf.keras.layers.BatchNormalization()(x)
  x = tf.keras.activations.relu(x)
  for _ in range(RES_LAYERS):
    x = ResnetIdentityBlock(3, (INITIAL_CONV, INITIAL_CONV, INITIAL_CONV))(x)

  x_feat = x

  x = tf.keras.layers.Conv2D(INITIAL_CONV, 3)(x_feat)
  x = tf.keras.layers.BatchNormalization()(x)
  x = tf.keras.activations.relu(x)
  x = tf.keras.layers.Flatten()(x)
  x = tf.keras.layers.Dense(OUTPUT_POLICY_SIZE, use_bias=False)(x)
  policy_output = tf.keras.activations.softmax(x)

  x = tf.keras.layers.Conv2D(INITIAL_CONV, 3)(x_feat)
  x = tf.keras.layers.BatchNormalization()(x)
  x = tf.keras.activations.relu(x)
  x = tf.keras.layers.Flatten()(x)
  x = tf.keras.layers.Dense(1, use_bias=False)(x)
  value_output = tf.keras.activations.tanh(x)

  return tf.keras.models.Model(inputs=t_input, outputs=[policy_output, value_output])

def build_model():
  return get_model((6, 7, 2))

build_model().summary()

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 6, 7, 2)]    0                                            
__________________________________________________________________________________________________
conv2d (Conv2D)                 (None, 6, 7, 64)     1216        input_1[0][0]                    
__________________________________________________________________________________________________
batch_normalization (BatchNorma (None, 6, 7, 64)     256         conv2d[0][0]                     
__________________________________________________________________________________________________
tf_op_layer_Relu (TensorFlowOpL [(None, 6, 7, 64)]   0           batch_normalization[0][0]        
______________________________________________________________________________________________

In [0]:
def discount(rewards, rate=0.91):
    """
    Discount the givens rewards

    Args:
        rewards: 1d list of rewards
        rate: discount rate
    
    Return:
        1d array containing the discounted rewards
    """
    discounted_r = [0 for _ in range(len(rewards))]
    current_r = 0
    for idx, r in enumerate(reversed(rewards)):
        current_r = current_r * rate + r
        discounted_r[len(rewards) - idx - 1] = current_r
    return discounted_r

def normalize(arr):
    """
    Normalize the given array

    Args:
        arr: array of numbers
    
    Return:
        array normalized
    """
    arr -= arr.mean()
    arr /= arr.std()
    return arr

In [0]:
class MonteCarloTreeSearch:
  def __init__(self, estimator, Environment):
    self.estimator = estimator
    self.Environment = Environment

  def getValues(self, state, nEstimations=1600):
    prediction = self.estimator.predict(np.expand_dims(state, axis=0))
    print(prediction)
    raise aaa

class Buffer:
  def __init__(self):
    self.states = []
    self.actions = []
    self.rewards = []

  def push(states, actions, rewards):
    self.states += states
    self.actions += actions
    self.rewards += discount(rewards)

  def grab(self):
    data = (shuffle(
        np.array(self.states),
        np.array(self.actions),
        normalize(np.array(self.rewards))
    ))
    self.states = []
    self.actions = []
    self.rewards = []
    return data

class Agent:
  def __init__(self, weights_path=None):
    self.model = build_model()
    if weights_path is not None:
      self.model.load_weights(weigths_path)
    self.buffer = Buffer()
    self.mcts = MonteCarloTreeSearch(self.model, Power4)

  def play_move(self, game):
    values = self.mcts.getValues(game.grid)
    return randint(0, 6)

class Human:
  def get_move(self):
    try:
        move = int(input())
        assert move in range(7)
        return move
    except Exception as e:
        print(e)
        return self.get_move()

  def play_move(self, game):
    print(game)
    move = self.get_move()
    try:
        winner = game.play(move)
        return winner
    except Exception as e:
        print(e)
        return self.play_move()


In [0]:
def play_game(p1, p2):
    players = [p1, p2]
    idx_playing = 0
    winner = 0
    g = Power4GameWrapper()
    while winner==0:
      winner = players[idx_playing].play_move(g)
      idx_playing = 1 - idx_playing


In [0]:
play_game(Human(), Agent())

| | | | | | | |
| | | | | | | |
| | | | | | | |
| | | | | | | |
| | | | | | | |
| | | | | | | |
 0 1 2 3 4 5 6 
Next Move: O

0
[array([[0.13768716, 0.1481444 , 0.14161916, 0.14173415, 0.14370105,
        0.14421165, 0.1429024 ]], dtype=float32), array([[0.02381852]], dtype=float32)]


NameError: ignored