# Section 1: Environment

In [None]:
import numpy as np

class BattleshipGame:
    """Single-player Battleship environment on a square grid.

    Two grids are kept: ``grid`` holds the full truth and ``grid_visible``
    holds only what the player has uncovered. Cell codes: ``0`` untouched
    water, ``2`` intact ship segment (``grid`` only), ``1`` hit, ``-1`` miss.

    Actions are flat cell indices ``y * grid_size + x``.
    """

    # Glyph per cell code, written as escapes so the source stays ASCII and
    # cannot be corrupted by a wrong file encoding.
    SYMBOLS = {0: '\U0001F30A', 2: '\U0001F6A2', 1: '\U0001F4A5', -1: '\u26AA'}

    # Upper bound on random placement retries; prevents an endless loop when
    # the requested fleet cannot fit on the grid.
    MAX_PLACEMENT_ATTEMPTS = 10_000

    def __init__(self, grid_size=5):
        self.grid_size = grid_size
        self.reset()

    def reset(self):
        """Start a new game and return the initial flat observation."""
        self.grid = np.zeros((self.grid_size, self.grid_size), dtype=int)    # ships, hits and misses
        self.grid_visible = np.zeros((self.grid_size, self.grid_size), dtype=int)    # hits and misses only
        self.number_of_hits = 0    # incremented once per shot by play_episode
        self.ships = []   # one dict per ship: length, hits, position, direction, sunk
        self.initialize_ships_little()
        return self._observation()

    def _observation(self):
        """Return a flat *copy* of the visible grid.

        A copy (not a ``reshape`` view) is required: callers keep past
        observations in lists and replay buffers, and a view would silently
        change every time a later shot updates ``grid_visible``.
        """
        return self.grid_visible.flatten()

    def _place_fleet(self, ship_definitions):
        """Place every ship of a ``[(length, quantity), ...]`` fleet."""
        for length, quantity in ship_definitions:
            for _ in range(quantity):
                self.place_ship_randomly(length)

    def initialize_ships(self):
        """Place the full fleet: one 2, two 3s, one 4 and one 5."""
        self._place_fleet([(2, 1), (3, 2), (4, 1), (5, 1)])

    def initialize_ships_little(self):
        """Place the reduced fleet used by ``reset``: one 3 and one 4."""
        self._place_fleet([(3, 1), (4, 1)])

    def _render(self, grid):
        """Print ``grid`` with a column header and one labelled row per line."""
        header = " " + "   ".join(str(i).rjust(3) for i in range(self.grid_size))
        print(header)

        for y in range(self.grid_size):
            row_symbols = [self.SYMBOLS[grid[y, x]] for x in range(self.grid_size)]
            print(str(y).ljust(3) + ' ' + '   '.join(row_symbols))

    def display(self):
        """Print the full grid, including ships that have not been hit."""
        self._render(self.grid)

    def display_visible(self):
        """Print only what the player has uncovered so far."""
        self._render(self.grid_visible)

    def step(self, action):
        """Fire at the cell with flat index ``action``.

        Returns
        -------
        tuple
            ``(observation, cell_index, reward, done)`` where ``observation``
            is a flat copy of the visible grid, ``cell_index`` is the index
            that was fired at, and ``done`` is True once every ship is sunk.
            Rewards: 5 for a hit that sinks a ship, 1 for any other hit,
            -0.1 for a miss, -1 for firing at an already-targeted cell.
        """
        # Accept Python ints, numpy integers and 0-d arrays alike.
        action = int(action)
        y, x = divmod(action, self.grid_size)

        cell = self.grid[y, x]
        if cell == 2:
            self.grid[y, x] = 1
            self.grid_visible[y, x] = 1
            reward = 5 if self.update_ships_status((x, y)) else 1
        elif cell == 0:
            self.grid[y, x] = -1
            self.grid_visible[y, x] = -1
            reward = -0.1
        else:
            # Cell was already a hit or a miss: wasted shot.
            reward = -1

        done = self.check_all_sunk()
        new_state = y * self.grid_size + x
        return self._observation(), new_state, reward, done

    def update_ships_status(self, action):
        """Record a hit at ``(x, y)`` and return True if it sank a ship."""
        x, y = action
        for ship in self.ships:
            if ship['sunk']:
                continue
            if (x, y) in self.calculate_ship_positions(ship):
                ship['hits'] += 1
                if ship['hits'] == ship['length']:
                    ship['sunk'] = True
                    return True
        return False

    def calculate_ship_positions(self, ship):
        """Return the ``(x, y)`` cells covered by ``ship``."""
        x, y = ship['position']
        if ship['direction'] == 'H':
            return [(x + i, y) for i in range(ship['length'])]
        return [(x, y + i) for i in range(ship['length'])]

    def check_all_sunk(self):
        """Return True when every placed ship is sunk."""
        return all(ship['sunk'] for ship in self.ships)

    def is_action_valid(self, action):
        """Return True if ``(x, y)`` has not been fired at yet.

        A cell is fresh exactly when the visible grid still shows 0; both
        previous hits (1) and misses (-1) are invalid targets, matching the
        cells for which ``step`` gives the -1 penalty.
        """
        x, y = action
        return bool(self.grid_visible[y, x] == 0)

    def place_ship_randomly(self, ship_length):
        """Place one ship of ``ship_length`` at a random free location.

        Raises
        ------
        ValueError
            If the ship cannot fit on the grid at all.
        RuntimeError
            If no free location is found after ``MAX_PLACEMENT_ATTEMPTS``
            random tries (e.g. the grid is too crowded).
        """
        if not 1 <= ship_length <= self.grid_size:
            raise ValueError(
                f"ship length {ship_length} does not fit on a grid of size {self.grid_size}")

        for _ in range(self.MAX_PLACEMENT_ATTEMPTS):
            direction = 'H' if np.random.rand() > 0.5 else 'V'
            # The coordinate along the ship's axis is limited so the ship
            # stays on the board; the other coordinate is unrestricted.
            if direction == 'H':
                x = np.random.randint(0, self.grid_size - ship_length + 1)
                y = np.random.randint(0, self.grid_size)
            else:
                x = np.random.randint(0, self.grid_size)
                y = np.random.randint(0, self.grid_size - ship_length + 1)
            if self.check_free_space(ship_length, (x, y), direction):
                self.add_ship(ship_length, (x, y), direction)
                return

        raise RuntimeError(
            f"could not place a ship of length {ship_length} "
            f"after {self.MAX_PLACEMENT_ATTEMPTS} attempts")

    def check_free_space(self, ship_length, position, direction):
        """Return True if all cells the ship would cover are untouched water."""
        x, y = position
        if direction.upper() == 'H':
            return all(self.grid[y, x+i] == 0 for i in range(ship_length))
        else:
            return all(self.grid[y+i, x] == 0 for i in range(ship_length))

    def add_ship(self, ship_length, position, direction):
        """Write the ship onto the grid and register it in ``self.ships``."""
        x, y = position
        if direction == 'H':
            self.grid[y, x:x+ship_length] = 2
        else:
            self.grid[y:y+ship_length, x] = 2
        self.ships.append({'length': ship_length, 'hits': 0, 'position': position, 'direction': direction, 'sunk': False})

    def play_episode(self, agent):
        """Play one full game with ``agent`` and return the number of shots.

        The agent is called as ``agent.choose_action(state, reward)`` where
        ``state`` is the index of the previously targeted cell (0 before the
        first shot) and ``reward`` is the reward of the previous shot.
        """
        self.reset()
        state = 0
        done = False
        reward = 0
        while not done:
            action = agent.choose_action(state, reward)
            # Despite its name this counter tracks shots fired, not hits.
            self.number_of_hits += 1
            _, state, reward, done = self.step(action)
            if self.number_of_hits % 100 == 0:
                self.display()
        print(f"Number of hits: {self.number_of_hits}")
        return self.number_of_hits

