<div class="alert alert-warning" role="alert">
    <h4 class="alert-heading">Reminder!</h4>
    <p>Our team uses Plotly to visualise reward in a continuous manner, therefore:</p>
    <ul>
      <li>If you have Plotly and ipywidgets installed in your environment, skip the following code cell</li>
      <li>Otherwise, run the following code cell and <strong>restart your jupyter notebook!</strong></li>
    </ul>
</div>

In [None]:
!pip install plotly
!pip install ipywidgets

In [None]:
import numpy as np
import random
import torch

from tic_env import *

import plotly.express as px
import plotly.graph_objects as go
import plotly.io as pio
from plotly.subplots import make_subplots

### Helper functions

In [None]:
# Plotly Express' qualitative colour sequence named `Plotly`; the experiment
# cells index into it to give every run its own trace colour.
colours = px.colors.qualitative.Plotly

In [None]:
# Device every state/reward tensor is moved to: the GPU when CUDA is usable,
# otherwise the CPU.
DEVICE = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [22]:
def render_figure_widge(mode=None):
    """Create an empty Plotly ``FigureWidget`` for the training runs to draw into.

    Args:
        mode: Selects the layout.
            * ``'Reward+Loss'`` - a 1x2 subplot grid titled 'Average Reward' and
              'Training Loss', 1400x500 px (only for questions 11 and 12).
            * ``'validation'`` - a single 1200x800 px plot whose y-axis is
              titled 'Diff. Rate'.
            * anything else, including ``None`` - a single 1200x800 px plot
              whose y-axis is titled 'Avg. Reward'.

    Returns:
        go.FigureWidget: the configured widget, without any trace.
    """
    if mode == 'Reward+Loss':
        # Build the two-panel figure directly; previously a single-axes widget
        # was fully configured first and then thrown away in this mode.
        subplot_fig = make_subplots(rows=1, cols=2, subplot_titles=['Average Reward', 'Training Loss'])
        subplot_fig.update_layout(width=1400, height=500, hovermode="x unified")
        return go.FigureWidget(subplot_fig)

    y_title = 'Diff. Rate' if mode == 'validation' else 'Avg. Reward'

    gofig = go.FigureWidget()
    gofig.update_xaxes(title_text='Epoch', autorange=True)
    gofig.update_yaxes(title_text=y_title, autorange=True)
    gofig.update_layout(width=1200, height=800, hovermode="x unified")
    return gofig

In [23]:
def init_figure_widge(fig_widge, mode, colour, epsilon_opt=None, epsilon_learn=None, n_star=None):
    """Add the two placeholder traces that a single training run will extend.

    Every mode appends exactly two traces, each starting at the point (0, 0);
    the simulation functions later address them as the last two entries of
    ``fig_widge.data``.

    Args:
        fig_widge: figure (widget) to add the traces to. The 'epsilon' and
            'decrease exploration' modes need a 1x2 subplot figure because the
            traces are placed at (row 1, col 1) and (row 1, col 2).
        mode: one of 'epsilon', 'decrease exploration', 'validate n*',
            'validate epsilon opt'.
        colour: marker colour shared by both traces.
        epsilon_opt: value shown in the legend for 'validate epsilon opt'.
        epsilon_learn: value shown in the legend for 'epsilon'; must lie in [0, 1).
        n_star: value shown in the legend for 'decrease exploration'
            (must be an ``int``) and 'validate n*'.

    Returns:
        The same ``fig_widge`` object.

    Raises:
        Exception: 'epsilon' mode with ``epsilon_learn`` missing or outside [0, 1).
        TypeError: 'decrease exploration' mode with a non-int ``n_star``.
        NotImplementedError: unknown ``mode``.
    """
    # Mode epsilon only used for question 11
    if mode == 'epsilon':
        # Test for None explicitly: comparing None with a number raises a bare
        # TypeError, which hid the intended "empty or out of range" error.
        if epsilon_learn is None or not (0 <= epsilon_learn < 1):
            raise Exception('Epsilon value empty or out of range')

        # For average reward - 0
        fig_widge.add_trace(go.Scatter(x=[0], y=[0], name=f'Avg. Reward = {epsilon_learn}', marker=dict(color=colour)), row=1, col=1)

        # For training loss - 1
        fig_widge.add_trace(go.Scatter(x=[0], y=[0], name=f'Tr. Loss = {epsilon_learn}', marker=dict(color=colour)), row=1, col=2)

    # Mode 'decrease exploration' only used for question 12
    elif mode == 'decrease exploration':
        if not isinstance(n_star, int):
            raise TypeError('Cannot find or recognise n_star')

        # For average reward - 0
        fig_widge.add_trace(go.Scatter(x=[0], y=[0], name=f'Avg. Reward = {n_star}', marker=dict(color=colour)), row=1, col=1)

        # For training loss - 1
        fig_widge.add_trace(go.Scatter(x=[0], y=[0], name=f'Tr. Loss = {n_star}', marker=dict(color=colour)), row=1, col=2)

    elif mode == 'validate n*':
        # Solid line for M_opt, dotted line for M_rand.
        fig_widge.add_scatter(x=[0], y=[0], name=f'M_opt n* = {n_star}', marker=dict(color=colour))
        fig_widge.add_scatter(x=[0], y=[0], name=f'M_rand n* = {n_star}', marker=dict(color=colour), line=dict(dash='dot'))

    elif mode == 'validate epsilon opt':
        # Solid line for M_opt, dotted line for M_rand.
        fig_widge.add_scatter(x=[0], y=[0], name=f'M_opt epilson opt= {epsilon_opt}', marker=dict(color=colour))
        fig_widge.add_scatter(x=[0], y=[0], name=f'M_rand epilson opt= {epsilon_opt}', marker=dict(color=colour), line=dict(dash='dot'))

    else:
        raise NotImplementedError("Unable to recognise mode")

    return fig_widge

