In [30]:
# import gym
import numpy as np
import random


class TictactoeEnv:
    '''
    Description:
        Classical Tic-tac-toe game for two players who take turns marking the spaces in a three-by-three grid with X or O.
        The player who succeeds in placing three of their marks in a horizontal, vertical, or diagonal row is the winner.

        The game is played by two players: player 'X' and player 'O'. Player 'x' moves first.

        The grid is represented by a 3x3 numpy array, with value in {0, 1, -1}, with corresponding values:
            0 - place unmarked
            1 - place marked with X
            -1 - place marked with O

        The game environment will recieve movement from two players in turn and update the grid.

    self.step:
        recieve the movement of the player, update the grid

    The action space is [0-8], representing the 9 positions on the grid.

    The reward is 1 if you win the game, -1 if you lose, and 0 besides.
    '''

    def __init__(self):
        self.grid = np.zeros((3,3))
        self.end = False
        self.winner = None
        self.player2value = {'X': 1, 'O': -1}
        self.num_step = 0
        self.current_player = 'X' # By default, player 'X' goes first

    def check_valid(self, position):
        ''' Check whether the current action is valid or not
        '''
        if self.end:
            raise ValueError('This game has ended, please reset it!')
        if type(position) is int:
            position = (int(position / 3), position % 3)
        elif type(position) is not tuple:
            position = tuple(position)

        return False if self.grid[position] != 0 else True

    def step(self, position, print_grid=False):
        ''' Receive the movement from two players in turn and update the grid
        '''
        # check the position and value are valid or not
        # position should be a tuple like (0, 1) or int [0-8]
        if self.end:
            raise ValueError('This game has ended, please reset it!')
        if type(position) is int:
            position = (int(position / 3), position % 3)
        elif type(position) is not tuple:
            position = tuple(position)
        if self.grid[position] != 0:
            raise ValueError('There is already a chess on position {}.'.format(position))

        # place a chess on the position
        self.grid[position] = self.player2value[self.current_player]
        # update
        self.num_step += 1
        self.current_player = 'X' if self.num_step % 2 == 0 else  'O'
        # check whether the game ends or not
        self.checkEnd()

        if print_grid:
            self.render()

        return self.grid.copy(), self.end, self.winner

    def get_current_player(self):
        return self.current_player

    def checkEnd(self):
        # check rows and cols
        if np.any(np.sum(self.grid, axis=0) == 3) or np.any(np.sum(self.grid, axis=1) == 3):
            self.end = True
            self.winner = 'X'
        elif np.any(np.sum(self.grid, axis=0) == -3) or np.any(np.sum(self.grid, axis=1) == -3):
            self.end = True
            self.winner = 'O'
        # check diagnols
        elif self.grid[[0,1,2],[0,1,2]].sum() == 3 or self.grid[[0,1,2],[2,1,0]].sum() == 3:
            self.end = True
            self.winner = 'X'
        elif self.grid[[0,1,2],[0,1,2]].sum() == -3 or self.grid[[0,1,2],[2,1,0]].sum() == -3:
            self.end = True
            self.winner = 'O'
        # check if all the positions are filled
        elif (self.grid == 0).sum() == 0:
            self.end = True
            self.winner = None # no one wins
        else:
            self.end = False
            self.winner = None

    def reset(self):
        # reset the grid
        self.grid = np.zeros((3,3))
        self.end = False
        self.winner = None
        self.num_step = 0
        self.current_player = 'X'

        return self.grid.copy(), self.end, self.winner

    def observe(self):
        return self.grid.copy(), self.end, self.winner

    def reward(self, player='X'):
        if self.end:
            if self.winner is None:
                return 0
            else:
                return 1 if player == self.winner else -1
        else:
            return 0

    def render(self):
        # print current grid
        value2player = {0: '-', 1: 'X', -1: 'O'}
        for i in range(3):
            print('|', end='')
            for j in range(3):
                print(value2player[int(self.grid[i,j])], end=' ' if j<2 else '')
            print('|')
        print()