In [None]:
# Agent that plays randomly

class random_agent():
    """Baseline agent that fires at a uniformly random cell on every turn."""

    def __init__(self, grid_size=10):
        # Side length of the board; actions span grid_size ** 2 flat indices.
        self.grid_size = grid_size

    def choose_action(self, state, hit):
        """Return a random flat cell index; ``state`` and ``hit`` are ignored."""
        n_cells = self.grid_size**2
        return np.random.randint(0, n_cells)

In [None]:
# Agent that searches the grid randomly and then hunts the ship when it finds one until it is sunk then it searches again

class search_and_hunt_agent():
    """Agent that searches randomly, then hunts around hits until a ship sinks.

    The agent is driven by the reward of its previous shot: ``HIT_REWARD``
    switches it to hunt mode (try the four neighbours of every recorded hit),
    ``SUNK_REWARD`` sends it back to random search. It never fires twice at
    a cell it has chosen before.
    """

    # Reward values emitted by BattleshipGame.step for a plain hit and for
    # the hit that sinks a ship.
    HIT_REWARD = 1
    SUNK_REWARD = 5

    def __init__(self, grid_size=5):
        self.grid_size = grid_size
        self.is_hunting = False
        self.ship_hunted = []   # flat indices of hits on the ship being hunted
        self.searched = []      # flat indices of every cell already chosen

    def choose_action(self, state, reward):
        """Pick the next cell to fire at.

        Parameters
        ----------
        state : int
            Flat index of the previously targeted cell.
        reward : float
            Reward obtained by the previous shot.
        """
        if reward == self.SUNK_REWARD:
            # The hunted ship is gone: forget its hits and search again
            # instead of wasting shots around a wreck.
            self.is_hunting = False
            self.ship_hunted = []
        elif reward == self.HIT_REWARD:
            self.is_hunting = True

        if self.is_hunting:
            return self.hunt_ship(state, reward)
        return self.search_ship(state)

    def search_ship(self, state):
        """Return a uniformly random cell that has not been chosen yet.

        Raises
        ------
        RuntimeError
            If every cell of the grid has already been chosen.
        """
        remaining = [cell for cell in range(self.grid_size**2)
                     if cell not in self.searched]
        if not remaining:
            raise RuntimeError("every cell of the grid has already been searched")
        # Draw directly among the free cells rather than rejection-sampling,
        # which slows down (and finally never ends) as the grid fills up.
        action = remaining[np.random.randint(0, len(remaining))]
        self.searched.append(action)
        return action

    def hunt_ship(self, state, reward):
        """Return an untried neighbour of a recorded hit, or fall back to search."""
        if reward == self.HIT_REWARD:
            self.ship_hunted.append(state)

        size = self.grid_size
        for hit in self.ship_hunted:
            x, y = hit % size, hit // size
            # (neighbour is on the board, neighbour index): left, right, up, down.
            neighbours = (
                (x > 0, hit - 1),
                (x < size - 1, hit + 1),
                (y > 0, hit - size),
                (y < size - 1, hit + size),
            )
            for on_board, cell in neighbours:
                if on_board and cell not in self.searched:
                    self.searched.append(cell)
                    return cell

        # Every neighbour of every hit was already tried.
        self.is_hunting = False
        return self.search_ship(state)