In [24]:
def check_available(grid, pos):
    """Tell whether board cell ``pos`` (0-8, row-major) is still empty.

    ``grid`` should be a 2 * 3 * 3 tensor; a cell is free only when it is 0
    in both planes.
    """
    row, col = divmod(pos, 3)
    return bool(grid[0, row, col] == 0 and grid[1, row, col] == 0)

In [25]:
def grid2state(grid, learner_value):
    """Encode a 3x3 board as a 2x3x3 float tensor on ``DEVICE``.

    Plane 0 marks (with 1) the cells whose value has the learner's sign and
    plane 1 the cells with the opposite sign, where the learner's sign is
    positive when ``learner_value == "X"`` and negative otherwise.
    """
    board = np.asarray(grid).reshape(3, 3)
    positive_cells = np.clip(board, 0, 1)
    negative_cells = np.clip(-board, 0, 1)

    if learner_value == "X":
        planes = (positive_cells, negative_cells)
    else:
        planes = (negative_cells, positive_cells)

    return torch.Tensor(np.stack(planes)).to(DEVICE)

In [26]:
def validation(epsilon_opt, epsilon_learn, DeepQLearner, epoch=500):
    """Score a learner by its mean reward over ``epoch`` games against ``OptimalPlayer``.

    The learner plays 'X' during the first half of the games and 'O' during the
    second half. A move onto an occupied cell ends the game at once and is
    counted as a win for the opponent.

    Args:
        epsilon_opt: exploration rate given to the opponent ``OptimalPlayer``.
        epsilon_learn: exploration rate assigned to the learner for these games.
        DeepQLearner: the agent to evaluate. Its ``epsilon`` and ``player``
            attributes are overwritten and NOT restored.
        epoch: number of evaluation games.

    Returns:
        Sum of the learner's rewards divided by ``epoch``.
    """
    env = TictactoeEnv()
    reward_sum = 0
    player_learn = DeepQLearner

    # Swap sides half-way through. This used to be hard-coded to game 250,
    # which is only the half-way point for the default epoch of 500.
    switch_at = epoch // 2

    for i in range(epoch):
        # NOTE(review): this re-seeds NumPy's *global* RNG on every game, so
        # callers see the global random state reset after each validation.
        np.random.seed(i)

        # Our policy makes the first move in the first half of the games
        if i < switch_at:
            Turns = np.array(['O','X'])
        else:
            Turns = np.array(['X','O'])

        env.reset()
        grid, _, __ = env.observe()
        player_opt = OptimalPlayer(epsilon=epsilon_opt, player=Turns[0])
        player_learn.epsilon = epsilon_learn
        player_learn.player = Turns[1]

        for j in range(9):
            if env.current_player == player_opt.player:
                action = player_opt.act(grid)
                grid, end, winner = env.step(action, print_grid=False)
            else:
                state = grid2state(grid, player_learn.player).to(DEVICE)
                action = player_learn.act(state)
                # Reuse the encoded state instead of encoding the grid twice.
                if check_available(state, action.item()):
                    grid, end, winner = env.step(action.item(), print_grid=False)
                else:
                    # End the game if the agent takes an unavailable action
                    # and record the opponent as the winner, so that
                    # env.reward() below scores the game as a loss.
                    end = True
                    env.winner = player_opt.player

            if end:
                reward_sum += env.reward(player=player_learn.player)
                env.reset()
                break

    return reward_sum/epoch

