In [1]:
# 連接雲端硬碟(訓練資料存在雲端)
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
def search(all, target, row, column, visited, live):
    visited.add((row, column))
    directions = [(0, -1), (0, 1), (-1, 0), (1, 0)]  # Up, down, left, right
    for dr, dc in directions:
        r, c = row + dr, column + dc
        if 0 <= r <= 18 and 0 <= c <= 18 and (r, c) not in visited:
            if all[r][c] == 0 and target[r][c] == 0:
                return True
            elif target[r][c] == 1:
                live = search(all, target, r, c, visited, live)
                if live:
                    return live
    return False

def dieCheck(all, target, row, column):
    num = 4
    directions = [(0, -1), (0, 1), (-1, 0), (1, 0)]  # Up, down, left, right
    for dr, dc in directions:
        r, c = row + dr, column + dc
        if r < 0 or r > 18 or c < 0 or c > 18 or all[r][c] == 1:
            num -= 1
    if num == 0:
        target[row][column] = 0
        return target
    else:
        num -= sum(target[row + dr][column + dc] == 1 for dr, dc in directions if 0 <= row + dr <= 18 and 0 <= column + dc <= 18)
        if num == 0:
            live = False
            visited = set()
            for dr, dc in directions:
                r, c = row + dr, column + dc
                if 0 <= r <= 18 and 0 <= c <= 18 and target[r][c] == 1 and (r, c) not in visited:
                    live = search(all, target, r, c, visited, live)
                    if live:
                        break
            if not live:
                for r, c in visited:
                    target[r][c] = 0
    return target

def twoColor(moves, previous_board=None):
    if previous_board is None:
        blackAll = np.zeros((19,19))
        whiteAll = np.zeros((19,19))
    else:
        blackAll, whiteAll = previous_board
        moves = [moves[-1]]

    for i in moves:
        column = int(i[2:4])
        row = int(i[4:6])
        color = i[0]
        if color == 'B':
            blackAll[row][column] = 1
            target = whiteAll
            all = blackAll
        else:
            whiteAll[row][column] = 1
            target = blackAll
            all = whiteAll

        arr = [0, 0, 0, 0]
        if row > 0 and target[row-1][column] == 1:
            arr[0] = 1
        if row < 18 and target[row+1][column] == 1:
            arr[1] = 1
        if column > 0 and target[row][column-1] == 1:
            arr[2] = 1
        if column < 18 and target[row][column+1] == 1:
            arr[3] = 1

        for i in range(4):
            if arr[i]==1:
                if i==0:
                    target = dieCheck(all, target=target, row=row-1, column=column)
                elif i==1:
                    target = dieCheck(all, target=target, row=row+1, column=column)
                elif i==2:
                    target = dieCheck(all, target=target, row=row, column=column-1)
                elif i==3:
                    target = dieCheck(all, target=target, row=row, column=column+1)

        if color == 'B':
            whiteAll = target
        else:
            blackAll = target

    return blackAll, whiteAll

def liberties(board):
    liberties_board = np.zeros((19,19))
    directions = [(0, -1), (0, 1), (-1, 0), (1, 0)]  # Up, down, left, right
    visited = set()
    liberties_visited = set()

    for row in range(19):
        for column in range(19):
            if board[row][column] != 0 and (row, column) not in visited:  # If there is a stone at this position
                group = [(row, column)]
                liberties_count = 0
                liberties_visited.clear()
                visited.add((row, column))

                # Find all stones in the same group
                i = 0
                while i < len(group):
                    r, c = group[i]
                    for dr, dc in directions:
                        nr, nc = r + dr, c + dc
                        if 0 <= nr <= 18 and 0 <= nc <= 18:
                            if board[nr][nc] == 0 and (nr, nc) not in liberties_visited:  # If the adjacent position is empty
                                liberties_count += 1
                                liberties_visited.add((nr, nc))
                            elif board[nr][nc] == board[row][column] and (nr, nc) not in visited:  # If the adjacent stone is the same color
                                group.append((nr, nc))
                                visited.add((nr, nc))
                    i += 1

                # Assign the liberties count to all stones in the same group
                for r, c in group:
                    liberties_board[r][c] = liberties_count
    return liberties_board