# Section 2 : Simple Agents

In [None]:
game = BattleshipGame()
# Size both agents from the game: random_agent defaults to a 10x10 board,
# which would produce cell indices outside the default 5x5 grid.
random_a = random_agent(grid_size=game.grid_size)
search_and_hunt = search_and_hunt_agent(grid_size=game.grid_size)

In [None]:
# Benchmark: average number of shots the search-and-hunt agent needs per game.
# (Alternative single run with the random agent: game.play_episode(random_a))
num_games = 1000
score = 0
for _ in range(num_games):
    # A fresh agent per game, so no search memory leaks between episodes.
    search_and_hunt = search_and_hunt_agent()
    score = score + game.play_episode(search_and_hunt)
print(score / num_games)

Number of hits: 11
Number of hits: 20
Number of hits: 15
Number of hits: 11
Number of hits: 15
Number of hits: 10
Number of hits: 12
Number of hits: 23
Number of hits: 20
Number of hits: 17
Number of hits: 17
Number of hits: 14
Number of hits: 15
Number of hits: 10
Number of hits: 11
Number of hits: 21
Number of hits: 12
Number of hits: 21
Number of hits: 10
Number of hits: 16
Number of hits: 16
Number of hits: 15
Number of hits: 18
Number of hits: 17
Number of hits: 17
Number of hits: 18
Number of hits: 12
Number of hits: 15
Number of hits: 21
Number of hits: 14
Number of hits: 20
Number of hits: 12
Number of hits: 15
Number of hits: 16
Number of hits: 14
Number of hits: 19
Number of hits: 17
Number of hits: 18
Number of hits: 11
Number of hits: 15
Number of hits: 9
Number of hits: 18
Number of hits: 16
Number of hits: 18
Number of hits: 20
Number of hits: 11
Number of hits: 19
Number of hits: 18
Number of hits: 20
Number of hits: 9
Number of hits: 17
Number of hits: 22
Number of hits

# Section 3 : Reinforce based on TD6

In [None]:
import torch
from typing import List, Tuple, Deque, Optional, Callable
from numpy.typing import NDArray

class PolicyNetwork(torch.nn.Module):
    """Two-layer perceptron mapping an observation to action probabilities.

    The single hidden layer has 200 ReLU units; the output is a softmax over
    ``n_actions`` taken along the last dimension.
    """

    def __init__(self, n_observations: int, n_actions: int):
        super().__init__()
        hidden_units = 200
        self.layer1 = torch.nn.Linear(n_observations, hidden_units)
        self.layer2 = torch.nn.Linear(hidden_units, n_actions)

    def forward(self, state_tensor: torch.Tensor) -> torch.Tensor:
        """Return the action probabilities for ``state_tensor``."""
        hidden = torch.relu(self.layer1(state_tensor))
        logits = self.layer2(hidden)
        return torch.softmax(logits, dim=-1)

def sample_discrete_action(policy_nn: PolicyNetwork,
                           state: NDArray[np.float64]) -> Tuple[int, torch.Tensor]:
    """Sample one action from the policy for ``state``.

    Returns
    -------
    Tuple[int, torch.Tensor]
        The sampled action as a Python int and its log-probability, which
        stays attached to the autograd graph of ``policy_nn``.
    """
    observation = torch.tensor(state, dtype=torch.float32)
    distribution = torch.distributions.Categorical(policy_nn(observation))
    action = distribution.sample()
    return action.item(), distribution.log_prob(action)


