In [1]:
import numpy as np
import seaborn as sns
from time import time

import tensorflow as tf
from tensorflow import keras
from keras import layers, models
from keras.regularizers import L1L2
import keras.backend as K

import os
import gc
from pathlib import Path

tf.config.experimental.set_visible_devices([], 'GPU')

Using TensorFlow backend.


In [2]:
from tensorflow.keras.losses import Huber

In [3]:
init_state = np.array([
    [0, 0, 0],
    [0, 0, 0],
    [0, 0, 0]
])

In [4]:
class Game:
    def __init__(self, state, FIRST=1):
        self.state = state
        self.empty = self.make_empty(state)
        self.first_player = FIRST
        
    def make_empty(self, state):
        emp = []
        for i in range(3):
            for j in range(3):
                if state[i][j] == 0:
                    emp.append(3*i + j)
        return emp
    
    def is_lose(self):
        a = self.next_opp()
        
        for i in range(3):
            if self.state[i][0] == self.state[i][1] == self.state[i][2] == a:
                return True
            elif self.state[0][i] == self.state[1][i] == self.state[2][i] == a:
                return True
        if self.state[0][0] == self.state[1][1] == self.state[2][2] == a:
            return True
        if self.state[0][2] == self.state[1][1] == self.state[2][0] == a:
            return True
        return 0
    
    def is_draw(self):
        a = self.next_opp()
        if self.is_lose():
            return 0
        if np.all(self.state):
            return 1
        else:
            return 0
        
    def is_done(self):
        if self.is_lose() or self.is_draw():
            return 1
        else:
            return 0
        
        
    def update(self, target):
        state = self.state.copy()
        x, y = target//3, target%3
        a = self.next_opp()
        state[x][y] = a
        return Game(state)
    
    
    def next_opp(self):
        a = b = 0
        for i in range(len(self.state)):
            for j in range(len(self.state)):
                if self.state[i][j] == self.first_player:
                    a += 1
                elif self.state[i][j] != 0:
                    b += 1
                    
        if a == b:
            return self.first_player
        else:
            return 2 + min(0, 1-self.first_player)

In [5]:
class Random:
    def action(self, game):
        return np.random.choice(game.empty)

In [6]:
def playout(game):
    if game.is_lose():
        return -1

    if game.is_draw():
        return 0

    return -playout(game.update(np.random.choice(game.empty)))


def action(game):
    n_steps=100
    values = [0] * 9
    for i in range(9):
        if i in game.empty:
            for _ in range(n_steps):
                g = game.update(i)
                values[i] += playout(g)
            
    for i in range(9):
        values[i] /= n_steps
        
    return values

In [20]:
DN_FILTERS = 128  # 컨볼루션 레이어 커널 수(오리지널 256）
DN_RESIDUAL_NUM = 16  # 레지듀얼 블록 수(오리지널 19)
DN_INPUT_SHAPE = (3, 3, 2)  # 입력 셰이프
DN_OUTPUT_SIZE = 9  # 행동 수(배치 수(3*3))
    
def residual_block():
    def f(x):
        sc = x
        x = layers.Conv2D(DN_FILTERS, 3, padding='same', use_bias=False,
              kernel_initializer='he_normal', kernel_regularizer=L1L2(l2=0.0005))(x)
        x = layers.BatchNormalization()(x)
        x = layers.Activation('relu')(x)
        x = layers.Conv2D(DN_FILTERS, 3, padding='same', use_bias=False,
              kernel_initializer='he_normal', kernel_regularizer=L1L2(l2=0.0005))(x)
        x = layers.BatchNormalization()(x)
        x = layers.Add()([x, sc])
        x = layers.Activation('relu')(x)
        return x

    return f
    
def dual_network():
    # 모델 생성이 완료된 경우 처리하지 않음
#     if os.path.exists('./model/best.h5'):
#         return

    # 입력 레이어
    input = layers.Input(shape=DN_INPUT_SHAPE)

    # 컨볼루션 레이어
    x = layers.Conv2D(DN_FILTERS, 3, padding='same', use_bias=False,
              kernel_initializer='he_normal', kernel_regularizer=L1L2(l2=0.0005))(input)
    x = layers.BatchNormalization()(x)
    x = layers.Activation('relu')(x)

    # 레지듀얼 블록 x 16
    for i in range(DN_RESIDUAL_NUM):
        x = residual_block()(x)

    # 풀링 레이어
    x = layers.GlobalAveragePooling2D()(x)
    
#     x = layers.Dense(32, activation='relu', kernel_regularizer=L1L2(l2 = 0.0001), kernel_initializer='he_normal')(x)

    # policy 출력
    p = layers.Dense(DN_OUTPUT_SIZE, kernel_regularizer=L1L2(l2=0.0005),
              activation='tanh', name='pi')(x)

    # value 출력