class OptimalPlayer:
    '''
    Description:
        A class to implement an epsilon-greedy optimal player in Tic-tac-toe.

    About optimial policy:
        There exists an optimial policy for game Tic-tac-toe. A player ('X' or 'O') can win or at least draw with optimial strategy.
        See the wikipedia page for details https://en.wikipedia.org/wiki/Tic-tac-toe
        In short, an optimal player choose the first available move from the following list:
            [Win, BlockWin, Fork, BlockFork, Center, Corner, Side]

    Parameters:
        epsilon: float, in [0, 1]. This is a value between 0-1 that indicates the
            probability of making a random action instead of the optimal action
            at any given time.

    '''
    def __init__(self, epsilon=0.2, player='X'):
        self.epsilon = epsilon
        self.player = player # 'x' or 'O'

    def set_player(self, player = 'X', j=-1):
        self.player = player
        if j != -1:
            self.player = 'X' if j % 2 == 0 else 'O'

    def empty(self, grid):
        '''return all empty positions'''
        avail = []
        for i in range(9):
            pos = (int(i/3), i % 3)
            if grid[pos] == 0:
                avail.append(pos)
        return avail

    def center(self, grid):
        '''
        Pick the center if its available,
        if it's the first step of the game, center or corner are all optimial.
        '''
        if np.abs(grid).sum() == 0:
            # first step of the game
            return [(1, 1)] + self.corner(grid)

        return [(1, 1)] if grid[1, 1] == 0 else []

    def corner(self, grid):
        ''' Pick empty corners to move '''
        corner = [(0, 0), (0, 2), (2, 0), (2, 2)]
        cn = []
        # First, pick opposite corner of opponent if it's available
        for i in range(4):
            if grid[corner[i]] == 0 and grid[corner[3 - i]] != 0:
                cn.append(corner[i])
        if cn != []:
            return cn
        else:
            for idx in corner:
                if grid[idx] == 0:
                    cn.append(idx)
            return cn

    def side(self, grid):
        ''' Pick empty sides to move'''
        rt = []
        for idx in [(0, 1), (1, 0), (1, 2), (2, 1)]:
            if grid[idx] == 0:
                rt.append(idx)
        return rt

    def win(self, grid, val=None):
        ''' Pick all positions that player will win after taking it'''
        if val is None:
            val = 1 if self.player == 'X' else -1

        towin = []
        # check all positions
        for pos in self.empty(grid):
            grid_ = np.copy(grid)
            grid_[pos] = val
            if self.checkWin(grid_, val):
                towin.append(pos)

        return towin

    def blockWin(self, grid):
        ''' Find the win positions of opponent and block it'''
        oppon_val = -1 if self.player == 'X' else 1
        return self.win(grid, oppon_val)

    def fork(self, grid, val=None):
        ''' Find a fork opportunity that the player will have two positions to win'''
        if val is None:
            val = 1 if self.player == 'X' else -1

        tofork = []
        # check all positions
        for pos in self.empty(grid):
            grid_ = np.copy(grid)
            grid_[pos] = val
            if self.checkFork(grid_, val):
                tofork.append(pos)

        return tofork

    def blockFork(self, grid):
        ''' Block the opponent's fork.
            If there is only one possible fork from opponent, block it.
            Otherwise, player should force opponent to block win by making two in a row or column
            Amomg all possible force win positions, choose positions in opponent's fork in prior
        '''
        oppon_val = -1 if self.player == 'X' else 1
        oppon_fork = self.fork(grid, oppon_val)
        if len(oppon_fork) <= 1:
            return oppon_fork

        # force the opponent to block win
        force_blockwin = []
        val = 1 if self.player == 'X' else -1
        for pos in self.empty(grid):
            grid_ = np.copy(grid)
            grid_[pos] = val
            if np.any(np.sum(grid_, axis=0) == val*2) or np.any(np.sum(grid_, axis=1) == val*2):
                force_blockwin.append(pos)
        force_blockwin_prior = []
        for pos in force_blockwin:
            if pos in oppon_fork:
                force_blockwin_prior.append(pos)

        return force_blockwin_prior if force_blockwin_prior != [] else force_blockwin

    def checkWin(self, grid, val=None):
        # check whether the player corresponding to the val will win
        if val is None:
            val = 1 if self.player == 'X' else -1
        target = 3 * val
        # check rows and cols
        if np.any(np.sum(grid, axis=0) == target) or np.any(np.sum(grid, axis=1) == target):
            return True
        # check diagnols
        elif grid[[0,1,2],[0,1,2]].sum() == target or grid[[0,1,2],[2,1,0]].sum() == target:
            return True
        else:
            return False

    def checkFork(self, grid, val=None):
        # check whether the player corresponding to the val will fork
        if val is None:
            val = 1 if self.player == 'X' else -1
        target = 2 * val
        # check rows and cols
        rows = (np.sum(grid, axis=0) == target).sum()
        cols = (np.sum(grid, axis=1) == target).sum()
        diags = (grid[[0,1,2],[0,1,2]].sum() == target) + (grid[[0,1,2],[2,1,0]].sum() == target)
        if (rows + cols + diags) >= 2:
            return True
        else:
            return False

    def randomMove(self, grid):
        """ Chose a random move from the available options. """
        avail = self.empty(grid)

        return avail[random.randint(0, len(avail)-1)]

    def act(self, grid, **kwargs):
        """
        Goes through a hierarchy of moves, making the best move that
        is currently available each time (with probabitity 1-self.epsilon).
        A touple is returned that represents (row, col).
        """
        # whether move in random or not
        if random.random() < self.epsilon:
            return self.randomMove(grid)

        ### optimial policies

        # Win
        win = self.win(grid)
        if win != []:
            return win[random.randint(0, len(win)-1)]
        # Block win
        block_win = self.blockWin(grid)
        if block_win != []:
            return block_win[random.randint(0, len(block_win)-1)]
        # Fork
        fork = self.fork(grid)
        if fork != []:
            return fork[random.randint(0, len(fork)-1)]
        # Block fork
        block_fork = self.blockFork(grid)
        if block_fork != []:
            return block_fork[random.randint(0, len(block_fork)-1)]
        # Center
        center = self.center(grid)
        if center != []:
            return center[random.randint(0, len(center)-1)]
        # Corner
        corner = self.corner(grid)
        if corner != []:
            return corner[random.randint(0, len(corner)-1)]
        # Side
        side = self.side(grid)
        if side != []:
            return side[random.randint(0, len(side)-1)]

        # random move
        return self.randomMove(grid)




