In [163]:
# !pip install gym
# !pip install tensorflow

import numpy as np
import gym
import matplotlib.pyplot as plt
from gym import spaces
from google.colab import files
import torch
import random

# Battleship Environment




In [164]:
class BatteshipEnv(gym.Env):
  def __init__(self, grid_size, ships_board):
    """
      Initialize a Battleship environment
      :param grid_size: With grid_size = n we have a board of n*n
      :param ships_board: Given board with placed ships
      :return BatteshipEnv object
    """
    super(BatteshipEnv, self).__init__()
    self.grid_size = grid_size
    self.ships_board = ships_board
    self.board = np.zeros((self.grid_size,self.grid_size), dtype='int') # Currently playing board
    self.s = (0,0,0)  # Currently state: a tuple representing (x_coordinate, y_coordinate, health)
    self.max_health = int(np.max(ships_board)) # The maximum health h among the ships
    self.observation_space = spaces.Discrete(self.grid_size * self.grid_size) # Obeservation space
    self.action_space = spaces.Tuple((spaces.Discrete(self.grid_size),spaces.Discrete(self.grid_size))) # Action space
    self.nS = self.grid_size * self.grid_size * (self.max_health+3) # Size of state space n*n*(hmax+3)
    self.nA = self.grid_size * self.grid_size # Size of action space n*n
    self.number_of_hit_to_win = np.sum(self.ships_board) # Number of hit to win the game = sum of health of all ships   
    self.number_of_hit = 0  # Current number of hit 
    self.destroyed = int(self.max_health + 1) # Mark a square as destroyed ship
    self.empty =  int(self.max_health + 1) # Mark a square as square
    
  def calculate_reward(self, action):
    """ 
      Return reward for an action
      :param action: action taken
      :return reward 
    """ 
    x = action[0] # x coordinate of the action
    y = action[1] # y coordinate of the action
    if (self.board[x,y] == self.empty) | (self.board[x,y] == self.destroyed) :
      return -1 # If the agent hits an empty square or a square where a ship has sunk
    else:
      return 0  # Otherwise

  def update_board(self, action):
    """
      Update the playing board
      :param action: action taken
      :return the current state after taking the action
    """
    x = action[0] # x coordinate of the action
    y = action[1] # y coordinate of the action
    # If the agent hits an empty square
    if self.board[x,y] == self.empty:
      self.s = (action[0], action[1], int(self.empty))
    # If the agent hits a square where a ship has sunk 
    elif self.board[x,y] == self.destroyed:
      self.s = (action[0], action[1], int(self.destroyed))
    else:
      # If the agent hits a square with a ship 
      if self.ships_board[x,y] != 0:
        # Make 1 more hit
        if self.board[x,y] < self.ships_board[x,y]:
          self.board[x,y] += 1 # One more hit
          self.number_of_hit += 1
        # The ship has sunk already
        else:
          self.board[x,y] = self.destroyed
      # The agent hits an empty square
      else:
        self.board[x,y] = self.empty
      # Update current state
      self.s = (action[0], action[1], int(self.board[x,y]))
    return self.s

  def step(self, action):
    """ 
      Take a step by executing an action
      :param actionL action taken
      :return next_state: the state after taking the action
      :return reward: reward for taking action from the current state
      :return done: Whether all the ships have been destroyed 
      :return info: none
    """
    next_state = self.update_board(action)
    reward = self.calculate_reward(action)
    done = bool(self.number_of_hit == self.number_of_hit_to_win)
    info = {}
    return next_state, reward, done, info

  def observe(self):
    """
      Return the current state
    """
    return self.s

  def reset(self):
    """
      Reset the environment
      :return the initial playing board
    """
    self.board = np.zeros((self.grid_size,self.grid_size))
    self.number_of_hit = 0
    self.s = (np.random.randint(0, self.grid_size),np.random.randint(0, self.grid_size),0)
    return self.board

  def render(self, mode='human'):
    """
      Render the current playing board
    """
    for i in range(self.grid_size):
      print("-------------------------------------------")
      line = ""
      for j in range(self.grid_size):     
        line += " | "
        if self.board[i,j] == -1:
          line += "O"
        elif self.board[i,j] == 0:
          line += " "
        else:
          line += str(int(self.board[i,j]))
      line += " | "
      print(line)

