In [1]:
'''
psudocode:
Khởi tạo replay memory D với dung lượng N (Số exp) (hyperparameter)
Khởi tạo action value function Q với random weight \theta
    - action value function Q bản chất là 1 DNN:
        - Đầu vào là trạng thái hiện tại (qua tiền xử lý)
        - Đầu ra là giá trị tương ứng với từng hành động
Khởi tạo target action-value function \hat Q (fixed Q) với \theta^- = \theta
for episode 1,M:
    Khởi tạo chuỗi s_1 = {x_1} và chuỗi tiền xử lý \phi_1 = \phi(s_1)
    for t 1,T:
        Sampling:
            a_t với \epsilon greedy:
            thực hiện a_t với phần thưởng r_t và image x_{t+1}
            lấy s_{t+1} từ s_t, a_t, x_{t+1} và tiền xử lý \phi_{t+1} = \phi(s_{t+1})
            Cất (\phi_t, a_t, r_t, \phi_{t+1}) vào D (D là một hàng đợi ưu tiên)
        Training:
            Lấy ngẫu nhiên minibatch (\phi_j, a_j, r_j, \phi_{j+1})
            Set:
                y_j = r_j nếu episode kết thúc tại j+1
                y_j = r_j + \gamma max_a'\hat Q(phi_{j+1},a';\theta^-)
            gradient decent để cực tiểu hóa MSE loss: (y_j - Q(\phi_j,a_j;\theta))^2
            Cập nhật \hat Q bằng phiên bản Q hiện tại (cập nhật tham số) sau C steps
'''

"\npsudocode:\nKhởi tạo replay memory D với dung lượng N (Số exp) (hyperparameter)\nKhởi tạo action value function Q với random weight \theta\n    - action value function Q bản chất là 1 DNN:\n        - Đầu vào là trạng thái hiện tại (qua tiền xử lý)\n        - Đầu ra là giá trị tương ứng với từng hành động\nKhởi tạo target action-value function \\hat Q (fixed Q) với \theta^- = \theta\nfor episode 1,M:\n    Khởi tạo chuỗi s_1 = {x_1} và chuỗi tiền xử lý \\phi_1 = \\phi(s_1)\n    for t 1,T:\n        Sampling:\n            a_t với \\epsilon greedy:\n            thực hiện a_t với phần thưởng r_t và image x_{t+1}\n            lấy s_{t+1} từ s_t, a_t, x_{t+1} và tiền xử lý \\phi_{t+1} = \\phi(s_{t+1})\n            Cất (\\phi_t, a_t, r_t, \\phi_{t+1}) vào D (D là một hàng đợi ưu tiên)\n        Training:\n            Lấy ngẫu nhiên minibatch (\\phi_j, a_j, r_j, \\phi_{j+1})\n            Set:\n                y_j = r_j nếu episode kết thúc tại j+1\n                y_j = r_j + \\gamma max_a'\\h

In [2]:
'''
Với bài toán tictactoe:
    + Đầu vào là một ma trận vuông có kích thước 3x3, 4x4,...
    + Có thể flatten -> 9, 16,...
    + Sử dụng lớp mask để loại bỏ những hành động không hợp lệ của đầu ra
'''

'\nVới bài toán tictactoe:\n    + Đầu vào là một ma trận vuông có kích thước 3x3, 4x4,...\n    + Có thể flatten -> 9, 16,...\n    + Sử dụng lớp mask để loại bỏ những hành động không hợp lệ của đầu ra\n'

In [3]:
import torch
import torch.nn as nn
import numpy as np
from config import *
from model import *
from game import *

In [4]:
config_train_DQN_for_TicTacToe = {
    'batch_size': 32,
    'M': 10000, # num episodes
    'C': 100,
    'size_board': 3,
    'gamma': 0.9
}

In [5]:
# Khởi tạo replay memory D với dung lượng N (Số exp) (hyperparameter)
from collections import deque
import random