In [31]:
def convert(move):
    """
    Convert a move in the tuple format to the int format
    """
    if type(move) != tuple:
        return move
    else:
        return (move[0]*3 + move[1] % 3)

### Deep QL part

In [32]:
import tensorflow as tf

In [33]:
import gym
import math
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count
from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T


env = TictactoeEnv()
# set up matplotlib
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    from IPython import display

plt.ion()

# if gpu is to be used
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


In [34]:
torch.cuda.is_available()

Define the replay buffer

In [35]:
Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))


class ReplayMemory(object):

    def __init__(self, capacity):
        self.memory = deque([],maxlen=capacity)

    def push(self, *args):
        """Save a transition"""
        self.memory.append(Transition(*args))

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

Define the model

In [36]:
class DQN(nn.Module):

    def __init__(self):
        super(DQN, self).__init__()
        self.flatten = nn.Flatten()
        self.lin1 = nn.Linear(18, 128)
        self.lin2 = nn.Linear(128, 128)
        self.lin3 = nn.Linear(128,9)

    def forward(self, x):
        x = x.to(device)
        x = self.flatten(x).float()
        x = F.relu(self.lin1(x))
        x = F.relu(self.lin2(x))
        x = self.lin3(x)
        return x

Hyperparameters

In [37]:
BATCH_SIZE = 64
GAMMA = 0.99
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 200
TARGET_UPDATE = 500
eps = 0.1

n_actions = 9

policy_net = DQN().to(device)
target_net = DQN().to(device)
# target_net.load_state_dict(policy_net.state_dict())
# target_net.eval()

optimizer = optim.Adam(policy_net.parameters(), lr=0.0005)
memory = ReplayMemory(10000)


steps_done = 0


def select_action(state):
    # epsilon-greedy choiche of the action
    global steps_done
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * \
        math.exp(-1. * steps_done / EPS_DECAY)
    steps_done += 1
    if sample > eps:
        with torch.no_grad():
            # predict the action with the actual nework
            return policy_net(state).max(1)[1].view(1, 1)
    else:
        return torch.tensor([[random.randrange(n_actions)]], device=device, dtype=torch.long)

Training

