# Mini-project 1: Tic-Tac-Toe
Clément DAUVILLIERS - Florian VINCENT

## Imports

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import numpy.typing as npt
from tqdm.notebook import trange
from copy import deepcopy
from queue import deque
from random import sample
from typing import List, Tuple, NewType, Union, Callable
from tic_env import TictactoeEnv, OptimalPlayer
from player import Player

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from collections import namedtuple

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
# Distinct type name used in annotations throughout the notebook for the
# game grid handled as a float64 numpy array.
Grid = NewType('Grid', npt.NDArray[np.float64])

## Useful functions

In [None]:
def windowed_avg(arr: npt.ArrayLike, window_len: int=250) -> npt.NDArray:
    """
    Computes the average over successive, non-overlapping windows of an array.
    --arr: 1D array-like (numpy array, list, tuple...) of numbers;
    --window_len: number of consecutive elements averaged together. Must be
        strictly positive.
    Returns a 1D numpy array containing one average per window. If the length
    of arr is not a multiple of window_len, the last value is the average
    of the remaining (shorter) window.
    Raises ValueError if window_len is not strictly positive.
    """
    if window_len <= 0:
        raise ValueError("window_len must be a strictly positive integer")
    # Accept any array-like, not only objects exposing a .shape attribute
    values = np.asarray(arr)
    return np.array([
        np.mean(values[win_start:win_start + window_len])
        for win_start in range(0, values.shape[0], window_len)
    ])

# 2. Q-Learning

### QLPlayer class
The following class implements the QLearning player.

