请点击[此处](https://ai.baidu.com/docs#/AIStudio_Project_Notebook/a38e5576)查看本环境基本用法.  <br>
Please click [here](https://ai.baidu.com/docs#/AIStudio_Project_Notebook/a38e5576) for more detailed instructions.

# 构建简易Tic-Tac-Toe环境



In [None]:
import numpy as np

BLACK, WHITE = 1, -1  # stone colours: BLACK moves first, WHITE second

class State:
    '''Board implementation of Tic-Tac-Toe.

    Cells are numbered 0..8 in row-major order (``action = row * 3 + col``)
    and written as a row letter from ``X`` followed by a column digit from
    ``Y``, e.g. ``'B1'`` is action 3.

    Attributes:
        board: 3x3 array holding 0 (empty), BLACK or WHITE.
        color: colour of the player who moves next.
        win_color: colour that completed a line, or 0 while nobody has.
        record: actions played so far, in order.
    '''
    X, Y = 'ABC',  '123'
    C = {0: '_', BLACK: 'O', WHITE: 'X'}

    def __init__(self):
        self.board = np.zeros((3, 3))  # indexed as board[row, col]
        self.color = BLACK
        self.win_color = 0
        self.record = []

    def action2str(self, a):
        '''Return the two-character name (e.g. 'B1') of action index ``a``.'''
        return self.X[a // 3] + self.Y[a % 3]

    def str2action(self, s):
        '''Return the action index (0..8) named by a string such as 'B1'.

        Raises:
            ValueError: if ``s`` is not one row letter from ``X`` followed by
                one column digit from ``Y``.
        '''
        # str.find() would return -1 for an unknown character and silently
        # address a different cell through negative indexing, so validate.
        if len(s) != 2 or s[0] not in self.X or s[1] not in self.Y:
            raise ValueError('invalid move string: %r' % (s,))
        return self.X.index(s[0]) * 3 + self.Y.index(s[1])

    def record_string(self):
        '''Return the played actions as space-separated move names.'''
        return ' '.join([self.action2str(a) for a in self.record])

    def __str__(self):
        # Render the board followed by the move record.
        # The header margin is two characters wide, like the 'A ' row prefix,
        # so that the column digits sit directly above the cells.
        lines = ['  ' + ' '.join(self.Y)]
        for i in range(3):
            cells = [self.C[int(self.board[i, j])] for j in range(3)]
            lines.append(self.X[i] + ' ' + ' '.join(cells))
        lines.append('record = ' + self.record_string())
        return '\n'.join(lines)

    def play(self, action):
        '''State transition function; returns ``self`` so calls can be chained.

        ``action`` is either a position integer (0~8) or a string holding a
        whitespace-separated sequence of move names, which are played in
        order. No legality check is made: playing on an occupied cell
        overwrites it.
        '''
        if isinstance(action, str):
            for astr in action.split():
                self.play(self.str2action(astr))
            return self

        x, y = action // 3, action % 3
        self.board[x, y] = self.color

        # check whether 3 stones are on a line through the cell just played
        target = 3 * self.color
        if self.board[x, :].sum() == target \
          or self.board[:, y].sum() == target \
          or (x == y and np.diag(self.board, k=0).sum() == target) \
          or (x == 2 - y and np.diag(self.board[::-1,:], k=0).sum() == target):
            self.win_color = self.color

        self.color = -self.color
        self.record.append(action)
        return self

    def terminal(self):
        '''Return True once somebody has won or all nine moves were played.'''
        return self.win_color != 0 or len(self.record) == 3 * 3

    def terminal_reward(self):
        '''Return the result (1 win, -1 loss, 0 otherwise) for the player to move.'''
        return self.win_color if self.color == BLACK else -self.win_color

    def legal_actions(self):
        '''Return the indices of all empty cells.'''
        return [a for a in range(3 * 3) if self.board[a // 3, a % 3] == 0]

    def feature(self):
        '''Return the float32 network input of shape (2, 3, 3).

        Plane 0 marks the stones of the player to move, plane 1 those of the
        opponent.
        '''
        return np.stack([self.board == self.color, self.board == -self.color]).astype(np.float32)

    def action_feature(self, action):
        '''Return ``action`` as a one-hot float32 array of shape (1, 3, 3).'''
        a = np.zeros((1, 3, 3), dtype=np.float32)
        a[0, action // 3, action % 3] = 1
        return a

# Demo 1: one move, then show the board and the network input planes.
state = State()
state.play('B1')
print(state)
print('input feature')
print(state.feature())

# Demo 2: three moves, showing only the network input planes.
state = State()
state.play('B2 A1 C2')
print('input feature')
print(state.feature())

   1 2 3
A _ _ _
B O _ _
C _ _ _
record = B1
input feature
[[[0. 0. 0.]
  [0. 0. 0.]
  [0. 0. 0.]]

 [[0. 0. 0.]
  [1. 0. 0.]
  [0. 0. 0.]]]
input feature
[[[1. 0. 0.]
  [0. 0. 0.]
  [0. 0. 0.]]

 [[0. 0. 0.]
  [0. 1. 0.]
  [0. 1. 0.]]]


# 构建神经网络

In [None]:
import paddle
import paddle.nn as nn
import paddle.nn.functional as F

class Conv(paddle.nn.Layer):
    '''Bias-free stride-1 2-D convolution, optionally followed by batch norm.

    The padding is ``kernel_size // 2``, which keeps the spatial size for odd
    kernel sizes.
    '''
    def __init__(self, filters0, filters1, kernel_size, bn=False):
        super().__init__()
        half_kernel = kernel_size // 2
        self.conv = nn.Conv2D(
            filters0, filters1, kernel_size,
            stride=1, padding=half_kernel, bias_attr=False,
        )
        # ``None`` means that forward() skips normalisation.
        self.bn = nn.BatchNorm2D(filters1) if bn else None

    def forward(self, x):
        out = self.conv(x)
        return out if self.bn is None else self.bn(out)

class ResidualBlock(paddle.nn.Layer):
    '''One 3x3 conv + batch norm branch added to its input, then ReLU.'''
    def __init__(self, filters):
        super().__init__()
        self.conv = Conv(filters, filters, kernel_size=3, bn=True)

    def forward(self, x):
        branch = self.conv(x)
        return F.relu(x + branch)

In [None]:
num_filters = 16  # channel count of every hidden feature map
num_blocks = 4    # residual blocks stacked in Representation and in Dynamics

class Representation(paddle.nn.Layer):
    ''' Conversion from observation to inner abstract state '''
    def __init__(self, input_shape):
        super().__init__()
        self.input_shape = input_shape
        channels, height, width = input_shape[0], input_shape[1], input_shape[2]
        self.board_size = height * width

        self.layer0 = Conv(channels, num_filters, 3, bn=True)
        self.blocks = nn.LayerList([ResidualBlock(num_filters) for _ in range(num_blocks)])

    def forward(self, x):
        # Stem convolution with ReLU, then the residual tower.
        hidden = F.relu(self.layer0(x))
        for res_block in self.blocks:
            hidden = res_block(hidden)
        return hidden

    def inference(self, x):
        '''Encode a single observation (no batch axis) and return a NumPy array.'''
        self.eval()
        batch = paddle.to_tensor(x).unsqueeze(0)
        with paddle.no_grad():
            encoded = self.forward(batch)
        return encoded.cpu().numpy()[0]

class Prediction(paddle.nn.Layer):
    ''' Policy and value prediction from inner abstract state '''
    def __init__(self, action_shape):
        super().__init__()
        self.board_size = np.prod(action_shape[1:])
        self.action_size = action_shape[0] * self.board_size

        # Policy head: two 1x1 convolutions down to a single plane.
        self.conv_p1 = Conv(num_filters, 4, 1, bn=True)
        self.conv_p2 = Conv(4, 1, 1)

        # Value head: a 1x1 convolution followed by a bias-free linear layer.
        self.conv_v = Conv(num_filters, 4, 1, bn=True)
        self.fc_v = nn.Linear(self.board_size * 4, 1, bias_attr=False)

    def forward(self, rp):
        policy_hidden = F.relu(self.conv_p1(rp))
        policy_logits = self.conv_p2(policy_hidden).reshape([-1, self.action_size])

        value_hidden = F.relu(self.conv_v(rp))
        value_raw = self.fc_v(value_hidden.reshape([-1, self.board_size * 4]))

        # softmax makes the policy a distribution; tanh keeps the value in -1 ~ 1
        policy = F.softmax(policy_logits, axis=-1)
        value = paddle.tanh(value_raw)
        return policy, value

    def inference(self, rp):
        '''Predict (policy vector, scalar value) for one abstract state.'''
        self.eval()
        batch = paddle.to_tensor(rp).unsqueeze(0)
        with paddle.no_grad():
            policy, value = self.forward(batch)
        return policy.cpu().numpy()[0], value.cpu().numpy()[0][0]

class Dynamics(paddle.nn.Layer):
    '''Abstract state transition'''
    def __init__(self, rp_shape, act_shape):
        super().__init__()
        self.rp_shape = rp_shape
        # The action planes are concatenated to the state planes on the channel axis.
        in_channels = rp_shape[0] + act_shape[0]
        self.layer0 = Conv(in_channels, num_filters, 3, bn=True)
        self.blocks = nn.LayerList([ResidualBlock(num_filters) for _ in range(num_blocks)])

    def forward(self, rp, a):
        hidden = self.layer0(paddle.concat([rp, a], axis=1))
        for res_block in self.blocks:
            hidden = res_block(hidden)
        return hidden

    def inference(self, rp, a):
        '''Advance one abstract state by one action (both without batch axis).'''
        self.eval()
        rp_batch = paddle.to_tensor(rp).unsqueeze(0)
        a_batch = paddle.to_tensor(a).unsqueeze(0)
        with paddle.no_grad():
            next_rp = self.forward(rp_batch, a_batch)
        return next_rp.cpu().numpy()[0]

class Net(paddle.nn.Layer):
    '''Whole net'''
    def __init__(self):
        super().__init__()
        # A throwaway State supplies the observation and action tensor shapes.
        probe = State()
        input_shape = probe.feature().shape
        action_shape = probe.action_feature(0).shape
        rp_shape = (num_filters, *input_shape[1:])

        self.representation = Representation(input_shape)
        self.prediction = Prediction(action_shape)
        self.dynamics = Dynamics(rp_shape, action_shape)

    def forward(self):
        # Intentionally empty: the three sub-networks are called directly.
        pass

    def predict(self, state0, path):
        '''Predict p and v from original state and path

        Returns one (p, v) pair for ``state0`` plus one for every action in
        ``path``, each obtained by unrolling the dynamics network.
        '''
        rp = self.representation.inference(state0.feature())
        outputs = [self.prediction.inference(rp)]
        for action in path:
            rp = self.dynamics.inference(rp, state0.action_feature(action))
            outputs.append(self.prediction.inference(rp))
        return outputs


In [None]:
def show_net(net, state):
    '''Display policy (p) and value (v)'''
    print(state)
    # With an empty path the only (and last) output belongs to ``state`` itself.
    predictions = net.predict(state, [])
    p, v = predictions[-1]
    print(p, v)
    print('p = ')
    # Per-mille policy, laid out with the board's spatial shape.
    board_shape = net.representation.input_shape[1:3]
    per_mille = (p * 1000).astype(int)
    print(per_mille.reshape((-1, *board_shape)))
    print('v = ', v)
    print()

#  Outputs before training: policy and value of a newly constructed net on the empty board
show_net(Net(), State())
# Layer-by-layer summaries of newly constructed stand-alone sub-networks.
# NOTE(review): State.feature() stacks two planes, so Net builds its
# Representation for a 2-channel input; the 1-channel shapes used below
# describe a smaller first layer - confirm this is intended.
paddle.summary(Representation([1,3,3]), (1,1,3,3))
paddle.summary(Prediction((1,3,3)), (1,16,3,3))
# Dynamics takes two inputs: the abstract state and the action plane.
paddle.summary(Dynamics([16,3,3],[1,3,3]), [(3,16,3,3),(3,1,3,3)])



   1 2 3
A _ _ _
B _ _ _
C _ _ _
record = 
[0.11111111 0.11111111 0.11111111 0.11111111 0.11111111 0.11111111
 0.11111111 0.11111111 0.11111111] 0.0
p = 
[[[111 111 111]
  [111 111 111]
  [111 111 111]]]
v =  0.0

----------------------------------------------------------------------------
  Layer (type)       Input Shape          Output Shape         Param #    
   Conv2D-131       [[1, 1, 3, 3]]       [1, 16, 3, 3]           144      
BatchNorm2D-121    [[1, 16, 3, 3]]       [1, 16, 3, 3]           64       
    Conv-131        [[1, 1, 3, 3]]       [1, 16, 3, 3]            0       
   Conv2D-132      [[1, 16, 3, 3]]       [1, 16, 3, 3]          2,304     
BatchNorm2D-122    [[1, 16, 3, 3]]       [1, 16, 3, 3]           64       
    Conv-132       [[1, 16, 3, 3]]       [1, 16, 3, 3]            0       
ResidualBlock-81   [[1, 16, 3, 3]]       [1, 16, 3, 3]            0       
   Conv2D-133      [[1, 16, 3, 3]]       [1, 16, 3, 3]          2,304     
BatchNorm2D-123    [[1, 16, 3, 3]]

{'total_params': 11984, 'trainable_params': 11664}

# 基于神经网络的蒙特卡洛搜索

In [None]:
class Node:
    '''Search statistics of one abstract (or root) state.

    ``p`` and ``v`` are the stored policy and value; ``n`` and ``q_sum`` hold
    per-action visit counts and accumulated values, ``n_all`` and
    ``q_sum_all`` the totals over all actions.
    '''
    def __init__(self, p, v):
        self.p = p
        self.v = v
        self.n = np.zeros_like(p)
        self.q_sum = np.zeros_like(p)
        # prior: the totals start as one visit worth half of the predicted value
        self.n_all = 1
        self.q_sum_all = v / 2

    def update(self, action, q_new):
        '''Record one more visit of ``action`` that returned value ``q_new``.'''
        # Per-action statistics
        self.q_sum[action] += q_new
        self.n[action] += 1

        # Overall statistics
        self.q_sum_all += q_new
        self.n_all += 1

In [None]:
import time
import copy

class Tree:
    '''Monte Carlo tree search that unrolls the learned model.

    Nodes live in ``self.nodes``. The root is keyed by the state's record
    string; a node reached by searching an action path from that root is
    keyed by the same record string followed by ``'|'`` and the path.
    '''
    def __init__(self, net):
        self.net = net
        self.nodes = {}

    @staticmethod
    def _node_key(state, path):
        '''Return the ``self.nodes`` key of ``path`` searched from ``state``.'''
        key = state.record_string()
        if len(path) > 0:
            key += '|' + ' '.join(map(state.action2str, path))
        return key

    def search(self, state, path, rp, depth):
        '''Run one simulation step and return the value seen from this node.

        Args:
            state: the real root state; it is never played on here.
            path: actions taken from the root so far (extended in place).
            rp: abstract state that corresponds to ``path``.
            depth: number of actions taken from the root.
        '''
        key = self._node_key(state, path)
        if key not in self.nodes:
            # Leaf: expand it with the network prediction and back up v.
            p, v = self.net.prediction.inference(rp)
            self.nodes[key] = Node(p, v)
            return v

        # State transition by an action selected from bandit
        node = self.nodes[key]
        p = node.p
        mask = np.zeros_like(p)
        if depth == 0:
            # Add noise to policy on the root node
            p = 0.75 * p + 0.25 * np.random.dirichlet([0.15] * len(p))
            # On the root node, we choose action only from legal actions
            mask[state.legal_actions()] = 1
            p *= mask
            p /= p.sum() + 1e-16

        n, q_sum = 1 + node.n, node.q_sum_all / node.n_all + node.q_sum
        # PUCB formula; the "+ mask * 4" bonus lifts legal root moves above
        # every illegal one (the mask is all zero below the root).
        ucb = q_sum / n + 2.0 * np.sqrt(node.n_all) * p / n + mask * 4
        best_action = np.argmax(ucb)

        # Search next state by recursively calling this function
        rp_next = self.net.dynamics.inference(rp, state.action_feature(best_action))
        path.append(best_action)
        q_new = -self.search(state, path, rp_next, depth + 1) # With the assumption of changing player by turn
        node.update(best_action, q_new)

        return q_new

    def think(self, state, num_simulations, temperature = 0, show=False):
        '''End point of MCTS.

        Runs ``num_simulations`` simulations from ``state`` and returns a
        probability distribution over the nine actions, weighted by the root
        visit counts and sharpened by ``temperature`` (0 gives all mass to
        the most visited action). Occupied cells get probability 0 so that
        sampling from the result cannot overwrite a stone.
        '''
        if show:
            print(state)
        start, prev_time = time.time(), 0
        for _ in range(num_simulations):
            self.search(state, [], self.net.representation.inference(state.feature()), depth=0)

            # Display search result on every second
            if show:
                tmp_time = time.time() - start
                if int(tmp_time) > int(prev_time):
                    prev_time = tmp_time
                    root, pv = self.nodes[state.record_string()], self.pv(state)
                    # pv is empty while the root has no visits yet (for
                    # example when the very first simulation was slow).
                    if pv:
                        print('%.2f sec. best %s. q = %.4f. n = %d / %d. pv = %s'
                              % (tmp_time, state.action2str(pv[0]), root.q_sum[pv[0]] / root.n[pv[0]],
                                 root.n[pv[0]], root.n_all, ' '.join([state.action2str(a) for a in pv])))

        #  Return probability distribution weighted by the number of simulations
        n = self.nodes[state.record_string()].n + 1
        legal = state.legal_actions()
        if legal:
            # The "+ 1" smoothing would otherwise leave probability mass on
            # occupied cells, which are never visited at the root.
            legal_mask = np.zeros_like(n)
            legal_mask[legal] = 1
            n = n * legal_mask
        n = (n / np.max(n)) ** (1 / (temperature + 1e-8))
        return n / n.sum()

    def pv(self, state):
        '''Return principal variation (action sequence considered the best).

        Starting at the root, repeatedly takes the most visited legal action
        and follows the node stored for the resulting path, until the path
        leaves the tree, reaches an unvisited action, or ends the game.
        '''
        s, pv_seq = copy.deepcopy(state), []
        while True:
            # Nodes are keyed by root record + path (see _node_key), not by
            # the record of the played-out copy.
            node = self.nodes.get(self._node_key(state, pv_seq))
            if node is None or node.n.sum() == 0:
                break
            legal = s.legal_actions()
            if not legal:
                break
            best_action = max(legal, key=lambda a: node.n[a])
            if node.n[best_action] == 0:
                break
            pv_seq.append(best_action)
            s.play(best_action)
            if s.terminal():
                break
        return pv_seq

In [None]:
# Search demos on four positions; every demo builds a new Tree around a newly
# constructed Net, so the searches share neither nodes nor parameters.

# Empty board, 100 simulations.
tree = Tree(Net())
tree.think(State(), 100, show=True)

# Position after A1 C1 A2 C2, 200 simulations.
tree = Tree(Net())
tree.think(State().play('A1 C1 A2 C2'), 200, show=True)

# Position after B2 A2 A3 C1 B3, 200 simulations.
tree = Tree(Net())
tree.think(State().play('B2 A2 A3 C1 B3'), 200, show=True)

# The same position one move earlier (B2 A2 A3 C1), 200 simulations.
tree = Tree(Net())
tree.think(State().play('B2 A2 A3 C1'), 200, show=True)

   1 2 3
A _ _ _
B _ _ _
C _ _ _
record = 
1.01 sec. best B3. q = 0.4840. n = 31 / 91. pv = B3
   1 2 3
A O O _
B _ _ _
C X X _
record = A1 C1 A2 C2
1.03 sec. best C3. q = 0.2201. n = 13 / 54. pv = C3
2.04 sec. best A3. q = 0.2105. n = 23 / 80. pv = A3
3.04 sec. best A3. q = 0.1947. n = 30 / 99. pv = A3
4.03 sec. best A3. q = 0.1955. n = 35 / 118. pv = A3
5.06 sec. best A3. q = 0.1754. n = 39 / 132. pv = A3
6.07 sec. best A3. q = 0.1391. n = 42 / 144. pv = A3
7.07 sec. best A3. q = 0.1591. n = 43 / 156. pv = A3
8.04 sec. best A3. q = 0.1270. n = 46 / 168. pv = A3
9.04 sec. best A3. q = 0.1168. n = 50 / 178. pv = A3
10.09 sec. best A3. q = 0.1291. n = 53 / 187. pv = A3
11.06 sec. best A3. q = 0.1082. n = 54 / 198. pv = A3
   1 2 3
A _ X O
B _ O O
C X _ _
record = B2 A2 A3 C1 B3
1.00 sec. best A1. q = 0.0526. n = 19 / 59. pv = A1
2.01 sec. best B1. q = 0.0370. n = 27 / 97. pv = B1
3.03 sec. best A1. q = 0.0270. n = 37 / 132. pv = A1
4.02 sec. best B1. q = -0.0000. n = 44 / 158. pv = B1
5

array([0., 0., 0., 0., 0., 0., 0., 1., 0.], dtype=float32)

# 定义优化器和训练流程

In [None]:
import paddle.optimizer as optim

batch_size = 32
num_steps = 100

def gen_target(ep, k):
    '''Generate inputs and targets for training

    ``ep`` is indexed as (path, reward, observations, actions, policies). A
    random turn is drawn and targets are produced for it and the ``k``
    following steps. Returns ``(observation, actions, policies, values)``,
    where the last three are lists of length ``k + 1``.
    '''
    record, reward, observations, actions, policies = ep[:5]
    episode_len = len(record)
    start = np.random.randint(episode_len)

    ps, vs, ax = [], [], []
    for step in range(start, start + k + 1):
        if step >= episode_len:
            # state after finishing game: p is 0 (so the policy loss is 0)
            policy = np.zeros_like(policies[-1])
            # random action selection, shaped like a recorded action
            action_shape = actions[-1].shape
            action = np.zeros(np.prod(action_shape), dtype=np.float32)
            action[np.random.randint(len(action))] = 1
            action = action.reshape(action_shape)
        else:
            policy = policies[step]
            action = actions[step]
        # The reward keeps its sign on even turns and is negated on odd ones.
        vs.append([reward if step % 2 == 0 else -reward])
        ps.append(policy)
        ax.append(action)

    return observations[start], ax, ps, vs

def train(episodes, net, opt):
    '''Train neural net

    Runs ``num_steps`` optimisation steps. Every step samples ``batch_size``
    targets with ``gen_target`` and unrolls the dynamics network ``k`` steps
    from the encoded observation, summing the policy cross-entropy and the
    squared value error over all unrolled steps.

    Args:
        episodes: list of episode tuples as consumed by ``gen_target``.
        net: the ``Net`` to train (switched to training mode here).
        opt: optimizer used for ``clear_grad()`` / ``step()``.

    Returns:
        The same ``net`` object.
    '''
    # Running totals are plain floats so that no autograd graph is kept
    # alive across steps just for reporting.
    p_loss_sum, v_loss_sum = 0.0, 0.0
    net.train()
    k = 4
    for _ in range(num_steps):
        batch = [gen_target(episodes[np.random.randint(len(episodes))], k) for _ in range(batch_size)]
        x, ax, p_target, v_target = zip(*batch)
        x = paddle.to_tensor(np.array(x))
        ax = paddle.to_tensor(np.array(ax))
        p_target = paddle.to_tensor(np.array(p_target))
        v_target = paddle.to_tensor(np.array(v_target),dtype='float32')

        # Change the order of axis as [time step, batch, ...]
        ax = paddle.transpose(ax, [1, 0, 2, 3, 4])
        p_target = paddle.transpose(p_target, [1, 0, 2])
        v_target = paddle.transpose(v_target, [1, 0, 2])

        p_loss, v_loss = 0, 0

        # Compute losses for k (+ current) steps
        for t in range(k + 1):
            rp = net.representation(x) if t == 0 else net.dynamics(rp, ax[t - 1])
            p, v = net.prediction(rp)
            p_loss += paddle.sum(-p_target[t] * paddle.log(p))
            v_loss += paddle.sum((v_target[t] - v) ** 2)

        p_loss_sum += float(p_loss)
        v_loss_sum += float(v_loss)

        # Use the optimizer that was passed in, not a module-level global.
        opt.clear_grad()
        (p_loss + v_loss).backward()
        opt.step()

    num_train_datum = num_steps * batch_size
    print('p_loss %f v_loss %f' % (p_loss_sum / num_train_datum, v_loss_sum / num_train_datum))
    return net

In [None]:
#  Battle against random agents

def vs_random(net, n=100):
    '''Play ``n`` games of the raw network policy against a random player.

    The network moves first in every even-numbered game. Returns a dict that
    maps each outcome seen from the network's side to how many games ended
    with it.
    '''
    results = {}
    for game_idx in range(n):
        first_turn = game_idx % 2 == 0
        turn = first_turn  # True while the network is the one to move
        state = State()
        while not state.terminal():
            legal = state.legal_actions()
            if turn:
                # Greedy choice: the legal action with the highest policy value.
                p, _ = net.predict(state, [])[-1]
                action = max(legal, key=lambda a: p[a])
            else:
                action = np.random.choice(legal)
            state.play(action)
            turn = not turn
        # terminal_reward() is relative to the player to move; flip it when
        # that player is the random agent.
        outcome = state.terminal_reward()
        r = outcome if turn else -outcome
        results[r] = results.get(r, 0) + 1
    return results

# 开始训练我们的神经网络

In [None]:
# Main algorithm of MuZero: alternate self-play episode generation and training

num_games = 5000 #  the default is 50000
num_games_one_epoch = 20
num_simulations = 40

net = Net()
# Load previously saved parameters
model_state_dict = paddle.load("net0.pdparams")
net.set_state_dict(model_state_dict)

optimizer = optim.Momentum(parameters=net.parameters(), learning_rate=3e-4, weight_decay=3e-5,momentum=0.8)

# Display battle results as {-1: lose 0: draw 1: win} (for episode generated for training, 1 means that the first player won)
vs_random_sum = vs_random(net)
print('vs_random = ', sorted(vs_random_sum.items()))

episodes = []
result_distribution = {1: 0, 0: 0, -1: 0}

for g in range(num_games):
    # Generate one episode
    record, p_targets, features, action_features = [], [], [], []
    state = State()
    # temperature using to make policy targets from search results
    temperature = 0.7
    while not state.terminal():
        # A new tree is built for every move
        tree = Tree(net)
        p_target = tree.think(state, num_simulations, temperature)
        p_targets.append(p_target)
        features.append(state.feature())
        # Select action with generated distribution, and then make a transition by that action
        action = np.random.choice(np.arange(len(p_target)), p=p_target)
        action_features.append(state.action_feature(action))
        state.play(action)
        record.append(action)
        temperature *= 0.8
    # reward seen from the first turn player
    reward = state.terminal_reward() * (1 if len(record) % 2 == 0 else -1)
    result_distribution[reward] += 1
    episodes.append((record, reward, features, action_features, p_targets))
    if g % num_games_one_epoch == 0:
        print('game ', end='')
    print(g, ' ', end='')

    # Training of neural net
    if (g + 1) % num_games_one_epoch == 0:
        # Show the result distribution of generated episodes
        print('generated = ', sorted(result_distribution.items()))
        net = train(episodes, net, optimizer)
        vs_random_once = vs_random(net)
        print('vs_random = ', sorted(vs_random_once.items()), end='')
        for r, n in vs_random_once.items():
            # vs_random() only reports outcomes that occurred, so an outcome
            # may show up here for the first time.
            vs_random_sum[r] = vs_random_sum.get(r, 0) + n
        print(' sum = ', sorted(vs_random_sum.items()))
        #show_net(net, State())
        #show_net(net, State().play('A1 C1 A2 C2'))
        #show_net(net, State().play('A1 B2 C3 B3 C1'))
        #show_net(net, State().play('B2 A2 A3 C1 B3'))
        #show_net(net, State().play('B2 A2 A3 C1'))
print('finished')

vs_random =  [(-1, 12), (0, 11), (1, 77)]
game 0  1  2  3  4  5  6  7  8  9  10  11  12  13  14  15  16  17  18  19  generated =  [(-1, 3), (0, 5), (1, 12)]


  "When training, we now always track global mean and variance.")


p_loss 1.984630 v_loss 0.510406
vs_random =  [(-1, 8), (0, 8), (1, 84)] sum =  [(-1, 20), (0, 19), (1, 161)]
game 20  21  22  23  24  25  26  27  28  29  30  31  32  33  34  35  36  37  38  39  generated =  [(-1, 5), (0, 12), (1, 23)]
p_loss 1.793176 v_loss 0.392811
vs_random =  [(-1, 9), (0, 12), (1, 79)] sum =  [(-1, 29), (0, 31), (1, 240)]
game 40  41  42  43  44  45  46  47  48  49  50  51  52  53  54  55  56  57  58  59  generated =  [(-1, 6), (0, 18), (1, 36)]
p_loss 1.710342 v_loss 0.353093
vs_random =  [(-1, 7), (0, 5), (1, 88)] sum =  [(-1, 36), (0, 36), (1, 328)]
game 60  61  62  63  64  65  66  67  68  69  70  71  72  73  74  75  76  77  78  79  generated =  [(-1, 6), (0, 25), (1, 49)]
p_loss 1.725020 v_loss 0.382014
vs_random =  [(-1, 3), (0, 11), (1, 86)] sum =  [(-1, 39), (0, 47), (1, 414)]
game 80  81  82  83  84  85  86  87  88  89  90  91  92  93  94  95  96  97  98  99  generated =  [(-1, 6), (0, 32), (1, 62)]
p_loss 1.761850 v_loss 0.313605
vs_random =  [(-1, 8), (0,

In [None]:
# Save the parameters under a file name derived from the current timestamp
import time
t =time.time()
model_state_dict = net.state_dict()
# File name pattern: "net<whole seconds of t>.pdparams"
paddle.save(model_state_dict, "net"+str(int(t))+".pdparams")