In [7]:
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, ReLU, Flatten, Dense, Softmax, BatchNormalization, Dropout, Add
from tensorflow.keras.optimizers import Adam, SGD
from tensorflow.keras import regularizers
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import matplotlib.pyplot as plt
import random
import cv2

import numpy as np
from sklearn.model_selection import train_test_split

df = open('/content/drive/MyDrive/play_style_train.csv').read().splitlines()
games = [i.split(',',2)[-1] for i in df]
game_styles = [int(i.split(',',2)[-2]) for i in df]

chars = 'abcdefghijklmnopqrs'
coordinates = {k:v for v,k in enumerate(chars)}

def prepare_input(moves):

    array = np.zeros((19,19,8))

    target_step = len(moves)
    # 前兩步
    if target_step > 2:
        array[:,:,0], array[:,:,1] = twoColor(moves[:target_step - 2])
    # 前一步
    if target_step > 1:
        array[:,:,2], array[:,:,3] = twoColor([moves[target_step - 2]], previous_board=(array[:,:,0], array[:,:,1]))
    # 目前這一步
    array[:,:,4], array[:,:,5] = twoColor([moves[target_step - 1]], previous_board=(array[:,:,2], array[:,:,3]))

    # 計算氣
    array[:,:,6] = liberties(array[:,:,4] + array[:,:,5] * 2) / 10

    target_col = int(moves[-1][2:4])
    target_row = int(moves[-1][4:6])

    # 單獨最後一步
    array[target_row, target_col,7] = 1

    return array

# Check 有多少局資料
n_games = 0
for game in games:
    n_games += 1
print(f"Total Games: {n_games}")

x = []
for game in games:
    for char in chars:
        if (len(str(coordinates[char])) == 1):
            num = '0' + str(coordinates[char])
        else:
            num = str(coordinates[char])
        game = game.replace(char, num)
    moves_list = game.split(',')
    x.append(prepare_input(moves_list))
x = np.array(x)
y = np.array(game_styles)-1

y_hot = tf.one_hot(y, depth=3)

x_train, x_val, y_train, y_val = train_test_split(x, y_hot.numpy(), test_size=0.10)
print(y_train.shape, y_val.shape)

Total Games: 26615
(23953, 3) (2662, 3)


In [5]:
import numpy as np
import tensorflow as tf
from tensorflow import keras

# 建立residual layer的class
class basicBlock(keras.layers.Layer):

    # kernel_size卷積層大小預設為3，stride為1，filter_num代表輸出多少層filter
    # 如果經過卷積層的filter_num跟原始輸入的filter_num不一樣，下面self.downsample.add會error。
    # 將change = True讓filter_num保持一致
    def __init__(self, filter_num, stride=1, change=False, kernel_size=3):
        super(basicBlock, self).__init__()

        # 內含兩個卷積層
        # padding='same'讓輸出維持19*19*X
        # 卷積層1
        self.conv1 = keras.layers.Conv2D(filter_num, kernel_size=kernel_size, strides=stride, padding='same')

        self.bn1 = keras.layers.BatchNormalization()
        self.relu = keras.layers.Activation('relu')

        # 卷積層2
        self.conv2 = keras.layers.Conv2D(filter_num, kernel_size=kernel_size, strides=stride, padding='same')
        self.bn2 = keras.layers.BatchNormalization()

        # 未經過卷積層的input
        if change != False:
            self.downsample = keras.Sequential()
            self.downsample.add(keras.layers.Conv2D(filter_num, kernel_size=1, strides=stride, padding='same'))
        else:
            self.downsample = lambda x: x

    def call(self, inputs, training=None):

        #前向計算forward
        out = self.conv1(inputs)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)

        #inputs通過identity轉換
        identity = self.downsample(inputs)

        # f(x)+x運算
        output = keras.layers.add([out, identity])

        # 再通過relu激活函數並回傳
        output = tf.nn.relu(output)
        return output