In [27]:
def run_simulation(epsilon_opt, buffer_capacity=10000, batch_size=64, epoch=20000, plot_interval=250, 
                   colour='black', epsilon_learn_init=None, n_star=None, fig_widge=None, mode=None):
    """Train a ``DeepQAgent`` against an ``OptimalPlayer`` and plot its progress.

    The players swap sides every game. Every ``plot_interval`` games one point
    is appended to each of the last two traces of ``fig_widge``:

    * modes without 'validate' in their name: the interval's average reward
      and average training loss;
    * 'validate ...' modes: ``validation(0, 0, learner)`` (M_opt) and
      ``validation(1, 0, learner)`` (M_rand).

    Args:
        epsilon_opt: exploration rate of the opponent ``OptimalPlayer``.
        buffer_capacity: capacity passed to the replay ``Buffer``.
        batch_size: batch size passed to the replay ``Buffer``; training only
            starts once the buffer holds at least this many transitions.
        epoch: number of training games.
        plot_interval: number of games per plotted point.
        colour: colour of the traces added for this run.
        epsilon_learn_init: fixed exploration rate of the learner. When ``None``
            the rate is ``max(0.1, 0.8 * (1 - game / n_star))``.
        n_star: decay horizon; required when ``epsilon_learn_init`` is ``None``.
        fig_widge: figure widget to draw into; when ``None`` nothing is plotted.
        mode: plotting mode, one of those accepted by ``init_figure_widge``.

    Returns:
        tuple[list, list]: the per-interval average rewards and average losses.

    Raises:
        AssertionError: both ``epsilon_learn_init`` and ``n_star`` are ``None``.
    """
    any_epsilon = (epsilon_learn_init is None) and (n_star is None)
    assert not any_epsilon, "Both epsilon_learn and n_star is None! At least having one of them."

    env = TictactoeEnv()
    Turns = np.array(['X','O'])
    # The `plot_interval` argument is honoured; it used to be overwritten
    # with a hard-coded 250 here.

    # rendering in figure widge
    if fig_widge is not None:
        init_figure_widge(fig_widge, mode, colour,
                          epsilon_opt=epsilon_opt,
                          epsilon_learn=epsilon_learn_init,
                          n_star=n_star)

    replay_memory = Buffer(capacity=buffer_capacity, batch_size=batch_size)
    player_opt = OptimalPlayer(epsilon=epsilon_opt, player=Turns[0])
    player_learn = DeepQAgent(epsilon=epsilon_learn_init, player=Turns[1])

    epsilon_min = 0.1
    epsilon_max = 0.8

    reward_sum = 0
    loss_sum = 0
    loss_step = 0
    average_rewards = []
    average_losses = []

    for i in range(epoch):
        # Fixed exploration rate when given, otherwise a linear decay over
        # n_star games that is floored at epsilon_min.
        if epsilon_learn_init is None:
            epsilon_learn = max(epsilon_min, epsilon_max*(1-(i+1)/n_star))
        else:
            epsilon_learn = epsilon_learn_init

        player_learn.epsilon = epsilon_learn

        env.reset()
        grid, _, _ = env.observe()

        # Switch Order
        Turns = Turns[::-1]
        player_opt.player = Turns[0]
        player_learn.player = Turns[1]
        env.current_player  = 'X'

        state = None
        next_state = None

        for j in range(9):
            if env.current_player == player_opt.player:
                action_opt = player_opt.act(grid)
                grid, end, winner = env.step(action_opt, print_grid=False)

                # Get reward
                reward = env.reward(player=player_learn.player)

                # The opponent's reply completes the learner's previous
                # transition - except on move 0, when the learner has not
                # played yet.
                if j > 0:
                    next_state = grid2state(grid, player_learn.player)
            else:
                state = grid2state(grid, player_learn.player)
                action = player_learn.act(state)
                action = action.to(DEVICE)

                # Check the availability of current action.
                if check_available(state, action.item()):
                    grid, end, winner = env.step(action.item(), print_grid=False)

                    # Get reward.
                    reward = env.reward(player=player_learn.player)
                else:
                    # End the game if the agent takes an unavailable action
                    end = True
                    # Give the agent a negative reward
                    reward = -1

                    # Opponent wins the games
                    env.winner = player_opt.player

            if end:
                # Once the game ends, no matter which player plays first
                # Update is the same.
                if env.winner == player_opt.player:
                    reward = -1
                elif env.winner == player_learn.player:
                    # our agent wins.
                    reward = 1
                else:
                    # Draw
                    reward = 0

                # Terminal transition: there is no next state.
                next_state = None
                # Update target model every 500 epoch
                update_target = (i+1) % 500 == 0

                replay_memory.push(state.unsqueeze(0), action, next_state, torch.tensor([reward], device=DEVICE))
                if len(replay_memory) >= replay_memory.batch_size:
                    loss = player_learn.train(replay_memory, update_target)
                    loss_step += 1
                    loss_sum += loss

                reward_sum += reward
                env.reset()
                break

            # Game still running: store the learner's transition once the
            # opponent has answered (next_state stays None right after the
            # learner's own move, or when opt opened the game).
            if next_state is not None:
                replay_memory.push(state.unsqueeze(0), action, next_state, torch.tensor([reward], device=DEVICE))
                if len(replay_memory) >= replay_memory.batch_size:
                    loss = player_learn.train(replay_memory)
                    loss_step += 1
                    loss_sum += loss
                next_state = None

        #############################
        ######### Plot ##############

        if (i+1) % plot_interval == 0:
            # calculate average reward at the end of the current interval.
            average_reward = reward_sum / plot_interval
            # No training step in this interval (buffer still filling) -> NaN
            # instead of a ZeroDivisionError.
            average_loss = loss_sum / loss_step if loss_step else float('nan')

            if fig_widge is not None:
                # The traces of the current run are the two added last.
                idx = len(fig_widge.data)-2

                if 'validate' in mode:
                    M_opt = validation(0, 0, player_learn, epoch=500)
                    M_rand = validation(1, 0, player_learn, epoch=500)

                    fig_widge.data[idx].x = np.append(fig_widge.data[idx].x, i+1)
                    fig_widge.data[idx].y = np.append(fig_widge.data[idx].y, M_opt)
                    fig_widge.data[idx+1].x = np.append(fig_widge.data[idx+1].x, i+1)
                    fig_widge.data[idx+1].y = np.append(fig_widge.data[idx+1].y, M_rand)
                    fig_widge.layout.title.text = f'Epoch {i+1}, M_opt = {M_opt}, M_rand = {M_rand}'
                else:
                    fig_widge.data[idx].x = np.append(fig_widge.data[idx].x, i+1)
                    fig_widge.data[idx].y = np.append(fig_widge.data[idx].y, average_reward)

                    fig_widge.data[idx+1].x = np.append(fig_widge.data[idx+1].x, i+1)
                    # float() accepts both a 0-dim tensor and a plain number.
                    fig_widge.data[idx+1].y = np.append(fig_widge.data[idx+1].y, float(average_loss))

            average_rewards.append(average_reward)
            average_losses.append(average_loss)
            # Reset ALL interval accumulators. loss_step used to be left
            # untouched, so later averages were divided by the cumulative
            # number of training steps and shrank artificially.
            reward_sum = 0
            loss_sum = 0
            loss_step = 0

    return average_rewards, average_losses