#     v = layers.Dense(1, kernel_regularizer=L1L2(l2=0.0005))(x)
#     v = layers.Activation('tanh', name='v')(v)

    # 모델 생성
    model = models.Model(inputs=input, outputs=p)

    model.compile(optimizer = 'adam',
                 loss = 'logcosh')

    return model

class CNN:
    def __init__(self):
        K.clear_session()
        self.model = dual_network()
        
    def action(self, game):
        res = self.predict(game)
        a = np.argmax(res)
        a = game.empty[a]

        return a

    def train(self, n=100):
        for _ in range(n):
            for i in [1, 2]:
                X = []
                y = []
                game = Game(init_state)

                while 1:
                    status = game.next_opp()
                    state = self.make_state(game)
                    values = action(game)
                    a = np.argmax(values)
                    if a not in game.empty:
                        v = values.copy()
                        while a not in game.empty:
                            v[a] = -float('inf')
                            a = np.argmax(v)
                            
                    X.append(state)
                    y.append(values)
                        
                    game = game.update(a)           
                                        
                    if game.is_done():
                        break
                        
                X = np.array(X)
                X = X.reshape(len(X), 3, 3, 2)
                y = np.reshape(y, (len(y), 9))   
                self.model.fit(X, y, verbose=0)
                
                
    def b_train(self, n=100):
        for _ in range(n):
            X = []
            y = []
            game = Game(init_state)

            while 1:
                status = game.next_opp()
                state = self.make_state(game)
                values = self.model.predict(state)[0]
                a = np.argmax(values)
                if a not in game.empty:
                    v = values.copy()
                    while a not in game.empty:
                        v[a] = -float('inf')
                        a = np.argmax(v)

                game = game.update(a)        
                
                state_next = self.make_state(game)
                if game.is_lose():
                    r = 1
                else:
                    r = 0
                    
                values[a] += 0.1 * (r - 0.95*np.max(action(game)))

                X.append(state)
                y.append(values)

                if game.is_done():
                    break
                    

            X = np.array(X)
            X = X.reshape(len(X), 3, 3, 2)
            y = np.reshape(y, (len(y), 9))   
            self.model.fit(X, y, verbose=0)
        
    def make_state(self, game):
        status = game.next_opp()
        opp = 3 - status
        a = game.state
        a1 = np.where(a==status, 1, 0)
        a2 = np.where(a==opp, 1, 0)
        res = np.array([a1, a2])
        res = res.reshape(2, 3, 3).transpose(1, 2, 0).reshape(1, 3, 3, 2)
        
        return res
    
    def predict(self, game):
        state = self.make_state(game)
        
        res = self.model.predict(state)[0]
        
        res = res[game.empty]
        
        return res 


In [21]:
dd = CNN()
gc.collect()

1828199

In [22]:
dd.train(200)
gc.collect()

492

In [None]:
# for _ in range(100):
#     print(_)
#     dd.train()

In [23]:
g = Game(np.array([
    [1, 0, 0],
    [2, 0, 0],
    [0, 0, 0]
]))

action(g)

[0.0, 0.45, 0.25, 0.0, 0.51, 0.01, -0.03, -0.03, 0.27]

In [24]:
dd.predict(g)

array([-0.9995206 , -0.82600325,  0.97238046, -0.99990404, -0.1664385 ,
       -1.        , -0.9835813 ], dtype=float32)

In [25]:
dd.action(g)

4

In [None]:
g.empty

In [14]:
def play(game, m1, m2):
    global score
    while 1:
        a1 = m1.action(game)
        game = game.update(a1)
#         print(game.state)
        if game.is_lose():
            score[0] += 1
#             print(game.state)
            return 
        elif game.is_draw():
            score[2] += 1
#             print(game.state)
            return 

        a2 = m2.action(game)
        game = game.update(a2)
#         print(game.state)
        if game.is_lose():
            score[1] += 1
#             print(game.state)
            return 
        elif game.is_draw():
            score[2] += 1
#             print(game.state)
            return 
        

In [15]:
game = Game(init_state)
m1 = Random()
# d = DQN(1)

In [26]:
%%time
score = [0, 0, 0]
for _ in range(100):
#     print(_)
    play(game, m1, dd)
print(score)
gc.collect()

score = [0, 0, 0]
for _ in range(100):
#     print(_)
    play(game, dd, m1)
print(score)

[57, 27, 16]
[19, 69, 12]
Wall time: 32.5 s


In [None]:
# sns.barplot(x = [1, 2], y = score[:2])

In [None]:
# v1: 61vs21 // 12vs81


In [17]:
dd.model.save('./btanh.h5')