# 多個residual layer組成的block
def build_resblock(filter_num, blocks, stride=1, change=False, kernel=3):
    Resblock = keras.Sequential()
    Resblock.add(basicBlock(filter_num=filter_num, stride=stride, change=change, kernel_size=kernel))

    #第一個之後的residual layer的stride固定是1
    for i in range(1, blocks):
        Resblock.add(basicBlock(filter_num=filter_num, stride=1, change=change, kernel_size=kernel))
    return Resblock

# 建模型
def create_model():
    inputs = keras.layers.Input(shape=(19, 19, 8))
    outputs = keras.layers.Conv2D(kernel_size=3, filters=64, strides=1, padding='same')(inputs)
    outputs = keras.layers.BatchNormalization()(outputs)
    outputs = keras.layers.Activation('relu')(outputs)

    outputs = build_resblock(filter_num=64, blocks=2, kernel=3)(outputs)
    outputs = build_resblock(filter_num=64, blocks=2, stride=1, change=True, kernel=3)(outputs)
    outputs = build_resblock(filter_num=128, blocks=2, stride=1, change=True, kernel=3)(outputs)
    outputs = keras.layers.AveragePooling2D(pool_size=(2,2))(outputs)
    outputs = build_resblock(filter_num=128, blocks=2, stride=1, change=True, kernel=3)(outputs)
    outputs = keras.layers.AveragePooling2D(pool_size=(2,2))(outputs)

    # 全連接層
    outputs = keras.layers.Flatten()(outputs)
    outputs = keras.layers.BatchNormalization()(outputs)
    outputs = keras.layers.Dense(256, activation='relu')(outputs)
    outputs = keras.layers.Dense(3, activation='softmax')(outputs)

    model = tf.keras.models.Model(inputs, outputs)

    return model

model = create_model()

# 設定optimizer，learning rate=0.001
opt = tf.keras.optimizers.Adam(learning_rate=0.001)

model.summary()

Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 19, 19, 8)]       0         
                                                                 
 conv2d (Conv2D)             (None, 19, 19, 64)        4672      
                                                                 
 batch_normalization (Batch  (None, 19, 19, 64)        256       
 Normalization)                                                  
                                                                 
 activation (Activation)     (None, 19, 19, 64)        0         
                                                                 
 sequential (Sequential)     (None, 19, 19, 64)        148736    
                                                                 
 sequential_1 (Sequential)   (None, 19, 19, 64)        157056    
                                                             

