In [1]:
import copy
import multiprocessing as mp
import random

import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from joblib import Parallel, delayed

from tic_env import TictactoeEnv, OptimalPlayer

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F

from collections import deque

from tqdm.notebook import tqdm, trange

In [3]:
class Qnetwork(nn.Module):
    """Fully connected network mapping a (flattened) input to one value per output unit.

    Architecture: Flatten -> Linear -> ReLU -> Linear -> ReLU -> Linear.

    Args:
        input_size: number of features after flattening (default 18).
        hidden_size1: width of the first hidden layer.
        hidden_size2: width of the second hidden layer.
        output_size: number of values produced per sample (default 9).
    """

    def __init__(self, input_size=18, hidden_size1=128, hidden_size2=128, output_size=9):
        super().__init__()
        # Collapses every dimension except the batch dimension.
        self.flattener = nn.Flatten()
        self.inputLayer = nn.Linear(input_size, hidden_size1)
        self.fullyConnected = nn.Linear(hidden_size1, hidden_size2)
        self.outputLayer = nn.Linear(hidden_size2, output_size)

    def forward(self, x):
        """Return the raw (un-activated) outputs for a batch `x`."""
        activations = self.flattener(x)
        # Both hidden layers share the same "linear then ReLU" pattern.
        for hidden_layer in (self.inputLayer, self.fullyConnected):
            activations = F.relu(hidden_layer(activations))
        return self.outputLayer(activations)

In [4]:
class Agent():
    """Base class for tic-tac-toe players that explore with an epsilon-greedy policy.

    `epsilon` is either a single number (constant exploration rate) or a
    `(epsilon_min, epsilon_max)` tuple, in which case exploration starts at
    `epsilon_max` and is lowered by `decrease_epsilon`.

    Subclasses are expected to override `bestAction`, `act` and `learn`;
    the versions defined here do nothing and return None.
    """

    def __init__(self,epsilon=0.2,player='X', learningRate=0.05, discountFactor=0.99, n_max=100):
        if isinstance(epsilon, tuple):
            eps_low, eps_high = epsilon
        else:
            eps_low = eps_high = epsilon
        self.epsilon_min = eps_low
        self.epsilon_max = eps_high
        # Exploration always starts from the upper bound.
        self.epsilon = eps_high

        self.learningRate = learningRate
        self.discountFactor = discountFactor

        # Last observed state / chosen action, kept between act() and learn().
        self.state = None
        self.action = None

        # `n` is the counter that drives the epsilon schedule, `n_max` its horizon.
        self.n = 0
        self.n_max = n_max

        self.isLearning = True

        self.player = player # 'X' or 'O'

    def decrease_epsilon(self):
        """Recompute epsilon as a linear decay in `n`, clipped below at `epsilon_min`."""
        decayed = self.epsilon_max * (1 - self.n / self.n_max)
        self.epsilon = max(self.epsilon_min, decayed)

    def set_player(self, player = 'X', j=-1):
        """Set the side played; when `j` is given, even `j` means 'X' and odd `j` means 'O'."""
        side = player
        if j != -1:
            side = 'O' if j % 2 else 'X'
        self.player = side

    def empty(self, state):
        """ Return all empty positions, as (row, column) tuples in row-major order. """
        return [
            (row, col)
            for row in range(3)
            for col in range(3)
            if state[(row, col)] == 0
        ]

    def randomAction(self, state):
        """ Choose a random action from the available options. """
        return random.choice(self.empty(state))

    def bestAction(self, state):
        """Placeholder for the greedy action; overridden by subclasses."""

    def act(self,state):
        """Placeholder for action selection; overridden by subclasses."""

    def learn(self, s_prime, reward, end=False):
        """Placeholder for the learning update; overridden by subclasses."""