def sample_one_episode(env: BattleshipGame,
                       policy_nn: PolicyNetwork,
                       max_episode_duration: int,
                       disp: bool = False) -> Tuple[List[NDArray[np.float64]], List[int], List[float], List[torch.Tensor]]:
    """Roll out one episode by sampling every action from ``policy_nn``.

    Parameters
    ----------
    env : BattleshipGame
        The environment to play in; it is reset first.
    policy_nn : PolicyNetwork
        The policy used to sample actions.
    max_episode_duration : int
        Maximum number of steps before the rollout is cut off.
    disp : bool, optional
        If truthy, print the full grid after every step.

    Returns
    -------
    tuple
        ``(states, actions, rewards, log_probs)``. ``states`` holds the
        initial observation plus one observation per step, so it is one
        element longer than the three other lists.
    """
    state_t = env.reset()

    # Observations are stored as copies: the environment may hand out arrays
    # that share memory with its internal grid, in which case every stored
    # state would otherwise end up equal to the final board.
    episode_states = [np.copy(state_t)]
    episode_actions = []
    episode_log_prob_actions = []
    episode_rewards = []

    for _ in range(max_episode_duration):
        action, log_prob_action = sample_discrete_action(policy_nn, state_t)

        # The second element returned by step (index of the cell fired at)
        # is redundant with `action` and is ignored.
        next_state, _, reward, done = env.step(action)

        episode_actions.append(action)
        episode_log_prob_actions.append(log_prob_action)
        episode_rewards.append(reward)
        episode_states.append(np.copy(next_state))

        if disp:
            env.display()

        state_t = next_state
        if done:
            break

    return episode_states, episode_actions, episode_rewards, episode_log_prob_actions

# Smoke test: roll out a single episode with an untrained policy.
env = BattleshipGame()
policy_nn = PolicyNetwork(n_observations=25, n_actions=25)
max_episode_duration = 100
(episode_states,
 episode_actions,
 episode_rewards,
 episode_log_prob_actions) = sample_one_episode(env, policy_nn, max_episode_duration)

In [None]:
def avg_return_on_multiple_episodes(env: BattleshipGame,
                                    policy_nn: PolicyNetwork,
                                    num_test_episode: int,
                                    max_episode_duration: int) -> float:
    """
    Estimate the policy's average undiscounted return over several episodes.

    Parameters
    ----------
    env : BattleshipGame
        The environment to play in.
    policy_nn : PolicyNetwork
        The policy neural network used to sample actions.
    num_test_episode : int
        The number of episodes to play (must be non-zero).
    max_episode_duration : int
        The maximum duration of an episode.

    Returns
    -------
    float
        Sum of all rewards collected, divided by ``num_test_episode``.
    """
    # Each rollout yields (states, actions, rewards, log_probs); only the
    # rewards matter here.
    episode_returns = (
        sum(sample_one_episode(env, policy_nn, max_episode_duration)[2])
        for _ in range(num_test_episode)
    )
    return sum(episode_returns, 0.0) / num_test_episode

# Baseline: average return of an untrained policy over 100 episodes.
env = BattleshipGame()
policy_nn = PolicyNetwork(n_observations=25, n_actions=25)
max_episode_duration = 100
num_test_episode = 100
avg_return = avg_return_on_multiple_episodes(env=env,
                                             policy_nn=policy_nn,
                                             num_test_episode=num_test_episode,
                                             max_episode_duration=max_episode_duration)
print(avg_return)

-29.64899999999999


In [None]:
from tqdm.notebook import tqdm
def _discounted_returns(rewards: List[float], gamma: float) -> List[float]:
    """Return the discounted return G_t = r_t + gamma * G_{t+1} for every step."""
    returns = []
    running_return = 0.0
    for reward in reversed(rewards):
        running_return = reward + gamma * running_return
        returns.append(running_return)
    # Built back-to-front with O(1) appends, then put in chronological order.
    returns.reverse()
    return returns


def train_reinforce_discrete(env: BattleshipGame,
                             num_train_episodes: int,
                             num_test_per_episode: int,
                             max_episode_duration: int,
                             learning_rate: float,
                             gamma: float = 0.99) -> Tuple[PolicyNetwork, List[float]]:
    """
    Train a policy using the REINFORCE algorithm.

    One episode is sampled per training iteration; the loss is the sum over
    steps of ``-log_prob(action_t) * G_t`` and one Adam step is taken on it.
    After every update the policy is evaluated and the result is printed.

    Parameters
    ----------
    env : BattleshipGame
        The environment to train in.
    num_train_episodes : int
        The number of training episodes.
    num_test_per_episode : int
        The number of evaluation episodes played after each training episode.
    max_episode_duration : int
        The maximum length of an episode.
    learning_rate : float
        The Adam step size.
    gamma : float, optional
        The discount factor applied to future rewards, by default 0.99.

    Returns
    -------
    Tuple[PolicyNetwork, List[float]]
        The final trained policy and the average test return measured after
        each training episode.
    """
    episode_avg_return_list = []

    # One input per grid cell and one action per grid cell.
    n_cells = env.grid_size * env.grid_size

    policy_nn = PolicyNetwork(n_cells, n_cells)
    optimizer = torch.optim.Adam(policy_nn.parameters(), lr=learning_rate)

    for episode_index in tqdm(range(num_train_episodes)):

        _, _, rewards, log_probs = sample_one_episode(env, policy_nn, max_episode_duration)

        # A zero-length episode carries no gradient signal; skip the update
        # instead of failing on an empty loss.
        if log_probs:
            returns = torch.tensor(_discounted_returns(rewards, gamma), dtype=torch.float32)
            loss = -(torch.stack(log_probs) * returns).sum()

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # Test the current policy
        test_avg_return = avg_return_on_multiple_episodes(env=env,
                                                          policy_nn=policy_nn,
                                                          num_test_episode=num_test_per_episode,
                                                          max_episode_duration=max_episode_duration)
        print(test_avg_return)

        # Monitoring
        episode_avg_return_list.append(test_avg_return)

    return policy_nn, episode_avg_return_list

