In [None]:
#| default_exp gym

In [None]:
#| hide
%load_ext autoreload
%autoreload 2

In [None]:
#| hide
#| export
from itertools import cycle

import numpy as np
from fastcore.basics import patch

import gym
from gym import spaces

from matatena.core import *

# Gym Environment

Now that we've been able to implement the basic functionalities of the game in Python, our next step is to implement it as a `gym.Env` so that it can be used easily to train reinforcement learning models. As a starting point, we will be following the docs: https://www.gymlibrary.dev/content/environment_creation/.

They remind us to add the `metadata` attribute to specify the render-mode (`human`, `rgb_array` or `ansi`) and the framerate. Every environment should support the render-mode `None`, and you don't need to add it explicitly.

As we have almost completely defined the environment before, we don't need to add a lot of information to this class (we can inherit from the one we defined before); but we have to explicitly define the attributes `self.observation_space` and `self.action_space`.

- `self.action_space`: Our agents can only choose the column in which they want to place the dice, so our action space is going to be restricted to a number between 0 and 2 (assuming the board has 3 columns; in general it depends on the board size).

- `self.observation_space`: What does an agent see? It makes sense to provide all the information available: its current board, the opponent's board and the dice it has to place. We can implement this easily with a `spaces.Dict`. The different boards can be encoded as `spaces.Box` with `dtype=np.uint8`, so that their values are discrete while keeping an array-like shape. A `spaces.MultiDiscrete` space, for example, should work very similarly.

In [None]:
#| export
class MatatenaEnv(gym.Env, Game):
    """
    `gym`-ready implementation of `Game`.

    Spaces:

    - `action_space`: `Discrete(board_size)`, the column in which to place the rolled dice.
    - `observation_space`: a `Dict` with the current player's board (`"agent"`),
      the opponent's board (`"opponent"`) and the rolled dice (`"dice"`, an integer in 1..6).
    """
    metadata = {"render_modes": ["human"],
                "render_fps": 4}

    def __init__(self, **kwargs):
        # `gym.Env` comes first in the MRO; the keyword arguments are meant for `Game.__init__`.
        super(MatatenaEnv, self).__init__(**kwargs)

        # Derive the board shape from the game configuration instead of hard-coding 3x3,
        # so that the spaces agree with `action_space` for any `board_size`.
        board_shape = (self.board_size, self.board_size)

        self.action_space = spaces.Discrete(self.board_size)
        self.observation_space = spaces.Dict(
            {
                # Cells hold 0 (empty) or a dice value up to 6.
                # NOTE(review): "opponent" describes a single board, i.e. a two-player game.
                "agent": spaces.Box(low=0, high=6, shape=board_shape, dtype=np.uint8),
                "opponent": spaces.Box(low=0, high=6, shape=board_shape, dtype=np.uint8),
                # Dice values are 1..6; a plain `Discrete(6)` would describe 0..5 and
                # therefore exclude a rolled 6.
                "dice": spaces.Discrete(6, start=1),
            }
        )

In [None]:
# Instantiate the environment with its default settings and display it.
matatena = MatatenaEnv()
matatena

Player 1 (0.0) | Player 2 (0.0) *
[[0. 0. 0.]    | [[0. 0. 0.]     
 [0. 0. 0.]    |  [0. 0. 0.]     
 [0. 0. 0.]]   |  [0. 0. 0.]]    

In [None]:
# Draw a random element from the declared observation space.
matatena.observation_space.sample()

OrderedDict([('agent',
              array([[6, 1, 3],
                     [6, 3, 6],
                     [3, 6, 5]], dtype=uint8)),
             ('dice', 2),
             ('opponent',
              array([[2, 1, 5],
                     [1, 5, 3],
                     [6, 2, 2]], dtype=uint8))])

In [None]:
# Draw a random action (a column index) from the declared action space.
matatena.action_space.sample()

0

# Reset

The `reset` method will be called to initiate a new episode. It should also be called whenever a `done` signal is issued by the environment, in order to reset it. It must accept a `seed` parameter.

It is recommended to use the random generator included when inheriting from `gym.Env` (`self.np_random`), but we need to remember to call `super().reset(seed=seed)` to make sure that the environment is seeded correctly.

