In [149]:
import numpy as np
from tic_env import TictactoeEnv, OptimalPlayer
from collections import defaultdict
import matplotlib.pyplot as plt
import scipy.stats as st
import time
import os
import pickle

In [150]:
# Configurations
save_stats = False
save_figs = False

## 2. Q-Learning

In [151]:
def measure_performance(player_1, player_2, num_episodes = 500):
    """
    Measures performance of player 1 against player 2 (percentage of wins player 1)
    TODO
    """
    meas = 0
    turns = np.array(['X','O'])
    env = TictactoeEnv()
    for itr in range(num_episodes):
        env.reset()
        grid, _, __ = env.observe()
        player_1.set_player(turns[itr%2])
        player_2.set_player(turns[(itr+1)%2])
        while not env.end:
            if env.current_player == player_1.player:
                move = player_1.act(grid)
            else:
                move = player_2.act(grid)
            grid, _, _ = env.step(move, print_grid=False)
        meas += env.reward(player=player_1.player)
    return meas/num_episodes

def running_average(vec, windows_size = 250):
    idx = np.arange(0,len(vec),windows_size)
    return [np.sum(vec[i:i+windows_size])/windows_size for i in idx], idx + windows_size

def encode_state(state):
    return state.tobytes()

def available(grid):
    """
    TODO
    """
    avail_indices = []
    avail_mask = [False] * 9
    for i in range(9):
        pos = (int(i/3), i % 3)
        if grid[pos] == 0:
            avail_indices.append(i)
            avail_mask[i] = True
    return avail_indices, avail_mask

def epsilon_greedy_action(grid, Q, epsilon, num_actions = 9):
    """
    TODO
    """
    avail_indices, avail_mask = available(grid)

    if np.random.uniform(0,1) < epsilon:
        return avail_indices[np.random.randint(0, len(avail_indices))]
    else:
        q = Q[encode_state(grid)]
        q[np.logical_not(avail_mask)] = np.nan
        max_indices = np.argwhere(q == np.nanmax(q))
        return int(max_indices[np.random.randint(0, len(max_indices))])

class QPlayer:
    """
    TODO
    """
    def __init__(self, Q, player='X'):
        self.Q = Q
        self.player = player  # 'x' or 'O'

    def set_player(self, player='X', j=-1):
        self.player = player
        if j != -1:
            self.player = 'X' if j % 2 == 0 else 'O'

    def act(self, grid, **kwargs):
        """
        TODO
        """
        return epsilon_greedy_action(grid, self.Q, 0)


def q_learning(env, alpha = 0.5, gamma = 0.99, num_episodes = 20000, epsilon_exploration = 0.1, epsilon_exploration_rule = None, epsilon_opt = 0.5, test_freq = None, verbose = False):
    """
    TODO
    """
    turns = np.array(['X','O'])
    # Q-values map
    # Dictionary that maps the np.ndarray.tobyte() representation of the grid to an array of action values
    Q = defaultdict(lambda: np.zeros(9))    # All Q-values are initialized to 0
    # Stats of training
    episode_rewards = np.zeros(num_episodes)
    episode_Mopt = [measure_performance(QPlayer(Q=Q), OptimalPlayer(epsilon=0.))]
    episode_Mrand = [measure_performance(QPlayer(Q=Q), OptimalPlayer(epsilon=0.))]
    if verbose:
                print('Episode  0 :\tM_opt = ', episode_Mopt[0], '\tM_rand = ', episode_Mrand[0])
    # Rule for exploration
    if epsilon_exploration_rule is None:
        def epsilon_exploration_rule(n):
            return epsilon_exploration

    for itr in range(num_episodes):
        my_player = turns[itr % 2]
        player_opt = OptimalPlayer(epsilon=epsilon_opt, player=turns[(itr+1) % 2])
        env.reset()
        state, _, _ = env.observe()
        # First step of the adversarial
        if env.current_player == player_opt.player:
            move = player_opt.act(state)
            state, _, _ = env.step(move)
        action = epsilon_greedy_action(state, Q, epsilon_exploration_rule(itr+1))
        while not env.end:
            next_state, _, _ = env.step(action)     # Move according to the policy
            if not env.end:
                move = player_opt.act(next_state)   # Adversarial move
                next_state, _, _ = env.step(move)
            # Sarsa update rule
            reward = env.reward(player=my_player)
            if not env.end:
                next_action = epsilon_greedy_action(next_state, Q, epsilon_exploration_rule(itr+1))
                target = reward + gamma * Q[encode_state(next_state)][next_action]
            else:
                target = reward
            Q[encode_state(state)][action] += alpha * (target - Q[encode_state(state)][action])
            # Preparing for the next move
            episode_rewards[itr] = reward
            state = next_state
            action = next_action
        # Testing the performance
        if (test_freq is not None) and ((itr+1)%test_freq == 0):
            M_opt = measure_performance(QPlayer(Q=Q), OptimalPlayer(epsilon=0.))
            M_rand = measure_performance(QPlayer(Q=Q), OptimalPlayer(epsilon=1.))
            episode_Mopt.append(M_opt)
            episode_Mrand.append(M_rand)
            if verbose:
                print('Episode ', itr+1, ':\tM_opt = ', M_opt, '\tM_rand = ', M_rand)
    # Dictionary of stats
    stats = {
        'rewards': episode_rewards,
        'test_Mopt': episode_Mopt,
        'test_Mrand': episode_Mrand,
    }
    return Q, stats

