# Tensorflowを用いた強化学習の実装

機械学習用ライブラリTensorflowを用いて、Pythonで強化学習を実装します。<br>
今回のハンズオンでは、五目並べを実装し、モンテカルロ法による自己対戦で強化学習を行います。

In [1]:
import tensorflow as tf
import numpy as np
import random as rd
import matplotlib.pyplot as plt
import matplotlib.animation as anm
from PIL import Image, ImageDraw
import copy
import os
import shutil

  from ._conv import register_converters as _register_converters


In [2]:
# Side length of the (square) board shared by the game and the network.
size = 8


# Gomoku (five-in-a-row) game.
class Game:
    """Mutable gomoku position on a ``size`` x ``size`` board.

    Instance attributes:
        size: board side length, copied from the module-level ``size``.
        square: nested lists of ints; 0 is an empty cell, 1 and -1 are the
            stones of the two players.
        turn: the player to move next (1 or -1); a new game starts with 1.
    """

    # Constructor
    def __init__(self):
        self.size = size
        self.square = [[0] * self.size for _ in range(self.size)]
        self.turn = 1

    # Place a stone
    def put(self, row, column):
        """Place the current player's stone at (row, column) and pass the turn.

        The cell is overwritten without checking that it is empty, and the
        turn is passed even when the coordinates are off the board (nothing
        is placed in that case), so callers should check ``putable`` first.
        """
        if 0 <= row < self.size and 0 <= column < self.size:
            self.square[row][column] = self.turn
        self.turn = -1 if self.turn == 1 else 1

    # Whether a stone can be placed
    def putable(self, row, column):
        """Return True when (row, column) is on the board and empty."""
        if 0 <= row < self.size and 0 <= column < self.size:
            return self.square[row][column] == 0
        return False

    # Game-over check (win for `color`)
    def end_game(self, color):
        """Return 1 if ``color`` has five or more stones in a row, else 0."""
        # Every cell is tried as a starting point, so one orientation per
        # line (right, down, and the two downward diagonals) finds every run.
        directions = ((0, 1), (1, 0), (1, 1), (1, -1))
        for i in range(self.size):
            for j in range(self.size):
                for d in directions:
                    if self.fives(color, i, j, d):
                        return 1
        return 0

    # Five-in-a-row check
    def fives(self, color, i, j, d):
        """Return 1 if at least five ``color`` stones run from (i, j) along ``d``.

        ``d`` is a (row step, column step) pair; counting stops at the first
        cell that is off the board or does not hold ``color``.
        """
        number = 0
        # Compare by value: identity (`is`) only works by accident for small
        # Python ints and fails for e.g. NumPy integers.
        while 0 <= i < self.size and 0 <= j < self.size and self.square[i][j] == color:
            number += 1
            i += d[0]
            j += d[1]
        return 1 if number >= 5 else 0

    # Show the board
    def iout(self):
        """Show the board with matplotlib: empty green, 1 black, -1 white."""
        # Integer RGB data must stay within 0..255 for imshow.
        green, black, white = [0, 255, 0], [0, 0, 0], [255, 255, 255]
        img = np.asarray([
            [green if cell == 0 else black if cell == 1 else white for cell in line]
            for line in self.square
        ])
        plt.imshow(img)
        plt.show()

    # Random move
    def rand_put(self):
        """Play a uniformly random legal move for the current player.

        Raises:
            ValueError: if the board has no empty cell (sampling would
                otherwise never terminate).
        """
        empty_cells = [
            (i, j)
            for i in range(self.size)
            for j in range(self.size)
            if self.putable(i, j)
        ]
        if not empty_cells:
            raise ValueError('rand_put: the board is full')
        i, j = rd.choice(empty_cells)
        self.put(i, j)

    # All possible next positions
    def next_nodes(self):
        """Return one deep-copied successor ``Game`` per empty cell.

        Successors are ordered row by row; ``self`` is left untouched.
        """
        n = []
        for i in range(self.size):
            for j in range(self.size):
                if self.putable(i, j):
                    successor = copy.deepcopy(self)
                    successor.put(i, j)
                    n.append(successor)
        return n

    # Move entered by the user
    def input_put(self):
        """Prompt until the user enters a legal "row column" pair, then play it.

        Input that is not exactly two integers is rejected and the prompt is
        shown again instead of raising.
        """
        while True:
            try:
                i, j = map(int, input('input (row, column)').split())
            except ValueError:
                continue
            if self.putable(i, j):
                break
        self.put(i, j)