class ReplayMemory:
    def __init__(self, capacity):
        # Khởi tạo buffer vòng với kích thước tối đa là capacity
        self.memory = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, is_terminal_state):
        # Lưu trữ trải nghiệm (experience) vào replay memory
        experience = (state, action, reward, next_state, is_terminal_state)
        self.memory.append(experience)

    def sample(self, batch_size):
        # Lấy một minibatch ngẫu nhiên từ replay memory
        batch_size = min(batch_size, self.__len__())
        return random.sample(self.memory, batch_size)

    def __len__(self):
        # Trả về số lượng trải nghiệm hiện có trong replay memory
        return len(self.memory)


In [6]:
# Khởi tạo action value function Q với random weight \theta
model_DQN = ResMLP(config_ResMLP_for_DQN) #\theta là tham số mô hình
game = TicTacToe()
# for p in model_DQN.parameters():
#     print(p)

In [7]:
def phi_preprocess(state: np.ndarray):
    return state.flatten()

In [8]:
def mask_action_output_DQN(actions: list, len_action_space: int):
    '''
    Implement a one-hot mask for the output, selecting only the output of the selected action
    '''
    mask = torch.zeros((len(actions),len_action_space))
    for i, action in enumerate(actions):
        mask[i, action] = 1
    return mask
# mask_action_output_DQN([0,2,1,3], 4)

In [9]:
import random
import math
from typing import Literal
def epsilon_greedy(state: np.ndarray, epsilon: float, Q_values: torch.Tensor):
    actions = game.get_next_actions(state)

    if random.random() < epsilon:
        # Chọn ngẫu nhiên hành động từ không gian hành động hợp lệ
        action = random.choice(actions)
    else:
        # Chọn hành động có giá trị Q cao nhất từ các hành động hợp lệ
        valid_Q_values = Q_values[actions]
        action = actions[torch.argmax(valid_Q_values).item()]
    
    return action

def greedy(state: np.ndarray, Q_values: torch.Tensor):
    actions = game.get_next_actions(state)
    valid_Q_values = Q_values[actions]
    action = actions[torch.argmax(valid_Q_values).item()]
    return action

def decay_epsilon(episode, M, mode = Literal['curve', 'linear', 'threshold'], rate_curve = 10, curved_direction = Literal['left', 'right', None]):
    b = -1 if curved_direction == 'left' else 1
    x = episode / M

    if mode == 'linear':
        m = 0.01
    if mode == 'curve':
        m = rate_curve
    if mode == 'threshold':
        if x < 0.5:
            return 0.9
        else:
            return 0.1
    epsilon = 1 / (m * x - (m + b * np.sqrt(m**2 + 4 * m)) / 2) + 1 + 2 / (m + b * np.sqrt(m**2 + 4 * m))
    return epsilon

In [10]:
import torch.optim as optim
D = ReplayMemory(100)