##### HELPER FUNCTIONS #####
def plot_results(list_cumulative_rewards, list_cumulative_steps, algo_name, download_plots):
  """
    Plotting the results
    :param list_cumulative_rewards: list of cumulative rewards of all episodes
    :param list_cumulative_steps: list of number of actions of all episodes
    :param algo_name: the name of the learning algorithm
    :param download_plots: "True" if this notebook is running on Google Colab and you want to save the figures to your computer, "False" otherwise
  """
  plt.plot(list_cumulative_rewards, label="legend")
  plt.xlabel('Episode')
  plt.ylabel('Cumulative rewards')
  plt.axhline(24, label ='Optimal Number of Actions', color ='red') 
  if download_plots:
    plt.savefig(algo_name+"-cum-rewards.eps", format='eps', dpi=1200)
    files.download(algo_name+"-cum-rewards.eps")
    plt.clf()
  else:
    plt.show()
  plt.plot(list_cumulative_steps, label="legend")
  plt.xlabel('Episode')
  plt.ylabel('Number of actions to finish the game')
  plt.axhline(24, label ='Optimal Number of Actions', color ='red') 
  if download_plots:
    plt.savefig(algo_name+"-actions.eps", format='eps', dpi=1200)
    files.download(algo_name+"-actions.eps")
    plt.clf() 
  else:
    plt.show()

def env_generator():
  """
    Generate a testing environment with a board of 10x10 squares and 10 ships
    1 ship which needs 1 hit to be sunk
    4 ships which need 2 hits to be sunk
    5 ships which need 3 hits to be sunk
  """
  grid_size = 10
  ships_board = np.zeros((grid_size, grid_size), dtype='int')
  ships_board[0,9] = 1
  ships_board[1,3] = 2 
  ships_board[7,2] = 2 
  ships_board[3,1] = 2
  ships_board[6,7] = 2
  ships_board[3,5] = 3 
  ships_board[9,9] = 3 
  ships_board[4,5] = 3 
  ships_board[9,4] = 3
  ships_board[5,8] = 3
  env = BatteshipEnv(grid_size, ships_board)
  return env


# Q-Learning

In [231]:
class QLearning():
  def __init__(self, env):
    self.env = env
    self.learning_rate = 0.05
    self.epsilon = 1.0 # For deciding exploitation or exploration
    self.epsilon_decay_rate = 0.99 # Epsilon is decayed after each episode with a fixed rate
    self.discount = 0.5 # The weight for future rewards
    self.Q_values = self.Q_values = np.zeros([self.env.nS, self.env.nA]) # Q table

  def update(self, state, action, reward):
    """ Update Q-table - ON POLICY
        :param state: previous state before taking the action
        :param action: action taken
        :param reward: reward for taking the action from the current state
    """
    # Get current state after taking the action and convert from 3D index to 1D index
    new_state = self.env.observe()
    idx_new_state = np.ravel_multi_index([new_state[0], new_state[1], new_state[2]],(env.grid_size, env.grid_size, env.max_health+3))
    
    # Convert to 1D index
    idx_state = np.ravel_multi_index([state[0], state[1], state[2]],(env.grid_size, env.grid_size, env.max_health+3))
    idx_action = np.ravel_multi_index([action[0], action[1]],(env.grid_size, env.grid_size))

    #Update Q values - OFF POLICY
    self.Q_values[idx_state][idx_action] = self.Q_values[idx_state][idx_action] + self.learning_rate * (reward + self.discount * max(self.Q_values[idx_new_state]) - self.Q_values[idx_state][idx_action])

  def get_action(self, state):
    """ Return an action to take based on epsilon (greedy or random action)
        :param state: the current state
        :return action: next action to take
    """
    # Convert to 1D index
    idx_state = np.ravel_multi_index([state[0], state[1], state[2]],(env.grid_size, env.grid_size, env.max_health+3))
    
    random_number = np.random.uniform()
    if random_number < self.epsilon:
      # Random action
      return (np.random.randint(0, self.env.grid_size), np.random.randint(0, self.env.grid_size)) 
    else:
      # Greedy action
      idx_action = np.argmax(self.Q_values[idx_state]) 
      return np.unravel_index(idx_action,(self.env.grid_size,self.env.grid_size)) # Return 2D index of the action