In [None]:
class QLPlayer(Player):
    """
    Implements a player that learns using the tabular QLearning algorithm.

    The Qvalues are stored in a (19683, 9) lookup table: one row per grid
    configuration (see state_to_int()) and one column per cell of the
    grid (see position_to_int()).
    """

    def __init__(self, player='X',
                 lr: float = 0.05,
                 discount: float = 0.99,
                 epsilon: Callable[[int], float] = lambda _: 0.05,
                 seed: int = 666):
        """
        --player: mark played by the agent, forwarded to Player;
        --lr: learning rate of the Qvalues update;
        --discount: discount factor applied to the value of the next state;
        --epsilon: function mapping an iteration number to the probability
            of playing a random move instead of the greedy one;
        --seed: random seed, forwarded to Player.
        """
        super().__init__(player, epsilon, seed)
        self.lr = lr
        self.discount = discount

        # Q-values table
        # 3^9 = 19683 states and 9 actions
        self.qvalues: npt.NDArray[np.float64] = np.zeros((19683, 9))

        # Memory of the last (state, action) pair, both stored as integer
        # indexes into the Q-values table.
        # Starts with None before any action and state are ever seen
        self.last_action: Union[None, int] = None
        self.last_state: Union[None, int] = None

    def act(self,
            grid: Grid,
            iteration: int = 0):
        """
        Chooses a move with an epsilon-greedy policy and memorizes the
        (state, action) pair for the next call to learn().
        --grid: current grid;
        --iteration: value given to the epsilon function.
        Returns the chosen move: whatever randomMove() returns for an
        exploratory move, or the integer index of the cell for a greedy one.
        Raises ValueError if a greedy move is requested on a grid without
        any empty cell.
        """
        # Remember the state for the next learning step
        state = QLPlayer.state_to_int(grid)
        self.last_state = state

        # Epsilon-greedy choice
        if self.rng_.random() < self.epsilon(iteration):
            chosen_action = self.randomMove(grid)
        else:
            # Retrieves the possible actions and converts them
            # from cell positions to integer indexes
            avail_actions = set(QLPlayer.positions_to_ints(Player.empty(grid)))
            # Ranks ALL actions by decreasing Qvalue in the current state
            actions_ranks = np.argsort(self.qvalues[state])[::-1]
            # Best-ranked action that is actually available
            chosen_action = next(
                (int(action) for action in actions_ranks
                 if action in avail_actions),
                None)
            if chosen_action is None:
                raise ValueError("No available cell left on the grid")

        # Remember the action for the learning step, always as a column index
        # of the Q-values table.
        # NOTE(review): randomMove() is defined in Player and its return type
        # is not visible here; a (row, col) position is converted so that it
        # cannot be mistaken for a fancy index by learn().
        self.last_action = QLPlayer._action_to_int(chosen_action)
        return chosen_action

    def learn(self, reward, new_grid, end):
        """
        Updates the Qvalues based on the last (S, A) pair and
        the received reward and the new state.
        --reward: reward received after the last action;
        --new_grid: grid in which the agent will have to play next;
        --end: whether new_grid is a final state.
        Does nothing if no (S, A) pair has been memorized yet.
        """
        # Without a memorized move there is nothing to update. Indexing the
        # table with None would add axes instead of selecting one entry, and
        # the update would then be applied to the whole table.
        if self.last_state is None or self.last_action is None:
            return

        # If the new_grid is a final state, we can't compute its expected optimal
        # qvalue. We instead set it to zero.
        if end:
            new_state_qval = 0.
        else:
            # Computes the optimal Qvalue in the new state max Q(s', a)
            new_state: int = QLPlayer.state_to_int(new_grid)
            new_state_qval = np.max(self.qvalues[new_state])

        # QValue that needs to be updated Q(s, a)
        current_qval: np.float64 = self.qvalues[self.last_state, self.last_action]

        # Q(s, a) <- Q(s, a) + lr * (r + discount * max Q(s', a') - Q(s, a))
        td_error = reward + self.discount * new_state_qval - current_qval
        self.qvalues[self.last_state, self.last_action] += self.lr * td_error

    @staticmethod
    def _action_to_int(action) -> int:
        """
        Returns the column index of an action given either as an
        integer index or as a (row, col) position.
        """
        if np.ndim(action) == 0:
            return int(action)
        return int(QLPlayer.position_to_int(action))

    @staticmethod
    def position_to_int(position: Tuple[int, int]) -> int:
        """
        (row col) -> row*3 + col
        """
        return position[0] * 3 + position[1]

    @staticmethod
    def positions_to_ints(positions: List[Tuple[int, int]]) -> List[int]:
        """
        Given a list of cells [(a, b), (c, d), ..],
        returns the list of the corresponding indexes.
        """
        return [QLPlayer.position_to_int(cell) for cell in positions]

    @staticmethod
    def state_to_int(grid: Grid) -> int:
        """
        Converts a grid state to the index of its
        row in the lookup table.
        """
        # Converts the grid values from -1, 0, 1 to 0, 1, 2 (a base 3 number)
        # Then converts the base 3 number to base 10
        return int((np.ravel(grid) + 1) @ np.array([3 ** i for i in range(9)]))

    @staticmethod
    def int_to_state(state_int: int) -> Grid:
        """
        Converts the index of row in the qvalues table to
        its corresponding state.
        """
        # Converts from base 10 to base 3
        return np.array([
            (state_int % (3 ** (i + 1))) // (3 ** i)
            for i in range(9)
        ]).reshape((3, 3)) - 1

## 2.1 Learning from experts

### Question 1