Finally, it must return a tuple of the initial observation and some auxiliary information (which will be `None` in our case).

In [None]:
#| export
@patch
def reset(self: MatatenaEnv,
          seed: int=None, # Seed to control the RNG.
          options=None # Additional options.
          ): # Initial state of the environment.
    """
    Reinitializes the environment and returns the initial state.

    Clears every board, chooses the starting player, rolls the first dice and
    returns an `(observation, info)` tuple. The observation is a dict with the
    keys `"agent"` (board of the current player), `"opponent"` (board(s) of the
    remaining players) and `"dice"` (the rolled value, 1..6). `info` is `None`.
    `options` is accepted for API compatibility and is not used.
    """
    # Seeds `self.np_random` when a seed is given.
    super(MatatenaEnv, self).reset(seed=seed)

    self.boards = np.zeros(shape=(self.n_players, self.board_size, self.board_size))
    # NOTE(review): whether `choose_initial_player` honours the seed depends on its
    # implementation in `Game` (not visible here).
    self.current_player = self.choose_initial_player()
    self._players = cycle(range(self.n_players))
    opposite_players_mask = np.arange(self.boards.shape[0]) != self.current_player
    # Roll with the environment's own generator (not the global `np.random`)
    # so that the `seed` argument actually makes the roll reproducible.
    self.last_dice = self.np_random.choice(range(1, 7))
    # NOTE(review): the boards are returned as float views of `self.boards`,
    # not as copies cast to the dtype declared in `observation_space`.
    observation = {
        "agent": self.boards[self.current_player],
        "opponent": self.boards[opposite_players_mask].squeeze(),
        "dice": self.last_dice,
    }
    info = None

    return (observation, info)

In [None]:
# Create a fresh environment and display it before calling `reset`.
matatena = MatatenaEnv()
matatena

Player 1 (0.0) * | Player 2 (0.0)
[[0. 0. 0.]      | [[0. 0. 0.]   
 [0. 0. 0.]      |  [0. 0. 0.]   
 [0. 0. 0.]]     |  [0. 0. 0.]]  

In [None]:
# `reset` returns an `(observation, info)` tuple.
matatena.reset()

({'agent': array([[0., 0., 0.],
         [0., 0., 0.],
         [0., 0., 0.]]),
  'opponent': array([[0., 0., 0.],
         [0., 0., 0.],
         [0., 0., 0.]]),
  'dice': 4},
 None)

In [None]:
# Display the environment again after the reset.
matatena

Player 1 (0.0) * | Player 2 (0.0)
[[0. 0. 0.]      | [[0. 0. 0.]   
 [0. 0. 0.]      |  [0. 0. 0.]   
 [0. 0. 0.]]     |  [0. 0. 0.]]  

# Step

The `.step()` method contains the logic of the environment. Must accept an `action`, compute the state of the environment after applying the `action` and return a 4-tuple: `(observation, reward, done, info)`.

> In our case, the `action` should be the column in which the agent wants to place the rolled dice.

In [None]:
#| export

@patch
def step(self: MatatenaEnv,
         action, # Action to be executed on the environment. Should be the column in which the agent wants to place the dice.
         ): # (observation, reward, done, info) tuple.
    """
    Places the last rolled dice in the column `action` for the current player and advances the game.

    The reward refers to the player that has just moved: `0` while the game is
    not finished; once it is finished, `1` if that player's score equals the
    highest score (ties included) and `-1` otherwise. The returned observation
    is built after changing the player and rolling a new dice, with the same
    keys as in `reset` (`"agent"`, `"opponent"`, `"dice"`). `info` is `None`.
    """

    ## 1. Add the dice to the desired column
    self.add_dice(player=self.current_player,
                  column=action,
                  dice=self.last_dice)

    ## 2. Check if the game is done
    done = self.is_done()

    ## 3. Give rewards regarding if they win or not
    if done:
        scores = [self.score(player) for player in range(self.n_players)]
        reward = 1 if scores[self.current_player] == max(scores) else -1
    else:
        reward = 0

    ## 4. Roll a new dice and change current player
    # Use the environment's own generator (seeded through `reset(seed=...)`)
    # instead of the global `np.random`, so that episodes are reproducible.
    self.last_dice = self.np_random.choice(range(1, 7))
    self._change_player()

    ## 5. Build new observation
    opposite_players_mask = np.arange(self.boards.shape[0]) != self.current_player
    observation = {
        "agent": self.boards[self.current_player],
        "opponent": self.boards[opposite_players_mask].squeeze(),
        "dice": self.last_dice,
    }

    return observation, reward, done, None