##### HELPER FUNCTIONS #####
def test_Q_Learning(env, number_of_episodes, download_plots):
  """
    A complete test of Q-Learning agent
    :param env: the environment where the agent plays
    :param number_of_episodes: number of episodes
    :param download_plots: "True" if this notebook is running on Google Colab and you want to save the figures to your computer, "False" otherwise
  """

  agent = QLearning(env)  # Q-Learning Agent
  list_cumulative_rewards = [] # list of cumulative rewards of all episodes
  list_number_of_actions = [] # list of number of actions of all episodes

  for ep in range (number_of_episodes):
    sum_reward = 0
    done = False
    number_of_action = 0  # Current number of actions have been taken in this episode
    while not done:
      # Get current state
      current_state = env.observe()

      # Get next action
      action = agent.get_action(current_state)

      # Execute action and get feedback
      observation, reward, done, info = env.step(action)

      # Update Q-table
      agent.update(current_state, action, reward)

      # Visualize how the agent plays in the last episode
      if ep==number_of_episodes-1:
        i, j = np.unravel_index(action, (env.grid_size, env.grid_size))
        print("Action: (" + str(j) + ") Reward: ", reward)
      
      # Update reward and number of actions
      sum_reward += reward
      number_of_action += 1
    list_cumulative_rewards.append(sum_reward)
    list_number_of_actions.append(number_of_action)

    # Print log
    print("Episode ", ep+1, " Cumulative reward: ", sum_reward, " Number of actions: ", number_of_action)
    
    # Reset env after each episode
    env.reset()

    # Epsilon is decayed since the agent is getting more and more knowledge
    agent.epsilon = agent.epsilon * agent.epsilon_decay_rate
  
  # Plot or download the results
  plot_results(list_cumulative_rewards, list_number_of_actions, "Q-Learning", download_plots)


# SARSA

In [232]:
class Sarsa():
  def __init__(self, env):
    self.env = env
    self.learning_rate = 0.05
    self.epsilon = 1.0 # For deciding exploitation or exploration
    self.epsilon_decay_rate = 0.99 # Epsilon is decayed after each episode with a fixed rate
    self.discount = 0.5 # The weight for future rewards

    self.Q_values = np.zeros([self.env.nS, self.env.nA]) # Q table

  def update(self, state, state2, reward, action, action2):
    """ Update Q-table - ON POLICY
        :param state: state before taking the action
        :param action: next action to be taken
        :param reward: reward for taking the action from the state
        :param state2: the state that the agent enters after taking that action
        :param action2: the next action the agent chooses in its new state2
    """
    
    # Convert all to 1-D indices
    idx_state = np.ravel_multi_index([state[0], state[1], state[2]],(env.grid_size, env.grid_size, env.max_health+3))
    idx_state2 = np.ravel_multi_index([state2[0], state2[1], state2[2]],(env.grid_size, env.grid_size, env.max_health+3))
    idx_action = np.ravel_multi_index([action[0], action[1]],(env.grid_size, env.grid_size))
    idx_action2 = np.ravel_multi_index([action2[0], action2[1]],(env.grid_size, env.grid_size))

    #Update Q values - ON POLICY
    self.Q_values[idx_state][idx_action] = self.Q_values[idx_state][idx_action] \
                                          + self.learning_rate * (reward + self.discount * self.Q_values[idx_state2][idx_action2] - self.Q_values[idx_state][idx_action])

  def get_action(self, state):
    """ Return an action to take based on epsilon (greedy or random action)
        :param state: the current state
        :return action: next action to take
    """
    # Convert to 1D index
    idx_state = np.ravel_multi_index([state[0], state[1], state[2]],(env.grid_size, env.grid_size, env.max_health+3))
    
    random_number = np.random.uniform()
    if random_number < self.epsilon:
      # Random action
      return (np.random.randint(0, self.env.grid_size), np.random.randint(0, self.env.grid_size))
    else:
      # Greedy action
      idx_action = np.argmax(self.Q_values[idx_state])
      return np.unravel_index(idx_action,(self.env.grid_size,self.env.grid_size)) # Return 2-D index of the action