### 2.1 Learning from experts

In [119]:
env = TictactoeEnv()

# Hyper-parameters
alpha = 0.05    # Learning rate
gamma = 0.99    # Discount factor
epsilon_opt = 0.5   # Optimal player's epsilon
num_episodes = 20000
epsilon_exploration = 0.01

Q, stats = q_learning(env, epsilon_exploration=epsilon_exploration)

KeyboardInterrupt: 

In [None]:
# Plotting the average reward for every 250 games during training
running_average_rewards, x = running_average(stats['rewards'])
plt.plot(x, running_average_rewards)
plt.ylim([-1,1])
plt.xlabel('Episode')
plt.ylabel('Reward')
plt.title('Average reward during training')
plt.show()

# Comparing the performance with the optimal player and the random player
turns = np.array(['X','O'])
player_opt = OptimalPlayer(epsilon=0.)
player_rand = OptimalPlayer(epsilon=1.)

names = ['Optimal', 'Random', 'Trained']
players = [OptimalPlayer(epsilon=0.), OptimalPlayer(epsilon=1.), QPlayer(Q=Q)]

for (name, player) in zip(names, players):
    print("\n-----", name, " player-----")
    m_opt = measure_performance(player, player_opt)
    print("M_opt = ", m_opt)
    m_rand = measure_performance(player, player_rand)
    print("M_rand = ", m_rand)

#### 2.1.1 Decreasing exploration

In [None]:
test_freq = 250
epsilon_min = 0.1
epsilon_max = 0.8
vec_n_star = np.hstack((np.array([1, 100, 500, 750]), np.round(np.logspace(3, np.log10(40000), 16))))
print(vec_n_star)
stats_dict_nstar = {}
for n_star in vec_n_star:
    print("------------- Training with n_star =", n_star, "-------------")
    def epsilon_exploration_rule(n):
        return np.max([epsilon_min, epsilon_max * (1 - n/n_star)])
    start = time.time()
    Q, stats = q_learning(env, epsilon_exploration_rule=epsilon_exploration_rule, test_freq=test_freq)
    M_opt = measure_performance(QPlayer(Q=Q), OptimalPlayer(epsilon=0.), num_episodes=2000)
    M_rand = measure_performance(QPlayer(Q=Q), OptimalPlayer(epsilon=1.), num_episodes=2000)
    print("M_opt =", M_opt)
    print("M_rand =", M_rand)
    stats_dict_nstar.update({n_star: (stats, M_opt, M_rand)})
    elapsed = time.time() - start
    print("Training with n_star =", n_star, " took:", time.strftime("%Hh%Mm%Ss", time.gmtime(elapsed)), "\n\n")

if save_stats:
    output_folder = os.path.join(os.getcwd(), 'results')
    os.makedirs(output_folder, exist_ok=True)
    fname = output_folder + '/stats_dict_nstar.pkl'
    with open(fname, 'wb') as handle:
        pickle.dump(stats_dict_nstar, handle, protocol=pickle.HIGHEST_PROTOCOL)