In [None]:
# Short REINFORCE run (5 updates, each followed by 5 evaluation episodes).
env = BattleshipGame()
reinforce_policy_nn, episode_reward_list = train_reinforce_discrete(
    env=env,
    num_train_episodes=5,
    num_test_per_episode=5,
    max_episode_duration=400,
    learning_rate=0.001,
)


  0%|          | 0/5 [00:00<?, ?it/s]

-22.580000000000002
-33.74
-15.559999999999997
-29.2
-18.900000000000002


In [None]:
# Watch the trained policy play one game, printing the board after each shot.
env = BattleshipGame()
(episode_states,
 episode_actions,
 episode_rewards,
 episode_log_prob_actions) = sample_one_episode(env, reinforce_policy_nn, 200, disp=True)

   0     1     2     3     4
0   ðŸŒŠ   ðŸŒŠ   ðŸŒŠ   ðŸŒŠ   ðŸŒŠ
1   âšª   ðŸŒŠ   ðŸŒŠ   ðŸŒŠ   ðŸŒŠ
2   ðŸŒŠ   ðŸŒŠ   ðŸš¢   ðŸš¢   ðŸš¢
3   ðŸŒŠ   ðŸŒŠ   ðŸŒŠ   ðŸŒŠ   ðŸŒŠ
4   ðŸš¢   ðŸš¢   ðŸš¢   ðŸš¢   ðŸŒŠ
   0     1     2     3     4
0   ðŸŒŠ   ðŸŒŠ   ðŸŒŠ   ðŸŒŠ   ðŸŒŠ
1   âšª   ðŸŒŠ   ðŸŒŠ   ðŸŒŠ   ðŸŒŠ
2   ðŸŒŠ   ðŸŒŠ   ðŸš¢   ðŸš¢   ðŸ’¥
3   ðŸŒŠ   ðŸŒŠ   ðŸŒŠ   ðŸŒŠ   ðŸŒŠ
4   ðŸš¢   ðŸš¢   ðŸš¢   ðŸš¢   ðŸŒŠ
   0     1     2     3     4
0   ðŸŒŠ   ðŸŒŠ   ðŸŒŠ   ðŸŒŠ   ðŸŒŠ
1   âšª   ðŸŒŠ   ðŸŒŠ   ðŸŒŠ   ðŸŒŠ
2   ðŸŒŠ   ðŸŒŠ   ðŸš¢   ðŸš¢   ðŸ’¥
3   ðŸŒŠ   ðŸŒŠ   ðŸŒŠ   ðŸŒŠ   ðŸŒŠ
4   ðŸš¢   ðŸš¢   ðŸš¢   ðŸš¢   ðŸŒŠ
   0     1     2     3     4
0   ðŸŒŠ   ðŸŒŠ   ðŸŒŠ   âšª   ðŸŒŠ
1   âšª   ðŸŒŠ   ðŸŒŠ   ðŸŒŠ   ðŸŒŠ
2   ðŸŒŠ   ðŸŒŠ   ðŸš¢   ðŸš¢   ðŸ’¥
3   ðŸŒŠ   ðŸŒŠ   ðŸŒŠ   ðŸŒŠ   ðŸŒŠ
4   ðŸš¢   ðŸš¢   ðŸš¢   ðŸš¢   ðŸŒŠ
   0     1     2     3     4
0   ðŸŒŠ   ðŸŒŠ   ðŸŒŠ   âšª   ðŸŒŠ
1   âšª   ðŸŒŠ   ðŸŒŠ   âšª   ðŸŒŠ
2   ðŸŒŠ   ðŸŒŠ   ðŸš¢   ðŸš¢   ðŸ’¥
3   ðŸŒŠ   ð

# Section 4 : DeepQLearning based on TD6