In [50]:
def optimize_model(policy_net, memory, optimizer, transition = None):
    if(memory is None):
        state = transition.state
        action = transition.action
        next_state = transition.next_state
        reward = transition.reward
        state_action_value = policy_net(state)[0][action].reshape(1)
        next_state_value = 0.0 if next_state is None else target_net(next_state).max(0)[0].detach()
        print(next_state_value.shape)
        print(reward.shape)
        expected_state_action_value = (next_state_value * GAMMA) + reward

        criterion = nn.SmoothL1Loss()
        loss = criterion(state_action_value, expected_state_action_value)

        # Optimize the model
        optimizer.zero_grad()
        loss.backward()
        # for param in policy_net.parameters():
        #     param.grad.data.clamp_(-1, 1)
        optimizer.step()
        return loss

    if len(memory) < BATCH_SIZE:
        return
    transitions = memory.sample(BATCH_SIZE)
    # Transpose the batch (see https://stackoverflow.com/a/19343/3343043 for
    # detailed explanation). This converts batch-array of Transitions
    # to Transition of batch-arrays.
    batch = Transition(*zip(*transitions))
    # print(f"batch is {type(batch)} and of size {len(batch)}")
    # Compute a mask of non-final states and concatenate the batch elements
    # (a final state would've been the one after which simulation ended)
    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
                                          batch.next_state)), device=device, dtype=torch.bool)
    non_final_next_states = torch.cat([s for s in batch.next_state
                                                if s is not None]).view(-1, 18)

    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward).view(-1)

    # Compute Q(s_t, a) - the model computes Q(s_t), then we select the
    # columns of actions taken. These are the actions which would've been taken
    # for each batch state according to policy_net
    state_action_values = policy_net(state_batch).gather(1, action_batch)
    # print("policy_net(state_batch) ",policy_net(state_batch).shape, "\n")
    # print(policy_net(state_batch))
    # print("action_batch ",action_batch.shape, "\n")
    # print(action_batch)
    # print("gather ",state_action_values.shape, "\n")
    # print(state_action_values)

    # Compute V(s_{t+1}) for all next states.
    # Expected values of actions for non_final_next_states are computed based
    # on the "older" target_net; selecting their best reward with max(1)[0].
    # This is merged based on the mask, such that we'll have either the expected
    # state value or 0 in case the state was final.
    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    next_state_values[non_final_mask] = target_net(non_final_next_states).max(1)[0].detach()
    # Compute the expected Q values
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch#.cuda()
    # print(next_state_values.shape, "\n")
    # print(next_state_values)
    # print(reward_batch)

    # Compute Huber loss
    criterion = nn.SmoothL1Loss()
    # print(state_action_values, "aaaaaaaaaa")
    # print(expected_state_action_values.unsqueeze(1), "aaaaaaaaaa")
    loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))
    # Optimize the model
    optimizer.zero_grad()
    loss.backward()
    for param in policy_net.parameters():
        param.grad.data.clamp_(-1, 1)
    optimizer.step()
    return loss

In [51]:
def get_state(grid, q_player):
    ones = torch.ones(1,3,3)
    if q_player == 0:
        state = torch.cat((ones*(grid == 1), ones*(grid == -1)))
        return state.unsqueeze(0)
    else:
        state = torch.cat((ones*(grid == -1), ones*(grid == +1)))
        return state.unsqueeze(0)


In [46]:
results = []
losses = []
for repetition in range(10):
    BATCH_SIZE = 64
    GAMMA = 0.99
    EPS_START = 0.9
    EPS_END = 0.05
    EPS_DECAY = 200
    TARGET_UPDATE = 500
    eps = 0.1

    n_actions = 9

    policy_net = DQN().to(device)
    target_net = DQN().to(device)
    target_net.load_state_dict(policy_net.state_dict())
    target_net.eval()

    optimizer = optim.Adam(policy_net.parameters(), lr=0.0005)
    memory = ReplayMemory(10000)


    steps_done = 0
    num_games = 20000
    Turns = np.array(['X','O'])
    rewards = []
    loss_run = []
    for i in range(num_games):
        # Initialize the environment and state
        print("iteration", repetition,", game "+str(i))
        env.reset()
        grid, _, __ = env.observe()
        state = get_state(grid, 1 - i%2)
        player_opt_1 = OptimalPlayer(epsilon=0.5, player=Turns[i%2])
        for t in range(9):
            if env.current_player == player_opt_1.player:
                move = convert(player_opt_1.act(grid))
                grid, end, winner = env.step(move, print_grid=False)
            else:
            # Select and perform an action
                move = select_action(state)
                last_move_q = move
                try:
                    last_state_q = get_state(grid, 1 - i%2)
                    grid, end, winner = env.step(int(move), print_grid=False)
                    reward = torch.Tensor([env.reward(player=Turns[1 - i%2])])
                except ValueError:
                    reward = torch.Tensor([-1])
                    rewards.append(-1)
                    memory.push(last_state_q, last_move_q, None, reward)
                    break

            if not end:
                next_state = get_state(grid, 1-i%2)
                if env.current_player != player_opt_1.player and t>0:
                    memory.push(last_state_q, last_move_q, next_state, reward)

            else:
                next_state = None
                reward = env.reward(player=Turns[1 - i%2])
                rewards.append(reward)
                memory.push(last_state_q, last_move_q, next_state, torch.Tensor([reward]))
                break



            # print("pushed", move)
            # Move to the next state
            state = next_state

            # Perform one step of the optimization (on the policy network)
            loss = optimize_model(policy_net, memory, optimizer)
            loss_run.append(loss)

        # Update the target network, copying all weights and biases in DQN
        if i % 500 == 0:
            target_net.load_state_dict(policy_net.state_dict())

    results.append(rewards)
    losses.append(loss_run)