# `Render`

> Lastly, only rendering the environment is left.

As we have previously built a quite decent `__repr__` method, we are only going to use that one. It would be nice to get something nicer running with *PyGame*, though.

In [None]:
#| export

@patch
def render(self: MatatenaEnv):
    """
    Prints the textual representation of the environment (its `__repr__`) to standard output.
    """
    text = repr(self)
    print(text)

# Usage

> Simple usage examples.

In [None]:
# Create the environment and start a new episode.
env = MatatenaEnv()
obs, info = env.reset()  # `reset` returns an (observation, info) tuple
env.render()
# The dice that has to be placed is part of the observation.
print(f"Rolled dice is: {obs['dice']}")

Player 1 (0.0) | Player 2 (0.0) *
[[0. 0. 0.]    | [[0. 0. 0.]     
 [0. 0. 0.]    |  [0. 0. 0.]     
 [0. 0. 0.]]   |  [0. 0. 0.]]    
Rolled dice is: 6


In [None]:
# Pick a random column and place the rolled dice there.
action = env.action_space.sample()
print(f"Placing the dice in column: {action}")
obs, reward, done, info = env.step(action)  # `step` returns a 4-tuple
env.render()

Placing the dice in column: 2
Player 1 (0.0) * | Player 2 (6.0)
[[0. 0. 0.]      | [[0. 0. 6.]   
 [0. 0. 0.]      |  [0. 0. 0.]   
 [0. 0. 0.]]     |  [0. 0. 0.]]  


We can even perform a full game:

In [None]:
#| notest

# Play a whole game choosing random columns.
env = MatatenaEnv()
obs, info = env.reset()
done = False

# Keep stepping until the environment reports that the game is done.
while not done:
    action = env.action_space.sample()
    obs, reward, done, info = env.step(action)
    env.render()

Player 1 (0.0) * | Player 2 (4.0)
[[0. 0. 0.]      | [[0. 0. 4.]   
 [0. 0. 0.]      |  [0. 0. 0.]   
 [0. 0. 0.]]     |  [0. 0. 0.]]  
Player 1 (2.0) | Player 2 (4.0) *
[[2. 0. 0.]    | [[0. 0. 4.]     
 [0. 0. 0.]    |  [0. 0. 0.]     
 [0. 0. 0.]]   |  [0. 0. 0.]]    
Player 1 (2.0) * | Player 2 (5.0)
[[2. 0. 0.]      | [[1. 0. 4.]   
 [0. 0. 0.]      |  [0. 0. 0.]   
 [0. 0. 0.]]     |  [0. 0. 0.]]  
Player 1 (3.0) | Player 2 (5.0) *
[[2. 0. 1.]    | [[1. 0. 4.]     
 [0. 0. 0.]    |  [0. 0. 0.]     
 [0. 0. 0.]]   |  [0. 0. 0.]]    
Player 1 (3.0) * | Player 2 (7.0)
[[2. 0. 1.]      | [[1. 2. 4.]   
 [0. 0. 0.]      |  [0. 0. 0.]   
 [0. 0. 0.]]     |  [0. 0. 0.]]  
Player 1 (5.0) | Player 2 (5.0) *
[[2. 2. 1.]    | [[1. 0. 4.]     
 [0. 0. 0.]    |  [0. 0. 0.]     
 [0. 0. 0.]]   |  [0. 0. 0.]]    
Player 1 (3.0) * | Player 2 (7.0)
[[2. 0. 1.]      | [[1. 2. 4.]   
 [0. 0. 0.]      |  [0. 0. 0.]   
 [0. 0. 0.]]     |  [0. 0. 0.]]  
Player 1 (8.0) | Player 2 (7.0) *
[[2. 0. 1.]   