<a href="https://colab.research.google.com/github/albertaillet/Miniproject1/blob/master/Deep_Q-Learning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [59]:
import os
if "Miniproject1" not in os.getcwd():
  !git clone https://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@github.com/albertaillet/Miniproject1/
  %cd Miniproject1/
%ls

Deep_Q-Learning.ipynb    plot_table.py         README.md
[0m[01;34mimages[0m/                  [01;34m__pycache__[0m/          tic_env.py
MP_TicTocToe.pdf         Q-learning_DAM.ipynb  tic_tac_toe.ipynb
performance_measures.py  Q-learning.ipynb


In [60]:
import random
import numpy as np
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from tic_env import TictactoeEnv, OptimalPlayer

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cpu')

## 3. Deep Q-Learning

As our 2nd algorithm, we use Deep Q-Learning (DQN) combined with $\epsilon$-greedy policy. You can watch again Part 1 of Deep Reinforcement Learning Lecture 1 for an introduction to DQN and Part 1 of Deep Reinforcement Learning Lecture 2 (in particular slide 8) for more details. The idea in DQN is to approximate $Q$-values by a neural network instead of a look-up table as in Tabular Q-learning. For implementation, you can use ideas from the DQN tutorials of [Keras](https://keras.io/examples/rl/deep_q_network_breakout/) and [PyTorch](https://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html).

### 3.1 Implementation details
State representation: We represent state $s_{t}$ by a $3 \times 3 \times 2$ tensor `x_t`. Each element of `x_t` takes a value of 0 or 1. The $3 \times 3$ matrix `x_t[:,:,0]` shows positions taken by you, and `x_t[:,:,1]` shows positions taken by your opponent. If `x_t[i, j, 0]=x_t[i, j, 1]=0`, then position $(i, j)$ is available.
Neural network architecture: We use a fully connected network. State `x_t` is fed to the network at the input layer. We consider 2 hidden layers each with 128 neurons - with ReLu activation functions. The output layer has 9 neurons (for 9 different actions) with linear activation functions. Each neuron at the output layer shows the $Q$-value of the corresponding action at state `x_t`.

Unavailable actions: For DQN, we do not constraint actions to only available actions. However, whenever the agent takes an unavailable action, we end the game and give the agent a negative reward of value $r_{\text {unav }}=-1$.

Free parameters: DQN has many hyper parameters. For convenience, we fix the discount factor at $\gamma=0.99$. We assume a buffer size of $10^{\prime} 000$ and a batch size of 64 . We update the target network every 500 games. Instead of squared loss, we use the Huber loss (with $\delta=1$ ) with Adam optimizer (c.f. the DQN tutorials of Keras and PyTorch). You can fine tune the learning rate if needed, but we suggest $5 \times 10^{-4}$ as a starting point.

Other options? There are tens of different ways to make training of deep networks more efficient. Do you feel like trying some and learning more? You are welcome to do so; you just need to explain the main features of your implementation and a brief summary of your reasoning in less than 300 words under the title 'Implementation details' in your report.

In [38]:
#@title Parameters

INPUT_SIZE = 18 #@param {type:"integer"}
HIDDEN_SIZE = 128 #@param {type:"integer"}
OUTPUT_SIZE = 9 #@param {type:"integer"}
GAMMA = 0.99  #@param {type:"number"}
ALPHA = 5e-4  #@param {type:"number"}
BUFFER_SIZE = 10000 #@param {type:"integer"}
BATCH_SIZE = 64 #@param {type:"integer"}
UPDATE_EVERY = 500 #@param {type:"integer"}

In [39]:
class DQN(nn.Module):

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()        
        self.input_to_hidden1 = nn.Linear(input_size, hidden_size)
        self.hidden1_to_hidden2 = nn.Linear(hidden_size, hidden_size)
        self.hidden2_to_output = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        x = x.to(device)
        x = x.flatten()
        x = self.input_to_hidden1(x)
        x = F.relu(x)
        x = self.hidden1_to_hidden2(x)
        x = F.relu(x)
        x = self.hidden2_to_output(x)
        return x

The following cell creates a class for the replay buffer and uses code from [Replay Memory](https://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html#replay-memory)

In [68]:
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))

class ReplayBuffer(object):

    def __init__(self, buffer_size, batch_size):
        self.buffer = deque([],maxlen=buffer_size)
        self.batch_size = batch_size

    def push(self, *args):
        """Save a transition"""
        self.buffer.append(Transition(*args))

    def get_batch(self, batch_size=None):
        if batch_size is None:
          batch_size = self.batch_size
        return random.sample(self.buffer, batch_size)

    def __len__(self):
        return len(self.buffer)

    def has_one_batch(self, batch_size=None):
        if batch_size is None:
          batch_size = self.batch_size
        return len(self) >= batch_size

The following cell creates a function to call to optimize the model and uses code from 
[Training loop](https://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html#training-loop)

In [163]:
def optimize_model(model: nn.Module, buffer: ReplayBuffer, optimizer: optim.Optimizer):
    if not buffer.has_one_batch():
        return
    transitions = buffer.get_batch()
    # Transpose the batch (see https://stackoverflow.com/a/19343/3343043 for
    # detailed explanation). This converts batch-array of Transitions
    # to Transition of batch-arrays.
    batch = Transition(*zip(*transitions))

    # Compute a mask of non-final states and concatenate the batch elements
    # (a final state would've been the one after which simulation ended)
    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
                                          batch.next_state)), device=device, dtype=torch.bool)
    non_final_next_states = torch.cat([s for s in batch.next_state
                                                if s is not None])
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # Compute Q(s_t, a) - the model computes Q(s_t), then we select the
    # columns of actions taken. These are the actions which would've been taken
    # for each batch state according to policy_net
    state_action_values = model(state_batch).gather(1, action_batch)

    # Compute V(s_{t+1}) for all next states.
    # Expected values of actions for non_final_next_states are computed based
    # on the "older" target_net; selecting their best reward with max(1)[0].
    # This is merged based on the mask, such that we'll have either the expected
    # state value or 0 in case the state was final.
    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    next_state_values[non_final_mask] = model(non_final_next_states).max(1)[0].detach()
    # Compute the expected Q values
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    # Compute Huber loss
    criterion = nn.SmoothL1Loss()
    loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))

    # Optimize the model
    optimizer.zero_grad()
    loss.backward()
    for param in model.parameters():
        param.grad.data.clamp_(-1, 1)
    optimizer.step()

In [224]:
class DeepEpsilonGreedy:
    def __init__(self, epsilon=0, n_actions=9):
        self.epsilon = epsilon
        self.n_actions = n_actions
    
    def set_epsilon(self, epsilon):
        self.epsilon = epsilon
    
    def act(self, state):
      if np.random.random() > self.epsilon:
          with torch.no_grad():
              # t.max(1) will return largest column value of each row.
              # second column on max result is index of where max element was
              # found, so we pick action with the larger expected reward.
              return model(state).max(0).indices.view(1)
      else:
          return torch.tensor([random.randrange(self.n_actions)], device=device, dtype=torch.long)

In [225]:
def state_to_tensor(state):
  t = np.zeros((3, 3, 2), dtype=np.int8)
  t[:, :, 0] = (state == 1)
  t[:, :, 1] = (state ==-1)
  return torch.tensor(t, dtype=torch.float32, device=device)

In [226]:
env = TictactoeEnv()
buffer = ReplayBuffer(BUFFER_SIZE, BATCH_SIZE)
model = DQN(INPUT_SIZE, HIDDEN_SIZE, OUTPUT_SIZE).to(device)
optimizer = optim.Adam(model.parameters())
epsilon_greedy = DeepEpsilonGreedy(epsilon=0.1, n_actions=OUTPUT_SIZE)

In [227]:
rewards = []
num_trained_games = 0
av_rewards = []

In [228]:
num_games = 20000
opponent_player = OptimalPlayer(0.5, player='O')
learning_player = 'X'
rewards = []

for itr in range(num_games):
    env.reset()
    state, end, _ = env.observe()

    opponent_player.player, learning_player = learning_player, opponent_player.player

    if opponent_player.player == 'X':
        opponent_move = opponent_player.act(state)
        state, end, _ = env.step(opponent_move)

    while not end:
        move = epsilon_greedy.act(state_to_tensor(state))
        valid_move = env.check_valid(move.item())
        if valid_move:
          next_state, end, _ = env.step(move.item())

        if valid_move and (not end) and (env.current_player == opponent_player.player):
            opponent_move = opponent_player.act(next_state)
            next_state, end, _ = env.step(opponent_move)
        
        reward = env.reward(player=learning_player)
        if not valid_move:
          reward = -1

        buffer.push(state_to_tensor(state), 
                    move, 
                    state_to_tensor(next_state), 
                    torch.tensor([reward], device=device))
            
        optimize_model(model, buffer, optimizer)

        state = next_state

    rewards.append(reward)
    if len(rewards) >= 50:
        av_rewards.append(np.mean(rewards))
        rewards = []
        plt.plot(av_rewards)
        plt.show()
    num_trained_games += 1
    env.reset()

RuntimeError: ignored

In [221]:
state = np.array([0, 0, 1, 0, -1, -1, 0, 0, 0]).reshape((3, 3))
state_to_tensor(state).dtype
a = model(state_to_tensor(state)).max(0).indices.view(1).item()
a

5

In [230]:
64*18

1152