In [None]:
def play_games(learning_player: Union[Player, OptimalPlayer],
               benchmark_player: Union[Player, OptimalPlayer],
               nb_games: int = 20000,
               turns_swap: str = "switch",
               seed: int = 666,
               learn: bool = True,
               period_Ms: int = 0,
               progress=True) -> Tuple[npt.NDArray[np.int_],
                                            npt.NDArray[np.float32]]:
    """
    Plays a given number of games between two players, and returns the rewards.
    --learning_player: Player object implementing act(), learn(), update();
    --benchmark_player: Player object implementing act();
    --nb_games: How many games should be played;
    --turns_swap: str, either "switch" to switch turns after every game,
                    "half" to swap the turns once, after half of the games,
                    or anything else (e.g. "random") to draw the turns at
                    random before every game.
    --seed: random seed, used to draw the random turns and forwarded
            to get_Ms().
    --learn: whether learning_player.learn() is called during the games;
    --period_Ms: if non-zero, get_Ms() is evaluated every period_Ms games;
    --progress: whether to display a progress bar.
    Returns two arrays: the final reward of the learning player for every
    game, and the successive results of get_Ms() (empty if period_Ms is 0).
    """
    # Dedicated generator so that the random turns are reproducible
    rng = np.random.default_rng(seed)
    turns = np.array(['X', 'O'])
    learning_player.set_player(turns[0])
    benchmark_player.set_player(turns[1])
    rewards: List[int] = []
    env = TictactoeEnv()
    game_swap = nb_games // 2 if turns_swap == 'half' else None
    Ms = []
    iterator = trange(nb_games) if progress else range(nb_games)

    for game in iterator:
        # Sets up the environment for the game
        env.reset()
        grid: Grid = env.observe()[0]
        if turns_swap == "switch":
            learning_player.set_player(j=game)
            benchmark_player.set_player(j=game + 1)
        elif turns_swap == "half":
            # The two players must always receive opposite indexes
            learning_player.set_player(j=int(game < game_swap))
            benchmark_player.set_player(j=int(game >= game_swap))
        else:
            # shuffle() works in place and returns None
            rng.shuffle(turns)
            learning_player.set_player(turns[0])
            benchmark_player.set_player(turns[1])

        # Whether the learning player has already made a move in THIS game
        learner_has_played = False

        while True:
            # Action step
            if env.current_player == learning_player.player:
                move: int = learning_player.act(grid, game)
                learner_has_played = True
            else:
                move: int = benchmark_player.act(grid)

            grid, end, _ = env.step(move, print_grid=False)
            reward: int = env.reward(learning_player.player)

            # Learning step
            # The agent learns only after the other has played, as from the
            # point of view of the agent, the next state is not the one right after
            # its move, but the next state in which the agent will need to make a decision.
            # if current player == learning player means the benchmark player just played !
            # When the benchmark player opens the game, the agent has not
            # played yet: there is no transition to learn from, and the move
            # it memorized belongs to a previous game.
            learner_turn = env.current_player == learning_player.player
            if learn and learner_has_played and (learner_turn or end):
                learning_player.learn(reward, grid, end)

            if end:
                rewards.append(reward)
                break

        # Periodic evaluation, skipped for the very first game
        if period_Ms and game and game % period_Ms == 0:
            Ms.append(get_Ms(learning_player,
                             n_test=500,
                             seed=seed))

    return np.array(rewards), np.array(Ms)


def get_Ms(learning_player: Union[Player, OptimalPlayer],
           n_test: int = 500,
           seed: int = 666) -> List[float]:
    """
    Evaluates a player, without any learning, against two benchmarks:
    OptimalPlayer(0.) and then OptimalPlayer(1.).
    --learning_player: player to evaluate;
    --n_test: number of games played against each benchmark;
    --seed: random seed forwarded to play_games().
    Returns [mean reward against OptimalPlayer(0.),
             mean reward against OptimalPlayer(1.)].
    NOTE(review): play_games() gives the index of the test game to
    learning_player.act(), so the player's own epsilon schedule still applies
    during these games - confirm this is intended.
    """
    def mean_reward_against(benchmark_epsilon: float):
        # Only the rewards (first returned array) are of interest here
        test_rewards: npt.NDArray[np.int_] = play_games(
            learning_player=learning_player,
            benchmark_player=OptimalPlayer(benchmark_epsilon),
            nb_games=n_test,
            turns_swap='half',
            seed=seed,
            learn=False,
            period_Ms=0,
            progress=False,
        )[0]
        return test_rewards.mean()

    against_optimal = mean_reward_against(0.)
    against_random = mean_reward_against(1.)
    return [against_optimal, against_random]

In [None]:
def constant_epsilon(value):
    """
    Builds an epsilon schedule that ignores the iteration
    number and always yields the given value.
    """
    def epsilon(iteration):
        return value

    return epsilon

In [None]:
nb_games = 20000
# Defines a QLearning Agent with constant epsilon of 0.05
qlplayer = QLPlayer(epsilon=constant_epsilon(0.05), lr=0.05, discount=0.99)
semi_random_player = OptimalPlayer(0.5)
# play_games returns (rewards, Ms); only the rewards are needed here, as no
# periodic evaluation is requested
rewards_21 = play_games(qlplayer, semi_random_player, nb_games=nb_games)[0]