In [None]:
class QNetwork(torch.nn.Module):
    """
    A three-layer fully connected Q-network.

    Attributes
    ----------
    n_observations, n_actions, nn_l1, nn_l2 : int
        The constructor arguments, kept for reference.
    fc1 : torch.nn.Linear
        Input layer (``n_observations`` -> ``nn_l1``).
    fc2 : torch.nn.Linear
        Hidden layer (``nn_l1`` -> ``nn_l2``).
    fc3 : torch.nn.Linear
        Output layer (``nn_l2`` -> ``n_actions``), no activation.

    Methods
    -------
    forward(x: torch.Tensor) -> torch.Tensor
        Compute one Q-value per action.
    """

    def __init__(self, n_observations: int, n_actions: int, nn_l1: int, nn_l2: int):
        """
        Build the network.

        Parameters
        ----------
        n_observations : int
            The size of the observation space.
        n_actions : int
            The size of the action space.
        nn_l1 : int
            The number of neurons on the first layer.
        nn_l2 : int
            The number of neurons on the second layer.
        """
        super().__init__()

        self.n_observations, self.n_actions = n_observations, n_actions
        self.nn_l1, self.nn_l2 = nn_l1, nn_l2

        self.fc1 = torch.nn.Linear(n_observations, nn_l1)
        self.fc2 = torch.nn.Linear(nn_l1, nn_l2)
        self.fc3 = torch.nn.Linear(nn_l2, n_actions)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Compute the Q-values of every action for the given state(s).

        Parameters
        ----------
        x : torch.Tensor
            The input tensor (state).

        Returns
        -------
        torch.Tensor
            The output tensor (Q-values).
        """
        first_hidden = torch.relu(self.fc1(x))
        second_hidden = torch.relu(self.fc2(first_hidden))
        return self.fc3(second_hidden)

In [None]:
def test_q_network_agent(env: BattleshipGame, q_network: torch.nn.Module, max_iter = 100, disp: bool = False) -> List[int]:
    """
    Play one greedy episode with the provided Q-network and report its reward.

    Parameters
    ----------
    env : BattleshipGame
        The environment in which to test the agent; it is reset first.
    q_network : torch.nn.Module
        The Q-network to use for decision making (argmax of its output).
    max_iter : int, optional
        The maximum number of shots before the episode is cut off, by default 100.
    disp : bool, optional
        Whether to print the full grid before every shot, by default False.

    Returns
    -------
    List[int]
        A one-element list holding the cumulated reward of the episode.
    """
    state = env.reset()
    episode_reward = 0

    for _ in range(max_iter):
        if disp:
            env.display()

        # Convert the state to a PyTorch tensor and add a batch dimension (unsqueeze)
        state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)

        # Pure inference: no autograd graph is needed to pick the action.
        with torch.no_grad():
            action = torch.argmax(q_network(state_tensor)).item()

        state, _, reward, done = env.step(action)

        # Count the reward before checking `done`, otherwise the reward of
        # the final, game-winning shot would be left out of the total.
        episode_reward += reward
        if done:
            break

    print(f"Episode reward: {episode_reward}")

    return [episode_reward]

In [None]:
# Untrained Q-network for the 5x5 board: 25 inputs, 25 actions.
q_network = QNetwork(n_observations=25, n_actions=25, nn_l1=64, nn_l2=64)

In [None]:
# Evaluate the untrained network on a fresh game (greedy play).
env = BattleshipGame()
test_q_network_agent(env=env, q_network=q_network)

Episode reward: -99.1


[-99.1]

In [None]:
class EpsilonGreedy:
    """
    An Epsilon-Greedy policy.

    Attributes
    ----------
    epsilon : float
        The current probability of choosing a random action.
    epsilon_min : float
        The minimum probability of choosing a random action.
    epsilon_decay : float
        The multiplicative decay applied by ``decay_epsilon``.
    env : BattleshipGame
        The environment in which the agent is acting (only ``grid_size`` is read).
    q_network : torch.nn.Module
        The Q-Network used to estimate action values.

    Methods
    -------
    __call__(state: np.ndarray) -> np.int64
        Select an action for the given state using the epsilon-greedy policy.
    decay_epsilon()
        Decay the epsilon value.
    """

    def __init__(self,
                 epsilon_start: float,
                 epsilon_min: float,
                 epsilon_decay:float,
                 env: "BattleshipGame",
                 q_network: torch.nn.Module):
        """
        Initialize a new instance of EpsilonGreedy.

        Parameters
        ----------
        epsilon_start : float
            The initial probability of choosing a random action.
        epsilon_min : float
            The minimum probability of choosing a random action.
        epsilon_decay : float
            The decay rate applied to epsilon by each ``decay_epsilon`` call.
        env : BattleshipGame
            The environment in which the agent is acting.
        q_network : torch.nn.Module
            The Q-Network used to estimate action values.
        """
        self.epsilon = epsilon_start
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.env = env
        self.q_network = q_network

    def __call__(self, state: np.ndarray) -> np.int64:
        """
        Select an action for the given state using the epsilon-greedy policy.

        If a randomly chosen number is less than epsilon, a random action is chosen.
        Otherwise, the action with the highest estimated action value is chosen.

        Parameters
        ----------
        state : np.ndarray
            The current state of the environment.

        Returns
        -------
        np.int64
            The chosen action (flat cell index).
        """
        if np.random.uniform() < self.epsilon:
            return np.int64(np.random.randint(0, self.env.grid_size**2))

        state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
        # Use the network this policy was built with, not whatever global
        # happens to be called `q_network`; no gradient is needed to act.
        with torch.no_grad():
            best_action = torch.argmax(self.q_network(state_tensor))
        return np.int64(best_action.item())

    def decay_epsilon(self):
        """
        Decay the epsilon value.

        The new epsilon value is the maximum of `epsilon_min` and the product of the current
        epsilon value and `epsilon_decay`.
        """
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

In [None]:
from torch.optim.lr_scheduler import _LRScheduler
class MinimumExponentialLR(torch.optim.lr_scheduler.ExponentialLR):
    """Exponential learning-rate decay that never goes below ``min_lr``."""

    def __init__(self, optimizer: torch.optim.Optimizer, lr_decay: float, last_epoch: int = -1, min_lr: float = 1e-6):
        """
        Initialize a new instance of MinimumExponentialLR.

        Parameters
        ----------
        optimizer : torch.optim.Optimizer
            The optimizer whose learning rate should be scheduled.
        lr_decay : float
            The multiplicative factor of learning rate decay.
        last_epoch : int, optional
            The index of the last epoch. Default is -1.
        min_lr : float, optional
            The minimum learning rate. Default is 1e-6.
        """
        # Must be set before super().__init__, which already calls get_lr().
        self.min_lr = min_lr
        # Forward the caller's last_epoch (it used to be overridden with -1,
        # which made resuming a schedule impossible).
        super().__init__(optimizer, lr_decay, last_epoch=last_epoch)

    def get_lr(self) -> List[float]:
        """
        Compute the learning rate in closed form, clipped at ``min_lr``.

        Returns
        -------
        List[float]
            ``max(base_lr * gamma ** last_epoch, min_lr)`` for each parameter group.
        """
        return [
            max(base_lr * self.gamma ** self.last_epoch, self.min_lr)
            for base_lr in self.base_lrs
        ]

In [None]:
import collections
import random
class ReplayBuffer:
    """
    A fixed-capacity store of past transitions.

    Attributes
    ----------
    buffer : collections.deque
        Bounded double-ended queue holding ``(state, action, reward,
        next_state, done)`` tuples; once full, the oldest entry is dropped
        on each insertion.

    Methods
    -------
    add(state, action, reward, next_state, done)
        Store one transition.
    sample(batch_size)
        Draw ``batch_size`` distinct transitions at random.
    __len__()
        Number of transitions currently stored.
    """

    def __init__(self, capacity: int):
        """
        Create an empty buffer.

        Parameters
        ----------
        capacity : int
            The maximum number of transitions that can be stored in the buffer.
        """
        self.buffer = collections.deque(maxlen=capacity)

    def add(self, state: np.ndarray, action: np.int64, reward: float, next_state: np.ndarray, done: bool):
        """
        Store one transition.

        Parameters
        ----------
        state : np.ndarray
            The state vector before the action.
        action : np.int64
            The action taken.
        reward : float
            The reward received.
        next_state : np.ndarray
            The state vector after the action.
        done : bool
            Whether the transition ended the episode.
        """
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size: int) -> Tuple[np.ndarray, float, float, np.ndarray, bool]:
        """
        Draw a random batch of transitions, regrouped field by field.

        Parameters
        ----------
        batch_size : int
            The number of transitions to sample (without replacement).

        Returns
        -------
        tuple
            ``(states, actions, rewards, next_states, dones)``. ``states``,
            ``actions`` and ``next_states`` are numpy arrays with one entry
            per sampled transition; ``rewards`` and ``dones`` are plain
            tuples of length ``batch_size``.
        """
        batch = random.sample(self.buffer, batch_size)
        # Transpose the list of 5-tuples into five per-field tuples.
        states, actions, rewards, next_states, dones = zip(*batch)
        stacked_states = np.array(states)
        stacked_actions = np.array(actions)
        stacked_next_states = np.array(next_states)
        return stacked_states, stacked_actions, rewards, stacked_next_states, dones

    def __len__(self):
        """
        Return the number of transitions currently stored.

        Returns
        -------
        int
            The current size of the buffer.
        """
        return len(self.buffer)

In [None]:
import itertools
def train_dqn2_agent(env: BattleshipGame,
                     q_network: torch.nn.Module,
                     target_q_network: torch.nn.Module,
                     optimizer: torch.optim.Optimizer,
                     loss_fn: Callable,
                     epsilon_greedy: EpsilonGreedy,
                     lr_scheduler: _LRScheduler,
                     num_episodes: int,
                     gamma: float,
                     batch_size: int,
                     replay_buffer: ReplayBuffer,
                     target_q_network_sync_period: int = 100) -> List[float]:
    """
    Train the Q-network on the given environment (DQN with a target network).

    Parameters
    ----------
    env : BattleshipGame
        The environment to train on.
    q_network : torch.nn.Module
        The Q-network to train.
    target_q_network : torch.nn.Module
        The target Q-network to use for estimating the target Q-values.
    optimizer : torch.optim.Optimizer
        The optimizer to use for training.
    loss_fn : callable
        The loss function, called as ``loss_fn(prediction, target)``.
    epsilon_greedy : EpsilonGreedy
        The epsilon-greedy policy to use for action selection.
    lr_scheduler : torch.optim.lr_scheduler._LRScheduler
        The learning rate scheduler, stepped after every gradient update.
    num_episodes : int
        The number of episodes to train for.
    gamma : float
        The discount factor for future rewards.
    batch_size : int
        The size of the batch to use for training.
    replay_buffer : ReplayBuffer
        The replay buffer storing the experiences.
    target_q_network_sync_period : int, optional
        The number of environment steps between two copies of the Q-network
        weights into the target Q-network, by default 100.

    Returns
    -------
    List[float]
        A list of cumulated rewards per episode.
    """
    report_period = 500
    iteration = 0
    episode_reward_list = []

    # Episodes are numbered 1..num_episodes so that exactly `num_episodes`
    # episodes are played.
    for episode_index in tqdm(range(1, num_episodes + 1)):
        state = env.reset()
        episode_reward = 0

        for t in itertools.count():

            # Get action, next_state and reward

            action = epsilon_greedy(state)

            next_state, _, reward, done = env.step(action)

            # Store snapshots: if the environment hands out arrays sharing
            # memory with its grid, un-copied states would all turn into the
            # latest board and `state` would equal `next_state`.
            replay_buffer.add(np.copy(state), action, reward, np.copy(next_state), done)

            episode_reward += reward

            # Update the q_network weights with a batch of experiences from the buffer

            if len(replay_buffer) > batch_size:
                states, actions, rewards, next_states, dones = replay_buffer.sample(batch_size)
                batch_states_tensor = torch.tensor(states, dtype=torch.float32)
                batch_actions_tensor = torch.tensor(actions, dtype=torch.long)
                batch_rewards_tensor = torch.tensor(rewards, dtype=torch.float32)
                batch_next_states_tensor = torch.tensor(next_states, dtype=torch.float32)
                batch_dones_tensor = torch.tensor(dones, dtype=torch.float32)

                # Q(s, a) of the actions that were actually taken.
                q_w_s_a = q_network(batch_states_tensor).gather(1, batch_actions_tensor.unsqueeze(1)).squeeze(1)

                # The TD target is a constant for the update: build it
                # without tracking gradients through the target network.
                with torch.no_grad():
                    q_w_s_prime = target_q_network(batch_next_states_tensor).max(1)[0]
                    # (1 - done) removes the bootstrap term on terminal transitions.
                    target = batch_rewards_tensor + gamma * q_w_s_prime * (1 - batch_dones_tensor)

                loss = loss_fn(q_w_s_a, target)

                # Optimize the model
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                lr_scheduler.step()

            # Update the target q-network: every `target_q_network_sync_period`
            # steps its weights are overwritten with those of the Q-network.

            if iteration % target_q_network_sync_period == 0:
                target_q_network.load_state_dict(q_network.state_dict())

            iteration += 1

            if done:
                break

            state = next_state

        episode_reward_list.append(episode_reward)
        if episode_index % report_period == 0:
            recent_mean = sum(episode_reward_list[-report_period:]) / report_period
            print(f"Episode {episode_index}, Reward: {recent_mean}, Epsilon: {epsilon_greedy.epsilon}")
        epsilon_greedy.decay_epsilon()

    return episode_reward_list

In [None]:
# ---- DQN training run on the default 5x5 game ----
env = BattleshipGame()

# Online network and a target network starting from identical weights.
q_network = QNetwork(n_observations=25, n_actions=25, nn_l1=64, nn_l2=64)
q_network_params = q_network.state_dict()
target_q_network = QNetwork(n_observations=25, n_actions=25, nn_l1=64, nn_l2=64)
target_q_network.load_state_dict(q_network_params)

# Optimisation: AdamW with an exponentially decaying, floored learning rate.
# (Alternative without a floor:
#  lr_scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.999))
optimizer = torch.optim.AdamW(q_network.parameters(), lr=0.001, amsgrad=True)
lr_scheduler = MinimumExponentialLR(optimizer, lr_decay=0.97, min_lr=0.0001)
loss_fn = torch.nn.MSELoss()

# Exploration schedule and experience storage.
epsilon_greedy = EpsilonGreedy(epsilon_start=1,
                               epsilon_min=0.01,
                               epsilon_decay=0.9996,
                               env=env,
                               q_network=q_network)
replay_buffer = ReplayBuffer(2000)

# Train the q-network
episode_reward_list = train_dqn2_agent(env=env,
                                       q_network=q_network,
                                       target_q_network=target_q_network,
                                       optimizer=optimizer,
                                       loss_fn=loss_fn,
                                       epsilon_greedy=epsilon_greedy,
                                       lr_scheduler=lr_scheduler,
                                       num_episodes=100,
                                       gamma=0.9,
                                       batch_size=128,
                                       replay_buffer=replay_buffer)

# Persist the whole trained module (architecture and weights).
torch.save(q_network, "dqn2_q_network.pth")


  0%|          | 0/99 [00:00<?, ?it/s]