In [152]:
def plot_stats(stats_dict, vec_var, var_name, var_legend_name, save = False):
    fig_reward, ax_reward = plt.subplots()
    fig_performance, ax = plt.subplots(1,2, figsize=(13.4,4.8))

    for var in vec_var:
        (stats, M_opt, M_rand) = stats_dict[var]
        # Plot of the average reward during training
        running_average_rewards, x_reward = running_average(stats['rewards'])
        ax_reward.plot(x_reward, running_average_rewards, label="$"+ var_legend_name +" = "+ str(var) +"$")
        # Plot of M_opt and M_rand during training
        x_performance = np.arange(0, len(stats['rewards'])+1, len(stats['rewards']) / (len(stats['test_Mopt']) - 1))
        ax[0].plot(x_performance, stats['test_Mopt'], label="$"+ var_legend_name +" = "+ str(var) +"$")
        ax[1].plot(x_performance, stats['test_Mrand'], label="$"+ var_legend_name +" = "+ str(var) +"$")
        print(var_name + " =", var,": \tM_opt = ", M_opt, "\tM_rand = ", M_rand)

    ax_reward.set_ylim([-1,1])
    ax_reward.set_xlabel('Episode')
    ax_reward.set_ylabel('Reward')
    ax_reward.set_title('Average reward during training')
    ax_reward.legend(loc='lower right')


    ax[0].hlines(y=0, xmin=x[0], xmax=x[-1], color='r', linestyle='--')
    ax[0].set_ylim([-1,0.1])
    ax[0].set_xlabel('Episode')
    ax[0].set_ylabel('$M_{opt}$')
    ax[0].set_title('$M_{opt}$ during training')
    ax[0].legend(loc='lower right')

    ax[1].set_ylim([-1,1])
    ax[1].set_xlabel('Episode')
    ax[1].set_ylabel('$M_{rand}$')
    ax[1].set_title('$M_{rand}$ during training')
    ax[1].legend(loc='lower right')
    plt.show()
    if save:
        output_folder = os.path.join(os.getcwd(), 'figures')
        os.makedirs(output_folder, exist_ok=True)
        fig_performance.savefig(output_folder + '/performance_'+var_name+'.png')
        fig_reward.savefig(output_folder + '/rewards_'+var_name+'.png')

In [None]:
plot_n_star = vec_n_star
plot_stats(stats_dict_nstar, plot_n_star, 'n_star', "n^{\star}", save=save_figs)

#### 2.1.2 Good experts and bad experts

In [None]:
n_star = 4000 # this should be picked from before not inserted manually
#vec_epsilon_opt = [0, 0.3, 0.5, 0.7, 1]
M = 10 # how much we want to stratify [0, 1]
# a reasonable choice for the epsilon vector here to me is
vec_epsilon_opt = np.hstack([np.array([0]), st.uniform.rvs(loc=np.arange(M) / M, scale=1/M), np.array([1])])
print(vec_epsilon_opt)
stats_dict_epsilon_opt = {}
for epsilon_opt in vec_epsilon_opt:
    print("------------- Training with epsilon_opt =", epsilon_opt, "-------------")
    start = time.time()
    Q, stats = q_learning(env, epsilon_opt=epsilon_opt, test_freq=test_freq)
    M_opt = measure_performance(QPlayer(Q=Q), OptimalPlayer(epsilon=0.), num_episodes=2000)
    M_rand = measure_performance(QPlayer(Q=Q), OptimalPlayer(epsilon=1.), num_episodes=2000)
    print("M_opt =", M_opt)
    print("M_rand =", M_rand)
    stats_dict_epsilon_opt.update({epsilon_opt: (stats, M_opt, M_rand)})
    elapsed = time.time() - start
    print("Training with epsilon_opt =", epsilon_opt, " took:", time.strftime("%Hh%Mm%Ss", time.gmtime(elapsed)))

if save_stats:
    output_folder = os.path.join(os.getcwd(), 'results')
    os.makedirs(output_folder, exist_ok=True)
    fname = output_folder + '/stats_dict_epsilon_opt.pkl'
    with open(fname, 'wb') as handle:
        pickle.dump(stats_dict_epsilon_opt, handle, protocol=pickle.HIGHEST_PROTOCOL)

In [None]:
plot_epsilon_opt = vec_epsilon_opt
plot_stats(stats_dict_epsilon_opt, plot_epsilon_opt, "epsilon_opt", "\epsilon_{opt}", save=save_figs)

### 2.2 Learning by self-practice

