In [None]:
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
from numpy.random.mtrand import randint
import torch
from torch import nn
import numpy as np
import gzip
import json
import os
import urllib
import gdrive.MyDrive.AlphaGo.Goban as Goban
import torch.optim as optim
from gdrive.MyDrive.AlphaGo.Goban import Board

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device.type

'cuda'

In [None]:
!ls gdrive/MyDrive/AlphaGo

CNN.ipynb  Goban.py  model_weights2.pth  model_weights.pth  __pycache__


In [None]:
def get_raw_data_go():
    ''' Returns the set of samples from the local file or download it if it does not exists'''

    raw_samples_file = "samples-9x9.json.gz"

    if not os.path.isfile(raw_samples_file):
        print("File", raw_samples_file, "not found, I am downloading it...", end="")
        urllib.request.urlretrieve("https://www.labri.fr/perso/lsimon/ia-inge2/samples-9x9.json.gz", "samples-9x9.json.gz")
        print(" Done")

    with gzip.open("samples-9x9.json.gz") as fz:
        data = json.loads(fz.read().decode("utf-8"))
    return data

In [None]:
class ConvolutionBlock(nn.Module):
    def __init__(self):
        super(ConvolutionBlock, self).__init__()
        self.conv = nn.Conv2d(9, 9,kernel_size=5, padding='same')
        self.bn = nn.BatchNorm2d(9)
        self.relu = nn.ReLU()
    
    def forward(self, x):
        conv = self.conv(x)
        conv += x
        x1 = self.bn(conv)
        return self.relu(x1)

class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.res1 = ConvolutionBlock()
        self.res2 = ConvolutionBlock()
        self.res3 = ConvolutionBlock()
        self.res4 = ConvolutionBlock()
        self.res5 = ConvolutionBlock()
        self.linear1 = nn.Linear(9 * 81 * 2, 82)
        self.linear2 = nn.Linear(9 * 81 * 2, 1)
        self.sigmoid = nn.Sigmoid()
        self.softmax = nn.Softmax(0)
        self.dropout1 = nn.Dropout(0.25)
        self.dropout2 = nn.Dropout(0.25)

    def forward(self, x):
        x = self.res1(x)
        x = self.res2(x)
        x = self.res3(x)
        x = self.res4(x)
        x = self.res5(x)
        x = torch.flatten(x)
        p = self.softmax(self.dropout1(self.linear1(x.clone())))
        v = self.sigmoid(self.dropout2(self.linear2(x.clone())))
        
        return (p,v)


In [None]:
def name_to_coord(s):
    assert s != "PASS"
    indexLetters = {'A':0, 'B':1, 'C':2, 'D':3, 'E':4, 'F':5, 'G':6, 'H':7, 'J':8}

    col = indexLetters[s[0]]
    lin = int(s[1:]) - 1
    return col, lin

def rotation(board):
    L = []
    L.append(np.reshape(board, 81))
    for i in range(1, 4):
        rotation = np.rot90(board, i)
        L.append(np.reshape(rotation, 81))
    return L

def symmetry(board):
    L = rotation(np.reshape(board, (9,9))) + rotation(np.transpose(np.reshape(board, (9,9))))
    return L

def coord_to_board(black_moves, white_moves):
    board = [[0. for i in range(9)] for j in range(9)]
    for x,y in black_moves:
        board[x][y] = 1.
    for x,y in white_moves:
        board[x][y] = -1.
    return board

In [None]:
def get_data(data):
    D = []
    j = 0
    length = len(data)
    for game in data:
        print(j, "/", length)
        j += 1
        if j == 1000:
            break
        if game['depth'] < 9:
            continue
        board_black_list = []
        board_white_list = []
        board = Goban.Board()
        err = False
        for m in game['list_of_moves']:
            if m == 'PASS':
                move = -1
            else:
                (x,y) = name_to_coord(m)
                move = y * board._BOARDSIZE + x
            try:
                board.push(move)
            except:
                err = True
                break
            
            board_black = board._board.copy()
            board_white = board._board.copy()
            for i in range(board._BOARDSIZE ** 2):
                if board_black[i] == 2:
                    board_black[i] = 0
                if board_white[i] == 1:
                    board_white[i] = 0
                if board_white[i] == 2:
                    board_white[i] = 1 
            board_black_list.append(np.array(board_black))
            board_white_list.append(np.array(board_white))
        if err:
            continue
        hist_black = board_black_list[-9:-1]
        hist_white = board_black_list[-9:-1]
        hist_black_s = [symmetry(board) for board in hist_black]
        hist_white_s = [symmetry(board) for board in hist_white]
        for sym in range(8):
            hist = [np.array([hist_black_s[i][sym], hist_white_s[i][sym]]) for i in range(8)]
            v = None
            if (game['depth'] - 1) %2 == 0:
                hist.append(np.array([np.ones(board._BOARDSIZE ** 2, dtype=np.uint8), np.ones(board._BOARDSIZE ** 2, dtype=np.uint8)])) #BLACK to 1
                v = int(game['black_wins']) / 100
            else:
                hist.append(np.array([np.zeros(board._BOARDSIZE ** 2, dtype=np.uint8), np.zeros(board._BOARDSIZE ** 2, dtype=np.uint8)])) #WHITE to 0
                v = int(game['white_wins']) / 100
            hist = np.array(hist) 
            m = game['list_of_moves'][-1]
            if m == 'PASS':
                move = board._BOARDSIZE ** 2
            else:
                (x,y) = name_to_coord(m)
                move = y * board._BOARDSIZE + x
            p = np.zeros(board._BOARDSIZE ** 2 + 1, dtype=np.uint8)
            p[move] = 1

            d = torch.reshape(torch.from_numpy(hist), (1,9,2,81)).type(torch.float32)
            D.append((d.to(device),torch.tensor(p, dtype=torch.float32).to(device),torch.tensor(v, dtype=torch.float32).to(device)))
    return D

