# Sketchbook for Minesweeper solver with Stable Baselines3

References
> 1. [(medium) article for stable baselines](https://towardsdatascience.com/stable-baselines-a-fork-of-openai-baselines-reinforcement-learning-made-easy-df87c4b2fc82)
> 1. [(colab) example of medium article](https://colab.research.google.com/github/Stable-Baselines-Team/rl-colab-notebooks/blob/master/saving_loading_dqn.ipynb)
> 1. [(github) minesweeper gym environment](https://github.com/aylint/gym-minesweeper)

Helps
> 1. [(github) stable-baselines3](https://github.com/DLR-RM/stable-baselines3)
> 1. [(github) stable-baselines3-contrib](https://github.com/Stable-Baselines-Team/stable-baselines3-contrib)
> 1. [(github) stable-baselines](https://github.com/hill-a/stable-baselines)
> 1. [(doc) stable-baselines](https://stable-baselines.readthedocs.io/en/master/)
> 1. [(doc) stable-baselines3](https://stable-baselines3.readthedocs.io/en/master/index.html)
> 1. [(doc) stable-baselines3-contrib](https://sb3-contrib.readthedocs.io/en/master/index.html)

In [1]:
import sys, os

os.environ['CUDA_VISIBLE_DEVICES'] = '7'

## DQN of Stable Baselines3

In [2]:
import gym
import numpy as np
import torch
import torch.nn as nn

from stable_baselines3 import DQN, PPO
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common.logger import TensorBoardOutputFormat, configure
from stable_baselines3.common.env_checker import check_env
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor
#from stable_baselines3.her.her_replay_buffer import HerReplayBuffer
from typing import Callable

import minesweeper_gym_env

In [89]:
import sys
from six import StringIO
from random import randint

import numpy as np
import gym
from gym import spaces

# cell values, non-negatives indicate number of neighboring mines
MINE = -1
CLOSED = -2


class MinesweeperModifiedEnv(gym.Env):
    metadata = {"render.modes": ["ansi", "human"]}

    def __init__(self, board_size=9, num_mines=10):
        """
        Create a minesweeper game.

        Parameters
        ----
        board_size: int     shape of the board
            - int: the same as (int, int)
        num_mines: int   num mines on board
        """

        self.board_size = board_size
        self.num_mines = num_mines
        self.board = self.place_mines(board_size, num_mines)
        self.my_board = np.ones((board_size, board_size), dtype=int) * CLOSED
        self.num_actions = 0

        self.observation_space = spaces.Box(low=-2, high=9,
                                            shape=(1, self.board_size, self.board_size), dtype=np.int)
        self.action_space = spaces.Discrete(self.board_size*self.board_size)
        self.valid_actions = np.ones((self.board_size * self.board_size), dtype=np.bool)

    def board2str(self, board, end='\n'):
        """
        Format a board as a string

        Parameters
        ----
        board : np.array
        end : str

        Returns
        ----
        s : str
        """
        s = ''
        for x in range(board.shape[1]):
            for y in range(board.shape[2]):
                s += str(board[0][x][y]) + '\t'
            s += end
        #s += end
        return s[:-len(end)]


    def is_new_move(self, my_board, x, y):
        """ return true if this is not an already clicked place"""
        return my_board[0, x, y] == CLOSED


    def is_valid(self, x, y):
        """ returns if the coordinate is valid"""
        return (x >= 0) & (x < self.board_size) & (y >= 0) & (y < self.board_size)


    def is_win(self, my_board):
        """ return if the game is won """
        return np.count_nonzero(my_board == CLOSED) == self.num_mines


    def is_mine(self, board, x, y):
        """return if the coordinate has a mine or not"""
        return board[0, x, y] == MINE


    def place_mines(self, board_size, num_mines):
        """generate a board, place mines randomly"""
        mines_placed = 0
        board = np.zeros((1, board_size, board_size), dtype=int)
        while mines_placed < num_mines:
            rnd = randint(0, board_size * board_size)
            x = int(rnd / board_size)
            y = int(rnd % board_size)
            if self.is_valid(x, y):
                if not self.is_mine(board, x, y):
                    board[0, x, y] = MINE
                    mines_placed += 1
        return board

    def count_neighbour_mines(self, x, y):
        """return number of mines in neighbour cells given an x-y coordinate

            Cell -->Current Cell(row, col)
            N -->  North(row - 1, col)
            S -->  South(row + 1, col)
            E -->  East(row, col + 1)
            W -->  West(row, col - 1)
            N.E --> North - East(row - 1, col + 1)
            N.W --> North - West(row - 1, col - 1)
            S.E --> South - East(row + 1, col + 1)
            S.W --> South - West(row + 1, col - 1)
        """
        neighbour_mines = 0
        for _x in range(x - 1, x + 2):
            for _y in range(y - 1, y + 2):
                if self.is_valid(_x, _y):
                    if self.is_mine(self.board, _x, _y):
                        neighbour_mines += 1
        return neighbour_mines
    
    def possibility_neighbour_mine(self, x, y):
        """return possibility of mines in neighbour cells given an x-y coordinate
        """
        neighbour_mines = False
        rand = np.random.random() # random value b/w 0 & 1
        for _x in range(x - 1, x + 2):
            for _y in range(y - 1, y + 2):
                if self.is_valid(_x, _y):
                    if self.is_mine(self.board, _x, _y):
                        neighbour_mines = True
                        break
        if neighbour_mines:
            return 1
        return 1 if rand < 0.1 else 0

    def get_next_state(self, state, x, y):
        """
        Get the next state.

        Parameters
        ----
        state : (np.array)   visible board
        x : int    location
        y : int    location

        Returns
        ----
        next_state : (np.array)    next visible board
        game_over : (bool) true if game over

        """
        my_board = state
        mine_point = False
        #game_over = False
        if self.is_mine(self.board, x, y):
            my_board[0, x, y] = MINE
            mine_point = True
            #game_over = True
        else:
            #my_board[0, x, y] = self.count_neighbour_mines(x, y)
            my_board[0, x, y] = self.possibility_neighbour_mine(x, y)

        self.my_board = my_board
        #return my_board, game_over
        return my_board, mine_point

    def reset(self):
        """
        Reset a new game episode. See gym.Env.reset()

        Returns
        ----
        next_state : (np.array, int)    next board
        """
        self.my_board = np.ones((1, self.board_size, self.board_size), dtype=int) * CLOSED
        self.board = self.place_mines(self.board_size, self.num_mines)
        self.num_actions = 0
        self.valid_actions = np.ones((self.board_size * self.board_size), dtype=bool)

        return self.my_board

    def step(self, action):
        """
        See gym.Env.step().

        Parameters
        ----
        action : np.array    location

        Returns
        ----
        next_state : (np.array)    next board
        reward : float        the reward for action
        done : bool           whether the game end or not
        info : {}             {'valid_actions': valid_actions} - a binary vector,
                                where false cells' values are already known to observer
        """
        state = self.my_board
        x = int(action / self.board_size)
        y = int(action % self.board_size)

        # test valid action - uncomment this part to test your action filter if needed
        # if bool(self.valid_actions[action]) is False:
        #    raise Exception("Invalid action was selected! Action Filter: {}, "
        #                    "action taken: {}".format(self.valid_actions, action))

        next_state, reward, done, info = self.next_step(state, x, y)
        self.my_board = next_state
        self.num_actions += 1
        self.valid_actions = (next_state.flatten() == CLOSED)
        info['valid_actions'] = self.valid_actions
        info['num_actions'] = self.num_actions
        return next_state, reward, done, info

    def is_guess(self, my_board, x, y):
        for _x in range(x-1, x+2):
            for _y in range(y-1, y+2):
                if self.is_valid(_x, _y):
                    if not self.is_new_move(my_board, _x, _y):
                        if (x != _x) or (y != _y):
                            return False
        return True
                    

    def next_step(self, state, x, y):
        """
        Get the next observation, reward, done, and info.

        Parameters
        ----
        state : (np.array)    visible board
        x : int    location
        y : int    location

        Returns
        ----
        next_state : (np.array)    next visible board
        reward : float               the reward
        done : bool           whether the game end or not
        info : {}
        """
        my_board = state
        #win_or_lose = False
        reward = 0
        done = False
        t_b = False
        info = {'is_success': False}
        #if self.num_actions > my_board.shape[0] * my_board.shape[1]:
        #    reward = -0.1
            
        if not self.is_new_move(my_board, x, y):
            reward = -0.3
            return my_board, reward, False, info
        is_guess_b = self.is_guess(my_board, x, y) # if guess

        state, mine_point = self.get_next_state(my_board, x, y)

        my_board_flatten = my_board.flatten()
        board_flatten = self.board.flatten()
        if np.array_equal(np.where(my_board_flatten == CLOSED),
                          np.where(board_flatten == MINE)): # Win
            reward = 1
            done = True
            info['is_success'] = True
        elif (np.sum(my_board == CLOSED) == 0) or \
                (np.sum(board_flatten[np.where(my_board_flatten == CLOSED)] != MINE) == 0): # Lose
            #reward = 0
            done = True
        elif mine_point: # Step on a mine
            reward = -0.5
        elif is_guess_b: # Guess
            reward = -0.3
        else: # Progress
            reward = 0.5
            
        return state, reward, done, info

    def render(self, mode='human'):
        """
        See gym.Env.render().
        """
        outfile = StringIO() if mode == 'ansi' else sys.stdout
        s = self.board2str(self.my_board)
        outfile.write(s)
        if mode != 'human':
            return outfile

env = MinesweeperModifiedEnv(4, 3)
check_env(env)

Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  shape=(1, self.board_size, self.board_size), dtype=np.int)
Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  self.valid_actions = np.ones((self.board_size * self.board_size), dtype=np.bool)


In [225]:
import sys
from six import StringIO
from random import randint

import numpy as np
import gym
from gym import spaces

# default : easy board
BOARD_SIZE = 5
NUM_MINES = 3

# cell values, non-negatives indicate number of neighboring mines
MINE = -1
CLOSED = -2


def board2str(board, end='\n'):
    """
    Format a board as a string

    Parameters
    ----
    board : np.array
    end : str

    Returns
    ----
    s : str
    """
    s = ''
    for x in range(board.shape[1]):
        for y in range(board.shape[2]):
            s += str(board[0][x][y]) + '\t'
        s += end
    #s += end
    return s[:-len(end)]


def is_new_move(my_board, x, y):
    """ return true if this is not an already clicked place"""
    return my_board[0, x, y] == CLOSED


def is_valid(x, y):
    """ returns if the coordinate is valid"""
    return (x >= 0) & (x < BOARD_SIZE) & (y >= 0) & (y < BOARD_SIZE)


def is_win(my_board):
    """ return if the game is won """
    return np.count_nonzero(my_board == CLOSED) == NUM_MINES


def is_mine(board, x, y):
    """return if the coordinate has a mine or not"""
    return board[0, x, y] == MINE


def place_mines(board_size, num_mines):
    """generate a board, place mines randomly"""
    mines_placed = 0
    board = np.zeros((1, board_size, board_size), dtype=int)
    while mines_placed < num_mines:
        rnd = randint(0, board_size * board_size)
        x = int(rnd / board_size)
        y = int(rnd % board_size)
        if is_valid(x, y):
            if not is_mine(board, x, y):
                board[0, x, y] = MINE
                mines_placed += 1
    return board

class MinesweeperDiscreetEnv(gym.Env):
    metadata = {"render.modes": ["ansi", "human"]}

    def __init__(self, board_size=BOARD_SIZE, num_mines=NUM_MINES):
        """
        Create a minesweeper game.

        Parameters
        ----
        board_size: int     shape of the board
            - int: the same as (int, int)
        num_mines: int   num mines on board
        """

        self.board_size = board_size
        self.num_mines = num_mines
        self.board = place_mines(board_size, num_mines)
        self.my_board = np.ones((board_size, board_size), dtype=int) * CLOSED
        self.num_actions = 0

        self.observation_space = spaces.Box(low=-2, high=9,
                                            shape=(1, self.board_size, self.board_size), dtype=np.int)
        self.action_space = spaces.Discrete(self.board_size*self.board_size)
        self.valid_actions = np.ones((self.board_size * self.board_size), dtype=np.bool)

    def count_neighbour_mines(self, x, y):
        """return number of mines in neighbour cells given an x-y coordinate

            Cell -->Current Cell(row, col)
            N -->  North(row - 1, col)
            S -->  South(row + 1, col)
            E -->  East(row, col + 1)
            W -->  West(row, col - 1)
            N.E --> North - East(row - 1, col + 1)
            N.W --> North - West(row - 1, col - 1)
            S.E --> South - East(row + 1, col + 1)
            S.W --> South - West(row + 1, col - 1)
        """
        neighbour_mines = 0
        for _x in range(x - 1, x + 2):
            for _y in range(y - 1, y + 2):
                if is_valid(_x, _y):
                    if is_mine(self.board, _x, _y):
                        neighbour_mines += 1
        return neighbour_mines

    def open_neighbour_cells(self, my_board, x, y):
        """return number of mines in neighbour cells given an x-y coordinate

            Cell -->Current Cell(row, col)
            N -->  North(row - 1, col)
            S -->  South(row + 1, col)
            E -->  East(row, col + 1)
            W -->  West(row, col - 1)
            N.E --> North - East(row - 1, col + 1)
            N.W --> North - West(row - 1, col - 1)
            S.E --> South - East(row + 1, col + 1)
            S.W --> South - West(row + 1, col - 1)
        """
        for _x in range(x-1, x+2):
            for _y in range(y-1, y+2):
                if is_valid(_x, _y):
                    if is_new_move(my_board, _x, _y):
                        my_board[0, _x, _y] = self.count_neighbour_mines(_x, _y)
                        if my_board[0, _x, _y] == 0:
                            my_board = self.open_neighbour_cells(my_board, _x, _y)
        return my_board

    def get_next_state(self, state, x, y):
        """
        Get the next state.

        Parameters
        ----
        state : (np.array)   visible board
        x : int    location
        y : int    location

        Returns
        ----
        next_state : (np.array)    next visible board
        game_over : (bool) true if game over

        """
        my_board = state
        game_over = False
        if is_mine(self.board, x, y):
            my_board[0, x, y] = MINE
            game_over = True
        else:
            my_board[0, x, y] = self.count_neighbour_mines(x, y)
            if my_board[0, x, y] == 0:
                my_board = self.open_neighbour_cells(my_board, x, y)
        self.my_board = my_board
        return my_board, game_over

    def reset(self):
        """
        Reset a new game episode. See gym.Env.reset()

        Returns
        ----
        next_state : (np.array, int)    next board
        """
        self.my_board = np.ones((1, self.board_size, self.board_size), dtype=int) * CLOSED
        self.board = place_mines(self.board_size, self.num_mines)
        self.num_actions = 0
        self.valid_actions = np.ones((self.board_size * self.board_size), dtype=bool)

        return self.my_board

    def step(self, action):
        """
        See gym.Env.step().

        Parameters
        ----
        action : np.array    location

        Returns
        ----
        next_state : (np.array)    next board
        reward : float        the reward for action
        done : bool           whether the game end or not
        info : {}             {'valid_actions': valid_actions} - a binary vector,
                                where false cells' values are already known to observer
        """
        state = self.my_board
        x = int(action / self.board_size)
        y = int(action % self.board_size)

        # test valid action - uncomment this part to test your action filter if needed
        # if bool(self.valid_actions[action]) is False:
        #    raise Exception("Invalid action was selected! Action Filter: {}, "
        #                    "action taken: {}".format(self.valid_actions, action))

        next_state, reward, done, info = self.next_step(state, x, y)
        self.my_board = next_state
        self.num_actions += 1
        self.valid_actions = (next_state.flatten() == CLOSED)
        info['valid_actions'] = self.valid_actions
        info['num_actions'] = self.num_actions
        return next_state, reward, done, info

    def is_guess(self, my_board, x, y):
        for _x in range(x-1, x+2):
            for _y in range(y-1, y+2):
                if is_valid(_x, _y):
                    if not is_new_move(my_board, _x, _y):
                        if (x != _x) or (y != _y):
                            return False
        return True
                    

    def next_step(self, state, x, y):
        """
        Get the next observation, reward, done, and info.

        Parameters
        ----
        state : (np.array)    visible board
        x : int    location
        y : int    location

        Returns
        ----
        next_state : (np.array)    next visible board
        reward : float               the reward
        done : bool           whether the game end or not
        info : {}
        """
        my_board = state
        #win_or_lose = False
        reward = 0
        done = False
        t_b = False
        info = {'is_success': False}
        #if self.num_actions > my_board.shape[0] * my_board.shape[1]:
        #    reward = -0.1
            
        if not is_new_move(my_board, x, y):
            reward += -0.3
            return my_board, reward, False, info
        elif self.is_guess(my_board, x, y): # if guess
            t_b = True

        state, game_over = self.get_next_state(my_board, x, y)

        if game_over:
            reward += -1
            done = True
            #return state, -100, True, {}
        elif is_win(state):
            reward += 1
            done = True
            info['is_success'] = True
            #return state, 1000, True, {}
        elif t_b: # if guess
            reward += -0.3
        else: # progress
            reward += 0.9
            #return state, 0, False, {}
            
        return state, reward, done, info

    def render(self, mode='human'):
        """
        See gym.Env.render().
        """
        outfile = StringIO() if mode == 'ansi' else sys.stdout
        s = board2str(self.my_board)
        outfile.write(s)
        if mode != 'human':
            return outfile

env = MinesweeperDiscreetEnv()
check_env(env)

Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  shape=(1, self.board_size, self.board_size), dtype=np.int)
Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  self.valid_actions = np.ones((self.board_size * self.board_size), dtype=np.bool)


In [69]:
obs = env.reset()
done = True

In [84]:
if done:
    obs = env.reset()
    print('New Game!')
row, col = 3, 2

action = 4 * row + col
obs, reward, done, info = env.step(action)
print('reward: {}'.format(reward))
env.render()
print(info.get('is_success'))
print(env.board == -1)
if done:
    print('\nGame Over!')

reward: -1
0	1	1	1	
0	1	-1	1	
1	1	1	1	
-2	-2	1	0	False
[[[False False False False]
  [False False  True False]
  [False False False False]
  [ True  True False False]]]

Game Over!


In [90]:
class CustomCNN(BaseFeaturesExtractor):
    """
    :param observation_space: (gym.Space)
    :param features_dim: (int) Number of features extracted.
        This corresponds to the number of unit for the last layer.
    """

    def __init__(self, observation_space: gym.spaces.Box, features_dim: int = 512):
        super(CustomCNN, self).__init__(observation_space, features_dim)
        # We assume CxHxW images (channels first)
        # Re-ordering will be done by pre-preprocessing or wrapper
        n_input_channels = observation_space.sample()[None].shape[0]
        self.cnn = nn.Sequential(
            nn.Conv2d(n_input_channels, 128, kernel_size=3, stride=1, padding='same', bias=True),
            nn.ReLU(),
            nn.Conv2d(128, 128, kernel_size=3, stride=1, padding='same', bias=True),
            nn.ReLU(),
            nn.Conv2d(128, 128, kernel_size=3, stride=1, padding='same', bias=True),
            nn.ReLU(),
            nn.Conv2d(128, 128, kernel_size=3, stride=1, padding='same', bias=True),
            nn.ReLU(),
            nn.Flatten(),
        )

        # Compute shape by doing one forward pass
        with torch.no_grad():
            n_flatten = self.cnn(
                torch.as_tensor(observation_space.sample()[None]).float()
            ).shape[1]

        self.linear = nn.Sequential(
            nn.Linear(n_flatten, features_dim, bias=True),
            nn.ReLU(),
            nn.Linear(features_dim, features_dim, bias=True),
            nn.ReLU()
        )


    def forward(self, observations: torch.Tensor) -> torch.Tensor:
        return self.linear(self.cnn(observations))

policy_kwargs = dict(
    features_extractor_class=CustomCNN,
    features_extractor_kwargs=dict(features_dim=512),
)

In [91]:
def linear_schedule(initial_value: float) -> Callable[[float], float]:
    """
    Linear learning rate schedule.

    :param initial_value: Initial learning rate.
    :return: schedule that computes
      current learning rate depending on remaining progress
    """
    #lr0 = initial_value
    def func(progress_remaining: float) -> float:
        """
        Progress will decrease from 1 (beginning) to 0.

        :param progress_remaining: = 1.0 - (num_timesteps / total_timesteps)
        :return: current learning rate
        """
        if progress_remaining > 0.8:
            return initial_value
        else:
            return progress_remaining * initial_value * 1.25
        #return progress_remaining * initial_value
        #nonlocal lr0
        #lr0 = max(0.001, lr0 * 0.99975) # 0.99975
        #return lr0

    return func

In [13]:
model = DQN('CnnPolicy', env, 
            learning_rate=linear_schedule(0.001), 
            #policy_kwargs=dict(activation_fn=nn.ReLU,
            #                   net_arch=[256, 256, 256, 256, 512, 512]), 
            policy_kwargs=policy_kwargs,
            batch_size=64, 
            gamma=0.1, 
            train_freq=(1, 'episode'), 
            learning_starts=1,
            #buffer_size=4,
            exploration_fraction=0.16, 
            exploration_initial_eps=0.95, 
            exploration_final_eps=0.01,
            tensorboard_log="./dqn_tensorboard/", verbose=0
           )

In [14]:
#callback = TensorboardCallback(eval_env=MinesweeperDiscreetEnv())
model.learn(total_timesteps=int(1e5), 
            log_interval=10,
            tb_log_name='test_env',
            #eval_log_path='eval_test',
            reset_num_timesteps=True)
model.save("dqn_minesweeper_test_env")
del model  # delete trained model to demonstrate loading

In [217]:
model = DQN.load("dqn_minesweeper_s4m1")
episode_rewards, episode_wins = evaluate(model, env=env, num_episodes=1000)
mean_reward = round(np.mean(episode_rewards), 2)
print('mean_reward: {}'.format(mean_reward))

Win rates: 0.88 Num episodes: 1000
mean_reward: 1.34


In [220]:
model = DQN.load("dqn_minesweeper_s4m2")
episode_rewards, episode_wins = evaluate(model,
                                         env=MinesweeperDiscreetEnv(),
                                         num_episodes=1000)
mean_reward = round(np.mean(episode_rewards), 2)
print('mean_reward: {}'.format(mean_reward))

Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  shape=(1, self.board_size, self.board_size), dtype=np.int)
Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  self.valid_actions = np.ones((self.board_size * self.board_size), dtype=np.bool)


Win rates: 0.57 Num episodes: 1000
mean_reward: -4.82


In [222]:
model = DQN.load("dqn_minesweeper_s5m3")
episode_rewards, episode_wins = evaluate(model,
                                         env=MinesweeperDiscreetEnv(),
                                         num_episodes=1000)
mean_reward = round(np.mean(episode_rewards), 2)
print('mean_reward: {}'.format(mean_reward))

Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  shape=(1, self.board_size, self.board_size), dtype=np.int)
Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  self.valid_actions = np.ones((self.board_size * self.board_size), dtype=np.bool)


Win rates: 0.54 Num episodes: 1000
mean_reward: -0.04


In [223]:
model = DQN.load("dqn_minesweeper_s5m3_wr0.29")
episode_rewards, episode_wins = evaluate(model,
                                         env=MinesweeperDiscreetEnv(),
                                         num_episodes=1000)
mean_reward = round(np.mean(episode_rewards), 2)
print('mean_reward: {}'.format(mean_reward))

Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  shape=(1, self.board_size, self.board_size), dtype=np.int)
Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  self.valid_actions = np.ones((self.board_size * self.board_size), dtype=np.bool)


Win rates: 0.27 Num episodes: 1000
mean_reward: -10.81


In [227]:
model = PPO('CnnPolicy', env,
            #learning_rate=linear_schedule(0.001),
            #n_step,
            batch_size=64,
            #n_epochs = ,
            gamma=0.1,
            tensorboard_log="./ppo_tensorboard/",
            #create_eval_env=True,
            policy_kwargs=policy_kwargs,
            verbose=0,
            #seed=23,
           )

In [228]:
model.learn(total_timesteps=int(2e5), 
            log_interval=10,
            tb_log_name='s5m3', 
            #eval_env=MinesweeperDiscreetEnv(),
            #eval_freq=100,
            #n_eval_episodes=10,
            #eval_log_path='s5m4'
            reset_num_timesteps=True)
model.save("ppo_minesweeper_s5m3")
del model

KeyboardInterrupt: 

In [138]:
model = PPO.load("ppo_minesweeper_s5m3",
                 env=MinesweeperDiscreetEnv(),
                 device='cuda')
episode_rewards, episode_wins = evaluate(model, env=model.get_env(), num_episodes=10000)

KeyboardInterrupt: 

In [93]:
env = MinesweeperModifiedEnv(4, 1)
model = DQN('CnnPolicy', env, 
            learning_rate=linear_schedule(0.001), 
            #policy_kwargs=dict(activation_fn=nn.ReLU,
            #                   net_arch=[256, 256, 256, 256, 512, 512]), 
            policy_kwargs=policy_kwargs,
            batch_size=64, 
            gamma=0.1, 
            #train_freq=(1, 'episode'), 
            learning_starts=1,
            #buffer_size=4,
            exploration_fraction=0.16, 
            exploration_initial_eps=0.95, 
            exploration_final_eps=0.02,
            tensorboard_log="./custom_dqn_tensorboard/", verbose=0
           )
model.learn(total_timesteps=int(5e5), 
            log_interval=10,
            tb_log_name='s4m1',
            #eval_log_path='eval_test',
            reset_num_timesteps=True)
model.save("custom_dqn_minesweeper_s4m1")
del model  # delete trained model to demonstrate loading

Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  shape=(1, self.board_size, self.board_size), dtype=np.int)
Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  self.valid_actions = np.ones((self.board_size * self.board_size), dtype=np.bool)


KeyboardInterrupt: 

In [97]:
model = DQN.load("custom_dqn_minesweeper_s4m1w8")
episode_rewards, episode_wins = evaluate(model, env=MinesweeperModifiedEnv(4, 1), num_episodes=1000)
mean_reward = round(np.mean(episode_rewards), 2)
print('mean_reward: {}'.format(mean_reward))

Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  shape=(1, self.board_size, self.board_size), dtype=np.int)
Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  self.valid_actions = np.ones((self.board_size * self.board_size), dtype=np.bool)


Win rates: 0.0 Num episodes: 1000
mean_reward: -59.64


In [95]:
def evaluate(model, env, num_episodes=10000):
    """
    Evaluate a RL agent
    :param model: (BaseRLModel object) the RL Agent
    :param num_steps: (int) number of timesteps to evaluate it
    :return: (float) Mean reward for the last 100 episodes
    """
    episode_rewards = [0.0]
    episode_wins = []
    for i in range(num_episodes):
        obs = env.reset()
        episode_rewards.append(0.0)
        #if i % 100 == 1:
        #    print('Playing episode {}'.format(i))
        while True:
            action, _states = model.predict(obs)
            obs, reward, done, info = env.step(action)
            
            episode_rewards[-1] += reward
            if done:
                episode_wins.append(info.get('is_success'))
                break
            elif info.get('num_actions') > 200:
                #print('Episode {}. Over action in obs, action: \n{}, {}'.format(i, obs, action))
                episode_wins.append(False)
                break
    """
    # Compute mean reward for the last 100 episodes
    mean_100ep_reward = round(np.mean(episode_rewards[-100:]), 1)
    print("Mean reward:", mean_100ep_reward, "Num episodes:", len(episode_rewards))
    """
    win_rate = round(np.mean(episode_wins), 2)
    print("Win rates:", win_rate, "Num episodes:", len(episode_wins))
    
    return episode_rewards, episode_wins

## TRPO

In [229]:
from sb3_contrib import TRPO

ModuleNotFoundError: No module named 'sb3_contrib'

In [None]:
env = minesweeper_gym.MinesweeperDiscreetEnv()

model = TRPO('MlpPolicy', env, verbose=1)

Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


In [None]:
# Random Agent, before training
mean_reward_before_train = evaluate(model, num_steps=10000)

Mean reward: -99.0 Num episodes: 931


In [None]:
# Train the agent
model.learn(total_timesteps=int(2e4), log_interval=10)
# Save the agent
model.save("trpo_minesweeper")
del model  # delete trained model to demonstrate loading

----------------------------------------
| rollout/                  |          |
|    ep_len_mean            | 10.7     |
|    ep_rew_mean            | -100     |
| time/                     |          |
|    fps                    | 445      |
|    iterations             | 10       |
|    time_elapsed           | 45       |
|    total_timesteps        | 20480    |
| train/                    |          |
|    explained_variance     | -0.0141  |
|    is_line_search_success | 1        |
|    kl_divergence_loss     | 0.000774 |
|    learning_rate          | 0.001    |
|    n_updates              | 9        |
|    policy_objective       | 0.0703   |
|    value_loss             | 3.26e+03 |
----------------------------------------


In [None]:
# Load the trained agent
model = TRPO.load("trpo_minesweeper")

In [None]:
# Evaluate the trained agent
mean_reward = evaluate(model, num_steps=10000)

Mean reward: -99.0 Num episodes: 865