In [3]:
def cnn(x):
    """Build the convolutional network that maps a board to one scalar.

    ``x`` is reshaped to ``[-1, size, size, 1]`` using the module-level
    ``size``; the returned tensor is the output of a final 1-unit dense
    layer with no activation.
    """
    # Add a trailing channel axis: [None, size, size, 1]
    board = tf.reshape(x, [-1, size, size, 1])

    # First conv/pool stage: 128 filters, spatial dims halved by the pool.
    stage1 = tf.layers.conv2d(
        board, 128, (3, 3), padding='same', activation=tf.nn.relu
    )  # [None, size, size, 128]
    stage1 = tf.layers.max_pooling2d(stage1, (2, 2), (2, 2))  # [None, size/2, size/2, 128]

    # Second conv/pool stage: 256 filters, spatial dims halved again.
    stage2 = tf.layers.conv2d(
        stage1, 256, (3, 3), padding='same', activation=tf.nn.relu
    )  # [None, size/2, size/2, 256]
    stage2 = tf.layers.max_pooling2d(stage2, (2, 2), (2, 2))  # [None, size/4, size/4, 256]

    # Flatten to [None, (size/4) * (size/4) * 256] = [None, size * size * 16]
    flat = tf.layers.flatten(stage2)
    hidden = tf.layers.dense(flat, 64, activation=tf.nn.relu)
    return tf.layers.dense(hidden, 1)

In [4]:
def best_game(g, model):
    """Return the successor of ``g`` that the model rates best for the mover.

    Args:
        g: current position; must provide ``turn`` and ``next_nodes()``.
        model: object whose ``out([square])[0]`` gives a score for a board.

    Returns:
        The first successor that wins immediately for the player to move,
        otherwise the successor with the highest sign-adjusted score (the
        model output is negated when ``g.turn == 1``; presumably the model
        scores boards from player -1's point of view -- confirm against the
        training code).

    Raises:
        ValueError: if ``g`` has no legal move.
    """
    candidates = g.next_nodes()
    if not candidates:
        raise ValueError('best_game: no legal moves available')
    sign = -1 if g.turn == 1 else 1
    scores = []
    for node in candidates:
        # put() has already passed the turn inside the successor, so the
        # player who just moved is g.turn, not node.turn.
        if node.end_game(g.turn):
            return node
        # Predict the value of this move
        scores.append(sign * model.out([node.square])[0])
    # Greedy choice: first successor with the maximum score
    return candidates[scores.index(max(scores))]

def best_game_p(g, model):
    """Pick the next position stochastically, favouring highly rated moves.

    With probability 0.005 a uniformly random move is played on ``g`` itself
    (via ``g.rand_put()``) and ``g`` is returned. Otherwise a successor that
    wins immediately for the player to move is returned if one exists; if
    not, a successor is sampled with probability proportional to
    ``100 ** score``, where ``score`` is the model output negated when
    ``g.turn == 1``.

    Args:
        g: current position; must provide ``turn``, ``rand_put()`` and
            ``next_nodes()``.
        model: object whose ``out([square])[0]`` gives a score for a board.

    Returns:
        The chosen position (``g`` itself after a random move, otherwise one
        of its successors).

    Raises:
        IndexError: if ``g`` has no legal move (non-random branch).
    """
    if rd.random() < 0.005:
        g.rand_put()
        return g
    candidates = g.next_nodes()
    if not candidates:
        raise IndexError('best_game_p: no legal moves available')
    sign = -1 if g.turn == 1 else 1
    scores = []
    for node in candidates:
        # put() has already passed the turn inside the successor, so the
        # player who just moved is g.turn, not node.turn.
        if node.end_game(g.turn):
            # Returning here avoids feeding an infinite weight into the
            # normalisation below, which would turn every probability into
            # 0 or NaN.
            return node
        # Predict the value of this move
        scores.append(sign * model.out([node.square])[0])
    # Shift by the best score before exponentiating: the probabilities are
    # unchanged, but every weight is <= 1 so 100 ** x cannot overflow.
    top = max(scores)
    weights = [100 ** (s - top) for s in scores]
    total = sum(weights)
    # Roulette-wheel selection over the normalised weights
    threshold = rd.random()
    cumulative = 0
    # If rounding keeps the cumulative sum just below the threshold, fall
    # back to the last candidate.
    chosen = len(candidates) - 1
    for ind, w in enumerate(weights):
        cumulative += w / total
        if cumulative >= threshold:
            chosen = ind
            break
    return candidates[chosen]