In [28]:
def self_practice_simulation(epsilon_opt=None, epoch=20000, plot_interval=250, 
                   colour='black', epsilon_learn_init=None, n_star=None, fig_widge=None, mode=None):
    """Train a deep Q-network by self-play and plot its validation scores.

    Two ``DeepQAgent`` instances (A and B) share one ``FCN`` model and one
    replay ``Buffer`` (capacity 10000, batch size 64) and swap sides every game.
    Every ``plot_interval`` games agent A is evaluated with
    ``validation(0, 0, A)`` (M_opt) and ``validation(1, 0, A)`` (M_rand) and
    both scores are appended to the last two traces of ``fig_widge``.

    Args:
        epsilon_opt: unused; kept for signature compatibility.
        epoch: number of self-play games.
        plot_interval: number of games between two validations/plot points.
        colour: colour of the traces added for this run.
        epsilon_learn_init: fixed exploration rate of both agents. When ``None``
            the rate is ``max(0.1, 0.8 * (1 - game / n_star))``.
        n_star: decay horizon; required when ``epsilon_learn_init`` is ``None``.
        fig_widge: figure widget to draw into; when ``None`` the validation and
            plotting step is skipped.
        mode: unused; kept for signature compatibility.

    Returns:
        tuple: ``(shared_model, replay_memory)``.

    Raises:
        AssertionError: both ``epsilon_learn_init`` and ``n_star`` are ``None``.
    """
    any_epsilon = (epsilon_learn_init is None) and (n_star is None)
    assert not any_epsilon, "Both epsilon_learn and n_star is None! At least having one of them."

    env = TictactoeEnv()
    Turns = np.array(['X','O'])
    # The `plot_interval` argument is honoured; it used to be overwritten
    # with a hard-coded 250 here.

    # rendering in figure widge
    if fig_widge is not None:
        if n_star is None:
            fig_widge.add_scatter(x=[0], y=[0], name=f'M-opt vs. Q-agent, epsilon = {epsilon_learn_init}', marker=dict(color=colour))
            fig_widge.add_scatter(x=[0], y=[0], name=f'M-rand vs. Q-agent, epsilon = {epsilon_learn_init}', marker=dict(color=colour), line=dict(dash='dot'))
        else:
            fig_widge.add_scatter(x=[0], y=[0], name=f'M-opt vs. Q-agent, n* = {n_star}', marker=dict(color=colour))
            fig_widge.add_scatter(x=[0], y=[0], name=f'M-rand vs. Q-agent, n* = {n_star}', marker=dict(color=colour), line=dict(dash='dot'))

    replay_memory = Buffer(capacity=10000, batch_size=64)
    shared_model = FCN()
    # Use player_A as our agent. Need to exchange order when using player_B.
    player_A = DeepQAgent(epsilon=epsilon_learn_init, player='O', model=shared_model)
    player_B = DeepQAgent(epsilon=epsilon_learn_init, player='X', model=shared_model)

    epsilon_min = 0.1
    epsilon_max = 0.8

    reward_sum_A = 0
    loss_sum_A = 0
    loss_step_A = 0
    average_rewards_A = []
    average_losses_A = []

    for i in range(epoch):
        # Fixed exploration rate when given, otherwise a linear decay over
        # n_star games that is floored at epsilon_min.
        if epsilon_learn_init is None:
            epsilon_learn = max(epsilon_min, epsilon_max*(1-(i+1)/n_star))
        else:
            epsilon_learn = epsilon_learn_init

        player_A.epsilon = epsilon_learn
        player_B.epsilon = epsilon_learn

        env.reset()
        grid, _, _ = env.observe()

        # Switch Order
        Turns = Turns[::-1]
        player_A.player = Turns[0]
        player_B.player = Turns[1]
        env.current_player  = 'X'

        state_A = None
        next_state_A = None
        state_B = None
        next_state_B = None

        for j in range(9):
            if env.current_player == player_A.player:
                # state_A is observed from the perspective of A.
                state_A = grid2state(grid, player_A.player)
                action_A = player_A.act(state_A)
                action_A = action_A.to(DEVICE)

                if check_available(state_A, action_A.item()):
                    grid, end, winner = env.step(action_A.item(), print_grid=False)

                    # Get reward.
                    reward_A = env.reward(player=player_A.player)
                    # A's move completes B's previous transition - except on
                    # move 0, when B has not played yet.
                    if j > 0:
                        next_state_B = grid2state(grid, player_B.player)
                else:
                    # End the game if the agent takes an unavailable action
                    end = True
                    # Give the agent a negative reward
                    reward_A = -1

                    # The other agent wins the game
                    env.winner = player_B.player

            else:
                # state_B is observed from the perspective of B.
                state_B = grid2state(grid, player_B.player)
                action_B = player_B.act(state_B)
                action_B = action_B.to(DEVICE)

                if check_available(state_B, action_B.item()):
                    grid, end, winner = env.step(action_B.item(), print_grid=False)

                    # Get reward.
                    reward_B = env.reward(player=player_B.player)
                    # B's move completes A's previous transition - except on
                    # move 0, when A has not played yet.
                    if j > 0:
                        next_state_A = grid2state(grid, player_A.player)
                else:
                    # End the game if the agent takes an unavailable action
                    end = True
                    # Give the agent a negative reward
                    reward_B = -1

                    # The other agent wins the game
                    env.winner = player_A.player

            if end:
                # Once ending the game, no matter which player plays first
                # Update is the same
                if env.winner == player_A.player:
                    reward_A = 1
                    reward_B = -1
                elif env.winner == player_B.player:
                    reward_A = -1
                    reward_B = 1
                else:
                    # Draw
                    reward_A = 0
                    reward_B = 0

                # Terminal transitions carry no next state for either agent.
                next_state_A = None
                next_state_B = None

                replay_memory.push(state_A.unsqueeze(0), action_A, next_state_A, torch.tensor([reward_A], device=DEVICE))
                replay_memory.push(state_B.unsqueeze(0), action_B, next_state_B, torch.tensor([reward_B], device=DEVICE))

                # Update target model every 500 epoch.
                update_target = (i+1) % 500 == 0

                # Training model when game over, and replay_memory size >= batch size
                if len(replay_memory) >= replay_memory.batch_size:
                    loss_A = player_A.train(replay_memory, update_target)
                    loss_sum_A += loss_A
                    loss_step_A += 1
                    _ = player_B.train(replay_memory, update_target)

                # always focus on the average reward of player_A.
                reward_sum_A += reward_A
                env.reset()
                break

            # Game still running: store whichever transition has just been
            # completed by the other agent's move.
            if next_state_A is not None:
                replay_memory.push(state_A.unsqueeze(0), action_A, next_state_A, torch.tensor([reward_A], device=DEVICE))
                if len(replay_memory) >= replay_memory.batch_size:
                    loss_A = player_A.train(replay_memory)
                    loss_sum_A += loss_A
                    loss_step_A += 1
                next_state_A = None

            if next_state_B is not None:
                replay_memory.push(state_B.unsqueeze(0), action_B, next_state_B, torch.tensor([reward_B], device=DEVICE))
                if len(replay_memory) >= replay_memory.batch_size:
                    _ = player_B.train(replay_memory)
                next_state_B = None

        #############################
        ######### Plot ##############

        if (i+1) % plot_interval == 0:
            # calculate average reward at the end of the current interval.
            average_reward_A = reward_sum_A / plot_interval
            # No training step in this interval -> NaN instead of a
            # ZeroDivisionError.
            average_loss_A = loss_sum_A / loss_step_A if loss_step_A else float('nan')
            print(average_reward_A)
            print(average_loss_A)

            if fig_widge is not None:
                M_opt_1 = validation(0, 0, player_A, epoch=500)
                M_rand_1 = validation(1, 0, player_A, epoch=500)

                # The traces of the current run are the two added last.
                idx = len(fig_widge.data)-2
                fig_widge.data[idx].x = np.append(fig_widge.data[idx].x, i+1)
                fig_widge.data[idx].y = np.append(fig_widge.data[idx].y, M_opt_1)
                fig_widge.data[idx+1].x = np.append(fig_widge.data[idx+1].x, i+1)
                fig_widge.data[idx+1].y = np.append(fig_widge.data[idx+1].y, M_rand_1)
                fig_widge.layout.title.text = f'Epoch {i+1}, M_opt = {M_opt_1}, M_rand = {M_rand_1}'

            average_rewards_A.append(average_reward_A)
            average_losses_A.append(average_loss_A)
            # reset the interval accumulators.
            reward_sum_A = 0
            loss_sum_A = 0
            loss_step_A = 0

    return (shared_model, replay_memory)