##### HELPER FUNCTIONS #####
def test_SARSA(env, number_of_episodes, download_plots):
  """
    A complete test of SARSA agent
    :param env: the environment where the agent plays
    :param number_of_episodes: number of episodes
    :param download_plots: "True" if this notebook is running on Google Colab and you want to save the figures to your computer, "False" otherwise
  """
  
  agent = Sarsa(env) # SARSA Agent
  list_cumulative_rewards = [] # list of cumulative rewards of all episodes
  list_number_of_actions = [] # list of number of actions of all episodes

  for ep in range (number_of_episodes):
    sum_reward = 0
    done = False
    number_of_action = 0  # Current number of actions have been taken in this episode
    while not done:
      # Get current state
      state = env.observe()

      # Get next action
      action = agent.get_action(state)

      # Get the next state that the agent enters after taking that action
      state2, reward, done, info = env.step(action)
      
      # Get next action from state2
      action2 = agent.get_action(state2)
      
      # Update Q-table 
      agent.update(state, state2, reward, action, action2)
      
      # Visualize how the agent plays in the last episode
      if ep==number_of_episodes-1:
        i, j = np.unravel_index(action2, (env.grid_size, env.grid_size))
        print("Action: (" + str(j) + ") Reward: ", reward)
      
      # Update reward and number of actions
      sum_reward += reward
      number_of_action += 1
    list_cumulative_rewards.append(sum_reward)
    list_number_of_actions.append(number_of_action)
    
    # Print log
    print("Episode ", ep+1, " Cumulative reward: ", sum_reward, " Number of actions: ", number_of_action)
    
    # Reset env after each episode
    env.reset()

    # Epsilon is decayed since the agent is getting more and more knowledge
    agent.epsilon = agent.epsilon * agent.epsilon_decay_rate
  
  # Plot or download the results
  plot_results(list_cumulative_rewards, list_number_of_actions, "SARSA", download_plots)



# Deep Q Network (pytorch)

In [239]:
class Replay():
  """
    Memory for storing experience 
  """
  def __init__(self):
    self.buffer = []
  
  def __len__(self):
    return len(self.buffer)

  def add(self, state, action, reward, next_state, done):
    """
      Add a new experience to the buffer
      :param state: The state before taking the action
      :param action: action taken
      :param reward: Reward for taking that action
      :param next_state: The state that the agent enters after taking the action
      :param done: Whether the agent finishes the game
    """
    self.buffer.append((state, action, reward, next_state, done))

  def sample(self, batch_size):
    """
      Return a batch of samples from the experience buffer
      :param batch_size: The number of sample that you want to take
      :return the batch of samples but decomposed into lists of states, actions, rewards, next_states 
    """
    states, actions, rewards, next_states = [], [], [], []

    # Random samples
    samples = random.sample(self.buffer, batch_size)

    for s in samples:
      s = np.asarray(s)
      state = torch.from_numpy(s[0])
      action = torch.tensor(s[1])
      reward = torch.tensor(s[2])
      next_state = torch.from_numpy(s[3])
      states.append(state)
      actions.append(action)
      rewards.append(reward)
      next_states.append(next_state) 
    return states, actions, rewards, next_states