In [9]:
epochs = 100
batch_size = 512
batch_count = (x_train.shape[0] // batch_size) + 1

# 最小learning_rate，希望learning_rate最多下降10次
miniLR = 0.001 / 1024

# 紀錄最好的ValAccuracy
bestValAccuracy = 0.0

# 紀錄上一次的ValAccuracy
best_val_acc = 0.0
early_stop_count = 0
accuracyUptime = 0

def get_data(i):
    if (i + 1) * batch_size > x_train.shape[0]:
        x = x_train[ i * batch_size : -1]
        y = y_train[ i * batch_size : -1]
    else:
        x = x_train[ (i) * batch_size : (i+1) * batch_size ]
        y = y_train[ (i) * batch_size : (i+1) * batch_size ]

    for idx, board in enumerate(x):
        # 上下翻轉
        if random.random() > 0.5: # 50%
            x[idx] = cv2.flip(board, 0)
        # 左右翻轉
        if random.random() > 0.5: # 50%
            x[idx] = cv2.flip(board, 1)
    return x, y

history = {"train_loss": [], "train_acc": [], "val_loss": [], "val_acc": []}
# train
for epoch in range(epochs):
    print(f'-----------epoch {epoch+1}  (LR: {float(opt.learning_rate):e})-----------')

    batch_sort = random.sample([i for i in range(batch_count)], batch_count)
    for i in batch_sort:
        X_train_batch, y_train_batch = get_data(i)

        # 開始訓練
        with tf.GradientTape() as tape:
            y_pred = model(X_train_batch)
            # Compute loss.
            # one hot encoding用categorical_crossentropy計算loss
            loss = tf.keras.losses.categorical_crossentropy(y_train_batch, y_pred)
            trainable_variables = model.trainable_variables
            # 計算梯度
            gradients = tape.gradient(loss, trainable_variables)
        opt.apply_gradients(zip(gradients, trainable_variables))

    # 印loss和accuracy
    print(f'loss: {sum(loss) / len(loss)}')
    acc = tf.keras.metrics.categorical_accuracy(y_train_batch, y_pred)
    print(f'accuracy: {sum(acc) / len(acc)}')


    # 驗證模型
    history["train_loss"].append(sum(loss) / len(loss))
    history["train_acc"].append(sum(acc) / len(acc))
    y_pred = model(x_val)
    val_loss = tf.keras.losses.categorical_crossentropy(y_val, y_pred)
    val_loss = sum(val_loss) / len(val_loss)
    val_acc = tf.keras.metrics.categorical_accuracy(y_val, y_pred)
    val_acc = sum(val_acc) / len(val_acc)

    total_val_loss = float(val_loss)
    total_val_acc = float(val_acc)
    print(f'val loss: {total_val_loss}')
    print(f'val accuracy: {total_val_acc}')
    history["val_loss"].append(total_val_loss)
    history["val_acc"].append(total_val_acc)

    # draw curve
    plt.plot(history["train_acc"])
    plt.plot(history["val_acc"])
    plt.title("Accuracy")
    plt.ylabel("Accuracy")
    plt.xlabel('Epoch')
    plt.legend(['Train','Val'])
    plt.savefig(f'./accuracy.png')
    plt.clf()
    # draw curve
    plt.plot(history["train_loss"])
    plt.plot(history["val_loss"])
    plt.title("Loss")
    plt.ylabel("Loss")
    plt.xlabel('Epoch')
    plt.legend(['Train','Val'])
    plt.savefig(f'./loss.png')
    plt.clf()

    model.save_weights(f'./last.h5')

    # val_accuracy比上一次好就存檔
    if (bestValAccuracy < total_val_acc):
        bestValAccuracy = total_val_acc
        model.save_weights(f'./best.h5')
        print('weight saved')

    if ((epoch + 1) % 5 == 0):
        model.save_weights(f'./epoch{epoch+1}.h5')
        print('weight saved')

    # 連續三次val_accuracy沒有上漲，learning_rate就乘1/2
    if(best_val_acc <= total_val_acc):
        accuracyUptime = 0
        early_stop_count = 0
    else:
        accuracyUptime +=1
        early_stop_count += 1
        if (accuracyUptime == 3 and opt.learning_rate > miniLR):
            opt.learning_rate = (opt.learning_rate)/2
            print(f'learning rate reduced to {float(opt.learning_rate):e}')
            # 下次learning rate下降是五(提高條件)
            accuracyUptime = -2
        if early_stop_count == 7:
            print("-------Early stop-------")
            break

    # 更新 best_val_acc
    if total_val_acc > best_val_acc:
        best_val_acc = total_val_acc

-----------epoch 1  (LR: 1.000000e-03)-----------




loss: 1.0769071578979492
accuracy: 0.37890625
val loss: 1.0772584676742554
val accuracy: 0.39631855487823486
weight saved
-----------epoch 2  (LR: 1.000000e-03)-----------
loss: 1.0690579414367676
accuracy: 0.357421875
val loss: 1.0701448917388916
val accuracy: 0.40533432364463806
weight saved
-----------epoch 3  (LR: 1.000000e-03)-----------
loss: 0.9140371680259705
accuracy: 0.55859375
val loss: 0.9473146796226501
val accuracy: 0.5454545617103577
weight saved
-----------epoch 4  (LR: 1.000000e-03)-----------
loss: 0.8929985165596008
accuracy: 0.587890625
val loss: 0.9123886227607727
val accuracy: 0.5882794857025146
weight saved
-----------epoch 5  (LR: 1.000000e-03)-----------
loss: 0.9204666018486023
accuracy: 0.58984375
val loss: 0.9261883497238159
val accuracy: 0.5800150036811829
weight saved
-----------epoch 6  (LR: 1.000000e-03)-----------
loss: 0.8252468705177307
accuracy: 0.673828125
val loss: 0.832013726234436
val accuracy: 0.6401202082633972
weight saved
-----------epoch 7  

<Figure size 640x480 with 0 Axes>