def train(batch_size, M, C, size_board, gamma):
    # Khởi tạo replay memory D với dung lượng N (Số exp) (hyperparameter)
    # D = ReplayMemory(N)
    # Khởi tạo target action-value function \hat Q (fixed Q) với \theta^- = \theta
    Q_hat = ResMLP(config_ResMLP_for_DQN)
    Q_hat.load_state_dict(model_DQN.state_dict())

    # Khởi tạo train model
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model_DQN.parameters(), lr=0.001)

    # Khởi tạo epsilon
    epsilon_start = 1
    epsilon_min = 0.1
    epsilon = epsilon_start

    # Tắt gradient cho Q_hat
    for param in Q_hat.parameters():
        param.requires_grad = False

    step = 0
    model_DQN.train()
    ret = [] # list chứa tổng reward trong mỗi episode
    losses = []
    for episode in range(1, M+1): 
        s_t = game.get_init_state(size_board)
        # phi_t = phi_preprocess(s_t)
        running = True
        rewards_all = []
        losses_episode = []
        while(running):
            phi_t = phi_preprocess(s_t)
            # Sampling
            with torch.no_grad():
                Q_values = model_DQN(torch.Tensor(phi_t))
            # using \epsilon greedy to get action
            a_t = epsilon_greedy(s_t, epsilon, Q_values)
            # Execute action at in emulator and observe reward rt and state st+1
            s_t1 = game.get_next_state(s_t, a_t)
            r_t = game.get_reward(s_t1)
            rewards_all.append(r_t)
            is_terminal_state = False
            if game.is_terminal_state(s_t1):
                running = False
                is_terminal_state = True
            # phi_t1 = phi_preprocess(s_t1)
            # Store transition in D
            D.push(s_t.copy(), a_t, r_t, s_t1.copy(), is_terminal_state)
            # Cập nhật s_t:
            s_t = s_t1.copy()
            # Training
            # Compute y_j (Q_target)
            samples = D.sample(batch_size)
            Q_targets = []
            for i, sample in enumerate(samples):
                s_j, a_j, r_j, s_j1, is_terminal_state_j1 = sample
                #Q_hat(phi_j1), compute Q_target
                if not is_terminal_state_j1:
                    phi_j1 = phi_preprocess(torch.Tensor(s_j1))
                    Q_values_j1 = Q_hat(phi_j1)
                    a_j1 = greedy(s_j1, Q_values_j1)
                    y_j = r_j + gamma*Q_values_j1[a_j1]
                    Q_targets.append(y_j)
                else:
                    Q_targets.append(r_j)
            Q_targets = torch.Tensor(Q_targets)
            # Compute Q value estimate:
            states, actions = list(zip(*samples))[:2]
            phi = torch.Tensor(np.array([phi_preprocess(state) for state in states]))
            Q_estimates = model_DQN(phi)
            mask = mask_action_output_DQN(actions, Q_estimates.shape[-1])
            Q_estimates *= mask
            Q_estimates = Q_estimates.sum(-1)
            # Compute Q Loss
            optimizer.zero_grad()

            loss = criterion(Q_estimates, Q_targets)
            losses_episode.append(loss.item())

            loss.backward()  # Backpropagation
            optimizer.step()  # Update trọng số

            #reset Q_hat after C step:
            # Update step:
            step += 1
            if step % C == 0:
                Q_hat.load_state_dict(model_DQN.state_dict())

        epsilon = decay_epsilon(episode, M, 'threshold')

        ret.append(np.sum(rewards_all))
        losses.append(np.average(losses_episode))
        if episode % 100 == 0:
            print(f"Episode {episode}: Avg Rewards:", np.mean(ret[-20:]), "Avg Loss:", np.mean(losses[-20:]), f"epsilon: {epsilon}")
            
train(**config_train_DQN_for_TicTacToe)

Episode 100: Avg Rewards: -0.1 Avg Loss: 0.07931532059020052 epsilon: 0.9
Episode 200: Avg Rewards: 0.15 Avg Loss: 0.07012210273882374 epsilon: 0.9
Episode 300: Avg Rewards: -0.05 Avg Loss: 0.0719928700228532 epsilon: 0.9
Episode 400: Avg Rewards: 0.35 Avg Loss: 0.05666351502838855 epsilon: 0.9
Episode 500: Avg Rewards: 0.3 Avg Loss: 0.04427056389162316 epsilon: 0.9
Episode 600: Avg Rewards: 0.15 Avg Loss: 0.07089202013487617 epsilon: 0.9
Episode 700: Avg Rewards: 0.2 Avg Loss: 0.05884819795998434 epsilon: 0.9
Episode 800: Avg Rewards: 0.0 Avg Loss: 0.077460486816708 epsilon: 0.9
Episode 900: Avg Rewards: -0.15 Avg Loss: 0.0914364596363157 epsilon: 0.9
Episode 1000: Avg Rewards: -0.1 Avg Loss: 0.09514766828467448 epsilon: 0.9
Episode 1100: Avg Rewards: 0.0 Avg Loss: 0.07059782235883176 epsilon: 0.9
Episode 1200: Avg Rewards: -0.1 Avg Loss: 0.08186057003525396 epsilon: 0.9
Episode 1300: Avg Rewards: 0.2 Avg Loss: 0.06593070343447228 epsilon: 0.9
Episode 1400: Avg Rewards: -0.3 Avg Loss:

In [11]:
torch.save(model_DQN, './model/model_TicTacToe3x3.pth')