In [163]:
def q_learning_self_practice(env, alpha = 0.5, gamma = 0.99, num_episodes = 20000,
               epsilon_exploration = 0.1, epsilon_exploration_rule = None, epsilon_opt = 0.5,
               test_freq = None, verbose = False):
    """
    TODO
    """
    turns = np.array(['X','O'])
    # Q-values map
    # Dictionary that maps the np.ndarray.tobyte() representation of the grid to an array of action values
    Q = defaultdict(lambda: np.zeros(9))    # All Q-values are initialized to 0
    # Stats of training
    episode_rewards = np.zeros(num_episodes)
    # playing against itself with the same Q-values
    episode_Mopt = [measure_performance(QPlayer(Q=Q), OptimalPlayer(epsilon=0.))]
    episode_Mrand = [measure_performance(QPlayer(Q=Q), OptimalPlayer(epsilon=1.))]
    if verbose:
                print('Episode  0 :\tM_opt = ', episode_Mopt[0], '\tM_rand = ', episode_Mrand[0])
    # Rule for exploration
    if epsilon_exploration_rule is None:
        def epsilon_exploration_rule(n):
            return epsilon_exploration

    for itr in range(num_episodes):
        my_player = turns[(itr + 1) % 2]
        adv_player = turns[itr % 2]
        env.reset()
        state, _, _ = env.observe()
        while not env.end:
            if env.current_player == adv_player and not env.end:
                action_adv = epsilon_greedy_action(state, Q, epsilon_exploration_rule(itr + 1))
                grid, _, _ = env.step(action_adv)
                adv_reward = env.reward(player=adv_player)
                my_reward = - 1 * adv_reward
                next_state_adv, _, _ = env.observe()
                if not env.end:
                    next_action_adv = epsilon_greedy_action(grid, Q, epsilon_exploration_rule(itr + 1))
                    target = adv_reward + gamma * Q[encode_state(next_state_adv)][next_action_adv]
                else:
                    target = adv_reward
                Q[encode_state(state)][action_adv] += alpha * (target - Q[encode_state(state)][action_adv])
                state = next_state_adv
            if env.current_player == my_player and not env.end:
                my_action = epsilon_greedy_action(state, Q, epsilon_exploration_rule(itr + 1))
                grid, _, _ = env.step(my_action)
                my_reward = env.reward(player=my_player)
                my_next_state, _, _ = env.observe()
                if not env.end:
                    my_next_action = epsilon_greedy_action(grid, Q, epsilon_exploration_rule(itr + 1))
                    target = my_reward + gamma * Q[encode_state(my_next_state)][my_next_action]
                else:
                    target = my_reward
                Q[encode_state(state)][my_action] += alpha * (target - Q[encode_state(state)][my_action]) 
                # Preparing for the next move
            episode_rewards[itr] = my_reward
            state = my_next_state
            action = my_next_action
        # Testing the performance
        if (test_freq is not None) and ((itr+1)%test_freq == 0):
            M_opt = measure_performance(QPlayer(Q=Q), OptimalPlayer(epsilon=0.))
            M_rand = measure_performance(QPlayer(Q=Q), OptimalPlayer(epsilon=1.))
            episode_Mopt.append(M_opt)
            episode_Mrand.append(M_rand)
            if verbose:
                print('Episode ', itr+1, ':\tM_opt = ', M_opt, '\tM_rand = ', M_rand)
    # Dictionary of stats
    stats = {
        'rewards': episode_rewards,
        'test_Mopt': episode_Mopt,
        'test_Mrand': episode_Mrand,
    }
    return Q, stats

In [168]:
env = TictactoeEnv()

# Hyper-parameters
alpha = 0.05    # Learning rate
gamma = 0.99    # Discount factor
epsilon_opt = 0.5   # Optimal player's epsilon
epsilon_vec = [0, 0.3, 0.5, 0.7, 1]
stats_dic_eps = {}
for epsilon in epsilon_vec:
    print("------------- Training with epsilon =", epsilon, "-------------")
    Q, stats = q_learning_self_practice(env, alpha=alpha, gamma=gamma, num_episodes=10000,
                        epsilon_exploration=epsilon, test_freq = 1000, verbose=True)
    stats_dic_eps.update({epsilon: stats})

------------- Training with epsilon = 0 -------------
Episode  0 :	M_opt =  -0.912 	M_rand =  0.028
Episode  1000 :	M_opt =  -0.922 	M_rand =  0.092
Episode  2000 :	M_opt =  -0.922 	M_rand =  0.048
Episode  3000 :	M_opt =  -0.934 	M_rand =  0.07
Episode  4000 :	M_opt =  -0.914 	M_rand =  0.116
Episode  5000 :	M_opt =  -0.924 	M_rand =  0.1
Episode  6000 :	M_opt =  -0.932 	M_rand =  0.092
Episode  7000 :	M_opt =  -0.92 	M_rand =  0.02
Episode  8000 :	M_opt =  -0.932 	M_rand =  0.018
Episode  9000 :	M_opt =  -0.938 	M_rand =  0.07
Episode  10000 :	M_opt =  -0.93 	M_rand =  0.076
------------- Training with epsilon = 0.3 -------------
Episode  0 :	M_opt =  -0.93 	M_rand =  0.032
Episode  1000 :	M_opt =  -0.916 	M_rand =  0.182
Episode  2000 :	M_opt =  -0.982 	M_rand =  0.244
Episode  3000 :	M_opt =  -0.994 	M_rand =  0.32
Episode  4000 :	M_opt =  -0.978 	M_rand =  0.368
Episode  5000 :	M_opt =  -0.998 	M_rand =  0.428
Episode  6000 :	M_opt =  -0.998 	M_rand =  0.384
Episode  7000 :	M_opt 

## Deep Q-Learning