In [29]:
def generate_text(lst):
    """Turn 9 cell values into a 3x3 array of labels with the row order reversed.

    A value of 1 becomes 'X', -1 becomes 'O' and anything else an empty string.
    """
    def _label(cell):
        if cell == 1:
            return 'X'
        if cell == -1:
            return 'O'
        return ''

    labels = np.array([_label(cell) for cell in lst]).reshape(3, 3)
    # Reverse the rows (same as flipping along axis 0).
    return labels[::-1]

In [30]:
def retrieve_avail_qv(q_values, states):
    """Keep the Q-value of every cell whose entry in ``states`` is 0; replace the others by 0."""
    masked = []
    for position, q_value in enumerate(q_values):
        if states[position] == 0:
            masked.append(q_value)
        else:
            masked.append(0)
    return masked

Question 11.

In [None]:
# Two-panel widget (average reward | training loss); displaying it here lets
# the runs of the next cell draw into it.
fw1 = render_figure_widge('Reward+Loss')
fw1

In [None]:
# One training run per fixed exploration rate of the learner, all drawn into fw1.
epsilon_opt = 0.5  # Exploration level of the optimal player (the opponent), fixed for all runs.
epsilon_learn = [0.0, 0.2, 0.4, 0.8]  # Exploration levels of the deep Q-learning agent, chosen by ourselves.
epoch = 20000  # Number of games to play per run.
plot_interval = 250  # The interval of games to calculate an average reward.
for idx, epsilon in enumerate(epsilon_learn):
    # epoch and plot_interval are now passed explicitly; before, the two
    # variables above were defined but never used, so editing them had no effect.
    run_simulation(
        epsilon_opt=epsilon_opt,
        epsilon_learn_init=epsilon, 
        epoch=epoch,
        plot_interval=plot_interval,
        fig_widge=fw1,
        colour=colours[idx],
        mode='epsilon'
        )