In [41]:
np.save("Q11_res", results)
np.save("Q11_loss", losses)

In [52]:
results = []
losses = []
for repetition in range(10):
    BATCH_SIZE = 1
    GAMMA = 0.99
    EPS_START = 0.9
    EPS_END = 0.05
    EPS_DECAY = 200
    TARGET_UPDATE = 500
    eps = 0.1

    n_actions = 9

    policy_net = DQN().to(device)
    target_net = DQN().to(device)
    target_net.load_state_dict(policy_net.state_dict())
    target_net.eval()

    optimizer = optim.Adam(policy_net.parameters(), lr=0.0005)
    memory = None


    steps_done = 0
    num_games = 20000
    Turns = np.array(['X','O'])
    rewards = []
    loss_run = []
    for i in range(num_games):
        # Initialize the environment and state
        print("iteration", repetition,", game "+str(i))
        env.reset()
        grid, _, __ = env.observe()
        state = get_state(grid, 1 - i%2)
        player_opt_1 = OptimalPlayer(epsilon=0.5, player=Turns[i%2])
        for t in range(9):
            if env.current_player == player_opt_1.player:
                move = convert(player_opt_1.act(grid))
                grid, end, winner = env.step(move, print_grid=False)
            else:
            # Select and perform an action
                move = select_action(state)
                last_move_q = move
                try:
                    last_state_q = get_state(grid, 1 - i%2)
                    grid, end, winner = env.step(int(move), print_grid=False)
                    reward = torch.Tensor([env.reward(player=Turns[1 - i%2])])
                except ValueError:
                    reward = torch.Tensor([-1])
                    rewards.append(-1)
                    transition = Transition(last_state_q, last_move_q, None, reward)
                    loss = optimize_model(policy_net, memory, optimizer, transition)
                    loss_run.append(loss)
                    break

            if not end:
                next_state = get_state(grid, 1-i%2)
                if env.current_player != player_opt_1.player and t>0:
                    transition = Transition(last_state_q, last_move_q, next_state, reward)
                    loss = optimize_model(policy_net, memory, optimizer, transition)
                    loss_run.append(loss)

            else:
                next_state = None
                reward = env.reward(player=Turns[1 - i%2])
                rewards.append(reward)
                transition = Transition(last_state_q, last_move_q, next_state, torch.Tensor([reward]))
                loss = optimize_model(policy_net, memory, optimizer, transition)
                loss_run.append(loss)
                break



            # print("pushed", move)
            # Move to the next state
            state = next_state

        # Update the target network, copying all weights and biases in DQN
        if i % 500 == 0:
            target_net.load_state_dict(policy_net.state_dict())

    results.append(rewards)
    losses.append(loss_run)

In [None]:
np.save("Q12_res", results)
np.sape("Q12_loss", losses)

In [None]:
while True:
    continue

In [None]:
np.mean([np.array(results)[i*250:(i+1)*250] for i in range(80)],axis=1)

In [None]:
import matplotlib.pyplot as plt
mean = np.array([np.mean((np.sum(np.array(results), axis = 0)/3)[i*250:(i+1)*250]) for i in range(80)])
var = np.array([np.var((np.sum(np.array(results), axis = 0)/3)[i*250:(i+1)*250]) for i in range(80)])
plt.plot(np.arange(80),mean)
plt.fill_between(np.arange(80), mean-var/2, mean+var/2, alpha = 0.2)
plt.title("Average reward over time with Q-Learning (0.1) versus Optimal Player(0.5)")
plt.xlabel("Batch of 250 games")
plt.ylabel("Average reward")


In [None]:
import matplotlib.pyplot as plt
mean = np.array([np.mean((np.sum(np.array(losses), axis = 0)/5)[i*250:(i+1)*250]) for i in range(80)])
var = np.array([np.var((np.sum(np.array(losses), axis = 0)/5)[i*250:(i+1)*250]) for i in range(80)])
plt.plot(np.arange(80),mean)
plt.fill_between(np.arange(80), mean-var/2, mean+var/2, alpha = 0.2)
plt.title("Average loss over time with Q-Learning (0.1) versus Optimal Player(0.5)")
plt.xlabel("Batch of 250 games")
plt.ylabel("Average loss")

In [None]:
ao = []
for i in range(80):
    ao.append(np.mean(np.array(results[2])[i*250:(1+i)*250]))

In [None]:
import matplotlib.pyplot as plt
plt.plot(np.arange(80),np.array(ao))