In [None]:
# Number of consecutive games averaged into one point of the curve
window_size = 250
plt.plot(windowed_avg(rewards_21, window_size))
plt.suptitle("QLearning vs Optimal(0.5)")
plt.xlabel('Games played')
plt.ylabel('Average final reward over 250 games')
plt.ylim(-1, 1)
# One tick every 10 windows, labelled with the matching number of games
xticks = np.arange(0, nb_games // window_size + 1, 10)
plt.xticks(xticks, xticks * window_size)
plt.show()

### 2.1.1 Decreasing exploration

In [None]:
def decreasing_epsilon(n_star: int,
                       e_min: float = .1,
                       e_max: float = .8) -> Callable[[int], float]:
    """
    Builds an epsilon schedule that decreases linearly with the iteration
    number n, from e_max at n = 0, and never goes below e_min:
        epsilon(n) = max(e_min, e_max * (1 - n / n_star))
    """
    def epsilon(n):
        linear_value = e_max * (1. - float(n) / float(n_star))
        return max(e_min, linear_value)

    return epsilon

In [None]:
nb_games = 20000
# Results of the periodic evaluations, one entry per value of n*
list_Ms_plots = []
n_star_values = (1, 10, 100, 1000, 10000, 20000)
curve_colors = ('r', 'b', 'g', 'c', 'k', 'y')
for n_star, c in zip(n_star_values, curve_colors):
    print("Launch games for n*=", n_star)
    # A fresh agent and a fresh opponent for every exploration schedule
    qlplayer = QLPlayer(epsilon=decreasing_epsilon(n_star))
    semi_random_player = OptimalPlayer(0.5)
    rewards_21, Ms = play_games(qlplayer,
                                semi_random_player,
                                nb_games=nb_games,
                                period_Ms=250)
    print("End of games for n*=", n_star)
    list_Ms_plots.append(Ms)
    # One curve per schedule, all on the same figure
    plt.plot(windowed_avg(rewards_21, window_size),
             c=c, label=f'$n^*=${n_star}')

plt.suptitle("QLearning vs Optimal(0.5)")
plt.xlabel('Games played')
plt.ylabel('Average final reward over 250 games for variable $\\epsilon(n)$')
plt.ylim(-1, 1)
# One tick every 10 windows, labelled with the matching number of games
xticks = np.arange(0, nb_games // window_size + 1, 10)
plt.xticks(xticks, xticks * window_size)
plt.legend()
plt.show()

# 3. Deep Q-Learning
All of the following implementations will be based on the PyTorch RL Tutorial:  
https://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html

In [None]:
from dqnplayer import DQNPlayer

We'll try to work on a GPU if one is available to PyTorch:

In [None]:
# Use CUDA when PyTorch reports it as available, the CPU otherwise
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(device)

## Game function for DQN
We'll now write the function that wraps the whole training loop.  
First, we need two functions to manipulate states and make them coherent with the format expected by the DQN agent:

In [None]:
def grid33_to_332(grid, player, player2value):
    """
    Encodes a 3x3 grid with values in -1, 0 and 1 as the 3x3x2 array
    expected by the DQN player: channel 0 marks the cells holding the
    agent's value, channel 1 the cells holding the opposite value.
    --player: either 'X' or 'O', which player the DQN agent is.
    --player2value: player (X or O) to index (-1 or 1) association,
        obtained from the environment.
    """
    # Value used in the original grid for the player of the dqn agent
    own_value = player2value[player]

    encoded = np.zeros((3, 3, 2))
    # Boolean masks are stored as 0. / 1. in the float array
    encoded[..., 0] = grid == own_value
    encoded[..., 1] = grid == -own_value
    return encoded

def grid_to_tensor(grid, env, player=None):
    """
    Converts the numpy array grid returned by the environment to a torch
    tensor in the format expected by the DQN agent.
    --grid: 3x3 grid, as accepted by grid33_to_332();
    --env: environment, only used for its player2value association;
    --player: either 'X' or 'O', which player the DQN agent is. When omitted,
        the player of the notebook-level `dqn_player` variable is used, which
        therefore must exist.
    Returns a float tensor of shape (1, 3, 3, 2) placed on `device`.
    """
    if player is None:
        # Historical behaviour: rely on the global agent of the notebook
        player = dqn_player.player
    grid_tensor = grid33_to_332(grid, player, env.player2value)
    # Adds the leading dimension and converts from float64 to float32
    grid_tensor = torch.tensor(grid_tensor, device=device).unsqueeze(0).float()
    return grid_tensor

We can now write the training loop:

In [None]:
def play_games_dqn(dqn_player, benchmark_player, nb_games=20000,
                   games_between_updates=500,
                   seed=666):
    """
    Plays a given number of games between two players, and returns the rewards.
    --dqn_player: Instance of DQNPlayer to train;
    --benchmark_player: Player object implementing act();
    --nb_games: How many games should be played;
    --games_between_updates: how many games are played between two updates of the agent's
        target network.
    --seed: random seed (currently not used by this function).
    Returns two arrays: rewards, losses. Both contain one value per game: the
    final reward of the agent, and the mean of the values returned by
    dqn_player.learn() during the game.
    """
    rewards, losses = [], []
    env = TictactoeEnv()

    def to_agent_state(raw_grid):
        # Encodes the grid from the point of view of the agent received as
        # argument (not of a notebook-level variable): shape (1, 3, 3, 2),
        # float, on `device`.
        encoded = grid33_to_332(raw_grid, dqn_player.player, env.player2value)
        return torch.tensor(encoded, device=device).unsqueeze(0).float()

    for game in trange(nb_games):
        # Sets up the environment for the game
        env.reset()
        grid, _, _ = env.observe()

        # Switch turns
        dqn_player.set_player(j=game)
        benchmark_player.set_player(j=game + 1)

        # Convert the grid from the env's format to that expected by the agent
        grid_tensor = to_agent_state(grid)

        game_losses = []
        # Whether the agent has already chosen a move in THIS game
        agent_has_played = False

        while True:
            # Action step
            # We now need to account for the case where the agent chooses
            # an unavailable position.
            try:
                if env.current_player == dqn_player.player:
                    agent_has_played = True
                    move = dqn_player.act(grid_tensor)
                else:
                    move = benchmark_player.act(grid)

                grid, end, _ = env.step(move, print_grid=False)
                grid_tensor = to_agent_state(grid)
                reward = env.reward(dqn_player.player)

            except ValueError:
                # Being here means the agent chose an impossible action
                # Stop the game and set the reward for the agent to -1
                end = True
                reward = -1

            # Learning step
            # The DQN agent must have played at least once to start learning:
            # when the benchmark player opens the game, there is no move of
            # the agent in this game to learn from yet.
            agent_turn = env.current_player == dqn_player.player
            if agent_has_played and (agent_turn or end):
                # For the DQN agent, final states will be represented as None
                if end:
                    grid_tensor = None
                game_losses.append(dqn_player.learn(torch.tensor([reward], device=device),
                                               grid_tensor))

            if end:
                losses.append(np.mean(game_losses))
                rewards.append(reward)
                break

        # Update the agent's target network if required
        if game % games_between_updates == 0:
            dqn_player.update()

    return np.array(rewards), np.array(losses)

## 3.2 Learning from experts

In [None]:
semi_opt_player = OptimalPlayer(0.5)
dqn_player = DQNPlayer(device, epsilon=0.01)

# Trains the DQN Agent by playing a semi-optimal player
nb_games = 20000
rewards, losses = play_games_dqn(dqn_player, semi_opt_player, nb_games=nb_games)

# Both curves are averaged over windows of games and share the same x ticks:
# one tick every 10 windows, labelled with the matching number of games
window_size = 250
xticks = np.arange(0, nb_games // window_size + 1, 10)
labels = list(map(str, xticks * window_size))

# Top: window-averaged rewards, bottom: window-averaged game losses
fig, ax = plt.subplots(nrows=2, ncols=1)
curves = zip(ax,
             (rewards, losses),
             ('Average final reward over 250 games',
              'Average loss over 250 games'))
for axis, series, ylabel in curves:
    axis.plot(windowed_avg(series, window_size))
    axis.set_ylabel(ylabel)
    axis.set_xticks(xticks)
    axis.set_xticklabels(labels)
# Only the rewards axis has a fixed range
ax[0].set_ylim(-1, 1)
plt.tight_layout()
plt.show()