In [5]:
class tf_model:
    """TensorFlow 1.x value network with training, logging and checkpointing.

    Instance attributes:
        x: float32 input placeholder of shape (None, size, size).
        t: float32 target placeholder of shape (None, 1).
        y: network output tensor built by ``cnn``.
        trail: number of optimisation steps logged so far (summary step).
        model_path: checkpoint prefix passed to ``Saver.save`` / ``restore``.
        logs_path: directory the summary writer logs to.
        cost: mean squared error between ``y`` and ``t``.
        optimizer: Adam (learning rate 0.001) minimisation op for ``cost``.
        sess: the session all ops run in.
    """

    def __init__(self, size=8):
        """Build the graph, start a session and restore a checkpoint if any.

        Args:
            size: board side length for the input placeholder. ``cnn``
                reshapes its input with the module-level ``size``, so the
                two values must agree.
        """
        tf.reset_default_graph()
        self.x = tf.placeholder(tf.float32, (None, size, size))
        self.t = tf.placeholder(tf.float32, (None, 1))
        self.y = cnn(self.x)
        self.trail = 0
        # NOTE(review): the checkpoint prefix ends with '/', so checkpoint
        # files are written inside ./model/ with an empty base name --
        # confirm this is intended.
        self.model_path = './model/'
        self.logs_path = './log/'
        self.cost = tf.reduce_mean(tf.square(self.y - self.t))
        self.optimizer = tf.train.AdamOptimizer(0.001).minimize(self.cost)
        self.train_summary_loss = tf.summary.scalar('train_loss', self.cost)
        # Create the saver only after the optimizer so that it also covers
        # the optimizer's variables.
        self.saver = tf.train.Saver()
        self.summary_writer = tf.summary.FileWriter(self.logs_path, graph=tf.get_default_graph())
        self.sess = tf.Session()
        # Initialise first, then restore: running the initializer after a
        # restore would overwrite the restored weights.
        self.sess.run(tf.global_variables_initializer())
        os.makedirs(self.model_path, exist_ok=True)
        # Restore only when a checkpoint has actually been written; the
        # directory may exist without one.
        if tf.train.get_checkpoint_state(self.model_path) is not None:
            self.load()

    def load(self):
        """Restore all saved variables from ``model_path`` into the session."""
        self.saver.restore(self.sess, self.model_path)

    def new_logs(self):
        """Delete the log directory, if present, and recreate it empty."""
        if os.path.exists(self.logs_path):
            shutil.rmtree(self.logs_path)
        os.mkdir(self.logs_path)

    # Model prediction
    def out(self, X):
        """Return the network output for the batch of boards ``X``."""
        return self.y.eval(feed_dict={self.x: X}, session=self.sess)

    # Optimisation
    def optimize(self, X, T):
        """Run one Adam step on (X, T), log the loss and save a checkpoint.

        Args:
            X: batch of boards fed to ``x``.
            T: batch of targets fed to ``t``.
        """
        _, summary_loss = self.sess.run(
            [self.optimizer, self.train_summary_loss],
            feed_dict={self.x: X, self.t: T},
        )
        # Save the log
        self.summary_writer.add_summary(summary_loss, self.trail)
        self.trail += 1
        # Save the model
        self.saver.save(self.sess, self.model_path)

In [6]:
import subprocess

# Export the notebook gomoku.ipynb as a Python script with nbconvert.
nbconvert_cmd = ['jupyter', 'nbconvert', '--to', 'python', 'gomoku.ipynb']
subprocess.run(nbconvert_cmd)

CompletedProcess(args=['jupyter', 'nbconvert', '--to', 'python', 'gomoku.ipynb'], returncode=0)