class DQN():
  """
    Model a DQN agent
  """
  def __init__(self, env):
    self.epsilon = 1.0 # For deciding exploitation or exploration
    self.epsilon_decay_rate = 0.99 # Epsilon is decayed after each episode with a fixed rate
    self.discount = 0.5 # The weight for future rewards
    self.env = env # The environment where the agent acts
    self.device = torch.device('cuda')
    
    # Main network 
    self.main_dqn = torch.nn.Sequential(torch.nn.Linear(3, 64),
                                        torch.nn.LeakyReLU(),
                                        torch.nn.Linear(64, 128),
                                        torch.nn.LeakyReLU(),
                                        torch.nn.Linear(128,self.env.nA))
    # Target network
    self.target_dqn = torch.nn.Sequential(torch.nn.Linear(3, 64),
                                          torch.nn.LeakyReLU(),
                                          torch.nn.Linear(64, 128),
                                          torch.nn.LeakyReLU(),
                                          torch.nn.Linear(128,self.env.nA))
    # Send models to GPU
    self.main_dqn.to(self.device)
    self.target_dqn.to(self.device)

    # Optimizer and Loss function
    self.optimizer = torch.optim.Adam(self.main_dqn.parameters(), lr=1e-4)
    self.mse = torch.nn.MSELoss()

  def select_action(self, state):
    """
      Return an action to take based on epsilon (greedy or random action)
      :param state: the current state
      :return action: next action to take
    """
    state = np.asarray(state)
    random_number = np.random.uniform()
    if random_number < self.epsilon:
      # Random action
      return np.random.randint(0, self.env.grid_size*self.env.grid_size)
    else:
      # Greedy action
      state = torch.from_numpy(state).to(self.device, dtype=torch.float)
      return torch.argmax(self.main_dqn(state)).item()
       
  def train(self, states, actions, rewards, next_states):
    """
      Train the network with a batch of samples
      :param states: The state before taking the action
      :param actions: action taken
      :param rewards: Reward for taking that action
      :param next_states: The state that the agent enters after taking the action
      :return loss: the loss value after training the batch of samples
    """
    # Send data to GPU
    states = torch.stack(states).to(self.device, dtype=torch.float)
    actions = torch.stack(actions).to(self.device)
    rewards = torch.stack(rewards).to(self.device, dtype=torch.float)
    next_states = torch.stack(next_states).to(self.device, dtype=torch.float)

    # Calculate target Q values using the Target Network  
    next_qs = self.target_dqn(next_states)
    max_next_qs = torch.max(next_qs, axis=-1)  
    target = rewards + max_next_qs[0]*self.discount

    # Calculate Q values using the Main Network
    action_masks = torch.nn.functional.one_hot(actions, self.env.nA)
    q_value = torch.sum(action_masks*self.main_dqn(states), dim=1)
    target = target.detach()
    
    # Calculate MSE loss
    loss = self.mse(target, q_value)

    # Optimize the model
    self.optimizer.zero_grad()
    loss.backward()
    self.optimizer.step()

    return loss

  def runner(self, number_of_episodes, download_plots):
    
    batch_size = 256              # Size of the batch of samples
    buffer = Replay()             # Experience Buffer
    list_cumulative_rewards = []  # List of cumulative rewards in each episode
    list_number_of_actions = []   # List of number of actions in each episode

    for episode in range(number_of_episodes+1):
      ep_reward = 0         # Reward for this episode
      done = False          # Whether the game is finished
      number_of_action = 0  # Number of action in this episode

      # Reset the env
      self.env.reset()

      # Get the current state
      state = self.env.observe()
      state = np.asarray(state)

      while not done:
        # Get and execute the next action for the current state
        action = self.select_action(state)
        next_state, reward, done, info = env.step(np.unravel_index(action,(self.env.grid_size,self.env.grid_size)))
        ep_reward += reward
        next_state = np.asarray(next_state)

        # Save what the agent just learnt to the experience buffer.
        buffer.add(state, action, reward, next_state, done)
        
        state = next_state

        # Every 200 actions: Copy the Main Network's weights to the Target Network 
        if number_of_action>0 and (number_of_action%200==0) :
          self.target_dqn.load_state_dict(self.main_dqn.state_dict())

        # Start training once the size of the buffer greater than the batch size
        if len(buffer) >= batch_size:
          states, actions, rewards, next_states = buffer.sample(batch_size)
          loss = self.train(states, actions, rewards, next_states)
        
        number_of_action += 1

      # Print log
      print(f'Episode {episode}/{number_of_episodes}. Epsilon: {self.epsilon:.3f}. Number of actions: {number_of_action}. Loss: {loss} '
            f'Reward: {ep_reward}')
      
      list_cumulative_rewards.append(ep_reward)
      list_number_of_actions.append(number_of_action)
      
      # Epsilon is decayed since the agent is getting more and more knowledge
      self.epsilon = self.epsilon * self.epsilon_decay_rate

    # Close the env and plot or download the results
    env.close()
    plot_results(list_cumulative_rewards, list_number_of_actions, "Deep-QN", download_plots)



# Main Function

In [None]:
if __name__ == "__main__":
  number_of_episodes = 3000  # Number of episodes you want to run the agent
  download_plots = False     # "True" if this notebook is running on Google Colab and you want to save the figures to your computer, "False" otherwise
  env = env_generator()

  # Test Q-Learning agent
  env.reset()
  test_Q_Learning(env, number_of_episodes, download_plots)

  # Test SARSA agent
  env.reset()
  test_SARSA(env, number_of_episodes, download_plots)

  # Test DQN agent 
  env.reset()
  dqn = DQN(env)
  dqn.runner(number_of_episodes, download_plots)