Question 12.

In [20]:
# Two-panel widget (average reward | training loss); displaying it here lets
# the runs of the next cell draw into it.
fw2 = render_figure_widge('Reward+Loss')
fw2

FigureWidget({
    'data': [],
    'layout': {'annotations': [{'font': {'size': 16},
                         …

In [21]:
# Same sweep as for fw1, but with a replay buffer of capacity 1 and batch size 1,
# i.e. every update uses only the latest transition.
epsilon_opt = 0.5  # Exploration level of the optimal player (the opponent), fixed for all runs.
epsilon_learn = [0.0, 0.2, 0.4, 0.8]  # Exploration levels of the deep Q-learning agent, chosen by ourselves.
epoch = 20000  # Number of games to play per run.
plot_interval = 250  # The interval of games to calculate an average reward.
for idx, epsilon in enumerate(epsilon_learn):
    # epoch and plot_interval are now passed explicitly; before, the two
    # variables above were defined but never used, so editing them had no effect.
    run_simulation(
        epsilon_opt=epsilon_opt,
        buffer_capacity=1,
        batch_size=1,
        epsilon_learn_init=epsilon, 
        epoch=epoch,
        plot_interval=plot_interval,
        fig_widge=fw2,
        colour=colours[idx],
        mode='epsilon'
        )

Question 13.

In [None]:
# Single-axes widget (y-axis titled 'Avg. Reward') that the runs of the next
# cell draw their M_opt / M_rand curves into.
fw3 = render_figure_widge()
fw3

In [None]:
# One training run per decay horizon n*: with no fixed epsilon given,
# run_simulation derives the learner's exploration rate from n_star.
epsilon_opt = 0.5  # This epsilon value represents exploration level of optimal player.
n_stars = [1, 100, 1000, 2000, 5000, 10000, 20000, 40000]
epoch = 20000  # Number of games to play per run.
plot_interval = 250  # The interval of games between two plotted validation points.
for idx, n_s in enumerate(n_stars):
    run_simulation(
        epsilon_opt=epsilon_opt, 
        n_star=n_s,
        epoch=epoch, 
        plot_interval=plot_interval, 
        # Wrap around the palette in case there are more runs than colours.
        colour=colours[idx%len(colours)], 
        fig_widge=fw3, 
        mode="validate n*"
        )

In [None]:
# Google Colab only: switch on the custom widget manager.
# The import is guarded so that "Run All" keeps working in a plain Jupyter
# environment, where the `google.colab` package does not exist.
try:
    from google.colab import output
except ImportError:
    output = None
else:
    output.enable_custom_widget_manager()

In [None]:
# Google Colab only: switch the custom widget manager off again.
# The import is guarded so that "Run All" keeps working in a plain Jupyter
# environment, where the `google.colab` package does not exist.
try:
    from google.colab import output
except ImportError:
    output = None
else:
    output.disable_custom_widget_manager()

Question 14

In [None]:
# Single-axes widget (y-axis titled 'Avg. Reward') that the runs of the next
# cell draw their M_opt / M_rand curves into.
fw4 = render_figure_widge()
fw4

In [None]:
# One training run per exploration level of the opponent, with the learner's
# decay horizon held at best_n_star. epoch and plot_interval are left at the
# defaults of run_simulation.
epsilon_opts = [.0, .2, .4, .6, .8]
best_n_star = 1000  # presumably the best n* found in the sweep above - TODO confirm
for idx, epsilon_opt in enumerate(epsilon_opts):
    run_simulation(
        epsilon_opt=epsilon_opt, 
        n_star=best_n_star,
        fig_widge=fw4,
        # Wrap around the palette in case there are more runs than colours.
        colour=colours[idx%len(colours)],
        mode='validate epsilon opt'
        )

Question 16

In [None]:
# Single-axes widget with the y-axis titled 'Diff. Rate'; the self-play runs
# of the next cell draw their M_opt / M_rand curves into it.
fw5 = render_figure_widge('validation')
fw5

In [None]:
# Self-play training, one run per fixed exploration rate shared by both agents.
epsilon_learn = [0, 0.2, 0.4, 0.6, 0.8]
for idx, epsilon in enumerate(epsilon_learn):
    # The returned (model, replay memory) pair is not needed here.
    self_practice_simulation(
        colour=colours[idx%len(colours)],
        epsilon_learn_init=epsilon,
        fig_widge=fw5,
        )

Question 17

In [None]:
# Single-axes widget with the y-axis titled 'Diff. Rate'; the self-play runs
# of the next cell draw their M_opt / M_rand curves into it.
fw6 = render_figure_widge('validation')
fw6

In [None]:
# Self-play training, one run per decay horizon n* of the exploration rate.
n_stars = [1, 100, 1000, 5000, 10000, 20000]
# Collects the (shared model, replay memory) tuple returned by every run.
libraries = []

for idx, n_s in enumerate(n_stars):
    libraries.append(self_practice_simulation(colour=colours[idx%len(colours)],n_star=n_s,fig_widge=fw6))

Question 19 

In [None]:
# Retrieve FCN model for qvalue prediction and memory for sampling
model, memory = self_practice_simulation(colour='black',n_star=1000,fig_widge=render_figure_widge('validation'))

In [None]:
# Draw the Q-values of three states sampled from the replay memory as heatmaps.
fig = make_subplots(rows=1, cols=3)

for i in range(3):
    # One stored transition; only its state is used.
    random_sample = memory.sample(1)
    state  = random_sample[0].state
    # Difference of the two planes of the state encoding: one signed 3x3 board.
    grid = state[0][0] - state[0][1]
    # NOTE(review): `grid_to_state` is not defined in this notebook (the helper
    # defined here is `grid2state`); presumably it comes from the star import
    # of tic_env - TODO confirm.
    lst = grid_to_state(grid.cpu().detach().numpy())

    # Q-values predicted by the model for this state, as a plain list.
    qvalue = model(state)[0].cpu().tolist()
    
    # Q-values of cells whose entry in `lst` is non-zero are shown as 0; the
    # rows are flipped the same way generate_text flips its labels.
    fig.add_trace(go.Heatmap(z=np.flip(np.reshape(retrieve_avail_qv(qvalue, lst), (3,3)), 0),
                         text= generate_text(lst), texttemplate="%{text}",
                         textfont={"size":20}, coloraxis='coloraxis'), row=1, col=i+1)
    
fig.update_layout(width=1200, height=500)
fig.update_xaxes(visible=False)       
fig.update_yaxes(visible=False)
# All three heatmaps reference 'coloraxis', so they share this colour scale.
fig.update_layout(coloraxis=dict(colorscale='RdBu', colorbar_thickness=23))
fig.show()