In [13]:
model_DQN = torch.load('./model/model_TicTacToe3x3.pth')
def predict(size_board):
    model_DQN.eval()
    num_games = 10000
    num_wins = 0
    num_draws = 0
    num_losts = 0

    result_game_lost = None
    for i in range(1, 1 + num_games):
        running = True
        current_state = game.get_init_state(size_board)
        result = 'not win'

        while(running):
            # lấy nước đi tốt nhất của Agent
            phi = phi_preprocess(current_state)
            with torch.no_grad():
                Q_values = model_DQN(torch.Tensor(phi))
            best_action = greedy(current_state, Q_values)
            # Cập nhật current_state
            current_state = game.get_next_state(current_state, best_action)
            r = game.get_reward(current_state)

            if game.is_terminal_state(current_state):
                running = False
                if r == 1:
                    num_wins += 1
                    result = 'win'
                elif r == -1:
                    num_losts += 1
                    result = 'lost'
                    result_game_lost = current_state
                else:
                    num_draws += 1
                    result = 'draw'
        print(f'game {i}: {result}')
    print(f"win rate: {(num_wins/num_games)*100:.3f}%", 
          f"lost rate: {(num_losts/num_games)*100:.3f}%", 
          f"draw rate: {(num_draws/num_games)*100:.3f}%", '\n',
          result_game_lost)
predict(3)


game 1: win
game 2: lost
game 3: win
game 4: win
game 5: win
game 6: win
game 7: win
game 8: win
game 9: win
game 10: win
game 11: win
game 12: win
game 13: lost
game 14: win
game 15: win
game 16: win
game 17: win
game 18: win
game 19: win
game 20: win
game 21: draw
game 22: win
game 23: draw
game 24: lost
game 25: win
game 26: win
game 27: win
game 28: win
game 29: lost
game 30: lost
game 31: lost
game 32: win
game 33: win
game 34: lost
game 35: win
game 36: draw
game 37: win
game 38: win
game 39: lost
game 40: lost
game 41: win
game 42: lost
game 43: win
game 44: win
game 45: win
game 46: win
game 47: win
game 48: win
game 49: win
game 50: win
game 51: draw
game 52: win
game 53: lost
game 54: win
game 55: win
game 56: win
game 57: draw
game 58: win
game 59: win
game 60: win
game 61: draw
game 62: win
game 63: lost
game 64: lost
game 65: draw
game 66: win
game 67: win
game 68: win
game 69: lost
game 70: draw
game 71: win
game 72: lost
game 73: draw
game 74: win
game 75: lost
game 76: 

In [None]:
# print 1 episode:
size_board = 3
s_t = game.get_init_state(size_board)
running = True
epsilon = 0.5
episode = []
while(running):
    phi_t = phi_preprocess(s_t)
    # Sampling
    with torch.no_grad():
        Q_values = model_DQN(torch.Tensor(phi_t))
    # using \epsilon greedy to get action
    a_t = epsilon_greedy(s_t, epsilon, Q_values)
    # Execute action at in emulator and observe reward rt and state st+1
    s_t1 = game.get_next_state(s_t, a_t)
    r_t = game.get_reward(s_t1)
    is_terminal_state = False
    if game.is_terminal_state(s_t1):
        running = False
        is_terminal_state = True
    # phi_t1 = phi_preprocess(s_t1)
    # Store transition in D
    episode.append((s_t.copy(), a_t, r_t, s_t1.copy(), is_terminal_state))
    # Cập nhật s_t:
    s_t = s_t1.copy()
episode

[(array([[ 0.,  0., -1.],
         [ 0.,  0.,  0.],
         [ 0.,  0.,  0.]]),
  3,
  0,
  array([[ 0.,  0., -1.],
         [ 1., -1.,  0.],
         [ 0.,  0.,  0.]]),
  False),
 (array([[ 0.,  0., -1.],
         [ 1., -1.,  0.],
         [ 0.,  0.,  0.]]),
  6,
  0,
  array([[ 0.,  0., -1.],
         [ 1., -1.,  0.],
         [ 1.,  0., -1.]]),
  False),
 (array([[ 0.,  0., -1.],
         [ 1., -1.,  0.],
         [ 1.,  0., -1.]]),
  1,
  0,
  array([[ 0.,  1., -1.],
         [ 1., -1.,  0.],
         [ 1., -1., -1.]]),
  False),
 (array([[ 0.,  1., -1.],
         [ 1., -1.,  0.],
         [ 1., -1., -1.]]),
  0,
  1,
  array([[ 1.,  1., -1.],
         [ 1., -1.,  0.],
         [ 1., -1., -1.]]),
  True)]

In [None]:
torch.to