In [87]:
class DQN_agent(Agent):
    """
    Deep Q-learning agent with a replay buffer and a periodically synced target network.

    Our Q-network will be a simple linear neural network with two hidden layers.

    Args:
        epsilon: exploration rate, or an `(epsilon_min, epsilon_max)` tuple (see `Agent`).
        player: side played, 'X' or 'O'.
        learningRate: learning rate of the Adam optimizer.
        discountFactor: discount applied to the bootstrapped target value.
        n_max: horizon of the epsilon schedule (see `Agent.decrease_epsilon`).
        Qmodel: online Q-network; a fresh `Qnetwork()` is created when None.
        batch_size: number of transitions sampled per update.
        C: number of updates between two target-network synchronisations.
        R: replay buffer; a fresh `deque(maxlen=10_000)` is created when None.
        criterion: loss function; `nn.HuberLoss()` is used when None.
    """
    def __init__(self, epsilon=0.2, player='X', learningRate=0.0005, discountFactor=1.0 , n_max=100, Qmodel=None, batch_size=64, C=500 , R=None, criterion=None):
        # Forward the hyper-parameters: calling the base constructor without
        # arguments would silently fall back to Agent's own defaults.
        super(DQN_agent, self).__init__(
            epsilon=epsilon,
            player=player,
            learningRate=learningRate,
            discountFactor=discountFactor,
            n_max=n_max,
        )

        # If a GPU is available
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Mutable defaults are built here, per instance, so that two agents
        # never share the same network or the same replay buffer by accident.
        if Qmodel is None:
            Qmodel = Qnetwork()
        self.Qmodel = Qmodel.to(self.device)
        # The target network is a copy of the online network, so it always has
        # the same architecture and the same initial weights.
        self.Qtarget = copy.deepcopy(self.Qmodel)

        self.batch_size = batch_size

        # `t` counts updates since the last target-network synchronisation.
        self.t = 0
        self.C = C

        self.R = deque(maxlen=10_000) if R is None else R

        # criterion is Huber loss (with delta = 1) unless another one is given
        self.criterion = nn.HuberLoss() if criterion is None else criterion

        # optimizer is Adam
        self.optimizer = torch.optim.Adam(self.Qmodel.parameters(), lr=learningRate)

    def _encode(self, grid):
        """
        Turn a 3x3 board with values in {-1, 0, 1} into a float tensor of shape (1, 3, 3, 2).

        Plane 0 marks the cells equal to +1 and plane 1 the cells equal to -1.
        """
        # NOTE(review): the encoding does not depend on self.player, although
        # play_games makes the agent alternate sides; confirm this is intended.
        state = torch.tensor(grid, dtype=torch.int64)
        # Shift the values to {0, 1, 2} so they can be used as class indices.
        state = F.one_hot(state+1,3)
        state = state[:,:,(2,0)]
        state = state.unsqueeze(0)
        return state.type(torch.float).to(self.device)

    def bestAction(self, state):
        """
        Return the index (0-8) of the action with the highest Q-value for an
        already encoded `state`. Occupied cells are not masked out.
        """
        with torch.no_grad():
            q_values = self.Qmodel(state)
        return q_values.argmax(dim=1).item()

    def act(self, grid):
        """
        epsilon-greedy action selection, according to the Q-network.

        Stores the encoded state and the flat action index on the instance
        (for the next call to `learn`) and returns the move as a (row, column) tuple.
        """
        self.state = self._encode(grid)

        # whether move in random or not
        if random.random() < self.epsilon:
            action = self.randomAction(grid)
            self.action = action[0] * 3 + action[1]
        else:
            # Get the best move
            self.action = self.bestAction(self.state)
            # action is a tuple of (x, y) from self.action
            action = (self.action // 3, self.action % 3)

        return action

    def learn(self, grid, reward, end=False):
        """
        Store the latest transition and run one optimisation step on a replay batch.

        Args:
            grid: board observed after the stored action (unused when `end` is True).
            reward: reward received for the stored (state, action) pair.
            end: True when the game is over; the transition is then stored
                without a next state, the pending state/action are cleared and
                the epsilon schedule advances.

        When `isLearning` is False nothing is stored or trained; the pending
        state/action are only cleared at the end of a game.
        """
        if not self.isLearning:
            if end:
                self.state = None
                self.action = None
            return

        if end:
            self.R.append((self.state, self.action, reward, None))

            self.state = None
            self.action = None

            self.n += 1
            self.decrease_epsilon()
        else:
            self.R.append((self.state, self.action, reward, self._encode(grid)))
        # self.R is a deque with a maxlen, so old transitions are dropped automatically

        # Until the buffer holds a full batch, train on everything collected so far.
        if len(self.R) < self.batch_size:
            batch = list(self.R)
        else:
            batch = random.sample(self.R, self.batch_size)

        states = torch.cat([transition[0] for transition in batch]).to(self.device)
        actions = torch.tensor([transition[1] for transition in batch], dtype=torch.int64, device=self.device)
        rewards = torch.tensor([transition[2] for transition in batch], dtype=torch.float, device=self.device)
        has_next = torch.tensor([transition[3] is not None for transition in batch], dtype=torch.bool, device=self.device)

        # Terminal transitions keep a bootstrapped value of 0.
        maxQtarget = torch.zeros(len(batch), device=self.device)
        # torch.cat raises on an empty list, which happens when every sampled
        # transition is terminal.
        if has_next.any():
            next_states = torch.cat([transition[3] for transition in batch if transition[3] is not None]).to(self.device)
            with torch.no_grad():
                maxQtarget[has_next] = self.Qtarget(next_states).max(dim=1).values

        self.optimizer.zero_grad()

        # Q-value of the action that was actually taken in each sampled state.
        Q_theta_sj_aj = self.Qmodel(states).gather(1, actions.unsqueeze(1)).squeeze(1)

        loss = self.criterion(Q_theta_sj_aj, rewards + self.discountFactor*maxQtarget)

        loss.backward()
        self.optimizer.step()

        # Copy the online weights into the target network every C updates.
        self.t += 1
        if self.t == self.C:
            self.t = 0
            self.Qtarget.load_state_dict(self.Qmodel.state_dict())

In [88]:
def play_games(player_opt, agent, maxGames=20_000, env=None):
    """
    Play `maxGames` games between `player_opt` and `agent`, letting every
    `Agent` instance learn along the way.

    The two players swap sides ('X' / 'O') every game. A player that is an
    `Agent` receives `learn(grid, 0)` before each of its moves (except its
    first one), `learn(grid, -1)` each time it picks an occupied cell, and a
    final `learn(grid, r, end=True)` with r = 1 (win), -1 (loss) or 0 (draw).

    A game is aborted, with a printed message, when a player that cannot learn
    picks an occupied cell.

    Args:
        player_opt: opponent; any object with `player` and `act(grid)`.
        agent: player whose results are recorded.
        maxGames: number of games to play.
        env: game environment; a new `TictactoeEnv()` is created when None.

    Returns:
        Array of length `maxGames` holding 1 where `agent` won, -1 where
        `player_opt` won and 0 for draws and aborted games.
    """
    # Built here rather than as a default argument, which would be evaluated
    # once at definition time and shared between all calls.
    if env is None:
        env = TictactoeEnv()

    sides = ('X', 'O')
    winnerList = np.zeros(maxGames)

    pBar = trange(maxGames)
    try:
        for nbGames in range(maxGames):
            env.reset()
            grid, _, __ = env.observe()

            player_opt.player = sides[nbGames % 2]
            agent.player = sides[(nbGames + 1) % 2]

            for roundGame in range(9):
                mover = player_opt if env.current_player == player_opt.player else agent
                mover_is_agent = isinstance(mover, Agent)

                # From its second move on, a learner first receives the
                # (zero) reward for its previous move.
                if roundGame > 1 and mover_is_agent:
                    mover.learn(grid, 0)
                move = mover.act(grid)

                aborted = False
                while env.grid[move] != 0:
                    # An occupied cell can only be corrected by a learner.
                    if not (mover_is_agent and mover.isLearning):
                        print("A player should continue to learn before playing for real.")
                        aborted = True
                        break
                    mover.learn(grid, -1)
                    move = mover.act(grid)
                if aborted:
                    break

                grid, end, winner = env.step(move, print_grid=False)
                if not end:
                    continue

                # Result from the point of view of `agent`.
                if winner == agent.player:
                    outcome = 1
                elif winner == player_opt.player:
                    outcome = -1
                else:
                    outcome = 0
                winnerList[nbGames] = outcome

                if isinstance(player_opt, Agent):
                    player_opt.learn(grid, -outcome, end=True)
                if isinstance(agent, Agent):
                    agent.learn(grid, outcome, end=True)
                break
            pBar.update(1)
    finally:
        # Close the progress bar even if a game raises.
        pBar.close()
    env.reset()
    return winnerList

In [89]:
# Seed every random number generator the notebook imports so that a
# Restart & Run All reproduces the same training curve.
SEED = 0
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Exploration rate of the learning agent and number of training games.
epsilon = 0.6
N_GAMES = 20_000

player_opt = OptimalPlayer(epsilon=0.5)
agent = DQN_agent(epsilon=epsilon)

winnerList = play_games(player_opt, agent, maxGames=N_GAMES)

# Average the per-game results over consecutive groups of `groupSize` games.
# Trailing games that do not fill a whole group are left out so that the
# reshape also works when N_GAMES is not a multiple of groupSize.
groupSize = 250
nbGroups = winnerList.size // groupSize
y = winnerList[:nbGroups * groupSize].reshape(nbGroups, groupSize).mean(axis=1)
x = np.arange(y.size) * groupSize

fig = px.line(x=x, y=y, title=f'Average reward over time of RL agent with policy epsilon={epsilon}')
fig.update_layout(xaxis_title='Game number', yaxis_title='Average reward')
fig.show()

  0%|          | 0/20000 [00:00<?, ?it/s]

.