In [None]:
data = get_raw_data_go()
D = get_data(data)

File samples-9x9.json.gz not found, I am downloading it... Done
0 / 41563
1 / 41563
2 / 41563
3 / 41563
4 / 41563
5 / 41563
6 / 41563
7 / 41563
8 / 41563
9 / 41563
10 / 41563
11 / 41563
12 / 41563
13 / 41563
14 / 41563
15 / 41563
16 / 41563
17 / 41563
18 / 41563
19 / 41563
20 / 41563
21 / 41563
22 / 41563
23 / 41563
24 / 41563
25 / 41563
26 / 41563
27 / 41563
28 / 41563
29 / 41563
30 / 41563
31 / 41563
32 / 41563
33 / 41563
34 / 41563
35 / 41563
36 / 41563
37 / 41563
38 / 41563
39 / 41563
40 / 41563
41 / 41563
42 / 41563
43 / 41563
44 / 41563
45 / 41563
46 / 41563
47 / 41563
48 / 41563
49 / 41563
50 / 41563
51 / 41563
52 / 41563
53 / 41563
54 / 41563
55 / 41563
56 / 41563
57 / 41563
58 / 41563
59 / 41563
60 / 41563
61 / 41563
62 / 41563
63 / 41563
64 / 41563
65 / 41563
66 / 41563
67 / 41563
68 / 41563
69 / 41563
70 / 41563
71 / 41563
72 / 41563
73 / 41563
74 / 41563
75 / 41563
76 / 41563
77 / 41563
78 / 41563
79 / 41563
80 / 41563
81 / 41563
82 / 41563
83 / 41563
84 / 41563
85 / 41563


In [None]:
model = NeuralNetwork().to(device)
bacth_size = 256
def train():
    criterion = nn.L1Loss()
    #optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.5)
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    length = len(D)
    for step in range(10):
        i = 0
        step_loss = None
        while i < length: 
            optimizer.zero_grad()
            total_loss = None
            for b in range(bacth_size):
                if i + b == length:
                    break
                (d, p, v) = D[i + b]
                p_model, v_model = model(d.clone().detach().requires_grad_(True))
                loss = criterion(p_model, p)
                #loss /= (Board._BOARDSIZE ** 2 + 1)
                loss += torch.abs(v - v_model[0])
                if total_loss is None:
                    total_loss = loss
                else:
                    total_loss += loss
            total_loss /= b
            if step_loss is None:
                step_loss = total_loss
            else:
                step_loss += total_loss
                
            print("Loss:", float(total_loss))
            total_loss.backward(retain_graph=True)
            optimizer.step()
            i += bacth_size
        print("Step", step,":", float(step_loss))
        torch.save(model.state_dict(), 'gdrive/MyDrive/AlphaGo/model_weights.pth')

In [None]:
model.load_state_dict(torch.load('gdrive/MyDrive/AlphaGo/model_weights.pth'))
torch.autograd.set_detect_anomaly(True)
train()

Loss: 0.23192870616912842
Loss: 0.2323043942451477
Loss: 0.23811018466949463
Loss: 0.2646026611328125
Loss: 0.24451427161693573
Loss: 0.21633054316043854
Loss: 0.23105086386203766
Loss: 0.2534175217151642
Loss: 0.2808460295200348
Loss: 0.23603731393814087
Loss: 0.2899099886417389
Loss: 0.23955701291561127
Loss: 0.24349907040596008
Loss: 0.23081044852733612
Loss: 0.22311532497406006
Loss: 0.24443155527114868
Loss: 0.25064459443092346
Loss: 0.2865793704986572
Loss: 0.333417147397995
Loss: 0.280410498380661
Loss: 0.2461094707250595
Loss: 0.2737206816673279
Loss: 0.2988211214542389
Loss: 0.24837444722652435
Loss: 0.2426285743713379
Loss: 0.23345813155174255
Loss: 0.25874635577201843
Loss: 0.2621980905532837
Loss: 0.2598097622394562
Step 0 : 7.375383377075195
Loss: 0.24084357917308807
Loss: 0.22547674179077148
Loss: 0.23460838198661804
Loss: 0.24632249772548676
Loss: 0.23803475499153137
Loss: 0.21576648950576782
Loss: 0.24666233360767365
Loss: 0.25787752866744995
Loss: 0.2743490934371948
Lo